In [None]:
pip install torch torchvision kagglehub matplotlib seaborn tqdm scikit-learn pandas

In [None]:
import os
import copy
import time
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from torch import nn, optim
from torchvision import transforms, datasets, models
from torchvision.models import Inception_V3_Weights
from torch.utils.data import DataLoader
from sklearn.metrics import roc_auc_score, confusion_matrix, roc_curve, precision_recall_curve, average_precision_score, f1_score
from tqdm import tqdm
import kagglehub
import pandas as pd
from sklearn.calibration import calibration_curve

In [None]:
# Pick the compute device: CUDA when torch reports it as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Download the Kaggle dataset; the value returned by kagglehub is used below as the
# local directory that contains the "Dataset" folder.
dataset_handle = "manjilkarki/deepfake-and-real-images"
path = kagglehub.dataset_download(dataset_handle)
dataset_dir = os.path.join(path, "Dataset")

In [None]:
# Define transforms: resize to 299x299, convert to a tensor, then normalise with the
# ImageNet channel statistics. The pretrained torchvision InceptionV3 weights document
# this mean/std normalisation as part of their expected preprocessing; feeding raw
# [0, 1] tensors puts the inputs on a different scale than the backbone was trained on.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# All three splits use the same deterministic pipeline (no augmentation), so build it
# once per split from a single definition instead of repeating it.
data_transforms = {
    split: transforms.Compose([
        transforms.Resize((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])
    for split in ('train', 'val', 'test')
}


In [None]:
# Load datasets: one ImageFolder per split directory, each with its split's transform.
train_dir = os.path.join(dataset_dir, "Train")
val_dir = os.path.join(dataset_dir, "Validation")
test_dir = os.path.join(dataset_dir, "Test")

train_dataset = datasets.ImageFolder(train_dir, transform=data_transforms['train'])
val_dataset = datasets.ImageFolder(val_dir, transform=data_transforms['val'])
test_dataset = datasets.ImageFolder(test_dir, transform=data_transforms['test'])

In [None]:
# Create dataloaders
batch_size = 32
dataloaders = {
    # Training: shuffle every epoch; drop the trailing partial batch so every
    # optimisation step sees a full batch.
    'train': DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, drop_last=True),
    # Evaluation: every sample must be scored exactly once, so keep the final partial
    # batch (drop_last=False) and skip shuffling, which has no effect on the metrics.
    'val': DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, drop_last=False),
    'test': DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, drop_last=False),
}

In [None]:
# Function to plot dataset insights
def plot_dataset_insights(dataset, split_name):
    """Show a bar chart of the number of images per class in ``dataset``.

    Args:
        dataset: object exposing ``classes`` (sequence of class names) and ``samples``
            (iterable of ``(item, class_index)`` pairs), as read by the code below.
        split_name: label used as the prefix of the figure title.
    """
    # Start every class at zero so classes without samples still get a bar
    per_class = dict.fromkeys(dataset.classes, 0)
    for _item, class_idx in dataset.samples:
        per_class[dataset.classes[class_idx]] += 1

    names = list(per_class)
    totals = [per_class[name] for name in names]

    plt.figure(figsize=(6, 4))
    rects = plt.bar(names, totals, color=['skyblue', 'salmon'])
    plt.title(f"{split_name} Dataset Insights")
    plt.xlabel("Class")
    plt.ylabel("Number of images")

    # Annotate each bar with its height, placed 5% above the bar top
    for rect in rects:
        yval = rect.get_height()
        x_center = rect.get_x() + rect.get_width()/2.0
        plt.text(x_center, yval + 0.05*yval, f'{yval}', ha='center', va='bottom')

    plt.show()

# Plot dataset insights for each split (train, then validation, then test)
for split_dataset, split_label in (
    (train_dataset, "Train"),
    (val_dataset, "Validation"),
    (test_dataset, "Test"),
):
    plot_dataset_insights(split_dataset, split_label)

In [None]:
# Initialize a pretrained InceptionV3 with a single-logit head for binary classification.
# The pretrained checkpoint includes the auxiliary classifier, so the network is built
# with aux_logits=True and the auxiliary branch is switched off right after loading.
# The training loop only uses the main output, so the (still 1000-class) auxiliary head
# would otherwise be computed on every training step and then thrown away.
model = models.inception_v3(weights=Inception_V3_Weights.IMAGENET1K_V1, aux_logits=True)
model.aux_logits = False  # forward() now returns a plain tensor in train and eval mode
model.AuxLogits = None    # drop the unused auxiliary classifier and its parameters

num_ftrs = model.fc.in_features
# Replace the final fully connected layer so that it outputs one value for binary classification
model.fc = nn.Linear(num_ftrs, 1)

# Replicate across GPUs when more than one is visible, then move to the target device
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model)
model = model.to(device)

In [None]:
# Training setup
# Loss: binary cross-entropy computed directly on the model's raw logit
criterion = nn.BCEWithLogitsLoss()

# Optimiser: Adam over every model parameter
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Scheduler: halve the learning rate when the monitored score (higher is better,
# mode='max') has not improved for more than `patience` epochs, down to min_lr
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
    min_lr=1e-7,
)

In [None]:
# Training configuration
num_epochs = 20            # maximum number of epochs to run
early_stop_patience = 3    # stop after this many epochs without a better validation AUC
best_auc = 0               # best validation AUC seen so far
epochs_no_improve = 0      # consecutive epochs without improvement

# Per-epoch metric histories, filled by the training loop and plotted afterwards
epoch_train_losses = []
epoch_val_losses = []
epoch_train_accs = []
epoch_val_accs = []
epoch_train_aucs = []
epoch_val_aucs = []

In [None]:
# Main training loop: each epoch runs a training pass followed by a validation pass.
# Validation AUC drives the LR scheduler, best-model checkpointing and early stopping.
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")

    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()
        else:
            model.eval()

        running_loss = 0.0
        correct = 0
        total = 0
        all_labels = []
        all_preds = []

        pbar = tqdm(dataloaders[phase], desc=f"{phase.capitalize()} Phase", leave=False)
        for inputs, labels in pbar:
            inputs = inputs.to(device)
            # BCEWithLogitsLoss needs float targets shaped like the (batch, 1) logits
            labels = labels.to(device).float().unsqueeze(1)

            # Clear stale gradients before the forward/backward pass of this step
            if phase == 'train':
                optimizer.zero_grad()

            with torch.set_grad_enabled(phase == 'train'):
                outputs = model(inputs)
                # If aux_logits=True, the model returns a tuple (main_output, aux_output)
                if isinstance(outputs, tuple):
                    outputs = outputs[0]
                loss = criterion(outputs, labels)
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # loss is a batch mean, so weight it by the batch size before accumulating
            running_loss += loss.item() * inputs.size(0)
            preds = torch.sigmoid(outputs).detach().cpu().numpy()
            batch_preds = (preds >= 0.5).astype(int)
            batch_labels = labels.cpu().numpy().astype(int)

            all_preds.extend(preds.flatten())
            all_labels.extend(batch_labels.flatten())
            correct += (batch_preds == batch_labels).sum()
            total += inputs.size(0)
            pbar.set_postfix({"Loss": f"{loss.item():.4f}", "Acc": f"{(batch_preds == batch_labels).mean():.4f}"})

        # Average over the samples actually processed: a loader built with drop_last=True
        # skips the trailing partial batch, so len(dataset) would understate the loss.
        epoch_loss = running_loss / total
        epoch_acc = correct / total
        epoch_auc = roc_auc_score(all_labels, all_preds)
        current_lr = optimizer.param_groups[0]['lr']

        print(f"{phase.capitalize()} - Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}, AUC: {epoch_auc:.4f}, LR: {current_lr:.6f}")

        if phase == 'train':
            epoch_train_losses.append(epoch_loss)
            epoch_train_accs.append(epoch_acc)
            epoch_train_aucs.append(epoch_auc)
        else:
            epoch_val_losses.append(epoch_loss)
            epoch_val_accs.append(epoch_acc)
            epoch_val_aucs.append(epoch_auc)
            # The scheduler was created with mode='max', so it is fed the validation AUC
            scheduler.step(epoch_auc)
            if epoch_auc > best_auc:
                best_auc = epoch_auc
                # Save the entire model (architecture + parameters)
                torch.save(model, 'best_model_inceptionv3.pth')
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1

    # Checked once per epoch, after the validation phase has updated the counter
    if epochs_no_improve >= early_stop_patience:
        print("Early stopping triggered")
        break

In [None]:
# Persist the model as it stands after the training loop (the whole module object is
# pickled, not just a state_dict).
torch.save(obj=model, f="final_model_inceptionv3.pth")

In [None]:
# Plot epoch-level Loss vs Epoch (training and validation curves on shared axes)
fig, ax = plt.subplots(figsize=(8, 6))
for series, series_label in ((epoch_train_losses, 'Train Loss'), (epoch_val_losses, 'Validation Loss')):
    ax.plot(series, label=series_label)
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Loss vs Epoch")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Plot epoch-level AUC vs Epoch (training and validation curves on shared axes)
fig, ax = plt.subplots(figsize=(8, 6))
for series, series_label in ((epoch_train_aucs, 'Train AUC'), (epoch_val_aucs, 'Validation AUC')):
    ax.plot(series, label=series_label)
ax.set_xlabel("Epoch")
ax.set_ylabel("AUC")
ax.set_title("AUC vs Epoch")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Testing phase

# The checkpoint was written with torch.save(model, ...), i.e. the whole module was
# pickled. PyTorch >= 2.6 defaults torch.load to weights_only=True, which refuses such
# files, so full unpickling is requested explicitly. map_location keeps the load working
# when the current device differs from the one used at save time.
# NOTE: full unpickling can execute arbitrary code - only load checkpoints you created.
model = torch.load("final_model_inceptionv3.pth", map_location=device, weights_only=False)

# Evaluation mode plus fresh accumulators for the test pass
model.eval()
test_loss = 0.0
correct = 0
total = 0
all_labels = []
all_preds = []

In [None]:
# Score the loaded model on the test split, accumulating loss, accuracy counts and the
# per-sample probabilities/labels used by the metric and plotting cells that follow.
pbar_test = tqdm(dataloaders['test'], desc="Testing Phase", leave=False)
with torch.no_grad():
    for inputs, labels in pbar_test:
        inputs = inputs.to(device)
        # BCEWithLogitsLoss needs float targets shaped like the (batch, 1) logits
        labels = labels.to(device).float().unsqueeze(1)
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # loss is a batch mean, so weight it by the batch size before accumulating
        test_loss += loss.item() * inputs.size(0)
        preds = torch.sigmoid(outputs).cpu().numpy()
        batch_preds = (preds >= 0.5).astype(int)
        batch_labels = labels.cpu().numpy().astype(int)

        all_preds.extend(preds.flatten())
        all_labels.extend(batch_labels.flatten())
        correct += (batch_preds == batch_labels).sum()
        total += inputs.size(0)
        pbar_test.set_postfix({"Loss": f"{loss.item():.4f}", "Acc": f"{(batch_preds == batch_labels).mean():.4f}"})

# Average over the samples actually scored: if the loader drops its last partial batch,
# len(dataset) is larger than the number of samples seen and would understate the loss.
test_loss = test_loss / total
test_acc = correct / total
test_auc = roc_auc_score(all_labels, all_preds)
current_lr = optimizer.param_groups[0]['lr']

print("\nTest Metrics:")
print(f"Loss: {test_loss:.4f}, Accuracy: {test_acc:.4f}, AUC: {test_auc:.4f}, LR: {current_lr:.6f}")

In [None]:
# Confusion Matrix: threshold the predicted probabilities at 0.5 and compare to the labels
tick_names = ['Fake', 'Real']
thresholded = np.array(all_preds) >= 0.5
cm = confusion_matrix(all_labels, thresholded)

plt.figure(figsize=(6, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=tick_names,
    yticklabels=tick_names,
)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
# ROC Curve for the test predictions, with the chance diagonal as a reference
fpr, tpr, _ = roc_curve(all_labels, all_preds)
fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f'AUC = {test_auc:.2f}')
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve')
ax.legend()
plt.show()

In [None]:
# Load the best checkpoint (highest validation AUC) and re-test.
# The file holds a whole pickled module, which PyTorch >= 2.6 rejects under its default
# weights_only=True, so full unpickling is requested explicitly. map_location keeps the
# load working when the current device differs from the one used at save time.
# NOTE: full unpickling can execute arbitrary code - only load checkpoints you created.
model = torch.load("best_model_inceptionv3.pth", map_location=device, weights_only=False)

# Testing Best Model: evaluation mode plus fresh accumulators
model.eval()
test_loss = 0.0
correct = 0
total = 0
all_labels = []
all_preds = []

In [None]:
# Score the best checkpoint on the test split, accumulating loss, accuracy counts and
# the per-sample probabilities/labels used by the metric and plotting cells that follow.
pbar_test = tqdm(dataloaders['test'], desc="Testing Best Model Phase", leave=False)
with torch.no_grad():
    for inputs, labels in pbar_test:
        inputs = inputs.to(device)
        # BCEWithLogitsLoss needs float targets shaped like the (batch, 1) logits
        labels = labels.to(device).float().unsqueeze(1)
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # loss is a batch mean, so weight it by the batch size before accumulating
        test_loss += loss.item() * inputs.size(0)
        preds = torch.sigmoid(outputs).cpu().numpy()
        batch_preds = (preds >= 0.5).astype(int)
        batch_labels = labels.cpu().numpy().astype(int)

        all_preds.extend(preds.flatten())
        all_labels.extend(batch_labels.flatten())
        correct += (batch_preds == batch_labels).sum()
        total += inputs.size(0)
        pbar_test.set_postfix({"Loss": f"{loss.item():.4f}", "Acc": f"{(batch_preds == batch_labels).mean():.4f}"})

# Average over the samples actually scored: if the loader drops its last partial batch,
# len(dataset) is larger than the number of samples seen and would understate the loss.
test_loss = test_loss / total
test_acc = correct / total
test_auc = roc_auc_score(all_labels, all_preds)
current_lr = optimizer.param_groups[0]['lr']

print("\nTest Metrics for Best Model:")
print(f"Loss: {test_loss:.4f}, Accuracy: {test_acc:.4f}, AUC: {test_auc:.4f}, LR: {current_lr:.6f}")

In [None]:
# Confusion Matrix for Best Model: probabilities thresholded at 0.5 versus the labels
tick_names = ['Fake', 'Real']
thresholded = np.array(all_preds) >= 0.5
cm = confusion_matrix(all_labels, thresholded)

plt.figure(figsize=(6, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=tick_names,
    yticklabels=tick_names,
)
plt.title('Confusion Matrix (Best Model)')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
# ROC Curve for Best Model, with the chance diagonal as a reference
fpr, tpr, _ = roc_curve(all_labels, all_preds)
fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f'AUC = {test_auc:.2f}')
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve (Best Model)')
ax.legend()
plt.show()

In [None]:
# Precision-Recall Curve, labelled with the average precision of the same predictions
ap = average_precision_score(all_labels, all_preds)
precision, recall, _ = precision_recall_curve(all_labels, all_preds)

fig, ax = plt.subplots(figsize=(6, 4))
ax.step(recall, precision, where='post', label=f'AP = {ap:.2f}')
ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curve')
ax.legend()
plt.show()

In [None]:
# Histogram of Predicted Probabilities (20 bins over the model's sigmoid outputs)
fig, ax = plt.subplots(figsize=(6, 4))
ax.hist(all_preds, bins=20, alpha=0.7, color='gray')
ax.set_xlabel('Predicted Probability')
ax.set_ylabel('Frequency')
ax.set_title('Histogram of Predicted Probabilities')
plt.show()

In [None]:
# Calibration Plot: observed fraction of positives against mean predicted probability
# per bin, compared with the diagonal of a perfectly calibrated model
prob_true, prob_pred = calibration_curve(all_labels, all_preds, n_bins=10)

fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(prob_pred, prob_true, marker='o', linewidth=1, label='Calibration curve')
ax.plot([0, 1], [0, 1], linestyle='--', label='Perfect calibration')
ax.set_xlabel('Mean predicted probability')
ax.set_ylabel('Fraction of positives')
ax.set_title('Calibration Plot')
ax.legend()
plt.show()

In [None]:
# F1 Score vs Threshold
thresholds = np.linspace(0, 1, 100)

# Convert the prediction list to an array once instead of once per threshold
probs_arr = np.asarray(all_preds)

# zero_division=0 gives the same score (0) as the default when a threshold yields no
# positive predictions, but without emitting an UndefinedMetricWarning for it.
f1_scores = [
    f1_score(all_labels, (probs_arr >= t).astype(int), zero_division=0)
    for t in thresholds
]

plt.figure(figsize=(6, 4))
plt.plot(thresholds, f1_scores, marker='o')
plt.xlabel("Threshold")
plt.ylabel("F1 Score")
plt.title("F1 Score vs Threshold")
plt.grid(True)
plt.show()

In [None]:
# Cumulative Gains Chart: rank samples by predicted probability (highest first) and
# track the share of all positives captured as more of the ranked data is included
df = (
    pd.DataFrame({"label": all_labels, "pred": all_preds})
    .sort_values("pred", ascending=False)
    .reset_index(drop=True)
    .assign(
        cumulative_positive=lambda d: d["label"].cumsum(),
        percentage_positive=lambda d: d["cumulative_positive"] / d["label"].sum(),
        percentage_data=lambda d: (d.index + 1) / len(d),
    )
)

fig, ax = plt.subplots(figsize=(6, 4))
ax.plot(df["percentage_data"], df["percentage_positive"], label="Model")
ax.plot([0, 1], [0, 1], linestyle="--", label="Random")
ax.set_xlabel("Percentage of Data")
ax.set_ylabel("Percentage of Positives Captured")
ax.set_title("Cumulative Gains Chart")
ax.legend()
ax.grid(True)
plt.show()