In [13]:
import tensorflow as tf
import numpy as np
import cv2
import os
import time
import pandas as pd
from PIL import Image


# from google.colab import drive
# drive.mount('/content/drive')

from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
from keras.layers import Conv2D, MaxPooling2D, Flatten, LSTM, TimeDistributed, Dense, Dropout, BatchNormalization
from keras.callbacks import EarlyStopping, ModelCheckpoint


import warnings
import random
from tqdm.notebook import tqdm
warnings.filterwarnings('ignore')
%matplotlib inline

from tensorflow.keras.utils import to_categorical
from keras.preprocessing.image import load_img
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D


In [15]:
# DATASET PATHS

# TRAIN_DIR = "D:\Zeina\Documents\GUC\8th Semester\Datasets\Celeb-DF\extract_frames\Train"
# TEST_DIR = 'D:\Zeina\Documents\GUC\8th Semester\Datasets\Celeb-DF\extract_frames\Test'
# VAL_DIR = 'D:\Zeina\Documents\GUC\8th Semester\Datasets\Celeb-DF\extract_frames\Validation'


# TRAIN_DIR = "/content/drive/MyDrive/Celeb-DF/Kaggle_Train"
# VAL_DIR = "/content/drive/MyDrive/Celeb-DF/Validation"

# Root folder that contains the 'Celeb-real' and 'Celeb-synthesis' video folders.
DATA_DIR = '/kaggle/input/celeb-df-v2'

# Haar cascade used by detect_face() to locate faces in each frame.
CASCADE_PATH = '/kaggle/input/haar-cascades-for-face-detection/haarcascade_frontalface_default.xml'
face_detector = cv2.CascadeClassifier(CASCADE_PATH)

# cv2.CascadeClassifier does not raise when the file is missing or unreadable;
# it silently returns an empty classifier that only fails later inside
# detectMultiScale. Fail here instead, with a message that names the path.
if face_detector.empty():
    raise FileNotFoundError(f"Could not load Haar cascade from {CASCADE_PATH}")

img_size = 64      # frames are resized to img_size x img_size pixels
seq_length = 50    # number of frames kept per video
step = 10          # NOTE(review): not referenced by any cell visible here
batch_size = 32

In [16]:
# img = cv2.imread('/kaggle/input/test-img/IMG_0953.JPG')
# gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

# results = face_detector.detectMultiScale(gray, 1.3, 5)

# for (x,y,w,h) in results:
#     cv2.rectangle(img,(x,y),(x+w,y+h),(0,255,0),2)
#     faces = img[y:y + h, x:x + w] 
#     cv2.imshow("face",faces) 
  
# # cv2.imshow('img',img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()

In [17]:
def detect_face(img):
    """Crop a frame to the first face reported by the Haar cascade.

    Args:
        img: image array as read by OpenCV (indexed as [row, column, ...]).

    Returns:
        The sub-array ``img[y:y + h, x:x + w]`` for the first detected face,
        or ``img`` itself when no face is detected.

    The input frame is left untouched. Drawing the detection rectangle on it
    before cropping (as was done previously) paints a green border into the
    crop, which then ends up in the model's input.
    """
    faces = face_detector.detectMultiScale(img, 1.3, 5)

    # No detection: fall back to the whole frame.
    if len(faces) == 0:
        return img

    # Only the first detection is used.
    x, y, w, h = faces[0]
    return img[y:y + h, x:x + w]

In [18]:
def load_frames_from_video(video_path):
    """Load the first ``seq_length`` face-cropped frames of a video.

    Each frame is passed through ``detect_face``, resized to
    ``(img_size, img_size)`` and scaled to float32 values in [0, 1].

    Args:
        video_path: path handed to ``cv2.VideoCapture``.

    Returns:
        A float32 array with ``seq_length`` frames. Videos with fewer readable
        frames (including videos that yield none) are padded at the end with
        all-zero frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        # Only the first seq_length frames are kept, so stop decoding (and
        # running face detection) as soon as we have them.
        while len(frames) < seq_length:
            ret, frame = cap.read()
            if not ret:
                break
            face = detect_face(frame)
            face = cv2.resize(face, (img_size, img_size))
            frames.append(face.astype('float32') / 255.0)
    finally:
        # Release the capture even if detection or resizing raises.
        cap.release()

    # Pad short videos. The padding is float32 so it does not upcast the
    # whole sequence to float64 when the list is converted to an array.
    missing = seq_length - len(frames)
    if missing > 0:
        blank = np.zeros((img_size, img_size, 3), dtype='float32')
        frames.extend([blank] * missing)

    return np.array(frames)

In [25]:
def setup_dataset(root_path, real_files, fake_files, batch_size, is_training):
    """Load frame sequences and labels for a set of real and fake videos.

    Args:
        root_path: folder containing the 'Celeb-real' and 'Celeb-synthesis'
            sub-folders.
        real_files: video file names to load from 'Celeb-real'.
        fake_files: video file names to load from 'Celeb-synthesis'.
        batch_size: currently unused; kept so existing callers keep working.
        is_training: currently unused; kept so existing callers keep working.

    Returns:
        ``(frames, labels)``: two parallel lists. ``frames[i]`` is whatever
        ``load_frames_from_video`` returns for the i-th video, and
        ``labels[i]`` is 0 for a 'Celeb-real' video and 1 for a
        'Celeb-synthesis' video. Real videos come first, then fake ones.
    """
    frames, labels = [], []
    per_category = (
        ("Celeb-real", real_files, 0),
        ("Celeb-synthesis", fake_files, 1),
    )
    for category, files, label in per_category:
        folder_path = os.path.join(root_path, category)
        # The progress bar replaces the per-video print, which flooded the
        # cell output with one line per file.
        for video_name in tqdm(files, desc=category):
            video_path = os.path.join(folder_path, video_name)
            frames.append(load_frames_from_video(video_path))
            labels.append(label)

    return frames, labels

In [22]:
##### Splitting the dataset into train, validation, and test sets
# (train_test_split is imported in the notebook's import cell.)

SPLIT_SEED = 42


def _split_files(files, seed=SPLIT_SEED):
    """Sub-sample one class's file list and split it into train/val/test.

    Steps (fractions are relative to the 50% sample that is kept):
      1. keep 50% of ``files``               -> sample
      2. hold out 20% of the sample          -> test   (20%)
      3. hold out 10% of the remainder       -> val    (8%)
      4. drop a further 20% of the remainder -> extra  (14.4%), train (57.6%)

    Returns:
        ``(sample, train, val, test, extra)`` as lists of file names.
    """
    _, sample = train_test_split(files, test_size=0.5, random_state=seed)
    train, test = train_test_split(sample, test_size=0.2, random_state=seed)
    train, val = train_test_split(train, test_size=0.1, random_state=seed)
    train, extra = train_test_split(train, test_size=0.2, random_state=seed)
    return sample, train, val, test, extra


SEPARATOR = "---------------------------------------------"

# os.listdir returns entries in arbitrary order; sort them so the seeded
# splits below select the same files on every run and every machine.
real_files = sorted(os.listdir(os.path.join(DATA_DIR, 'Celeb-real')))
fake_files = sorted(os.listdir(os.path.join(DATA_DIR, 'Celeb-synthesis')))

print(f"Real Files before dataset reduction: {len(real_files)}")
print(f"Fake Files before dataset reduction: {len(fake_files)}")

print(SEPARATOR)

# *_extra holds the files carved out of the training split in step 4.
real_sample, real_train, real_val, real_test, real_extra = _split_files(real_files)
fake_sample, fake_train, fake_val, fake_test, fake_extra = _split_files(fake_files)

print(f"Real Files after 50% reduction: {len(real_sample)}")
print(f"Fake Files after 50% reduction: {len(fake_sample)}")

print(SEPARATOR)

# NOTE: the percentages in the headers below are nominal. Relative to the
# reduced sample the real shares are test 20%, validation 8%, train 57.6%
# (see _split_files). The strings are left unchanged so previously recorded
# output stays comparable.
print("TESTING SET (20%)")
print(f"Real Test Files: {len(real_test)}")
print(f"Fake Test Files: {len(fake_test)}")

print(SEPARATOR)

print("VALIDATION SET (10%)")
print(f"Real Validation Files: {len(real_val)}")
print(f"Fake Validation Files: {len(fake_val)}")

print(SEPARATOR)

print("TRAINING SET (80%)")
print(f"Real Train Files: {len(real_train)}")
print(f"Fake Train Files: {len(fake_train)}")

print(SEPARATOR)


train_files = real_train + fake_train
val_files = real_val + fake_val
test_files = real_test + fake_test

# shuffle training data (seeded, so the order is reproducible)
np.random.default_rng(SPLIT_SEED).shuffle(train_files)

print(f"Total Train Files: {len(train_files)}")
print(f"Total Test Files: {len(test_files)}")
print(f"Total Validation: {len(val_files)}")


Real Files before dataset reduction: 590
Fake Files before dataset reduction: 5639
---------------------------------------------
Real Files after 50% reduction: 295
Fake Files after 50% reduction: 2820
---------------------------------------------
TESTING SET (20%)
Real Test Files: 59
Fake Test Files: 564
---------------------------------------------
VALIDATION SET (10%)
Real Validation Files: 24
Fake Validation Files: 226
---------------------------------------------
TRAINING SET (80%)
Real Train Files: 169
Fake Train Files: 1624
---------------------------------------------
Total Train Files: 1793
Total Test Files: 623
Total Validation: 250


In [23]:

# Number of samples (video files) in each split
num_train_samples = len(train_files)
num_val_samples = len(val_files)
num_test_samples = len(test_files)

# Batches needed to cover each split once: ceil(samples / batch_size)
train_steps_per_epoch = np.ceil(num_train_samples / batch_size).astype(int)
val_steps_per_epoch = np.ceil(num_val_samples / batch_size).astype(int)
test_steps_per_epoch = np.ceil(num_test_samples / batch_size).astype(int)

# Report the sample counts, then the step counts
for description, count in (
    ("Number of training samples:", num_train_samples),
    ("Number of validation samples:", num_val_samples),
    ("Number of test samples:", num_test_samples),
    ("train_steps_per_epoch:", train_steps_per_epoch),
    ("val_steps_per_epoch", val_steps_per_epoch),
    ("test_steps_per_epoch", test_steps_per_epoch),
):
    print(description, count)


Number of training samples: 1793
Number of validation samples: 250
Number of test samples: 623
train_steps_per_epoch: 57
val_steps_per_epoch 8
test_steps_per_epoch 20


In [None]:
# Build the validation set: one row per video, with the frame sequence in
# 'frame' and the class in 'label' (0 = real, 1 = fake).
val_frames, val_labels = setup_dataset(DATA_DIR, real_val, fake_val, batch_size, False)
val_data = pd.DataFrame({'frame': val_frames, 'label': val_labels})

# CSV cannot hold array-valued cells: each frame sequence is written as
# numpy's abbreviated text form (with '...'), so the pixels cannot be read
# back from it. The file is still written for compatibility; the pickle
# below is the lossless copy.
val_data.to_csv('val_data.csv')
val_data.to_pickle('val_data.pkl')

val_data.head()

In [None]:
# Build the test set: one row per video, with the frame sequence in
# 'frame' and the class in 'label' (0 = real, 1 = fake).
test_frames, test_labels = setup_dataset(DATA_DIR, real_test, fake_test, batch_size, False)
test_data = pd.DataFrame({'frame': test_frames, 'label': test_labels})

# CSV cannot hold array-valued cells: each frame sequence is written as
# numpy's abbreviated text form (with '...'), so the pixels cannot be read
# back from it. The file is still written for compatibility; the pickle
# below is the lossless copy.
test_data.to_csv('test_data.csv')
test_data.to_pickle('test_data.pkl')

test_data.head()

In [None]:

# Build the training set: one row per video, with the frame sequence in
# 'frame' and the class in 'label' (0 = real, 1 = fake).
train_frames, train_labels = setup_dataset(DATA_DIR, real_train, fake_train, batch_size, True)
train_data = pd.DataFrame({'frame': train_frames, 'label': train_labels})

# setup_dataset returns all real videos followed by all fake ones, so shuffle
# the rows. The seed makes the order reproducible.
train_data = train_data.sample(frac=1, random_state=42)

# CSV cannot hold array-valued cells: each frame sequence is written as
# numpy's abbreviated text form (with '...'), so the pixels cannot be read
# back from it. The file is still written for compatibility; the pickle
# below is the lossless copy.
train_data.to_csv('train_data.csv')
train_data.to_pickle('train_data.pkl')

train_data.head()

## Deepfake Detection Model

In [None]:
#======================================================================================
#                               CNN COMPONENT
#======================================================================================

# Per-frame feature extractor: five Conv2D + MaxPooling2D stages, then Flatten.
#
# padding='same' is required. With the default 'valid' padding and 64x64
# input, the feature map shrinks 64 -> 62 -> 31 -> 29 -> 14 -> 12 -> 6 -> 4 -> 2,
# and the fifth 3x3 convolution cannot be applied to a 2x2 map, so the model
# fails to build. With 'same' padding only the pooling layers change the
# spatial size: 64 -> 32 -> 16 -> 8 -> 4 -> 2, giving 2 * 2 * 128 = 512 features
# per frame after Flatten.

cnn_model = tf.keras.Sequential([
    # 8 filters of 3x3 on img_size x img_size x 3 input, ReLU activation
    Conv2D(8, (3, 3), activation='relu', padding='same', input_shape=(img_size, img_size, 3)),
    MaxPooling2D((2, 2)),   # keep the max of every 2x2 window -> halves height and width
    Dropout(0.2),           # drop 20% of activations during training
    Conv2D(16, (3, 3), activation='relu', padding='same'),   # 16 filters of 3x3
    MaxPooling2D((2, 2)),
    Dropout(0.2),
    Conv2D(32, (3, 3), activation='relu', padding='same'),   # 32 filters of 3x3
    MaxPooling2D((2, 2)),
    Dropout(0.2),
    Conv2D(64, (3, 3), activation='relu', padding='same'),   # 64 filters of 3x3
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu', padding='same'),  # 128 filters of 3x3
    MaxPooling2D((2, 2)),
    Flatten()               # flatten the feature maps into a 1D vector
])

In [None]:
#======================================================================================
#                               RNN COMPONENT (LSTM)
#======================================================================================

# Consumes the sequence of per-frame feature vectors produced by the CNN and
# captures the temporal information between them with a single 64-unit LSTM.

rnn_model = tf.keras.Sequential()
rnn_model.add(LSTM(64))



In [None]:
#======================================================================================
#                               FULL CNN + LSTM MODEL
#======================================================================================

model = tf.keras.Sequential()

# CNN applied independently to each of the seq_length frames of a video
model.add(TimeDistributed(cnn_model, input_shape=(seq_length, img_size, img_size, 3)))

# LSTM over the resulting sequence of per-frame feature vectors
model.add(rnn_model)

#===================================================
#              FULLY CONNECTED LAYER
#===================================================

# Takes the fixed-length LSTM output and applies a ReLU-activated dense layer,
# adding a non-linear transformation on top of the temporal features.
model.add(Dense(16, activation='relu'))

# Regularisation against overfitting: 40% of the activations are set to 0
# during training.
model.add(Dropout(0.4))

#===================================================
#               OUTPUT LAYER
#===================================================

# Binary classification with a sigmoid: the output is the probability of the
# "fake" class (label 1). Above 0.5 -> fake, below 0.5 -> real.
model.add(Dense(1, activation='sigmoid'))


In [None]:
# Adam optimizer with learning rate 0.0001, binary cross-entropy loss for the
# single sigmoid output, accuracy reported as the metric.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Callback for testing

class TestCallback(tf.keras.callbacks.Callback):
    """Evaluate the trained model on the test set once training has finished.

    Reads the module-level ``test_data`` DataFrame (columns 'frame' and
    'label') and ``batch_size``, and prints the test loss and accuracy.
    """

    def on_train_end(self, logs=None):
        # A DataFrame whose cells hold arrays cannot be passed to Keras
        # directly, so stack the per-video sequences into one array.
        X_test = np.asarray(test_data['frame'].to_list(), dtype='float32')
        y_test = test_data['label'].to_numpy(dtype='float32')

        # self.model is the model this callback is attached to, so there is
        # no dependency on a global named `model`.
        test_loss, test_acc = self.model.evaluate(X_test, y_test, batch_size=batch_size)
        print('Test loss:', test_loss)
        print('Test accuracy:', test_acc)

In [None]:

class AccuracyPlotCallback(tf.keras.callbacks.Callback):
    """Plot training and validation accuracy after every epoch.

    The values of 'accuracy' and 'val_accuracy' from the epoch logs are
    accumulated in ``train_acc`` and ``val_acc``, and the full history so far
    is redrawn at the end of each epoch.
    """

    def __init__(self):
        super().__init__()
        self.train_acc = []
        self.val_acc = []

    def on_epoch_end(self, epoch, logs=None):
        # Record this epoch's metrics (None is stored if a key is absent).
        self.train_acc.append(logs.get('accuracy'))
        self.val_acc.append(logs.get('val_accuracy'))

        # Redraw both curves on a fresh figure.
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(self.train_acc, label='Training Accuracy')
        ax.plot(self.val_acc, label='Validation Accuracy')
        ax.set_title('Training and Validation Accuracy')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Accuracy')
        ax.legend()
        ax.grid(True)
        plt.show()


In [None]:
from sklearn.metrics import classification_report

class MetricsPlotCallback(tf.keras.callbacks.Callback):
    """Show a confusion matrix and classification report for the test set.

    Runs once when training ends. Reads the module-level ``test_data``
    DataFrame (columns 'frame' and 'label') and ``batch_size``.
    """

    def on_train_end(self, logs=None):
        # Iterating a DataFrame yields its column names, not (X, y) batches,
        # so build the arrays explicitly from the two columns.
        X_test = np.asarray(test_data['frame'].to_list(), dtype='float32')
        y_true = test_data['label'].to_numpy(dtype=int)

        # Sigmoid output above 0.5 -> class 1 (fake), otherwise class 0 (real).
        y_prob = self.model.predict(X_test, batch_size=batch_size)
        y_pred = (y_prob > 0.5).astype(int).flatten()

        # Fix the class order so the matrix is always 2x2 and the report names
        # always line up, even if a class is missing from y_true or y_pred.
        class_ids = [0, 1]
        labels = ['Real', 'Deepfake']

        cm = confusion_matrix(y_true, y_pred, labels=class_ids)
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

        # ax=ax1 is required: without it seaborn draws on the current axes
        # rather than on the left-hand panel.
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False, square=True,
                    xticklabels=labels, yticklabels=labels, ax=ax1)
        ax1.set_xlabel('Predicted')
        ax1.set_ylabel('True')
        ax1.set_title('Confusion Matrix')

        report = classification_report(y_true, y_pred, labels=class_ids,
                                       target_names=labels, zero_division=0)
        # Monospace keeps the report's columns aligned.
        ax2.text(0, 0.5, report, fontsize=12, verticalalignment='center', fontfamily='monospace')
        ax2.axis('off')
        ax2.set_title('Classification Report')

        plt.show()



In [None]:
# Save the model at the end of every epoch (save_best_only=False), one file
# per epoch number.
checkpoint = ModelCheckpoint(
    filepath='./Plots/celeb-df_model_plots_{epoch}.keras',
    monitor='val_loss',
    verbose=1,
    save_best_only=False,
    mode='auto',
    save_freq='epoch',
)

# Callback instances passed to model.fit
test_callback = TestCallback()
accuracy_plot_callback = AccuracyPlotCallback()
metrics_plot_callback = MetricsPlotCallback()

In [None]:

# Train the model

# train_data / val_data are DataFrames with one frame sequence per cell in
# 'frame' and the class in 'label'. Keras cannot consume that layout, so stack
# the sequences into single arrays first.
# NOTE: this makes one more in-memory copy of every frame sequence.
X_train = np.asarray(train_data['frame'].to_list(), dtype='float32')
y_train = train_data['label'].to_numpy(dtype='float32')
X_val = np.asarray(val_data['frame'].to_list(), dtype='float32')
y_val = val_data['label'].to_numpy(dtype='float32')

# With array inputs Keras derives the number of steps from batch_size
# (ceil(samples / batch_size)), so steps_per_epoch / validation_steps are not
# passed.
history = model.fit(
    X_train,
    y_train,
    batch_size=batch_size,
    epochs=10,
    validation_data=(X_val, y_val),
    callbacks=[test_callback, checkpoint, accuracy_plot_callback, metrics_plot_callback]
)