Mount drive

In [None]:
import os
from google.colab import drive
drive.mount('/content/gdrive')

path = '/content/gdrive/MyDrive/DeXpression'
os.chdir(path)

DATA_FOLDER = '/content/gdrive/MyDrive/DeXpression/data/'
RESULTS_FOLDER = '/content/gdrive/MyDrive/DeXpression/results'

Standard imports


In [None]:
import numpy as np

import torch

from torchsummary import summary

from torch.nn import Module, Conv2d, MaxPool2d, Linear, ReLU, LogSoftmax

from torch.nn import LayerNorm, BatchNorm2d, Dropout

import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

#DeXpression model



Transfer to GPU (if applicable)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class DeXpression(Module):
  
  def __init__(self):
    super(DeXpression, self).__init__()

    # block-1
    self.conv1 = Conv2d(in_channels=1, out_channels=64, kernel_size=7, stride=2, padding=3)
    self.pool1 = MaxPool2d(kernel_size=3, stride=2, padding=0)
    self.lrn1  = LayerNorm([64, 55, 55])

    # feature extractor 1
    self.conv2a = Conv2d(in_channels=64, out_channels=96, kernel_size=1, stride=1, padding=0)
    self.conv2b = Conv2d(in_channels=96, out_channels=208, kernel_size=3, stride=1, padding=1)
    self.pool2a = MaxPool2d(kernel_size=3, stride=1, padding=1)
    self.conv2c = Conv2d(in_channels=64, out_channels=64, kernel_size=1, stride=1, padding=0)
    self.pool2b = MaxPool2d(kernel_size=3, stride=2, padding=0)

    # feature extractor 2
    self.conv3a = Conv2d(in_channels=272, out_channels=96, kernel_size=1, stride=1, padding=0)
    self.conv3b = Conv2d(in_channels=96, out_channels=208, kernel_size=3, stride=1, padding=1)
    self.pool3a = MaxPool2d(kernel_size=3, stride=1, padding=1)
    self.conv3c = Conv2d(in_channels=272, out_channels=64, kernel_size=1, stride=1, padding=0)
    self.pool3b = MaxPool2d(kernel_size=3, stride=2, padding=0)

    # fully connected layer 
    self.fc                  = Linear(in_features=272*13*13, out_features=7)
    self.softmax             = LogSoftmax(dim=1)
    self.batch_normalization = BatchNorm2d(272)
    self.dropout             = Dropout(p=0.2)

  def forward(self, x, dropout=True, batch_normalization=True):
    """
    Perform a single step of forward propogation
    """
    # block-1
    conv1_out = F.relu(self.conv1(x))
    pool1_out = self.pool1(conv1_out)
    lrn1_out  = self.lrn1(pool1_out)

    # feature extractor 1
    # branch 1
    conv2a_out = F.relu(self.conv2a(lrn1_out))
    conv2b_out = F.relu(self.conv2b(conv2a_out))
    # branch 2
    pool2a_out = self.pool2a(lrn1_out)
    conv2c_out = F.relu(self.conv2c(pool2a_out))
    # concatenate both branches
    concat2_out = torch.cat((conv2b_out, conv2c_out), 1)
    pool2b_out  = self.pool2b(concat2_out)

    # feature extractor 2
    # branch 1
    conv3a_out = F.relu(self.conv3a(pool2b_out))
    conv3b_out = F.relu(self.conv3b(conv3a_out))
    # branch 2
    pool3a_out = self.pool3a(pool2b_out)
    conv3c_out = F.relu(self.conv3c(pool3a_out))
    # concatenate both branches
    concat3_out = torch.cat((conv3b_out, conv3c_out), 1)
    pool3b_out  = self.pool3b(concat3_out)

    # dropout enabled
    if dropout:
      pool3b_out = self.dropout(pool3b_out)
    
    # batch normalization enabled
    if batch_normalization:
      pool3b_out = self.batch_normalization(pool3b_out)

    pool3b_shape = pool3b_out.shape
    pool3b_flat  = pool3b_out.reshape([-1, pool3b_shape[1] * pool3b_shape[2] * pool3b_shape[3]])

    output = self.fc(pool3b_flat)
    logits = self.softmax(output)

    return logits

Utility functions to read data

In [None]:
import glob
from pathlib import Path
from matplotlib import image as im

def data(*paths):
  return os.path.join(DATA_FOLDER, *paths)

def results(*paths):
  return os.path.join(RESULTS_FOLDER, *paths)


def read_file(img_file, label_file):
  """
  Read and convert images to numpy arrays
  """
  image = im.imread(img_file)

  with open(label_file, "r") as file:
        label = float(file.read())

  return image, label


def load_from_array():
  """
  Load dataset from a specified folder
  """
  x = np.load(data("x.npy")).reshape(-1, 1, 224, 224)
  y = np.load(data("y.npy"))

  return x, y


def save_to_array():
  """
  Save dataset to a specified folder
  """
  with open(data("x.npy"), "wb") as file:
    np.save(file, x)

  with open(data("y.npy"), "wb") as file:
    np.save(file, y)


def load_dataset(use_existing = True):
  """
  Return input and output variables from the
  dataset
  """
  if use_existing:
    x, y = load_from_array()
  
  else:
    data_dir = DATA_FOLDER
    images   = []
    labels   = []

    for image_file in sorted(glob.glob(f"{data_dir}/**/*.png")):
      image_path = os.path.dirname(image_file)
      label_path = image_path.replace("images", "labels")

      if os.path.exists(label_path):
        if not len(os.listdir(label_path)) == 0:
          label_file = os.path.join(label_path, os.listdir(label_path)[0])

          image, label = file_reader(image_file, label_file)
          images.append(image)
          labels.append(label)

    x = np.stack(images, axis = 0).reshape(-1, 1, 224, 224)
    y = np.stack(labels, axis = 0)

    save_to_array(x, y)


  print("Loaded datasets {} and {}".format(x.shape, y.shape))
  print("")

  return x, y

Dataset utilities (Shuffle and convert to torch tensor)

In [None]:
from sklearn.model_selection import KFold
from sklearn.utils import shuffle as s


def kfold(x, y, splits = 10, shuffle = True):
  """
  Perform a K-fold split on the dataset
  x -> Input variables from the dataset
  y -> Output variables frm the dataset
  """
  x, y = s(x, y)
  kfold = KFold(n_splits = splits, shuffle = shuffle)

  for train, test in kfold.split(x, y):
    x_train, y_train = x[train], y[train]
    x_test, y_test   = x[test], y[test]

    yield x_train, y_train, x_test, y_test


def convert_to_torch(x_train, y_train, x_test, y_test):
  """
  Convert the train and test data to torch tensors
  """
  # convert training images to a torch tensor
  x_train = torch.from_numpy(x_train)
  x_train = x_train.type(torch.FloatTensor)

  # convert training labels to a torch tensor
  y_train = y_train.astype(int)
  y_train = torch.from_numpy(y_train)

  # convert test images to torch tensor
  x_test = torch.from_numpy(x_test)
  x_test = x_test.type(torch.FloatTensor)

  # convert testing labels to torch tensor
  y_test = y_test.astype(int)
  y_test = torch.from_numpy(y_test)

  return x_train, y_train, x_test, y_test


Initialize the model

In [None]:
def initialize():
  """
  Load model parameters to cuda
  """
  model = DeXpression()
  model = model.to(device)

  return model


Dump checkpoints

In [None]:
import pandas as pd
from datetime import datetime

def dump_dict_list(history):
  """
  Convert history to a pandas dataframe
  and store it in the results folder
  """
  filename = results("history.csv")

  print("Saving history {}".format(filename))
  print("")

  df = pd.DataFrame(history)
  df.to_csv(filename)


def print_progress(fold, epoch, n_epochs, avg_train_accuracy, avg_train_loss, avg_test_accuracy, avg_test_loss):
  """
  Print training and testing performance
  per epoch
  """
  print("Fold: %d, Epoch: %d/%d" %(fold+1, epoch+1, n_epochs))
  print("Train accuracy: %.2f%%" %(avg_train_accuracy * 100))
  print("Train loss: %.3f" %(avg_train_loss))
  print("Test accuracy: %.2f%%" %(avg_test_accuracy * 100))
  print("Test loss: %.3f" %(avg_test_loss))
  print("")


def save_progress(fold, epoch, avg_test_accuracy, model, model_optimizer):
  """
  Save a model checkpoint every epoch
  """
  checkpoint = "{:d}-{:.2f}.tar".format(epoch + 1, avg_test_accuracy)

  print("Saving checkpoint {}".format(results(checkpoint)))
  print("")

  # save in a dictionary
  torch.save({"fold": fold + 1, "epoch": epoch + 1, "model": model.state_dict(), "model_opt": model_optimizer.state_dict()}, results(checkpoint))



#Utility functions for training, testing and running the model
Train

In [None]:
def train(x_batch, y_batch, model, criterion, model_optimizer):
  """
  Perform a single forward and back propagation step
  on the training data
  """

  model_optimizer.zero_grad() # remove any existing gradients

  # forward propagate
  output     = model(x_batch)
  _, y_pred  = torch.max(output.data, 1)
  _, y_truth = torch.max(y_batch, 1)

  # compute model loss
  loss = criterion(output, y_truth)

  # backpropagate the gradients
  loss.backward()

  # update parameters based on backprop
  model_optimizer.step()

  # accuracy
  correct_counts = y_pred.eq(y_truth.data.view_as(y_pred))

  # average accuracy
  accuracy = torch.mean(correct_counts.type(torch.FloatTensor))

  return accuracy, loss



Test

In [None]:
def test(x_batch, y_batch, model, criterion):
  """
  Perform a single forward propagation step
  on the testing data
  """
  # forward propagate
  output     = model(x_batch)
  _, y_pred  = torch.max(output.data, 1)
  _, y_truth = torch.max(y_batch, 1)

  # compute model loss
  loss = criterion(output, y_truth)

  # compute validation accuracy
  correct_counts = y_pred.eq(y_truth.data.view_as(y_pred))

  # mean validation accuracy
  accuracy = torch.mean(correct_counts.type(torch.FloatTensor))

  # predicted and ground truth values converted to a list
  y_pred  = y_pred.tolist()
  y_truth = y_truth.tolist()

  return accuracy, loss, y_pred, y_truth

Run

In [None]:
import torch.nn as nn

history = []


def run_model(fold, model, x_train, y_train, x_test, y_test, batch_size=32, n_epochs=2, learning_rate=0.001):

  global history # for model checkpoints

  # loss function and optimizer
  criterion       = nn.NLLLoss()
  model_optimizer = optim.Adam(model.parameters(), lr=learning_rate)

  for epoch in range(n_epochs):
    running_train_accuracy = 0
    running_train_loss     = 0

    running_test_accuracy = 0
    running_test_loss     = 0

    n_iters_train = len(x_train)/batch_size
    n_iters_test  = len(x_test)/batch_size

    # train the model (for each mini-batch)
    model.train()
    for index in range(0, len(x_train), batch_size):
      x_batch = x_train[index:index+batch_size]
      y_batch = y_train[index:index+batch_size]
      x_batch, y_batch = x_batch.to(device), y_batch.to(device)

      train_accuracy, train_loss = train(x_batch, y_batch, model, criterion, model_optimizer)
      
      # update metrics
      running_train_accuracy += train_accuracy.item()
      running_train_loss     += train_loss.item()

    # test the model
    with torch.no_grad():
      model.eval()

      test_pred, test_truth = [], []

      for index in range(0, len(x_test), batch_size):
        x_batch = x_test[index:index+batch_size]
        y_batch = y_test[index:index+batch_size]
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

      test_accuracy, test_loss, pred, truth = test(x_batch, y_batch, model, criterion)
      
      # update metrics
      running_test_accuracy += test_accuracy.item()
      running_test_loss     += test_loss.item()
      
      # append to the end of the prediction and ground truth lists
      test_pred.extend(pred)
      test_truth.extend(truth)
  

    # mean metrics
    avg_train_accuracy = running_train_accuracy/n_iters_train
    avg_train_loss     = running_train_loss/n_iters_train
  
    avg_test_accuracy  = running_test_accuracy/n_iters_test
    avg_test_loss      = running_test_loss/n_iters_test
  
    # append checkpoints to model history
    history.append({"fold" : fold + 1, "epoch" : epoch + 1, "avg_train_accuracy" : avg_train_accuracy * 100, 
                    "avg_test_accuracy" : avg_test_accuracy * 100, "avg_train_loss" : avg_train_loss, "avg_test_loss" : avg_test_loss,
                    "test_pred" : test_pred, "test_truth" : test_truth})
    
    # print progress
    print_progress(fold, epoch, n_epochs, avg_train_accuracy, avg_train_loss, avg_test_accuracy, avg_test_loss)

    # save progress
    save_progress(fold, epoch, avg_test_accuracy, model, model_optimizer)
  

# Plotting

## Some imports and initialization

In [None]:
import ast

import seaborn as sb

import matplotlib.pyplot as plt

from sklearn.metrics import confusion_matrix as cm

res = results("history.csv")

labels = ["Anger", "Contempt", "Disgust", "Fear", "Happiness", "Sadness", "Surprise"]

# Utility functions
## Get test predictions

In [None]:
def get_test_pred(fold, epoch):
  df = pd.read_csv(res)
  filter = (df["fold"] == fold) & (df["epoch"] == epoch)
  test_pred = df.loc[filter, "test_pred"].item()
  test_pred = ast.literal_eval(test_pred)

  return test_pred

## Get data

In [None]:
def get_data(metric):
  list = []
  df = pd.read_csv(res)
  for fold in range(5):
    filter = df["fold"] == fold + 1
    train_accuracy = df.loc[filter, metric].tolist()
    list.append(train_accuracy)
  
  return list 

## Get truth values 

In [None]:
def get_test_truth(fold, epoch):
  df = pd.read_csv(res)
  filter = (df["fold"] == fold) & (df["epoch"] == epoch)
  test_truth = df.loc[filter, "test_truth"].item()
  test_truth = ast.literal_eval(test_truth)

  return test_truth

## Get metrics

In [None]:
def get_metric(metric):
  if metric == "accuracy":
    train_output = get_data("avg_train_accuracy")
    test_output  = get_data("avg_test_accuracy")
  else:
    train_output = get_data("avg_train_loss")
    test_output  = get_data("avg_test_loss")
  
  filename = results("train_test_{:s}".format(metric))

  return train_output, test_output, filename

## Confusion Matrix

In [None]:
def confusion_matrix(fold, epoch=25):
  test_pred  = get_test_pred(fold=fold, epoch=epoch)
  test_truth = get_test_truth(fold=fold, epoch=epoch)

  matrix = cm(test_truth, test_pred)
  matrix = (matrix.T / matrix.astype(np.float).sum(axis=1)).T
  matrix_df = pd.DataFrame(matrxi, labels, labels)

  filename = results("confusion_matrix-fold{:d}".format(fold))

  return matrix_df, filename

## Plot confusion matrix

In [None]:
def plot_confusion_matrix(fold):
  matrix_df, filename = confusion_matrix(fold=fold)

  ax = sb.heatmap(matrix_df, annot=True, cmap="Y1GnBu")
  ax.set(ylabel="True", xlabel="Predicted")
  plt.title("Confusion Matrix")
  plt.savefig(filename, bbox_inches="tight", dpi=200)
  plt.close()

## Plot metrics

In [None]:
def plot_metrics():
  metrics = ["accuracy", "loss"]

  for metric in metrics:
    x_axis = [epoch for epoch in range(25)]
    train_output, test_output, filename = get_metric(metric)

    fig = plt.figure()
    ax1 = fig.add_subplot(1, 2, 1)
    ax2 = fig.add_subplot(1, 2, 2)

    for fold in range(5):
      ax1.plot(x_axis, train_output[fold], label="fold {:d}".format(fold))
      ax2.plot(x_axis, test_output[fold], label="fold {:d}".format(fold))
    
    ax1.set_xlabel("epochs")
    ax1.set_ylabel(metric)
    ax1.set_title("train {:s}".format(metric))
    ax1.legend()

    ax2.set_xlabel("epochs")
    ax2.set_title("test {:s}".format(metric))
    ax2.legend()

    ratio = 0.8

    for ax in [ax1, ax2]:
      xmin, xmax = ax.get_xlim()
      ymin, ymax = ax.get_ylim()
      ax.set_aspect(abs((xmax - xmin)/(ymax-ymin)) * ratio, adjustable="box")
    
    plt.savefig(filename, bbox_inches="tight", dpi=200)
    plt.close()

# Run the model

In [None]:
def run():
  x, y = load_dataset()
  folds = kfold(x, y, 5)

  for repeat in range(3):

    for fold, (x_train, y_train, x_test, y_test) in enumerate(folds):
      x_train, y_train, x_test, y_test = convert_to_torch(x_train, y_train, x_test, y_test)

      model = initialize()
      run_model(fold, model, x_train, y_train, x_test, y_test)

      dump_dict_list(res)


  print("Start plotting")
  print("")

  plot_metrics()


In [None]:
run()
