In [None]:
#import packages
import glob
import shutil
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
import copy
import random

In [None]:
#HyperParams
# Side length (pixels) that full frames are resized to before entering the model.
im_size = 224
# Per-channel statistics passed to transforms.Normalize (the standard ImageNet values).
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

seqlen = 10      # frames per video sequence
batchSize = 7
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = "cpu"
curr_epoch = 1   # overwritten when a checkpoint is restored
random_state = 47
path_to_model = 'ckpts/checkpoint.pt'

# Seed every RNG the notebook touches so augmentation and initialisation are
# repeatable across "Restart & Run All".
random.seed(random_state)
np.random.seed(random_state)
torch.manual_seed(random_state)

# glob returns files in arbitrary (filesystem) order; sort so that the
# random_state-seeded train/validation split is actually reproducible.
video_files = sorted(glob.glob('/dataset/*.mp4'))
print(f'Total videos: {len(video_files)}')

In [None]:
# Drop videos that are too short (fewer than seqlen + 10 frames) and record the
# frame count of every video that is kept.
frame_count = []
usable_videos = []
for video_file in video_files:
    cap = cv2.VideoCapture(video_file)
    try:
        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the decoder handle; one is opened per video.
        cap.release()
    if n_frames < (seqlen + 10):
        continue
    usable_videos.append(video_file)
    frame_count.append(n_frames)

# Removing items from a list while iterating over it skips the element that
# follows each removal, so the filtered list is built separately and written
# back in place (keeps the same list object for any other reference to it).
video_files[:] = usable_videos

print("Total no of video: " , len(frame_count))
print('Average frame per video:', np.mean(frame_count) if frame_count else float('nan'))

In [None]:
from torchvision import transforms

# Function to pad frames with black pixels
def pad_frame(frame, target_size=(100, 100)):
    """Centre ``frame`` on a black background and resize it to ``target_size``.

    Args:
        frame: image array of shape (height, width, channels).
        target_size: ``(width, height)`` of the result, i.e. the same order
            ``cv2.resize`` uses for ``dsize``. Any 2-element sequence works.

    Returns:
        The frame padded symmetrically with black pixels along every axis that
        is smaller than the target, then resized to exactly ``target_size``.
        Axes that are larger than the target are only scaled by the resize.
    """
    target_width, target_height = target_size
    height, width, _ = frame.shape

    # Compare each axis against the same target component that is used for its
    # padding; mixing the two indices breaks for non-square targets.
    pad_x = max(target_width - width, 0)
    pad_y = max(target_height - height, 0)

    if pad_x or pad_y:
        left_padding = pad_x // 2
        top_padding = pad_y // 2
        frame = cv2.copyMakeBorder(
            frame,
            top_padding, pad_y - top_padding,
            left_padding, pad_x - left_padding,
            cv2.BORDER_CONSTANT, value=(0, 0, 0),
        )

    return cv2.resize(frame, (target_width, target_height))

In [None]:
# Half-open [start, stop) row ranges into the per-frame landmark array, grouped
# by the facial region they outline.
index_list = {
    "top": [[164, 172], [188, 198], [282, 292]],
    "nose_lips": [[202, 242], [242, 266]],
}


def extract_individual_facial_parts(frame, nparr):
    """Crop the bounding box of every landmark group in ``index_list``.

    Args:
        frame: image array indexed as ``frame[row, column]``.
        nparr: sequence of ``(x, y)`` landmark points; it is sliced with the
            ranges stored in ``index_list``.

    Returns:
        dict mapping ``"top"`` and ``"nose_lips"`` to the cropped sub-image.
        Box coordinates are truncated to int and clamped to ``[0, im_size]``.
    """
    crops = {}

    for part_name, index_ranges in index_list.items():
        # Collect every landmark row covered by this region's ranges.
        points = [pt for bounds in index_ranges for pt in nparr[bounds[0]:bounds[1]]]

        # Start the extremes at the first point, then widen over all points.
        left, top = right, bottom = next(iter(points))
        for px, py in points:
            left, right = min(left, px), max(right, px)
            top, bottom = min(top, py), max(bottom, py)

        # Truncate to integer pixel coordinates and clamp to [0, im_size].
        x0 = max(0, int(left))
        x1 = min(im_size, int(right))
        y0 = max(0, int(top))
        y1 = min(im_size, int(bottom))

        region = frame[y0:y1, x0:x1]

        if part_name in ("top", "nose_lips"):
            crops[part_name] = region

    return crops

In [None]:
def combined_canvas(img1, img2):
    """Stack two crops vertically on a black canvas and normalise the size.

    ``img1`` is placed in the top-left corner, ``img2`` below it after a
    10-pixel gap and centred horizontally. The canvas is then passed through
    ``pad_frame`` with a 124x124 target.
    """
    gap_size = 10  # vertical spacing between the two crops, in pixels

    top_h, top_w = img1.shape[0], img1.shape[1]
    bottom_h, bottom_w = img2.shape[0], img2.shape[1]

    # Canvas is exactly tall enough for both crops plus the gap and as wide as
    # the wider crop.
    canvas = np.zeros(
        (top_h + bottom_h + gap_size, max(top_w, bottom_w), 3),
        dtype=np.uint8,
    )

    canvas[:top_h, :top_w] = img1

    # Because the canvas height is the sum of both heights and the gap, there
    # is no spare vertical room: img2 starts right after the gap.
    row_start = top_h + gap_size
    col_start = (canvas.shape[1] - bottom_w) // 2
    canvas[row_start:row_start + bottom_h, col_start:col_start + bottom_w] = img2

    return pad_frame(canvas,[124,124])

In [None]:
# load the video name and labels from csv
class video_dataset(Dataset):
    """Per-video dataset yielding frames, blendshapes, facial-part canvases and a label.

    For each video up to ``max_frames_scanned`` frames are decoded; a frame is
    used only when both ``<blendshapes>/<video stem>/<n>.npy`` and
    ``<facial_parts>/<video stem>/<n>.npy`` exist (``n`` is the 1-based frame
    number). Collection stops once ``sequence_length`` frames were gathered.

    Args:
        video_names: list of video file paths.
        labels: DataFrame with a ``"file"`` column and the label
            (``'FAKE'``/``'REAL'``) in column position 1.
        blendshapes: root directory of the per-video blendshape ``.npy`` files.
        facial_parts: root directory of the per-video landmark ``.npy`` files.
        sequence_length: number of frames per returned sequence.
        transform: callable applied to every full frame (required in practice).
        transform2: callable applied to every facial-part canvas (required in practice).
    """

    # Upper bound on the number of frames decoded per video.
    max_frames_scanned = 50
    # Integer class index for every label string found in the metadata.
    label_to_index = {'FAKE': 0, 'REAL': 1}

    def __init__(self,video_names,labels,blendshapes,facial_parts,sequence_length,transform = None, transform2 = None):
        self.video_names = video_names
        self.labels = labels
        self.blendshapes = blendshapes
        self.facial_parts = facial_parts
        self.transform = transform
        self.transform2 = transform2
        self.count = sequence_length

    def __len__(self):
        return len(self.video_names)

    def _label_for(self, video_file):
        """Return the integer class index of ``video_file`` from ``self.labels``.

        Raises:
            IndexError: the file has no row in the metadata.
            ValueError: the label is neither ``'FAKE'`` nor ``'REAL'``.
        """
        # Use the frame handed to the constructor, not a notebook global.
        matches = self.labels.loc[self.labels["file"] == video_file]
        raw_label = matches.iloc[0, 1]
        if raw_label not in self.label_to_index:
            raise ValueError(f"Unexpected label {raw_label!r} for video {video_file!r}")
        return self.label_to_index[raw_label]

    def __getitem__(self,idx):
        """Return ``(frames, blendshapes, facial_parts, label)`` for video ``idx``.

        The three tensors are stacked along a new leading axis and hold at most
        ``sequence_length`` entries; fewer when the video has fewer usable frames.

        Raises:
            RuntimeError: no usable frame was found for the video.
        """
        video_path = self.video_names[idx]
        temp_video = video_path.split('/')[-1]
        video_stem = temp_video.split(".")[0]
        blendshapes_path = os.path.join(self.blendshapes, video_stem)
        facial_parts_path = os.path.join(self.facial_parts, video_stem)
        label = self._label_for(temp_video)

        frames = []
        vid_blendshapes = []
        vid_facial_parts = []

        capture = cv2.VideoCapture(video_path)
        try:
            frames_num = min(int(capture.get(cv2.CAP_PROP_FRAME_COUNT)), self.max_frames_scanned)
            for i in range(frames_num):
                capture.grab()
                success, frame = capture.retrieve()
                if not success:
                    continue
                temp_path_nparr = os.path.join(blendshapes_path, f"{i+1}.npy")
                temp_path_nparr2 = os.path.join(facial_parts_path, f"{i+1}.npy")
                if os.path.exists(temp_path_nparr) and os.path.exists(temp_path_nparr2):
                    frames.append(self.transform(frame))
                    temp_nparr = np.load(temp_path_nparr)
                    temp_narr2 = np.load(temp_path_nparr2)
                    facial_parts_temp = extract_individual_facial_parts(frame=frame, nparr=temp_narr2)
                    vid_blendshapes.append(torch.tensor(temp_nparr))
                    vid_facial_parts.append(
                        self.transform2(
                            combined_canvas(facial_parts_temp["top"], facial_parts_temp["nose_lips"])
                        )
                    )
                if len(frames) == self.count:
                    break
        finally:
            # Always free the decoder; DataLoader workers open many videos.
            capture.release()

        if not frames:
            raise RuntimeError(f"No usable frames found for video {video_path!r}")

        return (
            torch.stack(frames),
            torch.stack(vid_blendshapes),
            torch.stack(vid_facial_parts),
            label,
        )

def im_plot(tensor):
    """Display a normalised (C, H, W) image tensor with matplotlib.

    The tensor is de-normalised with the notebook-level ``mean``/``std`` (the
    values handed to ``transforms.Normalize``), its channel order is reversed
    for display, and values are clipped to the 0..255 range.
    """
    image = tensor.detach().cpu().numpy().transpose(1, 2, 0)
    # Undo Normalize in the tensor's own channel order, using the same
    # statistics that were applied, before any channel reordering.
    image = image * np.asarray(std) + np.asarray(mean)
    # Reverse the channel axis (the notebook's frames come from cv2, so this
    # presumably turns BGR into the RGB order matplotlib expects).
    image = image[..., ::-1]
    # Clip so augmentation overshoot cannot produce out-of-range pixels.
    image = np.clip(image * 255.0, 0, 255).astype(np.uint8)
    plt.imshow(image)
    plt.show()

In [None]:
#generate list of fake and real videos
def real_and_fake_videos_list(data_list):
    """Split video paths into real and fake lists using ``dataset/metadata.csv``.

    Args:
        data_list: iterable of video paths; only the final ``/``-separated
            component is matched against the ``file`` column of the metadata.

    Returns:
        ``(real, fake)``: two lists of paths in input order. Paths whose label
        is neither ``'REAL'`` nor ``'FAKE'`` are left out of both.

    Raises:
        IndexError: a video has no row in the metadata file.
    """
    header_list = ["file","label","origfile"]
    lab = pd.read_csv('dataset/metadata.csv',names=header_list)
    fake = []
    real = []
    for video_path in data_list:
        temp_video = video_path.split('/')[-1]
        # Look the label up in the frame read above rather than in a
        # notebook-level global that may not exist yet.
        label = lab.loc[lab["file"] == temp_video].iloc[0, 1]
        if label == 'FAKE':
            fake.append(video_path)
        elif label == 'REAL':
            real.append(video_path)
    return real,fake

In [None]:
# load the labels and video in data loader
import pandas as pd
from sklearn.model_selection import train_test_split

# Column names assigned to the metadata CSV when it is read.
header_list = ["file", "label", "origfile"]
labels = pd.read_csv('dataset/metadata.csv', names=header_list)

# Root folders holding the per-video .npy files.
blendshapes = "blendshapes/"
facial_parts = "facial_parts/"

real, fake = real_and_fake_videos_list(video_files)

# Balance the classes by truncating both lists to the size of the smaller one.
temp = min(len(real), len(fake))
real, fake = real[:temp], fake[:temp]

# Both lists are split with the same indices: 80 % train, the rest validation.
real_train, real_valid, fake_train, fake_valid = train_test_split(
    real,
    fake,
    train_size=0.8,
    random_state=random_state,
)
train_videos = real_train + fake_train
valid_videos = real_valid + fake_valid

print("train : " , len(train_videos))
print("test : " , len(valid_videos))

print("TRAIN: ", "Real:", len(real_train), " Fake:", len(fake_train))
print("TEST: ", "Real:", len(real_valid), " Fake:", len(fake_valid))


In [None]:
def collate_fn(batch):
    """Merge a list of ``(frames, blendshapes, facial_parts, label)`` samples.

    The first three fields are stacked along a new leading batch axis; the
    labels are gathered into a single tensor.
    """
    def _field(position):
        # Pull one field out of every sample, keeping batch order.
        return [sample[position] for sample in batch]

    frames = torch.stack(_field(0))
    blendshape_seq = torch.stack(_field(1))
    part_seq = torch.stack(_field(2))
    targets = torch.tensor(_field(3))
    return (frames, blendshape_seq, part_seq, targets)

In [None]:
#Apply transformation and Dataloader

def _build_transform(resize, augment=False):
    """Compose a preprocessing pipeline for numpy frames.

    Order: ToPILImage -> (optional rotation + brightness jitter) ->
    (optional resize to im_size x im_size) -> ToTensor -> Normalize(mean, std).
    """
    steps = [transforms.ToPILImage()]
    if augment:
        # p=1: the rotation/jitter pair is applied to every sample.
        steps.append(transforms.RandomApply([
            transforms.RandomRotation(15),
            transforms.ColorJitter(brightness=0.15),
        ], p=1))
    if resize:
        steps.append(transforms.Resize((im_size, im_size)))
    steps.append(transforms.ToTensor())
    steps.append(transforms.Normalize(mean, std))
    return transforms.Compose(steps)


# Full-frame pipelines resize to im_size; the "_ccnn" pipelines keep the
# facial-part canvas at the size it already has.
train_transforms0 = _build_transform(resize=True)
train_transforms_ccnn0 = _build_transform(resize=False)

train_transforms1 = _build_transform(resize=True, augment=True)
train_transforms_ccnn1 = _build_transform(resize=False, augment=True)

test_transforms = _build_transform(resize=True)
test_transforms_ccnn = _build_transform(resize=False)

train_data0 = video_dataset(train_videos,labels,blendshapes,facial_parts,sequence_length = seqlen,transform = train_transforms0, transform2 = train_transforms_ccnn0)
train_data1 = video_dataset(train_videos,labels,blendshapes,facial_parts,sequence_length = seqlen,transform = train_transforms1, transform2 = train_transforms_ccnn1)
val_data = video_dataset(valid_videos,labels,blendshapes,facial_parts,sequence_length = seqlen,transform = test_transforms,transform2 = test_transforms_ccnn)
# Plain + augmented copies of the training videos, concatenated.
train_data = train_data0 + train_data1

train_loader = DataLoader(train_data,batch_size = batchSize,shuffle = True, num_workers = 8, collate_fn=collate_fn)
valid_loader = DataLoader(val_data,batch_size = batchSize,shuffle = True, num_workers = 8, collate_fn=collate_fn)

# Visual sanity check on one sample. The index is clamped so the cell also
# runs on datasets with fewer than 1240 entries.
if len(train_data) > 0:
    sample_index = min(1239, len(train_data) - 1)
    image,vid_blendshapes,vid_facial_parts,label = train_data[sample_index]

    im_plot(vid_facial_parts[min(2, len(vid_facial_parts) - 1),:,:,:])
    im_plot(image[0,:,:,:])
else:
    print("train_data is empty; skipping sample preview")

In [None]:
import torch
import torch.nn as nn
import numpy as np
from torchvision import models
from facenet_pytorch import InceptionResnetV1

class Model(nn.Module):
    """Sequence classifier fusing frame, blendshape and facial-part features.

    Per frame, three feature vectors are concatenated: pooled ResNeXt-50
    features of the full frame, the blendshape vector, and the
    ``InceptionResnetV1`` output for the facial-part canvas. The sequence is
    fed to an LSTM whose last time step goes through two dense layers.

    Args:
        num_classes: number of output classes (width of the final layer).
        lstm_hidden_dim: used as both the LSTM input size and hidden size, so
            it must equal the width of the concatenated per-frame features
            (presumably 2048 + 52 + 512 = 2612 — TODO confirm blendshape width).
        bidirectional: whether the LSTM runs in both directions.
    """

    def __init__(self, num_classes, lstm_hidden_dim=2612, bidirectional=True):
        super(Model, self).__init__()
        model = models.resnext50_32x4d(pretrained = True)
        # Drop the last two children of the backbone, keeping the feature maps.
        self.model = nn.Sequential(*list(model.children())[:-2])
        self.avgpool = nn.AdaptiveAvgPool2d(1)  # AdaptiveAvgPool2d to replace avgpool
        self.lstm = nn.LSTM(lstm_hidden_dim, lstm_hidden_dim, bidirectional=bidirectional, batch_first=True)
        self.dense1 = nn.Linear(lstm_hidden_dim * (2 if bidirectional else 1), 512)
        # Honour num_classes instead of a hard-coded 2.
        self.dense2 = nn.Linear(512, num_classes)
        self.dp1 = nn.Dropout(0.4)
        self.dp2 = nn.Dropout(0.4)
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.facenet = InceptionResnetV1(pretrained='vggface2')

    def forward(self, x, y, p1):
        """Classify a batch of sequences.

        Args:
            x: full frames, shape (batch, seq, c, h, w).
            y: blendshape features; reshaped to (batch, seq, -1).
            p1: facial-part canvases, shape (batch, seq, c, h1, w1).

        Returns:
            ``(0, output)`` where ``output`` has shape (batch, num_classes).
            The leading 0 is a placeholder kept for callers that unpack two values.
        """
        batch_size, seq_length, c, h, w = x.shape
        batch_size, seq_length, c, h1, w1 = p1.shape

        # Fold the time axis into the batch axis for the per-frame encoders.
        x = x.view(batch_size * seq_length, c, h, w)
        p1 = p1.view(batch_size * seq_length, c, h1, w1)

        p1 = self.facenet(p1)

        x = self.model(x)
        x = self.avgpool(x).flatten(1)

        # Restore the (batch, seq, features) layout for all three streams.
        x = x.view(batch_size, seq_length, -1)
        y = y.view(batch_size, seq_length, -1)
        p1 = p1.view(batch_size, seq_length, -1)

        x = torch.cat((x, y, p1), dim=2)
        lstm_out, _ = self.lstm(x)
        # Keep only the last time step of the LSTM output.
        lstm_out = lstm_out[:, -1, :]
        lstm_out = self.dp1(lstm_out) #dropout layer 1
        output1 = self.relu(self.dense1(lstm_out))
        output1 = self.dp2(output1) #dropout layer 2
        # NOTE(review): the notebook trains this output with nn.CrossEntropyLoss,
        # which expects raw logits; the sigmoid squashes them to (0, 1) first.
        # Left unchanged here to keep outputs comparable with existing
        # checkpoints — consider returning self.dense2(output1) directly.
        output = self.sigmoid(self.dense2(output1))
        return 0, output

# Example usage
model = Model(2).to(device)



In [None]:
#model train test functions
from torch.autograd import Variable
import time
import sys
# Legacy tensor-type aliases used for `.type(...)` casts below.
# `device` may be a torch.device or a plain string; normalise it before
# checking, because a torch.device compared with the string "cpu" is never
# equal, which would select the CUDA types even on a CPU-only run.
_on_cpu = torch.device(device).type == "cpu"
longTensor = torch.LongTensor if _on_cpu else torch.cuda.LongTensor
floatTensor = torch.FloatTensor if _on_cpu else torch.cuda.FloatTensor
def train_epoch(curr_epoch, num_epochs, data_loader, model, criterion, optimizer):
    """Run one training epoch and report running loss/accuracy.

    Args:
        curr_epoch: epoch number, used only for the progress header.
        num_epochs: total number of epochs, used only for the progress header.
        data_loader: yields ``(inputs, blendshapes, facial_parts, targets)``.
        model: called as ``model(inputs, blendshapes, facial_parts)`` and
            expected to return ``(_, outputs)``.
        criterion: loss taking ``(outputs, integer targets)``.
        optimizer: optimizer stepping ``model``'s parameters.

    Returns:
        ``(model, optimizer, criterion, average loss, average accuracy in %)``.
    """
    model.train()
    trlosses = AverageMeter()
    accuracies = AverageMeter()
    # Move every batch to wherever the model lives, on CPU and GPU alike.
    run_device = next(model.parameters()).device
    print(f"[Epoch {curr_epoch}/{num_epochs}]")
    print("TRAIN")
    for i, (inputs,blendshapes_data,p1, targets) in enumerate(data_loader):
        inputs = inputs.to(run_device)
        p1 = p1.to(run_device)
        # Cast unconditionally: arrays loaded from .npy may not be float32.
        blendshapes_data = blendshapes_data.to(device=run_device, dtype=torch.float32)
        targets = targets.to(run_device).long()

        _,outputs = model(inputs,blendshapes_data,p1)
        loss  = criterion(outputs,targets)
        acc = calculate_accuracy(outputs, targets)
        trlosses.update(loss.item(), inputs.size(0))
        accuracies.update(acc, inputs.size(0))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        sys.stdout.write(
                "\r[Batch %d / %d] [Loss: %f, Acc: %.2f%%]"
                % (
                    i,
                    len(data_loader),
                    trlosses.avg,
                    accuracies.avg))

    return model, optimizer, criterion, trlosses.avg, accuracies.avg

def test(curr_epoch,model, data_loader ,criterion):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Args:
        curr_epoch: unused; kept so existing calls keep working.
        model: called as ``model(inputs, blendshapes, facial_parts)`` and
            expected to return ``(_, outputs)``.
        data_loader: yields ``(inputs, blendshapes, facial_parts, targets)``.
        criterion: loss taking ``(outputs, integer targets)``.

    Returns:
        ``(true labels, predicted labels, average loss, average accuracy in %)``;
        both label collections are flat Python lists in loader order.
    """
    print("\nTEST")
    model.eval()
    tlosses = AverageMeter()
    accuracies = AverageMeter()
    pred = []
    true = []
    # Move every batch to wherever the model lives, on CPU and GPU alike.
    run_device = next(model.parameters()).device
    with torch.no_grad():
        for i, (inputs,blendshapes_data,p1, targets) in enumerate(data_loader):
            inputs = inputs.to(run_device)
            p1 = p1.to(run_device)
            # Cast unconditionally: arrays loaded from .npy may not be float32.
            blendshapes_data = blendshapes_data.to(device=run_device, dtype=torch.float32)
            targets = targets.to(run_device).long()

            _,outputs = model(inputs,blendshapes_data,p1)
            loss = torch.mean(criterion(outputs, targets))
            acc = calculate_accuracy(outputs,targets)
            _,p = torch.max(outputs,1)
            true += targets.detach().cpu().numpy().reshape(len(targets)).tolist()
            pred += p.detach().cpu().numpy().reshape(len(p)).tolist()
            tlosses.update(loss.item(), inputs.size(0))
            accuracies.update(acc, inputs.size(0))
            sys.stdout.write(
                    "\r[Batch %d / %d]  [Loss: %f, Acc: %.2f%%]"
                    % (
                        i,
                        len(data_loader),
                        tlosses.avg,
                        accuracies.avg
                        )
                    )
        print("\n")

    return true, pred, tlosses.avg, accuracies.avg
class AverageMeter(object):
    """Tracks the latest value and the weighted running mean of a metric."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget everything recorded so far."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the mean."""
        weighted = val * n
        self.val = val
        self.sum += weighted
        self.count += n
        self.avg = self.sum / self.count
def calculate_accuracy(outputs, targets):
    """Return top-1 accuracy in percent.

    Args:
        outputs: score tensor of shape (batch, classes).
        targets: integer class indices, one per batch row.
    """
    # Index of the highest score per row, shape (batch, 1).
    top1 = outputs.topk(1, 1, True)[1]
    # Compare as (1, batch) rows so the two tensors line up element-wise.
    hits = top1.t().eq(targets.view(1, -1))
    return 100 * hits.float().sum().item() / targets.size(0)

In [None]:
#confusion matrix plot and save function
import seaborn as sn
from sklearn.metrics import confusion_matrix

def print_confusion_matrix(y_true, y_pred,dirpath):
    """Print, plot and save the 2x2 confusion matrix of a binary run.

    Class 0 is shown as 'Fake' and class 1 as 'Real'. The printed counts treat
    class 1 as the positive class (scikit-learn's binary convention: row =
    actual label, column = predicted label).

    Args:
        y_true: actual labels (0 or 1).
        y_pred: predicted labels (0 or 1).
        dirpath: existing directory; the heatmap is written to
            ``<dirpath>/confmatrix.png``.
    """
    # Fix the label set so the matrix is 2x2 even if only one class occurs.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    # cm[i][j] = samples with actual label i predicted as j.
    print('True positive = ', cm[1][1])
    print('False positive = ', cm[0][1])
    print('False negative = ', cm[1][0])
    print('True negative = ', cm[0][0])
    print('\n')
    df_cm = pd.DataFrame(cm, range(2), range(2))
    sn.set(font_scale=1.4) # for label size
    sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}, fmt='d') # font size
    plt.ylabel('Actual label', size = 20)
    plt.xlabel('Predicted label', size = 20)
    plt.xticks(np.arange(2), ['Fake', 'Real'], size = 16)
    plt.yticks(np.arange(2), ['Fake', 'Real'], size = 16)
    plt.ylim([2, 0])
    plotpath = f'{dirpath}/confmatrix.png'
    plt.savefig(plotpath, bbox_inches='tight')
    plt.show()
    calculated_acc = (cm[0][0]+cm[1][1])/(cm[0][0]+cm[0][1]+cm[1][0]+ cm[1][1])
    print("Calculated Accuracy",calculated_acc*100)

In [None]:
#graph plot and save functions cell
def plot_loss(train_loss_avg,test_loss_avg,num_epochs, dirpath):
    """Plot training and validation loss per epoch and save the figure.

    Both series are drawn against epochs ``1..num_epochs``; the image is
    written to ``<dirpath>/lossplot.png`` and then shown.
    """
    epoch_axis = range(1, num_epochs + 1)
    curves = (
        (train_loss_avg, 'g', 'Training loss'),
        (test_loss_avg, 'b', 'validation loss'),
    )
    for series, fmt, legend in curves:
        plt.plot(epoch_axis, series, fmt, label=legend)
    plt.title('Training and Validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(f'{dirpath}/lossplot.png', bbox_inches='tight')
    plt.show()
def plot_accuracy(train_accuracy,test_accuracy,num_epochs, dirpath):
    """Plot training and validation accuracy per epoch and save the figure.

    The y-axis runs from 10 below the lowest recorded accuracy up to 100.
    The image is written to ``<dirpath>/accplot.png`` and then shown.
    """
    train_floor = min(train_accuracy)
    val_floor = min(test_accuracy)
    lowest = train_floor if train_floor <= val_floor else val_floor
    minm = lowest - 10

    epoch_axis = range(1, num_epochs + 1)
    plt.ylim(minm, 100)
    curves = (
        (train_accuracy, 'g', 'Training accuracy'),
        (test_accuracy, 'b', 'validation accuracy'),
    )
    for series, fmt, legend in curves:
        plt.plot(epoch_axis, series, fmt, label=legend)
    plt.title('Training and Validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.savefig(f'{dirpath}/accplot.png', bbox_inches='tight')
    plt.show()

In [None]:
#load ckpts
from os.path import exists
lr = 0.00001
weightDecay = 0.00001
optimizer = torch.optim.Adam(model.parameters(), lr= lr,weight_decay = weightDecay)
# class_weights = torch.tensor([0.12, 0.88])
# criterion = nn.CrossEntropyLoss(weight= class_weights).cuda()
criterion = nn.CrossEntropyLoss().to(device)

# Per-epoch history; replaced by the stored history when resuming.
train_loss_avg =[]
train_accuracy = []
test_loss_avg = []
test_accuracy = []
print(exists(path_to_model))

if exists(path_to_model):
    # map_location lets a checkpoint written on a GPU be restored on a
    # CPU-only machine (and vice versa).
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(path_to_model, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    criterion.load_state_dict(checkpoint['criterion_state_dict'])
    # Resume with the epoch after the last one that was saved.
    curr_epoch = checkpoint['epoch'] + 1
    train_loss_avg = checkpoint['loss']
    train_accuracy = checkpoint['accuracy']
    try:
        test_loss_avg = checkpoint['test_loss']
        test_accuracy = checkpoint['test_accuracy']
    except KeyError:
        # Checkpoints without validation history start it from scratch.
        test_loss_avg = []
        test_accuracy = []
    del checkpoint


In [None]:
#graph print, directory creation and save execution cell
def plotsave(modelname, lr, num_epochs, true, pred):
    """Save the result plots of a run and move its checkpoint next to them.

    A directory ``rslts/<modelname> seq<seqlen> b<batchSize> lr<lr> ep<num_epochs>``
    is created (including ``rslts/`` itself if needed), the loss, accuracy and
    confusion-matrix plots are written into it, and the checkpoint at
    ``path_to_model`` is moved there as ``checkpoint.pt``.

    Args:
        modelname: free-text description used in the directory name.
        lr: learning rate, used in the directory name.
        num_epochs: epoch count, used in the directory name.
        true: actual labels of the last evaluation.
        pred: predicted labels of the last evaluation.
    """
    dirname = f"{modelname} seq{seqlen} b{batchSize} lr{lr} ep{num_epochs}"
    dirpath = f'rslts/{dirname}'
    # makedirs also creates the parent folder and tolerates a re-run with the
    # same settings, so a finished training run cannot fail at this point.
    os.makedirs(dirpath, exist_ok=True)

    plot_loss(train_loss_avg,test_loss_avg,len(train_loss_avg),dirpath)
    plot_accuracy(train_accuracy,test_accuracy,len(train_accuracy),dirpath)
    print(confusion_matrix(true,pred))
    print_confusion_matrix(true,pred,dirpath)

    mvpath = f'{dirpath}/checkpoint.pt'
    # Move the checkpoint from wherever the training loop wrote it.
    os.replace(path_to_model, mvpath)

In [None]:
#model train execution cell
num_epochs = 20

# Make sure the checkpoint folder exists before the first (expensive) epoch
# ends and torch.save is called.
ckpt_dir = os.path.dirname(path_to_model)
if ckpt_dir:
    os.makedirs(ckpt_dir, exist_ok=True)

# Stay None when no epoch runs (e.g. a resumed run that already finished).
true = pred = None

for epoch in range(curr_epoch ,num_epochs+1):
    msd, osd, csd, l, acc = train_epoch(epoch,num_epochs,train_loader,model,criterion,optimizer)
    train_loss_avg.append(l)
    train_accuracy.append(acc)
    true,pred,tl,t_acc = test(epoch,model,valid_loader,criterion)
    test_loss_avg.append(tl)
    test_accuracy.append(t_acc)
    # Checkpoint after every epoch so an interrupted run can be resumed.
    torch.save({
            'epoch': epoch,
            'model_state_dict': msd.state_dict(),
            'optimizer_state_dict': osd.state_dict(),
            'criterion_state_dict': csd.state_dict(),
            'loss': train_loss_avg,
            'accuracy': train_accuracy,
            'test_loss': test_loss_avg,
            'test_accuracy': test_accuracy
            }, path_to_model)

if true is None:
    print(f"No epochs were run (curr_epoch={curr_epoch} > num_epochs={num_epochs}); skipping plots")
else:
    plotsave(f"moddedmodelblendshapesfacialparts 2dp0.4,0.4 relufacenet main_balanced_dset augmented sq{seqlen} im{im_size} rs{random_state}",lr,num_epochs, true, pred)


