# COMP4329 Assignment 1

## 1.1 Loading packages

In [None]:
import numpy as np
import matplotlib.pyplot as pl
# from ipywidgets import interact, widgets
# from matplotlib import animation

Matplotlib is building the font cache; this may take a moment.


## 1.2 Load Data

In [6]:
#load training data
train_data = np.load("Assignment1-Dataset/test_data.npy")
train_labels = np.load("Assignment1-Dataset/train_label.npy")

#load testing data 
test_data = np.load("Assignment1-Dataset/test_data.npy")
test_labels = np.load("Assignment1-Dataset/test_label.npy")

## 1.3 Visualise data


In [7]:
unique_arr = np.unique(train_labels)
print(unique_arr)

[0 1 2 3 4 5 6 7 8 9]


## 2. Activation Class

In [None]:
class Activation:

    def __tanh(self, x):
        return np.tanh(x)

    def __tanh_derivative(self, a):
        return 1.0 - a**2

    def __sigmoid(self, x):
        return 1.0 / (1.0 + np.exp(-x))

    def __sigmoid_derivative(self, a):
        return a * (1 - a)

    def __relu(self, x):
        return np.maximum(0, x)

    def __relu_derivative(self, a):
        # Note: Heaviside at 0 is often 0.5 or 1. Using 0 matches some conventions for ReLU derivative.
        return np.heaviside(a, 0)

    def __leakyrelu(self, x, alpha=0.01):
        return np.where(x >= 0, x, alpha * x)

    def __leakyrelu_derivative(self, x, alpha=0.01):
         # Derivative is 1 for x>=0, alpha otherwise. Using heaviside approximates this.
        return np.heaviside(x, 1) * (1 - alpha) + alpha

    def __softmax(self, z):
        # Ensure input is 2D for consistent axis operations
        z = np.atleast_2d(z)
        # Improve numerical stability by subtracting the max
        max_z = np.max(z, axis=1, keepdims=True)
        stable_z = z - max_z
        exp_z = np.exp(stable_z)
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def __softmax_derivative(self, z, z_hat):
        # Derivative of Cross-Entropy Loss w.r.t Softmax input (z) is simply (y_hat - y)
        # This assumes it's used with Cross-Entropy loss during backpropagation.
        # z = true labels (one-hot), z_hat = predictions (softmax output)
        return z_hat - z

    def __init__(self, activation_function = 'relu'):
        self.f = None
        self.f_derivative = None

        if activation_function == "tanh":
            self.f = self.__tanh
            self.f_derivative = self.__tanh_derivative
        elif activation_function == "sigmoid":
            self.f = self.__sigmoid
            self.f_derivative = self.__sigmoid_derivative
        elif activation_function == 'relu':
            self.f = self.__relu
            self.f_derivative = self.__relu_derivative
        elif activation_function == 'leakyrelu':
            self.f = self.__leakyrelu
            self.f_derivative = self.__leakyrelu_derivative
        elif activation_function == "softmax":
            self.f = self.__softmax
            # Note: Softmax derivative is often coupled with CE loss in backprop
            self.f_derivative = self.__softmax_derivative

## 3. Defining Hidden Layers
Just going through one hidden to wrap head around it

In [None]:
class HiddenLayer():

    def __init__(self,
                 n_in,
                 n_out,
                 activation_function_previous_layer = 'relu',
                 activation_function = 'relu',
                 W = None, # Not used for initialization in current code
                 b = None,  # Not used for initialization in current code
                 v_W = None, # Not used in current code
                 v_b = None, # Not used in current code
                 last_hidden_layer = False): # Not used in current code logic

        self.input = None
        self.activation_function = Activation(activation_function).f

        # Set activation derivative for the *previous* layer's activation
        # This is used in the backward pass calculation: delta_prev = delta_curr * W * f'(input_prev)
        self.activation_function_derivative = None
        if activation_function_previous_layer:
            # Get the derivative function object from the Activation class
            self.activation_function_derivative = Activation(activation_function_previous_layer).f_derivative

        # Xavier Initialisation for weights
        self.W = np.random.uniform(low = -np.sqrt(6. / (n_in + n_out)),
                                   high = np.sqrt(6. / (n_in + n_out )),
                                   size = (n_in, n_out))

        # Initialise bias as zeros
        self.b = np.zeros(n_out)

        # Weight scaling for sigmoid activation
        if activation_function == 'sigmoid':
           self.W *= 4 # Why 4? Standard practice is often just Xavier/He init.

        # Initialize gradients
        self.grad_W = np.zeros(self.W.shape)
        self.grad_b = np.zeros(self.b.shape)

        # Velocity terms (for momentum) are initialized but not used in the update function
        # self.v_W = np.zeros_like(self.grad_W)
        # self.v_b = np.zeros_like(self.grad_b)

    def forward(self, input):
        # Ensure input is at least 2D for dot product
        input = np.atleast_2d(input)
        self.input = input # Store the actual input to this layer

        # Compute linear output: z = xW + b
        linear_output = np.dot(self.input, self.W) + self.b

        # Apply activation function: a = f(z)
        self.output = (
            linear_output if self.activation_function is None
            else self.activation_function(linear_output)
        )

        return self.output

    def backward(self, delta, layer_number, output_layer = False):
        # Ensure delta is 2D
        delta = np.atleast_2d(delta)

        # Calculate gradients for weights and biases
        # dL/dW = dL/da * da/dz * dz/dW = delta * f'(z) * x  <- But f'(z) handled in delta propagation
        # Simplified: dL/dW = x^T * delta (where delta is dL/dz for the current layer)
        # Input needs to be from the forward pass (self.input)
        self.grad_W = np.dot(self.input.T, delta)

        # dL/db = dL/da * da/dz * dz/db = delta * f'(z) * 1 <- But f'(z) handled in delta propagation
        # Simplified: dL/db = sum(delta) over batch
        self.grad_b = np.average(delta, axis=0) # Average gradient over the batch

        # Propagate the error (delta) to the previous layer
        # delta_prev = dL/da_prev = dL/dz * dz/da_prev = delta * W^T
        # We also need to multiply by the derivative of the *previous* layer's activation function:
        # delta_prev = (delta * W^T) * f'(z_prev)
        # Note: self.activation_function_derivative is f' of the *previous* layer's activation
        if self.activation_function_derivative:
            # Calculate dL/da_prev
            delta_prev_activation = np.dot(delta, self.W.T)
            # Calculate dL/dz_prev = dL/da_prev * f'(z_prev)
            # Here, self.activation_function_derivative is f' and self.input is the output of the previous layer (a_prev),
            # but we need f'(z_prev). Assuming a_prev is close enough or using the derivative w.r.t 'a' instead of 'z'.
            # A potentially more correct way is delta_prev = np.dot(delta, self.W.T) * self.activation_function_derivative(z_prev)
            # where z_prev would need to be stored. The current code uses f'(a_prev).
            delta = delta_prev_activation * self.activation_function_derivative(self.input) # self.input is a_prev

        return delta

## MLP class

In [None]:
class MLP:
    def __init__(self,
                 layers, # List of neuron counts, e.g., [input_dim, hidden1_dim, ..., output_dim]
                 activation_function = [None, 'relu', 'relu','relu', 'softmax'],
                 weight_decay = 1.0): # Note: Weight decay not implemented in update

        self.layers = []
        self.params = [] # Not used

        self.activation_function = activation_function
        self.weight_decay = weight_decay # Stored but not used in loss/update

        # Create layers
        for i in range(len(layers)-1):
            # Determine previous layer's activation for derivative calculation in backward pass
            activation_prev = activation_function[i]
            # Determine current layer's activation
            activation_curr = activation_function[i+1]

            # The 'last_hidden_layer' flag seems unused in the HiddenLayer class logic
            last_hidden_layer = (i == len(layers) - 2)

            self.layers.append(HiddenLayer(layers[i],
                                           layers[i+1],
                                           activation_function_previous_layer = activation_prev,
                                           activation_function = activation_curr,
                                           last_hidden_layer=last_hidden_layer)) # Flag seems unused

    def forward(self, input):
        current_output = input
        for layer in self.layers:
            current_output = layer.forward(current_output)
        # Final output is the output of the last layer
        return current_output

    def CE_loss(self, z, z_hat):
        # z = true labels (one-hot), z_hat = predictions (softmax output)
        # Ensure stability: clip predictions to avoid log(0)
        epsilon = 1e-12
        z_hat = np.clip(z_hat, epsilon, 1. - epsilon)

        # Calculate Cross-Entropy loss
        loss = - np.sum(z * np.log(z_hat), axis=1) # Sum over classes first
        loss = np.mean(loss) # Average over batch

        # Apply weight decay (L2 regularization) - NOTE: Not done correctly here.
        # L2 should be added to the loss: loss += (lambda/2m) * sum(W^2)
        # The multiplication by self.weight_decay here seems incorrect.
        # Also, weight decay usually applies during the weight update step.
        # loss = loss * self.weight_decay # This is likely incorrect

        # Calculate the initial delta for backpropagation (dL/dz for the output layer)
        # For Softmax + CrossEntropy, this is simply (y_hat - y)
        delta = Activation(self.activation_function[-1]).f_derivative(z, z_hat) # y_hat - y

        return loss, delta

    def backward(self, delta):
        # Backpropagate delta through layers in reverse order
        current_delta = delta
        # Start from the last layer and go backwards
        for layer_number, layer in reversed(list(enumerate(self.layers))):
             output_layer_flag = (layer_number == len(self.layers) - 1)
             # Pass the current delta and get the delta for the previous layer
             current_delta = layer.backward(current_delta, layer_number, output_layer=output_layer_flag)


    def update(self, lr, SGD_optim = None): # SGD_optim not used
        # Standard Gradient Descent update
        # Add weight decay term here if desired: W = W - lr * (grad_W + lambda * W)
        for layer in self.layers:
            # Standard SGD update
            layer.W -= lr * layer.grad_W
            layer.b -= lr * layer.grad_b
            # Example with L2 weight decay (lambda = self.weight_decay):
            # layer.W -= lr * (layer.grad_W + self.weight_decay * layer.W)
            # layer.b -= lr * layer.grad_b # Bias usually not regularized


    def fit(self, X, y, learning_rate = 0.1, epochs = 100, SGD_optim = None, batch_size = 1):
        X = np.array(X)
        y = np.array(y) # Assumes y is already one-hot encoded here
        training_loss_history = []
        training_accuracy_history = []
        testing_accuracy_history = []

        num_samples = X.shape[0]

        for k in range(epochs):
            start_time_epoch = time.time()
            epoch_loss = []

            # Shuffle data at the beginning of each epoch
            X_shuffled, y_shuffled = Utils.shuffle(X, y)

            num_batches = int(np.ceil(num_samples / batch_size))

            for batch_idx in range(num_batches):
                # Get mini-batch
                start_idx = batch_idx * batch_size
                end_idx = min(start_idx + batch_size, num_samples)
                X_batch = X_shuffled[start_idx:end_idx]
                y_batch = y_shuffled[start_idx:end_idx]

                # Forward pass
                y_hat = self.forward(X_batch)

                # Calculate loss and initial delta
                loss, delta = self.CE_loss(y_batch, y_hat)
                epoch_loss.append(loss)

                # Backward pass
                self.backward(delta)

                # Update weights
                self.update(learning_rate, SGD_optim)

            # Calculate average loss for the epoch
            avg_epoch_loss = np.mean(epoch_loss)
            training_loss_history.append(avg_epoch_loss)

            # --- Evaluation ---
            # Get predictions on training and test data
            # Note: predict method expects single samples, modify for batch prediction if needed
            # Or iterate - inefficient for large datasets
            train_predictions_one_hot = self.predict(train_df.X) # Using global train_df
            test_predictions_one_hot = self.predict(test_df.X)   # Using global test_df

            # Decode predictions and true labels
            train_predictions_decoded = Preprocessing.decode(train_predictions_one_hot)
            test_predictions_decoded = Preprocessing.decode(test_predictions_one_hot)

            # Assuming test_labels is loaded globally and is [N, 1] shape
            test_labels_flat = test_labels.flatten()
            # Decode one-hot encoded training labels used in fit()
            train_labels_decoded = Preprocessing.decode(y) # y was passed to fit

            # Calculate accuracy
            train_accuracy = np.mean(train_predictions_decoded == train_labels_decoded)
            test_accuracy = np.mean(test_predictions_decoded == test_labels_flat)

            training_accuracy_history.append(train_accuracy)
            testing_accuracy_history.append(test_accuracy)

            end_time_epoch = time.time()
            epoch_duration = end_time_epoch - start_time_epoch

            print(f'Epoch {k+1}/{epochs} | Train Loss: {avg_epoch_loss:.4f} | Train Acc: {train_accuracy*100:.2f}% | Test Acc: {test_accuracy*100:.2f}% | Time: {epoch_duration:.2f}s')

        # Prepare output dictionary
        output_dict = {'Training Loss': training_loss_history,
                       'Training Accuracy': training_accuracy_history,
                       'Testing Accuracy': testing_accuracy_history}

        return output_dict

    def predict(self, x):
        # Assumes x is the full dataset (N, features)
        # Iterates through each sample - potentially slow
        x = np.array(x)
        num_samples = x.shape[0]
        # Initialize output array - determine output shape based on last layer
        output_dim = self.layers[-1].W.shape[1]
        output = np.zeros((num_samples, output_dim))

        for i in range(num_samples):
             # Pass single sample, ensure it's 2D (1, features)
             sample = np.atleast_2d(x[i, :])
             output[i, :] = self.forward(sample)

        return output # Returns (N, num_classes) probabilities

In [None]:
class Preprocessing:

    def __init__(self, X, y):
        self.X = X
        self.y = y # Expects raw labels (not one-hot initially)
        # self.predictions = None # Seems unused

    def normalize(self):
        """Min-Max normalization"""
        min_val = np.min(self.X) # Global min/max, consider feature-wise
        max_val = np.max(self.X)
        # Avoid division by zero if max == min
        if max_val - min_val != 0:
            self.X = (self.X - min_val) / (max_val - min_val)
        # Else: data is constant, normalization doesn't change it but avoids NaN

    def standardize(self):
        """Standardization (Z-score normalization)"""
        mean_val = np.mean(self.X) # Global mean/std, consider feature-wise
        std_val = np.std(self.X)
        # Avoid division by zero if std is 0
        if std_val != 0:
            self.X = (self.X - mean_val) / std_val
        # Else: data is constant, standardization results in zeros but avoids NaN


    @staticmethod
    def label_encode(label_vector):
        """Converts a vector of integer labels into a one-hot matrix."""
        labels = label_vector.flatten().astype(int) # Ensure flat integer array
        num_classes = np.max(labels) + 1 # Determine num_classes dynamically
        # Alternative: num_classes = len(np.unique(labels)) if labels might not contain all classes 0..N-1

        one_hot = np.zeros((labels.size, num_classes))
        one_hot[np.arange(labels.size), labels] = 1
        return one_hot

    @staticmethod
    def decode(prediction_matrix):
        """Converts a matrix of probabilities/scores into class labels."""
        # Finds the index (class) with the highest value for each sample
        decoded_predictions = np.argmax(prediction_matrix, axis=1)
        return decoded_predictions

In [None]:
class Utils:

    @staticmethod
    def shuffle(X, y):
        """Shuffles X and y arrays consistently."""
        assert X.shape[0] == y.shape[0], "X and y must have the same number of samples."
        shuffled_idx = np.arange(X.shape[0])
        np.random.shuffle(shuffled_idx)
        return X[shuffled_idx], y[shuffled_idx]