In [72]:
# https://www.kaggle.com/code/hojjatk/read-mnist-dataset/notebook

#
# This is a sample Notebook to demonstrate how to read "MNIST Dataset"
#
import numpy as np # linear algebra
import struct
from array import array
from os.path  import join

#
# MNIST Data Loader Class
#
class MnistDataloader(object):
    """Loader for MNIST-style image/label files stored in the IDX binary format.

    The four file paths are only stored by the constructor; nothing is read
    until `load_data` or `read_images_labels` is called.
    """

    def __init__(self, training_images_filepath,training_labels_filepath,
                 test_images_filepath, test_labels_filepath):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath

    def read_images_labels(self, images_filepath, labels_filepath):
        """Read one image file and its matching label file.

        Parameters:
            images_filepath: path of an IDX image file (magic number 2051).
            labels_filepath: path of an IDX label file (magic number 2049).

        Returns:
            (images, labels) where `images` is a list with one entry per image,
            each entry being a list of `rows` uint8 row arrays of length `cols`,
            and `labels` is an `array('B')` with the raw label bytes.

        Raises:
            ValueError: if a magic number does not match, or if the image file
                holds fewer pixel bytes than its header announces.
        """
        with open(labels_filepath, 'rb') as file:
            # Label header: big-endian magic number and item count.
            magic, size = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
            labels = array("B", file.read())

        with open(images_filepath, 'rb') as file:
            # Image header: magic number, image count, rows and columns per image.
            magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
            image_data = array("B", file.read())

        pixels_per_image = rows * cols
        expected_bytes = size * pixels_per_image
        if len(image_data) < expected_bytes:
            raise ValueError('Image file is truncated, expected {} pixel bytes, got {}'.format(
                expected_bytes, len(image_data)))

        images = []
        for i in range(size):
            start = i * pixels_per_image
            img = np.array(image_data[start:start + pixels_per_image], dtype=np.uint8)
            # Use the dimensions from the header instead of assuming 28 x 28.
            img = img.reshape(rows, cols)
            # Keep the historical return shape: each image is a list of row arrays.
            images.append(list(img))

        return images, labels

    def load_data(self):
        """Read the training and test sets.

        Returns:
            ((x_train, y_train), (x_test, y_test)), each pair produced by
            `read_images_labels`.
        """
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train),(x_test, y_test)

In [73]:
# # https://www.kaggle.com/code/hojjatk/read-mnist-dataset/notebook

# #
# # Verify Reading Dataset via MnistDataloader class
# #
# %matplotlib inline
# import random
# import matplotlib.pyplot as plt
# import pickle

# #
# # Set file paths based on added MNIST Datasets
# #
# input_path = 'dataset/'
# training_images_filepath = join(input_path, 'train-images-idx3-ubyte/train-images-idx3-ubyte')
# training_labels_filepath = join(input_path, 'train-labels-idx1-ubyte/train-labels-idx1-ubyte')
# test_images_filepath = join(input_path, 't10k-images-idx3-ubyte/t10k-images-idx3-ubyte')
# test_labels_filepath = join(input_path, 't10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte')

# #
# # Helper function to show a list of images with their corresponding titles
# #
# def show_images(images, title_texts):
#     cols = 5
#     rows = int(len(images)/cols) + 1
#     plt.figure(figsize=(30,20))
#     index = 1    
#     for x in zip(images, title_texts):        
#         image = x[0]        
#         title_text = x[1]
#         plt.subplot(rows, cols, index)        
#         plt.imshow(image, cmap=plt.cm.gray)
#         if (title_text != ''):
#             plt.title(title_text, fontsize = 15);        
#         index += 1

# #
# # Load MNIST dataset
# #
# mnist_dataloader = MnistDataloader(training_images_filepath, training_labels_filepath, test_images_filepath, test_labels_filepath)
# (data_train, label_train), (data_test, label_test) = mnist_dataloader.load_data()

# with open("dataset/pickled/data_train.pickle", "wb") as outfile:
#     pickle.dump(data_train, outfile)
# with open("dataset/pickled/label_train.pickle", "wb") as outfile:
#     pickle.dump(label_train, outfile)
# with open("dataset/pickled/data_test.pickle", "wb") as outfile:
#     pickle.dump(data_test, outfile)
# with open("dataset/pickled/label_test.pickle", "wb") as outfile:
#     pickle.dump(label_test, outfile)

# first_10_data_train = []
# first_10_label_train = []
# for i in range(0, 10):
#     first_10_data_train.append(data_train[i])
#     first_10_label_train.append(label_train[i])

# with open("dataset/pickled/first_10_data_train.pickle", "wb") as outfile:
#     pickle.dump(first_10_data_train, outfile)
# with open("dataset/pickled/first_10_label_train.pickle", "wb") as outfile:
#     pickle.dump(first_10_label_train, outfile)

# #
# # Show some random training and test images 
# #
# images_2_show = []
# titles_2_show = []
# for i in range(0, 10):
#    images_2_show.append(first_10_data_train[i])
#    titles_2_show.append('training image [' + str(i) + '] = ' + str(first_10_label_train[i]))    

# #for i in range(0, 5):
# #   r = random.randint(1, 10000)
# #   images_2_show.append(data_test[r])        
# #   titles_2_show.append('test image [' + str(r) + '] = ' + str(label_test[r]))    

# show_images(images_2_show, titles_2_show)


In [74]:
import random
import numpy as np

class Neuron:
    """A single neuron with a randomly initialised activation, bias and incoming weights."""

    def __init__(self, numberOfIncoming):
        """Draw the activation, the bias and `numberOfIncoming` weights uniformly from [0, 1]."""
        # Draw order (activation, bias, weights) is kept so seeded runs are unchanged.
        self.activation = random.uniform(0, 1)
        self.bias = random.uniform(0, 1)
        drawnWeights = []
        for _ in range(numberOfIncoming):
            drawnWeights.append(random.uniform(0, 1))
        self.incomingWeights = np.array(drawnWeights)

    def cout(self):
        """Print the activation, the bias and the incoming weights, one per line."""
        report = (
            ("Activation: ", self.activation),
            ("Bias: ", self.bias),
            ("Incoming Weights: ", self.incomingWeights),
        )
        for label, value in report:
            print(label, value)


In [75]:
class Layer:
    """State of one network layer: activations, biases, weighted inputs and errors."""

    def __init__(self, currentLayerLen):
        """Create a layer of `currentLayerLen` units.

        Activations and biases are drawn uniformly from [0, 1] (all activations
        first, then all biases); the z and error vectors start as zero columns.
        """
        def drawUniformVector():
            return np.array([random.uniform(0, 1) for _ in range(currentLayerLen)])

        self.activations = drawUniformVector()
        self.biases = drawUniformVector()
        columnShape = (currentLayerLen, 1)
        self.zVector = np.zeros(shape = columnShape)
        self.errorVector = np.zeros(shape = columnShape)
        self.size = currentLayerLen

    def cout(self):
        """Print activations and biases followed by the z vector and the error vector."""
        self.coutBase()
        for heading, values in (("Z Vector: ", self.zVector), ("Error: ", self.errorVector)):
            print(heading)
            print(values)

    def coutBase(self):
        """Print only the activations and the biases."""
        for heading, values in (("Activations: ", self.activations), ("Biases: ", self.biases)):
            print(heading)
            print(values)

In [76]:
class weightMatrix:
    """Weights between two consecutive layers, stored as a (next, previous) matrix."""

    def __init__(self, prevLayerLen, nextLayerLen):
        """Fill a `nextLayerLen` x `prevLayerLen` matrix with samples from np.random.rand."""
        rowCount, columnCount = nextLayerLen, prevLayerLen
        self.matrix = np.random.rand(rowCount, columnCount)

    def cout(self):
        """Print the weight matrix."""
        weights = self.matrix
        print(weights)

In [77]:
def sigmoid(x):
    """Logistic function 1 / (1 + e^(-x)); works on scalars and NumPy arrays."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator

def sigmoidDeriv(x):
    """Derivative of the logistic function, sigmoid(x) * (1 - sigmoid(x)).

    Works on scalars and NumPy arrays. The earlier formula
    exp(-x) / (1 + exp(-x)) equals 1 - sigmoid(x), which is not the
    derivative (for example it yields 0.5 at x = 0 instead of 0.25).
    """
    # Computed inline so this function does not depend on any other definition.
    activation = 1 / (1 + np.exp(-x))
    return activation * (1 - activation)

class Network:
    """Fully connected sigmoid network made of `Layer` objects and weight matrices.

    `Layers[i]` holds the state of layer i (activations, biases, zVector,
    errorVector, size). `Matrices[i].matrix` holds the weights from layer i to
    layer i + 1 with shape (size of layer i + 1, size of layer i).

    All per-layer vectors written by this class are one-dimensional. Vectors
    read from a layer are flattened first, so the column-shaped zero vectors
    a freshly built layer may carry cannot trigger accidental broadcasting.
    """

    def __init__(self, start, first, second, end):
        """Create the four layers and the three weight matrices connecting them."""
        layerSizes = [start, first, second, end]

        # Layers are created before the matrices, as before, so the random draws
        # happen in the same order.
        self.Layers = np.array([Layer(size) for size in layerSizes])

        # Indexed with the layer before the matrix
        self.Matrices = np.array([weightMatrix(prevSize, nextSize)
                                  for prevSize, nextSize in zip(layerSizes, layerSizes[1:])])

    def _targetVector(self, target):
        """Return the one-hot expected output for `target`, a 0-based output index.

        Raises:
            ValueError: if `target` is not a valid index into the last layer.
        """
        endSize = self.Layers[-1].size
        targetIdx = int(target)
        if not 0 <= targetIdx < endSize:
            raise ValueError("Target " + str(target) + " is outside the output range 0.."
                             + str(endSize - 1))
        expected = np.zeros(endSize)
        expected[targetIdx] = 1.0
        return expected

    def _sigmoidPrime(self, layer):
        """Return sigmoid'(z) for `layer`, derived from its stored activations.

        For a sigmoid unit a = sigmoid(z) the derivative is a * (1 - a). Using
        the activations leaves `zVector` untouched; it requires the activations
        to be up to date from forward propagation.
        """
        activations = np.ravel(layer.activations)
        return activations * (1.0 - activations)

    def calculateZVector(self, layerIdx):
        """Set `Layers[layerIdx].zVector` to W . a_prev + b.

        Raises:
            IndexError: if `layerIdx` does not name a layer that has a
                preceding layer (valid range is 1 .. number of layers - 1).
        """
        if not 1 <= layerIdx < len(self.Layers):
            raise IndexError("layerIdx must be between 1 and " + str(len(self.Layers) - 1))

        prevLayer = self.Layers[layerIdx - 1]
        currLayer = self.Layers[layerIdx]

        # weight matrix is Matrices[layerIdx - 1]
        weights = self.Matrices[layerIdx - 1].matrix

        currLayer.zVector = np.dot(weights, np.ravel(prevLayer.activations)) + np.ravel(currLayer.biases)

    def forwardPropagationStep(self, layerIdx):
        """Recompute the z vector and the activations of layer `layerIdx`."""
        self.calculateZVector(layerIdx)

        currLayer = self.Layers[layerIdx]
        currLayer.activations = sigmoid(currLayer.zVector)

    # Assumes that data is between 0 and 255 value
    def setStartLayerActivations(self, dataset):
        """Load `dataset` into the first layer, scaled from 0..255 to 0..1.

        Parameters:
            dataset: array-like of numbers; it is flattened in row-major order,
                so a rows x cols image fills the layer row by row.

        Raises:
            ValueError: if the number of values differs from the first layer size.
        """
        pixels = np.asarray(dataset, dtype=float).reshape(-1)
        startLayer = self.Layers[0]

        if pixels.size != startLayer.size:
            raise ValueError("There is a mismatch between the size of the input data and the start layer! "
                             + "Size of dataset is: " + str(pixels.size) + ". "
                             + "Size of first layer is: " + str(startLayer.size))

        print("1) Set the activations of the first layer")

        startLayer.activations = pixels / 255

    def fullForwardPropagation(self, target):
        """Propagate the first layer's activations through every layer and print the cost."""
        print("2) Feedforward: Compute all activations for all layers")

        for layerIdx in range(1, len(self.Layers)):
            self.forwardPropagationStep(layerIdx)
        print("Target is: " + str(target))
        print("Cost is: " + str(self.cost(target)))

    def cost(self, target):
        """Return sum((a - y)^2) / (2 * output size) for the one-hot target `y`.

        `target` is the 0-based index of the output unit that should be 1.
        """
        endLayer = self.Layers[-1]
        difference = np.ravel(endLayer.activations) - self._targetVector(target)
        return float(np.dot(difference, difference)) / (2 * endLayer.size)

    def fullBackwardPropagation(self, target):
        """Compute the error vector of every layer after the input layer, last layer first."""
        print("3) Output Error in last layer")
        self.calculateErrorInLastLayerForTarget(target)

        print("4) Backpropagate error: calculate error for all layers")
        for layerIdx in range(len(self.Layers) - 2, 0, -1):
            self.calculateErrorFromNextLayerError(layerIdx)

    # The cost function is hard coded
    def calculateErrorInLastLayerForTarget(self, target):
        """Set the last layer's error to (a - y) * sigmoid'(z).

        `target` is the same 0-based output index that `cost` uses. The
        gradient (a - y) omits the constant 1 / output-size factor of `cost`;
        that factor only rescales the effective learning rate.
        """
        endLayer = self.Layers[-1]
        costGradient = np.ravel(endLayer.activations) - self._targetVector(target)
        endLayer.errorVector = costGradient * self._sigmoidPrime(endLayer)

    # Assumes error in next layer is up to date
    def calculateErrorFromNextLayerError(self, layerIdx):
        """Set layer `layerIdx`'s error to (W_next^T . error_next) * sigmoid'(z).

        `W_next` is `Matrices[layerIdx]`, the weights from this layer to the
        following one.

        Raises:
            IndexError: if `layerIdx` is not a hidden layer
                (valid range is 1 .. number of layers - 2).
        """
        if not 1 <= layerIdx < len(self.Layers) - 1:
            raise IndexError("layerIdx must be between 1 and " + str(len(self.Layers) - 2))

        currLayer = self.Layers[layerIdx]
        nextError = np.ravel(self.Layers[layerIdx + 1].errorVector)
        outgoingWeights = self.Matrices[layerIdx].matrix

        weightTimesError = np.dot(outgoingWeights.T, nextError)
        currLayer.errorVector = weightTimesError * self._sigmoidPrime(currLayer)

    def adjustBasedOnGradientDescentForCurrentExample(self, learningRate, numberInBatch):
        """Apply one gradient-descent step using the current error vectors.

        Every bias moves by -(learningRate / numberInBatch) * error, and every
        weight by -(learningRate / numberInBatch) * error_out * activation_in.
        """
        print("5) Gradient Descent")

        stepSize = learningRate / numberInBatch

        for layerIdx in range(len(self.Layers) - 1, 0, -1):
            currLayer = self.Layers[layerIdx]
            incoming = self.Matrices[layerIdx - 1]
            error = np.ravel(currLayer.errorVector)
            prevActivations = np.ravel(self.Layers[layerIdx - 1].activations)

            currLayer.biases = currLayer.biases - stepSize * error
            # Row n of the outer product is error[n] * prevActivations, i.e. the
            # gradient for the weights feeding unit n of this layer.
            incoming.matrix = incoming.matrix - stepSize * np.outer(error, prevActivations)

    def coutBase(self):
        """Print activations and biases of every layer, interleaved with the weight matrices."""
        for layer, weights in zip(self.Layers, self.Matrices):
            layer.coutBase()
            print()
            weights.cout()
            print()

        self.Layers[-1].coutBase()

    def cout(self):
        """Print the full state of every layer, interleaved with the weight matrices."""
        for layer, weights in zip(self.Layers, self.Matrices):
            layer.cout()
            print()
            weights.cout()
            print()

        self.Layers[-1].cout()

In [78]:
# MNIST Dataset: 28 x 28 = 784
import pickle

# Driver cell: builds a small 4-3-3-2 network and runs a single forward pass on
# its randomly initialised state. No random seed is set in the visible cells, so
# the printed numbers differ between runs.
# The MNIST-sized topology (784 inputs, two hidden layers of 16, 10 outputs) is
# kept here for reference:
#net = Network(784, 16, 16, 10)
net = Network(4, 3, 3, 2)

# Disabled: load the pickled 10-example training batch (images and labels).
#with open("dataset/pickled/first_10_data_train.pickle", "rb") as infile:
#    data_batch = pickle.load(infile)
#with open("dataset/pickled/first_10_label_train.pickle", "rb") as infile:
#    label_batch = pickle.load(infile)

# Disabled: restore a previously saved network.
# NOTE(review): pickle.load can execute arbitrary code from the file; only load
# files produced by this notebook.
#with open("network.pickle", "rb") as infile:
#   net = pickle.load(infile)

# Disabled: forward pass over every example of the batch.
# NOTE(review): the Network class in this notebook defines setStartLayerActivations
# but no prepForFullForwardPropagation -- presumably the call below predates a
# rename; confirm before re-enabling.
#for i in range(0, len(data_batch)):
#    net.prepForFullForwardPropagation(data_batch[i])
#    net.fullForwardPropagation(label_batch[i])
#    print()

# Print activations, biases and weights, run one forward pass for target 1,
# then print the state again to show the updated activations.
net.coutBase()
net.fullForwardPropagation(1)
net.coutBase()
# Disabled: backpropagation followed by one gradient-descent step
# (learning rate 1, batch size 1), printing the state after each stage.
# net.fullBackwardPropagation(1)
# net.coutBase()
# net.adjustBasedOnGradientDescentForCurrentExample(1, 1)
# net.coutBase()

# Disabled: persist the network to disk.
#with open("network.pickle", "wb") as outfile:
#    pickle.dump(net, outfile)

Activations: 
[0.35493049 0.98018456 0.03143758 0.86858582]
Biases: 
[0.49476099 0.3296349  0.48545865 0.74475343]

[[0.76225323 0.83105371 0.19991615 0.52278153]
 [0.16880031 0.66471294 0.95119166 0.26051279]
 [0.23823026 0.99794992 0.83641332 0.39272434]]

Activations: 
[0.11732166 0.33123903 0.20175531]
Biases: 
[0.51449052 0.36210166 0.76365407]

[[0.83187676 0.53959079 0.78864319]
 [0.00333915 0.26241949 0.5074961 ]
 [0.39318048 0.84755439 0.98915525]]

Activations: 
[0.87638117 0.53181895 0.78174936]
Biases: 
[0.1181369  0.9371652  0.57807627]

[[0.7003159  0.28493311 0.57479546]
 [0.17754372 0.85710859 0.52480597]]

Activations: 
[0.15970947 0.86038723]
Biases: 
[0.2562546 0.2867413]
2) Feedforward: Compute all activations for all layers
Target is: 1
Cost is: 0.727905614657231
Activations: 
[0.35493049 0.98018456 0.03143758 0.86858582]
Biases: 
[0.49476099 0.3296349  0.48545865 0.74475343]

[[0.76225323 0.83105371 0.19991615 0.52278153]
 [0.16880031 0.66471294 0.95119166 0.26051