In [1]:
import numpy as np

class Conv2d:
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        # Handle kernel_size as tuple or single value
        if isinstance(kernel_size, int):
            self.kernel_size = (kernel_size, kernel_size)
        else:
            self.kernel_size = kernel_size
            
        # Handle stride as tuple or single value
        if isinstance(stride, int):
            self.stride = (stride, stride)
        else:
            self.stride = stride
            
        # Handle padding as tuple or single value
        if isinstance(padding, int):
            self.padding = (padding, padding)
        else:
            self.padding = padding
            
        self.in_channels = in_channels
        self.out_channels = out_channels
        
        # Initialize weights and bias
        self.weights = np.random.randn(
            self.kernel_size[0], self.kernel_size[1], in_channels, out_channels) * 0.01
        self.bias = np.zeros((1, out_channels))
        
        # For storing inputs and outputs needed in backpropagation
        self.x = None
        self.padded_x = None
        
    def forward(self, x):
        """
        Forward pass of 2D convolution.
        
        Args:
            x: Input tensor of shape (batch_size, in_channels, height, width) for NCHW format
               or (batch_size, height, width, in_channels) for NHWC format
               
        Returns:
            Output tensor after convolution
        """
        # Store input for backpropagation
        self.x = x
        
        batch_size, in_h, in_w, in_channels = x.shape  # NHWC format
        
        # Calculate output dimensions
        out_h = (in_h + 2 * self.padding[0] - self.kernel_size[0]) // self.stride[0] + 1
        out_w = (in_w + 2 * self.padding[1] - self.kernel_size[1]) // self.stride[1] + 1
        
        # Initialize output tensor
        output = np.zeros((batch_size, out_h, out_w, self.out_channels))
        
        # Apply padding if needed
        if self.padding[0] > 0 or self.padding[1] > 0:
            padded_x = np.pad(
                x,
                ((0, 0), (self.padding[0], self.padding[0]), (self.padding[1], self.padding[1]), (0, 0)),
                mode='constant'
            )
        else:
            padded_x = x
            
        self.padded_x = padded_x
        
        # Perform convolution
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    # Define the current window position
                    h_start = i * self.stride[0]
                    h_end = h_start + self.kernel_size[0]
                    w_start = j * self.stride[1]
                    w_end = w_start + self.kernel_size[1]
                    
                    # Extract the current window
                    window = padded_x[b, h_start:h_end, w_start:w_end, :]
                    
                    # Compute convolution for all output channels
                    for m in range(self.out_channels):
                        output[b, i, j, m] = np.sum(window * self.weights[:, :, :, m]) + self.bias[0, m]
                        
        return output
    
    def backward(self, delta):
        """
        Backward pass of 2D convolution.
        
        Args:
            delta: Gradient from the next layer, shape matching output of forward
            
        Returns:
            Gradient with respect to input
        """
        batch_size, out_h, out_w, out_channels = delta.shape
        _, in_h, in_w, _ = self.x.shape
        
        # Initialize gradients
        dx = np.zeros_like(self.x)
        dw = np.zeros_like(self.weights)
        db = np.zeros_like(self.bias)
        
        # Compute bias gradient
        for m in range(self.out_channels):
            db[0, m] = np.sum(delta[:, :, :, m])
        
        # Compute weight gradient and input gradient
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    # Define the current window position
                    h_start = i * self.stride[0]
                    h_end = h_start + self.kernel_size[0]
                    w_start = j * self.stride[1]
                    w_end = w_start + self.kernel_size[1]
                    
                    # Update weight gradients
                    for m in range(self.out_channels):
                        window = self.padded_x[b, h_start:h_end, w_start:w_end, :]
                        dw[:, :, :, m] += window * delta[b, i, j, m]
                        
                    # Update input gradients (if padding is 0, otherwise need to handle padding)
                    if self.padding[0] == 0 and self.padding[1] == 0:
                        for m in range(self.out_channels):
                            dx[b, h_start:h_end, w_start:w_end, :] += self.weights[:, :, :, m] * delta[b, i, j, m]
        
        # If padding was used, we need to compute the gradient for the original input
        if self.padding[0] > 0 or self.padding[1] > 0:
            # Compute padded gradient
            dx_padded = np.zeros_like(self.padded_x)
            
            for b in range(batch_size):
                for i in range(out_h):
                    for j in range(out_w):
                        h_start = i * self.stride[0]
                        h_end = h_start + self.kernel_size[0]
                        w_start = j * self.stride[1]
                        w_end = w_start + self.kernel_size[1]
                        
                        for m in range(self.out_channels):
                            dx_padded[b, h_start:h_end, w_start:w_end, :] += self.weights[:, :, :, m] * delta[b, i, j, m]
            
            # Extract the non-padded portion
            dx = dx_padded[:, self.padding[0]:self.padding[0]+in_h, 
                         self.padding[1]:self.padding[1]+in_w, :]
        
        return dx, dw, db
    
    def update(self, dw, db, learning_rate):
        """
        Update weights and biases.
        
        Args:
            dw: Weight gradients
            db: Bias gradients
            learning_rate: Learning rate for parameter update
        """
        self.weights -= learning_rate * dw
        self.bias -= learning_rate * db

In [2]:
import numpy as np

def verify_cnn_forward():
    # Input data (1,1,4,4) - Converting to NHWC format (1,4,4,1)
    x = np.array([[[[1], [2], [3], [4]],
                   [[5], [6], [7], [8]],
                   [[9], [10], [11], [12]],
                   [[13], [14], [15], [16]]]], dtype=np.float32)
    
    # Filter weights (2,1,3,3) - NHWC format
    w = np.array([[[[0.]], [[0.]], [[0.]]], 
                  [[[0.]], [[1.]], [[0.]]], 
                  [[[0.]], [[-1.]], [[0.]]]])
    
    # Second filter
    w2 = np.array([[[[0.]], [[0.]], [[0.]]], 
                   [[[0.]], [[-1.]], [[1.]]], 
                   [[[0.]], [[0.]], [[0.]]]])
    
    # Combine filters to (2,3,3,1)
    weights = np.stack([w, w2], axis=3)
    weights = weights.reshape(3, 3, 1, 2)
    
    # Create Conv2d layer
    conv = Conv2d(in_channels=1, out_channels=2, kernel_size=3, stride=1, padding=0)
    conv.weights = weights
    conv.bias = np.zeros((1, 2))
    
    # Forward pass
    output = conv.forward(x)
    print("Forward pass result:")
    print(output)
    
    # Expected output: [[[[-4, 1], [-4, 1]], [[-4, 1], [-4, 1]]]]
    
    # Backward pass
    delta = np.array([[[[-4, 1], [-4, -7]], [[10, 1], [11, -11]]]])
    dx, dw, db = conv.backward(delta)
    
    # Expected output: array([[-5, 4], [13, 27]])
    # This is a simplified representation of the full gradient
    print("\nBackward pass result:")
    print(dx)

verify_cnn_forward()

Forward pass result:
[[[[-4.  1.]
   [-4.  1.]]

  [[-4.  1.]
   [-4.  1.]]]]

Backward pass result:
[[[[  0.]
   [  0.]
   [  0.]
   [  0.]]

  [[  0.]
   [ -5.]
   [  4.]
   [ -7.]]

  [[  0.]
   [ 13.]
   [ 27.]
   [-11.]]

  [[  0.]
   [-10.]
   [-11.]
   [  0.]]]]


In [3]:
def calculate_output_size(input_size, padding, filter_size, stride):
    """
    Calculate the output size after convolution.
    
    Args:
        input_size: Tuple of (height, width) of input
        padding: Tuple of (padding_h, padding_w)
        filter_size: Tuple of (filter_h, filter_w)
        stride: Tuple of (stride_h, stride_w)
        
    Returns:
        Tuple of (output_height, output_width)
    """
    output_height = (input_size[0] + 2 * padding[0] - filter_size[0]) // stride[0] + 1
    output_width = (input_size[1] + 2 * padding[1] - filter_size[1]) // stride[1] + 1
    
    return (output_height, output_width)

# Test the function
input_size = (28, 28)
padding = (0, 0)
filter_size = (3, 3)
stride = (1, 1)

output_size = calculate_output_size(input_size, padding, filter_size, stride)
print(f"Input size: {input_size}")
print(f"Output size: {output_size}")

Input size: (28, 28)
Output size: (26, 26)


In [4]:
import numpy as np

class MaxPool2D:
    def __init__(self, pool_size, stride=None):
        # Handle pool_size as tuple or single value
        if isinstance(pool_size, int):
            self.pool_size = (pool_size, pool_size)
        else:
            self.pool_size = pool_size
            
        # If stride is not specified, set it equal to pool_size
        if stride is None:
            self.stride = self.pool_size
        elif isinstance(stride, int):
            self.stride = (stride, stride)
        else:
            self.stride = stride
            
        # For storing indices of max values for backpropagation
        self.max_indices = None
        self.x_shape = None
        
    def forward(self, x):
        """
        Forward pass of max pooling.
        
        Args:
            x: Input tensor of shape (batch_size, height, width, channels) for NHWC format
               
        Returns:
            Output tensor after max pooling
        """
        self.x_shape = x.shape
        batch_size, in_h, in_w, channels = x.shape
        
        # Calculate output dimensions
        out_h = (in_h - self.pool_size[0]) // self.stride[0] + 1
        out_w = (in_w - self.pool_size[1]) // self.stride[1] + 1
        
        # Initialize output tensor and indices of max values
        output = np.zeros((batch_size, out_h, out_w, channels))
        self.max_indices = np.zeros((batch_size, out_h, out_w, channels, 2), dtype=int)
        
        # Perform max pooling
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    # Define the current window position
                    h_start = i * self.stride[0]
                    h_end = h_start + self.pool_size[0]
                    w_start = j * self.stride[1]
                    w_end = w_start + self.pool_size[1]
                    
                    # Extract the current window
                    window = x[b, h_start:h_end, w_start:w_end, :]
                    
                    # Find max values for each channel
                    for c in range(channels):
                        window_channel = window[:, :, c]
                        max_idx = np.unravel_index(np.argmax(window_channel), window_channel.shape)
                        output[b, i, j, c] = window_channel[max_idx]
                        
                        # Store indices for backpropagation
                        self.max_indices[b, i, j, c] = [h_start + max_idx[0], w_start + max_idx[1]]
                        
        return output
    
    def backward(self, delta):
        """
        Backward pass of max pooling.
        
        Args:
            delta: Gradient from the next layer, shape matching output of forward
            
        Returns:
            Gradient with respect to input
        """
        batch_size, out_h, out_w, channels = delta.shape
        dx = np.zeros(self.x_shape)
        
        # Distribute gradients only to the positions that had max values during forward pass
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    for c in range(channels):
                        h_idx, w_idx = self.max_indices[b, i, j, c]
                        dx[b, h_idx, w_idx, c] += delta[b, i, j, c]
                        
        return dx

In [5]:
import numpy as np

class AveragePool2D:
    def __init__(self, pool_size, stride=None):
        # Handle pool_size as tuple or single value
        if isinstance(pool_size, int):
            self.pool_size = (pool_size, pool_size)
        else:
            self.pool_size = pool_size
            
        # If stride is not specified, set it equal to pool_size
        if stride is None:
            self.stride = self.pool_size
        elif isinstance(stride, int):
            self.stride = (stride, stride)
        else:
            self.stride = stride
            
        # For storing input shape for backpropagation
        self.x_shape = None
        self.pool_positions = None
        
    def forward(self, x):
        """
        Forward pass of average pooling.
        
        Args:
            x: Input tensor of shape (batch_size, height, width, channels) for NHWC format
               
        Returns:
            Output tensor after average pooling
        """
        self.x_shape = x.shape
        batch_size, in_h, in_w, channels = x.shape
        
        # Calculate output dimensions
        out_h = (in_h - self.pool_size[0]) // self.stride[0] + 1
        out_w = (in_w - self.pool_size[1]) // self.stride[1] + 1
        
        # Initialize output tensor
        output = np.zeros((batch_size, out_h, out_w, channels))
        
        # Store pool positions for backprop
        self.pool_positions = []
        
        # Perform average pooling
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    # Define the current window position
                    h_start = i * self.stride[0]
                    h_end = h_start + self.pool_size[0]
                    w_start = j * self.stride[1]
                    w_end = w_start + self.pool_size[1]
                    
                    # Store positions for backprop
                    self.pool_positions.append((h_start, h_end, w_start, w_end))
                    
                    # Extract the current window and compute average
                    window = x[b, h_start:h_end, w_start:w_end, :]
                    output[b, i, j, :] = np.mean(window, axis=(0, 1))
                        
        return output
    
    def backward(self, delta):
        """
        Backward pass of average pooling.
        
        Args:
            delta: Gradient from the next layer, shape matching output of forward
            
        Returns:
            Gradient with respect to input
        """
        batch_size, out_h, out_w, channels = delta.shape
        dx = np.zeros(self.x_shape)
        
        # Distribute gradients evenly to all positions in each pooling window
        for b in range(batch_size):
            for i in range(out_h):
                for j in range(out_w):
                    # Get the positions of the current pooling window
                    idx = i * out_w + j
                    h_start, h_end, w_start, w_end = self.pool_positions[idx]
                    
                    # Number of elements in the pooling window
                    pool_size = (h_end - h_start) * (w_end - w_start)
                    
                    # Distribute gradient evenly
                    for c in range(channels):
                        dx[b, h_start:h_end, w_start:w_end, c] += delta[b, i, j, c] / pool_size
                        
        return dx

In [6]:
import numpy as np

class Flatten:
    def __init__(self):
        self.x_shape = None
        
    def forward(self, x):
        """
        Forward pass of flatten operation.
        
        Args:
            x: Input tensor of shape (batch_size, height, width, channels) for NHWC format
            
        Returns:
            Flattened tensor of shape (batch_size, height*width*channels)
        """
        self.x_shape = x.shape
        batch_size = x.shape[0]
        flattened_dim = np.prod(x.shape[1:])
        
        return x.reshape(batch_size, flattened_dim)
    
    def backward(self, delta):
        """
        Backward pass of flatten operation.
        
        Args:
            delta: Gradient from the next layer, shape matching output of forward
            
        Returns:
            Gradient with reshaped to match original input shape
        """
        return delta.reshape(self.x_shape)

In [7]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

class ReLU:
    def __init__(self):
        self.mask = None
        
    def forward(self, x):
        """
        Forward pass of ReLU activation.
        
        Args:
            x: Input tensor
            
        Returns:
            Output after ReLU activation
        """
        self.mask = (x > 0)
        return x * self.mask
    
    def backward(self, delta):
        """
        Backward pass of ReLU activation.
        
        Args:
            delta: Gradient from the next layer
            
        Returns:
            Gradient with respect to input
        """
        return delta * self.mask

class Softmax:
    def __init__(self):
        self.output = None
        
    def forward(self, x):
        """
        Forward pass of softmax activation.
        
        Args:
            x: Input tensor
            
        Returns:
            Output after softmax activation
        """
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        self.output = exp_x / np.sum(exp_x, axis=1, keepdims=True)
        return self.output
    
    def backward(self, delta):
        """
        Backward pass of softmax activation.
        
        Args:
            delta: Gradient from the next layer
            
        Returns:
            Gradient with respect to input
        """
        # For cross-entropy loss, this is typically simplified
        return delta

class Dense:
    def __init__(self, input_dim, output_dim):
        self.weights = np.random.randn(input_dim, output_dim) * 0.01
        self.bias = np.zeros((1, output_dim))
        self.x = None
        
    def forward(self, x):
        """
        Forward pass of fully connected layer.
        
        Args:
            x: Input tensor
            
        Returns:
            Output tensor
        """
        self.x = x
        return np.dot(x, self.weights) + self.bias
    
    def backward(self, delta):
        """
        Backward pass of fully connected layer.
        
        Args:
            delta: Gradient from the next layer
            
        Returns:
            Gradient with respect to input, weights, and bias
        """
        dx = np.dot(delta, self.weights.T)
        dw = np.dot(self.x.T, delta)
        db = np.sum(delta, axis=0, keepdims=True)
        return dx, dw, db
    
    def update(self, dw, db, learning_rate):
        """
        Update weights and biases.
        
        Args:
            dw: Weight gradients
            db: Bias gradients
            learning_rate: Learning rate for parameter update
        """
        self.weights -= learning_rate * dw
        self.bias -= learning_rate * db

class CrossEntropyLoss:
    def __init__(self):
        self.y_pred = None
        self.y_true = None
        
    def forward(self, y_pred, y_true):
        """
        Forward pass of cross-entropy loss.
        
        Args:
            y_pred: Predicted probabilities
            y_true: True labels (one-hot encoded)
            
        Returns:
            Cross-entropy loss
        """
        self.y_pred = y_pred
        self.y_true = y_true
        
        # Add small epsilon to avoid log(0)
        eps = 1e-10
        y_pred_safe = np.clip(y_pred, eps, 1 - eps)
        
        loss = -np.sum(y_true * np.log(y_pred_safe)) / y_true.shape[0]
        return loss
    
    def backward(self):
        """
        Backward pass of cross-entropy loss.
        
        Returns:
            Gradient with respect to predicted probabilities
        """
        # With softmax, this simplifies to (y_pred - y_true)
        return (self.y_pred - self.y_true) / self.y_true.shape[0]

class Scratch2dCNNClassifier:
    def __init__(self):
        # Define the model architecture
        self.conv1 = Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu1 = ReLU()
        self.pool1 = MaxPool2D(pool_size=2)
        
        self.conv2 = Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.relu2 = ReLU()
        self.pool2 = MaxPool2D(pool_size=2)
        
        self.flatten = Flatten()
        
        # 7x7 is the size after two 2x2 max pooling operations on 28x28 input
        self.fc1 = Dense(7*7*64, 128)
        self.relu3 = ReLU()
        
        self.fc2 = Dense(128, 10)
        self.softmax = Softmax()
        
        self.loss_fn = CrossEntropyLoss()
        
    def forward(self, x):
        """
        Forward pass through the entire network.
        
        Args:
            x: Input images
            
        Returns:
            Predicted probabilities
        """
        x = self.conv1.forward(x)
        x = self.relu1.forward(x)
        x = self.pool1.forward(x)
        
        x = self.conv2.forward(x)
        x = self.relu2.forward(x)
        x = self.pool2.forward(x)
        
        x = self.flatten.forward(x)
        
        x = self.fc1.forward(x)
        x = self.relu3.forward(x)
        
        x = self.fc2.forward(x)
        x = self.softmax.forward(x)
        
        return x
    
    def backward(self, y_true):
        """
        Backward pass through the entire network.
        
        Args:
            y_true: True labels (one-hot encoded)
            
        Returns:
            Loss value
        """
        # Calculate loss
        loss = self.loss_fn.forward(self.softmax.output, y_true)
        
        # Backward pass through the network
        delta = self.loss_fn.backward()
        
        delta = self.softmax.backward(delta)
        delta, dw_fc2, db_fc2 = self.fc2.backward(delta)
        
        delta = self.relu3.backward(delta)
        delta, dw_fc1, db_fc1 = self.fc1.backward(delta)
        
        delta = self.flatten.backward(delta)
        
        delta = self.pool2.backward(delta)
        delta = self.relu2.backward(delta)
        delta, dw_conv2, db_conv2 = self.conv2.backward(delta)
        
        delta = self.pool1.backward(delta)
        delta = self.relu1.backward(delta)
        delta, dw_conv1, db_conv1 = self.conv1.backward(delta)
        
        # Update parameters
        learning_rate = 0.01
        self.fc2.update(dw_fc2, db_fc2, learning_rate)
        self.fc1.update(dw_fc1, db_fc1, learning_rate)
        self.conv2.update(dw_conv2, db_conv2, learning_rate)
        self.conv1.update(dw_conv1, db_conv1, learning_rate)
        
        return loss
    
    def calculate_accuracy(self, y_pred, y_true):
        """
        Calculate accuracy.
        
        Args:
            y_pred: Predicted probabilities
            y_true: True labels (one-hot encoded)
            
        Returns:
            Accuracy
        """
        pred_classes = np.argmax(y_pred, axis=1)
        true_classes = np.argmax(y_true, axis=1)
        
        return np.mean(pred_classes == true_classes)
    
    def train(self, X_train, y_train, X_val, y_val, batch_size=32, epochs=5):
        """
        Train the model.
        
        Args:
            X_train: Training images
            y_train: Training labels (one-hot encoded)
            X_val: Validation images
            y_val: Validation labels (one-hot encoded)
            batch_size: Batch size for training
            epochs: Number of training epochs
            
        Returns:
            Training history
        """
        n_samples = X_train.shape[0]
        n_batches = n_samples // batch_size
        
        history = {
            'train_loss': [],
            'val_loss': [],
            'train_acc': [],
            'val_acc': []
        }
        
        for epoch in range(epochs):
            # Shuffle the training data
            indices = np.random.permutation(n_samples)
            X_train_shuffled = X_train[indices]
            y_train_shuffled = y_train[indices]
            
            epoch_loss = 0
            
            for batch in range(n_batches):
                start_idx = batch * batch_size
                end_idx = (batch + 1) * batch_size
                
                X_batch = X_train_shuffled[start_idx:end_idx]
                y_batch = y_train_shuffled[start_idx:end_idx]
                
                # Forward pass
                y_pred = self.forward(X_batch)
                
                # Backward pass
                batch_loss = self.backward(y_batch)
                epoch_loss += batch_loss
                
                # Print progress
                if batch % 10 == 0:
                    print(f"Epoch {epoch+1}/{epochs}, Batch {batch+1}/{n_batches}, Loss: {batch_loss:.4f}")
                
            # Calculate training accuracy
            y_pred_train = self.forward(X_train)
            train_acc = self.calculate_accuracy(y_pred_train, y_train)
            
            # Calculate validation loss and accuracy
            y_pred_val = self.forward(X_val)
            val_loss = self.loss_fn.forward(y_pred_val, y_val)
            val_acc = self.calculate_accuracy(y_pred_val, y_val)
            
            # Update history
            history['train_loss'].append(epoch_loss / n_batches)
            history['val_loss'].append(val_loss)
            history['train_acc'].append(train_acc)
            history['val_acc'].append(val_acc)
            
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss/n_batches:.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
        
        return history
    
    def predict(self, X):
        """
        Make predictions on new data.
        
        Args:
            X: Input images
            
        Returns:
            Predicted class indices
        """
        y_pred = self.forward(X)
        return np.argmax(y_pred, axis=1)

# Run training on MNIST dataset
def train_mnist_cnn():
    # Load MNIST dataset
    print("Loading MNIST dataset...")
    mnist = fetch_openml('mnist_784', version=1, cache=True,parser='auto' )
    
    # Prepare data
    num_samples = 2000
    X = mnist.data.astype('float32').values[:num_samples].reshape(-1, 28, 28, 1) / 255.0
    y = mnist.target.astype('int64').values[:num_samples]
    # X = mnist.data.astype('float32').values.reshape(-1, 28, 28, 1) / 255.0
    # y = mnist.target.astype('int64').values
    
    # One-hot encode labels
    encoder = OneHotEncoder(sparse_output=False)
    y_one_hot = encoder.fit_transform(y.reshape(-1, 1))
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    
    # Create and train model
    print("Creating CNN model...")
    model = Scratch2dCNNClassifier()
    
    print("Starting training...")
    history = model.train(X_train, y_train, X_val, y_val, batch_size=16, epochs=2)
    
    # Evaluate on test set
    y_pred = model.forward(X_test)
    test_acc = model.calculate_accuracy(y_pred, y_test)
    print(f"Test accuracy: {test_acc:.4f}")
    
    return model, history, test_acc


if __name__ == "__main__":
    model, history, test_acc = train_mnist_cnn()

Loading MNIST dataset...
Creating CNN model...
Starting training...
Epoch 1/2, Batch 1/80, Loss: 2.3026
Epoch 1/2, Batch 11/80, Loss: 2.3025
Epoch 1/2, Batch 21/80, Loss: 2.3023
Epoch 1/2, Batch 31/80, Loss: 2.3022
Epoch 1/2, Batch 41/80, Loss: 2.3033
Epoch 1/2, Batch 51/80, Loss: 2.3032
Epoch 1/2, Batch 61/80, Loss: 2.3030
Epoch 1/2, Batch 71/80, Loss: 2.3031
Epoch 1/2, Train Loss: 2.3025, Val Loss: 2.3025, Train Acc: 0.1125, Val Acc: 0.1250
Epoch 2/2, Batch 1/80, Loss: 2.3001
Epoch 2/2, Batch 11/80, Loss: 2.3003
Epoch 2/2, Batch 21/80, Loss: 2.3037
Epoch 2/2, Batch 31/80, Loss: 2.2987
Epoch 2/2, Batch 41/80, Loss: 2.2997
Epoch 2/2, Batch 51/80, Loss: 2.2984
Epoch 2/2, Batch 61/80, Loss: 2.3070
Epoch 2/2, Batch 71/80, Loss: 2.3039
Epoch 2/2, Train Loss: 2.3020, Val Loss: 2.3024, Train Acc: 0.1125, Val Acc: 0.1250
Test accuracy: 0.1000


In [9]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

class LeNet:
    def __init__(self):
        # Define the model architecture based on LeNet but adapted for MNIST (28x28)
        
        # Layer 1: Convolution layer (C1)
        # 6 output channels, 5x5 filter size, stride 1
        self.conv1 = Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=0)
        self.relu1 = ReLU()
        
        # Layer 2: Pooling (S2)
        self.pool1 = MaxPool2D(pool_size=2, stride=2)
        
        # Layer 3: Convolution layer (C3)
        # 16 output channels, 5x5 filter size, stride 1
        self.conv2 = Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=0)
        self.relu2 = ReLU()
        
        # Layer 4: Pooling (S4)
        self.pool2 = MaxPool2D(pool_size=2, stride=2)
        
        # Layer 5: Flatten
        self.flatten = Flatten()
        
        # Calculate the size after convolutions and pooling
        # 28x28 -> Conv1(5x5) -> 24x24 -> Pool1(2x2) -> 12x12
        # 12x12 -> Conv2(5x5) -> 8x8 -> Pool2(2x2) -> 4x4
        # So, 4x4x16 = 256 features after flatten
        
        # Layer 6: Fully connected layer (F5)
        self.fc1 = Dense(input_dim=4*4*16, output_dim=120)
        self.relu3 = ReLU()
        
        # Layer 7: Fully connected layer (F6)
        self.fc2 = Dense(input_dim=120, output_dim=84)
        self.relu4 = ReLU()
        
        # Layer 8: Output layer (F7)
        self.fc3 = Dense(input_dim=84, output_dim=10)
        self.softmax = Softmax()
        
        self.loss_fn = CrossEntropyLoss()
        
    def forward(self, x):
        """
        Forward pass through the LeNet architecture.
        
        Args:
            x: Input images (batch_size, height, width, channels)
            
        Returns:
            Predicted probabilities for each class
        """
        # Layer 1: Convolution
        x = self.conv1.forward(x)
        x = self.relu1.forward(x)
        
        # Layer 2: Pooling
        x = self.pool1.forward(x)
        
        # Layer 3: Convolution
        x = self.conv2.forward(x)
        x = self.relu2.forward(x)
        
        # Layer 4: Pooling
        x = self.pool2.forward(x)
        
        # Layer 5: Flatten
        x = self.flatten.forward(x)
        
        # Layer 6: Fully connected
        x = self.fc1.forward(x)
        x = self.relu3.forward(x)
        
        # Layer 7: Fully connected
        x = self.fc2.forward(x)
        x = self.relu4.forward(x)
        
        # Layer 8: Output layer
        x = self.fc3.forward(x)
        x = self.softmax.forward(x)
        
        return x
    
    def backward(self, y_true):
        """
        Backward pass through the LeNet architecture.
        
        Args:
            y_true: True labels (one-hot encoded)
            
        Returns:
            Loss value
        """
        # Calculate loss
        loss = self.loss_fn.forward(self.softmax.output, y_true)
        
        # Backward pass through the network
        delta = self.loss_fn.backward()
        
        # Layer 8: Output layer
        delta = self.softmax.backward(delta)
        delta, dw_fc3, db_fc3 = self.fc3.backward(delta)
        
        # Layer 7: Fully connected
        delta = self.relu4.backward(delta)
        delta, dw_fc2, db_fc2 = self.fc2.backward(delta)
        
        # Layer 6: Fully connected
        delta = self.relu3.backward(delta)
        delta, dw_fc1, db_fc1 = self.fc1.backward(delta)
        
        # Layer 5: Flatten
        delta = self.flatten.backward(delta)
        
        # Layer 4: Pooling
        delta = self.pool2.backward(delta)
        
        # Layer 3: Convolution
        delta = self.relu2.backward(delta)
        delta, dw_conv2, db_conv2 = self.conv2.backward(delta)
        
        # Layer 2: Pooling
        delta = self.pool1.backward(delta)
        
        # Layer 1: Convolution
        delta = self.relu1.backward(delta)
        delta, dw_conv1, db_conv1 = self.conv1.backward(delta)
        
        # Update parameters
        learning_rate = 0.01
        
        self.fc3.update(dw_fc3, db_fc3, learning_rate)
        self.fc2.update(dw_fc2, db_fc2, learning_rate)
        self.fc1.update(dw_fc1, db_fc1, learning_rate)
        self.conv2.update(dw_conv2, db_conv2, learning_rate)
        self.conv1.update(dw_conv1, db_conv1, learning_rate)
        
        return loss
        
    def calculate_accuracy(self, y_pred, y_true):
        """
        Calculate accuracy.
        
        Args:
            y_pred: Predicted probabilities
            y_true: True labels (one-hot encoded)
            
        Returns:
            Accuracy
        """
        pred_classes = np.argmax(y_pred, axis=1)
        true_classes = np.argmax(y_true, axis=1)
        
        return np.mean(pred_classes == true_classes)
    
    def train(self, X_train, y_train, X_val, y_val, batch_size=32, epochs=5):
        """
        Train the model.
        
        Args:
            X_train: Training images
            y_train: Training labels (one-hot encoded)
            X_val: Validation images
            y_val: Validation labels (one-hot encoded)
            batch_size: Batch size for training
            epochs: Number of training epochs
            
        Returns:
            Training history
        """
        n_samples = X_train.shape[0]
        n_batches = n_samples // batch_size
        
        history = {
            'train_loss': [],
            'val_loss': [],
            'train_acc': [],
            'val_acc': []
        }
        
        for epoch in range(epochs):
            # Shuffle the training data
            indices = np.random.permutation(n_samples)
            X_train_shuffled = X_train[indices]
            y_train_shuffled = y_train[indices]
            
            epoch_loss = 0
            
            for batch in range(n_batches):
                start_idx = batch * batch_size
                end_idx = (batch + 1) * batch_size
                
                X_batch = X_train_shuffled[start_idx:end_idx]
                y_batch = y_train_shuffled[start_idx:end_idx]
                
                # Forward pass
                y_pred = self.forward(X_batch)
                
                # Backward pass
                batch_loss = self.backward(y_batch)
                epoch_loss += batch_loss
                
                # Print progress
                if batch % 10 == 0:
                    print(f"Epoch {epoch+1}/{epochs}, Batch {batch+1}/{n_batches}, Loss: {batch_loss:.4f}")
                
            # Calculate training accuracy
            y_pred_train = self.forward(X_train)
            train_acc = self.calculate_accuracy(y_pred_train, y_train)
            
            # Calculate validation loss and accuracy
            y_pred_val = self.forward(X_val)
            val_loss = self.loss_fn.forward(y_pred_val, y_val)
            val_acc = self.calculate_accuracy(y_pred_val, y_val)
            
            # Update history
            history['train_loss'].append(epoch_loss / n_batches)
            history['val_loss'].append(val_loss)
            history['train_acc'].append(train_acc)
            history['val_acc'].append(val_acc)
            
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {epoch_loss/n_batches:.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")
        
        return history
    
    def predict(self, X):
        """
        Make predictions on new data.
        
        Args:
            X: Input images
            
        Returns:
            Predicted class indices
        """
        y_pred = self.forward(X)
        return np.argmax(y_pred, axis=1)

# Run training on MNIST dataset with LeNet
def train_mnist_lenet():
    # Load MNIST dataset
    print("Loading MNIST dataset...")
    mnist = fetch_openml('mnist_784', version=1, cache=True, parser='auto')
    
    # Prepare data
    num_samples = 2000
    X = mnist.data.astype('float32').values[:num_samples].reshape(-1, 28, 28, 1) / 255.0
    y = mnist.target.astype('int64').values[:num_samples]
    # X = mnist.data.astype('float32').values.reshape(-1, 28, 28, 1) / 255.0
    # y = mnist.target.astype('int64').values
    
    # One-hot encode labels
    encoder = OneHotEncoder(sparse_output=False)
    y_one_hot = encoder.fit_transform(y.reshape(-1, 1))
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
    
    # Create and train model
    print("Creating LeNet model...")
    model = LeNet()
    
    print("Starting training...")
    history = model.train(X_train, y_train, X_val, y_val, batch_size=16, epochs=2)
    
    # Evaluate on test set
    y_pred = model.forward(X_test)
    test_acc = model.calculate_accuracy(y_pred, y_test)
    print(f"Test accuracy: {test_acc:.4f}")
    
    return model, history, test_acc

if __name__ == "__main__":
    model, history, test_acc = train_mnist_lenet()

Loading MNIST dataset...
Creating LeNet model...
Starting training...
Epoch 1/2, Batch 1/80, Loss: 2.3026
Epoch 1/2, Batch 11/80, Loss: 2.3024
Epoch 1/2, Batch 21/80, Loss: 2.3036
Epoch 1/2, Batch 31/80, Loss: 2.3029
Epoch 1/2, Batch 41/80, Loss: 2.3006
Epoch 1/2, Batch 51/80, Loss: 2.3022
Epoch 1/2, Batch 61/80, Loss: 2.3050
Epoch 1/2, Batch 71/80, Loss: 2.3024
Epoch 1/2, Train Loss: 2.3026, Val Loss: 2.3025, Train Acc: 0.1125, Val Acc: 0.1250
Epoch 2/2, Batch 1/80, Loss: 2.3048
Epoch 2/2, Batch 11/80, Loss: 2.3015
Epoch 2/2, Batch 21/80, Loss: 2.2963
Epoch 2/2, Batch 31/80, Loss: 2.3052
Epoch 2/2, Batch 41/80, Loss: 2.3044
Epoch 2/2, Batch 51/80, Loss: 2.3026
Epoch 2/2, Batch 61/80, Loss: 2.3004
Epoch 2/2, Batch 71/80, Loss: 2.2964
Epoch 2/2, Train Loss: 2.3020, Val Loss: 2.3025, Train Acc: 0.1125, Val Acc: 0.1250
Test accuracy: 0.1000


### Problem 9: Research into Famous Image Recognition Models

Here's a brief overview of famous CNN architectures for image recognition:

Popular CNN architectures that are commonly available in deep learning frameworks include:

- **AlexNet (2012)**  
  The breakthrough model that won the ImageNet competition in 2012, significantly outperforming other approaches. It used ReLU activation, dropout for regularization, and data augmentation.

- **VGG16/VGG19 (2014)**  
  Known for its simplicity and depth, using small 3×3 convolution filters throughout the network with increasing depth.

- **GoogLeNet/Inception (2014)**  
  Introduced the inception module that uses parallel convolutions of different sizes to capture features at different scales.

- **ResNet (2015)**  
  Introduced residual connections (skip connections) to enable training of very deep networks (up to 152 layers), addressing the vanishing gradient problem.

- **DenseNet (2017)**  
  Each layer connects to every other layer in a feed-forward fashion, improving feature reuse and parameter efficiency.

- **MobileNet (2017)**  
  Designed for mobile/embedded applications with depthwise separable convolutions to reduce parameters and computation.

- **EfficientNet (2019)**  
  Uses compound scaling to uniformly scale network width, depth, and resolution for better efficiency.

- **Vision Transformer (ViT) (2020)**  
  Adapts transformer architecture from NLP to image classification by splitting images into patches.

---

Most deep learning frameworks provide pre-trained versions of these models:

- **PyTorch**: `torchvision.models`  
- **Keras/TensorFlow**: `keras.applications`  
- **TensorFlow**: `tensorflow.keras.applications`

These pre-trained models can be used for **transfer learning** on custom datasets, where the convolutional base is frozen and a new classifier head is trained on the target task.


### Problem 10: Calculating the Output Size and Number of Parameters

Let's calculate the output size and parameters for the given convolutional layers:

#### Case 1:
- **Input size**: 144×144, 3 channels  
- **Filter size**: 3×3, 6 channels  
- **Stride**: 1  
- **Padding**: None  

**Output size calculation**:
- Output height = (144 + 20 - 3) / 1 + 1 = 142  
- Output width = (144 + 20 - 3) / 1 + 1 = 142  
- **Output shape**: (142, 142, 6)

**Number of parameters**:
- Each filter: 3 × 3 × 3 = 27 weights (for each of the 6 output channels)  
- Total weights: 27 × 6 = 162  
- Bias terms: 1 × 6 = 6  
- **Total parameters**: 162 + 6 = 168  

---

#### Case 2:
- **Input size**: 60×60, 24 channels  
- **Filter size**: 3×3, 48 channels  
- **Stride**: 1  
- **Padding**: None  

**Output size calculation**:
- Output height = (60 + 20 - 3) / 1 + 1 = 58  
- Output width = (60 + 20 - 3) / 1 + 1 = 58  
- **Output shape**: (58, 58, 48)

**Number of parameters**:
- Each filter: 3 × 3 × 24 = 216 weights (for each of the 48 output channels)  
- Total weights: 216 × 48 = 10,368  
- Bias terms: 1 × 48 = 48  
- **Total parameters**: 10,368 + 48 = 10,416  

---

#### Case 3:
- **Input size**: 20×20, 10 channels  
- **Filter size**: 3×3, 20 channels  
- **Stride**: 2  
- **Padding**: None  

**Output size calculation**:
- Output height = (20 + 20 - 3) / 2 + 1 = 9.5 → 9 (truncated)  
- Output width = (20 + 20 - 3) / 2 + 1 = 9.5 → 9 (truncated)  
- **Output shape**: (9, 9, 20)

**Number of parameters**:
- Each filter: 3 × 3 × 10 = 90 weights (for each of the 20 output channels)  
- Total weights: 90 × 20 = 1,800  
- Bias terms: 1 × 20 = 20  
- **Total parameters**: 1,800 + 20 = 1,820  

In the last case, with a stride of 2, we can't perfectly cover the input. The framework would typically ignore the extra pixels at the edges, resulting in a 9×9 output feature map.


### Problem 11: Investigation into Filter Size

#### Why are 3×3 filters more commonly used than larger ones like 7×7?

There are several reasons why 3×3 filters are preferred over larger filters like 7×7:

1. **Parameter Efficiency**:  
   Stacking multiple 3×3 convolutions can achieve the same receptive field as a single large filter while using fewer parameters. For example, three stacked 3×3 filters have a 7×7 effective receptive field but only 27 parameters per channel (3×3×3), compared to 49 parameters (7×7) for a single 7×7 filter.

2. **More Non-linearity**:  
   With stacked smaller filters, we can include more activation functions (ReLU) between layers, introducing more non-linearity to the network. This helps the network learn more complex features.

3. **Computational Efficiency**:  
   Multiple 3×3 convolutions are often more computationally efficient than a single large convolution due to optimization in modern deep learning frameworks.

4. **Regularization Effect**:  
   The additional non-linearities added by stacking smaller filters act as implicit regularization, improving the network's generalization.

5. **Flexibility**:  
   Using stacked small filters allows the network to learn hierarchical representations, where early layers capture simple features and later layers combine them into complex ones.

6. **Historical Success**:  
   The VGG networks demonstrated the effectiveness of using exclusively 3×3 filters, influencing subsequent architectures.

---

#### The Effect of a 1×1 Filter with No Height or Width

Despite their small size, 1×1 filters play important roles in CNN architectures:

1. **Dimensionality Reduction**:  
   The primary use is to reduce the number of channels/feature maps. This is often called a "bottleneck layer" and significantly reduces computational cost.

2. **Cross-Channel Information Flow**:  
   1×1 convolutions mix information across channels while maintaining spatial dimensions, essentially performing a pointwise transformation.

3. **Adding Non-linearity**:  
   When followed by an activation function, 1×1 convolutions add non-linearity to the network without changing spatial dimensions.

4. **Network-in-Network Architecture**:  
   They enable the creation of "micro-networks" within each convolution layer, increasing the representational power.

5. **Parameter Reduction**:  
   In architectures like Inception modules, they're used before larger filters to reduce input channels, decreasing the total parameter count.

6. **Fixed Spatial Resolution**:  
   When you want to maintain spatial resolution but need to manipulate the feature space, 1×1 convolutions are ideal.

7. **Computational Efficiency**:  
   They require significantly fewer computations than larger filters, making networks more efficient.
