In [41]:
import psutil

# Quick snapshot of how loaded the runtime is before any heavy work starts.
# cpu_percent() needs a measuring window: with the default interval=None the
# first call after a kernel restart has nothing to compare against and
# reports a meaningless 0.0. Blocking for one second gives a real reading.
print("CPU usage:", psutil.cpu_percent(interval=1))
print("Memory usage:", psutil.virtual_memory().percent)

CPU usage: 17.9
Memory usage: 15.4


In [42]:
# GPU setup for TensorFlow and PyTorch (if available)
# Both sections are best-effort: any exception is printed and the notebook
# keeps running instead of stopping here.
try:
    import tensorflow as tf
    print("TensorFlow version:", tf.__version__)
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        try:
            # Turn on memory growth for every GPU that was detected.
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            logical_gpus = tf.config.list_logical_devices('GPU')
            print(f"TensorFlow GPUs available: {len(gpus)} physical, {len(logical_gpus)} logical")
        except Exception as e:
            print("Could not set TF GPU memory growth:", e)
    else:
        print("No TensorFlow GPU detected; TF will use CPU.")
except Exception as e:
    # Reached for a failed import and for any other error raised above.
    print("TensorFlow not available:", e)

# Optional PyTorch device setup if used later
# NOTE: `device` and `torch_cuda` are only defined when this block succeeds.
try:
    import torch
    torch_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if torch_cuda else 'cpu')
    print("PyTorch CUDA available:", torch_cuda)
    if torch_cuda:
        print("Using PyTorch device:", torch.cuda.get_device_name(0))
    else:
        print("Using PyTorch device: CPU")
except Exception as e:
    print("PyTorch not available:", e)

TensorFlow version: 2.19.0
No TensorFlow GPU detected; TF will use CPU.
PyTorch CUDA available: False
Using PyTorch device: CPU


In [43]:
# Mount Google Drive at /content/drive; DATASET_PATH in the configuration
# cell points below this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [44]:
# # GPU diagnostics and assertion: must run on GPU
# import os, sys
# print("Python:", sys.version)

# # TensorFlow check
# try:
#     import tensorflow as tf
#     print("TensorFlow:", tf.__version__)
#     tf_gpus = tf.config.list_physical_devices('GPU')
#     print("TF GPUs:", tf_gpus)
#     if tf_gpus:
#         # Optional: enable device placement logging for clarity
#         os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "1")  # INFO
#         try:
#             for gpu in tf_gpus:
#                 tf.config.experimental.set_memory_growth(gpu, True)
#             logical = tf.config.list_logical_devices('GPU')
#             print(f"TF GPU ready: {len(tf_gpus)} physical, {len(logical)} logical")
#         except Exception as e:
#             print("Warn: couldn't set TF memory growth:", e)
#     else:
#         raise RuntimeError("TensorFlow GPU not detected. Install CUDA/cuDNN and correct env.")
# except Exception as e:
#     raise RuntimeError(f"TensorFlow check failed: {e}")


# Fake Image Detection using Error Level Analysis (ELA) and CNN

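This notebook trains two Convolutional Neural Networks (CNNs) to detect fake images: one on Error Level Analysis (ELA) maps, which expose JPEG compression artifacts, and one on FFT log-magnitude spectra. The two predictions are also averaged into a simple ensemble.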

### Prerequisite: Dataset Structure
Ensure your dataset is uploaded and structured as follows:
```
dataset/
├── train/
│   ├── real/
│   └── fake/
├── validation/
│   ├── real/
│   └── fake/
└── test/
    ├── real/
    └── fake/
```

In [45]:
# If using Google Colab, uncomment the following lines to mount your drive
# from google.colab import drive
# drive.mount('/content/drive')

In [46]:
import io
import os

# pyplot (not the top-level matplotlib package) provides subplots/figure/show.
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image, ImageChops, ImageEnhance
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical

## 1. Configuration

In [47]:
# Size every image is resized to (passed to PIL's resize) and the spatial
# input shape of both CNNs.
IMAGE_SIZE = (256, 256)
# Passed to model.fit for both the ELA and the frequency model.
BATCH_SIZE = 32
EPOCHS = 20
# Update this path to point to your dataset location
# (expects train/, validation/ and test/ sub-folders, each with real/ and fake/).
DATASET_PATH = '/content/drive/MyDrive/Data Set 1/Data Set 1'

# ELA settings
# JPEG re-save quality used by build_ela_stack and predict_image.
ELA_QUALITY = 90  # try 70-95
# Only read by build_ela_stack; None (or an empty list) selects the single
# ELA_QUALITY path there.
ELA_QUALITIES = None  # e.g., [80, 90, 95] to generate multiple versions

## 2. Error Level Analysis (ELA) Function
This function resaves the image at a specific quality level and calculates the difference (error) between the original and the compressed version.

In [48]:
def convert_to_ela_image(path, quality=90):
    """Build an Error Level Analysis (ELA) image for the file at ``path``.

    The image is re-encoded as JPEG at ``quality``, the per-pixel absolute
    difference to the original is taken, and the difference is brightened so
    that its largest channel value maps to 255.

    Args:
        path: Path of the image file to analyse.
        quality: JPEG quality used for the re-compression step.

    Returns:
        A PIL image resized to ``IMAGE_SIZE``, or ``None`` if any step raised
        (the error is printed with an ``[ELA ERROR]`` prefix).
    """
    try:
        # Decode the pixels, then release the file handle straight away.
        with Image.open(path) as source:
            image = source.convert("RGB")

        # Re-compress in memory instead of through a fixed "temp_ela.jpg":
        # nothing is left behind in the working directory and two calls can
        # never overwrite each other's intermediate file.
        buffer = io.BytesIO()
        image.save(buffer, "JPEG", quality=quality)
        buffer.seek(0)

        with Image.open(buffer) as compressed:
            ela_img = ImageChops.difference(image, compressed)

        # getextrema() yields one (min, max) pair per channel; an all-zero
        # difference falls back to 1 to avoid dividing by zero.
        extrema = ela_img.getextrema()
        max_diff = max(high for _, high in extrema) or 1
        scale = 255.0 / max_diff

        ela_img = ImageEnhance.Brightness(ela_img).enhance(scale)
        return ela_img.resize(IMAGE_SIZE)

    except Exception as e:
        # Deliberately best-effort: callers treat None as "skip this image".
        print(f"[ELA ERROR] {path}: {e}")
        return None

def build_ela_stack(path):
    """Return the ELA representation of ``path`` as a float array in [0, 1].

    When ``ELA_QUALITIES`` is a non-empty list or tuple, one ELA image is
    computed per quality, the successful ones are concatenated along the
    channel axis, and only the first three channels are kept so the result
    still fits the current model. If that yields nothing, or
    ``ELA_QUALITIES`` is unset, a single ELA image at ``ELA_QUALITY`` is used.

    Returns:
        The scaled array, or ``None`` when the single-quality ELA failed.
    """
    if isinstance(ELA_QUALITIES, (list, tuple)) and ELA_QUALITIES:
        per_quality = (convert_to_ela_image(path, quality=q) for q in ELA_QUALITIES)
        layers = [np.array(ela) / 255.0 for ela in per_quality if ela is not None]
        if layers:
            # Channels beyond the third are dropped to match the model input.
            return np.concatenate(layers, axis=-1)[..., :3]

    # Single-quality path (also the fallback when every quality failed).
    single = convert_to_ela_image(path, quality=ELA_QUALITY)
    if single is None:
        return None
    return np.array(single) / 255.0

## 3. Frequency Domain Analysis (FFT)
The FFT reveals periodic compression artifacts and copy-paste seams that may be subtle in the spatial domain. The cells below compute a log-magnitude spectrum and simple band energy statistics for each image.

In [49]:
def compute_frequency_map(img_path):
    """Return the normalized log-magnitude FFT spectrum of a grayscale image.

    The image is converted to grayscale, resized to ``IMAGE_SIZE``,
    transformed with a 2-D FFT (zero frequency shifted to the centre), put on
    a log scale and min-max normalized.

    Args:
        img_path: Path of the image file to analyse.

    Returns:
        A 2-D float array with one value per pixel of the resized image,
        scaled into the range [0, 1).

    Raises:
        Whatever ``PIL.Image.open`` raises for a missing or unreadable file;
        errors are not swallowed here.
    """
    # Context manager so the file handle is released once pixels are decoded.
    with Image.open(img_path) as source:
        gray = source.convert("L").resize(IMAGE_SIZE)

    pixels = np.asarray(gray, dtype=np.float64)
    spectrum = np.fft.fftshift(np.fft.fft2(pixels))
    # The small epsilons keep log() and the division finite for flat images.
    magnitude = 20 * np.log(np.abs(spectrum) + 1e-8)
    normalized = (magnitude - magnitude.min()) / (np.ptp(magnitude) + 1e-8)
    return normalized

def band_energy_stats(freq_map, high_cut=0.6, mid_cut=0.3):
    """Average ``freq_map`` inside three concentric radial bands.

    Distances are measured from the array centre ``(h // 2, w // 2)``; the
    cut-offs are fractions of the largest such distance found in the array.

    Args:
        freq_map: 2-D array (its ``shape`` is unpacked into two values).
        high_cut: Fraction of the maximum radius where the high band starts.
        mid_cut: Fraction of the maximum radius where the low band ends.

    Returns:
        Dict with float entries ``"low"``, ``"mid"`` and ``"high"``.
    """
    rows, cols = freq_map.shape
    dy = np.arange(rows)[:, np.newaxis] - rows // 2
    dx = np.arange(cols)[np.newaxis, :] - cols // 2
    distance = np.sqrt(dx ** 2 + dy ** 2)

    farthest = distance.max()
    inner_edge = mid_cut * farthest
    outer_edge = high_cut * farthest

    band_masks = (
        ("low", distance <= inner_edge),
        ("mid", (distance > inner_edge) & (distance <= outer_edge)),
        ("high", distance > outer_edge),
    )
    return {band: float(freq_map[mask].mean()) for band, mask in band_masks}

def visualize_frequency_analysis(img_path):
    """Plot the original, ELA and FFT views of one image side by side.

    Prints a message and returns early when the file is missing or the ELA
    conversion fails. After plotting, the low/mid/high band energies are
    printed as well.

    Note: a later cell defines another function with this same name (adding a
    radial profile and a CNN prediction panel); once that cell has run, it
    replaces this three-panel version.
    """
    if not os.path.exists(img_path):
        print(f"File not found: {img_path}")
        return

    original = Image.open(img_path).resize(IMAGE_SIZE)

    ela_img = convert_to_ela_image(img_path)
    if ela_img is None:
        print(f"ELA failed for {img_path}")
        return

    freq_map = compute_frequency_map(img_path)
    energy = band_energy_stats(freq_map)
    low, mid, high = energy['low'], energy['mid'], energy['high']

    # (image, title, extra imshow arguments) for each panel, left to right.
    panels = [
        (original, "Original", {}),
        (ela_img, "ELA", {}),
        (
            freq_map,
            f"Frequency magnitude\n(low/mid/high): {low:.3f} / {mid:.3f} / {high:.3f}",
            {"cmap": "magma"},
        ),
    ]

    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    for ax, (picture, title, imshow_kwargs) in zip(axes, panels):
        ax.imshow(picture, **imshow_kwargs)
        ax.set_title(title)
        ax.axis("off")

    plt.tight_layout()
    plt.show()

    print(
        f"Energy stats for {img_path} -> low: {low:.3f}, mid: {mid:.3f}, high: {high:.3f}")

## 4. Data Loading Helper

In [50]:
def load_split(split_path):
    """Load one dataset split as ELA tensors with one-hot labels.

    Looks for ``real/`` and ``fake/`` sub-folders of ``split_path`` (label 0
    and 1 respectively), converts every ``.jpg``/``.jpeg``/``.png`` file with
    ``convert_to_ela_image`` and scales the pixels to [0, 1]. Files whose ELA
    conversion fails are skipped; a missing class folder only prints a
    warning.

    Args:
        split_path: Directory of the split, e.g. ``<DATASET_PATH>/train``.

    Returns:
        Tuple ``(X, Y)``: ``X`` is a float32 array of ELA images and ``Y`` the
        matching one-hot labels from ``to_categorical(..., 2)``.
    """
    features = []
    labels = []

    classes = ["real", "fake"]  # position in this list is the integer label

    for label, class_name in enumerate(classes):
        folder = os.path.join(split_path, class_name)

        if not os.path.isdir(folder):
            print(f"[WARNING] Missing folder: {folder}")
            continue

        print(f"Loading '{class_name}' images from {split_path} ...")

        # Sorted so the sample order does not depend on the directory listing.
        for filename in sorted(os.listdir(folder)):
            if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                continue

            img_path = os.path.join(folder, filename)
            # Use the configured quality so training data is preprocessed
            # exactly like predict_image does at inference time.
            ela_img = convert_to_ela_image(img_path, quality=ELA_QUALITY)

            if ela_img is not None:
                # float32 halves the memory of the default float64 and is the
                # dtype the model computes in anyway.
                features.append(np.asarray(ela_img, dtype=np.float32) / 255.0)
                labels.append(label)

    return np.array(features), to_categorical(labels, 2)


def load_freq_split(split_path):
    """Load one dataset split as FFT spectrum tensors with one-hot labels.

    Looks for ``real/`` and ``fake/`` sub-folders of ``split_path`` (label 0
    and 1 respectively) and runs ``compute_frequency_map`` on every
    ``.jpg``/``.jpeg``/``.png`` file. Files that cannot be processed are
    reported and skipped; a missing class folder only prints a warning.

    Args:
        split_path: Directory of the split, e.g. ``<DATASET_PATH>/train``.

    Returns:
        Tuple ``(X, Y)``: ``X`` is a float32 array of spectra with a trailing
        channel axis of size 1 and ``Y`` the matching one-hot labels from
        ``to_categorical(..., 2)``.
    """
    features = []
    labels = []
    classes = ["real", "fake"]  # position in this list is the integer label

    for label, class_name in enumerate(classes):
        folder = os.path.join(split_path, class_name)
        if not os.path.isdir(folder):
            print(f"[WARNING] Missing folder: {folder}")
            continue

        print(f"Loading '{class_name}' frequency images from {split_path} ...")
        # Sorted so the sample order does not depend on the directory listing.
        for filename in sorted(os.listdir(folder)):
            if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                continue

            img_path = os.path.join(folder, filename)
            # compute_frequency_map raises on unreadable files rather than
            # returning None, so one bad image would abort the whole load.
            try:
                freq_map = compute_frequency_map(img_path)
            except Exception as e:
                print(f"[FREQ ERROR] {img_path}: {e}")
                continue

            if freq_map is not None:
                # Add the channel axis; float32 halves memory versus float64.
                features.append(freq_map[..., np.newaxis].astype(np.float32))
                labels.append(label)

    return np.array(features), to_categorical(labels, 2)

## 5. Load Datasets

In [None]:
# Each split is read twice: once as ELA images (load_split) and once as FFT
# spectra (load_freq_split). The two passes are independent, so the ELA and
# frequency arrays of a split are not guaranteed to have the same length.
print("\n===== LOADING TRAIN DATA =====")
x_train, y_train = load_split(os.path.join(DATASET_PATH, "train"))
print("\n===== LOADING TRAIN (FREQ) DATA =====")
x_train_freq, y_train_freq = load_freq_split(os.path.join(DATASET_PATH, "train"))

print("\n===== LOADING VALIDATION DATA =====")
x_val, y_val = load_split(os.path.join(DATASET_PATH, "validation"))
print("\n===== LOADING VALIDATION (FREQ) DATA =====")
x_val_freq, y_val_freq = load_freq_split(os.path.join(DATASET_PATH, "validation"))

print("\n===== LOADING TEST DATA =====")
x_test, y_test = load_split(os.path.join(DATASET_PATH, "test"))
print("\n===== LOADING TEST (FREQ) DATA =====")
x_test_freq, y_test_freq = load_freq_split(os.path.join(DATASET_PATH, "test"))

# Image counts per split; a 0 here means the folder was missing or empty.
print("\nDataset Summary:")
print(f"Train: {len(x_train)} images | Freq: {len(x_train_freq)} images")
print(f"Validation: {len(x_val)} images | Freq: {len(x_val_freq)} images")
print(f"Test: {len(x_test)} images | Freq: {len(x_test_freq)} images")


===== LOADING TRAIN DATA =====
Loading 'real' images from /content/drive/MyDrive/Data Set 1/Data Set 1/train ...


## 6. Build CNN Model

In [None]:
def build_cnn(input_channels):
    """Create and compile the CNN shared by the ELA and the frequency branch.

    Four Conv2D -> BatchNormalization -> MaxPooling2D stages (32, 64, 128 and
    128 filters) are followed by Flatten, a 256-unit dense layer, 50% dropout
    and a 2-way softmax (real vs fake). The model is compiled with Adam
    (learning rate 0.001), categorical cross-entropy and accuracy.

    Args:
        input_channels: Channel count of the input images; the spatial size
            comes from ``IMAGE_SIZE``.

    Returns:
        The compiled ``Sequential`` model.
    """
    layers = []
    for stage, filters in enumerate((32, 64, 128, 128)):
        # Only the first convolution declares the input shape.
        first_layer_args = (
            {"input_shape": (IMAGE_SIZE[0], IMAGE_SIZE[1], input_channels)}
            if stage == 0 else {}
        )
        layers += [
            Conv2D(filters, (3, 3), activation='relu', **first_layer_args),
            BatchNormalization(),
            MaxPooling2D(2, 2),
        ]

    layers += [
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(2, activation='softmax'),   # real vs fake
    ]

    net = Sequential(layers)
    net.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return net


# The two branches differ only in channel count: ELA images are 3-channel,
# FFT spectra single-channel.
model = build_cnn(input_channels=3)
freq_model = build_cnn(input_channels=1)

print("ELA model:")
model.summary()
print("\nFrequency model:")
freq_model.summary()

## 7. Training

In [None]:
def make_callbacks(checkpoint_path):
    """Return the training callbacks shared by both models.

    Early stopping watches ``val_accuracy`` (patience 8, best weights
    restored), the learning rate is halved when ``val_loss`` stalls for 5
    epochs (floor 1e-6), and the best model by ``val_accuracy`` is saved to
    ``checkpoint_path``.
    """
    return [
        EarlyStopping(monitor='val_accuracy', patience=8, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6),
        ModelCheckpoint(checkpoint_path, monitor='val_accuracy', save_best_only=True),
    ]


callbacks = make_callbacks('best_ela_model.h5')
callbacks_freq = make_callbacks('best_freq_model.h5')

# Training is skipped (with a hint) when a loader returned no samples.
if len(x_train) > 0:
    print("\n===== STARTING TRAINING (ELA) =====")
    history = model.fit(
        x_train, y_train,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=(x_val, y_val),
        callbacks=callbacks,
        verbose=1,
    )
else:
    print("No ELA training data found. Please check your DATASET_PATH.")

if len(x_train_freq) > 0:
    print("\n===== STARTING TRAINING (FREQ) =====")
    history_freq = freq_model.fit(
        x_train_freq, y_train_freq,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=(x_val_freq, y_val_freq),
        callbacks=callbacks_freq,
        verbose=1,
    )
else:
    print("No frequency training data found. Please check your DATASET_PATH.")

## 8. Performance Visualization

In [None]:
def _radial_profile(freq_map):
    """Average ``freq_map`` over rings of equal integer distance from its centre."""
    h, w = freq_map.shape
    cy, cx = h // 2, w // 2
    y, x = np.indices((h, w))
    # Truncating to int groups pixels into one-pixel-wide rings.
    radius = np.sqrt((x - cx) ** 2 + (y - cy) ** 2).astype(int)

    ring_sum = np.bincount(radius.ravel(), weights=freq_map.ravel())
    ring_count = np.bincount(radius.ravel())
    return ring_sum / np.maximum(ring_count, 1)


def visualize_frequency_analysis(img_path):
    """Show a five-panel diagnostic figure for one image.

    Panels: original, ELA output, FFT magnitude heatmap (with low/mid/high
    band energies in the title), radial FFT profile, and the prediction of
    the ELA CNN. The same numbers are printed afterwards.

    Uses the notebook-level ``model``, so the model cell must have been run
    first. Prints a message and returns early when the file is missing or the
    ELA conversion fails.

    Args:
        img_path: Path of the image to analyse.
    """
    if not os.path.exists(img_path):
        print(f"File not found: {img_path}")
        return

    # ---------------------------
    # Load original + ELA
    # ---------------------------
    # Force RGB: single-channel files (grayscale or palette) would otherwise
    # reach imshow as a 2-D array and be drawn through its colormap.
    with Image.open(img_path) as source:
        original = source.convert("RGB").resize(IMAGE_SIZE)

    ela_img = convert_to_ela_image(img_path)
    if ela_img is None:
        print(f"ELA failed for {img_path}")
        return

    # ---------------------------
    # FFT map, band energies, radial profile
    # ---------------------------
    freq_map = compute_frequency_map(img_path)
    energy = band_energy_stats(freq_map)
    radial_profile = _radial_profile(freq_map)

    # -------------------------------------
    # CNN PREDICTION (FOR VISUALIZATION)
    # -------------------------------------
    # convert_to_ela_image already returns an IMAGE_SIZE image, so it is only
    # scaled and given a batch axis here.
    ela_tensor = np.asarray(ela_img, dtype="float32") / 255.0
    ela_tensor = np.expand_dims(ela_tensor, axis=0)

    pred = model.predict(ela_tensor)[0]    # [real_prob, fake_prob]
    label = "FORGED" if pred[1] > pred[0] else "REAL"
    conf = float(np.max(pred))

    # ---------------------------
    # PLOTS (5 panels)
    # ---------------------------
    fig, axes = plt.subplots(1, 5, figsize=(26, 5))

    # Original
    axes[0].imshow(original)
    axes[0].set_title("Original")
    axes[0].axis("off")

    # ELA
    axes[1].imshow(ela_img)
    axes[1].set_title("ELA Output")
    axes[1].axis("off")

    # FFT Heatmap
    axes[2].imshow(freq_map, cmap="magma")
    axes[2].set_title(
        f"FFT Magnitude\nLow/Mid/High: {energy['low']:.3f} / "
        f"{energy['mid']:.3f} / {energy['high']:.3f}"
    )
    axes[2].axis("off")

    # FFT Radial Graph
    axes[3].plot(radial_profile, linewidth=2)
    axes[3].set_title("Radial FFT Frequency Profile")
    axes[3].set_xlabel("Radius → Frequency")
    axes[3].set_ylabel("Energy")
    axes[3].grid(True)

    # CNN Prediction Panel
    axes[4].axis("off")
    axes[4].text(
        0.1,
        0.5,
        f" CNN Prediction\n\n Label: {label}\n Confidence: {conf:.3f}\n\nProbabilities:\nReal: {pred[0]:.3f}\nFake: {pred[1]:.3f}",
        fontsize=16,
        bbox=dict(facecolor="white", alpha=0.9)
    )

    plt.tight_layout()
    plt.show()

    # Terminal summary
    print(f"Energy stats for {img_path} → "
          f"low: {energy['low']:.3f}, mid: {energy['mid']:.3f}, high: {energy['high']:.3f}")
    print(f"CNN Prediction → {label} (confidence={conf:.3f})")


## 9. Evaluation

In [None]:
# Score each model on its own test set; a branch with no test samples is
# reported instead of evaluated.
if len(x_test) == 0:
    print("No ELA test data available.")
else:
    test_loss, test_acc = model.evaluate(x_test, y_test)
    print(f"\n===== TEST ACCURACY (ELA): {test_acc * 100:.1f}% =====")

if len(x_test_freq) == 0:
    print("No frequency test data available.")
else:
    freq_test_loss, freq_test_acc = freq_model.evaluate(x_test_freq, y_test_freq)
    print(f"===== TEST ACCURACY (FREQ): {freq_test_acc * 100:.1f}% =====")

## 10. Prediction Utility

In [None]:
def predict_image(img_path, show=False):
    """Classify one image with the ELA CNN.

    Args:
        img_path: Path of the image file.
        show: When True, plot the original next to its ELA image with the
            prediction in the title.

    Returns:
        Tuple ``(label, confidence, original, ela_img)``. ``label`` is
        ``"Forged"`` or ``"Real"``, ``confidence`` the larger of the two
        predicted probabilities, and the last two items PIL images resized to
        ``IMAGE_SIZE``. If the file is missing or its ELA image cannot be
        built, ``("Unknown", 0.0, None, None)`` is returned instead.
    """
    if not os.path.exists(img_path):
        print(f"Error: File not found -> {img_path}")
        return "Unknown", 0.0, None, None

    # ELA first: it reports unreadable files by returning None, and feeding
    # that None to NumPy below would fail with an opaque TypeError.
    ela_img = convert_to_ela_image(img_path, quality=ELA_QUALITY)
    if ela_img is None:
        print(f"Error: ELA failed -> {img_path}")
        return "Unknown", 0.0, None, None

    # RGB so single-channel files display in their real colours; the context
    # manager releases the file handle.
    with Image.open(img_path) as source:
        original = source.convert("RGB").resize(IMAGE_SIZE)

    # Add the batch axis without hard-coding the height/width order.
    img_array = np.expand_dims(np.array(ela_img) / 255.0, axis=0)
    pred = model.predict(img_array)
    label = "Forged" if np.argmax(pred) == 1 else "Real"
    confidence = float(np.max(pred))

    if show:
        plt.figure(figsize=(10, 4))
        plt.subplot(1, 2, 1)
        plt.imshow(original)
        plt.title("Original")
        plt.axis("off")
        plt.subplot(1, 2, 2)
        plt.imshow(ela_img)
        plt.title(f"ELA\nPrediction: {label} ({confidence * 100:.1f}%)")
        plt.axis("off")
        plt.show()

    return label, confidence, original, ela_img

def predict_image_freq(img_path):
    """Classify one image with the frequency-domain CNN.

    Returns:
        Tuple ``(label, confidence, freq_map)`` where ``label`` is
        ``"Forged"`` or ``"Real"``, ``confidence`` the larger predicted
        probability and ``freq_map`` the spectrum from
        ``compute_frequency_map``. A missing file yields
        ``("Unknown", 0.0, None)`` after printing an error.
    """
    if os.path.exists(img_path):
        spectrum_map = compute_frequency_map(img_path)
        # Wrap the 2-D map with a leading batch axis and a trailing channel axis.
        batch = spectrum_map[np.newaxis, :, :, np.newaxis]
        probabilities = freq_model.predict(batch)
        verdict = "Forged" if np.argmax(probabilities) == 1 else "Real"
        return verdict, float(np.max(probabilities)), spectrum_map

    print(f"Error: File not found -> {img_path}")
    return "Unknown", 0.0, None

import random

def get_random_images_from_split(split_name="test", num_images=10):
    """Pick up to ``num_images`` random image paths from one dataset split.

    Files ending in .jpg/.jpeg/.png are gathered from the ``fake`` and
    ``real`` folders of ``DATASET_PATH/<split_name>``; missing folders are
    reported and ignored.

    Returns:
        List of selected file paths (empty when no image was found).
    """
    image_suffixes = (".jpg", ".jpeg", ".png")
    candidates = []

    for class_name in ("fake", "real"):
        class_dir = os.path.join(DATASET_PATH, split_name, class_name)
        if not os.path.isdir(class_dir):
            print(f"Missing folder: {class_dir}")
            continue
        for entry in os.listdir(class_dir):
            if entry.lower().endswith(image_suffixes):
                candidates.append(os.path.join(class_dir, entry))

    if not candidates:
        print(f"No {split_name} images found!")
        return []

    # Never request more samples than there are candidates.
    sample_size = min(num_images, len(candidates))
    chosen = random.sample(candidates, sample_size)
    print(f"Randomly selected {sample_size} images from {len(candidates)} available {split_name} images\n")
    return chosen

# Choose source split: 'test' or 'validation'
SOURCE_SPLIT = "test"  # change to "validation" to sample validation images

# Get random images from chosen split
image_files = get_random_images_from_split(split_name=SOURCE_SPLIT, num_images=4)

# Add 1 hardcoded image for comparison (only if it exists in dataset)
hardcoded_image = "./dataset/test/fake/dog_forged.jpg"
if os.path.exists(hardcoded_image):
    image_files = [hardcoded_image] + image_files

# For every image: per-model predictions, averaged ensemble, one combined figure.
for file_path in image_files:
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        continue

    # Get predictions without separate plots
    ela_label, ela_conf, original, ela_img = predict_image(file_path, show=False)
    freq_label, freq_conf, freq_map = predict_image_freq(file_path)

    # This guard has to come before ela_img / freq_map are turned into
    # tensors below; checking afterwards would crash on the None values first.
    if original is None or freq_map is None or ela_img is None:
        print(f"Skipping (could not be processed): {file_path}")
        continue

    # Ensemble prediction (simple average of the two models' probabilities)
    ela_tensor = (np.array(ela_img) / 255.0)[np.newaxis, ...]
    freq_tensor = freq_map[np.newaxis, ..., np.newaxis]
    ela_probs = model.predict(ela_tensor)
    freq_probs = freq_model.predict(freq_tensor)
    ensemble_probs = (ela_probs + freq_probs) / 2.0
    ensemble_label = "Forged" if np.argmax(ensemble_probs) == 1 else "Real"
    ensemble_conf = float(np.max(ensemble_probs))

    # Compute band energies for display
    energy = band_energy_stats(freq_map)

    # Combined visualization: Original | ELA | Frequency with labels
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    axes[0].imshow(original)
    axes[0].set_title("Original")
    axes[0].axis("off")

    axes[1].imshow(ela_img)
    axes[1].set_title(f"ELA: {ela_label} ({ela_conf * 100:.1f}%)")
    axes[1].axis("off")

    axes[2].imshow(freq_map, cmap="magma")
    axes[2].set_title(
        f"Freq: {freq_label} ({freq_conf * 100:.1f}%)\n"
        f"Ensemble: {ensemble_label} ({ensemble_conf * 100:.1f}%)\n"
        f"low/mid/high: {energy['low']:.3f} / {energy['mid']:.3f} / {energy['high']:.3f}")
    axes[2].axis("off")

    plt.tight_layout()
    plt.show()
    # Release the figure so figures do not pile up in memory across iterations.
    plt.close(fig)

    print(f"File: {file_path}")
    print(
        f"ELA: {ela_label} ({ela_conf * 100:.1f}%) | "
        f"Freq: {freq_label} ({freq_conf * 100:.1f}%) | "
        f"Ensemble: {ensemble_label} ({ensemble_conf * 100:.1f}%)")
    print("-" * 60)