<a href="https://colab.research.google.com/github/mark-polo/machine-learning/blob/main/Deepfake_detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Utils.py

In [8]:
"""The code is a Python module that contains several functions for loading, manipulating and preparing data for a
DeepFake video detection project.

It imports the following libraries:
        - os
        - cv2
        - numpy
        - pandas
        - tensorflow.keras.

Functions and their descriptions:
_____

    create_df(json_path: str = None, file_paths: np.ndarray = None, file_labels: np.ndarray = None, mode: int = 0): This
    function creates a Pandas DataFrame either from a JSON file or from passed data. It takes four arguments: json_path
    (the path to a JSON file), file_paths (a NumPy array of paths derived from the filenames), file_labels (a NumPy
    array of labels derived from the filenames), and mode (0 builds the dataframe from the JSON file, 1 builds it from
    the passed arrays). The function returns, in order, the dataframe read from the JSON file, the Path/Label
    dataframe, the labels Series, the label list and the path list.

    play_video(path: str, title: str = ""): This function displays a video using the OpenCV library. It takes two
    arguments: path (the path to the video file) and title (the window title). The function returns nothing.

    crop_center_square(frame): This function crops the largest centered square out of a frame. Resizing a non-square
    frame straight to 224x224 would squeeze the image, so frames are cropped first and resized afterwards. It takes one
    argument, frame (a single video frame), and returns the cropped frame.

    load_video(path: str, _frames: int = 0, resize=(img_size, img_size)) -> np.ndarray: This function loads a video,
    center-crops and resizes each frame, and converts the color space from BGR to RGB. The frames are collected in a
    list and returned as one NumPy array, which is the format the model expects. It takes three arguments: path (the
    path to the video file), _frames (the maximum number of frames to read; with the default 0 the whole video is
    read), and resize (the target frame size, 224x224 by default). The function returns a NumPy array of frames.

    pretrain_feature_extractor(): This function builds the feature extractor, a pretrained Inception V3 model with
    ImageNet weights that turns each frame into a feature vector. The function returns the Keras model.

Example:
------
    To create the dataframes:
        df_json, df, labels, label_list, path_list = create_df(
            "path")
    To extract features with prepare_all_videos:
        train_data, train_labels = prepare_all_videos(train, train_path, feature_extractor)
    To run a prediction:
        prediction(path=test_videos, feature_extractor=feature_extractor, model=model)

"""

import os

import cv2 as cv
import numpy as np
import pandas as pd
from tensorflow import keras


def create_df(json_path: str = None, file_paths: np.ndarray = None, file_labels: np.ndarray = None, mode: int = 0):
    """
    Create dataframes either from a JSON file (via pandas.read_json) or from data passed to the function.

    :param mode: 0 builds the Path/Label dataframe from the JSON file, 1 builds it from file_paths and file_labels
    :param file_labels: A numpy array of labels derived from the filenames (used when mode is 1)
    :param file_paths: A numpy array of paths derived from the filenames (used when mode is 1)
    :param json_path: Path to the JSON file; may be omitted when mode is 1
    :return: The dataframe read from the JSON file (None without json_path), the Path/Label dataframe, the labels
    Series, the label list and the path list
    """

    df_json, path_list, label_list = None, [], []
    if json_path is not None:
        # read the JSON file; after the transpose the index holds the video filenames
        df_json = pd.read_json(json_path).T
        # join every filename in the index with the global train_path to get the full video path
        path_list = [os.path.join(train_path, name) for name in df_json.index]
        # collect the labels in a list
        label_list = list(df_json["label"])

    if mode == 0:
        paths, label_values = path_list, label_list
    elif mode == 1:
        paths, label_values = file_paths, file_labels
    else:
        # any other mode is a caller error
        raise ValueError("Invalid mode: expected 0 (from JSON file) or 1 (from passed data)")

    # build the two-column Path/Label dataframe
    filepath = pd.Series(data=paths, name="Path").astype(str)
    labels = pd.Series(data=label_values, name="Label")
    df = pd.concat([filepath, labels], axis=1)
    return df_json, df, labels, label_list, path_list


def play_video(path: str, title: str = ""):
    """
    Show a video in an OpenCV window.

    :param path: Path to the video.
    :param title: Window title.
    :return: None
    """
    print(path)
    video = cv.VideoCapture(path)
    while video.isOpened():
        success, frame = video.read()
        # stop at the end of the video or on a read error
        if not success:
            break
        cv.imshow(title, frame)
        # press Esc to stop playback
        key = cv.waitKey(1)
        if key == 27:
            break
    # When everything done, release
    # the video capture object
    video.release()
    # closing all open windows
    cv.destroyAllWindows()


def crop_center_square(frame):
    """
    Crop the largest centered square out of a frame. Resizing a non-square frame straight to 224x224 would
    squeeze the image, so the frame is cropped here and resized afterwards in load_video.

    :param frame: A single video frame (height x width x channels)
    :return: The square crop of the frame
    """

    h, w = frame.shape[0:2]
    min_dim = min(h, w)
    center_x = (w // 2) - (min_dim // 2)
    center_y = (h // 2) - (min_dim // 2)
    return frame[center_y: center_y + min_dim, center_x: center_x + min_dim]


def load_video(path: str, _frames: int = 0, resize=(img_size, img_size)) -> np.ndarray:
    """
    Load a video. Each frame is center-cropped, resized and converted from BGR to RGB. The frames are collected
    in a list and returned as one numpy array, because the machine learning model works with numpy arrays.

    :param path: Path to the video.
    :param _frames: Maximum number of frames to read; with the default 0 the whole video is read
    :param resize: Target frame size as a tuple, 224x224 by default
    :return: A numpy array of the frames
    """

    cap = cv.VideoCapture(path)
    frames = []
    try:
        while True:
            success, frame = cap.read()
            # success is False when no more frames can be read (end of video or unreadable file)
            if not success:
                break
            frame = crop_center_square(frame)
            frame = cv.resize(frame, resize)
            # convert BGR to RGB; frame[:, :, [2, 1, 0]] would do the same
            frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
            frames.append(frame)
            # stop once _frames frames are collected (never triggers when _frames is 0)
            if len(frames) == _frames:
                break
    finally:
        # When everything done, release
        # the video capture object
        cap.release()
    return np.array(frames)


def pretrain_feature_extractor():
    """
    Build the feature extractor: a pretrained Inception V3 model with ImageNet weights that turns each frame
    into a feature vector.
    :return: A Keras model mapping (img_size, img_size, 3) images to feature vectors
    """

    feature_extractor = keras.applications.InceptionV3(
        weights="path",  # path to a weights file; Keras also accepts "imagenet" here
        include_top=False,
        pooling="avg",
        input_shape=(img_size, img_size, 3)
    )
    # preprocess_input scales pixel values from [0, 255] to [-1, 1]
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((img_size, img_size, 3))
    # Applied to a symbolic Keras input, preprocess_input returns a tensor rather than a numpy array.
    # For Inception V3 it only rescales the values; it does not reorder channels or subtract ImageNet means.
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


def prepare_all_videos(df: pd.DataFrame, path: str, feature_extractor):
    """
    Load the frames of every video in the dataframe and extract features from them.

    'feature_extractor.predict(batch[None, j, :])': runs the CNN on one frame and reduces it from 224x224x3 (150528
    values) to a feature vector of shape (2048,). The full feature tensor fed to the RNN has shape
    (num_samples, max_seq_length, 2048); the train set has 280 samples and the test set 120.

    'temp_frame_mask[i, :length]': is set to 1 (True) for timesteps that hold a real frame. Videos shorter than
    max_seq_length are padded (the same technique applies to other sequences, such as text), and the padded timesteps
    stay False. Shape here (max_seq_length,).

    The temporary arrays are then copied into the output arrays, with shapes (280/120, max_seq_length, 2048) for the
    features and (280/120, max_seq_length) for the masks.

    :param df: Dataframe indexed by video filename, with a "label" column
    :param path: Path to the dataset directory
    :param feature_extractor: Pretrained model that extracts features from the video frames
    :return: A (features, masks) tuple and the labels, ready to be fed to the RNN model
    """

    num_samples = len(df)
    video_names = list(df.index)
    # get the labels and encode them: FAKE -> 1, anything else -> 0
    labels = df["label"].values
    labels = np.array(labels == "FAKE").astype(int)

    # `frame_mask` and `frame_feature` are what will be fed to the sequence model.
    # `frame_mask` holds one boolean per timestep: True for a real frame,
    #  False for padding.
    #  max_seq_length - timesteps (length of each sequence)
    #  num_features - length of the feature vector extracted from each frame
    frame_mask = np.zeros(shape=(num_samples, max_seq_length), dtype="bool")
    frame_feature = np.zeros(shape=(num_samples, max_seq_length, num_features), dtype="float32")

    # go through all videos
    for idx, items in enumerate(video_names):
        frames = load_video(os.path.join(path, items))
        frames = frames[None, ...]  # add a leading axis, e.g. (1, 300, 224, 224, 3)
        print("Frames shape : ", frames.shape)

        # initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(shape=(1, max_seq_length,), dtype="bool")
        temp_frame_features = np.zeros(shape=(1, max_seq_length, num_features), dtype="float32")

        # extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            print("Look at the video_length : ", video_length)
            # use at most max_seq_length frames; a 300-frame video is truncated to max_seq_length
            length = min(max_seq_length, video_length)
            print(length)
            for j in range(length):
                # batch[None, j, :] is frame j with a leading batch axis: (1, 224, 224, 3).
                # The extracted features are stored at temp_frame_features[i, j, :],
                # which has shape (2048,).
                temp_frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
                print(i, " ", j)
                print(temp_frame_features[i, j, :].shape)
            # mark the first `length` timesteps as real frames;
            # the remaining timesteps stay False and are treated as padding
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        # copy the per-video features and mask into the output arrays
        frame_feature[idx,] = temp_frame_features.squeeze()
        frame_mask[idx,] = temp_frame_mask.squeeze()

    return (frame_feature, frame_mask), labels


def prepare_single_video(frames, feature_extractor):
    """
    This function is similar to prepare_all_videos, but it handles a single video: there are no labels, and the
    features and mask are written straight into the output arrays.

    'feature_extractor.predict(batch[None, j, :])': runs the CNN on one frame and reduces it from 224x224x3 (150528
    values) to a feature vector of shape (2048,). The feature tensor fed to the RNN has shape
    (1, max_seq_length, 2048): one video, max_seq_length frames and 2048 extracted features per frame.

    'frame_mask[i, :length]': is set to 1 (True) for timesteps that hold a real frame. Videos shorter than
    max_seq_length are padded (the same technique applies to other sequences, such as text), and the padded timesteps
    stay False. Shape here (max_seq_length,).

    :param frames: Frames of the video, shape (frames, height, width, 3)
    :param feature_extractor: Pretrained feature extractor model
    :return: The features and the mask
    """

    frames = frames[None, ...]
    # the timestep axis is sized by max_seq_length; num_features is only the last (feature) axis
    frame_mask = np.zeros(shape=(1, max_seq_length,), dtype="bool")
    frame_features = np.zeros(shape=(1, max_seq_length, num_features), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(max_seq_length, video_length)
        for j in range(length):
            # Shape (2048,)
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        # Shape (max_seq_length,)
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask


def prediction(feature_extractor, model, path = None, video = None):
    """
    Run the trained model on a single video.

    :param video: Frames of a live video, shape (frames, height, width, 3); when given, path is ignored
    :param path: Path to the video file to load when video is None
    :param feature_extractor: Pretrained feature extractor model, needed because prepare_single_video
                              is called here
    :param model: An already trained model
    :return: The model's prediction for the video
    """

    # load frames from disk only when no live frames were passed
    frames = load_video(path) if video is None else video
    features, masks = prepare_single_video(frames, feature_extractor)
    return model.predict([features, masks])[0]
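
The cropping step can be checked without a video file. The sketch below applies the same center-crop arithmetic as `crop_center_square` to synthetic NumPy frames; the name `center_crop` and the frame sizes are illustrative assumptions, not part of the notebook.

```python
import numpy as np


def center_crop(frame: np.ndarray) -> np.ndarray:
    """Return the largest centered square region of an (H, W, C) frame."""
    h, w = frame.shape[:2]
    min_dim = min(h, w)
    start_x = (w // 2) - (min_dim // 2)
    start_y = (h // 2) - (min_dim // 2)
    return frame[start_y:start_y + min_dim, start_x:start_x + min_dim]


# Synthetic frames: landscape (720 x 1280) and portrait (1280 x 720).
landscape = np.zeros((720, 1280, 3), dtype=np.uint8)
portrait = np.zeros((1280, 720, 3), dtype=np.uint8)

landscape_crop = center_crop(landscape)
portrait_crop = center_crop(portrait)
print(landscape_crop.shape)  # (720, 720, 3)
print(portrait_crop.shape)   # (720, 720, 3)
```

Both orientations end up square, so the later resize to 224x224 changes only the scale and not the aspect ratio.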

Rnn.py

In [5]:
"""
This module contains a class to create an RNN model for training.

This module contains the following class:
RNN: A class to create an RNN model for training.

"""

from tensorflow import keras


class RNN:
    """
    This class is used to create an RNN model.

    Attributes:
        rate (float): The dropout rate.
        units (int): The number of outputs of the last dense layer in the RNN model.

    Methods:
        training():
        This method creates, compiles and returns the RNN model.
    """

    def __init__(self, rate, units):
        """
        Initializes an instance of the RNN class.

        Args:
            rate (float): The dropout rate.
            units (int): The number of outputs of the last dense layer in the RNN model.

        Returns:
            None.

        """
        self.rate = rate
        self.units = units

    def training(self):
        """
        This method creates and returns the RNN model.

        Args:
            None.

        Returns:
            model (tensorflow.python.keras.engine.functional.Functional): The compiled RNN model.

        """

        # Define inputs
        features_input = keras.Input((max_seq_length, num_features))
        mask_input = keras.Input((max_seq_length,), dtype="bool")

        # Define layers
        # return_sequences=True (many-to-many): the first GRU outputs a value at every timestep,
        # giving a tensor of shape (batch, timesteps, features) that the next GRU layer can consume
        x = keras.layers.GRU(32, return_sequences=True)(features_input, mask=mask_input)
        x = keras.layers.GRU(16)(x)
        x = keras.layers.Dropout(self.rate)(x)
        x = keras.layers.Dense(128, activation="relu")(x)
        output = keras.layers.Dense(self.units, activation="sigmoid")(x)

        # Define model
        model = keras.Model([features_input, mask_input], output)
        # Compile model

        model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["AUC"])

        # Display model summary
        model.summary()

        return model
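
The two model inputs line up timestep by timestep: `features_input` carries one feature vector per frame and `mask_input` one boolean per frame. The sketch below shows, with NumPy only, how clips of different lengths could be padded or truncated to a fixed length and what the resulting masks look like. `MAX_SEQ_LENGTH`, `NUM_FEATURES`, `fake_extractor` and `pad_video` are hypothetical stand-ins chosen for readability; the notebook itself uses InceptionV3 features and its own global sizes.

```python
import numpy as np

MAX_SEQ_LENGTH = 5  # illustrative; the notebook sets max_seq_length in its globals cell
NUM_FEATURES = 4    # illustrative; the notebook uses 2048 features per frame


def fake_extractor(frame: np.ndarray) -> np.ndarray:
    """Hypothetical stand-in for the CNN: one constant feature vector per frame."""
    return np.full(NUM_FEATURES, frame.mean(), dtype="float32")


def pad_video(frames: np.ndarray):
    """Return (features, mask) padded or truncated to MAX_SEQ_LENGTH timesteps."""
    features = np.zeros((MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")
    mask = np.zeros((MAX_SEQ_LENGTH,), dtype="bool")
    length = min(MAX_SEQ_LENGTH, len(frames))
    for j in range(length):
        features[j] = fake_extractor(frames[j])
    mask[:length] = True  # True = real frame, False = padding
    return features, mask


short_clip = np.ones((3, 8, 8, 3), dtype="float32")  # 3 frames, padded to 5
long_clip = np.ones((9, 8, 8, 3), dtype="float32")   # 9 frames, truncated to 5

short_features, short_mask = pad_video(short_clip)
long_features, long_mask = pad_video(long_clip)
print(short_mask.tolist())  # [True, True, True, False, False]
print(long_mask.tolist())   # [True, True, True, True, True]
```

The padded rows of `short_features` stay at zero, and the `False` entries in the mask mark those timesteps as padding.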

Global_variabels.py

In [3]:
train_path = "path"
img_size = 224
batch_size = 8
epochs = 30
max_seq_length = 50  # timesteps (length of each sequence)
num_features = 2048  # length of the feature vector extracted from each frame
units = 1  # number of outputs of the last dense layer in the RNN model; one, because the model is
# trained to distinguish FAKE videos from REAL ones.
test_path = "path"
checkpoint_path = "path"

Coach.py

In [7]:
"""
This module contains the fit() function for training a deep learning model for video classification.

It uses the following modules:
    - os
    - numpy
    - pandas
    - keras.models.load_model
    - sklearn.model_selection.train_test_split


Parameters:
-----------
    model_name : str, optional
        A string value used to name the model.
    pre_trained_model_path : str, optional
        Path to the pre-trained model.
    load_video_file : str, optional
        Path to the video file for which the user wants to make a prediction.
    mode : int, optional
        Integer value indicating the mode. 0 means the model will be trained and saved, 1 means a saved model will be used, and 2 means live video will be used.
    to : int, optional
        The end index (exclusive) of the test videos from which one is picked at random.
    frm : int, optional
        The start index of the test videos from which one is picked at random.


Returns:
--------
    None


Example:
--------
To train a model from scratch:
    fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=0)

To predict with an already trained model (mode 1):
    fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=1, frm=10, to=100)

To run live detection with an already trained model (mode 2):
    fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=2)
"""

import os
import numpy as np
import pandas as pd
from keras.models import load_model
from sklearn.model_selection import train_test_split


def fit(model_name: str = None, pre_trained_model_path: str = None, load_video_file: str = None, mode: int = 0, frm: int = 0, to: int = 200) -> None:
    """
    Run the whole pipeline: build the feature extractor, then train, predict or start live detection
    depending on mode.

    :param model_name: A string value used to name the model.
    :param load_video_file: Path to the video file for which the user wants to make a prediction.
    :param pre_trained_model_path: Path to the pre-trained model.
    :param mode: Integer value indicating the mode. 0 means the model will be trained and saved, 1 means a saved model will be used, and 2 means live video will be used.
    :param to: The end index (exclusive) of the test videos from which one is picked at random.
    :param frm: The start index of the test videos from which one is picked at random.

    Example:
    __________

        To train a model from scratch:
            fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=0)
        To predict with an already trained model (mode 1):
            fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=1, frm=10, to=100)
        To run live detection with an already trained model (mode 2):
            fit(model_name="deepfake_cnn_rnn_5.h5", pre_trained_model_path="/checkpoint/deepfake_cnn_rnn_5.h5", mode=2)
    """

    # Load feature extractor
    feature_extractor = pretrain_feature_extractor()

    if mode == 0:
        # Load data and split into train and test
        df_json, df, labels, label_list, path_list = create_df(
            "path")

        train, test = train_test_split(df_json, test_size=0.3, random_state=42, stratify=df_json['label'])
        print(f"Train set shape: {train.shape}, Test set shape: {test.shape}")

        # Prepare data for training and testing
        train_data, train_labels = prepare_all_videos(train, train_path, feature_extractor)
        test_data, test_labels = prepare_all_videos(test, train_path, feature_extractor)

        print(f"Train frame features shape: {train_data[0].shape}, Train frame masks shape: {train_data[1].shape}")
        print(f"Test frame features shape: {test_data[0].shape}, Test frame masks shape: {test_data[1].shape}")

        # Initialize and train model
        model = RNN(0.4, units).training()
        model.fit(
            [train_data[0], train_data[1]],
            train_labels,
            validation_data=([test_data[0], test_data[1]], test_labels),
            epochs=epochs,
            batch_size=batch_size
        )

        # Save model into the checkpoint directory without changing the working directory
        model.save(os.path.join(checkpoint_path, model_name))

    elif mode == 1:
        # Load pre-trained model
        model = load_model(pre_trained_model_path)

        # Select video to predict on
        test_df = pd.read_json("path").T
        test_paths = list(map(lambda x: os.path.join(test_path, x), test_df.index[frm:to]))
        test_videos = np.random.choice(test_paths) if load_video_file is None else load_video_file

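        # Make prediction and display result
        p = prediction(path=test_videos, feature_extractor=feature_extractor, model=model)
        # p holds the sigmoid output (probability of FAKE, in [0, 1]); convert it to a percentage
        prob_fake = float(np.ravel(p)[0])
        if prob_fake <= 0.5:
            play_video(test_videos, f"Real with probability {(1 - prob_fake) * 100:.1f}%")
        else:
            play_video(test_videos, f"Fake with probability {prob_fake * 100:.1f}%")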
    elif mode == 2:
        detector(model_path=pre_trained_model_path)
    else:
        raise ValueError("Invalid mode: expected 0, 1 or 2")
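
Since the model ends in a single sigmoid unit and the labels encode FAKE as 1, its output can be read as the probability of FAKE on a 0 to 1 scale. A minimal sketch of turning that value into a window title follows; `describe_prediction` and its `threshold` parameter are hypothetical helpers, not functions defined in the notebook.

```python
import numpy as np


def describe_prediction(p, threshold: float = 0.5) -> str:
    """Turn a sigmoid output in [0, 1] into a label with a percentage."""
    prob_fake = float(np.ravel(p)[0])  # accepts a scalar or a 1-element array
    if prob_fake <= threshold:
        return f"Real with probability {(1.0 - prob_fake) * 100:.1f}%"
    return f"Fake with probability {prob_fake * 100:.1f}%"


print(describe_prediction(np.array([0.25])))  # Real with probability 75.0%
print(describe_prediction(0.9))               # Fake with probability 90.0%
```

The complement `1 - p` is taken before scaling to a percentage, so a score of 0.25 reads as 75% real.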

Face_detection.py

In [9]:
"""
Detects fake faces in a video stream using a pre-trained deep learning model.

:param model_path: Path to the pre-trained Keras model.
:param size: Scaling factor for the video frames. Default is 2.
:return: The function displays the results in real-time.
:raises: If the model_path is invalid or the model cannot be loaded.

Example:
________

    detector("deepfake_cnn_rnn_5.h5", size=2)
"""

import cv2 as cv
from keras.models import load_model
import numpy as np


def detector(model_path: str, size: int = 2) -> None:
    """
    Detects fake faces in a video stream using a pre-trained deep learning model.

    :param model_path: Path to the pre-trained Keras model.
    :param size: Scaling factor for the video frames. Default is 2.
    :return: The function displays the results in real-time.
    :raises: If the model_path is invalid or the model cannot be loaded.

    Example:
    ________

        detector("deepfake_cnn_rnn_5.h5", size=2)
    """

    # Load the model
    model = load_model(model_path)

    # Load the face cascade classifier
    cascade = cv.CascadeClassifier("path")

    # Initialize the video capture device
    cap = cv.VideoCapture(0)

    # Load the pre-trained feature extractor
    feature_extractor = pretrain_feature_extractor()

    while True:
        # Capture a frame from the video stream; stop if the camera delivers none
        success, frame = cap.read()
        if not success:
            break

        # Flip the frame horizontally (mirror view)
        frame = cv.flip(frame, 1)

        # Resize the frame
        w = int(frame.shape[1] // size)
        h = int(frame.shape[0] // size)
        dim = (w, h)
        res = cv.resize(frame, dim)

        # Detect faces in the frame
        faces = cascade.detectMultiScale(res)

        for face in faces:
            # Scale the face coordinates
            (x, y, w, h) = [v * size for v in face]
            x = int(x)
            y = int(y)
            w = int(w)
            h = int(h)

            # Extract the face from the frame
            face_img = frame[y:y + h, x:x + w]

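            # Resize the face image, convert BGR to RGB (as load_video does for training frames)
            # and add a leading axis so the face is passed as a one-frame clip
            resized = cv.resize(face_img, (224, 224))
            resized = cv.cvtColor(resized, cv.COLOR_BGR2RGB)
            reshaped = np.reshape(resized, (1, 224, 224, 3))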

            # Use the trained model to predict whether the face image is real or fake
            result = prediction(feature_extractor=feature_extractor, video=reshaped, model=model)

            # Display the result on the frame
            if result <= 0.5:
                cv.putText(frame, "This is Real", (x, y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
            else:
                cv.putText(frame, "This is Fake", (x, y - 10), cv.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        # Display the frame
        cv.imshow("Fake Detection", frame)

        # Check for exit key
        key = cv.waitKey(1)
        if key == 27:
            break

    # Release resources
    cap.release()
    cv.destroyAllWindows()
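
Faces are detected on the downscaled frame, so each box has to be multiplied by `size` before it is used to slice the full-resolution frame. The sketch below isolates that step and adds clamping so the slice stays inside the frame; `scale_box` and the clamping are assumptions for illustration, as the notebook scales the coordinates inline without bounds checks.

```python
def scale_box(box, scale, frame_w, frame_h):
    """Map an (x, y, w, h) box from the downscaled frame to full-frame pixels."""
    x, y, w, h = (int(v * scale) for v in box)
    # Clamp so that slicing frame[y:y + h, x:x + w] never leaves the frame.
    x = max(0, min(x, frame_w - 1))
    y = max(0, min(y, frame_h - 1))
    w = min(w, frame_w - x)
    h = min(h, frame_h - y)
    return x, y, w, h


# A 60x60 face at (100, 50) in a 320x240 detection frame,
# mapped back to the 640x480 original (scale factor 2).
print(scale_box((100, 50, 60, 60), 2, 640, 480))   # (200, 100, 120, 120)
# A box that would overshoot the frame edge gets trimmed.
print(scale_box((300, 220, 40, 40), 2, 640, 480))  # (600, 440, 40, 40)
```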

main.py

In [11]:
"""
1. python3 main.py --model_name=deepfake_cnn_rnn_6.h5 --mode=0

2. python3 main.py --pre_trained_model_path=path --mode=1 --load_video_file=path

3. python3 main.py --pre_trained_model_path=path --mode=2
"""

import argparse


def run_script(model_name: str = None, pre_trained_model_path: str = None, load_video_file: str = None, mode: int = 0,
               frm: int = 0, to: int = 200):
    fit(
        model_name=model_name,
        pre_trained_model_path=pre_trained_model_path,
        load_video_file=load_video_file,
        mode=mode,
        to=to,
        frm=frm
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train or use a deepfake detection model")
    parser.add_argument("--model_name", type=str, default=None, help="A string value used to name the model")
    parser.add_argument("--pre_trained_model_path", type=str, default=None, help="Path to the pre-trained model")
    parser.add_argument("--load_video_file", type=str, default=None,
                        help="Path to the video file for which to make a prediction")
    parser.add_argument("--mode", type=int, default=0, choices=[0, 1, 2],
                        help="Integer value indicating the mode. 0 means the model will be trained and saved, "
                             "1 means a saved model will be used, and 2 means live video will be used")
    parser.add_argument("--to", type=int, default=200, help="The end index (exclusive) of the test videos to pick from")
    parser.add_argument("--frm", type=int, default=0, help="The start index of the test videos to pick from")

    args = parser.parse_args()
    run_script(model_name=args.model_name, pre_trained_model_path=args.pre_trained_model_path,
               load_video_file=args.load_video_file, mode=args.mode, to=args.to, frm=args.frm)

usage: ipykernel_launcher.py [-h] [--model_name MODEL_NAME]
                             [--pre_trained_model_path PRE_TRAINED_MODEL_PATH]
                             [--load_video_file LOAD_VIDEO_FILE]
                             [--mode {0,1,2}] [--to TO] [--frm FRM]
ipykernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-2d2aceb6-f626-4de2-9fb6-52c5a9765862.json


SystemExit: ignored
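
The `unrecognized arguments: -f ...` error above appears because the cell runs `parser.parse_args()` inside a notebook kernel, whose launcher puts its own `-f <connection file>` argument into `sys.argv`. One possible workaround, sketched here under the assumption that ignoring unknown arguments is acceptable, is `parse_known_args()`, which returns the recognized options together with a list of leftovers instead of exiting. The `build_parser` helper recreates only a subset of the options, and the kernel file path is made up.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Recreate a subset of the CLI options defined in main.py."""
    parser = argparse.ArgumentParser(description="Train or use a deepfake detection model")
    parser.add_argument("--model_name", type=str, default=None)
    parser.add_argument("--mode", type=int, default=0, choices=[0, 1, 2])
    return parser


# Simulated argv as a notebook kernel might pass it; the path is a placeholder.
argv = ["--mode", "1", "-f", "/tmp/kernel-example.json"]
args, unknown = build_parser().parse_known_args(argv)
print(args.mode)  # 1
print(unknown)    # ['-f', '/tmp/kernel-example.json']
```

When the file is run as a script with `python3 main.py ...`, `parse_args()` remains the stricter choice because it rejects misspelled options.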