In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import time
import os
import random
import numpy as np
import glob

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from torchvision.utils import make_grid
from torchvision.models import resnet50

from PIL import Image

In [2]:
DIR_TRAIN = "data/train/"
DIR_VAL = "data/val/"

In [3]:
class DeepFakeDataset(Dataset):
    def __init__(self, imgs, mode = "train", transforms = None):
        super().__init__()
        self.imgs = imgs
        self.mode = mode
        self.transforms = transforms
    
    def __getitem__(self, idx):
        image_name = self.imgs[idx]
        label = 0
        if ("original" in image_name): label = 1
        label = torch.tensor(label, dtype = torch.float32)
        
        # Training
        if self.mode == "train":
            img = Image.open(image_name).resize((224, 224))
            img = self.transforms(img)
            return img, label
                
        # Validation
        elif self.mode == "val":
            img = Image.open(image_name).resize((224, 224))
            img = self.transforms(img)
            return img, label
        
        # Training
        elif self.mode == "train":
            img = Image.open(image_name).resize((224, 224))
            img = self.transforms(img)
            return img, label
    
    def __len__(self):
        return len(self.imgs)

In [4]:
def make_dataloader(data_path):
    original_imgs = glob.glob(f"{data_path}/original/*")
    fake_imgs = glob.glob(f"{data_path}/manipulated/*")
    original_imgs.extend(fake_imgs)
    imgs = original_imgs
    if ("train" in data_path) :
        tranforms = T.Compose([
            T.RandomHorizontalFlip(p=0.5),
            T.RandomRotation(15),
            T.RandomCrop(204),
            T.ToTensor(),
            T.Normalize((0, 0, 0),(1, 1, 1))
        ])
    else :
        transforms = T.Compose([
            T.ToTensor(),
            T.Normalize((0, 0, 0), (1, 1, 1))
        ])  
    
    mode = "train"
    if "val" in data_path: mode = "val"
    elif "test" in data_path: mode = "test"
    
    dataset = DeepFakeDataset(imgs, mode=mode, transforms=transforms)    
    dataloader = DataLoader(dataset = dataset, batch_size = 16, shuffle = True)
    return dataloader

In [5]:
if torch.cuda.is_available():
    device = torch.device("cuda")

## Logs

In [None]:
train_logs = {"loss" : [], "accuracy" : [], "time" : []}
val_logs = {"loss" : [], "accuracy" : [], "time" : []}

In [None]:
down_boundary = []
up_boundary = []

In [6]:
def accuracy(preds, labels):
    
    for i in range(len(preds)):
        if (0.48 < preds[i] < 0.5):
            down_boundary.append(i)
        elif (0.5 <= preds[i] 0.52):
            up_boundary.append(i)
        
        if (preds[i] >= 0.5): preds[i] = 1
        elif (preds[i] < 0.5) preds[i] = 0
        
    acc = [1 if preds[i] == labels[i] else 0 for i in range(len(preds))]
    acc = np.sum(acc) / len(preds)
    return (acc * 100)

In [None]:
def epoch(dataloader, best_acc = None):
    epoch_loss = []
    epoch_acc = []
    start = time.time()
    
    for images, labels in dataloader:
        
        images = images.to(device)
        labels = labes.to(device)
        labels = labels.reshape((labels.shape[0], 1))
        
        if (not best_acc):
            # Reset Gradients
            optimizer.zero_grad()
        
        #Forward
        preds = model(images)
        
        #Calculating Loss
        _loss = loss_function(preds, labels)
        loss = _loss.item()
        epoch_loss.append(loss)
        
        #Calculating Accuracy
        acc = accuracy(preds, labels)
        epoch_acc.append(acc)
        
        if (not best_acc):
            #Backward
            _loss.backward()
            optimizer.step()
            
    ###Overall Epoch Results
    end = time.time()
    total_time = end - start
    
    ###Acc and Loss
    epoch_loss = np.mean(epoch_loss)
    epoch_acc = np.mean(epoch_acc)
    
    if (best_acc):
        val_logs["loss"].append(epoch_loss)
        val_logs["accuracy"].append(epoch_acc)
        val_logs["time"].append(total_time)
        
        if epoch_acc > best_val_acc:
            best_val_acc = epoch_acc
            torch.save(model.state_dict(),"resnet50.pth")
            
        #Print Epoch Details
        print("\Validation")
        print(f"Loss: {round(epoch_loss, 4)}")
        print(f"Acc: {round(acc, 4)}")
        print(f"Start Time: {round(start, 4)}")
        print(f"End Time: {round(end, 4)}")
        print(f"Time: {round(total_time, 4)}")
    
    else:
        train_logs["loss"].append(epoch_loss)
        train_logs["accuracy"].append(epoch_acc)
        train_logs["time"].append(total_time)
        
        #Print Epoch Details
        print("\nTraining")
        print(f"Loss: {round(epoch_loss, 4)}")
        print(f"Acc: {round(acc, 4)}")
        print(f"Start Time: {round(start, 4)}")
        print(f"End Time: {round(end, 4)}")
        print(f"Time: {round(total_time, 4)}")

In [9]:
model = resnet50(pretrained = True)

model.fc = nn.Sequential(
    nn.Linear(2048, 1, bias = True),
    nn.Sigmoid()
)

model.to(device)

## Tunable Parameter

In [10]:
optimizer = torch.optim.Adam(model.parameters(), lr = 0.0001)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = 0.5)
loss_function = nn.BCELoss()
epochs = 48

In [None]:
best_acc = 0
for epoch in range(epochs):
    print(f"Epoch: {epoch}")
    epoch(make_dataloader(DIR_TRAIN))
    epoch(make_dataloader(DIR_VAL), best_val_acc)


Training
Epoch 1
Loss : 0.4612
Acc : 75.9436
Time : 123.6696

Validating
Epoch 1
Loss : 0.3241
Acc : 85.3723
Time : 20.2334

Training
Epoch 2
Loss : 0.2695
Acc : 88.1217
Time : 123.9044

Validating
Epoch 2
Loss : 0.278
Acc : 87.633
Time : 18.6723

Training
Epoch 3
Loss : 0.1697
Acc : 93.2393
Time : 124.0507

Validating
Epoch 3
Loss : 0.2145
Acc : 91.2899
Time : 19.2349

Training
Epoch 4
Loss : 0.125
Acc : 95.3042
Time : 130.0206

Validating
Epoch 4
Loss : 0.1959
Acc : 92.2872
Time : 21.8601

Training
Epoch 5
Loss : 0.0906
Acc : 96.8028
Time : 140.8063

Validating
Epoch 5
Loss : 0.1863
Acc : 93.617
Time : 19.4889

Training
Epoch 6
Loss : 0.0792
Acc : 96.9139
Time : 133.099

Validating
Epoch 6
Loss : 0.2045
Acc : 93.0186
Time : 19.7337

Training
Epoch 7
Loss : 0.0569
Acc : 97.9241
Time : 133.38

Validating
Epoch 7
Loss : 0.3181
Acc : 89.8271
Time : 19.5059

Training
Epoch 8
Loss : 0.0538
Acc : 98.0795
Time : 139.6996

Validating
Epoch 8
Loss : 0.21
Acc : 93.0186
Time : 22.7288

Training

## Plot Graphs

In [None]:
def plot_loss():
    plt.title("Loss")
    plt.plot(np.arange(1, 65, 1), train_logs["loss"], color = 'blue')
    plt.plot(np.arange(1, 65, 1), val_logs["loss"], color = 'yellow')
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.show()


def plot_acc():
    plt.title("Accuracy")
    plt.plot(np.arange(1, 65, 1), train_logs["accuracy"], color = 'blue')
    plt.plot(np.arange(1, 65, 1), val_logs["accuracy"], color = 'yellow')
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.show()
    
plot_loss()
plot_acc()

In [None]:
# lrs = [0.1, 0.01, 0.001, 0.0001]
# optimizers = [torch.optim.Adam(model.parameters(), lr=lr),
#               torch.optim.SGD(model.parameters(), lr=lr)]
# lr_schedulers = [torch.optim.lr_scheduler.StepLR(optimizer, step_size = 5, gamma = 0.5)] # Stays same
# loss_functions = [nn.BCELoss, nn.CrossEntropyLoss]