# Final Project Computer Vision
## Truth in Motion: Depth and Flow Enhanced DeepFake Detection

Authors: Aimee Lin, Neli Catar and Gellert Toth

### Imports

In [None]:
import cv2
import glob
import numpy as np
import matplotlib.pyplot as plt
import os
import random
import re
import torch
import torch.nn as nn
import torch.nn.init as init
import torchvision.transforms as T

from depth_anything_v2.dpt import DepthAnythingV2
from facenet_pytorch import MTCNN
from IPython.display import display
from matplotlib import colors
from PIL import Image, ImageShow
from pwcnet.run import estimate
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import AutoModel, AutoImageProcessor
from tqdm import tqdm

### Globals

In [None]:
# Data folders (all paths are relative to the notebook's working directory).
video_root = './data' # raw input videos; walked recursively for .mp4 files
data_root = './preprocessed_data' # face crops extracted from the videos are written here
data_flow = "./flow_data" # rendered optical-flow images are written here

# Data loading
BATCH_SIZE_DATA = 64 # number of video frames handed to the face detector per call

# Device
# Alternative that auto-selects an accelerator when one is present:
# DEVICE = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
DEVICE = 'cpu' # default device; the depth-estimation cell reassigns DEVICE

# Training
num_epochs = 5 # number of passes over the training set in each training loop


### Utils

#### Feature extraction generation

The sections below contain the code we used to generate the two kinds of input features: optical-flow images and depth maps.

##### Optical Flow

In [None]:
def list_leaf_folders(root_dir):
    """Return every directory under *root_dir* that has no sub-directories.

    ``root_dir`` itself is returned when it contains no sub-directories.
    Paths come back in the order ``os.walk`` visits them; a missing
    ``root_dir`` yields an empty list.
    """
    return [
        current_dir
        for current_dir, child_dirs, _files in os.walk(root_dir)
        if not child_dirs
    ]

def natural_sort_key(s):
    """Build a sort key that orders embedded numbers numerically.

    The string is split into alternating non-digit and digit runs; digit runs
    become ``int`` and everything else is lower-cased, so ``"frame_2"`` sorts
    before ``"frame_10"``.
    """
    key = []
    for chunk in re.split(r'(\d+)', s):
        key.append(int(chunk) if chunk.isdigit() else chunk.lower())
    return key

def list_sorted_images(folder, extensions={'.png', '.jpg', '.jpeg', '.bmp'}):
    """Return the image files directly inside *folder*, naturally sorted.

    Args:
        folder: Directory to list (not searched recursively).
        extensions: Lower-case file suffixes, including the dot, that count
            as images. The suffix comparison is case-insensitive.

    Returns:
        Full paths (``folder`` joined with each file name), ordered by
        ``natural_sort_key`` applied to the file name.
    """
    image_names = []
    for entry in os.listdir(folder):
        suffix = os.path.splitext(entry)[1].lower()
        if os.path.isfile(os.path.join(folder, entry)) and suffix in extensions:
            image_names.append(entry)
    ordered = sorted(image_names, key=natural_sort_key)
    return [os.path.join(folder, name) for name in ordered]

def flow_to_rgb(flow):
    """
    Colour-code a dense 2-channel flow field as an RGB image.

    The flow direction selects the hue and the flow magnitude (divided by the
    largest magnitude in the field) selects the saturation; the HSV value
    channel is fixed at 1, so zero-motion pixels come out white.

    Parameters:
        flow (numpy.ndarray): Array of shape (H, W, 2), where:
            - flow[..., 0] is the horizontal component (u)
            - flow[..., 1] is the vertical component (v)

    Returns:
        rgb_image (numpy.ndarray): Array of shape (H, W, 3) as produced by
        ``matplotlib.colors.hsv_to_rgb`` from float32 HSV input.
    """
    horizontal, vertical = flow[..., 0], flow[..., 1]

    speed = np.sqrt(horizontal**2 + vertical**2)
    angle = np.arctan2(vertical, horizontal)

    # Scale magnitudes by the peak; an all-zero field is left untouched to
    # avoid dividing by zero.
    peak = np.max(speed)
    if peak != 0:
        speed = speed / peak

    # arctan2 returns angles in [-pi, pi]; shift and scale them into [0, 1].
    hue = (angle + np.pi) / (2 * np.pi)
    hsv = np.stack((hue, speed, np.ones_like(speed)), axis=-1)
    return colors.hsv_to_rgb(hsv.astype(np.float32))

def calculate_flow_picture(img1, img2):
    """Estimate the flow between two frames and render it as an RGB image.

    Both frames are passed unchanged to ``estimate`` (imported from
    ``pwcnet.run``). Its result is converted to a float32 NumPy array with the
    first axis moved to the end, then colour-coded with ``flow_to_rgb``.
    """
    flow_tensor = estimate(img1, img2)
    # force=True lets .numpy() work even if the tensor is on an accelerator
    # or attached to the autograd graph.
    flow_last_axis = flow_tensor.numpy(force=True).transpose(1, 2, 0)
    return flow_to_rgb(np.array(flow_last_axis, np.float32))



def _load_flow_input(image_path):
    """Read one frame as a float32 (3, H, W) tensor in [0, 1], channels reversed."""
    # Close the file promptly and force three channels so the channel flip
    # below also works for greyscale or RGBA files.
    with Image.open(image_path) as handle:
        rgb = np.array(handle.convert('RGB'))
    # Reversing the last axis turns RGB into BGR.
    # NOTE(review): presumably the channel order pwcnet's `estimate` expects - confirm.
    chw = rgb[:, :, ::-1].transpose(2, 0, 1).astype(np.float32) * (1.0 / 255.0)
    return torch.FloatTensor(np.ascontiguousarray(chw))


# For every folder of face crops, write one flow image per pair of consecutive
# frames, mirroring the folder layout of `data_root` under `data_flow`.
leaf_folders = list_leaf_folders(data_root)
for folder in tqdm(leaf_folders):
    image_paths = list_sorted_images(folder)
    if len(image_paths) < 2:
        # No consecutive pair, so nothing to write (and no folder to create).
        continue

    relative_path = os.path.relpath(folder, data_root)
    save_path = os.path.join(data_flow, relative_path)
    os.makedirs(save_path, exist_ok=True)  # once per folder, not once per pair

    # Keep only two decoded frames alive at a time instead of the whole folder.
    previous = _load_flow_input(image_paths[0])
    for i, image_path in enumerate(image_paths[1:]):
        current = _load_flow_input(image_path)
        res = calculate_flow_picture(previous, current)
        rgb_flow_uint8 = (res * 255).astype(np.uint8)
        pil_img = Image.fromarray(rgb_flow_uint8, mode='RGB')
        # flow_image_{i} is the flow from sorted frame i to sorted frame i + 1.
        pil_img.save(os.path.join(save_path, f"flow_image_{i}.png"))
        previous = current

##### Depth Estimation

In [None]:
# Pick the best available accelerator for depth inference. This overrides the
# 'cpu' default assigned to DEVICE in the Globals cell.
DEVICE = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
print(DEVICE)

# Constructor keyword arguments for each Depth Anything V2 encoder size.
# NOTE(review): presumably copied from the upstream repository - verify there
# before editing the numbers.
model_configs = {
    'vits': {'encoder': 'vits', 'features': 64, 'out_channels': [48, 96, 192, 384]},
    'vitb': {'encoder': 'vitb', 'features': 128, 'out_channels': [96, 192, 384, 768]},
    'vitl': {'encoder': 'vitl', 'features': 256, 'out_channels': [256, 512, 1024, 1024]},
    'vitg': {'encoder': 'vitg', 'features': 384, 'out_channels': [1536, 1536, 1536, 1536]}
}

# Build the 'vitb' variant; the checkpoint file name below must match it.
# NOTE: the name `model` is rebound to the DINOv2 classifier in the Network
# section, and calculate_depth_picture reads this global, so depth generation
# has to run before that cell.
model = DepthAnythingV2(**model_configs['vitb'])
# Weights are deserialised onto the CPU first, then moved to DEVICE.
model.load_state_dict(torch.load(f'checkpoints/depth_anything_v2_vitb.pth', map_location='cpu'))
model = model.to(DEVICE).eval()

# Input face crops (same value as `data_root` in the Globals cell) and the
# output folder for the depth maps.
root_dir = "./preprocessed_data"
save_folder = "./depth_data"
def list_leaf_folders(root_dir):
    """Collect the directories below *root_dir* that contain no sub-directories.

    Same definition as in the Optical Flow cell, repeated so this cell can run
    on its own. ``root_dir`` itself counts when it has no sub-directories.
    """
    leaves = []
    for current_dir, child_dirs, _ in os.walk(root_dir):
        if child_dirs:
            continue
        leaves.append(current_dir)
    return leaves

def natural_sort_key(s):
    """Return a key that sorts digit runs by numeric value, ignoring case.

    Same definition as in the Optical Flow cell, repeated so this cell can run
    on its own.
    """
    def convert(piece):
        # Digit runs compare as integers; other text compares lower-cased.
        return int(piece) if piece.isdigit() else piece.lower()

    return list(map(convert, re.split(r'(\d+)', s)))

def list_sorted_images(folder, extensions={'.png', '.jpg', '.jpeg', '.bmp'}):
    """List the image files directly inside *folder* in natural order.

    Same definition as in the Optical Flow cell, repeated so this cell can run
    on its own.

    Args:
        folder: Directory to list (not searched recursively).
        extensions: Lower-case suffixes, including the dot, accepted as
            images; matching ignores case.

    Returns:
        Full paths ordered by ``natural_sort_key`` of the file name.
    """
    def is_image(name):
        if not os.path.isfile(os.path.join(folder, name)):
            return False
        return os.path.splitext(name)[1].lower() in extensions

    names = sorted(filter(is_image, os.listdir(folder)), key=natural_sort_key)
    return [os.path.join(folder, name) for name in names]

def calculate_depth_picture(img):
    """Predict a depth map for *img* and rescale it to 8-bit greyscale.

    Uses the module-level ``model`` (``model.infer_image``). The prediction is
    min-max normalised to 0..255 per image, so values are only comparable
    within a single output. A prediction whose range is below 1e-6 yields an
    all-zero image instead of dividing by a near-zero range.

    Returns:
        numpy.ndarray of dtype uint8 with the same shape as the prediction.
    """
    depth = model.infer_image(img)
    lowest, highest = np.min(depth), np.max(depth)
    span = highest - lowest
    if span < 1e-6:
        return np.zeros_like(depth, dtype=np.uint8)
    rescaled = 255 * (depth - lowest) / span
    return rescaled.astype(np.uint8)

# Write one 8-bit depth map per face crop, mirroring the folder layout of
# `root_dir` under `save_folder`.
leaf_folders = list_leaf_folders(root_dir)
for folder in tqdm(leaf_folders):
    images = list_sorted_images(folder)
    if not images:
        # Nothing to convert, so do not create an empty output folder.
        continue

    relative_path = os.path.relpath(folder, root_dir)
    save_path = os.path.join(save_folder, relative_path)
    os.makedirs(save_path, exist_ok=True)  # once per folder, not once per image

    for i, img_path in enumerate(images):
        # Close the file promptly and force three channels (greyscale or RGBA
        # files would otherwise reach the model with the wrong shape).
        with Image.open(img_path) as handle:
            rgb = np.array(handle.convert('RGB'))
        # PIL decodes to RGB, while Depth Anything V2's `infer_image` is
        # documented to take an OpenCV-style BGR array (it converts BGR->RGB
        # internally), so reverse the channel axis before inference.
        # NOTE(review): this assumes the PNGs under `root_dir` store true RGB.
        # Crops written from unconverted OpenCV frames (as process_video in
        # the Data section does) already have swapped channels on disk -
        # regenerate them with correct colours before relying on these maps.
        img = np.ascontiguousarray(rgb[:, :, ::-1])
        res = calculate_depth_picture(img)
        pil_img = Image.fromarray(res, mode='L')
        # depth_image_{i} corresponds to the i-th crop in natural sort order.
        pil_img.save(os.path.join(save_path, f"depth_image_{i}.png"))

### Data

The data comes from the FaceForensics++ dataset; the download script is available at https://github.com/ondyari/FaceForensics

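The code below pre-processes the downloaded videos: every `.mp4` under `video_root` is decoded, faces are detected with MTCNN in batches of `BATCH_SIZE_DATA` frames, and the first detected face of each frame is saved as a PNG under `data_root`.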

In [None]:
# Face detector shared by process_video. keep_all=True makes it return every
# face found in a frame; post_process=False is requested so the crops can be
# cast straight to uint8 when saved (NOTE(review): confirm the value range
# against the facenet-pytorch docs).
# Use the first GPU when one is available and fall back to the CPU otherwise,
# so this cell no longer fails on machines without CUDA.
mtcnn = MTCNN(
    margin=20,
    keep_all=True,
    post_process=False,
    device='cuda:0' if torch.cuda.is_available() else 'cpu',
)

def process_video(path, save_path):
    """Extract one face crop per frame of a video and save the crops as PNGs.

    The whole video is decoded into memory, the module-level ``mtcnn``
    detector is run on the frames in batches of ``BATCH_SIZE_DATA``, and the
    first face returned for each frame is written to
    ``save_path/frame_{i}.png``, where ``i`` is the frame index in the video.
    Frames for which the detector returns ``None`` are skipped, so the saved
    indices can have gaps.

    Args:
        path: Path of the video file to read.
        save_path: Directory for the crops; created if it does not exist.
    """
    print(path, save_path)
    cap = cv2.VideoCapture(path)

    frames = []
    try:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break
            # OpenCV decodes frames as BGR, but the crops are saved through
            # PIL, which interprets arrays as RGB. Convert here so detection
            # runs on RGB input and the saved PNGs have correct colours.
            # NOTE: crops written before this fix have red and blue swapped;
            # regenerate them, together with any flow or depth data derived
            # from them.
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture even if decoding raises.
        cap.release()

    # One detector result per frame, collected batch by batch. Slicing clamps
    # at the end of the list, so the last batch may be shorter.
    faces = []
    for start in tqdm(range(0, len(frames), BATCH_SIZE_DATA)):
        faces.extend(mtcnn(frames[start:start + BATCH_SIZE_DATA]))

    os.makedirs(save_path, exist_ok=True)
    # total= lets tqdm show real progress; enumerate() has no length.
    for i, frame_faces in tqdm(enumerate(faces), total=len(faces)):
        if frame_faces is None:
            continue
        # Several faces can be returned per frame; keep only the first one.
        face = frame_faces[0]
        # Channel-first tensor -> channel-last uint8 array for PIL.
        img_pil = Image.fromarray(face.permute(1, 2, 0).numpy().astype(np.uint8))
        img_pil.save(os.path.join(save_path, f"frame_{i}.png"))


# Process every .mp4 below `video_root`. Each video gets its own output folder
# under `data_root`, at the same relative location and named after the video
# file without its extension.
for dirpath, _, filenames in os.walk(video_root):
    for file in filenames:
        if not file.endswith('.mp4'):
            continue
        video_path = os.path.join(dirpath, file)
        relative_stem = os.path.splitext(os.path.relpath(video_path, video_root))[0]
        output_folder = os.path.join(data_root, relative_stem)

        process_video(video_path, output_folder)
            

### Network

In [None]:
# Device used for the classifier and training loops. This lower-case `device`
# is separate from the upper-case DEVICE used by the feature-generation cells.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# Pretrained DINOv2 checkpoint used as the feature backbone.
model_name = "facebook/dinov2-base"
# NOTE(review): `processor` is loaded here but the visible training pipeline
# builds its own torchvision transform instead - confirm that is intended.
processor = AutoImageProcessor.from_pretrained(model_name)
backbone = AutoModel.from_pretrained(model_name)

In [None]:
def collect_video_folders(base_dir):
    """Return the sorted sub-directories of *base_dir* that hold frame images.

    Every directory found at any depth below ``base_dir`` is checked; it is
    kept when at least one of its direct entries ends in ``.jpg`` or ``.png``
    (case-insensitive). ``base_dir`` itself is never returned.
    """
    frame_suffixes = ('.jpg', '.png')

    def holds_frames(directory):
        return any(
            entry.lower().endswith(frame_suffixes)
            for entry in os.listdir(directory)
        )

    candidates = (
        os.path.join(parent, child)
        for parent, children, _ in os.walk(base_dir)
        for child in children
    )
    return sorted(path for path in candidates if holds_frames(path))

def extract_frame_paths_and_labels(base_dirs, label, train_ratio=0.8):
    """Collect labelled frame paths and split them into train and test by video.

    For each directory in ``base_dirs`` the video folders are taken in sorted
    order; the first ``int(n * train_ratio)`` folders go to the training set
    and the rest to the test set. All frames of one video therefore land on
    the same side of the split.

    Args:
        base_dirs: Iterable of root directories to scan.
        label: Label attached to every frame found.
        train_ratio: Fraction of each root's video folders used for training.

    Returns:
        ``(train_paths, test_paths)``, two lists of ``(frame_path, label)``
        tuples.
    """
    train_paths, test_paths = [], []

    for base_dir in base_dirs:
        video_folders = collect_video_folders(base_dir)
        train_cutoff = int(len(video_folders) * train_ratio)

        for position, video_folder in enumerate(video_folders):
            destination = train_paths if position < train_cutoff else test_paths
            destination.extend(
                (frame_path, label)
                for frame_path in glob.glob(os.path.join(video_folder, '*'))
                if frame_path.lower().endswith(('.jpg', '.png'))
            )

    return train_paths, test_paths

# Roots of the feature images used for training. These point at the
# optical-flow outputs; frames of pristine videos get label 0 and frames of
# manipulated videos get label 1.
original_sources = [
    'flow_data/original_sequences/actors/c23/videos',
    'flow_data/original_sequences/youtube/c23/videos'
]

# Only two manipulation methods are enabled; uncomment the others to include them.
manipulated_sources = [
    'flow_data/manipulated_sequences/DeepFakeDetection/c23/videos',
    'flow_data/manipulated_sequences/Deepfakes/c23/videos',
    # 'flow_data/manipulated_sequences/Face2Face/c23/videos',
    # 'flow_data/manipulated_sequences/FaceShifter/c23/videos',
    # 'flow_data/manipulated_sequences/FaceSwap/c23/videos',
    # 'flow_data/manipulated_sequences/NeuralTextures/c23/videos'
]

# Collect (path, label) pairs; the split is made per video folder, 80/20 by default.
train_real, test_real = extract_frame_paths_and_labels(original_sources, label=0)
train_fake, test_fake = extract_frame_paths_and_labels(manipulated_sources, label=1)

# Merge real and fake samples for each split.
train_data = train_real + train_fake
test_data = test_real + test_fake

# Shuffle in place. No seed is set, so the order differs between runs.
random.shuffle(train_data)
random.shuffle(test_data)

# Quick sanity check of the split sizes.
# The last print raises IndexError when no training frames were found.
print(f"Training samples: {len(train_data)}")
print(f"Testing samples: {len(test_data)}")
print(f"Sample training item: {train_data[0]}")

In [None]:

class FrameDataset(Dataset):
    """Dataset over ``(image_path, label)`` pairs that loads images lazily.

    Args:
        data: Sequence of ``(image_path, label)`` tuples.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, data, transform=None):
        self.transform = transform
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample *idx*.

        The image is opened from disk, converted to RGB and passed through
        ``transform`` when one was given; the label is returned as a float32
        scalar tensor.
        """
        sample_path, target = self.data[idx]
        frame = Image.open(sample_path).convert('RGB')
        frame = self.transform(frame) if self.transform else frame
        return frame, torch.tensor(target, dtype=torch.float32)
    
def repeat_channels(x):
    """Expand a single-channel image tensor to three identical channels.

    Args:
        x: A ``torch.Tensor`` of shape ``(1, H, W)`` or ``(H, W)``.

    Returns:
        A tensor of shape ``(3, H, W)`` with the input repeated along the
        channel axis.

    Raises:
        ValueError: If ``x`` is not a tensor or has any other shape. Inputs
            without a ``shape`` attribute used to fail with AttributeError
            while the error message was being built; they now raise
            ValueError like every other rejected input.
    """
    if isinstance(x, torch.Tensor):
        if x.dim() == 2:
            # Add the missing channel dimension first.
            x = x.unsqueeze(0)
        if x.dim() == 3 and x.shape[0] == 1:
            return x.repeat(3, 1, 1)
    # Report the shape when there is one, otherwise the type name.
    described = getattr(x, "shape", type(x).__name__)
    raise ValueError(f"Unexpected input shape for repeat_channels: {described}")

# Preprocessing applied to every frame before it reaches the network.
# NOTE(review): the `processor` loaded with the backbone is not used here, and
# its resize/normalisation settings may differ from the values below - confirm
# these are the intended ones.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # fixed square size; aspect ratio is not preserved
    transforms.ToTensor(),    # PIL image -> float tensor, channels first
    transforms.Normalize(
        mean=[0.5, 0.5, 0.5], # with std 0.5 this maps [0, 1] to [-1, 1]
        std=[0.5, 0.5, 0.5]
    )
])

# Both splits share the same transform; no augmentation is applied.
train_dataset = FrameDataset(train_data, transform=image_transform)
test_dataset = FrameDataset(test_data, transform=image_transform)

# Only the training loader reshuffles each epoch.
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=4, pin_memory=True)

In [None]:
# Freeze the whole backbone first; selected layers are re-enabled afterwards.
for param in backbone.parameters():
    param.requires_grad_(False)

In [None]:
# Re-enable gradients for encoder layers 10 and 11 only.
# NOTE(review): presumably the last two blocks of the base model - confirm
# against the backbone's layer count.
for name, param in backbone.named_parameters():
    if any(layer in name for layer in ("encoder.layer.10", "encoder.layer.11")):
        param.requires_grad = True

In [None]:
class DinoBinaryClassifier(nn.Module):
    """Binary classifier: a backbone followed by a small MLP head.

    The head takes the first token of the backbone's ``last_hidden_state``
    and maps it through Linear(hidden_size, 256) -> ReLU -> Dropout(0.3) ->
    Linear(256, 1). The output is a single raw logit per sample (no sigmoid).

    Args:
        backbone: Callable model exposing ``config.hidden_size`` and accepting
            a ``pixel_values`` keyword argument.
    """

    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        hidden_size = self.backbone.config.hidden_size
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256, bias=True),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1, bias=True),  # one logit for the binary decision
        )
        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal weights and zero biases for the head's linear layers."""
        linear_layers = (m for m in self.classifier if isinstance(m, nn.Linear))
        for linear in linear_layers:
            init.kaiming_normal_(linear.weight, nonlinearity='relu')
            init.zeros_(linear.bias)

    def forward(self, pixel_values):
        """Return logits of shape (batch, 1) for a batch of images."""
        hidden_states = self.backbone(pixel_values=pixel_values).last_hidden_state
        # Index 0 along the sequence axis is used as the image summary token.
        return self.classifier(hidden_states[:, 0])

In [None]:
# Wrap the partially unfrozen backbone with the classification head.
# NOTE: this rebinds the global `model`, which the depth-estimation cell uses
# for its Depth Anything network; re-run that cell's setup before generating
# depth maps again.
model = DinoBinaryClassifier(backbone)
# nn.Module.to moves the parameters in place, so no reassignment is needed.
model.to(device)

In [None]:
# The loss works on raw logits, so the model's forward applies no sigmoid.
criterion = torch.nn.BCEWithLogitsLoss()

# Split the trainable parameters into two groups so the pretrained backbone
# layers can use a smaller learning rate than the freshly initialised head.
backbone_params = []
classifier_params = []
for name, param in model.named_parameters():
    if not param.requires_grad:
        continue  # frozen parameters are left out of the optimizer
    group = backbone_params if "backbone" in name else classifier_params
    group.append(param)
print(len(backbone_params), len(classifier_params))

optimizer = torch.optim.Adam([
    {"params": backbone_params, "lr": 1e-5},
    {"params": classifier_params, "lr": 1e-4},
])

### Train

In [None]:
def train(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over *dataloader*.

    Args:
        model: Network returning one logit per sample.
        dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss applied to ``(logits, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        fraction of samples whose thresholded prediction matches its label.

    Raises:
        ZeroDivisionError: If *dataloader* yields no samples.
    """
    model.train()
    running_loss = 0.0
    total = 0
    correct = 0
    # `tqdm` is imported as the progress-bar callable itself
    # (`from tqdm import tqdm`), so it is called directly; the previous
    # `tqdm.tqdm(...)` raised AttributeError.
    for images, labels in tqdm(dataloader):
        images = images.to(device)
        # Add a trailing dimension so the labels line up with the logits.
        labels = labels.to(device).unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(images)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch value is a
        # per-sample mean even when the last batch is smaller.
        running_loss += loss.item() * images.size(0)

        # Probability above 0.5 counts as the positive class.
        preds = torch.sigmoid(outputs) > 0.5

        correct += (preds == labels.bool()).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

def evaluate(model, dataloader, criterion, device):
    """Compute loss and accuracy over *dataloader* without updating the model.

    Args:
        model: Network returning one logit per sample.
        dataloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss applied to ``(logits, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the sample-weighted mean loss and the
        fraction of samples whose thresholded prediction matches its label.

    Raises:
        ZeroDivisionError: If *dataloader* yields no samples.
    """
    # Note: the model is left in eval mode when this function returns.
    model.eval()
    running_loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        # `tqdm` is imported as the progress-bar callable itself
        # (`from tqdm import tqdm`), so it is called directly; the previous
        # `tqdm.tqdm(...)` raised AttributeError.
        for images, labels in tqdm(dataloader):
            images = images.to(device)
            # Add a trailing dimension so the labels line up with the logits.
            labels = labels.to(device).unsqueeze(1)

            outputs = model(images)
            loss = criterion(outputs, labels)
            # Weight by batch size so the result is a per-sample mean.
            running_loss += loss.item() * images.size(0)

            # Probability above 0.5 counts as the positive class.
            preds = torch.sigmoid(outputs) > 0.5
            correct += (preds == labels.bool()).sum().item()
            total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

#### Running the model with Flows

In [None]:
# Train on the optical-flow images for `num_epochs` epochs, evaluating on the
# held-out videos after every epoch. No checkpoint is saved in this loop.
for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    # The test split doubles as the validation set here.
    val_loss, val_acc = evaluate(model, test_loader, criterion, device)
    
    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

In [None]:
# Sanity check: repeatedly fit one fixed batch. If the model and optimizer are
# wired correctly the loss should fall towards zero.
# NOTE(review): this keeps updating the same `model` and `optimizer` used for
# the real training run, and does not call model.train(), so the model stays
# in whatever mode the previously executed cell left it in.
images, labels = next(iter(train_loader))
for i in range(100):  # overfit
    optimizer.zero_grad()
    outputs = model(images.to(device))
    loss = criterion(outputs, labels.to(device).unsqueeze(1))
    loss.backward()
    optimizer.step()
    # "Epoch" in the message is the iteration index on this single batch.
    print(f"Epoch {i}, Loss: {loss.item()}, Output Std: {outputs.std().item()}, outputs mean {outputs.mean().item()}")

#### Running the model with Depth

In [None]:
# Same training loop as the flow run, with a checkpoint written after every epoch.
# NOTE(review): this cell reuses the global `model`, `optimizer`,
# `train_loader` and `test_loader`. As written in this file the loaders are
# built from 'flow_data/...' paths, so presumably the data, model and
# optimizer cells were re-run with depth paths before this cell - confirm.
for epoch in range(1, 1 + num_epochs):
    save_path = f"depth_epoch_{epoch}.pth"
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = evaluate(model, test_loader, criterion, device)
    
    print(f"Epoch {epoch}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # Save the model and optimizer state so training can be resumed or the
    # epoch evaluated later; the file lands in the current working directory.
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_loss': train_loss,
        'val_loss': val_loss,
    }, save_path)

In [None]:
# Sanity check for the depth run: repeatedly fit one fixed batch. The loss
# should fall towards zero if the pipeline is wired correctly.
# NOTE(review): this keeps updating the same `model` and `optimizer` used for
# the real training run, and does not call model.train(), so the model stays
# in whatever mode the previously executed cell left it in.
images, labels = next(iter(train_loader))
for i in range(100):  # overfit
    optimizer.zero_grad()
    outputs = model(images.to(device))
    loss = criterion(outputs, labels.to(device).unsqueeze(1))
    loss.backward()
    optimizer.step()
    # "Epoch" in the message is the iteration index on this single batch.
    print(f"Epoch {i}, Loss: {loss.item()}, Output Std: {outputs.std().item()}, outputs mean {outputs.mean().item()}")

### Evaluation