<a href="https://www.kaggle.com/code/joshuaokolo/deepfake-algorithm?scriptVersionId=104053877" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Deepfake Algorithm

Algorithm for detecting deepfakes

## TensorFlow Input Pipeline

In [None]:
# Parse the Image

def parse_function(filename, label):
    """Read an image file and return it as a 224x224 float32 RGB tensor.

    Args:
        filename: Path of the image file, passed to ``tf.io.read_file``.
        label: Label associated with the file; returned unchanged.

    Returns:
        A ``(image, label)`` pair where ``image`` has been decoded with three
        channels, converted to float32 and resized to 224x224.
    """
    side = 224

    raw_bytes = tf.io.read_file(filename)
    decoded = tf.image.decode_png(raw_bytes, channels=3)

    # convert_image_dtype rescales the decoded integer pixels to floats in [0, 1].
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)

    return tf.image.resize(as_float, [side, side]), label

## Frequency Domain Model

In [None]:
# Applying Discrete Cosine Transformation - II on the images

def dct_preprocess(image, label):
    """Replace an image by the log-magnitude of its 2-D DCT-II coefficients.

    ``tf.signal.dct`` only transforms the innermost axis, so the 2-D transform
    is built from two 1-D passes with transposes in between.

    Args:
        image: Rank-3 image tensor (the transposes below require three axes);
            presumably (height, width, channels) as produced by
            ``parse_function`` - confirm against the caller.
        label: Label for the image; returned unchanged.

    Returns:
        ``(log_spectrum, label)`` where ``log_spectrum`` has the same axis
        order as ``image``.
    """
    # Small offset so that log() never sees an exact zero coefficient.
    epsilon = 1e-12

    # Axes (0, 1, 2) -> (2, 1, 0): the first image axis becomes innermost.
    first_axis_last = tf.transpose(image, perm=[2, 1, 0])
    pass_one = tf.signal.dct(first_axis_last, type=2, norm="ortho")

    # Swap the two trailing axes so the second image axis becomes innermost.
    second_axis_last = tf.transpose(pass_one, perm=[0, 2, 1])
    pass_two = tf.signal.dct(second_axis_last, type=2, norm="ortho")

    # Restore the original axis order.
    coefficients = tf.transpose(pass_two, perm=[1, 2, 0])

    # converting dct coefficients into log scale
    log_spectrum = tf.math.log(tf.math.abs(coefficients) + epsilon)

    return log_spectrum, label

In [None]:
# Input pipeline for training and validation dataset

# Fraction of all_train_dataset used for training; the remainder is validation.
train_ratio   = 0.80

# Dataset.take / Dataset.skip need an integer element count, but
# ds_size * train_ratio is a float, so truncate it explicitly.
train_size    = int(ds_size * train_ratio)

train_dataset = all_train_dataset.take(train_size)

valid_dataset = all_train_dataset.skip(train_size)

batch_size    = 40

# Training split: decode -> log-DCT -> batch.
train_dataset = train_dataset.map(parse_function, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

train_dataset = train_dataset.map(dct_preprocess, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

train_dataset = train_dataset.batch(batch_size)

# Both maps above are non-random, so their batched output can be cached and
# reused on later epochs. prefetch goes last so that it overlaps delivery of
# (cached) batches with the training step instead of feeding the cache.
train_dataset = train_dataset.cache().prefetch(tf.data.AUTOTUNE)

# Validation split: identical preprocessing.
valid_dataset = valid_dataset.map(parse_function, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

valid_dataset = valid_dataset.map(dct_preprocess, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

valid_dataset = valid_dataset.batch(batch_size)

valid_dataset = valid_dataset.cache().prefetch(tf.data.AUTOTUNE)

In [None]:
# Simple CNN Model ( using DCT-II pre-processing)

# Side length of the square input expected by the network.
IMG_SHAPE = 224

x = Input(shape=(IMG_SHAPE, IMG_SHAPE, 3))

# Stage 1: 3 filters at the full 224x224 resolution.
x1 = Conv2D(filters=3, kernel_size=3, padding="same", activation="relu")(x)
x1 = BatchNormalization()(x1)

# Stage 2: 8 filters, then default (2x2) average pooling: 224 -> 112.
x2 = Conv2D(filters=8, kernel_size=3, padding="same", activation="relu")(x1)
x2 = AveragePooling2D()(BatchNormalization()(x2))

# Stage 3: 16 filters, then default (2x2) average pooling: 112 -> 56.
x3 = Conv2D(filters=16, kernel_size=3, padding="same", activation="relu")(x2)
x3 = AveragePooling2D()(BatchNormalization()(x3))

# Stage 4: 32 filters, no pooling.
x4 = Conv2D(filters=32, kernel_size=3, padding="same", activation="relu")(x3)
x4 = BatchNormalization()(x4)

# Classifier head: flatten, dropout, single sigmoid unit.
y = Flatten()(x4)
y = Dropout(rate=0.5)(y)
y = Dense(units=1, activation='sigmoid')(y)

model = KerasModel(inputs=x, outputs=y)

model.compile(
    loss='binary_crossentropy',
    optimizer="adam",
    metrics=['accuracy'],
)

## Spatial Domain Model

In [None]:
# Image Augmentation

def train_preprocess(image, label):
    """Apply random augmentations to one training image.

    Flips, brightness and saturation jitter are always applied. Gaussian
    blur, colour inversion, a centre-crop zoom and a 90-degree rotation are
    each applied with probability 0.5. The result is clipped to [0, 1]
    regardless of which branches were taken.

    Args:
        image: Float image tensor; presumably 224x224x3 with values in [0, 1]
            as produced by ``parse_function`` - confirm against the caller.
        label: Label for the image; returned unchanged.

    Returns:
        ``(augmented_image, label)``.
    """
    IMG_SHAPE = 224

    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tf.image.random_brightness(image, max_delta=32.0 / 255.0)
    image = tf.image.random_saturation(image, lower=0.5, upper=1.5)

    # random gaussian filter
    if tf.random.uniform(shape=[], minval=0.0, maxval=1.0) < 0.5:
        image = tfa.image.gaussian_filter2d(image)

    # random invert image
    if tf.random.uniform([]) < 0.5:
        image = 1 - image

    # random zoom: keep the central half and scale it back to full size
    if tf.random.uniform([]) < 0.5:
        image = tf.image.resize(
            tf.image.central_crop(image, central_fraction=0.5),
            [IMG_SHAPE, IMG_SHAPE],
        )

    # random rotate
    if tf.random.uniform([]) < 0.5:
        image = tf.image.rot90(image)

    # Brightness jitter and inversion can leave [0, 1]. The clip used to sit
    # inside the last else-branch, so rotated images were never clipped.
    image = tf.clip_by_value(image, 0.0, 1.0)

    return image, label

In [None]:
# training and validation dataset

# Fraction of all_train_dataset used for training; the remainder is validation.
train_ratio   = 0.80

# Dataset.take / Dataset.skip need an integer element count, but
# ds_size * train_ratio is a float, so truncate it explicitly.
train_size    = int(ds_size * train_ratio)

train_dataset = all_train_dataset.take(train_size)

valid_dataset = all_train_dataset.skip(train_size)

batch_size    = 40

train_dataset = train_dataset.map(parse_function, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

# Cache the decoded images BEFORE the random augmentation. Caching after
# train_preprocess would record the first epoch's augmented images and replay
# exactly those on every later epoch, which defeats the augmentation.
train_dataset = train_dataset.cache()

train_dataset = train_dataset.map(train_preprocess, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

train_dataset = train_dataset.batch(batch_size)

# prefetch is the last stage so batch preparation overlaps the training step.
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

# Validation images are only decoded and resized, never augmented.
valid_dataset = valid_dataset.map(parse_function, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

valid_dataset = valid_dataset.batch(batch_size)

In [None]:
# Spatial Domain Model ( Meso-inceptionnet)

# define inception layer

def InceptionLayer(a, b, c, d):
    """Build an inception-style block made of four parallel convolution branches.

    Args:
        a: Number of filters of the 1x1 branch.
        b: Number of filters of the 1x1 -> 3x3 branch.
        c: Number of filters of the 1x1 -> 3x3 (dilation rate 2) branch.
        d: Number of filters of the 1x1 -> 3x3 (dilation rate 3) branch.

    Returns:
        A function that takes a tensor ``x``, applies the four branches to it
        and returns their outputs concatenated along the last axis (so the
        result has ``a + b + c + d`` channels).
    """

    def func(x):
        # Branch 1: 1x1 convolution only.
        x1 = Conv2D(a, (1, 1), padding='same', activation='relu')(x)

        # Branch 2: 1x1 bottleneck followed by a plain 3x3 convolution.
        x2 = Conv2D(b, (1, 1), padding='same', activation='relu')(x)
        x2 = Conv2D(b, (3, 3), padding='same', activation='relu')(x2)

        # Branch 3: 1x1 bottleneck followed by a 3x3 convolution, dilation 2.
        x3 = Conv2D(c, (1, 1), padding='same', activation='relu')(x)
        x3 = Conv2D(c, (3, 3), dilation_rate=2, strides=1,
                    padding='same', activation='relu')(x3)

        # Branch 4: 1x1 bottleneck followed by a 3x3 convolution, dilation 3.
        x4 = Conv2D(d, (1, 1), padding='same', activation='relu')(x)
        x4 = Conv2D(d, (3, 3), dilation_rate=3, strides=1,
                    padding='same', activation='relu')(x4)

        # Every branch uses 'same' padding and stride 1, so the spatial sizes
        # agree and the outputs can be stacked along the channel axis.
        y = Concatenate(axis=-1)([x1, x2, x3, x4])

        return y

    return func

# meso - inception net model

# Side length of the square input expected by the network.
IMG_SHAPE = 224

x = Input(shape=(IMG_SHAPE, IMG_SHAPE, 3))

# Inception stage 1, then 2x2 max pooling: 224 -> 112.
x1 = InceptionLayer(1, 4, 4, 2)(x)
x1 = BatchNormalization()(x1)
x1 = MaxPooling2D(pool_size=(2, 2), padding='same')(x1)

# Inception stage 2, then 2x2 max pooling: 112 -> 56.
x2 = InceptionLayer(2, 4, 4, 2)(x1)
x2 = BatchNormalization()(x2)
x2 = MaxPooling2D(pool_size=(2, 2), padding='same')(x2)

# Plain 5x5 convolution stage, then 2x2 max pooling: 56 -> 28.
x3 = Conv2D(filters=16, kernel_size=(5, 5), padding='same', activation='relu')(x2)
x3 = BatchNormalization()(x3)
x3 = MaxPooling2D(pool_size=(2, 2), padding='same')(x3)

# Plain 5x5 convolution stage, then 4x4 max pooling: 28 -> 7.
x4 = Conv2D(filters=16, kernel_size=(5, 5), padding='same', activation='relu')(x3)
x4 = BatchNormalization()(x4)
x4 = MaxPooling2D(pool_size=(4, 4), padding='same')(x4)

# Classifier head: dropout -> 16 units with leaky ReLU -> dropout -> sigmoid.
y = Flatten()(x4)
y = Dropout(rate=0.5)(y)
y = Dense(units=16)(y)
y = LeakyReLU(alpha=0.1)(y)
y = Dropout(rate=0.5)(y)
y = Dense(units=1, activation='sigmoid')(y)

model = KerasModel(inputs=x, outputs=y)

model.compile(
    loss='mean_squared_error',
    optimizer="adam",
    metrics=['accuracy'],
)

## Combining both models: Cross Domain Model

In [None]:
@tf.function
def train_preprocess(image, label):
    """Augment an image for the spatial branch and keep the original alongside it.

    Flips, brightness and saturation jitter are always applied. Gaussian
    blur, colour inversion, a centre-crop zoom and a 90-degree rotation are
    each applied with probability 0.5. Every step operates on the result of
    the previous one, and the final image is always clipped to [0, 1].

    Args:
        image: Float image tensor; presumably 224x224x3 with values in [0, 1]
            as produced by ``parse_function`` - confirm against the caller.
        label: Label for the image; returned unchanged.

    Returns:
        ``(image1, image, label)`` where ``image1`` is the augmented image and
        ``image`` is the untouched input.
    """
    IMG_SHAPE = 224

    # Each step must consume image1 (the running result), not `image`;
    # reading `image` again would throw away all earlier augmentations.
    image1 = tf.image.random_flip_left_right(image)
    image1 = tf.image.random_flip_up_down(image1)
    image1 = tf.image.random_brightness(image1, max_delta=32.0 / 255.0)
    image1 = tf.image.random_saturation(image1, lower=0.5, upper=1.5)

    # random gaussian filter
    if tf.random.uniform(shape=[], minval=0.0, maxval=1.0) < 0.5:
        image1 = tfa.image.gaussian_filter2d(image1)

    # random invert image
    if tf.random.uniform([]) < 0.5:
        image1 = 1 - image1

    # random zoom: keep the central half and scale it back to full size
    if tf.random.uniform([]) < 0.5:
        image1 = tf.image.resize(
            tf.image.central_crop(image1, central_fraction=0.5),
            [IMG_SHAPE, IMG_SHAPE],
        )

    # random rotate
    if tf.random.uniform([]) < 0.5:
        image1 = tf.image.rot90(image1)

    # Brightness jitter and inversion can leave [0, 1]; clip on every path.
    image1 = tf.clip_by_value(image1, 0.0, 1.0)

    return image1, image, label

@tf.function
def dct_preprocess(image1, image, label):
    """Pair an image with the log-magnitude 2-D DCT-II spectrum of another.

    ``tf.signal.dct`` only transforms the innermost axis, so the 2-D transform
    is built from two 1-D passes with transposes in between.

    Args:
        image1: Tensor passed through unchanged as the first model input.
        image: Rank-3 image tensor whose spectrum is computed (the transposes
            below require three axes).
        label: Label for the pair; returned unchanged.

    Returns:
        ``((image1, log_spectrum), label)``.
    """
    # Small offset so that log() never sees an exact zero coefficient.
    epsilon = 1e-12

    # Axes (0, 1, 2) -> (2, 1, 0): the first image axis becomes innermost.
    first_axis_last = tf.transpose(image, perm=[2, 1, 0])
    pass_one = tf.signal.dct(first_axis_last, type=2, norm="ortho")

    # Swap the two trailing axes so the second image axis becomes innermost.
    second_axis_last = tf.transpose(pass_one, perm=[0, 2, 1])
    pass_two = tf.signal.dct(second_axis_last, type=2, norm="ortho")

    # Restore the original axis order before taking the log-magnitude.
    coefficients = tf.transpose(pass_two, perm=[1, 2, 0])
    log_spectrum = tf.math.log(tf.math.abs(coefficients) + epsilon)

    return (image1, log_spectrum), label

In [None]:
# training and validation dataset

# Fraction of all_train_dataset used for training; the remainder is validation.
train_ratio   = 0.80

# Dataset.take / Dataset.skip need an integer element count, but
# ds_size * train_ratio is a float, so truncate it explicitly.
train_size    = int(ds_size * train_ratio)

train_dataset = all_train_dataset.take(train_size)

# NOTE(review): valid_dataset is only split off here; no preprocessing is
# applied to it in this cell - confirm it is mapped before being used.
valid_dataset = all_train_dataset.skip(train_size)

batch_size    = 40

train_dataset = train_dataset.map(parse_function, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

# Cache the decoded images BEFORE the random augmentation. Caching after
# train_preprocess would record the first epoch's augmented images and replay
# exactly those on every later epoch, which defeats the augmentation.
train_dataset = train_dataset.cache()

train_dataset = train_dataset.map(train_preprocess, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

train_dataset = train_dataset.map(dct_preprocess, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

train_dataset = train_dataset.batch(batch_size)

# prefetch is the last stage so batch preparation overlaps the training step.
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

The cross-domain model concatenates the output of the spatial-domain model (`y1`) and the output of the frequency-domain model (`y2`) into a single tensor `y`, as shown in the code block below. The concatenated tensor is then passed through a dense layer and the output layer.

## add the model layers

# Merge the outputs of the two branches (y1 and y2, built elsewhere).
y = tf.keras.layers.Concatenate()([y1, y2])

# Head: dropout -> 64 units with leaky ReLU -> dropout -> sigmoid output.
y = Dropout(rate=0.5)(y)
y = Dense(units=64)(y)
y = LeakyReLU(alpha=0.1)(y)
y = Dropout(rate=0.5)(y)
y = Dense(units=1, activation='sigmoid')(y)

# Two-input model: x and u are the input tensors of the two branches.
model = KerasModel(inputs=[x, u], outputs=y)

# Stop once val_loss has not improved for 10 epochs and restore the best
# weights seen, to limit overfitting.
early_stop = EarlyStopping(
    monitor='val_loss',
    mode='min',
    verbose=1,
    patience=10,
    restore_best_weights=True,
)

# Halve the learning rate after 5 epochs without val_loss improvement,
# never going below 1e-6.
rlrop = ReduceLROnPlateau(
    monitor='val_loss',
    mode='min',
    patience=5,
    factor=0.5,
    min_lr=1e-6,
    verbose=1,
)

model.compile(
    loss='binary_crossentropy',
    optimizer="adam",
    metrics=['accuracy'],
)

## References

https://arxiv.org/abs/1809.00888

https://en.wikipedia.org/wiki/Discrete_cosine_transform

https://en.wikipedia.org/wiki/Discrete_cosine_transform