In [1]:
import random
import cupy as cp
import numpy as np
import matplotlib.pyplot as plt
import json

In [13]:
# Loading the data
import pickle
import gzip

def load_data():
    f = gzip.open('mnist.pkl.gz', 'rb')
    u = pickle._Unpickler(f)
    u.encoding = 'latin1'
    training_data, validation_data, test_data = u.load()
    f.close()
    return training_data, validation_data, test_data

data = load_data()

training_data = data[0]
testing_data = data[1]

training_data_inputs = training_data[0]
training_data_outputs = training_data[1]

prepped_training_data_inputs = np.array([np.reshape(elem, (784, 1)) for elem in training_data_inputs])

prepped_training_data_outputs = np.zeros((50000, 10, 1))
prepped_training_data_outputs[np.arange(50000), training_data_outputs, 0] = 1

prepped_training_data = [(a, b) for a, b in zip(prepped_training_data_inputs, prepped_training_data_outputs)]

In [14]:
# Prepping the testing data
testing_data_inputs = testing_data[0]
prepped_testing_data_inputs = np.array([np.reshape(elem, (784, 1)) for elem in testing_data_inputs])
prepped_testing_data_outputs = testing_data[1]
# img_array = np.reshape(prepped_testing_data_inputs[88], (28, 28))
# display_image(img_array)
prepped_testing_data = list(zip(prepped_testing_data_inputs, prepped_testing_data_outputs))

In [2]:
# Visualisation
def display_image(array):
    if array.shape != (28, 28):
        raise ValueError("Input array must be 28x28 in shape.")

    plt.imshow(array, cmap='gray', interpolation='nearest')
    plt.axis('off')  # Hide axis for better visualization
    plt.show()

In [3]:
def softmax(x):
    x_exp = np.exp(x - np.max(x))  # Stabilized exponentiation
    return x_exp / np.sum(x_exp)

In [4]:
def cost_derivative(activation, y):
    return activation - y

In [11]:
def sigmoid(z):
    return 1.0/ (1.0 + np.exp(-z))


def sigmoid_prime(z):
    return sigmoid(z) * (1 - sigmoid(z))


class Network(object):
    def __init__(self, sizes):
        """
        creates the network
        :param sizes: list containing the number of neurons in each layer, eg: [2, 3, 1] -> 2 neurons, 3 neurons, 1 neuron
        """
        self.num_layers = len(sizes)    # gives the number of layers of neurons
        self.sizes = sizes              # stores the sizes
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
        # stores the random biases: [np.array([[-1.2323], [2.3434], ...]),
        #                           np.array([-1.2323], [2.3434], ...)] depending upon sizes, total sizes - 1 elements, each with y elements(for y in sizes[1:])
        self.weights = [np.random.randn(y, x) for x, y in zip(sizes[:-1], sizes[1:])]
        # stores the random weights, initialised such that there is a weight for each of the connections
        # if we take the [2, 3, 1] example, we have the following zips: (2, 3), (3, 1) -> we get [3 elems of 2 random values in each] + [1 elem with 3 random values in it] =>
        #                                                                                           [np.array([1.2, 2.3], np.array([4.5, 4.5]), np.array([5.6, 7.8]), np.array([3.3, 7.7, 9.9])]
        self.accuracy = []

    def print_everything(self):
        print(f"number of layers: {self.num_layers}")
        print(f"sizes: {self.sizes}")
        print(f"biases: {self.biases}")
        print(f"weights: {self.weights}")

    def feedforward(self, a):
        """
        calculates the output, given 'a' as input, essentially turning the network on
        :param a: a numpy array of shape: (n, 1), for handwriting detection, would be a single column array with the pixel values
        :return: returns a numpy array with sizes[-1] values
        """
        # print(f"The shape of the input 'a' is: {a.shape}, should of the format (n, 1)")
        a = np.reshape(a, (self.sizes[0], 1))
        # print(f"The shape of the input 'a' is: {a.shape}, should of the format (n, 1)")
        for b, w in zip(self.biases[:-1], self.weights[:-1]):
            a = sigmoid(cp.dot(w, a) + b)
            # print(f"a: {a}")
        a = softmax(cp.dot(self.weights[-1], a) + self.biases[-1])
        return a

    def stochastic_gd(self, training_data, epochs, mini_batch_size, eta, test_data=None):
        """
        Training the neural network using the stochastic gradient descent. Stochastic => batches of data.

        Working:
            for each epoch, it randomly shuffles the data, partitions it into the appropriate size of batches, and for each mini_batch we apply 1 step of gradient descent
            the single step of gradient descent is done by the step "self.update_mini_batch(mini_batch, eta)" -> updating the network's weights and biases
        :param training_data: list of tuples (x, y) representing training inputs and desired outputs
        :param epochs: the number of epochs to train for
        :param mini_batch_size: the size of the mini_batches to use for training
        :param eta: the learning rate
        :param test_data: if provided, then the network will be evaluated against the test data after each epoch, and partial progress will be printed out, this is slow
        :return: None, updates the neural network's weights and biases
        """
        if test_data:
            n_test = len(test_data)
        n = len(training_data)
        for j in range(epochs):
            random.shuffle(training_data)
            mini_batches = [training_data[k: k + mini_batch_size] for k in range(0, n, mini_batch_size)]
            for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, eta)
            if test_data:
                passed = self.evaluate(test_data)
                print(f"Epoch {j}: {passed}/{n_test}")
                self.accuracy.append(passed/n_test)
            else:
                print(f"Epoch {j} completed")


    def evaluate(self, testing_data):
        """
        evaluates the model after every single epoch.
        :param testing_data: list of tuples (x, y) of testing data, x is the np array of the image, y is the expected output, a number from [0 - 9]
        :return: the total number of correct matches
        """
        matches = 0
        for i, data in enumerate(testing_data):
            temp = self.feedforward(data[0])
            predicted_number = np.argmax(temp)
            # print(f"predicted_number: {predicted_number}, actual_number: {data[1]}")
            if predicted_number == data[1]:
                matches += 1
        return matches


    def update_mini_batch(self, mini_batch, eta):
        """
        Update the network's weights and biases by applying gradient descent using backprop, to a single mini_batch.
        :param mini_batch: list of tuples, (x, y)
        :param eta: learning rate eta
        :return:
        """
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]
        for x, y in mini_batch:
            delta_nabla_b, delta_nabla_w = self.backprop(x, y)
            nabla_b = [nb + dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]
            nabla_w = [nw + dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
        self.weights = [w - (eta/len(mini_batch)) * nw for w, nw in zip(self.weights, nabla_w)]
        self.biases = [b - (eta/len(mini_batch)) * nb for b, nb in zip(self.biases, nabla_b)]

    def backprop(self, x, y):
        """

        :param x: input to the neural network, for handwriting NN example, array of pixel activation values
        :param y: the labelled output of what x should be, for handwriting NN example, a single digit [0-9]
        :return: a tuple (nabla_b, nabla_w) representing the gradient of the cost_function (gradient of cost_function calculates in which direction it increases the most)
                nabla_b and nabla_w are lists for each layer, just like self.weights and self.biases
        """
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        # feedforward to get the prediction of the NN for the given x input
        x = np.reshape(x, (self.sizes[0], 1))
        activation = x  # currently stores the input as the activation of layer 0
        activations = [x] # list to store all the activations, layer by layer, already consists of activation of layer 0, the input
        zs = [] # list to store all the z vectors, layer by layer
        for b, w in zip(self.biases, self.weights):
            z = np.dot(w, activation) + b
            zs.append(z)
            activation = sigmoid(z)
            activations.append(activation)

        # backward pass
        delta = cost_derivative(activations[-1], y) * sigmoid_prime(zs[-1])
        nabla_b[-1] = delta
        nabla_w[-1] = np.dot(delta, activations[-2].transpose())        # this is completely relying on the fact that the orders work, fix this

        for l in range(2, self.num_layers):
            # l ranges from 2 all the way till number of layers - 1, so if total number of layers = 3, so, l = 2... that's it
            z = zs[-l]
            sp = sigmoid_prime(z)
            delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
            nabla_b[-l] = delta
            nabla_w[-l] = np.dot(delta, activations[-l-1].transpose())
        return nabla_b, nabla_w

    def save_model(self):
        model_params = {'sizes': self.sizes, 'num_layers': self.num_layers, 'weights': [elem.tolist() for elem in self.weights],
                        'biases': [elem.tolist() for elem in self.biases], 'accuracy': self.accuracy}
        file_name = input("Enter the name of file(without .json): ")
        file_name = file_name.strip().lower() + ".json"
        try:
            with open(file_name, "w") as file:
                json.dump(model_params, file, indent=4)
            print(f"Model image has been created! - {file_name}")
        except (IOError, json.JSONDecodeError) as e:
            print(f"Model image creation failed!: {e}")

    def load_model(self, file_name):
        try:
            with open(file_name, "r") as file:
                data = json.load(file)
            print("JSON successfully read!")
            self.sizes = data['sizes']
            self.num_layers = data['num_layers']
            self.weights = [np.array(elem) for elem in data['weights']]
            self.biases = [np.array(elem) for elem in data['biases']]
            self.accuracy = data['accuracy']
            print("Model loaded successfully!")

        except (IOError, FileNotFoundError) as e:
            print(f"Error reading file: {e}")
        except json.JSONDecodeError as e:
            print(f"Invalid JSON format: {e}")

In [12]:
n1 = Network([784, 128, 64, 10])
# n1.print_everything()

In [10]:
n1.save_model()

<class 'method'>
Model image has been created! - save1.json.json


In [15]:
n1.stochastic_gd(prepped_training_data, 50, 50, 0.03, test_data=prepped_testing_data)

Epoch 0: 903/10000
Epoch 1: 892/10000
Epoch 2: 1781/10000
Epoch 3: 2139/10000
Epoch 4: 2481/10000
Epoch 5: 2827/10000
Epoch 6: 3290/10000
Epoch 7: 3816/10000
Epoch 8: 4223/10000
Epoch 9: 4494/10000
Epoch 10: 4716/10000
Epoch 11: 4888/10000
Epoch 12: 5053/10000
Epoch 13: 5196/10000
Epoch 14: 5322/10000
Epoch 15: 5492/10000
Epoch 16: 5647/10000
Epoch 17: 5823/10000
Epoch 18: 6012/10000
Epoch 19: 6166/10000
Epoch 20: 6323/10000
Epoch 21: 6442/10000
Epoch 22: 6538/10000
Epoch 23: 6679/10000
Epoch 24: 6759/10000
Epoch 25: 6827/10000
Epoch 26: 6933/10000
Epoch 27: 7017/10000
Epoch 28: 7086/10000
Epoch 29: 7165/10000
Epoch 30: 7231/10000
Epoch 31: 7287/10000
Epoch 32: 7335/10000
Epoch 33: 7372/10000
Epoch 34: 7435/10000
Epoch 35: 7492/10000
Epoch 36: 7529/10000
Epoch 37: 7588/10000
Epoch 38: 7624/10000
Epoch 39: 7663/10000
Epoch 40: 7719/10000
Epoch 41: 7734/10000
Epoch 42: 7779/10000
Epoch 43: 7815/10000
Epoch 44: 7844/10000
Epoch 45: 7902/10000
Epoch 46: 7937/10000
Epoch 47: 7972/10000
Epoc

In [16]:
n1.stochastic_gd(prepped_training_data, 10, 10, 0.01, test_data=prepped_testing_data)

Epoch 0: 8100/10000
Epoch 1: 8161/10000
Epoch 2: 8211/10000
Epoch 3: 8254/10000
Epoch 4: 8298/10000
Epoch 5: 8350/10000
Epoch 6: 8371/10000
Epoch 7: 8398/10000
Epoch 8: 8436/10000
Epoch 9: 8474/10000


In [17]:
n1.stochastic_gd(prepped_training_data, 10, 10, 0.015, test_data=prepped_testing_data)

Epoch 0: 8487/10000
Epoch 1: 8519/10000
Epoch 2: 8537/10000
Epoch 3: 8569/10000
Epoch 4: 8599/10000
Epoch 5: 8628/10000
Epoch 6: 8645/10000
Epoch 7: 8666/10000
Epoch 8: 8688/10000
Epoch 9: 8706/10000


In [18]:
n1.stochastic_gd(prepped_training_data, 30, 5, 0.015, test_data=prepped_testing_data)

Epoch 0: 8737/10000
Epoch 1: 8764/10000
Epoch 2: 8797/10000
Epoch 3: 8821/10000
Epoch 4: 8842/10000
Epoch 5: 8858/10000
Epoch 6: 8883/10000
Epoch 7: 8900/10000
Epoch 8: 8920/10000
Epoch 9: 8944/10000
Epoch 10: 8962/10000
Epoch 11: 8976/10000
Epoch 12: 8986/10000
Epoch 13: 8995/10000
Epoch 14: 9006/10000
Epoch 15: 9026/10000
Epoch 16: 9041/10000
Epoch 17: 9045/10000
Epoch 18: 9054/10000
Epoch 19: 9061/10000
Epoch 20: 9067/10000
Epoch 21: 9083/10000
Epoch 22: 9078/10000
Epoch 23: 9089/10000
Epoch 24: 9097/10000
Epoch 25: 9114/10000
Epoch 26: 9118/10000
Epoch 27: 9122/10000
Epoch 28: 9129/10000
Epoch 29: 9136/10000


In [19]:
n1.stochastic_gd(prepped_training_data, 10, 5, 0.02, test_data=prepped_testing_data)

Epoch 0: 9148/10000
Epoch 1: 9153/10000
Epoch 2: 9169/10000
Epoch 3: 9175/10000
Epoch 4: 9174/10000
Epoch 5: 9179/10000
Epoch 6: 9185/10000
Epoch 7: 9205/10000
Epoch 8: 9199/10000
Epoch 9: 9208/10000


In [20]:
n1.stochastic_gd(prepped_training_data, 10, 1, 0.015, test_data=prepped_testing_data)

Epoch 0: 9217/10000
Epoch 1: 9241/10000
Epoch 2: 9264/10000
Epoch 3: 9286/10000
Epoch 4: 9303/10000
Epoch 5: 9304/10000
Epoch 6: 9326/10000
Epoch 7: 9343/10000
Epoch 8: 9347/10000
Epoch 9: 9355/10000


In [26]:
n1.stochastic_gd(prepped_training_data, 30, 5, 0.02, test_data=prepped_testing_data)

Epoch 0: 9370/10000
Epoch 1: 9376/10000
Epoch 2: 9366/10000
Epoch 3: 9359/10000
Epoch 4: 9363/10000
Epoch 5: 9368/10000
Epoch 6: 9384/10000
Epoch 7: 9372/10000
Epoch 8: 9384/10000
Epoch 9: 9382/10000
Epoch 10: 9380/10000
Epoch 11: 9379/10000
Epoch 12: 9386/10000
Epoch 13: 9383/10000
Epoch 14: 9391/10000
Epoch 15: 9392/10000
Epoch 16: 9386/10000
Epoch 17: 9384/10000
Epoch 18: 9380/10000
Epoch 19: 9393/10000
Epoch 20: 9392/10000
Epoch 21: 9398/10000
Epoch 22: 9391/10000
Epoch 23: 9396/10000
Epoch 24: 9404/10000
Epoch 25: 9398/10000
Epoch 26: 9400/10000
Epoch 27: 9402/10000
Epoch 28: 9402/10000
Epoch 29: 9395/10000


In [27]:
n1.save_model()

Model image has been created! - save_1.json


In [22]:
n1.load_model("save_1.json")

JSON successfully read!
Model loaded successfully!


In [24]:
# Taking sample image, processing it and running through the model.
import cv2

def load_img():
    img = cv2.imread("./handwritten_numbers/Untitled.jpg")
    cv2.imshow("Original Image", img)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    return img

def rotate_90_ccw(img):
    temp = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
    cv2.imshow("Original Image", temp)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    return temp

def resize_img(img):
    temp = cv2.resize(img, (28, 28), interpolation=cv2.INTER_AREA)
    cv2.imshow("Original Image", temp)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    return temp

def convert_grayscale(img):
    temp = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    cv2.imshow("Original Image", temp)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    return temp

def save_img(img):
    cv2.imwrite("processed_numbers_1.jpg", img)
    return None

def processed_img():
    temp_img = load_img()
    temp_img = convert_grayscale(temp_img)
    temp_img = resize_img(temp_img)
    temp_img = np.reshape(temp_img, (784, 1))
    normalized_arr = temp_img / 255.0
    inverse_arr = 1 - normalized_arr
    return inverse_arr

In [25]:
input_img = processed_img()
ans = n1.feedforward(input_img)
print(np.argmax(softmax(ans)))

4
