In [1]:
print("Hello ")

Hello 


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
import os
from glob import glob
import albumentations as A
from albumentations.pytorch import ToTensorV2
import random
from tqdm import tqdm
import matplotlib.pyplot as plt
from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import train_test_split
import json

In [3]:
# ! pip install efficientnet_pytorch

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Running on", device)

Running on cuda


In [5]:
# torch.cuda.device_count()
torch.cuda.get_device_name(0)

'NVIDIA GeForce RTX 2050'

In [6]:
import torch
print(torch.__version__)
print(torch.version.cuda)
print(torch.cuda.is_available())


2.7.1+cu118
11.8
True


In [7]:
# DATASET PATH

# Real and Fake Video list json path 
 
fake_path = "D:/Desktop/FinalYearProject/DataSets/celeb_df_face_cropped/valid_fake_videos.json"
real_path = "D:/Desktop/FinalYearProject/DataSets/celeb_df_face_cropped/valid_real_videos.json"

# Load lists from JSON
valid_real_videos = []
valid_fake_videos = []

with open(real_path, 'r') as f:
    valid_real_videos = json.load(f)

with open(fake_path, 'r') as f:
    valid_fake_videos = json.load(f)

# Paths to real and fake video folders on Google Drive
celeb_df_real_path = 'D:/Desktop/FinalYearProject/DataSets/celeb_df_face_cropped/real_face_only224/real_face_only224'
celeb_df_fake_path = 'D:/Desktop/FinalYearProject/DataSets/celeb_df_face_cropped/fake_face_only224/fake_face_only224'

# Reconstruct full paths
valid_real_videos_path = [os.path.normpath(os.path.join(celeb_df_real_path, name)) for name in valid_real_videos]
valid_fake_videos_path = [os.path.normpath(os.path.join(celeb_df_fake_path, name)) for name in valid_fake_videos]

print(f"Total real videos for training: {len(valid_real_videos_path)}")
print(f"Total fake videos for training: {len(valid_fake_videos_path)}")

Total real videos for training: 585
Total fake videos for training: 5634


In [8]:
print(valid_real_videos_path[0])
print(valid_fake_videos_path[0])

D:\Desktop\FinalYearProject\DataSets\celeb_df_face_cropped\real_face_only224\real_face_only224\id0_0000.mp4
D:\Desktop\FinalYearProject\DataSets\celeb_df_face_cropped\fake_face_only224\fake_face_only224\id0_id16_0000.mp4


In [9]:
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

In [10]:
class DeepFakeDataset(Dataset):
    def __init__(self, video_paths, labels, transform=None, num_frames=16):
        self.video_paths = video_paths
        self.labels = labels
        self.transform = transform
        self.num_frames = num_frames

    def read_frames(self, video_path):
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_indices = np.linspace(0, total_frames - 1, self.num_frames).astype(int)   #
        frames = []

        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            if self.transform:
                frame = self.transform(image=frame)['image']
            frames.append(frame)

        cap.release()

        if len(frames) < self.num_frames:
            # pad missing frames with black images
            for _ in range(self.num_frames - len(frames)):
                frames.append(torch.zeros_like(frames[0]))

        return torch.stack(frames)

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        try:
            video_tensor = self.read_frames(self.video_paths[idx])
            label = torch.tensor(self.labels[idx], dtype=torch.float32)
            return video_tensor, label
        except Exception as e:
            print(f"Failed loading video: {self.video_paths[idx]}, Error: {e}")
            return self.__getitem__((idx + 1) % len(self))

In [11]:
# DATA TRANSFORMS
transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2(),
])

In [12]:
# Combine video paths and labels
video_paths = valid_real_videos_path + valid_fake_videos_path
labels = [0]*len(valid_real_videos_path) + [1]*len(valid_fake_videos_path)

# Use train_test_split to shuffle and split
train_paths, val_paths, train_labels, val_labels = train_test_split(
    video_paths, labels, test_size=0.2, stratify=labels, random_state=10
)

# Create datasets and loaders
train_dataset = DeepFakeDataset(train_paths, train_labels, transform)
val_dataset = DeepFakeDataset(val_paths, val_labels, transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=0, pin_memory=True)

In [13]:
# MODEL COMPONENTS
class TemporalAttention(nn.Module):
    def __init__(self, feature_dim):
        super().__init__()
        self.attention = nn.Linear(feature_dim, 1)

    def forward(self, x):
        # x: (batch, time, features)
        weights = F.softmax(self.attention(x), dim=1)
        return torch.sum(weights * x, dim=1)


class DeepFakeDetector(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature_extractor = EfficientNet.from_pretrained('efficientnet-b0')
        self.feature_extractor._fc = nn.Identity()
        self.lstm = nn.LSTM(input_size=1280, hidden_size=256, num_layers=1, batch_first=True, bidirectional=True)
        self.attention = TemporalAttention(512)
        self.classifier = nn.Sequential(
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B*T, C, H, W)
        with torch.no_grad():  # freeze feature extractor to reduce memory
            feats = self.feature_extractor(x)
        feats = feats.view(B, T, -1)
        lstm_out, _ = self.lstm(feats)
        attn_out = self.attention(lstm_out)
        return self.classifier(attn_out).squeeze(1)


In [14]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    running_loss = 0.0
    correct = 0

    loop = tqdm(dataloader, desc="Training", leave=True)
    for inputs, labels in loop:
        inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        preds = (torch.sigmoid(outputs) > 0.5).float()
        correct += (preds == labels).sum().item()
        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / len(dataloader.dataset)
    return epoch_loss, epoch_acc


def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0

    loop = tqdm(dataloader, desc="Validation", leave=True)
    with torch.no_grad():
        for inputs, labels in loop:
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            preds = (torch.sigmoid(outputs) > 0.5).float()
            correct += (preds == labels).sum().item()
            running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / len(dataloader.dataset)
    return epoch_loss, epoch_acc

In [15]:
# START TRAINING
model = DeepFakeDetector().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.BCEWithLogitsLoss()

Loaded pretrained weights for efficientnet-b0


In [16]:
# model

In [17]:
# Prepare save path
model_dir = 'D:/Desktop/FinalYearProject/models/rPPG_model'
os.makedirs(model_dir, exist_ok=True)
save_path = os.path.join(model_dir, 'best_model_celeb_df.pth')

In [18]:
# Training loop
EPOCHS = 5
best_val_acc = 0.0

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}")
    print(f"Val   Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}")

    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_accuracy': best_val_acc
        }, save_path)
        print(f"💾 Model saved to Google Drive with Val Accuracy: {best_val_acc:.4f}")


Epoch 1/5


Training: 100%|██████████| 622/622 [08:17<00:00,  1.25it/s]
Validation: 100%|██████████| 156/156 [02:04<00:00,  1.25it/s]


Train Loss: 0.3050, Accuracy: 0.9053
Val   Loss: 0.2260, Accuracy: 0.9164
💾 Model saved to Google Drive with Val Accuracy: 0.9164

Epoch 2/5


Training: 100%|██████████| 622/622 [07:43<00:00,  1.34it/s]
Validation: 100%|██████████| 156/156 [01:49<00:00,  1.43it/s]


Train Loss: 0.2285, Accuracy: 0.9192
Val   Loss: 0.1950, Accuracy: 0.9389
💾 Model saved to Google Drive with Val Accuracy: 0.9389

Epoch 3/5


Training: 100%|██████████| 622/622 [07:31<00:00,  1.38it/s]
Validation: 100%|██████████| 156/156 [01:47<00:00,  1.44it/s]


Train Loss: 0.2006, Accuracy: 0.9303
Val   Loss: 0.1935, Accuracy: 0.9357

Epoch 4/5


Training: 100%|██████████| 622/622 [07:27<00:00,  1.39it/s]
Validation: 100%|██████████| 156/156 [01:45<00:00,  1.48it/s]


Train Loss: 0.1883, Accuracy: 0.9323
Val   Loss: 0.2322, Accuracy: 0.9252

Epoch 5/5


Training: 100%|██████████| 622/622 [07:49<00:00,  1.32it/s]
Validation: 100%|██████████| 156/156 [01:50<00:00,  1.41it/s]

Train Loss: 0.1685, Accuracy: 0.9383
Val   Loss: 0.1856, Accuracy: 0.9293



