# Deepfake Detector

# Preprocessing

In [None]:
import os
import glob
import random
import numpy as np
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn as nn
import cv2
from collections import Counter
import matplotlib.pyplot as plt
from tqdm import tqdm
from PIL import Image
import timm
import torchvision.transforms.functional as F


# Seed every random number generator used in this notebook with the same
# value so that repeated runs start from the same RNG state.
random_seed = 42
for _seed_rng in (
    np.random.seed,
    random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_rng(random_seed)

# Root directory of the dataset
data_folder = "Model_10k"

# Rotate an image by a caller-supplied angle
def random_rotation_with_seed(image, degrees):
    """Rotate ``image`` by ``degrees`` and return the rotated image.

    The angle is passed straight to ``F.rotate``, so the same input always
    produces the same output; the name is kept for existing callers.

    The global ``random`` module is deliberately NOT re-seeded here: the
    rotation does not draw from it, and re-seeding on every call would only
    reset the RNG state for all other users of ``random``.

    Args:
        image: Image accepted by ``F.rotate``.
        degrees: Rotation angle forwarded to ``F.rotate``.

    Returns:
        The rotated image as returned by ``F.rotate``.
    """
    return F.rotate(image, degrees)

# Preprocessing pipeline: rotate each image via random_rotation_with_seed
# with an angle argument of 10, then convert it to a tensor.
_rotate_step = transforms.Lambda(lambda img: random_rotation_with_seed(img, 10))
preprocess_transform = transforms.Compose([_rotate_step, transforms.ToTensor()])

# Dataset that loads (image, label) pairs from disk on demand
class CustomDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs read from image files.

    Args:
        image_paths: Sequence of image file paths.
        class_labels: Sequence of labels indexed in parallel with
            ``image_paths``.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_paths, class_labels, transform=None):
        self.image_paths = image_paths
        self.class_labels = class_labels
        self.transform = transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return ``(image, label)``."""
        # Open inside a context manager so the underlying file handle is
        # released as soon as the RGB copy has been made, instead of
        # lingering until garbage collection.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert("RGB")
        # Compare against None so that a falsy-but-valid callable (for
        # example an empty container module) is still applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, self.class_labels[idx]

# Binary cross-entropy loss that takes raw logits
class BinaryLossFunction(nn.Module):
    """Binary cross-entropy with logits, using PyTorch's default settings.

    ``forward(outputs, labels)`` expects raw (pre-sigmoid) scores and float
    targets and returns the loss produced by the default
    ``BCEWithLogitsLoss`` configuration.
    """

    def __init__(self):
        super().__init__()

    def forward(self, outputs, labels):
        # Functional form of nn.BCEWithLogitsLoss() with default arguments.
        return nn.functional.binary_cross_entropy_with_logits(outputs, labels)

def load_data(data_folder, dataset_name, limit=None):
    """Collect image paths and labels for one dataset split.

    Directory layout read by this function:

    * ``"train"`` / ``"val"``: for each generator name in ``subfolders``,
      every directory inside ``data_folder/dataset_name/<generator>`` is
      passed to ``load_class_data``.
    * ``"test"``: every directory inside ``data_folder/test`` is passed to
      ``load_class_data``; this happens once, not once per generator.
    * Any other ``dataset_name``: nothing is loaded and two empty lists are
      returned.

    Directory entries are visited in sorted order, and plain files (for
    example stray metadata files) are skipped, so the traversal does not
    depend on filesystem ordering or crash on non-directories.

    Args:
        data_folder: Root directory of the dataset.
        dataset_name: Split name (``"train"``, ``"val"`` or ``"test"``).
        limit: Forwarded unchanged to every ``load_class_data`` call, so it
            applies per class directory rather than to the whole split.

    Returns:
        Tuple ``(images, labels)`` of two parallel lists.
    """
    images, labels = [], []
    subfolders = ["ADM", "DDPM", "IDDPM", "LDM", "PNDM"]
    test_loaded = False  # the test split is not organised per generator

    for subfolder in subfolders:
        print("Folder : ", data_folder, dataset_name, subfolder)
        if dataset_name == "test" and not test_loaded:
            folder_path = os.path.join(data_folder, dataset_name)
            for image_type in sorted(os.listdir(folder_path)):
                subfolder_path = os.path.join(folder_path, image_type)
                if not os.path.isdir(subfolder_path):
                    continue
                found_images, found_labels = load_class_data(subfolder_path, image_type, limit)
                images.extend(found_images)
                labels.extend(found_labels)
                test_loaded = True
        elif dataset_name == "train" or dataset_name == "val":
            folder_path = os.path.join(data_folder, dataset_name, subfolder)
            for image_type in sorted(os.listdir(folder_path)):
                if not os.path.isdir(os.path.join(folder_path, image_type)):
                    continue
                found_images, found_labels = load_class_data(folder_path, image_type, limit)
                images.extend(found_images)
                labels.extend(found_labels)
            # Running totals over all generators processed so far.
            print("Label Counts:", Counter(labels))

    return images, labels

def load_class_data(folder, class_label, limit=None):
    """List the ``.png`` files of one class directory and label them.

    Two layouts are handled:

    * If the string ``"test"`` occurs in ``folder`` and ``class_label`` does
      not contain ``"real"``, the images are read directly from ``folder``
      and all labelled ``1``.
    * Otherwise the images are read from ``folder/class_label`` and labelled
      ``1`` when ``class_label == "1_fake"`` and ``0`` for anything else.

    NOTE(review): the ``"test" in folder`` check is a substring match on the
    whole path, so a root directory whose name contains "test" also selects
    the first layout; confirm that is acceptable for the paths in use.

    File names are sorted before sampling so that, for a given seed of the
    ``random`` module, the same subset is chosen on every machine
    (``os.listdir`` order is not guaranteed).

    Args:
        folder: Directory that holds the images or the class directory.
        class_label: Name of the class directory (e.g. ``"0_real"``).
        limit: If truthy, shuffle and keep at most this many samples;
            ``None`` or ``0`` keeps everything in sorted order.

    Returns:
        Tuple ``(images, class_labels)`` of two parallel lists. Both are
        empty when the directory contains no ``.png`` files.
    """
    fake_only_layout = "test" in folder and "real" not in class_label
    image_dir = folder if fake_only_layout else os.path.join(folder, class_label)

    images = [
        os.path.join(image_dir, filename)
        for filename in sorted(os.listdir(image_dir))
        if filename.endswith(".png")
    ]
    label_value = 1 if (fake_only_layout or class_label == "1_fake") else 0
    class_labels = [label_value] * len(images)

    if limit:
        # Every label in this call is identical, so shuffling the paths alone
        # is equivalent to shuffling (path, label) pairs.
        random.shuffle(images)
        images = images[:limit]
        class_labels = class_labels[:limit]

    # Guard the summary line: an empty directory has no first label to show.
    print(folder, class_labels[0] if class_labels else None, len(class_labels))
    return images, class_labels

# Load image paths and labels for the training split. `limit` is forwarded to
# every class directory, so it caps each directory at 2000 samples.
train_images, train_labels = load_data(data_folder, "train", limit=2000)

# Same for the validation split, capped at 500 samples per class directory.
val_images, val_labels = load_data(data_folder, "val", limit=500)

# Create train and validation datasets from the lists loaded above
train_dataset = CustomDataset(train_images, train_labels, transform=preprocess_transform)
val_dataset = CustomDataset(val_images, val_labels, transform=preprocess_transform)

# Create train and validation data loaders; the training loop iterates
# `train_loader` and `val_loader`.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Count the occurrences of each class label
train_label_counts = Counter(train_labels)
val_label_counts = Counter(val_labels)
print("Train Label Counts:", train_label_counts)
print("Validation Label Counts:", val_label_counts)

# Model

In [None]:
# Build an Xception model from timm with pretrained weights and replace its
# `fc` layer with a single-output linear layer.
model = timm.create_model('xception', pretrained=True)
num_features, num_classes = model.num_features, 1
model.fc = nn.Linear(num_features, num_classes)

# Loss, optimizer (Adam with weight decay) and a scheduler that is stepped
# with the validation loss.
criterion = BinaryLossFunction()
optimizer = Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, verbose=True)

# Training configuration
num_epochs = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Best validation metrics seen so far and where the checkpoint is written
best_val_loss, best_val_accuracy = float('inf'), 0.0
best_model_path = "best_model_xception_All.pth"

# Early stopping: `patience` epochs without improvement are tolerated;
# `counter` tracks the current streak.
patience, counter = 5, 0

# Per-epoch history used for plotting
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

# Train for up to `num_epochs` epochs. Each epoch runs one pass over
# `train_loader`, evaluates on `val_loader`, steps the LR scheduler with the
# validation loss, and checkpoints / early-stops based on validation metrics.
for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0

    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False)

    for images, labels in progress_bar:
        images = images.to(device)
        labels = labels.float().to(device)  # the loss expects float targets

        optimizer.zero_grad()
        outputs = model(images)
        logits = outputs.view(-1)  # flatten so logits and labels share one shape
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        sig_outputs = torch.sigmoid(logits)
        predicted = (sig_outputs >= 0.5).float()  # threshold probabilities at 0.5
        # `predicted` and `labels` are both 1-D here; comparing the unflattened
        # model output with 1-D labels would broadcast to a matrix and
        # miscount the matches.
        batch_correct = (predicted == labels).sum().item()
        total_train += labels.size(0)
        correct_train += batch_correct
        progress_bar.set_postfix({'Loss': loss.item(), 'Accuracy': batch_correct / labels.size(0)})

    train_loss /= len(train_loader)  # mean of per-batch losses
    train_accuracy = correct_train / total_train
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # ---- Validation pass ----
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.float().to(device)

            outputs = model(images)
            logits = outputs.view(-1)
            loss = criterion(logits, labels)

            val_loss += loss.item()
            sig_outputs = torch.sigmoid(logits)
            predicted = (sig_outputs >= 0.5).float()
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    val_loss /= len(val_loader)
    val_accuracy = correct_val / total_val
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Learning rate scheduler step
    scheduler.step(val_loss)

    # An epoch counts as an improvement when EITHER the validation loss drops
    # below its best value OR the validation accuracy exceeds its best value;
    # the checkpoint is overwritten on every such epoch.
    if val_loss < best_val_loss or val_accuracy > best_val_accuracy:
        best_val_loss = min(best_val_loss, val_loss)
        best_val_accuracy = max(best_val_accuracy, val_accuracy)
        counter = 0

        # Save the best model
        torch.save(model.state_dict(), best_model_path)

    else:
        counter += 1
        if counter >= patience:
            print("Validation loss and accuracy did not improve for {} epochs. Early stopping.".format(patience))
            break

# Restore the weights of the best checkpoint written during training
model.load_state_dict(torch.load(best_model_path))

# Plot the recorded loss and accuracy histories, one point per completed epoch
epochs = range(1, len(train_losses) + 1)

plt.figure(figsize=(12, 4))
# Only training and validation curves are recorded, so the titles name
# exactly those two (no test curve is plotted).
for _panel, (_metric, _train_curve, _val_curve) in enumerate(
    [
        ('Loss', train_losses, val_losses),
        ('Accuracy', train_accuracies, val_accuracies),
    ],
    start=1,
):
    plt.subplot(1, 2, _panel)
    plt.plot(epochs, _train_curve, label='Train')
    plt.plot(epochs, _val_curve, label='Validation')
    plt.title(f'Training and Validation {_metric}')
    plt.xlabel('Epoch')
    plt.ylabel(_metric)
    plt.legend()

plt.tight_layout()
plt.show()


# Evaluation

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, average_precision_score


# Re-seed every random number generator used in this cell with the same
# value so that repeated runs start from the same RNG state.
random_seed = 42
for _seed_rng in (
    np.random.seed,
    random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_rng(random_seed)

# Root directory of the dataset
data_folder = "Model_10k"

# Evaluation preprocessing: tensor conversion only, no augmentation.
# NOTE(review): the preprocess_transform defined in the preprocessing cell
# also rotates each image before ToTensor — confirm that evaluating without
# that rotation is intended.
preprocess_transform = transforms.Compose([transforms.ToTensor()])

# Dataset that loads (image, label) pairs for evaluation on demand
class CustomTestDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs read from image files.

    Args:
        image_paths: Sequence of image file paths.
        class_labels: Sequence of labels indexed in parallel with
            ``image_paths``.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_paths, class_labels, transform=None):
        self.image_paths = image_paths
        self.class_labels = class_labels
        self.transform = transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return ``(image, label)``."""
        # Open inside a context manager so the underlying file handle is
        # released as soon as the RGB copy has been made, instead of
        # lingering until garbage collection.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert("RGB")
        # Compare against None so that a falsy-but-valid callable is still
        # applied.
        if self.transform is not None:
            image = self.transform(image)
        return image, self.class_labels[idx]


# Binary cross-entropy loss that takes raw logits
class BinaryLossFunction(nn.Module):
    """Binary cross-entropy with logits, using PyTorch's default settings.

    ``forward(outputs, labels)`` expects raw (pre-sigmoid) scores and float
    targets and returns the loss produced by the default
    ``BCEWithLogitsLoss`` configuration.
    """

    def __init__(self):
        super().__init__()

    def forward(self, outputs, labels):
        # Functional form of nn.BCEWithLogitsLoss() with default arguments.
        return nn.functional.binary_cross_entropy_with_logits(outputs, labels)

def load_data(folder, class_label):
    """Return the ``.png`` paths in ``folder/class_label`` and their labels.

    NOTE: this definition replaces the three-argument ``load_data`` from the
    preprocessing cell once this cell has been executed.

    Args:
        folder: Directory that contains the class directory.
        class_label: Name of the class directory; ``"1_fake"`` yields label
            ``1``, any other name yields label ``0``.

    Returns:
        Tuple ``(images, class_labels)`` of two parallel lists, in the order
        reported by ``os.listdir``.
    """
    class_dir = os.path.join(folder, class_label)
    images = []
    for filename in os.listdir(class_dir):
        if filename.endswith(".png"):
            images.append(os.path.join(class_dir, filename))
    label_value = 1 if class_label == "1_fake" else 0
    return images, [label_value] * len(images)

def shuffle_data(image_paths, class_labels):
    """Shuffle paths and labels together, keeping each pair aligned.

    The shuffle uses the global ``random`` module, so its result is
    determined by that module's current seed/state.

    Args:
        image_paths: Sequence of image paths.
        class_labels: Sequence of labels parallel to ``image_paths``.

    Returns:
        Tuple ``(image_paths, class_labels)`` of two tuples in the new
        order; two empty tuples when there is nothing to shuffle.
    """
    data = list(zip(image_paths, class_labels))
    # zip(*[]) yields nothing, so unpacking it into two names would raise
    # ValueError; return the empty result explicitly instead.
    if not data:
        return (), ()
    random.shuffle(data)
    image_paths, class_labels = zip(*data)
    return image_paths, class_labels

def limit_data(image_paths, class_labels, limit):
    """Return the leading ``limit`` items of both sequences.

    Both inputs are sliced with ``[:limit]``, so ``limit=None`` returns them
    in full and the usual slice semantics apply to other values.
    """
    head = slice(None, limit)
    return image_paths[head], class_labels[head]

# Pick the device first so the checkpoint can be mapped onto it while loading
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the same architecture that was trained: timm Xception with its
# `fc` layer replaced by a single-output linear layer.
model = timm.create_model('xception', pretrained=False)
num_features = model.num_features
num_classes = 1
model.fc = nn.Linear(num_features, num_classes)

# `map_location` remaps the stored tensors to `device`, so a checkpoint saved
# from a CUDA model can still be loaded on a CPU-only machine.
model.load_state_dict(torch.load("best_model_xception_All.pth", map_location=device))

# Move the model to the selected device and switch to inference mode
model.to(device)
model.eval()

# Gather the real and fake image paths (and labels) of the new test set
_new_test_root = os.path.join(data_folder, "test_new", "PNDM")
new_test_real_images, new_test_real_labels = load_data(_new_test_root, "0_real")
new_test_fake_images, new_test_fake_labels = load_data(_new_test_root, "1_fake")

# Real samples first, then fake samples; paths and labels stay parallel
new_test_image_paths = new_test_real_images + new_test_fake_images
new_test_class_labels = new_test_real_labels + new_test_fake_labels

test_label_counts = Counter(new_test_class_labels)
print("Test Label Counts:", test_label_counts)

# Wrap the paths in a dataset and iterate it in fixed order (no shuffling)
new_test_dataset = CustomTestDataset(
    new_test_image_paths,
    new_test_class_labels,
    transform=preprocess_transform,
)
new_test_loader = DataLoader(new_test_dataset, batch_size=32, shuffle=False)

# Accumulators for the per-image predicted probabilities and true labels
all_predicted_probabilities = []
all_true_labels = []

# Run the model over the new test set without tracking gradients
with torch.no_grad():
    for images, labels in tqdm(new_test_loader):
        images = images.to(device)
        labels = labels.float()  # float labels, matching the probability dtype

        outputs = model(images)
        # Flatten to one probability per image so the collected scores form a
        # 1-D array that lines up element-for-element with the labels.
        sig_outputs = torch.sigmoid(outputs).view(-1)

        all_predicted_probabilities.extend(sig_outputs.cpu().numpy())
        all_true_labels.extend(labels.cpu().numpy())

# Convert lists to NumPy arrays
all_true_labels = np.array(all_true_labels)
all_predicted_probabilities = np.array(all_predicted_probabilities)

# Masks derived from the label list used to build the test dataset.
# They are not used by the metric computations below, which index with
# masks built from `y_true` instead.
real_mask = np.array(new_test_class_labels) == 0
fake_mask = np.array(new_test_class_labels) == 1

# True labels and predicted probabilities as flat 1-D arrays. Flattening the
# scores makes every boolean-mask selection below return a 1-D array whether
# the scores were collected as a column or as a flat vector.
y_true = np.asarray(all_true_labels).reshape(-1)
y_pred = np.asarray(all_predicted_probabilities).reshape(-1)

# Accuracy on real (label 0) and fake (label 1) images separately, using a
# 0.5 decision threshold on the predicted probability
r_acc = accuracy_score(y_true[y_true == 0], y_pred[y_true == 0] > 0.5)
f_acc = accuracy_score(y_true[y_true == 1], y_pred[y_true == 1] > 0.5)

# Overall accuracy at the same threshold
acc = accuracy_score(y_true, y_pred > 0.5)

# Threshold-free ranking metrics
ap = average_precision_score(y_true, y_pred)
auroc = roc_auc_score(y_true, y_pred)

# Probability of detection at a fixed false alarm rate (Pd@FAR): the
# threshold is the 95th / 99th percentile of the scores of real images, and
# Pd is the fraction of fake images scoring at or above that threshold.
far_5_percent_threshold = np.percentile(y_pred[y_true == 0], 95)
far_1_percent_threshold = np.percentile(y_pred[y_true == 0], 99)

pd_at_far_5_percent = np.mean(y_pred[y_true == 1] >= far_5_percent_threshold)
pd_at_far_1_percent = np.mean(y_pred[y_true == 1] >= far_1_percent_threshold)

# Report every metric with four decimal places, one per line
for _metric_name, _metric_value in (
    ("Accuracy for Real Images", r_acc),
    ("Accuracy for Fake Images", f_acc),
    ("Overall Accuracy", acc),
    ("Average Precision Score", ap),
    ("AUROC", auroc),
    ("Pd@FAR (FAR = 5%)", pd_at_far_5_percent),
    ("Pd@FAR (FAR = 1%)", pd_at_far_1_percent),
):
    print(f"{_metric_name}: {_metric_value:.4f}")

  model = create_fn(


Test Label Counts: Counter({0: 2000, 1: 2000})


100%|██████████| 250/250 [12:12<00:00,  2.93s/it]

Accuracy for Real Images: 0.9800
Accuracy for Fake Images: 0.9945
Overall Accuracy: 0.9872
Average Precision Score: 0.9990
AUROC: 0.9992
Pd@FAR (FAR = 5%): 0.9990
Pd@FAR (FAR = 1%): 0.9915



