In [None]:
# Optional (disabled): install zip and archive the sample training videos.
# !apt-get install zip
# !zip -r train_data /kaggle/input/deepfake-detection-challenge/train_sample_videos/*
# List the contents of the current working directory.
!ls

**Imports and Configuration**

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pylab as plt
import cv2
plt.style.use('ggplot')
from IPython.display import Video
from IPython.display import HTML
from PIL import Image, ImageDraw
# !pip install face_recognition
# import face_recognition
import os
import pdb
import pickle
from multiprocessing import Pool
!pip install facenet-pytorch
import torch
from tqdm.notebook import tqdm
import time
from facenet_pytorch import MTCNN


# Run on the first CUDA GPU when torch reports one, otherwise on the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# Padding factor for face crops.
# NOTE(review): clip_face_location carries its own default of the same value;
# this global does not appear to be passed to it in the visible cells.
pad_ratio = 1.3
# Directory containing the sample training videos together with metadata.json.
train_dir = '/kaggle/input/deepfake-detection-challenge/train_sample_videos/'

***Utilities***

In [None]:
def plot_faces(images, figsize=(10.8/2, 19.2/2)):
    """Display a 4x4 contact sheet of 16 images sampled evenly from ``images``.

    Parameters
    ----------
    images : sequence or ndarray of equally shaped images
        A list of frames is accepted as well as a stacked array; it is
        converted with ``np.asarray`` because the sampling below relies on
        integer-array indexing, which plain lists do not support.
    figsize : tuple of float
        Size of the matplotlib figure, in inches.

    The 16 sampled images are grouped four at a time; each group is stacked
    vertically and the four groups are then placed side by side.
    """
    images = np.asarray(images)
    # 16 indices spread evenly from the first to the last image (repeats are
    # possible when fewer than 16 images are supplied).
    images = images[np.linspace(0, len(images)-1, 16).astype(int)]
    columns = [np.concatenate(images[i:i+4], axis=0) for i in range(0, 16, 4)]
    im_plot = np.concatenate(columns, axis=1)

    fig, ax = plt.subplots(1, 1, figsize=figsize)
    ax.imshow(im_plot)
    ax.xaxis.set_visible(False)
    ax.yaxis.set_visible(False)

    ax.grid(False)
    fig.tight_layout()

**Get Frames**

In [None]:
def save_frame(file_name, image):
    """Persist one frame to disk in NumPy's binary format.

    ``file_name`` is handed to ``np.save`` unchanged, so NumPy appends the
    ``.npy`` suffix when the name does not already end with it.
    """
    np.save(file_name, arr=image)
    
def get_frames(video_path, max_frame_index=9999999):
    """Decode a video file into a list of frames.

    Parameters
    ----------
    video_path : str
        Path passed to ``cv2.VideoCapture``.
    max_frame_index : int
        Stop once this many frames have been collected.

    Returns
    -------
    list
        The frames in the order ``cap.read()`` returned them. The list is
        empty when the file cannot be opened or yields no frame.

    Notes
    -----
    The capture is released in a ``finally`` clause so the handle is not
    leaked if decoding raises. The former ``cv2.waitKey`` poll was removed:
    it only matters when an OpenCV window is on screen, and in a notebook it
    merely added a delay for every frame.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream or a decoding failure.
                break
            frames.append(frame)
            if len(frames) == max_frame_index:
                break
    finally:
        cap.release()

    return frames
    
def save_video(video_file, path_dir):
    """Write every frame of one video to its own file.

    The video is read from ``path_dir + video_file`` (plain string
    concatenation, so ``path_dir`` must end with a path separator). Frame
    ``k`` is saved through ``save_frame`` under the name
    ``video_file + str(k)``.
    """
    source_path = path_dir + video_file
    frame_number = 0
    for frame in get_frames(source_path):
        save_frame(video_file + str(frame_number), frame)
        frame_number += 1
        
def get_frames_i(index, max_frame_index=9999999):
    """Decode the training video at position ``index`` of ``train_video_names``.

    Relies on the notebook globals ``train_dir`` and ``train_video_names``;
    ``max_frame_index`` is forwarded to ``get_frames`` unchanged.
    """
    video_path = train_dir + train_video_names[index]
    return get_frames(video_path, max_frame_index=max_frame_index)


**Load Metadata**

In [None]:
# metadata.json is read and transposed so that the DataFrame can be indexed by
# video file name below.
train_sample_metadata = pd.read_json('/kaggle/input/deepfake-detection-challenge/train_sample_videos/metadata.json').T

# Every entry of the training directory except the metadata file is a video.
train_sample_names = [x for x in os.listdir(train_dir) if x != 'metadata.json']
train_video_names = train_sample_names

# Reorder the metadata to match the directory listing so that positions in
# train_video_names and train_labels line up.
train_sample_metadata = train_sample_metadata.loc[train_video_names]
# Binary labels: 0 for "REAL", 1 for anything else.
train_labels = np.array([0 if x == "REAL" else 1 for x in train_sample_metadata['label']])


test_dir = '/kaggle/input/deepfake-detection-challenge/test_videos/'
test_video_names = os.listdir(test_dir)

# Replace the JSON metadata with a previously pickled version; later cells read
# the 'face_locations' and 'index_w/_frames' columns from it.
# NOTE(review): pickle.load executes arbitrary code from the file - only load
# a dataset you produced yourself or otherwise trust.
with open('/kaggle/input/deepfake-metadata/train_sample_metadata', 'rb') as metadata_file:
    train_sample_metadata = pickle.load(metadata_file)

***Data Preprocessing***

In [None]:
# Duplicate of the import already present in the import cell.
from facenet_pytorch import MTCNN
# Face detector placed on the device selected in the configuration cell.
detector = MTCNN(device=device, post_process=False)

# Number of frames handed to the detector per call in the loop below.
# NOTE(review): the hyperparameter cell later rebinds batch_size to a different
# value, so the result of this cell depends on execution order.
batch_size = 32

def detect_facenet_pytorch(detector, images, batch_size):
    """Run a face detector over ``images`` in chunks of ``batch_size``.

    ``detector`` must expose ``detect(list_of_images, landmarks=True)``
    returning ``(boxes, probabilities, landmarks)`` with one entry per image;
    an image without a detection is reported as ``None`` in ``boxes``.

    Returns three parallel lists covering only the images with a detection:

    * the first box reported for the image,
    * the landmark entry for the image (as returned, not reduced to one face),
    * the index of the image within ``images``.
    """
    kept_boxes, kept_landmarks, kept_indices = [], [], []

    for start in np.arange(0, len(images), batch_size):
        chunk = list(images[start:start + batch_size])
        chunk_boxes, _, chunk_landmarks = detector.detect(chunk, landmarks=True)

        for offset, frame_boxes in enumerate(chunk_boxes):
            if frame_boxes is not None:
                kept_boxes.append(frame_boxes[0])
                kept_landmarks.append(chunk_landmarks[offset])
                # Offset inside the chunk -> position in the full sequence.
                kept_indices.append(offset + start)

    return kept_boxes, kept_landmarks, kept_indices


# Detect faces in every training video.
# The loop previously relied on names whose definitions were commented out
# (load_video_batch_size, boxes, landmarks, i_frames, name), so it could not
# run on a fresh kernel; they are defined here.

# Number of videos decoded in parallel per step (one worker process each).
load_video_batch_size = 1

# Per-video detection results, in the same order as train_video_names.
i_frames = []
boxes = []
landmarks = []

for lb in tqdm(np.arange(0, len(train_video_names), load_video_batch_size)):
    video_paths = [train_dir + x for x in train_video_names[lb:lb+load_video_batch_size]]
    print(video_paths)
    with Pool(load_video_batch_size) as p:
        frames_in_videos = p.map(get_frames, video_paths)

    # Use the frames the pool just decoded instead of reading each video again.
    for frames in frames_in_videos:
        if len(frames) == 0:
            # np.stack raises on an empty list; record empty results so the
            # three lists stay aligned with train_video_names.
            boxes.append([])
            landmarks.append([])
            i_frames.append([])
            continue

        frames = np.stack(frames)

        a, b, c = detect_facenet_pytorch(detector, frames, batch_size)
        boxes.append(a)
        landmarks.append(b)
        i_frames.append(c)


# train_sample_metadata['index_w/_frames'] = i_frames
# train_sample_metadata['face_locations'] = boxes
# train_sample_metadata['face_ladmarks'] = landmarks

# pickle.dump(train_sample_metadata, open( "train_sample_metadata", "wb" ) )
        

In [None]:
# plt.imshow(frames[0])
# plt.show()

# l, t, r, b = a[0].astype(int).tolist()
# plt.imshow(frames[0][t:b, l:r])
# plt.show()

# print(train_sample_metadata.loc[train_video_names]['face_locations'])
# print(train_video_names)

**MesoNet**

In [None]:
from tensorflow.keras.models import Model as KerasModel
from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Reshape, Concatenate, LeakyReLU
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import LSTM
# Side length, in pixels, of the square 3-channel input of the Meso4 and
# MesoInception4 models defined below.
IMGWIDTH = 256

class Classifier:
    def __init__():
        self.model = 0
    
    def predict(self, x):
        return self.model.predict(x)
    
    def fit(self, x, y):
        return self.model.train_on_batch(x, y)
    
    def get_accuracy(self, x, y):
        return self.model.test_on_batch(x, y)
    
    def get_summary(self):
        return self.model.summary()
    
    def load(self, path):
        self.model.load_weights(path)
        
    def features_model(self):
        return tf.keras.Model(self.model.input, self.model.layers[-2].output)

class CNN(Model):
    """Temporal 1-D convolution over per-frame features of a video.

    Parameters
    ----------
    feature_model : keras model
        Maps a batch of images to one feature vector per image. It is frozen.
    n_filters, kernel_size : int
        Configuration of the ``Conv1D`` layer applied along the frame axis.

    ``call`` expects inputs of shape ``(n_videos, n_ims, s1, s2, s3)``; every
    dimension except ``n_videos`` must be statically known, because the final
    ``Dense`` layer needs a fixed input width.
    """

    def __init__(self, feature_model, n_filters=8, kernel_size=5):
        # Was ``super(RNN, self)``, which names an unrelated class.
        super().__init__()
        self.feature_model = feature_model
        # ``train_able`` was a typo that only created a new, ignored attribute.
        self.feature_model.trainable = False
        # Conv1D is not among the layers imported by name; reach it through tf.
        self.conv1 = tf.keras.layers.Conv1D(n_filters, kernel_size, activation='relu')
        self.last = Dense(1)

    def call(self, inputs):
        _, n_ims, s1, s2, s3 = inputs.shape
        # Merge the video and frame axes so the image model sees a flat batch.
        frames = tf.reshape(inputs, (-1, s1, s2, s3))
        features = self.feature_model(frames)
        # Restore the (video, frame, feature) layout for the temporal convolution.
        features = tf.reshape(features, (-1, n_ims, features.shape[-1]))
        x = self.conv1(features)
        # Flatten the (time, filter) axes into one vector per video.
        x = tf.reshape(x, (-1, x.shape[1] * x.shape[2]))
        return self.last(x)
            
class RNN(Model):
    """LSTM over the per-face features of each video.

    Parameters
    ----------
    feature_model : keras model
        Maps a batch of face images to one feature vector per image. It is
        frozen.
    LSTM_len : int
        Number of units in the LSTM layer.
    """

    def __init__(self, feature_model, LSTM_len=16):
        # Was ``super(Temporal_CNN, self)``, a name that is never defined.
        super().__init__()
        self.feature_model = feature_model
        # ``train_able`` was a typo that only created a new, ignored attribute.
        self.feature_model.trainable = False
        self.LSTM = LSTM(LSTM_len)
        self.last = Dense(1)

    def call(self, inputs):
        """Score each video.

        inputs: array with size n_videos * n_faces * im_sizes
        """
        n_videos = len(inputs)
        xs = []
        for i in range(n_videos):
            # One feature vector per face of video i: (n_faces, n_features).
            features = self.feature_model(inputs[i])
            # The LSTM expects (batch, time, features); treat the faces of one
            # video as a single sequence.
            sequence = tf.expand_dims(features, axis=0)
            xs.append(self.LSTM(sequence))
        # Each entry is (1, LSTM_len); join them into (n_videos, LSTM_len).
        xs = tf.concat(xs, axis=0)

        return self.last(xs)
    
class Xception_binary(Classifier):
    """Binary classifier made of an Xception feature extractor plus one Dense unit.

    Parameters
    ----------
    learning_rate : float
        Learning rate of the Adam optimizer the model is compiled with.
    """

    def __init__(self, learning_rate = 0.001):
        self.model = self.init_model()
        # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
        optimizer = Adam(learning_rate = learning_rate)
        self.model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy'])

    def init_model(self):
        """Build the model: frozen Xception (up to its second-to-last layer) + head."""
        from tensorflow.keras.applications import Xception
        xception = Xception()
        xception.trainable = False
        xception_features = tf.keras.Model(xception.input, xception.layers[-2].output)
        # The model is compiled with the string loss 'binary_crossentropy',
        # which expects probabilities, so the head needs a sigmoid (it was a
        # linear unit before).
        prediction_layer = tf.keras.layers.Dense(1, activation='sigmoid')
        model = tf.keras.Sequential([
          xception_features,
          prediction_layer])
        return model

    def fine_tunning_on(self, layers_lb=0, layers_ub=100):
        """Make backbone layers ``[layers_lb, layers_ub)`` trainable.

        The whole model is first marked trainable, then the backbone layers
        before ``layers_lb`` and from ``layers_ub`` onwards are frozen again.
        The slices are taken on the Xception sub-model: the outer Sequential
        holds only two layers, so slicing it with the default bounds froze
        nothing.

        NOTE(review): Keras applies ``trainable`` changes at compile time, so
        the model probably has to be compiled again afterwards - confirm.
        """
        self.model.trainable = True
        backbone = self.model.layers[0]
        for layer in backbone.layers[:layers_lb]:
            layer.trainable =  False
        for layer in backbone.layers[layers_ub:]:
            layer.trainable =  False
    

class MesoInception4(Classifier):
    """Binary classifier: two inception-style blocks followed by two conv blocks.

    Parameters
    ----------
    learning_rate : float
        Learning rate of the Adam optimizer the model is compiled with.
    """

    def __init__(self, learning_rate = 0.001):
        self.model = self.init_model()
        # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
        optimizer = Adam(learning_rate = learning_rate)
        self.model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy'])

    def InceptionLayer(self, a, b, c, d):
        """Return a callable that applies a four-branch inception block.

        ``a``, ``b``, ``c`` and ``d`` are the filter counts of the branches:
        1x1; 1x1 then 3x3; 1x1 then 3x3 with dilation 2; 1x1 then 3x3 with
        dilation 3. The branch outputs are concatenated on the last axis.
        """
        def func(x):
            x1 = Conv2D(a, (1, 1), padding='same', activation='relu')(x)

            x2 = Conv2D(b, (1, 1), padding='same', activation='relu')(x)
            x2 = Conv2D(b, (3, 3), padding='same', activation='relu')(x2)

            x3 = Conv2D(c, (1, 1), padding='same', activation='relu')(x)
            x3 = Conv2D(c, (3, 3), dilation_rate = 2, strides = 1, padding='same', activation='relu')(x3)

            x4 = Conv2D(d, (1, 1), padding='same', activation='relu')(x)
            x4 = Conv2D(d, (3, 3), dilation_rate = 3, strides = 1, padding='same', activation='relu')(x4)

            y = Concatenate(axis = -1)([x1, x2, x3, x4])

            return y
        # The closure was never returned, so init_model ended up calling None.
        return func

    def init_model(self):
        """Build the (uncompiled) Keras model for IMGWIDTH x IMGWIDTH x 3 inputs."""
        x = Input(shape = (IMGWIDTH, IMGWIDTH, 3))

        x1 = self.InceptionLayer(1, 4, 4, 2)(x)
        x1 = BatchNormalization()(x1)
        x1 = MaxPooling2D(pool_size=(2, 2), padding='same')(x1)

        x2 = self.InceptionLayer(2, 4, 4, 2)(x1)
        x2 = BatchNormalization()(x2)
        x2 = MaxPooling2D(pool_size=(2, 2), padding='same')(x2)

        x3 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x2)
        x3 = BatchNormalization()(x3)
        x3 = MaxPooling2D(pool_size=(2, 2), padding='same')(x3)

        x4 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x3)
        x4 = BatchNormalization()(x4)
        x4 = MaxPooling2D(pool_size=(4, 4), padding='same')(x4)

        # Classification head: 16 hidden units, then one sigmoid output.
        y = Flatten()(x4)
        y = Dropout(0.5)(y)
        y = Dense(16)(y)
        y = LeakyReLU(alpha=0.1)(y)
        y = Dropout(0.5)(y)
        y = Dense(1, activation = 'sigmoid')(y)

        return KerasModel(inputs = x, outputs = y)
        
class Meso4(Classifier):
    """Binary classifier: four conv / batch-norm / max-pool blocks and a dense head.

    Parameters
    ----------
    learning_rate : float
        Learning rate of the Adam optimizer the model is compiled with.
    """

    def __init__(self, learning_rate = 0.001):
        self.model = self.init_model()
        # ``learning_rate`` is the supported keyword; ``lr`` is a deprecated alias.
        optimizer = Adam(learning_rate = learning_rate)
        self.model.compile(optimizer = optimizer, loss = 'binary_crossentropy', metrics = ['accuracy'])

    def init_model(self):
        """Build the (uncompiled) Keras model for IMGWIDTH x IMGWIDTH x 3 inputs."""
        x = Input(shape = (IMGWIDTH, IMGWIDTH, 3))

        x1 = Conv2D(8, (3, 3), padding='same', activation = 'relu')(x)
        x1 = BatchNormalization()(x1)
        x1 = MaxPooling2D(pool_size=(2, 2), padding='same')(x1)

        x2 = Conv2D(8, (5, 5), padding='same', activation = 'relu')(x1)
        x2 = BatchNormalization()(x2)
        x2 = MaxPooling2D(pool_size=(2, 2), padding='same')(x2)

        x3 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x2)
        x3 = BatchNormalization()(x3)
        x3 = MaxPooling2D(pool_size=(2, 2), padding='same')(x3)

        x4 = Conv2D(16, (5, 5), padding='same', activation = 'relu')(x3)
        x4 = BatchNormalization()(x4)
        x4 = MaxPooling2D(pool_size=(4, 4), padding='same')(x4)

        # Classification head: 16 hidden units, then one sigmoid output.
        y = Flatten()(x4)
        y = Dropout(0.5)(y)
        y = Dense(16)(y)
        y = LeakyReLU(alpha=0.1)(y)
        y = Dropout(0.5)(y)
        y = Dense(1, activation = 'sigmoid')(y)

        return KerasModel(inputs = x, outputs = y)

***Face Cropping Helpers and Hyperparameters***

In [None]:
def clip_int(x, lb, ub):
    """Clamp ``x`` to the range ``[lb, ub]`` and convert the result with ``int``.

    The upper bound is applied first and the lower bound second, so ``lb``
    wins when the bounds are contradictory.
    """
    capped = ub if ub < x else x
    clamped = capped if capped > lb else lb
    return int(clamped)

def clip_face_location(face_location, ub1=1080-1, ub2=1920-1, lb1=0, lb2=0, pad_ratio=1.3):
    """Enlarge a face box about its centre and clamp it to integer bounds.

    Parameters
    ----------
    face_location : sequence of four numbers
        ``(left, top, right, bottom)``.
    ub1, lb1 : number
        Upper / lower bound applied to ``top`` and ``bottom``.
    ub2, lb2 : number
        Upper / lower bound applied to ``left`` and ``right``.
    pad_ratio : float
        Factor by which the box height and width are scaled.

    Returns
    -------
    tuple of int
        ``(left, top, right, bottom)`` after scaling and clamping.
    """
    left, top, right, bottom = face_location

    # Half of the padded height and width.
    half_height = (bottom - top) * (pad_ratio / 2)
    half_width = (right - left) * (pad_ratio / 2)

    # Centre of the original box along each axis.
    center_v = (top + bottom) * .5
    center_h = (left + right) * .5

    return (
        clip_int(center_h - half_width, lb2, ub2),
        clip_int(center_v - half_height, lb1, ub1),
        clip_int(center_h + half_width, lb2, ub2),
        clip_int(center_v + half_height, lb1, ub1),
    )

def get_face_given_location(frame, face_location):
    """Crop the padded face box out of ``frame``.

    Parameters
    ----------
    frame : ndarray
        Image whose first two axes are rows and columns.
    face_location : sequence of four numbers
        ``(left, top, right, bottom)`` of the detected face.

    Returns
    -------
    ndarray
        ``frame[top:bottom, left:right]`` for the padded and clamped box.

    The clamping bounds come from the frame itself rather than from
    ``clip_face_location``'s 1080x1920 defaults, so frames of any other size
    (for example portrait videos) are cropped correctly. For a 1080x1920
    frame the result is the same as before.
    """
    frame_height, frame_width = frame.shape[:2]
    left, top, right, bottom = clip_face_location(
        face_location, ub1=frame_height - 1, ub2=frame_width - 1)

    return frame[top:bottom, left:right]

def add_landmarks(frame, LM_width=3):
    """Return a copy of ``frame`` with facial landmarks drawn as lines.

    Parameters
    ----------
    frame : ndarray
        Image passed to ``face_recognition.face_landmarks`` and drawn upon.
    LM_width : int
        Width of the drawn lines.

    Returns
    -------
    ndarray
        The annotated image.

    NOTE(review): this needs the ``face_recognition`` package, whose install
    and import lines are commented out in the import cell; re-enable them
    before calling this function.
    """
    face_landmarks_list = face_recognition.face_landmarks(frame)
    # Draw on the frame that was analysed (the code referenced an undefined
    # name ``image`` here).
    pil_image = Image.fromarray(frame)
    d = ImageDraw.Draw(pil_image)

    for face_landmarks in face_landmarks_list:
        for facial_feature in face_landmarks.keys():
            d.line(face_landmarks[facial_feature], width=LM_width)

    return np.array(pil_image)

def face_resize(face, face_len1=256, face_len2=256, n_channels=3):
    """Resize a face crop with PIL and return it as an array.

    ``(face_len1, face_len2)`` is passed to ``Image.resize`` as the target
    size. ``n_channels`` is accepted but not used.
    """
    target_size = (face_len1, face_len2)
    return np.array(Image.fromarray(face).resize(target_size))

In [None]:
import random
# Number of passes of the training loop.
n_epochs = 100
# Number of training videos found in the training directory.
n_videos = len(train_video_names)
# Frame count per video used to size an epoch.
# NOTE(review): presumably the nominal length of a sample video - confirm.
video_len = 300
# Faces sampled from each selected video.
batch_size_frames = 8
# Videos sampled for each training batch.
batch_size_videos = 8
# Total faces per training batch.
# NOTE(review): this rebinds batch_size, which the face-detection cell also
# sets (to a different value) and passes to the detector.
batch_size = batch_size_frames * batch_size_videos
data_type = 'face'
# Batches per epoch: total frame count divided by the batch size, truncated.
n_batches = int(n_videos * video_len / batch_size)
# Target face size and channel count used when building batches.
face_len1, face_len2, n_channels = 256, 256, 3    

***Get Batches***

In [None]:
# Seed both NumPy's global generator and the stdlib `random` module.
np.random.seed(0)
random.seed(0)

def get_batch_1video(x = ('face', 256)):
    """Sample ``batch_size_frames`` resized faces from one random training video.

    Parameters
    ----------
    x : tuple
        ``(data_type, face_size)`` or ``(data_type, face_size, seed)``.
        ``data_type`` is accepted but not used. ``face_size`` is the side
        length of the returned square faces. When ``seed`` is present the
        random choices come from a private ``random.Random(seed)``; otherwise
        the module-level ``random`` generator is used, as before.

    Returns
    -------
    (list of ndarray, ndarray)
        The resized faces and a label array of length ``batch_size_frames``
        filled with the video's label.

    Reads the notebook globals ``train_sample_metadata``,
    ``train_video_names``, ``train_labels``, ``n_videos``,
    ``batch_size_frames`` and ``n_channels``.
    """
    data_type, face_size, *extra = x
    rng = random.Random(extra[0]) if extra else random

    # Reindex once instead of on every access, and address rows by position
    # explicitly (integer indexing on a label-indexed Series is deprecated).
    metadata = train_sample_metadata.loc[train_video_names]
    all_face_locations = metadata['face_locations']
    all_frame_indices = metadata['index_w/_frames']

    # Keep drawing until a video with at least one detected face comes up.
    while True:
        i_video = rng.randint(0, n_videos - 1)
        if len(all_face_locations.iloc[i_video]) != 0:
            break

    i_frames = all_frame_indices.iloc[i_video]
    face_locations = all_face_locations.iloc[i_video]

    # Sample detections with replacement.
    sampled_frames = rng.choices(range(len(i_frames)), k=batch_size_frames)

    # Decode only as far as the last frame that has a detection.
    frames = get_frames_i(i_video, max_frame_index=max(i_frames)+1)

    faces = []
    for i in sampled_frames:
        fcs = get_face_given_location(frames[i_frames[i]], face_locations[i].tolist())
        fcs = face_resize(fcs, face_len1=face_size, face_len2=face_size, n_channels=n_channels)
        faces.append(fcs)

    return faces, np.ones(batch_size_frames) * train_labels[i_video]

def get_batch(data_type='face', face_size=256):
    """Assemble one training batch from ``batch_size_videos`` random videos.

    Returns
    -------
    (ndarray, ndarray)
        Faces of shape ``(batch_size_videos * batch_size_frames, face_size,
        face_size, n_channels)`` as float64, and the matching label vector.

    The per-video seeds are drawn in the parent process. Worker processes
    created by fork start from a copy of the parent's ``random`` state, and
    the parent never advanced that state, so without explicit seeds every
    worker - and every call - selected the same videos.
    """
    worker_args = [(data_type, face_size, random.getrandbits(32))
                   for _ in range(batch_size_videos)]

    with Pool(2) as p:
        batch = p.map(get_batch_1video, worker_args)

    # Leading empty float arrays keep the original output dtype and make
    # concatenation fail loudly if a face has an unexpected shape.
    face_parts = [np.zeros((0, face_size, face_size, n_channels))]
    label_parts = [np.zeros(0)]
    for faces, labels in batch:
        face_parts.append(np.asarray(faces))
        label_parts.append(labels)

    # Concatenate once instead of growing the arrays video by video.
    face_batch = np.concatenate(face_parts)
    label_batch = np.concatenate(label_parts)

    return face_batch, label_batch
        

In [None]:
%cd /kaggle/input/meso-pretrain
cnn_im = Xception_binary(learning_rate=1e-3)
cnn_im.get_summary()
# model.load("MesoInception_F2F")
for epoch in range(n_epochs):
    for _ in range(n_batches):
        x, y = get_batch(face_size=299)
        print(cnn_im.fit(x, y))
#         print(model.predict(x))

In [None]:
# Build the recurrent model on top of the trained image classifier's feature
# extractor. The Classifier method is named features_model (feature_model does
# not exist and raised AttributeError).
model = RNN(cnn_im.features_model())

In [None]:
# For every training video, compute the L2 norm of the difference between each
# pair of consecutive frames, then compare the distributions of the two classes.
diff_norms = []

for name in train_video_names:
    frames = get_frames(train_dir + name)
    dns = []
    for i in range(len(frames) - 1):
        # Cast before subtracting: uint8 arithmetic wraps around instead of
        # going negative. np.norm does not exist; the function is
        # np.linalg.norm.
        diff = frames[i].astype(np.float32) - frames[i+1].astype(np.float32)
        dns.append(np.linalg.norm(diff.ravel()))
    # Videos may differ in length, so keep one array per video rather than
    # stacking them into a single rectangular array.
    diff_norms.append(np.asarray(dns, dtype=np.float64))

# Pool the per-video norms by class and plot one histogram per class
# (the second plot previously repeated label 0).
for label_value, label_name in ((0, 'REAL'), (1, 'FAKE')):
    per_class = [d for d, y in zip(diff_norms, train_labels) if y == label_value]
    if not per_class:
        # np.concatenate raises on an empty list; nothing to plot.
        continue
    plt.hist(np.concatenate(per_class))
    plt.title('Consecutive-frame difference norms: ' + label_name)
    plt.xlabel('L2 norm of frame difference')
    plt.ylabel('count')
    plt.show()