In [1]:
#### Articles Used to Generate Code
#https://towardsdatascience.com/eigenfaces-recovering-humans-from-ghosts-17606c328184
#https://machinelearningmastery.com/face-recognition-using-principal-component-analysis/

In [2]:
# Mount Google Drive at /content/drive/; every data and model path used later
# in this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
import os
import cv2
import dlib
from google.colab.patches import cv2_imshow
import numpy as np 
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import librosa
from scipy.io import wavfile
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import pandas as pd

In [4]:
# Root directory of the project on the mounted Drive.
SYM_PATH = '/content/drive/MyDrive/DeepFakeDetection'
# Switch the working directory to the project root and install the project in
# editable mode (presumably what makes the `packages` import further down
# resolvable -- confirm against the project's setup.py).
%cd $SYM_PATH
%pip install -e .

/content/drive/MyDrive/DeepFakeDetection
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Obtaining file:///content/drive/MyDrive/DeepFakeDetection
Installing collected packages: DeepFake
  Running setup.py develop for DeepFake
Successfully installed DeepFake-0.1.0


In [5]:
# Project-local helper; later cells construct it as DlibManager(predictor, detector, video)
# and read its `lip_frames` attribute.
from packages.DlibManager import DlibManager

# Landmark model file plus dlib's frontal face detector; both objects are
# shared by every DlibManager created in this notebook.
predictor_path = '/content/drive/MyDrive/DeepFakeDetection/model/shape_predictor_68_face_landmarks.dat'
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(predictor_path)

In [6]:
# Load the FakeAVCeleb metadata, keep only the 'real' and 'wav2lip' rows, and
# turn the directory column ('Unnamed: 9') into an absolute per-video path.
metadata = (
  pd.read_csv('/content/drive/MyDrive/DeepFakeDetection/data/FakeAVCeleb_v1.2/meta_data.csv')
  .loc[lambda frame: frame['method'].isin(['real', 'wav2lip'])]
  .rename(columns={'Unnamed: 9': 'full_path'})
  .assign(
    full_path=lambda frame: (
      frame['full_path'].str.replace(
        'FakeAVCeleb/',
        '/content/drive/MyDrive/DeepFakeDetection/data/FakeAVCeleb_v1.2/',
      )
      + '/'
      + frame['path']
    )
  )
)

In [7]:
# Preview the first rows of the filtered metadata, including the rebuilt full_path column.
metadata.head()

Unnamed: 0,source,target1,target2,method,category,type,race,gender,path,full_path
0,id00076,-,-,real,A,RealVideo-RealAudio,African,men,00109.mp4,/content/drive/MyDrive/DeepFakeDetection/data/...
1,id00166,-,-,real,A,RealVideo-RealAudio,African,men,00010.mp4,/content/drive/MyDrive/DeepFakeDetection/data/...
2,id00173,-,-,real,A,RealVideo-RealAudio,African,men,00118.mp4,/content/drive/MyDrive/DeepFakeDetection/data/...
3,id00366,-,-,real,A,RealVideo-RealAudio,African,men,00118.mp4,/content/drive/MyDrive/DeepFakeDetection/data/...
4,id00391,-,-,real,A,RealVideo-RealAudio,African,men,00052.mp4,/content/drive/MyDrive/DeepFakeDetection/data/...


In [8]:
# Split the speakers (the `source` column) into disjoint training and testing
# sets. The generator is seeded so the split is identical after a kernel
# restart; without a seed every run trains and tests on different speakers.
N_TRAINING_IDS = 400
SPLIT_SEED = 12

unique_ids = metadata['source'].unique()
rng = np.random.default_rng(SPLIT_SEED)
training_ids = rng.choice(unique_ids, N_TRAINING_IDS, replace=False)
# Every speaker that was not drawn for training, in order of first appearance.
testing_ids = unique_ids[~np.isin(unique_ids, training_ids)]

In [9]:
# For every training speaker, load the audio of their real clip and extract
# the lip frames. Successful clips go to training_videos_details as
# (id, lip_frames, audio, sample_rate); ids that fail at any step are recorded
# in failed_training_videos_dlib together with a printed reason.
training_videos_details = []
failed_training_videos_dlib = []
for idx, training_id in enumerate(training_ids):
  print(f'Video #{idx+1} out of {len(training_ids)}')

  is_real_clip = (metadata['source'] == training_id) & (metadata['method'] == 'real')
  video_path = metadata[is_real_clip]['full_path'].values[0]
  # The audio track is stored next to the video with a .wav extension.
  audio_path = video_path.replace('.mp4', '.wav')

  video = cv2.VideoCapture(video_path)
  try:
    audio, sample_rate = librosa.load(audio_path)
    dlib_video = DlibManager(predictor, detector, video)
    training_videos_details.append((training_id, dlib_video.lip_frames, audio, sample_rate))
  except Exception as exc:
    # Keep going with the remaining speakers, but remember which one failed and why.
    print(f'Failed to Upload: {training_id} ({exc!r})')
    failed_training_videos_dlib.append(training_id)
  finally:
    # Release the capture handle whether or not extraction succeeded.
    video.release()


  

Video #1 out of 400
Video #2 out of 400
Video #3 out of 400
Video #4 out of 400
Video #5 out of 400
Video #6 out of 400
Video #7 out of 400
Video #8 out of 400
Video #9 out of 400
Failed to Upload: id01178
Video #10 out of 400
Video #11 out of 400
Video #12 out of 400
Video #13 out of 400
Video #14 out of 400
Failed to Upload: id07039
Video #15 out of 400
Video #16 out of 400
Video #17 out of 400
Video #18 out of 400
Video #19 out of 400
Video #20 out of 400
Failed to Upload: id00021
Video #21 out of 400
Video #22 out of 400
Video #23 out of 400
Video #24 out of 400
Video #25 out of 400
Video #26 out of 400
Video #27 out of 400
Video #28 out of 400
Video #29 out of 400
Video #30 out of 400
Video #31 out of 400
Failed to Upload: id04564
Video #32 out of 400
Video #33 out of 400
Video #34 out of 400
Failed to Upload: id06428
Video #35 out of 400
Video #36 out of 400
Video #37 out of 400
Failed to Upload: id01392
Video #38 out of 400
Video #39 out of 400
Video #40 out of 400
Video #41 out

In [10]:
#video_path = '/content/drive/MyDrive/DeepFakeDetection/data/FakeAVCeleb_v1.2/RealVideo-RealAudio/African/men/id00076/00109.mp4'
#video = cv2.VideoCapture(video_path)

#predictor_path = '/content/drive/MyDrive/DeepFakeDetection/model/shape_predictor_68_face_landmarks.dat'
#detector = dlib.get_frontal_face_detector()
#predictor = dlib.shape_predictor(predictor_path)

In [11]:
#audio_path = '/content/drive/MyDrive/DeepFakeDetection/data/FakeAVCeleb_v1.2/RealVideo-RealAudio/African/men/id00076/00109.wav'
#audio, sample_rate = librosa.load(audio_path)

In [12]:
#vid1_dlib = DlibManager(predictor, detector, video)

In [13]:
def preprocess_lips(lip_frames, height=90, width=70):
  """Convert lip crops to grayscale, resize them, and flatten each one to a row.

  Args:
    lip_frames: sequence of BGR images (they are passed to cv2.cvtColor with
      COLOR_BGR2GRAY).
    height: first element of the size tuple given to cv2.resize.
    width: second element of the size tuple given to cv2.resize.

  Note:
    cv2.resize reads its size tuple as (columns, rows). With the defaults each
    resized frame therefore has 70 rows and 90 columns, i.e. the parameter
    names are swapped relative to the resulting image. The names are kept so
    existing callers continue to work.

  Returns:
    A 2-D array with one row per frame and height * width columns. An empty
    `lip_frames` yields an array of shape (0, height * width) instead of
    raising.
  """
  resized_frames = [
    cv2.resize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (height, width))
    for frame in lip_frames
  ]

  if not resized_frames:
    # np.array([]) is 1-D, so the flattening reshape below would fail on it.
    return np.empty((0, height * width), dtype=np.uint8)

  stacked = np.array(resized_frames)
  return stacked.reshape(stacked.shape[0], stacked.shape[1] * stacked.shape[2])


In [14]:
#resize all the lip frames to 70 x 90
#lip_frames_resized = []
#for frame in vid1_dlib.lip_frames:
#  frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#  resized_frame = cv2.resize(frame, (90, 70))
#  lip_frames_resized.append(resized_frame)
#lip_frames_resized = np.array(lip_frames_resized)

In [15]:
def generate_training_data(training_video_details, num_components):
  """Build the regression data set: eigen-lip weights (X) and per-frame MFCCs (y).

  Args:
    training_video_details: iterable of (id, lip_frames, audio, sample_rate)
      tuples. Entries with a different number of fields, or with no lip
      frames, are skipped.
    num_components: number of leading principal components used as features.

  Returns:
    (X, y, lips_data_mean, pca) where
      X is the projection of every centered lip frame onto the first
        `num_components` principal components,
      y holds the MFCC rows aligned one-to-one with the rows of X,
      lips_data_mean is the per-pixel mean of all training lip frames,
      pca is the fitted PCA object (all components retained).

  Raises:
    ValueError: if no entry contributes any lip frame.
  """
  lip_blocks = []
  mfcc_blocks = []

  for training_video_data in training_video_details:
    if len(training_video_data) != 4:
      print(f'Skipping malformed training entry with {len(training_video_data)} fields')
      continue
    _, lip_frames, audio, sample_rate = training_video_data

    if len(lip_frames) == 0:
      # Nothing to align the audio against, and the hop below would divide by zero.
      continue

    lip_frames_resized = preprocess_lips(lip_frames)
    row_count = lip_frames_resized.shape[0]

    # Choose the hop so the audio is cut into roughly one MFCC window per video frame.
    hop_length = max(1, len(audio) // row_count)
    mfcc_features = librosa.feature.mfcc(y=audio, sr=sample_rate, hop_length=hop_length).T[:row_count, :]

    # Very short audio can give fewer MFCC rows than frames; keep X and y aligned.
    aligned_rows = min(row_count, mfcc_features.shape[0])
    lip_blocks.append(lip_frames_resized[:aligned_rows])
    mfcc_blocks.append(mfcc_features[:aligned_rows])

  if not lip_blocks:
    raise ValueError('No usable training videos: cannot fit PCA on empty data')

  # Stack once at the end instead of re-allocating the full matrix per video.
  lips_data = np.vstack(lip_blocks)
  y = np.vstack(mfcc_blocks)

  lips_data_mean = np.mean(lips_data, axis=0)
  lips_data_centered = lips_data - lips_data_mean

  pca = PCA()
  pca.fit(lips_data_centered)

  eigenfaces = pca.components_[:num_components]
  X = np.dot(lips_data_centered, eigenfaces.T)

  return X, y, lips_data_mean, pca


In [None]:
# Build the regression inputs from the extracted training clips, keeping the
# 10 leading principal components as features.
X, y, lips_data_mean, pca = generate_training_data(training_videos_details, 10)

In [None]:
def train_models(X, y):
  """Fit one LinearRegression per column of `y`, all on the same features `X`.

  Returns:
    A list of fitted models; element i predicts column i of `y`.
  """
  return [LinearRegression().fit(X, y[:, column]) for column in range(y.shape[1])]

In [None]:
# One fitted regressor per column of y.
models = train_models(X, y)

In [None]:
# For every held-out speaker, pair their real clip with one wav2lip fake and
# extract lip frames and audio for both. Successful pairs are stored as
# 7-tuples in testing_videos_details; speakers that fail at any step are
# recorded in failed_testing_videos_dlib so the details list stays uniform.
testing_videos_details = []
failed_testing_videos_dlib = []
for idx, testing_id in enumerate(testing_ids):
  print(f'Video #{idx+1} out of {len(testing_ids)}')

  speaker_rows = metadata[metadata['source'] == testing_id]
  real_rows = speaker_rows[speaker_rows['method'] == 'real']
  fake_rows = speaker_rows[speaker_rows['method'] == 'wav2lip']
  if real_rows.empty or fake_rows.empty:
    # Without both a real and a fake clip there is nothing to compare.
    print(f'Failed to Upload: {testing_id} (missing real or wav2lip clip)')
    failed_testing_videos_dlib.append(testing_id)
    continue

  real_video_path = real_rows['full_path'].values[0]
  # Seeded so the same fake clip is chosen on every run.
  fake_video_path = fake_rows.sample(1, random_state=12)['full_path'].values[0]

  real_video = cv2.VideoCapture(real_video_path)
  fake_video = cv2.VideoCapture(fake_video_path)

  try:
    real_audio, real_sample_rate = librosa.load(real_video_path.replace('.mp4', '.wav'))
    fake_audio, fake_sample_rate = librosa.load(fake_video_path.replace('.mp4', '.wav'))

    real_dlib_video = DlibManager(predictor, detector, real_video)
    fake_dlib_video = DlibManager(predictor, detector, fake_video)
    testing_videos_details.append((testing_id, real_dlib_video.lip_frames, fake_dlib_video.lip_frames, real_audio, fake_audio, real_sample_rate, fake_sample_rate))
  except Exception as exc:
    print(f'Failed to Upload: {testing_id} ({exc!r})')
    failed_testing_videos_dlib.append(testing_id)
  finally:
    # Release both capture handles whether or not extraction succeeded.
    real_video.release()
    fake_video.release()

In [None]:
def _mfcc_rows(audio, sample_rate, frame_count):
  """Return the MFCCs of `audio`, one row per window, with a hop of len(audio) // frame_count samples."""
  hop_length = max(1, len(audio) // frame_count)
  return librosa.feature.mfcc(y=audio, sr=sample_rate, hop_length=hop_length).T


def _mean_model_error(models, X, y):
  """Average over `models` of each model's RMSE on its own column of `y`, divided by the row count."""
  errors = []
  for coef_idx, model in enumerate(models):
    rmse = np.sqrt(mean_squared_error(y_true=y[:, coef_idx], y_pred=model.predict(X)))
    # NOTE(review): dividing an RMSE by the number of rows is unusual; it is
    # kept so scores stay on the scale used elsewhere in this notebook.
    errors.append(rmse / len(X))
  return np.mean(errors)


def eval_test_data(testing_video_details, pca, lips_data_mean, models, num_components=10, debug_mode=False):
  """Score every real/fake test pair with the lips-to-MFCC regressors.

  Args:
    testing_video_details: iterable of 7-tuples (id, real_lip_frames,
      fake_lip_frames, real_audio, fake_audio, real_sample_rate,
      fake_sample_rate). Entries with another length, or with no lip frames
      for either clip, are skipped.
    pca: fitted PCA whose `components_` supply the eigen-lips.
    lips_data_mean: per-pixel mean subtracted from every flattened frame.
    models: one fitted regressor per MFCC coefficient.
    num_components: number of leading components used as features.
    debug_mode: when True, print intermediate array shapes.

  Returns:
    (real_video_error, fake_video_error): two lists with one averaged error
    per scored pair, in input order.
  """
  real_video_error = []
  fake_video_error = []

  eigenfaces = pca.components_[:num_components]

  for testing_video in testing_video_details:
    if len(testing_video) != 7:
      # Not a complete (real, fake) record; nothing to score.
      continue

    (_, real_lip_frames, fake_lip_frames, real_audio, fake_audio,
     real_sample_rate, fake_sample_rate) = testing_video

    if len(real_lip_frames) == 0 or len(fake_lip_frames) == 0:
      continue

    real_lip_frames_resized = preprocess_lips(real_lip_frames)
    fake_lip_frames_resized = preprocess_lips(fake_lip_frames)

    if debug_mode:
      print('LIP FRAME SHAPES')
      print('Real Lip Frames: ', real_lip_frames_resized.shape)
      print('Fake Lip Frames: ', fake_lip_frames_resized.shape)
      print('--------------------------------------------')

    # Each clip is described by its OWN audio track.
    real_mfcc_all = _mfcc_rows(real_audio, real_sample_rate, real_lip_frames_resized.shape[0])
    fake_mfcc_all = _mfcc_rows(fake_audio, fake_sample_rate, fake_lip_frames_resized.shape[0])

    if debug_mode:
      print('ORIGINAL MFCC FRAME SHAPES')
      print('Real MFCC Frames: ', real_mfcc_all.shape)
      print('Fake MFCC Frames: ', fake_mfcc_all.shape)
      print('--------------------------------------------')

    # Keep exactly one MFCC row per lip frame.
    real_rows = min(real_lip_frames_resized.shape[0], real_mfcc_all.shape[0])
    fake_rows = min(fake_lip_frames_resized.shape[0], fake_mfcc_all.shape[0])

    X_real = np.dot(real_lip_frames_resized[:real_rows] - lips_data_mean, eigenfaces.T)
    X_fake = np.dot(fake_lip_frames_resized[:fake_rows] - lips_data_mean, eigenfaces.T)
    y_real = real_mfcc_all[:real_rows]
    y_fake = fake_mfcc_all[:fake_rows]

    if debug_mode:
      print('TRAINING DATA SHAPES')
      print('X Real Shape:', X_real.shape)
      print('X Fake Shape:', X_fake.shape)
      print('y Real Shape:', y_real.shape)
      print('y Fake Shape:', y_fake.shape)
      print('--------------------------------------------')

    real_video_error.append(_mean_model_error(models, X_real, y_real))
    fake_video_error.append(_mean_model_error(models, X_fake, y_fake))

  # Return only after ALL videos have been scored.
  return real_video_error, fake_video_error


In [None]:
# Score the test pairs with 10 components; the final True turns on debug_mode,
# which prints intermediate array shapes.
real_video_error, fake_video_error = eval_test_data(testing_videos_details, pca, lips_data_mean, models, 10, True)

In [None]:
#reshape_lip_frames = lip_frames_resized.reshape(lip_frames_resized.shape[0], lip_frames_resized.shape[1]*lip_frames_resized.shape[2])


In [None]:
#scalar = StandardScaler()
#centered_lip_frames = scalar.fit_transform(reshape_lip_frames)

#avg_lip_frames = np.mean(reshape_lip_frames, axis=0)
#centered_lip_frames = reshape_lip_frames - avg_lip_frames

In [None]:
#pca = PCA()
#pca_lip_frames = pca.fit_transform(centered_lip_frames)

In [None]:
#random_samples = np.random.choice(np.arange(0, reshape_lip_frames.shape[0]), 5)
#random_samples

In [None]:
def reconstruction(pca, num_components, centered_data, mean, image_idx, height=70, width=90):
  """Rebuild one image from its projection onto the leading principal components.

  Args:
    pca: fitted PCA object; only `components_` is read.
    num_components: how many leading components to use.
    centered_data: 2-D array of mean-centered, flattened images (one per row).
    mean: vector added back after the projection.
    image_idx: row of `centered_data` to reconstruct.
    height: number of rows of the returned image.
    width: number of columns of the returned image.

  Returns:
    The reconstructed image as a (height, width) array.
  """
  eigenfaces = pca.components_[:num_components]
  # Project only the requested row instead of the whole data set.
  weights = np.dot(centered_data[image_idx], eigenfaces.T)
  return (np.dot(weights, eigenfaces) + mean).reshape(height, width)



In [None]:
# Original lip crops (left column) next to their reconstruction from a single
# eigenface (right column).
# NOTE(review): random_samples, vid1_dlib, centered_lip_frames and
# avg_lip_frames are only assigned in commented-out cells of this notebook, so
# this cell fails on a fresh kernel until they are defined again.
fig, axes = plt.subplots(5,2,sharex=True,sharey=True,figsize=(8,10))
fig.suptitle('Eigenface Count: 1', fontsize=16)
for idx, sample in enumerate(random_samples):
  # The lip crops are handled as BGR elsewhere in this notebook, while
  # matplotlib expects RGB; convert so the colours are not swapped.
  orig = cv2.cvtColor(vid1_dlib.lip_frames[sample], cv2.COLOR_BGR2RGB)
  reconstructed = reconstruction(pca, 1, centered_lip_frames, avg_lip_frames, sample)
  axes[idx, 0].imshow(orig)
  axes[idx, 1].imshow(reconstructed, cmap="gray")
plt.show()
  

In [None]:
# Original lip crops (left column) next to their reconstruction from the ten
# leading eigenfaces (right column).
# NOTE(review): random_samples, vid1_dlib, centered_lip_frames and
# avg_lip_frames are only assigned in commented-out cells of this notebook, so
# this cell fails on a fresh kernel until they are defined again.
fig, axes = plt.subplots(5,2,sharex=True,sharey=True,figsize=(8,10))
fig.suptitle('Eigenface Count: 10', fontsize=16)
for idx, sample in enumerate(random_samples):
  # The lip crops are handled as BGR elsewhere in this notebook, while
  # matplotlib expects RGB; convert so the colours are not swapped.
  orig = cv2.cvtColor(vid1_dlib.lip_frames[sample], cv2.COLOR_BGR2RGB)
  reconstructed = reconstruction(pca, 10, centered_lip_frames, avg_lip_frames, sample)
  axes[idx, 0].imshow(orig)
  axes[idx, 1].imshow(reconstructed, cmap="gray")
plt.show()
  

In [None]:
# MFCCs for the clip currently held in `audio` / `sample_rate`, with the hop
# chosen so there is about one row per video frame (251 frames here).
# get_duration is called with keyword arguments because newer librosa
# releases no longer accept the signal positionally.
mfcc_features = librosa.feature.mfcc(
  y=audio,
  sr=sample_rate,
  hop_length=int(sample_rate*librosa.get_duration(y=audio, sr=sample_rate)/251),
).T[:251, :]

In [None]:
# First MFCC coefficient as a column vector; -1 lets the row count follow
# mfcc_features instead of hard-coding one clip's frame count.
coef_one_mfcc = mfcc_features[:, 0].reshape(-1, 1)

In [None]:
# NOTE(review): in the visible notebook `weights` is first assigned at top
# level in a later cell, so on a fresh top-to-bottom run this line would raise
# NameError -- confirm the intended cell order.
weights.shape

In [None]:
# Shape check of the first-coefficient target column.
coef_one_mfcc.shape


In [None]:
# Project the centered lip frames onto the 10 leading principal components
# and place the first MFCC coefficient next to the resulting weights.
eigenfaces = pca.components_[:10]
samples, features = centered_lip_frames.shape
weights = np.dot(centered_lip_frames, eigenfaces.T)

# -1 keeps the column shape without hard-coding one clip's frame count.
train_test_data = np.hstack((weights, coef_one_mfcc.reshape(-1, 1)))

In [None]:
# Hold out 15% of the rows for testing; random_state=12 fixes the split.
X_train, X_test, y_train, y_test = train_test_split(weights, coef_one_mfcc, test_size=0.15, random_state=12)

In [None]:
# Fit a linear regression on the training split.
model = LinearRegression()
model.fit(X_train, y_train)

# Score the held-out split.
pred = model.predict(X_test)
# NOTE(review): the RMSE is additionally divided by the number of predictions;
# confirm that this scaling is intended.
err = np.sqrt(mean_squared_error(y_true=y_test, y_pred=pred))/len(pred)
print(err)

In [None]:
def train(X, y):
  """Fit and return a LinearRegression on (X, y).

  The original cell held only an unterminated `def` line, which is a syntax
  error. NOTE(review): this body mirrors the fit step used in the neighbouring
  cells -- confirm that this is the behaviour the stub was meant to have.
  """
  return LinearRegression().fit(X, y)

In [None]:
def evaluate(X_test, y_):
  """Hold out 15% of the given samples, fit a linear regression on the rest, and score it.

  Args:
    X_test: feature matrix to split. Despite its name, both the fitting rows
      and the held-out rows are taken from it.
    y_: targets aligned with the rows of `X_test`.

  Returns:
    (pred, err): predictions for the held-out rows, and their RMSE divided by
    the number of held-out rows.
  """
  # Split the ARGUMENTS; the previous body ignored them and read the notebook
  # globals X and y instead.
  X_fit, X_holdout, y_fit, y_holdout = train_test_split(X_test, y_, test_size=0.15, random_state=12)

  model = LinearRegression().fit(X_fit, y_fit)

  pred = model.predict(X_holdout)
  # NOTE(review): RMSE divided by the sample count, kept for consistency with
  # the other cells of this notebook -- confirm the scaling is intended.
  err = np.sqrt(mean_squared_error(y_true=y_holdout, y_pred=pred))/len(pred)

  return pred, err



In [None]:
# Display the first-coefficient target column.
coef_one_mfcc