In [1]:
# Mount Google Drive under /content/drive/ (later cells copy the zipped splits into it).
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
!pip install retina-face

In [4]:
from retinaface import RetinaFace
import numpy as np
import os
import cv2
import imutils

import shutil
import matplotlib

from numba import njit, jit

# Face Extractor
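Returns the input image cropped to the first detected face (`face_1`). The crop is widened by 10% of the face width/height on each side (clamped to the image borders) to keep some context around the face. Returns `None` when no face is detected.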

In [5]:
class FaceExtractor():
    """Callable that crops an image down to the first face RetinaFace reports."""

    def __call__(self, img):
        """Return the crop around ``face_1``, or None when no such detection exists."""
        detections = RetinaFace.detect_faces(img)

        # A non-dict result is treated as "no faces detected".
        if isinstance(detections, dict) and "face_1" in detections:
            return self.crop_image(img, detections["face_1"]["facial_area"])
        return None

    def crop_image(self, img, facial_area):
        """Slice ``img`` to the ``[x1, y1, x2, y2]`` box in ``facial_area`` plus a margin."""
        left, top = facial_area[0], facial_area[1]
        right, bottom = facial_area[2], facial_area[3]

        box_width = right - left
        box_height = bottom - top

        img_height, img_width, _ = img.shape

        col_start, col_end = self.increase_area_from_cropping([left, right], box_width, img_width)
        row_start, row_end = self.increase_area_from_cropping([top, bottom], box_height, img_height)

        return img[row_start:row_end, col_start:col_end]

    def increase_area_from_cropping(self, xy, face_width_height, original_wh, increase_ratio = 0.1):
        """Widen the ``xy`` interval by a margin on both ends, clamped to ``[0, original_wh]``.

        The margin is ``int(face_width_height * increase_ratio)``. The returned end
        is exclusive and sits one past ``xy[1]`` plus the margin.
        """
        margin = int(face_width_height * increase_ratio)
        lower = max(xy[0] - margin, 0)
        upper = min(xy[1] + 1 + margin, original_wh)
        return lower, upper

# Frames with faces extractor
Given a video, reads frames until the desired number of face frames (frames per clip × number of clips) has been collected. Each face is extracted with the FaceExtractor, resized to fit within 299×299 and centered on a black 299×299 canvas. Returns a list with the resulting face images.

In [6]:
def image_resize(img, target_size = 299):
    """Shrink ``img`` so neither side exceeds ``target_size``.

    Images that already fit are returned untouched. Otherwise the longer side
    is passed to ``imutils.resize`` as the target (width for landscape images,
    height for everything else).
    """
    height, width, _ = img.shape

    already_fits = height <= target_size and width <= target_size
    if already_fits:
        return img

    # Resize along the dominant dimension.
    is_landscape = (width / height) > 1
    if is_landscape:
        return imutils.resize(img, width = target_size)
    return imutils.resize(img, height = target_size)


@njit
def calculate_central_pasting_position(target_dimension, original_dimension):
    """Return the (start, end) offsets that center a span of ``original_dimension`` inside ``target_dimension``."""
    start = (target_dimension - original_dimension) // 2
    return start, start + original_dimension

@njit
def black_border_image(img, target_height = 299, target_width = 299):
    """Paste ``img`` in the middle of a black ``target_height`` x ``target_width`` x 3 int32 canvas."""
    src_height, src_width, _ = img.shape

    # Zero-filled canvas == black background.
    canvas = np.zeros((target_height, target_width, 3), dtype=np.int32)

    # Offsets that center the source image on the canvas.
    left, right = calculate_central_pasting_position(target_width, src_width)
    top, bottom = calculate_central_pasting_position(target_height, src_height)

    # Copy pixel by pixel into the centered window.
    for row in range(top, bottom):
        for col in range(left, right):
            canvas[row][col] = img[row - top][col - left]

    return canvas


# Check if it is possible to extract the desired number of frames from a video
def total_frames_to_extract(video_capture, target_number_of_frames):
    """Return how many frames to collect from ``video_capture``.

    The result is the smaller of ``target_number_of_frames`` and the frame
    count reported by the capture (``cv2.CAP_PROP_FRAME_COUNT``).

    Some containers/backends report a frame count of 0 (or a negative value)
    when the count is unknown. In that case the requested number is returned
    and the caller is expected to stop when reading fails.
    """
    reported_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    if reported_frames <= 0:
        # Unknown frame count: do not cap the request at zero.
        return target_number_of_frames
    return min(target_number_of_frames, reported_frames)


face_extraction = FaceExtractor()
def extract_frames_faces(video_source_path, target_number_of_frames):
    """Collect up to ``target_number_of_frames`` face crops from a video.

    Frames are read sequentially; frames in which no face is detected are
    skipped and do not count towards the target. Every kept face is resized
    with ``image_resize`` and centered on a black canvas with
    ``black_border_image``.

    Returns:
        A list of face images (possibly shorter than requested if the video
        ends first), or None when the video cannot be opened.
    """
    # Start OpenCV and check that the file can be opened
    video_capture = cv2.VideoCapture(video_source_path)
    if not video_capture.isOpened():
        video_capture.release()
        return None

    videos_frames = []
    try:
        total_frames = total_frames_to_extract(video_capture, target_number_of_frames)

        # `<` (rather than `!=`) also terminates for a non-positive target.
        while len(videos_frames) < total_frames:
            ret, frame = video_capture.read()
            if not ret:
                break  # end of stream or read error

            # Extracting the face
            face = face_extraction(frame)
            if face is None:
                continue

            # Reshaping the face to match a 299x299 size limit and adding a black border
            face = image_resize(face)
            face = black_border_image(face)

            # Adding the face to the list of faces that composes a video
            videos_frames.append(face)
    finally:
        # Always free the decoder/file handle, even if face extraction raises.
        video_capture.release()

    return videos_frames

# Generate videos from frames
Receives the name of the original file so the new videos can be saved following a naming pattern. Checks the number of possible subclips according to the number of frames and generates them, saving them into the destination path.

In [7]:
class VideoGenerator():
    """Splits a list of frames into fixed-length subclips and writes each one as an .mp4 file."""

    def __call__(self, video_name, frames, video_fps, target_number_subclips, destination_path, target_dimension = (299, 299)):
        """Write up to ``target_number_subclips`` clips of ``video_fps`` frames each.

        Args:
            video_name: Base name used for the output files
                (``<video_name>_segment_<n>.mp4``).
            frames: Sequence of equally shaped H x W x 3 images.
            video_fps: Frames per subclip; also used as the output frame rate.
            target_number_subclips: Maximum number of subclips to write.
            destination_path: Directory that receives the output files.
            target_dimension: ``(width, height)`` handed to ``cv2.VideoWriter``.

        Raises:
            ValueError: If the frame size does not match ``target_dimension``.
        """
        # Checking if the total of subclips desired is feasible
        total_frames = len(frames)
        possible_number_of_subclips = total_frames // video_fps
        if possible_number_of_subclips < target_number_subclips:
            print(f"{video_name} possuirá {possible_number_of_subclips} subclips - Possui somente {total_frames} frames")
            if possible_number_of_subclips == 0: return

        # Never write more subclips than requested, even if enough frames exist.
        total_subclips = min(possible_number_of_subclips, target_number_subclips)
        if total_subclips <= 0: return
        frames = frames[:total_subclips * video_fps]

        # Separating the frames into the videos
        videos = np.array(frames, dtype=np.uint8)
        frame_shape = videos.shape[1:]
        expected_shape = (target_dimension[1], target_dimension[0], 3)
        if frame_shape != expected_shape:
            # Fail loudly here instead of producing an unusable video file.
            raise ValueError(
                f"Frame shape {frame_shape} does not match target_dimension "
                f"{tuple(target_dimension)} (width, height)"
            )
        videos = videos.reshape(total_subclips, video_fps, *frame_shape)

        # Generating the videos
        output_path = os.path.join(destination_path, f"{video_name}_segment_")
        self.generate_video(videos, output_path, video_fps, target_dimension)


    def generate_video(self, videos, destination_path, video_fps, target_dimension):
        """Write each entry of ``videos`` to ``<destination_path><index>.mp4`` (1-based index)."""
        codec = cv2.VideoWriter_fourcc(*'mp4v')

        for index, video in enumerate(videos, start=1):
            output_path = destination_path + f"{index}.mp4"

            video_output = cv2.VideoWriter(output_path, codec, video_fps, target_dimension)
            try:
                for frame in video:
                    video_output.write(frame)
            finally:
                # Release the writer so the file is finalized even on error.
                video_output.release()

# Video Handler
Receives a video path, extracts its frames and faces, and generates new videos with the desired number of frames and video segments.

In [8]:
class VideoHandler():
    """Runs the whole per-video pipeline: face-frame extraction followed by subclip generation."""

    def __init__(self):
        self.video_generator = VideoGenerator()

    def __call__(self, video_source_path, destination_path, frames_per_video = 24, total_videos_segments = 1):
        """Extract face frames from ``video_source_path`` and write the subclips to ``destination_path``.

        ``frames_per_video * total_videos_segments`` face frames are requested.
        Nothing is written when the video cannot be opened.
        """
        video_name = self.get_video_name(video_source_path)

        total_frames_to_be_extracted = frames_per_video * total_videos_segments

        frames = extract_frames_faces(video_source_path, total_frames_to_be_extracted)
        if frames is not None:
            self.video_generator(video_name, frames, frames_per_video, total_videos_segments, destination_path)


    def get_video_name(self, video_source_path):
        """Return the file name of ``video_source_path`` without its final extension."""
        file_name = os.path.basename(video_source_path)
        # splitext drops only the last extension, so "a.b.mp4" and "a.c.mp4"
        # keep distinct names instead of both collapsing to "a".
        stem, _extension = os.path.splitext(file_name)
        return stem



# Pipeline





In [12]:
def delete_files_and_folders(folder_path):
    """Recursively delete ``folder_path`` and everything inside it.

    Does nothing when ``folder_path`` does not exist, so the cell can run on
    a fresh runtime where no previous output is present.
    """
    if not os.path.exists(folder_path):
        return

    # Walk bottom-up so every directory is already empty when it is removed
    for root, dirs, files in os.walk(folder_path, topdown=False):
        # Delete all files
        for file_name in files:
            os.remove(os.path.join(root, file_name))

        # Delete all folders
        for dir_name in dirs:
            dir_path = os.path.join(root, dir_name)
            os.rmdir(dir_path)
            print(f"Deleted folder: {dir_path}")

    # After all files and folders are deleted, remove the top-level folder itself
    os.rmdir(folder_path)
    print(f"Deleted top-level folder: {folder_path}")

# Remove the extraction output folder left by a previous run.
# NOTE(review): presumably the same folder a later cell recreates via the relative
# `base_folder` path (assumes the working directory is /content) — confirm.
delete_files_and_folders("/content/Celeb-df-V2-faces-extracted")

Deleted top-level folder: /content/Celeb-df-V2-faces-extracted


In [13]:
def read_txt(txt_file_path, base_path):
    """Read a list of video paths (one per line) and prefix each with ``base_path``.

    Blank lines — including the empty entry produced by a trailing newline —
    are skipped, and surrounding whitespace is stripped from every entry.
    """
    with open(txt_file_path, 'r') as file:
        entries = [line.strip() for line in file]

    return [os.path.join(base_path, entry) for entry in entries if entry]

In [14]:
base_folder = "Celeb-df-V2-faces-extracted"
# exist_ok keeps this cell re-runnable when the output folder is already there.
os.makedirs(base_folder, exist_ok=True)
vh = VideoHandler()

def make_processing(dictionary, start = -1, end = -1, real_fake_selector = ('real', 'fake')):
    """Process every video listed for one dataset split.

    Args:
        dictionary: Split configuration with a top-level ``"destination"`` and
            one sub-dictionary per class (``"real"`` / ``"fake"``) holding
            ``"txt_file"``, ``"destination"``, ``"frames_per_video"`` and
            ``"total_videos_segments"``.
        start, end: When both differ from -1, only ``dataset[start:end]`` of
            each class list is processed (useful for running in batches).
        real_fake_selector: Which class sub-dictionaries to process.
    """
    # exist_ok lets the same split be processed over several calls
    # (e.g. one batch or one class at a time) without failing on mkdir.
    os.makedirs(dictionary["destination"], exist_ok=True)

    for real_fake in real_fake_selector:
        sub_dict = dictionary[real_fake]

        os.makedirs(sub_dict["destination"], exist_ok=True)

        dataset = read_txt(sub_dict["txt_file"], '/content/')

        if start != -1 and end != -1:
            dataset = dataset[start : end]

        total_itens_dataset = len(dataset)

        for i, video in enumerate(dataset):
            vh(
                video_source_path     = video,
                destination_path      = sub_dict["destination"],
                frames_per_video      = sub_dict["frames_per_video"],
                total_videos_segments = sub_dict["total_videos_segments"]
            )
            # Lightweight progress report every 10 videos
            if (i+1) % 10 == 0:
                print(f"{i+1}/{total_itens_dataset}", end = " | ")

        print("\n")

# test dataset

In [None]:
# Configuration of the test split: one 24-frame segment per video for both classes.
test_dict = {
    "destination": f"{base_folder}/test",

    "real": {
        "txt_file": "/content/test_real.txt",
        "destination": f"{base_folder}/test/real",
        "frames_per_video": 24,
        "total_videos_segments": 1,
    },

    "fake": {
        "txt_file": "/content/test_fake.txt",
        "destination": f"{base_folder}/test/fake",
        "frames_per_video": 24,
        "total_videos_segments": 1,
    },
}

make_processing(test_dict)

10/179 | 20/179 | 30/179 | 40/179 | 50/179 | 60/179 | 70/179 | 80/179 | 90/179 | 100/179 | 110/179 | 120/179 | 130/179 | 140/179 | 150/179 | 160/179 | 170/179 | 

10/340 | 20/340 | 30/340 | 40/340 | 50/340 | 60/340 | 70/340 | 80/340 | 90/340 | 100/340 | 110/340 | 120/340 | 130/340 | 140/340 | 150/340 | 160/340 | 170/340 | 180/340 | 190/340 | 200/340 | 210/340 | 220/340 | 230/340 | 240/340 | 250/340 | 260/340 | 270/340 | 280/340 | 290/340 | 300/340 | 310/340 | 320/340 | 330/340 | 340/340 | 



In [None]:
# Archive the extracted test split recursively (the next cell copies the result as /content/test.zip).
!zip -r test /content/Celeb-df-V2-faces-extracted/test

In [None]:
# Copy the test archive into the mounted Google Drive folder.
shutil.copy("/content/test.zip", "/content/drive/MyDrive")

'/content/drive/MyDrive/test.zip'

# val dataset

In [None]:
# Configuration of the validation split: real videos yield two 24-frame segments, fake videos one.
# NOTE(review): the extra real segments presumably compensate for class imbalance — confirm.
val_dict = {
    "destination": f"{base_folder}/val",

    "real": {
        "txt_file": "/content/val_real.txt",
        "destination": f"{base_folder}/val/real",
        "frames_per_video": 24,
        "total_videos_segments": 2,
    },

    "fake": {
        "txt_file": "/content/val_fake.txt",
        "destination": f"{base_folder}/val/fake",
        "frames_per_video": 24,
        "total_videos_segments": 1,
    },
}

make_processing(val_dict)

10/164 | 20/164 | 30/164 | 40/164 | 50/164 | 60/164 | 70/164 | 80/164 | 90/164 | 100/164 | 110/164 | 120/164 | 130/164 | 140/164 | 150/164 | 160/164 | 

10/327 | 20/327 | 30/327 | 40/327 | 50/327 | 60/327 | 70/327 | 80/327 | 90/327 | 100/327 | 110/327 | 120/327 | 130/327 | 140/327 | 150/327 | 160/327 | 170/327 | 180/327 | 190/327 | 200/327 | 210/327 | 220/327 | 230/327 | 240/327 | 250/327 | 260/327 | 270/327 | 280/327 | 290/327 | 300/327 | 310/327 | 320/327 | 



In [None]:
# Archive the extracted validation split recursively, then copy the archive into the mounted Google Drive folder.
!zip -r val.zip /content/Celeb-df-V2-faces-extracted/val
shutil.copy("/content/val.zip", "/content/drive/MyDrive")

# train dataset

In [None]:
# Configuration of the training split: real videos yield nine 24-frame segments, fake videos one.
# NOTE(review): the extra real segments presumably compensate for class imbalance — confirm.
train_dict = {
    "destination": f"{base_folder}/train",

    "real": {
        "txt_file": "/content/train_real.txt",
        "destination": f"{base_folder}/train/real",
        "frames_per_video": 24,
        "total_videos_segments": 9,
    },

    "fake": {
        "txt_file": "/content/train_fake.txt",
        "destination": f"{base_folder}/train/fake",
        "frames_per_video": 24,
        "total_videos_segments": 1,
    },
}

# Only the first 137 entries of the "real" list are processed by this call.
make_processing(train_dict, start = 0, end = 137, real_fake_selector = ['real'])

24-05-05 16:21:23 - Directory /root/.deepface created
24-05-05 16:21:23 - Directory /root/.deepface/weights created
24-05-05 16:21:23 - retinaface.h5 will be downloaded from the url https://github.com/serengil/deepface_models/releases/download/v1.0/retinaface.h5


Downloading...
From: https://github.com/serengil/deepface_models/releases/download/v1.0/retinaface.h5
To: /root/.deepface/weights/retinaface.h5
100%|██████████| 119M/119M [00:00<00:00, 178MB/s]


In [None]:
# Archive the extracted training split recursively, then copy the archive into the mounted Google Drive folder.
!zip -r train.zip /content/Celeb-df-V2-faces-extracted/train
shutil.copy("/content/train.zip", "/content/drive/MyDrive")