In [None]:
# Setup and imports
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import imread
import time
import random
from scipy.optimize import minimize

# for auto-reloading external modules
%matplotlib inline
plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

# for auto-reloading external modules
%load_ext autoreload
%autoreload 2

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

In [None]:
# Data loading and processing utilities

def load_CIFAR10(ROOT):
    """
    Load CIFAR-10 dataset
    """
    import os, pickle
    xs = []
    ys = []
    for b in range(1, 6):
        f = os.path.join(ROOT, 'data_batch_%d' % (b, ))
        X, Y = load_CIFAR_batch(f)
        xs.append(X)
        ys.append(Y)
    Xtr = np.concatenate(xs)
    Ytr = np.concatenate(ys)
    del X, Y
    Xte, Yte = load_CIFAR_batch(os.path.join(ROOT, 'test_batch'))
    return Xtr, Ytr, Xte, Yte

def load_CIFAR_batch(filename):
    """ load single batch of cifar """
    import pickle
    with open(filename, 'rb') as f:
        datadict = pickle.load(f, encoding='latin1')
        X = datadict['data']
        Y = datadict['labels']
        X = X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
        Y = np.array(Y)
        return X, Y

def get_CIFAR10_data(num_training=49000, num_validation=1000, num_test=1000):
    """
    Load the CIFAR-10 dataset from disk and perform preprocessing to prepare
    it for classifiers. These are the same steps as we used for the SVM, but
    condensed to a single function.
    """
    # Load the raw CIFAR-10 data
    cifar10_dir = 'cs231n/datasets/cifar-10-batches-py'
    X_train, y_train, X_test, y_test = load_CIFAR10(cifar10_dir)

    # Subsample the data
    mask = list(range(num_training, num_training + num_validation))
    X_val = X_train[mask]
    y_val = y_train[mask]
    mask = list(range(num_training))
    X_train = X_train[mask]
    y_train = y_train[mask]
    mask = list(range(num_test))
    X_test = X_test[mask]
    y_test = y_test[mask]

    # Normalize the data: subtract the mean image
    mean_image = np.mean(X_train, axis=0)
    X_train -= mean_image
    X_val -= mean_image
    X_test -= mean_image

    # Transpose so that channels come first
    X_train = X_train.transpose(0, 3, 1, 2).copy()
    X_val = X_val.transpose(0, 3, 1, 2).copy()
    X_test = X_test.transpose(0, 3, 1, 2).copy()

    # Package data into dictionaries
    return {
        'X_train': X_train, 'y_train': y_train,
        'X_val': X_val, 'y_val': y_val,
        'X_test': X_test, 'y_test': y_test,
    }

In [None]:
# CNN layers implementation

def conv_forward_naive(x, w, b, conv_param):
    """
    A naive implementation of the forward pass for a convolutional layer.

    The input consists of N data points, each with C channels, height H and
    width W. We convolve each input with F different filters, where each filter
    spans all C channels and has height HH and width WW.

    Input:
    - x: Input data of shape (N, C, H, W)
    - w: Filter weights of shape (F, C, HH, WW)
    - b: Biases, of shape (F,)
    - conv_param: A dictionary with the following keys:
      - 'stride': The number of pixels between adjacent receptive fields in the
        horizontal and vertical directions.
      - 'pad': The number of pixels that will be used to zero-pad the input.

    During padding, 'pad' zeros should be placed symmetrically (i.e equally on both sides)
    along the height and width axes of the input. Be careful not to modify the original
    input x directly.

    Returns a tuple of:
    - out: Output data, of shape (N, F, H', W') where H' and W' are given by
      H' = 1 + (H + 2 * pad - HH) / stride
      W' = 1 + (W + 2 * pad - WW) / stride
    - cache: (x, w, b, conv_param)
    """
    stride = conv_param.get('stride', 1)
    pad = conv_param.get('pad', 0)

    N, C, H, W = x.shape
    F, _, HH, WW = w.shape

    H_out = 1 + (H + 2 * pad - HH) // stride
    W_out = 1 + (W + 2 * pad - WW) // stride

    out = np.zeros((N, F, H_out, W_out))

    # Pad the input
    x_padded = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')

    for n in range(N):  # For each example
        for f in range(F):  # For each filter
            for h_out in range(H_out):
                for w_out in range(W_out):
                    h_start = h_out * stride
                    w_start = w_out * stride
                    x_slice = x_padded[n, :, h_start:h_start+HH, w_start:w_start+WW]
                    out[n, f, h_out, w_out] = np.sum(x_slice * w[f]) + b[f]

    cache = (x, w, b, conv_param)
    return out, cache

def conv_backward_naive(dout, cache):
    """
    A naive implementation of the backward pass for a convolutional layer.

    Inputs:
    - dout: Upstream derivatives.
    - cache: A tuple of (x, w, b, conv_param) as in conv_forward_naive

    Returns a tuple of:
    - dx: Gradient with respect to x
    - dw: Gradient with respect to w
    - db: Gradient with respect to b
    """
    x, w, b, conv_param = cache

    stride = conv_param.get('stride', 1)
    pad = conv_param.get('pad', 0)

    N, C, H, W = x.shape
    F, _, HH, WW = w.shape
    _, _, H_out, W_out = dout.shape

    # Pad the input
    x_padded = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')

    # Initialize gradients
    dx_padded = np.zeros_like(x_padded)
    dw = np.zeros_like(w)
    db = np.zeros_like(b)

    # Calculate gradient with respect to bias
    for f in range(F):
        db[f] = np.sum(dout[:, f, :, :])

    # Calculate gradient with respect to w and x
    for n in range(N):
        for f in range(F):
            for h_out in range(H_out):
                for w_out in range(W_out):
                    h_start = h_out * stride
                    w_start = w_out * stride
                    dx_padded[n, :, h_start:h_start+HH, w_start:w_start+WW] += w[f] * dout[n, f, h_out, w_out]
                    dw[f] += x_padded[n, :, h_start:h_start+HH, w_start:w_start+WW] * dout[n, f, h_out, w_out]

    # Remove padding from dx
    dx = dx_padded[:, :, pad:pad+H, pad:pad+W]

    return dx, dw, db

def max_pool_forward_naive(x, pool_param):
    """
    A naive implementation of the forward pass for a max-pooling layer.

    Inputs:
    - x: Input data, of shape (N, C, H, W)
    - pool_param: dictionary with the following keys:
      - 'pool_height': The height of each pooling region
      - 'pool_width': The width of each pooling region
      - 'stride': The distance between adjacent pooling regions

    No padding is necessary here. Output size is given by

    Returns a tuple of:
    - out: Output data, of shape (N, C, H', W') where H' and W' are given by
      H' = 1 + (H - pool_height) / stride
      W' = 1 + (W - pool_width) / stride
    - cache: (x, pool_param)
    """
    pool_height = pool_param.get('pool_height', 2)
    pool_width = pool_param.get('pool_width', 2)
    stride = pool_param.get('stride', 2)

    N, C, H, W = x.shape

    H_out = 1 + (H - pool_height) // stride
    W_out = 1 + (W - pool_width) // stride

    out = np.zeros((N, C, H_out, W_out))

    for n in range(N):
        for c in range(C):
            for h_out in range(H_out):
                for w_out in range(W_out):
                    h_start = h_out * stride
                    w_start = w_out * stride
                    x_slice = x[n, c, h_start:h_start+pool_height, w_start:w_start+pool_width]
                    out[n, c, h_out, w_out] = np.max(x_slice)

    cache = (x, pool_param)
    return out, cache

def max_pool_backward_naive(dout, cache):
    """
    A naive implementation of the backward pass for a max-pooling layer.

    Inputs:
    - dout: Upstream derivatives
    - cache: A tuple of (x, pool_param) as in the forward pass.

    Returns:
    - dx: Gradient with respect to x
    """
    x, pool_param = cache

    pool_height = pool_param.get('pool_height', 2)
    pool_width = pool_param.get('pool_width', 2)
    stride = pool_param.get('stride', 2)

    N, C, H, W = x.shape
    _, _, H_out, W_out = dout.shape

    dx = np.zeros_like(x)

    for n in range(N):
        for c in range(C):
            for h_out in range(H_out):
                for w_out in range(W_out):
                    h_start = h_out * stride
                    w_start = w_out * stride

                    x_slice = x[n, c, h_start:h_start+pool_height, w_start:w_start+pool_width]
                    max_idx = np.unravel_index(np.argmax(x_slice), x_slice.shape)

                    dx[n, c, h_start+max_idx[0], w_start+max_idx[1]] += dout[n, c, h_out, w_out]

    return dx

def relu_forward(x):
    """
    Computes the forward pass for a layer of rectified linear units (ReLUs).

    Input:
    - x: Inputs, of any shape

    Returns a tuple of:
    - out: Output, of the same shape as x
    - cache: x
    """
    out = np.maximum(0, x)
    cache = x
    return out, cache

def relu_backward(dout, cache):
    """
    Computes the backward pass for a layer of rectified linear units (ReLUs).

    Input:
    - dout: Upstream derivatives, of any shape
    - cache: Input x, of same shape as dout

    Returns:
    - dx: Gradient with respect to x
    """
    x = cache
    dx = dout * (x > 0)
    return dx

def affine_forward(x, w, b):
    """
    Computes the forward pass for an affine (fully-connected) layer.

    The input x has shape (N, d_1, ..., d_k) and contains a minibatch of N
    examples, where each example x[i] has shape (d_1, ..., d_k). We will
    reshape each input into a vector of dimension D = d_1 * ... * d_k, and
    then transform it to an output vector of dimension M.

    Inputs:
    - x: A numpy array containing input data, of shape (N, d_1, ..., d_k)
    - w: A numpy array of weights, of shape (D, M)
    - b: A numpy array of biases, of shape (M,)

    Returns a tuple of:
    - out: output, of shape (N, M)
    - cache: (x, w, b)
    """
    N = x.shape[0]
    D = np.prod(x.shape[1:])
    x_reshaped = x.reshape(N, D)
    out = x_reshaped.dot(w) + b
    cache = (x, w, b)
    return out, cache

def affine_backward(dout, cache):
    """
    Computes the backward pass for an affine layer.

    Inputs:
    - dout: Upstream derivative, of shape (N, M)
    - cache: Tuple of:
      - x: Input data, of shape (N, d_1, ... d_k)
      - w: Weights, of shape (D, M)
      - b: Biases, of shape (M,)

    Returns a tuple of:
    - dx: Gradient with respect to x, of shape (N, d1, ..., d_k)
    - dw: Gradient with respect to w, of shape (D, M)
    - db: Gradient with respect to b, of shape (M,)
    """
    x, w, b = cache
    N = x.shape[0]
    D = np.prod(x.shape[1:])
    x_reshaped = x.reshape(N, D)

    dx_reshaped = dout.dot(w.T)
    dx = dx_reshaped.reshape(x.shape)
    dw = x_reshaped.T.dot(dout)
    db = np.sum(dout, axis=0)

    return dx, dw, db

In [None]:
# Dropout implementation for regularization

def dropout_forward(x, dropout_param):
    """
    Performs the forward pass for dropout.

    Inputs:
    - x: Input data, of any shape
    - dropout_param: A dictionary with the following keys:
      - p: Dropout parameter. We keep each neuron output with probability p.
      - mode: 'test' or 'train'. If the mode is train, then perform dropout;
        if the mode is test, then just return the input.
      - seed: Seed for the random number generator. Passing seed makes this
        function deterministic, which is needed for gradient checking but not
        in real networks.

    Outputs:
    - out: Array of the same shape as x.
    - cache: tuple (dropout_param, mask). In training mode, mask is the dropout
      mask that was used to multiply the input; in test mode, mask is None.
    """
    p, mode = dropout_param['p'], dropout_param['mode']
    if 'seed' in dropout_param:
        np.random.seed(dropout_param['seed'])

    mask = None
    out = None

    if mode == 'train':
        mask = (np.random.rand(*x.shape) < p) / p  # Scale the mask to preserve expected output
        out = x * mask
    elif mode == 'test':
        out = x

    cache = (dropout_param, mask)
    return out, cache

def dropout_backward(dout, cache):
    """
    Perform the backward pass for dropout.

    Inputs:
    - dout: Upstream derivatives, of any shape
    - cache: (dropout_param, mask) from dropout_forward.
    """
    dropout_param, mask = cache
    mode = dropout_param['mode']

    dx = None
    if mode == 'train':
        dx = dout * mask
    elif mode == 'test':
        dx = dout
    return dx

In [None]:
# Batch normalization implementation

def batchnorm_forward(x, gamma, beta, bn_param):
    """
    Forward pass for batch normalization.

    Input:
    - x: Data of shape (N, D)
    - gamma: Scale parameter of shape (D,)
    - beta: Shift parameter of shape (D,)
    - bn_param: Dictionary with the following keys:
      - mode: 'train' or 'test'
      - eps: Constant for numerical stability
      - momentum: Constant for moving average
      - running_mean: Array of shape (D,) for running mean
      - running_var: Array of shape (D,) for running variance

    Returns:
    - out: Normalized output of shape (N, D)
    - cache: Tuple for backward pass
    """
    mode = bn_param['mode']
    eps = bn_param.get('eps', 1e-5)
    momentum = bn_param.get('momentum', 0.9)

    N, D = x.shape
    running_mean = bn_param.get('running_mean', np.zeros(D, dtype=x.dtype))
    running_var = bn_param.get('running_var', np.zeros(D, dtype=x.dtype))

    out, cache = None, None
    if mode == 'train':
        # Step 1: Calculate mean
        mu = np.mean(x, axis=0)

        # Step 2: Calculate variance
        var = np.var(x, axis=0)

        # Step 3: Normalize
        x_norm = (x - mu) / np.sqrt(var + eps)

        # Step 4: Scale and shift
        out = gamma * x_norm + beta

        # Update running mean and variance
        running_mean = momentum * running_mean + (1 - momentum) * mu
        running_var = momentum * running_var + (1 - momentum) * var

        # Cache for backward pass
        cache = (x, x_norm, mu, var, gamma, beta, eps)

    elif mode == 'test':
        # Use running mean and variance for normalization
        x_norm = (x - running_mean) / np.sqrt(running_var + eps)
        out = gamma * x_norm + beta

    # Store updated running mean and var
    bn_param['running_mean'] = running_mean
    bn_param['running_var'] = running_var

    return out, cache

def batchnorm_backward(dout, cache):
    """
    Backward pass for batch normalization.

    Input:
    - dout: Upstream derivatives, of shape (N, D)
    - cache: Variable of intermediates from batchnorm_forward.

    Returns:
    - dx: Gradient with respect to inputs x, of shape (N, D)
    - dgamma: Gradient with respect to scale parameter gamma, of shape (D,)
    - dbeta: Gradient with respect to shift parameter beta, of shape (D,)
    """
    x, x_norm, mu, var, gamma, beta, eps = cache
    N, D = x.shape

    # Gradient with respect to beta and gamma
    dbeta = np.sum(dout, axis=0)
    dgamma = np.sum(dout * x_norm, axis=0)

    # Gradient with respect to x_norm
    dx_norm = dout * gamma

    # Gradient with respect to x
    dvar = np.sum(dx_norm * (x - mu) * -0.5 * (var + eps)**(-1.5), axis=0)
    dmu = np.sum(-dx_norm / np.sqrt(var + eps), axis=0) + dvar * np.mean(-2 * (x - mu), axis=0)

    dx = dx_norm / np.sqrt(var + eps) + dvar * 2 * (x - mu) / N + dmu / N

    return dx, dgamma, dbeta

def spatial_batchnorm_forward(x, gamma, beta, bn_param):
    """
    Computes the forward pass for spatial batch normalization.

    Inputs:
    - x: Input data of shape (N, C, H, W)
    - gamma: Scale parameter, of shape (C,)
    - beta: Shift parameter, of shape (C,)
    - bn_param: Dictionary with the following keys:
      - mode: 'train' or 'test'; required
      - eps: Constant for numeric stability
      - momentum: Constant for running mean / variance. momentum=0 means that
        old information is discarded completely at every time step, while
        momentum=1 means that new information is never incorporated. The
        default of momentum=0.9 should work well in most situations.
      - running_mean: Array of shape (C,) giving running mean of features
      - running_var Array of shape (C,) giving running variance of features

    Returns a tuple of:
    - out: Output data, of shape (N, C, H, W)
    - cache: Values needed for the backward pass
    """
    N, C, H, W = x.shape

    # Reshape x to N*H*W x C to apply batch normalization
    x_reshaped = x.transpose(0, 2, 3, 1).reshape(-1, C)

    # Apply batch normalization
    out_reshaped, cache = batchnorm_forward(x_reshaped, gamma, beta, bn_param)

    # Reshape back to N x C x H x W
    out = out_reshaped.reshape(N, H, W, C).transpose(0, 3, 1, 2)

    return out, cache

def spatial_batchnorm_backward(dout, cache):
    """
    Computes the backward pass for spatial batch normalization.

    Inputs:
    - dout: Upstream derivatives, of shape (N, C, H, W)
    - cache: Values from the forward pass

    Returns a tuple of:
    - dx: Gradient with respect to inputs, of shape (N, C, H, W)
    - dgamma: Gradient with respect to scale parameter, of shape (C,)
    - dbeta: Gradient with respect to shift parameter, of shape (C,)
    """
    N, C, H, W = dout.shape

    # Reshape dout for batch normalization backward
    dout_reshaped = dout.transpose(0, 2, 3, 1).reshape(-1, C)

    # Apply batch normalization backward
    dx_reshaped, dgamma, dbeta = batchnorm_backward(dout_reshaped, cache)

    # Reshape dx back to N x C x H x W
    dx = dx_reshaped.reshape(N, H, W, C).transpose(0, 3, 1, 2)

    return dx, dgamma, dbeta

In [None]:
# Data augmentation functions

def random_flips(X):
    """
    Randomly flip images horizontally.

    Input:
    - X: Batch of images, of shape (N, C, H, W)

    Returns:
    - Augmented batch of images of the same shape
    """
    out = np.zeros_like(X)
    for i in range(X.shape[0]):
        if np.random.rand() < 0.5:
            out[i] = X[i, :, :, ::-1]  # Flip horizontally
        else:
            out[i] = X[i]
    return out

def random_crops(X, crop_shape, padding=4):
    """
    Randomly crop images with padding.

    Input:
    - X: Batch of images, of shape (N, C, H, W)
    - crop_shape: Shape of the crop (height, width)
    - padding: Amount of padding to add before cropping

    Returns:
    - Augmented batch of images
    """
    N, C, H, W = X.shape
    H_crop, W_crop = crop_shape

    # Add padding
    padded = np.pad(X, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant')

    out = np.zeros((N, C, H_crop, W_crop))

    for i in range(N):
        # Random crop position
        h_start = np.random.randint(0, padded.shape[2] - H_crop + 1)
        w_start = np.random.randint(0, padded.shape[3] - W_crop + 1)

        # Extract crop
        out[i] = padded[i, :, h_start:h_start+H_crop, w_start:w_start+W_crop]

    return out

def data_augmentation(X, y):
    """
    Apply multiple data augmentation techniques.

    Inputs:
    - X: Batch of images, of shape (N, C, H, W)
    - y: Labels, of shape (N,)

    Returns:
    - Augmented data and labels
    """
    X_aug = random_flips(X)
    X_aug = random_crops(X_aug, (X.shape[2], X.shape[3]), padding=4)

    return X_aug, y

In [None]:
# CNN model implementation

class ConvNet:
    """
    A Convolutional Network architecture for image classification.
    The architecture is based on:
    [conv - spatial batch norm - relu - max pool] x 2 - [affine - batch norm - relu - dropout] - [affine - softmax]
    """

    def __init__(self, input_dim=(3, 32, 32), num_filters=[32, 64], filter_size=3,
                 hidden_dim=100, num_classes=10, weight_scale=1e-3, reg=0.0,
                 dropout=0.5, use_batchnorm=True, dtype=np.float32, seed=None):
        """
        Initialize a new ConvNet.

        Inputs:
        - input_dim: Tuple (C, H, W) giving size of input data
        - num_filters: List of number of filters for each convolutional layer
        - filter_size: Size of filters to use
        - hidden_dim: Number of units in the hidden affine layer
        - num_classes: Number of classes for classification
        - weight_scale: Scalar giving standard deviation for random weight initialization
        - reg: Scalar giving L2 regularization strength
        - dropout: Scalar between 0 and 1 giving dropout strength
        - use_batchnorm: Whether to use batch normalization
        - dtype: numpy datatype to use for computation
        - seed: If not None, then seed for random number generator
        """
        self.use_batchnorm = use_batchnorm
        self.use_dropout = dropout > 0
        self.reg = reg
        self.num_layers = 2 + len(num_filters)
        self.dtype = dtype
        self.params = {}
        self.bn_params = {}

        if seed is not None:
            np.random.seed(seed)

        C, H, W = input_dim

        # Initialize first convolutional layer
        self.params['W1'] = np.random.normal(0, weight_scale, (num_filters[0], C, filter_size, filter_size))
        self.params['b1'] = np.zeros(num_filters[0])

        # Initialize batch norm parameters if needed
        if use_batchnorm:
            self.params['gamma1'] = np.ones(num_filters[0])
            self.params['beta1'] = np.zeros(num_filters[0])
            self.bn_params['bn1'] = {'mode': 'train', 'eps': 1e-5, 'momentum': 0.9}

        # Calculate input dimensions for the next layer
        H_pool = 1 + (H - 2) // 2  # After pooling
        W_pool = 1 + (W - 2) // 2  # After pooling

        # Initialize second convolutional layer
        self.params['W2'] = np.random.normal(0, weight_scale, (num_filters[1], num_filters[0], filter_size, filter_size))
        self.params['b2'] = np.zeros(num_filters[1])

        # Initialize batch norm parameters for second conv layer if needed
        if use_batchnorm:
            self.params['gamma2'] = np.ones(num_filters[1])
            self.params['beta2'] = np.zeros(num_filters[1])
            self.bn_params['bn2'] = {'mode': 'train', 'eps': 1e-5, 'momentum': 0.9}

        # Calculate input dimensions for the fully connected layer
        H_pool2 = 1 + (H_pool - 2) // 2  # After second pooling
        W_pool2 = 1 + (W_pool - 2) // 2  # After second pooling
        fc_input_dim = num_filters[1] * H_pool2 * W_pool2

        # Initialize first fully connected layer
        self.params['W3'] = np.random.normal(0, weight_scale, (fc_input_dim, hidden_dim))
        self.params['b3'] = np.zeros(hidden_dim)

        # Initialize batch norm parameters for first FC layer if needed
        if use_batchnorm:
            self.params['gamma3'] = np.ones(hidden_dim)
            self.params['beta3'] = np.zeros(hidden_dim)
            self.bn_params['bn3'] = {'mode': 'train', 'eps': 1e-5, 'momentum': 0.9}

        # Initialize output layer
        self.params['W4'] = np.random.normal(0, weight_scale, (hidden_dim, num_classes))
        self.params['b4'] = np.zeros(num_classes)

        # Initialize dropout params
        self.dropout_params = {'mode': 'train', 'p': 1 - dropout}
        if seed is not None:
            self.dropout_params['seed'] = seed

        # Cast all parameters to the correct datatype
        for k, v in self.params.items():
            self.params[k] = v.astype(dtype)

    def loss(self, X, y=None):
        """
        Evaluate loss and gradient for the ConvNet.

        Input / Output: Same API as TwoLayerNet.
        """
        X = X.astype(self.dtype)
        mode = 'test' if y is None else 'train'

        # Update BN params and dropout params based on mode
        if self.use_batchnorm:
            for key, bn_param in self.bn_params.items():
                bn_param['mode'] = mode
        if self.use_dropout:
            self.dropout_params['mode'] = mode

        scores = None

        # Forward pass through the network
        conv1_out, conv1_cache = conv_forward_naive(X, self.params['W1'], self.params['b1'], {'stride': 1, 'pad': 1})

        # Batch normalization for first conv layer if used
        if self.use_batchnorm:
            conv1_out, bn1_cache = spatial_batchnorm_forward(conv1_out, self.params['gamma1'],
                                                             self.params['beta1'], self.bn_params['bn1'])

        # ReLU activation for first conv layer
        relu1_out, relu1_cache = relu_forward(conv1_out)

        # Max pooling for first conv layer
        pool1_out, pool1_cache = max_pool_forward_naive(relu1_out, {'pool_height': 2, 'pool_width': 2, 'stride': 2})

        # Second convolutional layer
        conv2_out, conv2_cache = conv_forward_naive(pool1_out, self.params['W2'], self.params['b2'], {'stride': 1, 'pad': 1})

        # Batch normalization for second conv layer if used
        if self.use_batchnorm:
            conv2_out, bn2_cache = spatial_batchnorm_forward(conv2_out, self.params['gamma2'],
                                                             self.params['beta2'], self.bn_params['bn2'])

        # ReLU activation for second conv layer
        relu2_out, relu2_cache = relu_forward(conv2_out)

        # Max pooling for second conv layer
        pool2_out, pool2_cache = max_pool_forward_naive(relu2_out, {'pool_height': 2, 'pool_width': 2, 'stride': 2})

        # First fully connected layer
        fc1_out, fc1_cache = affine_forward(pool2_out, self.params['W3'], self.params['b3'])

        # Batch normalization for first fc layer if used
        if self.use_batchnorm:
            fc1_out, bn3_cache = batchnorm_forward(fc1_out, self.params['gamma3'],
                                                  self.params['beta3'], self.bn_params['bn3'])

        # ReLU activation for first fc layer
        relu3_out, relu3_cache = relu_forward(fc1_out)

        # Dropout after first fc layer if used
        if self.use_dropout:
            relu3_out, dropout_cache = dropout_forward(relu3_out, self.dropout_params)

        # Output layer
        scores, fc2_cache = affine_forward(relu3_out, self.params['W4'], self.params['b4'])

        # If test mode return scores
        if mode == 'test':
            return scores

        # Calculate loss and gradients
        loss, dscores = softmax_loss(scores, y)

        # Add regularization
        loss += 0.5 * self.reg * (np.sum(self.params['W1'] ** 2) + np.sum(self.params['W2'] ** 2) +
                                 np.sum(self.params['W3'] ** 2) + np.sum(self.params['W4'] ** 2))

        # Backward pass
        grads = {}

        # Backprop through output layer
        dx4, grads['W4'], grads['b4'] = affine_backward(dscores, fc2_cache)

        # Add regularization gradient for W4
        grads['W4'] += self.reg * self.params['W4']

        # Backprop through dropout if used
        if self.use_dropout:
            dx4 = dropout_backward(dx4, dropout_cache)

        # Backprop through ReLU3
        dx3 = relu_backward(dx4, relu3_cache)

        # Backprop through batch norm if used
        if self.use_batchnorm:
            dx3, grads['gamma3'], grads['beta3'] = batchnorm_backward(dx3, bn3_cache)

        # Backprop through FC1
        dx2, grads['W3'], grads['b3'] = affine_backward(dx3, fc1_cache)

        # Add regularization gradient for W3
        grads['W3'] += self.reg * self.params['W3']

        # Backprop through second pooling layer
        dx2 = max_pool_backward_naive(dx2, pool2_cache)

        # Backprop through second ReLU
        dx2 = relu_backward(dx2, relu2_cache)

        # Backprop through second batch norm if used
        if self.use_batchnorm:
            dx2, grads['gamma2'], grads['beta2'] = spatial_batchnorm_backward(dx2, bn2_cache)

        # Backprop through second conv layer
        dx1, grads['W2'], grads['b2'] = conv_backward_naive(dx2, conv2_cache)

        # Add regularization gradient for W2
        grads['W2'] += self.reg * self.params['W2']

        # Backprop through first pooling layer
        dx1 = max_pool_backward_naive(dx1, pool1_cache)

        # Backprop through first ReLU
        dx1 = relu_backward(dx1, relu1_cache)

        # Backprop through first batch norm if used
        if self.use_batchnorm:
            dx1, grads['gamma1'], grads['beta1'] = spatial_batchnorm_backward(dx1, bn1_cache)

        # Backprop through first conv layer
        dx, grads['W1'], grads['b1'] = conv_backward_naive(dx1, conv1_cache)

        # Add regularization gradient for W1
        grads['W1'] += self.reg * self.params['W1']

        return loss, grads

def softmax_loss(x, y):
    """
    Computes the loss and gradient for softmax classification.

    Inputs:
    - x: Input data, of shape (N, C) where x[i, j] is the score for the jth
      class for the ith input.
    - y: Vector of labels, of shape (N,) where y[i] is the label for x[i] and
      0 <= y[i] < C

    Returns a tuple of:
    - loss: Scalar giving the loss
    - dx: Gradient of the loss with respect to x
    """
    shifted_logits = x - np.max(x, axis=1, keepdims=True)
    Z = np.sum(np.exp(shifted_logits), axis=1, keepdims=True)
    log_probs = shifted_logits - np.log(Z)
    probs = np.exp(log_probs)
    N = x.shape[0]
    loss = -np.sum(log_probs[np.arange(N), y]) / N
    dx = probs.copy()
    dx[np.arange(N), y] -= 1
    dx /= N
    return loss, dx

In [None]:
# Solver class for training the model

class Solver:
    """
    A Solver encapsulates all the logic necessary for training classification
    models. The Solver performs stochastic gradient descent using different
    update rules.
    """

    def __init__(self, model, data, **kwargs):
        """
        Construct a new Solver instance.

        Required:
        - model: A model object conforming to the API described above
        - data: A dictionary of training and validation data

        Optional kwargs:
        - update_rule: A function that updates model parameters
        - optim_config: A dictionary for the update rule
        - lr_decay: A scalar for learning rate decay
        - batch_size: Size of minibatches used to compute loss and gradient
        - num_epochs: The number of epochs to run for
        - print_every: Integer, print loss every X iterations
        - verbose: Boolean, print progress during optimization
        """
        self.model = model
        self.X_train = data['X_train']
        self.y_train = data['y_train']
        self.X_val = data['X_val']
        self.y_val = data['y_val']

        # Unpack keyword arguments
        self.update_rule = kwargs.pop('update_rule', self._sgd)
        self.optim_config = kwargs.pop('optim_config', {})
        self.lr_decay = kwargs.pop('lr_decay', 1.0)
        self.batch_size = kwargs.pop('batch_size', 100)
        self.num_epochs = kwargs.pop('num_epochs', 10)
        self.print_every = kwargs.pop('print_every', 10)
        self.verbose = kwargs.pop('verbose', True)

        # Set default values for optimizer configuration
        if 'learning_rate' not in self.optim_config:
            self.optim_config['learning_rate'] = 1e-3
        self.optim_config_history = {}

        # Keep track of history
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

        # Make a deep copy of the optim_config for each parameter
        self.optim_configs = {}
        for p in self.model.params:
            d = {k: v for k, v in self.optim_config.items()}
            self.optim_configs[p] = d
            # Keep a history of the config parameters
            self.optim_config_history[p] = []

    def _sgd(self, w, dw, config=None):
        """
        Performs vanilla stochastic gradient descent.

        config dict format:
        - learning_rate: Scalar learning rate
        """
        if config is None: config = {}
        config.setdefault('learning_rate', 1e-2)

        w -= config['learning_rate'] * dw
        return w, config

    def _sgd_momentum(self, w, dw, config=None):
        """
        Performs SGD with momentum.

        config dict format:
        - learning_rate: Scalar learning rate
        - momentum: Scalar between 0 and 1 giving the momentum value
        - velocity: A numpy array of the same shape as w and dw used to store
          the velocity
        """
        if config is None: config = {}
        config.setdefault('learning_rate', 1e-2)
        config.setdefault('momentum', 0.9)
        v = config.get('velocity', np.zeros_like(w))

        v = config['momentum'] * v - config['learning_rate'] * dw
        next_w = w + v
        config['velocity'] = v

        return next_w, config

    def _adam(self, w, dw, config=None):
        """
        Uses the Adam update rule, which incorporates moving averages of both the
        gradient and its square and a bias correction term.

        config dict format:
        - learning_rate: Scalar learning rate
        - beta1: Decay rate for moving average of first moment of gradient
        - beta2: Decay rate for moving average of second moment of gradient
        - epsilon: Small scalar used for numerical stability
        - m: Moving average of gradient
        - v: Moving average of squared gradient
        - t: Iteration number
        """
        if config is None: config = {}
        config.setdefault('learning_rate', 1e-3)
        config.setdefault('beta1', 0.9)
        config.setdefault('beta2', 0.999)
        config.setdefault('epsilon', 1e-8)
        config.setdefault('m', np.zeros_like(w))
        config.setdefault('v', np.zeros_like(w))
        config.setdefault('t', 0)

        config['t'] += 1
        config['m'] = config['beta1'] * config['m'] + (1 - config['beta1']) * dw
        config['v'] = config['beta2'] * config['v'] + (1 - config['beta2']) * (dw * dw)

        # Bias correction
        mb = config['m'] / (1 - config['beta1'] ** config['t'])
        vb = config['v'] / (1 - config['beta2'] ** config['t'])

        next_w = w - config['learning_rate'] * mb / (np.sqrt(vb) + config['epsilon'])

        return next_w, config

    def _rmsprop(self, w, dw, config=None):
        """
        Uses the RMSProp update rule, which uses a moving average of squared gradient values
        to set adaptive learning rates.

        config format:
        - learning_rate: Scalar learning rate
        - decay_rate: Scalar between 0 and 1 giving the decay rate for the squared gradient cache
        - epsilon: Small scalar used for numerical stability
        - cache: Moving average of second moments of gradients
        """
        if config is None: config = {}
        config.setdefault('learning_rate', 1e-2)
        config.setdefault('decay_rate', 0.99)
        config.setdefault('epsilon', 1e-8)
        config.setdefault('cache', np.zeros_like(w))

        config['cache'] = config['decay_rate'] * config['cache'] + (1 - config['decay_rate']) * (dw ** 2)
        next_w = w - config['learning_rate'] * dw / (np.sqrt(config['cache']) + config['epsilon'])

        return next_w, config

    def _step(self, X_batch, y_batch):
        """
        Make a single gradient update with data augmentation.
        """
        # Apply data augmentation
        X_batch_aug, y_batch = data_augmentation(X_batch, y_batch)

        # Compute loss and gradients
        loss, grads = self.model.loss(X_batch_aug, y_batch)
        self.loss_history.append(loss)

        # Apply parameter updates
        for p, w in self.model.params.items():
            dw = grads[p]
            config = self.optim_configs[p]
            next_w, next_config = self.update_rule(w, dw, config)
            self.model.params[p] = next_w
            self.optim_configs[p] = next_config

            # Record config history
            self.optim_config_history[p].append({k: v for k, v in next_config.items() if k != 'velocity' and k != 'm' and k != 'v' and k != 'cache'})

    def check_accuracy(self, X, y, num_samples=None, batch_size=100):
        """
        Check accuracy of the model on the provided data.

        - X: Array of data, of shape (N, d_1, ..., d_k)
        - y: Array of labels, of shape (N,)
        - num_samples: If not None, subsample the data and only test on num_samples points
        - batch_size: Split X and y into batches of this size to avoid using too much memory

        Returns:
        - accuracy: Scalar giving the fraction of instances that were correctly classified
        """
        N = X.shape[0]
        if num_samples is not None and N > num_samples:
            indices = np.random.choice(N, num_samples)
            N = num_samples
            X = X[indices]
            y = y[indices]

        # Compute predictions in batches
        num_batches = N // batch_size
        if N % batch_size != 0:
            num_batches += 1

        y_pred = []
        for i in range(num_batches):
            start = i * batch_size
            end = min((i + 1) * batch_size, N)

            batch_scores = self.model.loss(X[start:end])
            batch_preds = np.argmax(batch_scores, axis=1)
            y_pred.append(batch_preds)

        y_pred = np.hstack(y_pred)
        accuracy = np.mean(y_pred == y)

        return accuracy

    def train(self):
        """
        Run optimization to train the model.
        """
        num_train = self.X_train.shape[0]
        iterations_per_epoch = max(num_train // self.batch_size, 1)
        num_iterations = self.num_epochs * iterations_per_epoch

        for epoch in range(self.num_epochs):
            # Shuffle training data
            shuffle_indices = np.random.permutation(num_train)
            X_train_shuffled = self.X_train[shuffle_indices]
            y_train_shuffled = self.y_train[shuffle_indices]

            for i in range(iterations_per_epoch):
                batch_start = i * self.batch_size
                batch_end = min((i + 1) * self.batch_size, num_train)
                X_batch = X_train_shuffled[batch_start:batch_end]
                y_batch = y_train_shuffled[batch_start:batch_end]

                self._step(X_batch, y_batch)

                # Print training loss
                if self.verbose and (i + 1) % self.print_every == 0:
                    print('Epoch %d, iteration %d / %d: loss %f' % (
                        epoch + 1, i + 1, iterations_per_epoch, self.loss_history[-1]))

            # Decay learning rate
            for k in self.optim_configs:
                self.optim_configs[k]['learning_rate'] *= self.lr_decay

            # Evaluate train and val accuracy
            train_acc = self.check_accuracy(self.X_train, self.y_train, num_samples=1000)
            val_acc = self.check_accuracy(self.X_val, self.y_val)
            self.train_acc_history.append(train_acc)
            self.val_acc_history.append(val_acc)

            if self.verbose:
                print('Epoch %d / %d: train acc: %f, val_acc: %f' % (
                    epoch + 1, self.num_epochs, train_acc, val_acc))

In [None]:
# Training and evaluation

# Load CIFAR-10 data
data = get_CIFAR10_data()
for k, v in data.items():
    print('%s: ' % k, v.shape)

# Create the model with the best hyperparameters
model = ConvNet(input_dim=(3, 32, 32),
                num_filters=[32, 64],
                filter_size=3,
                hidden_dim=500,
                num_classes=10,
                weight_scale=5e-2,
                reg=0.001,
                dropout=0.5,
                use_batchnorm=True)

# Create solver with Adam optimizer
solver = Solver(model, data,
                update_rule=Solver._adam,
                optim_config={
                    'learning_rate': 1e-3,
                },
                lr_decay=0.95,
                batch_size=128,
                num_epochs=10,
                print_every=100,
                verbose=True)

# Train the model
solver.train()

# Evaluate final performance
print('Final validation accuracy:', solver.val_acc_history[-1])
print('Final test accuracy:', solver.check_accuracy(data['X_test'], data['y_test']))

# Plot training and validation accuracy
plt.subplot(2, 1, 1)
plt.plot(solver.train_acc_history, '-o')
plt.plot(solver.val_acc_history, '-o')
plt.legend(['train', 'val'], loc='upper left')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.title('Accuracy history')

# Plot loss history
plt.subplot(2, 1, 2)
plt.plot(solver.loss_history)
plt.xlabel('iteration')
plt.ylabel('loss')
plt.title('Loss history')
plt.tight_layout()
plt.show()

# Visualize predictions on test set
def visualize_predictions(X, y, y_pred, classes, samples=8):
    plt.figure(figsize=(12, 8))
    for i in range(samples):
        plt.subplot(2, 4, i+1)
        img = X[i].transpose(1, 2, 0)
        img = img / 2 + 0.5  # Unnormalize
        plt.imshow(img)
        plt.title(f'True: {classes[y[i]]}\nPred: {classes[y_pred[i]]}')
        plt.axis('off')
    plt.tight_layout()
    plt.show()

# Get predictions on a few test images
test_indices = np.random.choice(data['X_test'].shape[0], 8, replace=False)
test_images = data['X_test'][test_indices]
test_labels = data['y_test'][test_indices]
test_scores = model.loss(test_images)
test_preds = np.argmax(test_scores, axis=1)

# Visualize predictions
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
visualize_predictions(test_images, test_labels, test_preds, classes)