In [1]:
import logging
import os
from datetime import datetime

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from facenet_pytorch import MTCNN
from PIL import Image
from sklearn.metrics import (
    accuracy_score,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)
from torch.nn import functional as F

# Configure the root logger once for the whole notebook: INFO and above,
# each record prefixed with its timestamp and severity level.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Model architecture
To load the saved model weights, we must first define the model's architecture. This class must be identical to the `DeepfakeDetector` class used during training.

In [2]:
class DeepfakeDetector(nn.Module):
    """Clip-level classifier: per-frame CNN features -> BiLSTM -> attention -> MLP.

    The submodule names (``backbone``, ``lstm``, ``attention``, ``classifier``)
    and their layer order define the state-dict keys, so they must stay as they
    are for previously saved checkpoints to load.

    Args:
        num_frames: Number of frames per clip. Stored on the instance for
            callers; ``forward`` itself takes the frame count from the input
            tensor's shape.
        backbone: ``"efficientnet_b4"`` or ``"resnet50"``.
        dropout_rate: Dropout used between LSTM layers and in the classifier.
        pretrained: When True (default), initialise the backbone with ImageNet
            weights. Pass False to skip that initialisation, e.g. when a full
            checkpoint is loaded right after construction.

    Raises:
        ValueError: If ``backbone`` is not one of the supported names.
    """

    def __init__(
        self,
        num_frames=20,
        backbone="efficientnet_b4",
        dropout_rate=0.6,
        pretrained=True,
    ):
        super().__init__()
        self.num_frames = num_frames

        # CNN backbone for per-frame feature extraction. The final
        # classification layer is replaced by Identity so the backbone
        # outputs the pooled feature vector instead of ImageNet logits.
        if backbone == "efficientnet_b4":
            weights = (
                models.EfficientNet_B4_Weights.IMAGENET1K_V1 if pretrained else None
            )
            self.backbone = models.efficientnet_b4(weights=weights)
            self.backbone.classifier = nn.Identity()
            feature_dim = 1792
        elif backbone == "resnet50":
            weights = models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
            self.backbone = models.resnet50(weights=weights)
            self.backbone.fc = nn.Identity()
            feature_dim = 2048
        else:
            # Without this branch an unknown name would surface later as an
            # UnboundLocalError on feature_dim, which hides the real cause.
            raise ValueError(
                f"Unsupported backbone '{backbone}'. "
                "Expected 'efficientnet_b4' or 'resnet50'."
            )

        hidden_size = 512
        lstm_out_dim = hidden_size * 2  # forward + backward directions

        # Temporal processing layers (LSTM)
        self.lstm = nn.LSTM(
            input_size=feature_dim,
            hidden_size=hidden_size,
            num_layers=3,
            batch_first=True,
            dropout=dropout_rate,
            bidirectional=True,
        )

        # Attention mechanism: one score per frame, softmax-normalised over
        # the frame axis (dim=1 of a (batch, frames, 1) tensor).
        self.attention = nn.Sequential(
            nn.Linear(lstm_out_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Softmax(dim=1),
        )

        # Final classification layers; the last Linear emits a single raw
        # logit (no sigmoid is applied inside the model).
        self.classifier = nn.Sequential(
            nn.Linear(lstm_out_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(128, 1),
        )

    def forward(self, x):
        """Classifies a batch of clips.

        Args:
            x: Tensor of shape (batch, frames, C, H, W).

        Returns:
            Tensor of shape (batch, 1) holding one raw logit per clip.
        """
        batch_size, num_frames, C, H, W = x.shape

        # Fold frames into the batch axis so the 2-D CNN sees single images.
        # reshape (rather than view) also accepts non-contiguous inputs.
        x = x.reshape(batch_size * num_frames, C, H, W)

        features = self.backbone(x)
        features = features.reshape(batch_size, num_frames, -1)

        lstm_out, _ = self.lstm(features)

        # Attention-weighted sum over the frame axis -> one vector per clip.
        attention_weights = self.attention(lstm_out)
        context_vector = (lstm_out * attention_weights).sum(dim=1)

        return self.classifier(context_vector)

# Load Model and set up for inference
This section handles loading the pre-trained model and preparing all necessary components for the inference process, including the face detector (MTCNN) and image transformations.

In [3]:
def load_model_and_dependencies(
    model_path="models/best_deepfake_detector_model.pth",
    num_frames=15,
    image_size=224,
):
    """
    Initializes the model, loads weights, and sets up dependencies.

    Args:
        model_path: Path to the saved state dict of the trained model.
        num_frames: Frames per clip; must match the model's training
            configuration.
        image_size: Side length (pixels) of the square face crops.

    Returns:
        A tuple ``(model, mtcnn, val_transforms, device, image_size)``.
        If the checkpoint is missing or cannot be loaded, an error is logged
        and a tuple of five ``None`` values is returned instead.
    """
    failure = (None, None, None, None, None)

    # Set device (GPU if available, otherwise CPU)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Check the checkpoint before building the network: constructing the
    # backbone is the expensive step and is pointless without weights to load.
    if not os.path.isfile(model_path):
        logging.error(
            f"Model file not found at {model_path}. Please update the MODEL_PATH variable."
        )
        return failure

    # Initialize model
    model = DeepfakeDetector(num_frames=num_frames, backbone="efficientnet_b4").to(
        device
    )

    # Load the trained weights
    try:
        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only load checkpoints from a trusted source (or
        # consider weights_only=True where the installed torch supports it).
        state_dict = torch.load(model_path, map_location=device)
        model.load_state_dict(state_dict)
        logging.info(f"Successfully loaded model weights from {model_path}")
    except FileNotFoundError:
        # Still possible if the file vanishes between the check and the load.
        logging.error(
            f"Model file not found at {model_path}. Please update the MODEL_PATH variable."
        )
        return failure
    except Exception as e:
        logging.error(f"Error loading model: {e}")
        return failure

    model.eval()  # Set model to evaluation mode

    # Initialize MTCNN for face detection
    mtcnn = MTCNN(
        image_size=image_size,
        margin=20,
        device=device,
        min_face_size=20,
        thresholds=[0.6, 0.7, 0.7],
    )
    logging.info("MTCNN face detector initialized.")

    # Define validation transforms (must match those used during training)
    val_transforms = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )

    return model, mtcnn, val_transforms, device, image_size

# Video inference and bounding box visualization
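This function performs the core task: it reads the video frame by frame, detects the most confident face in each frame, runs the deepfake prediction once per clip of `num_frames` consecutive face crops, and draws the bounding box with that clip's result on every frame of the clip.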

In [4]:
def _predict_clip_probability(model, face_buffer, device):
    """Runs the model on one clip of face tensors and returns sigmoid(logit)."""
    input_tensor = torch.stack(face_buffer).unsqueeze(0).to(device)
    return torch.sigmoid(model(input_tensor)).item()


def _annotate_and_record_clip(
    frame_buffer,
    prediction_prob,
    fake_threshold,
    writer,
    frames_output_dir,
    prediction_data,
):
    """Draws the clip result on each buffered frame, writes it to the video,
    saves it as a JPEG and appends one row per frame to ``prediction_data``."""
    is_fake = prediction_prob > fake_threshold
    # Always label with the FAKE probability
    display_text = f"FAKE: {prediction_prob:.2f}"
    # OpenCV colours are BGR: red above the threshold, green otherwise
    color = (0, 0, 255) if is_fake else (0, 255, 0)
    label_text = "FAKE" if is_fake else "REAL"  # Used for the CSV log

    for item in frame_buffer:
        annotated = item["frame"]
        box = item["box"]
        fc = item["frame_count"]
        face_idx = item["face_index"]
        x1_b, y1_b, x2_b, y2_b = [int(coord) for coord in box]

        # Draw bounding box and label; keep the text baseline inside the
        # image when the face touches the top edge.
        cv2.rectangle(annotated, (x1_b, y1_b), (x2_b, y2_b), color, 2)
        cv2.putText(
            annotated,
            display_text,
            (x1_b, max(y1_b - 10, 15)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            color,
            2,
        )

        # Write the ANNOTATED frame to the video
        writer.write(annotated)

        # Save the annotated frame as an image
        frame_filename = f"frame{fc}_face{face_idx}.jpg"
        frame_save_path = os.path.join(frames_output_dir, frame_filename)
        cv2.imwrite(frame_save_path, annotated)

        # Append prediction data for this frame
        prediction_data.append(
            {
                "frame_number": fc,
                "face_index": face_idx,
                "bounding_box": f"({x1_b},{y1_b},{x2_b},{y2_b})",
                "prediction_prob": prediction_prob,
                "label": label_text,
                "face_image_path": frame_save_path,
            }
        )


def detect_deepfake_in_video(
    video_path,
    output_path,
    model,
    mtcnn,
    transforms,
    device,
    image_size,
    face_confidence_threshold=0.95,
    fake_threshold=0.5,
):
    """
    Processes a video to detect deepfakes, draws bounding boxes, and saves the output.

    For every frame the most confident MTCNN detection is kept if its
    probability exceeds ``face_confidence_threshold``. Kept faces are grouped
    into clips of ``model.num_frames`` crops; each clip gets one prediction,
    which is drawn on all frames of that clip. A final incomplete clip is
    padded by repeating its last face crop. Frames without a sufficiently
    confident face are not written to the output video.

    Args:
        video_path: Path of the input video.
        output_path: Output DIRECTORY. Receives ``output_video.mp4``,
            ``prediction_results.csv`` and ``processed_frames/<timestamp>/``.
        model: Clip classifier exposing ``num_frames`` and returning one logit.
        mtcnn: Face detector providing ``detect(pil_image)``.
        transforms: Callable turning a PIL face crop into a tensor.
        device: Torch device the clip tensor is moved to.
        image_size: Side length the face crops are resized to.
        face_confidence_threshold: Minimum detection probability for a face
            to be used.
        fake_threshold: Probability above which a clip / the video is
            labelled FAKE.

    Returns:
        None. Results are written to disk and summarised through logging.
    """
    if not os.path.exists(video_path):
        logging.error(f"Video not found at {video_path}")
        return

    logging.info(f"Starting deepfake detection on {video_path}")

    # output_path is a directory; every artefact is placed inside it.
    output_dir = output_path or "."
    frames_output_dir = os.path.join(
        output_dir, "processed_frames", datetime.now().strftime("%Y%m%d_%H%M%S")
    )
    # Creating the nested frames directory also creates output_dir itself.
    os.makedirs(frames_output_dir, exist_ok=True)
    video_output_path = os.path.join(output_dir, "output_video.mp4")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        logging.error(f"Could not open video at {video_path}")
        cap.release()
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Keep fractional rates such as 29.97; fall back when the container
    # reports nothing usable (0 or NaN), which would break the writer.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not (fps > 0):
        logging.warning("Could not read FPS from the video; defaulting to 25.")
        fps = 25.0

    # Define the codec and create VideoWriter object to save the output video
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(video_output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        logging.error(f"Could not open video writer for {video_output_path}")
        cap.release()
        out.release()
        return

    prediction_data = []  # One dict per annotated frame, for the CSV
    all_probs = []  # One FAKE probability per clip
    face_buffer = []  # Face tensors of the clip being assembled
    frame_buffer = []  # Matching frames and boxes of that clip
    buffer_size = model.num_frames
    frame_count = 0

    try:
        with torch.no_grad():
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)

                boxes, probs = mtcnn.detect(pil_image)

                if boxes is not None and len(boxes) > 0:
                    # Use only the face with the highest detection probability
                    best_face_index = np.argmax(probs)

                    if probs[best_face_index] > face_confidence_threshold:
                        box = boxes[best_face_index]
                        x1, y1, x2, y2 = [int(b) for b in box]
                        face = pil_image.crop((x1, y1, x2, y2))
                        face_resized = face.resize((image_size, image_size))

                        face_buffer.append(transforms(face_resized))
                        frame_buffer.append(
                            {
                                "frame": frame.copy(),
                                "box": box,
                                "frame_count": frame_count,
                                "face_index": best_face_index,
                            }
                        )

                        if len(face_buffer) == buffer_size:
                            prediction_prob = _predict_clip_probability(
                                model, face_buffer, device
                            )
                            all_probs.append(prediction_prob)
                            _annotate_and_record_clip(
                                frame_buffer,
                                prediction_prob,
                                fake_threshold,
                                out,
                                frames_output_dir,
                                prediction_data,
                            )
                            face_buffer.clear()
                            frame_buffer.clear()

                frame_count += 1
                if frame_count % 100 == 0:
                    logging.info(f"Processed {frame_count} frames...")

            if face_buffer:
                # Pad the last, incomplete clip by repeating its final face.
                # Only face_buffer is padded; frame_buffer still holds just
                # the real frames, so no frame is written twice.
                padding_needed = buffer_size - len(face_buffer)
                if padding_needed > 0:
                    face_buffer.extend([face_buffer[-1]] * padding_needed)

                prediction_prob = _predict_clip_probability(
                    model, face_buffer, device
                )
                all_probs.append(prediction_prob)
                # Same annotation, frame export and CSV rows as full clips.
                _annotate_and_record_clip(
                    frame_buffer,
                    prediction_prob,
                    fake_threshold,
                    out,
                    frames_output_dir,
                    prediction_data,
                )
                face_buffer.clear()
                frame_buffer.clear()
    finally:
        # Release the handles even if detection or inference raised.
        cap.release()
        out.release()

    if prediction_data:
        df = pd.DataFrame(prediction_data)
        csv_output_path = os.path.join(output_dir, "prediction_results.csv")
        df.to_csv(csv_output_path, index=False, float_format="%.2f")
        logging.info(f"Prediction results saved to {csv_output_path}")

    if all_probs:
        avg_prob = np.mean(all_probs)
        final_verdict = "FAKE" if avg_prob > fake_threshold else "REAL"
        logging.info("--- Video Analysis Complete ---")
        logging.info(f"Final Verdict: The video is likely {final_verdict}")
        logging.info(f"Average FAKE Confidence: {avg_prob:.2%}")
    else:
        logging.info("No faces were confidently detected in the video.")

    logging.info(f"Detection complete. Output video saved to {video_output_path}")

# Main execution block

In [5]:
if __name__ == "__main__":
    # --- User Configuration ---

    INPUT_VIDEO_PATH = "./id59_id61_0006.mp4"
    # Output directory: the detector writes the video to
    # OUTPUT_PATH/output_video.mp4.
    OUTPUT_PATH = "."

    # Load model and dependencies
    model, mtcnn, val_transforms, device, image_size = load_model_and_dependencies()

    # The loader returns None for every element when the weights could not be
    # loaded; compare against None explicitly instead of relying on the
    # truthiness of an nn.Module.
    if model is not None:
        # Create the output directory itself (not its parent) so the video
        # writer has an existing directory to write into.
        if OUTPUT_PATH:
            os.makedirs(OUTPUT_PATH, exist_ok=True)

        # Run the deepfake detection process
        detect_deepfake_in_video(
            video_path=INPUT_VIDEO_PATH,
            output_path=OUTPUT_PATH,
            model=model,
            mtcnn=mtcnn,
            transforms=val_transforms,
            device=device,
            image_size=image_size,
        )

2025-10-19 04:32:43,874 - INFO - Using device: cuda
2025-10-19 04:32:44,548 - INFO - Successfully loaded model weights from models/best_deepfake_detector_model.pth
2025-10-19 04:32:44,559 - INFO - MTCNN face detector initialized.
2025-10-19 04:32:44,560 - INFO - Starting deepfake detection on ./id59_id61_0006.mp4


NameError: name 'i' is not defined