In [1]:
import numpy as np
import pandas as pd
import os
import matplotlib
import seaborn as sns
import matplotlib.pyplot as plt
import cv2 as cv

In [2]:
# Dataset layout; every path is relative to the notebook's working directory.
DATASET_PATH = '../datasets/data/sample_dataset/'
TRAIN_PATH = f'{DATASET_PATH}train_sample_videos/'
TEST_PATH = f'{DATASET_PATH}test_videos/'

# Model artifacts; the Haar cascade XML is loaded from a sub-folder of MODELS_PATH.
MODELS_PATH = '../models/'
HAAR_CASCADE_PATH = f'{MODELS_PATH}haar_cascade/'

In [3]:
# os.listdir() returns entries in arbitrary order; sorting keeps positional
# look-ups such as train_files[2] stable from one run to the next.
train_files = sorted(os.listdir(TRAIN_PATH))
training_files = len(train_files)  # number of entries in TRAIN_PATH (videos + metadata.json)

# `test_files` stays a list of file names, mirroring `train_files`; the count
# lives in its own variable so the name never switches between list and int.
test_files = sorted(os.listdir(TEST_PATH))
testing_files = len(test_files)

print(f"Number of training files: {training_files}")
print(f"Number of test files: {testing_files}")

Number of training files: 401
Number of test files: 400


In [4]:
from collections import defaultdict

# Tally of file extensions found in each dataset folder.
fileTypes = {
    'training': defaultdict(lambda: 0),
    'testing': defaultdict(lambda: 0),
}

# One pass per (split, folder) pair; the extension is whatever follows the last dot.
for split_name, folder in (('training', TRAIN_PATH), ('testing', TEST_PATH)):
    extension_counts = fileTypes[split_name]
    for entry in os.listdir(folder):
        extension_counts[entry.split('.')[-1]] += 1

fileTypes

{'training': defaultdict(<function __main__.<lambda>()>,
             {'mp4': 400, 'json': 1}),
 'testing': defaultdict(<function __main__.<lambda>()>, {'mp4': 400})}

In [5]:
import json


real_train_data = []
fake_train_data = []

# Read the metadata through a context manager so the file handle is closed as
# soon as parsing finishes instead of being left open for the kernel's lifetime.
with open(TRAIN_PATH + 'metadata.json') as metadata_file:
    data = json.load(metadata_file)

# NOTE(review): every key is appended to fake_train_data, including entries whose
# 'original' is null, and real_train_data collects the referenced originals
# (duplicates included). Confirm this is the intended meaning of both lists.
for k, v in data.items():
    fake_train_data.append(k)
    if v['original'] is not None:
        real_train_data.append(v['original'])

print(len(fake_train_data))
print(len(real_train_data))
print(fake_train_data[:5])
print(real_train_data[:5])

400
323
['aagfhgtpmv.mp4', 'aapnvogymq.mp4', 'abarnvbtwb.mp4', 'abofeumbvv.mp4', 'abqwwspghj.mp4']
['vudstovrck.mp4', 'jdubbvfswz.mp4', 'atvmxvwyns.mp4', 'qzimuostzz.mp4', 'kbvibjhfzo.mp4']


In [6]:
def display_image_from_video_list(video_path_list, video_folder=TRAIN_PATH):
    '''
    Show the first frame of every listed video in a grid, three images per row.

    input:
        video_path_list - file names of the videos, resolved against video_folder
        video_folder - directory that contains the videos (defaults to TRAIN_PATH)
    process:
    0. for each video in the video path list
        1. open a video capture and read its first frame
        2. release the capture again
        3. convert the frame from BGR to RGB and draw it in the next grid cell
    A video whose first frame cannot be read gets an empty cell titled
    "(unreadable)" instead of aborting the whole figure.
    '''
    video_path_list = list(video_path_list)
    n_cols = 3
    # Always at least two rows so `ax` stays a 2-D array; extra rows are added
    # only when more than six videos are passed.
    n_rows = max(2, -(-len(video_path_list) // n_cols))
    fig, ax = plt.subplots(n_rows, n_cols, figsize=(16, 4 * n_rows))
    for i, video_file in enumerate(video_path_list):
        cell = ax[i // n_cols, i % n_cols]
        video_path = os.path.join(video_folder, video_file)
        capture_image = cv.VideoCapture(video_path)
        try:
            ret, frame = capture_image.read()
        finally:
            # Only the first frame is needed; free the decoder right away.
            capture_image.release()
        if not ret:
            cell.set_title(f"Video: {video_file} (unreadable)")
            cell.axis('off')
            continue
        # OpenCV decodes to BGR, matplotlib expects RGB.
        cell.imshow(cv.cvtColor(frame, cv.COLOR_BGR2RGB))
        cell.set_title(f"Video: {video_file}")
        cell.axis('on')

In [7]:
# metadata.json is keyed by video file name; transposing turns each video into one row.
train_df = pd.read_json(f'{TRAIN_PATH}metadata.json').transpose()
train_df.head()

Unnamed: 0,label,split,original
aagfhgtpmv.mp4,FAKE,train,vudstovrck.mp4
aapnvogymq.mp4,FAKE,train,jdubbvfswz.mp4
abarnvbtwb.mp4,REAL,train,
abofeumbvv.mp4,FAKE,train,atvmxvwyns.mp4
abqwwspghj.mp4,FAKE,train,qzimuostzz.mp4


Preprocessing

- Split the video into frames
- Crop the face from each frame
- Save the cropped faces as a new video

In [8]:
def frame_extract(path):
    """Lazily yield the frames of the video at ``path``, one per iteration.

    The capture is released when the video is exhausted, when the generator
    is closed early, or when the consumer raises, so no decoder handle is
    left open. Nothing is yielded if the first read already fails.
    """
    vidObj = cv.VideoCapture(path)
    try:
        while True:
            success, image = vidObj.read()
            if not success:
                break
            yield image
    finally:
        vidObj.release()

# Calling the generator function only builds the generator object; no frame is read until it is iterated.
frame_extract(os.path.join(TRAIN_PATH, train_files[0]))

<generator object frame_extract at 0x7fca50d6cc80>

In [9]:
def count_number_of_frames(video_path):
    """Return the frame count OpenCV reports for the video at ``video_path``.

    The value is read from the CAP_PROP_FRAME_COUNT property and converted
    to int; the capture is released before returning.
    """
    reader = cv.VideoCapture(video_path)
    frame_total = int(reader.get(cv.CAP_PROP_FRAME_COUNT))
    reader.release()
    return frame_total


a) Using a Haar cascade

In [10]:

def create_face_videos_using_haar(input_path: str, output_path, padding: int = 50, face_size=(112, 112)):
    """Write a video made of the faces a Haar cascade finds in ``input_path``.

    Every detected face is cropped with ``padding`` extra pixels on each side
    (clamped to the frame border), resized to ``face_size`` and appended to the
    output as its own frame. A source frame with several faces therefore
    contributes several output frames, and a frame without faces none.

    Args:
        input_path: Path of the source video.
        output_path: Path of the video to write; its directory is created if needed.
        padding: Margin in pixels added around each detected box.
        face_size: (width, height) of the written frames.

    Raises:
        IOError: If the cascade file cannot be loaded or the video cannot be opened.

    Prints the number of frames read from the source once processing is done.
    """
    # NOTE(review): the MJPG fourcc and the fixed 30 fps are kept as they were;
    # callers pass '.mp4' output paths - confirm the OpenCV backend accepts that pairing.
    cascade_file = HAAR_CASCADE_PATH + 'haarcascade_frontalface_alt2.xml'
    face_cascade = cv.CascadeClassifier(cascade_file)
    if face_cascade.empty():
        raise IOError(f"Could not load Haar cascade: {cascade_file}")

    cap = cv.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {input_path}")

    # os.makedirs('') raises, so only create a directory when the path has one.
    output_directory = os.path.dirname(output_path)
    if output_directory:
        os.makedirs(output_directory, exist_ok=True)
    out = cv.VideoWriter(output_path, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'), 30, face_size)

    no_of_frames = 0
    try:
        while True:
            success, frame = cap.read()
            if not success:
                break
            # Count only frames that were actually decoded.
            no_of_frames += 1

            gray_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

            for (x, y, w, h) in faces:
                # A negative slice start would wrap around to the opposite edge
                # of the frame, so clamp the padded box at zero.
                top = max(y - padding, 0)
                left = max(x - padding, 0)
                face_region = frame[top:y + h + padding, left:x + w + padding]
                if face_region.size == 0:
                    continue
                out.write(cv.resize(face_region, face_size))
    finally:
        cap.release()
        out.release()
    print('Total frames: ', no_of_frames)

# Example call (replace the input and output paths with your own):
# create_face_videos_using_haar(TRAIN_PATH + train_files[2], DATASET_PATH + '../output_face/' + train_files[2].split('.')[0] + 'haar.mp4')



b) Using MTCNN to detect faces in the image

In [11]:
import cv2
from mtcnn.mtcnn import MTCNN

def create_face_videos_using_mtcnn(input_path, output_path, padding=50, face_size=(112, 112)):
    """Write a video made of the faces MTCNN finds in ``input_path``.

    Every detected face is cropped with ``padding`` extra pixels on each side
    (clamped to the frame border), resized to ``face_size`` and appended to the
    output as its own frame.

    Args:
        input_path: Path of the source video.
        output_path: Path of the video to write; its directory is created if needed.
        padding: Margin in pixels added around each detected box.
        face_size: (width, height) of the written frames.

    Prints the source frame count first. If the video cannot be opened, prints
    "Error reading video file" and returns without creating any output.
    """
    print(count_number_of_frames(input_path))

    video = cv.VideoCapture(input_path)
    if not video.isOpened():
        print("Error reading video file")
        video.release()
        return

    # Build the detector only once the input is known to be readable.
    detector = MTCNN()

    output_directory = os.path.dirname(output_path)
    if output_directory:
        os.makedirs(output_directory, exist_ok=True)
    result = cv.VideoWriter(output_path, cv.VideoWriter_fourcc('M', 'J', 'P', 'G'), 30, face_size)

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                break

            # OpenCV decodes to BGR; the detector is fed an RGB copy, while the
            # crop written to the output stays BGR as VideoWriter expects.
            rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
            for face in detector.detect_faces(rgb_frame):
                x, y, w, h = face['box']
                # Box coordinates minus padding can go negative; a negative
                # slice start would wrap around, so clamp at zero.
                top = max(y - padding, 0)
                left = max(x - padding, 0)
                face_region = frame[top:y + h + padding, left:x + w + padding]
                if face_region.size == 0:
                    continue
                result.write(cv.resize(face_region, face_size))
    finally:
        video.release()
        result.release()

    print("The video was successfully saved")

# Paths for a single trial run of the MTCNN pipeline on the third listed training file.
input_path = f"{TRAIN_PATH}{train_files[2]}"
output_path = f"{DATASET_PATH}../output_face/{train_files[2].split('.')[0]}new.mp4"
# create_face_videos_using_mtcnn(input_path, output_path)

2023-08-29 20:33:17.238524: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-29 20:33:17.255971: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-29 20:33:17.401918: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-08-29 20:33:17.403461: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [12]:
total_train_frames = 0
total_error_free_videos = 0

for file in train_files:
    if file == "metadata.json":
        continue
    path = os.path.join(TRAIN_PATH, file)
    cap = cv.VideoCapture(path)
    try:
        # Opening an unreadable file typically does not raise; the failure only
        # shows up through isOpened(), so check it explicitly rather than
        # relying on an exception handler.
        if not cap.isOpened():
            print("Error with file: ", file)
            continue
        total_train_frames += int(cap.get(cv.CAP_PROP_FRAME_COUNT))
        total_error_free_videos += 1
    finally:
        # Release every capture so handles do not pile up across the loop.
        cap.release()

print("Total train frames: ", total_train_frames)
print("Total error free videos: ", total_error_free_videos)


# Per-frame cost of each detector.
# NOTE(review): 180 and 11.2 look like measured seconds for a 300-frame clip -
# confirm where these timings came from.
mtcnn_time = 180 / 300
haar_time = 11.2 / 300
print("MTCNN takes ", mtcnn_time, " seconds per frame")
print("Haar takes ", haar_time, " seconds per frame")


# Projected cost of running each detector over every training frame.
total_mtcnn_time = total_train_frames * mtcnn_time
total_haar_time = total_train_frames * haar_time

def get_time_in_hrs_mins_and_secs(time):
    """Split a duration in seconds into whole ``(hours, minutes, seconds)``.

    Each component is truncated with ``int``; any fraction of a second left
    at the end is dropped.
    """
    parts = []
    remaining = time
    # Peel off hours first, then minutes, carrying the leftover seconds along.
    for seconds_per_unit in (3600, 60):
        whole_units = int(remaining / seconds_per_unit)
        parts.append(whole_units)
        remaining = remaining - (whole_units * seconds_per_unit)
    parts.append(int(remaining))
    return tuple(parts)

# Report each projected total both as raw seconds and as an (hrs, mins, secs) tuple.
for label, total_seconds in (
    ("Total MTCNN time: ", total_mtcnn_time),
    ("Total Haar time: ", total_haar_time),
):
    print(label, total_seconds, " = ", get_time_in_hrs_mins_and_secs(total_seconds), "(hrs, mins, secs)")



Total train frames:  119974
Total error free videos:  400
MTCNN takes  0.6  seconds per frame
MTCNN takes  0.03733333333333333  seconds per frame
Total MTCNN time:  71984.4  =  (19, 59, 44) (hrs, mins, secs)
Total Haar time:  4479.029333333333  =  (1, 14, 39) (hrs, mins, secs)


In [None]:
from typing_extensions import Literal

def pre_process_dataset(method: Literal["haar", "mtcnn"]):
    """Create a face-only video for every training video.

    Each file in ``train_files`` except ``metadata.json`` is passed to the
    extractor selected by ``method`` and written to the ``output_face`` folder
    next to the dataset under the same base name with an ``.mp4`` extension.
    A failure on one file is reported and the loop moves on to the next file.

    Args:
        method: "haar" for the Haar-cascade extractor, "mtcnn" for MTCNN.

    Raises:
        ValueError: If ``method`` is neither "haar" nor "mtcnn".
    """
    if method == "haar":
        extract_faces = create_face_videos_using_haar
    elif method == "mtcnn":
        extract_faces = create_face_videos_using_mtcnn
    else:
        raise ValueError("Invalid method")

    for file_name in train_files:
        # The training folder also holds the labels file, which is not a video.
        if file_name == "metadata.json":
            continue
        try:
            extract_faces(TRAIN_PATH + file_name, DATASET_PATH + '../output_face/' + file_name.split('.')[0] + '.mp4')
        except Exception as error:
            # `Exception` rather than a bare except, so interrupting the kernel
            # stops this long-running loop instead of skipping to the next file.
            print("Error with file: ", file_name, "-", error)
            continue

# Long-running: the estimate recorded above puts the Haar pass over the training set at over an hour.
pre_process_dataset(method="haar")