In [1]:
import torch
import torch.nn as nn
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights

class DeepfakeDetector(nn.Module):
    def __init__(self, pretrained=True):
        super(DeepfakeDetector, self).__init__()
        # Using the smaller EfficientNet-B0 as my training device does not have enough VRAM
        if pretrained:
            self.efficientnet = efficientnet_b0(weights=EfficientNet_B0_Weights.IMAGENET1K_V1)
        else:
            self.efficientnet = efficientnet_b0(weights=None)

        #get feature extractor part of the pretrained model
        self.features = self.efficientnet.features
        
        #modified classifier head since the pretrained one was used to predict 1000 classes
        in_features = self.efficientnet.classifier[1].in_features
        self.classifier = nn.Sequential(
            nn.Linear(in_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 1) #here we use only 1 output neuron because it was enough for binary classificaition, also it fit with the BCELoss later
        )
        # Replace the original classifier - because it was supposed to used in image classification
        self.efficientnet.classifier = self.classifier

    def forward(self, x):
        x = self.features(x)
        x = self.efficientnet.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def freeze_features(self):
        """Freezes the weights of the feature extractor."""
        print("Freezing feature extractor layers...")
        for param in self.features.parameters():
            param.requires_grad = False

    def unfreeze_features(self):
        """Unfreezes the weights of the feature extractor."""
        print("Unfreezing all layers...")
        for param in self.parameters():
            param.requires_grad = True

In [2]:
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
import random
from sklearn.model_selection import train_test_split

class VideoDataset(Dataset):
    def __init__(self, data_dir, num_frames=16, transform=None, split='train', test_size=0.2, random_state=42):
        
        # data_dir: Root directory containing DFD_manipulated_sequences and DFD_original_sequences folders
        # num_frames: Number of frames to extract from each video
        # transform: Transform to apply to frames
        # split: 'train' or 'test'/'val' to specify which split to use
        # test_size: Proportion of data to use for testing (default 0.2 for 20%)
        # random_state: Random seed for reproducible splits
        
        self.data_dir = data_dir
        self.num_frames = num_frames
        self.transform = transform
        self.split = split
        self.samples = self._make_dataset(test_size, random_state)

    def _make_dataset(self, test_size, random_state):
        all_samples = []
        
        
        folder_to_class = {
            'DFD_original_sequences': 0,  # real
            'DFD_manipulated_sequences': 1  # fake
        }
        
        for folder_name, target_class in folder_to_class.items():
            class_dir = os.path.join(self.data_dir, folder_name)
            if not os.path.exists(class_dir):
                print(f"Warning: Directory {class_dir} does not exist")
                continue
                
            #Get all video files 
            video_files = []
            for video_name in os.listdir(class_dir):
                video_path = os.path.join(class_dir, video_name)
                if os.path.isfile(video_path):  
                    video_files.append((video_path, target_class))
            
            
            if len(video_files) > 0:
                train_files, test_files = train_test_split(
                    video_files, 
                    test_size=test_size, 
                    random_state=random_state,
                    stratify=None  
                )
                
                if self.split == 'train':
                    all_samples.extend(train_files)
                else:  # test
                    all_samples.extend(test_files)
                    
                print(f"Class {folder_name}: {len(train_files)} train, {len(test_files)} test samples")
        
        print(f"Total {self.split} samples: {len(all_samples)}")
        return all_samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        video_path, label = self.samples[idx]
        
        #extract frames from the video
        cap = cv2.VideoCapture(video_path)
        frames = []
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        
        if total_frames == 0:
            #Replace with a dummy tensor if video is invalid
            return torch.zeros((self.num_frames, 3, 224, 224)), -1

        frame_indices = sorted(random.sample(range(total_frames), min(self.num_frames, total_frames)))
        
        for i in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.transform:
                    frame = self.transform(frame)
                frames.append(frame)
        
        cap.release()

        # If not enough frames were extracted, pad with zeros
        if len(frames) < self.num_frames:
            num_padding = self.num_frames - len(frames)
            padding = torch.zeros((num_padding, 3, 224, 224))
            if len(frames) > 0:
                frames = torch.stack(frames)
                frames = torch.cat((frames, padding), dim=0)
            else:
                frames = padding
        else:
            frames = torch.stack(frames)

        return frames, label

#data transformation will be applied in each frame extracted
data_transforms = {
    'train': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [3]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from tqdm import tqdm


def validate_model(model, val_loader, criterion, device='cuda'):

    # model: The model to validate
    # val_loader: DataLoader for validation data
    # criterion: Loss function (BCEWithLogitsLoss)
    # device: Device to run validation on
    
    #Returns format: tuple: (validation_loss, validation_accuracy)
    
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0

    with torch.no_grad():
        for videos, labels in tqdm(val_loader, desc="Validating"):
            if -1 in labels:
                continue
            
            videos = videos.to(device)
            labels = labels.float().to(device)
            
            # (batch_size, num_frames, C, H, W) -> (batch_size * num_frames, C, H, W)
            bs, nf, c, h, w = videos.shape
            videos = videos.view(-1, c, h, w)
            

            outputs = model(videos)
            outputs = outputs.view(bs, nf).mean(dim=1)  # get average predictions over frames
            

            loss = criterion(outputs, labels)
            preds = torch.sigmoid(outputs) > 0.5
            #change boolean predictions to float for comparison
            preds = preds.float()
            
            running_loss += loss.item() * bs
            running_corrects += torch.sum(preds == labels.data)
            total_samples += bs

    val_loss = running_loss / total_samples
    val_acc = running_corrects.double() / total_samples

    return val_loss, val_acc

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=10, device='cuda'):
    model.to(device)
    

    best_val_acc = 0.0
    best_model_wts = model.state_dict().copy()

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)


        model.train()
        running_loss = 0.0
        running_corrects = 0
        total_train_samples = 0

        for videos, labels in tqdm(train_loader, desc="Training"):
            if -1 in labels:
                continue
            
            videos = videos.to(device)
            labels = labels.float().to(device)
            
            # (batch_size, num_frames, C, H, W) -> (batch_size * num_frames, C, H, W)
            bs, nf, c, h, w = videos.shape
            videos = videos.view(-1, c, h, w)
            
            optimizer.zero_grad()

            with torch.set_grad_enabled(True):
                outputs = model(videos)
                outputs = outputs.view(bs, nf).mean(dim=1) 
                

                loss = criterion(outputs, labels)
                
                #Calculate predictions for accuracy (apply sigmoid to logits)
                preds = torch.sigmoid(outputs) > 0.5
                preds = preds.float()

                loss.backward()
                optimizer.step()

            running_loss += loss.item() * bs
            running_corrects += torch.sum(preds == labels.data)
            total_train_samples += bs

        train_loss = running_loss / total_train_samples
        train_acc = running_corrects.double() / total_train_samples

        print(f'Train Loss: {train_loss:.4f} Acc: {train_acc:.4f}')

        #Validation
        val_loss, val_acc = validate_model(model, val_loader, criterion, device)
        print(f'Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}')
        
        #Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_wts = model.state_dict().copy()
            print(f'New best validation accuracy: {best_val_acc:.4f}')
        if scheduler:
            scheduler.step()
        print()

    #Load best weights
    model.load_state_dict(best_model_wts)
    
    torch.save(model.state_dict(), 'deepfake_detector_best.pth')
    print(f"Best model saved to deepfake_detector_best.pth with validation accuracy: {best_val_acc:.4f}")
    
    



if __name__ == '__main__':
    BATCH_SIZE = 8
    NUM_FRAMES_PER_VIDEO = 16
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    model = DeepfakeDetector(pretrained=True)
    criterion = nn.BCEWithLogitsLoss()


    
    train_dataset = VideoDataset(data_dir='video_detection_dataset', split='train', transform=data_transforms['train'])
    val_dataset = VideoDataset(data_dir='video_detection_dataset', split='test', transform=data_transforms['val'])
    
    # Use a custom collate function to filter out invalid samples
    def collate_fn(batch):
        batch = list(filter(lambda x: x[1] != -1, batch))
        return torch.utils.data.dataloader.default_collate(batch)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,num_workers=0, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,num_workers=0, collate_fn=collate_fn)
    # ====================================================================
    # Phase 1: Feature Extraction
    # ====================================================================
    print("\n" + "="*50)
    print("PHASE 1: Training the classifier head")
    print("="*50 + "\n")

    model.freeze_features()
    model.to(device) 

    
    optimizer_phase1 = optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=0.001
    )
    # Train only the head for a few epochs
    train_model(model, train_loader, val_loader, criterion, optimizer_phase1, None, num_epochs=3, device=device)

    # ====================================================================
    # Phase 2: Fine-Tuning
    # ====================================================================
    print("\n" + "="*50)
    print("PHASE 2: Fine-tuning the entire model")
    print("="*50 + "\n")

    model.unfreeze_features()

    # Create a new optimizer for the whole model with a very low learning rate
    optimizer_phase2 = optim.Adam(model.parameters(), lr=0.00005) 
    # This method is called adaptive learning.
    scheduler = StepLR(optimizer_phase2, step_size=4, gamma=0.1)
    

    train_model(model, train_loader, val_loader, criterion, optimizer_phase2, scheduler, num_epochs=5, device=device)
    print("Training complete. Final model saved.")

Using device: cuda:0
Class DFD_original_sequences: 291 train, 73 test samples
Class DFD_manipulated_sequences: 2454 train, 614 test samples
Total train samples: 2745
Class DFD_original_sequences: 291 train, 73 test samples
Class DFD_manipulated_sequences: 2454 train, 614 test samples
Total test samples: 687

PHASE 1: Training the classifier head

Freezing feature extractor layers...
Epoch 1/3
----------


Training: 100%|██████████| 344/344 [58:27<00:00, 10.20s/it]


Train Loss: 0.3567 Acc: 0.8925


Validating: 100%|██████████| 86/86 [14:29<00:00, 10.11s/it]


Val Loss: 0.3456 Acc: 0.8937
New best validation accuracy: 0.8937

Epoch 2/3
----------


Training: 100%|██████████| 344/344 [58:34<00:00, 10.22s/it]


Train Loss: 0.3361 Acc: 0.8947


Validating: 100%|██████████| 86/86 [14:31<00:00, 10.14s/it]


Val Loss: 0.3366 Acc: 0.8967
New best validation accuracy: 0.8967

Epoch 3/3
----------


Training: 100%|██████████| 344/344 [58:31<00:00, 10.21s/it]


Train Loss: 0.3233 Acc: 0.8991


Validating: 100%|██████████| 86/86 [14:26<00:00, 10.07s/it]


Val Loss: 0.3509 Acc: 0.8908

Best model saved to deepfake_detector_best.pth with validation accuracy: 0.8967

PHASE 2: Fine-tuning the entire model

Unfreezing all layers...
Epoch 1/5
----------


Training: 100%|██████████| 344/344 [1:07:39<00:00, 11.80s/it]


Train Loss: 0.3199 Acc: 0.9031


Validating: 100%|██████████| 86/86 [14:57<00:00, 10.43s/it]


Val Loss: 0.3313 Acc: 0.9083
New best validation accuracy: 0.9083

Epoch 2/5
----------


Training: 100%|██████████| 344/344 [1:07:42<00:00, 11.81s/it]


Train Loss: 0.2915 Acc: 0.9107


Validating: 100%|██████████| 86/86 [15:01<00:00, 10.49s/it]


Val Loss: 0.3456 Acc: 0.9025

Epoch 3/5
----------


Training: 100%|██████████| 344/344 [1:07:30<00:00, 11.78s/it]


Train Loss: 0.2881 Acc: 0.9074


Validating: 100%|██████████| 86/86 [14:55<00:00, 10.42s/it]


Val Loss: 0.3415 Acc: 0.9127
New best validation accuracy: 0.9127

Epoch 4/5
----------


Training: 100%|██████████| 344/344 [1:07:24<00:00, 11.76s/it]


Train Loss: 0.2793 Acc: 0.9114


Validating: 100%|██████████| 86/86 [14:56<00:00, 10.42s/it]


Val Loss: 0.3475 Acc: 0.9068

Epoch 5/5
----------


Training: 100%|██████████| 344/344 [1:29:35<00:00, 15.63s/it]


Train Loss: 0.2612 Acc: 0.9129


Validating: 100%|██████████| 86/86 [19:39<00:00, 13.71s/it]

Val Loss: 0.3486 Acc: 0.9098

Best model saved to deepfake_detector_best.pth with validation accuracy: 0.9127
Training complete. Final model saved.



