In [1]:
import numpy as np
# import tensorflow
from keras.datasets.mnist import load_data
from abc import ABC, abstractmethod

## Abstract Model

In [2]:
class Model(ABC):
    
    @abstractmethod
    def fit(self):
        '''
        Fit model on train set
        '''
        pass

    @abstractmethod
    def evaluate(self):
        '''
        Evaluate model on validation or test set
        '''
        pass

## Feedforward NN 

In [28]:
class Network(Model):
    def __init__(self, model_dim: list, batch_size) -> None: 
        '''
        Args: input: input np ndarray of shape (1,N) 
            model_dim: NN dim [748,30,60,30,10]
        '''
        self.model_dim = model_dim
        # w and b matrices excludes input layer:
        self.weights = [np.random.randn(i,j) for i,j in zip(model_dim[:-1], model_dim[1:])] # normally distributed
        self.biases = [np.random.randn(1,i) for i in model_dim[1:]] 
        self.lr_0 = 0.00001 # initial LR
        self.batch_size = batch_size        
        self.epochs = 1 
        

    def forwardpass(self, x, layer, activation_func, output_unit, mode = 'train') -> tuple[np.ndarray,np.ndarray]:
        '''
        calculates the activations of one layer
        Args: x: Activations from prev layer or input train/eval/test data 
        layer: layer number for which we are performing the pass
            w: weights of current layer, indexing of 1st layer(input) starts from 0
            b: baises of current layer, indexing of 1st layer(input) starts from 0
        y = x'Tw + b
        '''
        w = self.weights[layer-1] # idx correction. 0 idx is weights b/w 1st and 2nd layer.
        b = self.biases[layer-1]
        assert type(x) == np.ndarray, f"needed nd.array for activations, received : {type(x)}"

        if mode == 'train':

            if layer == len(self.model_dim)-1: # if last layer, use output units instead

                activation_func = output_unit
                z = np.dot(x,w) + b # and not W,x?

            return (z,activation_func(z)) #  .matmul or .dot ? 

        else: return None

    def costfunc(self, y, cost_func) -> None:
        '''
        (NOT USED)
        Calculates values of objective function during evaluation only
        MSE, RMSE, Cross Entropy etc.
        Args: y : validation data labels
            cost_func: objective function like MSE, RMSE, cross entropy
        '''

        self.costs = cost_func(self.activations[-1],y)
        return None


    def backwardpass(self, x: np.ndarray, y, optimzation_func, cost_func_deriv) -> None:
        '''
        Update params using gradient descent algo of all layers
        GD, SGD, mini-batch SGD, AdaGrad, RMSProp, Adam etc.
        Args: x : training data
            y: training labels
            backprop_func: function for 
        '''
        (self.weights, self.biases) = optimzation_func(self,self.lr_0, x,y, self.weights, self.biases,
                                                     self.batch_size, cost_func_deriv)
        return None


    def backprop(self, zs, activations, y, cost_func_deriv, activation_func_deriv, output_unit_deriv) -> tuple[np.ndarray, np.ndarray]:
        '''
         Calculate gradients of pre-activations (x.W+b) for each layer
         Args: zs : pre-activation outputs
            activations : post-activation outputs
            y : training labels
        '''

        w_gradient = [np.zeros_like(w_i) for w_i in self.weights] # i represents ith layer
        b_gradient = [np.zeros_like(b_i) for b_i in self.biases]        

        # graident at cost function
        g = cost_func_deriv(activations[-1],y)

        # graident at output unit
        g = g * output_unit_deriv(zs[-1])
        w_gradient[-1] = np.dot(g, activations[-2].T)
        b_gradient[-1] = g
        g = np.dot(self.weights[-1].T, g)

        # gradients at hidden units
        for layer in range(len(self.model_dim)-1, 0, -1):
            print(layer)
            g = g * activation_func_deriv(zs[layer])
            w_gradient[layer] = np.dot(g, activations[layer-1].T) # BUG:out of bound
            b_gradient[layer] = g
            g = np.dot(self.weights[layer].T, g)

        return (w_gradient,b_gradient)

    def evaluate(self) -> None:
        '''
        Evaluate model on validation or test set
        '''
        # batch_size = y
        # for layer in range(1,self.model_dim):
        #     self.forwardpass(layer, ReLu)

        # y = self.forwardpass(self.model_dim[-1], sigmoid)
        # loss = self.costfunc(y)
        # print(f"Evaluation loss: {loss}")

        return None

    def fit(self, trainset: tuple[np.ndarray,np.ndarray]) -> None:

        train_x, train_y = trainset
        # val_x, val_y = trainset[1]
    
        if train_x.shape[1] != self.model_dim[0] or train_y.shape[1] != self.model_dim[-1]:
            print(f"training dataset not in correct dimension. \
                  Needed {self.model[0],self.model[-1]} \
                    received {train_x.shape[1], train_y.shape[1]}")

        for epoch in range(self.epochs): #TODO: send to self.fit

            self.backwardpass(train_x, train_y, minibatchSGD, MSE_derivative)
            print('Epoch {epoch}/{epochs} done')

        pass


## Helper functions

In [29]:
def sigmoid(x: np.ndarray) -> np.ndarray:
    '''
    Sigmoid function to a layer of NN
    '''
    return 1.0 / (1.0 + np.exp(-x))

def sigmoid_derivative(x: np.ndarray) -> np.ndarray:
    '''
    Sigmoid function derivative
    '''
    return sigmoid(x) * (1.0 - sigmoid(x))

def ReLu(x: np.ndarray) -> np.ndarray:
    '''
    Relu function to a layer of NN
    '''
    return np.maximum(0,x)

def ReLu_derivative(x: np.ndarray) -> np.ndarray:
    '''
    Relu function derivative 
    '''
    if x > 0: 
        return 1

    else: return 0

def MSE(x: np.ndarray, y: np.ndarray) -> np.ndarray:
    '''
    (NOT USED)
    Cost function - Mean Squared Error
    Args: x: activations from output units
        y: data labels 
    '''
    n = x.shape[-1]
    cost = np.sum(np.absolute(y-x)**2)/(2.0*n)
    return cost

def MSE_derivative(y: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    return y - y_true

def BGD(self, epsilon_fixed, x: np.ndarray, y: np.ndarray, w,b, cost_func_deriv) -> tuple[np.ndarray,np.ndarray]:
    '''
    Batched gradient decent algorithm
    '''
    batch_size = x.shape[0]

    trainset = list(zip(x,y))
    np.random.shuffle(trainset) #shuffle in-place
    # x, y = zip(*trainset)

    activations = [np.zeros((1,i)) for i in self.model_dim] # including input and output of the network
    zs = [np.zeros((1,i)) for i in self.model_dim] # including input and output of the network
    w_gradient = [np.zeros_like(w_i) for w_i in self.weights] # i represents ith layer
    b_gradient = [np.zeros_like(b_i) for b_i in self.biases]        

    # for each sample
    for x, y in trainset:
        activations[0] = x
        zs[0] = x

        # forward pass
        for layer in range(1,len(self.model_dim)): # 0 idx is input
                zs[layer], activations[layer] = self.forwardpass( activations[layer-1], layer, ReLu, sigmoid)

        # backprop
        delta_w_gradient, delta_b_gradient = self.backprop(zs,activations,y, cost_func_deriv, ReLu_derivative, sigmoid_derivative)

        # accumulate gradients before applying
        w_gradient = [wg_i + d_wg_i for wg_i, d_wg_i in zip(w_gradient,delta_w_gradient)]
        b_gradient = [bg_i + d_bg_i for bg_i, d_bg_i in zip(b_gradient,delta_b_gradient)]

    # apply gradients
    w = [w_i - (epsilon_fixed/batch_size) * w_gradient_i for w_i, w_gradient_i in zip(w, w_gradient)]
    b = [b_i - (epsilon_fixed/batch_size) * b_gradient_i for b_i, b_gradient_i in zip(b, b_gradient)]
    
    return (w, b)

setattr(Network, 'BGD', BGD)


def minibatchSGD(self, epsilon_fixed, x: np.ndarray, y: np.ndarray, 
                 w, b, batch_size, cost_func_deriv ) -> tuple[list[np.ndarray], list[np.ndarray]]:


    trainset = list(zip(x,y))
    np.random.shuffle(trainset) #shuffle in-place
    
    mini_batches = [ trainset[i:i+batch_size] for i in range(0, len(trainset), batch_size)]
    
    for mini_batch in mini_batches:

        mini_batch_x, mini_batch_y = zip(*mini_batch)
        mini_batch_x = np.stack(mini_batch_x, axis = 0)
        mini_batch_y = np.stack(mini_batch_y, axis = 0)
        
        (w, b) = BGD(self, epsilon_fixed, mini_batch_x, mini_batch_y, w, b, cost_func_deriv)

    return (w, b)
# setattr(Network, 'minibatchSGD', minibatchSGD)



    

## Data Loader

In [5]:
def load_mnist() -> tuple[tuple,tuple,tuple]:
    '''
    Args: None
    Return: tuple(train_x,train_y), tuple(val_x,val_y), tuple(test_x,test_y) 

    train data -> 60k
    test data -> 10k
    image -> 28x28 = 728
    train_x -> (50k,728) , train_y -> (50k,10)
    val_x -> (10k,728), val_y -> (10k,10)
    test_x -> (10k,728), test_y -> (10k,1)

    '''
    (trainX, trainy), (testX, testy) = load_data() #TODO: shuffle trainX

    # train_data = [np.reshape(img,(784,1)) for img in trainX]  # 60k x (784,1), if imaginging NN arch horizontal
    trainX = np.reshape(trainX,(-1,784)) # 60k x (1,784) = 60k x 784, if imagining NN arch vertical
    testX = np.reshape(testX,(-1,784)) # 10k x (1,784) = 10k x 784

    train_data, train_label = trainX[:50000], trainy[:50000]
    val_data, val_label = trainX[50000:], trainy[50000:]

    train_label = np.array([vectorize_digit(label) for label in train_label])
    val_label = np.array([vectorize_digit(label) for label in val_label])
    
    print(train_data.shape,train_label.shape)
    print(val_data.shape,val_label.shape)
    print(testX.shape,testy.shape)
    #TODO: Normalize data
    return ((train_data,train_label),(val_data, val_label), (testX,testy) )


def vectorize_digit(digit: np.uint) -> np.ndarray:
    '''
    Args: digit like 0-9
    Return: vectorized form. 7 becomes [0,0,0,0,0,0,0,1,0,0]
    '''
    d = np.zeros(10)
    d[digit] = 1.0

    return d

## Main

In [30]:
if __name__ == '__main__':

    (trainset,valset,testset) = load_mnist()
    model = Network(model_dim=[trainset[0].shape[-1],30,trainset[1].shape[-1]],
                    batch_size=10)
    model.fit(trainset)
    
    
    

(50000, 784) (50000, 10)
(10000, 784) (10000, 10)
(10000, 784) (10000,)
1 2
2 2
here
(10, 10) [[  -779.97008356  12195.90326027   2199.07734558   4208.75031492
   18054.38515053  -8539.06514356   -775.87945746  -9112.79074964
    1416.26004049 -15300.435469  ]
 [  3854.56028535  21420.60004598  -1324.94630852   8194.42872179
   19313.55495826  -9348.3893125   -4065.06877677 -18427.07686543
   -3737.66822006 -10253.26960944]
 [ -1104.96401489  19564.09389513 -16624.71040869   6921.05548406
   13669.31664024 -10356.53610554  -3368.45451496 -18560.8224169
   -7039.14691342 -21014.36168775]
 [  3136.34701772  20481.47308119   3999.03195      2783.56181963
   10818.41814131  -9972.97640726  -5153.75160521  -1323.08661542
   -5388.64915501 -12408.94074437]
 [  5980.93373185   6078.18283476  -1518.4319762    3392.49534647
     722.85558129  -3554.38130273   2198.88375533  -6531.9126437
    2584.22766078  -9779.96736223]
 [  3466.7971194   21457.51752357   6625.28121018   -146.22665761
   1416

  print(1.0 / (1.0 + np.exp(-x)))
  return 1.0 / (1.0 + np.exp(-x))
