# Evaluate AffectNet March2021
* Author: Sungguk Cha
* eMail: sungguk@ncsoft.com
* Date: 4th Nov. 2022

## Libraries

In [None]:
import copy
import glob
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
# from sklearn.metrics import plot_confusion_matrix
import timm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torchvision import transforms
from torchvision.models import resnet101, mobilenet_v2
from tqdm.notebook import tqdm
import seaborn as sns
from sklearn.metrics import confusion_matrix,f1_score

from robust_optimization import RobustOptimizer

# Report the library versions in use so a run can be reproduced later.
print(
    f'Torch: {torch.__version__}',
    f'Timm: {timm.__version__}',
    sep='\n',
)

## Training configurations

In [None]:
# Selects the 260 px input resolution (instead of 224 px) when enabled.
USE_ENET2 = False
# Dataset root: holds the cached annotation CSV and the per-phase image folders.
affectnet_dir = './data_eila_ft_test/'

In [None]:
# Training settings
batch_size = 32
epochs = 40
lr = 3e-5
gamma = 0.7
seed = 42
# Only name the CUDA backend when it is actually present, so `device` never
# refers to hardware that does not exist on this machine.
use_cuda = torch.cuda.is_available()
device = 'cuda' if use_cuda else 'cpu'
print(use_cuda)

In [None]:
# Square input resolution: 260 px when USE_ENET2 is set, 224 px otherwise.
IMG_SIZE = 224 if not USE_ENET2 else 260

# Per-channel normalisation constants shared by both pipelines
# (presumably the usual ImageNet statistics -- TODO confirm).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, random horizontal flip, tensor conversion, normalise.
train_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Evaluation pipeline: same as above but without the random flip.
test_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])
print(test_transforms)

In [None]:
# Extra DataLoader keyword arguments; they are only supplied when CUDA is available.
if use_cuda:
    kwargs = {'num_workers': 0, 'pin_memory': True}
else:
    kwargs = {}

## Building the EiLA validation CSV

In [None]:
# Index every image under ../../EiLA_data/val_set/<label>/ (labels 0-6) and
# save the resulting (phase, img_path, label) table as a CSV file.
# Rows are gathered in a plain list and converted to a DataFrame once:
# growing a DataFrame with df.loc[len(df)] copies data on every insert.
rows = []
for i in range(7):
    for file_name in os.listdir(f'../../EiLA_data/val_set/{i}'):
        # Skip macOS folder-metadata files.
        if '.DS_Store' in file_name:
            continue
        rows.append({'phase': 'val', 'img_path': f'{i}/{file_name}', 'label': i})

df = pd.DataFrame(rows, columns=['phase', 'img_path', 'label'])
df.to_csv('../../EiLA_data/affectnet_val.csv')

In [None]:
# Bare expression: shows the table as the cell output for a quick visual check.
df

## AffectNet Dataloader

In [None]:
# reference https://github.com/yaoing/DAN/blob/main/affectnet.py
# phase: one of ['train', 'val']
class AffectNet(data.Dataset):
    """Image/label dataset backed by a ``phase, img_path, label`` table.

    With ``use_cache`` the table is read from ``<aff_path>/eila.csv`` when
    that file exists (and ``force`` is false); otherwise it is rebuilt from
    the ``*_exp.npy`` files under ``<aff_path>/<phase>_set/annotations/`` and
    written back to the cache file.

    Args:
        aff_path: Dataset root directory.
        phase: Split to expose; only rows whose ``phase`` column equals this
            value are kept.
        use_cache: Read/write the CSV cache instead of always rescanning.
        transforms: Optional callable applied to every loaded PIL image.
        force: Rebuild and overwrite the cache even when it already exists.
    """

    # Class names indexed by integer label.
    EMOTION_LABELS = ('Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger')

    def __init__(self, aff_path, phase, use_cache=True, transforms=None, force=False):
        self.phase = phase
        self.transforms = transforms
        self.aff_path = aff_path
        # Image root used with a cached table; get_df() re-points this at
        # '<phase>_set/' whenever the table is rebuilt from annotations.
        self.base_path = os.path.join(self.aff_path, f'{self.phase}/')

        if use_cache:
            # NOTE(review): one cache file serves every phase, but a rebuild
            # stores only the current phase's rows -- confirm this is intended.
            cache_path = os.path.join(aff_path, 'eila.csv')
            if os.path.exists(cache_path) and not force:
                df = pd.read_csv(cache_path)
            else:
                df = self.get_df()
                df.to_csv(cache_path)
        else:
            df = self.get_df()

        self.data = df[df['phase'] == phase]

        self.file_paths = self.data.loc[:, 'img_path'].values
        self.label = self.data.loc[:, 'label'].values

        self.emotion_labels = list(self.EMOTION_LABELS)
        self._print_class_counts()
        print(f'\n{len(self)} images')

    def _print_class_counts(self):
        """Print ``name: count`` for each label present; return ``(labels, counts)``."""
        sample_label, sample_counts = np.unique(self.label, return_counts=True)
        for l, c in zip(sample_label, sample_counts):
            print(f'{self.emotion_labels[l]}: {c} ', end='')
        return sample_label, sample_counts

    def get_df(self):
        """Build the annotation table for ``self.phase`` from ``*_exp.npy`` files.

        Side effect: ``self.base_path`` is set to ``<aff_path>/<phase>_set/``.

        Returns:
            DataFrame with columns ``phase``, ``img_path`` and ``label``.
        """
        base_path = os.path.join(self.aff_path, f'{self.phase}_set/')
        self.base_path = base_path
        data = []

        for anno in glob.glob(base_path + 'annotations/*_exp.npy'):
            # '<idx>_exp.npy' holds the label of 'images/<idx>.jpg'.
            idx = os.path.basename(anno).split('_')[0]
            img_path = f'images/{idx}.jpg'
            label = int(np.load(anno))
            data.append([self.phase, img_path, label])

        return pd.DataFrame(data=data, columns=['phase', 'img_path', 'label'])

    def get_weight(self):
        """Print class counts and return inverse-frequency class weights.

        Also (re)sets ``emotion_labels``, ``class_to_idx`` and ``idx_to_class``.

        Returns:
            Dict mapping each label present in ``self.label`` to
            ``(1 / count)`` divided by the smallest such value, so the most
            frequent class gets weight 1. Absent labels have no entry.
        """
        self.emotion_labels = list(self.EMOTION_LABELS)
        self.class_to_idx = {}
        self.idx_to_class = {}
        for i, emotion in enumerate(self.emotion_labels):
            self.class_to_idx[emotion] = i
            self.idx_to_class[i] = emotion
        sample_label, sample_counts = self._print_class_counts()
        print('')

        cw = 1/sample_counts
        cw /= cw.min()
        class_weights = {i: cwi for i, cwi in zip(sample_label, cw)}
        print(class_weights)
        return class_weights

    def __len__(self):
        """Return the number of samples in the selected phase."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample *idx*; the image is RGB and transformed if requested."""
        path = os.path.join(self.base_path, self.file_paths[idx])
        # convert() returns an independent image, so the underlying file can
        # be closed as soon as the conversion is done.
        with Image.open(path) as img:
            image = img.convert('RGB')
        label = self.label[idx]

        if self.transforms is not None:
            image = self.transforms(image)

        return image, label

In [None]:
# Only the validation split is loaded here; the training split stays disabled.
# trainset = AffectNet(affectnet_dir, 'train', transforms=train_transforms, force=False)
# trainloader = data.DataLoader(trainset, batch_size=batch_size, shuffle=True, **kwargs)
valset = AffectNet(
    affectnet_dir,
    'val',
    transforms=test_transforms,
    force=False,
)
# Fixed order (no shuffling) for evaluation.
valloader = data.DataLoader(
    valset,
    batch_size=batch_size,
    shuffle=False,
    **kwargs,
)

In [None]:
# Per-class weights computed from the validation label counts (get_weight also prints them).
class_weights = valset.get_weight()

## Functions

In [None]:
#adapted from https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
def set_parameter_requires_grad(model, requires_grad):
    """Switch gradient tracking on or off for every parameter of *model*."""
    for weight in model.parameters():
        weight.requires_grad_(requires_grad)

In [None]:
# loss function
# Use the MPS backend when it is available, otherwise fall back to the CPU.
# NOTE(review): CUDA is never selected here -- confirm that is intended.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Class weights as a float32 tensor on the chosen device.
# NOTE(review): values are taken in dict order, presumably one per class id --
# verify the alignment when a class has no samples.
weights = torch.tensor(list(class_weights.values()), dtype=torch.float32).to(device)

def label_smooth(target, n_classes: int, label_smoothing=0.1):
    """Turn integer class targets into label-smoothed one-hot rows.

    Each output row holds ``(1 - label_smoothing) + label_smoothing / n_classes``
    at the target index and ``label_smoothing / n_classes`` everywhere else.

    Args:
        target: Integer tensor of class indices; its first dimension is the
            number of rows produced.
        n_classes: Width of each output row.
        label_smoothing: Probability mass spread uniformly over all classes.

    Returns:
        Tensor of shape ``(target.size(0), n_classes)`` on ``target``'s device.
    """
    num_rows = target.size(0)
    index = target.unsqueeze(1)
    # Hard one-hot encoding first ...
    one_hot = torch.zeros((num_rows, n_classes), device=index.device).scatter_(1, index, 1)
    # ... then shrink the peak and add the uniform share.
    uniform_share = label_smoothing / n_classes
    return one_hot * (1 - label_smoothing) + uniform_share

def cross_entropy_loss_with_soft_target(pred, soft_target):
    """Class-weighted cross-entropy between logits and soft targets.

    Each class term is scaled by the module-level ``weights`` tensor, the
    terms are summed over dimension 1, and the result is averaged.
    """
    log_probs = torch.nn.functional.log_softmax(pred, -1)
    per_sample = torch.sum(-weights * soft_target * log_probs, 1)
    return torch.mean(per_sample)

def cross_entropy_with_label_smoothing(pred, target):
    """Label-smooth *target* to ``pred.size(1)`` classes, then apply the soft-target cross-entropy."""
    return cross_entropy_loss_with_soft_target(pred, label_smooth(target, pred.size(1)))


# Name under which the loss function is exposed.
criterion = cross_entropy_with_label_smoothing

In [None]:
# (display name, checkpoint path) pairs to evaluate; uncomment an entry to include it.
models = [
    # ('affectnet_FT_gpt_200epochs.pt', '../../models/affectnet_emotions/affectnet_FT_gpt_200epochs.pt'),
    # ('EfficientNet_b0_best_afew', '../../models/affectnet_emotions/enet_b0_8_best_afew.pt'),
    # ('EfficientNet_b0_best_vgaf', '../../models/affectnet_emotions/enet_b0_8_best_vgaf.pt'),
    # ('affectnet_vggface2_rexnet150', '../../models/affectnet_emotions/enet_b0_8_va_mtl.pt'),
    # ('enet_b2_best', '../../models/affectnet_emotions/enet_b2_8.pt'),
    (
        'affectnet_FT_1 is fine-tune version of enet_b2_best',
        '../../models/affectnet_emotions/affectnet_FT_1.pt',
    ),
]

In [None]:
# Class index -> emotion name (presumably the output order of the pretrained
# 8-class checkpoints -- verify against the model).
pretrained_8 = dict(enumerate(
    ['Anger', 'Contempt', 'Disgust', 'Fear', 'Happiness', 'Neutral', 'Sadness', 'Surprise']
))
# Emotion name -> position in the target ordering.
new_order_8 = {
    emotion: position
    for position, emotion in enumerate(
        ['Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger', 'Contempt']
    )
}
# Compose both tables: pretrained class index -> target class index.
new_order_8 = {src: new_order_8[emotion] for src, emotion in pretrained_8.items()}
print(new_order_8)

In [None]:
def _row_normalize(counts):
    """Scale every row of *counts* to sum to 1; all-zero rows stay zero."""
    counts = counts.astype('float')
    row_totals = counts.sum(axis=1, keepdims=True)
    return np.divide(counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0)


@torch.no_grad()
def eval_pretrained_7(model, length, dataloader, criterion, device):
    """Evaluate *model* on a 7-class dataset and report metrics.

    Column 1 of the model output is discarded and the remaining seven columns
    are read as Anger, Disgust, Fear, Happiness, Neutral, Sadness, Surprise
    (assumes the model emits 8 logits with the unused class at index 1 --
    TODO confirm against the checkpoint). Predictions are remapped to the
    dataset label order Neutral, Happiness, Sadness, Surprise, Fear, Disgust,
    Anger before being compared with the ground truth.

    Prints accuracy, loss, per-class and macro F1 and the mean per-class
    recall; shows a row-normalised confusion matrix and saves it as ``cm.png``.

    Args:
        model: Module mapping an image batch to logits.
        length: Number of samples; divisor for accuracy and loss.
        dataloader: Iterable of ``(images, labels)`` tensor batches.
        criterion: Optional ``criterion(logits, labels)`` callable, assumed to
            return a per-batch mean. When ``None`` the loss stays at 0.
        device: Device the model and the image batches are moved to.
    """
    model_classes = ['Anger', 'Disgust', 'Fear', 'Happiness', 'Neutral', 'Sadness', 'Surprise']
    dataset_classes = ['Neutral', 'Happiness', 'Sadness', 'Surprise', 'Fear', 'Disgust', 'Anger']
    n_classes = len(dataset_classes)
    new_order = {idx: dataset_classes.index(name) for idx, name in enumerate(model_classes)}
    print(new_order)

    # Lookup tensor: model class index -> dataset label (one vectorised
    # gather per batch instead of a Python call per element).
    to_dataset_label = torch.tensor([new_order[idx] for idx in range(n_classes)])
    # Column permutation that puts the logits into dataset label order, so
    # the criterion sees logits and targets in the same class order.
    to_model_column = [model_classes.index(name) for name in dataset_classes]

    model.eval()
    model.to(device)

    loss = 0.0
    correct = 0
    all_preds = []
    all_labels = []

    for images, emotions in tqdm(dataloader):
        logits = model(images.to(device))
        # Remove the unused class at column 1.
        logits = torch.cat([logits[:, 0:1], logits[:, 2:]], dim=1)

        if criterion is not None:
            batch_loss = criterion(logits[:, to_model_column], emotions.to(device))
            # Undo the per-batch mean so the final division by `length`
            # yields a per-sample average.
            loss += float(batch_loss) * emotions.size(0)

        emotions = emotions.cpu()
        preds = to_dataset_label[torch.argmax(logits, dim=1).cpu()]
        correct += int(torch.eq(preds, emotions).sum())

        all_preds.extend(preds.numpy())
        all_labels.extend(emotions.numpy())

    loss /= length
    accuracy = correct / length

    # Fix the label set so the matrix is always n_classes x n_classes and the
    # tick labels line up even when a class never occurs.
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(n_classes)))
    cm_percentage = _row_normalize(cm)

    print(f'Accuracy: {accuracy:.4f}, Loss: {loss:.4f}')

    # Plot the confusion matrix with row percentages.
    fig, ax = plt.subplots(figsize=(12, 10))
    sns.heatmap(cm_percentage, annot=True, fmt='.2%', cmap='Blues',
                xticklabels=dataset_classes, yticklabels=dataset_classes, ax=ax)
    ax.set_xlabel('Predicted Labels')
    ax.set_ylabel('True Labels')
    ax.set_title('Confusion Matrix with Percentages')

    # Save before show(): the figure may be cleared once it has been shown.
    plt.savefig('cm.png')
    plt.show()

    # F1-scores per class and their unweighted mean.
    f1_scores = f1_score(all_labels, all_preds, average=None)
    macro_f1 = f1_score(all_labels, all_preds, average='macro')
    print("F1-scores per class:", f1_scores)
    print("Macro F1-score: %.4f" % macro_f1)

    # Mean per-class recall, taken over classes that have at least one true
    # sample so an empty class cannot turn the result into NaN.
    support = cm.sum(axis=1)
    has_support = support > 0
    class_recalls = np.diag(cm)[has_support] / support[has_support]
    balanced_accuracy = np.mean(class_recalls)
    print("Weighted accuracy: %.4f" % balanced_accuracy)


In [None]:
def test(model_path, valloader, valset):
    """Load the model stored at *model_path* and evaluate it on the validation data.

    Args:
        model_path: Path of a file readable by ``torch.load``.
        valloader: DataLoader handed to ``eval_pretrained_7``.
        valset: Dataset whose length is used as the sample count.
    """
    # MPS when available, CPU otherwise.
    backend = "mps" if torch.backends.mps.is_available() else "cpu"
    device = torch.device(backend)

    # NOTE: torch.load unpickles the file, which can execute arbitrary code --
    # only load checkpoints from a trusted source.
    model = torch.load(model_path, map_location=device)
    model = model.eval()
    model = model.to(device)

    # No criterion is supplied, so only prediction-based metrics are meaningful.
    eval_pretrained_7(model, len(valset), valloader, criterion=None, device=device)

In [None]:
# Evaluate every registered checkpoint in turn. Each entry's second element
# is a file path, not a loaded model, so the loop variable is named for that.
for name, model_path in models:
    print(name)
    test(model_path, valloader, valset)