In [12]:
import numpy as np
import cv2
import os
import sys
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as td
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from sklearn.metrics import f1_score

In [13]:
trains_kfolds = True

In [14]:
# Hyperparameters and settings
batch_size = 64
test_batch_size = 64
input_size = 1 # because there is only one channel 
output_size = 4
num_epochs = 1000
learning_rate = 0.001



In [15]:
# Load traiing, validation and training data

data_loader = torch.load('data_loader.pt')
valid_loader = torch.load('valid_loader.pt')
test_loader = torch.load('test_loader.pt')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the data loaders to the desired device
data_loader.dataset.tensors = tuple(tensor.to(device) for tensor in data_loader.dataset.tensors)
valid_loader.dataset.tensors = tuple(tensor.to(device) for tensor in valid_loader.dataset.tensors)
test_loader.dataset.tensors = tuple(tensor.to(device) for tensor in test_loader.dataset.tensors)


In [16]:
class CNN(nn.Module):
    def __init__(self):
        self.name = "CNN"
        super(CNN, self).__init__()
        self.conv_layer = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(12 * 12 * 64, 1000),
            nn.ReLU(inplace=True),
            nn.Linear(1000, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.1),
            nn.Linear(512, 4)
        )
    def forward(self, x):
        # conv layers
        x = self.conv_layer(x)
        # print(x.shape)
        # flatten
        x = x.view(x.size(0), -1)
        # print(x.shape)
        # fc layer
        x = self.fc_layer(x)
        return x



In [17]:
class CNN2(nn.Module):
    def __init__(self):
        super(CNN2, self).__init__()
        self.name = "CNN2"
        self.conv_layer = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=5, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=5, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            #get dimensions of last layer
            
        )
        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(64*9*9, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.1),
            nn.Linear(512, 4)
        )
    def forward(self, x):
        # conv layers
        x = self.conv_layer(x)
        # print(x.shape)
        # flatten
        x = x.view(x.size(0), -1)
        # print(x.shape)
        # fc layer
        x = self.fc_layer(x)
        return x

In [18]:
class CNN3(nn.Module):
    def __init__(self):
        super(CNN3, self).__init__()
        self.name = "CNN3"
        self.conv_layer = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=7, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=7, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=7, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),           
        )
        
        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.3),
            nn.Linear(7 * 7 * 64, 1000),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(1000, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(512, 4)
        )
        
    def forward(self, x, y=None):
        # conv layers
        x = self.conv_layer(x)
        # print(x.shape)
        # flatten
        x = x.view(x.size(0), -1)
        # print(x.shape)
        # fc layer
        x = self.fc_layer(x)
        return x

In [19]:
def train_and_save_model(model, data_loader, valid_loader, device, save_as_name = ''):
    if (save_as_name == '') :
        save_as_name = model.name
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    total_step = len(data_loader)
    loss_list = []
    acc_list = []
    f1_list = []

    loss_list_test = []
    acc_list_test = []
    f1_list_test = []

    # early stopping parameters
    best_loss = 100
    patience = 10 # number of epochs to wait before stopping
    count = 0
    best_epoch = 0
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(data_loader):

            # Move images and labels to the device
            images = images.to(device)
            labels = labels.to(device)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss_list.append(loss.item())
            # Backprop and optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Train accuracy
            total = labels.size(0)
            _, predicted = torch.max(outputs.data, 1)
            correct = (predicted == labels).sum().item()
            acc_list.append(correct / total)
            # Train F1 score
            f1 = f1_score(labels.cpu().numpy(), predicted.cpu().numpy(), average='macro')
            f1_list.append(f1)
            # print(i)
            # if (i + 1) % 10 == 0:
            #     print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'
            #     .format(epoch + 1, num_epochs, i + 1, total_step, loss.item(),
            #     (correct / total) * 100))

        with torch.no_grad():
            for i, (images_test, labels_test) in enumerate(valid_loader):
                images_test = images_test.to(device)
                labels_test = labels_test.to(device)
                outputs_test = model(images_test)
                loss_test = criterion(outputs_test, labels_test)
                loss_list_test.append(loss_test.item())
                total_test = labels_test.size(0)
                _, predicted_test = torch.max(outputs_test.data, 1)
                correct_test = (predicted_test == labels_test).sum().item()
                acc_list_test.append(correct_test / total_test)

                #Test F1 score
                f1_test = f1_score(labels_test.cpu().numpy(), predicted_test.cpu().numpy(), average='macro')
                f1_list_test.append(f1_test)
        print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%, F1 Score: {:.2f}'
        .format(epoch + 1, num_epochs, i + 1, total_step, loss_test.item(),
        (correct / total) * 100, f1_test))


        #================= Early stopping =================
        
        #---------- Early stopping with avg F1 score-------
        # early stopping to prevent overfitting
        # saving model with best accuracy
        # avg_f1 = sum(f1_list_test)/len(f1_list_test)
        # print('Average F1 score: ', avg_f1)
        # if avg_f1 > best_f1:
        #     best_f1 = avg_f1
        #     count = 0 # reset count
        #     #saving best model thus far
        #     torch.save(model.state_dict(), '%s.pth'%(model.name))
        # else:
        #     count += 1
        #     if count == patience:
        #         print('Early stopping at epoch %d'%(epoch))
        #         break

        #------------- Early stopping with loss------------
        if loss_test.item() < best_loss:
            best_loss = loss_test.item()
            count = 0 # reset count
            best_epoch = epoch
            #saving best model thus far
            torch.save(model.state_dict(), '%s.pth'%(save_as_name))
            if best_loss < 0.0001:
                print('\n----------------------------------------\nEarly stopping at epoch %d'%(epoch))
                print('Best epoch: ', best_epoch)
                print('Best loss: ', best_loss, ' Best test F1 score: ', f1_test, ' Best test accuracy: ', acc_list_test[-1])
                break
        else:
            count += 1
            if count == patience:
                print('\n----------------------------------------\nEarly stopping at epoch %d'%(epoch))
                print('Best epoch: ', best_epoch)
                print('Best loss: ', best_loss, ' Best test F1 score: ', f1_test, ' Best test accuracy: ', acc_list_test[-1])
                break

        



In [20]:
model1 = CNN().to(device)
if (trains_kfolds):
    model = model1
    for i in range(1, 11):
        print("Training for model %s"%(model.name), "kfolds: ", i)
        data_loader = torch.load('kfolds/data_loader_kfold%s.pt'%(i))
        valid_loader = torch.load('kfolds/valid_loader_kfold%s.pt'%(i))
        train_and_save_model(model, data_loader, valid_loader, device, 'trained_k_folds/CNN_kfold_%s_%s'%(model.name, i))
else:
    train_and_save_model(model1, data_loader, valid_loader, device)


Training for model CNN kfolds:  1
Epoch [1/1000], Step [1/5], Loss: 1.7096, Accuracy: 30.07%, F1 Score: 0.26


KeyboardInterrupt: 

In [None]:
#device = torch.device('cpu')model1 = CNN().to(device)

model2 = CNN2().to(device)
if (trains_kfolds):
    model = model2
    for i in range(1, 11):
        print("Training for model %s"%(model.name), "kfolds: ", i)
        data_loader = torch.load('kfolds/data_loader_kfold%s.pt'%(i))
        valid_loader = torch.load('kfolds/valid_loader_kfold%s.pt'%(i))
        train_and_save_model(model, data_loader, valid_loader, device, 'trained_k_folds/CNN_kfold_%s_%s'%(model.name, i))
else:
    train_and_save_model(model2, data_loader, valid_loader, device)


Epoch [1/1000], Step [1/4], Loss: 1.3887, Accuracy: 25.12%, F1 Score: 0.10
Epoch [2/1000], Step [1/4], Loss: 1.3778, Accuracy: 26.99%, F1 Score: 0.17
Epoch [3/1000], Step [1/4], Loss: 1.3408, Accuracy: 35.16%, F1 Score: 0.24
Epoch [4/1000], Step [1/4], Loss: 1.2543, Accuracy: 40.77%, F1 Score: 0.28
Epoch [5/1000], Step [1/4], Loss: 1.1742, Accuracy: 43.22%, F1 Score: 0.31
Epoch [6/1000], Step [1/4], Loss: 1.1097, Accuracy: 43.69%, F1 Score: 0.33
Epoch [7/1000], Step [1/4], Loss: 1.0365, Accuracy: 46.85%, F1 Score: 0.34
Epoch [8/1000], Step [1/4], Loss: 0.9703, Accuracy: 46.96%, F1 Score: 0.37
Epoch [9/1000], Step [1/4], Loss: 0.9291, Accuracy: 56.31%, F1 Score: 0.40
Epoch [10/1000], Step [1/4], Loss: 0.9668, Accuracy: 56.66%, F1 Score: 0.47
Epoch [11/1000], Step [1/4], Loss: 0.9427, Accuracy: 56.66%, F1 Score: 0.47
Epoch [12/1000], Step [1/4], Loss: 0.8614, Accuracy: 60.40%, F1 Score: 0.56
Epoch [13/1000], Step [1/4], Loss: 0.7800, Accuracy: 64.95%, F1 Score: 0.63
Epoch [14/1000], Step

In [None]:

model3 = CNN3().to(device)
if (trains_kfolds):
    model = model3
    for i in range(1, 11):
        print("Training for model %s"%(model.name), "kfolds: ", i)
        data_loader = torch.load('kfolds/data_loader_kfold%s.pt'%(i))
        valid_loader = torch.load('kfolds/valid_loader_kfold%s.pt'%(i))
        train_and_save_model(model, data_loader, valid_loader, device, 'trained_k_folds/CNN_kfold_%s_%s'%(model.name, i))
else:
    train_and_save_model(model3, data_loader, valid_loader, device)

Epoch [1/1000], Step [1/4], Loss: 1.2100, Accuracy: 36.68%, F1 Score: 0.33
Epoch [2/1000], Step [1/4], Loss: 0.8203, Accuracy: 67.99%, F1 Score: 0.65
Epoch [3/1000], Step [1/4], Loss: 0.6293, Accuracy: 76.05%, F1 Score: 0.75
Epoch [4/1000], Step [1/4], Loss: 0.5338, Accuracy: 81.89%, F1 Score: 0.80
Epoch [5/1000], Step [1/4], Loss: 0.4419, Accuracy: 88.55%, F1 Score: 0.84
Epoch [6/1000], Step [1/4], Loss: 0.4042, Accuracy: 87.97%, F1 Score: 0.85
Epoch [7/1000], Step [1/4], Loss: 0.3546, Accuracy: 87.73%, F1 Score: 0.86
Epoch [8/1000], Step [1/4], Loss: 0.3080, Accuracy: 90.65%, F1 Score: 0.89
Epoch [9/1000], Step [1/4], Loss: 0.3256, Accuracy: 91.00%, F1 Score: 0.89
Epoch [10/1000], Step [1/4], Loss: 0.2853, Accuracy: 93.57%, F1 Score: 0.90
Epoch [11/1000], Step [1/4], Loss: 0.2892, Accuracy: 94.51%, F1 Score: 0.90
Epoch [12/1000], Step [1/4], Loss: 0.3188, Accuracy: 95.91%, F1 Score: 0.90
Epoch [13/1000], Step [1/4], Loss: 0.2850, Accuracy: 94.04%, F1 Score: 0.92
Epoch [14/1000], Step

In [21]:
def evaluate_model(model, valid_loader):
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        all_labels = []
        all_predictions = []
        for images, labels in valid_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
        print('Test Accuracy of the model (validation): {:.2f} %'
        .format((correct / total) * 100))
        print('F1 Score of the model (validation): {:.2f} %'
        .format((f1_score(all_labels, all_predictions, average='macro')) * 100))

def test_model(model, test_loader):
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        all_labels = []
        all_predictions = []
        for images, labels in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())
        print('Test Accuracy of the model (testing): {:.2f} %'
        .format((correct / total) * 100))
        print('F1 Score of the model (testing): {:.2f} %'
        .format((f1_score(all_labels, all_predictions, average='macro')) * 100))

In [22]:
for model in [model1, model2, model3]:
    print('\nModel: ', model.name, '\n')

    evaluate_model(model, valid_loader)
    print('\n')
    test_model(model, test_loader)

NameError: name 'model2' is not defined

In [24]:
# function to test an individual image from an external source
def test_individual_image(model, image_name, category, read_custom_path = ''):
    if (read_custom_path != ''):
        img = cv2.imread(read_custom_path, 0)
        image_name = read_custom_path
        #save the image
    else:
        img = cv2.imread("../concat_data/%s/%s" % (category, image_name), 0)
    
    #resize to 48x48 pixels
    img = cv2.resize(img, (48, 48))



    #if image 3 channel convert to 1 channel
    if len(img.shape) > 2:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    cv2.imwrite("image_mod.jpg", img)
    
    # Convert image to tensor and add batch and channel dimensions
    img_tensor = torch.tensor(img, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
    
    # Get label tensor
    labels = {'focused': 0, 'happy': 1, 'neutral': 2, 'surprised': 3}
    reverse_labels = {0: 'focused', 1: 'happy', 2: 'neutral', 3: 'surprised'}
    label_tensor = torch.tensor(labels[category], dtype=torch.long)

    # Forward pass for the single image
    img_tensor = img_tensor.to(device)
    output = model(img_tensor)
    _, predicted = torch.max(output, 1)

    # Print results
    print("Predicted:", predicted.item(), "(", reverse_labels[predicted.item()], ")")
    print("Actual:", label_tensor.item(), "(", reverse_labels[label_tensor.item()], ")")
    print("Image:", image_name)
    print("Category:", category)
    print("///////")
    
    return predicted.item()

In [26]:
test_individual_image(model1, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "happy", read_custom_path=r"../unseen-test-imgs/test_smile.PNG")


error: OpenCV(4.9.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\resize.cpp:4152: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'


In [None]:
test_individual_image(model3, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "neutral", read_custom_path=r"../unseen-test-imgs/test_neutral.PNG")

Predicted: 1 ( happy )
Actual: 2 ( neutral )
Image: ../unseen-test-imgs/test_neutral.PNG
Category: neutral
///////


1

In [None]:
test_individual_image(model2, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused", read_custom_path=r"../unseen-test-imgs/test_focused.PNG")
test_individual_image(model2, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused", read_custom_path=r"../unseen-test-imgs/test_focused_2.PNG")
test_individual_image(model2, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused", read_custom_path=r"../unseen-test-imgs/test_focused_3.PNG")
test_individual_image(model2, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused", read_custom_path=r"../unseen-test-imgs/test_focused_4.PNG")
test_individual_image(model2, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused", read_custom_path=r"../unseen-test-imgs/test_focused_5.PNG")

Predicted: 0 ( focused )
Actual: 0 ( focused )
Image: ../unseen-test-imgs/test_focused.PNG
Category: focused
///////
Predicted: 3 ( surprised )
Actual: 0 ( focused )
Image: ../unseen-test-imgs/test_focused_2.PNG
Category: focused
///////
Predicted: 0 ( focused )
Actual: 0 ( focused )
Image: ../unseen-test-imgs/test_focused_3.PNG
Category: focused
///////
Predicted: 0 ( focused )
Actual: 0 ( focused )
Image: ../unseen-test-imgs/test_focused_4.PNG
Category: focused
///////
Predicted: 0 ( focused )
Actual: 0 ( focused )
Image: ../unseen-test-imgs/test_focused_5.PNG
Category: focused
///////


0

In [None]:
test_individual_image(model1, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "surprised", read_custom_path=r"../unseen-test-imgs/test_surprised.PNG")

Predicted: 3 ( surprised )
Actual: 3 ( surprised )
Image: ../unseen-test-imgs/test_surprised.PNG
Category: surprised
///////


3

In [None]:
test_individual_image(model1, "86_MMA-FACIAL-EXPRESSION-mahmoud.jpg", "focused")

[ WARN:0@1142.759] global loadsave.cpp:248 findDecoder imread_('../concat_data/focused/86_MMA-FACIAL-EXPRESSION-mahmoud.jpg'): can't open/read file: check file path/integrity


error: OpenCV(4.9.0) /io/opencv/modules/imgproc/src/resize.cpp:4152: error: (-215:Assertion failed) !ssize.empty() in function 'resize'
