<a href="https://colab.research.google.com/github/rahul99-collab/DUO-CONNECT/blob/main/PreprocessAndSpiltting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install opencv-python numpy tqdm moviepy




In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; every dataset path used below lives
# under /content/drive/MyDrive. Requires the Colab runtime (google.colab).
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
import os

# Input: folder on the mounted Drive that holds the source videos.
video_folder = "/content/drive/MyDrive/deepfake"

# Outputs: one folder per pipeline stage.
output_frames = "/content/drive/MyDrive/Extracted frames"  # frames pulled from the videos
output_preprocessed = "/content/drive/MyDrive/Preprocessed frames"  # resized frames
split_dataset_path = "/content/drive/MyDrive/Final dataset split"  # train/val/test folders

# Make sure every output folder exists before a stage writes into it.
for _stage_dir in (output_frames, output_preprocessed, split_dataset_path):
    os.makedirs(_stage_dir, exist_ok=True)
del _stage_dir


In [None]:
import cv2
import glob

def extract_frames_from_videos(video_folder, output_folder, frame_rate=1):
    """
    Extract frames from every .mp4 video in a folder and save them as JPEGs.

    Each video gets its own sub-folder of ``output_folder`` named after the
    video file (extension removed); frames are written there as
    ``<video_name>_frame_<NNNN>.jpg``.

    - video_folder: Folder containing the .mp4 videos.
    - output_folder: Where to save extracted frames.
    - frame_rate: Frames per second to extract (must be > 0). Sampling keeps
      every ``round(fps / frame_rate)``-th frame, so the rate is approximate.

    Raises ValueError if frame_rate is not positive.
    """
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate!r}")

    # Sorted so the processing order (and the log) is stable between runs.
    video_files = sorted(glob.glob(os.path.join(video_folder, "*.mp4")))
    print(f"Found {len(video_files)} videos.")

    for video_path in video_files:
        # splitext only strips the extension, so "clip.v2.mp4" stays "clip.v2"
        # instead of collapsing to "clip" and colliding with other videos.
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                # Report unreadable files instead of claiming success below.
                print(f"Could not open {video_path}; skipping.")
                continue

            video_output_folder = os.path.join(output_folder, video_name)
            os.makedirs(video_output_folder, exist_ok=True)

            # The reported FPS may be fractional (e.g. 29.97) or 0 when unknown:
            # round instead of truncating and never let the interval drop below 1.
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_interval = max(1, int(round(fps / frame_rate)))

            count = 0         # frames read so far
            frame_number = 0  # frames written so far (used in the file name)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if count % frame_interval == 0:
                    frame_filename = os.path.join(
                        video_output_folder, f"{video_name}_frame_{frame_number:04d}.jpg"
                    )
                    # imwrite signals failure through its return value.
                    if cv2.imwrite(frame_filename, frame):
                        frame_number += 1
                    else:
                        print(f"Failed to write {frame_filename}")
                count += 1
        finally:
            # Always free the decoder, even if reading or writing raised.
            cap.release()
        print(f"Extracted frames from {video_name}.")

# Extract frames from every source video using the default sampling rate.
extract_frames_from_videos(video_folder=video_folder, output_folder=output_frames)


Found 60 videos.
Extracted frames from id7_0006.
Extracted frames from id6_0004.
Extracted frames from id6_0006.
Extracted frames from id6_0000.
Extracted frames from id6_0005.
Extracted frames from id7_0009.
Extracted frames from id7_0005.
Extracted frames from id7_0002.
Extracted frames from id6_0008.
Extracted frames from id7_0000.
Extracted frames from id6_0002.
Extracted frames from id7_0004.
Extracted frames from id6_0003.
Extracted frames from id7_0003.
Extracted frames from id7_0007.
Extracted frames from id6_0009.
Extracted frames from id7_0008.
Extracted frames from id6_0007.
Extracted frames from id6_0001.
Extracted frames from id7_0001.
Extracted frames from id9_0006.
Extracted frames from id9_0004.
Extracted frames from id9_0001.
Extracted frames from id9_0002.
Extracted frames from id9_0007.
Extracted frames from id9_0009.
Extracted frames from id9_0005.
Extracted frames from id9_0000.
Extracted frames from id9_0003.
Extracted frames from id9_0008.
Extracted frames from i

In [None]:
import numpy as np

def preprocess_images(image_folder, output_folder, img_size=(224, 224)):
    """
    Resize every frame to a fixed size and save it under the same file name.

    ``image_folder`` is expected to hold one sub-folder per video; the same
    layout is recreated under ``output_folder``. Entries that are not
    directories, and files OpenCV cannot read as images, are skipped.

    - image_folder: Path containing video frame folders.
    - output_folder: Path to save preprocessed images.
    - img_size: Target size for resizing, passed to cv2.resize as (width, height).

    Note: the results are written back as 8-bit image files, so a [0, 1]
    normalization cannot be stored here; scale pixel values when the frames
    are loaded for training instead.
    """
    os.makedirs(output_folder, exist_ok=True)

    for video_folder in sorted(os.listdir(image_folder)):
        video_path = os.path.join(image_folder, video_folder)
        if not os.path.isdir(video_path):
            # A stray file next to the video folders would make listdir fail.
            continue

        output_video_folder = os.path.join(output_folder, video_folder)
        os.makedirs(output_video_folder, exist_ok=True)

        for img_name in sorted(os.listdir(video_path)):
            img = cv2.imread(os.path.join(video_path, img_name))

            if img is None:
                # imread returns None for anything it cannot decode.
                continue

            # The former "/ 255.0 then * 255" round-trip is gone: it could not
            # change what gets saved except by float truncation errors.
            resized = cv2.resize(img, img_size)

            cv2.imwrite(os.path.join(output_video_folder, img_name), resized)

    print(f"Preprocessed images saved in {output_folder}")

# Resize all extracted frames using the default target size.
preprocess_images(image_folder=output_frames, output_folder=output_preprocessed)


Preprocessed images saved in /content/drive/MyDrive/Preprocessed frames


In [None]:
import shutil
import random

def split_dataset(image_folder, output_base_folder, train_ratio=0.8, val_ratio=0.1, seed=None):
    """
    Split each video's frames into train, validation, and test sets.

    For every sub-folder of ``image_folder`` the ``.jpg`` files are shuffled
    and copied to ``<output_base_folder>/<subset>/<video_folder>/``. Frames
    left over after the train and validation shares go to the test set.

    Note: the split is performed per video, so frames of one video are spread
    over all three subsets; the subsets are not video-disjoint.

    - image_folder: Folder with processed frames (one sub-folder per video).
    - output_base_folder: Folder to store train, val, test sets.
    - train_ratio: Fraction of each video's frames used for training.
    - val_ratio: Fraction of each video's frames used for validation.
    - seed: Optional seed for the shuffle. Pass a value to get the same split
      on every run; None (default) gives a different random split each time.

    Raises ValueError if a ratio is negative or the two ratios exceed 1.
    """
    # Small tolerance so sums such as 0.8 + 0.2 are not rejected by float error.
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            f"invalid split ratios: train_ratio={train_ratio!r}, val_ratio={val_ratio!r}"
        )

    os.makedirs(output_base_folder, exist_ok=True)

    # Private generator: reproducible when seeded, and leaves the global
    # random state untouched.
    rng = random.Random(seed)

    for video_folder in sorted(os.listdir(image_folder)):
        video_path = os.path.join(image_folder, video_folder)
        if not os.path.isdir(video_path):
            # Ignore stray files sitting next to the per-video folders.
            continue

        # Sort first: listdir order is arbitrary, and a seeded shuffle is only
        # repeatable when it starts from the same ordering.
        images = sorted(f for f in os.listdir(video_path) if f.endswith('.jpg'))
        rng.shuffle(images)

        train_split = int(train_ratio * len(images))
        val_split = int((train_ratio + val_ratio) * len(images))

        subsets = {
            "train": images[:train_split],
            "val": images[train_split:val_split],
            "test": images[val_split:]
        }

        for subset, files in subsets.items():
            subset_folder = os.path.join(output_base_folder, subset, video_folder)
            os.makedirs(subset_folder, exist_ok=True)

            for file in files:
                src_path = os.path.join(video_path, file)
                dst_path = os.path.join(subset_folder, file)
                shutil.copy(src_path, dst_path)

    print("Dataset split completed.")

# Distribute the preprocessed frames over train/val/test with the default ratios.
split_dataset(image_folder=output_preprocessed, output_base_folder=split_dataset_path)


Dataset split completed.


In [None]:
!pip install opencv-python-headless facenet-pytorch numpy




In [None]:
!pip install --upgrade --force-reinstall pillow


Collecting pillow
  Downloading pillow-11.1.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.1 kB)
Downloading pillow-11.1.0-cp311-cp311-manylinux_2_28_x86_64.whl (4.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pillow
  Attempting uninstall: pillow
    Found existing installation: pillow 10.2.0
    Uninstalling pillow-10.2.0:
      Successfully uninstalled pillow-10.2.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
facenet-pytorch 2.6.0 requires Pillow<10.3.0,>=10.2.0, but you have pillow 11.1.0 which is incompatible.[0m[31m
[0mSuccessfully installed pillow-11.1.0


In [None]:
import cv2
import os
import numpy as np
from facenet_pytorch import MTCNN

# Define paths
# VIDEO_FOLDER: source videos (same Drive location as `video_folder` above).
VIDEO_FOLDER = "/content/drive/MyDrive/deepfake"
# OUTPUT_FOLDER: where the face-only videos are written.
# NOTE(review): this folder is literally named "split_dataset_path"; it is not
# the value of the `split_dataset_path` variable defined earlier
# (".../Final dataset split"). Confirm that this is intended.
OUTPUT_FOLDER = "/content/drive/MyDrive/split_dataset_path"
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# Initialize the MTCNN face detector shared by process_video.
# keep_all=False / select_largest=True presumably restrict the result to the
# single largest face per frame - see the facenet-pytorch documentation.
mtcnn = MTCNN(keep_all=False, select_largest=True)

def process_video(video_path, output_path, frame_limit=100):
    """Extract the face from each frame and create a face-only video.

    Reads at most ``frame_limit`` frames from ``video_path``, runs the
    module-level ``mtcnn`` detector on each one, crops the first returned box,
    resizes the crop to 160x160 and finally writes all crops to ``output_path``
    as an mp4v-encoded video at 30 FPS. Frames without a usable detection are
    dropped, so the output can be shorter than the input.

    - video_path: Path of the input video.
    - output_path: Path of the face-only video to create.
    - frame_limit: Maximum number of input frames to read.

    Returns None; progress and failures are reported with print().
    """
    cap = cv2.VideoCapture(video_path)
    face_frames = []

    try:
        if not cap.isOpened():
            # Distinguish an unreadable file from a video without faces.
            print(f"Could not open video: {video_path}")
            return

        frame_count = 0
        while frame_count < frame_limit:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert BGR to RGB for MTCNN
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Detect face and get bounding box
            boxes, _ = mtcnn.detect(rgb_frame)

            if boxes is not None and len(boxes) > 0:
                x1, y1, x2, y2 = map(int, boxes[0])  # Get first detected face

                # Ensure bounding box is within image bounds
                h, w, _ = frame.shape
                x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w, x2), min(h, y2)

                # Extract face from original BGR image
                face = frame[y1:y2, x1:x2]

                # A degenerate box yields an empty crop, which cannot be resized.
                if face.size > 0:
                    # Resize so every frame of the output video has the same size
                    face_resized = cv2.resize(face, (160, 160), interpolation=cv2.INTER_LANCZOS4)
                    face_frames.append(face_resized)

            frame_count += 1
    finally:
        # Free the decoder even if detection or resizing raised.
        cap.release()

    if not face_frames:
        print(f"No faces detected in {video_path}")
        return

    # Convert extracted faces into a new video
    height, width = face_frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = 30  # Adjust FPS as needed
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    if not video_writer.isOpened():
        # Without this check a failed writer would still be reported as "saved".
        video_writer.release()
        print(f"Could not open video writer for {output_path}")
        return

    try:
        for face in face_frames:
            video_writer.write(face)
    finally:
        # Releasing finalizes the file; do it even if a write raised.
        video_writer.release()

    print(f"Face-only video saved: {output_path}")

def process_all_videos(video_folder, output_folder, frame_limit=100):
    """Run process_video on every ``.mp4`` file found directly in a folder.

    Each result is written to ``output_folder`` as ``processed_<file name>``.
    When the folder contains no ``.mp4`` file a notice is printed and the
    function returns without doing anything else.
    """
    mp4_names = list(filter(lambda entry: entry.endswith('.mp4'), os.listdir(video_folder)))

    if len(mp4_names) == 0:
        print(" No videos found in the folder!")
        return

    for name in mp4_names:
        source = os.path.join(video_folder, name)
        target = os.path.join(output_folder, f"processed_{name}")

        print(f" Processing: {name}")
        process_video(source, target, frame_limit)

# Build a face-only video for every source video in the folder.
process_all_videos(video_folder=VIDEO_FOLDER, output_folder=OUTPUT_FOLDER)

print(" Face-only dataset creation complete!")


 Processing: id7_0006.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id7_0006.mp4
 Processing: id6_0004.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id6_0004.mp4
 Processing: id6_0006.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id6_0006.mp4
 Processing: id6_0000.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id6_0000.mp4
 Processing: id6_0005.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id6_0005.mp4
 Processing: id7_0009.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id7_0009.mp4
 Processing: id7_0005.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id7_0005.mp4
 Processing: id7_0002.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_path/processed_id7_0002.mp4
 Processing: id6_0008.mp4
Face-only video saved: /content/drive/MyDrive/split_dataset_pa

In [2]:
import os
import shutil
import random

# Define paths
PROCESSED_VIDEO_FOLDER = "/content/drive/MyDrive/split_dataset_path"  # Use preprocessed videos
# NOTE: this rebinds OUTPUT_FOLDER, which the face-extraction cell defines with
# a different value; re-run that cell before calling process_all_videos again.
OUTPUT_FOLDER = "/content/drive/MyDrive/output_data"

# Define split ratios
train_ratio = 0.8
val_ratio = 0.1
test_ratio = 0.1
# The test set is whatever remains after train + val, so the three shares
# have to add up to 1 for test_ratio to be meaningful.
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "split ratios must sum to 1"

# Fixed seed so the same videos land in the same subset on every run.
SPLIT_SEED = 42

# Create output directories
train_folder = os.path.join(OUTPUT_FOLDER, "train")
val_folder = os.path.join(OUTPUT_FOLDER, "val")
test_folder = os.path.join(OUTPUT_FOLDER, "test")

os.makedirs(train_folder, exist_ok=True)
os.makedirs(val_folder, exist_ok=True)
os.makedirs(test_folder, exist_ok=True)

# Get all videos. Sorted because os.listdir returns entries in arbitrary
# order, and the seeded shuffle below is only repeatable from a fixed order.
video_files = sorted(f for f in os.listdir(PROCESSED_VIDEO_FOLDER) if f.endswith('.mp4'))

# Shuffle with a private, seeded generator (global random state is untouched)
random.Random(SPLIT_SEED).shuffle(video_files)

# Split indices
total_videos = len(video_files)
train_split = int(train_ratio * total_videos)
val_split = int((train_ratio + val_ratio) * total_videos)

# Assign to sets
train_videos = video_files[:train_split]
val_videos = video_files[train_split:val_split]
test_videos = video_files[val_split:]

# Helper that relocates the members of one subset
def move_videos(video_list, destination_folder):
    """Move each named file from PROCESSED_VIDEO_FOLDER into destination_folder.

    The files are moved, not copied, so they are gone from
    PROCESSED_VIDEO_FOLDER once this returns.
    """
    transfers = (
        (os.path.join(PROCESSED_VIDEO_FOLDER, name), os.path.join(destination_folder, name))
        for name in video_list
    )
    for source, target in transfers:
        shutil.move(source, target)

# Relocate every subset into its own folder.
move_videos(video_list=train_videos, destination_folder=train_folder)
move_videos(video_list=val_videos, destination_folder=val_folder)
move_videos(video_list=test_videos, destination_folder=test_folder)

# Report how many videos each subset received.
print("Video dataset split completed!")
print(f"Train: {len(train_videos)} videos")
print(f"Validation: {len(val_videos)} videos")
print(f"Test: {len(test_videos)} videos")

Video dataset split completed!
Train: 48 videos
Validation: 6 videos
Test: 6 videos
