#### Student Name: Mai Ngo
#### Course Name and Number: DSC 578 Neural Network and Deep Learning - SEC 701
#### Assignment 1 - Modified Network code
#### Date: 10/8/2023

# In [1]:
import random
import json
import numpy as np
import pandas as pd

# In [2]:
class Network(object):
    '''Feed-forward network of sigmoid neurons trained with mini-batch stochastic gradient descent.

    Attributes:
        numLayers: number of layers, input layer included.
        networkSize: neuron count of every layer, as passed to the constructor.
        init_acts_shape: shapes of the activations seen by the most recent backprop call.
        biases: one (n, 1) array per non-input layer.
        weights: one (# neurons next layer, # neurons current layer) array per pair of consecutive layers.
    '''

    def __init__(self, networkSize):
        '''Initialize network's number of layers, and neuron count within each layer.
        Biases and weights are initialized randomly (standard normal draws).

        networkSize: sequence of neuron counts, one entry per layer, input layer first.'''

        self.numLayers = len(networkSize)
        self.networkSize = networkSize
        self.init_acts_shape = []  #Initial shape of activation outputs.

        #Biases start from the 2nd layer. neuronNum_l1 = # of neurons in the layer.
        self.biases = [np.random.randn(neuronNum_l1, 1) for neuronNum_l1 in networkSize[1:]]

        #Weights return as an array size (# neurons next layer, # neurons current layer).
        self.weights = [np.random.randn(neuronNum_l1, neuronNum_l)
                        for neuronNum_l, neuronNum_l1 in zip(networkSize[:-1], networkSize[1:])]

    @staticmethod
    def _printEpochResult(epoch, label, result):
        '''Print one line summarizing an evaluateEpoch result. label: 'Train' or 'Valid'.'''
        print(f"[Epoch {epoch}] {label}: Count={result['Count']:>4}, "
              f"Accuracy={result['Accuracy']:.4f}, "
              f"MSE={result['MSE']:.4f}, CE={result['CE']:.4f}, LL={result['LL']:.4f}")

    def SGD(self, trainData, epochs, mini_batchSize, eta, testData=None):
        '''Train the neural network using mini-batch stochastic gradient descent.

        trainData: a list of tuples (X,Y). testData: optional, same format.
        epochs: maximum # of times the network will be trained on the entire trainData.
        mini_batchSize: # of instances per mini batch. eta: learning rate.

        trainData is consumed in the order given (no shuffling between epochs).
        Training stops early when train accuracy reaches 1.0, or, from the 4th epoch on,
        when the train MSE is not below the largest MSE of the three previous epochs.

        Return [summary_trainRes, summary_testRes]: per-epoch evaluateEpoch dictionaries.
        summary_testRes is empty when no testData is given.'''

        trainNum = len(trainData)
        summary_trainRes = []
        summary_testRes = []

        for epoch in range(epochs):
            #Get a list of mini batches with individual mini_batchSize.
            miniBatches = [trainData[m : m + mini_batchSize]
                           for m in range(0, trainNum, mini_batchSize)]

            for miniBatch in miniBatches:
                self.update_miniBatch(miniBatch, eta)

            #Get evaluation result of each epoch; train result is always reported.
            trainRes = self.evaluateEpoch(trainData)
            summary_trainRes.append(trainRes)
            self._printEpochResult(epoch, 'Train', trainRes)

            if testData:
                testRes = self.evaluateEpoch(testData)
                summary_testRes.append(testRes)
                self._printEpochResult(epoch, 'Valid', testRes)

            #Early exit if applies.
            if trainRes['Accuracy'] == 1.0:
                break
            #summary_trainRes[-4:-1] holds the three epochs right before the current one.
            if epoch >= 3 and trainRes['MSE'] >= max(res['MSE'] for res in summary_trainRes[-4:-1]):
                break

        return [summary_trainRes, summary_testRes]

    def update_miniBatch(self, miniBatch, eta):
        '''Update the network's weights and biases to a single mini batch.
        miniBatch: a list of tuples (X, Y), eta: learning rate.

        Return (nabla_b, nabla_w): the gradients summed over the mini batch (not averaged).'''

        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        for X, Y in miniBatch:
            backprop_nabla_b, backprop_nabla_w = self.backprop(X, Y)
            #Iterate over each layer, accumulating the per-instance gradients.
            for i, (layer_nb, layer_nw) in enumerate(zip(backprop_nabla_b, backprop_nabla_w)):
                nabla_b[i] += layer_nb
                nabla_w[i] += layer_nw

        #Step by the average gradient of the mini batch.
        step = eta / len(miniBatch)
        self.weights = [w - step * nw for w, nw in zip(self.weights, nabla_w)]
        self.biases = [b - step * nb for b, nb in zip(self.biases, nabla_b)]

        return nabla_b, nabla_w

    def backprop(self, X, Y):
        '''Calculate the gradients with respect to the network's parameters (weights and biases)
        for a single instance (X, Y). Also records the activation shapes in self.init_acts_shape.
        Return a tuple (nabla_b, nabla_w), each a list with one array per layer.'''

        #Create two lists of arrays with the same shapes as the network's biases and weights.
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        #Forward pass: start with initial instanceInput X
        #Activations list contains the activations of each layer progressed through the network.
        activations = [np.zeros((layerSize, 1)) for layerSize in self.networkSize]  #Replica of network size.
        activations[0] = X

        #Store initial activation shape.
        self.init_acts_shape = [X.shape]

        zList = []  #List to store all the z vectors, layer by layer.
        for i, (b, w) in enumerate(zip(self.biases, self.weights)):
            z = np.dot(w, activations[i]) + b
            zList.append(z)
            activation = sigmoid(z)
            activations[i + 1] = activation
            self.init_acts_shape.append(activation.shape)  #Append for each activation.

        #Backward pass.
        #Error gradient (finalDelta), nabla weight and bias @ final output.
        delta = self.costDerivative(activations[-1], Y) * sigmoidPrime(zList[-1])
        nabla_b[-1] = delta
        nabla_w[-1] = np.dot(delta, activations[-2].transpose())

        #Since backward, l = 1 means the last layer of neurons, l = 2 is the second-last layer, and so on.
        for l in range(2, self.numLayers):
            z = zList[-l]
            z_sigmoidPrime = sigmoidPrime(z)
            #Propagate the error of layer -l+1 back through its weights.
            delta = np.dot(self.weights[-l + 1].transpose(), delta) * z_sigmoidPrime
            nabla_b[-l] = delta
            nabla_w[-l] = np.dot(delta, activations[-l - 1].transpose())

        return nabla_b, nabla_w

    def evaluateEpoch(self, testData):
        '''Evaluate the network on testData (a list of tuples (X, Y)); returns a dictionary.
        Keys: 'Count' (correct predictions, by argmax), 'Accuracy', 'MSE', 'CE', and 'LL' (Log-Likelihood).
        MSE, CE and LL are averages over the instances of testData.'''

        testNum = len(testData) #To get average.
        epochMSE = 0.0
        epochCE = 0.0
        epochLL = 0.0
        epochPred = 0

        for instanceInput, targetOutput in testData:
            actualOutput = self.feedForward(instanceInput)
            outputDiff = actualOutput-targetOutput
            epochMSE += 0.5 * np.linalg.norm(outputDiff) ** 2
            #nan_to_num turns the nan produced by 0 * log(0) into 0.
            epochCE += np.sum(np.nan_to_num(-targetOutput*np.log(actualOutput)-(1-targetOutput)*np.log(1-actualOutput)))
            epochLL += np.sum(np.nan_to_num(-targetOutput*np.log(actualOutput)))

            if np.argmax(actualOutput) == np.argmax(targetOutput):
                epochPred +=1

        return {'Count': epochPred, 'Accuracy': epochPred/testNum, 'MSE': epochMSE / testNum,
                'CE': epochCE / testNum, 'LL': epochLL / testNum}

    def feedForward(self, activationOutput):
        '''Return network activation output given 'activationOutput' as input.
        Use for evaluation; not during training/backpropagation.'''
        for b, w in zip(self.biases, self.weights):
            activationOutput = sigmoid(np.dot(w, activationOutput) + b)
        return activationOutput

    def costDerivative(self, actualOutput, targetOutput):
        '''Return (actualOutput - targetOutput): the difference between actual and target
        output activations of the neural network's final layer.'''
        return actualOutput - targetOutput

    @classmethod
    def loadNetwork(cls, fileName):
        '''Load a neural network from a JSON file and return an instance of Network.
        The file must hold the keys "sizes", "weights" and "biases" (as written by saveNetwork).
        Return None (after printing the error) if the file is missing or is not valid JSON.'''
        try:
            with open(fileName, "r") as inFile:
                data = json.load(inFile)
            network = cls(data["sizes"])
            network.weights = [np.array(w) for w in data["weights"]]
            network.biases = [np.array(b) for b in data["biases"]]
            return network

        except (FileNotFoundError, json.JSONDecodeError) as e:
            print(f"Error loading network from {fileName}: {e}")
            return None

    def saveNetwork(self, fileName):
        '''Save the neural network to a JSON file, in the format loadNetwork reads.'''
        #The layer sizes live in self.networkSize; int() keeps numpy integers JSON-serializable.
        data = {"sizes": [int(layerSize) for layerSize in self.networkSize],
                "weights": [w.tolist() for w in self.weights],
                "biases": [b.tolist() for b in self.biases]}
        #Context manager closes the file even if json.dump raises.
        with open(fileName, "w") as outFile:
            json.dump(data, outFile)

# In [3]:
def sigmoid(z):
    '''Return the logistic sigmoid 1 / (1 + e^(-z)) of z (scalar or numpy array, applied element-wise).'''
    denominator = 1.0 + np.exp(-z)
    return 1.0 / denominator

def sigmoidPrime(z):
    '''Return the derivative of the sigmoid function at z, s(z) * (1 - s(z)); used in backpropagation.'''
    activation = sigmoid(z)
    return activation * (1 - activation)

def vectorizeTarget(n, target):
    '''Return an array of shape (n, 1) with '1.0' assigned to target position, and zeroes for the rest.
    n = total # of Ys | target is an array of one element aka. target value itself
    (a bare scalar is accepted as well). The target value is truncated to int to get the position.'''

    targetArray = np.zeros((n, 1))
    #np.ravel lets a scalar, a list or an array of any shape through; the first element is the class index.
    targetIndex = int(np.ravel(target)[0])
    targetArray[targetIndex] = 1.0
    return targetArray

# In [4]:
def loadCSV(fileName, inputSize, targetSize, seedNum=213):
    ''' Load the data from a csv file (no header row).  Target (y) is already in the one-hot-vector notation (binary representation).
        inputSize: # of Xs (taken from the first columns) | targetSize: # of Ys (taken from the last columns).
        seedNum: random seed used to shuffle the rows, for reproducibility. Pass None (or False) for no shuffling;
        0 is a valid seed and does shuffle.
        Output as a list with each element contains a pair of Xs and Ys (formatted as one column vector respectively).
        Total # of element = Total # of instances.'''

    data = pd.read_csv(fileName, header=None)
    #Shuffle only when a seed is given. Compare against None/False instead of truthiness,
    #so that the legitimate seed 0 is not silently treated as "no shuffling".
    if seedNum is not None and seedNum is not False:
        data = data.sample(frac=1, random_state=seedNum)

    #Separate the X and Y parts.
    X = data[data.columns[:inputSize]].values.tolist()
    Y = data[data.columns[-targetSize:]].values.tolist()

    #Combine the parts for each instance and put all in a list.
    #For each instance, zip(X,Y) pairs input feature vector and its original corresponding target value vector.
    dataset = [(np.reshape(x, (inputSize, 1)), np.reshape(y, (targetSize, 1)))
               for x, y in zip(X, Y)]
    return dataset