<a href="https://colab.research.google.com/github/subikkshas/DA6401/blob/main/Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import math
import wandb
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from copy import deepcopy
from activations import Sigmoid, Tanh, ReLU, Softmax
from layers import Input, Dense
from optimizers import SGD, Momentum, Nesterov, RMSProp, Adam, Nadam
from loss import CrossEntropy, SquaredError
from helper import OneHotEncoder

# Registries that translate the configuration names accepted by NeuralNetwork
# into optimizer / loss objects. The optimizer entries act as prototypes:
# NeuralNetwork deep-copies one per layer parameter before using it.
optimizer_mapping = dict(
    SGD=SGD(),
    Momentum=Momentum(),
    Nesterov=Nesterov(),
    RMSProp=RMSProp(),
    Adam=Adam(),
    Nadam=Nadam(),
)

loss_mapping = dict(
    SquaredError=SquaredError(),
    CrossEntropy=CrossEntropy(),
)



# Neural Network

class NeuralNetwork:
    """Feedforward neural network trained with backpropagation.

    Conventions used throughout this class:
      * Samples are stored column-wise: batches are taken as
        ``target[:, start:stop]`` and the same column slice is applied to
        activations.
      * Pre-activations are computed as ``h = W @ a_prev - b`` (the bias is
        *subtracted*), so the bias gradient carries a negative sign.
      * ``self.layers[0]`` is the input layer and must already expose the
        training inputs as its ``a`` attribute.
      * Accuracies are returned as counts of correct predictions, not ratios.
    """

    def __init__(self, layers, batch_size, optimizer, initialization, epochs, target, loss,
                 X_val=None, target_val=None, use_wandb=False, optim_params=None):
        """
        Initializes the neural network.

        Args:
            layers (list): List of layers in the network (input layer first).
            batch_size (int): Number of samples per batch.
            optimizer (str): Key into ``optimizer_mapping``.
            initialization (str): Weight initialization method
                ("RandomNormal", "XavierUniform" or "Test").
            epochs (int): Number of training epochs.
            target (np.ndarray): Training targets, one column per sample.
            loss (str): Key into ``loss_mapping``.
            X_val (np.ndarray, optional): Validation inputs.
            target_val (np.ndarray, optional): Validation targets. Validation
                is enabled only when this is given.
            use_wandb (bool, optional): Whether to log training on Weights & Biases.
            optim_params (dict, optional): Parameters forwarded to every
                optimizer via ``configure_params``.

        Raises:
            ValueError: If ``batch_size`` is not positive, or ``target_val``
                is given without ``X_val``.
            KeyError: If ``loss`` or ``optimizer`` is not a known name.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        self.layers = layers
        self.batch_size = batch_size
        self.initialization = initialization
        self.epochs = epochs
        self.optimizer = optimizer  # optimizer *name*; instances live on the layers
        self.target = target
        self.num_batches = math.ceil(self.target.shape[1] / batch_size)
        self.loss_type = loss
        self.loss_fn = loss_mapping[loss]
        self.use_wandb = use_wandb

        if target_val is not None:
            if X_val is None:
                raise ValueError("X_val is required when target_val is given")
            self.X_val = X_val
            self.layers[0].a_val = X_val
            self.target_val = target_val

        self.initialize_parameters(optimizer, optim_params)

    def _has_validation(self):
        """Returns True when validation data was supplied to ``__init__``."""
        return hasattr(self, "X_val")

    def initialize_parameters(self, optimizer, optim_params):
        """Creates per-layer optimizers and initializes weights and biases.

        Each non-input layer receives its own deep copy of the optimizer
        prototype for ``W`` and for ``b`` so optimizer state is never shared.
        An ``initialization`` value other than the three known names leaves
        ``W``/``b`` untouched.

        Args:
            optimizer (str): Key into ``optimizer_mapping``.
            optim_params (dict or None): Passed to ``configure_params`` of
                every optimizer when truthy.
        """
        previous_size = self.layers[0].size
        for layer in self.layers[1:]:
            layer.W_size = (layer.size, previous_size)
            previous_size = layer.size
            layer.W_optimizer = deepcopy(optimizer_mapping[optimizer])
            layer.b_optimizer = deepcopy(optimizer_mapping[optimizer])

            # Assign optimizer parameters if provided
            if optim_params:
                layer.W_optimizer.configure_params(optim_params)
                layer.b_optimizer.configure_params(optim_params)

        if self.initialization == "RandomNormal":
            for layer in self.layers[1:]:
                layer.W = np.random.normal(loc=0, scale=1.0, size=layer.W_size)
                layer.b = np.zeros((layer.W_size[0], 1))

        elif self.initialization == "XavierUniform":
            for layer in self.layers[1:]:
                # Glorot/Xavier uniform: U(-limit, limit) with
                # limit = sqrt(6 / (fan_in + fan_out)).
                fan_out, fan_in = layer.W_size
                limit = math.sqrt(6.0 / (fan_in + fan_out))
                layer.W = np.random.uniform(-limit, limit, size=layer.W_size)
                layer.b = np.zeros((layer.W_size[0], 1))

        elif self.initialization == "Test":
            # Deterministic weights for reproducible checks.
            for layer in self.layers[1:]:
                layer.W = np.ones(layer.W_size) * 0.5
                layer.b = np.zeros((layer.W_size[0], 1))

    def forward_pass(self):
        """Performs forward propagation through the network.

        Stores ``h`` (pre-activation) and ``a`` (activation) on every
        non-input layer and the prediction ``y`` on the last layer. When
        validation data is available the ``*_val`` counterparts are computed
        with the same weights; otherwise they are left untouched.
        """
        validate = self._has_validation()

        for previous, layer in zip(self.layers, self.layers[1:]):
            # Bias is subtracted by convention (see class docstring).
            layer.h = layer.W @ previous.a - layer.b
            layer.a = layer.activation.compute(layer.h)

            if validate:
                layer.h_val = layer.W @ previous.a_val - layer.b
                layer.a_val = layer.activation.compute(layer.h_val)

        output = self.layers[-1]
        if self.loss_type == "CrossEntropy":
            # Cross-entropy expects probabilities, so the last activation is
            # passed through a softmax.
            softmax = Softmax()
            output.y = softmax.compute(output.a)
            if validate:
                output.y_val = softmax.compute(output.a_val)
        else:
            output.y = output.a
            if validate:
                output.y_val = output.a_val

    def compute_accuracy(self, validation=False, verbose=False):
        """Counts correct predictions from the most recent forward pass.

        Args:
            validation (bool): Also evaluate the validation set.
            verbose (bool): Print the computed counts.

        Returns:
            The number of correctly classified training samples, or a
            ``(train_count, val_count)`` tuple when ``validation`` is True.

        Raises:
            ValueError: If ``validation`` is True but no validation data was
                supplied.
        """
        if validation and not self._has_validation():
            raise ValueError("validation accuracy requested but no validation data was provided")

        encoder = OneHotEncoder()
        y_train = encoder.inverse_transform(self.layers[-1].y)
        t_train = encoder.inverse_transform(self.target)
        acc_train = np.sum(y_train == t_train)

        if verbose:
            print("Training Accuracy:", acc_train)

        if validation:
            y_val = encoder.inverse_transform(self.layers[-1].y_val)
            t_val = encoder.inverse_transform(self.target_val)
            acc_val = np.sum(y_val == t_val)

            if verbose:
                print("Validation Accuracy:", acc_val)
            return acc_train, acc_val
        return acc_train

    def _record_epoch_metrics(self, epoch, validate):
        """Appends loss/accuracy for the current predictions and logs them."""
        num_train = self.target.shape[1]
        output = self.layers[-1]

        self.learning_rate_history.append(output.W_optimizer.learning_rate)
        self.training_loss_history.append(self.loss_fn.compute_loss(self.target, output.y))

        if validate:
            train_acc, val_acc = self.compute_accuracy(validation=True)
            self.validation_loss_history.append(
                self.loss_fn.compute_loss(self.target_val, output.y_val))
            self.validation_accuracy_history.append(val_acc)
        else:
            train_acc = self.compute_accuracy()
        self.training_accuracy_history.append(train_acc)

        if self.use_wandb:
            # Histories hold raw values; logged values are divided by the
            # sample count.
            metrics = {
                "epoch": epoch,
                "training_loss": self.training_loss_history[-1] / num_train,
                "training_accuracy": self.training_accuracy_history[-1] / num_train,
            }
            if validate:
                num_val = self.target_val.shape[1]
                metrics["validation_loss"] = self.validation_loss_history[-1] / num_val
                metrics["validation_accuracy"] = self.validation_accuracy_history[-1] / num_val
            wandb.log(metrics)

    def _halve_learning_rates(self):
        """Halves the learning rate of every layer's W and b optimizer."""
        for layer in self.layers[1:]:
            for optim in (layer.W_optimizer, layer.b_optimizer):
                optim.configure_params({"learning_rate": optim.learning_rate / 2})

    def _backward(self, cols):
        """Computes ``W_grad``/``b_grad`` on every layer for one batch.

        Args:
            cols (slice): Column range selecting the batch's samples.
        """
        output = self.layers[-1]
        t_batch = self.target[:, cols]
        y_batch = output.y[:, cols]

        # NOTE(review): assumes compute_gradient returns the gradient with
        # respect to the last layer's activation output `a` (i.e. for
        # CrossEntropy the softmax is already folded in) -- confirm against
        # the loss module.
        output.a_grad = self.loss_fn.compute_gradient(t_batch, y_batch)
        output.h_grad = output.a_grad * output.activation.derivative(output.h[:, cols])

        for i in range(len(self.layers) - 2, 0, -1):
            layer, following = self.layers[i], self.layers[i + 1]
            layer.a_grad = following.W.T @ following.h_grad
            layer.h_grad = layer.a_grad * layer.activation.derivative(layer.h[:, cols])

        for previous, layer in zip(self.layers, self.layers[1:]):
            layer.W_grad = layer.h_grad @ previous.a[:, cols].T
            # h = W a - b, hence dL/db = -sum over the batch of dL/dh.
            layer.b_grad = -np.sum(layer.h_grad, axis=1, keepdims=True)

    def train(self):
        """Trains the neural network using backpropagation.

        For every epoch the metrics of the current predictions are recorded
        in the ``*_history`` lists (validation histories stay empty when no
        validation data was supplied). If the training loss rose compared to
        the previous epoch, all learning rates are halved before the epoch's
        updates. Parameters are then updated batch by batch, and a forward
        pass refreshes the predictions.

        NOTE: activations used for every batch come from the forward pass run
        at the start of the epoch; they are not recomputed between batches.
        """
        self.learning_rate_history = []
        self.training_loss_history = []
        self.training_accuracy_history = []
        self.validation_loss_history = []
        self.validation_accuracy_history = []

        validate = self._has_validation()

        # Predictions must exist before the first epoch's metrics are taken.
        self.forward_pass()

        for epoch in tqdm(range(self.epochs), desc="Training Progress"):
            self._record_epoch_metrics(epoch, validate)

            losses = self.training_loss_history
            if len(losses) >= 2 and losses[-1] > losses[-2]:
                # The last epoch made things worse: take smaller steps.
                self._halve_learning_rates()

            for batch in range(self.num_batches):
                cols = slice(batch * self.batch_size, (batch + 1) * self.batch_size)
                self._backward(cols)

                for layer in self.layers[1:]:
                    layer.W -= layer.W_optimizer.compute_update(layer.W_grad)
                    layer.b -= layer.b_optimizer.compute_update(layer.b_grad)

            self.forward_pass()

    def summary(self):
        """Displays the model summary."""
        print("Model Architecture:")
        for layer in self.layers:
            print(layer)
        print("Loss Function:", self.loss_fn)
        print("Epochs:", self.epochs)
        print("Batch Size:", self.batch_size)
        print("Optimizer:", self.optimizer)
        print("Initialization Method:", self.initialization)
