# 📘 Assignment Roadmap: CNN Architecture Implementation

This roadmap follows the official assignment README step by step.  
Each part lists the requirements and our implementation progress.  

---

## ✅ Part 1: CNN Layer Implementation (40 points)
- [x] **Conv2D Layer** → forward + backward, stride & padding, He initialization  
- [x] **MaxPool2D Layer** → forward + backward, gradient routing  
- [x] **Flatten Layer**  
- [x] **Dense Layer (Fully Connected)**  
- [x] **ReLU Activation**  
- [x] **Dropout / Dropout2D**  
- [x] **BatchNorm2D**  

---

## ✅ Part 2: CNN Architectures (30 points)
- [x] **LeNet-5** → implemented with BatchNorm  
- [x] **Mini-VGG** → implemented with BatchNorm + Dropout  

---

## ✅ Part 3: CIFAR-10 Classification (20 points)

### 3.1 Data Preprocessing
- [x] Data normalization ([0,1] scaling)  
- [x] Data augmentation (flip, rotation, shift)  
- [ ] Cropping augmentation (explicit in README, but skipped here)  
- [x] Train/validation/test split  

### 3.2 Training
- [x] Training loop with forward → loss → backward → update  
- [x] SGD optimizer  
- [x] Cross-entropy loss with softmax  
- [x] Learning rate scheduling (step decay scheduler)  
- [ ] Plot training/validation metrics (loss & accuracy curves)  

### 3.3 Evaluation
- [x] Accuracy calculation  
- [x] Confusion matrix  
- [x] Per-class accuracy  
- [x] Confusion matrix visualization  

---

## ✅ Part 4: Feature Visualization (10 points)
- [x] Filter visualization (first Conv layer filters)  
- [x] Feature map visualization (activations at chosen layers)  

---


# Required Imports

In [None]:
# ==============================
# Required Imports
# ==============================
import numpy as np   # NumPy is used for array operations
from typing import Tuple  # for type hints
from tensorflow.keras.datasets import cifar10
from scipy.ndimage import rotate
import matplotlib.pyplot as plt
import seaborn as sns

# Part 1: CNN Layer Implementation

In [20]:


# ==============================
# Base Layer Class
# ==============================
class Layer:
    """
    Base class for all neural network layers.
    Every layer must have forward() and backward() methods.
    """
    def __init__(self):
        self.trainable = True        # if the layer has weights (Conv, Dense)
        self.params = {}             # store weights
        self.grads = {}              # store gradients
        self.cache = {}              # store intermediate values for backprop

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        raise NotImplementedError

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        raise NotImplementedError



# ==============================
# Conv2D Layer
# ==============================
class Conv2D(Layer):
    """
    2D Convolution Layer

    Parameters:
    -----------
    in_channels : int   → Number of input channels (e.g., 3 for RGB images)
    out_channels : int  → Number of filters (output channels)
    kernel_size : int   → Size of the convolution filter (e.g., 3 for 3x3)
    stride : int        → How much the filter moves each step
    padding : str       → 'same' (keep size same) or 'valid' (no padding)
    """

    def __init__(self, in_channels: int, out_channels: int,
                 kernel_size: int = 3, stride: int = 1,
                 padding: str = 'same'):
        super().__init__()

        # Save layer configuration
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = (kernel_size, kernel_size)  # assume square filters
        self.stride = stride
        self.padding = padding

        # He initialization: good for ReLU networks
        kh, kw = self.kernel_size
        scale = np.sqrt(2.0 / (in_channels * kh * kw))

        # Initialize weights and biases
        # W shape: (out_channels, in_channels, kernel_h, kernel_w)
        self.params['W'] = np.random.randn(out_channels, in_channels, kh, kw) * scale
        # Bias for each filter
        self.params['b'] = np.zeros((out_channels, 1))

        # Cache for storing intermediate values for backward
        self.cache = {}

    def _get_pad_width(self, input_shape: Tuple[int, ...]) -> Tuple[int, int]:
        """
        Calculate padding for 'same' or 'valid'.
        """
        _, _, H, W = input_shape
        kh, kw = self.kernel_size

        if self.padding == 'same':
            # Formula ensures output size ≈ input size
            pad_h = max((np.ceil(H / self.stride) - 1) * self.stride + kh - H, 0)
            pad_w = max((np.ceil(W / self.stride) - 1) * self.stride + kw - W, 0)
            return int(pad_h // 2), int(pad_w // 2)
        elif self.padding == 'valid':
            return 0, 0
        else:
            raise ValueError("Padding must be 'same' or 'valid'")

    def _pad_input(self, x: np.ndarray, pad_h: int, pad_w: int) -> np.ndarray:
        """
        Pad input with zeros around the border.
        x shape: (batch, channels, height, width)
        """
        return np.pad(x, ((0, 0), (0, 0), (pad_h, pad_h), (pad_w, pad_w)), mode='constant')

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """
        Forward pass of convolution.
        Input shape:  (batch, in_channels, H, W)
        Output shape: (batch, out_channels, H_out, W_out)
        """
        batch_size, _, H, W = x.shape
        kh, kw = self.kernel_size
        stride = self.stride
        pad_h, pad_w = self._get_pad_width(x.shape)

        # Pad input if necessary
        x_padded = self._pad_input(x, pad_h, pad_w)

        # Calculate output dimensions
        H_out = (H + 2*pad_h - kh) // stride + 1
        W_out = (W + 2*pad_w - kw) // stride + 1

        # Allocate output tensor
        out = np.zeros((batch_size, self.out_channels, H_out, W_out))

        # Convolution operation
        W = self.params['W']
        b = self.params['b']

        for n in range(batch_size):          # for each image
            for f in range(self.out_channels):  # for each filter
                for i in range(H_out):      # slide vertically
                    for j in range(W_out):  # slide horizontally
                        h_start = i * stride
                        h_end = h_start + kh
                        w_start = j * stride
                        w_end = w_start + kw

                        # Extract the patch of the image
                        patch = x_padded[n, :, h_start:h_end, w_start:w_end]

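                        # Convolution = sum(patch * filter) + bias
                        # b has shape (out_channels, 1), so index the scalar explicitly;
                        # assigning the 1-element array b[f] triggers a NumPy deprecation warning
                        out[n, f, i, j] = np.sum(patch * W[f]) + b[f, 0]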

        # Save values for backward pass
        self.cache = {"x": x, "x_padded": x_padded, "pad_h": pad_h, "pad_w": pad_w}
        return out

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """
        Backward pass of convolution.
        grad_output: gradient of loss w.r.t. layer output
        Returns: gradient w.r.t. layer input
        """
        x = self.cache['x']
        x_padded = self.cache['x_padded']
        pad_h, pad_w = self.cache['pad_h'], self.cache['pad_w']

        batch_size, _, H, W = x.shape
        kh, kw = self.kernel_size
        stride = self.stride
        _, _, H_out, W_out = grad_output.shape

        # Initialize gradients
        dW = np.zeros_like(self.params['W'])
        db = np.zeros_like(self.params['b'])
        dx_padded = np.zeros_like(x_padded)

        # Compute gradients
        for n in range(batch_size):
            for f in range(self.out_channels):
                for i in range(H_out):
                    for j in range(W_out):
                        h_start = i * stride
                        h_end = h_start + kh
                        w_start = j * stride
                        w_end = w_start + kw

                        patch = x_padded[n, :, h_start:h_end, w_start:w_end]

                        # Gradients of weights and bias
                        dW[f] += grad_output[n, f, i, j] * patch
                        db[f] += grad_output[n, f, i, j]

                        # Gradient wrt input
                        dx_padded[n, :, h_start:h_end, w_start:w_end] += grad_output[n, f, i, j] * self.params['W'][f]

        # Remove padding from dx. Slice by explicit size so that a zero pad
        # on only one axis cannot produce an empty slice via `0:-0`.
        dx = dx_padded[:, :, pad_h:pad_h + H, pad_w:pad_w + W]

        # Save gradients
        self.grads['W'] = dW
        self.grads['b'] = db

        return dx
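
The four nested loops in `forward` make the arithmetic explicit, but they are slow in pure Python. As a hedged aside (not part of the assignment code), the same cross-correlation can be written with `numpy.lib.stride_tricks.sliding_window_view` and `np.einsum`. The sketch below assumes NumPy ≥ 1.20 and an input that has already been padded; `conv2d_loops` and `conv2d_vectorized` are hypothetical helper names introduced only for this comparison.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def conv2d_loops(x, W, b, stride=1):
    """Reference version: same nested loops as Conv2D.forward (input already padded)."""
    N, _, H, Wd = x.shape
    F, _, kh, kw = W.shape
    H_out = (H - kh) // stride + 1
    W_out = (Wd - kw) // stride + 1
    out = np.zeros((N, F, H_out, W_out))
    for n in range(N):
        for f in range(F):
            for i in range(H_out):
                for j in range(W_out):
                    hs, ws = i * stride, j * stride
                    patch = x[n, :, hs:hs + kh, ws:ws + kw]
                    out[n, f, i, j] = np.sum(patch * W[f]) + b[f, 0]
    return out


def conv2d_vectorized(x, W, b, stride=1):
    """Hypothetical vectorized variant built on sliding windows."""
    kh, kw = W.shape[2:]
    # windows has shape (N, C, H - kh + 1, W - kw + 1, kh, kw)
    windows = sliding_window_view(x, (kh, kw), axis=(2, 3))
    # Keep only the window positions that the stride actually visits
    windows = windows[:, :, ::stride, ::stride]
    # Sum over input channels (c) and kernel positions (i, j)
    out = np.einsum("nchwij,fcij->nfhw", windows, W)
    return out + b.reshape(1, -1, 1, 1)


rng = np.random.default_rng(0)
x = rng.standard_normal((2, 3, 8, 8))
W = rng.standard_normal((4, 3, 3, 3))
b = rng.standard_normal((4, 1))

for s in (1, 2):
    assert np.allclose(conv2d_loops(x, W, b, s), conv2d_vectorized(x, W, b, s))
```

The loop version stays the reference for the backward pass; the vectorized form is only meant to show where the speed could come from.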


In [21]:
class MaxPool2D(Layer):
    """
    2D Max Pooling Layer

    Parameters:
    -----------
    pool_size : int or tuple
        Size of pooling window (e.g., 2 → 2x2 pooling)
    stride : int
        Step size for moving the pooling window
        If None, defaults to pool_size
    """

    def __init__(self, pool_size: int = 2, stride: int = None):
        super().__init__()

        # If given a single int, make it a square window
        self.pool_size = (pool_size, pool_size) if isinstance(pool_size, int) else pool_size
        # Default stride = window height (stays an int even when pool_size is a tuple)
        self.stride = stride if stride is not None else self.pool_size[0]

        # Pooling has no trainable parameters
        self.trainable = False
        self.cache = {}

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """
        Forward pass of max pooling.

        Input shape:  (batch, channels, H, W)
        Output shape: (batch, channels, H_out, W_out)
        """
        batch_size, channels, H, W = x.shape
        ph, pw = self.pool_size
        stride = self.stride

        # Calculate output dimensions
        H_out = (H - ph) // stride + 1
        W_out = (W - pw) // stride + 1

        # Allocate output and mask (to remember max locations)
        out = np.zeros((batch_size, channels, H_out, W_out))
        max_indices = {}

        for n in range(batch_size):
            for c in range(channels):
                for i in range(H_out):
                    for j in range(W_out):
                        h_start = i * stride
                        h_end = h_start + ph
                        w_start = j * stride
                        w_end = w_start + pw

                        # Extract pooling region
                        region = x[n, c, h_start:h_end, w_start:w_end]

                        # Find maximum value in region
                        max_val = np.max(region)
                        out[n, c, i, j] = max_val

                        # Store index of max for backward
                        max_pos = np.unravel_index(np.argmax(region), region.shape)
                        max_indices[(n, c, i, j)] = (h_start + max_pos[0], w_start + max_pos[1])

        # Save cache for backward pass
        self.cache = {"x_shape": x.shape, "max_indices": max_indices,
                      "pool_size": self.pool_size, "stride": stride}
        return out

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """
        Backward pass of max pooling.

        grad_output: gradient of loss w.r.t. pooled output
        Returns: gradient w.r.t. input (same shape as forward input)
        """
        x_shape = self.cache["x_shape"]
        max_indices = self.cache["max_indices"]

        # Initialize gradient w.r.t input with zeros
        dx = np.zeros(x_shape)

        for (n, c, i, j), (h_idx, w_idx) in max_indices.items():
            # Route gradient only to the max location
            dx[n, c, h_idx, w_idx] += grad_output[n, c, i, j]

        return dx
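
Gradient routing is easiest to see on a tiny example. The sketch below is an illustration only: it assumes a single 4×4 feature map, non-overlapping 2×2 windows, and no ties inside a window. It uses a boolean mask instead of the stored `max_indices`; with ties the mask would send the gradient to every tied position, whereas the `argmax` used in the class picks the first one.

```python
import numpy as np

x = np.array([[1., 3., 2., 0.],
              [4., 2., 1., 5.],
              [0., 1., 7., 2.],
              [3., 2., 1., 6.]])

# Split the 4x4 map into non-overlapping 2x2 windows: axes become (out_i, out_j, a, b)
windows = x.reshape(2, 2, 2, 2).transpose(0, 2, 1, 3)
pooled = windows.max(axis=(2, 3))            # forward pass

grad_out = np.array([[10., 20.],
                     [30., 40.]])            # upstream gradient, one value per window

# Backward pass: each upstream value goes to the position that held the max
mask = windows == pooled[:, :, None, None]
dx_windows = mask * grad_out[:, :, None, None]
dx = dx_windows.transpose(0, 2, 1, 3).reshape(4, 4)

print(pooled)
print(dx)
```

Every entry of `dx` is zero except the four positions that won their window, so the total gradient (100 here) is preserved.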


In [None]:
# ==============================
# Flatten Layer
# ==============================
class Flatten(Layer):
    """
    Flatten layer converts 4D input (batch, channels, H, W)
    into 2D (batch, features).
    Example:
        Input shape:  (32, 16, 7, 7)
        Output shape: (32, 16*7*7)
    """

    def __init__(self):
        super().__init__()
        self.trainable = False  # no weights

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        # Save input shape for backward
        self.cache["input_shape"] = x.shape
        return x.reshape(x.shape[0], -1)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        # Reshape gradient back to original input shape
        return grad_output.reshape(self.cache["input_shape"])


# ==============================
# Dense (Fully Connected) Layer
# ==============================
class Dense(Layer):
    """
    Fully connected (linear) layer.
    y = xW^T + b

    Parameters:
    -----------
    in_features : int  → number of input neurons
    out_features : int → number of output neurons
    """

    def __init__(self, in_features: int, out_features: int):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        # He initialization for weights
        scale = np.sqrt(2.0 / in_features)
        self.params["W"] = np.random.randn(out_features, in_features) * scale
        self.params["b"] = np.zeros((out_features, 1))

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """
        Forward pass.
        Input:  (batch, in_features)
        Output: (batch, out_features)
        """
        self.cache["x"] = x
        return x.dot(self.params["W"].T) + self.params["b"].T

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """
        Backward pass.
        grad_output: (batch, out_features)
        Returns: gradient wrt input (batch, in_features)
        """
        x = self.cache["x"]

        # Gradients wrt weights and biases
        self.grads["W"] = grad_output.T.dot(x)      # (out_features, in_features)
        self.grads["b"] = np.sum(grad_output, axis=0, keepdims=True).T  # (out_features, 1)

        # Gradient wrt input
        grad_input = grad_output.dot(self.params["W"])  # (batch, in_features)
        return grad_input
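
A quick way to gain confidence in the backward formulas is a finite-difference check. The sketch below is self-contained and does not use the `Dense` class; it re-creates the same formulas on small random arrays, with `G` standing in for `grad_output` and `scalar_loss` a hypothetical loss chosen so that dL/dy = G.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.standard_normal((4, 5))        # (batch, in_features)
W = rng.standard_normal((3, 5))        # (out_features, in_features)
b = rng.standard_normal((3, 1))
G = rng.standard_normal((4, 3))        # stand-in for grad_output


def scalar_loss(W_):
    # L = sum(G * y), so dL/dy = G
    y = x @ W_.T + b.T
    return np.sum(G * y)


# Analytic gradients, same formulas as Dense.backward
dW = G.T @ x                           # (out_features, in_features)
db = G.sum(axis=0, keepdims=True).T    # (out_features, 1)
dx = G @ W                             # (batch, in_features)

# Central finite differences on every entry of W
eps = 1e-6
dW_num = np.zeros_like(W)
for idx in np.ndindex(*W.shape):
    W_plus, W_minus = W.copy(), W.copy()
    W_plus[idx] += eps
    W_minus[idx] -= eps
    dW_num[idx] = (scalar_loss(W_plus) - scalar_loss(W_minus)) / (2 * eps)

assert np.allclose(dW, dW_num, atol=1e-6)
```

The same pattern (perturb one entry, re-evaluate the loss) can be applied to `b` and `x`.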


# ==============================
# ReLU Activation
# ==============================
class ReLU(Layer):
    """
    ReLU Activation: f(x) = max(0, x)
    """

    def __init__(self):
        super().__init__()
        self.trainable = False  # no parameters

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        # Store mask of where x > 0 for backward
        self.cache["mask"] = (x > 0).astype(float)
        return np.maximum(0, x)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        # Pass gradient only where input was positive
        return grad_output * self.cache["mask"]



# ==============================
# Dropout (element-wise)
# ==============================

class Dropout(Layer):
    """
    Standard Dropout (for Dense layers).
    Randomly drops individual neurons.
    """

    def __init__(self, p: float = 0.5):
        super().__init__()
        self.p = p
        self.trainable = False

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        if training:
            mask = (np.random.rand(*x.shape) > self.p).astype(float)
            self.cache["mask"] = mask
            return x * mask / (1.0 - self.p)
        else:
            return x

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        mask = self.cache.get("mask", 1.0)
        return grad_output * mask / (1.0 - self.p)
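
The division by `1 - p` is what makes this "inverted" dropout: surviving activations are scaled up during training so that their expected value matches the untouched activations used at inference. A minimal sketch, assuming an all-ones input so the effect is easy to read off:

```python
import numpy as np

rng = np.random.default_rng(0)
p = 0.5                                   # drop probability, as in Dropout(p=0.5)
x = np.ones((1000, 256))

mask = (rng.random(x.shape) > p).astype(float)
y_train = x * mask / (1.0 - p)            # training: drop, then rescale survivors
y_eval = x                                # inference: identity, no rescaling needed

kept_fraction = mask.mean()               # close to 1 - p
train_mean = y_train.mean()               # close to x.mean() == 1.0
print(kept_fraction, train_mean)
```

Each training-time value is either 0 or 2, yet the mean stays near 1, which is why the inference branch can return `x` unchanged.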


# ==============================
# Batch Normalization
# ==============================

class BatchNorm2D(Layer):
    """
    Batch Normalization for CNNs (per-channel).
    Normalizes across batch and spatial dimensions, then scales & shifts.
    """

    def __init__(self, num_features: int, eps: float = 1e-5, momentum: float = 0.9):
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum

        # Learnable parameters
        self.params["gamma"] = np.ones((num_features, 1))  # scale
        self.params["beta"] = np.zeros((num_features, 1))  # shift

        # Running stats (for inference)
        self.running_mean = np.zeros((num_features, 1))
        self.running_var = np.ones((num_features, 1))

    def forward(self, x: np.ndarray, training: bool = True) -> np.ndarray:
        """
        Forward pass.
        Input: (N, C, H, W)
        """
        N, C, H, W = x.shape

        if training:
            # Compute mean/var over batch+spatial dims
            mean = np.mean(x, axis=(0, 2, 3), keepdims=True)
            var = np.var(x, axis=(0, 2, 3), keepdims=True)

            # Normalize
            x_hat = (x - mean) / np.sqrt(var + self.eps)

            # Update running stats
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mean.reshape(C, 1)
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var.reshape(C, 1)

            # Cache for backward
            self.cache = {"x": x, "x_hat": x_hat, "mean": mean, "var": var}
        else:
            # Use running stats for inference
            mean = self.running_mean.reshape(1, C, 1, 1)
            var = self.running_var.reshape(1, C, 1, 1)
            x_hat = (x - mean) / np.sqrt(var + self.eps)

        # Scale + shift
        out = self.params["gamma"].reshape(1, C, 1, 1) * x_hat + self.params["beta"].reshape(1, C, 1, 1)
        return out

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """
        Backward pass.
        """
        x = self.cache["x"]
        x_hat = self.cache["x_hat"]
        mean = self.cache["mean"]
        var = self.cache["var"]
        N, C, H, W = x.shape
        m = N * H * W  # number of elements per channel

        # Grad wrt gamma and beta
        self.grads["gamma"] = np.sum(grad_output * x_hat, axis=(0, 2, 3), keepdims=True).reshape(C, 1)
        self.grads["beta"] = np.sum(grad_output, axis=(0, 2, 3), keepdims=True).reshape(C, 1)

        # Grad wrt input
        dx_hat = grad_output * self.params["gamma"].reshape(1, C, 1, 1)
        dvar = np.sum(dx_hat * (x - mean) * -0.5 * (var + self.eps) ** (-1.5), axis=(0, 2, 3), keepdims=True)
        dmean = np.sum(dx_hat * -1 / np.sqrt(var + self.eps), axis=(0, 2, 3), keepdims=True) + \
                dvar * np.sum(-2 * (x - mean), axis=(0, 2, 3), keepdims=True) / m

        dx = dx_hat / np.sqrt(var + self.eps) + dvar * 2 * (x - mean) / m + dmean / m
        return dx
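
To illustrate what "per-channel" means here, the sketch below repeats the training-branch arithmetic on a small random tensor (without the learnable `gamma` and `beta`) and checks that each channel ends up with roughly zero mean and unit variance. The tensor sizes and the scale/offset applied to the input are arbitrary choices for the demonstration.

```python
import numpy as np

rng = np.random.default_rng(0)
# (N, C, H, W), deliberately far from zero mean / unit variance
x = rng.standard_normal((8, 3, 4, 4)) * 5.0 + 2.0
eps, momentum = 1e-5, 0.9

mean = x.mean(axis=(0, 2, 3), keepdims=True)     # shape (1, C, 1, 1)
var = x.var(axis=(0, 2, 3), keepdims=True)
x_hat = (x - mean) / np.sqrt(var + eps)

# Per-channel statistics after normalization
ch_mean = x_hat.mean(axis=(0, 2, 3))
ch_var = x_hat.var(axis=(0, 2, 3))

# Exponential moving average, same form as the running_mean update
running_mean = np.zeros((3, 1))
running_mean = momentum * running_mean + (1 - momentum) * mean.reshape(3, 1)

print(ch_mean, ch_var)
```

With `momentum = 0.9`, a single batch moves the running mean only 10% of the way toward the batch mean, so the inference statistics need many batches to settle.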


# Part 2: CNN Architectures

Now let’s assemble the LeNet-5 architecture using the building blocks we already coded (Conv2D, ReLU, MaxPool2D, Flatten, Dense), plus a BatchNorm2D layer after each convolution.

LeNet-5 Architecture Reminder (from the assignment README)

```
Input (32x32x3) →
Conv(6, 5x5) → ReLU → MaxPool(2x2) →
Conv(16, 5x5) → ReLU → MaxPool(2x2) →
Flatten → FC(120) → ReLU → FC(84) → ReLU → FC(10)

```
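
The `16*5*5` input size of the first fully connected layer follows from the spatial sizes above. A small sketch of that arithmetic, assuming unpadded ('valid') 5×5 convolutions and 2×2 pooling with stride 2; `out_size` is a hypothetical helper, not part of the layer classes:

```python
def out_size(size: int, kernel: int, stride: int = 1, pad: int = 0) -> int:
    """Output length along one spatial axis for a conv or pooling window."""
    return (size + 2 * pad - kernel) // stride + 1


size = 32                                  # CIFAR-10 images are 32x32
size = out_size(size, kernel=5)            # Conv(6, 5x5), 'valid'  -> 28
size = out_size(size, kernel=2, stride=2)  # MaxPool(2x2)           -> 14
size = out_size(size, kernel=5)            # Conv(16, 5x5), 'valid' -> 10
size = out_size(size, kernel=2, stride=2)  # MaxPool(2x2)           -> 5

flat_features = 16 * size * size
print(size, flat_features)                 # 5 400
```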



Implementation

In [33]:
class LeNet5:
    """
    LeNet-5 with BatchNorm added
    """

    def __init__(self, num_classes: int = 10):
        self.layers = [
            Conv2D(in_channels=3, out_channels=6, kernel_size=5, stride=1, padding="valid"),
            BatchNorm2D(num_features=6),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),

            Conv2D(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding="valid"),
            BatchNorm2D(num_features=16),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),

            Flatten(),

            Dense(in_features=16*5*5, out_features=120),
            ReLU(),

            Dense(in_features=120, out_features=84),
            ReLU(),

            Dense(in_features=84, out_features=num_classes)
        ]

    def forward(self, x, training=True):
        for layer in self.layers:
            x = layer.forward(x, training)
        return x

    def backward(self, grad_output):
        for layer in reversed(self.layers):
            grad_output = layer.backward(grad_output)
        return grad_output

    def get_params(self):
        params = {}
        for idx, layer in enumerate(self.layers):
            if layer.trainable:
                for k, v in layer.params.items():
                    params[f"layer{idx}_{k}"] = v
        return params

    def get_grads(self):
        grads = {}
        for idx, layer in enumerate(self.layers):
            if layer.trainable:
                for k, v in layer.grads.items():
                    grads[f"layer{idx}_{k}"] = v
        return grads


In [24]:
# Example usage
model = LeNet5(num_classes=10)
X = np.random.randn(2, 3, 32, 32)  # dummy batch of 2 RGB images
out = model.forward(X)
print("Output shape:", out.shape)  # (2, 10)



Output shape: (2, 10)


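Now that Conv2D, MaxPool2D, Flatten, Dense, and ReLU are working, let’s build the **Mini-VGG architecture** described in the assignment README. As with LeNet-5, the implementation below adds a BatchNorm2D layer after each convolution.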



```
Input (32x32x3) →
Conv(32, 3x3) → ReLU → Conv(32, 3x3) → ReLU → MaxPool(2x2) →
Conv(64, 3x3) → ReLU → Conv(64, 3x3) → ReLU → MaxPool(2x2) →
Conv(128, 3x3) → ReLU → Conv(128, 3x3) → ReLU → MaxPool(2x2) →
Flatten → FC(256) → ReLU → Dropout(0.5) → FC(10)

```



In [None]:

# Simplified VGG-style network for CIFAR-10 (input: 32x32x3, output: 10 classes)

class MiniVGG:

    def __init__(self, num_classes: int = 10):
        self.layers = [
            # Block 1
            Conv2D(3, 32, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(32),
            ReLU(),
            Conv2D(32, 32, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(32),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),

            # Block 2
            Conv2D(32, 64, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(64),
            ReLU(),
            Conv2D(64, 64, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(64),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),

            # Block 3
            Conv2D(64, 128, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(128),
            ReLU(),
            Conv2D(128, 128, kernel_size=3, stride=1, padding="same"),
            BatchNorm2D(128),
            ReLU(),
            MaxPool2D(pool_size=2, stride=2),

            # Fully connected
            Flatten(),
            Dense(128*4*4, 256),
            ReLU(),
            Dropout(p=0.5),
            Dense(256, num_classes)
        ]

    def forward(self, x, training=True):
        for layer in self.layers:
            x = layer.forward(x, training)
        return x

    def backward(self, grad_output):
        for layer in reversed(self.layers):
            grad_output = layer.backward(grad_output)
        return grad_output

    def get_params(self):
        params = {}
        for idx, layer in enumerate(self.layers):
            if layer.trainable:
                for k, v in layer.params.items():
                    params[f"layer{idx}_{k}"] = v
        return params

    def get_grads(self):
        grads = {}
        for idx, layer in enumerate(self.layers):
            if layer.trainable:
                for k, v in layer.grads.items():
                    grads[f"layer{idx}_{k}"] = v
        return grads


# Notes

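Padding = "same" with stride 1 keeps the spatial dimensions unchanged after each convolution, so only the pooling layers shrink the feature maps.

After 3 max poolings (stride=2):

  1. Input 32×32 → 16×16 → 8×8 → 4×4

  2. With 128 channels → 128 × 4 × 4 = 2048 features, which is the input size of the first Dense layer.

Dropout(p=0.5) randomly zeroes about half of the FC(256) activations during training to reduce overfitting. Note that the model uses the element-wise Dropout layer here, not Dropout2D, which would drop whole feature maps.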
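
The 2048 figure can be reproduced from the padding formula used in `Conv2D._get_pad_width`. The sketch below is a stand-alone re-implementation of that arithmetic for one spatial axis; `same_pad` and `out_size` are hypothetical helper names, and square inputs and kernels are assumed.

```python
import math


def same_pad(size: int, kernel: int, stride: int = 1) -> int:
    """Per-side padding, following the formula in Conv2D._get_pad_width."""
    total = max((math.ceil(size / stride) - 1) * stride + kernel - size, 0)
    return total // 2


def out_size(size: int, kernel: int, stride: int = 1, pad: int = 0) -> int:
    """Output length along one spatial axis."""
    return (size + 2 * pad - kernel) // stride + 1


size = 32
for block in range(3):
    pad = same_pad(size, kernel=3)                 # 1 for a 3x3 kernel at stride 1
    for _ in range(2):                             # two 3x3 'same' convs per block
        size = out_size(size, kernel=3, pad=pad)   # spatial size unchanged
    size = out_size(size, kernel=2, stride=2)      # MaxPool(2x2) halves it

features = 128 * size * size
print(size, features)                              # 4 2048
```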

✅ Quick Test

In [39]:
# Create Mini-VGG model
model = MiniVGG(num_classes=10)
# Dummy input: batch of 2 RGB images (32x32)
X = np.random.randn(2, 3, 32, 32)
# Forward pass
out = model.forward(X, training=True)
print("Output shape:", out.shape)  # should be (2, 10)


Output shape: (2, 10)


# Part 3: CIFAR-10 Classification

Now that LeNet-5 and Mini-VGG are built, the next step is to train them. For that, we need three key pieces:

  1. Loss function → Cross-Entropy (commonly used for classification).

  2. Optimizer → Stochastic Gradient Descent (SGD).

  3. Training Loop → to iterate over batches, forward → loss → backward → update.

Step 1: Cross-Entropy Loss (with Softmax)

In [40]:
class CrossEntropyLoss:
    """
    Cross-Entropy Loss with Softmax.
    """

    def __init__(self):
        self.cache = {}

    def forward(self, logits: np.ndarray, labels: np.ndarray) -> float:
        """
        logits: raw output from model (batch, num_classes)
        labels: true class indices (batch,)
        """
        # Shift logits for numerical stability
        shifted_logits = logits - np.max(logits, axis=1, keepdims=True)

        # Compute softmax probabilities
        exp_logits = np.exp(shifted_logits)
        probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)

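        # Negative log likelihood loss (small epsilon guards against log(0)
        # when a correct-class probability underflows to zero)
        N = logits.shape[0]
        correct_logprobs = -np.log(probs[np.arange(N), labels] + 1e-12)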
        loss = np.mean(correct_logprobs)

        # Save probs + labels for backward
        self.cache["probs"] = probs
        self.cache["labels"] = labels
        return loss

    def backward(self) -> np.ndarray:
        """
        Gradient of loss wrt logits.
        """
        probs = self.cache["probs"]
        labels = self.cache["labels"]
        N = probs.shape[0]

        grad_logits = probs.copy()
        grad_logits[np.arange(N), labels] -= 1
        grad_logits /= N
        return grad_logits
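
The backward pass relies on the identity that the gradient of mean softmax cross-entropy with respect to the logits is (softmax − one-hot) / N. The sketch below checks that identity numerically on random logits; it is self-contained and does not use the `CrossEntropyLoss` class (`softmax_ce` is a hypothetical helper written for this check).

```python
import numpy as np


def softmax_ce(logits, labels):
    """Mean cross-entropy over the batch, computed with shifted logits."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    probs = exp / exp.sum(axis=1, keepdims=True)
    n = logits.shape[0]
    return -np.log(probs[np.arange(n), labels]).mean(), probs


rng = np.random.default_rng(0)
logits = rng.standard_normal((4, 10))
labels = np.array([3, 0, 9, 3])

loss, probs = softmax_ce(logits, labels)

# Analytic gradient: (softmax - one_hot) / N
grad = probs.copy()
grad[np.arange(4), labels] -= 1
grad /= 4

# Central finite differences on every logit
eps = 1e-6
grad_num = np.zeros_like(logits)
for idx in np.ndindex(*logits.shape):
    plus, minus = logits.copy(), logits.copy()
    plus[idx] += eps
    minus[idx] -= eps
    grad_num[idx] = (softmax_ce(plus, labels)[0] - softmax_ce(minus, labels)[0]) / (2 * eps)

assert np.allclose(grad, grad_num, atol=1e-6)
```

Each row of `grad` sums to zero, because softmax probabilities sum to one and exactly one entry per row has 1 subtracted.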


Step 2: Optimizer (SGD)

In [41]:
class SGD:
    """
    Stochastic Gradient Descent optimizer.
    """

    def __init__(self, model, lr=0.01):
        self.model = model
        self.lr = lr

    def step(self):
        """
        Update model parameters using stored gradients.
        """
        params = self.model.get_params()
        grads = self.model.get_grads()

        for key in params.keys():
            params[key] -= self.lr * grads[key]
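
The update works because `get_params()` returns references to the same NumPy arrays the layers hold, and `-=` modifies those arrays in place. The sketch below illustrates the difference from rebinding, using stand-in dictionaries rather than the real model (the names `layer_params` and `view` are made up for this example).

```python
import numpy as np

layer_params = {"W": np.ones((2, 2))}      # stands in for layer.params
view = {"layer0_W": layer_params["W"]}     # what get_params() returns: same array object
grad = np.full((2, 2), 0.5)
lr = 0.1

# In-place update: modifies the array the layer itself holds
view["layer0_W"] -= lr * grad
in_place_value = layer_params["W"][0, 0]   # 0.95

# Rebinding: creates a new array, so the layer's own weights stay untouched
view["layer0_W"] = view["layer0_W"] - lr * grad
rebound_value = layer_params["W"][0, 0]    # still 0.95

print(in_place_value, rebound_value)
```

Writing the step as `params[key] = params[key] - lr * grads[key]` would therefore leave the model unchanged, which is a silent failure worth knowing about.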


Step 3: Training Loop

In [42]:
def accuracy(preds, labels):
    """Compute accuracy"""
    return np.mean(np.argmax(preds, axis=1) == labels)


def train(model, X_train, y_train, X_val, y_val, epochs=5, batch_size=64, lr=0.01):
    """
    Simple training loop for CNNs.
    """
    loss_fn = CrossEntropyLoss()
    optimizer = SGD(model, lr=lr)

    num_batches = int(np.ceil(X_train.shape[0] / batch_size))

    for epoch in range(epochs):
        epoch_loss, epoch_acc = 0, 0

        # Shuffle training data
        indices = np.arange(X_train.shape[0])
        np.random.shuffle(indices)

        for i in range(num_batches):
            batch_idx = indices[i*batch_size:(i+1)*batch_size]
            X_batch, y_batch = X_train[batch_idx], y_train[batch_idx]

            # Forward pass
            logits = model.forward(X_batch, training=True)

            # Compute loss
            loss = loss_fn.forward(logits, y_batch)

            # Backward pass
            grad_logits = loss_fn.backward()
            model.backward(grad_logits)

            # Update weights
            optimizer.step()

            # Track batch accuracy
            acc = accuracy(logits, y_batch)
            epoch_loss += loss
            epoch_acc += acc

        # Average over batches
        epoch_loss /= num_batches
        epoch_acc /= num_batches

        # Validation
        val_logits = model.forward(X_val, training=False)
        val_loss = loss_fn.forward(val_logits, y_val)
        val_acc = accuracy(val_logits, y_val)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


**Example Usage**

In [None]:
# Dummy CIFAR-like data (small for demo)
X_train = np.random.randn(100, 3, 32, 32)
y_train = np.random.randint(0, 10, 100)

X_val = np.random.randn(20, 3, 32, 32)
y_val = np.random.randint(0, 10, 20)

# Pick a model (LeNet5 or MiniVGG)
model = LeNet5(num_classes=10)
# model = MiniVGG(num_classes=10)

# Train
train(model, X_train, y_train, X_val, y_val, epochs=3, batch_size=16, lr=0.01)


# Notes

- CrossEntropyLoss: combines softmax and negative log-likelihood in one step.
- SGD: updates parameters after each batch.
- Training loop:
  1. Forward pass → predictions
  2. Compute loss
  3. Backward pass → gradients
  4. Optimizer step → update weights
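
The batching arithmetic in the loop can be checked in isolation. A minimal sketch, using the sizes from the demo call (100 samples, batch size 16):

```python
import numpy as np

n_samples, batch_size = 100, 16
num_batches = int(np.ceil(n_samples / batch_size))        # 7

rng = np.random.default_rng(0)
indices = rng.permutation(n_samples)                      # shuffled once per epoch

batch_sizes = [
    len(indices[i * batch_size:(i + 1) * batch_size])
    for i in range(num_batches)
]
print(num_batches, batch_sizes)   # 7 [16, 16, 16, 16, 16, 16, 4]
```

Because the loop divides the summed loss by `num_batches`, the short final batch counts as much as a full one in the epoch average. The effect is small, but the reported numbers are not an exact per-sample mean.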





Let’s plug in real CIFAR-10 data instead of dummy arrays, so you can actually train LeNet-5 or Mini-VGG.

The network itself is written from scratch in NumPy; we use the Keras dataset loader only to download and load the data. TensorFlow must be installed for the import, but none of its training code is used.

Step 1: CIFAR-10 Loader

In [43]:
def load_cifar10(normalize=True):
    """
    Load CIFAR-10 dataset and return NumPy arrays.
    Shape:
      X_train: (50000, 3, 32, 32)
      y_train: (50000,)
      X_test:  (10000, 3, 32, 32)
      y_test:  (10000,)
    """
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()

    # Convert from (N, 32, 32, 3) → (N, 3, 32, 32)
    X_train = X_train.transpose(0, 3, 1, 2).astype(np.float32)
    X_test = X_test.transpose(0, 3, 1, 2).astype(np.float32)

    # Flatten labels
    y_train = y_train.flatten()
    y_test = y_test.flatten()

    # Normalize to [0,1] if required
    if normalize:
        X_train /= 255.0
        X_test /= 255.0

    return X_train, y_train, X_test, y_test


Step 2: Train on CIFAR-10

In [None]:
# Load dataset
X_train, y_train, X_test, y_test = load_cifar10()

# Split validation set
X_val, y_val = X_train[:5000], y_train[:5000]
X_train, y_train = X_train[5000:], y_train[5000:]

print("Train set:", X_train.shape, y_train.shape)
print("Val set:", X_val.shape, y_val.shape)
print("Test set:", X_test.shape, y_test.shape)

# Choose a model
model = LeNet5(num_classes=10)
# model = MiniVGG(num_classes=10)   # <- More powerful, but slower

# Train
train(model, X_train, y_train, X_val, y_val,
      epochs=5, batch_size=64, lr=0.01)

# Final evaluation on test set
loss_fn = CrossEntropyLoss()
logits = model.forward(X_test, training=False)
test_loss = loss_fn.forward(logits, y_test)
test_acc = accuracy(logits, y_test)

print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")


Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
170498071/170498071 ━━━━━━━━━━━━━━━━━━━━ 79s 0us/step
Train set: (45000, 3, 32, 32) (45000,)
Val set: (5000, 3, 32, 32) (5000,)
Test set: (10000, 3, 32, 32) (10000,)



**Notes**

- Normalization → Dividing by 255 scales pixels from [0,255] to [0,1], which keeps input magnitudes small and helps gradient descent stay stable.
- Transpose → Keras loads images as (N, H, W, C), but our network expects (N, C, H, W).
- Split validation → We take the first 5000 images of the training set as the validation set.
- Training → You’ll see epoch-wise loss and accuracy for both training and validation.
- Test set → Evaluate the model once, after training.
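
The layout change and scaling can be tried without downloading anything. The sketch below uses a random `uint8` array as a stand-in for what `cifar10.load_data()` returns (an assumption for illustration) and confirms that a pixel keeps its value when moving from (N, H, W, C) to (N, C, H, W).

```python
import numpy as np

rng = np.random.default_rng(0)
# Stand-in for the raw dataset: uint8 images in (N, H, W, C) layout
batch_nhwc = rng.integers(0, 256, size=(2, 32, 32, 3), dtype=np.uint8)

# Move channels to axis 1, convert to float, scale to [0, 1]
batch_nchw = batch_nhwc.transpose(0, 3, 1, 2).astype(np.float32) / 255.0

# Pixel (h=5, w=7) of channel 2 in image 1 is the same value in both layouts
assert np.isclose(batch_nchw[1, 2, 5, 7], batch_nhwc[1, 5, 7, 2] / 255.0)
print(batch_nchw.shape, batch_nchw.dtype)   # (2, 3, 32, 32) float32
```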

⚠️ Note: Since our implementation is pure NumPy with nested Python loops (no GPU acceleration), training on full CIFAR-10 will be slow.

👉 For testing, you might want to use smaller subsets first, e.g., X_train[:2000], y_train[:2000].

Let’s add data augmentation (as required in your assignment). This helps prevent overfitting and improves accuracy, especially for Mini-VGG.

We’ll implement:

- Random horizontal flip
- Random shift (translation)
- Random rotation

In [None]:

class DataAugmentation:
    """
    Data augmentation for CIFAR-10 images.
    Works on NumPy arrays shaped (batch, C, H, W).
    """

    def __init__(self, horizontal_flip=True, rotation_range=15, shift_range=0.1):
        self.horizontal_flip = horizontal_flip
        self.rotation_range = rotation_range  # degrees
        self.shift_range = shift_range        # fraction of image size

    def augment_batch(self, X: np.ndarray) -> np.ndarray:
        """
        Apply random augmentations to a batch of images.
        """
        X_aug = np.empty_like(X)
        batch_size, C, H, W = X.shape

        for i in range(batch_size):
            img = X[i].transpose(1, 2, 0)  # (H, W, C) for easier manipulation

            # Random horizontal flip
            if self.horizontal_flip and np.random.rand() < 0.5:
                img = np.fliplr(img)

            # Random rotation
            if self.rotation_range > 0:
                angle = np.random.uniform(-self.rotation_range, self.rotation_range)
                img = rotate(img, angle, reshape=False, mode="reflect")

            # Random shift (translation)
            if self.shift_range > 0:
                shift_h = int(np.random.uniform(-self.shift_range, self.shift_range) * H)
                shift_w = int(np.random.uniform(-self.shift_range, self.shift_range) * W)

                # Create empty canvas
                shifted = np.zeros_like(img)
                h_start = max(0, shift_h)
                h_end = H + min(0, shift_h)
                w_start = max(0, shift_w)
                w_end = W + min(0, shift_w)

                shifted[h_start:h_end, w_start:w_end] = img[
                    max(0, -shift_h):H - max(0, shift_h),
                    max(0, -shift_w):W - max(0, shift_w)
                ]
                img = shifted

            # Store back
            X_aug[i] = img.transpose(2, 0, 1)  # back to (C, H, W)

        return X_aug
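
The slice arithmetic in the shift step is easy to get wrong, so it helps to check it on a tiny array. The sketch below pulls the same indexing into a standalone helper (`shift_image` is a hypothetical name, not part of the class above) and applies whole-pixel shifts to a 4×4 image.

```python
import numpy as np

def shift_image(img, shift_h, shift_w):
    """Translate an (H, W, C) image by whole pixels, filling exposed borders with zeros."""
    H, W = img.shape[:2]
    shifted = np.zeros_like(img)
    # Destination window on the empty canvas
    dst_h = slice(max(0, shift_h), H + min(0, shift_h))
    dst_w = slice(max(0, shift_w), W + min(0, shift_w))
    # Matching source window in the original image
    src_h = slice(max(0, -shift_h), H - max(0, shift_h))
    src_w = slice(max(0, -shift_w), W - max(0, shift_w))
    shifted[dst_h, dst_w] = img[src_h, src_w]
    return shifted

img = np.arange(1, 17, dtype=np.float32).reshape(4, 4, 1)
down_right = shift_image(img, 1, 1)    # content moves one pixel down and right
up_left = shift_image(img, -1, -1)     # content moves one pixel up and left

print(down_right[..., 0])
print(up_left[..., 0])
```

A positive shift leaves a zero border at the top/left, a negative shift leaves it at the bottom/right, and the source and destination windows always have the same size.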


📝 Updating Training Loop with Augmentation

We integrate augmentation in the train loop (only for training batches, not validation/test):

In [None]:
def train(model, X_train, y_train, X_val, y_val,
          epochs=5, batch_size=64, lr=0.01, augment=False):
    """
    Simple training loop with optional data augmentation.
    """
    loss_fn = CrossEntropyLoss()
    optimizer = SGD(model, lr=lr)
    augmenter = DataAugmentation() if augment else None

    num_batches = int(np.ceil(X_train.shape[0] / batch_size))

    for epoch in range(epochs):
        epoch_loss, epoch_acc = 0, 0

        # Shuffle training data
        indices = np.arange(X_train.shape[0])
        np.random.shuffle(indices)

        for i in range(num_batches):
            batch_idx = indices[i*batch_size:(i+1)*batch_size]
            X_batch, y_batch = X_train[batch_idx], y_train[batch_idx]

            # Apply augmentation if enabled
            if augment:
                X_batch = augmenter.augment_batch(X_batch)

            # Forward pass
            logits = model.forward(X_batch, training=True)

            # Compute loss
            loss = loss_fn.forward(logits, y_batch)

            # Backward pass
            grad_logits = loss_fn.backward()
            model.backward(grad_logits)

            # Update weights
            optimizer.step()

            # Track batch accuracy
            acc = accuracy(logits, y_batch)
            epoch_loss += loss
            epoch_acc += acc

        # Average over batches
        epoch_loss /= num_batches
        epoch_acc /= num_batches

        # Validation
        val_logits = model.forward(X_val, training=False)
        val_loss = loss_fn.forward(val_logits, y_val)
        val_acc = accuracy(val_logits, y_val)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
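
One detail of the loop above: when the dataset size is not a multiple of `batch_size`, the last batch is smaller, yet every batch contributes equally to the epoch average. The following sketch shows the batching pattern in isolation and compares the plain mean with a sample-weighted mean. The helper name `iterate_minibatches` and the per-batch accuracies are made up for illustration.

```python
import numpy as np

def iterate_minibatches(num_samples, batch_size, rng):
    """Yield shuffled index arrays; the final batch may be smaller than batch_size."""
    indices = rng.permutation(num_samples)
    for start in range(0, num_samples, batch_size):
        yield indices[start:start + batch_size]

rng = np.random.default_rng(0)
batches = list(iterate_minibatches(10, 4, rng))
batch_sizes = [len(b) for b in batches]          # sizes 4, 4, 2

# Hypothetical per-batch accuracies, for illustration only
batch_accs = np.array([0.5, 0.75, 1.0])
unweighted = batch_accs.mean()                           # what a per-batch average reports
weighted = np.average(batch_accs, weights=batch_sizes)   # exact per-sample mean

print(batch_sizes, unweighted, weighted)
```

For large datasets the difference is negligible, but on small subsets such as the 2,000-image runs used here it can shift the reported numbers slightly.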


✅ Example Usage with Augmentation

In [None]:
# Load CIFAR-10
X_train, y_train, X_test, y_test = load_cifar10()

# Create validation split
X_val, y_val = X_train[:500], y_train[:500]
X_train, y_train = X_train[500:2500], y_train[500:2500]  # use only 2000 samples for speed

print("Train set:", X_train.shape, y_train.shape)
print("Val set:", X_val.shape, y_val.shape)

# Create model
model = MiniVGG(num_classes=10)

# Train (with augmentation enabled)
train(model,
      X_train, y_train,
      X_val, y_val,
      epochs=3, batch_size=32, lr=0.01,
      augment=True)

# Quick test evaluation on 200 test images
loss_fn = CrossEntropyLoss()
logits = model.forward(X_test[:200], training=False)
test_loss = loss_fn.forward(logits, y_test[:200])
test_acc = accuracy(logits, y_test[:200])

print(f"Subset Test Loss: {test_loss:.4f}, Subset Test Accuracy: {test_acc:.4f}")


**Notes**

    Flip: encourages invariance to left-right orientation.

    Rotation: helps the network recognize slightly rotated objects.

    Shift: builds robustness to small translations.

    Augmentation is applied only during training; validation/test sets are left untouched.

⚠️ Training on full CIFAR-10 (50,000 images) with NumPy CNN will be very slow.<br>
👉 For testing, use smaller subsets first (X_train[:2000], etc.), then scale up.

Let’s add a visualization helper so you can see how your CIFAR-10 images look before and after augmentation.

📝 Visualization Function

In [None]:

def show_augmentation(X_batch, augmenter, n=5):
    """
    Visualize original vs augmented images side by side.

    Parameters:
    -----------
    X_batch : np.ndarray
        A batch of images (N, C, H, W)
    augmenter : DataAugmentation
        Augmentation object
    n : int
        Number of samples to display
    """
    # Pick first n images
    X_orig = X_batch[:n]
    X_aug = augmenter.augment_batch(X_orig.copy())

    plt.figure(figsize=(2*n, 4))

    for i in range(n):
        # Original
        plt.subplot(2, n, i+1)
        img = X_orig[i].transpose(1, 2, 0)  # (H,W,C)
        plt.imshow(np.clip(img, 0, 1))
        plt.axis("off")
        plt.title("Original")

        # Augmented
        plt.subplot(2, n, n+i+1)
        img_aug = X_aug[i].transpose(1, 2, 0)
        plt.imshow(np.clip(img_aug, 0, 1))
        plt.axis("off")
        plt.title("Augmented")

    plt.show()


✅ Example Usage

In [None]:
# Load a small batch of CIFAR-10
X_train, y_train, X_test, y_test = load_cifar10()
augmenter = DataAugmentation(horizontal_flip=True, rotation_range=20, shift_range=0.2)

# Show first 5 images before/after augmentation
show_augmentation(X_train[:5], augmenter, n=5)

# Learning Rate Scheduling
👉 Learning rate scheduling means changing the learning rate during training to improve convergence.

Typical policies:

    Step decay: reduce lr every few epochs (e.g., lr *= 0.1 every 10 epochs).

    Exponential decay: lr = lr0 * exp(-decay * epoch), where lr0 is the initial learning rate.

    Plateau decay: reduce lr when validation accuracy stops improving.
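
The three policies can be sketched as follows. This is an illustrative sketch, not the notebook's `LRScheduler` class: the names `step_decay`, `exponential_decay`, and `ReduceOnPlateau`, and all default values, are assumptions chosen for the example.

```python
import math

def step_decay(lr0, epoch, step_size=10, gamma=0.1):
    """Multiply the initial rate by gamma once every step_size epochs."""
    return lr0 * gamma ** (epoch // step_size)

def exponential_decay(lr0, epoch, decay=0.05):
    """Smoothly shrink the initial rate: lr0 * exp(-decay * epoch)."""
    return lr0 * math.exp(-decay * epoch)

class ReduceOnPlateau:
    """Scale the rate by `factor` after `patience` epochs without improvement."""

    def __init__(self, lr0, factor=0.5, patience=2):
        self.lr = lr0
        self.factor = factor
        self.patience = patience
        self.best = -math.inf
        self.bad_epochs = 0

    def step(self, val_acc):
        if val_acc > self.best:
            self.best = val_acc
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
            if self.bad_epochs >= self.patience:
                self.lr *= self.factor
                self.bad_epochs = 0
        return self.lr

# Learning rate at a few epochs under the first two policies
for epoch in (0, 10, 25):
    print(epoch, step_decay(0.1, epoch), exponential_decay(0.1, epoch))

# Plateau policy driven by a made-up validation accuracy history
plateau = ReduceOnPlateau(lr0=0.01)
history = [plateau.step(acc) for acc in (0.5, 0.6, 0.6, 0.6)]
print(history)
```

Step and exponential decay depend only on the epoch number, while the plateau policy needs the validation metric, so it has to be called after validation each epoch.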

🔧 Update Training Loop

In [None]:
def train(model, X_train, y_train, X_val, y_val,
          epochs=5, batch_size=64, lr=0.01, augment=False,
          scheduler=None):
    """
    Training loop with optional augmentation and LR scheduler.
    """
    loss_fn = CrossEntropyLoss()
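    # Reuse the optimizer attached to the scheduler (assumed to be exposed as
    # `scheduler.optimizer`) so LR changes reach the optimizer that updates the model;
    # otherwise fall back to a fresh SGD instance with the given lr.
    optimizer = getattr(scheduler, "optimizer", None)
    if optimizer is None:
        optimizer = SGD(model, lr=lr)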
    augmenter = DataAugmentation() if augment else None

    num_batches = int(np.ceil(X_train.shape[0] / batch_size))

    for epoch in range(epochs):
        epoch_loss, epoch_acc = 0, 0

        # Shuffle training data
        indices = np.arange(X_train.shape[0])
        np.random.shuffle(indices)

        for i in range(num_batches):
            batch_idx = indices[i*batch_size:(i+1)*batch_size]
            X_batch, y_batch = X_train[batch_idx], y_train[batch_idx]

            if augment:
                X_batch = augmenter.augment_batch(X_batch)

            # Forward
            logits = model.forward(X_batch, training=True)
            loss = loss_fn.forward(logits, y_batch)

            # Backward
            grad_logits = loss_fn.backward()
            model.backward(grad_logits)

            # Update weights
            optimizer.step()

            acc = accuracy(logits, y_batch)
            epoch_loss += loss
            epoch_acc += acc

        # Average over batches
        epoch_loss /= num_batches
        epoch_acc /= num_batches

        # Validation
        val_logits = model.forward(X_val, training=False)
        val_loss = loss_fn.forward(val_logits, y_val)
        val_acc = accuracy(val_logits, y_val)

        # Apply LR scheduling
        if scheduler is not None:
            scheduler.step(epoch)

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


✅ Example Usage

In [None]:
# Training on a small subset with LR scheduling
model = MiniVGG(num_classes=10)
scheduler = LRScheduler(optimizer=SGD(model, lr=0.01), step_size=2, gamma=0.5)

train(model,
      X_train[:2000], y_train[:2000],
      X_val[:500], y_val[:500],
      epochs=6, batch_size=64, lr=0.01,
      augment=True,
      scheduler=scheduler)


📝 Confusion Matrix + Per-Class Accuracy

In [None]:

def confusion_matrix(preds: np.ndarray, labels: np.ndarray, num_classes=10):
    """
    Compute confusion matrix.

    preds: model predictions (N,) or logits (N, num_classes)
    labels: true labels (N,)
    """
    if preds.ndim > 1:  # if logits, take argmax
        preds = np.argmax(preds, axis=1)

    cm = np.zeros((num_classes, num_classes), dtype=int)
    for t, p in zip(labels, preds):
        cm[t, p] += 1
    return cm

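def per_class_accuracy(cm: np.ndarray):
    """
    Compute per-class accuracy (recall) from confusion matrix.
    Classes with no true samples get an accuracy of 0.
    """
    totals = cm.sum(axis=1)  # number of true samples per class (row sums)
    accs = np.divide(cm.diagonal(), totals,
                     out=np.zeros(len(totals), dtype=float),
                     where=(totals != 0))
    return accs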

def plot_confusion_matrix(cm: np.ndarray, class_names=None, title="Confusion Matrix"):
    """
    Display confusion matrix using heatmap.
    """
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title(title)
    plt.show()


✅ Example Usage After Training

In [None]:
# Assume model is trained already

# Forward pass on test set (smaller subset for speed)
logits = model.forward(X_test[:1000], training=False)

# Compute confusion matrix
cm = confusion_matrix(logits, y_test[:1000], num_classes=10)

# Per-class accuracy
accs = per_class_accuracy(cm)
for i, acc in enumerate(accs):
    print(f"Class {i}: {acc:.2f}")

# Plot matrix
plot_confusion_matrix(cm, class_names=[str(i) for i in range(10)])


**Notes**

    Confusion matrix → shows how many times each class was predicted correctly (diagonal) vs. misclassified (off-diagonal).

    Per-class accuracy → fraction of correct predictions for each class.

    Plotting → heatmap visualization makes it easy to spot which classes the network struggles with.
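
A small worked example makes the row/column convention concrete. The sketch below builds the matrix with `np.add.at` instead of a Python loop (the name `confusion_matrix_vectorized` is hypothetical) and uses made-up labels in which class 3 never occurs, to show how an empty class is handled.

```python
import numpy as np

def confusion_matrix_vectorized(preds, labels, num_classes):
    """Rows index the true class, columns the predicted class."""
    cm = np.zeros((num_classes, num_classes), dtype=int)
    np.add.at(cm, (labels, preds), 1)   # unbuffered add, so repeated pairs accumulate
    return cm

# Made-up labels and predictions; class 3 has no samples
labels = np.array([0, 0, 1, 1, 2, 2])
preds = np.array([0, 1, 1, 1, 2, 0])

cm = confusion_matrix_vectorized(preds, labels, num_classes=4)
totals = cm.sum(axis=1)                               # true samples per class
per_class = np.divide(cm.diagonal(), totals,
                      out=np.zeros(4), where=totals != 0)
overall = cm.trace() / cm.sum()                       # correct / total

print(cm)
print(per_class, overall)
```

Here one class-0 sample is predicted as class 1 and one class-2 sample as class 0, so both show up as off-diagonal entries in their true-class rows.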

# Part 4: Feature Visualization

Now let’s move to README Part 4: Feature Visualization.

The README requires two things:

    Filter Visualization → show the learned filters of the first Conv layer.

    Feature Map Visualization → show activations (feature maps) inside the network for a sample image.

📝 Filter Visualization

In [None]:
def visualize_filters(conv_layer, save_path="filters.png"):
    """
    Visualize learned filters of the first Conv2D layer.

    conv_layer: Conv2D object
    """
    W = conv_layer.params["W"]  # shape (out_channels, in_channels, kh, kw)
    num_filters = W.shape[0]
    num_channels = W.shape[1]

    # Normalize filters to 0-1 for display
    W_min, W_max = W.min(), W.max()
    W = (W - W_min) / (W_max - W_min + 1e-8)

    # Plot each filter
    cols = 8
    rows = int(np.ceil(num_filters / cols))
    plt.figure(figsize=(cols, rows))

    for i in range(num_filters):
        f = W[i]
        if num_channels == 3:  # RGB filters
            f_img = np.transpose(f, (1,2,0))
        else:  # grayscale
            f_img = f[0]
        plt.subplot(rows, cols, i+1)
        plt.imshow(f_img, cmap="viridis")
        plt.axis("off")
    plt.suptitle("Learned Filters")
    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()


📝 Feature Map Visualization

In [None]:
def visualize_feature_maps(model, X_sample, layer_indices, save_path="feature_maps.png"):
    """
    Visualize feature maps at specified layers.

    model: LeNet5 or MiniVGG instance
    X_sample: single image (C, H, W)
    layer_indices: list of indices of layers to visualize
    """
    x = X_sample[np.newaxis, ...]  # add batch dim
    activations = []

    # Forward pass while storing intermediate outputs
    for idx, layer in enumerate(model.layers):
        x = layer.forward(x, training=False)
        if idx in layer_indices:
            activations.append((idx, x.copy()))

    # Plot
    for idx, act in activations:
        if act.ndim != 4:  # skip non-spatial outputs (e.g. after Flatten or Dense)
            continue
        num_maps = act.shape[1]
        cols = 8
        rows = int(np.ceil(num_maps / cols))
        plt.figure(figsize=(cols, rows))
        for i in range(num_maps):
            plt.subplot(rows, cols, i+1)
            plt.imshow(act[0, i], cmap="gray")
            plt.axis("off")
        plt.suptitle(f"Feature Maps at Layer {idx}")
        plt.tight_layout()
        plt.savefig(f"feature_maps_layer{idx}.png")
        plt.show()


✅ Example Usage

In [None]:
# After training your model
# Show filters of first Conv layer
visualize_filters(model.layers[0])

# Pick one test image
X_sample = X_test[0]

# Show feature maps at some Conv layers
visualize_feature_maps(model, X_sample, layer_indices=[0, 3, 7])


**Notes**

    Filters (weights of conv kernels) in early layers often look like edge detectors or color blobs.

    Feature maps show what each channel is “looking for” in the image (edges, textures, shapes).

    As you go deeper → features become more abstract (object parts, high-level patterns).
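
When filters differ a lot in magnitude, scaling all of them with one global min/max can leave low-magnitude filters looking flat. One alternative is to min-max scale each filter on its own. This is an optional variation, not what `visualize_filters` above does: the helper name `normalize_per_filter` is hypothetical and random weights stand in for a trained layer.

```python
import numpy as np

def normalize_per_filter(W, eps=1e-8):
    """Min-max scale each filter independently. W: (out_channels, in_channels, kh, kw)."""
    w_min = W.min(axis=(1, 2, 3), keepdims=True)
    w_max = W.max(axis=(1, 2, 3), keepdims=True)
    return (W - w_min) / (w_max - w_min + eps)

# Random stand-in weights; filter 0 is deliberately about 100x smaller than filter 1
rng = np.random.default_rng(0)
W = rng.normal(size=(2, 3, 5, 5))
W[0] *= 0.01

global_norm = (W - W.min()) / (W.max() - W.min() + 1e-8)
local_norm = normalize_per_filter(W)

# Value range covered by the small filter under each scheme
global_span = global_norm[0].max() - global_norm[0].min()
local_span = local_norm[0].max() - local_norm[0].min()
print(global_span, local_span)
```

The trade-off is that per-filter scaling hides relative magnitudes between filters, so global scaling remains the better choice when those magnitudes are what you want to compare.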