In [2]:
import os
import numpy as np
import cv2
from moviepy.editor import VideoFileClip
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import json
from datetime import datetime
import shutil

In [4]:
# Create necessary directories
def create_directories():
    directories = [
        '../processed_dataset/real',
        '../processed_dataset/fake',
        '../models',
        '../models/checkpoints',
        '../logs',
        '../logs/training_history',
        '../evaluation'
    ]
    for dir in directories:
        os.makedirs(dir, exist_ok=True)

In [5]:
create_directories()

In [6]:
class VideoFrameExtractor:
    def __init__(self, sample_rate=30):
        self.sample_rate = sample_rate
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )

    def extract_faces_from_frame(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )
        if len(faces) == 0:
            return None

        processed_faces = []
        for (x, y, w, h) in faces:
            face = frame[y:y + h, x:x + w]
            face = cv2.resize(face, (128, 128))
            processed_faces.append(face)
        return processed_faces

    def process_video(self, video_path, output_dir):
        """Extract frames from a video and save them as images"""
        try:
            clip = VideoFileClip(video_path)
            for i, frame in enumerate(clip.iter_frames()):
                if i % self.sample_rate == 0:
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    faces = self.extract_faces_from_frame(frame)
                    if faces:
                        for j, face in enumerate(faces):
                            face_path = os.path.join(output_dir, f"{os.path.basename(video_path)}_{i}_{j}.jpg")
                            cv2.imwrite(face_path, face)
            clip.close()
        except Exception as e:
            print(f"Error processing video {video_path}: {e}")

In [7]:
def prepare_dataset(real_dir='../dataset/real_videos', fake_dir='../dataset/fake_videos'):
    """Prepare dataset and save frames to disk"""
    extractor = VideoFrameExtractor(sample_rate=30)
    # Process real videos
    print("Processing real videos...")
    for video in os.listdir(real_dir):
        video_path = os.path.join(real_dir, video)
        output_dir = '../processed_dataset/real'
        extractor.process_video(video_path, output_dir)

    # Process fake videos
    print("\nProcessing fake videos...")
    for video in os.listdir(fake_dir):
        video_path = os.path.join(fake_dir, video)
        output_dir = '../processed_dataset/fake'
        extractor.process_video(video_path, output_dir)

In [9]:
# Prepare dataset
print("Starting dataset preparation...")
prepare_dataset()

Starting dataset preparation...
Processing real videos...

Processing fake videos...


In [9]:
# Load and preprocess images in batches
def data_generator(batch_size, file_list, label):
    """Generate data batches from specific file lists."""
    num_samples = len(file_list)
    while True:  # Infinite loop for generator
        np.random.shuffle(file_list)
        for i in range(0, num_samples, batch_size):
            batch_files = file_list[i:i + batch_size]
            batch_images = []
            batch_labels = []
            for file in batch_files:
                file_path = file  # Direct file path
                img = cv2.imread(file_path)
                if img is not None:
                    img = cv2.resize(img, (224, 224))  # Resize images
                    img = img / 255.0  # Normalize
                    batch_images.append(img)
                    batch_labels.append(label)
            yield np.array(batch_images), np.array(batch_labels)


In [10]:
# Define data generators
# Wrap Python generators with tf.data.Dataset
batch_size = 32

# Split dataset into training and validation
all_real_files = [os.path.join('../processed_dataset/real', f) for f in os.listdir('../processed_dataset/real')]
all_fake_files = [os.path.join('../processed_dataset/fake', f) for f in os.listdir('../processed_dataset/fake')]
real_train, real_val = train_test_split(all_real_files, test_size=0.2, random_state=42)
fake_train, fake_val = train_test_split(all_fake_files, test_size=0.2, random_state=42)

In [11]:
# Training datasets
real_train_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(batch_size, real_train, 0),
    output_signature=(
        tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )
)

fake_train_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(batch_size, fake_train, 1),
    output_signature=(
        tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )
)


In [12]:
# Combine and shuffle training datasets
train_dataset = tf.data.Dataset.zip((real_train_dataset, fake_train_dataset)).shuffle(1000).batch(batch_size)

In [13]:
# Validation datasets
real_val_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(batch_size, real_val, 0),
    output_signature=(
        tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )
)

fake_val_dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(batch_size, fake_val, 1),
    output_signature=(
        tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.int32)
    )
)


In [14]:

# Combine validation datasets
val_generator = tf.data.Dataset.zip((real_val_dataset, fake_val_dataset)).batch(batch_size)

In [15]:
def create_model():
    """Create the model architecture"""
    base_model = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(128, 128, 3)
    )

    # Freeze the base model
    base_model.trainable = False

    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid')
    ])

    return model

In [16]:
# Create and compile model
model = create_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy', tf.keras.metrics.AUC()]
)

In [17]:
# Setup callbacks
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    f'../models/checkpoints/model_{timestamp}_{{epoch:02d}}-{{val_accuracy:.4f}}.keras',
    monitor='val_accuracy',
    save_best_only=True,
    mode='max',
    verbose=1
)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
    verbose=1
)

In [19]:
# Train model
print("\nStarting model training...")
history = model.fit(
    train_dataset ,
    validation_data=val_generator,
    epochs=20,  # Increased epochs for better training
    steps_per_epoch=len(real_train) // batch_size,
    validation_steps=len(real_val) // batch_size,
    callbacks=[model_checkpoint, early_stopping]
)


Starting model training...
Epoch 1/20


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("data:0", shape=(None, None, 224, 224, 3), dtype=float32). Expected shape (None, 128, 128, 3), but input has incompatible shape (None, None, 224, 224, 3)[0m

Arguments received by Sequential.call():
  • inputs=('tf.Tensor(shape=(None, None, 224, 224, 3), dtype=float32)', 'tf.Tensor(shape=(None, None), dtype=int32)')
  • training=True
  • mask=('None', 'None')

In [None]:
# Save model and training history
model.save(f'../models/deepfake_detector_{timestamp}.keras')
with open(f'../logs/training_history/history_{timestamp}.json', 'w') as f:
    json.dump(history.history, f)


In [None]:
# Evaluate model
y_true = np.concatenate([
    np.zeros(len(real_val)),
    np.ones(len(fake_val))
])
y_pred = model.predict(val_generator).flatten()

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.metrics import roc_auc_score, roc_curve

precision = precision_score(y_true, np.round(y_pred))
recall = recall_score(y_true, np.round(y_pred))
f1 = f1_score(y_true, np.round(y_pred))
auc = roc_auc_score(y_true, y_pred)
conf_matrix = confusion_matrix(y_true, np.round(y_pred))

In [None]:
# Save evaluation results
evaluation_results = {
    'precision': precision,
    'recall': recall,
    'f1_score': f1,
    'auc': auc,
    'confusion_matrix': conf_matrix.tolist()  # Convert to list for JSON serialization
}


In [None]:
# Save confusion matrix image
plt.figure(figsize=(10, 7))
plt.matshow(conf_matrix, fignum=1)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.colorbar()
plt.savefig(f'../evaluation/confusion_matrix_{timestamp}.png')

with open(f'../evaluation/results_{timestamp}.json', 'w') as f:
    json.dump(evaluation_results, f)

print("Training and evaluation completed.")