# Downloading the dataset

We are using the DeepFake Detection (DFD) dataset from Kaggle, which is distributed as part of the FaceForensics++ (FF++) benchmark.

In [2]:
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset")

# print("Path to dataset files:", path)

In [1]:
import os
from dotenv import load_dotenv
# Dataset locations are kept in a .env file (keys REAL_PATH and DEEPFAKE_PATH)
# instead of being hard-coded in the notebook.

# Read the .env file into the process environment.
load_dotenv()

# Either value is None when its key is not defined.
path = os.getenv('REAL_PATH')
fake_path = os.getenv('DEEPFAKE_PATH')

def count_mp4_files(folder):
    """Count the ``.mp4`` files under ``folder``, including all subfolders.

    Args:
        folder: Directory to search. May be ``None`` or an empty string, for
            example when the environment variable holding the path is unset.

    Returns:
        The number of files whose name ends with ``.mp4`` (case-sensitive).
        ``0`` when ``folder`` is ``None``/empty or does not exist.
    """
    # os.walk(None) raises TypeError, so treat an unset path as "no videos".
    if not folder:
        return 0
    return sum(
        1
        for _root, _dirs, files in os.walk(folder)
        for name in files
        if name.endswith('.mp4')
    )

# Count the videos available under each dataset folder (subfolders included).
real_mp4_count = count_mp4_files(path)
fake_mp4_count = count_mp4_files(fake_path)

# Report both counts so the class balance is visible before any processing.
print(f'Real folder mp4 count: {real_mp4_count}')
print(f'Fake folder mp4 count: {fake_mp4_count}')

Real folder mp4 count: 363
Fake folder mp4 count: 3068


Use a pretrained ResNet50 model to extract one feature vector per video frame from the crops returned by the detector.

In [2]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from ultralytics import YOLO
from dotenv import load_dotenv

# Hide all CUDA devices from this process so the work below runs on the CPU.
# NOTE(review): this is set after `import tensorflow`; presumably it still takes
# effect because no GPU has been initialised yet — confirm on a GPU machine.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Reset Keras' global state left over from earlier runs in this kernel.
tf.keras.backend.clear_session()

In [3]:
def process_single_video(video_path, output_dir=None, max_frames=210,
                         feature_extractor=None, face_detector=None):
    """Turn one video into a ``.npy`` file of per-frame ResNet50 feature vectors.

    For each of the first ``max_frames`` frames, the largest box returned by
    the detector is cropped, resized to 224x224, converted from BGR to RGB and
    scaled to the 0-1 range. The crops are passed through ResNet50 (ImageNet
    weights, global average pooling) in batches of 32 and the stacked features
    are saved to ``<output_dir>/<video file name without extension>.npy``.

    Args:
        video_path: Path of the video to read with ``cv2.VideoCapture``.
        output_dir: Directory for the ``.npy`` file. Defaults to
            ``"processed_vectors"``; created if it does not exist.
        max_frames: Maximum number of frames read from the start of the video.
        feature_extractor: Optional, already built Keras model to reuse. When
            ``None`` a fresh ResNet50 extractor is created for this call.
        face_detector: Optional, already loaded YOLO model to reuse. When
            ``None`` ``yolov8n.pt`` is loaded for this call.

    Returns:
        ``True`` when at least one crop was found and the features were saved;
        ``False`` when the video could not be opened or no crop was found.
    """
    # Create output directory if needed
    if output_dir is None:
        output_dir = "processed_vectors"
    os.makedirs(output_dir, exist_ok=True)

    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = os.path.join(output_dir, f"{video_name}.npy")

    print(f"Processing: {video_path}")
    print(f"Output will be saved to: {output_path}")

    # Open the video first so an unreadable file fails fast, before the
    # (expensive) models are built.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        print(f"ERROR: Could not open video {video_path}")
        # Keep the True/False contract that callers test with `if success:`.
        return False

    face_frames = []
    try:
        # Step 1: ResNet50 feature extractor (built here unless one was passed in)
        if feature_extractor is None:
            print("Creating feature extractor...")
            inputs = keras.Input(shape=(224, 224, 3))
            resnet = ResNet50(include_top=False, weights='imagenet', pooling='avg', input_tensor=inputs)
            feature_extractor = Model(inputs=inputs, outputs=resnet.output)

        # Step 2: detector (loaded here unless one was passed in)
        if face_detector is None:
            print("Loading face detector...")
            face_detector = YOLO('yolov8n.pt')

        # Step 3: collect one crop per frame
        print("Extracting faces from video...")
        frame_count = 0

        while frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            # NOTE(review): the stock yolov8n.pt checkpoint is COCO-trained, where
            # class 0 is "person", so these are likely whole-person boxes rather
            # than faces — confirm, and consider a face-specific checkpoint.
            results = face_detector.predict(source=frame, conf=0.5, classes=0, verbose=False)

            frame_height, frame_width = frame.shape[:2]

            # Find largest detection
            largest_face = None
            largest_area = 0

            for result in results:
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])

                    # Clamp to the frame: a negative index would wrap around in
                    # the NumPy slice below and an empty crop breaks cv2.resize.
                    x1, y1 = max(x1, 0), max(y1, 0)
                    x2, y2 = min(x2, frame_width), min(y2, frame_height)
                    if x2 <= x1 or y2 <= y1:
                        continue

                    area = (x2 - x1) * (y2 - y1)
                    if area > largest_area:
                        largest_area = area
                        largest_face = (x1, y1, x2, y2)

            # Process crop if found
            if largest_face is not None:
                x1, y1, x2, y2 = largest_face
                face = frame[y1:y2, x1:x2]

                # Resize and convert to RGB (OpenCV uses BGR)
                face_resized = cv2.resize(face, (224, 224))
                face_rgb = cv2.cvtColor(face_resized, cv2.COLOR_BGR2RGB)

                face_frames.append(face_rgb)
                if frame_count % 20 == 0:
                    print(f"Processed frame {frame_count} - face detected")
            else:
                if frame_count % 20 == 0:
                    print(f"Processed frame {frame_count} - no face detected")

            frame_count += 1
    finally:
        # Always release the capture, even when model loading or detection raises.
        cap.release()

    # Step 4: Extract features if any crops were found
    if len(face_frames) == 0:
        print("❌ ERROR: No faces detected in video")
        return False

    print(f"Found {len(face_frames)} faces. Extracting features...")

    # Convert to array and normalize to 0-1.
    # NOTE(review): ResNet50 ImageNet weights are normally paired with
    # tensorflow.keras.applications.resnet50.preprocess_input rather than 0-1
    # scaling; left unchanged so new vectors stay comparable with saved ones.
    face_array = np.array(face_frames, dtype=np.float32) / 255.0

    # Process in smaller batches to avoid memory issues
    batch_size = 32
    total_batches = (len(face_array) - 1) // batch_size + 1
    all_features = []

    for i in range(0, len(face_array), batch_size):
        batch = face_array[i:i+batch_size]
        batch_features = feature_extractor.predict(batch, verbose=0)
        all_features.append(batch_features)
        print(f"Processed batch {i//batch_size + 1}/{total_batches}")

    # Combine features into one (num_crops, feature_dim) array
    features = np.vstack(all_features)

    # Save features
    np.save(output_path, features)
    print(f"✅ SUCCESS: Saved {len(features)} feature vectors to {output_path}")
    return True


In [7]:
def _process_video_folder(kind, source_dir, output_dir, skip_existing=True):
    """Run ``process_single_video`` on every ``.mp4`` directly inside ``source_dir``.

    Args:
        kind: Word used in the log messages ("real" or "fake").
        source_dir: Existing directory that holds the videos (not searched
            recursively).
        output_dir: Directory that receives one ``.npy`` file per video.
        skip_existing: When true, a video whose ``.npy`` file is already in
            ``output_dir`` is not processed again.

    Returns:
        A ``(succeeded, skipped, failed)`` tuple of counts.
    """
    # Sorted so that progress numbers mean the same thing on every run.
    video_files = sorted(f for f in os.listdir(source_dir) if f.endswith('.mp4'))
    os.makedirs(output_dir, exist_ok=True)

    print(f"Found {len(video_files)} {kind} videos to process")

    succeeded = skipped = failed = 0
    for i, video_file in enumerate(video_files):
        video_path = os.path.join(source_dir, video_file)
        print(f"\nProcessing {kind} video {i+1}/{len(video_files)}: {video_file}")

        # process_single_video writes <output_dir>/<file name without extension>.npy
        expected_output = os.path.join(output_dir, os.path.splitext(video_file)[0] + ".npy")
        if skip_existing and os.path.exists(expected_output):
            print(f"⏭️ Skipping {video_file}: {expected_output} already exists")
            skipped += 1
            continue

        try:
            success = process_single_video(video_path, output_dir)
        except Exception as e:
            # Best effort: one broken video must not abort the whole run.
            print(f"❌ Error processing {video_file}: {e}")
            failed += 1
            continue

        if success:
            print(f"✅ Successfully processed {video_file}")
            succeeded += 1
        else:
            print(f"❌ Failed to process {video_file}")
            failed += 1

    return succeeded, skipped, failed


def process_all_videos(skip_existing=True):
    """Process all real and fake videos into feature vectors.

    The source folders come from the ``REAL_PATH`` and ``DEEPFAKE_PATH``
    environment variables (loaded from ``.env``). Features are written to
    ``processed_real_vectors`` and ``processed_fake_vectors``.

    Args:
        skip_existing: When true (default), videos that already have a ``.npy``
            file in the output folder are skipped, so an interrupted run can
            be resumed. Pass ``False`` to recompute everything.
    """
    load_dotenv()

    # (label used in messages, source folder, output folder, section header)
    jobs = (
        ("real", os.getenv('REAL_PATH'), "processed_real_vectors",
         "=== PROCESSING REAL VIDEOS ==="),
        ("fake", os.getenv('DEEPFAKE_PATH'), "processed_fake_vectors",
         "\n=== PROCESSING FAKE VIDEOS ==="),
    )

    total_succeeded = total_skipped = total_failed = 0
    for kind, source_dir, output_dir, header in jobs:
        # isdir rather than exists: os.listdir would raise on a plain file.
        if not (source_dir and os.path.isdir(source_dir)):
            print(f"❌ {kind.capitalize()} video path not found or doesn't exist")
            continue

        print(header)
        succeeded, skipped, failed = _process_video_folder(
            kind, source_dir, output_dir, skip_existing=skip_existing
        )
        total_succeeded += succeeded
        total_skipped += skipped
        total_failed += failed

    print("\n=== PROCESSING COMPLETE ===")
    # Report what actually happened instead of claiming that everything succeeded.
    print(
        f"Processed: {total_succeeded}, "
        f"skipped (already done): {total_skipped}, "
        f"failed: {total_failed}"
    )

In [None]:
# Extract and save feature vectors for every real and fake video (long-running).
process_all_videos()

=== PROCESSING REAL VIDEOS ===
Found 363 real videos to process

Processing real video 1/363: 07__exit_phone_room.mp4
Processing: /Users/jordan/.cache/kagglehub/datasets/sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset/versions/1/DFD_original sequences/07__exit_phone_room.mp4
Output will be saved to: processed_real_vectors/07__exit_phone_room.npy
Creating feature extractor...
Loading face detector...
Extracting faces from video...
Processed frame 0 - face detected
Processed frame 20 - face detected
Processed frame 40 - face detected
Processed frame 60 - face detected
Processed frame 80 - face detected
Processed frame 100 - face detected
Processed frame 120 - face detected
Processed frame 140 - face detected
Processed frame 160 - face detected
Processed frame 180 - face detected
Processed frame 200 - face detected
Found 210 faces. Extracting features...
Processed batch 1/7
Processed batch 2/7
Processed batch 3/7
Processed batch 4/7
Processed batch 5/7
Processed batch 6/7


In [None]:
# load_dotenv()

# fake_path = os.getenv('DEEPFAKE_PATH')
# if fake_path and os.path.exists(fake_path):
#     fake_videos = [f for f in os.listdir(fake_path) if f.endswith('.mp4')]
    
#     if fake_videos:
#         # Use the first video for testing
#         first_video = fake_videos[0]
#         video_path = os.path.join(fake_path, first_video)
        
#         # Create output directory
#         output_dir = "processed_vectors"
        
#         # Process the video
#         process_single_video(video_path, output_dir)
#     else:
#         print("No videos found in the fake path")
# else:
#     print("DEEPFAKE_PATH not found in environment variables")


Processing: /Users/jordan/.cache/kagglehub/datasets/sanikatiwarekar/deep-fake-detection-dfd-entire-original-dataset/versions/1/DFD_manipulated_sequences/DFD_manipulated_sequences/13_20__walking_down_indoor_hall_disgust__EV1V4ZQV.mp4
Output will be saved to: processed_vectors/13_20__walking_down_indoor_hall_disgust__EV1V4ZQV.npy
Creating feature extractor...
Loading face detector...
Extracting faces from video...
Loading face detector...
Extracting faces from video...
Processed frame 0 - face detected
Processed frame 0 - face detected
Processed frame 20 - face detected
Processed frame 20 - face detected
Processed frame 40 - face detected
Processed frame 40 - face detected
Processed frame 60 - face detected
Processed frame 60 - face detected
Processed frame 80 - face detected
Processed frame 80 - face detected
Processed frame 100 - face detected
Processed frame 100 - face detected
Processed frame 120 - face detected
Processed frame 120 - face detected
Processed frame 140 - face detected


In [3]:
import pandas as pd
import glob

def create_metadata_csv():
    """Create a CSV file with metadata for all processed videos.

    Scans ``processed_real_vectors``, ``processed_fake_vectors`` and
    ``processed_vectors`` (single test runs) for ``.npy`` files and writes one
    row per file to ``video_features_metadata.csv`` with the columns
    ``video_id``, ``label``, ``features_path`` and ``category``.

    Files are listed in sorted order so the CSV, and any seeded split made
    from it, is the same on every run. Test-run vectors are labelled ``fake``
    with category ``test``; one whose ``video_id`` was already found in a
    labelled folder is skipped so the same video is not listed twice.

    Returns:
        The metadata as a ``pandas.DataFrame``, or ``None`` when no ``.npy``
        file was found.
    """
    # (folder, label, category, skip ids that were already recorded)
    sources = (
        ("processed_real_vectors", "real", "real", False),
        ("processed_fake_vectors", "fake", "fake", False),
        # Assume fake for test videos (adjust as needed)
        ("processed_vectors", "fake", "test", True),
    )

    metadata_records = []
    seen_video_ids = set()

    for vectors_dir, label, category, skip_seen in sources:
        if not os.path.exists(vectors_dir):
            continue
        # glob returns files in arbitrary order, so sort for reproducibility.
        for npy_file in sorted(glob.glob(os.path.join(vectors_dir, "*.npy"))):
            video_name = os.path.splitext(os.path.basename(npy_file))[0]
            if skip_seen and video_name in seen_video_ids:
                continue
            seen_video_ids.add(video_name)
            metadata_records.append({
                'video_id': video_name,
                'label': label,
                'features_path': npy_file,
                'category': category
            })

    if not metadata_records:
        print("❌ No processed video features found")
        return None

    # Create DataFrame and save to CSV
    df = pd.DataFrame(metadata_records)
    csv_path = "video_features_metadata.csv"
    df.to_csv(csv_path, index=False)
    print(f"✅ Created metadata CSV with {len(metadata_records)} entries: {csv_path}")
    print(f"Real videos: {len([r for r in metadata_records if r['label'] == 'real'])}")
    print(f"Fake videos: {len([r for r in metadata_records if r['label'] == 'fake'])}")
    return df

# Create the metadata CSV; metadata_df is None when no feature files were found.
metadata_df = create_metadata_csv()

✅ Created metadata CSV with 1827 entries: video_features_metadata.csv
Real videos: 363
Fake videos: 1464


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Conv1D, MaxPooling1D, Flatten
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

# Data loader class
class VideoDataGenerator:
    """Serve fixed-length feature sequences and labels from a metadata CSV.

    The CSV must have a ``label`` column and a ``features_path`` column that
    points at ``.npy`` arrays of shape ``(num_frames, feature_dim)``.
    """

    def __init__(self, csv_path, seq_len=210, batch_size=32):
        """Read the metadata and integer-encode the labels.

        Args:
            csv_path: Path of the metadata CSV.
            seq_len: Number of frames every returned sequence is padded or
                truncated to.
            batch_size: Number of videos per generated batch.
        """
        self.df = pd.read_csv(csv_path)
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.label_encoder = LabelEncoder()
        # LabelEncoder numbers classes in sorted order (e.g. 'fake' -> 0, 'real' -> 1).
        self.df['label_encoded'] = self.label_encoder.fit_transform(self.df['label'])

    def load_and_pad_features(self, path):
        """Load one feature array and force it to ``seq_len`` rows.

        Shorter arrays are zero-padded at the end and longer ones are
        truncated. The dtype of the stored array is kept in both cases.

        Args:
            path: Path of a ``.npy`` file holding a 2-D feature array.

        Returns:
            An array of shape ``(seq_len, feature_dim)``.
        """
        features = np.load(path)
        num_frames = features.shape[0]
        if num_frames < self.seq_len:
            # Pad with zeros of the same dtype; np.zeros defaults to float64,
            # which would silently upcast only the padded sequences.
            pad = np.zeros((self.seq_len - num_frames, features.shape[1]), dtype=features.dtype)
            features = np.vstack([features, pad])
        else:
            # Truncate
            features = features[:self.seq_len]
        return features

    def generate_batches(self, indices, shuffle=True):
        """Yield ``(batch_x, batch_y)`` forever, cycling over ``indices``.

        Args:
            indices: Row positions in ``self.df`` to draw from. The sequence
                passed in is not modified.
            shuffle: Reshuffle the order at the start of every pass. Pass
                ``False`` for a deterministic order, e.g. for evaluation.

        Yields:
            ``batch_x`` of shape ``(batch, seq_len, feature_dim)`` and
            ``batch_y`` holding the encoded labels. The last batch of a pass
            may be smaller than ``batch_size``.

        Raises:
            ValueError: If ``indices`` is empty (the loop would otherwise spin
                forever without yielding anything).
        """
        # Work on a private copy so shuffling never reorders the caller's list.
        order = np.array(indices)
        if len(order) == 0:
            raise ValueError("indices must not be empty")

        while True:
            if shuffle:
                np.random.shuffle(order)
            for start in range(0, len(order), self.batch_size):
                batch_x = []
                batch_y = []

                for idx in order[start:start + self.batch_size]:
                    row = self.df.iloc[idx]
                    batch_x.append(self.load_and_pad_features(row['features_path']))
                    batch_y.append(row['label_encoded'])

                yield np.array(batch_x), np.array(batch_y)

# Create LSTM-CNN model
def create_lstm_cnn_model(input_shape):
    """Build and compile the Conv1D + LSTM binary classifier.

    Args:
        input_shape: ``(sequence_length, feature_dim)`` of one input sample.

    Returns:
        A compiled ``Sequential`` model with a single sigmoid output, trained
        with binary cross-entropy. Its metrics, in order, are accuracy,
        precision and recall, so ``model.evaluate`` returns
        ``[loss, accuracy, precision, recall]``.
    """
    model = Sequential([
        # Explicit Input object instead of `input_shape=` on the first layer.
        tf.keras.Input(shape=input_shape),

        # 1D CNN layers for temporal feature extraction
        Conv1D(64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        Conv1D(128, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        Dropout(0.3),

        # LSTM layers for sequence modeling
        LSTM(128, return_sequences=True, dropout=0.3),
        LSTM(64, dropout=0.3),

        # Dense layers for classification
        Dense(64, activation='relu'),
        Dropout(0.5),
        Dense(32, activation='relu'),
        Dropout(0.3),
        Dense(1, activation='sigmoid')  # Binary classification
    ])

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        # Metric objects rather than the strings 'precision'/'recall', which
        # not every Keras version can resolve. The names keep the same keys
        # in the training history.
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
        ]
    )

    return model

# Load data and create generators
SEQ_LEN = 210     # frames per video; same cap as the one used during feature extraction
BATCH_SIZE = 16   # single source of truth for the generator and the step counts

data_gen = VideoDataGenerator('video_features_metadata.csv', seq_len=SEQ_LEN, batch_size=BATCH_SIZE)

# Split data: 70% train, 15% validation, 15% test (stratified by label)
train_indices, temp_indices = train_test_split(
    range(len(data_gen.df)),
    test_size=0.3,
    random_state=42,
    stratify=data_gen.df['label_encoded']
)

val_indices, test_indices = train_test_split(
    temp_indices,
    test_size=0.5,
    random_state=42,
    stratify=data_gen.df.iloc[temp_indices]['label_encoded']
)

# Create data generators (each one loops over its indices forever)
train_gen = data_gen.generate_batches(train_indices)
val_gen = data_gen.generate_batches(val_indices)
test_gen = data_gen.generate_batches(test_indices)

# Steps per pass, rounded up: the generators also yield the final partial
# batch, so rounding down would leave samples out of validation and testing.
train_steps = (len(train_indices) + BATCH_SIZE - 1) // BATCH_SIZE
val_steps = (len(val_indices) + BATCH_SIZE - 1) // BATCH_SIZE
test_steps = (len(test_indices) + BATCH_SIZE - 1) // BATCH_SIZE

# Get input shape (sequence_length, feature_dim)
sample_features = np.load(data_gen.df.iloc[0]['features_path'])
input_shape = (SEQ_LEN, sample_features.shape[1])  # (seq_len, feature_dim)

print(f"Input shape: {input_shape}")
print(f"Training samples: {len(train_indices)}")
print(f"Validation samples: {len(val_indices)}")
print(f"Test samples: {len(test_indices)}")

# Create and train model
model = create_lstm_cnn_model(input_shape)
# summary() prints the table itself and returns None, so it is not wrapped in print().
model.summary()

# Training callbacks
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3)
]

# Train the model
history = model.fit(
    train_gen,
    steps_per_epoch=train_steps,
    validation_data=val_gen,
    validation_steps=val_steps,
    epochs=20,
    callbacks=callbacks
)

# Evaluate on test set (one full pass over the test indices).
# NOTE(review): LabelEncoder sorts the labels, so 'real' is presumably encoded
# as 1 and precision/recall below describe the 'real' class — confirm.
test_loss, test_acc, test_prec, test_rec = model.evaluate(
    test_gen,
    steps=test_steps
)

# F1 is undefined when precision + recall is 0; report 0 instead of raising.
f1_denominator = test_prec + test_rec
test_f1 = 2 * (test_prec * test_rec) / f1_denominator if f1_denominator > 0 else 0.0

print(f"Test Accuracy: {test_acc:.4f}")
print(f"Test Precision: {test_prec:.4f}")
print(f"Test Recall: {test_rec:.4f}")
print(f"Test F1-Score: {test_f1:.4f}")

# Save the model
model.save('deepfake_detection_model.h5')
print("Model saved as 'deepfake_detection_model.h5'")