In [3]:
import numpy as np
import pandas as pd
import scipy.signal
import PIL.Image as Image
import matplotlib.pyplot as plt

In [4]:
import sys
import numpy
numpy.set_printoptions(threshold=sys.maxsize)

In [5]:
IMAGE_SIZE = (28, 28)

# Import data
data = pd.read_csv(r"../digit-recognizer/train.csv")
data_test = pd.read_csv(r"../digit-recognizer/test.csv")

# Get labels and image array from data
# Only 30000 data used. The rest were used for testing (with labels, to check the accuracy)
labels : np.ndarray = data.values[:30000, 0]
images : np.ndarray = data.values[:30000, 1:].astype('uint8').reshape((-1, 1) + IMAGE_SIZE) / 255

images_test : np.ndarray = data_test.values.astype('uint8') / 255

images_test_with_label = data.values[30000:, 1:].astype('uint8') / 255
labels_test_with_label = data.values[30000:, 0]

In [17]:
class Convolutional2DLayer:
    def __init__(self, in_channel: int, out_channel: int, input_shape: tuple, kernel_size: int, stride: int = 1, padding: int = 0):
        """Convolution layer of CNN. Receives: 
            in_channel -> amount of channels the input has
            out_channel -> amount of channels the output has
            input_shape -> input shape (without the channels)
            kernel_size -> kernel size used for convolution
            stride -> how many cells kernel will move
            padding -> amount of zero padding
        """

        self.kernels: np.ndarray = np.random.randn(out_channel, kernel_size, kernel_size)
        # self.kernels = np.ones((out_channel, kernel_size, kernel_size)) # debug
        self.stride: int = stride
        self.padding: int = padding
        self.input_shape: tuple = (in_channel,) + input_shape
        self.out_shape: tuple = (out_channel, ) + tuple(np.add(np.add(np.subtract(input_shape, kernel_size), 2*padding) // stride, 1))

        self.gradient = np.zeros(self.input_shape)

    def _convolve(self, x: np.ndarray, kernel: np.ndarray):
        """Convolution process"""

        # assert (type(x) == np.ndarray), f"Error: x must be a numpy array"
        x = np.pad(x, self.padding)
        res = np.zeros(self.out_shape[1:])
        for i in range(0, res.shape[1], self.stride):
            for j in range(0, res.shape[0], self.stride):
                for k in range(0, kernel.shape[1]):
                    for l in range(0, kernel.shape[0]):
                        # print(res.shape[1], res)
                        res[i][j] += x[i+k][j+l] * kernel[k][l]

        return res

    def backward(self, grad):
        self.kernels += 0
        return grad

    def __call__(self, x: np.ndarray):
        """Forward method"""    

        assert (type(x) == np.ndarray), f"Error: x must be a numpy array"
        assert (x.shape == self.input_shape), f"Error: layer {self.__class__.__name__}accepts {self.input_shape} input, while x is shaped as {x.shape}"
        self.gradient += x
        output = np.zeros(self.out_shape, dtype=np.float64)
        for i, kernel in enumerate(self.kernels):
            for channel in x:
                output[i] += self._convolve(channel, kernel)
                # output[i] += scipy.signal.convolve2d(channel, kernel, mode='valid') # scipy method
        
        return output

class LinearLayer:
    def __init__(self, input, output):
        self.weight = np.random.randn(input, output)
        self.gradient: np.ndarray = np.zeros(input)

    def backward(self, grad):
        newgrad = np.dot(self.weight, grad) 
        self.weight += np.outer(self.gradient, grad)
        self.gradient = 0
        return newgrad

    def __call__(self, x: np.ndarray):
        assert (type(x) == np.ndarray), f"Error: x must be a numpy array"
        assert (x.shape[0] == self.weight.shape[0]), f"Error: this layer accepts (n, {self.weight.shape[0]}) input, while x is shaped as {x.shape}"

        self.gradient += x
        output = np.dot(x, self.weight)
        return output

class MaxPooling2d:
    def __init__(self, input_shape: tuple, kernel_size: int, stride: int=1, padding: int=0):
        """2 dimensional max pooling"""
        assert (len(input_shape) == 3), f"Error: 2D max pooling layer's input shape must be in (channel, row, height) format. Input is {input_shape}"
        self.input_shape: tuple = input_shape
        self.kernel_size: int = kernel_size
        self.stride: int = stride
        self.padding: int = padding    
        self.out_shape: tuple = (input_shape[0], ) + tuple(np.add(np.add(np.subtract(input_shape[1:], kernel_size), 2*padding) // stride, 1))
        self.gradient: np.ndarray = np.zeros(self.input_shape)
    
    def backward(self, grad):
        grad = grad.reshape(grad.shape[:-1])

        for c in range(self.gradient.shape[0]):
            k = l = 0
            for i in range(self.gradient.shape[1]):
                for j in range(self.gradient.shape[2]):
                    if (self.gradient[c][i][j] == 1):
                        print(self.gradient)
                        self.gradient[c][i][j] = grad[c][k][l]
                        l += 1
                        if (l >= self.gradient.shape[-1]):
                            l = 0
                            k += 1

        grad = self.gradient.copy()
        self.gradient = 0

        # print(f"This: {grad}")
        return grad

    def __call__(self, x: np.ndarray):
        """2 dimensional max pooling"""

        assert (type(x) == np.ndarray), f"Error: x must be a numpy array"
        assert (x.shape == self.input_shape), f"Error: this layer accepts {self.input_shape} input, while x is shaped as {x.shape}"
        # print(x.shape[0])
        # print((np.subtract(x.shape[1:], kernel_size) // stride))
        output = np.full(self.out_shape, 0, dtype=np.float64)    
        grad = np.full(self.input_shape, 0, dtype=np.float64)
        for c in range(output.shape[0]):
            for i in range(0, output.shape[2], self.stride):
                for j in range(0, output.shape[1], self.stride):
                    maxk = maxl = -1
                    for k in range(self.kernel_size):
                        for l in range(self.kernel_size):
                            if (x[c][i+k][j+l] > output[c][i][j]):
                                output[c][i][j] = x[c][i+k][j+l]
                                maxk = k
                                maxl = l
                                
                    if (maxk != -1 and maxl != -1):
                        grad[c][i+maxk][j+maxl] = 1 
                            
        self.gradient += grad

        return output

class Relu:
    def __call__(self, x: np.ndarray):
        self.gradient = np.ceil(np.clip(x, 0, 1)).reshape(x.shape + (1,))
        return np.maximum(0, x)

    def backward(self, grad):
        grad *= self.gradient
        self.gradient = 0
        return grad

class Flatten:
    def __init__(self, input_shape):
        self.input_shape = input_shape
        
    def __call__(self, x):
        return x.reshape((-1))

    def backward(self, grad):
        return grad.reshape(self.input_shape + (1,))
    
def get_flatten_shape(shape: tuple):
    return int(np.prod(np.array(shape)))

def softmax(x: np.ndarray):
    exps = np.exp(x - x.max())
    return np.clip(exps / np.sum(exps), 1e-7, 1)

class CNN:
    def __init__(self):
        conv1 = Convolutional2DLayer(1, 5, IMAGE_SIZE, 3)            
        maxpool1 = MaxPooling2d(conv1.out_shape, 2, stride=2)
        relu1 = Relu()
        conv2 = Convolutional2DLayer(maxpool1.out_shape[0], 10, maxpool1.out_shape[1:], 3)
        maxpool2 = MaxPooling2d(conv2.out_shape, 2, stride=2)
        relu2 = Relu()
        flatten = Flatten(maxpool2.out_shape)
        linear1 = LinearLayer(get_flatten_shape(maxpool2.out_shape), 512)
        relu3 = Relu()
        linear2 = LinearLayer(512, 10)

        print(conv1.out_shape)
        print(maxpool1.out_shape)
        print(conv2.out_shape)
        print(maxpool2.out_shape)

        self.layers = [
            conv1, maxpool1, relu1, conv2, maxpool2, relu2, flatten, linear1, relu3, linear2   
        ]

        self.gradient = 0

    def forward(self, x):
        # print()
        for layer in self.layers:
            # print(f"Input: {x}")
            x = layer(x)

            
            # if (hasattr(layer, "out_shape")):
            #     print(layer.out_shape)
            # # if hasattr(layer, "gradient"):
            # #     print("Grad: ", layer.gradient.shape)
            # # if hasattr(layer, "kernels"):
            # #     print("Kernel: ", layer.kernels.shape)
            # if hasattr(layer, "weight"):
            #     print("Weight: ", layer.weight.shape)

        return x

    def backward(self):
        for layer in reversed(self.layers):
            # print(self.gradient.shape)
            self.gradient = layer.backward(self.gradient)

        self.gradient = 0
        
    def cross_entropy_loss(self, pred, label):
        pred = softmax(pred)

        d_pred = []
        for i in range(len(pred)):
            if i == label:
                d_pred.append(pred[label] * (1-pred[i]))
            else:
                d_pred.append(-pred[label] * pred[i])
        
        self.gradient = np.outer(d_pred, -1 / pred[label])

        return -np.log(pred[label])

# maxpool3 = MaxPooling2d((1, 2, 2), 2)
# print(maxpool3(np.array([[[1, 2], [3, 4]]])))

# print(linear1.gradient)

cnn = CNN()
res = cnn.forward(images[0])
loss = cnn.cross_entropy_loss(res, labels[0])
cnn.backward()
# res = softmax(res)

print(res, loss)

(5, 26, 26)
(5, 13, 13)
(10, 11, 11)
(10, 5, 5)
0 0 0 0
0 0 0 1
0 0 0 2
0 0 1 0
0 0 1 1
0 0 1 2
0 0 2 0
0 0 2 1
0 0 2 2
0 1 0 0
0 1 0 1
0 1 0 2
0 1 1 0
0 1 1 1
0 1 1 2
0 1 2 0
0 1 2 1
0 1 2 2
0 2 0 0
0 2 0 1
0 2 0 2
0 2 1 0
0 2 1 1
0 2 1 2
0 2 2 0
0 2 2 1
0 2 2 2
0 3 0 0
0 3 0 1
0 3 0 2
0 3 1 0
0 3 1 1
0 3 1 2
0 3 2 0
0 3 2 1
0 3 2 2
0 4 0 0
0 4 0 1
0 4 0 2
0 4 1 0
0 4 1 1
0 4 1 2
0 4 2 0
0 4 2 1
0 4 2 2
0 5 0 0
0 5 0 1
0 5 0 2
0 5 1 0
0 5 1 1
0 5 1 2
0 5 2 0
0 5 2 1
0 5 2 2
0 6 0 0
0 6 0 1
0 6 0 2
0 6 1 0
0 6 1 1
0 6 1 2
0 6 2 0
0 6 2 1
0 6 2 2
0 7 0 0
0 7 0 1
0 7 0 2
0 7 1 0
0 7 1 1
0 7 1 2
0 7 2 0
0 7 2 1
0 7 2 2
0 8 0 0
0 8 0 1
0 8 0 2
0 8 1 0
0 8 1 1
0 8 1 2
0 8 2 0
0 8 2 1
0 8 2 2
0 9 0 0
0 9 0 1
0 9 0 2
0 9 1 0
0 9 1 1
0 9 1 2
0 9 2 0
0 9 2 1
0 9 2 2
0 10 0 0
0 10 0 1
0 10 0 2
0 10 1 0
0 10 1 1
0 10 1 2
0 10 2 0
0 10 2 1
0 10 2 2
0 11 0 0
0 11 0 1
0 11 0 2
0 11 1 0
0 11 1 1
0 11 1 2
0 11 2 0
0 11 2 1
0 11 2 2
0 12 0 0
0 12 0 1
0 12 0 2
0 12 1 0
0 12 1 1
0 12 1 2
0 12 2 0
0 12 2 

IndexError: index 26 is out of bounds for axis 0 with size 26