In [None]:
import os
!pip install opencv-python
import cv2
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from google.colab import drive
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import torch
import gc
import time

# Mount Google Drive so the dataset folder below becomes visible to the runtime
drive.mount('/content/drive')

# Dataset layout inside Google Drive
ROOT_DIR = "/content/drive/MyDrive/Celeb-DF"
REAL_DIRS = [
    os.path.join(ROOT_DIR, _source_name)
    for _source_name in ("YouTube-real", "Celeb-real", "Actors-real", "YouTube-real-FF")
]
FAKE_DIRS = [os.path.join(ROOT_DIR, "Celeb-synthesis")]

# Everything this notebook writes lives under OUTPUT_DIR
OUTPUT_DIR = os.path.join(ROOT_DIR, "processed_data")
FACE_DIR = os.path.join(OUTPUT_DIR, "face_clips")
METADATA_DIR = os.path.join(OUTPUT_DIR, "metadata")

# Create the output tree up front (parents first, then the per-class clip folders)
for _required_dir in (
    OUTPUT_DIR,
    FACE_DIR,
    METADATA_DIR,
    os.path.join(FACE_DIR, "real"),
    os.path.join(FACE_DIR, "fake"),
):
    os.makedirs(_required_dir, exist_ok=True)


In [None]:
# Check if GPU is available (reported here and used later to decide whether to
# clear the CUDA cache; frame and face extraction below go through OpenCV)
USE_GPU = torch.cuda.is_available()
print(f"GPU available: {USE_GPU}")
if USE_GPU:
    print(f"GPU device: {torch.cuda.get_device_name(0)}")
    print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# Determine number of CPU cores for parallel processing
# For video processing, using slightly fewer than max cores can avoid overloading
NUM_CORES = max(1, multiprocessing.cpu_count() - 1)
print(f"Using {NUM_CORES} CPU cores for processing")

# Performance optimization settings
FRAMES_PER_VIDEO = 15  # Frames sampled per video and frames stored per clip (reduced from 30)
TARGET_FACE_SIZE = (128, 128)  # Size passed to cv2.resize for each face crop (reduced from 224x224)
MIN_FACE_FRAMES = 5  # Videos with fewer detected faces are rejected (reduced from 10)
BATCH_SIZE = 8 if USE_GPU else 4  # Videos handed to a worker process per submitted task
MAX_VIDEOS_PER_RUN = 500  # Videos per chunk in process_in_chunks, to bound memory use

# Pre-load the face detector
face_cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
face_cascade = cv2.CascadeClassifier(face_cascade_path)

# Fail fast: an empty classifier makes every detection call raise, and those
# errors are swallowed per frame, so every video would be silently rejected.
if face_cascade.empty():
    raise RuntimeError(f"Failed to load Haar cascade from {face_cascade_path}")

In [None]:
def extract_frames(video_path, num_frames=FRAMES_PER_VIDEO):
    """
    Extract evenly spaced frames from a video.

    Args:
        video_path: Path of the video to open with cv2.VideoCapture.
        num_frames: Number of frame positions to sample between the first and
            last frame of the video.

    Returns:
        List of frames, each resized to 320x240. Positions that cannot be read
        are skipped, so the list may be shorter than ``num_frames``. An empty
        list is returned when the video cannot be opened, reports no frames,
        or an error occurs.
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            return []

        # Get video properties
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if frame_count <= 0:
            return []

        # Evenly spaced positions; these repeat when the video has fewer
        # frames than num_frames.
        indices = np.linspace(0, frame_count - 1, num_frames, dtype=int)

        frames = []
        for index in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(index))
            ret, frame = cap.read()
            if ret and frame is not None and frame.size > 0:
                # Downsize frame immediately to speed up face detection
                frames.append(cv2.resize(frame, (320, 240)))

        return frames
    except Exception as e:
        # Best-effort: a broken video must not abort the batch, but say why it was skipped
        print(f"Warning: failed to extract frames from {video_path}: {e}")
        return []
    finally:
        # Release the capture on every exit path, including early returns and errors
        if cap is not None:
            cap.release()

In [None]:
def extract_face_with_opencv(frame, target_size=TARGET_FACE_SIZE):
    """
    Detect the largest face in a frame and return it cropped and resized.

    Args:
        frame: BGR image (it is converted with cv2.COLOR_BGR2GRAY for detection).
        target_size: Size passed to cv2.resize for the returned crop.

    Returns:
        The resized face crop, or None when no face is detected or any
        OpenCV call fails.
    """
    try:
        # Convert to grayscale for face detection
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Detect faces with relaxed parameters for speed
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.2,  # Increased for speed
            minNeighbors=3,   # Reduced for speed
            minSize=(20, 20), # Reduced minimum face size
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        if len(faces) == 0:
            return None

        # Use the largest detection (by box area) if multiple faces are found
        x, y, w, h = max(faces, key=lambda box: box[2] * box[3])

        # Add smaller margin
        margin = min(w, h) // 8
        height, width = frame.shape[:2]

        # Compute all four edges from the ORIGINAL box. Reusing the already
        # shifted x/y for the far edges would cancel the margin on the
        # bottom/right side.
        top = max(0, y - margin)
        bottom = min(height, y + h + margin)
        left = max(0, x - margin)
        right = min(width, x + w + margin)

        # Extract and resize face
        face_img = frame[top:bottom, left:right]
        return cv2.resize(face_img, target_size)
    except Exception:
        # Deliberately quiet: this runs once per frame and the caller treats
        # None as "no face in this frame".
        return None


In [None]:
def process_video(video_path, output_dir, label, video_id=None):
    """
    Process a video: extract faces and save them as a fixed-length face clip.

    Args:
        video_path: Path of the source video.
        output_dir: Directory that receives ``<video_id>.npy``.
        label: Class label of the video; accepted for interface symmetry and
            not used by this function.
        video_id: Name of the output file without extension. Defaults to the
            video's base name without extension.

    Returns:
        Path of the saved (or previously saved) clip, or None when the video
        yields no frames, too few faces, or processing fails.

    NOTE(review): the default video_id is only the file stem, so two source
    videos with the same stem map to the same clip; the second one is treated
    as already processed.
    """
    try:
        if video_id is None:
            video_id = os.path.splitext(os.path.basename(video_path))[0]

        output_path = os.path.join(output_dir, f"{video_id}.npy")

        # Skip if already processed
        if os.path.exists(output_path):
            return output_path

        # Extract frames
        frames = extract_frames(video_path)
        if not frames:
            return None

        # Extract faces using only OpenCV (faster); frames without a face are dropped
        face_frames = [face for face in map(extract_face_with_opencv, frames) if face is not None]

        # Require minimum number of face frames (and at least one, so padding is possible)
        if not face_frames or len(face_frames) < MIN_FACE_FRAMES:
            return None

        # Pad or truncate to exactly FRAMES_PER_VIDEO frames
        missing = FRAMES_PER_VIDEO - len(face_frames)
        if missing > 0:
            # Duplicate last frame if needed
            face_frames.extend([face_frames[-1]] * missing)
        else:
            face_frames = face_frames[:FRAMES_PER_VIDEO]

        clip = np.array(face_frames)

        # Write to a temporary file and rename it into place. A run that is
        # interrupted mid-write must not leave a truncated .npy behind, because
        # the existence check above would treat it as a finished clip.
        tmp_path = f"{output_path}.tmp.{os.getpid()}"
        try:
            with open(tmp_path, "wb") as handle:
                np.save(handle, clip)
            os.replace(tmp_path, output_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        return output_path
    except Exception as e:
        # Best-effort: one bad video must not abort the batch, but report it
        print(f"Warning: failed to process {video_path}: {e}")
        return None

In [None]:
def process_video_batch(batch_videos, output_dir, label):
    """Run process_video over each video in order and keep the truthy results"""
    # Lazily evaluated, so videos are still processed one after another
    outcomes = (process_video(path, output_dir, label) for path in batch_videos)
    return [saved_path for saved_path in outcomes if saved_path]

In [None]:
def get_video_files(dirs):
    """
    Collect all video files (.mp4, .avi, .mov, case-insensitive) under the given directories.

    Args:
        dirs: Iterable of directory paths to search recursively.

    Returns:
        List of video paths. Directories are visited in the given order and
        the paths found under each one are sorted, so the result does not
        depend on the filesystem's listing order. This keeps seeded sampling
        of the list reproducible between runs. Missing directories are
        reported with a warning and skipped.
    """
    video_extensions = ('.mp4', '.avi', '.mov')
    video_files = []
    for dir_path in dirs:
        if not os.path.exists(dir_path):
            print(f"Warning: Directory does not exist: {dir_path}")
            continue

        found = [
            os.path.join(root, file_name)
            for root, _, files in os.walk(dir_path)
            for file_name in files
            if file_name.lower().endswith(video_extensions)
        ]
        video_files.extend(sorted(found))
    return video_files

In [None]:
def downsample_videos(video_paths, target_count=None, ratio=None, seed=42):
    """
    Downsample a list of video paths to a target count or a fraction of its size.

    Args:
        video_paths: List of video paths
        target_count: Target number of videos (takes precedence when not None)
        ratio: Fraction of ``len(video_paths)`` to keep (used if target_count is None)
        seed: Random seed for reproducibility

    Returns:
        Downsampled list of video paths. The input is returned unchanged when
        neither target_count nor ratio is given, or when the target is not
        smaller than the input. An empty input yields an empty list.

    Raises:
        ValueError: If the resulting target count is negative.
    """
    if not video_paths:
        return []

    if target_count is None:
        if ratio is None:
            return video_paths
        target_count = int(len(video_paths) * ratio)

    if target_count < 0:
        raise ValueError(f"target_count must be non-negative, got {target_count}")

    if target_count >= len(video_paths):
        return video_paths

    # Random sampling without replacement. A local RandomState draws the same
    # selection as seeding the global generator would, but leaves NumPy's
    # global random state untouched for the rest of the notebook.
    rng = np.random.RandomState(seed)
    indices = rng.choice(len(video_paths), target_count, replace=False)
    return [video_paths[i] for i in indices]

In [None]:
def process_videos_in_parallel(video_paths, output_dir, label):
    """
    Process videos in parallel worker processes, BATCH_SIZE videos per task.

    Args:
        video_paths: List of source video paths.
        output_dir: Directory passed through to process_video_batch.
        label: Class label passed through to process_video_batch.

    Returns:
        List of results from all batches that completed. Batches whose worker
        raised (for example because the worker process died) are reported and
        skipped instead of aborting the whole run.
    """
    # Imported here so this cell does not depend on an import made in a later cell
    from concurrent.futures import as_completed

    # Nothing to do: avoid spinning up a process pool for an empty list
    if len(video_paths) == 0:
        return []

    results = []

    # Create batches
    batches = [video_paths[i:i + BATCH_SIZE] for i in range(0, len(video_paths), BATCH_SIZE)]
    total_batches = len(batches)
    failed_batches = 0
    videos_done = 0

    # Process batches in parallel
    with ProcessPoolExecutor(max_workers=NUM_CORES) as executor:
        # Remember each batch's real size so the speed is right for the last, partial batch
        batch_sizes = {
            executor.submit(process_video_batch, batch, output_dir, label): len(batch)
            for batch in batches
        }

        start_time = time.time()

        for completed, future in enumerate(as_completed(batch_sizes), start=1):
            try:
                results.extend(future.result())
            except Exception as e:
                failed_batches += 1
                print(f"\nWarning: a batch failed and was skipped: {e}")
            videos_done += batch_sizes[future]

            # Guard against a zero elapsed time on coarse clocks
            elapsed = max(time.time() - start_time, 1e-9)
            videos_per_hour = videos_done / (elapsed / 3600)

            # Estimate remaining time from the average time per finished batch
            remaining_batches = total_batches - completed
            eta_hours = (elapsed / completed) * remaining_batches / 3600
            print(f"\rProcessed {completed}/{total_batches} batches "
                  f"({len(results)} successful) | "
                  f"Speed: {videos_per_hour:.2f} videos/hour | "
                  f"ETA: {eta_hours:.2f} hours", end="")

    print()  # New line after progress tracking
    if failed_batches:
        print(f"Warning: {failed_batches}/{total_batches} batches failed")

    # Clear memory
    gc.collect()
    if USE_GPU:
        torch.cuda.empty_cache()

    return results


In [None]:
def process_in_chunks(video_paths, output_dir, label, chunk_size=MAX_VIDEOS_PER_RUN):
    """
    Feed ``video_paths`` to process_videos_in_parallel in slices of ``chunk_size``.

    Memory is released after every slice. Returns the concatenated results of
    all slices.
    """
    chunk_starts = range(0, len(video_paths), chunk_size)
    total_chunks = len(chunk_starts)
    collected = []

    for chunk_number, start in enumerate(chunk_starts, start=1):
        print(f"Processing chunk {chunk_number}/{total_chunks}")

        # Run the parallel pipeline on this slice only
        chunk_results = process_videos_in_parallel(
            video_paths[start:start + chunk_size], output_dir, label
        )
        collected += chunk_results

        # Free memory before the next slice
        gc.collect()
        if USE_GPU:
            torch.cuda.empty_cache()

        print(f"Completed chunk with {len(chunk_results)} processed videos")

    return collected

In [None]:
def create_balanced_splits(real_paths, fake_paths, real_to_fake_ratio=1.0, test_size=0.2, val_size=0.1, seed=42):
    """
    Create train/val/test splits and cap the fake videos in each split to a real:fake ratio

    Real and fake paths are split independently with the same proportions and
    seed. In each split the fake list is then downsampled when it exceeds
    ``len(real_split) / real_to_fake_ratio``. Real lists are never downsampled,
    so a split can still contain more real than fake items than the ratio asks for.

    Args:
        real_paths: List of real video paths
        fake_paths: List of fake video paths
        real_to_fake_ratio: Target ratio of real:fake in each split (1.0 = equal)
        test_size: Proportion of data for test set
        val_size: Proportion of data for validation set
        seed: Random seed

    Returns:
        Dictionary with the keys real_train, real_val, real_test, fake_train,
        fake_val and fake_test.

    Raises:
        ValueError: If real_to_fake_ratio is not positive.
    """
    if real_to_fake_ratio <= 0:
        raise ValueError(f"real_to_fake_ratio must be positive, got {real_to_fake_ratio}")

    split_names = ('train', 'val', 'test')

    def _three_way_split(paths):
        """Split paths into train/val/test using test_size, val_size and seed."""
        train_val, test = train_test_split(paths, test_size=test_size, random_state=seed)
        train, val = train_test_split(
            train_val,
            test_size=val_size / (1 - test_size),  # val_size is relative to the full set
            random_state=seed
        )
        return {'train': train, 'val': val, 'test': test}

    real = _three_way_split(real_paths)
    fake = _three_way_split(fake_paths)

    # Downsample fake videos in each split if they exceed the target for that split
    for name in split_names:
        target_fake = int(len(real[name]) / real_to_fake_ratio)
        if len(fake[name]) > target_fake:
            fake[name] = downsample_videos(fake[name], target_count=target_fake, seed=seed)

    return {
        'real_train': real['train'],
        'real_val': real['val'],
        'real_test': real['test'],
        'fake_train': fake['train'],
        'fake_val': fake['val'],
        'fake_test': fake['test']
    }

In [None]:
# Import for as_completed
from concurrent.futures import as_completed

def preprocess_dataset(balance_ratio=1.1, balance_splits=True):
    """
    Preprocess all videos and create metadata files with balanced classes

    Steps: list the real and fake source videos, downsample the fake list to
    ``len(real) / balance_ratio``, extract face clips for both classes, split
    the clips into train/val/test and write one metadata CSV per split
    (columns ``path`` and ``label``; real = 1, fake = 0) into METADATA_DIR.

    Args:
        balance_ratio: Target ratio of real:fake videos (e.g., 1.1 means 10% more real than fake)
        balance_splits: Whether to balance train/val/test splits individually

    Returns:
        Dictionary of counts. When either class ends up with no processed
        clips, no splits are written and the dictionary only has the keys
        total, real and fake. Otherwise it has train_real, train_fake,
        val_real, val_fake, test_real, test_fake and total.

    Raises:
        ValueError: If balance_ratio is not positive.
    """
    if balance_ratio <= 0:
        raise ValueError(f"balance_ratio must be positive, got {balance_ratio}")

    # Get all video files
    real_videos = get_video_files(REAL_DIRS)
    fake_videos = get_video_files(FAKE_DIRS)

    print(f"Found {len(real_videos)} real videos and {len(fake_videos)} fake videos")

    # Calculate target number of fake videos based on real videos
    target_fake_count = int(len(real_videos) / balance_ratio)

    # Downsample fake videos if needed
    if len(fake_videos) > target_fake_count:
        print(f"Downsampling fake videos from {len(fake_videos)} to {target_fake_count} for balance...")
        fake_videos = downsample_videos(fake_videos, target_count=target_fake_count)

    print(f"After balancing: {len(real_videos)} real videos and {len(fake_videos)} fake videos")
    # The fake list can be empty (missing directory, or no real videos to balance against)
    if len(fake_videos) > 0:
        print(f"Class ratio (real:fake): {len(real_videos)/len(fake_videos):.2f}")
    else:
        print("Class ratio (real:fake): n/a (no fake videos)")

    # Process real videos in chunks
    print("Processing real videos...")
    processed_real = process_in_chunks(real_videos, os.path.join(FACE_DIR, "real"), 'real')

    # Process fake videos in chunks
    print("Processing fake videos...")
    processed_fake = process_in_chunks(fake_videos, os.path.join(FACE_DIR, "fake"), 'fake')

    # Clip files are named after the source file stem only, so two sources with
    # the same stem yield the same clip path twice. Drop repeats (keeping order)
    # so one clip cannot land in more than one split.
    processed_real = list(dict.fromkeys(processed_real))
    processed_fake = list(dict.fromkeys(processed_fake))

    print(f"Processed {len(processed_real)} real videos and {len(processed_fake)} fake videos")

    # Handle empty dataset case
    if len(processed_real) == 0 or len(processed_fake) == 0:
        print("Warning: Not enough videos processed to create dataset splits")
        return {
            'total': len(processed_real) + len(processed_fake),
            'real': len(processed_real),
            'fake': len(processed_fake)
        }

    # Create splits with balancing
    if balance_splits:
        print("Creating balanced train/val/test splits...")
        splits = create_balanced_splits(
            processed_real,
            processed_fake,
            real_to_fake_ratio=balance_ratio,
            test_size=0.2,
            val_size=0.1,
            seed=42
        )

        real_train, real_val, real_test = splits['real_train'], splits['real_val'], splits['real_test']
        fake_train, fake_val, fake_test = splits['fake_train'], splits['fake_val'], splits['fake_test']
    else:
        # Original split code: 70% train, then the remainder halved into val/test
        real_train, real_temp = train_test_split(processed_real, test_size=0.3, random_state=42)
        real_val, real_test = train_test_split(real_temp, test_size=0.5, random_state=42)

        fake_train, fake_temp = train_test_split(processed_fake, test_size=0.3, random_state=42)
        fake_val, fake_test = train_test_split(fake_temp, test_size=0.5, random_state=42)

    # Create metadata files more efficiently
    train_data = [{'path': path, 'label': 1} for path in real_train] + \
                [{'path': path, 'label': 0} for path in fake_train]

    val_data = [{'path': path, 'label': 1} for path in real_val] + \
              [{'path': path, 'label': 0} for path in fake_val]

    test_data = [{'path': path, 'label': 1} for path in real_test] + \
               [{'path': path, 'label': 0} for path in fake_test]

    # Save metadata
    pd.DataFrame(train_data).to_csv(os.path.join(METADATA_DIR, "train_metadata.csv"), index=False)
    pd.DataFrame(val_data).to_csv(os.path.join(METADATA_DIR, "val_metadata.csv"), index=False)
    pd.DataFrame(test_data).to_csv(os.path.join(METADATA_DIR, "test_metadata.csv"), index=False)

    # Dataset statistics
    stats = {
        'train_real': len(real_train),
        'train_fake': len(fake_train),
        'val_real': len(real_val),
        'val_fake': len(fake_val),
        'test_real': len(real_test),
        'test_fake': len(fake_test),
        'total': len(processed_real) + len(processed_fake)
    }

    print("\nDataset Statistics:")
    print(f"Total videos: {stats['total']}")
    print(f"Training set: {stats['train_real']} real, {stats['train_fake']} fake")
    print(f"Validation set: {stats['val_real']} real, {stats['val_fake']} fake")
    print(f"Test set: {stats['test_real']} real, {stats['test_fake']} fake")

    # Print class balance statistics
    print("\nClass Balance Statistics:")
    print(f"Total videos: {stats['total']}")
    print(f"Real videos: {len(processed_real)} ({len(processed_real)/stats['total']*100:.2f}%)")
    print(f"Fake videos: {len(processed_fake)} ({len(processed_fake)/stats['total']*100:.2f}%)")

    # Source distribution for real videos. The clip paths all live in one output
    # folder and carry no source information, so recover the source folder from
    # the original video paths via the shared file stem. When stems collide the
    # first listed source is used.
    source_by_stem = {}
    for video_path in real_videos:
        stem = os.path.splitext(os.path.basename(video_path))[0]
        source_by_stem.setdefault(stem, os.path.basename(os.path.dirname(video_path)))

    source_counts = {}
    for path in processed_real:
        stem = os.path.splitext(os.path.basename(path))[0]
        source = source_by_stem.get(stem, "unknown")
        source_counts[source] = source_counts.get(source, 0) + 1

    print("\nReal videos by source:")
    for source, count in source_counts.items():
        print(f"  {source}: {count} ({count/len(processed_real)*100:.2f}%)")

    return stats

In [None]:
def optimize_colab_performance():
    """
    Apply runtime tuning for a Colab session.

    Sets the OpenCV thread count to NUM_CORES, exports a malloc trim threshold
    and, when the first CUDA device name contains 'T4', adjusts cuDNN flags and
    caps the per-process GPU memory fraction. Any failure is reported with a
    message instead of being raised.
    """
    try:
        # Let OpenCV use up to NUM_CORES threads internally
        cv2.setNumThreads(NUM_CORES)

        # NOTE(review): the allocator presumably reads this at process start,
        # so it may only affect processes launched afterwards - confirm
        os.environ['MALLOC_TRIM_THRESHOLD_'] = '65536'

        # GPU-specific tuning, applied only on T4 devices
        running_on_t4 = USE_GPU and 'T4' in torch.cuda.get_device_name(0)
        if running_on_t4:
            cudnn = torch.backends.cudnn
            cudnn.benchmark = True
            cudnn.deterministic = False
            # Cap this process at 85% of the GPU memory
            torch.cuda.set_per_process_memory_fraction(0.85)
            print("T4 GPU optimizations applied")

        print("Colab performance optimizations applied")
    except Exception as e:
        print(f"Error applying performance optimizations: {e}")

In [None]:
# Example usage: tune the runtime, then run the whole preprocessing pipeline
if __name__ == "__main__":
    print("Optimizing Colab performance...")
    optimize_colab_performance()

    # Run preprocessing with balanced dataset
    # Perfect balance between real and fake
    stats = preprocess_dataset(balance_ratio=1.0, balance_splits=True)

    # Clean up: collect garbage first so memory held by unreachable objects is
    # released before the CUDA cache is emptied (same order as the processing
    # functions use)
    gc.collect()
    if USE_GPU:
        torch.cuda.empty_cache()

    print("Preprocessing complete!")