<a href="https://colab.research.google.com/github/vasid99/cs6910-dl/blob/main/Assignment01/Assignment01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install wandb

In [None]:
# imports
import numpy as np
import pandas as pd
from keras.datasets import fashion_mnist

In [82]:
# function constants
# Activation identifiers: hyperparams["activations"] holds one of these per non-input layer
ACTIVATION_DUMMY     = -1  # placeholder stored at index 0 (the input layer); never evaluated
ACTIVATION_SIGMOID   = 0
ACTIVATION_SOFTMAX   = 1
ACTIVATION_THRESHOLD = 2
ACTIVATION_RELU      = 3

# Loss identifiers for hyperparams["lossFn"]
LOSS_SQERROR      = 0
LOSS_CROSSENTROPY = 1

# math helper class
class neuralNetworkMathFunctions:
  """
  Helper class to handle mathematical operations of neural network passes, specifically activations and loss calculation.

  Arrays are treated column-wise: axis 0 indexes the units of a layer and (for 2-D input) axis 1 indexes datapoints.
  """
  def __init__(self,hyperparams):
    """
    Create a math helper class for the neural network. Hyperparameter object can be the same as given to neural network

    Reads hyperparams["layerSizes"], hyperparams["activations"] and hyperparams["lossFn"]; the hyperparameter object
    itself is left unmodified.
    """
    assert len(hyperparams["layerSizes"])-1==len(hyperparams["activations"]), "number of layers and number of activations don't match"
    # build a new list instead of inserting into the caller's one: mutating hyperparams would make the
    # assertion above fail the next time the same hyperparameter object is used
    self.activations = [ACTIVATION_DUMMY] + list(hyperparams["activations"])
    self.lossFn = hyperparams["lossFn"]
  
  @staticmethod
  def _sigmoid(x):
    """
    Logistic function 1/(1+exp(-x)), applied element-wise
    """
    return 1/(1+np.exp(-x))
  
  @staticmethod
  def _softmax(x):
    """
    Softmax over the units of a layer (axis 0), computed separately for every datapoint (column)
    """
    axis = 0 if np.ndim(x) else None
    # subtracting the per-column maximum leaves the result unchanged but keeps exp() from overflowing
    z = np.exp(x - np.max(x,axis=axis,keepdims=True))
    return z/np.sum(z,axis=axis,keepdims=True)
  
  def activation(self,layerNum,x):
    """
    Compute and return activation values for a given layer and its sum values

    Raises ValueError if the layer has no usable activation (e.g. the dummy entry of layer 0).
    """
    kind = self.activations[layerNum]
    if kind==ACTIVATION_SIGMOID:
      return self._sigmoid(x)
    elif kind==ACTIVATION_SOFTMAX:
      return self._softmax(x)
    elif kind==ACTIVATION_THRESHOLD:
      return (x>=0)+0
    elif kind==ACTIVATION_RELU:
      # element-wise maximum with 0 (np.max(x,0) would instead reduce along axis 0)
      return np.maximum(x,0)
    raise ValueError("no activation defined for layer %d (activation id %r)"%(layerNum,kind))
  
  def activationDerivative(self,layerNum,**kwargs):
    """
    Compute and return activation derivative values for a given layer and its sum or output values depending on the given argument

    Exactly one keyword must be given: x (pre-activation sums) or y (activation outputs).
    The result is element-wise; for softmax it is only the diagonal of the Jacobian.
    Raises ValueError if the layer has no usable activation.
    """
    assert ( len(kwargs.keys())==1 and np.any([_ in kwargs.keys() for _ in ["x","y"]]) ), "activationDerivative argument malformed. \
    Use activationDerivative(x=x_val) or activationDerivative(y=y_val)"
    
    kind = self.activations[layerNum]
    if "y" in kwargs.keys():
      y = kwargs["y"]
      if kind==ACTIVATION_SIGMOID:
        return y*(1-y)
      elif kind==ACTIVATION_SOFTMAX:
        return y*(1-y)
      elif kind==ACTIVATION_THRESHOLD:
        # threshold outputs are 0 or 1, so this evaluates to 0 everywhere
        return y*(1-y)
      elif kind==ACTIVATION_RELU:
        # ReLU outputs are never negative, so the active units are exactly those with y>0
        return (y>0)+0
    else:
      x = kwargs["x"]
      if kind==ACTIVATION_SIGMOID:
        # s*(1-s) equals exp(-x)/(1+exp(-x))**2 but does not turn into inf/inf for very negative x
        s = self._sigmoid(x)
        return s*(1-s)
      elif kind==ACTIVATION_SOFTMAX:
        s = self._softmax(x)
        return s*(1-s)
      elif kind==ACTIVATION_THRESHOLD:
        # the step function is flat, so the sigmoid derivative is used as a stand-in
        s = self._sigmoid(x)
        return s*(1-s)
      elif kind==ACTIVATION_RELU:
        return (x>=0)+0
    raise ValueError("no activation derivative defined for layer %d (activation id %r)"%(layerNum,kind))
  
  def lossOutputDerivative(self,outputData,targetData):
    """
    Compute and return loss derivatives for given output and target data

    The derivative is taken with respect to the network outputs, element-wise:
      squared error   0.5*(output-target)**2   ->  output-target
      cross entropy   -target*log2(output)     ->  -target/(output*ln 2)
    Raises ValueError for an unknown loss id.
    """
    if self.lossFn==LOSS_SQERROR:
      return outputData-targetData
    elif self.lossFn==LOSS_CROSSENTROPY:
      # floor the outputs so an output of exactly 0 cannot cause a division by zero
      safeOutput = np.maximum(outputData,np.finfo(float).eps)
      return -targetData/(safeOutput*np.log(2))
    raise ValueError("unknown loss function id %r"%(self.lossFn,))

class neuralNetwork:
  """
  Class for a neural network made up of multiple layers of perceptrons

  Data is handled column-wise internally: every column of an input/target/activation matrix is one datapoint.
  wmat[i] and bias[i] (i>=1) are the parameters of layer i; index 0 holds 1x1 dummies so that the indexing starts at 1.
  """
  def __init__(self,hyperparams):
    """
    Build the network from a hyperparameter dict.

    Reads the keys "layerSizes", "batchSize", "learningRate", "epochs" and "activations" here, optionally "weightBounds"
    (see initModel), and forwards the dict to neuralNetworkMathFunctions.
    """
    # assign basic hyperparameters
    self.layerSizes = hyperparams["layerSizes"]
    self.batchSize = hyperparams["batchSize"]
    self.learningRate = hyperparams["learningRate"]
    self.epochs = hyperparams["epochs"]

    self.numLayers = len(self.layerSizes) - 1 # first layer is stand-in for inputs (can think of its weight matrix as identity)
    
    # create NN functions
    # the helper gets its own copy of the activation list, so nothing it does with that list can alter the
    # caller's hyperparams (which may be reused to build further networks)
    fnParams = dict(hyperparams)
    fnParams["activations"] = list(hyperparams["activations"])
    self.fns = neuralNetworkMathFunctions(fnParams)

    # initialize the weight and bias matrices of the NN
    self.initModel(hyperparams)

    ## add states: forward_done, grad_calced, backward_done, train_done
  
  def initModel(self,hyperparams,**kwargs):
    """
    (Re)initialize all weights and biases with uniform random values.

    Values are drawn from [low, high), taken from hyperparams["weightBounds"] if present and (0,1) otherwise.
    Extra keyword arguments are accepted but not used.
    """
    # argchecks
    bounds = (0,1)
    if "weightBounds" in hyperparams:
      assert len(hyperparams["weightBounds"])==2, "bounds arg has to be a list/tuple of 2 numbers"
      bounds = hyperparams["weightBounds"]
    low, high = bounds

    # create list of weight matrices and bias vectors. The goal is to make the indexing same as that in class, hence the dummy values
    self.wmat = [np.array([1],ndmin=2)]
    self.bias = [np.array([1],ndmin=2)]
    
    # create random initial parameters and append them to the above initialized lists
    for i in range(1,self.numLayers+1):
      self.wmat.append((high-low)*np.random.rand(self.layerSizes[i],self.layerSizes[i-1])+low)
      self.bias.append((high-low)*np.random.rand(self.layerSizes[i],1)+low)
  
  def forwardPass(self, inputData):
    """
    Propagate column-wise inputData through all layers.

    Returns a list with the output of every layer; entry 0 is inputData itself and entry -1 is the network output.
    """
    h = inputData
    hData   = [h]
    
    for i in range(1,self.numLayers+1):
      a      = self.wmat[i] @ h + self.bias[i]            # a[i] = w[i] @ h[i-1] + b[i]
      h      = self.fns.activation(i,a)                   # h[i] = g(a[i]) ## resolve layer-wise activation hyperparam
      hData.append(h)
    
    return hData
  
  def backwardPass(self, layerwiseOutputData, targetData):
    """
    Backpropagate the loss through the network.

    layerwiseOutputData is the list returned by forwardPass, targetData the matching column-wise targets.
    Returns (gradW, gradB), lists indexed like wmat/bias (entry 0 is a 1x1 zero dummy). Gradients are summed
    (not averaged) over the datapoints of the batch.
    """
    Delta = self.fns.lossOutputDerivative(layerwiseOutputData[-1], targetData)  # dLoss/dh of the output layer
    gradW = [np.array([0],ndmin=2)] + [None]*self.numLayers
    gradB = [np.array([0],ndmin=2)] + [None]*self.numLayers

    for i in range(self.numLayers,0,-1):
      # dLoss/da[i] = g'(a[i]) * dLoss/dh[i], element-wise
      stocBiasCorr = self.fns.activationDerivative(i,y=layerwiseOutputData[i]) * Delta
      gradW[i]     = stocBiasCorr @ layerwiseOutputData[i-1].T
      gradB[i]     = np.sum(stocBiasCorr,axis=1,keepdims=True)
      if i>1:
        # dLoss/dh[i-1]; not needed once the first layer is reached
        Delta      = self.wmat[i].T @ stocBiasCorr
    
    return (gradW,gradB)
  
  def _asColumns(self,data,kwargs):
    """
    Return data as a 2-D array holding one datapoint per column.

    Data is assumed to hold one datapoint per row and is transposed, unless kwargs contains colwiseData==True.
    """
    data = np.array(data,ndmin=2)
    if kwargs.get("colwiseData")==True:
      return data
    return data.T
  
  def infer(self,inputData,**kwargs):
    """
    Return the network output for inputData, one column per datapoint.

    inputData holds one datapoint per row, or per column if colwiseData=True is passed.
    """
    return self.forwardPass(self._asColumns(inputData,kwargs))[-1]

  def train(self, inputData, targetData, **kwargs):
    """
    Train the network in place with mini-batch gradient descent.

    inputData/targetData hold one datapoint per row, or per column if colwiseData=True is passed. Every epoch walks
    through the data in its given order in batches of self.batchSize (-1 meaning the whole dataset in one batch).
    Raises AssertionError on mismatching shapes and ValueError on an unusable batch size.
    """
    inputData  = self._asColumns(inputData,kwargs)
    targetData = self._asColumns(targetData,kwargs)
    assert np.shape(inputData)[1]==np.shape(targetData)[1], "input and target datasets have different dataset sizes"
    assert np.shape(inputData)[0]==self.layerSizes[0], "size of input datapoint differs from size of input vector given as hyperparameter"
    assert np.shape(targetData)[0]==self.layerSizes[-1], "size of target datapoint differs from size of target vector given as hyperparameter"
    datasetSize = np.shape(targetData)[1]

    if self.batchSize==-1:
      batchSize = max(datasetSize,1)  # keeps the range() step valid for an empty dataset
    elif self.batchSize>=1:
      batchSize = self.batchSize
    else:
      # 0 used to end in a ZeroDivisionError and other negative values in silently training nothing
      raise ValueError("batchSize must be a positive integer, or -1 to use the whole dataset as one batch")

    for epoch in range(self.epochs): ## epoch kwarg?
      for startIndex in range(0,datasetSize,batchSize):
        endIndex    = min(startIndex + batchSize, datasetSize)
        inputBatch  = inputData[:,startIndex:endIndex]
        targetBatch = targetData[:,startIndex:endIndex]
        
        layerwiseOutputData = self.forwardPass(inputBatch)
        (gradW, gradB)      = self.backwardPass(layerwiseOutputData,targetBatch)
        
        for i in range(1,self.numLayers+1):
          self.wmat[i] -= self.learningRate * gradW[i]
          self.bias[i] -= self.learningRate * gradB[i]


# Demo: fit a small sigmoid network to three datapoints and print its outputs before and after training
hp = {
    "layerSizes": [2,4,1],        # 2 inputs, one hidden layer of 4 units, 1 output
    "batchSize": 1,               # one datapoint per parameter update
    "learningRate": 1,
    "epochs": 500,
    "activations": [ACTIVATION_SIGMOID, ACTIVATION_SIGMOID],  # one entry per non-input layer
    "lossFn": LOSS_SQERROR,
    "weightBounds": (-0.1,0.1)    # range of the random initial weights and biases
}

x = neuralNetwork(hp)
# one datapoint per row (colwiseData is not passed, so infer/train transpose the data themselves)
inp = np.array([[1,0.5],[-0.5,0.25],[1,2]])
tar = np.array([[0.5],[0.75],[0.67]])
print("Target data:")
print(tar.T)  # transposed so that it lines up with the column-wise output of infer
print("Output before training:")
print(x.infer(inp))
x.train(inp,tar)
print("Output after training for %d epochs with learning rate of %.2f:"%(hp["epochs"],hp["learningRate"]))
print(x.infer(inp))

Target data:
[[0.5  0.75 0.67]]
Output before training:
[[0.48886327 0.48852887 0.48888447]]
Output after training for 500 epochs with learning rate of 1.00:
[[0.52017802 0.74300063 0.66098754]]
