In [10]:
# Mount Drive to access datasets

from google.colab import drive
drive.mount('/content/drive/', force_remount=True) 

Mounted at /content/drive/


In [11]:
# Import raw FER dataset from Drive

import pandas as pd
import os

# load df
df = pd.read_csv('drive/My Drive/Colab Notebooks/fer2013.csv')

# transform cell dta
df["pixels"] = df["pixels"].apply(lambda x: np.reshape(np.array([int(num) for num in x.split(" ")]), (-1, 48)))
df["emotion"] = df["emotion"].apply(lambda x: int(x))

# split training, validation, evaluation sets
training_data = df[df['Usage']=='Training']
validation_data = df[df['Usage']=='PrivateTest']
test_data = df[df['Usage']=='PublicTest']

training_data.head()

Unnamed: 0,emotion,pixels,Usage
0,0,"[[70, 80, 82, 72, 58, 58, 60, 63, 54, 58, 60, ...",Training
1,0,"[[151, 150, 147, 155, 148, 133, 111, 140, 170,...",Training
2,2,"[[231, 212, 156, 164, 174, 138, 161, 173, 182,...",Training
3,4,"[[24, 32, 36, 30, 32, 23, 19, 20, 30, 41, 21, ...",Training
4,6,"[[4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 15, 2...",Training


In [12]:
# Import preprocessed FER dataset from Drive

import numpy as np
import collections

# import training data
trainX = np.load('drive/My Drive/preprocessed_data/fer2013_train_X.npy', allow_pickle=True)
trainY = np.array([x.index(1) for x in np.load('drive/My Drive/preprocessed_data/fer2013_train_Y.npy', allow_pickle=True).tolist()])

# import validation data
valX = np.load('drive/My Drive/preprocessed_data/fer2013_val_X.npy', allow_pickle=True)
valY = np.array([x.index(1) for x in np.load('drive/My Drive/preprocessed_data/fer2013_val_Y.npy', allow_pickle=True).tolist()])

# import test data
testX = np.load('drive/My Drive/preprocessed_data/fer2013_test_X.npy', allow_pickle=True)
testY = np.array([x.index(1) for x in np.load('drive/My Drive/preprocessed_data/fer2013_test_Y.npy', allow_pickle=True).tolist()])

print(len(trainX))
print(len(valX))
print(len(testX))

57418
3589
3589


In [13]:
# Define Model Architecture heavily based off of https://github.com/pytorch/vision/blob/f95b0533243dfbc901b5ed5f5db28a5a46bdb699/torchvision/models/resnet.py

import torch
from torch import Tensor
import torch.nn as nn
from torch.hub import load_state_dict_from_url
from typing import Type, Any, Callable, Union, List, Optional

# pretrained resnet18 model, not in use
model_urls = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
}


def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)

def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class BasicBlock(nn.Module):
    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(BasicBlock, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        if groups != 1 or base_width != 64:
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

class ResNet(nn.Module):

    def __init__(
        self,
        block: Type[Union[BasicBlock]],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(ResNet, self).__init__()
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        self.groups = groups
        self.base_width = width_per_group

        # layers
        self.conv1 = nn.Conv2d(1, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

        self.conv6 = nn.Conv2d(512, 256, kernel_size=1, stride=2, bias=False)
        self.bn2 = norm_layer(256)
        self.relu2 = nn.ReLU(inplace=True)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)
                else:
                  print("Unexpected module type")

    def _make_layer(self, block: Type[Union[BasicBlock]], planes: int, blocks: int,
                    stride: int = 1, dilate: bool = False) -> nn.Sequential:
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> Tensor:
        # conv1,3*3,64
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        # conv2x,3*3,64
        x = self.maxpool(x)
        x = self.layer1(x)

        # conv3x,3*3,128
        x = self.layer2(x)

        # conv4x,3*3,128
        x = self.layer3(x)

        # conv5x,3*3,128
        x = self.layer4(x)

        # GAP
        x = self.avgpool(x)

        # conv6,1*1,256
        x = self.conv6(x)
        x = self.bn2(x)
        x = self.relu2(x)

        # conv7,1*1, 7
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)


def _resnet(arch: str, block: Type[Union[BasicBlock]], layers: List[int], pretrained: bool, progress: bool, **kwargs: Any) -> ResNet:
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(pretrained_model_urls[arch],
                                              progress=progress)
        model.load_state_dict(state_dict)
    return model

def resnet18(pretrained: bool = False, progress: bool = True, **kwargs: Any) -> ResNet:
    r"""ResNet-18 model from
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
        progress (bool): If True, displays a progress bar of the download to stderr
    """
    return _resnet('resnet18', BasicBlock, [2, 2, 2, 2], pretrained, progress,
                   **kwargs)





Train Model

In [14]:
# Define Training and Testing Infrastructure

import torchvision
from torch.autograd import Variable
from PIL import Image
import torch.optim as optim
import copy

class average_meter(object):
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, dataframe):
        self.dataframe = dataframe

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, index):
        row = self.dataframe.iloc[index]
        return (
            torchvision.transforms.functional.to_tensor(row["pixels"]).float(),
            int(row["emotion"]),
        )

class MyNumpyDataset(torch.utils.data.Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        # arbitrary 
        return len(self.x)

    def __getitem__(self, index):
        return (
            torchvision.transforms.functional.to_tensor(self.x[index]).float(),
            self.y[index]
        )

def train(model, batch_size, log_interval, training_dataset, optimizer, **kwargs):
    model.train()

    losses = average_meter()
    accuracy = average_meter()


    train_loader = torch.utils.data.DataLoader(training_dataset, batch_size=batch_size, shuffle=True, **kwargs)

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = Variable(data).cuda(), Variable(target).cuda()
        output = model(data)
        loss_function = nn.CrossEntropyLoss()
        loss = loss_function(output, target)
        losses.update(loss.data, data.size(0))

        pred = output.data.max(1)[1]
        prec = pred.eq(target.data).cpu().sum()
        accuracy.update(float(prec) / data.size(0), data.size(0))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # if batch_idx % log_interval == 0:
            # print('Batch:[{:5d}/{:5d}({:3.0f}%)], '                     
            #       'Loss:{:.4f}, '
            #       'Batch accuracy:{:.4f}'.format(
            #           batch_idx * len(data), len(training_dataset),
            #           100. * batch_idx / len(train_loader), losses.val, accuracy.val))


def test(model, test_batch_size, validation_dataset, name, **kwargs):
    model.eval()

    losses = average_meter()
    accuracy = average_meter()

    test_loader = torch.utils.data.DataLoader(validation_dataset, batch_size=test_batch_size, shuffle=False, **kwargs)


    for data, target in test_loader:

        data, target = Variable(data).cuda(), Variable(target).cuda()
        output = model(data)
        loss_function = nn.CrossEntropyLoss()
        loss = loss_function(output, target)
        losses.update(loss.data, data.size(0))

        pred = output.data.max(1)[1]
        prec = pred.eq(target.data).cpu().sum()
        accuracy.update(float(prec) / data.size(0), data.size(0))

    print('{} Test: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(name,
        losses.avg, int(accuracy.sum), len(validation_dataset), 100. * accuracy.avg))

    return accuracy.avg


In [16]:
# Train and Test the Model

# select raw or preprocessed dataset
use_raw_data = False

training_dataset = None
validation_dataset = None
test_dataset = None
if use_raw_data:
  training_dataset = MyDataset(training_data)
  validation_dataset = MyDataset(validation_data)
  test_dataset = MyDataset(test_data)
else:
  training_dataset = MyNumpyDataset(trainX, trainY)
  validation_dataset = MyNumpyDataset(valX, valY)
  test_dataset = MyNumpyDataset(testX, testY)

# select training parameters
epochs= 100
for batch_size in [64, 256, 1024]:
  for lr in [0.01, 0.001, 0.0001]:
    for momentum in [0.5]:

      # current model
      model = resnet18(num_classes=7).cuda()

      # track best model
      best_model = copy.deepcopy(model)
      best_accuracy = 0.0
      best_training_accuracy = 0.0

      optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum ,weight_decay=0.0005,nesterov=True)
      scheduler = optim.lr_scheduler.StepLR(optimizer,step_size=20, gamma=0.1)

      for e in range(1, epochs + 1):
        # train model for given epoch
        print('Epoch:{}'.format(e))
        train(model, batch_size=batch_size, log_interval=224, training_dataset=training_dataset, optimizer=optimizer)

        # evaluate accuracy at given epoch
        train_accuracy = test(model, test_batch_size=512, validation_dataset=training_dataset, name="Training")
        val_accuracy = test(model, test_batch_size=256, validation_dataset=validation_dataset, name="Validation")

        scheduler.step()

        # update best model if improvement seen
        if best_accuracy < val_accuracy:
            best_model   = copy.deepcopy(model)
            best_accuracy = val_accuracy
            best_training_accuracy = train_accuracy

      # calculate test accuracy on final model
      test_accuracy = test(model, test_batch_size=256, validation_dataset=test_dataset, name="Test")

      # print and save model state to file
      print("Best Model batch_size:{}, learning_rate:{}, momentum:{}, Training Accuracy: {:.6f}, Validation Accuracy: {:.6f}, Test Accuracy: {:.6f}".format(batch_size, lr, momentum, best_training_accuracy, best_accuracy, test_accuracy))
      torch.save(best_model.state_dict(), 'drive/My Drive/resnet_models/model_bs{}_lr{}_momentum{}.pt'.format(batch_size, lr, momentum))
      




Epoch:1


KeyboardInterrupt: ignored

In [17]:
# Generate Visualizations

import sklearn.metrics
import matplotlib.pyplot as plt
import seaborn as sns

selected_model = None # REPLACE_ME_WITH_MODEL_TO_GENERATE_FIGURES_FOR

def answers(model, **kwargs):
  model.eval()

  test_loader = torch.utils.data.DataLoader(validation_dataset, batch_size=1, shuffle=False, **kwargs)
  results = []
  expected = []

  for data, target in test_loader:
      data, target = Variable(data).cuda(), Variable(target).cuda()
      output = model(data)
      pred = output.data.max(1)[1]
      results.append(pred[0].item())
      expected.append(target[0].item())

  return (results, expected)

predictions = answers(selected_model)

def get_emotion_labels(labels):
  emotions = []
  for label in labels:
    if label == 0:
      emotions.append('Angry')
    elif label == 1:
      emotions.append('Disgust')
    elif label == 2:
      emotions.append('Fear')
    elif label == 3:
      emotions.append('Happy')
    elif label == 4:
      emotions.append('Sad')
    elif label == 5:
      emotions.append('Surprise')
    elif label == 6:
      emotions.append('Neutral')
  return emotions

fer_emotions_dict = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}

confusion_matrix = sklearn.metrics.confusion_matrix(get_emotion_labels(predictions[0]), get_emotion_labels(predictions[1]))
confusion_matrix = np.around(confusion_matrix.astype('float') / confusion_matrix.sum(axis=1)[:, np.newaxis], decimals=2)
confusion_matrix = pd.DataFrame(confusion_matrix,
                                index = fer_emotions_dict.values(), 
                                columns = fer_emotions_dict.values())

figure = plt.figure(figsize=(8, 8))
sns.heatmap(confusion_matrix, annot=True,cmap=plt.cm.Blues)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

AttributeError: ignored