In [22]:
import numpy as np
from keras.datasets import mnist
import random
import pdb

In [23]:
class Layer_Dense:

    # Layer initialization
    def __init__(self, n_inputs, n_neurons):
        # Initialize weights and biases
        self.weights = 0.01 * np.random.randn(n_inputs, n_neurons)
        self.biases = np.zeros((1, n_neurons))

        L2 = 1e-2
        self.weights_L2 = L2
        self.biases_L2  = L2

    # Forward pass
    def forward(self, inputs):
        # Remember input values
        self.inputs = inputs
        # Calculate output values from inputs, weights and biases
        self.output = np.dot(inputs, self.weights) + self.biases

    # Backward pass
    def backward(self, dvalues):
        # Gradients on parameters
        self.dweights = np.dot(self.inputs.T, dvalues)
        self.dbiases = np.sum(dvalues, axis=0, keepdims=True)
        # Gradient on values
        self.dinputs = np.dot(dvalues, self.weights.T)

        self.dbiases  = self.dbiases  + 2* self.biases_L2 *self.biases
        self.dweights = self.dweights + 2* self.weights_L2 *self.weights

In [36]:
class ConvLayer:
    def __init__(self, kShape, kNumber):
        self.xKShape = kShape[0]
        self.yKShape = kShape[1]
        self.kNumber = kNumber

        self.weights = np.random.rand(self.xKShape, self.yKShape, kNumber)
        self.biases = np.random.rand(1, kNumber)

        L2 = 1e-2
        self.weights_L2 = L2
        self.biases_L2  = L2

    def forward(self, imageMatrix, padding = 0, stride = 1):
        [xImageShape, yImageShape, channelSize, batchSize] = imageMatrix.shape

        xOutput = int((xImageShape - self.xKShape + 2 * padding ) / stride + 1)
        yOutput = int((yImageShape - self.yKShape + 2 * padding ) / stride + 1)

        output = np.zeros((xOutput, yOutput, channelSize, self.kNumber, batchSize))

        imagePadded = np.zeros((xImageShape + 2 * padding, yImageShape + 2*padding, channelSize, self.kNumber, batchSize))

        for k in range(self.kNumber):
            imagePadded[int(padding): int(padding + xImageShape), int(padding): int(padding + yImageShape), :,k,:] = imageMatrix


        if channelSize == 6 & self.kNumber == 16:
            filt = np.array([[1,0,0,0,1,1,1,0,0,1,1,1,1,0,1,1],
                             [1,1,0,0,0,1,1,1,0,0,1,1,1,1,0,1],
                             [1,1,1,0,0,0,1,1,1,0,0,1,0,1,1,1],
                             [0,1,1,1,0,0,1,1,1,1,0,0,1,0,1,1],
                             [0,0,1,1,1,0,0,1,1,1,1,0,1,1,0,1],
                             [0,0,0,1,1,1,0,0,1,1,1,1,0,1,1,1]])

        else:
            filt = np.ones([channelSize,self.kNumber])

        self.filt = filt

        for i in range(batchSize):
            for k in range(self.kNumber):
                for c in range(channelSize):
                    if filt[c, k] == 1:
                        for x in range(xOutput):
                            for y in range(yOutput):
                                yStart = y * stride
                                yEnd = yStart + self.yKShape
                                xStart = x * stride
                                xEnd = xStart + self.xKShape

                                currentSlice = imagePadded[xStart:xEnd, yStart:yEnd, c, k, i]
                                imgSlice_k_mul = np.multiply(currentSlice, self.weights[:, :, k])
                                output[x, y, c, k, i] = np.sum(imgSlice_k_mul) + self.biases[0, k].astype(float)

        self.output =  output.sum(2)
        self.input = imageMatrix
        self.paddedInput = imagePadded
        self.padding = padding
        self.stride = stride


    def backward(self, dvalues):
        stride = self.stride
        padding = self.padding
        xk = self.xKShape
        yk = self.yKShape
        nk = self.kNumber
        weights = self.weights
        paddedImage = self.paddedInput

        paddedImageShape = paddedImage.shape
        dinputs = np.zeros((paddedImageShape[0], paddedImageShape[1], paddedImageShape[2], paddedImageShape[4]))

        dbiases = np.zeros(self.biases.shape)
        dweights = np.zeros(self.weights.shape)

        xd = dvalues.shape[0]
        yd = dvalues.shape[1]
        numChan = paddedImageShape[2]
        batchSize = paddedImageShape[4]

        imagePadded = paddedImage[:,:,:,0,:]
        filt = self.filt

        for i in range(batchSize):
            for c in range(numChan):
                for k in range(nk):
                    if filt[c, k] == 1:
                      for y in range(yd):
                          for x in range(xd):
                              yStart = y * stride
                              yEnd = yStart + yk
                              xStart = x * stride
                              xEnd = xStart + xk

                              sx = slice(xStart, xEnd)
                              sy = slice(yStart, yEnd)

                              currentSlice = imagePadded[sx, sy, c, i]
                              dweights[:,:,k] += currentSlice * dvalues[x,y,k,i]
                              dinputs[sx, sy, c, i] += dweights[:,:,k] * dvalues[x,y,k,i]

                    dbiases[0,k] = np.sum(np.sum(weights[:,:,k], axis = 0), axis = 0)

        dinputs = dinputs[padding: paddedImageShape[0]-padding, padding: paddedImageShape[1]-padding, :, :]
        self.dinputs = dinputs

        self.dbiases  = dbiases  + 2 * self.biases_L2 * self.biases
        self.dweights = dweights + 2 * self.weights_L2 * self.weights

In [25]:
class FlattenLayer:
    def forward(self, imageMatrix):
        [xImageSize, yImageSize, channelSize, batchSize] = imageMatrix.shape
        flatLayerSize = xImageSize * yImageSize * channelSize

        flat_data = np.zeros((batchSize, flatLayerSize))

        for i in range(batchSize):
            flat_data[i, :] = imageMatrix[:,:,:,i].reshape(1, flatLayerSize)

        self.output = flat_data
        self.input = imageMatrix


    def backward(self, dvalues):
        [xImage, yImage, channelSize, batchSize] = self.input.shape

        dinputs = np.zeros(self.input.shape)

        for i in range(batchSize):
            dinputs[:,:,:,i] = dvalues[i,:].reshape(xImage, yImage, channelSize)

        self.dinputs = dinputs

In [26]:
class Average_Pool:

    def forward(self, M, stride = 1, KernShape = 2):

        xImgShape  = M.shape[0]
        yImgShape  = M.shape[1]
        numChans   = M.shape[2]
        numImds    = M.shape[3]

        self.inputs = M

        xK = KernShape
        yK = KernShape

        xOutput = int(((xImgShape - xK) / stride) + 1)
        yOutput = int(((yImgShape - yK) / stride) + 1)

        imagePadded = M

        output = np.zeros((xOutput,yOutput,numChans,numImds))

        for i in range(numImds):# loop over number of images
            currentIm_pad = imagePadded[:,:,:,i] #select ith padded image
            for y in range(yOutput):# loop over vert axis of output
                for x in range(xOutput):# loop over hor axis of output
                    for c in range(numChans):# loop over channels (= #filters)

                    # finding corners of the current "slice"
                        y_start = y*stride
                        y_end   = y*stride + yK
                        x_start = x*stride
                        x_end   = x*stride + xK

                        sx      = slice(x_start,x_end)
                        sy      = slice(y_start,y_end)

                        current_slice = currentIm_pad[sx,sy,c]

                        #actual average pool
                        slice_mean         = float(current_slice.mean())
                        output[x, y, c, i] = slice_mean


        #storing info, also for backpropagation
        self.xKernShape = xK
        self.yKernShape = yK
        self.output     = output
        self.impad      = imagePadded
        self.stride     = stride

    def backward(self, dvalues):

        xd = dvalues.shape[0]
        yd = dvalues.shape[1]

        numChans = dvalues.shape[2]
        numImds  = dvalues.shape[3]

        imagePadded = self.impad
        dinputs     = np.zeros(imagePadded.shape)
        Ones        = np.ones(imagePadded.shape)#for backprop

        stride  = self.stride
        xK      = self.xKernShape
        yK      = self.yKernShape

        Ones    = Ones/xK/yK # normalization that came from average pool

        for i in range(numImds):# loop over number of images
            for y in range(yd):# loop over vert axis of output
                for x in range(xd):# loop over hor axis of output
                    for c in range(numChans):# loop over channels (= #filters)

                        # finding corners of the current "slice"
                        y_start = y*stride
                        y_end   = y*stride + yK
                        x_start = x*stride
                        x_end   = x*stride + xK

                        sx      = slice(x_start,x_end)
                        sy      = slice(y_start,y_end)

                        dinputs[sx,sy,c,i]  += Ones[sx,sy,c,i]*dvalues[x,y,c,i]


        self.dinputs = dinputs

In [27]:
class Tanh:
    def forward(self, input):
        tanh = np.tanh(input)
        self.output = tanh
        self.input= input

    def backward(self, dvalues):
        deriv = 1 - self.output**2
        deriv = np.nan_to_num(deriv)
        self.dinputs = np.multiply(deriv, dvalues)

In [28]:
class Activation_Softmax:

    # Forward pass
    def forward(self, inputs):
        # Remember input values
        self.inputs = inputs

        # Get unnormalized probabilities
        exp_values = np.exp(inputs - np.max(inputs, axis=1,
                                            keepdims=True))
        # Normalize them for each sample
        probabilities = exp_values / np.sum(exp_values, axis=1,
                                            keepdims=True)

        self.output = probabilities

    # Backward pass
    def backward(self, dvalues):

        # Create uninitialized array
        self.dinputs = np.empty_like(dvalues)

        # Enumerate outputs and gradients
        for index, (single_output, single_dvalues) in \
                enumerate(zip(self.output, dvalues)):
            # Flatten output array
            single_output = single_output.reshape(-1, 1)
            # Calculate Jacobian matrix of the output
            jacobian_matrix = np.diagflat(single_output) - \
                              np.dot(single_output, single_output.T)

            # Calculate sample-wise gradient
            # and add it to the array of sample gradients
            self.dinputs[index] = np.dot(jacobian_matrix,
                                         single_dvalues)

In [29]:
class Loss:
    def calculate(self, output, y):
        sample_losses = self.forward(output, y)
        data_loss = np.mean(sample_losses)
        return (data_loss)

In [30]:
class SoftmaxLossGrad():

    def __init__(self):
        self.activation = Activation_Softmax()
        self.loss       = CategoricalCrossEntropy()

    def forward(self, inputs, y_true):
        self.activation.forward(inputs)
        self.output = self.activation.output
        #the probabilities
        #calculates and returns mean loss
        return(self.loss.calculate(self.output, y_true))

    def backward(self, dvalues, y_true):
        Nsamples = len(dvalues)
        if len(y_true.shape) == 2:
            y_true = np.argmax(y_true, axis = 1)
        self.dinputs = dvalues.copy()
        #calculating normalized gradient
        self.dinputs[range(Nsamples), y_true] -= 1
        self.dinputs = self.dinputs/Nsamples

In [31]:
class CategoricalCrossEntropy(Loss):
     def forward(self, y_pred, y_true):
         samples = len(y_pred)
         #removing vals close to zero and one bco log and accuracy
         y_pred_clipped = np.clip(y_pred, 1e-7, 1 - 1e-7)

         #now, depending on how classes are coded, we need to get the probs
         if len(y_true.shape) == 1:#classes are encoded as [[1],[2],[2],[4]]
             correct_confidences = y_pred_clipped[range(samples), y_true]
         elif len(y_true.shape) == 2:#classes are encoded as
                                    #[[1,0,0], [0,1,0], [0,1,0]]
             correct_confidences = np.sum(y_pred_clipped*y_true, axis = 1)
         #now: calculating actual losses
         negative_log_likelihoods = -np.log(correct_confidences)
         return negative_log_likelihoods

     def backward(self, dvalues, y_true):
         Nsamples = len(dvalues)
         Nlabels  = len(dvalues[0])
         #turning labels into one-hot i. e. [[1,0,0], [0,1,0], [0,1,0]], if
         #they are not
         if len(y_true.shape) == 1:
            #"eye" turns it into a diag matrix, then indexing via the label
            #itself
            y_true = np.eye(Nlabels)[y_true]
         #normalized gradient
         self.dinputs = -y_true/dvalues/Nsamples

In [32]:
class Optimizer_SGD:
    #initializing with a default learning rate of 0.1
    def __init__(self, learning_rate = 1, decay = 0, momentum = 0):
        self.learning_rate        = learning_rate
        self.current_learning_rate = learning_rate
        self.decay                 = decay
        self.iterations            = 0
        self.momentum              = momentum

    def pre_update_params(self):
        if self.decay:
            self.current_learning_rate = self.learning_rate * \
                (1/ (1 + self.decay*self.iterations))

    def update_params(self, layer):

        #if we use momentum
        if self.momentum:

            #check if layer has attribute "momentum"
            if not hasattr(layer, 'weight_momentums'):
                layer.weight_momentums = np.zeros_like(layer.weights)
                layer.bias_momentums   = np.zeros_like(layer.biases)

            #now the momentum parts
            weight_updates = self.momentum * layer.weight_momentums - \
                self.current_learning_rate * layer.dweights
            layer.weight_momentums = weight_updates

            bias_updates = self.momentum * layer.bias_momentums - \
                self.current_learning_rate * layer.dbiases
            layer.bias_momentums = bias_updates

        else:

            weight_updates = -self.current_learning_rate * layer.dweights
            bias_updates   = -self.current_learning_rate * layer.dbiases

        layer.weights += weight_updates
        layer.biases  += bias_updates

    def post_update_params(self):
        self.iterations += 1

In [33]:
class LeNet:
    def __init__(self, dataset):
        self.extract_train_data(dataset)

        self.n_neuron = 84
        self.n_input = 480
        self.conv1 = ConvLayer([5,5], 6)
        self.conv2 = ConvLayer([5,5], 16)
        self.conv3 = ConvLayer([5,5], 120)
        self.ap1 = Average_Pool()
        self.ap2 = Average_Pool()
        self.activation_tanh = [Tanh() for i in range(4)]
        self.flat_layer = FlattenLayer()
        self.dense1 = Layer_Dense(self.n_input, self.n_neuron)
        self.dense_final = Layer_Dense(self.n_neuron, self.n_category)
        self.loss_activation = SoftmaxLossGrad()
        self.optimizer = Optimizer_SGD()


    def extract_train_data(self, dataset):
        (trainX, trainY), (testX, testY) = dataset
        self.trainX = trainX
        self.trainY = trainY
        self.testX = testX
        self.testY = testY
        self.n_category = len(np.unique(self.trainY))
        self.n_total = range(len(trainY))



    def run_training(self,
                     minibatch_size=64,
                     iteration=1,
                     epochs=1,
                     learning_rate=0.1,
                     decay=0.001,
                     momentum=0.5,
                     saved_weights="NO"):
        n_total = self.n_total

        for e in range(epochs):
            idx = random.sample(n_total, minibatch_size)
            M = self.trainX[:,:,:, idx]
            C = self.trainY[idx]
            ie      = iteration * epochs
            Monitor = np.zeros((ie,3))
            ct      = 0

            for it in range(iteration):
                self.conv1.forward(M,0,1)
                self.activation_tanh[0].forward(self.conv1.output)
                self.ap1.forward(self.activation_tanh[0].output, 2, 2)

                self.conv2.forward(self.ap1.output, 0,1)
                self.activation_tanh[1].forward(self.conv2.output)
                self.ap2.forward(self.activation_tanh[1].output, 2, 2)

                self.conv3.forward(self.ap2.output, 2, 3)
                self.activation_tanh[2].forward(self.conv3.output)

                self.flat_layer.forward(self.activation_tanh[2].output)
                self.dense1.forward(self.flat_layer.output)
                self.activation_tanh[3].forward(self.dense1.output)
                self.dense_final.forward(self.activation_tanh[3].output)

                loss = self.loss_activation.forward(self.dense_final.output, C)

                self.predictions = np.argmax(self.loss_activation.output, axis=1)
                if len(C.shape) == 2:
                    C = np.argmax(C, axis=1)
                self.accuracy = np.mean(self.predictions == C)


                ##backward
                self.loss_activation.backward(self.loss_activation.output, C)
                self.dense_final.backward(self.loss_activation.dinputs)
                self.activation_tanh[3].backward(self.dense_final.dinputs)
                self.dense1.backward(self.activation_tanh[3].dinputs)
                self.flat_layer.backward(self.dense1.dinputs)
                self.activation_tanh[2].backward(self.flat_layer.dinputs)
                self.conv3.backward(self.activation_tanh[2].dinputs)
                self.ap2.backward(self.conv3.dinputs)
                self.activation_tanh[1].backward(self.ap2.dinputs)
                self.conv2.backward(self.activation_tanh[1].dinputs)
                self.ap1.backward(self.conv2.dinputs)
                self.activation_tanh[0].backward(self.ap1.dinputs)
                self.conv1.backward(self.activation_tanh[0].dinputs)

                self.optimizer.pre_update_params()

                self.optimizer.update_params(self.dense1)
                self.optimizer.update_params(self.dense_final)

                self.optimizer.update_params(self.conv1)
                self.optimizer.update_params(self.conv2)
                self.optimizer.update_params(self.conv3)

                self.optimizer.post_update_params()

                Monitor[ct,0] = self.accuracy
                Monitor[ct,1] = loss
                Monitor[ct,2] = self.optimizer.current_learning_rate

                ct += 1

                print(f'epoch: {e}, ' +
                      f'iteration : {it} ' +
                      f'accuracy: {self.accuracy: .3f}, ' +
                      f'loss: {loss: .3f}, ' +
                      f'current learning rate: {self.optimizer.current_learning_rate: .5f}')

In [34]:
class DatasetLoader:
    def load_mnist_data(self):
        (trainX, trainY), (testX, testY) = mnist.load_data()

        #convert shape (sampleSize x Height x Width) to (Height x Width x sampleSize)
        trainX = trainX.transpose(1,2,0)
        testX = testX.transpose(1,2,0)

        #convert single channel image into 3 channel image for both test and train data
        shape_train = trainX.shape
        shape_test = testX.shape

        trainX_3d = np.zeros((shape_train[0], shape_train[1], 3, shape_train[2]))
        textX_3d = np.zeros((shape_test[0], shape_test[1], 3, shape_test[2]))

        for imageChannelIndex in range(2):
            trainX_3d[:,:,imageChannelIndex,:] = trainX
            textX_3d[:,:,imageChannelIndex,:] = testX

        self.mnist_data = (trainX_3d, trainY), (textX_3d, testY)

In [37]:
dl = DatasetLoader()
dl.load_mnist_data()
(trainX, trainY), (testX, testY) = dl.mnist_data

lenet = LeNet(dl.mnist_data)
lenet.run_training()

epoch: 0, iteration : 0 accuracy:  0.078, loss:  2.302, current learning rate:  1.00000
