In [69]:
import numpy as np
import pandas as pd
import pickle
from torchvision import datasets,transforms
from torch.utils.data import DataLoader, random_split
import os
from sklearn.metrics import confusion_matrix,f1_score


Dense Layer

In [70]:
class Layer:
    def __init__(self, input_dim, output_dim):
        # Xavier initialization
        self.weights = np.random.randn(input_dim, output_dim) * np.sqrt(2 / (input_dim+output_dim))
        self.bias = np.zeros((1, output_dim))
        
    def forward(self, X):
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        self.input = X
        # Wx+b
        self.output = np.dot(X, self.weights) + self.bias
        return self.output
    
    def backward(self, dL_doutput):
        # Gradients
        dL_dinput = np.dot(dL_doutput, self.weights.T)
        dL_dweights = np.dot(self.input.T, dL_doutput)
        dL_dbias = np.sum(dL_doutput, axis=0, keepdims=True)
        
        # Update weights and biases
        self.grads = {
            'weights': dL_dweights,
            'bias': dL_dbias
        }
        return dL_dinput

Normalization

In [71]:
class BatchNormalization:
    def __init__(self, input_dim, epsilon=1e-5, momentum=0.9):
        self.input_dim=input_dim
        self.epsilon = epsilon
        self.momentum = momentum
        # scale parameter
        self.gamma = np.ones((1, self.input_dim))
        # shift parameter
        self.beta = np.zeros((1, self.input_dim))
        self.moving_mean = np.zeros((1, self.input_dim))
        self.moving_var = np.ones((1, self.input_dim))

    def forward(self, X, training=True):
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        if training:
            # page 296, something about len(X.shape)
            self.mean = np.mean(X, axis=0, keepdims=True)
            self.var = np.var(X, axis=0, keepdims=True)
            self.X_centered = X - self.mean
            self.std = np.sqrt(self.var + self.epsilon)
            self.X_norm=self.X_centered/self.std

            # Update running statistics
            self.moving_mean = self.momentum * self.moving_mean + (1 - self.momentum) * self.mean
            self.moving_var = self.momentum * self.moving_var + (1 - self.momentum) * self.var
        else:
            self.X_norm = (X - self.moving_mean) / np.sqrt(self.moving_var + self.epsilon)
        
        return self.gamma * self.X_norm + self.beta
    # confused about the backward propagation
    def backward(self, dL_doutput):
        m = dL_doutput.shape[0]
        dL_dgamma = np.sum(dL_doutput * self.X_norm, axis=0, keepdims=True)
        dL_dbeta = np.sum(dL_doutput, axis=0, keepdims=True)
        dL_dX_norm = dL_doutput * self.gamma
        dL_dvar = np.sum(dL_dX_norm * self.X_centered * -0.5 / (self.std ** 3), axis=0, keepdims=True)
        dL_dmean = np.sum(dL_dX_norm * -1 / self.std, axis=0, keepdims=True) + dL_dvar * np.mean(-2 * self.X_centered, axis=0, keepdims=True)
        
        dL_dinput = dL_dX_norm / self.std + dL_dvar * 2 * self.X_centered / m + dL_dmean / m
        
        self.grads = {
            'gamma': dL_dgamma,
            'beta': dL_dbeta
        }
        
        return dL_dinput


Activation

In [72]:
class ReLU:
    def forward(self, X):
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        self.output = np.maximum(0, X)
        return self.output
    
    def backward(self, dL_doutput):
        return dL_doutput * (self.output > 0)


Regularization

In [73]:
# Apply after activation function, page 177
class Dropout:

    def __init__(self, dropout):
        self.dropout = dropout


    def forward(self, X, training=True):
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        if training:
            assert 0 <= self.dropout <= 1
            # In this case, all elements are dropped out
            if self.dropout == 1:
                return np.zeros_like(X)
            # In this case, all elements are kept
            if self.dropout == 0:
                return X
            self.mask = np.random.uniform(0, 1, X.shape) > self.dropout
            # self.mask = np.random.binomial(1, 1 - self.dropout, size=X.shape)

            return X * self.mask.astype(np.float32) / (1.0 - self.dropout)

        return X

    # confused about the back propagation
    def backward(self, dL_doutput):
        return dL_doutput * self.mask

Optimization

In [74]:
class AdamOptimizer:
    def __init__(self, learning_rate=0.005, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        # momentum
        self.v = {}
        # second momentum
        self.s = {}
        # time step counter
        self.t = 0
        
    def update(self, params, grads):
        self.t += 1
        for param_name in params:
            if param_name not in self.v.keys():
                self.v[param_name] = np.zeros_like(grads[param_name])
                self.s[param_name] = np.zeros_like(grads[param_name])
            
            # Momentum estimates
            self.v[param_name] = self.beta1 * self.v[param_name] + (1 - self.beta1) * grads[param_name]
            self.s[param_name] = self.beta2 * self.s[param_name] + (1 - self.beta2) * np.square(grads[param_name])
            
            v_bias_corr = self.v[param_name] / (1 - self.beta1 ** self.t)
            s_bias_corr = self.s[param_name] / (1 - self.beta2 ** self.t)
            
            params[param_name] -= self.learning_rate * v_bias_corr / (np.sqrt(s_bias_corr) + self.epsilon)


Regression

In [75]:
class Softmax:
    def forward(self, X):
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        exps = np.exp(X - np.max(X, axis=1, keepdims=True))
        self.output = exps / np.sum(exps, axis=1, keepdims=True)
        return self.output
    
    def backward(self, dL_doutput):
        # In multi-class classification, the gradient of cross-entropy with softmax simplifies the backward pass.
        return dL_doutput


Feed-Forward Neural Network implementation

In [88]:
class NeuralNetwork:
    def __init__(self, layers):
        self.layers = layers

    def forward(self, X, training=True):

        # print('start')
        # print(f'{self.__class__.__name__}')
        # print(X.shape)
        for layer in self.layers:
            if isinstance(layer, Dropout):
                X = layer.forward(X, training)
            else:
                X = layer.forward(X)
        return X

    def backward(self, dL_doutput):
        for layer in reversed(self.layers):
            dL_doutput = layer.backward(dL_doutput)

    def get_params_and_grads(self):
        params_and_grads = {}
        for layer in self.layers:
            if isinstance(layer, Layer) or isinstance(layer, BatchNormalization):
                if isinstance(layer, Layer):
                    params_and_grads.update({
                        f'{layer.__class__.__name__}_weights': layer.weights,
                        f'{layer.__class__.__name__}_bias': layer.bias,
                        f'{layer.__class__.__name__}_grads_weights': layer.grads['weights'],
                        f'{layer.__class__.__name__}_grads_bias': layer.grads['bias']
                    })
                if isinstance(layer, BatchNormalization):
                    params_and_grads.update({
                        f'{layer.__class__.__name__}_gamma': layer.gamma,
                        f'{layer.__class__.__name__}_beta': layer.beta,
                        f'{layer.__class__.__name__}_grads_gamma': layer.grads['gamma'],
                        f'{layer.__class__.__name__}_grads_beta': layer.grads['beta']
                    })
        return params_and_grads

    def train(self, X_train, y_train, epochs, batch_size,filepath):
        optimizer = AdamOptimizer(learning_rate=0.005)
        for epoch in range(epochs):
            # Divide into batches
            for i in range(0, len(X_train), batch_size):
                X_batch = X_train[i:i+batch_size]
                y_batch = y_train[i:i+batch_size]
                
                # Forward pass
                predictions = self.forward(X_batch, training=True)
                loss = -np.mean(np.sum(y_batch * np.log(predictions + 1e-8), axis=1))  # Cross-entropy loss
                
                # Backward pass
                dL_doutput = predictions - y_batch  # Gradient of cross-entropy
                self.backward(dL_doutput)
                
                # Get params and gradients, update using Adam optimizer
                params_and_grads = self.get_params_and_grads()
                optimizer.update({k: params_and_grads[k] for k in params_and_grads if 'grads' not in k},
                                 {k.split('grads_')[0]+k.split('grads_')[1]: params_and_grads[k] for k in params_and_grads if 'grads' in k})
                if epoch==epochs-1 and i+batch_size>=len(X_train):
                    self.save_model(filepath)
                
            print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss}')
        
    def predict(self, X_test):
        return self.forward(X_test, training=False)
    
    def evaluate(self,X_test,y_test):
        prediction = self.predict(X_test)
        y_pred=[[1 if prob == max(row) else 0 for prob in row] for row in prediction]
        y_pred=np.array(y_pred)
        accuracy = np.mean(y_pred == y_test)
        loss = -np.mean(np.sum(y_test * np.log(prediction + 1e-8), axis=1))  # Cross-entropy loss
        print(f"Accuracy: {accuracy * 100:.2f}%")
        print(f"Loss: {loss}")
        # y_test=[[j if k == 1 else 0 for j,k in enumerate(i)] for i in y_test]
        # y_test=[max(i) for i in y_test]
        # y_pred=[[j if k == 1 else 0 for j,k in enumerate(i)] for i in y_pred]
        # y_pred=[max(i) for i in y_pred]
        tn, fp, fn, tp = 0,0,0,0
        for i,row in enumerate(y_test):
            for j,element in enumerate(row):
                # print(element,y_pred[j])
                if element==1 and y_pred[i][j]==1:
                    tp+=1
                elif element==1 and y_pred[i][j]==0:
                    fn+=1
        for i,row in enumerate(y_pred):
            for j,element in enumerate(row):
                # print(element,y_pred[j])
                if element==0:
                    continue
                elif element==1 and y_test[i][j]==0:
                    fp+=1
        tn=len(y_test)-fp
        print('tn\tfp\tfn\ttp')
        print(f'{tn}\t{fp}\t{fn}\t{tp}')
        # print(y_test[:10])
        # print(y_pred[:10])
        specificity = tn / (tn + fp)
        precision = tp/(tp+fp)
        f1 = 2*specificity*precision/(specificity+precision)
        # print(f1)
        print("F1 Score: ", f1)
        return accuracy,loss,f1

    def save_model(self, filepath):
        params = {}
        for i, layer in enumerate(self.layers):
            if isinstance(layer, Layer):
                params[f"layer_{i}_weights"] = layer.weights
                params[f"layer_{i}_bias"] = layer.bias
            if isinstance(layer, BatchNormalization):
                params[f"layer_{i}_gamma"] = layer.gamma
                params[f"layer_{i}_beta"] = layer.beta
        with open(filepath, 'wb') as f:
            pickle.dump(params, f)

    def load_model(self, filepath):
        with open(filepath, 'rb') as f:
            params = pickle.load(f)

        for i, layer in enumerate(self.layers):
            if isinstance(layer, Layer):
                layer.weights = params[f"layer_{i}_weights"]
                layer.bias = params[f"layer_{i}_bias"]
            if isinstance(layer, BatchNormalization):
                layer.gamma = params[f"layer_{i}_gamma"]
                layer.beta = params[f"layer_{i}_beta"]


Data loading

In [77]:
transform=transforms.ToTensor()
train_dataset=datasets.FashionMNIST(root='./data',train=True,transform=transform,download=True)
test_dataset=datasets.FashionMNIST(root='./data',train=False,transform=transform,download=True)



Data cleaning

Train-test-validation split

In [None]:
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=len(val_dataset), shuffle=False)

X_train, y_train = next(iter(train_loader))
X_train = X_train.reshape(-1, 28*28).numpy()  # Flatten the images
y_train=np.eye(10)[y_train]
print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)


Building the architecture

In [None]:
# Define network architecture
network = NeuralNetwork([
    Layer(input_dim=28*28, output_dim=128),
    BatchNormalization(input_dim=128),
    ReLU(),
    Dropout(dropout=0.2),
    Layer(input_dim=128, output_dim=128),
    BatchNormalization(input_dim=128),
    ReLU(),
    Dropout(0.5),
    Layer(input_dim=128, output_dim=10),
    Softmax()
])

# here Layer(784, 128) means input layer has 784 neurons and hidden layer has 128 neurons
# Layer(128, 10) that hidden layer with 128 layers plus output layer with 10 neurons

folder_path = './pickle/'
filename = 'weights.pkl'
os.makedirs(folder_path, exist_ok=True)
filepath = os.path.join(folder_path, filename)
network.train(X_train,y_train,100,64,filepath)

# network.save_model(filepath)

Validation block

In [89]:
X_val, y_val = next(iter(val_loader))
X_val = X_val.reshape(-1, 28*28).numpy()  # Flatten the images
y_val=np.eye(10)[y_val]
# print(y_val[:10])

# y_val=[[j if k == 1 else 0 for j,k in enumerate(i)] for i in y_val]
# y_val=[max(i) for i in y_val]
# print(y_val[:10])
print("X_val shape:", X_val.shape)
print("y_val shape:", y_val.shape)
network.evaluate(X_val,y_val)

X_val shape: (12000, 784)
y_val shape: (12000, 10)
Accuracy: 94.81%
Loss: 0.7423473425440631
tn	fp	fn	tp
9	1	1	9
F1 Score:  0.9


(0.94815, 0.7423473425440631, 0.9)

In [None]:
# The hyperparameter we'll change is the number of hidden layers, ranging from 1 to 5
best_accuracy = 0
best_model = None
layers=[]
layers.append(Layer(input_dim=28*28, output_dim=256))
for i, layer_size in enumerate(range(1,6,1)):
    print ('{}: Testing neural network MLP classifier with {} hidden layers with constant number of layers'.format(i+1, layer_size))
    if layer_size!=1:
        layers.pop()
        layers.pop()
        layers.append(Layer(256,256))
    layers.append(BatchNormalization(256))
    layers.append(ReLU())
    layers.append(Dropout(0.2))
    layers.append(Layer(256,10))
    layers.append(Softmax())
    model = NeuralNetwork(layers)

    # Train
    model.train(X_train, y_train,10,64,filepath)

    current_accuracy,current_loss,current_f1 = model.evaluate(X_val, y_val)

    if best_accuracy < current_accuracy:
        best_model=model
        best_accuracy = current_accuracy

    del (model)


In [None]:
# The hyperparameter we'll change is the number of neurons and hidden layers, ranging from 1 to 5 for hidden layers and n_neurons for neurons
best_accuracy = 0
best_model = None
n_neurons=[784,512,256,128,64,32,16,10]
layers=[]
layers.append(Layer(input_dim=n_neurons[0], output_dim=n_neurons[1]))
for i, layer_size in enumerate(range(1,len(n_neurons)-1,1)):
    print ('{}: Testing neural network MLP classifier with {} hidden layers and variable number of neurons at each layer'.format(i+1, layer_size))
    if layer_size!=1:
        layers.pop()
        layers.pop()
        layers.append(Layer(n_neurons[layer_size-1],n_neurons[layer_size]))
    layers.append(BatchNormalization(n_neurons[layer_size]))
    layers.append(ReLU())
    layers.append(Dropout(0.2))
    layers.append(Layer(n_neurons[layer_size],n_neurons[-1]))
    layers.append(Softmax())
    model = NeuralNetwork(layers)

    # Train
    model.train(X_train, y_train,10,64,filepath)

    current_accuracy,current_loss,current_f1 = model.evaluate(X_val, y_val)

    if best_accuracy < current_accuracy:
        best_model=model
        best_accuracy = current_accuracy

    del (model)


In [None]:
# The hyperparameter we'll change is the number of neurons in the hidden layer, ranging from n_neurons[0] to n_neurons[-1]
accuracies = []
models = []
n_neurons=[784,512,256,128,64,32,16,10]

for i, layer_size in enumerate(range(1,len(n_neurons)-1,1)):
    print ('{}: Testing neural network MLP classifier with {} neurons in hidden layer'.format(i+1, n_neurons[layer_size]))
    layers=[]
    layers.append(Layer(input_dim=n_neurons[0], output_dim=n_neurons[layer_size]))
    layers.append(BatchNormalization(n_neurons[layer_size]))
    layers.append(ReLU())
    layers.append(Dropout(0.2))
    layers.append(Layer(n_neurons[layer_size],n_neurons[-1]))
    layers.append(Softmax())
    model = NeuralNetwork(layers)

    # Train
    model.train(X_train, y_train,10,64,filepath)

    current_accuracy,current_loss,current_f1 = model.evaluate(X_val, y_val)
    
    models.append(model)
    accuracies.append(current_accuracy)

    del (model)
sorted_models = pd.DataFrame({
    'Models': models,
    'Accuracies': accuracies
    })
sorted_models = sorted_models.sort_values(by='Accuracies', ascending=False)
sorted_models

Testing block

In [None]:
network.load_model(filepath)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
X_test, y_test = next(iter(test_loader))
X_test = X_test.reshape(-1, 28*28).numpy()
y_test=np.eye(10)[y_test]
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)
network.evaluate(X_test,y_test)