Importing Libraries

In [None]:
import cv2
import sys
import random
import os
import glob
import numpy as np
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
import mediapipe as mp
import torch
import timm
import math
from torch import nn
from torchvision import models
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm.autonotebook import tqdm
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

Dataset Class for Loading Video Frames and Labels, and a Frame-Plotting Helper

In [None]:
# Define a custom dataset class for loading video frames and labels
class video_dataset(Dataset):
    """Dataset that returns a stack of transformed frames and a class index per video.

    Each item is built by reading frames from the start of the video with
    OpenCV, applying ``transform`` to every frame, and stacking at most
    ``sequence_length`` of them with ``torch.stack``.

    Args:
        video_names: Sequence of video file paths.
        labels: pandas DataFrame with a ``"file"`` column (video base names)
            and a ``"label"`` column holding ``'FAKE'`` or ``'REAL'``.
        sequence_length: Maximum number of frames returned per video.
        transform: Callable applied to every raw frame. It is invoked
            unconditionally, so it must be provided and must return tensors
            that ``torch.stack`` can combine.
    """

    # Class index assigned to each label string found in the label table.
    _LABEL_TO_INDEX = {'FAKE': 0, 'REAL': 1}

    def __init__(self, video_names, labels, sequence_length=60, transform=None):
        self.video_names = video_names
        self.labels = labels
        self.transform = transform
        self.count = sequence_length

    def __len__(self):
        """Return the number of videos in the dataset."""
        return len(self.video_names)

    def __getitem__(self, idx):
        """Return ``(frames, label)`` for the video at position ``idx``.

        Returns:
            frames: Tensor stacking up to ``sequence_length`` transformed frames.
                Videos with fewer readable frames yield a shorter stack.
            label: ``0`` for ``'FAKE'`` and ``1`` for ``'REAL'``.

        Raises:
            KeyError: The video's base name is absent from the label table.
            ValueError: The stored label is neither ``'FAKE'`` nor ``'REAL'``.
            RuntimeError: No frame could be read from the video.
        """
        video_path = self.video_names[idx]
        label = self._lookup_label(video_path)

        frames = []
        for frame in self.frame_extract(video_path):
            frames.append(self.transform(frame))
            # Stop decoding as soon as the requested clip length is reached.
            if len(frames) == self.count:
                break

        if not frames:
            raise RuntimeError(f"No frames could be read from {video_path}")

        return torch.stack(frames), label

    def _lookup_label(self, video_path):
        """Map a video path to its class index using the table passed to ``__init__``."""
        file_name = os.path.basename(video_path)
        matches = self.labels.loc[self.labels["file"] == file_name, "label"].values
        if len(matches) == 0:
            raise KeyError(f"No label found for {file_name}")

        raw_label = matches[0]  # first match wins when a file is listed more than once
        if raw_label not in self._LABEL_TO_INDEX:
            raise ValueError(f"Unexpected label {raw_label!r} for {file_name}")
        return self._LABEL_TO_INDEX[raw_label]

    def frame_extract(self, path):
        """Yield frames from ``path`` in order until OpenCV reports a failed read."""
        vidObj = cv2.VideoCapture(path)
        try:
            while True:
                success, image = vidObj.read()
                if not success:
                    break
                yield image
        finally:
            # Runs on exhaustion and when the consumer stops early (generator
            # close), so the capture handle is always freed.
            vidObj.release()

# Function to plot an image tensor
def im_plot(tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Display one normalized frame tensor with matplotlib.

    The normalization is undone per channel *before* the channel order is
    reversed, so every channel is restored with the statistics it was
    normalized with. The defaults equal the mean/std used by
    ``get_data_transforms`` in this notebook.

    Args:
        tensor: Channel-first image tensor of shape ``(3, H, W)``. The channel
            order is taken to be B, G, R (frames come from ``cv2.VideoCapture``);
            it is reversed to R, G, B for display.
        mean: Per-channel mean that was subtracted during normalization.
        std: Per-channel standard deviation that was divided out.
    """
    image = tensor.detach().cpu().float().numpy().transpose(1, 2, 0)  # (H, W, C)
    image = image * np.asarray(std, dtype=np.float32) + np.asarray(mean, dtype=np.float32)
    image = image[..., ::-1]  # B, G, R -> R, G, B
    # Round and clip so the result is a valid 8-bit image for imshow.
    image = np.clip(np.rint(image * 255.0), 0, 255).astype(np.uint8)
    plt.imshow(image)
    plt.show()


Function to Count Real and Fake Videos

In [None]:
def number_of_real_and_fake_videos(data_list, labels='/content/drive/MyDrive/labels.csv'):
    """Count how many videos matched by ``data_list`` are labelled REAL and FAKE.

    Args:
        data_list: Iterable of file paths or glob patterns. Every entry is
            expanded with ``glob.glob``.
        labels: Either a pandas DataFrame with ``"file"`` and ``"label"``
            columns, or the path of a header-less two-column CSV
            (file name, label). Defaults to the CSV path this function has
            always read, so existing one-argument calls behave as before.

    Returns:
        Tuple ``(real, fake)`` with the two counts. Videos without a label row
        are reported with a printed message and are not counted; rows whose
        label is neither ``'REAL'`` nor ``'FAKE'`` are silently not counted.
    """
    if isinstance(labels, pd.DataFrame):
        lab = labels
    else:
        lab = pd.read_csv(labels, names=["file", "label"])

    # Build the lookup once instead of scanning the table for every video.
    # setdefault keeps the first row for a duplicated file name.
    label_by_file = {}
    for file_name, label in zip(lab["file"], lab["label"]):
        label_by_file.setdefault(file_name, label)

    fake = 0
    real = 0

    for files_pattern in data_list:
        for file_path in glob.glob(files_pattern):
            temp_video = os.path.basename(file_path)

            if temp_video not in label_by_file:
                print(f"No label found for {temp_video}")
                continue

            label = label_by_file[temp_video]
            if label == 'FAKE':
                fake += 1
            elif label == 'REAL':
                real += 1

    return real, fake


Function to Define and Load Data Transforms

In [None]:

def get_data_transforms():
    """Build the preprocessing pipelines for training and testing frames.

    Both pipelines apply the same steps: convert the array to a PIL image,
    resize to 112x112, convert to a tensor, and normalize each channel with
    the mean/std listed below.

    Returns:
        Tuple ``(train_transforms, test_transforms)`` of two separately
        constructed ``transforms.Compose`` objects.
    """
    side = 112
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    def _make_pipeline():
        # A fresh Compose per call so the two pipelines share no objects.
        steps = [
            transforms.ToPILImage(),
            transforms.Resize((side, side)),
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ]
        return transforms.Compose(steps)

    train_pipeline = _make_pipeline()
    test_pipeline = _make_pipeline()
    return train_pipeline, test_pipeline


Model Definition

In [None]:

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position signals to a batch-first sequence, then apply dropout.

    Args:
        d_model: Size of the last (feature) dimension. Odd sizes are supported:
            the final feature then receives a sine term with no cosine partner.
        dropout: Dropout probability applied after the addition.
        max_len: Longest sequence length for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=100):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd-indexed column than
        # even-indexed ones, so trim the frequencies to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model) so it broadcasts over the batch
        # Buffer: moves with the module across devices but is not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` for ``x`` of shape ``(batch, seq, d_model)``.

        Raises:
            ValueError: ``x`` has more time steps than ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported length {self.pe.size(1)}"
            )
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)

class TransformerEncoder(nn.Module):
    """Layer-normalize a batch-first sequence and run it through a Transformer encoder stack.

    Args:
        latent_dim: Feature size of the input and output tokens.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        hidden_dim: Width of the feed-forward sub-layer in each encoder layer.
    """

    def __init__(self, latent_dim=512, num_heads=2, num_layers=1, hidden_dim=256):
        super().__init__()

        # Settings shared by every layer of the stack; dropout and activation are fixed.
        layer_settings = {
            "d_model": latent_dim,
            "nhead": num_heads,
            "dim_feedforward": hidden_dim,
            "dropout": 0.5,
            "activation": "gelu",
            "batch_first": True,
        }
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_settings),
            num_layers=num_layers,
        )
        self.norm = nn.LayerNorm(latent_dim)

    def forward(self, x):
        """Normalize ``x`` over its last dimension, then encode it."""
        normalized = self.norm(x)
        encoded = self.transformer_encoder(normalized)
        return encoded

class DeepfakeModel(nn.Module):
    """Per-frame CNN features followed by a Transformer over time and a clip-level classifier.

    Pipeline in ``forward``: frames -> backbone -> pooled/projected feature per
    frame -> positional encoding -> Transformer encoder -> mean over time ->
    classifier logits.

    Args:
        num_classes: Number of output logits.
        latent_dim: Size of the per-frame feature after projection.
        num_heads: Attention heads in the Transformer encoder.
        num_layers: Number of Transformer encoder layers.
        hidden_dim: Feed-forward width in the Transformer and in the classifier.
        max_seq_len: Longest (post-subsampling) sequence the positional
            encoding is built for.
    """

    def __init__(self, num_classes, latent_dim=512, num_heads=2, num_layers=1, hidden_dim=256, max_seq_len=20):
        super(DeepfakeModel, self).__init__()

        base_model = timm.create_model('efficientnet_b3', pretrained=True)
        # Keep every child module except the last two (presumably the pooling
        # and classification head — confirm against the timm model definition).
        self.feature_extractor = nn.Sequential(*list(base_model.children())[:-2])

        # Channel count the projection expects from the truncated backbone.
        feat_dim = 1536

        self.projection = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(1),
            nn.Linear(feat_dim, latent_dim),
            nn.LayerNorm(latent_dim),
            nn.GELU()
        )

        self.pos_encoder = PositionalEncoding(latent_dim, dropout=0.2, max_len=max_seq_len)

        self.transformer = TransformerEncoder(
            latent_dim=latent_dim,
            num_heads=num_heads,
            num_layers=num_layers,
            hidden_dim=hidden_dim
        )

        self.classifier = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, num_classes)
        )

        self.temporal_pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, x, return_features=False):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape ``(batch, seq, channels, height, width)``.
            return_features: When true, also return the Transformer output.

        Returns:
            Logits of shape ``(batch, num_classes)``; with
            ``return_features=True`` a tuple ``(logits, features)`` where
            ``features`` has shape ``(batch, seq', latent_dim)`` and ``seq'`` is
            the sequence length after subsampling.
        """
        batch_size, seq_length, c, h, w = x.shape
        # Subsample frames to reduce computation
        sample_rate = 2
        if seq_length > 10:
            x = x[:, ::sample_rate, :, :, :]
            seq_length = x.shape[1]

        # reshape (not view): the strided slice above is non-contiguous, and
        # view() rejects it whenever the original length is odd and batch > 1.
        x = x.reshape(batch_size * seq_length, c, h, w)
        x = self.feature_extractor(x)
        x = self.projection(x)
        x = x.reshape(batch_size, seq_length, -1)
        x = self.pos_encoder(x)
        features = self.transformer(x)
        x = features.transpose(1, 2)  # [B, D, T]
        x = self.temporal_pool(x).squeeze(-1)  # [B, D]
        output = self.classifier(x)

        if return_features:
            return output, features

        return output

In [None]:
# Use the GPU when one is present and fall back to the CPU otherwise, matching
# the torch.cuda.is_available() guards used when batches are moved in
# train_epoch() and test().
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DeepfakeModel(num_classes=2).to(device)

Functions for Training, Testing, and Metrics

In [None]:
def calculate_metrics(y_true, y_pred):
    """Compute binary classification metrics, each scaled to a percentage.

    Args:
        y_true: Ground-truth class labels.
        y_pred: Predicted class labels. ROC-AUC is computed from these same
            values, so with hard 0/1 predictions it reflects a single
            operating point rather than a ranking over scores.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, roc_auc)``. ``roc_auc`` is
        ``nan`` when ``y_true`` contains fewer than two distinct classes,
        because the score is undefined in that case.
    """
    accuracy = accuracy_score(y_true, y_pred) * 100
    # zero_division=0 keeps the 0.0 result for an empty denominator without
    # emitting a warning on every evaluation.
    precision = precision_score(y_true, y_pred, zero_division=0) * 100
    recall = recall_score(y_true, y_pred, zero_division=0) * 100
    f1 = f1_score(y_true, y_pred, zero_division=0) * 100

    if np.unique(y_true).size < 2:
        # Avoid aborting evaluation on a single-class validation split.
        roc_auc = float('nan')
    else:
        roc_auc = roc_auc_score(y_true, y_pred) * 100

    return accuracy, precision, recall, f1, roc_auc

def calculate_accuracy(outputs, targets):
    """Return top-1 accuracy, as a percentage, for one batch.

    Args:
        outputs: Score tensor of shape ``(batch, num_classes)``.
        targets: Tensor holding one class index per sample.
    """
    # Index of the highest score in each row.
    predicted = torch.topk(outputs, k=1, dim=1, largest=True).indices.squeeze(1)
    matches = predicted.eq(targets.view(-1))
    hit_count = matches.float().sum().item()
    return 100 * hit_count / targets.size(0)

class AverageMeter(object):
    """Running weighted average that also remembers the most recent value.

    Attributes are ``val`` (last value), ``sum`` (weighted total),
    ``count`` (total weight) and ``avg`` (``sum / count``).
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Set every statistic back to zero."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the average."""
        self.val = val
        self.sum = self.sum + val * n
        self.count = self.count + n
        self.avg = self.sum / self.count
In [None]:
def train_epoch(epoch, num_epochs, data_loader, model, criterion, optimizer, scaler=None):
    """Run one training pass over ``data_loader`` and report running statistics.

    Args:
        epoch: Current epoch number (only used in the progress line).
        num_epochs: Total number of epochs (only used in the progress line).
        data_loader: Iterable yielding ``(inputs, targets)`` batches.
        model: Network to optimize; switched to train mode.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepping ``model``'s parameters.
        scaler: Optional gradient scaler. When given, the forward pass runs
            under CUDA autocast and the backward/step go through the scaler.

    Returns:
        Tuple ``(average_loss, average_accuracy)`` weighted by batch size.
    """
    model.train()
    loss_meter = AverageMeter()
    acc_meter = AverageMeter()
    mixed_precision = scaler is not None

    for batch_idx, (inputs, targets) in enumerate(data_loader):
        if torch.cuda.is_available():
            # Converting to a CUDA LongTensor also moves the targets to the GPU.
            targets = targets.type(torch.cuda.LongTensor)
            inputs = inputs.cuda()

        optimizer.zero_grad()

        # Forward pass, under autocast only when a scaler was supplied.
        if mixed_precision:
            with torch.amp.autocast('cuda'):
                outputs = model(inputs)
                loss = criterion(outputs, targets)
        else:
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        batch_acc = calculate_accuracy(outputs, targets)
        n_samples = inputs.size(0)
        loss_meter.update(loss.item(), n_samples)
        acc_meter.update(batch_acc, n_samples)

        # Backward pass, gradient clipping (max norm 1.0) and parameter update.
        if mixed_precision:
            scaler.scale(loss).backward()
            # Gradients must be unscaled before their norm is clipped.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

        progress_fields = (
            epoch,
            num_epochs,
            batch_idx,
            len(data_loader),
            loss_meter.avg,
            acc_meter.avg,
        )
        sys.stdout.write(
            "\r[Epoch %d/%d] [Batch %d / %d] [Loss: %f, Acc: %.2f%%]" % progress_fields
        )

    return loss_meter.avg, acc_meter.avg

def test(epoch, model, data_loader, criterion, scaler=None):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Args:
        epoch: Accepted for call-site symmetry with training; not read here.
        model: Network to evaluate; switched to eval mode.
        data_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        scaler: When not ``None``, the forward pass runs under CUDA autocast.

    Returns:
        Tuple ``(true, pred, avg_loss, avg_accuracy, precision, recall, f1,
        roc_auc)`` where ``true`` and ``pred`` are flat lists of class indices
        and the last four values come from ``calculate_metrics``.
    """
    print('\nTesting')
    model.eval()
    loss_meter = AverageMeter()
    acc_meter = AverageMeter()
    predicted_labels = []
    true_labels = []
    use_autocast = scaler is not None

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(data_loader):
            if torch.cuda.is_available():
                targets = targets.cuda().type(torch.cuda.LongTensor)
                inputs = inputs.cuda()

            if use_autocast:
                with torch.amp.autocast('cuda'):
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)
            else:
                outputs = model(inputs)
                loss = criterion(outputs, targets)

            batch_acc = calculate_accuracy(outputs, targets)
            batch_pred = torch.max(outputs, 1)[1]  # index of the largest score per sample

            # Accumulate ground truth and predictions for the epoch-level metrics.
            true_labels.extend(targets.detach().cpu().numpy().tolist())
            predicted_labels.extend(batch_pred.detach().cpu().numpy().tolist())

            n_samples = inputs.size(0)
            loss_meter.update(loss.item(), n_samples)
            acc_meter.update(batch_acc, n_samples)

            status = "\r[Batch %d / %d] [Loss: %f, Acc: %.2f%%]" % (
                batch_idx, len(data_loader), loss_meter.avg, acc_meter.avg)
            sys.stdout.write(status)

    accuracy, precision, recall, f1, roc_auc = calculate_metrics(true_labels, predicted_labels)
    print(f"\nAccuracy: {accuracy:.2f}% | Precision: {precision:.2f}% | Recall: {recall:.2f}% | F1-Score: {f1:.2f}% | ROC-AUC: {roc_auc:.2f}%")

    return (
        true_labels,
        predicted_labels,
        loss_meter.avg,
        acc_meter.avg,
        precision,
        recall,
        f1,
        roc_auc,
    )


Functions to Plot Confusion Matrix, Loss and Accuracy

In [None]:

def print_confusion_matrix(y_true, y_pred):
    """Print the confusion counts, draw them as a heatmap, and print the accuracy.

    Class 0 is shown as 'Fake' and class 1 as 'Real'; class 1 is treated as
    the positive class in the printed TP/FP/FN/TN summary.

    Args:
        y_true: Ground-truth class indices (0 or 1).
        y_pred: Predicted class indices (0 or 1).
    """
    # Pin both classes so the matrix is always 2x2, even when only one class
    # occurs in y_true and y_pred.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    labels = ['Fake', 'Real']

    print(f"True Positive: {cm[1][1]} | False Positive: {cm[0][1]}")
    print(f"False Negative: {cm[1][0]} | True Negative: {cm[0][0]}\n")

    # Rows are actual classes, columns are predicted classes.
    df_cm = pd.DataFrame(cm, index=labels, columns=labels)
    sn.set(font_scale=1.4)
    sn.heatmap(df_cm, annot=True, cmap="Blues", fmt="d", annot_kws={"size": 16})
    plt.ylabel('Actual Label', size=14)
    plt.xlabel('Predicted Label', size=14)
    plt.title("Confusion Matrix", size=16)
    plt.show()

    calculated_acc = (cm[0][0] + cm[1][1]) / (cm[0][0] + cm[0][1] + cm[1][0] + cm[1][1])
    print("Calculated Accuracy", calculated_acc * 100)

# Function to plot training and validation loss
def plot_loss(train_loss_avg, test_loss_avg, num_epochs):
    """Plot training (green) and validation (blue) loss against epoch number.

    Args:
        train_loss_avg: One training loss value per epoch.
        test_loss_avg: One validation loss value per epoch.
        num_epochs: Number of epochs; the x axis runs from 1 to this value.
    """
    epoch_axis = range(1, num_epochs + 1)
    curves = (
        (train_loss_avg, 'g', 'Training loss'),
        (test_loss_avg, 'b', 'Validation loss'),
    )
    for values, colour, legend_name in curves:
        plt.plot(epoch_axis, values, colour, label=legend_name)

    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Function to plot training and validation accuracy
def plot_accuracy(train_accuracy, test_accuracy, num_epochs):
    """Plot training (green) and validation (blue) accuracy against epoch number.

    Args:
        train_accuracy: One training accuracy value per epoch.
        test_accuracy: One validation accuracy value per epoch.
        num_epochs: Number of epochs; the x axis runs from 1 to this value.
    """
    epoch_axis = range(1, num_epochs + 1)
    curves = (
        (train_accuracy, 'g', 'Training accuracy'),
        (test_accuracy, 'b', 'Validation accuracy'),
    )
    for values, colour, legend_name in curves:
        plt.plot(epoch_axis, values, colour, label=legend_name)

    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()


Preparing Video Data and Creating Data Loaders

In [None]:

# Fake videos: every .mp4 found anywhere beneath f_path (searched recursively).
f_path = '/content/fake'
fake_videos = []
for root, _, files in os.walk(f_path):
    fake_videos.extend(
        os.path.join(root, file) for file in files if file.endswith('.mp4')
    )

print(f" Found {len(fake_videos)} fake videos.")

# Real videos: .mp4 files matched by each glob pattern (not recursive).
real_paths = ['/content/real/*.mp4']
real_videos = []
for path in real_paths:
    real_videos += glob.glob(path)

print(f" Found {len(real_videos)} real videos.")

# Fake paths first, then real paths.
all_videos = fake_videos + real_videos
print(f" Total videos collected: {len(all_videos)}")


In [None]:

# Fixed seed so the shuffle and the train/validation split are reproducible
# across kernel restarts.
SEED = 42
random.seed(SEED)
random.shuffle(all_videos)

header_list = ["file", "label"]
labels = pd.read_csv('labels.csv', names=header_list)
lab = labels  # module-level alias of the label table

train_videos, valid_videos = train_test_split(all_videos, test_size=0.2, random_state=SEED)

# Count once per split: each call re-reads the label CSV and prints a line for
# every unlabeled video, so calling it twice doubles both.
# NOTE(review): the counting helper loads its labels from its own default CSV
# path — confirm it refers to the same data as 'labels.csv' above.
train_real, train_fake = number_of_real_and_fake_videos(train_videos)
valid_real, valid_fake = number_of_real_and_fake_videos(valid_videos)
print("TRAIN: ", "Real:", train_real, " Fake:", train_fake)
print("TEST: ", "Real:", valid_real, " Fake:", valid_fake)


train_transforms, test_transforms = get_data_transforms()

train_data = video_dataset(train_videos, labels, sequence_length=10, transform=train_transforms)
val_data = video_dataset(valid_videos, labels, sequence_length=10, transform=test_transforms)

train_loader = DataLoader(train_data, batch_size=8, shuffle=True, num_workers=2)
valid_loader = DataLoader(val_data, batch_size=8, shuffle=False, num_workers=2)

# Sanity check: show the first frame of the first training clip.
image, label = train_data[0]
im_plot(image[0, :, :, :])


Training and Evaluation of the Model

In [None]:
lr = 1e-6
num_epochs = 30
patience = 5  # epochs without an F1 improvement before training stops
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss().cuda()

# Initialize lists to store metrics
train_loss_avg = []
train_accuracy = []
test_loss_avg = []
test_accuracy = []
best_f1 = 0
best_epoch = 0
# Defined up front so the final report cannot hit a NameError when training
# stops before the first evaluation completes.
true, pred = [], []
scaler = torch.amp.GradScaler('cuda')


try:
    print("Training model...")
    for epoch in range(1, num_epochs + 1):
        l, acc = train_epoch(epoch, num_epochs, train_loader, model, criterion, optimizer, scaler=scaler)
        train_loss_avg.append(l)
        train_accuracy.append(acc)

        # Evaluate on validation set (no scaler passed, so no autocast here)
        true, pred, tl, t_acc, precision, recall, f1, roc_auc = test(epoch, model, valid_loader, criterion)
        test_loss_avg.append(tl)
        test_accuracy.append(t_acc)

        # Save best model
        if f1 > best_f1:
            best_f1 = f1
            best_epoch = epoch
            torch.save(model.state_dict(), 'best_model.pth')

        # Early stopping: stop once `patience` consecutive epochs brought no
        # improvement, so the behavior matches the printed message.
        if epoch - best_epoch >= patience:
            print(f"Early stopping triggered. No improvement for {patience} epochs.")
            break

except RuntimeError as e:
    if "NaN" in str(e):
        print("NaN detected during forward pass. Investigating...")
        for name, param in model.named_parameters():
            if param.requires_grad:
                print(f"Layer: {name}, NaN values: {torch.isnan(param).sum().item()}")
    else:
        raise

print(f"Best F1 Score: {best_f1:.4f} achieved at epoch {best_epoch}")

# An epoch interrupted between training and evaluation leaves the training
# lists one entry longer; plot only the epochs that have both values.
completed_epochs = min(len(train_loss_avg), len(test_loss_avg))
plot_loss(train_loss_avg[:completed_epochs], test_loss_avg[:completed_epochs], completed_epochs)
plot_accuracy(train_accuracy[:completed_epochs], test_accuracy[:completed_epochs], completed_epochs)

# `true`/`pred` hold the most recent evaluation, which is not necessarily the
# epoch whose weights were saved as best_model.pth.
if true:
    print(confusion_matrix(true, pred))