In [26]:
import os
import face_recognition
from PIL import Image
import cv2
import math
import torch
import torch.nn as nn
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Dataset,random_split,Subset
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import torch.optim as optim
import random
import numpy as np

In [27]:
# Move model to GPU if available
device = torch.device("cuda:0")
print(device)

# For reproducability
seed = 33

torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

cuda:0


In [2]:
#from video get frames
def capture_frames(video_path, output_folder,video_name, num_frames):
    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)
    
    cap = cv2.VideoCapture(video_path)
    
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # print(total_frames)
    
    interval = math.floor(total_frames // num_frames)
    if interval == 0:
        interval = 1
    # print(interval)
    
    frame_count = 0
    success = True
    frame_number = 0
    while success and frame_number < total_frames:
        # Set the frame number to read
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        
        # Read a frame from the video
        success, frame = cap.read()
        
        # Check if its time to capture a frame
        if frame_number != 0 and interval !=0 and frame_number % interval == 0 and success:
            frame_path = os.path.join(output_folder, f"{video_name}_{frame_count}.jpg")
            cv2.imwrite(frame_path, frame)
            # print(f"Saved frame {frame_count}")
            
            frame_count += 1

        frame_number += interval

    cap.release()
    print("Frame capture completed.")

def capture_frames_from_folder(video_folder, output_folder, num_frames=10):
    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    # Iterate through all files in the video folder
    for file_name in os.listdir(video_folder):
        if file_name.endswith(('.mp4')):
            video_path = os.path.join(video_folder, file_name)
            video_name = os.path.splitext(file_name)[0]
            print(video_name)
            capture_frames(video_path, output_folder, video_name, num_frames)

In [4]:
video_path = "train/real"
output_folder = "train/real_frame"
num_frames_to_capture = 6

capture_frames_from_folder(video_path, output_folder, num_frames=num_frames_to_capture)

000
Frame capture completed.
001
Frame capture completed.
002
Frame capture completed.
003
Frame capture completed.
004
Frame capture completed.
005
Frame capture completed.
006
Frame capture completed.
007
Frame capture completed.
008
Frame capture completed.
009
Frame capture completed.
010
Frame capture completed.
011
Frame capture completed.
012
Frame capture completed.
013
Frame capture completed.
014
Frame capture completed.
015
Frame capture completed.
016
Frame capture completed.
017
Frame capture completed.
018
Frame capture completed.
019
Frame capture completed.
020
Frame capture completed.
021
Frame capture completed.
022
Frame capture completed.
023
Frame capture completed.
024
Frame capture completed.
025
Frame capture completed.
026
Frame capture completed.
027
Frame capture completed.
028
Frame capture completed.
029
Frame capture completed.
030
Frame capture completed.
031
Frame capture completed.
032
Frame capture completed.
033
Frame capture completed.
034
Frame capt

In [6]:
def extract_faces_from_frames(input_folder, output_folder):
    # Create the output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        if filename.endswith((".jpg", ".jpeg", ".png")):
            image_path = os.path.join(input_folder, filename)
            image = face_recognition.load_image_file(image_path)

            # Find all face locations in the image
            face_locations = face_recognition.face_locations(image)

            # Extract and save faces
            for i, (top, right, bottom, left) in enumerate(face_locations):
                # only extract faces greater than 75x75 pixels --> helps to ignore wrong faces from background objects
                if (bottom - top > 75) and (right - left > 75):
                    face_roi = image[top:bottom, left:right]
                    face_path = os.path.join(
                        output_folder, f"{os.path.splitext(filename)[0]}_face_{i}.jpg"
                    )
                    face_image = Image.fromarray(face_roi)
                    face_image.save(face_path)
                    print(f"Saved face {i} from {filename}")

input_folder = "train/fake_frame"
output_folder = "train/fake_face"


extract_faces_from_frames(input_folder, output_folder)

Saved face 0 from 000_003_deepfakes_0.jpg
Saved face 0 from 000_003_deepfakes_1.jpg
Saved face 0 from 000_003_deepfakes_2.jpg
Saved face 0 from 000_003_deepfakes_3.jpg
Saved face 0 from 000_003_deepfakes_4.jpg
Saved face 0 from 000_003_face2face_0.jpg
Saved face 0 from 000_003_face2face_1.jpg
Saved face 0 from 000_003_face2face_2.jpg
Saved face 0 from 000_003_face2face_3.jpg
Saved face 0 from 000_003_face2face_4.jpg
Saved face 0 from 000_003_face2face_5.jpg
Saved face 0 from 000_003_faceswap_0.jpg
Saved face 0 from 000_003_faceswap_1.jpg
Saved face 0 from 000_003_faceswap_2.jpg
Saved face 0 from 000_003_faceswap_3.jpg
Saved face 0 from 000_003_faceswap_4.jpg
Saved face 0 from 000_003_faceswap_5.jpg
Saved face 0 from 000_003_neuraltextures_0.jpg
Saved face 0 from 000_003_neuraltextures_1.jpg
Saved face 0 from 000_003_neuraltextures_2.jpg
Saved face 0 from 000_003_neuraltextures_3.jpg
Saved face 0 from 000_003_neuraltextures_4.jpg
Saved face 0 from 000_003_neuraltextures_5.jpg
Saved face

In [7]:
class DeepfakeDetectionDataset(Dataset):
    def __init__(self, root_dir, transform=None):

        self.root_dir = root_dir
        self.transform = transform
        self.data = []

        # Load data paths and labels
        for label, subdir in enumerate(['real_face', 'fake_face']):
            dir_path = os.path.join(self.root_dir, subdir)
            files = [os.path.join(dir_path, f) for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f)) and f.lower().endswith(('.jpg', '.jpeg', '.png'))]
            labels = [label] * len(files)  # 0 for real, 1 for fake
            self.data.extend(zip(files, labels))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        image = Image.open(img_path).convert('RGB')  # Convert to RGB to ensure 3 channels

        if self.transform:
            image = self.transform(image)

        return image, label

In [8]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [9]:
dataset = DeepfakeDetectionDataset(root_dir='train',transform=transform)

In [11]:
# Split sizes
total_size = len(dataset)
# print(total_size)
# train_loader = DataLoader(dataset, batch_size=16, shuffle=True)
train_size = int(0.8 * total_size)
test_size = int(0.1 * total_size)
val_size = total_size - train_size - test_size  # Remainder for validation

print(train_size)
print(val_size)
print(test_size)
total_samples = len(dataset)
indices = torch.randperm(total_samples)[:32]  # Randomly shuffle and select the first 100 indices

small_train_dataset = Subset(dataset, indices)
small_train_loader = DataLoader(small_train_dataset, batch_size=16, shuffle=True)

# Split the dataset
train_dataset, test_dataset, val_dataset = random_split(dataset, [train_size, test_size, val_size])
# train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

1101
139
137


In [12]:
def inspect_dataloader(dataloader):
    # Get the first batch
    images, labels = next(iter(dataloader))
    
    # Print the shapes and labels of the batch
    print(f"Batch shape: {images.shape}")
    print(f"Labels: {labels}")

print("Inspecting Training DataLoader:")
inspect_dataloader(train_loader)

print("\nInspecting Validation DataLoader:")
inspect_dataloader(val_loader)

print("\nInspecting Testing DataLoader:")
inspect_dataloader(test_loader)

Inspecting Training DataLoader:
Batch shape: torch.Size([32, 3, 224, 224])
Labels: tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1,
        0, 1, 1, 1, 1, 0, 1, 1])

Inspecting Validation DataLoader:
Batch shape: torch.Size([32, 3, 224, 224])
Labels: tensor([1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1,
        1, 1, 0, 0, 0, 1, 1, 1])

Inspecting Testing DataLoader:
Batch shape: torch.Size([32, 3, 224, 224])
Labels: tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1,
        1, 0, 1, 1, 1, 1, 1, 1])


In [19]:
# Load a pre-trained ResNet50 model
model = models.resnet50(weights="ResNet50_Weights.IMAGENET1K_V1")

# Modify the final layer for binary classification
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 2)  # Output for 2 classes: fake and real

# model = model.to(device)

criterion = nn.CrossEntropyLoss()
#L2 Regularization
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

In [18]:
def train_model(model, criterion, optimizer, dataloader, num_epochs):
    model.train()  # Set model to training mode
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_preds = 0
        
        for inputs, labels in dataloader:
            # print(f"Input batch shape: {inputs.shape}")
            # print(f"Labels batch shape: {labels.shape}")
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            # Backward and optimize
            loss.backward()
            optimizer.step()
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct_preds += torch.sum(preds == labels.data)
        
        epoch_loss = running_loss / len(dataloader.dataset)
        epoch_acc = correct_preds.double() / len(dataloader.dataset)
        
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}')
        
    print('Training complete')
    return model


In [15]:
train_model(model, criterion, optimizer, train_loader, num_epochs=10)

Epoch 1/10, Loss: 0.5978, Acc: 0.7775
Epoch 2/10, Loss: 0.5075, Acc: 0.8011
Epoch 3/10, Loss: 0.4888, Acc: 0.8038
Epoch 4/10, Loss: 0.4821, Acc: 0.8084
Epoch 5/10, Loss: 0.4791, Acc: 0.8056
Epoch 6/10, Loss: 0.4826, Acc: 0.8011
Epoch 7/10, Loss: 0.4679, Acc: 0.8120
Epoch 8/10, Loss: 0.4491, Acc: 0.8220
Epoch 9/10, Loss: 0.4334, Acc: 0.8274
Epoch 10/10, Loss: 0.3990, Acc: 0.8392
Training complete


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [24]:
def validate(model, criterion, validate_loader):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for inputs, labels in validate_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    print(val_loss)
    return val_loss / len(validate_loader)

def evaluate(model, data_loader):
    model.eval()
    all_predictions = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in data_loader:

            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            
            all_predictions.extend(predicted.numpy())
            all_labels.extend(labels.numpy())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions)
    recall = recall_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)

    return accuracy, precision, recall, f1

# Evaluate the model
validate(model,criterion,val_loader)
evaluate(model, test_loader)

3.9239246249198914


(0.23357664233576642, 1.0, 0.05405405405405406, 0.10256410256410256)