In [None]:
from pathlib import Path

import cv2
import numpy as np # linear algebra
import tensorflow as tf
import torch
from tensorflow.keras.layers import (
    BatchNormalization,
    Conv3D,
    Dense,
    Dropout,
    GlobalAveragePooling3D,
    Input,
    MaxPooling3D,
)
from tensorflow.keras.models import Sequential

**Goal:** Build a machine learning model that classifies whether a video is AI-generated or real.


**Method:** Use a CNN with temporal modeling as the starting point for this classification task.

In [4]:
def extract_frames(video_path, num_frames=30, resize=(224, 224)):
    """
    Extract evenly spaced frames from a video

    Each sampled frame is converted from BGR to RGB, resized, and scaled to
    float32 values in [0, 1]. When a sampled frame cannot be decoded, the
    nearest previously decoded frame is repeated in its place (or the first
    decoded frame, for failures at the start), so the result always contains
    exactly ``num_frames`` frames.

    Args:
        video_path: Path to video file (str or path-like)
        num_frames: Number of frames to extract (must be >= 1)
        resize: Target size for frames (height, width)

    Returns:
        float32 numpy array of shape (num_frames, height, width, 3)

    Raises:
        ValueError: If num_frames is smaller than 1.
        OSError: If the video cannot be opened, reports no frames, or none of
            the sampled frames can be decoded.
    """
    if num_frames < 1:
        raise ValueError(f"num_frames must be >= 1, got {num_frames}")

    target_height, target_width = resize

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open video: {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            raise OSError(f"Video reports no frames: {video_path}")

        # Calculate frame indices to extract (evenly spaced)
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)

        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()

            if ret:
                # Convert BGR to RGB
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # cv2.resize takes its target size as (width, height)
                frame = cv2.resize(frame, (target_width, target_height))
                # Normalize pixel values to [0, 1]
                frames.append(frame.astype(np.float32) / 255.0)
            elif frames:
                # Undecodable frame: repeat the most recent good one so the
                # remaining frames keep their temporal position.
                frames.append(frames[-1])
            # Failures before the first decoded frame are back-filled below.
    finally:
        # Release the capture handle even when an error is raised above.
        cap.release()

    if not frames:
        raise OSError(f"No frames could be decoded from: {video_path}")

    missing_at_start = num_frames - len(frames)
    if missing_at_start:
        frames = [frames[0]] * missing_at_start + frames

    return np.array(frames)

In [5]:
from sklearn.model_selection import train_test_split
from tqdm import tqdm

def _collect_labeled_videos(video_dir, label, num_frames, features, labels):
    """Extract frames for every .mp4 directly inside video_dir and append them with `label`."""
    # Sort the paths: glob order depends on the filesystem, and a stable order
    # is needed for a seeded train/test split to be repeatable.
    video_paths = sorted(Path(video_dir).glob('*.mp4'))
    for video_path in tqdm(video_paths):
        try:
            frames = extract_frames(str(video_path), num_frames)
        except Exception as e:
            # Best effort: one unreadable video must not abort the whole run.
            print(f"Error processing {video_path}: {e}")
            continue

        # Videos with a different frame count cannot be stacked into one array.
        if frames.ndim != 4 or frames.shape[0] != num_frames:
            print(f"Skipping {video_path}: expected {num_frames} frames, "
                  f"got array of shape {frames.shape}")
            continue

        features.append(frames)
        labels.append(label)


def process_video_dataset(ai_video_dir, real_video_dir, num_frames=30):
    """
    Process all videos and create labeled dataset

    Args:
        ai_video_dir: Directory containing the AI-generated .mp4 files
        real_video_dir: Directory containing the real .mp4 files
        num_frames: Number of frames to extract from each video

    Returns:
        Tuple (X, y): X stacks the per-video frame arrays along a new first
        axis, y holds the matching labels (0=real, 1=AI). AI videos come
        first, then real videos, each group in sorted path order. Videos that
        fail to load or yield the wrong number of frames are reported and
        left out.
    """
    X = []  # Features (frames)
    y = []  # Labels (0=real, 1=AI)

    # Process AI-generated videos
    print("Processing AI videos...")
    _collect_labeled_videos(ai_video_dir, 1, num_frames, X, y)

    # Process real videos
    print("Processing real videos...")
    _collect_labeled_videos(real_video_dir, 0, num_frames, X, y)

    X = np.array(X)
    y = np.array(y)

    return X, y

# Usage
X, y = process_video_dataset('/kaggle/input/realai-video-dataset/ai', 
                              '/kaggle/input/realai-video-dataset/real',
                              num_frames=30)

print(f"Dataset shape: {X.shape}")
print(f"Labels shape: {y.shape}")

# Split into train/test
# stratify=y keeps the real/AI ratio the same in both splits; with only a few
# dozen videos an unstratified split can leave the test set badly imbalanced.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

Processing AI videos...


100%|██████████| 33/33 [04:31<00:00,  8.23s/it]


Processing real videos...


100%|██████████| 33/33 [24:07<00:00, 43.85s/it]  


Dataset shape: (66, 30, 224, 224, 3)
Labels shape: (66,)


In [13]:
def build_lightweight_model(input_shape):
    """
    Smaller model that uses less memory
    Good for Kaggle's GPU limits

    Three Conv3D -> MaxPooling3D -> BatchNormalization stages (8, 16 and 32
    filters) are followed by global average pooling and a small dense head
    ending in a single sigmoid unit.

    Args:
        input_shape: Shape of one sample without the batch axis. The caller
            in this notebook passes X_train.shape[1:].

    Returns:
        An uncompiled Sequential model.
    """
    model = Sequential([
        # Declare the input explicitly instead of passing input_shape= to the
        # first layer.
        Input(shape=input_shape),

        # Smaller 3D convolutions
        Conv3D(8, (3, 3, 3), padding='same', activation='relu'),
        MaxPooling3D((2, 2, 2)),
        BatchNormalization(),

        Conv3D(16, (3, 3, 3), padding='same', activation='relu'),
        MaxPooling3D((2, 2, 2)),
        BatchNormalization(),

        Conv3D(32, (3, 3, 3), padding='same', activation='relu'),
        MaxPooling3D((2, 2, 2)),
        BatchNormalization(),

        # Global average pooling instead of flatten (reduces parameters)
        GlobalAveragePooling3D(),

        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

    return model

model = build_lightweight_model(input_shape=X_train.shape[1:])

In [16]:
from tensorflow.keras.optimizers import Adam

# Metrics reported each epoch in addition to the loss.
eval_metrics = [
    'accuracy',
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
    tf.keras.metrics.AUC(name='auc'),
]

# Binary cross-entropy pairs with the single sigmoid output of the model.
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=eval_metrics,
)

In [17]:
def augment_video(video_frames, rng=None):
    """
    Apply random augmentations to video frames

    With probability 0.5 the frames are mirrored along axis 2 (the width axis
    for frames laid out as (frame, height, width, channel)). The values are
    then multiplied by a brightness factor drawn uniformly from [0.8, 1.2]
    and clipped to [0, 1].

    Args:
        video_frames: 4-D array of frames; values are expected in [0, 1].
        rng: Optional random source exposing ``random()`` and
            ``uniform(low, high)`` (e.g. ``np.random.default_rng(seed)``) for
            reproducible augmentation. Defaults to NumPy's global random state.

    Returns:
        A new array with the same shape as the input; the input itself is
        not modified.
    """
    source = np.random if rng is None else rng

    # Random horizontal flip
    if source.random() > 0.5:
        video_frames = video_frames[:, :, ::-1, :]

    # Random brightness adjustment
    brightness_factor = source.uniform(0.8, 1.2)
    video_frames = np.clip(video_frames * brightness_factor, 0, 1)

    # TODO: random small-angle rotation is not implemented; it would have to
    # be applied to each frame individually.

    return video_frames

# Create augmented training data
def create_augmented_dataset(X, y, augmentation_factor=2):
    """
    Grow a dataset by appending augmented copies of every sample.

    The output starts with the original samples; each additional round
    (there are ``augmentation_factor - 1`` of them) passes every sample in X
    through augment_video and repeats the labels unchanged.

    Returns:
        Tuple of (stacked features, stacked labels).
    """
    feature_parts = [X]
    label_parts = [y]

    for _round in range(1, augmentation_factor):
        augmented_round = np.array(list(map(augment_video, X)))
        feature_parts.append(augmented_round)
        label_parts.append(y)

    stacked_features = np.vstack(feature_parts)
    stacked_labels = np.hstack(label_parts)
    return stacked_features, stacked_labels

# Build the augmented training set: the originals plus one augmented copy of each video
X_train_aug, y_train_aug = create_augmented_dataset(
    X=X_train,
    y=y_train,
    augmentation_factor=2,
)
print(f"Augmented training data: {X_train_aug.shape}")

Augmented training data: (104, 30, 224, 224, 3)


In [None]:
from tensorflow.keras.callbacks import (
    EarlyStopping, ModelCheckpoint, 
    ReduceLROnPlateau, TensorBoard
)
import datetime

# Setup callbacks
# Every run writes TensorBoard logs to its own timestamped directory.
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

callbacks = [
    # Stop once val_loss has not improved for 15 epochs and roll back to the
    # best weights seen.
    EarlyStopping(
        monitor='val_loss',
        patience=15,
        restore_best_weights=True,
        verbose=1
    ),
    
    # Keep the checkpoint with the highest validation accuracy on disk.
    ModelCheckpoint(
        'best_model.keras',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    ),
    
    # Halve the learning rate after 5 epochs without val_loss improvement.
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    ),
    
    TensorBoard(log_dir=log_dir, histogram_freq=1)
]

# Train the model
# Fit on the augmented arrays; training on X_train/y_train would leave the
# augmented samples created earlier unused.
# NOTE(review): X_test is used for validation here and again for the final
# evaluation, so the reported test metrics are optimistic. Consider holding
# out a separate validation split.
history = model.fit(
    X_train_aug, y_train_aug,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=4,  # Adjust based on your GPU memory
    callbacks=callbacks,
    verbose=1
)

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import seaborn as sns

# Plot training history
def plot_training_history(history):
    """
    Draw train and validation curves for accuracy and loss side by side.

    Args:
        history: Object whose ``history`` mapping holds the per-epoch series
            'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
    """
    # (history key, axis label, panel title) for each subplot, left to right
    panels = (
        ('accuracy', 'Accuracy', 'Model Accuracy'),
        ('loss', 'Loss', 'Model Loss'),
    )

    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    for ax, (key, pretty, title) in zip(axes, panels):
        ax.plot(history.history[key], label=f'Train {pretty}')
        ax.plot(history.history[f'val_{key}'], label=f'Val {pretty}')
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(pretty)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

plot_training_history(history)

# Score the test videos and threshold the sigmoid outputs at 0.5
y_pred_proba = model.predict(X_test)
y_pred = (y_pred_proba > 0.5).astype(int)

# Per-class precision / recall / F1
print("Classification Report:")
print(classification_report(y_test, y_pred, target_names=['Real', 'AI']))

# Confusion matrix drawn as an annotated heatmap
cm = confusion_matrix(y_test, y_pred)
fig_cm, ax_cm = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Real', 'AI'],
            yticklabels=['Real', 'AI'],
            ax=ax_cm)
ax_cm.set_title('Confusion Matrix')
ax_cm.set_ylabel('True Label')
ax_cm.set_xlabel('Predicted Label')
plt.show()

# ROC curve computed from the raw probabilities, with its area under the curve
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
roc_auc = auc(fpr, tpr)

fig_roc, ax_roc = plt.subplots(figsize=(8, 6))
ax_roc.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
# Diagonal reference line from (0, 0) to (1, 1)
ax_roc.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax_roc.set_xlim([0.0, 1.0])
ax_roc.set_ylim([0.0, 1.05])
ax_roc.set_xlabel('False Positive Rate')
ax_roc.set_ylabel('True Positive Rate')
ax_roc.set_title('Receiver Operating Characteristic')
ax_roc.legend(loc="lower right")
ax_roc.grid(True)
plt.show()

In [None]:
# Write the trained model to disk
FINAL_MODEL_PATH = 'ai_video_detector_final.keras'
model.save(FINAL_MODEL_PATH)

# Read the saved file back; predict_video below uses this reloaded copy
from tensorflow.keras.models import load_model

loaded_model = load_model(FINAL_MODEL_PATH)

def predict_video(video_path, model=None, num_frames=30, threshold=0.5):
    """
    Predict if a video is AI-generated

    Args:
        video_path: Path to the video file (str or path-like).
        model: Object with a ``predict(batch)`` method returning one score
            per sample. Defaults to the module-level ``loaded_model``.
        num_frames: Number of frames to sample from the video; this has to
            match the number of frames the model was built for.
        threshold: Scores strictly above this value are reported as
            AI-generated.

    Returns:
        A human-readable verdict string that includes the confidence.

    Raises:
        ValueError: If the video did not yield exactly ``num_frames`` frames.
    """
    if model is None:
        model = loaded_model

    frames = extract_frames(str(video_path), num_frames=num_frames)
    # Fail early with a clear message; otherwise a missing or truncated video
    # only surfaces as a shape error inside model.predict.
    if frames.ndim != 4 or frames.shape[0] != num_frames:
        raise ValueError(
            f"Expected {num_frames} frames from {video_path}, "
            f"got array of shape {frames.shape}"
        )

    batch = np.expand_dims(frames, axis=0)  # Add batch dimension

    prediction = float(model.predict(batch)[0][0])

    if prediction > threshold:
        return f"AI-Generated (confidence: {prediction*100:.2f}%)"
    return f"Real Video (confidence: {(1-prediction)*100:.2f}%)"

# Test on a new video
# The path below is a placeholder. Inference only runs when it points at a
# real file, so a full notebook run does not end in an error.
test_video_path = '/path/to/test_video.mp4'
if Path(test_video_path).is_file():
    result = predict_video(test_video_path)
else:
    result = f"Test video not found: {test_video_path}"
print(result)