# COMPSCI 4AL3: Deepfake Detection Project (PyTorch)

## Group 15

This notebook provides the starting structure for our deepfake classification project using PyTorch.

**Project Goal:** Build a model to classify images as "Real" or "Fake".

**Our Plan:**
1.  **Load Data:** Use `torchvision.transforms` and `ImageFolder` to load and preprocess the dataset.
2.  **Define Model:** Use the `DeepfakeClassifier` class (subclassing `nn.Module`) to build our CNN architecture.
3.  **Train Model:** Write a manual training and validation loop to train the model.
4.  **Evaluate:** Test the final model on the unseen test set and report metrics (Accuracy, Precision, Recall, F1-score, Confusion Matrix).

## 1. Setup: Import Libraries

Import all necessary packages. We'll use `torch` and `torchvision` to build our CNN, `sklearn` for evaluation metrics, and `matplotlib` for plotting results.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision
from torchvision import datasets, transforms

import matplotlib.pyplot as plt
import numpy as np
import os
from pathlib import Path
import time
import copy

# Scikit-learn for metrics
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# Set global parameters
IMG_SIZE = 128
BATCH_SIZE = 32
IMAGE_SHAPE = (3, IMG_SIZE, IMG_SIZE) # PyTorch format: (Channels, Height, Width)

## 2. Data Loading and Preprocessing

We define a `transform` pipeline that resizes images, converts them to tensors, and normalizes them. We then use `ImageFolder` to load the pre-split data and `DataLoader` to create batches.

In [None]:
# !!! UPDATE THIS PATH !!!
# Set the base directory where the 'Deepfake and Real Images' dataset is extracted
DATA_DIR = Path("./kaggle/input/deepfake-and-real-images/")

# Define paths for each split
train_dir = DATA_DIR / 'train'
val_dir = DATA_DIR / 'validation'
test_dir = DATA_DIR / 'test'

# 1.2: Preprocessing/Feature Engineering step
# Define transformations for the images
# Normalization uses the standard ImageNet channel means and standard deviations
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        # TODO: Consider adding data augmentation here (e.g., RandomHorizontalFlip)
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Set device to GPU if available (defined outside the try block so later cells
# can still use it if the dataset fails to load)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

try:
    # Create datasets using ImageFolder
    image_datasets = {
        'train': datasets.ImageFolder(train_dir, data_transforms['train']),
        'val': datasets.ImageFolder(val_dir, data_transforms['val']),
        'test': datasets.ImageFolder(test_dir, data_transforms['test'])
    }

    # 2. Train/Test split is already done by the dataset structure
    # Create DataLoaders
    dataloaders = {
        'train': DataLoader(image_datasets['train'], batch_size=BATCH_SIZE, shuffle=True, num_workers=4),
        'val': DataLoader(image_datasets['val'], batch_size=BATCH_SIZE, shuffle=False, num_workers=4),
        'test': DataLoader(image_datasets['test'], batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
    }

    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
    CLASS_NAMES = image_datasets['train'].classes

    print(f"Class names found: {CLASS_NAMES}")
    print(f"Dataset sizes: {dataset_sizes}")

except FileNotFoundError:
    print("Error: Dataset directory not found.")
    print(f"Please make sure the dataset is at: {DATA_DIR}")
    print("You can download it from: https://www.kaggle.com/datasets/manjilkarki/deepfake-and-real-images/data")
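
To make the normalization step concrete, here is a minimal sketch of the per-channel arithmetic that `ToTensor` followed by `Normalize` is documented to perform: scale 8-bit values to [0, 1], then subtract the channel mean and divide by the channel standard deviation. The helper `normalize_pixel` and the sample pixels are illustrative assumptions, not part of the notebook.

```python
# ImageNet channel statistics, the same values used in data_transforms
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

def normalize_pixel(rgb_uint8):
    """Hypothetical helper: mimic ToTensor + Normalize for a single RGB pixel."""
    scaled = [v / 255.0 for v in rgb_uint8]  # ToTensor: [0, 255] -> [0.0, 1.0]
    return [(s - m) / sd for s, m, sd in zip(scaled, IMAGENET_MEAN, IMAGENET_STD)]

black = normalize_pixel([0, 0, 0])        # darkest possible pixel
white = normalize_pixel([255, 255, 255])  # brightest possible pixel
print([round(v, 3) for v in black])
print([round(v, 3) for v in white])
```

After this step the model's inputs fall roughly between -2.1 and 2.6 rather than between 0 and 1, which is worth remembering when visualizing a normalized batch.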

## 3. Model Definition: The `DeepfakeClassifier` Class

This class encapsulates our model's architecture by inheriting from `nn.Module`.

In [None]:
class DeepfakeClassifier(nn.Module):
    """1. A class which represents the model/classifier."""
    
    def __init__(self):
        """1.1: Initialize the model's layers."""
        super().__init__()
        
        # --- START OF CNN ARCHITECTURE ---
        # TODO: This is a basic placeholder. Iterate and improve this architecture.
        
        # Block 1: (3 x 128 x 128) -> (32 x 64 x 64)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Block 2: (32 x 64 x 64) -> (64 x 32 x 32)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Block 3: (64 x 32 x 32) -> (128 x 16 x 16)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # --- END OF CNN ARCHITECTURE ---
        
        # Flatten the 3D features to 1D vector
        # Size will be 128 * 16 * 16 = 32768
        self.flatten = nn.Flatten()
        
        # Fully-connected layers
        self.fc1 = nn.Linear(in_features=128 * 16 * 16, out_features=128)
        # TODO: Consider adding Dropout layers
        # self.dropout = nn.Dropout(0.5)
        
        # Output layer: 1 node for binary classification
        self.fc2 = nn.Linear(in_features=128, out_features=1)

    def forward(self, x):
        """Defines the forward pass of the data through the layers."""
        
        # Pass through conv/pool blocks
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        
        # Flatten for the fully-connected layers
        x = self.flatten(x)
        
        # Pass through dense layers
        x = F.relu(self.fc1(x))
        # x = self.dropout(x) # Apply dropout if using
        
        # Output layer (no activation here, as nn.BCEWithLogitsLoss combines sigmoid + loss)
        x = self.fc2(x)
        return x
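
The shape comments in the class can be checked with the usual output-size formula for convolution and pooling layers. This is a small sketch in plain Python; `conv_out` is a hypothetical helper, and it assumes square inputs and no dilation.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output side length of a conv or pooling layer on a square input (no dilation)."""
    return (size + 2 * padding - kernel) // stride + 1

size, channels = 128, 3
for out_channels in (32, 64, 128):
    size = conv_out(size, kernel=3, stride=1, padding=1)  # 3x3 conv with padding=1 keeps the size
    size = conv_out(size, kernel=2, stride=2)             # 2x2 max pool halves it
    channels = out_channels
    print(f"after block: ({channels} x {size} x {size})")

flat_features = channels * size * size  # in_features expected by fc1
print(flat_features)
```

If `IMG_SIZE` changes, or another conv/pool block is added, the `in_features` argument of `fc1` has to change to match the new flattened size.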


## 4. Model Training

First, we instantiate the model, optimizer, and loss function. Then, we create a function to handle the training and validation loops (1.3).

In [None]:
# 1. Instantiate the model and move it to the device
model = DeepfakeClassifier().to(device)

# Optional: Print a summary of the model (requires torchinfo)
# !pip install torchinfo
# from torchinfo import summary
# summary(model, input_size=(BATCH_SIZE, 3, IMG_SIZE, IMG_SIZE))

# 2. Define Loss Function and Optimizer
# Use BCEWithLogitsLoss, which is numerically stable and includes the sigmoid activation
criterion = nn.BCEWithLogitsLoss()

# Use the Adam optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# TODO: Consider a learning rate scheduler
# exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
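
The numerical-stability remark about `BCEWithLogitsLoss` can be illustrated in plain Python. The sketch below compares a naive "sigmoid, then log" computation with a standard algebraically equivalent form that works directly on the logit. The function names are hypothetical, and this is not claimed to be PyTorch's exact internal implementation.

```python
import math

def naive_bce(logit, target):
    """Sigmoid first, then log: breaks down for large-magnitude logits."""
    p = 1.0 / (1.0 + math.exp(-logit))
    return -(target * math.log(p) + (1 - target) * math.log(1 - p))

def stable_bce(logit, target):
    """Equivalent form on the raw logit x: max(x, 0) - x*y + log(1 + exp(-|x|))."""
    return max(logit, 0.0) - logit * target + math.log1p(math.exp(-abs(logit)))

# For moderate logits the two versions agree
print(naive_bce(2.0, 1.0), stable_bce(2.0, 1.0))

# For a confident wrong prediction, sigmoid rounds to exactly 1.0 and log(1 - p) fails
try:
    naive_bce(40.0, 0.0)
except ValueError as err:
    print("naive version failed:", err)

# The stable form stays finite
print(stable_bce(40.0, 0.0))
```

This is why the model's `forward` returns raw logits and the sigmoid is applied only when thresholding predictions.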

In [None]:
def train_model(model, criterion, optimizer, num_epochs=10):
    """1.3: Function for training and making predictions (validation)."""
    start_time = time.time()
    
    # To track history
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase (3. Validation)
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                # Labels need to be float and shape [batch_size, 1] for BCEWithLogitsLoss
                labels = labels.float().view(-1, 1).to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                # Track gradients only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    
                    # Get predictions (apply sigmoid and threshold)
                    preds = (torch.sigmoid(outputs) > 0.5).float()

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)
            
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            
            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc.item())

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Deep copy the model if it's the best one yet
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        
        print()

    time_elapsed = time.time() - start_time
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model, history
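
One detail of the loop worth spelling out is `loss.item() * inputs.size(0)`. With the default mean reduction the criterion returns a per-batch mean, so multiplying by the batch size recovers the batch's summed loss, and dividing by the dataset size gives the true per-sample mean even when the last batch is smaller. A minimal sketch with made-up loss values:

```python
# Hypothetical per-sample losses for a dataset of 5 samples, batch size 2
sample_losses = [0.9, 0.7, 0.5, 0.3, 0.1]
batches = [sample_losses[i:i + 2] for i in range(0, len(sample_losses), 2)]

# What a mean-reduced criterion would return for each batch
batch_means = [sum(b) / len(b) for b in batches]

# Approach used in train_model: weight each batch mean by its batch size
running_loss = sum(mean * len(b) for mean, b in zip(batch_means, batches))
epoch_loss = running_loss / len(sample_losses)

# Naive alternative: average the batch means directly
naive_loss = sum(batch_means) / len(batch_means)

print(f"weighted: {epoch_loss:.4f}  naive: {naive_loss:.4f}")
```

The naive average gives the single-sample last batch as much weight as the full batches, so it drifts away from the true dataset mean.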


In [None]:
# TODO: Adjust the number of epochs as needed.
EPOCHS = 15

if 'dataloaders' in locals():
    model, history = train_model(model, criterion, optimizer, num_epochs=EPOCHS)
else:
    print("Skipping training because data was not loaded.")

## 5. Results and Evaluation

Here we'll plot our training history and print the final classification metrics from the test set, as outlined in the proposal.

### 5.1. Training History

In [None]:
def plot_training_history(history):
    """4. Plots the model's training and validation accuracy/loss."""
    
    # History values are already plain Python floats (see train_model)
    acc = history['train_acc']
    val_acc = history['val_acc']
    loss = history['train_loss']
    val_loss = history['val_loss']

    epochs_range = range(len(acc))

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.show()

if 'history' in locals():
    plot_training_history(history)


### 5.2. Final Evaluation on Test Set

Now we use the unseen test set to get our final, unbiased metrics.

In [None]:
def get_predictions(model, data_loader):
    """1.3: Helper function to get predictions on the test set."""
    model.eval()   # Set model to evaluate mode
    all_preds = []
    all_labels = []
    
    with torch.no_grad(): # Disable gradient calculation
        for inputs, labels in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # Get predictions (apply sigmoid and threshold)
            preds = (torch.sigmoid(outputs) > 0.5).int()
            
            # Flatten [batch_size, 1] predictions so they match the 1D label array
            all_preds.extend(preds.view(-1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            
    return np.array(all_labels), np.array(all_preds)


if 'dataloaders' in locals() and 'model' in locals():
    # 1. Get predictions and true labels
    y_true, y_pred = get_predictions(model, dataloaders['test'])
    
    # 2. Print detailed classification report (Precision, Recall, F1-score)
    print("\n--- Classification Report ---")
    print(classification_report(y_true, y_pred, target_names=CLASS_NAMES))
    
    # 3. Plot Confusion Matrix
    print("\n--- Confusion Matrix ---")
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=CLASS_NAMES)
    disp.plot(cmap=plt.cm.Blues)
    plt.show()

else:
    print("Skipping evaluation because data or model is not available.")
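
To show how the reported metrics relate to the confusion matrix, here is a small sketch that derives accuracy, precision, recall, and F1 for class 1 from a 2x2 matrix. The counts are invented for illustration and are not results from this project. Which class counts as 1 depends on how `ImageFolder` orders the class folders (sorted by name), so it is worth checking `CLASS_NAMES` before interpreting the numbers.

```python
# Hypothetical counts, laid out like sklearn's confusion_matrix:
# rows = true class, columns = predicted class
example_cm = [[80, 20],   # true class 0: 80 correct, 20 predicted as class 1
              [10, 90]]   # true class 1: 10 predicted as class 0, 90 correct

tn, fp = example_cm[0]
fn, tp = example_cm[1]

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)   # of everything predicted as class 1, how much was right
recall = tp / (tp + fn)      # of everything truly class 1, how much was found
f1 = 2 * precision * recall / (precision + recall)

print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")
```

`classification_report` reports the same per-class quantities for both classes, treating each in turn as the positive one.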

## 6. Next Steps

Our goal is to achieve at least 60% accuracy.

To improve this model, we should focus on:
1.  **Iterating on `DeepfakeClassifier`:** 
    * Try adding more `nn.Conv2d` layers.
    * Try different `out_channels` (e.g., 32, 64, 128, 256).
    * Add `nn.Dropout(0.5)` layers after `F.relu` in the fully-connected part to reduce overfitting.
2.  **Hyperparameter Tuning:**
    * Try different optimizers (e.g., `optim.SGD`, `optim.RMSprop`).
    * Try different learning rates (e.g., `lr=0.0001`).
    * Add a learning rate scheduler (like `optim.lr_scheduler.StepLR`).
    * Increase the number of `EPOCHS`.
3.  **Feature Engineering:**
    * Add data augmentation to the `data_transforms['train']` pipeline. For example:
        * `transforms.RandomHorizontalFlip()`
        * `transforms.RandomRotation(10)`
        * `transforms.ColorJitter(brightness=0.2, contrast=0.2)`
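
As one example of the tuning ideas above, the following sketch shows where a `StepLR` scheduler would be stepped and how the learning rate would evolve over 15 epochs with `step_size=7` and `gamma=0.1`. The `toy_model` and `toy_optimizer` names are stand-ins so the snippet runs on its own. In the notebook the scheduler would wrap the real optimizer and be stepped once per epoch after the training phase.

```python
import torch.nn as nn
import torch.optim as optim

# Stand-in model so the sketch is self-contained; the notebook would use DeepfakeClassifier
toy_model = nn.Linear(4, 1)
toy_optimizer = optim.Adam(toy_model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(toy_optimizer, step_size=7, gamma=0.1)

lr_per_epoch = []
for epoch in range(15):
    # Learning rate in effect during this epoch
    lr_per_epoch.append(scheduler.get_last_lr()[0])

    # ... the training phase for this epoch would run here ...
    toy_optimizer.step()  # placeholder so the scheduler is stepped after the optimizer
    scheduler.step()      # decay check, once per epoch

print(lr_per_epoch)
```

With these settings the rate stays at 0.001 for the first seven epochs and is multiplied by 0.1 every seven epochs after that.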