<a href="https://colab.research.google.com/github/sid-betalol/CS6910-FODL-Assignment1/blob/main/cs6910_assignment1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# To add
1. More activation functions: LeakyReLU
2. The given optimizers and more (e.g. Eve)
3. A class/function to build a network
4. Adding a wandb logger
5. Breaking the notebook down into scripts based on the code implementation instructions
6. Dropout and early stopping, if possible

###**wandb setup**

In [None]:
!pip install wandb

In [None]:
import wandb
# Authenticate this session with Weights & Biases before any logging is done
# (presumably prompts for an API key when none is stored - confirm).
wandb.login()

###**Importing the required libraries and dataset**


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

##**Getting and exploring the data**

In [None]:
from keras.datasets import fashion_mnist
# Seed NumPy's global RNG so that later np.random calls are repeatable.
np.random.seed(42)
# Load Fashion-MNIST as (inputs, labels) pairs for the train and test splits.
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

In [None]:
# Report the array shapes of both splits, separated by a blank line.
print('Train Data:', f'X: {x_train.shape}', f'Y: {y_train.shape}', sep='\n')
print()
print('Test Data:', f'X: {x_test.shape}', f'Y: {y_test.shape}', sep='\n')

###**Classwise Sample from data**

In [None]:
# The class count is derived from y_train rather than y_test: the training
# split is expected to contain samples of every class, unlike the test split.
num_labels = len(np.unique(y_train))
# print(num_labels) ## prints 10 in accordance with the keras dataset

# Human-readable class names; position in the list is the integer class id.
labels = [
    'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot',
]
def show_samples(n, X = x_train, Y = y_train, n_classes = num_labels, classes = labels):
    """Plot up to ``n`` randomly chosen images per class, one class per row.

    Parameters
    ----------
    n : int
        Number of samples to show from each class. A class with fewer than
        ``n`` samples shows all of them.
    X : array-like
        Images; each one must be reshapeable to 28x28.
    Y : array-like
        Integer class label of every image in ``X``.
    n_classes : int
        Number of classes; labels are taken to be ``0 .. n_classes - 1``.
    classes : sequence of str
        Title to draw above the images of each class, indexed by class id.

    Raises
    ------
    ValueError
        If there is nothing to draw (``n < 1`` or no class has any sample).
    """
    # Use the data that was actually passed in, not the module-level arrays.
    X = np.asarray(X)
    Y = np.asarray(Y)

    # samples divided by class
    labelled_data = {i: X[Y == i] for i in range(n_classes)}

    # maximum number of available samples in each class
    max_samples = {i: len(labelled_data[i]) for i in range(n_classes)}

    # samples from class to be shown in random order
    shuffled_indices = {i: np.random.permutation(max_samples[i]) for i in range(n_classes)}

    # maximum number of available samples will be shown if the user
    # asks for more samples than available to be shown
    num_samples = {i: min(max_samples[i], n) for i in range(n_classes)}

    n_cols = max(num_samples.values(), default=0)
    if n_cols < 1:
        raise ValueError("nothing to show: n must be >= 1 and the data must not be empty")

    # squeeze=False keeps axs two-dimensional even for a single row or column,
    # so axs[i, j] is always valid. 1.5 inches per row reproduces the original
    # figure height of 15 for ten classes.
    fig, axs = plt.subplots(
        nrows=n_classes,
        ncols=n_cols,
        figsize=(1.2*n_cols, 1.5*n_classes),
        squeeze=False,
    )

    # Hide every cell up front so that cells left empty (classes with fewer
    # samples than the widest row) do not show stray ticks.
    for ax in axs.ravel():
        ax.axis("off")

    for i in range(n_classes):
        for j in range(num_samples[i]):
            img = labelled_data[i][shuffled_indices[i][j]].astype(np.uint8).reshape(28, 28)
            axs[i, j].imshow(img, cmap="gray")
            axs[i, j].set_title(classes[i])
    plt.show()

show_samples(15)

##**Template Class for Activation Functions**

In [None]:
class daddyActivation():
    """Template for activation functions.

    Subclasses override ``forward``, ``calc_grads`` and ``backward``; the
    versions here do nothing and return ``None``.
    """

    def __init__(self, *args, **kwargs):
        """Start with empty gradient and cache containers."""
        self.grads = dict()
        self.backprop_cache = dict()

    def __call__(self, *args, **kwargs):
        """Run ``forward``, then store the result of ``calc_grads``.

        Both hooks receive the same arguments. The forward output is
        returned and ``self.grads`` is replaced by what ``calc_grads`` gives.
        """
        output = self.forward(*args, **kwargs)
        self.grads = self.calc_grads(*args, **kwargs)
        return output

    def forward(self, *args, **kwargs):
        """Forward pass of the activation function (to be overridden)."""
        return None

    def calc_grads(self, *args, **kwargs):
        """Gradient of the activation with respect to its input (to be overridden)."""
        return None

    def backward(self, *args, **kwargs):
        """Gradient of the loss with respect to the activation's input,
        built from the gradients computed in ``calc_grads`` (to be overridden).
        """
        return None

###**Activation Functions**

In [None]:
class Sigmoid(daddyActivation):
    """Logistic sigmoid activation, ``1 / (1 + exp(-x))``."""

    def __init__(self):

        super().__init__()

    def forward(self, x):
        """Return the element-wise sigmoid of ``x`` and cache it for ``calc_grads``.

        ``exp`` is only evaluated at non-positive arguments, so large
        negative inputs no longer overflow:
        for ``x >= 0`` the value is ``1 / (1 + exp(-x))`` and for ``x < 0``
        it is the algebraically equal ``exp(x) / (1 + exp(x))``.
        """
        decay = np.exp(-np.abs(x))
        self.backprop_cache = np.where(np.asarray(x) >= 0, 1/(1 + decay), decay/(1 + decay))
        return self.backprop_cache

    def calc_grads(self, x):
        """Return ``{'x': y * (1 - y)}`` where ``y`` is the cached forward output."""
        y = self.backprop_cache
        return {"x": y*(1 - y)}

    def backward(self, y_hat):
        """Scale the incoming gradient ``y_hat`` by the local derivative (chain rule)."""
        return self.grads['x']*y_hat


class Tanh(daddyActivation):
    """Hyperbolic tangent activation."""

    def __init__(self):

        super().__init__()

    def forward(self, x):
        """Return the element-wise tanh of ``x`` and cache it for ``calc_grads``.

        ``np.tanh`` is used instead of the explicit
        ``(e^x - e^-x) / (e^x + e^-x)`` ratio, which evaluates to inf/inf = nan
        once ``exp`` overflows for large ``|x|``.
        """
        self.backprop_cache = np.tanh(x)
        return self.backprop_cache

    def calc_grads(self, x):
        """Return ``{'x': 1 - y**2}`` where ``y`` is the cached forward output."""
        y = self.backprop_cache
        return {"x": 1 - y**2}

    def backward(self, y_hat):
        """Scale the incoming gradient ``y_hat`` by the local derivative (chain rule)."""
        return self.grads['x']*y_hat


class ReLU(daddyActivation):
    """Rectified linear unit, ``max(x, 0)``."""

    def __init__(self):

        super().__init__()

    def forward(self, x):
        """Clamp negative entries of ``x`` to zero and cache the result."""
        activated = np.maximum(x, 0.0)
        self.backprop_cache = activated
        return activated

    def calc_grads(self,x):
        """Return ``{'x': mask}``: 1.0 where the cached output is positive, else 0.0."""
        is_active = self.backprop_cache > 0
        return {"x": is_active.astype("float")}

    def backward(self, y_hat):
        """Pass the incoming gradient ``y_hat`` through the active units only."""
        local_grad = self.grads['x']
        return local_grad*y_hat


class LeakyReLU(daddyActivation):
    """Leaky ReLU, computed as ``max(x, alpha * x)`` with a fixed ``alpha``."""

    def __init__(self, alpha=0.1):
        super().__init__()
        self.alpha = alpha

    def forward(self, x):
        """Return the element-wise maximum of ``x`` and ``alpha * x``; cache it."""
        leaky = np.maximum(x, self.alpha*x)
        self.backprop_cache = leaky
        return leaky

    def calc_grads(self, x):
        """Return ``{'x': slope}``: 1 where the cached output is positive, else ``alpha``."""
        positive = self.backprop_cache > 0
        return {"x": np.where(positive, 1, self.alpha)}

    def backward(self, y_hat):
        """Scale the incoming gradient ``y_hat`` by the local slope (chain rule)."""
        local_grad = self.grads['x']
        return local_grad*y_hat

class ParamReLU(daddyActivation):
    """Parametric ReLU, computed as ``max(x, alpha * x)``.

    Besides the gradient with respect to the input, the gradient of the loss
    with respect to ``alpha`` is made available in ``self.del_alpha`` after
    every ``backward`` call. Nothing in this class updates ``alpha`` itself;
    that is left to the caller.
    """

    def __init__(self, alpha=0.1):
        super().__init__()
        self.alpha = alpha
        # Loss gradient w.r.t. alpha; filled in by backward().
        self.del_alpha = None

    def forward(self, x):
        """Return the element-wise maximum of ``x`` and ``alpha * x``; cache it."""
        self.backprop_cache = np.maximum(x, self.alpha*x)
        return self.backprop_cache

    def calc_grads(self, x):
        """Return the local derivatives with respect to the input and ``alpha``.

        Both entries are returned in one dict. The base ``__call__`` replaces
        ``self.grads`` with this return value, so anything written to
        ``self.grads`` here directly would be lost before ``backward`` runs.
        """
        positive = self.backprop_cache > 0
        return {
            # slope 1 on the positive side, alpha on the other
            "x": np.where(positive, 1, self.alpha),
            # d(alpha * x)/d(alpha) = x where the alpha branch is taken
            "alpha": np.where(positive, 0, x),
        }

    def backward(self, y_hat):
        """Return the loss gradient w.r.t. the input; store the one w.r.t. alpha.

        ``self.grads`` is left untouched so repeated calls give the same result.
        """
        # alpha is a single shared scalar, so its gradient sums over all elements.
        self.del_alpha = np.sum(self.grads["alpha"]*y_hat)
        return self.grads["x"]*y_hat

##**Template Class for Loss Functions**

In [None]:
class daddyLoss():
    """Template for loss functions.

    Subclasses override ``forward`` and ``calc_grads``; ``calc_grads`` is
    expected to return a dict with an ``'x'`` entry, which ``backward`` reads.
    """

    def __init__(self, *args, **kwargs):
        """Start with empty gradient and cache containers."""
        self.grads = dict()
        self.backprop_cache = dict()

    def __call__(self, y_pred, y_true, *args, **kwargs):
        """Run ``forward``, store the result of ``calc_grads``, return the loss."""
        loss_value = self.forward(y_pred, y_true, *args, **kwargs)
        self.grads = self.calc_grads(y_pred, y_true, *args, **kwargs)
        return loss_value

    def forward(self, y_pred, y_true, *args, **kwargs):
        """Compute the loss value (to be overridden)."""
        return None

    def calc_grads(self, y_pred, y_true, *args, **kwargs):
        """Compute the gradients of the loss (to be overridden)."""
        return None

    def backward(self, *args, **kwargs):
        """Return the stored ``'x'`` gradient; any arguments are ignored."""
        stored = self.grads
        return stored['x']


###**Helper Functions for Loss**

In [None]:
# classification problem, so outputs can be represented as
# one-hot encoded vectors

def one_hot_encode(y, num_classes):
    """Return an integer ``(len(y), num_classes)`` matrix with a single 1 per row.

    Row ``r`` has its 1 in column ``y[r]``; every other entry is 0.
    """
    n_samples = len(y)
    row_ids = np.arange(n_samples)
    one_hot = np.zeros(shape=(n_samples, num_classes), dtype=int)
    one_hot[row_ids, y] = 1
    return one_hot

# softmax will be used for the output layer

def softmax(x):
    """Softmax over the last axis of ``x``.

    The maximum along that axis is subtracted before exponentiating, which
    leaves the result unchanged but keeps ``exp`` from overflowing.
    """
    shifted = x - np.max(x, axis=-1, keepdims=True)
    exps = np.exp(shifted)
    normaliser = np.sum(exps, axis=-1, keepdims=True)
    return exps/normaliser

###**Loss Functions**

In [None]:
class MSE(daddyLoss):
    """Mean squared error between softmax probabilities and one-hot targets.

    ``y_pred`` holds raw scores (logits): ``forward`` applies ``softmax`` to
    it itself. The gradient returned under ``'x'`` is therefore taken with
    respect to those raw scores, like the one ``LogLoss`` returns.
    """

    def __init__(self):

        super().__init__()

    def forward(self, y_pred, y_true):
        """Return the batch mean of the per-sample sum of squared errors.

        y_pred : (batch, num_classes) array of raw scores.
        y_true : integer class labels, one per sample.
        """
        num_classes = y_pred.shape[-1]
        probabs = softmax(y_pred)
        # One-hot targets: row i of the identity matrix is the encoding of class i.
        y_true_encoding = np.eye(num_classes, dtype = int)[np.array(y_true).astype(int)]
        self.backprop_cache['y_true'] = y_true_encoding
        self.backprop_cache['probabs'] = probabs
        return np.mean(np.sum((probabs - y_true_encoding)**2, axis=1))

    def calc_grads(self, y_pred, y_true):
        """Return ``{'x': dLoss/dy_pred}`` using the values cached by ``forward``."""
        batch_size = y_pred.shape[0]
        probabs = self.backprop_cache["probabs"]

        # Gradient of the loss with respect to the probabilities.
        grad_probabs = 2*(probabs - self.backprop_cache["y_true"])/batch_size

        # Chain through the softmax: with p = softmax(z) and g = dL/dp,
        # dL/dz_k = p_k * (g_k - sum_i g_i * p_i).
        weighted_sum = np.sum(grad_probabs*probabs, axis=-1, keepdims=True)
        grad = probabs*(grad_probabs - weighted_sum)
        return {'x': grad}

class LogLoss(daddyLoss):
    """Softmax cross-entropy loss on raw scores (logits)."""

    def __init__(self):

        super().__init__()

    def forward(self, y_pred, y_true):
        """Return the batch mean of ``-log p[true class]``.

        y_pred : (batch, num_classes) array of raw scores.
        y_true : integer class labels, one per sample.
        """
        num_classes = y_pred.shape[-1]
        probabs = softmax(y_pred)
        # One-hot targets: row i of the identity matrix is the encoding of class i.
        y_true_encoding = np.eye(num_classes, dtype = int)[np.array(y_true).astype(int)]
        self.backprop_cache['y_true'] = y_true_encoding
        self.backprop_cache['probabs'] = probabs

        # Take the log-softmax directly instead of log(softmax(.)): once a
        # probability underflows to 0, log gives -inf and 0 * -inf is nan,
        # which would poison the whole loss. Here the log argument is a sum
        # that contains exp(0) = 1, so it is never below 1.
        shifted = y_pred - np.max(y_pred, axis=-1, keepdims=True)
        log_probabs = shifted - np.log(np.sum(np.exp(shifted), axis=-1, keepdims=True))

        return np.mean(np.sum(-y_true_encoding*log_probabs, axis=1))

    def calc_grads(self, y_pred, y_true):
        """Return ``{'x': (probabs - one_hot) / batch_size}``, the gradient
        of the loss with respect to ``y_pred``."""
        batch_size = y_pred.shape[0]
        grad = self.backprop_cache["probabs"] - self.backprop_cache["y_true"]
        grad = grad/batch_size
        return {'x':grad}

##**Template Class for Optimizers**

In [None]:
class daddyOptimizer():
    """Template for optimizers; subclasses supply ``calc_update``."""

    def __init__(self, *args, **kwargs):
        """Create the (initially empty) per-parameter state store."""
        self.history = dict()

    def update_weights(self, layer, *args, **kwargs):
        """Add the step from ``calc_update`` to every entry of ``layer.weights``.

        Each entry is rebound to a new value rather than modified in place.
        """
        step = self.calc_update(layer)
        for name in layer.weights:
            layer.weights[name] = layer.weights[name] + step[name]

    def calc_update(self, layer, *args, **kwargs):
        """Return a dict of additive steps keyed like ``layer.weights``.

        Needs to be overridden by every concrete optimizer.
        """
        return None

###Helper Functions

In [None]:
def zero_array(x):
    """Return a new all-zero array with the shape and dtype of ``x``."""
    shape, dtype = x.shape, x.dtype
    return np.zeros(shape, dtype=dtype)

###**Optimizers**
SGD update rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-5#/0/56/7

Momentum Update Rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-5#/0/40/7

NAG Update Rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-5#/0/50/7

Implementation issue with NAG Update rule, needs to be fixed

RMSProp Update Rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-52#/0/18/5

Adam Update Rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-52#/0/40/10

NAdam Update Rule: https://iitm-pod.slides.com/arunprakash_ai/cs6910-lecture-52#/0/68

Add a comment to remark about bias correction in Adam/NAdam

Summary of optimizers: https://blog.paperspace.com/intro-to-optimization-momentum-rmsprop-adam/

In [None]:
class SGD(daddyOptimizer):
    """Plain gradient descent: ``w <- w - learning_rate * grad``."""

    def __init__(self, learning_rate = 0.01):

        super().__init__()
        self.learning_rate = learning_rate

    def calc_update(self, layer):
        """Return ``-learning_rate * layer.del_theta[k]`` for every weight key ``k``."""
        negative_rate = -self.learning_rate
        return {name: negative_rate*layer.del_theta[name] for name in layer.weights}

class Momentum(daddyOptimizer):
    """Gradient descent with momentum.

    Update rule:
        u_t     = beta * u_[t-1] + eta * grad(w_t)
        w_[t+1] = w_t - u_t

    The velocity ``u`` is stored in ``self.history`` under the weight's key.
    """

    def __init__(self, learning_rate = 0.001, beta = 0.9):

        super().__init__()
        self.learning_rate = learning_rate
        self.beta = beta

    def calc_update(self, layer):
        """Advance the velocity of every weight and return its negation as the step."""
        # First call: start every velocity at zero.
        if not self.history:
            for name, value in layer.weights.items():
                self.history[name] = {'u': zero_array(value)}

        update = {}
        for name in layer.weights:
            state = self.history[name]
            state['u'] = self.beta*state['u'] + self.learning_rate*layer.del_theta[name]
            update[name] = -state['u']

        return update

class NAG(daddyOptimizer):
    """Nesterov accelerated gradient, in its current-gradient formulation.

    The textbook rule
        u_t = beta * u_[t-1] + eta * grad(w_t - beta * u_[t-1])
    needs the gradient at a look-ahead point. An optimizer only sees
    ``layer.del_theta`` (the gradient at the current weights) and cannot
    re-run the network's backward pass, so the equivalent reformulation in
    terms of the current gradient ``g_t`` is used instead:

        u_t     = beta * u_[t-1] + eta * g_t
        w_[t+1] = w_t - (beta * u_t + eta * g_t)

    The velocity ``u`` is stored in ``self.history`` under the weight's key,
    so one instance tracks the parameters of a single layer.
    """

    def __init__(self, learning_rate = 0.001, beta = 0.9):

        super().__init__()
        self.learning_rate = learning_rate
        self.beta = beta

    def calc_update(self, layer):
        """Advance the velocities and return the additive step for every weight.

        ``layer.weights`` is only read here; applying the step is left to
        ``update_weights``.
        """
        update = {}

        # First call: start every velocity at zero.
        if not self.history:
            for k, v in layer.weights.items():
                self.history[k] = {'u': zero_array(v)}

        for k in layer.weights:
            scaled_grad = self.learning_rate*layer.del_theta[k]
            self.history[k]['u'] = self.beta*self.history[k]['u'] + scaled_grad
            # Momentum part evaluated one step ahead, plus the plain gradient step.
            update[k] = -(self.beta*self.history[k]['u'] + scaled_grad)

        return update

class RMSProp(daddyOptimizer):
    """RMSProp: scale each gradient by a running RMS of its past values.

    Update rule:
        u_t     = beta * u_[t-1] + (1 - beta) * g_t**2
        w_[t+1] = w_t - eta * g_t / sqrt(u_t + epsilon)

    The running average ``u`` is stored in ``self.history`` under the
    weight's key, so one instance tracks the parameters of a single layer.
    """

    def __init__(self, learning_rate = 0.001, beta = 0.9, epsilon = 1e-7):

        super().__init__()
        self.learning_rate = learning_rate
        self.beta = beta
        self.epsilon = epsilon

    def calc_update(self, layer):
        """Advance the running averages and return the additive step for every weight."""
        update = {}

        # First call: start every running average at zero.
        if not self.history:
            for k, v in layer.weights.items():
                self.history[k] = {'u': zero_array(v)}

        for k in layer.weights:
            grad = layer.del_theta[k]
            self.history[k]['u'] = self.beta*self.history[k]['u'] + (1 - self.beta)*(grad**2)
            sqrt_term = np.sqrt(self.history[k]['u'] + self.epsilon)
            # The step is along the gradient, not along the weights themselves.
            update[k] = -(self.learning_rate*grad/sqrt_term)

        return update

class Adam(daddyOptimizer):
    """Adam: momentum on the gradient plus RMS scaling, both bias-corrected.

    Update rule (t = ``self.steps``, starting at 1):
        m_t     = beta1 * m_[t-1] + (1 - beta1) * g_t
        u_t     = beta2 * u_[t-1] + (1 - beta2) * g_t**2
        m_hat   = m_t / (1 - beta1**t)
        u_hat   = u_t / (1 - beta2**t)
        w_[t+1] = w_t - eta * m_hat / (sqrt(u_hat) + epsilon)

    ``m`` and ``u`` start at zero, so early values are biased towards zero;
    dividing by ``1 - beta**t`` undoes that. Both are stored in
    ``self.history`` under the weight's key and ``steps`` advances once per
    ``calc_update`` call, so one instance tracks a single layer.
    """

    def __init__(self, learning_rate = 0.001, epsilon = 1e-7, beta1 = 0.9, beta2 = 0.999):

        super().__init__()
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.beta1 = beta1
        self.beta2 = beta2
        self.steps = 1

    def calc_update(self, layer):
        """Advance both moment estimates and return the additive step for every weight."""
        update = {}

        # First call: start both moment estimates at zero.
        if not self.history:
            for k, v in layer.weights.items():
                self.history[k] = {'u': zero_array(v), 'm': zero_array(v)}

        for k in layer.weights:
            grad = layer.del_theta[k]
            self.history[k]['m'] = self.beta1*self.history[k]['m'] + (1 - self.beta1)*grad
            self.history[k]['u'] = self.beta2*self.history[k]['u'] + (1 - self.beta2)*(grad**2)

            corrected_avg = self.history[k]['m']/(1 - self.beta1**self.steps)
            # The second moment comes from 'u' (squared gradients, never
            # negative), so the square root below is always defined.
            corrected_squared_avg = self.history[k]['u']/(1 - self.beta2**self.steps)

            sqrt_term = np.sqrt(corrected_squared_avg) + self.epsilon
            update[k] = -(self.learning_rate*corrected_avg/sqrt_term)

        self.steps += 1
        return update

class NAdam(daddyOptimizer):
    """NAdam: Adam with a Nesterov-style look-ahead on the first moment.

    Update rule (t = ``self.steps``, starting at 1):
        m_t     = beta1 * m_[t-1] + (1 - beta1) * g_t
        u_t     = beta2 * u_[t-1] + (1 - beta2) * g_t**2
        m_hat   = m_t / (1 - beta1**t)
        u_hat   = u_t / (1 - beta2**t)
        w_[t+1] = w_t - eta / (sqrt(u_hat) + epsilon)
                        * (beta1 * m_hat + (1 - beta1) * g_t / (1 - beta1**t))

    ``m`` and ``u`` start at zero, so early values are biased towards zero;
    dividing by ``1 - beta**t`` undoes that. Both are stored in
    ``self.history`` under the weight's key and ``steps`` advances once per
    ``calc_update`` call, so one instance tracks a single layer.
    """

    def __init__(self, learning_rate = 0.001, epsilon = 1e-7, beta1 = 0.9, beta2 = 0.999):

        super().__init__()
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.beta1 = beta1
        self.beta2 = beta2
        self.steps = 1

    def calc_update(self, layer):
        """Advance both moment estimates and return the additive step for every weight."""
        update = {}

        # First call: start both moment estimates at zero.
        if not self.history:
            for k, v in layer.weights.items():
                self.history[k] = {'u': zero_array(v), 'm': zero_array(v)}

        # Same for every weight in this step, so computed once.
        delta_coeff = (1 - self.beta1)/(1 - self.beta1**self.steps)

        for k in layer.weights:
            grad = layer.del_theta[k]
            self.history[k]['m'] = self.beta1*self.history[k]['m'] + (1 - self.beta1)*grad
            self.history[k]['u'] = self.beta2*self.history[k]['u'] + (1 - self.beta2)*(grad**2)

            corrected_avg = self.history[k]['m']/(1 - self.beta1**self.steps)
            # The second moment comes from 'u' (squared gradients, never
            # negative), so the square root below is always defined.
            corrected_squared_avg = self.history[k]['u']/(1 - self.beta2**self.steps)

            sqrt_term = np.sqrt(corrected_squared_avg) + self.epsilon

            update[k] = -((self.learning_rate/sqrt_term)*(self.beta1*corrected_avg + delta_coeff*grad))

        self.steps += 1
        return update

##**Template Class for Layers**

In [None]:
class daddyLayer():
    """Template for network layers.

    Subclasses override ``init_weights``, ``forward``, ``calc_grads`` and
    ``backward``; the versions here do nothing and return ``None``.
    """

    def __init__(self, *args, **kwargs):
        """Start with empty containers and no optimizer attached."""
        self.grads = dict()
        self.weights = dict()
        self.backprop_cache = dict()
        self.optimizer = None

    def __call__(self, *args, **kwargs):
        """Run ``forward``, store the result of ``calc_grads``, return the output."""
        output = self.forward(*args, **kwargs)
        self.grads = self.calc_grads(*args, **kwargs)
        return output

    def init_weights(self, *args, **kwargs):
        """Create the layer's parameters (to be overridden)."""
        return None

    def forward(self, *args, **kwargs):
        """Forward pass of the layer (to be overridden)."""
        return None

    def calc_grads(self, *args, **kwargs):
        """Local gradients of the layer (to be overridden)."""
        return None

    def backward(self, *args, **kwargs):
        """Backward pass of the layer (to be overridden)."""
        return None

    def update_weights(self, *args, **kwargs):
        """Hand this layer to its optimizer, which applies the weight update."""
        attached = self.optimizer
        attached.update_weights(self)

###**Layers of a Feedforward neural network**
Reference used for Xavier and Kaiming initialization: https://towardsdatascience.com/weight-initialization-in-neural-networks-a-journey-from-the-basics-to-kaiming-954fb9b47c79

In [None]:
class FNNLayer(daddyLayer):
    """Fully connected layer computing ``x @ w + b``.

    ``w`` has shape ``(in_dim, out_dim)`` and ``b`` has shape ``(1, out_dim)``;
    inputs are 2-D with ``in_dim`` columns (one row per sample).

    Parameters
    ----------
    in_dim, out_dim : int
        Number of input and output features.
    weight_decay : float or None
        L2 coefficient. When truthy, ``2 * weight_decay * theta`` is added
        to the gradient of both ``w`` and ``b``.
    init_method : {'random', 'xavier', 'kaiming'}
        Weight initialisation scheme.

    Raises
    ------
    ValueError
        If ``init_method`` is not one of the supported names.
    """

    def __init__(self, in_dim, out_dim, weight_decay = None, init_method = 'random'):

        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.weight_decay = weight_decay
        self.init_method = init_method
        self.init_weights()

    def init_weights(self):
        """Fill ``self.weights['w']`` and ``self.weights['b']`` per ``init_method``."""
        if self.init_method == 'random':
            # Gaussian scaled by sqrt(1 / fan_in).
            scale = np.sqrt(1/self.in_dim)
            self.weights['w'] = np.random.randn(self.in_dim, self.out_dim)*scale
            self.weights['b'] = np.random.randn(1, self.out_dim)*scale

        elif self.init_method == 'xavier':
            # Uniform in [-limit, limit] with limit = sqrt(6 / (fan_in + fan_out)).
            limit = np.sqrt(6 / (self.in_dim + self.out_dim))
            self.weights['w'] = np.random.uniform(low = -limit, high = limit, size = (self.in_dim, self.out_dim))
            self.weights['b'] = np.random.uniform(low = -limit, high = limit, size = (1, self.out_dim))

        elif self.init_method == "kaiming":
            # Gaussian scaled by sqrt(2 / fan_in).
            scale = np.sqrt(2 / self.in_dim)
            self.weights["w"] = np.random.randn(self.in_dim, self.out_dim) * scale
            self.weights["b"] = np.random.randn(1, self.out_dim) * scale

        else:
            # Fail here rather than with a KeyError on the first forward pass.
            raise ValueError(
                f"unknown init_method {self.init_method!r}; "
                "expected 'random', 'xavier' or 'kaiming'"
            )

    def forward(self, x):
        """Cache ``x`` and return ``x @ w + b`` of shape ``(batch, out_dim)``."""
        self.backprop_cache['x'] = x
        return np.matmul(x, self.weights["w"]) + self.weights["b"]

    def calc_grads(self, x):
        """Return the local derivatives used by ``backward``.

        ``'w'`` is the transposed cached input, shape ``(in_dim, batch)``;
        ``'x'`` is the transposed weight matrix, shape ``(out_dim, in_dim)``.
        """
        dels = {}
        dels['w'] = np.transpose(self.backprop_cache['x'])
        dels['x'] = np.transpose(self.weights['w'])
        return dels

    def backward(self, y_hat):
        """Backpropagate ``y_hat``, the loss gradient w.r.t. this layer's output.

        Stores the parameter gradients in ``self.del_theta`` and returns the
        gradient with respect to the layer input, shape ``(batch, in_dim)``.
        """
        # (batch, out_dim) @ (out_dim, in_dim) -> (batch, in_dim)
        x_hat = np.matmul(y_hat, self.grads["x"])
        # (in_dim, batch) @ (batch, out_dim) -> (in_dim, out_dim), same shape as w
        w_hat = np.matmul(self.grads["w"], y_hat)
        # The bias is shared by every sample, so its gradient sums over the batch.
        b_hat = np.sum(y_hat, axis=0, keepdims=True)
        if self.weight_decay:
            w_hat = w_hat + 2 * self.weight_decay * self.weights["w"]
            b_hat = b_hat + 2 * self.weight_decay * self.weights["b"]
        self.del_theta = {'w': w_hat, 'b': b_hat}
        return x_hat

    def update_weights(self):
        """Hand this layer to its optimizer, which applies the weight update."""
        self.optimizer.update_weights(self)