In [None]:
import os
import cv2
import pandas as pd
import numpy as np
import csv
from tqdm import tqdm
import time
import torchvision.transforms as transforms

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.optim.lr_scheduler import CosineAnnealingLR

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
class paths:
    """Notebook-wide configuration: dataset locations and training hyper-parameters."""

    # Training hyper-parameters.
    learning_rate = 0.001
    batch_size = 2
    #epochs = 50

    # Directories holding the fake and the real video files.
    real_dir = '/kaggle/input/deep-fake-dataset/SDFVD/videos_real/'
    fake_dir = '/kaggle/input/deep-fake-dataset/SDFVD/videos_fake/'

    # Filled in by a later cell: maps each video path to its integer label.
    data_dict = dict()

In [None]:
# Collect every video file. `os.listdir` returns entries in arbitrary order, so
# the names are sorted to keep the seeded train/test split reproducible.
fake_paths = [os.path.join(paths.fake_dir, name) for name in sorted(os.listdir(paths.fake_dir))]
real_paths = [os.path.join(paths.real_dir, name) for name in sorted(os.listdir(paths.real_dir))]

# Label convention: 0 = fake, 1 = real.
# The loop variable is deliberately not called `paths`, to avoid shadowing the config class.
paths.data_dict.update({video_path: 0 for video_path in fake_paths})
paths.data_dict.update({video_path: 1 for video_path in real_paths})

print(f'Total number of fake videos :{len(fake_paths)}')
print(f'Total number of real videos :{len(real_paths)}')

In [None]:
# Show the first five (path, label) pairs as a quick sanity check.
preview_items = list(paths.data_dict.items())[:5]
for video_path, video_label in preview_items:
    print(f'Path: {video_path},  Label: {video_label}')

In [None]:
def crop_frame(frame):
    """Return the largest centred square region of ``frame``.

    ``frame`` is indexed as (rows, columns, ...); the crop side equals the
    smaller of the two leading dimensions and is centred on both axes.
    """
    height, width = frame.shape[:2]
    side = min(width, height)

    top = height // 2 - side // 2
    left = width // 2 - side // 2

    return frame[top:top + side, left:left + side]

In [None]:
# Sanity check: decode the first frame of one fake video and display it after
# the crop / resize / BGR->RGB steps.
cap = cv2.VideoCapture(os.path.join(paths.fake_dir, 'vs11.mp4'))
try:
    ret, frame = cap.read()
finally:
    # Always free the decoder handle, even if reading raised.
    cap.release()

if ret:
    frame = crop_frame(frame)
    frame = cv2.resize(frame, (299,299))
    frame = frame[:, :, (2,1,0)]  # OpenCV decodes to BGR; matplotlib expects RGB

    plt.imshow(frame)
    plt.show()
else:
    # Make a missing or unreadable sample visible instead of silently showing nothing.
    print('Could not read a frame from vs11.mp4')

In [None]:
def process_frames(frames, max_frames):
    """Force a frame list to exactly ``max_frames`` entries.

    Longer lists are truncated to their first ``max_frames`` items; shorter
    lists are padded by repeating the last frame.

    Args:
        frames: list of frames (any element type).
        max_frames: desired length of the returned list.

    Returns:
        A list with ``max_frames`` elements.

    Raises:
        ValueError: if ``frames`` is empty but padding is required, since
            there is no last frame to repeat.
    """
    if len(frames) >= max_frames:
        return frames[:max_frames]

    if not frames:
        # Previously this surfaced as a bare IndexError from frames[-1].
        raise ValueError('Cannot pad an empty frame list: no frame to repeat.')

    return frames + [frames[-1]] * (max_frames - len(frames))

In [None]:
from torchvision import models
from torchvision.models import Inception_V3_Weights
from torchsummary import summary

# Pretrained Inception-v3 used as a fixed per-frame feature extractor: the
# classification head is replaced by an identity so the pooled features come out.
feature_extractor = models.inception_v3(weights = Inception_V3_Weights.DEFAULT)
feature_extractor.fc = torch.nn.Identity()
feature_extractor = feature_extractor.to(device)
# The backbone is never passed to the optimizer; freezing it stops autograd from
# recording a graph through Inception every time features are extracted.
feature_extractor.requires_grad_(False)
feature_extractor.eval()

In [None]:
# Per-frame preprocessing applied before the frames reach the Inception backbone.
# NOTE(review): the random ops below are re-sampled on every call, i.e. once per
# frame, and this same pipeline is used for both the train and test datasets —
# confirm that is intended.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    # The pretrained torchvision Inception-v3 weights expect ImageNet-normalised
    # input; feeding raw [0, 1] tensors shifts the feature distribution.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
class DeepfakeDataset(Dataset):
    """Dataset yielding per-frame Inception features and a label for each video.

    Each item decodes at most ``max_frames`` frames from the video, centre-crops
    and resizes them to 299x299, converts BGR to RGB, applies the frame
    transform, pads short clips by repeating the last frame, and runs the stack
    through the module-level ``feature_extractor``.

    Args:
        paths: sequence of video file paths.
        labels: sequence of integer labels, aligned with ``paths``.
        max_frames: number of frames used per video (default 64).
        transform: optional per-frame callable; when ``None`` the module-level
            ``transform`` is used.

    Raises:
        ValueError: if ``paths`` and ``labels`` differ in length or
            ``max_frames`` is not positive.
    """

    def __init__(self, paths, labels, max_frames=64, transform=None):
        if len(paths) != len(labels):
            raise ValueError(
                f'paths and labels must have the same length '
                f'({len(paths)} != {len(labels)})'
            )
        if max_frames < 1:
            raise ValueError('max_frames must be at least 1')

        self.paths = paths
        self.labels = labels
        self.max_frames = max_frames
        self.transform = transform

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        video_path = self.paths[idx]
        # Resolve the fallback at call time so a later redefinition of the
        # module-level `transform` is picked up.
        frame_transform = self.transform if self.transform is not None else transform
        frames = []

        cap = cv2.VideoCapture(video_path)
        try:
            # Only the first `max_frames` frames are ever used, so stop decoding
            # as soon as enough have been collected.
            while len(frames) < self.max_frames:
                ret, frame = cap.read()
                if not ret:
                    break

                frame = crop_frame(frame)
                frame = cv2.resize(frame, (299, 299))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame_transform(frame))
        finally:
            # Release the decoder even if a frame fails to process.
            cap.release()

        if not frames:
            raise RuntimeError(f'No frames could be read from video: {video_path}')

        frames = process_frames(frames, max_frames=self.max_frames)

        # Stack the per-frame tensors directly; converting a list of tensors
        # through numpy is slow and unnecessary.
        clip = torch.stack(frames).to(device=device, dtype=torch.float32)

        # The backbone is a fixed feature extractor: no autograd graph is needed.
        with torch.no_grad():
            features = feature_extractor(clip)

        return features, torch.tensor(self.labels[idx]).to(device)
       

In [None]:
# Split the path -> label mapping into two aligned lists.
videos = [video_path for video_path in paths.data_dict]
labels = [paths.data_dict[video_path] for video_path in videos]
print(len(videos))

In [None]:
# Stratified 80/20 split with a fixed seed.
Xtrain, Xtest, Ytrain, Ytest = train_test_split(
    videos,
    labels,
    test_size=0.2,
    shuffle=True,
    stratify=labels,
    random_state=42,
)

In [None]:
train_dataset = DeepfakeDataset(Xtrain, Ytrain)
# drop_last avoids a trailing single-sample batch, which BatchNorm layers
# reject in training mode ("Expected more than 1 value per channel").
train_dataloader = DataLoader(
    train_dataset,
    batch_size=paths.batch_size,
    shuffle=True,
    drop_last=True,
)

test_dataset = DeepfakeDataset(Xtest, Ytest)
# Evaluation order does not need to be random; a fixed order keeps the
# validation batches identical from epoch to epoch.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=paths.batch_size,
    shuffle=False,
)

In [None]:
class DeepfakeGru(nn.Module):
    """GRU classifier over a sequence of per-frame feature vectors.

    The last layer's final hidden state is normalised, passed through dropout
    and projected to ``num_classes`` raw logits (no sigmoid is applied here).

    Normalisation is per sample (LayerNorm) rather than per batch. With the
    batch size of 2 used in this notebook, batch normalisation in training mode
    maps every feature — and the final logit — to roughly +/-1 regardless of the
    input, and it raises on a batch of size 1. For the same reason no
    normalisation is applied to the output logits.

    NOTE: parameter names differ from the earlier BatchNorm-based version, so
    checkpoints saved from that version will not load into this one.

    Args:
        input_size: size of each per-frame feature vector.
        hidden_size: GRU hidden state size.
        num_layers: number of stacked GRU layers.
        num_classes: number of output logits.
    """

    def __init__(self, input_size=2048, hidden_size=256, num_layers=2, num_classes=1):
        super().__init__()
        self.gru = nn.GRU(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            # Inter-layer dropout only exists when there is more than one layer;
            # passing it with a single layer only triggers a warning.
            dropout=0.5 if num_layers > 1 else 0.0,
        )
        self.norm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(p=0.5)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for ``x`` of shape (batch, seq, input_size)."""
        # Accept a (features, ...) tuple and use only its first element.
        if isinstance(x, tuple):
            x = x[0]

        _, hidden = self.gru(x)
        # hidden[-1] is the final hidden state of the top GRU layer.
        hidden = self.norm(hidden[-1])
        hidden = self.dropout(hidden)

        return self.fc(hidden)


In [None]:
model = DeepfakeGru()

# The model emits raw logits; BCEWithLogitsLoss applies the sigmoid internally.
criterion = nn.BCEWithLogitsLoss()

# Take the learning rate from the shared config instead of repeating the literal.
optimizer = torch.optim.AdamW(model.parameters(), lr=paths.learning_rate, weight_decay=1e-4)
# Halve the learning rate after 3 scheduler steps without a lower monitored value.
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

In [None]:
def perform_validation(model, test_dataloader, device, loss_fn=None):
    """Evaluate ``model`` on ``test_dataloader`` and return dataset-level metrics.

    Metrics are computed once over all predictions rather than averaged per
    batch: with very small batches, per-batch precision/recall is dominated by
    the ``zero_division`` fallback and does not reflect real performance.

    Args:
        model: network returning one raw logit per sample.
        test_dataloader: iterable of ``(inputs, labels)`` batches.
        device: device the batches are moved to.
        loss_fn: loss taking ``(logits, targets)``; defaults to the
            module-level ``criterion``.

    Returns:
        Tuple ``(loss, accuracy, precision, recall)``; the loss is the mean over
        samples (assuming ``loss_fn`` averages within a batch).

    Raises:
        ValueError: if the dataloader yields no batches.

    Note:
        The model is left in evaluation mode; callers that continue training
        must call ``model.train()`` again.
    """
    if loss_fn is None:
        loss_fn = criterion

    model.eval()
    total_loss = 0.0
    all_labels, all_predictions = [], []

    with torch.no_grad():
        for inputs, labels in test_dataloader:
            inputs = inputs.to(device)
            labels = labels.float().unsqueeze(1).to(device)

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            # Weight by batch size so a smaller final batch does not skew the mean.
            total_loss += loss.item() * labels.size(0)

            predictions = torch.sigmoid(outputs) > 0.5
            all_labels.append(labels.cpu().numpy().ravel().astype(int))
            all_predictions.append(predictions.cpu().numpy().ravel().astype(int))

    if not all_labels:
        raise ValueError('test_dataloader produced no batches; nothing to validate.')

    y_true = np.concatenate(all_labels)
    y_pred = np.concatenate(all_predictions)

    return (
        total_loss / len(y_true),
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, zero_division=0),
        recall_score(y_true, y_pred, zero_division=0),
    )

In [None]:
import time
import csv
import torch
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score

model.to(device)

epochs = 50
best_accuracy = 0.0
total_training_time = 0.0
log_file = "Deepfake_299.csv"

log_fields = [
    "epoch", "time", "train_loss", "val_loss",
    "accuracy_train", "accuracy_val",
    "precision_train", "precision_val",
    "recall_train", "recall_val"
]

# Create CSV file with headers
with open(log_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=log_fields)
    writer.writeheader()

for epoch in range(epochs):
    # Validation switches the model to eval mode, so re-enable training each epoch.
    model.train()
    start_time = time.time()

    running_loss = 0.0
    samples_seen = 0
    correct_seen = 0
    # Predictions and labels are collected for the whole epoch so that accuracy,
    # precision and recall are computed once over all samples, not averaged over
    # tiny batches.
    epoch_labels, epoch_predictions = [], []

    with tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{epochs}") as train_iterator:
        for inputs, labels in train_iterator:
            inputs, labels = inputs.to(device), labels.float().unsqueeze(1).to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_predictions = (torch.sigmoid(outputs) > 0.5).detach()
            batch_count = labels.size(0)

            running_loss += loss.item() * batch_count
            samples_seen += batch_count
            correct_seen += (batch_predictions == labels.bool()).sum().item()

            epoch_predictions.append(batch_predictions.cpu().numpy().ravel().astype(int))
            epoch_labels.append(labels.cpu().numpy().ravel().astype(int))

            train_iterator.set_postfix({
                'loss': f"{running_loss / samples_seen:.4f}",
                'accuracy': f"{correct_seen / samples_seen:.4f}",
            })

        if samples_seen == 0:
            raise RuntimeError('train_dataloader produced no batches; nothing to train on.')

        # Epoch-level training metrics.
        y_true = np.concatenate(epoch_labels)
        y_pred = np.concatenate(epoch_predictions)
        train_loss = running_loss / samples_seen
        train_accuracy = accuracy_score(y_true, y_pred)
        train_precision = precision_score(y_true, y_pred, zero_division=0)
        train_recall = recall_score(y_true, y_pred, zero_division=0)

        # Validate once per epoch, after all training batches have been processed.
        val_loss, val_accuracy, val_precision, val_recall = perform_validation(model, test_dataloader, device)
        scheduler.step(val_loss)

        # Keep the weights of the best model seen so far (by validation accuracy).
        if val_accuracy > best_accuracy:
            torch.save(model.state_dict(), 'deepfake_299.pt')
            best_accuracy = val_accuracy

        train_iterator.set_postfix({
            'loss': f"{train_loss:.4f}",
            'accuracy': f"{train_accuracy:.4f}",
            'precision': f"{train_precision:.4f}",
            'recall': f"{train_recall:.4f}",
            'val_loss': f"{val_loss:.4f}",
            'val_acc': f"{val_accuracy:.4f}",
            'val_prec': f"{val_precision:.4f}",
            'val_rec': f"{val_recall:.4f}"
        })

    # Epoch time includes the validation pass.
    epoch_time = time.time() - start_time
    total_training_time += epoch_time

    # Append one row per epoch so the log survives an interrupted run.
    with open(log_file, 'a', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=log_fields)
        writer.writerow({
            "epoch": epoch + 1,
            "time": round(epoch_time, 4),
            "train_loss": round(train_loss, 4),
            "val_loss": round(val_loss, 4),
            "accuracy_train": round(float(train_accuracy), 4),
            "accuracy_val": round(float(val_accuracy), 4),
            "precision_train": round(float(train_precision), 4),
            "precision_val": round(float(val_precision), 4),
            "recall_train": round(float(train_recall), 4),
            "recall_val": round(float(val_recall), 4)
        })

# max(..., 1) avoids a division by zero when `epochs` is set to 0.
print(f"Training Complete in {total_training_time:.2f}s with {total_training_time / max(epochs, 1):.2f}s per epoch.")
