In [1]:
import os
import io
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import itertools
from tqdm import tqdm
from datetime import datetime

# PIL/ELA
from PIL import Image, ImageChops, ImageEnhance

# Torch & vision
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models

# Metrics & utils
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

# Reproducibility
np.random.seed(2)
torch.manual_seed(2)

<torch._C.Generator at 0x21c03cc1510>

In [2]:
def convert_to_ela_image(path, quality):
    filename = path
    im = Image.open(filename).convert('RGB')
    # Save to in-memory buffer to avoid disk I/O stalls
    buf = io.BytesIO()
    im.save(buf, format='JPEG', quality=quality)
    buf.seek(0)
    resaved_im = Image.open(buf)

    ela_im = ImageChops.difference(im, resaved_im)
    extrema = ela_im.getextrema()
    max_diff = max([ex[1] for ex in extrema])
    if max_diff == 0:
        max_diff = 1
    scale = 255.0 / max_diff
    ela_im = ImageEnhance.Brightness(ela_im).enhance(scale)

    im.close()
    resaved_im.close()
    buf.close()
    return ela_im


def build_image_list(path_to_image, label, images):
    for file in tqdm(os.listdir(path_to_image)):
        try:
            if file.endswith('jpg') or file.endswith('JPG') or file.endswith('jpeg') or file.endswith('JPEG'):
                if int(os.stat(os.path.join(path_to_image, file)).st_size) > 10000:
                    line = os.path.join(path_to_image, file) + ',' + label + '\n'
                    images.append(line)
        except Exception:
            print(os.path.join(path_to_image, file))
    return images


In [3]:
custom_path_original = r"C:\Users\Julian\Documents\Dataset\train\authentic"
custom_path_tampered = r"C:\Users\Julian\Documents\Dataset\train\tampered"


In [4]:
training_data_set = 'dataset.csv'

In [5]:
images = []
images = build_image_list(custom_path_original, '0', images)
images = build_image_list(custom_path_tampered, '1', images)

100%|███████████████████████████████████████████████████████████████████████████| 2100/2100 [00:00<00:00, 12549.35it/s]
100%|███████████████████████████████████████████████████████████████████████████| 2100/2100 [00:00<00:00, 12203.18it/s]


In [6]:
image_name = []
label = []
for i in tqdm(range(len(images))):
    image_name.append(images[i][0:-3])
    label.append(images[i][-2])

dataset = pd.DataFrame({'image':image_name,'class_label':label})
dataset.to_csv(training_data_set,index=False)

100%|█████████████████████████████████████████████████████████████████████████| 4200/4200 [00:00<00:00, 1018152.63it/s]


In [7]:
# If you have separate directories for validation/testing, set them here.
# Expected structure (case-insensitive file suffixes jpg/jpeg):
#   <root>/original/*.jpg
#   <root>/forged/*.jpg
val_root = r'C:\Users\Julian\Documents\Dataset\validation'
test_root = r'C:\Users\Julian\Documents\Dataset\testing'

class ELAImageFolder(Dataset):
    def __init__(self, root_dir, transform, ela_quality=90):
        self.root = root_dir
        self.transform = transform
        self.ela_quality = ela_quality
        self.samples = []
        # label map: original=0, forged=1
        for label_name, label in [('original', 0), ('forged', 1)]:
            folder = os.path.join(root_dir, label_name)
            if not os.path.isdir(folder):
                continue
            for fn in os.listdir(folder):
                if fn.lower().endswith(('jpg','jpeg')):
                    full = os.path.join(folder, fn)
                    if os.path.isfile(full) and os.stat(full).st_size > 10000:
                        self.samples.append((full, float(label)))
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        path, label = self.samples[idx]
        ela_im = convert_to_ela_image(path, self.ela_quality)
        img = self.transform(ela_im)
        y = torch.tensor([label], dtype=torch.float32)
        return img, y


In [10]:
# Switch validation loader to use external directory if present
use_external_val = os.path.isdir(val_root)
use_external_test = os.path.isdir(test_root)

if use_external_val:
    external_val_ds = ELAImageFolder(val_root, transform=val_transform, ela_quality=90)
    val_loader = DataLoader(external_val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=torch.cuda.is_available())
    print(f"Using external validation from: {val_root} (n={len(external_val_ds)})")
else:
    print("Using 20% split from dataset.csv for validation.")

if use_external_test:
    test_ds = ELAImageFolder(test_root, transform=val_transform, ela_quality=90)
    test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=torch.cuda.is_available())
    print(f"Using external testing from: {test_root} (n={len(test_ds)})")
else:
    test_loader = None
    print("No external testing directory found; skipping test evaluation.")


Using external validation from: C:\Users\Julian\Documents\Dataset\validation (n=0)
Using external testing from: C:\Users\Julian\Documents\Dataset\testing (n=0)


In [9]:
# Torch dataset with ELA conversion
class ELAImageDataset(Dataset):
    def __init__(self, csv_path, transform=None, ela_quality=90):
        df = pd.read_csv(csv_path)
        self.image_paths = df['image'].tolist()
        self.labels = df['class_label'].astype(int).tolist()
        self.transform = transform
        self.ela_quality = ela_quality

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        path = self.image_paths[idx]
        label = float(self.labels[idx])
        ela_im = convert_to_ela_image(path, self.ela_quality)
        if self.transform is not None:
            img = self.transform(ela_im)
        else:
            img = transforms.ToTensor()(ela_im)
        # BCEWithLogitsLoss expects float targets of shape [N, 1]
        y = torch.tensor([label], dtype=torch.float32)
        return img, y

# Transforms for ResNet50 input size 224x224
# Train uses augmentations; Val is deterministic
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Build dataset from CSV built via build_image_list() using
#   images/training/original/ (label 0)
#   images/training/forged/   (label 1)
csv_path = 'dataset.csv'
full_dataset = ELAImageDataset(csv_path, transform=None, ela_quality=90)

# Train-test split (aligned with original split settings)
indices = np.arange(len(full_dataset))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=5, shuffle=True, stratify=[full_dataset.labels[i] for i in indices])

# Wrap subsets to apply different transforms
class SubsetWithTransform(Dataset):
    def __init__(self, base_dataset, indices, transform):
        self.base = base_dataset
        self.indices = indices
        self.transform = transform
    def __len__(self):
        return len(self.indices)
    def __getitem__(self, i):
        img, y = self.base[self.indices[i]]
        # base returns tensor if base has transform; here base has None, so img is PIL Image from ELA
        if isinstance(img, torch.Tensor):
            # Defensive: if base was changed to yield tensor, just normalize pipeline
            pil = transforms.ToPILImage()(img)
            img = self.transform(pil)
        else:
            img = self.transform(img)
        return img, y

train_dataset = SubsetWithTransform(full_dataset, train_idx, train_transform)
val_dataset = SubsetWithTransform(full_dataset, val_idx, val_transform)

batch_size = 32
# Use 0 workers on Windows to avoid potential DataLoader hang
num_workers = 0

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=torch.cuda.is_available())
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=torch.cuda.is_available())

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Quick sanity check to ensure loaders iterate
_sanity_images, _sanity_targets = next(iter(train_loader))
print(f"Sanity batch: images={_sanity_images.shape}, targets={_sanity_targets.shape}")


Sanity batch: images=torch.Size([32, 3, 224, 224]), targets=torch.Size([32, 1])


In [11]:
# Load pretrained ResNet50 and modify head for binary classification
resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

# Replace final layer for binary classification (logit)
in_features = resnet.fc.in_features
resnet.fc = nn.Linear(in_features, 1)

# Phase 1: freeze all backbone layers
for name, param in resnet.named_parameters():
    param.requires_grad = False
# Unfreeze only the final classification head
for param in resnet.fc.parameters():
    param.requires_grad = True

resnet = resnet.to(device)

# Print summary for frozen model
try:
    summary(resnet, (3, 224, 224))
except Exception as e:
    print(resnet)

# Loss and optimizer for phase 1
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=1e-3)



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [12]:
def train_one_epoch(model, loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for images, targets in tqdm(loader, desc='Train', leave=False):
        images = images.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        preds = (torch.sigmoid(outputs) > 0.5).float()
        correct += (preds == targets).sum().item()
        total += targets.size(0)
    return running_loss / total, correct / total


def evaluate(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, targets in tqdm(loader, desc='Eval', leave=False):
            images = images.to(device)
            targets = targets.to(device)
            outputs = model(images)
            loss = criterion(outputs, targets)
            running_loss += loss.item() * images.size(0)
            preds = (torch.sigmoid(outputs) > 0.5).float()
            correct += (preds == targets).sum().item()
            total += targets.size(0)
    return running_loss / total, correct / total


# Training configuration analogous to original (epochs/batch_size adjusted for ResNet)
epochs_phase1 = 10
history = {
    'phase1_train_loss': [], 'phase1_val_loss': [],
    'phase1_train_acc': [], 'phase1_val_acc': [],
    'phase2_train_loss': [], 'phase2_val_loss': [],
    'phase2_train_acc': [], 'phase2_val_acc': [],
}

best_val_acc_phase1 = 0.0
for epoch in range(1, epochs_phase1 + 1):
    train_loss, train_acc = train_one_epoch(resnet, train_loader, criterion, optimizer, device)
    val_loss, val_acc = evaluate(resnet, val_loader, criterion, device)

    history['phase1_train_loss'].append(train_loss)
    history['phase1_val_loss'].append(val_loss)
    history['phase1_train_acc'].append(train_acc)
    history['phase1_val_acc'].append(val_acc)

    print(f"[Phase 1] Epoch {epoch}/{epochs_phase1} - loss: {train_loss:.4f} acc: {train_acc:.4f} - val_loss: {val_loss:.4f} val_acc: {val_acc:.4f}")

    if val_acc > best_val_acc_phase1:
        best_val_acc_phase1 = val_acc



                                                                                                                       

ZeroDivisionError: float division by zero

In [None]:
# Phase 2: unfreeze last few layers for fine-tuning
# Limit to layer4 only to reduce overfitting risk

# Re-freeze everything first (safety)
for p in resnet.parameters():
    p.requires_grad = False

# Unfreeze only layer4
for name, module in resnet.named_children():
    if name == 'layer4':
        for p in module.parameters():
            p.requires_grad = True
# Keep head trainable
for p in resnet.fc.parameters():
    p.requires_grad = True

# Differential learning rates and weight decay
backbone_params = [p for n, p in resnet.named_parameters() if p.requires_grad and not n.startswith('fc.')]
head_params = [p for n, p in resnet.named_parameters() if p.requires_grad and n.startswith('fc.')]
optimizer_ft = torch.optim.Adam([
    {'params': backbone_params, 'lr': 1e-4, 'weight_decay': 1e-4},
    {'params': head_params, 'lr': 3e-4, 'weight_decay': 1e-4},
])

# Print summary for fine-tuning configuration
try:
    summary(resnet, (3, 224, 224))
except Exception as e:
    print(resnet)

# Early stopping on val loss
epochs_phase2 = 10
best_val = float('inf')
patience = 3
pat = 0
best_state = None

for epoch in range(1, epochs_phase2 + 1):
    train_loss, train_acc = train_one_epoch(resnet, train_loader, criterion, optimizer_ft, device)
    val_loss, val_acc = evaluate(resnet, val_loader, criterion, device)

    history['phase2_train_loss'].append(train_loss)
    history['phase2_val_loss'].append(val_loss)
    history['phase2_train_acc'].append(train_acc)
    history['phase2_val_acc'].append(val_acc)

    print(f"[Phase 2] Epoch {epoch}/{epochs_phase2} - loss: {train_loss:.4f} acc: {train_acc:.4f} - val_loss: {val_loss:.4f} val_acc: {val_acc:.4f}")

    if val_loss < best_val:
        best_val = val_loss
        pat = 0
        best_state = {k: v.cpu().clone() for k, v in resnet.state_dict().items()}
    else:
        pat += 1
        if pat >= patience:
            print("Early stopping triggered.")
            break

# Restore best weights if early stopped
if best_state is not None:
    resnet.load_state_dict(best_state)


In [None]:
# Save fine-tuned model
os.makedirs('model', exist_ok=True)
torch.save(resnet.state_dict(), os.path.join('model', 'image_forgery_finetuned.pth'))


In [None]:
# Optional: evaluate on external test set if provided
if test_loader is not None:
    test_loss, test_acc = evaluate(resnet, test_loader, criterion, device)
    print(f"[Test] loss: {test_loss:.4f} acc: {test_acc:.4f}")
    # Test confusion matrix
    resnet.eval()
    all_preds, all_true = [], []
    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            outputs = resnet(images)
            preds = (torch.sigmoid(outputs).cpu().numpy() > 0.5).astype(int).flatten().tolist()
            all_preds.extend(preds)
            all_true.extend(targets.numpy().astype(int).flatten().tolist())
    cm_test = confusion_matrix(all_true, all_preds)
    plot_confusion_matrix(cm_test, classes=range(2), title='Confusion matrix (Test)')


In [None]:
fig, ax = plt.subplots(2,2, figsize=(10,8))
# Phase 1
ax[0,0].plot(history['phase1_train_loss'], color='b', label='Training loss (P1)')
ax[0,0].plot(history['phase1_val_loss'], color='r', label='Validation loss (P1)')
ax[0,0].legend(loc='best', shadow=True)
ax[0,0].set_title('Phase 1 Loss')

ax[1,0].plot(history['phase1_train_acc'], color='b', label='Training acc (P1)')
ax[1,0].plot(history['phase1_val_acc'], color='r', label='Validation acc (P1)')
ax[1,0].legend(loc='best', shadow=True)
ax[1,0].set_title('Phase 1 Accuracy')

# Phase 2
ax[0,1].plot(history['phase2_train_loss'], color='b', label='Training loss (P2)')
ax[0,1].plot(history['phase2_val_loss'], color='r', label='Validation loss (P2)')
ax[0,1].legend(loc='best', shadow=True)
ax[0,1].set_title('Phase 2 Loss')

ax[1,1].plot(history['phase2_train_acc'], color='b', label='Training acc (P2)')
ax[1,1].plot(history['phase2_val_acc'], color='r', label='Validation acc (P2)')
ax[1,1].legend(loc='best', shadow=True)
ax[1,1].set_title('Phase 2 Accuracy')
plt.tight_layout()


In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

# Predict validation set for confusion matrix
resnet.eval()
all_preds = []
all_true = []
with torch.no_grad():
    for images, targets in val_loader:
        images = images.to(device)
        outputs = resnet(images)
        preds = (torch.sigmoid(outputs).cpu().numpy() > 0.5).astype(int).flatten().tolist()
        all_preds.extend(preds)
        all_true.extend(targets.numpy().astype(int).flatten().tolist())

confusion_mtx = confusion_matrix(all_true, all_preds)
plot_confusion_matrix(confusion_mtx, classes=range(2))
