<a href="https://colab.research.google.com/github/woodRock/grokking-deep-learning/blob/main/chapter_10_neural_learning_about_edges_and_corners.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chapter 10 | Neural learning about edges and corners

In [None]:
import numpy as np

# Freeze the random seed for reproducability.
np.random.seed(1)

# Load the dataset.
from keras.datasets import mnist

(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Take the first 1000 images, and normalize the features between 0 and 1.
images, labels = (X_train[0:1000].reshape(1000,28*28) / 255, y_train[0:1000])

# Convert to one hot encoding
one_hot_labels = np.zeros((len(labels),10))
for i,l in enumerate(labels):
    one_hot_labels[i][l] = 1
labels = one_hot_labels

test_images = X_test.reshape(len(X_test),28*28) / 255
test_labels = np.zeros((len(y_test),10))
for i,l in enumerate(y_test):
    test_labels[i][l] = 1

# Activation functions
def tanh(x):
    return np.tanh(x)

def tanh2deriv(output):
    return 1 - (output ** 2)

def softmax(x):
    temp = np.exp(x)
    return temp / np.sum(temp, axis=1, keepdims=True)

# Hyperparameters
alpha = 2
iterations = 300
input_dim = 784
output_dim = 10
batch_size = 128

input_rows = 28
input_cols = 28

kernel_rows = 3
kernel_cols = 3
num_kernels = 16
hidden_size = ((input_rows - kernel_rows) * (input_cols - kernel_cols)) * num_kernels

print(f"hidden_size: {hidden_size}")

# Initialize the network.
kernels = 0.02 * np.random.random((kernel_rows*kernel_cols, num_kernels)) - 0.01
weights_1_2 = 0.2 * np.random.random((hidden_size, output_dim)) - 0.1

def get_image_section(layer, row_from, row_to, col_from, col_to):
    """ Select a subregion in a batch of images. """
    section = layer[:, row_from:row_to, col_from:col_to]
    return section.reshape(-1,1, row_to-row_from, col_to-col_from)

# Training loop
for j in range(iterations):
    correct_cnt = 0
    for i in range(int(len(images) / batch_size)):
        batch_start, batch_end = ((i * batch_size), ((i+1) * batch_size))
        input, target = images[batch_start:batch_end], labels[batch_start:batch_end]

        # Forward pass
        layer_0 = input
        layer_0 = layer_0.reshape(layer_0.shape[0], 28, 28)

        sections = list()
        for row_start in range(layer_0.shape[1] - kernel_rows):
            for col_start in range(layer_0.shape[2] - kernel_cols):
                section = get_image_section(layer_0,
                                            row_start,
                                            row_start + kernel_rows,
                                            col_start,
                                            col_start + kernel_rows)
                sections.append(section)

        expanded_input = np.concatenate(sections, axis=1)
        es = expanded_input.shape
        flattened_input = expanded_input.reshape(es[0]*es[1],-1)

        kernel_output = flattened_input.dot(kernels)
        layer_1 = tanh(kernel_output.reshape(es[0],-1))
        # Dropout
        dropout_mask = np.random.randint(2, size=layer_1.shape)
        layer_1 *= dropout_mask * 2
        layer_2 = softmax(np.dot(layer_1, weights_1_2))
        prediction = layer_2

        for k in range(batch_size):
            true_label = labels[batch_start+k:batch_start+k+1]
            pred_label = prediction[k:k+1]
            correct_cnt += int(np.argmax(pred_label) == np.argmax(true_label))

        # Back progation
        layer_2_delta = (target - prediction) / (batch_size * layer_2.shape[0])
        layer_1_delta = layer_2_delta.dot(weights_1_2.T) * tanh2deriv(layer_1)
        layer_1_delta *= dropout_mask

        # Update the weights
        weights_1_2 += alpha * layer_1.T.dot(layer_2_delta)
        lld_reshape = layer_1_delta.reshape(kernel_output.shape)
        k_update = flattened_input.T.dot(lld_reshape)
        kernels -= alpha * k_update

    # Every 50 iterations and the final iteration once finished training.
    if (j % 50 == 0 or j == iterations - 1):

        # Evaluate on the test set.
        test_correct_cnt = 0
        for i in range(int(len(test_images) / batch_size)):
            batch_start, batch_end = ((i * batch_size), ((i+1) * batch_size))
            input, target = test_images[batch_start:batch_end], test_labels[batch_start:batch_end]

            # Forward pass
            layer_0 = input
            layer_0 = layer_0.reshape(layer_0.shape[0],28,28)

            sections = list()

            for row_start in range(layer_0.shape[1] - kernel_rows):
                for col_start in range(layer_0.shape[2] - kernel_cols):
                    section = get_image_section(layer_0,
                                                row_start,
                                                row_start + kernel_rows,
                                                col_start,
                                                col_start + kernel_rows)
                    sections.append(section)

            expanded_input = np.concatenate(sections, axis=1)
            es = expanded_input.shape
            flattened_input = expanded_input.reshape(es[0]*es[1],-1)

            kernel_output = flattened_input.dot(kernels)
            layer_1 = tanh(kernel_output.reshape(es[0],-1))
            # Dropout
            dropout_mask = np.random.randint(2, size=layer_1.shape)
            layer_1 *= dropout_mask * 2
            layer_2 = softmax(np.dot(layer_1, weights_1_2))
            prediction = layer_2

            for k in range(batch_size):
                true_label = test_labels[batch_start+k:batch_start+k+1]
                pred_label = prediction[k:k+1]
                test_correct_cnt += int(np.argmax(pred_label) == np.argmax(true_label))

        print(f"I: {j} \t Training correct: {correct_cnt/float(len(y_train)):.4f} \t Test correct: {test_correct_cnt/float(len(y_test)):.4f}")



Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
hidden_size: 10000
I: 0 	 Training correct: 0.0009 	 Test correct: 0.0444
I: 50 	 Training correct: 0.0070 	 Test correct: 0.4642
I: 100 	 Training correct: 0.0122 	 Test correct: 0.7449
I: 150 	 Training correct: 0.0128 	 Test correct: 0.7932
I: 200 	 Training correct: 0.0133 	 Test correct: 0.8115
I: 250 	 Training correct: 0.0132 	 Test correct: 0.8242
I: 299 	 Training correct: 0.0139 	 Test correct: 0.8250


# Two Convlutional Layers

In [66]:
import numpy as np
from keras.datasets import mnist
from typing import List

class Tensor (object):

    def __init__(self,data,
                 autograd=False,
                 creators=None,
                 creation_op=None,
                 id=None):

        self.data = np.array(data)
        self.autograd = autograd
        self.grad = None
        if(id is None):
            self.id = np.random.randint(0,100000)
        else:
            self.id = id

        self.creators = creators
        self.creation_op = creation_op
        self.children = {}

        if(creators is not None):
            for c in creators:
                if(self.id not in c.children):
                    c.children[self.id] = 1
                else:
                    c.children[self.id] += 1

    def all_children_grads_accounted_for(self):
        for id,cnt in self.children.items():
            if(cnt != 0):
                return False
        return True

    def backward(self,grad=None, grad_origin=None):
        if(self.autograd):

            if(grad is None):
                grad = Tensor(np.ones_like(self.data))

            if(grad_origin is not None):
                if(self.children[grad_origin.id] == 0):
                    raise Exception("cannot backprop more than once")
                else:
                    self.children[grad_origin.id] -= 1

            if(self.grad is None):
                self.grad = grad
            else:
                self.grad += grad

            # grads must not have grads of their own
            assert grad.autograd == False

            # only continue backpropping if there's something to
            # backprop into and if all gradients (from children)
            # are accounted for override waiting for children if
            # "backprop" was called on this variable directly
            if (self.creators is not None and
               (self.all_children_grads_accounted_for() or
                grad_origin is None)):

                if (self.creation_op == "add"):
                    self.creators[0].backward(self.grad, self)
                    self.creators[1].backward(self.grad, self)

                if (self.creation_op == "sub"):
                    self.creators[0].backward(Tensor(self.grad.data), self)
                    self.creators[1].backward(Tensor(self.grad.__neg__().data), self)

                if (self.creation_op == "mul"):
                    new = self.grad * self.creators[1]
                    self.creators[0].backward(new , self)
                    new = self.grad * self.creators[0]
                    self.creators[1].backward(new, self)

                if (self.creation_op == "mm"):
                    c0 = self.creators[0]
                    c1 = self.creators[1]
                    new = self.grad.mm(c1.transpose())
                    c0.backward(new)
                    new = self.grad.transpose().mm(c0).transpose()
                    c1.backward(new)

                if (self.creation_op == "transpose"):
                    self.creators[0].backward(self.grad.transpose())

                if ("sum" in self.creation_op):
                    dim = int(self.creation_op.split("_")[1])
                    self.creators[0].backward(self.grad.expand(dim,self.creators[0].data.shape[dim]))

                if ("expand" in self.creation_op):
                    dim = int(self.creation_op.split("_")[1])
                    self.creators[0].backward(self.grad.sum(dim))

                if(self.creation_op == "neg"):
                    self.creators[0].backward(self.grad.__neg__())

                if (self.creation_op == "sigmoid"):
                    ones = Tensor(np.ones_like(self.grad.data))
                    self.creators[0].backward(self.grad * (self * (ones - self)))

                if (self.creation_op == "tanh"):
                    ones = Tensor(np.ones_like(self.grad.data))
                    self.creators[0].backward(self.grad* (ones - (self * self)))

                if (self.creation_op == "relu"):
                    self.creators[0].backward(self.grad * (self > Tensor(0)))

                if (self.creation_op == "index_select"):
                    new_grad = np.zeros_like(self.creators[0].data)
                    indices_ = self.index_select_indices.data.flatten()
                    grad_ = grad.data.reshape(len(indices_), -1)
                    for i in range(len(indices_)):
                        new_grad[indices_[i]] += grad_[i]
                    self.creators[0].backward(Tensor(new_grad))

                if (self.creation_op == "cross_entropy"):
                    dx = self.softmax_output - self.target_dist
                    self.creators[0].backward(Tensor(dx))

                # Handling convolution (conv2d) operation
                if (self.creation_op == "conv2d"):
                    x, weight, bias = self.creators
                    batch_size, out_channels, out_height, out_width = self.grad.data.shape
                    _, in_channels, kernel_size, _ = weight.data.shape

                    # Initialize gradients
                    weight_grad = np.zeros_like(weight.data)
                    input_grad = np.zeros_like(x.data)
                    bias_grad = np.zeros_like(bias.data)

                    # Calculate padding
                    pad_h = max((out_height - 1) - (x.data.shape[2] - kernel_size), 0)
                    pad_w = max((out_width - 1) - (x.data.shape[3] - kernel_size), 0)
                    pad_top = pad_h // 2
                    pad_bottom = pad_h - pad_top
                    pad_left = pad_w // 2
                    pad_right = pad_w - pad_left

                    # Pad input if necessary
                    if pad_h > 0 or pad_w > 0:
                        x_padded = np.pad(x.data, ((0, 0), (0, 0), (pad_top, pad_bottom), (pad_left, pad_right)), 'constant')
                    else:
                        x_padded = x.data

                    # Compute gradients
                    for b in range(batch_size):
                        for c_out in range(out_channels):
                            for c_in in range(in_channels):
                                for h_out in range(out_height):
                                    h_in = h_out
                                    for w_out in range(out_width):
                                        w_in = w_out
                                        # Weight gradient
                                        weight_grad[c_out, c_in] += \
                                            self.grad.data[b, c_out, h_out, w_out] * \
                                            x_padded[b, c_in, h_in:h_in + kernel_size, w_in:w_in + kernel_size]

                                        # Input gradient
                                        input_grad[b, c_in, h_in:h_in + kernel_size, w_in:w_in + kernel_size] += \
                                            self.grad.data[b, c_out, h_out, w_out] * \
                                            weight.data[c_out, c_in]

                    # Bias gradient
                    bias_grad = self.grad.data.sum(axis=(0, 2, 3))

                    # Backpropagate gradients
                    x.backward(Tensor(input_grad))
                    weight.backward(Tensor(weight_grad))
                    bias.backward(Tensor(bias_grad))

                # Handling max pooling (maxpool2d)
                if self.creation_op == "maxpool2d":
                    x = self.creators[0]
                    batch_size, channels, height, width = x.data.shape
                    kernel_size = self.grad.data.shape[2]
                    stride = self.grad.data.shape[3]
                    input_grad = np.zeros_like(x.data)

                    # Backpropagate through max pooling
                    for b in range(batch_size):
                        for c in range(channels):
                            for h in range(0, height - kernel_size + 1, stride):
                                for w in range(0, width - kernel_size + 1, stride):
                                    patch = x.data[b, c, h:h + kernel_size, w:w + kernel_size]
                                    max_val = np.max(patch)
                                    grad_patch = (patch == max_val) * self.grad.data[b, c, h // stride, w // stride]
                                    input_grad[b, c, h:h + kernel_size, w:w + kernel_size] += grad_patch

                    x.backward(Tensor(input_grad))

                # Handling the flatten operation
                if self.creation_op == "flatten":
                    original_shape = self.creators[0].data.shape
                    self.creators[0].backward(Tensor(self.grad.data.reshape(original_shape)))

    @property
    def shape(self):
        return np.shape(self.data)

    def __gt__(self, other):
        if (self.autograd):
            return Tensor(self.data > other.data,
                          autograd=True,
                          creators=[self,other],
                          creation_op=">")
        return Tensor(self.data > other.data)

    def __add__(self, other):
        if (self.autograd and other.autograd):
            return Tensor(self.data + other.data,
                          autograd=True,
                          creators=[self,other],
                          creation_op="add")
        return Tensor(self.data + other.data)

    def __neg__(self):
        if (self.autograd):
            return Tensor(self.data * -1,
                          autograd=True,
                          creators=[self],
                          creation_op="neg")
        return Tensor(self.data * -1)

    def __sub__(self, other):
        if (self.autograd and other.autograd):
            return Tensor(self.data - other.data,
                          autograd=True,
                          creators=[self,other],
                          creation_op="sub")
        return Tensor(self.data - other.data)

    def __mul__(self, other):
        if (self.autograd and other.autograd):
            return Tensor(self.data * other.data,
                          autograd=True,
                          creators=[self,other],
                          creation_op="mul")
        return Tensor(self.data * other.data)

    def sum(self, dim):
        if (self.autograd):
            return Tensor(self.data.sum(dim),
                          autograd=True,
                          creators=[self],
                          creation_op="sum_"+str(dim))
        return Tensor(self.data.sum(dim))

    def expand(self, dim,copies):

        trans_cmd = list(range(0,len(self.data.shape)))
        trans_cmd.insert(dim,len(self.data.shape))
        new_data = self.data.repeat(copies).reshape(list(self.data.shape) + [copies]).transpose(trans_cmd)

        if (self.autograd):
            return Tensor(new_data,
                          autograd=True,
                          creators=[self],
                          creation_op="expand_"+str(dim))
        return Tensor(new_data)

    def transpose(self):
        if (self.autograd):
            return Tensor(self.data.transpose(),
                          autograd=True,
                          creators=[self],
                          creation_op="transpose")

        return Tensor(self.data.transpose())

    def mm(self, x):
        if (self.autograd):
            return Tensor(self.data.dot(x.data),
                          autograd=True,
                          creators=[self,x],
                          creation_op="mm")
        return Tensor(self.data.dot(x.data))

    def __repr__(self):
        return str(self.data.__repr__())

    def __str__(self):
        return str(self.data.__str__())

    def sigmoid(self):
        if (self.autograd):
            return Tensor(1 / (1 + np.exp(-self.data)), autograd=True, creators=[self], creation_op = "sigmoid")
        return Tensor(1 / (1 + np.exp(-self.data)))

    def tanh(self):
        if (self.autograd):
            return Tensor(np.tanh(self.data), autograd=True, creators=[self], creation_op = "tanh")
        return Tensor(np.tanh(self.data))

    def relu(self):
        if (self.autograd):
            return Tensor(np.maximum(self.data, 0), autograd=True, creators=[self], creation_op = "relu")
        return Tensor(np.maximum(self.data, 0))

    def index_select(self, indices):
        if (self.autograd):
            new = Tensor(self.data[indices.data], autograd=True, creators=[self], creation_op = "index_select")
            new.index_select_indices = indices
            return new
        return Tensor(self.data[indices.data])

    def cross_entropy(self, target_indices):
        """ Numerically stable cross entropy """
        temp = np.exp(self.data - np.max(self.data))
        softmax_output = temp / (np.sum(temp, axis=len(self.data.shape)-1, keepdims=True) + 1e-10)
        t = target_indices.data.flatten()
        p = softmax_output.reshape(len(t), -1)
        target_dist = np.eye(p.shape[1])[t]

        # Add a small epsilon to avoid log(0)
        epsilon = 1e-10
        loss = -(np.log(p + epsilon) * target_dist).sum(1).mean()

        if self.autograd:
            out = Tensor(loss, autograd=True, creators=[self], creation_op="cross_entropy")
            out.softmax_output = softmax_output
            out.target_dist = target_dist
            return out

        return Tensor(loss)

    def _extract_patches(self, input_data, kernel_size, stride=1, padding=0):
        """
        Extracts patches from the input tensor for convolution operations.

        Parameters:
        - input_data (numpy.ndarray): Input data of shape (batch_size, in_channels, height, width).
        - kernel_size (int): Size of the convolution kernel.
        - stride (int): Stride of the convolution. Default is 1.
        - padding (int): Zero-padding added to both sides of the input. Default is 0.

        Returns:
        - patches (numpy.ndarray): Extracted patches of shape
                                  (batch_size, out_height, out_width, in_channels, kernel_size, kernel_size).
        """
        batch_size, in_channels, height, width = input_data.shape

        # Apply padding if needed
        if padding > 0:
            input_data = np.pad(input_data,
                                ((0, 0), (0, 0), (padding, padding), (padding, padding)),
                                mode='constant')

        # Calculate output dimensions
        out_height = (height + 2 * padding - kernel_size) // stride + 1
        out_width = (width + 2 * padding - kernel_size) // stride + 1

        # Initialize an array to hold the patches
        patches = np.zeros((batch_size, out_height, out_width, in_channels, kernel_size, kernel_size))

        # Extract patches by sliding over the input tensor
        for b in range(batch_size):
            for c in range(in_channels):
                for i in range(0, out_height):
                    for j in range(0, out_width):
                        patch = input_data[b, c, i * stride:i * stride + kernel_size, j * stride:j * stride + kernel_size]
                        patches[b, i, j, c, :, :] = patch

        return patches

class Layer:
    def __init__(self):
        self.params: List[Tensor] = []

    def forward(self, inp: Tensor) -> Tensor:
        raise NotImplementedError

    def get_params(self) -> List[Tensor]:
        return self.params

class Conv2d(Layer):
    def __init__(self, in_channels: int, out_channels: int, kernel_size: int, stride: int = 1):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride

        # Initialize weights and bias
        k = 1.0 / (in_channels * kernel_size * kernel_size)
        kernel_shape = (out_channels, in_channels, kernel_size, kernel_size)
        self.weight = Tensor(np.random.uniform(-np.sqrt(k), np.sqrt(k), kernel_shape), autograd=True)
        self.bias = Tensor(np.zeros(out_channels), autograd=True)
        self.params = [self.weight, self.bias]

    def _extract_patches(self, x: np.ndarray) -> np.ndarray:
        """Extract patches from input for convolution"""
        batch_size, channels, height, width = x.shape
        out_height = (height - self.kernel_size) // self.stride + 1
        out_width = (width - self.kernel_size) // self.stride + 1

        patches = np.zeros((batch_size, out_height, out_width, channels,
                          self.kernel_size, self.kernel_size))

        for h in range(0, height - self.kernel_size + 1, self.stride):
            for w in range(0, width - self.kernel_size + 1, self.stride):
                patch = x[:, :, h:h+self.kernel_size, w:w+self.kernel_size]
                patches[:, h//self.stride, w//self.stride] = patch

        return patches

    def forward(self, x: Tensor) -> Tensor:
        batch_size = x.data.shape[0]
        patches = self._extract_patches(x.data)

        # Reshape patches for matrix multiplication
        patches_reshaped = patches.reshape(batch_size, -1,
                                         self.in_channels * self.kernel_size * self.kernel_size)
        weight_reshaped = self.weight.data.reshape(self.out_channels, -1)

        # Perform convolution as matrix multiplication
        output = np.zeros((batch_size, self.out_channels,
                          patches.shape[1], patches.shape[2]))

        for b in range(batch_size):
            out = patches_reshaped[b].dot(weight_reshaped.T)
            output[b] = out.reshape(patches.shape[1], patches.shape[2], -1).transpose(2, 0, 1)

        output += self.bias.data.reshape(1, -1, 1, 1)

        return Tensor(output, autograd=True, creators=[x, self.weight, self.bias],
                     creation_op="conv2d")

    def get_parameters(self) -> List[Tensor]:
        return [self.weight, self.bias]

class MaxPool2d(Layer):
    def __init__(self, kernel_size: int, stride: int = None):
        super().__init__()
        self.kernel_size = kernel_size
        self.stride = stride if stride is not None else kernel_size

    def forward(self, x: Tensor) -> Tensor:
        batch_size, channels, height, width = x.data.shape
        out_height = (height - self.kernel_size) // self.stride + 1
        out_width = (width - self.kernel_size) // self.stride + 1

        output = np.zeros((batch_size, channels, out_height, out_width))

        for b in range(batch_size):
            for c in range(channels):
                for h in range(0, height - self.kernel_size + 1, self.stride):
                    for w in range(0, width - self.kernel_size + 1, self.stride):
                        output[b, c, h//self.stride, w//self.stride] = np.max(
                            x.data[b, c, h:h+self.kernel_size, w:w+self.kernel_size])

        return Tensor(output, autograd=True, creators=[x], creation_op="maxpool2d")

    def get_parameters(self) -> List[Tensor]:
        return []

class Flatten(Layer):
    def forward(self, x: Tensor) -> Tensor:
        batch_size = x.data.shape[0]
        return Tensor(x.data.reshape(batch_size, -1), autograd=True,
                     creators=[x], creation_op="flatten")

    def get_parameters(self) -> List[Tensor]:
        return []

class SGD(object):
    def __init__(self, parameters, alpha=0.1):
        self.parameters = parameters
        self.alpha = alpha

    def zero(self):
        for p in self.parameters:
            p.grad.data *= 0

    def step(self, zero=True):
        for p in self.parameters:
            p.data -= p.grad.data * self.alpha

            if (zero):
                p.grad.data *= 0

class Layer(object):
    def __init__(self):
        self.parameters = list()

    def get_parameters(self):
        return self.parameters

class Linear(Layer):
    def __init__(self, n_inputs, n_outputs):
        super().__init__()
        W = np.random.randn(n_inputs, n_outputs) * np.sqrt(2.0 / (n_inputs))
        self.weight = Tensor(W, autograd=True)
        self.bias = Tensor(np.zeros(n_outputs), autograd=True)

        self.parameters.append(self.weight)
        self.parameters.append(self.bias)

    def forward(self, input):
        return input.mm(self.weight) + self.bias.expand(0, len(input.data))

class Sequential(Layer):
    def __init__(self, layers=list(), training=True):
        super().__init__()
        self.layers = layers
        self.training = training

    def train(self):
        self.training = True

    def eval(self):
        self.training = False

    def get_parameters(self):
        params = list()
        for l in self.layers:
            params += l.get_parameters()
        return params

    def forward(self, input):
        for l in self.layers:
            input = l.forward(input)
        return input

class Dropout(Layer):
    def __init__(self, p=0.5, training=True):
        super().__init__()
        self.p = p
        self.mask = None
        self.training = training

    def forward(self, input):
        # Only apply dropout when training.
        if self.training:
            # Multiply by 1 / (1 - p) to balance out the extra sensitivity.
            self.mask = np.random.binomial(1, 1-self.p, input.shape) / (1-self.p)
            return input * Tensor(self.mask, autograd=input.autograd)
        return input

    def backward(self, grad):
        return grad * self.mask

class CrossEntropyLoss(object):
    def __init__(self):
        super().__init__()

    def forward(self, input, target):
        return input.cross_entropy(target)

class MSELoss(Layer):

    def __init__(self):
        super().__init__()

    def forward(self, pred, target):
        return ((pred - target) * (pred - target)).sum(0)

class Tanh(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.tanh()

class Sigmoid(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.sigmoid()

class Relu(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.relu()

# Create the dataset.
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# One hot encoding.
# y_train = np.eye(10)[y_train]
# y_test = np.eye(10)[y_test]

# Take the first 1000 samples.
X_train = X_train[:100]
y_train = y_train[:100]
X_test = X_test[:100]
y_test = y_test[:100]

# Normalize the data.
X_train = X_train / 255
X_test = X_test / 255

# Hyperparameters.
epochs = 5
batch_size = 32
learning_rate = 0.01

# Initialize the model.
model = Sequential([
    Conv2d(in_channels=1, out_channels=16, kernel_size=3),  # 28x28 -> 26x26
    MaxPool2d(kernel_size=2),  # 26x26 -> 13x13
    Relu(),
    Conv2d(in_channels=16, out_channels=32, kernel_size=3),  # 13x13 -> 11x11
    MaxPool2d(kernel_size=2),  # 11x11 -> 5x5
    Relu(),
    Dropout(p=0.1),
    Flatten(),
    Linear(32 * 5 * 5, 128),
    Linear(128, 10),
    Dropout(p=0.1),
    Relu(),
])

criterion = CrossEntropyLoss()
optimizer = SGD(model.get_parameters(), alpha=learning_rate)

n_samples = len(X_train)

# Training loop.
for epoch in range(epochs):
    print(f"epoch: {epoch}")
    total_loss = 0
    correct = 0

    model.train()
    # Iterate over the batches.
    for i in range(0, n_samples, batch_size):
        batch_x = X_train[i:i+batch_size]
        batch_y = y_train[i:i+batch_size]

        x = Tensor(batch_x.reshape(-1, 1, 28, 28), autograd=True)
        target = Tensor(batch_y, autograd=True)

        pred = model.forward(x)

        for j in range(len(pred.data)):
            pred_label = np.argmax(pred.data[j])
            if pred_label == batch_y[j]:
                correct += 1

        loss = criterion.forward(pred, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.data

    if i % 1 == 0:
        model.eval()
        # Evaluate on the test set.
        test_correct = 0
        for i in range(len(X_test)):
            x = Tensor(X_test[i].reshape(-1, 1, 28, 28), autograd=True)
            pred = model.forward(x)
            pred_label = np.argmax(pred.data)
            if pred_label == y_test[i]:
                test_correct += 1
        print(f"Train: {correct / len(X_train)} Test: {test_correct / len(X_test)}")

    print(f"Epoch {epoch+1}, Average Loss: {total_loss / (n_samples/batch_size):.4f}")

epoch: 0
Train: 0.07 Test: 0.15
Epoch 1, Average Loss: 2.9366
epoch: 1
Train: 0.11 Test: 0.08
Epoch 2, Average Loss: 2.9705
epoch: 2
Train: 0.13 Test: 0.09
Epoch 3, Average Loss: 2.9351
epoch: 3
Train: 0.16 Test: 0.12
Epoch 4, Average Loss: 2.8856
epoch: 4
Train: 0.18 Test: 0.12
Epoch 5, Average Loss: 2.7451


In [67]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Define the CNN architecture
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Hyperparameters
batch_size = 64
learning_rate = 0.001
num_epochs = 10

# Data loading and transformations
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Model, loss function, and optimizer
model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

# Testing the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for data, target in test_loader:
        outputs = model(data)
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
<urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired (_ssl.c:1007)>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:01<00:00, 7798467.28it/s] 


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
<urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired (_ssl.c:1007)>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 1888525.54it/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
<urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired (_ssl.c:1007)>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 13273550.36it/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
<urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired (_ssl.c:1007)>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 5081496.07it/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw






Epoch [1/10], Step [1/938], Loss: 2.3006
Epoch [1/10], Step [101/938], Loss: 0.3920
Epoch [1/10], Step [201/938], Loss: 0.2025
Epoch [1/10], Step [301/938], Loss: 0.0853
Epoch [1/10], Step [401/938], Loss: 0.1538
Epoch [1/10], Step [501/938], Loss: 0.1636
Epoch [1/10], Step [601/938], Loss: 0.0883
Epoch [1/10], Step [701/938], Loss: 0.1175
Epoch [1/10], Step [801/938], Loss: 0.2090
Epoch [1/10], Step [901/938], Loss: 0.0457
Epoch [2/10], Step [1/938], Loss: 0.0742
Epoch [2/10], Step [101/938], Loss: 0.0204
Epoch [2/10], Step [201/938], Loss: 0.2195
Epoch [2/10], Step [301/938], Loss: 0.0675
Epoch [2/10], Step [401/938], Loss: 0.0487
Epoch [2/10], Step [501/938], Loss: 0.0655
Epoch [2/10], Step [601/938], Loss: 0.0456
Epoch [2/10], Step [701/938], Loss: 0.0888
Epoch [2/10], Step [801/938], Loss: 0.0446
Epoch [2/10], Step [901/938], Loss: 0.0525
Epoch [3/10], Step [1/938], Loss: 0.0120
Epoch [3/10], Step [101/938], Loss: 0.0134
Epoch [3/10], Step [201/938], Loss: 0.0073
Epoch [3/10], Ste