<a href="https://colab.research.google.com/github/lartmann/deepfake_detection_project/blob/main/NVB_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [1]:
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
import numpy as np
import pandas as pd
import cv2
import os

# Import matplotlib libraries
from matplotlib import pyplot as plt

In [None]:
# import helper functions from my github
!wget https://raw.githubusercontent.com/lartmann/deepfake_detection_project/main/ComputerVision/helper_functions_CV.py
from helper_functions_CV import plot_loss_curves, plot_accuracy

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Computer Vision

### F2F: Load numpy arrays and combine to tensor

In [5]:
# Set Hyperparameters
BATCH_SIZE = 32 # small BatchSize to aviod ResourceExhaustionError
SHUFFLE_BUFFER_SIZE = 42
NUMBER_VIDEOS = 50

In [None]:
f2f_path = '/content/drive/MyDrive/nvb/F2F'

count = 0
for np_file in os.listdir(f2f_path):
  if np_file.endswith('mp4.npz'):
    count += 1
    if count == 1:
      file_np = np.load(f'{f2f_path}/{np_file}', allow_pickle=True)['data'] # Load file
      file_np = np.reshape(file_np, ( len(file_np)*10, 5, 300, 300, 3))
      file_tensor = tf.convert_to_tensor(file_np) # convert np to tensor
      labels_tensor = tf.expand_dims(tf.convert_to_tensor(np.array([0] * len(file_np))), axis=1) # create labels
      f2f_data = tf.data.Dataset.from_tensor_slices((file_np, labels_tensor)) # create dataset from values and labels
      print(file_tensor.shape)
      print(labels_tensor.shape)
    elif count <= NUMBER_VIDEOS:
      file_np = np.load(f'{f2f_path}/{np_file}', allow_pickle=True)['data'] # Load file
      file_np = np.reshape(file_np, (len(file_np)*10, 5, 300, 300, 3))  
      file_tensor = tf.convert_to_tensor(file_np) # convert np to tensor
      labels_tensor = tf.expand_dims(tf.convert_to_tensor(np.array([0] * len(file_np))), axis=1) # create labels
      current = tf.data.Dataset.from_tensor_slices((file_np, labels_tensor)) # create dataset from values and labels
      f2f_data = f2f_data.concatenate(current) # append current dataset to the rest
    else:
      break
  

### original: load numpy arrays and combine to tensor

In [7]:
original_path = '/content/drive/MyDrive/nvb/original'
count = 0
for np_file in os.listdir(original_path):
  count += 1
  if count == 1:
    file_np = np.load(f'{original_path}/{np_file}', allow_pickle=True)['data'] # Load file
    file_np = np.reshape(file_np, (len(file_np)*10, 5, 300, 300, 3))
    file_tensor = tf.convert_to_tensor(file_np) # convert np to tensor
    labels_tensor = tf.expand_dims(tf.convert_to_tensor(np.array([1] * len(file_np))), axis=1) # create labels
    original_data = tf.data.Dataset.from_tensor_slices((file_np, labels_tensor)) # create dataset
  elif count <= NUMBER_VIDEOS:
    file_np = np.load(f'{original_path}/{np_file}', allow_pickle=True)['data'] # Load file
    file_np = np.reshape(file_np, (len(file_np)*10, 5, 300, 300, 3))
    file_tensor = tf.convert_to_tensor(file_np) # convert np to tensor
    labels_tensor = tf.expand_dims(tf.convert_to_tensor(np.array([1] * len(file_np))), axis=1) # create labels
    current = tf.data.Dataset.from_tensor_slices((file_np, labels_tensor)) # create dataset of current tensor
    original_data = original_data.concatenate(current) # append current dataset to existing 
  else:
    break

In [8]:
# split data into test and train 

train_data_f = f2f_data.take(int(len(f2f_data)*0.8))
test_data_f = f2f_data.skip(int(len(f2f_data)*0.8))

train_data_o = original_data.take(int(len(original_data)*0.8)) 
test_data_o = original_data.skip(int(len(original_data)*0.8))


In [9]:
train_data = train_data_f.concatenate(train_data_o).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
test_data = test_data_f.concatenate(test_data_o).shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)

print(f"Number of clips in training data: {train_data.cardinality().numpy() * BATCH_SIZE}")
print(f"Number of clips in test data: {test_data.cardinality().numpy() * BATCH_SIZE}")

Number of clips in training data: 8960
Number of clips in test data: 2240


# Training

In [10]:
from sklearn.utils import validation
from tensorflow.keras.layers import Dense, Flatten, Conv3D, MaxPool3D, Activation, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Sequential

## Base Model
basic CNN Model

In [11]:
tf.random.set_seed(42)

model_1 = Sequential([
  Input(shape=[5, 300, 300, 3], dtype=tf.float16),
  Flatten(input_shape = (5,300,300,3)),
  Dense(10, activation="relu"), # binary activation output
  Dense(100, activation="relu"),
  Dense(100, activation="relu"),
  Dense(1, activation="sigmoid",)
])

# Compile the model
model_1.compile(loss="binary_crossentropy",
              optimizer=Adam(),
              metrics=["accuracy"])

In [None]:
# Fit the model
history_1 = model_1.fit(train_data,
                        validation_data = test_data,
                        batch_size=BATCH_SIZE,
                        epochs=10)
                        #validation_split=0.

In [None]:
plot_accuracy(history_1)

## 3D CNN

In [None]:
# CNN model for video detection
model_2 = Sequential([
  tf.keras.layers.BatchNormalization(),
  Conv3D(16,(3,3,3), activation="relu", padding = "same"),
  MaxPool3D(pool_size=(2, 2, 2)),
  tf.keras.layers.BatchNormalization(),
  Conv3D(32,(3,3,3), activation="relu", padding = "same"),
  Conv3D(16,(3,1,1), activation="relu", padding = "same"),
  tf.keras.layers.BatchNormalization(),
  Flatten(input_shape = (5,300,300,3)),
  Dense(256, activation="relu"),
  Dense(1, activation="sigmoid")
])

# Compile the model
model_2.compile(loss="binary_crossentropy",
              optimizer=Adam(),
              metrics=["accuracy"])

In [None]:
history_2 = model_2.fit(train_data,
                        validation_data = test_data,
                        batch_size=BATCH_SIZE,
                        epochs=5)

In [None]:
tf.keras.backend.clear_session()

In [None]:
plot_accuracy(history_2)

## Movinet


In [None]:
!pip install remotezip tqdm opencv-python==4.5.2.52 opencv-python-headless==4.5.2.52 tf-models-official

In [None]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import keras
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

# Import the MoViNet model from TensorFlow Models (tf-models-official) for the MoViNet model
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

In [None]:
gru = layers.GRU(units=4, return_sequences=True, return_state=True)

inputs = tf.random.normal(shape=[1, 10, 8]) # (batch, sequence, channels)

result, state = gru(inputs) # Run it all at once

In [None]:
first_half, state = gru(inputs[:, :5, :])   # run the first half, and capture the state
second_half, _ = gru(inputs[:,5:, :], initial_state=state)  # Use the state to continue where you left off.

print(np.allclose(result[:, :5,:], first_half))
print(np.allclose(result[:, 5:,:], second_half))

True
True


In [None]:
model_id = 'a5'
resolution = 300

tf.keras.backend.clear_session()

backbone = movinet.Movinet(model_id=model_id)
backbone.trainable = False

model = movinet_model.MovinetClassifier(backbone=backbone, num_classes = 600)
model.build([None, None, None, None, 3])




In [None]:
# Load pre-trained weights
!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a5_base.tar.gz -O movinet_a5_base.tar.gz -q
!tar -xvf movinet_a5_base.tar.gz

checkpoint_dir = f'movinet_{model_id}_base'
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(model=model)
status = checkpoint.restore(checkpoint_path)
status.assert_existing_objects_matched()

In [None]:
def build_classifier(batch_size, num_frames, resolution, backbone, num_classes):
  """Builds a classifier on top of a backbone model."""
  model = movinet_model.MovinetClassifier(
      backbone=backbone,
      num_classes=num_classes
      )
  model.build([batch_size, num_frames, resolution, resolution, 3])

  return model

In [None]:
movinet = build_classifier(32, 5, 300, backbone, 2)

movinet.trainable = True

# freeze all blocks except the last
for layer in movinet.layers[:-1]:
  layer.trainable = False



movinet.compile(loss="binary_crossentropy",
              optimizer=Adam(learning_rate = 0.01),
              metrics=["accuracy"])

In [None]:
history_3 = model.fit(train_data,
                        validation_data = test_data,
                        epochs=5)

In [None]:
plot_accuracy(history_3)