<a href="https://colab.research.google.com/github/the-chosen-wan/deepfake-detection/blob/main/Copy_of_multi_modal_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.models import Model
import os
import glob

In [None]:
class Patch(layers.Layer):
    """Rearrange a 4-D tensor into a sequence of flattened square patches.

    The output has shape ``[batch, num_patches, -1]``.  The token count is
    derived from ``input_shape[2]`` while the intermediate reshape uses
    ``input_shape[1]``, so the two only agree when those dimensions are
    equal (square inputs).

    Args:
        patch_size: side length of each square patch; also used as the
            block shape passed to ``tf.space_to_batch_nd``.
    """

    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size

    def build(self, input_shape):
        # Static dimensions are taken by position: 1 -> H, 2 -> B, last -> C.
        self.H, self.B, self.C = input_shape[1], input_shape[2], input_shape[-1]

    def call(self, patch):
        side = self.patch_size
        n_images = tf.shape(patch)[0]
        no_padding = [[0, 0], [0, 0]]

        # space_to_batch_nd output is cut into side*side equal pieces along
        # axis 0 and the pieces are restacked on a new axis 3.
        blocks = tf.space_to_batch_nd(patch, [side, side], no_padding)
        per_offset = tf.split(blocks, side ** 2, 0)
        stacked = tf.stack(per_offset, 3)

        grouped = tf.reshape(
            stacked,
            [n_images, (self.H // side) ** 2, side, side, self.C],
        )
        token_count = (self.B // side) ** 2
        # Flatten every patch into a single feature vector.
        return tf.reshape(grouped, [n_images, token_count, -1])

In [None]:
class inverse_patch(layers.Layer):
    """Turn a ``[batch, num_patches, features]`` tensor back into an image.

    The sequence is reshaped, split along the per-patch axis, restacked on
    the batch axis and passed to ``tf.compat.v1.batch_to_space_nd`` with
    block shape ``[patch_size, patch_size]``.

    Args:
        target_shape: indexable ``(H, B, C)``.  Only ``H`` is used to size
            the patch grid in ``call`` (the grid is ``H // patch_size`` on
            both axes); ``B`` is stored but not read there.
        patch_size: side length of each square patch.
    """

    def __init__(self, target_shape, patch_size):
        super().__init__()
        self.H, self.B, self.C = target_shape[0], target_shape[1], target_shape[2]
        self.patch_size = patch_size

    def call(self, x):
        side = self.patch_size
        grid = self.H // side
        channels = self.C
        n_images = tf.shape(x)[0]
        n_tokens = tf.shape(x)[1]
        no_crop = [[0, 0], [0, 0]]

        tokens = tf.reshape(x, [n_images, n_tokens, side, side, channels])
        tokens = tf.reshape(tokens, [n_images, grid, grid, side * side, channels])

        # One slice per position inside a patch, moved in front of the batch
        # axis and then merged with it.
        per_offset = tf.split(tokens, side * side, 3)
        batched = tf.stack(per_offset, axis=0)
        batched = tf.reshape(batched, [side * side * n_images, grid, grid, channels])

        return tf.compat.v1.batch_to_space_nd(batched, [side, side], no_crop)

In [None]:
class encoding(layers.Layer):
    """Add a learned per-position embedding to a patch sequence.

    Args:
        num_patches: number of positions; used as the embedding table size
            and as the length of the position index range.
        projection_dim: width of each embedding vector.
    """

    def __init__(self, num_patches, projection_dim):
        super().__init__()
        self.num = num_patches
        self.embed = layers.Embedding(input_dim=num_patches, output_dim=projection_dim)

    def call(self, patch):
        # Indices 0 .. num-1 are looked up in the embedding table and the
        # result is added to the incoming tensor.
        positions = tf.range(self.num)
        position_vectors = self.embed(positions)
        return patch + position_vectors

In [None]:
class simple_attention(layers.Layer):
    """Patch-level attention over a feature map for a single patch size.

    The input is projected by three 1x1 convolutions (query, key, value),
    each projection is turned into a patch sequence with ``Patch`` and
    position-encoded with one shared ``encoding`` layer, the sequences go
    through ``layers.Attention``, and the result is divided by
    ``sqrt(C * patch_size**2)`` and folded back into an image by
    ``inverse_patch``.

    Note that the division is applied to the attention *output*, not to the
    attention scores.

    Args:
        patch_size: side length of the square patches.
    """

    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size
        self.patch_encoder = Patch(self.patch_size)

    def build(self, input_shape):
        height, width, channels = input_shape[1], input_shape[2], input_shape[-1]
        token_dim = channels * self.patch_size ** 2
        scale = tf.math.sqrt(tf.cast(token_dim, dtype='float32'))

        self.num_patches = (width // self.patch_size) ** 2

        # 1x1 convolutions that keep the channel count unchanged.
        self.query_conv = layers.Conv2D(channels, kernel_size=1)
        self.key_conv = layers.Conv2D(channels, kernel_size=1)
        self.value_conv = layers.Conv2D(channels, kernel_size=1)

        self.embed = encoding(self.num_patches, token_dim)
        self.reshape = inverse_patch((height, width, channels), self.patch_size)
        self.sa = layers.Attention()
        self.normalizer = layers.Lambda(lambda t: t / scale)

    def call(self, x):
        # Run the three projections first, then tokenise them in the same
        # query / key / value order.
        projections = [
            conv(x)
            for conv in (self.query_conv, self.key_conv, self.value_conv)
        ]
        query, key, value = [
            self.embed(self.patch_encoder(projected))
            for projected in projections
        ]

        # The list passed to the attention layer is ordered query, value, key.
        attended = self.sa([query, value, key])
        return self.reshape(self.normalizer(attended))

In [None]:
class multi_modal_attention(layers.Layer):
    """Run several ``simple_attention`` heads and fuse them with a 3x3 conv.

    One head is created per entry of ``l``.  Every head receives the same
    input; head outputs are concatenated on the last axis and a ``Conv2D``
    with 'same' padding maps the result back to the input channel count.

    Args:
        l: iterable of patch sizes, one per attention head.
    """

    def __init__(self, l):
        super().__init__()
        self.heads = [simple_attention(patch_size) for patch_size in l]

    def build(self, input_shape):
        channels = input_shape[-1]
        self.conv = layers.Conv2D(channels, kernel_size=3, padding='same')

    def call(self, x):
        merged = self.heads[0](x)

        # Append each remaining head's output to the running concatenation.
        for head in self.heads[1:]:
            head_out = head(x)
            merged = layers.Concatenate(axis=-1)([merged, head_out])

        return self.conv(merged)

In [None]:
# Build the classifier: pixel scaling -> 2-D FFT magnitude -> multi-scale
# patch attention -> four Conv/BatchNorm/MaxPool stages -> 2-way softmax.
inp = layers.Input((256, 256, 3))

# Scale the input by 1/255 inside the model.
o1 = layers.Lambda(lambda x: x / 255.0)(inp)
o1 = tf.cast(o1, dtype='complex64')

# tf.signal.fft2d transforms the two innermost axes.  On a channels-last
# tensor that would be (width, channel), so move the channel axis ahead of
# the spatial axes, transform over (height, width), and move it back.
o1 = tf.transpose(o1, perm=[0, 3, 1, 2])
o1 = tf.signal.fft2d(o1)
o1 = tf.transpose(o1, perm=[0, 2, 3, 1])

# Magnitude of the complex spectrum.
o = tf.math.abs(o1)
o = multi_modal_attention([8, 16, 32, 64, 128])(o)

# Four identical stages that differ only in the number of filters; each
# MaxPool2D((2, 2)) halves the spatial resolution.
for filters in (16, 32, 64, 128):
    o = layers.Conv2D(filters, 3, activation='relu', padding='same')(o)
    o = layers.BatchNormalization()(o)
    o = layers.MaxPool2D((2, 2))(o)

o = layers.Flatten()(o)
o = layers.Dense(2, activation='softmax')(o)

m = Model(inputs=inp, outputs=o)

In [None]:
# Print the layer-by-layer summary of the model `m`.
m.summary()

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 lambda_3 (Lambda)           (None, 256, 256, 3)       0         
                                                                 
 tf.cast_1 (TFOpLambda)      (None, 256, 256, 3)       0         
                                                                 
 tf.signal.fft2d_2 (TFOpLamb  (None, 256, 256, 3)      0         
 da)                                                             
                                                                 
 tf.math.real (TFOpLambda)   (None, 256, 256, 3)       0         
                                                                 
 multi_modal_attention_2 (mu  (None, 256, 256, 3)      983628    
 lti_modal_attention)                                      

In [None]:
# Mount Google Drive at /content/drive; the training data paths used in this
# notebook live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#COLLECTS THE PATH OF EVERY *.jpg FILE FOUND UNDER THE TRAIN DIRECTORY TREE
label_files = ['fake', 'real']
train_files = []
for dirpath, _subdirs, _filenames in os.walk('/content/drive/My Drive/data_fakesecond/train'):
    # Match the images that sit directly inside the directory being visited.
    train_files.extend(glob.glob(os.path.join(dirpath, '*.jpg')))
print(len(train_files))

15958


In [None]:
import cv2
#GETS IMAGE ARRAY OUT GIVEN AN IMAGE PATH
def get_input(path):
    """Read the image at ``path`` with ``cv2.imread`` and return the array.

    Args:
        path: location of the image file.

    Returns:
        The array produced by ``cv2.imread``.

    Raises:
        OSError: if ``cv2.imread`` returns ``None``, which is how it reports
            a file that is missing or cannot be decoded.  Failing here names
            the offending path instead of surfacing later as an obscure
            error in the resize or tensor-conversion step.
    """
    im = cv2.imread(path)
    if im is None:
        raise OSError(f"cv2.imread could not read image: {path}")
    return im
 
#CREATES THE LABEL VECTOR ([1,0] OR [0,1]) FROM THE CLASS NAME FOUND IN THE FILE PATH
def get_output(path, label_files, mode):
    """Build a 0/1 label vector for ``path``.

    In 'train' mode the class name is the second-to-last '/'-separated
    component of the path (the parent directory).  In any other mode it is
    the part of the file name before the first '_', lower-cased.

    Args:
        path: '/'-separated file path.
        label_files: sequence of class names; sets the order and length of
            the returned vector.
        mode: 'train' selects the directory-based rule, anything else the
            file-name-based rule.

    Returns:
        A list with 1 where the entry of ``label_files`` equals the class
        name and 0 elsewhere.  If nothing matches, every entry is 0.
    """
    parts = path.split('/')
    if mode == 'train':
        img_id = parts[-2]
    else:
        img_id = parts[-1].split('_')[0].lower()
    return [1 if label == img_id else 0 for label in label_files]

In [None]:
#GENERATOR FUNCTION TO PASS THE IMAGES AND LABELS TO model.fit FOR TRAINING
def image_generator(files, label_files, batch_size,mode, resize=(256,256)):
    """Yield ``(batch_x, batch_y)`` tensor pairs forever.

    Each batch is built from ``batch_size`` paths drawn from ``files`` with
    ``np.random.choice`` (numpy's default there is sampling with
    replacement, so a path may repeat within a batch).

    Args:
        files: sequence of image paths to sample from.
        label_files: class names forwarded to ``get_output``.
        batch_size: number of paths drawn per batch.
        mode: forwarded to ``get_output`` to pick the labelling rule.
        resize: size passed to ``cv2.resize`` for every image, or ``None``
            to skip resizing.

    Yields:
        ``batch_x``: the stacked images cast to float32 (pixel values are
        not rescaled here).
        ``batch_y``: the stacked label vectors.
    """
    should_resize = resize is not None

    while True:
        sampled_paths = np.random.choice(a=files, size=batch_size)
        images, labels = [], []

        for image_path in sampled_paths:
            image = get_input(image_path)
            label = get_output(image_path, label_files, mode)
            if should_resize:
                image = cv2.resize(image, resize)
            images.append(image)
            labels.append(label)

        batch_x = tf.cast(tf.convert_to_tensor(images), dtype='float32')
        batch_y = tf.convert_to_tensor(labels)

        yield batch_x, batch_y

In [None]:
# Configure the model with the Adam optimizer, categorical cross-entropy
# loss, and accuracy + AUC metrics, then train it from the image generator
# for 10 epochs of 30 steps, 32 images per step.
m.compile(
    optimizer='Adam',
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy', tf.keras.metrics.AUC()],
)
m.fit(
    image_generator(train_files, label_files, batch_size=32, mode='train'),
    epochs=10,
    steps_per_epoch=30,
    verbose=1,
)

Epoch 1/10

KeyboardInterrupt: ignored

In [None]:
# Run another fit call on the same model `m` with a fresh generator:
# 10 epochs of 30 steps, 32 images per step.
m.fit(
    image_generator(train_files, label_files, batch_size=32, mode='train'),
    epochs=10,
    steps_per_epoch=30,
    verbose=1,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10

KeyboardInterrupt: ignored