In [None]:
# get the pytorch implementation of facenet - https://github.com/timesler/facenet-pytorch
!pip install facenet-pytorch --quiet

In [None]:
# gdown is used to download the datasets from google drive, this is more general than the google.drive package and also works in kaggle
! conda install -y gdown

In [None]:
# imports
import os
import gc
import csv
import glob
import pandas as pd
import PIL
import h5py
import multiprocessing

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
from torch.utils.data import Dataset,DataLoader
from torch.utils.data import Dataset

import torchvision
import torchvision.transforms as transforms

os.environ['KMP_DUPLICATE_LIB_OK']='True'
import os.path as osp

from facenet_pytorch import MTCNN, InceptionResnetV1

In [None]:
# downloading data sets
!gdown --id 1lrT0Ub2QoJ3f3rHpRmFYRwuTUfh7bTUI # train set
!gdown --id 1dYF26xgFpoUtnBDmhYWMcvyX_qEeAPDp # validation set
!gdown --id 1YusblOsvP77d10Vw3TUUGcSbRwSVK3Ep # holdout set


In [None]:
# Data loader with data augmentation

class H5FileDataset(Dataset):
    # dataloader output: (pic_indices, color_channel, height, width)
    def __init__(self, h5_filename, transform=None, target_transform=None):
        self.h5_filename = h5_filename
        self.img_h5_file = self._load_h5_file(self.h5_filename)
        self.all_labels = self.img_h5_file['labels'][:]
        self.transform = transform  

    def __len__(self):
        return len(self.all_labels)

    def __getitem__(self, idx):
        img = self.img_h5_file['img_data'][idx]
        label = self.img_h5_file['labels'][idx]

        label = torch.as_tensor(label, dtype=torch.float64)

        img = np.transpose(img, [2, 0, 1])
        img = img.astype(np.double)
        img = torch.as_tensor(img, dtype=torch.float64)
        # img = img/255
        if self.transform is not None:
            img = self.transform(img)  
        return img, label

    def _load_h5_file(self, h5_filename):
        file = h5py.File(h5_filename, 'r')
        img_data = file['pic_mat']
        img_labels = file['labels']
        return dict(file=file, img_data=img_data, labels=img_labels)

In [None]:
# load data and apply transforms
composed = transforms.Compose([transforms.RandomCrop([500, 500]), transforms.Resize([160, 160])])

trainset = H5FileDataset('kdef_train_dataset.h5', transform=composed)
testset = H5FileDataset('kdef_val_dataset.h5', transform=composed)
holdoutset = H5FileDataset('kdef_test_dataset.h5', transform=composed)

trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=True)
holdoutloader = torch.utils.data.DataLoader(holdoutset, batch_size=batch_size, shuffle=True)

In [None]:
# hyper-parameters
use_cuda = torch.cuda.is_available()
best_acc = 0
start_epoch = 0  # start from epoch 0 or last checkpoint epoch
batch_size = 100
max_epochs = 25

base_learning_rate = 0.001

In [None]:
# define train and test loop
def train(net, epoch, trainloader, use_cuda=True):
  net.train()
  train_loss = 0
  correct = 0
  total = 0
  for batch_idx, (inputs, targets) in enumerate(trainloader):
    targets = np.array(targets, dtype=int)
    targets = torch.as_tensor(targets, dtype=torch.long)
    if use_cuda:
      inputs, targets = inputs.cuda(), targets.cuda()

    optimizer.zero_grad()
    inputs, targets = Variable(inputs), Variable(targets)
    outputs = net(inputs)
    loss = criterion(outputs, targets)
    loss.backward()

    optimizer.step()

    train_loss += loss.item()
    _, predicted = torch.max(outputs.data, 1)
    total += targets.size(0)
    correct += predicted.eq(targets.data).cpu().sum()

  return (train_loss/batch_idx, 100.*correct/total)


def test(net, epoch, testloader, outModelName, use_cuda=True):
  global best_acc
  net.eval()
  test_loss, correct, total = 0, 0, 0
  with torch.no_grad():
    for batch_idx, (inputs, targets) in enumerate(testloader):
      targets = np.array(targets, dtype=int)
      targets = torch.as_tensor(targets, dtype=torch.long)
      if use_cuda:
        inputs, targets = inputs.cuda(), targets.cuda()

      outputs = net(inputs)
      loss = criterion(outputs, targets)

      test_loss += loss.item()
      _, predicted = torch.max(outputs.data, 1)
      total += targets.size(0)
      correct += predicted.eq(targets.data).cpu().sum()

      if batch_idx % 200 == 0:
        print(batch_idx, len(testloader), 'Loss: %.3f | Acc: %.3f%% (%d/%d)'
            % (test_loss/(batch_idx+1), 100.*correct/total, correct, total))

    # Save checkpoint.
    acc = 100.*correct/total
    if acc > best_acc:
        best_acc = acc
        checkpoint(net, acc, epoch, outModelName)
    return (test_loss/batch_idx, 100.*correct/total)


# checkpoint & adjust_learning_rate
def checkpoint(model, acc, epoch, outModelName):
  # Save checkpoint.
  print('Saving..')
  state = {
      'state_dict': model.state_dict(),
      'acc': acc,
      'epoch': epoch,
      'rng_state': torch.get_rng_state()
  }
  if not os.path.isdir('checkpoint'):
      os.mkdir('checkpoint')
  torch.save(state, f'./checkpoint/{outModelName}.t7')

def adjust_learning_rate(optimizer, epoch):
  """decrease the learning rate at 100 and 150 epoch"""
  lr = base_learning_rate
  if epoch <= 9 and lr > 0.1:
    # warm-up training for large minibatch
    lr = 0.1 + (base_learning_rate - 0.1) * epoch / 10.
  if epoch >= 100:
    lr /= 10
  if epoch >= 150:
    lr /= 10
  for param_group in optimizer.param_groups:
    param_group['lr'] = lr

In [10]:
## Uncomment and run to inspect/change structure of Facenet

# net = InceptionResnetV1(pretrained='vggface2', classify=True)
# print(net) # shows structure of network

# in_ftrs = net.logits.in_features # number of input features of the last layer of the classifier
# print("+ Previous Nr of Outputs: " + str(net.logits.out_features)) # number of output features of the last layer of the classifier == num_classes
# net.logits = nn.Linear(in_ftrs, 2, )
# print("++++++ New Nr of Outputs: " + str(net.logits.out_features))

# in_ftrs = net.last_linear.in_features # number of input features of the last layer of the classifier
# print("+ Previous Nr of Outputs: " + str(net.last_linear.out_features)) # number of output features of the last layer of the classifier == num_classes
# net.last_linear = nn.Linear(in_ftrs, 2, )
# print("++++++ New Nr of Outputs: " + str(net.last_linear.out_features))

In [29]:
# load Facenet
net = InceptionResnetV1(pretrained='vggface2', num_classes = 2, classify=True)

# freeze all layers
for param in net.parameters():
  param.requires_grad = False

# unfreeze last layer
in_ftrs = net.logits.in_features # number of input features of the last layer of the classifier
net.logits = nn.Linear(in_ftrs, 2)

# change parameters data type and put them on gpu
net = net.double()
net = net.cuda()

# create folder for storing results
result_folder = './results/'
if not os.path.exists(result_folder):
    os.makedirs(result_folder)

# name for output files
logname = result_folder + net.__class__.__name__ + '_vgg_face2_to_kdef' + '__lr_' + str(base_learning_rate) + '__bs_' + str(batch_size) + '_full-retrain.csv'

# Optimizer and criterion
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=base_learning_rate, momentum=0.9, weight_decay=5e-4)

outModelName = 'pretrain'
if not os.path.exists(logname):
    with open(logname, 'w') as logfile:
        logwriter = csv.writer(logfile, delimiter=',')
        logwriter.writerow(['epoch', 'train loss', 'train acc', 'test loss', 'test acc'])

# train the model
for epoch in range(start_epoch, max_epochs):
    adjust_learning_rate(optimizer, epoch)
    #print('finished adjust lr')
    train_loss, train_acc = train(net, epoch, trainloader, use_cuda=use_cuda)
    #print('finished train epoch ', epoch)
    test_loss, test_acc = test(net, epoch, testloader, outModelName, use_cuda=use_cuda)
    #print('finished test epoch ', epoch)
    with open(logname, 'a') as logfile:
        logwriter = csv.writer(logfile, delimiter=',')
        logwriter.writerow([epoch, train_loss, train_acc.item(), test_loss, test_acc.item()])
    print(f'Epoch: {epoch} | train acc: {train_acc} | test acc: {test_acc}')

In [30]:
# read the results of training runs from the datafile
results = pd.read_csv(logname, sep =',')
results.head()

train_accuracy = results['train acc'].values
test_accuracy = results['test acc'].values

In [31]:
# plot curve of train and test accuracy
figureName = logname[:-4] + ".png"

plt.plot(results['epoch'].values, train_accuracy, label='train')
plt.plot(results['epoch'].values, test_accuracy, label='test')
plt.xlabel('Number of epochs')
plt.ylabel('Accuracy')
plt.title(f'Train/Test Accuracy curve for {max_epochs} epochs')
plt.savefig(figureName)
plt.legend()
plt.show()

In [26]:
# define function to test network on holdout data
def holdout_check(net, holdoutloader, use_cuda=use_cuda):
    correct = 0
    total = 0
    with torch.no_grad():
        for data in holdoutloader:
            inputes, targets = data[0], data[1]
            if use_cuda:
                inputes, targets = inputes.cuda(), targets.cuda()
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    print('correct: {:d}  total: {:d}'.format(correct, total))
    print('accuracy = {:f}'.format(correct / total))

In [33]:
# run test of network on holdout data
holdout_check(net, holdoutloader, use_cuda=use_cuda)

<p> Workaround to simply download the stored paramters of the trained model from kaggle: </p>

<a href="./checkpoint/pretrain.t7"> Download File </a>