## Import libraries

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, WeightedRandomSampler
import timm

# Set random seeds for reproducible results
torch.manual_seed(42)

  from .autonotebook import tqdm as notebook_tqdm


<torch._C.Generator at 0x147a80830>

In [7]:
class Dataset():

    image_size = (48, 48)
    batch_size = 64
    number_of_workers = 4

    # Data transforms
    train_transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.Grayscale(num_output_channels=1),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),  # Normalize pixel values to [-1, 1]
    ])

    val_transform = transforms.Compose([
        transforms.Resize(image_size),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ])

    def __init__(self, data_dir):
        self.train_data_dir = data_dir + "/train"
        self.val_data_dir = data_dir + "/test"
    
    def get_data_loaders(self):
        train_data = datasets.ImageFolder(self.train_data_dir, transform=self.train_transform)
        val_data = datasets.ImageFolder(self.val_data_dir, transform=self.val_transform)
        self.train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True, num_workers=self.number_of_workers)
        self.val_loader = DataLoader(val_data, batch_size=self.batch_size, shuffle=False, num_workers=self.number_of_workers)
        self.n_classes = len(train_data.classes)
        return self.train_loader, self.val_loader
    
    def get_balanced_data_loaders(self):
        train_data = datasets.ImageFolder(self.train_data_dir, transform=self.train_transform)
        val_data = datasets.ImageFolder(self.val_data_dir, transform=self.val_transform)

        self.n_classes = len(train_data.classes)

        # Count the number of images per class in the training dataset
        class_counts = dict(Counter(train_data.targets))

        # Calculate class weights for WeightedRandomSampler
        class_weights = [1 / class_counts[i] for i in train_data.targets]

        # Use WeightedRandomSampler to balance the classes during training
        sampler = WeightedRandomSampler(weights=class_weights, num_samples=len(train_data), replacement=True)

        self.train_loader = DataLoader(train_data, batch_size=self.batch_size, sampler=sampler, 
                                  num_workers=self.number_of_workers)
        self.val_loader = DataLoader(val_data, batch_size=self.batch_size, shuffle=False, 
                                 num_workers=self.number_of_workers)
        return self.train_loader, self.val_loader
    
    def get_samples_per_class(self):
        # Initialize a dictionary to store the number of samples per class
        samples_per_class = {}

        # Loop through the train_loader to count samples per class
        for _, labels in self.train_loader:
            for label in labels:
                if label.item() not in samples_per_class:
                    samples_per_class[label.item()] = 1
                else:
                    samples_per_class[label.item()] += 1

        return samples_per_class
        

## Load dataset

In [8]:
# Define data paths
data_dir = "../data/fer-2013"

ds = Dataset(data_dir)

# Load dataset
#train_loader, val_loader = ds.get_data_loaders() 

# Load dataset with balanced classes
train_loader, val_loader = ds.get_balanced_data_loaders() 

n_classes = ds.n_classes

# Print samples per class
samples_per_class = ds.get_samples_per_class()
print("Samples per class: ", samples_per_class)

Samples per class:  {3: 4063, 6: 4151, 0: 4132, 1: 4111, 2: 4146, 5: 4051, 4: 4055}


# Model Creation

In [5]:
def create_model(model_name, n_classes):
    # Load a pre-trained timm model for transfer learning
    model = timm.create_model(model_name, pretrained=True)

    # if model_name starts with "efficientnet"
    if model_name.startswith("efficientnet"):
        # Change the first layer to accept 1 input channel (instead of 3 for RGB)
        model.conv_stem = nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)

        # Change the last fully connected layer for our task
        in_features = model.classifier.in_features
        model.classifier = nn.Linear(in_features, n_classes)

        return model
    
    elif model_name.startswith("densenet"):
        # Change the first layer to accept 1 input channel (instead of 3 for RGB)
        model.features.conv0 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        # Change the last fully connected layer for our task
        in_features = model.classifier.in_features
        model.classifier = nn.Linear(in_features, n_classes)

        return model

    elif model_name.startswith("resnet"):
        # Change the first layer to accept 1 input channel (instead of 3 for RGB)
        model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        # Change the last fully connected layer for our task
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, n_classes)

        return model

resnet, 0.001, sgd -> 25 epochs aprox, check regla 3

# Model Training

In [9]:
# Training ID
train_id = 'pt_train_2'

# List of models to train
model_names = ['resnet18']#, 'efficientnet_b0', 'densenet121' ]

# Training parameters
optimizer_names = ['sgd']
learning_rates = [0.0025]
epochs = 30

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
results = []

for model_name in model_names:  # Loop through models
    for lr in learning_rates:   # Loop through learning rates
        for optimizer_name in optimizer_names:  # Loop through optimizers

            if (model_name == 'resnet18') and (optimizer_name == 'adam') and (lr == 0.01):
                continue # Skip this combination

            # Create model
            model = create_model(model_name, n_classes)
            model.to(device)

            # Choose optimizer
            if optimizer_name == 'adam':
                optimizer = optim.Adam(model.parameters(), lr=lr)
            elif optimizer_name == 'sgd':
                optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
            elif optimizer_name == 'radam':
                optimizer = optim.RAdam(model.parameters(), lr=lr)

            name = model_name + '_epochs_' + str(epochs) + '_lr_' + str(lr) + '_' + optimizer_name # Name of model

            model.train() # Set model to training mode

            criterion = nn.CrossEntropyLoss() # Loss function

            # Lists to store training and validation loss values
            train_losses = []
            val_losses = []

            # Training loop
            for epoch in range(epochs):
                running_loss = 0.0
                for batch_idx, (images, labels) in enumerate(train_loader):
                    images, labels = images.to(device), labels.to(device)
                    optimizer.zero_grad()
                    outputs = model(images)
                    loss = criterion(outputs, labels) 
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item() * images.size(0)
                    print(f"Model {name}, Epoch [{epoch + 1}/{epochs}], Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {loss.item():.4f}")

                epoch_loss = running_loss / len(train_loader.dataset)
                train_losses.append(epoch_loss)  # Append training loss for this epoch

                # Validation loop
                model.eval()  # Set model to evaluation mode
                val_loss = 0.0
                with torch.no_grad():
                    for images, labels in val_loader:
                        images, labels = images.to(device), labels.to(device)
                        outputs = model(images)
                        val_loss += criterion(outputs, labels).item() * images.size(0)

                val_loss /= len(val_loader.dataset)
                val_losses.append(val_loss)  # Append validation loss for this epoch

                model.train()  # Set model back to training mode

            # Step 4: Model Evaluation
            model.eval() # Set model to evaluation mode
            correct = 0
            total = 0
            with torch.no_grad():
                for images, labels in val_loader:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            accuracy = 100 * correct / total

            # Append results to list
            results.append({'model_name': name, 'accuracy': accuracy})

            display(results) # Display results

            # Save train_losses and val_losses to a csv file using the model name
            df = pd.DataFrame({'train_loss': train_losses, 'val_loss': val_losses})
            df.to_csv('../losses/{}.csv'.format(name), index=False)

            # Save model
            torch.save(model, "../models/{}.pth".format(name))

# Save results to a csv file
df = pd.DataFrame(results)
df.to_csv("../results/results_{}.csv".format(train_id), index=False)

Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [1/449], Loss: 1.9245
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [2/449], Loss: 1.9868
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [3/449], Loss: 1.9501
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [4/449], Loss: 1.9505
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [5/449], Loss: 1.9394
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [6/449], Loss: 1.9906
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [7/449], Loss: 1.9561
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [8/449], Loss: 1.9985
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [9/449], Loss: 1.9630
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [10/449], Loss: 2.0081
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [11/449], Loss: 1.9423
Model resnet18_epochs_30_lr_0.0025_sgd, Epoch [1/30], Batch [12/449], Loss: 1.9611
Model resnet1

[{'model_name': 'resnet18_epochs_30_lr_0.0025_sgd',
  'accuracy': 45.95987740317637}]

# Model Evaluation

In [None]:
def plot_losses(model_name):
    # Load model loss history
    df = pd.read_csv('../losses/{}.csv'.format(model_name))

    train_losses = df['train_loss']
    val_losses = df['val_loss']
    
    epochs = len(train_losses)
    
    # Plot the loss vs. epoch chart
    plt.figure()
    plt.plot(range(1, epochs + 1), train_losses, label='Training Loss')
    plt.plot(range(1, epochs + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Model: {model_name}')
    plt.legend()
    plt.show()

In [None]:
train_id = 'train_0'

# Read results
results = pd.read_csv('../results/results_{}.csv'.format(train_id))
#results = pd.DataFrame({'model_name':['resnet18_epochs_15_lr_0.01_adam'], 'accuracy': [0.58]})

for index, row in results.iterrows(): # Iterate over each model
    # Print model name and accuracy
    print('Model: {} - Accuracy: {}'.format(row['model_name'], row['accuracy']))

    # Plot loss history
    plot_losses(row['model_name'])

In [10]:
def real_test_models(model_names: list):
        
    from PIL import Image
    import pandas as pd

    emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

    results = pd.DataFrame()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    new_transform = transforms.Compose([
        transforms.Resize((48, 48)),
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ])

    for index, model_name in enumerate(model_names):
        model = torch.load('../models/' + model_name + '.pth')

        model.eval()
        with torch.no_grad(): # No need to track the gradients
            for label in emotion_labels:
                image = Image.open('../data/test_predict/' + label.lower() + '.png')
                image = new_transform(image).unsqueeze(0).to(device)
                output = model(image)

                #probabilities = F.softmax(output, dim=1)
                #probabilities_rounded = torch.round(probabilities * 1000) / 1000  # Round to 2 decimal places

                _, predicted = torch.max(output.data, 1)

                # add model name and prediction probabilities to results dataframe
                results.loc[index, 'Model'] = model_name
                results.loc[index, label] = label == emotion_labels[predicted.item()]
    
    results['Accuracy'] = results.iloc[:, 1:].mean(axis=1).apply(lambda x: round(x, 2))
    
    #print(f"Predicted Emotion: {emotion_labels[predicted.item()]}")
    display(results)

In [None]:
# Evaluate models on own images
real_test_results = real_test_models(results['model_name'])