<a href="https://colab.research.google.com/github/ram-anand/ram-anand.github.io/blob/main/Multiple_Layer_Neural_Network_Implementation(Forward_and_Back_Propagation_Gradient_Descent).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Custom Neural Network

    Author: Kumar Ramanand
    Last revision: 26-12-2020
    Github: https://github.com/ram-anand

### Implemented a neural network 
  -  supports multiple layers
  -  custom loss functions (MSE, MLE and MAE)
  -  batch gradient descent
  -  RELU, SIGMOID and TANH activations
  -  simple structure for forward and back propagation
  -  training for multiple epochs
  -  faster processing , uses vectorized matrix calculations


## **How to Use**

      # Create model
          model = CustomNeuralNetwork()
      # Adding layers
          model.model_add_layer((5,4), [(2, "tanh"),(2, "relu"),(2, "relu"),(1, "sigmoid")])
      # Training model
          model.model_training(inputs, labels, learning_rate=0.1, losstype = "mle", epoch=2)

      # Prediction using a sample data
          model.model_feed_forward(example)


In [None]:
import numpy as np

In [None]:
class CustomNeuralNetwork:
    def __init__(self):
        self.n = 0 # number of samples
        self.m = 0 # number of variables
        self.epoch = 1 # number of epochs
        self.learning_rate = 0.001 # learning rate for gradient descent
        self.X = None # X data in input examples
        self.Y = None # Y labels in input examples
        self.YP = 0.0 # predicted labels for input examples
        self.layer_count = 0 # total number of layers starting from 0 to last/output layer
        self.layer_variables = [] # variable/neurons count list, stores number of neurons in each layer
        self.layer_activation = [] # activation list, stores activation used in each layer
        self.layer_weights = [] # weight matrix, dimesion = (number of neurons/output in prior layer, number of neurons in current layer)
        self.layer_bias = [] # bias matrix, dimesion = (number of neurons in the layer, 1)
        self.layer_output = {} # dictionary of output matrix at each layer, {0:output matrix 0, 1:output matrix 1,...}, dimension of output matrix = (number of neurons in the layer, 1) 
        self.layer_derivative = {} # dictionary of derivative(of loss function w.r.t. layer input) matrix at each layer, {0:derivative matrix 0, 1:derivative matrix 1,...}, dimension of derivative matrix = (number of neurons in the layer, 1) 
        self.training_loss = 0.0 # total training loss in each epoch
        self.loss_method = None # method to calculate training loss, mean squared error(mse), mean absolute error(mae), maximum likelyhood estimate (mle)
        self.convergence_limit = 0.00001 # convergence error limit when training should stop

#     def model_input_labels(self, data, labels):
#         self.n = np.array(data).shape[0] # number of input examples
#         self.m = np.array(labels).shape[1] # number of variables
#         self.X = data
#         self.Y = labels
        
    def model_add_layer(self, input_shape, list_layerdepth_layertype_tuple=[(1,"relu")]):
        self.n = input_shape[0]
        self.m = input_shape[1]
        for layerdepth, layertype in list_layerdepth_layertype_tuple:
            if self.layer_count == 0:
                self.layer_weights.append(np.random.rand(self.m, layerdepth))
            else:
                self.layer_weights.append(np.random.rand(self.layer_variables[self.layer_count-1], layerdepth))
            self.layer_bias.append(np.random.rand(1, layerdepth))
            self.layer_variables.append(layerdepth)
            self.layer_activation.append(layertype)
            self.layer_count += 1
        print("total layer count: ", self.layer_count)
        print("layers: ", list_layerdepth_layertype_tuple)
           
    def relu(self, a):
        b = np.maximum(a, 0.0)
        #a[a < 0.0] = 0.0
        # data, derivative of relu output w.r.t. to input
        c = np.greater(a, 0.0).astype(int)
        return b, c  

    def sigmoid(self, a):
        b = 1/np.exp(-a)
        # data, derivative of sigmoid output w.r.t. to input
        c = b * (1-b)
        return b, c # data, derivative 

    def tanh(self, a):
        b = (np.exp(a) - np.exp(-a))/(np.exp(a) + np.exp(-a))
        # data, derivative of tanh output w.r.t. to input
        c = (1-b**2)
        return b, c # data, derivative         

    def model_feed_forward(self, data):
        # calculate output for each layer: from 0th (first) layer to last layer
#        print("input data: \n", data)
        for layer in range(self.layer_count):
#            print("-------------------------------------------------")
#            print("layer: ", layer+1, " of ", self.layer_count)
#             print("layer variables: ", self.layer_variables[layer])
#             print("layer weight: \n", self.layer_weights[layer])
#             print("layer bias: \n", self.layer_bias[layer])
            self.X = data
            if layer == 0:
                z = (np.dot(self.X, self.layer_weights[layer]) + self.layer_bias[layer])#.reshape(self.n, self.layer_variables[layer])
#                print("layer sum: \n", z)
            else:
                z = (np.dot(self.layer_output[layer-1], self.layer_weights[layer]) + self.layer_bias[layer])#.reshape(self.n, self.layer_variables[layer])
#                print("layer sum: \n", z)
            if self.layer_activation[layer] == "relu":
                self.layer_output[layer] = self.relu(z)[0]
                self.layer_derivative[layer] = self.relu(z)[1]
#                print("layer output: \n", self.relu(z)[0])
#                print("layer output: \n", self.layer_output[layer])
            elif self.layer_activation[layer] == "sigmoid":
                self.layer_output[layer] = self.sigmoid(z)[0]
                self.layer_derivative[layer] = self.sigmoid(z)[1]
#                print("layer output: \n", self.sigmoid(z)[0])
#                print("layer output: \n", self.layer_output[layer])
            elif self.layer_activation[layer] == "tanh":
                self.layer_output[layer] = self.tanh(z)[0]
                self.layer_derivative[layer] = self.tanh(z)[1]
#                print("layer output: \n", self.tanh(z)[0])
#                print("layer output: \n", self.layer_output[layer])
            else:
                print("activation type : ", self.layer_activation[layer], "is undefined, use predefine activations - relu, sigmoid or tanh")
                return None
        return self.layer_output[layer]
        
    def model_training(self, data=None, labels=None, losstype = "mse", learning_rate = 0.01, convergence_limit = 0.001, epoch=1):

        if (data is None):
            print("please provide sample data in data=X, labels=Y form in model training")
        else:
            self.X = data
            if (self.n != np.array(data).shape[0] or self.m != np.array(data).shape[1]):
                print("reconfigure layer required as data dimension ",(self.n, self.m) ," did not match with layer input dimension ", (np.array(data).shape[0], np.array(data).shape[1]))

#             self.n = np.array(data).shape[0] # number of input examples
#             self.m = np.array(labels).shape[1] # number of variables
                      
        if (labels is None):
            print("please provide sample data in data=X, labels=Y form in model training")
        else:
            self.Y = labels

        self.loss_method = losstype
        self.learning_rate = learning_rate
        self.convergence_limit = convergence_limit
        self.epoch = epoch
        for i in range(self.epoch): 
            print("************************************************")
            print("training epoch: ", i)
            print("************************************************")
            
            print("input dim: ",(self.n, self.m), ", input data: \n", self.X)
            print("label dim: ",(self.n, 1), ", labels: \n", self.Y)

            # feed foreward output at last layer (output layer)
            self.YP = self.model_feed_forward(self.X)
            
            # calculating error 
            print(("==============================================="))
            if self.loss_method == "mle":
                self.training_loss = (1/self.n) * np.sum(-self.Y * np.log(self.YP) - (1-self.Y) * np.log(1+self.YP))
                self.training_derivative = ((-self.Y/self.YP) + (1-self.Y)/(1-self.YP)).reshape(self.n,1)
            elif self.loss_method == "mae":
                self.training_loss = (1/self.n) * np.sum(np.abs(self.Y - self.YP))
                self.training_derivative = 1
            elif self.loss_method == "mse":
                #self.training_loss = (0.5/self.n) * np.sum(np.dot((self.Y - self.YP).T, (self.Y - self.YP)))
                #self.training_loss = (0.5/self.n) * ((self.Y - self.YP)**2).mean(axis=0)
                self.training_loss = (0.5/self.n) * (np.square(self.Y - self.YP)).mean(axis=0)
                self.training_derivative = -(1/self.n) * np.sum(np.abs(self.Y - self.YP))
            else:
                print("loss type : ", self.loss_method, "is undefined, use predefine loss methods - mae, mle or mse")
                break
            
            print(self.loss_method, " - training loss: ", self.training_loss)
            if self.training_loss < self.convergence_limit:
              print("model training error converged, convergence limit: ",self.convergence_limit,", current training error: ",self.training_loss)
              break
            print("===============================================")
            # x - matrix multiplication, * - element wise matrix multiplication
            # update derivative for each layer: 
            # update sequence: from last or output layer to 0th (first or input) layer
            for layer in range(self.layer_count-1,-1,-1):
                if layer == self.layer_count-1:
                    # loss function derivative w.r.t. current(last or output) layer input  = loss function derivative w.r.t. current(last or output) layer output x current(last or output) layer output derivative w.r.t. its input
                    self.layer_derivative[layer] = self.training_derivative * self.layer_derivative[layer]
                    #self.layer_derivative[layer] = self.training_derivative * (self.layer_output[layer] * (1 - self.layer_output[layer])) # example for sigmoid output from current layer
                else:
                    # loss function derivative w.r.t. current layer(hidden or inner) input  = higher layer derivative (calculated previosuly) x higher layer weights in transposed form * current(hidden or inner) layer's output derivative w.r.t. its input
                    self.layer_derivative[layer] = np.dot(self.layer_derivative[layer+1], self.layer_weights[layer+1].T) * self.layer_derivative[layer]
                    #self.layer_derivative[layer] = np.dot(self.layer_derivative[layer+1], self.layer_weights[layer+1].T) * (self.layer_output[layer] * (1 - self.layer_output[layer])) # example for sigmoid output from current layer
            
            # x - matrix multiplication, * - element wise matrix multiplication
            # weight and bias update at each layer using loss function's derivative w.r.t. layer input 
            # update sequence: from last layer to 0th(first) layer
            for layer in range(self.layer_count-1,-1,-1):  
                if layer == 0:
                    print("old weight: \n", self.layer_weights[layer])
                    print("old bias: \n", self.layer_bias[layer])
                    # current layer weights = current layer weights - learning rate * (input for current layer(which is sample data) in transposed form x loss function derivative w.r.t. current layer input)
                    self.layer_weights[layer] = self.layer_weights[layer] - (self.learning_rate/self.n) * np.dot(self.X.T, self.layer_derivative[layer])
                    self.layer_bias[layer] = self.layer_bias[layer] - (self.learning_rate/self.n) * np.sum(self.layer_derivative[layer], keepdims=True)                    
                    print("new weight: \n", self.layer_weights[layer])
                    print("new bias: \n", self.layer_bias[layer])
                    print("layer: ", layer, " weight and bias updated")
                    print("===============================================")
                else:
                    print("old weight: \n", self.layer_weights[layer])
                    print("old bias: \n", self.layer_bias[layer])
                    # current layer weights = current layer weights - learning rate * (input for current layer(means output of layer prior to this layer) in transposed form  x loss function derivative w.r.t. current layer input)
                    self.layer_weights[layer] = self.layer_weights[layer] - (self.learning_rate/self.n) * np.dot(self.layer_output[layer-1].T, self.layer_derivative[layer])
                    self.layer_bias[layer] = self.layer_bias[layer] - (self.learning_rate/self.n) * np.sum(self.layer_derivative[layer], keepdims=True)
                    print("new weight: \n", self.layer_weights[layer])
                    print("new bias: \n", self.layer_bias[layer])
                    print("layer: ", layer, " weight and bias updated")
                    print("===============================================")

Train model using training data and labels

In [None]:
number_of_samples = 5
number_of_variables = 4
inputs = np.random.rand(number_of_samples, number_of_variables)
labels = np.random.randint(0,2, (number_of_samples, 1))
print("data columns: variable_1,  variable_2, variable_3, variable_4,   actual_label")
for row, label in zip(inputs, labels):
  print("input row: ", row, ", label: ", label)

data columns: variable_1,  variable_2, variable_3, variable_4,   actual_label
input row:  [0.20711923 0.80985653 0.59274475 0.32508462] , label:  [1]
input row:  [0.80128437 0.10399709 0.9993516  0.85643245] , label:  [1]
input row:  [0.05125211 0.55969057 0.67633784 0.07084215] , label:  [0]
input row:  [0.03169936 0.5811244  0.28407813 0.65874811] , label:  [1]
input row:  [0.64226163 0.23612786 0.44969191 0.22866992] , label:  [1]


In [None]:
model = CustomNeuralNetwork()
model.model_add_layer((5,4), [(2, "tanh"),(2, "relu"),(2, "relu"),(1, "sigmoid")])
model.model_training(inputs, labels, learning_rate=0.1, losstype = "mle", epoch=2)

total layer count:  4
layers:  [(2, 'tanh'), (2, 'relu'), (2, 'relu'), (1, 'sigmoid')]
************************************************
training epoch:  0
************************************************
input dim:  (5, 4) , input data: 
 [[0.20711923 0.80985653 0.59274475 0.32508462]
 [0.80128437 0.10399709 0.9993516  0.85643245]
 [0.05125211 0.55969057 0.67633784 0.07084215]
 [0.03169936 0.5811244  0.28407813 0.65874811]
 [0.64226163 0.23612786 0.44969191 0.22866992]]
label dim:  (5, 1) , labels: 
 [[1]
 [1]
 [0]
 [1]
 [1]]
mle  - training loss:  -3.195029247850842
model training error converged, convergence limit:  0.001 , current training error:  -3.195029247850842


Prediction using sample data

In [43]:
example = np.random.rand(4) * 0.01
print("input data: ", example)
output = (model.model_feed_forward(example) > 0).astype(int)
print("label: ", output)

input data:  [0.00894269 0.00235017 0.00356326 0.00893417]
label:  [[1]]
