Project: Develop a model that detects deepfake videos with the highest accuracy we can achieve while remaining explainable. We will build a few candidate models, compare them, and then pick one as our "final model".

Business Value: Flagging misinformation and protecting digital identity.

In [None]:
# # Deep Fake Detection Project
# ## Complete Pipeline: Data Analysis → Feature Engineering → Model Training → Hyperparameter Tuning
#
# **Dataset**: [Hemgg/deep-fake-detection-dfd-entire-original-dataset](https://huggingface.co/datasets/Hemgg/deep-fake-detection-dfd-entire-original-dataset)
#
# **Objective**: Detect original vs AI-generated images and videos
#
# **Approach**:
# - Comprehensive EDA
# - Feature engineering (spatial, frequency, texture features)
# - Multiple CNN architectures + Transfer Learning
# - Hyperparameter optimization
# - Model evaluation and comparison
#
%pip install optuna
%pip install torchcodec
# %%
# Import libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

# Deep Learning
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision
from torchvision import transforms, models
import timm
import torchcodec

# Computer Vision
import cv2
from PIL import Image
from skimage import feature, filters
from skimage.feature import local_binary_pattern

# ML & Evaluation
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, confusion_matrix, classification_report,
    roc_curve, auc, roc_auc_score
)
from sklearn.preprocessing import StandardScaler

# HuggingFace
from datasets import load_dataset

# Hyperparameter Tuning
import optuna
from optuna.visualization import plot_optimization_history, plot_param_importances

# Utilities
from tqdm.auto import tqdm
import time
from datetime import datetime
import json
import joblib

# Set style
# Plot appearance for the figures drawn after this point: a matplotlib style
# sheet plus a seaborn color palette.
# NOTE(review): the 'seaborn-v0_8-*' style names presumably require
# matplotlib >= 3.6 — confirm against the installed version.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Reproducibility and compute-device setup.
# SEED feeds the NumPy and PyTorch random generators; `device` is the target
# used for tensors/models (CUDA when available, otherwise CPU).
SEED = 42

# Resolve the device once; the CUDA checks below reuse its type.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Seed the random number generators.
np.random.seed(SEED)
torch.manual_seed(SEED)
if device.type == 'cuda':
    torch.cuda.manual_seed(SEED)
    # Ask cuDNN for deterministic kernels and disable its auto-tuner.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Report what we are running on.
print(f"Using device: {device}")
if device.type != 'cuda':
    print("Using CPU - training will be slower")
else:
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")


In [None]:
# ## 2. Data Loading and Label Analysis
# If the dataset requires authentication, set your Hugging Face token first:
# os.environ["HF_TOKEN"] = "your_token_here"

# Section banner (three lines: rule, title, rule).
print("="*80, "LOADING DATASET FROM HUGGINGFACE (200 RECORDS ONLY)", "="*80, sep="\n")

# Upper bound on how many records are pulled from the stream.
MAX_RECORDS = 200

# Request PyAV ('av') rather than torchcodec for video decoding.
# NOTE(review): this variable is set after `datasets` has already been
# imported — confirm the installed `datasets` version actually reads it.
os.environ['HF_DATASETS_VIDEO_DECODER'] = 'av'

def infer_label_from_path(video_path):
    """Infer a binary class label from a video's path.

    The path is converted to a lowercase string and searched for markers.
    Specific directory markers are checked before generic keywords so that a
    generic word appearing elsewhere in the path cannot override them. This
    matters because the dataset repository name itself contains both
    "original" and "fake" ("deep-fake-detection-dfd-entire-original-dataset"),
    so a path that embeds the repository name would otherwise always be
    classified by the first generic keyword tested.

    Args:
        video_path: Path-like value (anything accepted by ``str()``).

    Returns:
        0 for real/original videos, 1 for fake/manipulated videos. Paths with
        no recognizable marker fall back to 0, so a 0 does not prove the
        video is original.
    """
    path_str = str(video_path).lower()

    # 1) Specific directory markers win over everything else.
    if 'manipulated_sequences' in path_str:
        return 1  # Fake/Manipulated
    if 'original_sequences' in path_str or 'pristine' in path_str:
        return 0  # Real/Original

    # 2) Generic keywords, in the historical precedence order
    #    ("original" is tested before the fake markers).
    if 'original' in path_str:
        return 0  # Real/Original
    if any(marker in path_str for marker in ('dfdc', 'fake', 'deepfake')):
        return 1  # Fake/Manipulated

    # 3) Nothing matched: default to original.
    return 0

from itertools import islice


def _extract_video_path(sample):
    """Return the video path stored in a streamed sample, or '' if none is found.

    Handles a 'video' field that is a dict with a 'path' key (optionally
    nested one level deeper), a plain string, missing, or None. For any other
    object the 'path' attribute is used when present.
    NOTE(review): whether decoder objects returned by `datasets` expose a
    `path` attribute is an assumption — confirm for the installed version.
    """
    video = sample.get('video')
    if isinstance(video, dict):
        video = video.get('path', '')
        if isinstance(video, dict):
            video = video.get('path', '')
    elif video is not None and not isinstance(video, str):
        video = getattr(video, 'path', '')
    return video or ''


# Stream the first MAX_RECORDS samples, infer a label for each from its path,
# report and plot the label distribution, then materialize the samples as an
# in-memory Hugging Face dataset named `train_data`.
try:
    # Load dataset in streaming mode
    print(f"[INFO] Loading first {MAX_RECORDS} records...")
    dataset_stream = load_dataset(
        "Hemgg/deep-fake-detection-dfd-entire-original-dataset",
        streaming=True,
        split="train"
    )

    train_data_list = []
    label_analysis = []

    # islice stops after MAX_RECORDS items without pulling one extra record
    # from the stream (an enumerate + break loop fetches record N+1 first).
    limited_stream = islice(dataset_stream, MAX_RECORDS)
    for i, sample in enumerate(tqdm(limited_stream, desc="Loading samples", total=MAX_RECORDS)):
        video_path = _extract_video_path(sample)

        # Infer label from path
        label = infer_label_from_path(video_path)

        label_analysis.append({
            'index': i,
            'path': str(video_path),
            'label': label,
            'label_name': 'Original' if label == 0 else 'Manipulated'
        })

        train_data_list.append(sample)

    # An empty stream would otherwise surface as an opaque KeyError below.
    if not label_analysis:
        raise RuntimeError("Dataset stream returned no samples")

    # Convert to DataFrame for analysis
    label_df = pd.DataFrame(label_analysis)

    print("\n" + "="*80)
    print("LABEL DISTRIBUTION ANALYSIS")
    print("="*80)
    print(f"\nTotal samples analyzed: {len(label_df)}")
    print(f"\nLabel Distribution:")
    print(label_df['label_name'].value_counts())
    print(f"\nLabel Percentages:")
    print(label_df['label_name'].value_counts(normalize=True) * 100)

    # Check if we have both classes
    unique_labels = label_df['label'].unique()
    print(f"\nUnique labels found: {unique_labels}")

    if len(unique_labels) == 1:
        print("\n⚠️  WARNING: Only one class found in the dataset!")
        print(f"   All samples are labeled as: {'Original' if unique_labels[0] == 0 else 'Manipulated'}")
        print("\n   This dataset might contain only original videos.")
        print("   You may need to:")
        print("   1. Use a different dataset that has both classes")
        print("   2. Combine with another dataset that has manipulated videos")
        print("   3. Check if labels are stored differently in the dataset")
    else:
        print("\n✓ Both classes found in the dataset!")
        print("   The dataset contains both Original and Manipulated videos.")

    # Show sample paths
    print("\n" + "="*80)
    print("SAMPLE PATHS (First 10):")
    print("="*80)
    for record in label_analysis[:10]:
        print(f"Sample {record['index']}: {record['label_name']}")
        print(f"  Path: {record['path'][:100]}...")

    # Visualize label distribution
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    label_counts = label_df['label_name'].value_counts()
    # Colors are keyed by label name: value_counts() orders by frequency, so
    # positional colors would swap whenever 'Manipulated' is the larger class.
    label_colors = {'Original': '#2ecc71', 'Manipulated': '#e74c3c'}
    colors = [label_colors[name] for name in label_counts.index]

    # Count plot
    axes[0].bar(label_counts.index, label_counts.values, color=colors)
    axes[0].set_title('Label Distribution (Count)', fontsize=14, fontweight='bold')
    axes[0].set_xlabel('Label', fontsize=12)
    axes[0].set_ylabel('Count', fontsize=12)
    axes[0].grid(True, alpha=0.3)

    # Pie chart
    axes[1].pie(label_counts.values, labels=label_counts.index, autopct='%1.1f%%',
                colors=colors, startangle=90)
    axes[1].set_title('Label Distribution (Percentage)', fontsize=14, fontweight='bold')

    plt.tight_layout()
    plt.savefig('label_distribution_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()

    # Convert to Dataset object. Imported under an alias so the global name
    # `Dataset` keeps referring to torch.utils.data.Dataset (imported at the
    # top of the file) instead of being rebound to the Hugging Face class.
    from datasets import Dataset as HFDataset
    train_data = HFDataset.from_list(train_data_list)
    print(f"\n✓ Dataset loaded: {len(train_data)} records")

except Exception as e:
    # Cell-level boundary: report the failure and keep the notebook usable.
    # `train_data` stays undefined in this case.
    print(f"\n✗ Error loading dataset: {e}")
    import traceback
    traceback.print_exc()


In [None]:
# Inspect the structure of the loaded dataset: keys of the first sample, the
# type/contents of its 'video' field, the union of keys across the first few
# samples, and the dataset-level features / column names when available.
print("="*80, "DEEP DATASET STRUCTURE ANALYSIS", "="*80, sep="\n")

if 'train_data' not in locals() or len(train_data) <= 0:
    # Nothing to inspect: the loading cell did not produce a dataset.
    print("Dataset not loaded. Please run the previous cell first.")
else:
    # What does a single sample look like?
    sample = train_data[0]
    print("\nSample structure:")
    print(f"Keys: {sample.keys()}")
    print(f"\nVideo data type: {type(sample.get('video', {}))}")

    if 'video' in sample:
        video_data = sample['video']
        if isinstance(video_data, dict):
            print(f"Video dict keys: {video_data.keys()}")
        if isinstance(video_data, dict) and 'path' in video_data:
            print(f"Video path: {video_data['path']}")

    # Collect every key seen in the first (up to) 10 samples, in case a label
    # is stored under a field that the first sample alone does not reveal.
    print("\nChecking for label fields in dataset...")
    all_keys = set()
    for i in range(min(10, len(train_data))):
        sample = train_data[i]
        all_keys |= set(sample.keys())

    print(f"All available keys in samples: {all_keys}")

    # Dataset-level schema information, when the object exposes it.
    if hasattr(train_data, 'features'):
        print(f"\nDataset features: {train_data.features}")

    if hasattr(train_data, 'column_names'):
        print(f"Dataset column names: {train_data.column_names}")
