In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file found.
input_file_paths = (
    os.path.join(dirname, filename)
    for dirname, _, filenames in os.walk('/kaggle/input')
    for filename in filenames
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import cv2
import shutil
import random
import glob
import numpy as np 
import pandas as pd
# Torch & torchvision
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader

In [None]:
def extract_frames_at_intervals(video_path_pattern, output_dir, interval):
    """Save every ``interval``-th frame of each input video as a JPEG file.

    Frames are written to ``output_dir`` as ``<video name>_frame_<index>.jpg``,
    where ``<index>`` is the zero-based position of the frame in its video.

    Args:
        video_path_pattern: Either a glob pattern (``str`` or path-like, e.g.
            ``'/data/*.mp4'``) or an iterable of already-collected video paths.
        output_dir: Directory that receives the frames; created when missing.
        interval: Keep frames 0, ``interval``, ``2 * interval``, ...
            Must be a positive integer.

    Returns:
        None. Progress is reported with ``print``.

    Raises:
        ValueError: If ``interval`` is smaller than 1.
    """
    if interval < 1:
        raise ValueError(f"interval must be a positive integer, got {interval!r}")

    os.makedirs(output_dir, exist_ok=True)

    # Resolve the argument into a concrete list of videos: expand it when it is
    # a glob pattern, otherwise treat it as an iterable of paths.
    if isinstance(video_path_pattern, (str, os.PathLike)):
        video_files = glob.glob(os.fspath(video_path_pattern))
    else:
        video_files = list(video_path_pattern)

    if not video_files:
        print("No video files found!")
        return

    for video_path in video_files:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            cap.release()
            continue

        # File name without its extension (splitext keeps dots inside the name).
        # NOTE(review): output files are keyed by this base name only, so two
        # inputs with the same base name in different folders write to the
        # same frame files - confirm this is acceptable for the input set.
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        count = 0

        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if count % interval == 0:
                    frame_filename = os.path.join(output_dir, f"{video_name}_frame_{count}.jpg")
                    cv2.imwrite(frame_filename, frame)
                count += 1
        finally:
            # Always hand the capture back, even if reading or writing fails.
            cap.release()

        print(f"Frames extracted from {video_path} saved in {output_dir}")

In [None]:
# Example usage: gather the real videos from the four input folders below and
# keep one frame out of every `interval`.
real_video_patterns = [
    '/kaggle/input/celeb-v1-df/Celeb-real/*.mp4',
    '/kaggle/input/celeb-v1-df/YouTube-real/*.mp4',
    '/kaggle/input/celeb-df-v2/Celeb-real/*.mp4',
    '/kaggle/input/celeb-df-v2/YouTube-real/*.mp4',
]
video_files = [path for pattern in real_video_patterns for path in glob.glob(pattern)]
output_dir, interval = 'real_output_frames', 30
extract_frames_at_intervals(video_files, output_dir, interval)

In [None]:

# Collect the synthesized (fake) videos and keep one frame out of every `interval`.
# The list is created here with `=`; the original `+=` needed a definition that
# no cell provides, which raises NameError on a fresh kernel.
# NOTE(review): only the celeb-df-v2 synthesis folder is read here, whereas the
# real videos come from both celeb-v1-df and celeb-df-v2 - confirm whether a
# v1 synthesis folder was meant to be included as well.
video_files_fake = glob.glob('/kaggle/input/celeb-df-v2/Celeb-synthesis/*.mp4')
output_dir_fake = 'fake_output_frames'
interval = 30
extract_frames_at_intervals(video_files_fake, output_dir_fake, interval)

In [None]:
def detect_faces_and_context(input_dir, output_dir, face_cascade_path=None, padding=30):
    """Crop each detected face, plus a padded region around it, from a folder of JPEGs.

    Every ``.jpg`` file in ``input_dir`` is converted to grayscale and passed to a
    Haar cascade. For detection number ``i`` in an image two files are written:

    * ``<output_dir>/ROI/<name>_face_<i>.jpg`` - the detection box itself.
    * ``<output_dir>/Surrounding_Context/<name>_context_<i>.jpg`` - the box grown
      by ``padding`` pixels on every side and clipped to the image borders.

    A detection whose two output files both exist already is skipped.

    Args:
        input_dir: Folder containing the ``.jpg`` images to scan.
        output_dir: Base folder for the ``ROI`` and ``Surrounding_Context`` outputs.
        face_cascade_path: Path to a Haar cascade XML file. When ``None`` the
            ``haarcascade_frontalface_default.xml`` file under
            ``cv2.data.haarcascades`` is used.
        padding: Number of pixels added around the face box for the context crop.

    Raises:
        ValueError: If the cascade file could not be loaded.
    """
    # Fall back to the default frontal-face cascade path.
    if face_cascade_path is None:
        face_cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"

    # Verify the classifier really loaded: an empty classifier would otherwise
    # only surface as an error later, inside detectMultiScale.
    face_cascade = cv2.CascadeClassifier(face_cascade_path)
    if face_cascade.empty():
        raise ValueError(f"Could not load Haar cascade from {face_cascade_path}")

    roi_dir = os.path.join(output_dir, 'ROI')
    context_dir = os.path.join(output_dir, 'Surrounding_Context')
    os.makedirs(roi_dir, exist_ok=True)
    os.makedirs(context_dir, exist_ok=True)

    for filename in os.listdir(input_dir):
        if not filename.endswith(".jpg"):
            continue

        img_path = os.path.join(input_dir, filename)
        img = cv2.imread(img_path)
        if img is None:
            print(f"Error: Could not load image {img_path}")
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        # File name without the ".jpg" suffix, used as prefix of both outputs.
        stem = os.path.splitext(filename)[0]

        for i, (x, y, w, h) in enumerate(faces):
            roi_filename = os.path.join(roi_dir, f"{stem}_face_{i}.jpg")
            context_filename = os.path.join(context_dir, f"{stem}_context_{i}.jpg")

            # Nothing to do when both crops were written by an earlier run.
            if os.path.exists(roi_filename) and os.path.exists(context_filename):
                print(f"Skipping {filename}: face {i} already processed")
                continue

            # Tight crop: exactly the detection box.
            face = img[y:y+h, x:x+w]
            cv2.imwrite(roi_filename, face)

            # Context crop: the box grown by `padding`, clamped to the image
            # (img.shape is (height, width, ...)).
            x1 = max(x - padding, 0)
            y1 = max(y - padding, 0)
            x2 = min(x + w + padding, img.shape[1])
            y2 = min(y + h + padding, img.shape[0])
            context = img[y1:y2, x1:x2]
            cv2.imwrite(context_filename, context)

        print(f"Processed {filename}: {len(faces)} face(s) detected")

In [None]:
# Crop faces and their surrounding context from the frames of the real videos.
input_dir_real, output_dir_real = 'real_output_frames', 'real_output_faces_context'
detect_faces_and_context(input_dir=input_dir_real, output_dir=output_dir_real)

In [None]:

# Same cropping step, applied to the frames of the fake videos.
input_dir_fake, output_dir_fake = 'fake_output_frames', 'fake_output_faces_context'
detect_faces_and_context(input_dir=input_dir_fake, output_dir=output_dir_fake)

In [None]:
def split_data(real_dir, fake_dir, output_base, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=None):
    """Copy the ``.jpg`` files of two class folders into train/val/test folders.

    The layout created under ``output_base`` is ``<split>/<label>`` with
    ``split`` in ``train``/``val``/``test`` and ``label`` in ``real``/``fake``.
    Each class is shuffled and split independently; the test split receives
    whatever remains after the train and val shares were taken.

    Args:
        real_dir: Folder with the images of the ``real`` class.
        fake_dir: Folder with the images of the ``fake`` class.
        output_base: Root folder of the split; created when missing.
        train_ratio: Fraction of each class copied to ``train``.
        val_ratio: Fraction of each class copied to ``val``.
        test_ratio: Fraction of each class meant for ``test``; only used to
            validate that the three ratios sum to 1.
        seed: Optional seed for a reproducible shuffle. With ``None`` the
            module-level ``random`` state is used.

    Raises:
        ValueError: If the three ratios do not sum to 1.
    """
    # Compare with a tolerance: sums of decimal fractions are rarely exact floats.
    ratio_sum = train_ratio + val_ratio + test_ratio
    if abs(ratio_sum - 1.0) > 1e-9:
        raise ValueError(f"train_ratio + val_ratio + test_ratio must equal 1, got {ratio_sum}")

    # A private generator when seeded, otherwise the shared module-level one.
    shuffler = random.Random(seed) if seed is not None else random

    split_dirs = {
        (split, label): os.path.join(output_base, split, label)
        for split in ("train", "val", "test")
        for label in ("real", "fake")
    }
    for folder in split_dirs.values():
        os.makedirs(folder, exist_ok=True)

    # NOTE(review): the split is made per image file, so images that share a
    # source can end up in different splits - confirm this is acceptable.
    def split_and_copy(src_dir, label):
        """Shuffle the JPEGs of ``src_dir`` and copy them into the three split folders."""
        # Sort first so a seeded shuffle does not depend on the listing order.
        files = sorted(f for f in os.listdir(src_dir) if f.endswith(".jpg"))
        shuffler.shuffle(files)

        total = len(files)
        train_idx = int(total * train_ratio)
        val_idx = train_idx + int(total * val_ratio)

        partitions = {
            "train": files[:train_idx],
            "val": files[train_idx:val_idx],
            "test": files[val_idx:],
        }
        for split, names in partitions.items():
            dst_dir = split_dirs[(split, label)]
            for f in names:
                shutil.copy(os.path.join(src_dir, f), os.path.join(dst_dir, f))

        print(f"{src_dir}: {len(partitions['train'])} train, {len(partitions['val'])} val, {len(partitions['test'])} test")

    split_and_copy(real_dir, "real")
    split_and_copy(fake_dir, "fake")

In [None]:
# Split the cropped face images (the ROI folders) into train/val/test sets.
real_faces_dir, fake_faces_dir = (
    "/kaggle/working/real_output_faces_context/ROI",
    "/kaggle/working/fake_output_faces_context/ROI",
)
output_dir = "/kaggle/working/split_data"

split_data(real_dir=real_faces_dir, fake_dir=fake_faces_dir, output_base=output_dir)

In [None]:


def get_dataloaders(data_dir, batch_size=32, image_size=224):
    """Create the train, validation and test ``DataLoader`` objects.

    Each split is read with ``datasets.ImageFolder`` from ``<data_dir>/train``,
    ``<data_dir>/val`` and ``<data_dir>/test``.

    Args:
        data_dir: Root folder holding the ``train``, ``val`` and ``test`` folders.
        batch_size: Batch size used by all three loaders.
        image_size: Images are resized to ``image_size`` x ``image_size``.

    Returns:
        The tuple ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles its samples.
    """
    # All three splits share one preprocessing pipeline: resize, convert to a
    # tensor, then normalize each channel with the fixed mean/std below.
    preprocess = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    split_names = ("train", "val", "test")
    split_datasets = {
        split: datasets.ImageFolder(root=f"{data_dir}/{split}", transform=preprocess)
        for split in split_names
    }

    train_loader, val_loader, test_loader = (
        DataLoader(
            split_datasets[split],
            batch_size=batch_size,
            shuffle=(split == "train"),
            num_workers=2,
        )
        for split in split_names
    )
    return train_loader, val_loader, test_loader


# Build the three loaders over the split stored in /kaggle/working/split_data.
train_loader, val_loader, test_loader = get_dataloaders(data_dir="/kaggle/working/split_data")


In [None]:

# Train on the GPU when one is available; fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ResNet-50 with pretrained weights. Prefer the `weights=` API when this
# torchvision build provides the weights enum; `pretrained=True` is the older
# spelling for the same weights and is kept as a fallback.
if hasattr(models, "ResNet50_Weights"):
    model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
else:
    model = models.resnet50(pretrained=True)

# Replace the final fully connected layer with a 2-output layer.
model.fc = nn.Linear(model.fc.in_features, 2)
model = model.to(device)


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [None]:
num_epochs = 10
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Predicted class = index of the largest output along dimension 1.
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        # Running averages since the start of the epoch, every 100 batches.
        if (batch_idx + 1) % 100 == 0:
            accuracy = 100 * correct / total
            print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], "
                  f"Loss: {running_loss / (batch_idx + 1):.4f}, Accuracy: {accuracy:.2f}%")

    epoch_accuracy = 100 * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}] completed - Average Loss: {running_loss / len(train_loader):.4f}, "
          f"Final Accuracy: {epoch_accuracy:.2f}%")

    # ---- validation pass ----
    # Evaluate on val_loader after every epoch so that training accuracy is not
    # the only signal; eval mode and no_grad keep this pass from updating the
    # model or tracking gradients.
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)

    val_accuracy = 100 * val_correct / val_total
    print(f"Epoch [{epoch+1}/{num_epochs}] validation - Average Loss: {val_loss / len(val_loader):.4f}, "
          f"Accuracy: {val_accuracy:.2f}%")

print("Training finished!")
