In [39]:
import idx2numpy
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime as dt
import random as rnd
import numpy.typing as npt
import scipy.signal as sps
import string as str

IMAGE_EDGE_SIZE = 28
PIXELS_PER_IMAGE = IMAGE_EDGE_SIZE ** 2
CLASSES_COUNT = 10
ITERATIONS = 500
LEARNING_RATE = 0.15

In [40]:
def one_hot(labels: npt.ArrayLike) -> npt.NDArray:
    """
    Converts a 1D array of labels (the ground truth) to 2D matrix of shape (10, labels.size) as a probability distribution, 
    where the corresponding row given by the label value has probability of 1.
    
    :labels: The ground truth.
    :return: Encoded values of labels to probability distribution.
    """
    one_hot = np.zeros((10, labels.size))
    one_hot[labels, np.arange(labels.size)] = 1
    return one_hot

def get_accuracy(results: npt.NDArray, labels: npt.ArrayLike) -> float:
    """
    Calculates the accuracy of a neural network from the results of classification by comparing it to the ground truth.

    :results: The forward propagation results.
    :labels: The ground truth.
    :return: The accuracy as a real number. 
    """
    return (np.sum(np.argmax(results, 0) == labels) / labels.size)

def show_some_mistakes(results: npt.NDArray, labels: npt.ArrayLike, data: npt.NDArray, samples = 10) -> None:
    """
    Plots randomly choosen images, which were not classified correctly.

    :results: The forward propagation results.
    :labels: The ground truth.
    :data: The input data of forward propagation, i.e images.
    :samples: The number of shown images, 10 by default.
    """
    results = np.argmax(results, 0)
    i = rnd.randint(0, labels.size)
    j = 0
    while j < samples:
        i = (i + 1) % labels.size
        if results[i] != labels[i]:
            print("labeled:", labels[i], "-- classified:", results[i])
            plt.imshow(data[:, i].reshape((IMAGE_EDGE_SIZE, IMAGE_EDGE_SIZE)), cmap='gray')
            plt.show()
            j += 1

def random_name():
    return "".join(rnd.choices(str.ascii_letters + str.digits, k=8))

def argmax2D(matrix):
    x, y = 0, 0
    for i in range(1, matrix.shape[0]):
        for j in range(1, matrix.shape[1]):
            if matrix[i, j] > matrix[x, y]:
                x, y = i, j
    return x, y

In [41]:
def ReLU(X: npt.NDArray) -> npt.NDArray:
    """
    Calculates the Rectified Linear Units of a numpy matrix.
    
    :X: Matrix of values.
    :return: For all nonnegative numbers returns its value, otherwise 0.
    """
    return np.maximum(0, X)

def leaky_ReLU(X: npt.NDArray) -> npt.NDArray:
    """
    Calculates the Rectified Linear Units of a numpy matrix.
    
    :X: Matrix of values.
    :return: For all nonnegative numbers returns its value, otherwise 0.
    """
    return np.maximum(0.1 * X, X)

def ReLU_deriv(X: npt.NDArray) -> npt.NDArray:
    """
    Calculates the derivation of ReLu function of a numpy matrix.

    :X: Matrix of values.
    :return: For all positive numbers returns 1, otherwise 0.
    """
    return X > 0

def leaky_ReLU_deriv(X: npt.NDArray) -> npt.NDArray:
    """
    Calculates the Rectified Linear Units of a numpy matrix.
    
    :X: Matrix of values.
    :return: For all nonnegative numbers returns its value, otherwise 0.
    """
    return np.where(X > 0, 1, 0.1)

def sigmoid(L: npt.NDArray) -> npt.NDArray:
    """
    Calculates the Sigmoid function of a numpy matrix.
    
    :L: Values of a hidden layer.
    :return: For all indexes with value x returns 1 / (1 + e^(-x)).
    """
    return 1 / (1 + np.exp(-L))

def softmax(X: npt.NDArray) -> npt.NDArray:
    """
    Converts matrix of N values in a row to probability distribution of N outcomes for each row.

    :X: Values of an output layer.
    :return: For all indexes of the given matrix returns the probability of a given index in its row.
    """
    #exp = np.exp(X - np.max(X))
    exp = np.exp(X)
    return exp / np.sum(exp, 0)

In [42]:
def load_training_data() -> tuple:
    """
    Loads training data and training labels from files and transforms them to desired shape.

    :return: Matrix of training data and array of training labels.
    """
    training_data = idx2numpy.convert_from_file("mnist/train-images.idx3-ubyte") / 255
    training_labels = idx2numpy.convert_from_file("mnist/train-labels.idx1-ubyte")
    return training_data, training_labels

def load_test_data() -> tuple:
    """
    Loads testing data and training labels from files and transforms them to desired shape.

    :return: Matrix of testing data and array of testing labels.
    """
    test_data = idx2numpy.convert_from_file("mnist/t10k-images.idx3-ubyte") / 255
    test_labels = idx2numpy.convert_from_file("mnist/t10k-labels.idx1-ubyte")
    return test_data, test_labels

In [43]:
class Layer():
    def __init__(self, name):
        self.name = name
        
    def forward(self, input):
        pass

    def backward(self, dOutput, sample_count, learning_rate):
        pass

    def backward(self, dOutput):
        pass

    def save(self):
        try:
            np.save(f"{self.name}_K.npy", self.kernels)
        except:
            pass
    
        try:
            np.save(f"{self.name}_W.npy", self.weights)
        except:
            pass
    
        try:
            np.save(f"{self.name}_B.npy", self.biases)
        except:
            pass

    def load(self, kernels: bool, weights: bool, biases: bool):
        if kernels:
            try:
                self.kernels = np.load(self.name + "_K.npy")
            except:
                kernels = False
        
        if weights:
            try:
                self.weights = np.load(self.name + "_W.npy")
            except:
                weights = False
        
        if biases:
            try:
                self.biases = np.load(self.name + "_B.npy")
            except:
                biases = False
        
        return kernels, weights, biases

In [44]:
class ConvolutionLayer(Layer):
    def __init__(self, kernel_count, kernel_size, activation, activation_deriv, name = random_name()):
        Layer.__init__(self, name)
        self.activation = activation
        self.activation_deriv = activation_deriv
        self.kernel_count = kernel_count
        self.kernel_shrink = kernel_size - 1
        kernels, _, biases = self.load(True, False, True)
        if not kernels:
            lower, upper = -(1.0 / np.sqrt(kernel_size * kernel_size)), (1.0 / np.sqrt(kernel_size * kernel_size))
            self.kernels = lower + np.random.randn(kernel_count, kernel_size, kernel_size) * (upper - lower)
        if not biases:
            self.biases = np.zeros((kernel_count, 1))

    def forward(self, input):
        self.input = input
        input_count = input.shape[0]
        output = np.zeros((input_count * self.kernel_count, input.shape[1] - self.kernel_shrink, input.shape[2] - self.kernel_shrink))
        
        for i in range(input_count):
            k = i * self.kernel_count
            for j in range(self.kernel_count):
                output[k + j] = self.activation(sps.correlate2d(input[i], self.kernels[j], "valid") + self.biases[j])
        
        return output
    
    def backward(self, dOutput, sample_count, learning_rate):
        new_dOutput = None
        if self.activation_deriv is not None:
            new_dOutput = np.zeros_like(self.input)
            rotated_kernels = np.rot90(self.kernels, 2, (1, 2))
            for i in range(self.input.shape[0]):
                k = i * self.kernel_count
                for j in range(self.kernel_count):
                    new_dOutput[i] += sps.correlate2d(rotated_kernels[j], dOutput[k + j], "full")
                    
            new_dOutput = new_dOutput * self.activation_deriv(self.input)

        dKernels = np.zeros_like(self.kernels)
        for i in range(sample_count):
            k = i * self.kernel_count
            for j in range(self.kernel_count):
                dKernels[j] += sps.correlate2d(self.input[i], dOutput[k + j], "valid")
        
        self.biases = self.biases - (np.sum(dOutput) / sample_count) * learning_rate
        self.kernels = self.kernels - (dKernels / sample_count) * learning_rate

        return new_dOutput

In [45]:
class MaxPoolLayer(Layer):
    def __init__(self, window_size, name = random_name()):
        Layer.__init__(self, name)
        self.window_size = window_size
        
    def forward(self, input):
        self.input = input
        self.input_count = input.shape[0]
        self.pooled_width = int(input.shape[1] / self.window_size)
        self.pooled_height = int(input.shape[2] / self.window_size)
        pooled = np.zeros((self.input_count, self.pooled_width, self.pooled_height))
        
        for i in range(self.input_count):
            for j in range(self.pooled_width):
                for k in range(self.pooled_height):
                    l = j * self.window_size
                    m = k * self.window_size
                    pooled[i, j, k] = np.max(input[i, l:l+self.window_size, m:m+self.window_size])
        
        return pooled

    def backward(self, dOutput, *_):
        new_dOutput = np.zeros_like(self.input)
        for i in range(self.input_count):
            for j in range(self.pooled_width):
                for k in range(self.pooled_height):
                    l = j * self.window_size
                    m = k * self.window_size
                    x, y = argmax2D(self.input[i, l:l+self.window_size, m:m+self.window_size])
                    x += l
                    y += m
                    new_dOutput[i, x, y] = dOutput[i, j, k]

        return new_dOutput

In [46]:
class FlatteningLayer(Layer):
    def __init__(self, channel_count, name = random_name()):
        Layer.__init__(self, name)
        self.channel_count = channel_count

    def forward(self, input):
        self.input_shape = input.shape
        return np.reshape(input, (-1, input.shape[1] * input.shape[2] * self.channel_count)).T
    
    def backward(self, dOutput, *_):
        return np.reshape(dOutput.T, self.input_shape)

In [47]:
class FullyConnectedLayer(Layer):
    def __init__(self, input_size, output_size, activation, activation_deriv, name = random_name()):
        Layer.__init__(self, name)
        self.activation = activation
        self.activation_deriv = activation_deriv
        _, weights, biases = self.load(False, True, True)
        if not weights:
            self.weights = np.random.randn(output_size, input_size) * np.sqrt(2 / input_size)
        if not biases:
            self.biases = np.zeros((output_size, 1))
    
    def forward(self, input):
        self.input = input
        return self.activation(self.weights.dot(input) + self.biases)
    
    def backward(self, dOutput, sample_count, learning_rate):
        new_dOutput = None
        if self.activation_deriv is not None:
            new_dOutput = self.weights.T.dot(dOutput) * self.activation_deriv(self.input)
            
        self.weights = self.weights - (dOutput.dot(self.input.T) / sample_count) * learning_rate
        self.biases = self.biases - (np.sum(dOutput) / sample_count) * learning_rate
        return new_dOutput

In [48]:
class NeuralNetwork():
    def __init__(self, training_data, training_labels, sample_count, name = random_name(), *layers: Layer):
        self.training_data = training_data
        self.training_data_count = training_data.shape[0]
        self.sample_count = sample_count
        self.training_labels = training_labels
        self.name = name
        self.layers = layers
    
    def train(self, iterations, learning_rate):
        sample_index = self.sample_count
        for _ in range(iterations):
            input = self.training_data[sample_index - self.sample_count : sample_index]
            for layer in self.layers:
                input = layer.forward(input)

            dOutput = input - self.training_labels[:, sample_index - self.sample_count : sample_index]
            for i in range(len(self.layers) - 1, -1, -1):
                dOutput = self.layers[i].backward(dOutput, self.sample_count, learning_rate)

            sample_index += self.sample_count
            if sample_index > self.training_data_count:
                sample_index = self.sample_count
        
    def save(self):
        for layer in self.layers:
            layer.save()

    def assess(self, test_data, test_labels, text):
        input = test_data
        for layer in self.layers:
            input = layer.forward(input)

        accuracy = (np.sum(np.argmax(input, 0) == test_labels) / test_labels.size)
        print(f"\n############################# Neural Network '{self.name}' Results #############################")
        print(f"Accuracy {text}:", accuracy, "({:.2f}%)".format(accuracy * 100))

In [49]:
training_data, training_labels = load_training_data()
test_data, test_labels = load_test_data()

In [50]:


neural_network_1 = NeuralNetwork(training_data, one_hot(training_labels), 1000, "CL_CL_CL_FL_FCL_OL",
                              ConvolutionLayer(2, 3, leaky_ReLU, None, "CL_CL_CL_FL_FCL_OL/CL1"),
                              ConvolutionLayer(2, 3, leaky_ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_OL/CL2"),
                              ConvolutionLayer(2, 5, leaky_ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_OL/CL3"),
                              FlatteningLayer(8, "CL_CL_CL_FL_FCL_OL/FL1"), 
                              FullyConnectedLayer(3200, 80, ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_OL/FCL1"), 
                              FullyConnectedLayer(80, 10, softmax, ReLU_deriv, "CL_CL_CL_FL_FCL_OL/OL"))
neural_network_1.train(1, 0.1)
neural_network_1.save()
neural_network_1.assess(training_data, training_labels, "on training set")
neural_network_1.assess(test_data, test_labels, "on test set")


############################# Neural Network 'CL_CL_CL_FL_FCL_OL' Results #############################
Accuracy on training set: 0.9932333333333333 (99.32%)

############################# Neural Network 'CL_CL_CL_FL_FCL_OL' Results #############################
Accuracy on test set: 0.9759 (97.59%)


In [54]:
neural_network_2 = NeuralNetwork(training_data, one_hot(training_labels), 100, "CL_CL_CL_FL_FCL_FCL_OL",
                              ConvolutionLayer(3, 3, leaky_ReLU, None, "CL_CL_CL_FL_FCL_FCL_OL/CL1"),
                              ConvolutionLayer(3, 3, leaky_ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_FCL_OL/CL2"),
                              ConvolutionLayer(2, 5, leaky_ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_FCL_OL/CL3"),
                              FlatteningLayer(18, "CL_CL_CL_FL_FCL_OL/FL1"), 
                              FullyConnectedLayer(7200, 400, leaky_ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_FCL_OL/FCL1"),
                              FullyConnectedLayer(400, 80, ReLU, leaky_ReLU_deriv, "CL_CL_CL_FL_FCL_FCL_OL/FCL2"), 
                              FullyConnectedLayer(80, 10, softmax, ReLU_deriv, "CL_CL_CL_FL_FCL_FCL_OL/OL"))
neural_network_2.train(600, 0.1)
neural_network_2.save()
neural_network_2.assess(training_data, training_labels, "on training set")
neural_network_2.assess(test_data, test_labels, "on test set")

  exp = np.exp(X)
  return exp / np.sum(exp, 0)


KeyboardInterrupt: 