## Transfer Learning: First Order Model

**Objective**: Leverage the DeepFake generator model to improve the performance of DeepFake detection models

**Hypothesis**: The hypothesis behind the model is that generator models have robust feature extraction, which if leveraged can lead to a significant improvement in the DeepFake detection accuracy

**Notebook Division**:
1. Binary Classification
2. Multi-Class Classification

In [1]:
# Install the relevant code for First Order Model Animation
# !git clone https://github.com/AliaksandrSiarohin/first-order-model

In [2]:
# !python -m pip install -U scikit-image

In [3]:
cd first-order-model

/mnt/disks/user/project/first-order-model


In [4]:
#Data processing
from skimage import io
import os
import glob
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
from sklearn.preprocessing import LabelEncoder
import random
from collections import Counter

#Feature Extraction
from demo import load_checkpoints
import torch

# Models
from sklearn.svm import SVC
from sklearn import linear_model
from sklearn.ensemble import RandomForestClassifier

#Neural Network
import keras
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras import Input
from tensorflow.keras import optimizers
from keras.callbacks import ModelCheckpoint,Callback
import matplotlib.pyplot as plt


#Evaluation
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from keras.callbacks import ModelCheckpoint,Callback
from keras.callbacks import ReduceLROnPlateau

from sklearn import metrics
from tensorflow.keras.models import load_model
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
Using TensorFlow backend.
  from numpy.core.umath_tests import inner1d


## Load Dataset

In [5]:
# path to your dataset
DATASET_PATH = '/mnt/disks/user/project/Dataset/'
deepfake_class = ['FaceSwap/clean_frames', 'Reenactment/clean_frames','original/clean_frames']

In [6]:
'''
func: load the deepfake dataset and divide them into train and test with each having samples of the different classes
input:
    i.dataset_path: string: the main dataset folder path 
    ii.train_ratio: float: the ratio of the dataset that will be used for training the model. Eg: 0.8
    iii. fake_class: string array: the different deepfake classes
output:
    i. train_set, test_set: dictionary of image paths as key and deepfake class as value
    ii. X_train, X_test: array of image paths 
    iii. y_train, y_test: array of corresponding deepfake classes 
'''
def MultiDatasetSplit(DATASET_PATH, train_ratio,fake_cls):
    test_set = {}
    train_set = {}
    list_IDs = []
    labels = {}
    for i, cls in enumerate(fake_cls):
        paths = glob.glob(os.path.join(DATASET_PATH, cls,'*/*.jpg'))
        #balancing the dataset
        balance_paths = random.sample(paths,2000)

        brk_point = int(len(balance_paths)*train_ratio)
        for j in range(len(balance_paths)):
            if j <= brk_point:
                train_set.update({balance_paths[j]:i})
            else:
                test_set.update({balance_paths[j]:i})
    
    X_train = [X for X in train_set.keys()] 
    y_train = [y for y in train_set.values()]
    X_test = [X for X in test_set.keys()]
    y_test = [y for y in test_set.values()]
    return train_set, test_set, X_train,y_train, X_test, y_test

In [7]:
'''
func: load the deepfake dataset and divide them into train and test with each having samples of the different classes
input:
    i.dataset_path: string: the main dataset folder path 
    ii.train_ratio: float: the ratio of the dataset that will be used for training the model. Eg: 0.8
    iii. fake_class: string array: the different deepfake classes
output:
    i. train_set, test_set: dictionary of image paths as key and deepfake class as value
    ii. X_train, X_test: array of image paths 
    iii. y_train, y_test: array of corresponding deepfake classes 
'''
def BinaryDatasetSplit(DATASET_PATH, train_ratio,fake_cls):
    test_set = {}
    train_set = {}
    list_IDs = []
    labels = {}
    for i, cls in enumerate(fake_cls):
        paths = glob.glob(os.path.join(DATASET_PATH, cls,'*/*.jpg'))
        if i == 0:
            paths = random.sample(paths,4000)
        else:
            paths = random.sample(paths,2000)
        brk_point = int(len(paths)*train_ratio)
        for j in range(len(paths)):
            if j <= brk_point:
                if i == 0:
                    train_set.update({paths[j]:0})
                else: 
                    train_set.update({paths[j]:1})
            else:
                if i == 0:
                    test_set.update({paths[j]:0})
                else:
                    test_set.update({paths[j]:1})

    X_train = [X for X in train_set.keys()] 
    y_train = [y for y in train_set.values()]
    X_test = [X for X in test_set.keys()]
    y_test = [y for y in test_set.values()]
    return train_set, test_set, X_train,y_train, X_test, y_test

In [8]:
b_train_set, b_test_set, b_train_X, b_train_y, b_test_X, b_test_y = BinaryDatasetSplit(DATASET_PATH,0.7,deepfake_class)

In [9]:
m_train_set, m_test_set, m_train_X, m_train_y, m_test_X, m_test_y = MultiDatasetSplit(DATASET_PATH,0.7,deepfake_class)

In [10]:
Counter(b_train_set.values())

Counter({0: 2801, 1: 2802})

In [11]:
Counter(m_train_set.values())

Counter({0: 1401, 1: 1401, 2: 1401})

In [12]:
set(b_test_set.values())

{0, 1}

In [13]:
set(m_test_set.values())

{0, 1, 2}

In [14]:
# specify image size and channels
img_channels = 3
img_rows = 224
img_cols = 224

# number of classes
b_nb_classes = 2
m_nb_classes = 3

## Data Preprocessing

Allow for batch-wise pre-processing of data

In [15]:
'''
func: prepare and process batch wise data for training/ablation. 
The image path is converted to image data while the classes are label encoded (0,1,2)
input:
    i.image_list: X data: array of image paths
    ii.classes: y data: dictionary of image paths and their corresponding classes
    iii. feature_extractor model: model used to feature extraction in batches
output:
    i. DataGenerator
'''
class DataGenerator(tf.keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, image_list, classes, feature_extractor, batch_size=32, dim=(256,256), n_channels=3,
                 n_classes=3,shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.classes = classes
        self.image_list = image_list
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.feature_extractor = feature_extractor
        self.shuffle = shuffle
        self.on_epoch_end()
    

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.image_list) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_img_temp = [self.image_list[k] for k in indexes]
        #print(list_img_temp)

        # Generate data
        X, y = self.__data_generation(list_img_temp)
        #print(X,y)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.image_list))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_img_temp):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization of Y
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty((self.batch_size), dtype=int)

        #For each image path in the batch: load it, convert it to array, normalize values and pre-process.
        #Append X and respective y values to the empty X and y variables
        for i, ID in enumerate(list_img_temp):
            img = image.load_img(ID, target_size=self.dim)
            img = image.img_to_array(img)
            #img = img/255
            img = preprocess_input(img)
            X[i,] = img
            y[i] = self.classes[ID]
            
        #Extract features for batch-wise values
        output = torch.tensor(X, dtype=torch.float).to('cuda')
        output_val = encoder(output)
        feature = output_val.cpu().data.numpy().reshape(32,256,256,-1)
        #class encoding to 0,1,2
        le = LabelEncoder()
        self.y_value = le.fit_transform(y)
        return feature,y_value

In [16]:
'''
func: get the data generators for training and validation
input:
    i.train_X, test_X: array of image paths in training and testing respectively
    ii.train_set, test_set: dictionary of image paths and their corresponding class for training and test data respectively
    iii. ablation: int: number of training and test entries to use for DataGeneration
output:
    i. training_generator
    ii. validation_generator
'''
def data_generator(train_X, train_set, test_X, test_set,feature_model,ablation = None):
    #Randomly selecting data in case of ablation testing
    if ablation != None:
        train_X = random.sample(train_X, ablation)
        train_set = {image: train_set[image] for image in train_X }
        test_X = random.sample(test_X, ablation)
        test_set = {image: test_set[image] for image in test_X }
    training_generator = DataGenerator(train_X, train_set,feature_model)
    validation_generator = DataGenerator(test_X, test_set,feature_model)
    return training_generator,validation_generator

In [17]:
'''
func: process the image paths to return image values and respective labeled classes
input:
    i.batch_X: array of image paths 
    ii.batch_y: dictionary of image paths and their corresponding class
output:
    i. X: processed image data
    ii. y_value: corresponding class values
'''
def PreProcess(batch_X, batch_y, batch_size=32, dimension=(256,256), n_channels=3):
    X = np.empty((len(batch_X), *dimension, n_channels))
    y = np.empty((len(batch_X)), dtype=int)
    for i, image_path in enumerate(batch_X):
        img = image.load_img(image_path, target_size=dimension)
        img = image.img_to_array(img)
        #img = img/255
        img = preprocess_input(img)
        X[i,] = img
        y[i] = batch_y[image_path]
    le = LabelEncoder()
    y_value = le.fit_transform(y)
    return X,y_value

## Feature Extraction

Load model and model checkpoints for Vox (from the other dataset models, Vox appears to be the most relevant and closest in content to the FaceForensics++ Dataset)


In [18]:
config_path='config/vox-256.yaml' #data checkpoints
checkpoint_path='../vox-cpk.pth.tar' #pyTorch Model

In [19]:
#kp_detector model loads keypoints
generator, kp_detector = load_checkpoints(config_path='config/vox-256.yaml', 
                            checkpoint_path='../vox-cpk.pth.tar')

In [20]:
#dir(generator.module)

In [21]:
#dir(generator.module.down_blocks)
#encoder.train()

In [22]:
#get the pre-trained weights from the generator module
encoder = generator.module.bottleneck
encoder.train()

Sequential(
  (r0): ResBlock2d(
    (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (norm1): SynchronizedBatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (norm2): SynchronizedBatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (r1): ResBlock2d(
    (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (norm1): SynchronizedBatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (norm2): SynchronizedBatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (r2): ResBlock2d(
    (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (norm1

In [23]:
'''
func: extract features from the pre-trained segment of the generator
input:
    i.model: pre-trained feature extraction model
    ii.X_batch: pre-processed batch of image data arrays
output:
    i. output: array: feature extracted values for X images
'''
def feature_model(model, X_batch):
    data = torch.tensor(X_batch, dtype=torch.float).to('cuda')
    output = encoder(data)
    return output

### Model Checkpoints

In [24]:
from sklearn.metrics import roc_auc_score

class roc_callback(Callback):
    
    def on_train_begin(self, logs={}):
        logs['val_auc'] = 0

    def on_epoch_end(self, epoch, logs={}):
        y_p = []
        y_v = []
        for i in range(len(validation_generator)):
            x_val, y_val = validation_generator[i]
            y_pred = self.model.predict(x_val)
            y_p.append(y_pred)
            y_v.append(y_val)
        y_p = np.concatenate(y_p)
        y_v = np.concatenate(y_v)
        roc_auc = roc_auc_score(y_v, y_p)
        print ('\nVal AUC for epoch{}: {}'.format(epoch, roc_auc))
        logs['val_auc'] = roc_auc

## Classification Model

Test Different Classification Models for Detection

In [25]:
'''
func: CNN model for DeepFake classification 
'''
def CNN_Model(in_shape=(256,256,3)):
    
    model = Sequential()
    model.add(Conv2D(64,(3,3), strides=(2,2), padding='same', input_shape=in_shape))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.6))
    model.add(Flatten())
    model.add(Dense(b_nb_classes, activation='sigmoid'))
    return model

#call define model
binary_first_model = CNN_Model()

In [26]:
sgd = optimizers.SGD(lr=0.005, momentum=0.5, decay = 0.001)

In [27]:
training_generator, validation_generator = data_generator(b_train_X, b_train_set, b_test_X, b_test_set,encoder)
binary_first_model.compile(loss='binary_crossentropy',
              optimizer=sgd,
              metrics=['accuracy'])

In [28]:
# checkpoint 
binary_filepath = '/mnt/disks/user/project/Final_Models/first_order_binary_best_modelv2.hdf5'
checkpoint = ModelCheckpoint(binary_filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
auc_logger = roc_callback()
# fit: this will fit the net on 'ablation' samples, only 1 epoch
binary_history = binary_first_model.fit_generator(generator = training_generator,
                    validation_data = validation_generator,
                    shuffle=True,
                    epochs=20,
                    callbacks=[checkpoint,auc_logger])

Epoch 1/20
Epoch 00001: val_acc improved from -inf to 0.50042, saving model to /mnt/disks/user/project/Final_Models/first_order_binary_best_modelv2.hdf5

Val AUC for epoch0: 0.5025216588020873
Epoch 2/20
Epoch 00002: val_acc improved from 0.50042 to 0.51858, saving model to /mnt/disks/user/project/Final_Models/first_order_binary_best_modelv2.hdf5

Val AUC for epoch1: 0.519083490269931
Epoch 3/20
Epoch 00003: val_acc did not improve from 0.51858

Val AUC for epoch2: 0.5007914597956971
Epoch 4/20
Epoch 00004: val_acc did not improve from 0.51858

Val AUC for epoch3: 0.5010108143582721
Epoch 5/20
Epoch 00005: val_acc did not improve from 0.51858

Val AUC for epoch4: 0.5010108143582721
Epoch 6/20
Epoch 00006: val_acc did not improve from 0.51858

Val AUC for epoch5: 0.5006334459459459
Epoch 7/20
Epoch 00007: val_acc did not improve from 0.51858

Val AUC for epoch6: 0.5000019616850804
Epoch 8/20
Epoch 00008: val_acc did not improve from 0.51858

Val AUC for epoch7: 0.5000019616850804
Epoch 

In [29]:
binary_acc = binary_history.history['acc']
binary_val_acc = binary_history.history['val_acc']

binary_epochs = range(len(binary_acc))

plt.plot(binary_epochs, binary_acc, 'g', label='Training acc')
plt.plot(binary_epochs, binary_val_acc, 'b', label='Validation acc')
plt.title('Binary Classification Res Model Accuracy')
plt.legend()

<matplotlib.legend.Legend at 0x7ff2db62b198>

### Binary First Order Model Evaluation

In [30]:
binary_test_generator = DataGenerator(b_test_X, b_test_set,encoder,n_classes=2,shuffle=False)

In [31]:
binary_eval_base_model = load_model(binary_filepath)

In [32]:
binary_predictions = binary_eval_base_model.predict_generator(binary_test_generator)

In [33]:
binary_y_class = np.array(list(binary_test_generator.classes.values()))[:len(binary_predictions)]
binary_y = keras.utils.to_categorical(binary_y_class, num_classes=b_nb_classes)

In [34]:
binary_prob_max = binary_predictions.max(axis=1).reshape(-1, 1)
binary_y_pred = np.where(binary_predictions == binary_prob_max, 1, 0)

In [35]:
binary_pred = np.argmax(binary_predictions, axis=-1)

In [36]:
binary_score_test = metrics.accuracy_score(binary_y, binary_y_pred)
print('Res Model Test Score ',binary_score_test)

Res Model Test Score  0.40244932432432434


In [37]:
binary_cm = confusion_matrix(binary_y_class, binary_pred)
print(binary_cm)

[[311 888]
 [216 953]]


In [38]:
binary_report = classification_report(binary_y,binary_y_pred)
print(binary_report)

             precision    recall  f1-score   support

          0       0.59      0.26      0.36      1199
          1       0.49      1.00      0.66      1169

avg / total       0.54      0.62      0.51      2368



In [39]:
roc_auc_score(binary_y, binary_y_pred)

0.5186523771235083

### Multi-Class Classification

In [31]:
'''
func: CNN model for DeepFake classification 
'''
def CNN_Model(in_shape=(256,256,3)):
    
    model = Sequential()
    model.add(Conv2D(64,(3,3), strides=(2,2), padding='same', input_shape=in_shape))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.6))
    model.add(Flatten())
    model.add(Dense(m_nb_classes, activation='softmax'))
    return model

#call define model
multi_first_model = CNN_Model()

In [32]:
sgd = optimizers.SGD(lr=0.005, momentum=0.5, decay = 0.001)

In [33]:
training_generator, validation_generator = data_generator(m_train_X, m_train_set, m_test_X, m_test_set,encoder)
multi_first_model.compile(loss='binary_crossentropy',
              optimizer=sgd,
              metrics=['accuracy'])

In [34]:
# checkpoint 
multi_filepath = '/mnt/disks/user/project/Final_Models/first_order_multi_best_modelv2.hdf5'
checkpoint = ModelCheckpoint(multi_filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
auc_logger = roc_callback()
# fit: this will fit the net on 'ablation' samples, only 1 epoch
multi_history = multi_first_model.fit_generator(generator = training_generator,
                    validation_data = validation_generator,
                    shuffle=True,
                    epochs=20,
                    callbacks=[checkpoint,auc_logger])

Epoch 1/20
Epoch 00001: val_acc improved from -inf to 0.55357, saving model to /mnt/disks/user/project/Final_Models/first_order_multi_best_modelv2.hdf5

Val AUC for epoch0: 0.49762182604391897
Epoch 2/20
Epoch 00002: val_acc improved from 0.55357 to 0.55580, saving model to /mnt/disks/user/project/Final_Models/first_order_multi_best_modelv2.hdf5

Val AUC for epoch1: 0.5
Epoch 3/20
Epoch 00003: val_acc did not improve from 0.55580

Val AUC for epoch2: 0.5
Epoch 4/20
Epoch 00004: val_acc did not improve from 0.55580

Val AUC for epoch3: 0.5
Epoch 5/20
Epoch 00005: val_acc did not improve from 0.55580

Val AUC for epoch4: 0.5
Epoch 6/20
Epoch 00006: val_acc did not improve from 0.55580

Val AUC for epoch5: 0.5
Epoch 7/20
Epoch 00007: val_acc did not improve from 0.55580

Val AUC for epoch6: 0.5
Epoch 8/20
Epoch 00008: val_acc did not improve from 0.55580

Val AUC for epoch7: 0.5
Epoch 9/20
Epoch 00009: val_acc did not improve from 0.55580

Val AUC for epoch8: 0.5
Epoch 10/20
Epoch 00010: 

In [35]:
multi_acc = multi_history.history['acc']
multi_val_acc = multi_history.history['val_acc']

multi_epochs = range(len(multi_acc))

plt.plot(multi_epochs, multi_acc, 'g', label='Training acc')
plt.plot(multi_epochs, multi_val_acc, 'b', label='Validation acc')
plt.title('Multi Classification Res Model Accuracy')
plt.legend()

<matplotlib.legend.Legend at 0x7f06b451e550>

### Multi First Order Model Evaluation

In [36]:
multi_test_generator = DataGenerator(m_test_X, m_test_set,encoder,n_classes=3,shuffle=False)

In [37]:
multi_eval_base_model = load_model(multi_filepath)

In [38]:
multi_predictions = multi_eval_base_model.predict_generator(multi_test_generator)

In [39]:
multi_y_class = np.array(list(multi_test_generator.classes.values()))[:len(multi_predictions)]
multi_y = keras.utils.to_categorical(multi_y_class, num_classes=m_nb_classes)

In [40]:
multi_prob_max = multi_predictions.max(axis=1).reshape(-1, 1)
multi_y_pred = np.where(multi_predictions == multi_prob_max, 1, 0)

In [41]:
multi_pred = np.argmax(multi_predictions, axis=-1)

In [42]:
multi_score_test = metrics.accuracy_score(multi_y,multi_y_pred)
print('Res Model Test Score ',multi_score_test)

Res Model Test Score  0.33426339285714285


In [44]:
multi_cm = confusion_matrix(multi_y_class, multi_pred)
print(multi_cm)

[[  0 599   0]
 [  0 599   0]
 [  0 594   0]]


In [46]:
multi_report = classification_report(multi_y,multi_y_pred)
print(multi_report)

             precision    recall  f1-score   support

          0       0.00      0.00      0.00       599
          1       0.33      1.00      0.50       599
          2       0.00      0.00      0.00       594

avg / total       0.11      0.33      0.17      1792



  'precision', 'predicted', average, warn_for)


In [47]:
roc_auc_score(multi_y, multi_y_pred)

0.5