<h1>pip</h1>

In [21]:
# Install necessary libraries
%pip install torch torchvision opencv-python matplotlib facenet-pytorch

Note: you may need to restart the kernel to use updated packages.


<h1>Dependencies</h1>

In [22]:
import torch
import torchvision.models as models
import cv2
import numpy as np
from scipy.spatial.distance import cosine
from torchvision import transforms
import torchvision.transforms as transforms
from facenet_pytorch import MTCNN
from PIL import Image

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


<h1>Face Distortion</h1>

<h4>Loading pre trained model</h4>

In [45]:
# Load MTCNN for face detection
mtcnn = MTCNN(keep_all=True, device=device)

# Load a pre-trained MobileNetV2 model for deepfake detection
mobilenet_model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True).to(device)
mobilenet_model.eval()

# Modify the final layer for binary classification (Real/Fake)
num_ftrs = mobilenet_model.classifier[1].in_features
mobilenet_model.classifier[1] = torch.nn.Linear(num_ftrs, 2).to(device)

# Define preprocessing transformations for MobileNetV2
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to match the input size of MobileNetV2
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalize the image
])

Using cache found in C:\Users\Harshal Shah/.cache\torch\hub\pytorch_vision_v0.10.0


<h4>Detection</h4>

In [46]:
# Function to detect deepfakes in real-time
def detect_deepfakes(video_path, skip_frames=5):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video at path {video_path}")
        return 0, 0

    frame_count = 0
    total_frames = 0
    distorted_faces = 0
    example_abnormal_frame = None  # To store one example of an abnormal frame

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Skip frames to reduce processing load
        frame_count += 1
        if frame_count % skip_frames != 0:
            continue

        # Increment total frame count
        total_frames += 1

        # Convert frame to RGB for MTCNN
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Detect faces using MTCNN
        boxes, _ = mtcnn.detect(rgb_frame)
        if boxes is None:
            continue

        # Process each detected face
        for box in boxes:
            x1, y1, x2, y2 = map(int, box)
            face = rgb_frame[y1:y2, x1:x2]

            # Preprocess the face for MobileNetV2
            face_pil = Image.fromarray(face)
            input_tensor = transform(face_pil).unsqueeze(0).to(device)

            # Perform inference with MobileNetV2
            with torch.no_grad():
                output = mobilenet_model(input_tensor)
                _, predicted = torch.max(output, 1)
                prediction = "Real" if predicted.item() == 0 else "Fake"

            # If distortion (deepfake) is detected
            if prediction == "Fake":
                distorted_faces += 1

                # Save one example of an abnormal frame
                if example_abnormal_frame is None:
                    example_abnormal_frame = frame.copy()

                # Draw bounding box and label on the frame
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(frame, "Distorted Face", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

    cap.release()

    # Display one example of an abnormal frame
    if example_abnormal_frame is not None:
        print("Displaying one example of an abnormal frame:")
        cv2.imshow("Example Abnormal Frame", example_abnormal_frame)
    else:
        print("No abnormal frames detected.")

    return total_frames, distorted_faces

<h4>Testing</h4>

In [49]:
# First, initialize MTCNN
mtcnn = MTCNN(keep_all=True, device=device)

# Then run your test
video_path = 'billgates.mp4'
skip_frames = 5  # Process every 5th frame to improve performance

total_frames, distorted_faces = detect_deepfakes(video_path, skip_frames)
# Print the results
print(f"Total Frames Processed: {total_frames}")
print(f"Total Distorted Faces Detected: {distorted_faces}")

Displaying one example of an abnormal frame:
Total Frames Processed: 90
Total Distorted Faces Detected: 36


<h1>Frame Anomalies</h1>

<h4>Preprocessing</h4>

In [25]:
# Define preprocessing transformations with smaller resolution
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((112, 112)),  # Smaller input size for faster processing
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
])

# Function to preprocess a single frame
def preprocess_frame(frame):
    frame = transform(frame).unsqueeze(0)  # Add batch dimension
    return frame

<h4>Loading pre trained model</h4>

In [26]:
# Load a smaller pre-trained model (e.g., MobileNet)
model = models.mobilenet_v2(pretrained=True)
model = torch.nn.Sequential(*list(model.children())[:-1])  # Remove the final classification layer
model.eval()

# Function to extract features from a frame using the pre-trained model
def extract_features(frame, model):
    with torch.no_grad():
        features = model(frame)
    return features.squeeze().numpy().flatten()  # Flatten the feature vector to 1D

# Threshold for anomaly detection (you may need to tune this based on your data)
ANOMALY_THRESHOLD = 0.85

<h4>Detection</h4>

In [30]:
# Function to calculate cosine similarity between two feature vectors
def cosine_similarity(vec1, vec2):
    return 1 - cosine(vec1, vec2)

# Function to detect frame anomalies and display only abnormal frames
def detect_frame_anomalies(video_path, skip_frames=5):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video at path {video_path}")
        return 0, 0

    prev_features = None
    frame_count = 0
    total_frames = 0
    abnormal_frames = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Skip frames to reduce processing load
        frame_count += 1
        if frame_count % skip_frames != 0:
            continue

        # Increment total frame count
        total_frames += 1

        # Preprocess the frame
        input_tensor = preprocess_frame(frame)

        # Extract features using the pre-trained model
        current_features = extract_features(input_tensor, model)

        # Compare with previous frame's features
        if prev_features is not None:
            similarity = cosine_similarity(prev_features, current_features)

            # Detect anomaly based on similarity threshold
            if similarity < ANOMALY_THRESHOLD:
                abnormal_frames += 1

                # Display the abnormal frame
                anomaly_status = "Anomaly Detected"
                color = (0, 0, 255)  # Red color for anomalies
                cv2.putText(frame, f"Status: {anomaly_status}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

                # Show the abnormal frame (optional: use cv2_imshow for Colab)
                cv2.imshow("Frame", frame)

        # Update previous features
        prev_features = current_features

    cap.release()
    return total_frames, abnormal_frames

<h4>Testing</h4>

In [31]:
# Test the function with your video file
video_path = 'billgates.mp4'
skip_frames = 5

total_frames, abnormal_frames = detect_frame_anomalies(video_path, skip_frames)

print(f"Total Frames Processed: {total_frames}")
print(f"Total Abnormal Frames Detected: {abnormal_frames}")

Total Frames Processed: 90
Total Abnormal Frames Detected: 2
