# Downloading the dataset

In [27]:
from torchvision import datasets, transforms
import pickle

# Tensor conversion for the torchvision datasets; within this cell it is only
# referenced by the commented-out FashionMNIST constructors below.
transform = transforms.ToTensor()

# The FashionMNIST download is disabled; the test split is read from a local pickle instead.
# train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
# test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)
# NOTE(review): pickle.load can execute arbitrary code stored in the file — only
# load 'a1.pkl' if it comes from a trusted source.
with open('a1.pkl', 'rb') as a1:
  test_dataset = pickle.load(a1)

# Preprocessing

In [28]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

def conv_to_dataframe(dataset):
    """Flatten an iterable of ``(image, label)`` pairs into a DataFrame.

    Each image is converted with ``np.array`` and flattened into one row; the
    labels are stored in an extra column named ``'target'``.

    Parameters
    ----------
    dataset : iterable of (image, label)
        Anything that yields two-element items when iterated.

    Returns
    -------
    pandas.DataFrame
        One row per item, flattened image values followed by ``'target'``.
    """
    # Materialise once so single-pass iterables are consumed exactly one time.
    pairs = [(np.array(image).flatten(), label) for image, label in dataset]
    frame = pd.DataFrame([pixels for pixels, _ in pairs])
    frame['target'] = [label for _, label in pairs]
    return frame

def preprocessing(dataframe,target,n_classes=10):
    """Clean a DataFrame and return scaled features plus one-hot labels.

    Steps, in order: drop rows whose ``target`` is null, fill remaining nulls
    (mode for ``object`` columns, mean otherwise), drop duplicate rows, one-hot
    encode the labels and min-max scale the features using the global minimum
    and maximum over the whole feature matrix.

    The input ``dataframe`` is left untouched so the calling cell can be
    re-run without the data changing underneath it.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Feature columns plus the label column.
    target : str
        Name of the label column.
    n_classes : int, default 10
        Width of the one-hot label matrix. Labels are used as column indices,
        so they must be integers below ``n_classes``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        ``Features`` as floats with shape ``(rows, n_features)`` and
        ``one_hot`` with shape ``(rows, n_classes)``.
    """
    # dropna() without inplace returns a new frame, so the caller's data is not mutated.
    frame = dataframe.dropna(subset=[target])

    # Collect one fill value per column, then fill everything in a single pass.
    fill_values = {}
    for column in frame.columns:
        if frame[column].dtype == 'object':
            modes = frame[column].mode()
            # An all-null object column has no mode; leave it unfilled instead of failing.
            if not modes.empty:
                fill_values[column] = modes[0]
        else:
            fill_values[column] = frame[column].mean()
    frame = frame.fillna(fill_values).drop_duplicates()

    Labels = frame[target].to_numpy().astype(int)
    Features = frame.drop(target,axis=1).to_numpy().astype(float)

    one_hot = np.zeros((Labels.shape[0],n_classes))
    one_hot[np.arange(Labels.shape[0]),Labels] = 1

    # Global min-max scaling (a single min/max over all features, matching the training data treatment).
    if Features.size:
        lowest = np.min(Features)
        span = np.max(Features) - lowest
        if span == 0:
            # Every value is identical: avoid 0/0 and map everything to 0.
            Features = np.zeros_like(Features)
        else:
            Features = (Features - lowest) / span

    return Features, one_hot

# Flatten the loaded test set into a DataFrame (one row per item plus a 'target'
# column); the training-set conversion is currently disabled.
# train_dataframe = conv_to_dataframe(train_dataset)
test_dataframe = conv_to_dataframe(test_dataset)



In [29]:
# Produce the scaled feature matrix and one-hot labels for the test set; the
# training-set counterpart is currently disabled.
# x_train,y_train = preprocessing(train_dataframe,'target')
x_test,y_test = preprocessing(test_dataframe,'target')

# FNN and other necessary classes

In [30]:
import numpy as np
import pickle
from tqdm import tqdm
from sklearn.metrics import accuracy_score,recall_score,confusion_matrix,precision_score,f1_score,roc_auc_score,average_precision_score
import matplotlib.pyplot as plt
import seaborn as sns


class ReLU:
    """Rectified linear activation layer."""

    def __init__(self):
        # Input of the most recent forward() call, needed to mask the gradient.
        self.input = None

    def forward(self, input):
        """Cache ``input`` and return it with negative entries replaced by 0."""
        self.input = input
        return np.maximum(0, self.input)

    def backward(self, input):
        """Return a copy of the upstream gradient, zeroed where the cached input was negative."""
        grad = input.copy()
        # Entries whose forward input was exactly 0 keep their gradient.
        np.putmask(grad, self.input < 0, 0)
        return grad
    
class Dense:
    """Fully connected layer computing ``input @ weight + bias``."""

    def __init__(self, n_input, n_output):
        # Uniform initialisation in [-limit, limit] with limit = sqrt(6 / (fan_in + fan_out)).
        limit = np.sqrt(6 / (n_input + n_output))
        self.weight = np.random.uniform(-limit, limit, (n_input, n_output))
        self.bias = np.random.uniform(-limit, limit, (1, n_output))
        # Filled in by forward()/backward().
        self.input = None
        self.bias_grad = None
        self.weight_grad = None

    def forward(self, input):
        """Cache ``input`` and return the affine transform."""
        self.input = input
        projected = np.matmul(self.input, self.weight)
        return projected + self.bias

    def backward(self, input):
        """Store parameter gradients and return the gradient w.r.t. the layer input.

        ``input`` is the gradient flowing back from the next layer.
        """
        self.bias_grad = np.sum(input, axis=0)
        self.weight_grad = np.matmul(self.input.T, input)
        return np.matmul(input, self.weight.T)
    
class BatchNormalization:
    """Per-feature batch normalisation with learnable scale (gamma) and shift (beta)."""

    def __init__(self,input_n,momentum=0.9):
        self.momentum = momentum
        feature_shape = (1, input_n)
        self.gamma = np.ones(feature_shape)
        self.beta = np.zeros(feature_shape)
        # Exponential moving statistics used when is_training is false.
        self.running_var = np.ones(feature_shape)
        self.running_mean = np.zeros(feature_shape)
        # Filled in by forward()/backward().
        self.x = None
        self.beta_grad = None
        self.gamma_grad = None

    def forward(self,input,is_training):
        """Normalise ``input`` along axis 0.

        In training mode the batch statistics are used and folded into the
        running statistics; otherwise the running statistics are used.
        """
        epsilon = 1e-12
        if not is_training:
            return self.gamma*(input-self.running_mean)/np.sqrt(self.running_var+epsilon) + self.beta

        self.x = input
        mean = np.mean(self.x,axis=0)
        var = np.var(self.x,axis=0)
        keep = self.momentum
        self.running_mean = self.running_mean*keep + (1-keep)*mean
        self.running_var = self.running_var*keep + (1-keep)*var
        centered = self.x-mean
        return self.gamma*centered/np.sqrt(var+epsilon) + self.beta

    def backward(self,input):
        """Store gamma/beta gradients and return the gradient w.r.t. the cached batch.

        ``input`` is the upstream gradient; the batch statistics are recomputed
        from the batch cached by the last training-mode forward pass.
        """
        epsilon = 1e-12
        batch_size = input.shape[0]
        mean = np.mean(self.x,axis=0)
        var = np.var(self.x,axis=0)
        centered = self.x-mean
        std = np.sqrt(var+epsilon)

        self.beta_grad = np.sum(input, axis=0)
        self.gamma_grad = np.sum(input*centered/std, axis=0)

        # Chain rule through the variance, then the mean, then the input itself.
        d_var = -0.5 * self.gamma * np.power(var+epsilon,-3/2) * np.sum(centered*input,axis=0)
        d_mean = (-self.gamma/std)*self.beta_grad + d_var * -2 * np.sum(centered,axis=0)/batch_size
        return input * self.gamma/std + d_var * 2 * centered/batch_size + d_mean/batch_size
    
class Dropout:
    """Inverted dropout layer.

    During training each element of the input is zeroed independently with
    probability ``rate`` and the survivors are scaled by ``1 / (1 - rate)``;
    outside training the input is returned unchanged.
    """

    def __init__(self,rate):
        self.rate = rate
        # Keep-mask sampled by the latest training-mode forward pass.
        self.mask = None

    def forward(self, input, is_training):
        """Apply dropout to ``input`` when ``is_training`` is true."""
        if is_training:
            # One Bernoulli draw per element: a (1, features) mask would drop the
            # same units for every sample in the batch.
            self.mask = (np.random.rand(*input.shape)>self.rate).astype(np.float32)
            return self.mask*input/(1-self.rate)
        else:
            return input

    def backward(self, input):
        """Route the upstream gradient through the mask used in the forward pass."""
        return self.mask*input/(1-self.rate)

class SoftMax:
    """Row-wise softmax layer."""

    def __init__(self):
        # input/output are cached by forward(); r_input/r_output by backward().
        self.input = self.output = self.r_input = self.r_output = None

    def forward(self, input):
        """Return the softmax of every row of ``input``.

        The row maximum is subtracted before exponentiating so large logits do
        not overflow.
        """
        self.input = input
        shifted = self.input-np.max(self.input,axis=1,keepdims=True)
        exponentials = np.exp(shifted)
        self.output = exponentials/np.sum(exponentials,axis=1,keepdims=True)
        return self.output

    def backward(self,input):
        """Return the upstream gradient multiplied by the softmax Jacobian, row by row.

        For one row with softmax output ``s`` the Jacobian is
        ``diag(s) - s s^T``, so ``g @ J == s * (g - sum(g * s))``. Evaluating
        this closed form for the whole batch replaces the per-sample Jacobian
        construction and the repeated ``np.vstack`` growth.
        """
        self.r_input = input
        weighted_sum = np.sum(self.r_input*self.output,axis=1,keepdims=True)
        self.r_output = self.output*(self.r_input-weighted_sum)
        return self.r_output

class CrossEntropyLoss:
    """Categorical cross-entropy between predicted probabilities and one-hot targets."""

    def forward(self,y_pred,y):
        """Return the mean over rows of ``-sum(y * log(y_pred))``.

        ``y_pred`` is not clipped here, so zero probabilities must be handled
        by the caller.
        """
        per_sample = np.sum(-np.log(y_pred)*y,axis=1)
        return np.mean(per_sample)

    def backward(self,y_pred,y):
        """Return the gradient of the mean loss w.r.t. ``y_pred``."""
        n_samples = y.shape[0]
        return (- y/y_pred)/n_samples
    
class SquaredErrorLoss:
    """Squared-error loss: per-row sum of squared differences, averaged over rows."""

    def forward(self,y_pred,y):
        """Return the mean over rows of ``sum((y - y_pred) ** 2)``."""
        residual = y-y_pred
        per_sample = np.sum(np.power(residual,2),axis=1)
        return np.mean(per_sample)

    def backward(self,y_pred,y):
        """Return the gradient of the mean loss w.r.t. ``y_pred``."""
        residual = y-y_pred
        return (-2*residual)/y.shape[0]


class FNN:
    """Sequential feed-forward network built from the layer objects added to it.

    Layers are applied in insertion order by ``forward`` and in reverse order
    by ``backward``. ``compile`` attaches the optimizer, the loss and the list
    of metric names reported by ``evaluate`` and ``train``.
    """

    def __init__(self):
        self.layers = []
        self.optimizer = None
        self.loss = None
        self.metrics = ['loss']

    def add_layer(self,layer):
        """Append ``layer`` to the end of the network."""
        self.layers.append(layer)

    def compile(self,optimizer,loss,metrics=None):
        """Attach optimizer, loss and metric names.

        ``metrics`` may contain 'accuracy', 'recall', 'precision', 'f1-score',
        'auroc' and 'aupr'; 'loss' is always reported. If the optimizer exposes
        ``initialize_params`` it is called with the current layers.
        """
        self.loss = loss
        self.optimizer = optimizer
        if hasattr(self.optimizer,'initialize_params'):
            self.optimizer.initialize_params(self.layers)
        # None instead of a shared mutable default list.
        self.metrics = ['loss'] + list(metrics or [])

    def performance_metrics(self,y, y_pred, y_pred_prob):
        """Compute the configured scikit-learn metrics.

        ``y`` and ``y_pred`` are one-hot matrices, ``y_pred_prob`` holds the
        predicted probabilities. Returns a dict keyed by metric name (the loss
        is added by the callers).
        """
        result = {}
        if ('accuracy' in self.metrics):
            result['accuracy'] = accuracy_score(y, y_pred)
        if ('recall' in self.metrics):
            result['recall'] = recall_score(y, y_pred, average='weighted', zero_division=0)
        if ('precision' in self.metrics):
            result['precision'] = precision_score(y, y_pred, average='weighted', zero_division=0)
        if ('f1-score' in self.metrics):
            result['f1-score'] = f1_score(y, y_pred, average='macro', zero_division=0)
        if ('auroc' in self.metrics):
            result['auroc'] = roc_auc_score(y, y_pred_prob, average='weighted', multi_class='ovr')
        if ('aupr' in self.metrics):
            result['aupr'] = average_precision_score(y, y_pred_prob, average='weighted')
        return result

    def save_model(self,name):
        """Pickle the network to ``name`` after clearing cached activations and gradients."""
        for i in self.layers:
            if isinstance(i,Dense):
                i.input = i.bias_grad = i.weight_grad = None
            elif isinstance(i,ReLU):
                i.input = None
            elif isinstance(i,BatchNormalization):
                i.x = i.beta_grad = i.gamma_grad = None
            elif isinstance(i,Dropout):
                i.mask = None
            elif isinstance(i,SoftMax):
                i.input = i.output = i.r_input = i.r_output = None
        with open(name, 'wb') as file:
            pickle.dump(self, file)

    @staticmethod
    def _one_hot_predictions(y_pred):
        """Return a one-hot matrix marking the arg-max column of every row."""
        one_hot = np.zeros_like(y_pred)
        one_hot[np.arange(y_pred.shape[0]), np.argmax(y_pred, axis=1)] = 1
        return one_hot

    @staticmethod
    def _plot_confusion_matrix(matrix):
        """Draw one confusion matrix as an annotated heatmap."""
        plt.figure(figsize=(8, 6))
        sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues")
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.title("Confusion Matrix")
        plt.show()

    def _show_training_report(self,training_metric,validation_metric,
                              confusion_matrix_training,confusion_matrix_validation):
        """Plot the per-epoch metric curves and the final confusion matrices."""
        fields = {key: [metric[key] for metric in training_metric] for key in training_metric[0]}
        for field, values in fields.items():
            if field != 'epoch':
                plt.plot(fields['epoch'], values, label="training " + field)
        if validation_metric:
            fields = {key: [metric[key] for metric in validation_metric] for key in validation_metric[0]}
            for field, values in fields.items():
                if field != 'epoch':
                    plt.plot(fields['epoch'], values, label="validation " + field)
        plt.xlabel('Epoch')
        plt.ylabel('Metric Value')
        plt.title('Metrics Over Epochs')
        plt.legend()
        plt.show()
        print("Training confusion matrix:")
        self._plot_confusion_matrix(confusion_matrix_training)
        # Without a validation split there is no matrix to draw.
        if confusion_matrix_validation is not None:
            print("Validation confusion matrix:")
            self._plot_confusion_matrix(confusion_matrix_validation)

    def forward(self,input,is_training):
        """Run ``input`` through every layer; ``is_training`` is passed to Dropout and BatchNormalization only."""
        temp = input.copy()
        for layer in self.layers:
            if isinstance(layer,Dropout) or isinstance(layer,BatchNormalization):
                temp = layer.forward(temp,is_training)
            else:
                temp = layer.forward(temp)
        return temp

    def backward(self,input):
        """Propagate the loss gradient ``input`` through the layers in reverse order."""
        temp = input.copy()
        for layer in reversed(self.layers):
            temp = layer.backward(temp)
        return temp

    def evaluate(self,X,y):
        """Return a dict with the loss and the configured metrics for ``X``/``y`` in inference mode."""
        y_pred = self.forward(X,False)
        epsilon = 1e-12
        # Keep probabilities away from 0 and 1 so the loss stays finite.
        y_pred = np.clip(y_pred, epsilon, 1. - epsilon)
        loss = self.loss.forward(y_pred,y)

        one_hot = self._one_hot_predictions(y_pred)
        res = self.performance_metrics(y,one_hot,y_pred)
        return {'loss': loss, **res}

    def predict(self,X):
        """Return the network output for ``X`` in inference mode."""
        return self.forward(X,False)

    def train(self,X,y,epochs,batch_size=64,validation_split=0,show_graph=False):
        """Train the network with mini-batches.

        Parameters
        ----------
        X, y : numpy.ndarray
            Features and one-hot targets, indexed along axis 0.
        epochs : int
            Number of passes over the training portion.
        batch_size : int, default 64
        validation_split : float, default 0
            Fraction of the shuffled samples held out; when 0 no validation is
            run and no validation metrics are recorded.
        show_graph : bool, default False
            Plot metric curves and confusion matrices after training.

        Raises
        ------
        ValueError
            If ``validation_split`` leaves no training samples.
        """
        ind = np.arange(y.shape[0])
        np.random.shuffle(ind)
        split_at = int(y.shape[0] * (1 - validation_split))
        train_indices, val_indices = ind[:split_at], ind[split_at:]
        X_train, X_val = X[train_indices], X[val_indices]
        y_train, y_val = y[train_indices], y[val_indices]
        if y_train.shape[0] == 0:
            raise ValueError("validation_split leaves no samples to train on")
        has_validation = validation_split>0 and y_val.shape[0]>0

        batch_n = int(np.ceil(y_train.shape[0]/batch_size))
        training_metric = []
        validation_metric = []
        confusion_matrix_training = None
        confusion_matrix_validation = None
        res = None
        for i in range(epochs):
            print(f"Epoch {i+1}/{epochs}")
            loss = 0
            total_y_pred_prob = np.empty((0,y_train.shape[1]))
            total_y_pred = np.empty((0,y_train.shape[1]))
            total_y = np.empty((0,y_train.shape[1]))
            indices = np.arange(y_train.shape[0])
            np.random.shuffle(indices)
            pbar =tqdm(total=batch_n)
            for j in range(batch_n):
                batch_indices = indices[j * batch_size:(j + 1) * batch_size]
                X_batch = X_train[batch_indices]
                y_batch = y_train[batch_indices]

                y_pred = self.forward(X_batch,True)
                epsilon = 1e-12
                y_pred = np.clip(y_pred, epsilon, 1. - epsilon)
                loss += self.loss.forward(y_pred,y_batch)

                self.backward(self.loss.backward(y_pred,y_batch))
                self.optimizer.update(self.layers)

                one_hot = self._one_hot_predictions(y_pred)

                # Running totals so the progress bar shows metrics over the epoch so far.
                total_y_pred_prob = np.vstack([total_y_pred_prob,y_pred])
                total_y_pred = np.vstack([total_y_pred,one_hot])
                total_y = np.vstack([total_y,y_batch])

                res = self.performance_metrics(total_y,total_y_pred,total_y_pred_prob)
                res = {'loss': loss/(j+1), **res}

                pbar.set_postfix(res)
                pbar.update(1)

            self.optimizer.update_learning_rate(learning_rate_scheduler(self.optimizer.learning_rate,i+1))
            confusion_matrix_training = confusion_matrix(np.argmax(total_y,axis=1),np.argmax(total_y_pred,axis=1))
            training_metric.append({'epoch': i+1, **res})
            if has_validation:
                v_res = self.evaluate(X_val,y_val)
                # Single-level f-string: prints the same text and parses before Python 3.12.
                print(', '.join(f"validation {key}: {v_res[key]}" for key in v_res))
                confusion_matrix_validation = confusion_matrix(
                    np.argmax(y_val,axis=1),np.argmax(self.predict(X_val),axis=1))
                # Only record validation metrics when they exist; unpacking None raised TypeError.
                validation_metric.append({'epoch': i+1, **v_res})
            pbar.close()

        if show_graph:
            self._show_training_report(training_metric,validation_metric,
                                       confusion_matrix_training,confusion_matrix_validation)

class Adam:
    """Adam optimizer for the Dense and BatchNormalization layers of this notebook.

    One ``[first, second]`` pair of moment buffers is kept per trainable layer,
    in layer order: (bias, weight) for Dense and (gamma, beta) for
    BatchNormalization.
    """

    def __init__(self,learning_rate = 0.01,beta1=0.9,beta2=0.999):
        self.learning_rate = learning_rate
        self.initial_learning_rate = self.learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        # Step counter used for bias correction; starts at 1.
        self.t = 1
        self.m = []
        self.v = []

    def initialize_params(self,layers):
        """Append zero-filled moment buffers for every trainable layer in ``layers``."""
        for layer in layers:
            if isinstance(layer,Dense):
                params = (layer.bias, layer.weight)
            elif isinstance(layer,BatchNormalization):
                params = (layer.gamma, layer.beta)
            else:
                continue
            self.m.append([np.zeros_like(p) for p in params])
            self.v.append([np.zeros_like(p) for p in params])

    def update_learning_rate(self,learning_rate):
        """Replace the current learning rate."""
        self.learning_rate = learning_rate

    def _step(self,slot,k,param,grad):
        """Update moment buffers ``[slot][k]`` with ``grad`` and adjust ``param`` in place."""
        epsilon = 1e-12
        self.m[slot][k] = self.m[slot][k]*self.beta1+(1-self.beta1)*grad
        self.v[slot][k] = self.v[slot][k]*self.beta2+(1-self.beta2)*np.power(grad,2)

        # Bias-corrected moment estimates.
        m_hat = self.m[slot][k]/(1-(self.beta1**self.t))
        v_hat = self.v[slot][k]/(1-(self.beta2**self.t))

        param -= (self.learning_rate*m_hat)/(np.sqrt(v_hat)+epsilon)

    def update(self,layers):
        """Apply one Adam step to every trainable layer, then advance the step counter."""
        slot = 0
        for layer in layers:
            if isinstance(layer,Dense):
                pairs = ((layer.bias, layer.bias_grad), (layer.weight, layer.weight_grad))
            elif isinstance(layer,BatchNormalization):
                pairs = ((layer.gamma, layer.gamma_grad), (layer.beta, layer.beta_grad))
            else:
                continue
            for k, (param, grad) in enumerate(pairs):
                self._step(slot, k, param, grad)
            slot += 1

        self.t += 1
    
def learning_rate_scheduler(learning_rate,epoch,decay_factor=0.8,step_size=10):
    """Step-decay schedule.

    Returns ``learning_rate * decay_factor`` when ``epoch`` is a multiple of
    ``step_size`` and ``learning_rate`` unchanged otherwise. The defaults
    reproduce the previously hard-coded behaviour (multiply by 0.8 every 10th
    epoch).

    Parameters
    ----------
    learning_rate : float
        Current learning rate.
    epoch : int
        Epoch number to test against ``step_size``.
    decay_factor : float, default 0.8
    step_size : int, default 10
    """
    if epoch % step_size == 0:
        return learning_rate*decay_factor
    return learning_rate

# Training

In [31]:

# fnn = FNN()
# fnn.add_layer(Dense(784,512))
# fnn.add_layer(BatchNormalization(512))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.4))
# fnn.add_layer(Dense(512,256))
# fnn.add_layer(BatchNormalization(256))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.3))
# fnn.add_layer(Dense(256,64))
# fnn.add_layer(BatchNormalization(64))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.2))
# fnn.add_layer(Dense(64,10))
# fnn.add_layer(BatchNormalization(10))
# fnn.add_layer(SoftMax())

# fnn.compile(optimizer=Adam(learning_rate=0.01),loss=CrossEntropyLoss(),metrics=['accuracy','f1-score'])
# fnn.train(x_train,y_train,epochs=100,batch_size=10000,validation_split=0.2)
        

# Training and Validation for 4 different learning rates and 3 different models

In [32]:
# import copy

# learning_rate = [0.005,0.003,0.001,0.01]
# model = []

# # model 1
# fnn = FNN()
# fnn.add_layer(Dense(784,256))
# fnn.add_layer(BatchNormalization(256))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.5))
# fnn.add_layer(Dense(256,64))
# fnn.add_layer(BatchNormalization(64))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.5))
# fnn.add_layer(Dense(64,10))
# fnn.add_layer(BatchNormalization(10))
# fnn.add_layer(SoftMax())

# model.append(fnn)

# # model 2
# fnn = FNN()
# fnn.add_layer(Dense(784,512))
# fnn.add_layer(BatchNormalization(512))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.4))
# fnn.add_layer(Dense(512,256))
# fnn.add_layer(BatchNormalization(256))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.3))
# fnn.add_layer(Dense(256,64))
# fnn.add_layer(BatchNormalization(64))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.2))
# fnn.add_layer(Dense(64,10))
# fnn.add_layer(BatchNormalization(10))
# fnn.add_layer(SoftMax())

# model.append(fnn)

# # model 3
# fnn = FNN()
# fnn.add_layer(Dense(784,128))
# fnn.add_layer(BatchNormalization(128))
# fnn.add_layer(ReLU())
# fnn.add_layer(Dropout(0.5))
# fnn.add_layer(Dense(128,10))
# fnn.add_layer(BatchNormalization(10))
# fnn.add_layer(SoftMax())

# model.append(fnn)

# model = [copy.deepcopy(model) for i in range(len(learning_rate))]


In [33]:
# for i in range(len(learning_rate)):
#     j = 0
#     for fnn in model[i]:
#         print(f"Model: {j+1}, Learning rate: {learning_rate[i]}")
#         fnn.compile(optimizer=Adam(learning_rate=learning_rate[i]),loss=CrossEntropyLoss(),metrics=['accuracy','f1-score'])
#         fnn.train(x_train,y_train,epochs=50,batch_size=10000,validation_split=0.2,show_graph=True)
#         j+=1

# Saving the best performing model

In [34]:

# model[3][2].save_model('model_1905016.pickle')

# Evaluating the test set on best performing model

In [35]:

# Load the saved best model and report its metrics on the test set.
# NOTE: pickle.load can execute arbitrary code stored in the file — only load
# model files you created yourself or otherwise trust.
with open('model_1905016.pickle', 'rb') as file:
    fnn = pickle.load(file)


res = fnn.evaluate(x_test,y_test)
# Single-level f-string: prints the same text and also parses on Python < 3.12,
# where reusing the outer quote inside an f-string is a SyntaxError.
print(', '.join(f"test {key}: {res[key]}" for key in res))

test loss: 0.979001640507303, test accuracy: 0.6970368319025201, test f1-score: 0.30829199830090637
