In [None]:
from keras.datasets import fashion_mnist
import numpy as np
import matplotlib.pyplot as plt
import wandb
from sklearn.model_selection import train_test_split
import math

In [None]:
# Authenticate with Weights & Biases without embedding the secret in the notebook.
# With no explicit key, wandb.login() falls back to the WANDB_API_KEY environment
# variable, previously cached credentials, or an interactive prompt.
# NOTE(review): the API key that used to be hardcoded here has been exposed in the
# notebook history and should be revoked and regenerated.
wandb.login()
wandb.init(project="CS23S025-Assignment-1-DA6401-DL", entity="cs23s025-indian-institute-of-technology-madras")

In [None]:
# Load the Fashion-MNIST train and test splits (images and integer class labels) via Keras.
(X_train, Y_train), (X_test, Y_test) = fashion_mnist.load_data()

 1. Fashion-MNIST images are 28x28, so input size is 784. The output is 10 classes.

In [None]:
# Human-readable class names, indexed by the integer label.
categories = [
    'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot',
]

# One slot per class; a slot stays None until that class is first encountered.
sample_images = [None for _ in categories]

for img, lbl in zip(X_train, Y_train):
    # Keep only the first image seen for each label.
    if sample_images[lbl] is None:
        sample_images[lbl] = img

    # Stop scanning as soon as every class has an example.
    if all(slot is not None for slot in sample_images):
        break

# Wrap each example as a captioned wandb image and log them under a single key.
wandb_images = [
    wandb.Image(picture.astype(np.uint8), caption=className)
    for className, picture in zip(categories, sample_images)
]
wandb.log({"Q1_sampleImageForEachClass": wandb_images})
wandb.finish()

In [None]:
# fig, axes = plt.subplots(2, 5, figsize=(10, 5))
# for ax, img, lbl in zip(axes.flat, sample_images, categories):
#     ax.imshow(img, cmap='gray')
#     ax.set_title(lbl)
#     ax.axis('off')

# plt.tight_layout()
# plt.show()


Next, we need to normalise the data.
Why normalise?
1. Faster convergence.
2. Helps avoid vanishing gradients.
3. Keeps the inputs in a range that suits the activation functions.
4. Improves model performance.

We did not normalise above because we were only visualising the images; whenever the dataset is used for training, normalisation becomes essential for better performance.

Why do we divide by 255.0 to normalise?
-> Fashion-MNIST consists of grayscale images with pixel values ranging from 0 to 255, so dividing by 255 scales all values to lie between 0 and 1.

In [None]:
# Reload the raw splits so re-running this cell never divides already-scaled data again,
# then divide the pixel values by 255.0.
(X_train, Y_train), (X_test, Y_test) = fashion_mnist.load_data()
X_train, X_test = X_train / 255.0, X_test / 255.0


In [None]:
#Reshaping
xTrainTemp = X_train.reshape(X_train.shape[0], -1)  # (60000, 784)
yTrainTemp = Y_train
xTestTemp = X_test.reshape(X_test.shape[0], -1)       # (10000, 784)
yTestTemp = Y_test

print("xTrainTemp shape:", xTrainTemp.shape)
print("yTrainTemp shape:", yTrainTemp.shape)


In [None]:
# Split training data into training and validation sets (90/10 split)
x_train, x_val, y_train, y_val = train_test_split(xTrainTemp, yTrainTemp, test_size=0.1, random_state=33)

print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)

Activation Functions and their corresponding Derivatives

In [None]:
def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and x."""
    return np.maximum(0, x)

def dRelu(x):
    """Derivative of ReLU: 1 where x is strictly positive, 0 everywhere else."""
    isPositive = x > 0
    return np.where(isPositive, 1, 0)

def tanh(x):
    """Hyperbolic tangent activation, applied element-wise."""
    return np.tanh(x)

def dTanh(x):
    """Derivative of tanh with respect to its input: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1 - t * t

def sigmoid(x):
    """
    Logistic sigmoid 1 / (1 + exp(-x)), applied element-wise.

    Computed in a numerically stable way: exp(-|x|) always lies in (0, 1],
    so very negative inputs no longer overflow inside np.exp.

    x: array-like of pre-activations.
    Returns: array of the same shape with values in [0, 1].
    """
    z = np.exp(-np.abs(x))
    # For x >= 0 use 1 / (1 + e^-x); for x < 0 use the equivalent e^x / (1 + e^x).
    return np.where(np.greater_equal(x, 0), 1 / (1 + z), z / (1 + z))

def dSigmoid(x):
    """Derivative of the sigmoid with respect to its input: s(x) * (1 - s(x))."""
    s = sigmoid(x)
    return s * (1 - s)

def softmax(x):
    """
    Softmax over the class axis (axis 0).

    x: array of logits, either a single column (numClasses, 1) / (numClasses,)
       or a batch (numClasses, numSamples) with one sample per column.
    Returns: array of the same shape; every column sums to 1.

    Each column is normalised independently, so a batched input yields one
    probability distribution per sample rather than one over the whole matrix.
    The per-column maximum is subtracted first so np.exp cannot overflow;
    this shift does not change the result mathematically.
    """
    shifted = x - np.max(x, axis=0, keepdims=True)
    expX = np.exp(shifted)
    return expX / np.sum(expX, axis=0, keepdims=True)

In [None]:
class Layer:
    """
    One fully connected layer: weights, biases, activation and gradient slots.

    Attributes set here:
      w, b      -> weight matrix (neuronCount, inputSize) and bias column (neuronCount, 1)
      act, dAct -> activation function and its derivative (dAct is None for softmax)
      dW, db    -> gradient slots for w and b, starting at 0
      a, h      -> pre-activation and activation, filled in by the forward pass
      da, dh    -> gradients w.r.t. a and h, filled in by the backward pass
    """
    # Maps an activation name to (function, derivative).
    activationFunc = {
        'tanh': (tanh, dTanh),
        'sigmoid': (sigmoid, dSigmoid),
        'relu': (relu, dRelu),
        # Derivative for softmax is handled with cross-entropy loss
        'softmax': (softmax, None)
    }

    # Seeded generator shared by all layers. Seeding once here (rather than
    # reseeding inside __init__) keeps a fresh run reproducible while ensuring
    # that two layers of the same shape do not start with identical weights,
    # and it leaves NumPy's global random state untouched.
    _rng = np.random.RandomState(33)

    def __init__(self, inputSize, neuronCount, activation, rng=None):
        """
        inputSize: number of inputs feeding the layer
        neuronCount: number of neurons (outputs) in the layer
        activation: one of the keys of activationFunc
        rng: optional object with a normal(loc, scale, size) method used to draw
             the weights; defaults to the shared seeded generator

        Raises ValueError if the activation name is not registered.
        """
        if activation not in self.activationFunc:
            raise ValueError(
                "Unknown activation '" + str(activation) + "'; expected one of "
                + str(sorted(self.activationFunc))
            )
        generator = self._rng if rng is None else rng

        # Xavier initialization for weights to help with gradient flow
        sd = np.sqrt(2 / float(inputSize + neuronCount))
        self.w = generator.normal(0, sd, size=(neuronCount, inputSize))
        self.b = np.zeros((neuronCount, 1))
        self.act, self.dAct = self.activationFunc[activation]

        # Gradients of the loss with respect to the weights and biases of the layer.
        self.dW = 0
        self.db = 0

        # Populated during forward / backward propagation.
        self.a = None
        self.h = None
        self.da = None
        self.dh = None

In [None]:
def forwardPropagation(inputData, layers):
    """
    Run a forward pass and cache each layer's pre-activation (a) and activation (h).

      inputData: numpy array of shape (inputDim, 1) for one sample, or
                 (inputDim, numSamples) for a batch with one sample per column
      layers: list of Layer objects ordered from the first hidden to the output layer
      Returns: output of the final layer after softmax

    Every layer computes a = W . input + b. All layers except the last apply
    their own activation; the last layer always uses softmax. The loop works
    for any number of layers, including a network with a single hidden layer.
    """
    signal = inputData

    # Hidden layers: affine transform followed by the layer's activation.
    for layer in layers[:-1]:
        layer.a = np.dot(layer.w, signal) + layer.b
        layer.h = layer.act(layer.a)
        signal = layer.h

    # Output layer: affine transform followed by softmax.
    outputLayer = layers[-1]
    outputLayer.a = np.dot(outputLayer.w, signal) + outputLayer.b
    outputLayer.h = softmax(outputLayer.a)
    return outputLayer.h

In [None]:
def backwardPropagation(trueLabel, yHat, layers, inputData):
    """
    Compute the gradients for one sample and add them to each layer's dW / db.

    trueLabel: the true class index (integer)
    yHat: predicted output from forwardPropagation (probabilities)
    layers: list of Layer objects whose a / h were set by the forward pass
    inputData: original input data (needed for first layer gradient)

    returns : The layers list with updated gradients (dW and db for each layer).

    The gradients are accumulated (added to the existing dW / db) rather than
    overwritten. The optimisers divide dW / db by the batch size and reset them
    to 0 after every update, so accumulation is what makes that division a true
    mini-batch average instead of a scaled last-sample gradient.
    """
    # one-hot encoded vector for the true label
    oneHot = np.zeros(yHat.shape)
    oneHot[trueLabel] = 1

    # For output layer using softmax and cross-entropy loss:
    layers[-1].da = yHat - oneHot  # (yHat - oneHot) is the gradient

    # Backpropagate from the output layer to the first hidden layer
    for j in range(len(layers) - 1, 0, -1):
        current, previous = layers[j], layers[j-1]

        # gradients for weights and biases for the current layer
        current.dW = current.dW + np.dot(current.da, previous.h.T)
        current.db = current.db + current.da

        # gradient for previous layer's activation
        previous.dh = np.dot(current.w.T, current.da)
        if previous.dAct is not None:
            previous.da = previous.dh * previous.dAct(previous.a)

    # gradients for the first layer using the original input data
    layers[0].dW = layers[0].dW + np.dot(layers[0].da, inputData.T)
    layers[0].db = layers[0].db + layers[0].da

    return layers

#### Loss Functions - Cross-Entropy and Squared Error Loss

In [None]:
def squaredErrorLoss(labels, predictions, index):
    """
    Squared error between the predicted column and the one-hot target.

    labels: sequence of true class indices
    predictions: predicted values for one sample; predictions.shape[0] is the class count
    index: position in labels of the sample being scored
    Returns: sum of squared differences between predictions and the one-hot vector.
    """
    # Build the one-hot column for the true class of this sample.
    target = np.zeros((predictions.shape[0], 1))
    target[labels[index]] = 1

    residual = predictions - target
    return np.sum(residual ** 2)

def crossEntropyLoss(labels, predictions, index):
    '''
    Cross-entropy loss for a single sample.

    labels: sequence of true class indices
    predictions: predicted probabilities for the sample, indexed by class
    index: position in labels of the sample being scored
    Returns: negative log of the probability assigned to the true class.
    '''
    trueClass = labels[index]
    # Only the probability predicted for the true class contributes to the loss.
    return -np.log(predictions[trueClass])


#### Stochastic Gradient Descent (SGD) training with mini-batch updates.

In [None]:
def sgd(numEpochs, layers, learningRate, trainX, trainY, valX, valY, batchSize):
    """
    Train the network with mini-batch stochastic gradient descent.

    numEpochs: number of passes over the training data
    layers: list of Layer objects (first hidden layer to output layer)
    learningRate: step size of each parameter update
    trainX, trainY: training samples (one sample per row) and their class indices
    valX, valY: validation samples (one sample per row) and their class indices
    batchSize: number of samples between two parameter updates

    returns the following :
      * costHistory -> List of average training cost per epoch.
      * layers      -> Updated network layers with trained weights.

    Samples left over after the last full mini-batch of an epoch are applied
    as a final, smaller batch instead of being dropped.
    """
    numSamples = trainX.shape[0]
    costHistory = []

    def applyUpdate(sampleCount):
        # Gradient step using the stored gradients averaged over sampleCount,
        # followed by a reset of the gradient slots for the next mini-batch.
        for layer in layers:
            layer.w = layer.w - learningRate * (layer.dW / sampleCount)
            layer.b = layer.b - learningRate * (layer.db / sampleCount)
            layer.dW = 0
            layer.db = 0

    for epoch in range(numEpochs):
        epochCost = 0

        for i in range(numSamples):
            # Column vector of features; -1 infers the input size from the data.
            inputSample = trainX[i].reshape(-1, 1)

            # Forward propagation: calc the output prob.
            outputProb = forwardPropagation(inputSample, layers)

            # get training loss for the sample using cross-entropy loss.
            epochCost += crossEntropyLoss(trainY, outputProb, i)

            # Backward propagation: calc grad based on the true label.
            backwardPropagation(trainY[i], outputProb, layers, inputSample)

            # Update parameters when the mini-batch is complete.
            if (i + 1) % batchSize == 0:
                applyUpdate(batchSize)

        # Apply the trailing partial mini-batch, if any.
        leftover = numSamples % batchSize
        if leftover:
            applyUpdate(leftover)

        # avg cost for the current epoch.
        costHistory.append(epochCost / numSamples)

        # Evaluating the Model
        # Entire validation set is passed as a batch
        valPrediction = forwardPropagation(valX.T, layers)
        valCost = 0
        for i in range(len(valY)):
            # -1 infers the number of classes from the prediction matrix.
            valCost += crossEntropyLoss(valY, valPrediction[:, i].reshape(-1, 1), i)
        valCost /= len(valY)

        # validation accuracy
        predictedLabels = valPrediction.argmax(axis=0)
        valAccuracy = np.sum(predictedLabels == valY) / valY.shape[0]

        # wandb logs
        #wandb.log({"epoch": epoch, "train_loss": costHistory[-1], "val_accuracy": valAccuracy, "val_loss": valCost})

        print(f"-----------------Epoch {epoch}-----------------")
        print("Training Loss: ", epochCost / numSamples)
        print("Validation Accuracy: ", valAccuracy)
        print("Validation Loss: ", valCost)

    return costHistory, layers


#### Momentum-based gradient descent

In [None]:
def momentumGradientDescent(numEpochs, layers, learningRate, trainX, trainY, valX, valY, batchSize):
    """
    Train the network with mini-batch gradient descent plus momentum (gamma = 0.9).

    numEpochs: number of passes over the training data
    layers: list of Layer objects (first hidden layer to output layer)
    learningRate: step size applied to the averaged gradient
    trainX, trainY: training samples (one sample per row) and their class indices
    valX, valY: validation samples (one sample per row) and their class indices
    batchSize: number of samples between two parameter updates

    Returns (costHistory, layers): the average training cost per epoch and the
    trained layers.

    Samples left over after the last full mini-batch of an epoch are applied
    as a final, smaller batch instead of being dropped.
    """
    # Momentum hyperparameter
    gamma = 0.9
    numSamples = trainX.shape[0]
    costHistory = []

    # momentum for each layer once at the start
    for layer in layers:
        layer.updateW = 0  # Momentum term for weights
        layer.updateB = 0  # Momentum term for biases

    def applyUpdate(sampleCount):
        # Momentum step using the stored gradients averaged over sampleCount,
        # followed by a reset of the gradient slots for the next mini-batch.
        for layer in layers:
            # Update velocity
            layer.updateW = gamma * layer.updateW + learningRate * (layer.dW / sampleCount)
            layer.updateB = gamma * layer.updateB + learningRate * (layer.db / sampleCount)

            # Update weights and biases using the momentum terms
            layer.w = layer.w - layer.updateW
            layer.b = layer.b - layer.updateB

            layer.dW = 0
            layer.db = 0

    for epoch in range(numEpochs):
        epochCost = 0

        for i in range(numSamples):
            # Column vector of features; -1 infers the input size from the data.
            inputSample = trainX[i].reshape(-1, 1)

            # Forward propagation: calc output probabilities
            outputProb = forwardPropagation(inputSample, layers)

            # Calc training loss using cross-entropy
            epochCost += crossEntropyLoss(trainY, outputProb, i)

            # Backpropagation: calc grad for current sample
            backwardPropagation(trainY[i], outputProb, layers, inputSample)

            # update parameters with momentum when a mini batch is complete
            if (i + 1) % batchSize == 0:
                applyUpdate(batchSize)

        # Apply the trailing partial mini-batch, if any.
        leftover = numSamples % batchSize
        if leftover:
            applyUpdate(leftover)

        # Avg training cost for the epoch
        costHistory.append(epochCost / numSamples)

        # Evaluating the Model
        # The validation data is passed as one batch
        valPrediction = forwardPropagation(valX.T, layers)
        valCost = 0
        for i in range(len(valY)):
            # -1 infers the number of classes from the prediction matrix.
            valCost += crossEntropyLoss(valY, valPrediction[:, i].reshape(-1, 1), i)

        valCost /= len(valY)
        predictedLabels = valPrediction.argmax(axis=0)
        valAccuracy = np.sum(predictedLabels == valY) / valY.shape[0]

        # wandb logs
        #wandb.log({"epoch": epoch, "train_loss": costHistory[-1], "val_accuracy": valAccuracy, "val_loss": valCost})

        print(f"-----------------Epoch {epoch}-----------------")
        print("Training Loss: ", epochCost / numSamples)
        print("Validation Accuracy: ", valAccuracy)
        print("Validation Loss: ", valCost)

    return costHistory, layers


#### ADAM


In [None]:
def adam(numEpochs, layers, learningRate, trainX, trainY, valX, valY, batchSize):
    """
    Train the network with the Adam optimiser on mini-batches.

    numEpochs: number of passes over the training data
    layers: list of Layer objects (first hidden layer to output layer)
    learningRate: step size of each parameter update
    trainX, trainY: training samples (one sample per row) and their class indices
    valX, valY: validation samples (one sample per row) and their class indices
    batchSize: number of samples between two parameter updates

    Returns (costHistory, layers): the average training cost per epoch and the
    trained layers.

    Samples left over after the last full mini-batch of an epoch are applied
    as a final, smaller batch instead of being dropped.
    """
    # NOTE(review): beta2 is 0.99 here; the commonly cited Adam default is 0.999 - confirm this is intentional.
    epsilon, beta1, beta2 = 1e-8, 0.9, 0.99
    # Time step for bias correction
    t = 0
    numSamples = trainX.shape[0]
    costHistory = []

    # Initialize m,v for each layer once at the start.
    for layer in layers:
        layer.mW = 0  # 1st moment for weights
        layer.mB = 0  # 1st moment for biases
        layer.vW = 0  # 2nd moment for weights
        layer.vB = 0  # 2nd moment for biases

    def applyUpdate(sampleCount, step):
        # Adam step number `step` using the stored gradients averaged over
        # sampleCount, followed by a reset of the gradient slots.
        firstCorrection = 1 - np.power(beta1, step)
        secondCorrection = 1 - np.power(beta2, step)
        for layer in layers:
            # Compute mini-batch avg grad
            gradW = layer.dW / sampleCount
            gradB = layer.db / sampleCount

            # Update biased 1st moment estimate
            layer.mW = beta1 * layer.mW + (1 - beta1) * gradW
            layer.mB = beta1 * layer.mB + (1 - beta1) * gradB

            # Update biased 2nd moment estimate
            layer.vW = beta2 * layer.vW + (1 - beta2) * (gradW ** 2)
            layer.vB = beta2 * layer.vB + (1 - beta2) * (gradB ** 2)

            # Bias-corrected moment estimates
            mWHat = layer.mW / firstCorrection
            mBHat = layer.mB / firstCorrection
            vWHat = layer.vW / secondCorrection
            vBHat = layer.vB / secondCorrection

            # Update parameters using the Adam update rule
            layer.w = layer.w - learningRate * mWHat / (np.sqrt(vWHat) + epsilon)
            layer.b = layer.b - learningRate * mBHat / (np.sqrt(vBHat) + epsilon)

            # Reset gradients for the next mini-batch
            layer.dW = 0
            layer.db = 0

    for epoch in range(numEpochs):
        epochCost = 0

        for i in range(numSamples):
            # Column vector of features; -1 infers the input size from the data.
            inputSample = trainX[i].reshape(-1, 1)

            # Forward propagation: calc the output probabilities
            outputProb = forwardPropagation(inputSample, layers)

            # Calculate training loss
            epochCost += crossEntropyLoss(trainY, outputProb, i)

            # Backpropagation: calc grad for the current sample
            backwardPropagation(trainY[i], outputProb, layers, inputSample)

            # Update parameters after processing a mini-batch
            if (i + 1) % batchSize == 0:
                t += 1
                applyUpdate(batchSize, t)

        # Apply the trailing partial mini-batch, if any.
        leftover = numSamples % batchSize
        if leftover:
            t += 1
            applyUpdate(leftover, t)

        # average training cost for the current epoch
        costHistory.append(epochCost / numSamples)

        # Evaluation on validation data
        valPrediction = forwardPropagation(valX.T, layers)
        valCost = 0
        for i in range(len(valY)):
            # -1 infers the number of classes from the prediction matrix.
            valCost += crossEntropyLoss(valY, valPrediction[:, i].reshape(-1, 1), i)
        valCost /= len(valY)

        # find predicted labels and compute validation accuracy
        predictedLabels = valPrediction.argmax(axis=0)
        valAccuracy = np.sum(predictedLabels == valY) / valY.shape[0]

        # wandb logs
        # wandb.log({"epoch": epoch, "train_loss": costHistory[-1], "val_accuracy": valAccuracy, "val_loss": valCost})

        print(f"-----------------Epoch {epoch}-----------------")
        print("Training Loss: ", epochCost / numSamples)
        print("Validation Accuracy: ", valAccuracy)
        print("Validation Loss: ", valCost)

    return costHistory, layers


In [None]:
def myOptimiser(layers, optimizerName, numEpochs, learningRate, trainX, trainY, valX, valY, batchSize):
    """
    Dispatch training to the optimiser selected by name.

    optimizerName: "sgd", "mgd" (momentum gradient descent) or "adam"
    The remaining arguments are forwarded unchanged to the chosen optimiser.

    Returns whatever the chosen optimiser returns (costHistory, layers).
    For an unknown name a message is printed and ("Error", "Error") is returned.
    """
    # Not wired up yet: "nesterov", "rmsprop", "nadam".
    supported = ("sgd", "mgd", "adam")

    # Guard clause: reject unknown names before looking up any optimiser.
    if optimizerName not in supported:
        print("No optimization algorithm named " + optimizerName + " found")
        return "Error", "Error"

    if optimizerName == "sgd":
        trainer = sgd
    elif optimizerName == "mgd":
        trainer = momentumGradientDescent
    else:
        trainer = adam

    return trainer(numEpochs, layers, learningRate, trainX, trainY, valX, valY, batchSize)


In [None]:
def predict(inputData, trueLabels, layers):
    """
    Run the network on a batch and score it against the true labels.

    inputData: batch of inputs with one sample per column (features, numSamples)
    trueLabels: array of true class indices, one per sample
    layers: list of Layer objects passed on to forwardPropagation

    Returns (predictedLabels, accuracy, averageLoss): the arg-max class per
    sample, the fraction of correct predictions and the mean cross-entropy loss.
    """
    # Forward pass: compute predictions for the entire test data.
    predictionMatrix = forwardPropagation(inputData, layers)

    totalLoss = 0
    numSamples = len(trueLabels)
    for i in range(numSamples):
        # Column of class probabilities for sample i; -1 infers the number of
        # classes from the prediction matrix instead of assuming 10.
        sampleProb = predictionMatrix[:, i].reshape(-1, 1)
        totalLoss += crossEntropyLoss(trueLabels, sampleProb, i)

    # Determine predicted labels by taking the class with maximum probability.
    predictedLabels = predictionMatrix.argmax(axis=0)
    accuracy = np.sum(predictedLabels == trueLabels) / trueLabels.shape[0]

    averageLoss = totalLoss / numSamples
    return predictedLabels, accuracy, averageLoss

In [None]:
def modelTrain(epochs, learningRate, neurons, hLayers, activation, batchSize, optimizer, x_train, y_train, x_val, y_val, x_test=None, y_test=None):
    """
    Build a feed-forward network, train it and report its test performance.

    epochs, learningRate, batchSize: training hyperparameters passed to the optimiser
    neurons: number of neurons in every hidden layer
    hLayers: number of hidden layers (the first hidden layer is always created)
    activation: activation name used by the hidden layers
    optimizer: optimiser name understood by myOptimiser
    x_train, y_train: training samples (one per row) and class indices
    x_val, y_val: validation samples (one per row) and class indices
    x_test, y_test: optional test samples (one per row) and class indices;
                    when omitted, the notebook-level xTestTemp / yTestTemp are used

    Returns: predicted labels for the test set.
    """
    # Fall back to the notebook globals only when no test data is supplied.
    if x_test is None:
        x_test = xTestTemp
    if y_test is None:
        y_test = yTestTemp

    # First hidden layer takes the input features; the rest are neurons x neurons.
    layers = [Layer(x_train.shape[1], neurons, activation)]
    for _ in range(hLayers - 1):
        layers.append(Layer(neurons, neurons, activation))
    # Output layer: 10 classes with softmax.
    layers.append(Layer(neurons, 10, 'softmax'))

    _, layers = myOptimiser(layers, optimizer, epochs, learningRate, x_train, y_train, x_val, y_val, batchSize)

    # Evaluate the model on the test set (predict expects one sample per column).
    outputTest, accuracyTest, testLoss = predict(x_test.T, y_test, layers)

    # wandb logs
    # wandb.log({"accuracy": accuracyTest})
    # wandb.log({"Testing loss": testLoss})

    print("----------------------------------")
    print("Test accuracy: ", accuracyTest)
    print("Test loss: ", testLoss)

    return outputTest

# Hyperparameters and training configuration for this run.
activation, optimizer = 'tanh', 'sgd'
hLayers, neurons = 3, 128
epochs, batchSize = 1, 32
learningRate = 0.1

# Train the model and keep the predicted labels for the test set.
outputTest = modelTrain(
    epochs=epochs,
    learningRate=learningRate,
    neurons=neurons,
    hLayers=hLayers,
    activation=activation,
    batchSize=batchSize,
    optimizer=optimizer,
    x_train=x_train,
    y_train=y_train,
    x_val=x_val,
    y_val=y_val,
)


In [4]:
import wandb

# Public API client used to query runs that already exist in the project.
api = wandb.Api()

# All runs recorded for the project.
runs = api.runs("cs23s025-indian-institute-of-technology-madras/CS23S025-Assignment-1-DA6401-DL")

# Map each sweep ID to its name, keeping the first name seen for an ID.
sweep_info = {}
for run in runs:
    sweep = run.sweep
    if not sweep:
        # Runs that are not part of a sweep are skipped.
        continue
    if sweep.id not in sweep_info:
        sweep_info[sweep.id] = sweep.name

# Print the mapping, one sweep per line.
for sweep_id, sweep_name in sweep_info.items():
    print(f"Sweep ID: {sweep_id}, Sweep Name: {sweep_name}")


Sweep ID: zm5ph7l4, Sweep Name: Deep-Learning-Assignment-1
Sweep ID: 38qy9h0x, Sweep Name: DL-Assignment-1
Sweep ID: xm7jkopc, Sweep Name: DL-Assignment-1_running_Remote
Sweep ID: amu7xlj0, Sweep Name: DL-Assignment-1_remote_SqLoss
Sweep ID: sluhp8ra, Sweep Name: DL-Assignment-1_sweep2
Sweep ID: oc3w4kuz, Sweep Name: DL-Assignment-1_remote_SqLoss_sweep2
Sweep ID: 8z40b4y0, Sweep Name: DL-Assignment-1_sqLoss_sweep1
Sweep ID: sejxf76k, Sweep Name: DL-Assignment-1_sweep3
Sweep ID: nf3mjrmg, Sweep Name: DL-Assignment-1_sweep4
Sweep ID: 6xn5laor, Sweep Name: DL-Assignment-1_finalSQloss
