# Importing Libraries and Loading Data

In [None]:
from torchvision import datasets, transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix
import numpy as np
from tabulate import tabulate
import pandas as pd
import matplotlib.pyplot as plt
import pickle

# Define transformation
transform = transforms.ToTensor()

# Load the training dataset
train_dataset = datasets.FashionMNIST(root='data', train=True, download=True, transform=transform)

# Load test dataset separately
test_dataset = datasets.FashionMNIST(root='data', train=False, download=True, transform=transform)

# CONSTANT SEED

In [None]:
# seed all
np.random.seed(42)

# Base Layer Definition

In [None]:
class Layer():
    def __init__(self):
        self.training = True
        pass

    def forward(self, input):
        pass

    def backward(self, grad_output):
        pass

    def train(self):
        self.training = True
        
    def eval(self):
        self.training = False

    def parameters(self):
            return []
    
    def info(self):
        return []

# Dense Layer Definition

In [None]:
class Dense(Layer):
    def __init__(self, in_dimension, out_dimension):
        super().__init__()
        # Initialize weights with HE initialization [Better for ReLU activation]
        self.weights = np.random.randn(in_dimension, out_dimension) * np.sqrt(2.0 / in_dimension)
        # random bias initialization
        self.bias = np.random.randn(out_dimension)
        self.grad_weights = None
        self.grad_bias = None

    def forward(self, input):
        self.input = input
        return np.dot(input, self.weights) + self.bias
    
    def backward(self, grad_output):
        self.grad_input = np.dot(grad_output, self.weights.T)
        self.grad_weights = np.dot(self.input.T, grad_output)
        self.grad_bias = np.sum(grad_output, axis=0)        
        return self.grad_input
    
    def parameters(self):
        return [
            {'params': self.weights, 'grads': self.grad_weights},
            {'params': self.bias, 'grads': self.grad_bias}
        ]
    
    def info(self):
        return [
            {
                'name': 'Dense',
                'in_dimension': self.weights.shape[0],
                'out_dimension': self.weights.shape[1],
                'weights': self.weights,
                'bias': self.bias
            }
        ]

# ReLU Layer Definition

In [None]:
class ReLU(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        self.input = input
        return np.maximum(input, 0)
    
    def backward(self, grad_output):
        self.grad_input = grad_output * (self.input > 0)
        return self.grad_input

    def info(self):
        return [
            {
                'name': 'ReLU'
            }
        ]    

# Dropout Layer Definition

In [None]:
class Dropout(Layer):
    def __init__(self, drop_probability):
        super().__init__()
        # p must be between 0 and 1
        assert 0 <= drop_probability <= 1
        self.keep_probabalility = 1.0 - drop_probability

    def forward(self, input):
        if self.training:
            self.mask = np.random.binomial(1, self.keep_probabalility, input.shape) / self.keep_probabalility
            return input * self.mask
        else:
            return input
    
    def backward(self, grad_output):
        self.grad_input = grad_output * self.mask
        return self.grad_input
    
    def info(self):
        return [
            {
                'name': 'Dropout',
                'drop_probability': 1.0 - self.keep_probabalility             
            }
        ]

# Batch Normalization Layer Definition

In [None]:
class BatchNorm(Layer):
    def __init__(self, dimension, momentum=0.9):
        super().__init__()
        self.gamma = np.ones(dimension)
        self.beta = np.zeros(dimension)
        self.momentum = momentum
        self.epsilon = 1e-8
        self.count = 0
        self.running_miu = 0
        self.running_var = 0
        self.grad_gamma = None
        self.grad_beta = None

    def forward(self, input):
        if self.training:
            self.miuB = np.mean(input, axis=0)
            self.x_minus_miuB = input - self.miuB
            self.varB_actual = np.var(input, axis=0)
            self.varB = self.varB_actual + self.epsilon
            self.stdB = np.sqrt(self.varB)
            self.norm = (input - self.miuB) / self.stdB

            self.running_miu = self.momentum * self.running_miu + (1 - self.momentum) * self.miuB
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * self.varB_actual
            self.count = max(input.shape[0], self.count)
            
            return self.gamma * self.norm + self.beta
        else:
            new_var = self.running_var * (self.count / (self.count - 1))
            factor = self.gamma / np.sqrt(new_var + self.epsilon)
            return factor * input + (self.beta - self.running_miu * factor)
            
    
    def backward(self, grad_output):
        self.grad_beta = np.sum(grad_output, axis=0)
        self.grad_gamma = np.sum(grad_output * self.norm, axis=0)
        self.grad_norm = grad_output * self.gamma
        self.grad_varB = np.sum(self.grad_norm * self.x_minus_miuB * -0.5 * self.varB ** -1.5, axis=0)
        self.grad_miuB = np.sum(self.grad_norm * -1 / self.stdB, axis=0) + self.grad_varB * np.mean(-2 * self.x_minus_miuB, axis=0)
        self.grad_input = self.grad_norm / self.stdB + self.grad_varB * 2 * self.x_minus_miuB / grad_output.shape[0] + self.grad_miuB / grad_output.shape[0]
        return self.grad_input
    
    def parameters(self):
        return [
            {'params': self.gamma, 'grads': self.grad_gamma},
            {'params': self.beta, 'grads': self.grad_beta},
        ]
    
    def info(self):
        return [
            {
                'name': 'BatchNorm',
                'gamma': self.gamma,
                'beta': self.beta,
                'running_miu': self.running_miu,
                'running_var': self.running_var,
                'count': self.count,
                'momentum': self.momentum
            }
        ]

# Softmax Layer Definition

In [None]:
class SoftMax(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        self.output = np.exp(input - np.max(input, axis=1, keepdims=True))
        self.output /= np.sum(self.output, axis=1, keepdims=True)
        return self.output
    
    def backward(self, grad_output):
        self.grad_input = np.zeros_like(grad_output)
        for i in range(grad_output.shape[0]):
            tmp = np.diag(self.output[i]) - np.outer(self.output[i], self.output[i])
            self.grad_input[i] = np.dot(tmp, grad_output[i])
        return self.grad_input
    
    def info(self):
        return [
            {
                'name': 'SoftMax'
            }
        ]

# Cross Entropy Loss Definition

In [None]:
class CrossEntropyLoss():
    def __init__(self):
        pass

    def calculate(self, y_true, y_pred):        
        return -np.sum(y_true * np.log(y_pred + 1e-8))

# Adam Optimizer Definition

In [None]:
class Adam:
    def __init__(self, layers, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.layers = layers
        self.parameters = self.params()
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        
        self.m = [np.zeros_like(param['params']) for param in self.parameters]
        self.v = [np.zeros_like(param['params']) for param in self.parameters]
        self.t = 0    

    def params(self):
        params = []
        for layer in self.layers:
            params.extend(layer.parameters())
        return params
        
    def step(self):
        self.parameters = self.params()
        self.t += 1
        for i, param in enumerate(self.parameters):
            grads = param['grads']
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grads
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grads** 2)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            param['params'] -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)

# Model Class Definition

In [None]:
class Model:
    def __init__(self):
        self.layers = []
        pass

    def train(self):
        for layer in self.layers:
            layer.train()

    def eval(self):
        for layer in self.layers:
            layer.eval()

    def add(self, layer):
        self.layers.append(layer)

    def forward(self, input):
        for layer in self.layers:
            input = layer.forward(input)
        return input
    
    def backward(self, grad_output):
        for layer in reversed(self.layers):
            grad_output = layer.backward(grad_output)
                
    def save(self, filename):
        infos = []
        for layer in self.layers:
            infos.extend(layer.info())

        with open(filename, 'wb') as f:
            pickle.dump(infos, f)

    def load(self, filename):
        with open(filename, 'rb') as f:
            infos = pickle.load(f)

        self.layers = []
        for info in infos:
            if info['name'] == 'Dense':
                layer = Dense(info['in_dimension'], info['out_dimension'])
                layer.weights = info['weights']
                layer.bias = info['bias']
                self.layers.append(layer)
            elif info['name'] == 'ReLU':
                self.layers.append(ReLU())
            elif info['name'] == 'Dropout':
                self.layers.append(Dropout(info['drop_probability']))
            elif info['name'] == 'BatchNorm':
                layer = BatchNorm(info['gamma'].shape[0])
                layer.gamma = info['gamma']
                layer.beta = info['beta']
                layer.running_miu = info['running_miu']
                layer.running_var = info['running_var']
                layer.count = info['count']
                layer.momentum = info['momentum']
                self.layers.append(layer)
            elif info['name'] == 'SoftMax':
                self.layers.append(SoftMax())

    def summary(self):
        for layer in self.layers:
            info = layer.info()[0]
            if info['name'] == 'Dense':
                print(f"Dense({info['in_dimension']}, {info['out_dimension']})")
            elif info['name'] == 'ReLU':
                print('ReLU()')
            elif info['name'] == 'Dropout':
                print(f"Dropout({info['drop_probability']})")
            elif info['name'] == 'BatchNorm':
                print(f"BatchNorm({info['gamma'].shape[0]})")
            elif info['name'] == 'SoftMax':
                print('SoftMax()')

# Problem Specific Dimensions

In [None]:
input_dimension = 28 * 28
output_dimension = 10

# Hyperparameters, Model and File Paths Initialization

In [None]:
batch_size = 64
num_epochs = 500

# learning rate
lr = 0.01 

# depends on how many models are defined
# in my case there 3 models, so possible values are 0, 1, 2
model_id = 0

model_path = f'model_{model_id}_{lr}.pickle'
metrics_base_path = f'metrics_{model_id}_{lr}'
confusion_matrix_path = f'confusion_matrix_{model_id}_{lr}'

In [None]:
models = []

# Simple model with no BatchNorm, Dropout

In [None]:
model0 = Model()

model0.add(Dense(input_dimension, 256))
model0.add(ReLU())

model0.add(Dense(256, 128))
model0.add(ReLU())

model0.add(Dense(128, output_dimension))

model0.add(SoftMax())

# A bit deep model without Dropout

In [None]:
model1 = Model()

model1.add(Dense(input_dimension, 512))
model1.add(BatchNorm(512))
model1.add(ReLU())

model1.add(Dense(512, 256))
model1.add(BatchNorm(256))
model1.add(ReLU())

model1.add(Dense(256, 128))
model1.add(BatchNorm(128))
model1.add(ReLU())

model1.add(Dense(128, output_dimension))

model1.add(SoftMax())

# Deep model with everything

In [None]:
model2 = Model()

model2.add(Dense(input_dimension, 512))
model2.add(BatchNorm(512))
model2.add(ReLU())
model2.add(Dropout(0.5))

model2.add(Dense(512, 256))
model2.add(BatchNorm(256))
model2.add(ReLU())
model2.add(Dropout(0.5))

model2.add(Dense(256, 256))
model2.add(BatchNorm(256))
model2.add(ReLU())
model2.add(Dropout(0.5))

model2.add(Dense(256, 128))
model2.add(BatchNorm(128))
model2.add(ReLU())
model2.add(Dropout(0.4))

model2.add(Dense(128, 128))
model2.add(BatchNorm(128))
model2.add(ReLU())
model2.add(Dropout(0.3))

model2.add(Dense(128, 128))
model2.add(BatchNorm(128))
model2.add(ReLU())
model2.add(Dropout(0.3))

model2.add(Dense(128, output_dimension))

model2.add(SoftMax())

# Add to models list

In [None]:
models.extend([model0, model1, model2])

# Selecting the model

In [None]:
model = models[model_id]

# Initializing the loss and optimizer

In [None]:
loss = CrossEntropyLoss()
adam = Adam(model.layers, lr)

# Data Preprocessing and Splitting

In [None]:
X_data = train_dataset.data.numpy().reshape(-1, input_dimension) / 255.0
y_data = np.eye(10)[train_dataset.targets.numpy()]

X_test = test_dataset.data.numpy().reshape(-1, input_dimension) / 255.0
y_test = np.eye(10)[test_dataset.targets.numpy()]

X_train, X_valid, y_train, y_valid = train_test_split(
    X_data, y_data, train_size=0.8, random_state=42
)

print(f'Training data shape: {X_train.shape}, {y_train.shape}')
print(f'Validation data shape: {X_valid.shape}, {y_valid.shape}')
print(f'Test data shape: {X_test.shape}, {y_test.shape}')

In [None]:
metrics = []

best_macro_f1 = -np.inf

# Training Loop and Metric Calculation with Validation at every epoch

In [None]:
# # Uncomment to train the model

# for epoch in range(num_epochs):    
#     # Shuffle training data
#     indices = np.arange(X_train.shape[0])
#     np.random.shuffle(indices)
#     X_train = X_train[indices]
#     y_train = y_train[indices]

#     model.train()
#     loss_value = 0

#     # Training loop
#     for i in range(0, X_train.shape[0], batch_size):
#         X_batch = X_train[i:i + batch_size]
#         y_batch = y_train[i:i + batch_size]

#         # Forward pass
#         y_pred = model.forward(X_batch)

#         # Compute loss
#         current_loss = loss.calculate(y_batch, y_pred)
#         loss_value += current_loss

#         # Compute loss gradient
#         loss_grad = -y_batch / (y_pred + 1e-8)

#         # Backward pass
#         model.backward(loss_grad)

#         # Update parameters
#         adam.step()
        
#     # Average training loss
#     loss_value /= X_train.shape[0]

#     # Training accuracy
#     model.eval()
#     y_train_pred = model.forward(X_train)
#     train_accuracy = np.mean(
#         np.argmax(y_train_pred, axis=1) == np.argmax(y_train, axis=1)
#     )

#     # Validation step
#     model.eval()
#     y_valid_pred = model.forward(X_valid)
#     valid_loss = loss.calculate(y_valid, y_valid_pred) / X_valid.shape[0]
#     valid_accuracy = np.mean(
#         np.argmax(y_valid_pred, axis=1) == np.argmax(y_valid, axis=1)
#     )

#     # decrease learning rate
#     adam.lr *= 0.98
    
#     y_valid_true = np.argmax(y_valid, axis=1)
#     y_valid_pred_labels = np.argmax(y_valid_pred, axis=1)
#     valid_macro_f1 = f1_score(y_valid_true, y_valid_pred_labels, average='macro')                   

#     if epoch % 10 == 9:
#         print(
#             f'Epoch {epoch + 1}/{num_epochs}, '
#             f'Train Loss: {loss_value:.4f}, '
#             f'Train Accuracy: {train_accuracy:.4f}, '
#             f'Validation Loss: {valid_loss:.4f}, '
#             f'Validation Accuracy: {valid_accuracy:.4f}, '
#             f'Validation Macro-F1: {valid_macro_f1:.4f}'
#         )

#     metrics.append([
#         epoch + 1,
#         f"{loss_value:.4f}",
#         f"{train_accuracy:.4f}",
#         f"{valid_loss:.4f}",
#         f"{valid_accuracy:.4f}",
#         f"{valid_macro_f1:.4f}"
#     ])

#     if valid_macro_f1 > best_macro_f1:        
#         print(f'Epoch {epoch + 1} - Saving model')
#         best_macro_f1 = valid_macro_f1
#         model.save(model_path) 

# Saving metrics as pretty table in a txt file

In [None]:
# # Uncomment to save the metrics

# headers = ["Epoch", "Train Loss", "Train Accuracy", "Validation Loss", "Validation Accuracy", "Validation Macro-F1"]

# table = tabulate(metrics, headers=headers, tablefmt="pretty")

# with open(f'{metrics_base_path}.txt', "w") as f:
#     f.write(table)

# Save metrics as Pandas Dataframe with headers

In [None]:
# # Uncomment to save the metrics as CSV

# df = pd.DataFrame(metrics, columns=headers)

# df.to_csv(f'{metrics_base_path}.csv', index=False)

# Necessary plotting of Train and Validation Loss and Accuracy vs Epochs
# Plotting the Validation Macro F1 Score vs Epochs

In [None]:
# # Uncomment to plot the metrics

# # Ensure the columns are numeric
# df['Epoch'] = pd.to_numeric(df['Epoch'])
# df['Train Accuracy'] = pd.to_numeric(df['Train Accuracy'])
# df['Validation Accuracy'] = pd.to_numeric(df['Validation Accuracy'])
# df['Train Loss'] = pd.to_numeric(df['Train Loss'])
# df['Validation Loss'] = pd.to_numeric(df['Validation Loss'])
# df['Validation Macro-F1'] = pd.to_numeric(df['Validation Macro-F1'])

# plt.figure(figsize=(12, 6))

# plt.subplot(1, 3, 1)
# plt.plot(df['Epoch'], df['Train Accuracy'], color='green', label='Train Accuracy')
# plt.plot(df['Epoch'], df['Validation Accuracy'], color='red', label='Validation Accuracy')

# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.title('Training and Validation Accuracy')
# plt.ylim(min(df['Train Accuracy'].min(), df['Validation Accuracy'].min()) * 0.8, 1)

# plt.legend()


# plt.subplot(1, 3, 2)
# plt.plot(df['Epoch'], df['Train Loss'], color='green', label='Train Loss')
# plt.plot(df['Epoch'], df['Validation Loss'], color='red', label='Validation Loss')
# plt.title('Training and Validation Loss')
# plt.ylim(min(df['Train Loss'].min(), df['Validation Loss'].min()) * 0.8, max(df['Train Loss'].max(), df['Validation Loss'].max()) * 1.2)

# plt.xlabel('Epoch')
# plt.ylabel('Loss')

# plt.legend()

# plt.subplot(1, 3, 3)
# plt.plot(df['Epoch'], df['Validation Macro-F1'], color='blue', label='Validation Macro-F1')

# plt.xlabel('Epoch')
# plt.ylabel('Macro F1 Score')
# plt.title('Validation Macro-F1 Score')
# plt.ylim(df['Validation Macro-F1'].min() * 0.8, 1)

# plt.legend()


# plt.tight_layout()

# # save the plot
# plt.savefig(f'{metrics_base_path}.png')

# plt.show()

# Print confusion matrix on Validation Data

In [None]:
model_val = Model()
# model_val.load(model_path)

model_val.load('model_1905118.pickle')
model.eval()

y_pred = model.forward(X_valid)

y_true = np.argmax(y_valid, axis=1)
y_pred_labels = np.argmax(y_pred, axis=1)
conf_matrix = confusion_matrix(y_true, y_pred_labels)

conf_matrix_str = tabulate(conf_matrix, tablefmt='grid')

with open(f'{confusion_matrix_path}_valid.txt', "w") as f:
    f.write(conf_matrix_str)

print(conf_matrix_str)


# Loading Model and Getting Summary of Layers

In [None]:
# Load the model
model = Model()
# model.load(model_path)

# Loading best model
model.load('model_1905118.pickle')

model.summary()

# Testing the model

In [None]:
# Report the test accuracy and macro F1 score

model.eval()

y_pred = model.forward(X_test)

test_accuracy = np.mean(np.argmax(y_pred, axis=1) == np.argmax(y_test, axis=1))
test_macro_f1_score = f1_score(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1), average='macro')

print(f'Test Accuracy : {test_accuracy:.4f}')
print(f'Test Macro F1 Score: {test_macro_f1_score:.4f}')

# Print confusion matrix on test data

In [None]:
y_true = np.argmax(y_test, axis=1)
y_pred_labels = np.argmax(y_pred, axis=1)
conf_matrix = confusion_matrix(y_true, y_pred_labels)

conf_matrix_str = tabulate(conf_matrix, tablefmt='grid')

with open(f'{confusion_matrix_path}_test.txt', "w") as f:
    f.write(conf_matrix_str)

print(conf_matrix_str)


### best on validation data
# -------------Model 2 with learning rate 0.01-------------
## Validation Accuracy: 0.9092
## Validation Macro F1 Score: 0.9093
# ---------------------------------------------------------------------------