# Import Libraries

In [55]:
!pip install mtcnn

Defaulting to user installation because normal site-packages is not writeable


In [7]:
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, Input, Dropout
from tensorflow.keras.models import Model, Sequential
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
from mtcnn.mtcnn import MTCNN
from IPython.display import display
import itertools

# Load Metadata

In [9]:
def load_metadata(base_path):
    """Read ``metadata.json`` from ``base_path`` into a DataFrame.

    The JSON object is transposed so that each top-level key (a video name)
    becomes one row; the key itself is exposed in a ``video_name`` column.
    """
    with open(os.path.join(base_path, 'metadata.json'), 'r') as handle:
        raw = json.load(handle)
    per_video = pd.DataFrame(raw).T  # one row per video
    return per_video.reset_index().rename(columns={'index': 'video_name'})

# Folder that holds metadata.json; video file names are later joined onto it.
base_path = 'deepfake-detection-challenge/train_sample_videos'
# One row per entry of metadata.json, keyed by the `video_name` column.
df = load_metadata(base_path)

# Video Duration Calculation

In [13]:
def get_video_duration(video_path):
    """Return the duration of a video in seconds, or ``None`` if unknown.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        ``frame_count / fps`` as reported by OpenCV, or ``None`` when the
        file cannot be opened or the reported frame rate is not a positive
        number.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        # `fps > 0` is also False for NaN, so zero, negative and NaN frame
        # rates all yield None instead of a meaningless duration.
        return frame_count / fps if fps > 0 else None
    finally:
        # Release the capture on every path: early return, success, or error.
        cap.release()

# Store each clip's length (None where it cannot be determined) in a new column.
df['duration'] = df['video_name'].apply(
    lambda video_name: get_video_duration(os.path.join(base_path, video_name))
)

# Extract Frames

In [18]:
def extract_frames(video_path, num_frames=10, target_size=(224, 224)):
    """Sample up to ``num_frames`` evenly spaced RGB frames from a video.

    Args:
        video_path: Path of the video file to read.
        num_frames: Maximum number of frames to sample.
        target_size: Size handed to ``cv2.resize`` for every sampled frame.

    Returns:
        A NumPy array stacking the sampled frames. It is empty when the video
        cannot be opened, when ``num_frames`` is not positive, or when the
        first read fails; it holds fewer than ``num_frames`` frames when a
        read fails part-way through (sampling stops at the first failure).
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error: Cannot open video {video_path}")
            # Return the same type as the success path (an empty ndarray)
            # so callers never have to handle a bare list.
            return np.array([])
        if num_frames <= 0:
            # Nothing requested; also avoids dividing by zero below.
            return np.array([])

        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Never step by less than one frame, even for very short clips.
        frame_interval = max(1, length // num_frames)

        frames = []
        for i in range(num_frames):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * frame_interval)
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(cv2.resize(frame, target_size))
        return np.array(frames)
    finally:
        # Release the capture on every path, including errors while decoding.
        cap.release()

# Face Detection and Cropping with MTCNN

In [20]:
def detect_and_crop_faces(frames, detector):
    """Crop the first detected face of every frame and resize it to 224x224.

    Args:
        frames: Iterable of image arrays indexed as ``frame[row, column]``.
        detector: Object whose ``detect_faces(frame)`` returns a list of
            dicts carrying a ``'box'`` entry ``(x, y, width, height)``.

    Returns:
        A NumPy array with one crop per frame in which a usable face was
        found. Frames with no detection, or whose box lies entirely outside
        the slicing range, are skipped.
    """
    cropped_faces = []
    for frame in frames:
        detections = detector.detect_faces(frame)
        if not detections:
            continue  # no face in this frame
        x, y, w, h = detections[0]['box']  # only the first detection is used
        # A box may start at a negative coordinate. Used directly as a slice
        # start it would wrap around to the far edge and give an empty or
        # wrong crop, so clamp the start to the image origin.
        left, top = max(0, x), max(0, y)
        right, bottom = x + w, y + h
        if right <= left or bottom <= top:
            continue  # nothing of the box is inside the frame
        face = frame[top:bottom, left:right]
        if face.size == 0:
            continue  # box starts beyond the far edge of the frame
        cropped_faces.append(cv2.resize(face, (224, 224)))
    return np.array(cropped_faces)

# Single MTCNN instance shared by the dataset builders, which pass it to
# detect_and_crop_faces.
detector = MTCNN()

## Video FPS

In [23]:
def get_video_fps(video_path):
    """Return the frame rate OpenCV reports for ``video_path``.

    ``None`` is returned when the file cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    frame_rate = None
    if capture.isOpened():
        frame_rate = capture.get(cv2.CAP_PROP_FPS)
        capture.release()
    return frame_rate

# Store the frame rate reported for each clip (None where it cannot be opened).
df['frame_rate'] = df['video_name'].apply(
    lambda video_name: get_video_fps(os.path.join(base_path, video_name))
)

# Batch Processing for Feature Extraction

In [25]:
def extract_features(frames, resnet_model):
    """Run ``frames`` through ``resnet_model`` after ResNet50 preprocessing.

    The frames are passed to Keras' ``resnet50.preprocess_input`` and the
    result is fed to ``resnet_model.predict`` in batches of 16; whatever
    ``predict`` returns is handed back unchanged.
    """
    preprocess = tf.keras.applications.resnet50.preprocess_input
    return resnet_model.predict(preprocess(frames), batch_size=16)

# ImageNet-pretrained ResNet50 without its classification head
# (include_top=False) and with global average pooling (pooling='avg').
# In this notebook it is only used through predict() in extract_features.
resnet_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')

# Dataset Preparation

In [None]:
def prepare_dataset(video_paths, labels, num_frames=10, augment=True):
    """Turn labelled videos into fixed-length sequences of face features.

    For every video, ``num_frames`` frames are sampled, the first face of
    each frame is cropped and the crops are embedded with ``resnet_model``.
    A sequence is kept only when a face was found in every sampled frame, so
    each sample has exactly ``num_frames`` time steps. This matches the
    sequences built by ``prepare_test_dataset`` and lets ``np.array(X)``
    form a regular 3-D array.

    Each augmentation is applied to the whole clip and stored as its own
    sample with the video's label. Mixing augmented copies into a single
    sequence would make its length depend on how many faces were detected.

    Args:
        video_paths: Iterable of video file paths.
        labels: Iterable of labels, aligned with ``video_paths``.
        num_frames: Number of frames (time steps) per sequence.
        augment: When True (the default), add random brightness, contrast,
            horizontal-flip and saturation variants of every clip. Pass
            False for data that should be evaluated without augmentation.

    Returns:
        ``(X, y)``: ``X`` stacks the kept feature sequences and ``y`` holds
        one label per kept sequence, in the same order.
    """
    # The untouched clip always comes first; augmented variants are optional.
    augmentations = [lambda frame: frame]
    if augment:
        augmentations += [
            lambda frame: tf.image.random_brightness(frame, max_delta=0.3),
            lambda frame: tf.image.random_contrast(frame, lower=0.8, upper=1.2),
            tf.image.random_flip_left_right,
            lambda frame: tf.image.random_saturation(frame, lower=0.8, upper=1.2),
        ]

    X = []
    y = []
    for video_path, label in zip(video_paths, labels):
        frames = extract_frames(video_path, num_frames)
        if len(frames) == 0:
            print(f"Skipping video {video_path} due to frame extraction error.")
            continue

        kept = 0
        for transform in augmentations:
            # np.asarray turns tensors returned by tf.image back into arrays.
            clip = np.array([np.asarray(transform(frame)) for frame in frames])
            cropped_faces = detect_and_crop_faces(clip, detector)
            if cropped_faces.shape[0] != num_frames:
                continue  # a face was missed; drop this variant entirely
            X.append(extract_features(cropped_faces, resnet_model))
            y.append(label)
            kept += 1

        if kept == 0:
            print(f"Skipping video {video_path} due to insufficient detected faces.")
    return np.array(X), np.array(y)

# Parallel lists: full path of every video and its target (FAKE -> 1, otherwise 0).
video_files = [os.path.join(base_path, name) for name in df['video_name']]
labels = [1 if tag == 'FAKE' else 0 for tag in df['label']]

# Hold out 20% of the videos for validation; the fixed seed keeps the split stable.
train_videos, val_videos, train_labels, val_labels = train_test_split(
    video_files,
    labels,
    test_size=0.2,
    random_state=42,
)

# Run both splits through the frame -> face -> feature pipeline.
X_train, y_train = prepare_dataset(train_videos, train_labels)
X_val, y_val = prepare_dataset(val_videos, val_labels)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 296ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 183ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 195ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m

In [None]:
from tensorflow.keras.layers import TimeDistributed

# Define LSTM model
def create_lstm_model(input_shape):
    """Build and compile the per-frame Dense + LSTM binary classifier.

    ``input_shape`` is forwarded to the first (time-distributed) layer. The
    returned model is compiled with binary cross-entropy, the Adam optimizer
    and accuracy as its metric.
    """
    classifier = Sequential([
        # Per-time-step projection followed by dropout.
        TimeDistributed(Dense(256, activation='relu'), input_shape=input_shape),
        TimeDistributed(Dropout(0.5)),
        # Collapse the sequence to a single vector.
        LSTM(128, return_sequences=False),
        Dropout(0.5),
        Dense(1, activation='sigmoid'),
    ])
    classifier.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return classifier

# Per-sample shape: drop the leading sample axis of X_train.
input_shape = (X_train.shape[1], X_train.shape[2])
model = create_lstm_model(input_shape)

# Stop once val_loss has not improved for 3 epochs and roll back to the best weights.
from tensorflow.keras.callbacks import EarlyStopping
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Fit on the training sequences, validating on the held-out split.
history = model.fit(
    X_train,
    y_train,
    epochs=20,
    batch_size=16,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)

# Validation scores, thresholded at 0.5 into hard 0/1 predictions.
val_pred = model.predict(X_val)
val_pred_labels = (val_pred > 0.5).astype(int)

# Confusion Matrix
import itertools
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm, classes, title='Confusion matrix', cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Args:
        cm: 2-D NumPy array; rows are drawn along the 'True label' axis and
            columns along the 'Predicted label' axis.
        classes: Tick labels, one per row/column of ``cm``.
        title: Title placed above the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    # Integer counts are printed as-is; a float (e.g. normalised) matrix
    # cannot be formatted with 'd', so fall back to two decimals.
    fmt = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Lay out last so the axis labels are taken into account.
    plt.tight_layout()

# Confusion matrix of the thresholded validation predictions.
cnf_matrix = confusion_matrix(y_val, val_pred_labels)
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=['REAL', 'FAKE'], title='Confusion Matrix')

# ROC Curve
from sklearn.metrics import roc_curve, auc

# The ROC curve is computed from the raw scores, not the thresholded labels.
fpr, tpr, _ = roc_curve(y_val, val_pred)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(
    fpr,
    tpr,
    color='darkorange',
    lw=2,
    label='ROC curve (area = %0.2f)' % roc_auc,
)
# Diagonal reference line.
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.title('Receiver Operating Characteristic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.legend(loc="lower right")
plt.show()


# Model Architecture with Dropout Layers

In [None]:
# Two stacked LSTM layers, each followed by dropout, feeding a sigmoid output.
# This rebinds `model`, replacing the model built in the earlier training cell.
model = Sequential([
    # return_sequences=True so the second LSTM receives the full sequence.
    LSTM(128, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
    Dropout(0.5),
    LSTM(64),
    Dropout(0.5),
    Dense(1, activation='sigmoid'),
])

model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Model Training with Early Stopping

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has not improved for 3 epochs and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Up to 50 epochs; early stopping usually ends training sooner.
model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=16,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping],
)

# Model Evaluation

In [None]:
# Validation scores, thresholded at 0.5 into hard 0/1 predictions.
val_predictions = model.predict(X_val)
val_pred_labels = (val_predictions > 0.5).astype(int)

print(confusion_matrix(y_val, val_pred_labels))
print(classification_report(y_val, val_pred_labels))

# ROC Curve (computed from the raw scores, not the thresholded labels)
fpr, tpr, _ = roc_curve(y_val, val_predictions)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(
    fpr,
    tpr,
    color='darkorange',
    lw=2,
    label='ROC curve (area = %0.2f)' % roc_auc,
)
# Diagonal reference line.
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.title('Receiver Operating Characteristic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.legend(loc="lower right")
plt.show()

# Test Predictions

In [None]:
def prepare_test_dataset(test_videos_folder, num_frames=10):
    """Build feature sequences for every usable ``.mp4`` file in a folder.

    Args:
        test_videos_folder: Directory that is scanned for ``.mp4`` files.
        num_frames: Number of frames (time steps) per sequence.

    Returns:
        ``(X_test, test_videos)``: ``X_test`` stacks one feature sequence per
        kept video and ``test_videos`` lists the paths of exactly those
        videos in the same order, so row ``i`` of ``X_test`` belongs to
        ``test_videos[i]``. Videos whose frames cannot be read, or in which a
        face is not found in every sampled frame, appear in neither.
    """
    # Sort the listing so the output order does not depend on the file system.
    candidates = [
        os.path.join(test_videos_folder, f)
        for f in sorted(os.listdir(test_videos_folder))
        if f.endswith('.mp4')
    ]
    X_test = []
    test_videos = []
    for video_path in candidates:
        frames = extract_frames(video_path, num_frames)
        if len(frames) == 0:
            continue
        cropped_faces = detect_and_crop_faces(frames, detector)
        if cropped_faces.shape[0] != num_frames:
            continue  # Skip unless a face was found in every sampled frame
        X_test.append(extract_features(cropped_faces, resnet_model))
        # Record the path only for kept videos so paths and rows stay aligned.
        test_videos.append(video_path)
    return np.array(X_test), test_videos

# Folder scanned for .mp4 files by prepare_test_dataset.
test_videos_folder = 'deepfake-detection-challenge/test_videos'
X_test, test_video_files = prepare_test_dataset(test_videos_folder)

# Uses whichever `model` was bound most recently in the notebook.
predictions = model.predict(X_test)

# zip() pairs each path with the prediction at the same position, so this
# relies on test_video_files being aligned row-for-row with X_test.
for video, pred in zip(test_video_files, predictions):
    print(f'Predicted label for {video}: {"FAKE" if pred > 0.5 else "REAL"}')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 58ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 833ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3

# Visualization

In [4]:
# Confusion Matrix Plotting
def plot_confusion_matrix(cm, classes, title='Confusion matrix', cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    NOTE(review): a helper with this name is also defined in an earlier cell;
    this definition shadows it when both cells run.

    Args:
        cm: 2-D NumPy array; rows are drawn along the 'True label' axis and
            columns along the 'Predicted label' axis.
        classes: Tick labels, one per row/column of ``cm``.
        title: Title placed above the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    # Integer counts are printed as-is; a float (e.g. normalised) matrix
    # cannot be formatted with 'd', so fall back to two decimals.
    fmt = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Lay out last so the axis labels are taken into account.
    plt.tight_layout()

# Needs y_val, val_pred_labels, fpr, tpr and roc_auc from the evaluation cells;
# run on its own in a fresh kernel this cell stops with a NameError.
cnf_matrix = confusion_matrix(y_val, val_pred_labels)
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=['REAL', 'FAKE'], title='Confusion Matrix')

# ROC Curve Plotting
plt.figure()
plt.plot(
    fpr,
    tpr,
    color='darkorange',
    lw=2,
    label='ROC curve (area = %0.2f)' % roc_auc,
)
# Diagonal reference line.
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.title('Receiver Operating Characteristic')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.legend(loc="lower right")
plt.show()

NameError: name 'y_val' is not defined