In [1]:
import numpy as np
import matplotlib.pyplot as plt
import wandb
from keras.datasets import fashion_mnist

In [2]:
# Activation Functions
def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and ``x``."""
    floor = 0
    return np.maximum(floor, x)

def relu_derivative(x):
    """Slope of ReLU at ``x``: 1.0 where ``x`` is strictly positive, 0.0 elsewhere."""
    is_positive = x > 0
    return is_positive.astype(float)

def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))``, evaluated without overflow.

    Only ``exp(-|x|)`` is computed, so the exponent is never positive and large
    negative inputs no longer trigger an overflow warning. For ``x >= 0`` the
    expression reduces to the textbook form; for ``x < 0`` the algebraically
    equivalent ``exp(x) / (1 + exp(x))`` is used.

    :param x: NumPy scalar or array of pre-activations.
    :return: Element-wise sigmoid with the same shape as ``x``.
    """
    decay = np.exp(-np.abs(x))
    # Numerator is 1 on the non-negative side and exp(x) on the negative side.
    numerator = np.where(x >= 0, 1.0, decay)
    return numerator / (1.0 + decay)

def sigmoid_derivative(x):
    """Derivative of the sigmoid at pre-activation ``x``: ``s * (1 - s)`` with ``s = sigmoid(x)``."""
    s = sigmoid(x)
    return s * (1 - s)

def tanh_derivative(x):
    """Derivative of tanh at pre-activation ``x``: ``1 - tanh(x)**2``."""
    return 1.0 - np.tanh(x) ** 2


# Registry mapping an activation name to its (function, derivative) pair.
# Both callables take the pre-activation Z. 'tanh' is registered because
# NeuralNet.__init__ has a dedicated initialisation branch for it.
activation_methods = {
    'relu': (relu, relu_derivative),
    'sigmoid': (sigmoid, sigmoid_derivative),
    'tanh': (np.tanh, tanh_derivative),
}

In [None]:
def encode_labels(y, num_labels):
    """One-hot encode integer class labels.

    :param y: Sequence of integer labels, one per sample.
    :param num_labels: Number of columns in the encoding.
    :return: Float array of shape ``(len(y), num_labels)`` with a single 1 per row.
    """
    n_samples = len(y)
    one_hot = np.zeros((n_samples, num_labels))
    row_index = np.arange(n_samples)
    # Fancy-index assignment: row r gets a 1 in column y[r].
    one_hot[row_index, y] = 1
    return one_hot

def get_accuracy(Y_est, Y_actual):
    """Fraction of rows whose arg-max column agrees between the two arrays.

    :param Y_est: Per-class scores, one row per sample.
    :param Y_actual: One-hot (or score) targets with the same layout.
    :return: Mean of the per-row agreement, a float in [0, 1].
    """
    hits = np.argmax(Y_est, axis=1) == np.argmax(Y_actual, axis=1)
    return np.mean(hits)

In [3]:
class NeuralNet:
    """Fully connected feed-forward classifier with a softmax output layer.

    All hidden layers share one activation, looked up by name in the
    module-level ``activation_methods`` table. Weights are stored as
    ``(fan_in, fan_out)`` matrices and biases as ``(1, fan_out)`` rows, so a
    batch is propagated as ``A.dot(W) + b`` with samples along axis 0.
    """

    def __init__(self, input_size, hidden_layers, output_size, act_func="relu", init_method="Xavier"):
        """Allocate and initialise the layer parameters.

        :param input_size: Number of input features.
        :param hidden_layers: Sequence with the width of each hidden layer (may be empty).
        :param output_size: Number of output classes.
        :param act_func: Key into ``activation_methods`` used for every hidden layer.
        :param init_method: ``"Xavier"`` selects variance-scaled Gaussian weights;
            any other value falls back to Gaussian weights with std 0.01.
        """
        self.layer_count = len(hidden_layers) + 1
        self.act_func = act_func
        self.weights = []
        self.biases = []

        # list(...) lets callers pass a tuple as well as a list.
        layer_dims = [input_size] + list(hidden_layers) + [output_size]
        for fan_in, fan_out in zip(layer_dims[:-1], layer_dims[1:]):
            if init_method == "Xavier":
                if act_func in ("tanh", "sigmoid"):
                    # Glorot scaling for saturating activations.
                    scale = np.sqrt(2. / (fan_in + fan_out))
                else:
                    # He scaling, used for ReLU and anything else.
                    scale = np.sqrt(2. / fan_in)
            else:
                scale = 0.01
            self.weights.append(np.random.randn(fan_in, fan_out) * scale)
            self.biases.append(np.zeros((1, fan_out)))

    def forward_pass(self, data):
        """Propagate a batch through the network and cache the intermediates.

        Stores the pre-activations in ``self.intermediate_Z`` and the layer
        inputs/outputs in ``self.intermediate_A`` (index 0 is ``data``); these
        are consumed by ``backpropagation``.

        :param data: Array of shape ``(batch, input_size)``.
        :return: Softmax probabilities of shape ``(batch, output_size)``.
        """
        act_fn, _ = activation_methods[self.act_func]
        self.intermediate_Z = []
        self.intermediate_A = [data]
        output = data
        last_layer = self.layer_count - 1
        for layer in range(self.layer_count):
            Z = output.dot(self.weights[layer]) + self.biases[layer]
            self.intermediate_Z.append(Z)
            if layer == last_layer:
                # Subtract the row maximum before exponentiating to avoid overflow.
                shifted_Z = Z - np.max(Z, axis=1, keepdims=True)
                exp_values = np.exp(shifted_Z)
                output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
            else:
                output = act_fn(Z)
            self.intermediate_A.append(output)
        return output

    def loss_function(self, predicted, actual, loss_type="cross_entropy"):
        """Mean loss over the batch.

        :param predicted: Network output, one row per sample.
        :param actual: One-hot targets with the same shape.
        :param loss_type: ``"cross_entropy"`` or ``"mean_squared_error"``.
        :return: Scalar loss averaged over ``actual.shape[0]`` samples.
        :raises ValueError: If ``loss_type`` is not one of the supported names.
        """
        samples = actual.shape[0]
        if loss_type == "cross_entropy":
            # 1e-8 keeps log() finite when a probability is exactly 0.
            return -np.sum(actual * np.log(predicted + 1e-8)) / samples
        if loss_type == "mean_squared_error":
            return np.sum((actual - predicted) ** 2) / (2 * samples)
        raise ValueError(f"Unsupported loss_type: {loss_type!r}")

    def backpropagation(self, data, target, loss_type="cross_entropy"):
        """Compute parameter gradients for the batch of the most recent ``forward_pass``.

        Relies on ``self.intermediate_A`` / ``self.intermediate_Z`` filled by
        ``forward_pass``; ``data`` is only used for its batch size.

        :param data: The batch that was fed to ``forward_pass``.
        :param target: One-hot targets, shape ``(batch, output_size)``.
        :param loss_type: ``"cross_entropy"`` or ``"mean_squared_error"``.
        :return: ``(grad_W, grad_B)``, lists with one array per layer, already
            divided by the batch size.
        :raises ValueError: If ``loss_type`` is not one of the supported names.
        """
        batch_size = data.shape[0]
        grad_W = [None] * self.layer_count
        grad_B = [None] * self.layer_count

        output = self.intermediate_A[-1]
        if loss_type == "cross_entropy":
            # Softmax + cross-entropy collapses to (p - y) at the logits.
            delta = output - target
        elif loss_type == "mean_squared_error":
            # dL/dp = (p - y); push it through the softmax Jacobian:
            # dL/dz_j = p_j * (d_j - sum_k d_k * p_k).
            d_out = output - target
            delta = output * (d_out - np.sum(d_out * output, axis=1, keepdims=True))
        else:
            raise ValueError(f"Unsupported loss_type: {loss_type!r}")

        for layer in reversed(range(self.layer_count)):
            if layer == self.layer_count - 1:
                dZ = delta
            else:
                _, deriv_func = activation_methods[self.act_func]
                dZ = delta * deriv_func(self.intermediate_Z[layer])
            prev_A = self.intermediate_A[layer]
            grad_W[layer] = prev_A.T.dot(dZ) / batch_size
            grad_B[layer] = np.sum(dZ, axis=0, keepdims=True) / batch_size
            if layer > 0:
                # Gradient w.r.t. the previous layer's activations.
                delta = dZ.dot(self.weights[layer].T)
        return grad_W, grad_B

    def update_parameters(self, grad_w, grad_b, optimizer, config, cache):
        """
        Updates model parameters in place using the selected optimization algorithm.

        :param grad_w: List of weight gradients, one per layer
        :param grad_b: List of bias gradients, one per layer
        :param optimizer: One of "sgd", "momentum", "nesterov", "rmsprop", "adam", "nadam"
        :param config: Object exposing ``learning_rate`` and, optionally, ``momentum``,
                       ``decay_rate``, ``beta1``, ``beta2``, ``epsilon`` (defaults are used
                       for missing attributes)
        :param cache: Dictionary holding optimizer state between calls; the required
                      entries are created on first use
        :return: Updated cache
        :raises ValueError: If ``optimizer`` is not a supported name
        """
        lr = config.learning_rate
        layers = range(self.layer_count)

        if optimizer == "sgd":
            # Plain gradient step.
            for i in layers:
                self.weights[i] -= lr * grad_w[i]
                self.biases[i] -= lr * grad_b[i]

        elif optimizer == "momentum":
            # Exponential moving average of the gradients used as the step direction.
            beta = getattr(config, "momentum", 0.9)

            if "momentum_cache" not in cache:
                cache["momentum_cache"] = {"v_w": [np.zeros_like(w) for w in self.weights],
                                           "v_b": [np.zeros_like(b) for b in self.biases]}
            state = cache["momentum_cache"]

            for i in layers:
                state["v_w"][i] = beta * state["v_w"][i] + (1 - beta) * grad_w[i]
                state["v_b"][i] = beta * state["v_b"][i] + (1 - beta) * grad_b[i]

                self.weights[i] -= lr * state["v_w"][i]
                self.biases[i] -= lr * state["v_b"][i]

        elif optimizer == "nesterov":
            # Nesterov momentum, written in terms of the current parameters:
            # after refreshing the velocity, the step applies the momentum
            # once more on top of it (the "look-ahead"), which is what
            # distinguishes it from the plain momentum branch above.
            beta = getattr(config, "momentum", 0.9)

            if "nesterov_cache" not in cache:
                cache["nesterov_cache"] = {"v_w": [np.zeros_like(w) for w in self.weights],
                                           "v_b": [np.zeros_like(b) for b in self.biases]}
            state = cache["nesterov_cache"]

            for i in layers:
                v_w = beta * state["v_w"][i] + (1 - beta) * grad_w[i]
                v_b = beta * state["v_b"][i] + (1 - beta) * grad_b[i]
                state["v_w"][i] = v_w
                state["v_b"][i] = v_b

                self.weights[i] -= lr * (beta * v_w + (1 - beta) * grad_w[i])
                self.biases[i] -= lr * (beta * v_b + (1 - beta) * grad_b[i])

        elif optimizer == "rmsprop":
            # Scale each step by a running RMS of the gradient.
            decay_rate = getattr(config, "decay_rate", 0.99)
            epsilon = getattr(config, "epsilon", 1e-8)

            if "rms_cache" not in cache:
                cache["rms_cache"] = {"s_w": [np.zeros_like(w) for w in self.weights],
                                      "s_b": [np.zeros_like(b) for b in self.biases]}
            state = cache["rms_cache"]

            for i in layers:
                state["s_w"][i] = decay_rate * state["s_w"][i] + (1 - decay_rate) * (grad_w[i] ** 2)
                state["s_b"][i] = decay_rate * state["s_b"][i] + (1 - decay_rate) * (grad_b[i] ** 2)

                self.weights[i] -= lr * grad_w[i] / (np.sqrt(state["s_w"][i]) + epsilon)
                self.biases[i] -= lr * grad_b[i] / (np.sqrt(state["s_b"][i]) + epsilon)

        elif optimizer == "adam":
            # Adam: bias-corrected first and second moment estimates.
            beta1 = getattr(config, "beta1", 0.9)
            beta2 = getattr(config, "beta2", 0.999)
            epsilon = getattr(config, "epsilon", 1e-8)

            if "adam_cache" not in cache:
                cache["adam_cache"] = {"m_w": [np.zeros_like(w) for w in self.weights],
                                       "v_w": [np.zeros_like(w) for w in self.weights],
                                       "m_b": [np.zeros_like(b) for b in self.biases],
                                       "v_b": [np.zeros_like(b) for b in self.biases],
                                       "step": 0}
            state = cache["adam_cache"]

            state["step"] += 1
            step = state["step"]

            for i in layers:
                state["m_w"][i] = beta1 * state["m_w"][i] + (1 - beta1) * grad_w[i]
                state["m_b"][i] = beta1 * state["m_b"][i] + (1 - beta1) * grad_b[i]

                state["v_w"][i] = beta2 * state["v_w"][i] + (1 - beta2) * (grad_w[i] ** 2)
                state["v_b"][i] = beta2 * state["v_b"][i] + (1 - beta2) * (grad_b[i] ** 2)

                m_w_hat = state["m_w"][i] / (1 - beta1 ** step)
                v_w_hat = state["v_w"][i] / (1 - beta2 ** step)
                m_b_hat = state["m_b"][i] / (1 - beta1 ** step)
                v_b_hat = state["v_b"][i] / (1 - beta2 ** step)

                self.weights[i] -= lr * m_w_hat / (np.sqrt(v_w_hat) + epsilon)
                self.biases[i] -= lr * m_b_hat / (np.sqrt(v_b_hat) + epsilon)

        elif optimizer == "nadam":
            # NAdam: Adam with a Nesterov look-ahead on the first moment and a
            # warm-up schedule mu_t on the momentum coefficient. Weights and
            # biases are both updated.
            beta1 = getattr(config, "beta1", 0.9)
            beta2 = getattr(config, "beta2", 0.999)
            epsilon = getattr(config, "epsilon", 1e-8)

            if "nadam_cache" not in cache:
                cache["nadam_cache"] = {"m_w": [np.zeros_like(w) for w in self.weights],
                                        "v_w": [np.zeros_like(w) for w in self.weights],
                                        "m_b": [np.zeros_like(b) for b in self.biases],
                                        "v_b": [np.zeros_like(b) for b in self.biases],
                                        "step": 0}
            state = cache["nadam_cache"]

            state["step"] += 1
            step = state["step"]

            # Momentum schedule for this step and the next one.
            mu_t = beta1 * (1 - 0.5 * (0.96 ** (step / 250)))
            mu_next = beta1 * (1 - 0.5 * (0.96 ** ((step + 1) / 250)))
            # Running product mu_1 * ... * mu_t, required for the bias correction.
            mu_prod = state.get("mu_product", 1.0) * mu_t
            state["mu_product"] = mu_prod
            mu_prod_next = mu_prod * mu_next
            second_moment_correction = 1 - beta2 ** step

            for i in layers:
                for params, grads, m_key, v_key in ((self.weights, grad_w, "m_w", "v_w"),
                                                    (self.biases, grad_b, "m_b", "v_b")):
                    g = grads[i]
                    m = beta1 * state[m_key][i] + (1 - beta1) * g
                    v = beta2 * state[v_key][i] + (1 - beta2) * (g ** 2)
                    state[m_key][i] = m
                    state[v_key][i] = v

                    # Look-ahead momentum term plus the bias-corrected current gradient.
                    m_hat = mu_next * m / (1 - mu_prod_next) + (1 - mu_t) * g / (1 - mu_prod)
                    v_hat = v / second_moment_correction

                    params[i] -= lr * m_hat / (np.sqrt(v_hat) + epsilon)

        else:
            # An unrecognised name used to leave the parameters untouched without notice.
            raise ValueError(f"Unsupported optimizer: {optimizer!r}")

        return cache  # Return updated cache

In [5]:
def execute_training():
    """Train and evaluate one ``NeuralNet`` on Fashion-MNIST for the active W&B run.

    Reads every hyperparameter from ``wandb.config``, trains with mini-batches
    for ``cfg.num_epochs`` epochs, logs per-epoch train/validation metrics and
    finally logs the test accuracy.

    ``cfg.weight_decay`` is applied as an L2 penalty: ``weight_decay * W`` is
    added to each weight gradient (biases are not decayed) after clipping. The
    logged losses are the data losses only and do not include the penalty term.

    :raises ValueError: If ``cfg.batch_size`` exceeds the number of training samples.
    """
    wandb.init()
    cfg = wandb.config

    # Set dynamic run name for easy tracking in WandB
    wandb.run.name = f"e_{cfg.num_epochs}_hls_{cfg.hiddennodes}_numhl_{cfg.hiddenlayers}_opt_{cfg.opt}" \
                     f"_bs_{cfg.batch_size}_init_{cfg.initializer}_ac_{cfg.activation_func}_loss_{cfg.loss}" \
                     f"_lr_{cfg.learning_rate}_wdecay_{cfg.weight_decay}"

    # Load data, flatten each image to a vector and scale pixels to [0, 1]
    (train_X, train_y), (test_X, test_y) = fashion_mnist.load_data()
    train_X = train_X.reshape(train_X.shape[0], -1) / 255.0
    test_X = test_X.reshape(test_X.shape[0], -1) / 255.0

    num_classes = 10
    train_y_oh = encode_labels(train_y, num_classes)
    test_y_oh = encode_labels(test_y, num_classes)

    # Hold out the last 10% of the training data for validation
    val_split = int(0.9 * train_X.shape[0])
    val_X, val_y_oh = train_X[val_split:], train_y_oh[val_split:]
    train_X, train_y_oh = train_X[:val_split], train_y_oh[:val_split]

    input_dim = train_X.shape[1]
    hidden_arch = [cfg.hiddennodes] * cfg.hiddenlayers

    # Initialize the model
    model = NeuralNet(
        input_size=input_dim,
        hidden_layers=hidden_arch,
        output_size=num_classes,
        act_func=cfg.activation_func,
        init_method=cfg.initializer
    )

    optimizer_states = {}
    grad_clip_value = 1.0
    weight_decay = cfg.weight_decay

    # Only full batches are used; a trailing partial batch is dropped each epoch.
    num_batches = train_X.shape[0] // cfg.batch_size
    if num_batches == 0:
        raise ValueError(
            f"batch_size ({cfg.batch_size}) exceeds the number of training samples ({train_X.shape[0]})"
        )

    for epoch in range(cfg.num_epochs):
        # Shuffle data at the beginning of each epoch
        shuffle_idx = np.random.permutation(train_X.shape[0])
        train_X = train_X[shuffle_idx]
        train_y_oh = train_y_oh[shuffle_idx]

        epoch_loss = 0.0
        correct_preds = 0
        total_preds = 0

        for batch in range(num_batches):
            start = batch * cfg.batch_size
            end = start + cfg.batch_size
            X_batch = train_X[start:end]
            y_batch = train_y_oh[start:end]

            # Forward pass
            outputs = model.forward_pass(X_batch)
            loss = model.loss_function(outputs, y_batch, loss_type=cfg.loss)
            epoch_loss += loss

            # Accuracy calculation
            correct_preds += np.sum(np.argmax(outputs, axis=1) == np.argmax(y_batch, axis=1))
            total_preds += X_batch.shape[0]

            # Backward pass (compute gradients)
            grad_w, grad_b = model.backpropagation(X_batch, y_batch, loss_type=cfg.loss)

            # Element-wise gradient clipping
            grad_w = [np.clip(g, -grad_clip_value, grad_clip_value) for g in grad_w]
            grad_b = [np.clip(g, -grad_clip_value, grad_clip_value) for g in grad_b]

            # L2 regularisation: d/dW (wd/2 * ||W||^2) = wd * W. Added after
            # clipping so the clip threshold only bounds the data gradient.
            if weight_decay:
                grad_w = [g + weight_decay * w for g, w in zip(grad_w, model.weights)]

            # Update model parameters using optimizer
            optimizer_states = model.update_parameters(grad_w, grad_b, cfg.opt, cfg, optimizer_states)

        # Average loss and training accuracy
        avg_loss = epoch_loss / num_batches
        train_acc = correct_preds / total_preds

        # Validation phase
        val_outputs = model.forward_pass(val_X)
        val_loss = model.loss_function(val_outputs, val_y_oh, loss_type=cfg.loss)
        val_acc = get_accuracy(val_outputs, val_y_oh)

        # Log training and validation metrics to WandB
        wandb.log({
            "epoch": epoch + 1,
            "train_loss": avg_loss,
            "train_accuracy": train_acc,
            "val_loss": val_loss,
            "val_accuracy": val_acc
        })

        # Display progress
        print(f"Epoch {epoch + 1}: Train Loss: {avg_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Test phase
    test_outputs = model.forward_pass(test_X)
    test_acc = get_accuracy(test_outputs, test_y_oh)

    # Log final test accuracy
    wandb.log({"test_accuracy": test_acc})
    print(f"Final Test Accuracy: {test_acc:.4f}")

In [None]:
# Custom sweep configuration 1st Session

# Bayesian search over architecture, optimizer and regularisation settings,
# scored by the 'val_accuracy' metric that execute_training logs.
sweep_config = dict(
    name="DA6401 ASSIGNMENT 01",
    method='bayes',
    metric=dict(
        name='val_accuracy',
        goal='maximize',
    ),
    parameters=dict(
        hiddenlayers=dict(values=[4, 5, 6]),        # number of hidden layers
        num_epochs=dict(values=[5, 10]),
        hiddennodes=dict(values=[128, 256]),        # width of every hidden layer
        learning_rate=dict(values=[1e-3, 5e-4]),
        initializer=dict(values=["Xavier"]),        # single choice: variance-scaled init
        batch_size=dict(values=[128, 256]),
        opt=dict(values=["sgd", "adam", "nesterov", "rmsprop", "momentum", "nadam"]),
        activation_func=dict(values=["relu"]),      # single choice
        loss=dict(values=["cross_entropy"]),
        weight_decay=dict(values=[0.0001, 0.001]),  # exposed to the run as cfg.weight_decay
    ),
)

if __name__ == "__main__":
    # Register the sweep, then let one agent execute up to 50 runs of it.
    sweep_id = wandb.sweep(sweep_config, project="Q4")
    wandb.agent(sweep_id, function=execute_training, count=50)

In [None]:
def _single_point_sweep(num_epochs, learning_rate, opt, weight_decay):
    """Build a grid sweep in which every hyperparameter has exactly one candidate.

    Only the four arguments vary between the configurations below; the
    architecture (6 hidden layers of 256 units, ReLU, Xavier init), batch size
    and loss are shared.
    """
    return {
        'name': "DA6401 ASSIGNMENT 01",
        'method': 'grid',
        'metric': {'name': 'val_accuracy', 'goal': 'maximize'},
        'parameters': {
            'hiddenlayers': {'values': [6]},
            'num_epochs': {'values': [num_epochs]},
            'hiddennodes': {'values': [256]},
            'learning_rate': {'values': [learning_rate]},
            'initializer': {'values': ["Xavier"]},
            'batch_size': {'values': [256]},
            'opt': {'values': [opt]},
            'activation_func': {'values': ["relu"]},
            'loss': {'values': ["cross_entropy"]},
            'weight_decay': {'values': [weight_decay]}
        }
    }


sweep_config1 = _single_point_sweep(num_epochs=15, learning_rate=1e-3, opt="adam", weight_decay=0.001)
sweep_config2 = _single_point_sweep(num_epochs=10, learning_rate=5e-4, opt="nadam", weight_decay=0.0008)
sweep_config3 = _single_point_sweep(num_epochs=15, learning_rate=1e-3, opt="rmsprop", weight_decay=0.001)

# -----------------------
# Execution: one run per fixed configuration, in the order adam, nadam, rmsprop.
if __name__ == "__main__":
    for single_run_config in (sweep_config1, sweep_config2, sweep_config3):
        sweep_id = wandb.sweep(single_run_config, project="Q4")
        wandb.agent(sweep_id, function=execute_training, count=1)