# Deepfake Detection Test

**Key Resources:**

- [Kaggle 1st place solution](https://www.kaggle.com/competitions/deepfake-detection-challenge/discussion/145721#818975)
- [Kaggle 3rd place solution](https://www.kaggle.com/competitions/deepfake-detection-challenge/discussion/158158#884330)

In [1]:
import tensorflow as tf
import keras
from platform import python_version

python_version(), tf.__version__, keras.__version__

('3.8.13', '2.8.0', '2.8.0')

In [8]:
import cv2
import json
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
import numpy as np
import pandas as pd
import pickle
import os
import time

from IPython.display import Video
from pathlib import Path
from typing import List

DATA_PATH = "../datasets/Kaggle-Deepfake-Detection-Challenge/"

TRAIN_PATH = DATA_PATH + "train_sample_videos"
TEST_PATH = DATA_PATH + "test_videos"
LABEL_PATH = DATA_PATH + "train_sample_videos/metadata.json"

In [9]:
!pwd

/Users/michellesun/Dropbox/dev/IN_PROGRESS/deepfake-detection/model/notebooks


## Load Dataset

### Labels

In [10]:
# Labels
with open(LABEL_PATH, 'r', encoding='utf-8') as f:
  labels_dict = json.load(f)

# Preview first line of labels_dict
print(next(iter(labels_dict.items())))

# Restructure dictionary for pandas dataframe input {'col':[1, 2], 'col2':[3, 4]}
filenames = list(labels_dict.keys())
labels = [ labels_dict[name]['label'] for name in filenames ]
splits = [ labels_dict[name]['split'] for name in filenames ]
originals = [ labels_dict[name]['original'] for name in filenames ]
labels_dict = {
  'filename': filenames,
  'label': labels,
  'split': splits,
  'original': originals
}

# 
labels = pd.DataFrame(labels_dict)
labels.head(10)

('aagfhgtpmv.mp4', {'label': 'FAKE', 'split': 'train', 'original': 'vudstovrck.mp4'})


Unnamed: 0,filename,label,split,original
0,aagfhgtpmv.mp4,FAKE,train,vudstovrck.mp4
1,aapnvogymq.mp4,FAKE,train,jdubbvfswz.mp4
2,abarnvbtwb.mp4,REAL,train,
3,abofeumbvv.mp4,FAKE,train,atvmxvwyns.mp4
4,abqwwspghj.mp4,FAKE,train,qzimuostzz.mp4
5,acifjvzvpm.mp4,FAKE,train,kbvibjhfzo.mp4
6,acqfdwsrhi.mp4,FAKE,train,ccfoszqabv.mp4
7,acxnxvbsxk.mp4,FAKE,train,fjlyaizcwc.mp4
8,acxwigylke.mp4,FAKE,train,ffcwhpnpuw.mp4
9,aczrgyricp.mp4,FAKE,train,slwkmefgde.mp4


In [11]:
labels.shape

(400, 4)

In [12]:
# Validate unique values
print("Unique inputs:", len(labels['filename'].unique()))

Unique inputs: 400


In [13]:
labels['original'].value_counts()

atvmxvwyns.mp4    6
meawmsgiti.mp4    6
kgbkktcjxf.mp4    5
qeumxirsme.mp4    5
gjypopglvi.mp4    4
                 ..
znjupdqnwo.mp4    1
hyhjfdxqxy.mp4    1
kbvibjhfzo.mp4    1
lmlyvmfbbe.mp4    1
gnyspcpbhd.mp4    1
Name: original, Length: 209, dtype: int64

In [14]:
labels['split'].value_counts() # all train

train    400
Name: split, dtype: int64

In [15]:
labels['label'].value_counts()

FAKE    323
REAL     77
Name: label, dtype: int64

**Observation:** Data imbalance with significantly more FAKE videos than real. Potential bias towards FAKE videos.

### Load Deepfake Dataset

In [16]:
# Helper Functions

def isdir(fullpath: str) -> bool:
    """Returns true if is a file, and false if otherwise (a directory)."""
    try:
        if os.path.exists(fullpath):
            if os.path.isdir(fullpath):
                return True
            return False
    except FileNotFoundError:
        print(f'{fullpath} does not exist.')

def iterate_files(directory: str) -> List:
    """Iterates over the files in the given directory and returns a list of 
    found files."""
    files = []
    for file in os.listdir(directory):
        filename = os.fsdecode(file)
        fullpath = os.path.join(directory, filename)
        if (isdir(fullpath)):
            files += iterate_files(fullpath)
        else:
            files.append(fullpath)
    return files
  
def get_filename(source: str):
    """Returns the filename given a full path."""
    return os.path.basename(source)

In [17]:
# dataset currently 
dataset = iterate_files(TRAIN_PATH)

# Removes label (metadata.json) from dataset
if LABEL_PATH in dataset:
  dataset.remove(LABEL_PATH)

# Sorts alphabetically
dataset = sorted(dataset)

In [18]:
random_video = dataset[np.random.random_integers(1,len(dataset))]
EXPORT_PATH = "extracted_frames"

def get_frames_evenly(video_source: str, n_frames: int) -> List[str]:
  """
  Exports n_frames number of frames from the video_source distributed
  evenly across the video.
  
  :return: list of paths for the exported frames
  """
  vidcap = cv2.VideoCapture(video_source)
  total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) 
  if n_frames > 0 and n_frames <= total_frames:
    step = int(total_frames/n_frames)
    frames = np.arange(1,total_frames+1,step)
  else:
    frames = np.arange(1,total_frames,1)
  
  video_name = os.path.splitext(get_filename(video_source))[0] + "/"
  export_loc = os.path.join(EXPORT_PATH, video_name)
  Path(export_loc).mkdir(parents=True, exist_ok=True)
  exported_files = []
  for i in frames:
    vidcap.set(1,i)
    success, image = vidcap.read()
    exported_path = os.path.join(export_loc, f"frame{i}.jpg")
    cv2.imwrite(exported_path, image)
    exported_files.append(exported_path)
  
  print(f"Successfully extracted {n_frames} frames from {get_filename(video_source)}.")
  return exported_files

random_video = dataset[100]
extracted_frames = get_frames_evenly(random_video, n_frames=10)

Video(random_video, width=500)

  random_video = dataset[np.random.random_integers(1,len(dataset))]


Successfully extracted 10 frames from benmsfzfaz.mp4.


In [19]:
extracted_frames

['extracted_frames/benmsfzfaz/frame1.jpg',
 'extracted_frames/benmsfzfaz/frame31.jpg',
 'extracted_frames/benmsfzfaz/frame61.jpg',
 'extracted_frames/benmsfzfaz/frame91.jpg',
 'extracted_frames/benmsfzfaz/frame121.jpg',
 'extracted_frames/benmsfzfaz/frame151.jpg',
 'extracted_frames/benmsfzfaz/frame181.jpg',
 'extracted_frames/benmsfzfaz/frame211.jpg',
 'extracted_frames/benmsfzfaz/frame241.jpg',
 'extracted_frames/benmsfzfaz/frame271.jpg']

## Facial Detections

Using deepface facial detector: https://github.com/serengil/deepface

In [20]:
from deepface import DeepFace

def get_bounding_box(image_path):
    """Returns the bounding box using the deepface analyze function."""
    actions=['age','gender','race','emotion']
    res = DeepFace.analyze(img_path=image_path, actions=['emotion'])
    res = res['region']
    x, y, w, h = res['x'], res['y'], res['w'], res['h']
    return x, y, w, h

type(get_bounding_box(extracted_frames[0])[0])

ModuleNotFoundError: No module named 'deepface'

In [None]:
from mtcnn import MTCNN

detector = MTCNN()

img_path = extracted_frames[0]
img = cv2.imread(img_path)
detector.detect_faces(img)

In [None]:
from PIL import Image

file = extracted_frames[0]

img = cv2.imread(file)
mask = np.array(Image.open(file))

x, y, w, h = get_bounding_box(file)
out = cv2.rectangle(img.copy(), (x, y), (x+w, y+h), (0, 255, 0), 2)

# Show image with bounding box
cv2.imshow('img_' + 'test', out)

# Show mask
cv2.imshow('mask', mask)
cv2.waitKey(0)
cv2.destroyAllWindows()
cv2.waitKey(1) # must include to prevent not responding on Mac

# Drawing Face on video test

In [3]:
thresh = 0.5
video_path = dataset[0]
print(get_filename(video_path))

vc = cv2.VideoCapture(video_path)

scale_percent = 50
width = int(vc.get(cv2.CAP_PROP_FRAME_WIDTH) * scale_percent/100)
height = int(vc.get(cv2.CAP_PROP_FRAME_HEIGHT) * scale_percent/100)
dim = (width, height)

try:
    while True:
        success, frame = vc.read()
        if success:
            frame = cv2.resize(frame, dim, interpolation=cv2.INTER_AREA)
            result = detector.detect_faces(frame)[0]
            x, y, w, h = result['box'][0], result['box'][1], result['box'][2], result['box'][3]
            out = cv2.rectangle(frame.copy(), (x, y), (x+w, y+h), (0, 255, 0), 2)
            cv2.imshow('frame', out)
            if cv2.waitKey(1) and 0xFF == ord('q'):
                break
        else:
            break
except KeyboardInterrupt:
    pass
finally:
    vc.release()
    cv2.destroyAllWindows()
    cv2.waitKey(1)

NameError: name 'dataset' is not defined

In [31]:
cv2.destroyAllWindows()
cv2.waitKey(1)

-1

In [33]:
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
result, encimg = cv2.imencode(img_path, img, encode_param)

In [47]:
cap = cv2.VideoCapture(video_path)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)