In [1]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import glob
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

import torch.nn as nn
import torchvision.transforms as transforms

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import scipy.io as sio
from os import listdir
from os.path import isfile, join
import time
import copy

## Load in Dataset

In [2]:
!git clone https://github.com/mzhao98/emotion_recognition

fatal: destination path 'emotion_recognition' already exists and is not an empty directory.


In [3]:
def load_dataset():
    train_dir = 'emotion_recognition/data/train/'
    test_dir = 'emotion_recognition/data/test/'
    categories = ['happy', 'sad', 'fear', 'surprise', 'neutral', 'angry', 'disgust']

    train_file_dictionary = {}
    train_imagefile_to_class_dictionary = {}
    for emotion in categories:
        train_file_dictionary[emotion] = []

    counter = 0
    for i in range(len(categories)):
        for subdir, dirs, files in os.walk(train_dir+categories[i]+'/'):
            for file in files:
                train_file_dictionary[categories[i]].append(train_dir+categories[i]+'/'+file)
                train_imagefile_to_class_dictionary[counter] = {}
                train_imagefile_to_class_dictionary[counter]['file'] = train_dir+categories[i]+'/'+file
                train_imagefile_to_class_dictionary[counter]['label'] = i
                counter += 1



    test_file_dictionary = {}
    test_imagefile_to_class_dictionary = {}
    for emotion in categories:
        test_file_dictionary[emotion] = []

    counter = 0
    for i in range(len(categories)):
        for subdir, dirs, files in os.walk(test_dir+categories[i]+'/'):
            for file in files:
                test_file_dictionary[categories[i]].append(test_dir+categories[i]+'/'+file)
                test_imagefile_to_class_dictionary[counter] = {}
                test_imagefile_to_class_dictionary[counter]['file'] = test_dir+categories[i]+'/'+file
                test_imagefile_to_class_dictionary[counter]['label'] = i
                counter += 1

    return train_imagefile_to_class_dictionary, test_imagefile_to_class_dictionary

## Create Image Dataset

In [4]:
class FacialEmotionDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, imagefile_to_class_dictionary, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            imagefile_to_class_dictionary (dictionary): Dictionary of image filenames to class for each emotion.
        """
#         self.root_dir = root_dir
        self.imagefile_to_class_dictionary = imagefile_to_class_dictionary
        self.transform = transforms.Compose(
                [
                    transforms.Resize((96, 96)),
                    transforms.ToTensor(),
                    # transforms.CenterCrop(10),
                 
                 transforms.Normalize((0.5), 
                                      (0.5))])


    def __len__(self):
        return len(self.imagefile_to_class_dictionary.keys())

    def __getitem__(self, idx):
#         print("idx", idx)
        path_to_image = self.imagefile_to_class_dictionary[idx]['file']
#         image = io.imread(path_to_image)
        image = Image.open(path_to_image)
        image = self.transform(image).float()
        label = int(self.imagefile_to_class_dictionary[idx]['label'])
        return image, label

In [5]:


class FacialEmotionDataset_Augmented(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, imagefile_to_class_dictionary, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            imagefile_to_class_dictionary (dictionary): Dictionary of image filenames to class for each emotion.
        """
#         self.root_dir = root_dir
        self.imagefile_to_class_dictionary = imagefile_to_class_dictionary
        self.transform = transforms.Compose([
                                    

        transforms.RandomResizedCrop(96),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5), 
                                      (0.5)),
        transforms.Resize((96, 96)),

        
    ])


    def __len__(self):
        return len(self.imagefile_to_class_dictionary.keys())

    def __getitem__(self, idx):
#         print("idx", idx)
        path_to_image = self.imagefile_to_class_dictionary[idx]['file']
#         image = io.imread(path_to_image)
        image = Image.open(path_to_image)
        image = self.transform(image).float()
        label = int(self.imagefile_to_class_dictionary[idx]['label'])
        return image, label

## Create CNN

In [6]:
class FaceNet(nn.Module):
    def __init__(self):
        super(FaceNet, self).__init__()
        # torch.Size([256, 1, 64, 64])
        # 3 input image channel (RGB), #6 output channels, 4x4 kernel 
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3,3), stride=1, 
                               padding=1, dilation=1, groups=1, 
                               bias=True, padding_mode='reflect')
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3,3), stride=1, 
                               padding=1, dilation=1, groups=1, 
                               bias=True, padding_mode='reflect')
        
        self.conv3 = nn.Conv2d(64, 64, kernel_size=(3,3), stride=1, 
                               padding=1, dilation=1, groups=1, 
                               bias=True, padding_mode='reflect')
        
        self.conv4 = nn.Conv2d(64, 32, kernel_size=(3,3), stride=1, 
                               padding=1, dilation=1, groups=1, 
                               bias=True, padding_mode='reflect')
        
        
        self.drop1 = nn.Dropout(p=0.1)
        self.norm1 = nn.LayerNorm([48, 48])
        self.norm2 = nn.LayerNorm([24, 24])
        
        self.fc1 = nn.Linear(1152, 256)
        self.fc2 = nn.Linear(256, 96)
        self.fc3 = nn.Linear(96, 7)
        
    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = F.max_pool2d(F.relu(self.conv1(x)), (2,2))
        x = self.norm1(x)
#         print(x.shape)
        
        x = F.max_pool2d(F.relu(self.conv2(x)), (2,2))
        x = self.norm2(x)
#         print(x.shape)
        
        x = F.max_pool2d(F.relu(self.conv3(x)), (2,2))

        x = F.max_pool2d(F.relu(self.conv4(x)), (2,2))
#         print(x.shape)

        x = torch.flatten(x, 1)
#         print(x.shape)
        
        x = self.fc1(x)
        x = F.relu(x)
        x = self.drop1(x)
        
        x = self.fc2(x)
        x = F.relu(x)
        x = self.drop1(x)
        
        x = self.fc3(x)
        x = F.relu(x)
#         x = self.drop1(x)
        
#         output = x
        output = F.log_softmax(x, dim=1)
        return output
        

In [7]:
train_imagefile_to_class_dictionary, test_imagefile_to_class_dictionary = load_dataset()
train_dataset = FacialEmotionDataset(train_imagefile_to_class_dictionary)
test_dataset = FacialEmotionDataset(test_imagefile_to_class_dictionary)

In [8]:
len(train_dataset)

28709

In [9]:
def train_model(model, train_dataset, dataloaders, criterion, optimizer, test_data_loader, test_dataset, num_epochs=25, is_inception=False):
    since = time.time()

    val_acc_history = []
    train_acc_history = []
    test_acc_history = []
    loss_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for batch_idx, (inputs, labels) in enumerate(dataloaders):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    # Special case for inception because in training it has an auxiliary output. In train
                    #   mode we calculate the loss by summing the final output and the auxiliary output
                    #   but in testing we only consider the final output.
                    if is_inception and phase == 'train':
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders)
            
            epoch_acc = running_corrects.double() / len(train_dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            if phase == 'val':
              test_acc = compute_test_accuray(model, test_data_loader, test_dataset)
              if test_acc > best_acc:
                  best_acc = test_acc
                  best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)
                test_acc_history.append(test_acc)
            if phase == 'train':
                train_acc_history.append(epoch_acc)
                loss_history.append(epoch_loss)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history, best_model_wts, train_acc_history, test_acc_history, loss_history

In [10]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = FaceNet()
model_ft = model.to(device)
feature_extract = False

params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t",name)
else:
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            print("\t",name)

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)

Params to learn:
	 conv1.weight
	 conv1.bias
	 conv2.weight
	 conv2.bias
	 conv3.weight
	 conv3.bias
	 conv4.weight
	 conv4.bias
	 norm1.weight
	 norm1.bias
	 norm2.weight
	 norm2.bias
	 fc1.weight
	 fc1.bias
	 fc2.weight
	 fc2.bias
	 fc3.weight
	 fc3.bias


In [11]:
train_dataset = FacialEmotionDataset(train_imagefile_to_class_dictionary)
train_dataset_augmented = FacialEmotionDataset_Augmented(train_imagefile_to_class_dictionary)

test_dataset = FacialEmotionDataset(test_imagefile_to_class_dictionary)


increased_dataset = torch.utils.data.ConcatDataset([train_dataset_augmented,train_dataset])

print(len(increased_dataset))
train_data_loader = torch.utils.data.DataLoader(increased_dataset, batch_size=32,
                                          shuffle=True,
                                         )
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32,
                                          shuffle=True,
                                         )

57418


In [None]:

criterion = nn.CrossEntropyLoss()
num_epochs = 100

# Train and evaluate
model, val_acc_history, best_model_wts, train_acc_history, test_acc_history, loss_history = train_model(model_ft, increased_dataset, train_data_loader, criterion, optimizer_ft, test_data_loader, test_dataset, num_epochs=num_epochs)

Epoch 0/99
----------
train Loss: 51.6775 Acc: 0.3695
val Loss: 50.2221 Acc: 0.3908
Test accuracy =  tensor(0.4253, device='cuda:0', dtype=torch.float64)

Epoch 1/99
----------
train Loss: 49.6049 Acc: 0.3962
val Loss: 47.9229 Acc: 0.4233
Test accuracy =  tensor(0.4642, device='cuda:0', dtype=torch.float64)

Epoch 2/99
----------
train Loss: 47.6737 Acc: 0.4264
val Loss: 45.4672 Acc: 0.4613
Test accuracy =  tensor(0.5098, device='cuda:0', dtype=torch.float64)

Epoch 3/99
----------
train Loss: 43.9726 Acc: 0.4764
val Loss: 42.3566 Acc: 0.4967
Test accuracy =  tensor(0.5400, device='cuda:0', dtype=torch.float64)

Epoch 4/99
----------
train Loss: 42.3280 Acc: 0.4944
val Loss: 41.5264 Acc: 0.5023
Test accuracy =  tensor(0.5453, device='cuda:0', dtype=torch.float64)

Epoch 5/99
----------
train Loss: 41.1955 Acc: 0.5064
val Loss: 40.2104 Acc: 0.5202
Test accuracy =  tensor(0.5538, device='cuda:0', dtype=torch.float64)

Epoch 6/99
----------
train Loss: 39.9338 Acc: 0.5220
val Loss: 38.273

In [None]:
plt.plot(range(len(val_acc_history)), val_acc_history)
plt.title("Validation Accuracy")
plt.show()

In [None]:
torch.save(model.state_dict(), '4layer_cnn_2.pkl')


In [None]:
from google.colab import files
files.download("4layer_cnn_2.pkl")

In [None]:
train_accuracy = 0
for batch_idx, (inputs, labels) in enumerate(train_data_loader):
    inputs = inputs.to(device)
    labels = labels.to(device)

   

    outputs = model(inputs)
    loss = criterion(outputs, labels)

    _, preds = torch.max(outputs, 1)


    # statistics
#     running_loss += loss.item() * inputs.size(0)
#     print("preds", preds)
#     print('labels.data', labels.data)
    train_accuracy += torch.sum(preds == labels.data)
#     break

# epoch_loss = running_loss / len(dataloaders)
train_acc = train_accuracy.double() / len(train_dataset)
print('Training accuracy = ', train_acc )

In [None]:
test_accuracy = 0
for batch_idx, (inputs, labels) in enumerate(test_data_loader):
    inputs = inputs.to(device)
    labels = labels.to(device)

  

    outputs = model(inputs)
    loss = criterion(outputs, labels)

    _, preds = torch.max(outputs, 1)


    # statistics
#     running_loss += loss.item() * inputs.size(0)
    test_accuracy += torch.sum(preds == labels.data)

# epoch_loss = running_loss / len(dataloaders)
test_acc = test_accuracy.double() / len(test_dataset)
print('Test accuracy = ', test_acc )

In [15]:
def compute_test_accuray(model, test_data_loader, test_dataset):
  test_accuracy = 0
  for batch_idx, (inputs, labels) in enumerate(test_data_loader):
      inputs = inputs.to(device)
      labels = labels.to(device)

    

      outputs = model(inputs)
      loss = criterion(outputs, labels)

      _, preds = torch.max(outputs, 1)


      # statistics
  #     running_loss += loss.item() * inputs.size(0)
      test_accuracy += torch.sum(preds == labels.data)

  # epoch_loss = running_loss / len(dataloaders)
  test_acc = test_accuracy.double() / len(test_dataset)
  print('Test accuracy = ', test_acc )
  return test_acc