**MLP USING NUMPY**

side note: we ran the code in a seperate py file, as it was easier to handle the big data amount

In [1]:
# Libraries
import numpy as np
import random
import matplotlib.pyplot as plt

import copy
import datetime
import os

import math
from numpy.lib.stride_tricks import sliding_window_view
import random
import time

A method to print strings into a file with filename

In [2]:
def log_message(message):
    with open(filename, "a") as f:
        f.write(message + "\n")

#dataset preprocessing methods 

In [3]:
#Normalizing the pixel values so that no pixel gets too important
#for the model when it should not.
def z_normalize_images(images):
    mean = images.mean()
    std  = images.std()
    eps  = 1e-8
    return (images - mean) / (std + eps)

    
#read training set, 80% of the original dataset
#reads the rest 20% of the dataset and splits
#those 20% into validation and test set
def preprocess_dataset_for (ann_type: str):
    train_x = np.load(f'train_{ann_type}_x.npy') 
    train_y = np.load(f'train_{ann_type}_y.npy')
    val_and_test_x  = np.load(f'test_{ann_type}_x.npy')   
    val_and_test_y  = np.load(f'test_{ann_type}_y.npy')

    N = len(val_and_test_x)
    perm = np.random.RandomState(seed=42).permutation(N)
    split_at = N // 2

    val_idx = perm[:split_at]
    test_idx = perm[split_at:]

    val_x  = val_and_test_x[val_idx]
    val_y  = val_and_test_y[val_idx]
    test_x = val_and_test_x[test_idx]
    test_y = val_and_test_y[test_idx]

        # z_normalization of inputs
    train_x  = z_normalize_images(train_x)  
    val_x  = z_normalize_images(val_x)  
    test_x  = z_normalize_images(test_x)  

    #create training, val and test set by zipping 
    #corresponding inputs and targets
    training_set = zip(train_x, train_y)
    val_set = zip(val_x, val_y)
    test_set = zip(test_x, test_y)

    return training_set, val_set, test_set 

The model built is a multi-layer perceptron (MLP). The networks architecture is defined in __init__. The input dimension is 2304 (48x48 pixels flattened), stemming from the input images. Upon initialization, an MLP model has n_layers hidden ReLU layers with hidden_dim neurons per layer. Output layer of the network can be interpreted as indicating which of the target classes the network input falls under. The higher the output of a neuron in the final network layer, the greater the likelihood that inputs fall under the target class which corresponds to that neuron. Here, we deal with 7 classes, one for each facial expression. 

Weights oh hidden ReLU layers are initilized via the Kaiming method to get better value of the CEL during training and run into numerical issues less frequently. I am not sure what the latter effect has to do with the initialization scheme, empirically Kaiming initialization helped me avoid numerical errors until the hidden dimension of at least 335. Last layer weights are sampled from the normal distribution with mean of 0 and standard deviation of 0.2.

When a target value of a certain vector of the input space is fed into the network on top of that input space vector, softmax normalizes logits viz. outputs of neurons in the final network layer to turn them into probabilities. If no target value is fed into the network, it returns logits i.e. unnormalized values for each of the target classes. Model prediction as to the target class of the input image is then the target class with the highest logit. Using the numerically greatest logis to determine target class saves computational recources during inference.

Finally, the model backpropagates the losses by calculating the gradient of cross-entropy loss (CEL) wrt each of the hidden layer parameter matrices. Some of the computations to determine the gradient of CEL wrt to more posterior network layers are reused to compute the gradient of CEL wrt more prior layers thereof. 

In [4]:

class MLP ():
    #input layer is not counted in n_layers
    #output layer is
    def __init__(self, input_dim=2304, n_layers=5, hidden_dim=32, n_classes=7):
        rng = np.random.default_rng(seed=42) 
        
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.layers = []
        for i in range (n_layers):
            in_dim = hidden_dim
            out_dim = hidden_dim
            if i == 0:
                in_dim = input_dim
            elif (i+1) == n_layers:
                out_dim = n_classes
            std = 0.2
            if in_dim == out_dim:
                std = np.sqrt(2/in_dim)
            current_layer = rng.normal(
                #mean
                loc=0.0,      
                #standard deviation
                scale=std,        
                size=(in_dim, out_dim)
            ).astype(np.float32)
            self.layers.append (current_layer)

    def ReLU (self, x):
        x = np.asarray(x)
        return np.maximum(0, x)
    
    def _get_normalized_logits_with_softmax_denom(self, logits):
        logits = logits - np.max(logits)
        exp_logits = np.exp(logits)
        softmax_denominator = np.sum(exp_logits)
        return exp_logits / softmax_denominator, softmax_denominator
    """
    def _get_normalized_logits_with_softmax_denom(self, logits):
        softmax_denominator = np.sum(np.exp(logits))
        return np.exp(logits) / softmax_denominator, softmax_denominator
    """
    def forward(self, inputs, target_value=None, requires_grad=False):
        hidden_layer_activations = []
        #layer_activations.append(inputs.copy())
     
        
        for i in range(self.n_layers):
            
             # Special case: only one layer, it’s the output layer
            if self.n_layers == 1:
                logits = inputs @ self.layers[i]
            #ReLU activations in all the layers but the last
            elif i == 0:
                hidden_layer_activations.append(self.ReLU(inputs @ self.layers[i]))
                
            elif (i+1) < self.n_layers:
                hidden_layer_activations.append(self.ReLU(hidden_layer_activations[i-1] @ self.layers[i]))
            #no activation function applied so far
            #in the last layer
            #softmax will be applied to the output 
            #of the last layer later if necessary
            else:
                logits = hidden_layer_activations[i-1] @ self.layers[i]

        #if no target value is passed to the model.forward
        #then the model is in the inference mode
        #no logit/output normalization/softmax is necessary
        #in the inference mode
        #since one can base model prediction on the highest
        #unnormalized/unsoftmaxed logit
        if target_value is None:
            return logits
        else:
            #this is softmax
            #softmax denominator is returned by softmax 
            #on top of normalized logits to be
            #reused in computing CEL
            normalized_logits, softmax_denom = self._get_normalized_logits_with_softmax_denom(logits)

            
            #CEL is equal to -ln(exp(logits[target_value])/softmax_denom))
            #the formula below is algebraically equivalent to the one above
            #CEL_value = -logits[target_value] + np.log(softmax_denom)
            CEL_value = -np.log(normalized_logits[target_value])
            
        #if no gradient is required,
        #then just return CEL
        if not requires_grad:
            return CEL_value
        

        #if a target value is passed to model.forward
        #and CEL is required
        #then the model is in the training mode
        #one needs to do softmax on the inputs
        #to pass softmaxed logits into 
        #CEL/cross-entropy loss
        else:

            #this one is the gradient of CEL
            #d_softmax = normalized_logits.copy()
            d_softmax = normalized_logits
            d_softmax[target_value] -= 1

            #initialize an list with as many empty
            #elements as there are hidden layers
            #i.e. param matrices
            layer_gradients = [None] * self.n_layers

            #this computes gradients of layer params
            for i in range(self.n_layers-1, -1, -1):

                if self.n_layers == 1:
                    # Single-layer network
                    dynamic_gradient = d_softmax  # from your loss derivative
                    layer_gradients[i] = np.outer(inputs, dynamic_gradient)
                #output softmax layer
                elif i == (self.n_layers-1):
                    #this is the part of the gradient
                    #which is reused across layers
                    dynamic_gradient = d_softmax

                    #this one contrains gradient of i-th layer
                    layer_gradients[i] = np.outer(hidden_layer_activations[i-1], dynamic_gradient)
                else:
                    #this one multiplies dynamic gradient by the gradient of pre-activations
                    #gradient of pre-activations is equal to the corresponding weight matrix
                    #which us stored in self.layers[i+1]
                    dynamic_gradient = self.layers[i+1] @ dynamic_gradient
                    #gradient of hidden ReLU activation is equal to 1
                    #if that activation is equal to 1 and 0 otherwise 
                    relu_grad = (hidden_layer_activations[i]>0).astype(float)
                    #mulitply dynamic grad by activation gradient
                    dynamic_gradient *= relu_grad
                    #input layer
                    if i == 0:
                        layer_gradients[i] = np.outer(inputs, dynamic_gradient)
                    #neither output nor input layer
                    else:
                        layer_gradients[i] = np.outer(hidden_layer_activations[i-1], dynamic_gradient)
            return CEL_value, layer_gradients

The next function runs teh model on a dataset and computes the overall loss: 

In [5]:
def evaluate_model_on(model, dataset):
    total_loss = 0
    for x, y in dataset:
        total_loss += model.forward(x, y, requires_grad=False)
    return total_loss/len(dataset)

The model is trained via stochastics gradient descent. The batch size is thus equal to 1. After each epoch, the learning rate is multiplied by an antecedently specified hyperparameter whose value is constant during training and is smaller than 1. This measure is introduced so that the optimizer is not vulnerable to a scenario in which it constantly overshoots a local minimum of CEL and thus diverges therefrom. A decreasing learning rate/step size ensures that a local minimum of CEL is being overshot less and less as model training is progressing.

The optimizer retuns the state of the model which achieved the lowest CEL score on the validation set as opposed to the state of the model which achieved the lowest CEL score on the validation set after n_epochs epochs of training.

In [6]:
def train_model_with_SGD (model, 
                         training_set,
                         validation_set,
                         lr: float, 
                         n_epochs: int, 
                         sgd_lr_multiplier: float = 0.95
                        ):
    

    print ("_" * 50)
    print (f"Initial LR = {lr}")
    print (f"LR multipliter per epoch = {sgd_lr_multiplier:.5f}")
    print (f"Number of layers = {model.n_layers}")
    print (f"Dimensionality of hidden layers = {model.hidden_dim}")
    print ("_" * 50)

    log_message ("_" * 50)
    log_message (f"Initial LR = {lr}")
    log_message (f"LR multipliter per epoch = {sgd_lr_multiplier:.5f}")
    log_message (f"Number of layers = {model.n_layers}")
    log_message (f"Dimensionality of hidden layers = {model.hidden_dim}")
    log_message ("_" * 50)


    train_loss_history = []
    val_loss_history = []

    best_avg_epoch_loss = 1000
    #best_model = copy.deepcopy(model)

    for epoch_index in range(1, n_epochs + 1):

        print (f"Epoch {epoch_index}/{n_epochs}")
        print (f"current SGD learning rate = {lr}")

        log_message (f"Epoch {epoch_index}/{n_epochs}")

        log_message (f"current SGD learning rate = {lr}")

        total_train_loss = 0

        #shuffle training set in a reproducible manner
        random.seed(42 + epoch_index)
        random.shuffle(training_set) 

        for x,y in training_set:

            #get the CEL gradient from the forward pass directly
            loss, layer_grads = model.forward(x, y, requires_grad=True)
         
            #do SGD step
            for i in range(model.n_layers):
                model.layers[i] -= lr*layer_grads[i]

            total_train_loss += loss
          
        #decrease lr each epoch
        lr *= sgd_lr_multiplier
        
        #compute, print and save avg loss per epoch
        avg_train_loss = total_train_loss / len(training_set)
        print (f"average train loss = {avg_train_loss:.5f}")
        log_message (f"average train loss = {avg_train_loss:.5f}")
        train_loss_history.append(avg_train_loss)
 
        avg_val_loss = evaluate_model_on (model, validation_set)
        print (f"average val loss = {avg_val_loss:.5f}")
        log_message (f"average val loss = {avg_val_loss:.5f}")
        val_loss_history.append(avg_val_loss)
        
        if avg_val_loss < best_avg_epoch_loss:
            best_avg_epoch_loss = avg_val_loss
            best_model = copy.deepcopy(model)

        print ("_" * 50)
        log_message (f"average val loss = {avg_val_loss:.5f}")
    if best_model is not None:
        return best_model, train_loss_history, val_loss_history
    else:
        return model, train_loss_history, val_loss_history


In [7]:
def plot_loss(train_loss_history, val_loss_history):
    plt.figure(figsize=(8, 5))
    plt.plot(train_loss_history, label="Training Loss")
    plt.plot(val_loss_history, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Average Loss")
    plt.title("Training vs Validation Loss")
    plt.legend()
    plt.grid(True)
    plt.show()


Grid search tries out several combinations of hyperparameters in order to achieve the lowest validation loss. 

In [8]:
def grid_search(hyperparameters: dict,
                train_set: list,
                validation_set: list,
                n_epochs: int):
    best_loss = float("inf")
    best_params = {}
    best_model = None

    # extract hyperparameter combinations
    learning_rate = hyperparameters.get('lr', [])
    lr_multipliers = hyperparameters.get('lr_multiplier', [])
    hidden_dims = hyperparameters.get('hidden_dim', [])
    n_layers_list = hyperparameters.get('n_layers', [])

    # iterate over all combinations
    for lr in learning_rate:
        for lr_multiplier in lr_multipliers:
            for hidden_dim in hidden_dims:
                for n_layers in n_layers_list:
                    print ("_" * 100)
                    print(f"Testing: lr={lr}, lr_multiplier={lr_multiplier}, "
                          f"hidden_dim={hidden_dim}, n_layers={n_layers}")
                    
                    log_message ("_" * 100)
                    log_message (f"Testing: lr={lr}, lr_multiplier={lr_multiplier}, "
                          f"hidden_dim={hidden_dim}, n_layers={n_layers}")

                    # create a new model for each combination
                    model = MLP(n_layers=n_layers, hidden_dim=hidden_dim)

                    # train the model and track validation loss history
                    trained_model, train_loss_history, val_loss_history = train_model_with_SGD(
                        model,
                        train_set,
                        validation_set,
                        lr=lr,
                        n_epochs=n_epochs,
                        sgd_lr_multiplier=lr_multiplier
                    )

                    # find the best validation loss
                    min_val_loss = min(val_loss_history)
                    #print (f"min_val_loss = {min_val_loss:.5f}")

                    # check if new combination is best
                    if min_val_loss < best_loss:
                        best_loss = min_val_loss
                        best_params = {
                            'lr': lr,
                            'lr_multiplier': lr_multiplier,
                            'hidden_dim': hidden_dim,
                            'n_layers': n_layers
                        }
                        best_model = trained_model
                        best_train_loss_history = train_loss_history
                        best_val_loss_history = val_loss_history
                    
                    print (f"min best val loss so far = {best_loss}")
                    log_message (f"min best val loss so far = {best_loss}")
              

    return best_train_loss_history, best_val_loss_history, best_model, best_params, best_loss

Splitting datasets into validation and training data, input and target values. The data is shuffled with a fixed seed in order to take out any unwanted data structure that may arise from grouped or sorted data. Inputs are normalized and decimal notation is specified.

In [9]:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"mlp_grid_search_log_{timestamp}.txt"

with open(filename, "w") as f:
    f.write("=== New Log Start ===\n")



train_set, val_set, test_set = preprocess_dataset_for("mlp")

train_set = list(train_set)
val_set = list (val_set)
test_set = list (test_set)

#numpy printing instruction for decimal notation
np.set_printoptions(
    precision   = 5,       
    floatmode   = 'fixed',  
    suppress    = True     
)
 

#create training, val and test set by zipping 
#corresponding inputs and targets
#training_set = list(train_set)
#val_set = list(val_set)
#test_set = list(test_set)

#numpy printing instruction for decimal notation
np.set_printoptions(
    precision   = 5,       
    floatmode   = 'fixed',  
    suppress    = True     
)


Originally, grd search was performed on 27 different combinations of hyperparameters. The learning rates of 0.001, 0.005 and 0.01 as well as hidden dimensions i.e. width of hidden layers of 16, 32 and 64 were tested. On the 22nd of August 2025, the best validation loss was achieved for 7 hidden layers with 32 neurons each via the learning rate of 0.005. You can see complete grid search log in the file mlp_grid_search_log_20250822_111958.txt. A different result can be obtained on a different day even with the random seed.

There is no need to test 27 combinations of hyperparameters again to demonstrate that grid search works. We only test networks of varying depth with different hidden dimension of 32 or 64 with the learning rate of 0.005. 

Such reduced grid search is sufficient to demonstrate that increasing the depth of MLP is helpful at least as long as no vanishing gradient problem is encountered. Apparently, increasing network width is also a path of extremely diminishing returns. There may be an upper bound of efficiency one gains via increasing network width. This network width upper bound may have to do with the complexity of the problem an MLP is trained to solve.  

In [10]:
#mlp = MLP()

#SGD_LEARNING_RATE = 2e-3
#LEARNING_RATE_MULTIPLIER_PER_EPOCH = 0.95
# Define the hyperparameter grid
hyperparameters_to_tune = {
    #'lr': [0.001, 0.005, 0.01],
    'lr': [0.005],
    'lr_multiplier': [0.95],
    #'hidden_dim': [16, 32, 64],
    'hidden_dim': [32, 64],
    #'hidden_dim': [8, 16],
    'n_layers': [1, 4, 7]
    #'n_layers': [1]
   
    }

N_EPOCHS = 10

train_loss_history, val_loss_history, best_model, best_params, best_loss = grid_search(
        hyperparameters_to_tune,
        train_set,
        val_set,
        n_epochs=N_EPOCHS
    )


avg_test_loss = evaluate_model_on(best_model, test_set)

print (f"best hyperparams are {best_params}")
print (f"VAL LOSS of best model is = {best_loss:.5f}")
print (f"TEST LOSS of best model is = {avg_test_loss:.5f}")

log_message ("_" * 100)
log_message (f"best hyperparams are {best_params}")
log_message (f"VAL LOSS of best model is = {best_loss:.5f}")
log_message (f"TEST LOSS of best model is = {avg_test_loss:.5f}")

____________________________________________________________________________________________________
Testing: lr=0.005, lr_multiplier=0.95, hidden_dim=32, n_layers=1
__________________________________________________
Initial LR = 0.005
LR multipliter per epoch = 0.95000
Number of layers = 1
Dimensionality of hidden layers = 32
__________________________________________________
Epoch 1/10
current SGD learning rate = 0.005
average train loss = 9.05477
average val loss = 9.53613
__________________________________________________
Epoch 2/10
current SGD learning rate = 0.00475
average train loss = 7.81578
average val loss = 7.46549
__________________________________________________
Epoch 3/10
current SGD learning rate = 0.0045125
average train loss = 7.16407
average val loss = 8.11160
__________________________________________________
Epoch 4/10
current SGD learning rate = 0.004286875
average train loss = 6.71783
average val loss = 6.60034
__________________________________________________


KeyboardInterrupt: 

Let´s plot this: 

In [None]:
#model, train_loss_history, val_loss_history = train_model_with_SGD(
#    model, training_set, validation_set, lr=0.01, n_epochs=20
#)

print (f"best hyperparams are {best_params}")
print (f"VAL LOSS of best model is = {best_loss:.5f}")
print (f"TEST LOSS of best model is = {avg_test_loss:.5f}")

plot_loss(train_loss_history, val_loss_history)

Confusion matrix of the model on the test set:

In [None]:
# import for visualization
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

#confusion matrix heatmap

def print_confusion_matrix_of (model, test_set):

    # Get predictions on test set
    test_predictions = []
    test_true_labels = []
    
    for x, y in test_set:
        logits = model.forward(x)  # Get model predictions
        prediction = np.argmax(logits)  # Convert to class prediction
        test_predictions.append(prediction)
        test_true_labels.append(y)
    
    # Create confusion matrix
    cm = confusion_matrix(test_true_labels, test_predictions)
    
    # Plot confusion matrix
    emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
    
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=emotion_labels, yticklabels=emotion_labels)
    plt.title('Confusion Matrix - FER-2013 Test Set')
    plt.ylabel('True Emotion')
    plt.xlabel('Predicted Emotion')
    plt.tight_layout()
    plt.show()


print_confusion_matrix_of (best_model, test_set)

A crutch aka dirty trick: downsample images in a dataset to decrease training time

In [None]:
def maxpool2d_grayscale(image, pool_size=2, stride=2):
    
    # If image has a single channel dimension, remove it
    if image.ndim == 3 and image.shape[0] == 1:
        image = image[0]
    
    H, W = image.shape
  
    patches = sliding_window_view(image, (pool_size, pool_size))
    patches = patches[::stride, ::stride, :, :]
    pooled = patches.max(axis=(-2, -1))
    return pooled

Since 48x48 images have been downsampled to 24x24, that is input dimension accepted by the CNN class by default. Since the image is that small, 7x7 and 5x5 convolutions hardly make sense, 3x3 convolutional kernel size is default in the class. The model has n_conv_layers ReLU

CNN implements a varying number of convolutional layers with ReLU activation functions with the number of channels which is growing from layer to layer by a factor passed into CNN constructor. One fully connected softmax layer is hardcoded into the network to project the output of convolutional layers onto a 7-dimensional vector space whose vectors give a predicted probability distribution over the target classes. One also can pass the desired size of the convolutional kernel. Like in standard deep learning libraries, this CNN implementation uses discrete cross-correlation rather than discrete convolution in each “convolutional” layer. In this report, I use term ‘cross-correlation’ and ‘cross-correlational’ interchangeably with ‘convolution’ and ‘convolutional’.  

Kaiming weight initialization has been used for ReLU layers, Xavier one for the hardcoded softmax fully connected layer. 

The skeleton of the CNN class is similar to the one of the MLP class. Were there more time in my disposal, I would have the MLP and the CNN class inherit some of the attributes and methods from a more abstract class.

In [None]:


class CNN ():
    #def __init__(self, in_dim: int = 48, out_dim: int=7, kernel_size: int = 5, n_channels: int = 2, n_conv_layers: int = 5):
    def __init__(self, in_dim: int = 24, out_dim: int=7, kernel_size: int = 3, n_channels: int = 1, n_chan_mult: int = 1.1, n_conv_layers: int = 3):
        
        rng = np.random.default_rng(42)

        self.input_h = in_dim
        self.input_w = in_dim
        self.output_dim = out_dim
        self.kernel_size = kernel_size

        #contains number of channels in i-th layer
        #where 0th layer is the input layer with
        #one channel since inputs are grayscale
        self.n_channels = [1]

        #no more conv layers are initialized 
        #than necessary so that the receptive field
        #of the last conv layer covers the entire input image
        self.n_conv = min(n_conv_layers, (in_dim // (kernel_size-1)) -1 )
        

        self.layers = []

        for i in range(self.n_conv):
            self.n_channels.append(n_channels)

            #compute standard deviation for Kaiming initialization of
            #weights of convolutional kernels
            #because ReLU activations are used
            current_input_dim = n_channels * (kernel_size ** 2)
            std = np.sqrt(2/current_input_dim )

            #initialize as many kernels as there are channels
            #those n_channels kernales are layer parameters
            self.layers.append(rng.normal(loc=0.0, scale=std, size=(self.n_channels[i+1], self.n_channels[i], self.kernel_size, self.kernel_size)))

            #increase n_channels across layers gradually 
            n_channels = math.ceil(n_chan_mult*n_channels)
            #n_channels = math.trunc(1.1*n_channels)
            #if n_channels < 256:
            #    n_channels *= 2

        
        

        #sampling boundaries for uniform Xavier weight init for a fully connected layer
        #last convolutional layer has self.n_channels[-1] channels
        #with feature widheight and width = in_dim - n_conv * (kernel_size - 1)
        #next output layer has out dim parameters
        n_param_per_channel_last_conv_layer = (self.input_w - self.n_conv * (self.kernel_size - 1)) ** 2
        n_param_total_last_conv_layer = self.n_channels[-1] * n_param_per_channel_last_conv_layer
        uniform_boundary = np.sqrt(6/(n_param_total_last_conv_layer + self.output_dim))

        #add one fully connected layer on top of convolutional ones
        #to project output of successive convolutions onto
        #the vector space of dimensionality out_dim
        self.n_fc = 1
        self.fc = rng.uniform(-uniform_boundary, uniform_boundary, size=(n_param_total_last_conv_layer, self.output_dim))
        
    
    #def cross_correlation_2D_of (self, input, kernels):
    def _cross_correlation_2D_of_with (self, kernels, input, forward_pass = True):
       
        picture_patches = sliding_window_view(input, kernels.shape[-2:], axis = (1,2))

        #this is cc2d for the backward pass pass
        if forward_pass:
            cc2d = np.einsum("ihwkl, oikl->ohw", picture_patches, kernels)
            
        #this is cc2d for the forward pass
        else:
            cc2d = np.einsum("ihwkl, okl->oihw", picture_patches, kernels)

        return cc2d
    
    def _transposed_cross_correlation_2D_of_with (self, error_signal, network_layer):

     
        _, _, kH, kW = network_layer.shape

        pad_h, pad_w = kH - 1, kW - 1
        # pad spatial axes only
        error_signal_padded = np.pad(error_signal, ((0,0), (pad_h,pad_h), (pad_w,pad_w)), mode='constant')

        
        picture_patches = sliding_window_view(error_signal_padded, (kH, kW), axis = (1,2))

        tcc2d = np.einsum("ohwkl, oikl->ihw", picture_patches, network_layer)


        return tcc2d
   
    def _ReLU (self, x):
        return np.maximum(x, 0, out=x)
    """
    def _get_normalized_logits_with_softmax_denom(self, logits):
        softmax_denominator = np.sum(np.exp(logits))
        return np.exp(logits) / softmax_denominator, softmax_denominator
    """
    def _get_normalized_logits_with_softmax_denom(self, logits):
        logits = logits - np.max(logits)
        exp_logits = np.exp(logits)
        softmax_denominator = np.sum(exp_logits)
        return exp_logits / softmax_denominator, softmax_denominator
   
    def forward (self, inputs, target_value=None, requires_grad = False):
        if len(inputs.shape) == 2:
            inputs = np.expand_dims(inputs, axis=0)
  
        layer_activations = [inputs]
        for i in range(self.n_conv):
            cross_correlation = self._cross_correlation_2D_of_with(self.layers[i], layer_activations[i])
            layer_activations.append(self._ReLU(cross_correlation))
         

        #push output into the fully conntected layer
        #to get as many logits as there are
        #target classes
        #print ("fc shape ", self.fc.shape)
        #print ("last conv flattened shape ", layer_activations[-1].flatten().shape)
        logits =  layer_activations[-1].flatten() @ self.fc

        
        #if no target value is passed to the model.forward
        #then the model is in the inference mode
        #no logit/output normalization/softmax is necessary
        #in the inference mode
        #since one can base model prediction on the highest
        #unnormalized/unsoftmaxed logit
        if target_value is None:
            return logits
           
        else:
            #this is softmax
            #softmax denominator is returned by softmax 
            #on top of normalized logits to be
            #reused in computing CEL
            normalized_logits, softmax_denom = self._get_normalized_logits_with_softmax_denom(logits)

            
            #CEL is equal to -ln(exp(logits[target_value])/softmax_denom))
            #the formula below is algebraically equivalent to the one above
            #CEL_value = -logits[target_value] + np.log(softmax_denom)
            # normalized logits for stable computation
            CEL_value = -np.log(normalized_logits[target_value])
            
        #if no gradient is required,
        #then just return CEL
        if not requires_grad:
            return CEL_value
        

        #if a target value is passed to model.forward
        #and CEL is required
        #then the model is in the training mode
        #one needs to do softmax on the inputs
        #to pass softmaxed logits into 
        #CEL/cross-entropy loss
        else:

            #this one is the gradient of CEL
            #d_softmax = normalized_logits.copy()
            d_softmax = normalized_logits
            d_softmax[target_value] -= 1

            #gradient of the fully connected layer
            fc_gradient = np.outer(layer_activations[-1].flatten(), d_softmax)

            #initialize an list with as many empty
            #elements as there are conv layers
            #i.e. cross-correlation param tensors
            conv_layer_gradients = [None] * (self.n_conv)
            #error_signal = self.fc @ d_softmax

            #!!!!!
            reshaped_fc_shape = list(layer_activations[-1].shape)
            reshaped_fc_shape.append(self.output_dim)
           
            error_signal = self.fc.reshape(reshaped_fc_shape) @ d_softmax
       

            
            for i in range(self.n_conv-1, -1, -1):
                
                error_signal *= (layer_activations[i+1] > 0)

                current_conv_layer_grad = self._cross_correlation_2D_of_with(
                    error_signal, layer_activations[i], forward_pass=False
                )

         
                error_signal = self._transposed_cross_correlation_2D_of_with(
                    error_signal, self.layers[i]
                )

                conv_layer_gradients[i] = current_conv_layer_grad
                
            
            layer_gradients = conv_layer_gradients
            layer_gradients.append(fc_gradient)

            return CEL_value, layer_gradients
        



CNN optimizer is the same as for MLP, I lacked time to refactor the method to be able to optimize instances of both the MLP and the CNN classes.

The model is trained via stochastics gradient descent. The batch size is thus equal to 1. After each epoch, the learning rate is multiplied by an antecedently specified hyperparameter whose value is constant during training and is smaller than 1. This measure is introduced so that the optimizer is not vulnerable to a scenario in which it constantly overshoots a local minimum of CEL and thus diverges therefrom. A decreasing learning rate/step size ensures that a local minimum of CEL is being overshot less and less as model training is progressing.

The optimizer retuns the state of the model which achieved the lowest CEL score on the validation set as opposed to the state of the model which achieved the lowest CEL score on the validation set after n_epochs epochs of training.

In [None]:
def train_CNN_with_SGD (model, 
                         training_set,
                         validation_set,
                         lr: float, 
                         n_epochs: int, 
                         sgd_lr_multiplier: float = 0.95
                        ):
    

    print ("_" * 50)
    print (f"Initial LR = {lr}")
    print (f"LR multipliter per epoch = {sgd_lr_multiplier:.5f}")
    print (f"Number of conv layers = {model.n_conv}")
    print (f"Number of channels in conv layers = {model.n_channels[1:]}")
    print ("_" * 50)

    log_message ("_" * 50)
    log_message (f"Initial LR = {lr}")
    log_message (f"LR multipliter per epoch = {sgd_lr_multiplier:.5f}")
    log_message (f"Number of conv layers = {model.n_conv}")
    log_message (f"Number of channels in conv layers = {model.n_channels[1:]}")
    log_message ("_" * 50)

    train_loss_history = []
    val_loss_history = []

    best_avg_epoch_loss = 1000

    for epoch_index in range(1, n_epochs + 1):

        print (f"Epoch {epoch_index}/{n_epochs}")
        print (f"current SGD learning rate = {lr}")

        log_message (f"Epoch {epoch_index}/{n_epochs}")
        log_message (f"current SGD learning rate = {lr}")

        total_train_loss = 0

        #shuffle training set in a reproducible manner
        random.seed(42 + epoch_index)
        random.shuffle(training_set) 
        start1 = time.time()
        for x,y in training_set:
            #start2 = time.time()
            #get the CEL gradient from the forward pass directly
            loss, layer_grads = model.forward(x, y, requires_grad=True)
         
            #do SGD step
            model.fc -= lr*layer_grads[-1]
            
            model.layers = [layer - lr * grad for layer, grad in zip(model.layers, layer_grads[:-1])]
                 
            #for i in range(model.n_conv):
            #   model.layers[i] -= lr*layer_grads[i]
           

       
            #end = time.time()
            #rint(f"Elapsed time per SGD iter: {end - start2} seconds")
            total_train_loss += loss
        end = time.time()
        print(f"Elapsed time per epoch iter: {end - start1} seconds")
        
        #decrease lr each epoch
        lr *= sgd_lr_multiplier
        
        #compute, print and save avg loss per epoch
        avg_train_loss = total_train_loss / len(training_set)
        print (f"average train loss = {avg_train_loss:.5f}")
        log_message (f"average train loss = {avg_train_loss:.5f}")
        train_loss_history.append(avg_train_loss)
 
        avg_val_loss = evaluate_model_on (model, validation_set)

        if avg_val_loss < best_avg_epoch_loss:
            best_avg_epoch_loss = avg_val_loss
            best_model = copy.deepcopy(model)



        print (f"average val loss = {avg_val_loss:.5f}")
        log_message (f"average val loss = {avg_val_loss:.5f}")
        val_loss_history.append(avg_val_loss)
        print ("_" * 50)
        log_message ("_" * 50)




    if best_model is not None:
        return best_model, train_loss_history, val_loss_history
    else:
        return model, train_loss_history, val_loss_history
    #return model, train_loss_history, val_loss_history


What goes for SGD optimizer, also goes for grid search. In principle, those method do the same, I have no time left to make one method work for both MLP and CNN.

Grid search tries out several combinations of hyperparameters in order to achieve the lowest validation loss.

In [None]:
def grid_search(hyperparameters: dict,
                train_set: list,
                validation_set: list,
                n_epochs: int):
    best_loss = float("inf")
    best_params = {}
    best_model = None

    # extract hyperparameter combinations
    #learning_rate = hyperparameters.get('lr', [])
    #lr_multipliers = hyperparameters.get('lr_multiplier', [])
    #hidden_dims = hyperparameters.get('hidden_dim', [])
    #n_layers_list = hyperparameters.get('n_layers', [])

    n_conv_layers = hyperparameters.get('n_conv_layers', [])
    n_chan_mult = hyperparameters.get('n_chan_mult', [])


    #hyperparameters_to_tune = {
    #'n_conv_layers': [3, 5, 7],
    #'n_chan_mult': [1.1, 1.3],

    lr = SGD_LEARNING_RATE
    lr_multiplier = LEARNING_RATE_MULTIPLIER_PER_EPOCH
    n_channels = 4


    for n_conv in n_conv_layers:
        for chan_mult in n_chan_mult:
            print ("_" * 100)
            #print(f"Testing: lr={lr}, lr_multiplier={lr_multiplier}, "
            #        f"number of conv layers={n_conv}, number of channels in first conv layer = {n_channels}",
            #        f"multiplier of number of channels with each subsequent layer = {chan_mult}")
            print (f"Testing: lr={lr}, lr_multiplier={lr_multiplier}")
            print (f"number of conv layers={n_conv}, number of channels in first conv layer = {n_channels}")
            print (f"multiplier of number of channels with each subsequent layer = {chan_mult}")
          
            
            log_message ("_" * 100)
            log_message (f"Testing: lr={lr}, lr_multiplier={lr_multiplier}")
            log_message(f"number of conv layers={n_conv}, number of channels in first conv layer = {n_channels}")
            log_message(f"multiplier of number of channels with each subsequent layer = {chan_mult}")
          
            # create a new model for each combination

            model = CNN(in_dim = 24, out_dim=7, kernel_size = 3, n_channels = n_channels, n_chan_mult = chan_mult, n_conv_layers = n_conv)
        
            
            #model = MLP(n_layers=n_layers, hidden_dim=hidden_dim)

            # train the model and track validation loss history
            trained_model, train_loss_history, val_loss_history = train_CNN_with_SGD(
                model,
                train_set,
                validation_set,
                lr=lr,
                n_epochs=n_epochs,
                sgd_lr_multiplier=lr_multiplier
            )

            # find the best validation loss
            min_val_loss = min(val_loss_history)
            #print (f"min_val_loss = {min_val_loss:.5f}")

            # check if new combination is best
            if min_val_loss < best_loss:
                best_loss = min_val_loss
                best_params = {
                    'lr': lr,
                    'lr_multiplier': lr_multiplier,
                    'n_channels': n_channels,
                    'n_chan_mult': chan_mult,
                    'n_conv_layers': n_conv
                }
                best_model = trained_model
                best_train_loss_history = train_loss_history
                best_val_loss_history = val_loss_history
            
            print (f"min best val loss so far = {best_loss}")
            log_message (f"min best val loss so far = {best_loss}")
        
    return best_train_loss_history, best_val_loss_history, best_model, best_params, best_loss



Due to large training time of CNNs via SGD, I found a good learning rate as well as learning rate decay rate manually (0.0015 and 0.95 respectively).

I optimized for the number of convolutional layers as well as for the factor with which the number of channels grows from conv layer to conv layer.

In [None]:
            
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"cnn_grid_search_log_{timestamp}.txt"

with open(filename, "w") as f:
    f.write("=== New Log Start ===\n")
       

train_set, val_set, test_set = preprocess_dataset_for("cnn")

train_set = [(maxpool2d_grayscale(img), label) for img, label in train_set]
val_set   = [(maxpool2d_grayscale(img), label) for img, label in val_set]
test_set  = [(maxpool2d_grayscale(img), label) for img, label in test_set]




SGD_LEARNING_RATE = 15e-4
LEARNING_RATE_MULTIPLIER_PER_EPOCH = 0.95
N_EPOCHS = 5

hyperparameters_to_tune = {
    'n_conv_layers': [1, 3, 6],
    #'n_conv_layers': [2],
    'n_chan_mult': [1.1, 1.2]
    }


train_loss_history, val_loss_history, best_model, best_params, best_loss = grid_search(
        hyperparameters_to_tune,
        list(train_set),
        list(val_set),
        n_epochs=N_EPOCHS
    )




#cnn_trained, train_loss_history, val_loss_history = train_CNN_with_SGD (cnn,
#                                            list(train_set),
#                                            list(val_set),
#                                            SGD_LEARNING_RATE,
#                                            N_EPOCHS,
#                                            LEARNING_RATE_MULTIPLIER_PER_EPOCH
#                                            )
#avg_test_loss = evaluate_model_on(cnn_trained, list(test_set))
#print (f"TEST LOSS = {avg_test_loss:.5f}")
#log_message (f"TEST LOSS = {avg_test_loss:.5f}")
#print ("_" * 50)
#log_message ("_" * 50)

avg_test_loss = evaluate_model_on(best_model, list(test_set))
print (f"TEST LOSS = {avg_test_loss:.5f}")
log_message (f"TEST LOSS = {avg_test_loss:.5f}")
print ("_" * 50)
log_message ("_" * 50)

print (f"best hyperparams are {best_params}")
print (f"VAL LOSS of best model is = {best_loss:.5f}")
print (f"TEST LOSS of best model is = {avg_test_loss:.5f}")

log_message ("_" * 100)
log_message (f"best hyperparams are {best_params}")
log_message (f"VAL LOSS of best model is = {best_loss:.5f}")
log_message (f"TEST LOSS of best model is = {avg_test_loss:.5f}")


Train a CNN with "optimal" hyperparameters longer

In [None]:
n_channels = 4
new_cnn = CNN(in_dim = 24,
            out_dim=7, 
            kernel_size = 3, 
            n_channels = best_params['n_channels'], 
            n_chan_mult = best_params['chan_mult'],
            n_conv_layers = best_params['n_conv_layers'])
        
new_cnn_trained, train_loss_history, val_loss_history = train_CNN_with_SGD (new_cnn,
                                            list(train_set),
                                            list(val_set),
                                            best_params['lr'],
                                            10,
                                            best_params['lr_multiplier']
                                            )
avg_test_loss = evaluate_model_on(new_cnn_trained, list(test_set))
print (f"TEST LOSS = {avg_test_loss:.5f}")
print ("_" * 50)


Plot best CNN model loss history

In [None]:

plot_loss(train_loss_history, val_loss_history)

Plot CNN confusion matrix:

In [None]:
#print_confusion_matrix_of (best_model, test_set)
print_confusion_matrix_of (new_cnn_trained, test_set)