In [97]:
import numpy as np

## Functions

In [100]:
def sigmoid(z):
    """ Sigmoid activation function."""
    return 1 / (1 + np.exp(-z))

def sigmoid_prime(z):
        """Derivative of the sigmoid function."""
        return sigmoid(z) * (1 - sigmoid(z))

def mse(y_pred, y):
    """Mean Squared Error (MSE) cost function."""
    return np.mean((y_pred - y) ** 2)

def mse_prime(y_pred, y):
    """Derivative of the Mean Squared Error (MSE) cost function."""
    return 2 * (y_pred - y) / y.shape[1]
    
def softmax(z):
    """Softmax function: Converts input values into probabilities""" 
    exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
    return exp_z / np.sum(exp_z, axis=0, keepdims=True)

def softmax_prime(z):
    """Derivative of softmax"""
    s = softmax(z)
    return s * (1 - s)

def relu(z):
    """ReLU activation function."""
    return np.maximum(0, z)

def relu_prime(z):
    """Derivative of the ReLU function."""
    return (z > 0).astype(float)

def leaky_relu(z, a=0.01):
    """Leaky ReLU activation function."""
    return np.maximum(a * z, z)

def leaky_relu_prime(z, a=0.01):
    """Derivative of the Leaky ReLU function."""
    dz = np.ones_like(z)
    dz[z <= 0] = a
    return dz

def cross_entropy(y_pred, y):
    """Cross-entropy loss function."""
    m = y.shape[1]
    return -np.sum(y * np.log(y_pred + 1e-9)) / m

def cross_entropy_prime(y_pred, y):
    """Derivative of the cross-entropy loss with respect to the softmax input."""
    return y_pred - y

In [102]:
class Dense:
    def __init__(self, input_size, layer_size, activation, init=None):
        self.input_size = input_size
        self.layer_size = layer_size
        self.a_func = activation # Activation function. Tuple of (function, derivative)

        if init == "he":
            self.he_init()
        else:
            self.weights = np.random.randn(layer_size, input_size)
            self.biases = np.random.randn(layer_size, 1)

    
    def he_init(self):
        """He initialization of weights and biases"""
        std = np.sqrt(2. / self.input_size)
        self.weights = np.random.randn(self.layer_size, self.input_size) * std
        self.biases = np.zeros((self.layer_size, 1))


    def forward(self, X):
        """Forward pass through the layer."""
        self.X = X # Cache input X
        
        z = self.weights @ X + self.biases  # Weighted input
        self.z = z
        
        a = self.a_func[0](z)
        self.a = a
        
        return a
        

    def backward(self, dA):
        """
            Backward pass through the fully connected/dense layer.
            dA: gradient from next layer w.r.t  the post activation of this layer, shape (layer_size)
        """
        dz = dA * self.a_func[1](self.z)
        # Make sure the shape is correct
        #dz = dz.reshape(-1, 1)
        #dz = dz.reshape(self.layer_size, 1)
        self.dW = dz @ self.X.T
        self.db = dz
        
        dX = self.weights.T @ dz
        return dX
        


class Conv:
    def __init__(self, input_size, activation, kernel_size, num_filters=1, stride=1, padding=0, init=None):
        # Ensure input_size is a tuple of (height, width, channels)
        h, w, c = (list(input_size) + [1, 1, 1])[:3]
        self.input_size = (h, w, c)

        self.a_func = activation # Activation function. Tuple of (function, derivative)
        self.kernel_size = (*kernel_size, self.input_size[2]) # Input kernel size should be a tuple (height, width)
        self.num_filters = num_filters
        self.stride = stride
        self.padding = padding

        # Initialize weights and biases
        if init == "he":
            self.he_init()
        else:
            self.weights = np.random.randn(num_filters, *self.kernel_size)
            self.biases = np.random.randn(num_filters, 1)


    def he_init(self):
        """He initialization of weights and biases"""
        k_h, k_w, c = self.kernel_size
        fan_in = k_h * k_w * c
        std = np.sqrt(2. / fan_in)
        self.weights = np.random.randn(self.num_filters, k_h, k_w, c) * std
        self.biases = np.zeros((self.num_filters, 1))


    def forward(self, X):
        """Forward pass through the convolutional layer."""
        self.X = X # Cache input X
        K_h, K_w, _ = self.kernel_size # Kernel width and height
        stride = self.stride
        # Output width of convolution is (W−F+2P)/S+1 where W is width of input F is width of filter(kernel), P is the padding, and S is the stride. Same for height.
        # Here we assume no padding P=0.
        H_out = (X.shape[0] - K_h) // stride + 1
        W_out = (X.shape[1] - K_w) // stride + 1
        # Output size of the convolutional layer is (output_height, output_width, num_filters)
        self.a = np.zeros((H_out, W_out, self.num_filters))  # List to store activations.
        self.z = np.zeros((W_out, W_out, self.num_filters))  # List to store pre-activations

        W, b = self.weights, self.biases

        # Perform convolution
        for f in range(self.num_filters):
            for i in range(H_out):
                for j in range(W_out):
                    # Calculate the starting index for the region of interest
                    i0 = i * stride
                    j0 = j * stride
                    # Calculate the ending index for the region of interest
                    i1 = i0 + K_h
                    j1 = j0 + K_w
                    # Extract the region of interest
                    region = X[i0:i1, j0:j1, :] # Note: region is of shape (K_h, K_w, input_channels)
                    # Perform convolution operation and store the pre-activations and activations
                    z = np.sum(region * W[f]) + b[f]
                    self.z[i, j, f] = z.item()
                    # Apply activation function
                    a = self.a_func[0](z)
                    self.a[i, j, f] = a.item()

        return self.a
    

    def backward(self, dA):
        """
        dA: ∂L/∂a, shape (H_out, W_out, F)
        returns dX: ∂L/∂X, shape (H, W, C)
        """
        X = self.X  # (H, W, C)
        W, b = self.weights, self.biases
        K_h, K_w, _ = self.kernel_size
        stride = self.stride
        H_in, W_in, C = X.shape
        H_out, W_out, F = dA.shape

        dZ = dA * self.a_func[1](self.z)

        # backprop through activation: dZ = dA * σ'(z)
        dZ = dA * self.a_func[1](self.z)  # (H_out, W_out, F)

        dW = np.zeros_like(self.weights)  # (F, K_h, K_w, C)
        db = np.zeros_like(self.biases)   # (F, 1)
        dX = np.zeros_like(X)             # (H, W_in, C)

        for f in range(F):
            # bias gradient: sum over all positions
            db[f, 0] = np.sum(dZ[:, :, f])

            for i in range(H_out):
                for j in range(W_out):
                    i0 = i * stride
                    j0 = j * stride

                    i1 = i0 + K_h
                    j1 = j0 + K_w
                    
                    region = X[i0:i1, j0:j1, :]  # (K_h, K_w, C)

                    grad = dZ[i, j, f] # ∂L/∂z_{i,j,f}
                    # weight gradient
                    dW[f] += region * grad
                    # input gradient
                    dX[i0:i1, j0:j1, :] += W[f] * grad

        self.dW, self.db = dW, db
        return dX



class MaxPool:
    def __init__(self, input_size, kernel_size, stride=1):
        # Ensure input_size is a tuple of (height, width, number of input matricies)
        h, w, c = (list(input_size) + [1, 1, 1])[:3]
        self.input_size = (h, w, c)

        self.kernel_size = (*kernel_size, self.input_size[2]) # Kernel size should be a tuple (height, width)
        self.stride = stride


    def forward(self, X):
        """Forward pass through the convolutional layer."""
        # Ensure X is 3D: (H, W, C)
        if X.ndim == 2:
            # Assume (H, W) dimesions, add channel dimension
            X = X[:, :, np.newaxis]
        elif X.ndim != 3:
            raise ValueError(f"MaxPool layer expected 3D input (H,W,C), got shape {X.shape}")
        
        self.X = X # Cache input X
        K_h, K_w, _ = self.kernel_size # Kernel width and height
        stride = self.stride
        # Output width of convolution is (W−F+2P)/S+1 
        # where W is width of input F is width of filter(kernel), P is the padding, and S is the stride. Same for height.
        # Here we assume no padding P=0.
        H_out = (X.shape[0] - K_h) // stride + 1
        W_out = (X.shape[1] - K_w) // stride + 1
        # Output size of the convolutional layer is (output_height, output_width, num_filters)
        self.a = np.zeros((H_out, W_out, X.shape[2]))  # List to store activations.

        self.max_idx = {}  # store argmax positions

        for c in range(X.shape[2]):
            for i in range(H_out):
                for j in range(W_out):
                    # Calculate the starting index for the region of interest
                    i0 = i * stride
                    j0 = j * stride
                    # Calculate the ending index for the region of interest
                    i1 = i0 + K_h
                    j1 = j0 + K_w
                    # Extract the region of interest
                    region = X[i0:i1, j0:j1, c] # Note: region is of shape (K_h, K_w, input matricies(X.shape[2]))
                    # Extract indicies of maximum value in region of interest and store it
                    idx = np.unravel_index(np.argmax(region), region.shape)
                    self.max_idx[(i,j,c)] = (i0+idx[0], j0+idx[1], c)
                    # Extract maximum value from region and store it
                    self.a[i, j, c] = region[idx]

        return self.a
    

    def backward(self, dA):
        """
            Backward pass through the MaxPool layer. 
            dA: gradient from next layer, shape (H_out, W_out, C)
        """
        dX = np.zeros_like(self.X)
        H_out, W_out = dA.shape[0], dA.shape[1]
        
        for c in range(self.X.shape[2]):
            for i in range(H_out):
                for j in range(W_out):
                    # route gradient through max position
                    # other position's gradient is 0
                    pi, pj, pc = self.max_idx[(i,j,c)]
                    dX[pi,pj,pc] = dA[i,j,c]

        return dX



class Flatten:
    def forward(self, X):
        self.input_shape = X.shape
        return X.reshape(-1, 1)
    def backward(self, dA):
        return dA.reshape(self.input_shape)

## Neural network class

In [105]:
class NeuralNetwork:
    def __init__(self, layers, cost=None):
        """
        Args:
            layers (list): List of layer objects (Dense, Conv, MaxPool, etc.).
            cost (tuple): (cost_func, cost_prime) for the network.
        """
        self.layers = layers
        self.cost = cost  # (cost_func, cost_prime)

    
    def forward(self, X):
        a = X
        for layer in self.layers:
            a = layer.forward(a)
        self.output = a
        return a

    
    def backprop(self, y):
        """
        Backward pass through the network (assumes last layer is differentiable).
        Uses the cost function for loss derivative wrt network output.
        """
        # Compute dA for output layer
        cost_prime = self.cost[1]
        y = y.reshape(-1,1)
        dA = cost_prime(self.output, y)
        
        for layer in reversed(self.layers):
            dA = layer.backward(dA)
        # Gradients are stored as attributes in each layer for update

    
    def update_parameters(self, eta, batch_size):
        """
        For layers with weights/biases, update using stored gradients.
        """
        for layer in self.layers:
            if hasattr(layer, "weights") and hasattr(layer, "dW"):
                layer.weights -= eta * (layer.dW / batch_size)
            if hasattr(layer, "biases") and hasattr(layer, "db"):
                layer.biases -= eta * (layer.db / batch_size)
                

    def train(self, x_train, y_train, epochs, batch_size, eta, results=False, print_every=10):
        num_samples = len(x_train)

        for epoch in range(epochs):
            indices = np.random.permutation(num_samples)
            X_shuffled = [x_train[i] for i in indices]
            y_shuffled = [y_train[i] for i in indices]

            if results:
                epoch_outputs, epoch_labels = [], []

            batch_count = 0
            for i in range(0, num_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                y_batch = y_shuffled[i:i+batch_size]

                # Zero gradients for all layers before batch
                for layer in self.layers:
                    if hasattr(layer, "dW"):
                        layer.dW = np.zeros_like(layer.weights)
                    if hasattr(layer, "db"):
                        layer.db = np.zeros_like(layer.biases)

                if results:
                    batch_outputs, batch_labels = [], []

                
                for x, y in zip(X_batch, y_batch):
                    out = self.forward(x)
                    self.backprop(y)

                    if results:
                        epoch_outputs.append(out.flatten())
                        epoch_labels.append(y.flatten())
                    
                        batch_outputs.append(out.flatten())
                        batch_labels.append(y.flatten())

                # Update parameters after batch
                self.update_parameters(eta, len(X_batch))
                batch_count += 1

                # Print batch results at the specified interval
                if results and (batch_count % print_every == 0):
                    batch_outputs = np.array(batch_outputs)
                    batch_labels = np.array(batch_labels)
                    batch_loss = self.cost[0](batch_outputs, batch_labels)
                    batch_preds = np.argmax(batch_outputs, axis=1)
                    batch_true = np.argmax(batch_labels, axis=1)
                    batch_acc = np.mean(batch_preds == batch_true)
                    print(f"Epoch {epoch+1}, Batch {batch_count}, Loss: {batch_loss:.4f}, Accuracy: {batch_acc * 100:.2f}%")

            if results and epoch_outputs:
                epoch_outputs = np.array(epoch_outputs)
                epoch_labels = np.array(epoch_labels)
                loss = self.cost[0](epoch_outputs, epoch_labels)
                predictions = np.argmax(epoch_outputs, axis=1)
                true_labels = np.argmax(epoch_labels, axis=1)
                accuracy = np.mean(predictions == true_labels)
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss:.4f}, Accuracy: {accuracy * 100:.2f}%")


## Training

### Load MNIST dataset and seperate test and train data

In [109]:
import tensorflow as tf
from tensorflow.keras.datasets import mnist

# Load MNIST dataset
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Normalize the data to the range [0, 1]
x_train = x_train / 255.0
x_test = x_test / 255.0

# Add channel dimension: (28, 28) -> (28, 28, 1)
x_train = x_train[..., np.newaxis]
x_test = x_test[..., np.newaxis]

# Convert labels to one-hot encoding
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)

### Initialize neural network and train it

In [114]:
layers = [
    Conv((28,28,1), (relu, relu_prime), (3,3), 6, init="he"),
    MaxPool((26,26,6), (2,2), stride=2),
    Conv((13,13,6), (relu, relu_prime), (3,3), 6, init="he"),
    MaxPool((11,11,6), (2,2), stride=2),
    Flatten(),
    Dense(150, 100, (relu, relu_prime), init="he"),
    Dense(100, 80, (relu, relu_prime), init="he"),
    Dense(80, 10, (softmax, softmax_prime), init="he")
]

nn = NeuralNetwork(layers, cost=(cross_entropy, cross_entropy_prime))

nn.train(x_train, y_train, epochs=5, batch_size=128, eta=0.5, results=True)

Epoch 1, Batch 10, Loss: 31.7894, Accuracy: 13.28%
Epoch 1, Batch 20, Loss: 32.2607, Accuracy: 13.28%
Epoch 1, Batch 30, Loss: 33.3363, Accuracy: 9.38%
Epoch 1, Batch 40, Loss: 32.0672, Accuracy: 11.72%
Epoch 1, Batch 50, Loss: 31.3374, Accuracy: 15.62%
Epoch 1, Batch 60, Loss: 30.4711, Accuracy: 17.19%
Epoch 1, Batch 70, Loss: 31.0999, Accuracy: 16.41%
Epoch 1, Batch 80, Loss: 31.0843, Accuracy: 14.06%
Epoch 1, Batch 90, Loss: 30.6520, Accuracy: 15.62%
Epoch 1, Batch 100, Loss: 31.7822, Accuracy: 16.41%
Epoch 1, Batch 110, Loss: 30.4703, Accuracy: 16.41%
Epoch 1, Batch 120, Loss: 30.3300, Accuracy: 17.19%
Epoch 1, Batch 130, Loss: 30.1218, Accuracy: 21.09%
Epoch 1, Batch 140, Loss: 31.0901, Accuracy: 16.41%
Epoch 1, Batch 150, Loss: 30.9981, Accuracy: 17.97%
Epoch 1, Batch 160, Loss: 29.3098, Accuracy: 24.22%
Epoch 1, Batch 170, Loss: 30.5865, Accuracy: 14.84%
Epoch 1, Batch 180, Loss: 29.6017, Accuracy: 19.53%
Epoch 1, Batch 190, Loss: 31.1430, Accuracy: 11.72%
Epoch 1, Batch 200, Lo