In [19]:
import numpy as np
import sys
import matplotlib.pyplot as plt

In [20]:
class Neuron:
    """A single artificial neuron whose last weight is the bias.

    Attributes:
        acfunc: activation selector; 0 is linear, any other value is logistic.
        weights: float array of weights, bias in the last position.
        lr: learning rate applied by updateweight().
        ins: most recent input with a constant 1.0 appended (set by calculate()).
        outs: most recent activation, rounded to 3 decimals (set by calculate()).
        deltapartw: most recent gradient of the loss w.r.t. the weights
            (set by calcpartialderivative()).
    """

    #initialize neuron with activation type, number of inputs, learning rate, and possibly with set weights
    def __init__(self,acfunc, input_num, lr, weights=None):
        self.acfunc = acfunc
        if weights is None:
            self.weights = np.random.random([input_num + 1]) # including bias
        else:
            # Accept lists/tuples as well as arrays; all later arithmetic
            # (slicing off the bias, subtracting the gradient) needs an array.
            self.weights = np.asarray(weights, dtype=float)
        self.lr = lr

    #This method returns the activation of the net
    def activate(self,net):
        """Apply the configured activation (linear or logistic) to `net`."""
        if self.acfunc == 0: # linear
            return net
        else: # logistic
            return 1/(1+np.exp(-net))

    #Calculate the output of the neuron; saves the input and output for back-propagation.
    def calculate(self,Input):
        """Return the activation for `Input` and cache `ins`/`outs`."""
        self.ins = np.append(Input, 1.0) # constant 1.0 multiplies the bias weight
        # axis=-1 sums over the weight axis for a 1-D weight vector (scalar
        # net) and for a 2-D stack of weight rows (one net per row). The
        # previous axis=1 raised AxisError for the 1-D weights this class
        # creates by default.
        net = np.sum(self.ins*self.weights, axis=-1)
        # np.round works on scalars and arrays; builtin round() rejects arrays.
        # NOTE(review): rounding the cached activation to 3 decimals also
        # coarsens the gradients derived from it; kept because the recorded
        # notebook output relies on the rounded values.
        self.outs = np.round(self.activate(net), 3)
        return self.outs

    #This method returns the derivative of the activation function with respect to the net
    def activationderivative(self):
        """Return d(activation)/d(net) evaluated at the cached output."""
        if self.acfunc == 0: # activation function is linear
            return 1
        else: # activation function is sigmoid
            return self.outs * (1-self.outs)

    #This method calculates the partial derivative for each weight and returns the delta*w to be used in the previous layer
    def calcpartialderivative(self, wtimesdelta):
        """Store the weight gradient and return this neuron's w*delta.

        Args:
            wtimesdelta: summed w*delta arriving from the next layer; a
                scalar (Python or NumPy) or an array.

        Returns:
            delta times the non-bias weights, i.e. this neuron's contribution
            to the previous layer's w*delta. For a scalar `wtimesdelta` and a
            1-D weight vector the result is 1-D.
        """
        upstream = np.asarray(wtimesdelta, dtype=float)
        if upstream.size == 1:
            # Collapse to a 0-d value so the gradient has the same shape as
            # the weights. The previous (1, n) column form silently turned
            # the weight vector 2-D after the first update.
            delta = upstream.reshape(()) * self.activationderivative()
        else:
            delta = upstream.reshape(-1,1) * self.activationderivative()
        self.deltapartw = delta * self.ins
        return delta * self.weights[:-1] # bias carries no w*delta backwards

    #Simply update the weights using the partial derivatives and the learning rate
    def updateweight(self):
        """Take one gradient-descent step using the cached gradient."""
        self.weights = self.weights - self.lr * self.deltapartw
        return 0

    def updateweight_conv(self, NEWweights):
        """Overwrite the weights; used by a convolution layer to keep shared weights in sync."""
        self.weights = NEWweights
        return 0

In [21]:
class FullyConnected:
    """Dense layer: a list of neurons that all receive the same input vector.

    Attributes:
        layer: list of Neuron objects, one per output of the layer.
    """

    #initialize with the number of neurons in the layer, their activation, the input size, the learning rate and a 2d matrix of weights (or else initialize randomly)
    def __init__(self,numOfNeurons, acfunc, input_num, lr, weights=None):
        self.layer = []
        for i in range(numOfNeurons):
            # Row i of `weights` (when given) holds neuron i's weights, bias last.
            cell_weights = None if weights is None else weights[i]
            self.layer.append(Neuron(acfunc, input_num, lr, cell_weights))

    #calculate the output of all the neurons in the layer and return a vector with those values (go through the neurons and call the calculate() method)
    def calculate(self, Input):
        """Return a list holding each neuron's output for `Input`."""
        return [cell.calculate(Input) for cell in self.layer]

    #given the next layer's w*delta, run through the neurons calling calcpartialderivative() for each (with the correct value), sum up their w*delta, and update the weights (using the updateweight() method). Returns the sum of w*delta.
    def calcwdeltas(self, wtimesdelta):
        """Back-propagate through the layer and update every neuron.

        Args:
            wtimesdelta: one summed w*delta value per neuron of this layer.

        Returns:
            The sum over neurons of each neuron's w*delta, to be handed to
            the previous layer.
        """
        wtimesdeltaNEW = 0
        for cell, wd in zip(self.layer, wtimesdelta):
            # Each neuron computes its w*delta from its own pre-update
            # weights, so it is safe to update it immediately afterwards.
            wtimesdeltaNEW = wtimesdeltaNEW + cell.calcpartialderivative(wd)
            # The weight update belongs to the neuron. The layer itself has
            # no weights/lr/deltapartw attributes, so the former layer-level
            # update line raised AttributeError and has been removed.
            cell.updateweight()

        return wtimesdeltaNEW

In [27]:
class convolutionalLayer:
    """2-D convolution over one square, single-channel input (stride 1, no padding).

    Every output position applies the same kernels, so the layer keeps one
    shared weight matrix and evaluates all positions at once instead of
    holding one neuron object per position.

    Attributes:
        acfunc: activation selector; 0 is linear, any other value is logistic.
        lr: learning rate used by calcwdeltas().
        KernelShape: edge length of the square kernel.
        KernelNum: number of kernels.
        InputShape: edge length of the square input.
        OutputShape: edge length of each output map (InputShape-KernelShape+1).
        weights: array (KernelNum, KernelShape*KernelShape + 1); each row is a
            kernel flattened in row ("C") order with its bias in the last column.
        conv_input: array (OutputShape**2, KernelShape**2) of flattened input
            windows from the last calculate() call.
        conv_output: array (KernelNum, OutputShape**2) of activations from the
            last calculate() call.
    """

    def __init__(self, KernelShape:int, KernelNum:int, acfunc:int, InputShape:int, lr:float, weights=None):
        """Create the layer.

        Args:
            KernelShape: edge length of the square kernel.
            KernelNum: number of kernels.
            acfunc: 0 for linear activation, anything else for logistic.
            InputShape: edge length of the square input.
            lr: learning rate.
            weights: optional initial kernels. Accepted with or without a
                bias column: KernelShape**2 values per kernel (a bias of 1
                is appended) or KernelShape**2 + 1 values per kernel.

        Raises:
            ValueError: if the kernel does not fit in the input or `weights`
                has an unexpected size.
        """
        self.acfunc = acfunc
        self.lr = lr # previously never stored although calcwdeltas() needs it
        self.KernelShape = KernelShape
        self.KernelNum = KernelNum
        self.InputShape = InputShape
        self.OutputShape = InputShape-KernelShape+1 # stride is always 1
        if KernelShape < 1 or self.OutputShape < 1:
            raise ValueError("KernelShape must be between 1 and InputShape")
        taps = KernelShape * KernelShape
        if weights is None:
            # One row per kernel, bias last -- the same layout the explicit
            # weights branch produces.
            self.weights = np.random.random([KernelNum, taps + 1])
        else:
            flat = np.asarray(weights, dtype=float).reshape(KernelNum, -1)
            if flat.shape[1] == taps:
                flat = np.hstack((flat, np.ones([KernelNum,1]))) # add bias of 1 to the end
            elif flat.shape[1] != taps + 1:
                raise ValueError("weights must hold KernelShape**2 (+1 for bias) values per kernel")
            self.weights = flat

    def activationderivative(self):
        """Return d(activation)/d(net) for every cached output."""
        if self.acfunc == 0: # activation function is linear
            return np.ones_like(self.conv_output)
        else: # activation function is sigmoid
            return self.conv_output * (np.ones_like(self.conv_output)-self.conv_output)

    def _with_bias(self):
        """Return the cached windows with a constant-1 column for the bias weight."""
        return np.hstack((self.conv_input, np.ones((self.conv_input.shape[0], 1))))

    def calculate(self, Input):
        """Convolve `Input` with every kernel and apply the activation.

        Args:
            Input: array-like of shape (InputShape, InputShape).

        Returns:
            Array of shape (OutputShape, OutputShape) when KernelNum == 1,
            otherwise (KernelNum, OutputShape, OutputShape).

        Raises:
            ValueError: if `Input` does not have the configured shape.
        """
        image = np.asarray(Input, dtype=float)
        if image.shape != (self.InputShape, self.InputShape):
            raise ValueError("Input must have shape (InputShape, InputShape)")
        k, out = self.KernelShape, self.OutputShape
        # The window size follows the kernel size (it used to be fixed to 2x2).
        windows = np.lib.stride_tricks.sliding_window_view(image, window_shape=(k,k))
        self.conv_input = windows.reshape(out*out, k*k) # each window flattened to 1d
        net = self.weights @ self._with_bias().T # (KernelNum, out*out)
        if self.acfunc == 0:
            self.conv_output = net
        else:
            self.conv_output = 1/(1+np.exp(-net))
        outs = self.conv_output.reshape(self.KernelNum, out, out) # back to 2d maps
        return outs[0] if self.KernelNum == 1 else outs

    def calcwdeltas(self, wtimesdelta):
        """Update the shared kernels and return the w*delta for the input.

        Must be called after calculate(), whose cached windows and outputs
        it uses.

        Args:
            wtimesdelta: w*delta from the next layer, with
                KernelNum * OutputShape**2 values in kernel-major,
                row-major order (any shape with that many elements).

        Returns:
            Array of shape (InputShape, InputShape): the summed w*delta for
            each input element.
        """
        k, out = self.KernelShape, self.OutputShape
        upstream = np.asarray(wtimesdelta, dtype=float).reshape(self.KernelNum, out*out)
        delta = upstream * self.activationderivative()
        # Shared weights: a kernel's gradient is the sum over all positions.
        deltapartw = delta @ self._with_bias()
        # calculate w*delta of the former layer with the pre-update kernels
        kernels = self.weights[:,:-1].reshape(self.KernelNum, k, k)
        delta_maps = delta.reshape(self.KernelNum, out, out)
        wtimesdeltaNEW = np.zeros((self.InputShape, self.InputShape))
        for r in range(k):
            for c in range(k):
                # Kernel tap (r, c) touches input[i+r, j+c] for every output (i, j).
                tap = kernels[:, r, c][:, None, None]
                wtimesdeltaNEW[r:r+out, c:c+out] += (tap * delta_maps).sum(axis=0)
        # update weights
        self.weights = self.weights - self.lr * deltapartw
        return wtimesdeltaNEW

In [29]:
class MaxPoolingLayer:
    """Max pooling over a square 2-D input with stride equal to the kernel size.

    Attributes:
        KernelShape: edge length of the square pooling window.
        InputShape: edge length of the square input.
        OutputShape: edge length of the pooled output (InputShape // KernelShape);
            trailing rows/columns that do not fill a whole window are ignored.
        conv_input: array (OutputShape**2, KernelShape**2) of flattened windows
            from the last calculate() call.
        conv_output: array (OutputShape**2,) of window maxima.
        activationderivative: array shaped like conv_input holding 1 at each
            window's maximum and 0 elsewhere; used for back-propagation.
    """

    def __init__(self, KernelShape:int, InputShape:int):
        if KernelShape < 1 or KernelShape > InputShape:
            raise ValueError("KernelShape must be between 1 and InputShape")
        self.KernelShape = KernelShape
        self.InputShape = InputShape
        # Integer division: the value is used as an array dimension, and the
        # float produced by "/" is rejected by reshape and slicing.
        self.OutputShape = InputShape//KernelShape # stride = kernel size

    def calculate(self, Input):
        """Return the (OutputShape, OutputShape) array of window maxima.

        Args:
            Input: 2-D array-like of shape (InputShape, InputShape).
        """
        k, out = self.KernelShape, self.OutputShape
        data = np.asarray(Input)
        # Windows are KernelShape wide and start every KernelShape elements
        # (previously OutputShape was used for both, which is only right
        # when InputShape == KernelShape**2).
        windows = np.lib.stride_tricks.sliding_window_view(data, window_shape=(k,k))[0::k,0::k]
        self.conv_input = windows.reshape(out*out, k*k) # each window flattened to 1d
        # record the position of max (which has activation derivative of 1, other positions are 0)
        winners = self.conv_input.argmax(axis=1)
        mask = np.zeros(self.conv_input.shape)
        mask[np.arange(out*out), winners] = 1
        self.activationderivative = mask
        self.conv_output = self.conv_input.max(axis=1)
        outs = self.conv_output.reshape(out, out) # change back to 2d
        return outs

    def calcwdeltas(self, wtimesdelta):
        """Route each upstream value to the input position that held the maximum.

        Must be called after calculate().

        Args:
            wtimesdelta: OutputShape**2 values, one per pooled output, in
                row-major order.

        Returns:
            Array of shape (InputShape, InputShape), zero everywhere except
            at the positions of the window maxima.
        """
        k, out = self.KernelShape, self.OutputShape
        upstream = np.asarray(wtimesdelta, dtype=float).reshape(out*out, 1)
        routed = (upstream * self.activationderivative).reshape(out, out, k, k)
        # (window row, window col, in-row, in-col) -> (window row, in-row,
        # window col, in-col) lays the windows back out on the input grid.
        tiled = routed.transpose(0, 2, 1, 3).reshape(out*k, out*k)
        wtimesdeltaNEW = np.zeros((self.InputShape, self.InputShape))
        wtimesdeltaNEW[:out*k, :out*k] = tiled # ignored border stays zero
        return wtimesdeltaNEW

In [28]:
class FlattenLayer:
    """Flattens its input row-wise and restores the shape on the way back.

    Attributes:
        InputShape: edge length of the square input; used as the fallback
            shape when calcwdeltas() is called before calculate().
    """

    def __init__(self, InputShape:int):
        # flatten the input row wise
        self.InputShape = InputShape
        self._seen_shape = None # shape of the last input given to calculate()

    def calculate(self, Input):
        """Return `Input` as a 1-D array and remember its original shape."""
        data = np.asarray(Input)
        # Remember the real shape so the backward pass also works for inputs
        # that are not a single square map (e.g. a stack of feature maps).
        self._seen_shape = data.shape
        return data.flatten()

    def calcwdeltas(self, wtimesdelta):
        """Reshape the incoming w*delta vector to the shape of the forward input.

        Falls back to (InputShape, InputShape) when calculate() has not been
        called yet.
        """
        if self._seen_shape is None:
            target = (self.InputShape, self.InputShape)
        else:
            target = self._seen_shape
        return np.asarray(wtimesdelta).reshape(target)

In [23]:
class NeuralNetwork:
    """Sequential container that runs layers forward and back-propagates through them.

    Each layer must provide `calculate(Input)` and `calcwdeltas(wtimesdelta)`.

    Attributes:
        InputSize: edge length of the network input, as given to the constructor.
        lossfunc: loss selector; 0 is sum of squared errors, any other value
            is binary cross entropy.
        lr: learning rate given to the constructor. The layers visible in
            this file take their own `lr`; this value is only stored.
        network: list of layers in forward order.
    """

    # Keeps predictions away from exactly 0 and 1 in the cross-entropy loss.
    _EPS = 1e-12

    #initialize with the input size, the loss function and the learning rate; layers are added afterwards with addLayer()
    def __init__(self, InputSize:int, lossfunc, lr):
        # InputSize is the edge length of the input
        self.InputSize = InputSize
        self.lossfunc = lossfunc
        self.lr = lr
        self.network = [] # calculate()/train() iterate this; it was never created before

    def addLayer(self, layer):
        """Append an already constructed layer to the end of the network."""
        self.network.append(layer)

    #Given an input, calculate the output (using the layers' calculate() method)
    def calculate(self,Input):
        """Feed `Input` through every layer and return the last layer's output.

        With no layers the input is returned unchanged.
        """
        output = Input
        for layer in self.network:
            output = layer.calculate(output)
        return output

    #Given a predicted output and ground truth output simply return the loss (depending on the loss function)
    def calculateloss(self,yp,y):
        """Return the scalar loss between prediction `yp` and target `y`."""
        yp = np.asarray(yp, dtype=float)
        y = np.asarray(y, dtype=float)
        if self.lossfunc == 0:# sum of squared errors
            return 0.5 * np.sum(np.square(yp-y))
        else: # binary cross entropy
            # Clip so log() never sees 0; a logistic output rounded to 3
            # decimals can be exactly 0.0 or 1.0.
            yp = np.clip(yp, self._EPS, 1 - self._EPS)
            return -np.sum(y*np.log(yp) + (1-y)*np.log(1-yp))

    #Given a predicted output and ground truth output simply return the derivative of the loss (depending on the loss function)
    def lossderiv(self,yp,y):
        """Return d(loss)/d(yp) as an array with one entry per output."""
        yp = np.asarray(yp, dtype=float) # layers may return plain lists
        y = np.asarray(y, dtype=float)
        if self.lossfunc == 0: # sum of squared errors
            return yp - y
        else: # binary cross entropy
            # Same clipping as the loss: avoids division by zero at yp in {0, 1}.
            yp = np.clip(yp, self._EPS, 1 - self._EPS)
            return (yp - y) / (yp - yp * yp)

    #Given a single input and desired output perform one step of backpropagation (a forward pass, the derivative of the loss, then calcwdeltas on each layer from last to first)
    def train(self,x,y):
        """Run one training step on a single sample.

        Returns:
            (yp, E): the prediction made before the weight update and its loss.
        """
        yp = self.calculate(x) # predicted result
        E = self.calculateloss(yp, y) # loss
        dE = self.lossderiv(yp, y) # delta loss
        for layer in reversed(self.network):
            dE = layer.calcwdeltas(dE)
        return yp, E

In [24]:
def DoExample(w,x,y):
    """Run one training step on a logistic fully connected network and print the result.

    Args:
        w: weights indexed as w[layer][neuron]; each neuron's entry lists its
            input weights followed by the bias.
        x: input sample.
        y: target output.

    Prints every neuron's weights after the step, then the prediction and
    the loss. Returns 0.
    """
    # NeuralNetwork now takes (InputSize, lossfunc, lr); the old 7-argument
    # call raised TypeError. Loss 0 = sum of squared errors, lr = 0.5.
    model = NeuralNetwork(len(x), 0, 0.5)
    # Layers are placed in `model.network`, the attribute that calculate()
    # and train() iterate. Activation 1 = logistic; each layer's input count
    # is its weight-row length minus the bias.
    model.network = [
        FullyConnected(len(layer_w), 1, len(layer_w[0]) - 1, 0.5, layer_w)
        for layer_w in w
    ]
    ypredict, E = model.train(x,y)
    for layer in model.network:
        for cell in layer.layer:
            print(cell.weights)

    print(ypredict, E)
    return 0

In [25]:
# old output
# [0.14955069 0.19910139 0.34101389]
# [0.24949899 0.29899797 0.33997971]
# [0.3589151  0.40863797 0.53071687]
# [0.48871011 0.53863395 0.5809614 ]

In [26]:
# Worked example: two layers of two neurons each.
# w[layer][neuron] = [weight for x[0], weight for x[1], bias]
# NOTE: this cell was marked "old output" -- the output recorded below it
# comes from an earlier run.
w = np.array([
    [[0.15, 0.20, 0.35], [0.25, 0.30, 0.35]],
    [[0.40, 0.45, 0.60], [0.50, 0.55, 0.60]],
])
x = np.array([0.05, 0.10])
y = np.array([0.01, 0.99])
DoExample(w, x, y)

[0.0554265  0.06235482]
[-0.0190386  -0.02094246]
[0.00131734 0.00175645]
[0.00249086 0.00298903]
[0.14978044 0.19956089 0.34560887]
[0.24975091 0.29950183 0.34501828]
[0.3589151  0.40863797 0.53071687]
[0.51128989 0.56136605 0.6190386 ]
[0.751, 0.773] 0.298085


0