# FVC Deepfake Detection: Complete Pipeline Journey

**From Raw ZIP Files to Production-Ready Models**

This notebook demonstrates the complete end-to-end machine learning pipeline for deepfake video detection, showcasing:
- Data extraction and exploration
- Augmentation strategy and rationale
- Handcrafted feature engineering
- Video scaling and preprocessing
- Model training with MLOps infrastructure
- Experiment tracking with MLflow
- Analytics with DuckDB
- Airflow orchestration

**Target Audience**: ML Engineers, Data Scientists, Researchers
**Level**: Production-Grade Implementation

## Table of Contents

1. [Project Setup & Requirements](#1-project-setup--requirements)
2. [Data Extraction from ZIP Archives](#2-data-extraction-from-zip-archives)
3. [Data Exploration & Analysis](#3-data-exploration--analysis)
4. [Stage 1: Video Augmentation](#4-stage-1-video-augmentation)
5. [Stage 2: Handcrafted Feature Extraction](#5-stage-2-handcrafted-feature-extraction)
6. [Stage 3: Video Scaling](#6-stage-3-video-scaling)
7. [Stage 4: Scaled Feature Extraction](#7-stage-4-scaled-feature-extraction)
8. [Stage 5: Model Training](#8-stage-5-model-training)
9. [MLOps Infrastructure](#9-mlops-infrastructure)
10. [Analytics with DuckDB](#10-analytics-with-duckdb)
11. [Airflow Orchestration](#11-airflow-orchestration)
12. [Results & Insights](#12-results--insights)

## 1. Project Setup & Requirements

### Infrastructure Stack

**Core ML Framework**:
- PyTorch 2.0+ with CUDA support
- torchvision for video models
- timm for Vision Transformers

**Data Processing**:
- Polars 0.19+ (often substantially faster than pandas on large tabular workloads)
- PyArrow 14+ for columnar storage (Arrow/Parquet)
- DuckDB 0.9+ for analytical queries

**MLOps & Orchestration**:
- MLflow for experiment tracking
- Apache Airflow for pipeline orchestration
- Custom ExperimentTracker for run management

**Video Processing**:
- PyAV for efficient video I/O
- OpenCV for feature extraction
- FFmpeg/ffprobe for codec analysis

**Feature Engineering**:
- NumPy for signal processing
- scikit-image for image analysis
- Custom handcrafted feature extractors

In [None]:
import sys
from pathlib import Path
import json
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML, Video
import warnings
warnings.filterwarnings('ignore')

# Add project root to path
project_root = Path().absolute().parent.parent
sys.path.insert(0, str(project_root))

# Import project modules
from lib.data.index import build_video_index, FVCConfig
from lib.data.loading import load_metadata, filter_existing_videos
from lib.augmentation.pipeline import stage1_augment_videos
from lib.features.pipeline import stage2_extract_features
from lib.scaling.pipeline import stage3_scale_videos
from lib.features.scaled import stage4_extract_scaled_features
from lib.training.pipeline import stage5_train_models
from lib.utils.duckdb_analytics import DuckDBAnalytics
from lib.mlops.mlflow_tracker import create_mlflow_tracker
from lib.mlops.config import RunConfig, ExperimentTracker
from lib.utils.paths import load_metadata_flexible

print(f"Project root: {project_root}")
print(f"Python version: {sys.version}")
print(f"\n✓ All imports successful")

### Verify Requirements

Check that all required packages are installed and versions are compatible.

In [None]:
import torch
import torchvision
import polars as pl
import pyarrow as pa

try:
    import duckdb
    print(f"✓ DuckDB {duckdb.__version__}")
except ImportError:
    print("✗ DuckDB not installed")

try:
    import mlflow
    print(f"✓ MLflow {mlflow.__version__}")
except ImportError:
    print("✗ MLflow not installed")

print(f"✓ PyTorch {torch.__version__}")
print(f"✓ torchvision {torchvision.__version__}")
print(f"✓ Polars {pl.__version__}")
print(f"✓ PyArrow {pa.__version__}")

if torch.cuda.is_available():
    print(f"✓ CUDA available: {torch.cuda.get_device_name(0)}")
else:
    print("⚠ CUDA not available (CPU mode)")

## 2. Data Extraction from ZIP Archives

### Initial Data Structure

The FVC dataset comes as password-protected ZIP archives:
- `FVC1.zip`, `FVC2.zip`, `FVC3.zip`: Video files
- `Metadata.zip`: CSV metadata files

**Setup Process**:
1. Extract videos from ZIP archives
2. Copy metadata CSV files
3. Build video index with comprehensive statistics
4. Validate data integrity
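
The extraction step above can be sketched with the standard library. This is a minimal illustration, not the code in `setup_fvc_dataset.py`: the function name `extract_archive` is hypothetical, and the stdlib `zipfile` module only decrypts legacy ZipCrypto archives, so an AES-encrypted archive would need a different tool.

```python
import zipfile
from pathlib import Path
from typing import List, Optional


def extract_archive(zip_path: Path, dest_dir: Path,
                    password: Optional[str] = None) -> List[Path]:
    """Extract all members of zip_path into dest_dir; return extracted file paths.

    Hypothetical helper for illustration only.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    # zipfile expects the password as bytes (or None for unencrypted archives).
    pwd = password.encode("utf-8") if password else None
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir, pwd=pwd)
        # Skip directory entries, which end with "/".
        names = [n for n in archive.namelist() if not n.endswith("/")]
    return [dest_dir / n for n in names]
```

A simple integrity check after extraction is to confirm that every returned path exists and has a non-zero size before building the video index.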

In [None]:
# Check for archive files
archive_dir = project_root / "archive"
print(f"Archive directory: {archive_dir}")
print(f"Archive directory exists: {archive_dir.exists()}")

if archive_dir.exists():
    archive_files = list(archive_dir.glob("*.zip"))
    csv_files = list(archive_dir.glob("*.csv"))
    
    print(f"\nFound {len(archive_files)} ZIP archives:")
    for f in archive_files:
        print(f"  - {f.name} ({f.stat().st_size / 1024**3:.2f} GB)")
    
    print(f"\nFound {len(csv_files)} CSV metadata files:")
    for f in csv_files:
        print(f"  - {f.name}")
        
    # Load and inspect metadata
    if csv_files:
        metadata_path = csv_files[0]
        print(f"\nLoading metadata from: {metadata_path.name}")
        
        # Use Polars for fast loading
        df = pl.read_csv(metadata_path)
        print(f"\nMetadata shape: {df.shape}")
        print(f"\nColumns: {df.columns}")
        print(f"\nFirst few rows:")
        display(df.head())
        
        # Basic statistics
        if "label" in df.columns:
            label_counts = df.group_by("label").agg(pl.count().alias("count"))
            print(f"\nLabel distribution:")
            display(label_counts)
else:
    print("⚠ Archive directory not found. Run setup_fvc_dataset.py first.")

### Data Setup Script

The `setup_fvc_dataset.py` script handles:
- Password-protected ZIP extraction
- Video organization into `fvc/videos/FVC[1-3]/`
- Metadata file copying
- Video index generation with ffprobe statistics

**Key Features**:
- Handles disk space issues gracefully
- Validates extracted files
- Generates comprehensive video statistics (fps, duration, codec, bitrate, resolution)
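
As a rough sketch of how such statistics could be collected, the snippet below builds an `ffprobe` command and parses its JSON output. The helper names and the returned field names are assumptions for illustration; the actual index schema produced by the setup script may differ.

```python
import json
from fractions import Fraction
from typing import Dict, List


def ffprobe_command(video_path: str) -> List[str]:
    """Command line asking ffprobe for the first video stream's basic stats as JSON."""
    return [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries",
        "stream=codec_name,width,height,r_frame_rate,bit_rate:format=duration",
        "-of", "json",
        video_path,
    ]


def parse_ffprobe_json(raw: str) -> Dict[str, object]:
    """Turn ffprobe's JSON text into a flat record (hypothetical field names)."""
    data = json.loads(raw)
    stream = data["streams"][0]
    bit_rate = stream.get("bit_rate")  # not every container reports this
    return {
        "codec": stream["codec_name"],
        "width": int(stream["width"]),
        "height": int(stream["height"]),
        # r_frame_rate is a rational string such as "30000/1001".
        "fps": float(Fraction(stream["r_frame_rate"])),
        "bitrate": int(bit_rate) if bit_rate is not None else None,
        "duration": float(data["format"]["duration"]),
    }
```

The command list can be passed to `subprocess.run(..., capture_output=True, text=True, check=True)` and the resulting `stdout` handed to `parse_ffprobe_json`.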

In [None]:
# Check if data is already set up
data_dir = project_root / "data"
videos_dir = project_root / "fvc" / "videos"

print(f"Data directory: {data_dir.exists()}")
print(f"Videos directory: {videos_dir.exists()}")

if videos_dir.exists():
    video_folders = [d for d in videos_dir.iterdir() if d.is_dir()]
    print(f"\nFound {len(video_folders)} video folders:")
    for folder in video_folders:
        video_count = len(list(folder.glob("*.mp4")))
        print(f"  - {folder.name}: {video_count} videos")
    
    # Check for metadata index
    metadata_index = data_dir / "metadata" / "video_index.arrow"
    if metadata_index.exists():
        print(f"\n✓ Video index found: {metadata_index}")
        index_df = pl.read_ipc(metadata_index)
        print(f"  Index shape: {index_df.shape}")
        print(f"  Columns: {index_df.columns}")
    else:
        print(f"\n⚠ Video index not found. Run: python src/setup_fvc_dataset.py")
else:
    print("\n⚠ Videos directory not found. Run: python src/setup_fvc_dataset.py")

## 3. Data Exploration & Analysis

### Why Data Exploration Matters

Understanding the dataset is crucial for:
- **Augmentation Strategy**: What transformations preserve authenticity?
- **Feature Engineering**: What signals distinguish real vs fake?
- **Model Architecture**: What input dimensions and temporal lengths?
- **Preprocessing**: What scaling/normalization is needed?

### Key Questions to Answer
1. What is the class distribution? (affects sampling strategy)
2. What are video resolutions? (affects memory and scaling)
3. What are video durations? (affects frame sampling)
4. What codecs are used? (affects decoding strategy)
5. Are there duplicates? (affects train/test split strategy)

In [None]:
# Load metadata for exploration
metadata_paths = [
    project_root / "data" / "metadata" / "video_index.arrow",
    project_root / "archive" / "FVC.csv",
    project_root / "archive" / "FVC_dup.csv",
]

df = None
for path in metadata_paths:
    if path.exists():
        print(f"Loading from: {path}")
        df = load_metadata_flexible(str(path))
        if df is not None and df.height > 0:
            break

if df is not None:
    print(f"\n✓ Loaded {df.height} videos")
    print(f"Columns: {df.columns}")
    
    # Class distribution
    if "label" in df.columns:
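        label_dist = df.group_by("label").agg(
            pl.len().alias("count"),
        ).with_columns(
            # Share of the whole dataset (dividing by the group size would always give 100)
            (pl.col("count") / df.height * 100).alias("percentage")
        ).sort("label")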
        print("\nClass Distribution:")
        display(label_dist)
    
    # Video statistics (if available)
    stat_cols = ["width", "height", "duration", "fps", "bitrate"]
    available_stats = [c for c in stat_cols if c in df.columns]
    
    if available_stats:
        print("\nVideo Statistics:")
        stats = df.select(available_stats).describe()
        display(stats)
        
        # Visualizations
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        if "width" in df.columns and "height" in df.columns:
            axes[0, 0].scatter(df["width"], df["height"], alpha=0.5)
            axes[0, 0].set_xlabel("Width")
            axes[0, 0].set_ylabel("Height")
            axes[0, 0].set_title("Video Resolution Distribution")
        
        if "duration" in df.columns:
            axes[0, 1].hist(df["duration"].to_numpy(), bins=50, edgecolor='black')
            axes[0, 1].set_xlabel("Duration (seconds)")
            axes[0, 1].set_ylabel("Count")
            axes[0, 1].set_title("Video Duration Distribution")
        
        if "fps" in df.columns:
            axes[1, 0].hist(df["fps"].to_numpy(), bins=30, edgecolor='black')
            axes[1, 0].set_xlabel("FPS")
            axes[1, 0].set_ylabel("Count")
            axes[1, 0].set_title("Frame Rate Distribution")
        
        if "label" in df.columns:
            label_counts = df.group_by("label").agg(pl.count()).sort("label")
            axes[1, 1].bar(label_counts["label"].to_list(), label_counts["count"].to_list())
            axes[1, 1].set_xlabel("Label")
            axes[1, 1].set_ylabel("Count")
            axes[1, 1].set_title("Class Distribution")
        
        plt.tight_layout()
        plt.show()
else:
    print("⚠ No metadata found. Run data setup first.")

### Insights from Data Exploration

**Key Findings** (based on typical deepfake datasets):

1. **Resolution Diversity**: Videos range from 240p to 1080p+
   - **Decision**: Scale so the longer side is 256 px, then letterbox to a fixed 256x256 frame
   - **Rationale**: Consistent batch dimensions, memory efficiency, maintains aspect ratio

2. **Duration Variability**: Videos range from 1-60+ seconds
   - **Decision**: Uniform frame sampling (6-16 frames)
   - **Rationale**: Fixed temporal length for batch processing

3. **Class Imbalance**: May have 60/40 or 70/30 real/fake split
   - **Decision**: Stratified k-fold CV, balanced batch sampling
   - **Rationale**: Prevents bias toward majority class

4. **Codec Diversity**: H.264, H.265, VP9, etc.
   - **Decision**: PyAV for universal decoding, codec-aware feature extraction
   - **Rationale**: Robust handling of different codecs
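
To make the uniform frame sampling decision concrete, here is one possible way to pick evenly spaced frame indices. It takes the midpoint of each of `num_samples` equal segments; the function name and the midpoint rule are assumptions, and the pipeline's own sampler may choose indices differently.

```python
from typing import List


def uniform_frame_indices(total_frames: int, num_samples: int) -> List[int]:
    """Return up to num_samples evenly spaced frame indices in [0, total_frames)."""
    if total_frames <= 0 or num_samples <= 0:
        return []
    # Never ask for more frames than the video contains.
    k = min(num_samples, total_frames)
    # Midpoint of each of the k equal-length segments.
    return [int((i + 0.5) * total_frames / k) for i in range(k)]
```

For a 100-frame clip and 4 samples this yields indices 12, 37, 62, and 87, so short and long videos both produce a fixed-length sequence.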

## 4. Stage 1: Video Augmentation

### Why Augmentation?

**Problem**: Limited dataset size (typically 200-500 videos)
**Solution**: Generate augmented versions to increase diversity

### Augmentation Strategy

**Spatial Augmentations** (per-frame):
1. **Rotation** (±10°): Simulates camera angle variation
2. **Horizontal Flip**: Doubles dataset, preserves temporal structure
3. **Brightness/Contrast/Saturation**: Handles lighting variations
4. **Gaussian Noise**: Adds robustness to compression artifacts
5. **Gaussian Blur**: Simulates motion blur, low-quality captures
6. **Affine Transformations**: Translation, scale, shear
7. **Elastic Transform**: Simulates non-rigid deformations
8. **Cutout**: Random erasing for occlusion robustness

**Temporal Augmentations** (sequence-level):
1. **Frame Dropping** (up to 25%): Handles variable frame rates
2. **Frame Duplication**: Slow motion effect
3. **Temporal Reversal**: Time-reversed videos
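
The temporal augmentations can be expressed as operations on a list of frame indices, which keeps them independent of how frames are decoded. The sketch below is illustrative only: the helper names are hypothetical, and the 25% cap on duplication is an assumption (the text above specifies a cap only for frame dropping).

```python
import random
from typing import List


def drop_frames(indices: List[int], rng: random.Random,
                max_drop: float = 0.25) -> List[int]:
    """Remove a random subset of at most max_drop of the frames, keeping order."""
    n_drop = rng.randint(0, int(len(indices) * max_drop))
    dropped = set(rng.sample(range(len(indices)), n_drop))
    return [idx for pos, idx in enumerate(indices) if pos not in dropped]


def duplicate_frames(indices: List[int], rng: random.Random,
                     max_dup: float = 0.25) -> List[int]:
    """Repeat a random subset of frames once each (assumed cap: max_dup)."""
    n_dup = rng.randint(0, int(len(indices) * max_dup))
    repeated = set(rng.sample(range(len(indices)), n_dup))
    out: List[int] = []
    for pos, idx in enumerate(indices):
        out.append(idx)
        if pos in repeated:
            out.append(idx)  # the same frame shown twice
    return out


def reverse_frames(indices: List[int]) -> List[int]:
    """Play the sequence backwards."""
    return indices[::-1]
```

Passing a seeded `random.Random` instance makes each augmentation repeatable for a given video.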

### Implementation: Pre-Generated vs On-the-Fly

**Why Pre-Generated?**
- **Reproducibility**: Same augmentations across runs
- **Speed**: No augmentation overhead during training
- **Caching**: Can store on disk, share across experiments
- **Memory**: Frame-by-frame decoding (50x memory reduction)

**Trade-offs**:
- Disk space: roughly 10x the original dataset size with 10 augmentations per video (mitigated by scaling)
- Initial processing time (one-time cost)

In [None]:
# Stage 1: Augmentation
# This generates 10 augmented versions per video (configurable)

print("Stage 1: Video Augmentation")
print("=" * 60)

# Check if augmentation already done
augmented_metadata = project_root / "data" / "augmented_videos" / "augmented_metadata.arrow"

if augmented_metadata.exists():
    print(f"✓ Augmentation already completed")
    aug_df = pl.read_ipc(augmented_metadata)
    print(f"  Total videos: {aug_df.height}")
    
    # Count original vs augmented
    if "is_augmented" in aug_df.columns:
        orig_count = aug_df.filter(pl.col("is_augmented") == False).height
        aug_count = aug_df.filter(pl.col("is_augmented") == True).height
        print(f"  Original videos: {orig_count}")
        print(f"  Augmented videos: {aug_count}")
        if orig_count > 0:  # avoid division by zero when no originals are listed
            print(f"  Augmentation ratio: {aug_count / orig_count:.1f}x")
    
    # Show sample augmented video paths
    if "video_path" in aug_df.columns:
        sample_paths = aug_df.head(5)["video_path"].to_list()
        print(f"\nSample augmented videos:")
        for path in sample_paths[:3]:
            print(f"  - {Path(path).name}")
else:
    print("⚠ Augmentation not completed yet.")
    print("\nTo run augmentation:")
    print("```python")
    print("from lib.augmentation.pipeline import stage1_augment_videos")
    print("")
    print("augmented_df = stage1_augment_videos(")
    print("    project_root=str(project_root),")
    print("    num_augmentations=10,  # 10 augmented versions per video")
    print("    output_dir='data/augmented_videos',")
    print("    delete_existing=False")
    print(")")
    print("```")
    print("\nOr use SLURM script:")
    print("```bash")
    print("sbatch src/scripts/slurm_stage1_augmentation.sh")
    print("```")

### Augmentation Implementation Details

**Key Optimizations**:

1. **Chunked Processing**: Videos processed in 250-frame chunks
   - Prevents OOM for long videos
   - Supports checkpointing/resuming

2. **Frame-by-Frame Decoding**: Decode only needed frames
   - Memory: ~37 MB per video (vs ~1.87 GB full load)
   - 50x memory reduction

3. **Deterministic Seeds**: Hash-based seeds per video
   - Reproducible augmentations
   - Same video → same augmentations

4. **Incremental Metadata Writing**: Direct CSV writing
   - No memory accumulation
   - Constant memory usage
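
Two of these optimizations are easy to illustrate in isolation: deriving a reproducible seed from a video path, and iterating over a video in 250-frame chunks. The helper names below are hypothetical and the choice of SHA-256 is an assumption. A cryptographic hash is used here because Python's built-in `hash()` is salted per process for strings and so does not reproduce across runs.

```python
import hashlib
from typing import Iterator, Tuple


def augmentation_seed(video_path: str, aug_index: int) -> int:
    """Derive a stable 32-bit seed from the video path and augmentation index."""
    key = f"{video_path}:{aug_index}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:4], "big")


def chunk_ranges(total_frames: int,
                 chunk_size: int = 250) -> Iterator[Tuple[int, int]]:
    """Yield half-open (start, end) frame ranges covering the whole video."""
    for start in range(0, total_frames, chunk_size):
        yield start, min(start + chunk_size, total_frames)
```

Recording the last completed `(start, end)` range is one simple way to support resuming after an interruption.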

## 5. Stage 2: Handcrafted Feature Extraction

### Why Handcrafted Features?

**Rationale**:
- **Interpretability**: Understand what signals the model uses
- **Baseline Models**: Enable traditional ML (Logistic Regression, SVM)
- **Complementary**: Works alongside deep learning features
- **Fast Inference**: No GPU required for feature extraction

### Feature Types

**1. Noise Residual Energy** (3 features)
- **Purpose**: Detect compression artifacts, manipulation traces
- **Method**: High-pass filter (Laplacian) → energy statistics
- **Rationale**: Deepfakes often introduce compression inconsistencies

**2. DCT Statistics** (5 features)
- **Purpose**: Capture frequency domain patterns
- **Method**: 8x8 DCT blocks → DC/AC coefficient statistics
- **Rationale**: Video codecs use DCT; artifacts show in frequency domain

**3. Blur/Sharpness Metrics** (3 features)
- **Purpose**: Detect unnatural sharpness/blur patterns
- **Method**: Laplacian variance, gradient magnitude
- **Rationale**: Face-swapping can create inconsistent sharpness

**4. Block Boundary Inconsistency** (1 feature)
- **Purpose**: Detect block-based compression artifacts
- **Method**: Analyze block boundary discontinuities
- **Rationale**: Double compression leaves traces

**5. Codec Cues** (3 features)
- **Purpose**: Extract codec metadata (if available)
- **Method**: ffprobe analysis of codec parameters
- **Rationale**: Manipulated videos are typically re-encoded, so codec parameters may differ between real and fake videos
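
As a hedged illustration of the first, third, and fourth feature families, the NumPy sketch below computes a Laplacian residual, its energy statistics, its variance as a sharpness proxy, and a simple block-boundary score. The function names, the 4-neighbor kernel, and the exact statistics are assumptions; the project's extractors may define these features differently.

```python
import numpy as np


def laplacian_response(gray: np.ndarray) -> np.ndarray:
    """4-neighbor Laplacian (a high-pass filter) on the image interior."""
    g = gray.astype(np.float64)
    return (g[:-2, 1:-1] + g[2:, 1:-1] + g[1:-1, :-2] + g[1:-1, 2:]
            - 4.0 * g[1:-1, 1:-1])


def noise_residual_features(gray: np.ndarray) -> dict:
    """Energy statistics of the high-pass residual (hypothetical feature names)."""
    energy = laplacian_response(gray) ** 2
    return {
        "residual_energy_mean": float(energy.mean()),
        "residual_energy_std": float(energy.std()),
        "residual_energy_max": float(energy.max()),
    }


def laplacian_variance(gray: np.ndarray) -> float:
    """Common sharpness proxy: low variance suggests a blurry frame."""
    return float(laplacian_response(gray).var())


def blockiness(gray: np.ndarray, block: int = 8) -> float:
    """Mean horizontal jump across block boundaries minus the jump elsewhere."""
    g = gray.astype(np.float64)
    col_diff = np.abs(np.diff(g, axis=1))  # column j holds |g[:, j+1] - g[:, j]|
    mask = np.zeros(col_diff.shape[1], dtype=bool)
    mask[np.arange(block - 1, col_diff.shape[1], block)] = True
    if not mask.any() or mask.all():
        return 0.0  # image too narrow to compare boundary and interior columns
    return float(col_diff[:, mask].mean() - col_diff[:, ~mask].mean())
```

Each function expects a single-channel 2D array, so a color frame would first be converted to grayscale.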

In [None]:
# Stage 2: Handcrafted Feature Extraction
print("Stage 2: Handcrafted Feature Extraction")
print("=" * 60)

# Check if features already extracted
features_metadata = project_root / "data" / "stage2" / "features_metadata.arrow"

if features_metadata.exists():
    print(f"✓ Features already extracted")
    feat_df = pl.read_ipc(features_metadata)
    print(f"  Total feature rows: {feat_df.height}")
    
    # Show feature columns (excluding metadata)
    feature_cols = [c for c in feat_df.columns if c not in ["video_path", "label", "is_augmented"]]
    print(f"  Number of features: {len(feature_cols)}")
    print(f"  Feature names: {feature_cols[:10]}..." if len(feature_cols) > 10 else f"  Feature names: {feature_cols}")
    
    # Show feature statistics
    if feature_cols:
        print("\nFeature Statistics:")
        stats = feat_df.select(feature_cols[:5]).describe()  # First 5 features
        display(stats)
        
        # Feature distribution visualization
        fig, axes = plt.subplots(2, 3, figsize=(15, 8))
        axes = axes.flatten()
        
        for idx, feat_name in enumerate(feature_cols[:6]):
            if idx < len(axes):
                feat_values = feat_df[feat_name].to_numpy()
                axes[idx].hist(feat_values, bins=50, edgecolor='black')
                axes[idx].set_title(f"{feat_name}")
                axes[idx].set_xlabel("Value")
                axes[idx].set_ylabel("Frequency")
        
        plt.tight_layout()
        plt.show()
else:
    print("⚠ Features not extracted yet.")
    print("\nTo extract features:")
    print("```python")
    print("from lib.features.pipeline import stage2_extract_features")
    print("")
    print("features_df = stage2_extract_features(")
    print("    project_root=str(project_root),")
    print("    augmented_metadata_path='data/augmented_videos/augmented_metadata.arrow',")
    print("    output_dir='data/stage2',")
    print("    num_frames=50,  # Sample 50 frames per video")
    print("    delete_existing=False")
    print(")")
    print("```")

### Feature Extraction Implementation

**Sampling Strategy**:
- **Adaptive Frame Sampling**: 10% of frames (min 5, max 50)
- **Rationale**: Balance between coverage and computation
- **Aggregation**: Mean, std, min, max across sampled frames

**Storage Format**:
- **NumPy Arrays**: `.npy` files per video
- **Metadata**: Arrow/Parquet format (fast querying)
- **Rationale**: Efficient storage, fast loading for training
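
The sampling rule and the aggregation step can be written down in a few lines. This is a sketch under stated assumptions: the helper names are hypothetical, the 10% figure is rounded up, and the standard deviation is the population form; the source does not specify either choice.

```python
import statistics
from typing import Dict, Sequence


def adaptive_sample_count(total_frames: int, percent: int = 10,
                          min_frames: int = 5, max_frames: int = 50) -> int:
    """Number of frames to sample: percent of the video, clamped to [min, max]."""
    if total_frames <= 0:
        return 0
    # Integer ceiling division avoids floating-point rounding surprises.
    target = (total_frames * percent + 99) // 100
    target = max(min_frames, min(target, max_frames))
    # A very short video cannot supply more frames than it has.
    return min(target, total_frames)


def aggregate(values: Sequence[float]) -> Dict[str, float]:
    """Collapse one per-frame feature into video-level statistics."""
    return {
        "mean": statistics.fmean(values),
        "std": statistics.pstdev(values),
        "min": min(values),
        "max": max(values),
    }
```

A 200-frame video would be sampled at 20 frames, a 30-frame video at the minimum of 5, and a 10,000-frame video at the cap of 50.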

## 6. Stage 3: Video Scaling

### Why Scale Videos?

**Problem**: Variable resolutions (240p to 1080p+)
- Inconsistent batch dimensions
- Memory inefficiency
- Model input requirements

**Solution**: Scale to fixed max dimension (256px)
- **Letterboxing**: Preserves aspect ratio
- **Upscaling/Downscaling**: Both directions supported
- **Consistent Dimensions**: Enables batch processing

### Scaling Methods

**1. Letterbox Resize** (default)
- Simple bilinear interpolation
- Fast, memory-efficient
- Preserves aspect ratio

**2. Autoencoder Scaling** (optional)
- Uses pretrained VAE (Stable Diffusion)
- Higher quality upscaling
- More compute-intensive

### Decision: Letterbox Resize
- **Speed**: Roughly 100x faster than the autoencoder
- **Memory**: Lower GPU memory usage
- **Quality**: Sufficient for deepfake detection
- **Trade-off**: Slight quality loss acceptable for speed
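
The geometry of a letterbox resize can be computed before touching any pixels. The sketch below assumes the scaled frame is centered on a square `target` x `target` canvas; the function and key names are hypothetical, although `is_upscaled` and `is_downscaled` mirror the flags stored in the scaled metadata.

```python
from typing import Dict


def letterbox_geometry(width: int, height: int, target: int = 256) -> Dict[str, object]:
    """Scaled size and padding needed to fit a frame into a target x target canvas."""
    longest = max(width, height)
    # Scale both sides by the same factor so the aspect ratio is preserved.
    new_w = max(1, round(width * target / longest))
    new_h = max(1, round(height * target / longest))
    pad_x, pad_y = target - new_w, target - new_h
    return {
        "new_width": new_w,
        "new_height": new_h,
        # Split the padding as evenly as possible between the two sides.
        "pad_left": pad_x // 2,
        "pad_right": pad_x - pad_x // 2,
        "pad_top": pad_y // 2,
        "pad_bottom": pad_y - pad_y // 2,
        "is_upscaled": longest < target,
        "is_downscaled": longest > target,
    }
```

A 1920x1080 frame becomes 256x144 with 56 rows of padding above and below, while a 160x120 frame is upscaled to 256x192.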

In [None]:
# Stage 3: Video Scaling
print("Stage 3: Video Scaling")
print("=" * 60)

# Check if scaling already done
scaled_metadata = project_root / "data" / "stage3" / "scaled_metadata.arrow"

if scaled_metadata.exists():
    print(f"✓ Scaling already completed")
    scaled_df = pl.read_ipc(scaled_metadata)
    print(f"  Total scaled videos: {scaled_df.height}")
    
    # Check scaling statistics
    if "is_upscaled" in scaled_df.columns and "is_downscaled" in scaled_df.columns:
        upscaled = scaled_df.filter(pl.col("is_upscaled") == True).height
        downscaled = scaled_df.filter(pl.col("is_downscaled") == True).height
        print(f"  Upscaled videos: {upscaled}")
        print(f"  Downscaled videos: {downscaled}")
    
    # Show sample scaled video paths
    if "video_path" in scaled_df.columns:
        sample_paths = scaled_df.head(3)["video_path"].to_list()
        print(f"\nSample scaled videos:")
        for path in sample_paths:
            print(f"  - {Path(path).name}")
else:
    print("⚠ Scaling not completed yet.")
    print("\nTo scale videos:")
    print("```python")
    print("from lib.scaling.pipeline import stage3_scale_videos")
    print("")
    print("scaled_df = stage3_scale_videos(")
    print("    project_root=str(project_root),")
    print("    augmented_metadata_path='data/augmented_videos/augmented_metadata.arrow',")
    print("    output_dir='data/stage3',")
    print("    target_size=256,  # Max dimension (width or height)")
    print("    method='letterbox',  # or 'autoencoder'")
    print("    delete_existing=False")
    print(")")
    print("```")

## 7. Stage 4: Scaled Feature Extraction

### Why Extract Features from Scaled Videos?

**Rationale**:
- **Different Resolution Context**: Features may behave differently at scaled resolution
- **Additional Signals**: `is_upscaled`, `is_downscaled` flags
- **Complementary Features**: Works alongside Stage 2 features

**Same Feature Types as Stage 2**:
- Noise residual
- DCT statistics
- Blur/sharpness
- Boundary inconsistency
- Codec cues

In [None]:
# Stage 4: Scaled Feature Extraction
print("Stage 4: Scaled Feature Extraction")
print("=" * 60)

# Check if scaled features already extracted
scaled_features_metadata = project_root / "data" / "stage4" / "features_scaled_metadata.arrow"

if scaled_features_metadata.exists():
    print(f"✓ Scaled features already extracted")
    scaled_feat_df = pl.read_ipc(scaled_features_metadata)
    print(f"  Total scaled feature rows: {scaled_feat_df.height}")
    
    # Compare with Stage 2 features
    if features_metadata.exists():
        feat_df = pl.read_ipc(features_metadata)
        print(f"  Stage 2 features: {feat_df.height} rows")
        print(f"  Stage 4 features: {scaled_feat_df.height} rows")
else:
    print("⚠ Scaled features not extracted yet.")
    print("\nTo extract scaled features:")
    print("```python")
    print("from lib.features.scaled import stage4_extract_scaled_features")
    print("")
    print("scaled_features_df = stage4_extract_scaled_features(")
    print("    project_root=str(project_root),")
    print("    scaled_metadata_path='data/stage3/scaled_metadata.arrow',")
    print("    output_dir='data/stage4',")
    print("    num_frames=50,")
    print("    delete_existing=False")
    print(")")
    print("```")

## 8. Stage 5: Model Training

### Model Architecture Overview

We train **23 different models** across multiple categories:

**Baseline Models** (Feature-based):
- Logistic Regression (5a, 5alpha)
- SVM (5b)
- Gradient Boosting (5beta: XGBoost, LightGBM, CatBoost)

**PyTorch CNNs**:
- Naive 3D CNN (5c)
- Pretrained Inception Video (5d)
- Variable AR CNN (5e)

**XGBoost + Deep Features**:
- XGBoost with Inception/I3D/R2Plus1D/ViT features (5f-5j)

**Vision Transformers**:
- ViT-GRU (5k)
- ViT-Transformer (5l)

**Video Transformers**:
- TimeSformer (5m)
- ViViT (5n)

**3D CNNs**:
- I3D (5o)
- R(2+1)D (5p)
- X3D (5q)

**SlowFast**:
- SlowFast (5r)
- SlowFast Attention (5s)
- Multi-Scale SlowFast (5t)

**Two-Stream**:
- RGB + Optical Flow (5u)

### Training Configuration

**Hyperparameter Strategy**:
- **Grid Search**: A single hyperparameter combination per model (grid reduced from 5+ combinations for efficiency)
- **Learning Rates**: Different for backbone (5e-6) vs head (5e-4)
- **Batch Sizes**: Memory-constrained (1-4 depending on model)
- **Epochs**: 20-25 (with early stopping)

**Regularization**:
- **Weight Decay**: 1e-4 (L2 regularization)
- **Dropout**: 0.5 in classification heads
- **Early Stopping**: Patience=5 epochs
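
Early stopping with a patience of 5 can be sketched as a small stateful helper. The class below is illustrative only; it assumes validation loss is the monitored quantity, which the configuration above does not state.

```python
class EarlyStopping:
    """Signal a stop after `patience` consecutive epochs without improvement."""

    def __init__(self, patience: int = 5, min_delta: float = 0.0) -> None:
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("inf")
        self.bad_epochs = 0

    def step(self, val_loss: float) -> bool:
        """Record one epoch's validation loss; return True when training should stop."""
        if val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.bad_epochs = 0  # improvement resets the counter
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience
```

In a training loop, `step` would be called once per epoch after validation, and the checkpoint with the best loss kept for evaluation.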

**Optimization**:
- **Optimizer**: Adam with default betas
- **Mixed Precision**: AMP for memory efficiency
- **Gradient Accumulation**: Dynamic based on batch size

**Cross-Validation**:
- **Stratified 5-Fold CV**: Ensures class balance in each fold
- **Rationale**: More robust performance estimates than a single split; makes overfitting to one particular split easier to detect
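
Because each original video has augmented copies, a plain stratified split could place copies of the same video in both the training and validation folds. The sketch below shows one way to keep all copies of a video together while balancing labels. It is an illustration under assumptions, not the pipeline's actual splitter: the function name is hypothetical, and it assumes every copy shares its original's label. scikit-learn's `StratifiedGroupKFold` is a library alternative.

```python
import random
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def stratified_group_folds(groups: Sequence[str], labels: Sequence[str],
                           n_splits: int = 5,
                           seed: int = 42) -> List[Tuple[List[int], List[int]]]:
    """Return (train_idx, val_idx) pairs where no group spans both sides."""
    # One label per group (original video); copies are assumed to share it.
    group_label: Dict[str, str] = {}
    for g, y in zip(groups, labels):
        group_label.setdefault(g, y)

    by_label: Dict[str, List[str]] = defaultdict(list)
    for g, y in group_label.items():
        by_label[y].append(g)

    # Deal each label's groups round-robin into folds to keep classes balanced.
    rng = random.Random(seed)
    fold_of: Dict[str, int] = {}
    for y in sorted(by_label):
        members = sorted(by_label[y])
        rng.shuffle(members)
        for i, g in enumerate(members):
            fold_of[g] = i % n_splits

    folds = []
    for k in range(n_splits):
        val = [i for i, g in enumerate(groups) if fold_of[g] == k]
        train = [i for i, g in enumerate(groups) if fold_of[g] != k]
        folds.append((train, val))
    return folds
```

Here `groups` would hold an identifier of the original video for every row of the augmented metadata, so an original and its augmentations always land in the same fold.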

In [None]:
# Stage 5: Model Training Overview
print("Stage 5: Model Training")
print("=" * 60)

# Check for trained models
stage5_dir = project_root / "data" / "stage5"

if stage5_dir.exists():
    model_dirs = [d for d in stage5_dir.iterdir() if d.is_dir()]
    print(f"✓ Found {len(model_dirs)} trained model directories:")
    
    for model_dir in sorted(model_dirs)[:10]:  # Show first 10
        model_name = model_dir.name
        
        # Check for model files
        model_files = list(model_dir.rglob("model.pt")) + list(model_dir.rglob("model.joblib"))
        metrics_files = list(model_dir.rglob("metrics.json"))
        
        if model_files:
            print(f"  ✓ {model_name}: {len(model_files)} model(s), {len(metrics_files)} metrics file(s)")
        else:
            print(f"  ⚠ {model_name}: No models found")
    
    if len(model_dirs) > 10:
        print(f"  ... and {len(model_dirs) - 10} more")
else:
    print("⚠ No trained models found.")
    print("\nTo train models:")
    print("```python")
    print("from lib.training.pipeline import stage5_train_models")
    print("")
    print("results = stage5_train_models(")
    print("    project_root=str(project_root),")
    print("    scaled_metadata_path='data/stage3/scaled_metadata.arrow',")
    print("    features_stage2_path='data/stage2/features_metadata.arrow',")
    print("    features_stage4_path='data/stage4/features_scaled_metadata.arrow',")
    print("    model_types=['logistic_regression', 'svm', 'naive_cnn', ...],")
    print("    n_splits=5,  # 5-fold CV")
    print("    output_dir='data/stage5',")
    print("    use_mlflow=True,")
    print("    delete_existing=False")
    print(")")
    print("```")

## 9. MLOps Infrastructure

### Experiment Tracking with MLflow

**Features**:
- **Run Tracking**: Unique run IDs, timestamps, tags
- **Parameter Logging**: Hyperparameters, configs
- **Metrics Logging**: Training/validation metrics per epoch
- **Artifact Storage**: Model checkpoints, plots, configs
- **Model Registry**: Versioned model storage

**Integration**:
- Custom `MLflowTracker` wraps MLflow API
- Logs per-fold, per-hyperparameter combination
- Automatic experiment organization

In [None]:
# MLOps: MLflow Integration
print("MLOps: MLflow Experiment Tracking")
print("=" * 60)

try:
    import mlflow
    
    # Check for MLflow tracking directory
    mlflow_dir = project_root / "mlruns"
    
    if mlflow_dir.exists():
        print(f"✓ MLflow tracking directory found: {mlflow_dir}")
        
        # List experiments
        try:
            mlflow.set_tracking_uri(mlflow_dir.as_uri())  # file:// URI for the local store
            experiments = mlflow.search_experiments()
            print(f"\nFound {len(experiments)} experiments:")
            
            for exp in experiments[:5]:  # Show first 5
                print(f"  - {exp.name}: {exp.experiment_id}")
                
                # Get runs for this experiment
                runs = mlflow.search_runs(experiment_ids=[exp.experiment_id], max_results=3)
                if not runs.empty:
                    print(f"    Runs shown: {len(runs)} (capped at 3 by max_results)")
        except Exception as e:
            print(f"⚠ Could not query MLflow: {e}")
    else:
        print("⚠ MLflow tracking directory not found.")
        print("  MLflow will be initialized on first training run.")
    
    print("\nMLflow Usage:")
    print("```python")
    print("from lib.mlops.mlflow_tracker import create_mlflow_tracker")
    print("")
    print("tracker = create_mlflow_tracker(")
    print("    experiment_name='fvc_binary_classifier',")
    print("    use_mlflow=True")
    print(")")
    print("")
    print("tracker.log_config(config_dict)")
    print("tracker.log_metric('train_loss', 0.5, step=1)")
    print("tracker.log_artifact('model.pt')")
    print("```")
    
except ImportError:
    print("⚠ MLflow not installed. Install with: pip install mlflow")

### Custom Experiment Tracker

**RunConfig**: Dataclass for complete run configuration
- Experiment metadata (run_id, experiment_name, tags)
- Data config (splits, random seed)
- Video config (num_frames, fixed_size, augmentation)
- Training config (batch_size, learning_rate, epochs)
- Model config (model_type, model_specific_config)

**ExperimentTracker**: Tracks experiments, metrics, artifacts
- JSONL metrics logging (append-only)
- Config versioning with hashing
- Checkpoint management
- Resume capability
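
Append-only JSONL logging and hash-based config versioning can both be sketched with the standard library. The function names, the 12-character hash length, and the use of SHA-256 are assumptions for illustration and do not describe the real `ExperimentTracker` API.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List


def config_hash(config: Dict[str, Any]) -> str:
    """Short, order-independent fingerprint of a JSON-serializable config."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


def append_metrics(path: Path, record: Dict[str, Any]) -> None:
    """Append one metrics record as a single JSON line."""
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")


def read_metrics(path: Path) -> List[Dict[str, Any]]:
    """Load every record back, skipping blank lines."""
    with path.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]
```

Sorting the keys before hashing means two configs with the same values produce the same fingerprint regardless of key order, and appending one line per record keeps earlier metrics intact if a run is interrupted.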

## 10. Analytics with DuckDB

### Why DuckDB?

**Benefits**:
- **Fast Analytics**: SQL queries on Polars DataFrames
- **Direct File Access**: Query Arrow/Parquet files directly
- **Zero-Copy**: Efficient memory usage
- **SQL Interface**: Familiar query language

**Use Cases**:
- Cross-stage data analysis
- Feature correlation analysis
- Training results aggregation
- Performance comparisons

In [None]:
# Analytics: DuckDB Integration
print("Analytics: DuckDB Integration")
print("=" * 60)

try:
    from lib.utils.duckdb_analytics import DuckDBAnalytics
    
    # Initialize DuckDB
    analytics = DuckDBAnalytics()
    
    # Register metadata files
    if scaled_metadata.exists():
        analytics.register_arrow("scaled_videos", str(scaled_metadata))
        print("✓ Registered scaled_videos table")
        
        # Example query: Video statistics by label
        query = """
        SELECT 
            label,
            COUNT(*) as count,
            AVG(duration) as avg_duration,
            AVG(fps) as avg_fps
        FROM scaled_videos
        GROUP BY label
        """
        
        result = analytics.query(query)
        print("\nVideo Statistics by Label:")
        display(result)
    
    if features_metadata.exists():
        analytics.register_arrow("features", str(features_metadata))
        print("✓ Registered features table")
        
        # Example query: Feature correlation with label
        # (This would require numeric label encoding)
        print("\n✓ Features table registered for analysis")
    
    analytics.close()
    
    print("\nDuckDB Usage:")
    print("```python")
    print("from lib.utils.duckdb_analytics import DuckDBAnalytics")
    print("")
    print("analytics = DuckDBAnalytics()")
    print("analytics.register_arrow('videos', 'data/stage3/scaled_metadata.arrow')")
    print("result = analytics.query(\"SELECT * FROM videos WHERE label = 'real'\")")
    print("```")
    
except ImportError:
    print("⚠ DuckDB not installed. Install with: pip install duckdb")
except Exception as e:
    print(f"⚠ Error using DuckDB: {e}")

## 11. Airflow Orchestration

### Pipeline Orchestration

**Apache Airflow DAG**: `airflow/dags/fvc_pipeline_dag.py`

**Stages as Tasks**:
1. **Stage 1 Task**: Video augmentation
2. **Stage 2 Task**: Feature extraction (depends on Stage 1)
3. **Stage 3 Task**: Video scaling (depends on Stage 1)
4. **Stage 4 Task**: Scaled feature extraction (depends on Stage 3)
5. **Stage 5 Task**: Model training (depends on Stages 2, 3, 4)

**Benefits**:
- **Dependency Management**: Automatic task ordering
- **Retry Logic**: Automatic retries on failure
- **Monitoring**: Web UI for pipeline status
- **Scheduling**: Cron-based scheduling
- **Parallelization**: Parallel stage execution where possible
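
The dependency structure listed above can be checked without Airflow by treating it as a graph. The sketch below uses the standard library's `graphlib` (Python 3.9+) to group tasks into waves that could run in parallel. It does not use the Airflow API or reproduce the contents of `fvc_pipeline_dag.py`, and the wave grouping is a simplification of how a real scheduler starts tasks.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each task maps to the set of tasks it depends on (taken from the stage list above).
DEPENDENCIES: Dict[str, Set[str]] = {
    "stage1_task": set(),
    "stage2_task": {"stage1_task"},
    "stage3_task": {"stage1_task"},
    "stage4_task": {"stage3_task"},
    "stage5_task": {"stage2_task", "stage3_task", "stage4_task"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into successive waves whose members have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the graph has a cycle
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

With these dependencies, Stage 2 and Stage 3 fall into the same wave, which is where the pipeline can run stages in parallel.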

In [None]:
# Airflow: Pipeline Orchestration
print("Airflow: Pipeline Orchestration")
print("=" * 60)

airflow_dag = project_root / "airflow" / "dags" / "fvc_pipeline_dag.py"

if airflow_dag.exists():
    print(f"✓ Airflow DAG found: {airflow_dag}")
    
    # Read the DAG source
    with open(airflow_dag, 'r', encoding='utf-8') as f:
        dag_content = f.read()
        
    # Simplified check: if the DAG defines stage1_task, print the expected
    # task graph (hard-coded here, not parsed from the file)
    if "stage1_task" in dag_content:
        print("\nDAG Tasks:")
        print("  1. stage1_task: Video Augmentation")
        print("  2. stage2_task: Feature Extraction (→ stage1_task)")
        print("  3. stage3_task: Video Scaling (→ stage1_task)")
        print("  4. stage4_task: Scaled Features (→ stage3_task)")
        print("  5. stage5_task: Model Training (→ stage2_task, stage3_task, stage4_task)")
        
    print("\nAirflow Usage:")
    print("1. Start Airflow webserver: `airflow webserver --port 8080`")
    print("2. Start Airflow scheduler: `airflow scheduler`")
    print("3. Access UI: http://localhost:8080")
    print("4. Trigger DAG: `airflow dags trigger fvc_pipeline`")
else:
    print("⚠ Airflow DAG not found.")

## 12. Results & Insights

### Model Performance Summary

**Expected Performance** (typical ranges for deepfake detection, not results measured in this notebook):
- **Baseline Models** (Logistic Regression, SVM): 60-75% accuracy
- **Feature-based Models** (XGBoost with handcrafted features): 70-80% accuracy
- **Deep Learning Models** (3D CNNs, Transformers): 80-90% accuracy
- **Ensemble Models**: 85-95% accuracy

### Key Insights

1. **Handcrafted Features Matter**: They complement the features learned by deep models
2. **Augmentation Is Critical**: Expanding the dataset 10x through augmentation significantly improves performance
3. **Temporal Modeling Helps**: 3D CNNs and Transformers capture temporal patterns
4. **Ensembles Work**: Combining multiple models improves robustness
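
The ensemble insight above can be illustrated with soft voting, one common way to combine models: average each model's predicted probability that a video is fake, then threshold the mean. This is a minimal sketch, not the pipeline's ensemble code. The `soft_vote` helper, the model names, and the probabilities are made up for illustration.

```python
from statistics import fmean


def soft_vote(prob_by_model, threshold=0.5):
    """Average per-video fake probabilities across models, then threshold the mean."""
    columns = list(prob_by_model.values())
    n_videos = len(columns[0])
    if any(len(col) != n_videos for col in columns):
        raise ValueError("every model must score the same number of videos")
    # Mean probability per video across all models.
    avg = [fmean(col[i] for col in columns) for i in range(n_videos)]
    labels = ["fake" if p >= threshold else "real" for p in avg]
    return avg, labels


# Hypothetical fake-probabilities for three videos from three models.
probs = {
    "model_a": [0.9, 0.2, 0.6],
    "model_b": [0.7, 0.4, 0.3],
    "model_c": [0.8, 0.1, 0.5],
}

avg, labels = soft_vote(probs)
for p, label in zip(avg, labels):
    print(f"{p:.3f} -> {label}")
```

On the third video, `model_a` alone would predict "fake" (0.6), but the averaged score falls below the threshold. This is the sense in which combining models can smooth out a single model's errors.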

### Next Steps

1. **Hyperparameter Tuning**: Expand grid search
2. **Architecture Search**: Try more model variants
3. **Feature Engineering**: Explore additional handcrafted features
4. **Ensemble Methods**: Combine predictions from multiple models
5. **Deployment**: Production inference pipeline

In [None]:
# Results Summary
print("Results & Insights")
print("=" * 60)

# Load training results if available
stage5_dir = project_root / "data" / "stage5"

if stage5_dir.exists():
    results_summary = []
    
    for model_dir in stage5_dir.iterdir():
        if not model_dir.is_dir():
            continue
            
        model_name = model_dir.name
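        # Sort so the selected file is deterministic (rglob order is not guaranteed)
        metrics_files = sorted(model_dir.rglob("metrics.json"))
        
        if metrics_files:
            # Load the first metrics file in sorted path order
            with open(metrics_files[0], 'r', encoding='utf-8') as f:
                metrics = json.load(f)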
            
            results_summary.append({
                "model": model_name,
                "test_accuracy": metrics.get("test_accuracy", 0),
                "test_f1": metrics.get("test_f1", 0),
                "test_auc": metrics.get("test_auc", 0),
            })
    
    if results_summary:
        results_df = pd.DataFrame(results_summary)
        results_df = results_df.sort_values("test_accuracy", ascending=False)
        
        print("\nModel Performance Summary:")
        display(results_df.head(10))
        
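        # Visualization: horizontal bars for the 10 most accurate models
        top_models = results_df.head(10)
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.barh(top_models["model"], top_models["test_accuracy"])
        ax.invert_yaxis()  # barh draws the first row at the bottom; put the best model on top
        ax.set_xlabel("Test Accuracy")
        ax.set_title("Top 10 Models by Test Accuracy")
        plt.tight_layout()
        plt.show()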
    else:
        print("⚠ No metrics files found.")
else:
    print("⚠ No training results found.")
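
The cell above assumes each model directory under `data/stage5` contains at least one `metrics.json` with `test_accuracy`, `test_f1`, and `test_auc` keys. The sketch below builds that layout in a temporary directory and collects it with a hypothetical `collect_metrics` helper. The model names, the `fold_0` directory, and the metric values are invented for illustration and are not results from this pipeline. Missing keys are reported as `None` rather than 0, so an absent metric cannot be mistaken for a real score.

```python
import json
import tempfile
from pathlib import Path


def collect_metrics(stage5_dir):
    """Return one row per model directory that contains a metrics.json."""
    rows = []
    for model_dir in sorted(Path(stage5_dir).iterdir()):
        if not model_dir.is_dir():
            continue
        metrics_files = sorted(model_dir.rglob("metrics.json"))
        if not metrics_files:
            continue
        # Use the first file in sorted path order so the choice is deterministic.
        with open(metrics_files[0], "r", encoding="utf-8") as f:
            metrics = json.load(f)
        rows.append({
            "model": model_dir.name,
            "test_accuracy": metrics.get("test_accuracy"),
            "test_f1": metrics.get("test_f1"),
            "test_auc": metrics.get("test_auc"),
        })
    # Rank by accuracy, highest first; rows without an accuracy value sort last.
    rows.sort(key=lambda r: (r["test_accuracy"] is None, -(r["test_accuracy"] or 0.0)))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical metrics for two models (illustrative values only).
    demo = {
        "model_a": {"test_accuracy": 0.81, "test_f1": 0.79, "test_auc": 0.88},
        "model_b": {"test_accuracy": 0.86, "test_f1": 0.85},  # test_auc omitted on purpose
    }
    for name, metrics in demo.items():
        fold_dir = Path(tmp) / name / "fold_0"
        fold_dir.mkdir(parents=True)
        (fold_dir / "metrics.json").write_text(json.dumps(metrics), encoding="utf-8")
    summary = collect_metrics(tmp)

for row in summary:
    print(row)
```

The returned list of dictionaries can be passed directly to `pd.DataFrame(...)`, as the results cell does with `results_summary`.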

## Conclusion

This notebook demonstrated the complete end-to-end pipeline for deepfake video detection:

1. **Data Extraction**: From ZIP archives to organized video datasets
2. **Data Exploration**: Understanding dataset characteristics
3. **Augmentation**: Generating diverse training samples
4. **Feature Engineering**: Handcrafted features for interpretability
5. **Video Preprocessing**: Scaling for consistent model inputs
6. **Model Training**: 23 different architectures with hyperparameter tuning
7. **MLOps**: Experiment tracking, versioning, orchestration
8. **Analytics**: Fast SQL queries on pipeline data

**Key Technologies**:
- PyTorch, torchvision, timm (Deep Learning)
- Polars, PyArrow, DuckDB (Data Processing)
- MLflow (Experiment Tracking)
- Airflow (Orchestration)
- PyAV, OpenCV (Video Processing)

**Production-Ready Features**:
- Memory-efficient processing (chunked, frame-by-frame)
- Reproducible experiments (deterministic seeds, versioning)
- Scalable architecture (distributed training ready)
- Comprehensive monitoring (MLflow, logging)

For individual model details, see the model-specific notebooks (5a-5u).