Imports 

In [21]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import os
import pickle

Functions

In [15]:
def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x > 0, 1, 0)

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def sigmoid_derivative(x):
    s = sigmoid(x)
    return s * (1 - s)

def softmax(x):
    exps = np.exp(x - np.max(x, axis=1, keepdims=True))  # Subtract max for stability(Cs 7015 - Nptel notes)
    return exps / np.sum(exps, axis=1, keepdims=True)

Loss Function using cross entropy as it's a classification problem

In [16]:
def cross_entropy_loss(y_true, y_pred):
    # Clip predictions to avoid log(0)
    y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)
    loss = -np.sum(y_true * np.log(y_pred)) / y_true.shape[0]
    return loss

def cross_entropy_gradient(y_true, y_pred): # Needed for backpropagation
    return (y_pred - y_true) / y_true.shape[0]

Initialize the Params and forward pass and backprop

In [17]:
def initialize_parameters(input_size, hidden_sizes, output_size):
    parameters = {}
    layer_sizes = [input_size] + hidden_sizes + [output_size]
    num_layers = len(layer_sizes)

    for i in range(1, num_layers):
        # Xavier/He initialization for avoiding the vanishing/exploding gradient problem
        parameters[f'W{i}'] = np.random.randn(layer_sizes[i - 1], layer_sizes[i]) * np.sqrt(2 / layer_sizes[i - 1])
        parameters[f'b{i}'] = np.zeros((1, layer_sizes[i]))
    return parameters

def forward_propagation(X, parameters, activation_func='relu'):
    activations = {'A0': X}  # Store the input as the activation for the first layer
    num_layers = len(parameters) // 2  

    for i in range(1, num_layers):
        W = parameters[f'W{i}']
        b = parameters[f'b{i}']
        Z = np.dot(activations[f'A{i-1}'], W) + b  # Pre-activation
        activations[f'Z{i}'] = Z
        if activation_func == 'relu':
            A = relu(Z)
        elif activation_func == 'sigmoid':
            A = sigmoid(Z)
        else:
            raise ValueError(f"Didnt select the activation function correctly: {activation_func}")
        activations[f'A{i}'] = A  # Store the activations for backprop

    # Last layer uses softmax (for multi-class classification)
    W = parameters[f'W{num_layers}']
    b = parameters[f'b{num_layers}']
    Z = np.dot(activations[f'A{num_layers-1}'], W) + b
    activations[f'Z{num_layers}'] = Z
    A = softmax(Z)
    activations[f'A{num_layers}'] = A

    return activations
def backward_propagation(activations, parameters, y_true, activation_func='relu'):
    gradients = {}
    num_layers = len(parameters) // 2  # Number of weight matrices
    m = y_true.shape[0]  # Number of samples

    # Compute the gradient of the loss with respect to the output (A_L)
    dA = cross_entropy_gradient(y_true, activations[f'A{num_layers}'])
    dZ = dA # For softmax, dZ = dA

    for i in range(num_layers, 0, -1):
        dW = np.dot(activations[f'A{i-1}'].T, dZ)
        db = np.sum(dZ, axis=0, keepdims=True)
        gradients[f'dW{i}'] = dW
        gradients[f'db{i}'] = db

        if i > 1:
            W = parameters[f'W{i}']
            d_activation = None
            if activation_func == 'relu':
                d_activation = relu_derivative(activations[f'Z{i-1}'])
            elif activation_func == 'sigmoid':
                d_activation = sigmoid_derivative(activations[f'Z{i-1}'])
            else:
                raise ValueError("Invalid activation function. Must be 'relu' or 'sigmoid'.")
            dZ = np.dot(dZ, W.T) * d_activation # Chain rule
    return gradients


In [18]:
def gd(parameters, gradients, learning_rate):
    updated_parameters = {}
    num_layers = len(parameters) // 2
    for i in range(1, num_layers + 1):
        W = parameters[f'W{i}']
        b = parameters[f'b{i}']
        dW = gradients[f'dW{i}']
        db = gradients[f'db{i}']
        updated_parameters[f'W{i}'] = W - learning_rate * dW
        updated_parameters[f'b{i}'] = b - learning_rate * db
    return updated_parameters, None

def Momentum(parameters, gradients, learning_rate, momentum, v):
    updated_parameters = {}
    num_layers = len(parameters) // 2
    for i in range(1, num_layers + 1):
        W = parameters[f'W{i}']
        b = parameters[f'b{i}']
        dW = gradients[f'dW{i}']
        db = gradients[f'db{i}']
        if f'VdW{i}' not in v:
            v[f'VdW{i}'] = np.zeros_like(dW)
            v[f'Vdb{i}'] = np.zeros_like(db)
        v[f'VdW{i}'] = momentum * v[f'VdW{i}'] + learning_rate * dW
        v[f'Vdb{i}'] = momentum * v[f'Vdb{i}'] + learning_rate * db
        updated_parameters[f'W{i}'] = W - v[f'VdW{i}']
        updated_parameters[f'b{i}'] = b - v[f'Vdb{i}']
    return updated_parameters, v

def nag(parameters, gradients, learning_rate, momentum, v):
    updated_parameters = {}
    num_layers = len(parameters) // 2
    for i in range(1, num_layers + 1):
        W = parameters[f'W{i}']
        b = parameters[f'b{i}']
        dW = gradients[f'dW{i}']
        db = gradients[f'db{i}']
        if f'VdW{i}' not in v:
            v[f'VdW{i}'] = np.zeros_like(dW)
            v[f'Vdb{i}'] = np.zeros_like(db)

        # Calculate the intermediate parameters (W_tilde, b_tilde)
        W_tilde = W - momentum * v[f'VdW{i}']
        b_tilde = b - momentum * v[f'Vdb{i}']

        # Calculate gradients at the intermediate parameters
        # Need to perform forward and backward propagation with W_tilde, b_tilde
        intermediate_parameters = {f'W{k}': parameters[f'W{k}'] if k != i else W_tilde for k in range(1, num_layers + 1)}
        intermediate_parameters = {f'b{k}': parameters[f'b{k}'] if k != i else b_tilde for k in range(1, num_layers + 1)}

        #  Need a mini-forward and mini-backward prop here.
        #   This is the key difference between Momentum and NAG.
        # intermediate_gradients = gradients  # In practice, you'd calculate this with a forward/backward pass #remove this
        activations = forward_propagation(X=X_train, parameters=intermediate_parameters, activation_func='relu') #add this
        intermediate_gradients = backward_propagation(activations=activations, parameters=intermediate_parameters, y_true=y_train, activation_func='relu') #add this

        v[f'VdW{i}'] = momentum * v[f'VdW{i}'] + learning_rate * intermediate_gradients[f'dW{i}']
        v[f'Vdb{i}'] = momentum * v[f'Vdb{i}'] + learning_rate * intermediate_gradients[f'db{i}']
        updated_parameters[f'W{i}'] = W - v[f'VdW{i}']
        updated_parameters[f'b{i}'] = b - v[f'Vdb{i}']
    return updated_parameters, v

def adam(parameters, gradients, learning_rate, t, v):
    updated_parameters = {}
    num_layers = len(parameters) // 2
    if v is None:
        v = {}
    beta1 = 0.9
    beta2 = 0.999
    epsilon = 1e-8

    for i in range(1, num_layers + 1):
        W = parameters[f'W{i}']
        b = parameters[f'b{i}']
        dW = gradients[f'dW{i}']
        db = gradients[f'db{i}']

        if f'MdW{i}' not in v:
            v[f'MdW{i}'] = np.zeros_like(dW)
            v[f'Mdb{i}'] = np.zeros_like(db)
            v[f'VdW{i}'] = np.zeros_like(dW)
            v[f'Vdb{i}'] = np.zeros_like(db)

        v[f'MdW{i}'] = beta1 * v[f'MdW{i}'] + (1 - beta1) * dW
        v[f'Mdb{i}'] = beta1 * v[f'Mdb{i}'] + (1 - beta1) * db
        v[f'VdW{i}'] = beta2 * v[f'VdW{i}'] + (1 - beta2) * (dW ** 2)
        v[f'Vdb{i}'] = beta2 * v[f'Vdb{i}'] + (1 - beta2) * (db ** 2)

        MdW = v[f'MdW{i}'] / (1 - beta1 ** t)
        Mdb = v[f'Mdb{i}'] / (1 - beta1 ** t)
        VdW = v[f'VdW{i}'] / (1 - beta2 ** t)
        Vdb = v[f'Vdb{i}'] / (1 - beta2 ** t)

        updated_W = W - learning_rate * MdW / (np.sqrt(VdW) + epsilon)
        updated_b = b - learning_rate * Mdb / (np.sqrt(Vdb) + epsilon)
        updated_parameters[f'W{i}'] = updated_W
        updated_parameters[f'b{i}'] = updated_b
    return updated_parameters, v

In [19]:
def update_parameters(parameters, gradients, learning_rate, optimizer='gd', momentum=0.5, v=None, t=None):
    if v is None:
        v = {}

    if optimizer == 'gd':
        return gd(parameters, gradients, learning_rate)
    elif optimizer == 'momentum':
        return Momentum(parameters, gradients, learning_rate, momentum, v)
    elif optimizer == 'nag':
        return nag(parameters, gradients, learning_rate, momentum, v)
    elif optimizer == 'adam':
        return adam(parameters, gradients, learning_rate, t, v)
    else:
        raise ValueError(f"Invalid optimizer: {optimizer}")


In [20]:
def one_hot_encode(y, num_classes):
    m = y.shape[0]
    one_hot = np.zeros((m, num_classes))
    one_hot[np.arange(m), y] = 1
    return one_hot

def load_data(path_to_train=None, path_to_val=None, path_to_test=None):
    if path_to_train and path_to_val:
        # Load from CSV files
        train_df = pd.read_csv(path_to_train)
        val_df = pd.read_csv(path_to_val)
        X_train = train_df.iloc[:, :-1].values  # All columns except the last one
        y_train = train_df.iloc[:, -1].values   # Last column
        X_val = val_df.iloc[:, :-1].values
        y_val = val_df.iloc[:, -1].values
        X_test = None
        y_test = None
        if path_to_test:
            test_df = pd.read_csv(path_to_test)
            X_test = test_df.iloc[:, :-1].values
            y_test = test_df.iloc[:, -1].values
    else:
        # Load the digits dataset from scikit-learn and split it
        digits = load_digits()
        X, y = digits.data, digits.target
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42) # 0.25 of the original training set is now validation
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    if X_test is not None:
        X_test = scaler.transform(X_test)

    return X_train, y_train, X_val, y_val, X_test, y_test

def train(X_train, y_train, X_val, y_val,
          learning_rate=0.001, momentum=0.5, num_hidden=1, sizes="64",
          activation='relu', loss='ce', opt='adam', batch_size=20,
          num_epochs=10, anneal=False, path_save_dir='models/',
          expt_dir='logs/', to_pretrain=False, epoch_to_restore=0,
          to_test=False, X_test=None, y_test=None):
    """
    Trains the neural network model.

    Args:
        X_train (numpy.ndarray): Training data.
        y_train (numpy.ndarray): Training labels.
        X_val (numpy.ndarray): Validation data.
        y_val (numpy.ndarray): Validation labels.
        learning_rate (float, optional): Learning rate. Defaults to 0.001.
        momentum (float, optional): Momentum. Defaults to 0.5.
        num_hidden (int, optional): Number of hidden layers. Defaults to 1.
        sizes (str, optional): Sizes of hidden layers (comma-separated). Defaults to "64".
        activation (str, optional): Activation function. Defaults to 'relu'.
        loss (str, optional): Loss function ('ce'). Defaults to 'ce'.
        opt (str, optional): Optimizer to use ('gd', 'momentum', 'adam', 'nag').
        batch_size (int, optional): Batch size. Defaults to 20.
        num_epochs (int, optional): Number of epochs. Defaults to 10.
        anneal (bool, optional): Whether to use annealing. Defaults to False.
        path_save_dir (str, optional): Path to save models. Defaults to 'models/'.
        expt_dir (str, optional): Path to save logs. Defaults to 'logs/'.
        to_pretrain (bool, optional): Whether to pretrain. Defaults to False.
        epoch_to_restore (int, optional): Epoch to restore from. Defaults to 0.
        to_test (bool, optional): Whether to test. Defaults to False.
        X_test (numpy.ndarray, optional): Test data. Defaults to None.
        y_test (numpy.ndarray, optional): Test labels. Defaults to None.
    """

    # Create directories if they don't exist
    os.makedirs(path_save_dir, exist_ok=True)
    os.makedirs(expt_dir, exist_ok=True)

    # Save hyperparameters to a file
    with open(os.path.join(expt_dir, 'readme.txt'), 'w') as f:
        f.write(f"learning_rate: {learning_rate}\n")
        f.write(f"momentum: {momentum}\n")
        f.write(f"num_hidden: {num_hidden}\n")
        f.write(f"sizes: {sizes}\n")
        f.write(f"activation: {activation}\n")
        f.write(f"loss: {loss}\n")
        f.write(f"opt: {opt}\n")
        f.write(f"batch_size: {batch_size}\n")
        f.write(f"num_epochs: {num_epochs}\n")
        f.write(f"anneal: {anneal}\n")
        f.write(f"path_save_dir: {path_save_dir}\n")
        f.write(f"expt_dir: {expt_dir}\n")
        f.write(f"to_pretrain: {to_pretrain}\n")
        f.write(f"epoch_to_restore: {epoch_to_restore}\n")
        f.write(f"to_test: {to_test}\n")

    input_size = X_train.shape[1]
    hidden_sizes = [int(s) for s in sizes.split(',')] * num_hidden  # Repeat if necessary
    output_size = len(np.unique(y_train))
    num_classes = output_size # For one-hot encoding

    print(f"Input size: {input_size}, Hidden sizes: {hidden_sizes}, Output size: {output_size}") # Debug

    if to_pretrain:
        # Load pre-trained weights
        try:
            with open(os.path.join(path_save_dir, f'weights_epoch_{epoch_to_restore}.pkl'), 'rb') as f:
                parameters = pickle.load(f)
            print(f"Loaded pre-trained weights from epoch {epoch_to_restore}")
        except FileNotFoundError:
            print("Pre-trained weights not found.  Training from scratch.")
            parameters = initialize_parameters(input_size, hidden_sizes, output_size)
    else:
        # Initialize weights and biases
        parameters = initialize_parameters(input_size, hidden_sizes, output_size)

    # One-hot encode the labels
    y_train_encoded = one_hot_encode(y_train, num_classes)
    y_val_encoded = one_hot_encode(y_val, num_classes)

    # Initialize velocity for momentum, NAG, and Adam
    v = None
    t = 0 # For Adam

    # Best validation loss and epoch for early stopping
    best_val_loss = float('inf')
    best_epoch = 0

    # Training loop
    for epoch in range(num_epochs):
        # Mini-batch gradient descent
        num_batches = X_train.shape[0] // batch_size
        for batch in range(num_batches):
            t += 1 # For Adam
            start = batch * batch_size
            end = (batch + 1) * batch_size
            X_batch = X_train[start:end]
            y_batch = y_train_encoded[start:end]

            # Forward propagation
            activations = forward_propagation(X_batch, parameters, activation)
            y_pred_batch = activations[f'A{len(hidden_sizes) + 1}'] #get the last activation

            # Compute loss
            loss_batch = cross_entropy_loss(y_batch, y_pred_batch)

            # Backward propagation
            gradients = backward_propagation(activations, parameters, y_batch, activation)

            # Update parameters
            parameters, v = update_parameters(parameters, gradients, learning_rate, opt, momentum, v, t)

        # Compute training and validation loss for the epoch
        activations_train = forward_propagation(X_train, parameters, activation)
        y_pred_train = activations_train[f'A{len(hidden_sizes) + 1}']
        loss_train = cross_entropy_loss(y_train_encoded, y_pred_train)

        activations_val = forward_propagation(X_val, parameters, activation)
        y_pred_val = activations_val[f'A{len(hidden_sizes) + 1}']
        loss_val = cross_entropy_loss(y_val_encoded, y_pred_val)

        # Print epoch summary
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss_train:.4f}, Val Loss: {loss_val:.4f}")

        # Save model weights
        with open(os.path.join(path_save_dir, f'weights_epoch_{epoch + 1}.pkl'), 'wb') as f:
            pickle.dump(parameters, f)

        # Early stopping
        if loss_val < best_val_loss:
            best_val_loss = loss_val
            best_epoch = epoch + 1
        elif epoch - best_epoch > 5:  # Stop if validation loss hasn't improved in 5 epochs
            print(f"Early stopping at epoch {epoch + 1}")
            break

        # Annealing 
        if anneal:
            learning_rate *= 0.95  # Reduce learning rate each epoch

    print(f"Best validation loss: {best_val_loss:.4f} at epoch {best_epoch}")

    # Evaluate on the test set if provided
    if to_test and X_test is not None and y_test is not None:
        y_test_encoded = one_hot_encode(y_test, num_classes)
        activations_test = forward_propagation(X_test, parameters, activation)
        y_pred_test = activations_test[f'A{len(hidden_sizes) + 1}']
        loss_test = cross_entropy_loss(y_test_encoded, y_pred_test)
        y_pred_test_labels = np.argmax(y_pred_test, axis=1)  # Convert one-hot to labels
        accuracy = accuracy_score(y_test, y_pred_test_labels)
        print(f"Test Loss: {loss_test:.4f}, Test Accuracy: {accuracy:.4f}")

        # Print classification report and confusion matrix
        print(classification_report(y_test, y_pred_test_labels))
        print(confusion_matrix(y_test, y_pred_test_labels))
    return parameters #return the trained parameters

if __name__ == '__main__':
    # Set your parameters
    learning_rate = 0.001
    momentum = 0.9
    num_hidden = 2
    sizes = "128,64"
    activation = 'relu'  # or 'sigmoid'
    loss = 'ce'  #  'ce' for cross-entropy
    opt = 'adam'  # 'gd', 'momentum', 'adam', 'nag'
    batch_size = 64
    num_epochs = 50
    anneal = False
    path_save_dir = 'models/'
    expt_dir = 'logs/'
    to_pretrain = False
    epoch_to_restore = 0
    to_test = True # Set to true if you have a test set
    path_to_train = None  # Or path to your training CSV
    path_to_val = None    # Or path to your validation CSV
    path_to_test = None   # Or path to your test CSV

    # Load data
    X_train, y_train, X_val, y_val, X_test, y_test = load_data(path_to_train, path_to_val, path_to_test)

    # Train the model
    trained_parameters = train(X_train, y_train, X_val, y_val,
                      learning_rate, momentum, num_hidden, sizes,
                      activation, loss, opt, batch_size, num_epochs,
                      anneal, path_save_dir, expt_dir, to_pretrain,
                      epoch_to_restore, to_test, X_test, y_test)
    # Save trained model
    with open(os.path.join(path_save_dir, 'final_model.pkl'), 'wb') as f:
        pickle.dump(trained_parameters, f)
    print("Trained model saved to final_model.pkl")


Input size: 64, Hidden sizes: [128, 64, 128, 64], Output size: 10
Epoch 1/50, Loss: 1.0945, Val Loss: 1.1682
Epoch 2/50, Loss: 0.4690, Val Loss: 0.5657
Epoch 3/50, Loss: 0.2254, Val Loss: 0.3160
Epoch 4/50, Loss: 0.1312, Val Loss: 0.2389
Epoch 5/50, Loss: 0.0849, Val Loss: 0.2038
Epoch 6/50, Loss: 0.0588, Val Loss: 0.1884
Epoch 7/50, Loss: 0.0415, Val Loss: 0.1774
Epoch 8/50, Loss: 0.0301, Val Loss: 0.1706
Epoch 9/50, Loss: 0.0222, Val Loss: 0.1661
Epoch 10/50, Loss: 0.0173, Val Loss: 0.1660
Epoch 11/50, Loss: 0.0140, Val Loss: 0.1668
Epoch 12/50, Loss: 0.0118, Val Loss: 0.1671
Epoch 13/50, Loss: 0.0102, Val Loss: 0.1678
Epoch 14/50, Loss: 0.0090, Val Loss: 0.1684
Epoch 15/50, Loss: 0.0081, Val Loss: 0.1693
Epoch 16/50, Loss: 0.0074, Val Loss: 0.1700
Epoch 17/50, Loss: 0.0068, Val Loss: 0.1707
Early stopping at epoch 17
Best validation loss: 0.1660 at epoch 10
Test Loss: 0.0904, Test Accuracy: 0.9722
              precision    recall  f1-score   support

           0       0.97      1.