# **NN** Implementation (Forward + Backward Propagation)
1. Add input in /test/input and model in /test/model with .txt format, makesure they both have the same filename.
2. Input the filename for both the input and the model, ensuring that they have the same filename.
3. Scroll down and click on `./tmp/network.html`. This will redirect you to network.html, which you can then open the visualization using a live server.

#### Made by:
- Samuel Christoper Swandi - 13520075
- Grace Claudia - 13520078
- Ubaidillah Ariq Prathama - 13520085
- Patrick Amadeus Irawan - 13520109

------------

List of Content

1. [Library & Dependencies](#library)
2. [Helper Function](#helper)
3. [Neural Network Visualization](#visualization)

## Library & Dependencies <a name="library"></a>

In [None]:
!python3 -m pip install pyvis==0.3.2
!python3 -m pip install networkx==2.6.3
!python3 -m pip install numpy==1.21.6

In [1]:
import numpy as np
from pandas import DataFrame
from sklearn.neural_network import MLPClassifier
import os

from pyvis.network import Network

## **Class**

#### Connected Layer

In [3]:
class ConnectedLayer:
    def __init__(self, input_size, output_size, weights):
        self.input_size = input_size
        self.output_size = output_size
        self.weights = weights[1:]
        self.bias = weights[0]

    def forward(self, input):
        self.input = input
        return np.dot(input, self.weights) + self.bias

#### Activation Layer

In [4]:
class ActivationLayer:
    def __init__(self, activation):
        self.activation = activation
    
    def forward(self, input):
        self.input = input
        return self.activation(input)

#### Neural Network

In [348]:
class NeuralNetwork:
    def __init__(self):
        self.layers = []

    def add(self, layer):
        self.layers.append(layer)
    
    def predict(self, x):
        complete_result = []
        output = x[:, 1:]

        for layer in self.layers:
            output = layer.forward(output)
            if isinstance(layer, ActivationLayer):
                complete_result.append(output)

        return complete_result[-1] , complete_result

    # train the network
    def fit(self, x_train, y_train, learning_rate=0.1, epochs = 5, batch_size = 2, err_threshold = 0.01):
        # training loop
        for i in range(epochs):
            print(i)
            err = 0
            for j in range(len(x_train)):
                
                # forward propagation
                output = x_train[:, 1:]
                for layer in self.layers:
                    output = layer.forward(output)

                print(output)
                # compute loss (for display purpose only)
                # err += self.loss(y_train[j], output)

            #     # backward propagation
            #     error = self.loss_prime(y_train[j], output)
            #     for layer in reversed(self.layers):
            #         error = layer.backward_propagation(error, learning_rate)

            # # calculate average error on all samples
            # err /= samples
            # print('epoch %d/%d   error=%f' % (i+1, epochs, err))

## **Helper** Function <a class="anchor" id="helper"></a>

#### Activation Function

In [6]:
def linear(net):
    return net

def ReLU(net):
    return np.maximum(0,net)

def sigmoid(net):
    return 1/(1+np.exp(-net))

def softmax(net):
    res = []
    for sample in net:
        res.append(np.exp(sample)/np.sum(np.exp(sample)))
    return np.array(res)

#### Loss Function

In [7]:
def mse(y_true, y_pred):
    return np.mean(np.square(y_true - y_pred))

def sse(y_true, y_pred):
    return np.sum(np.square(y_true - y_pred))

#### Model Loading Function

In [8]:
def model_load(data):
    '''
    Function to load model from file
    INPUT :     data -> data from model file

    OUTPUT :    weights -> weights of each model neuron
                list_prev_size -> list of previous layer size
                list_layer_size -> list of current layer size
                list_activation -> list of activation function
    '''

    idx = 1
    weights,list_prev_size,list_layer_size,list_activation = [],[],[],[]

    for i in range(int(data[0]) - 1):
        # Loading size & act function
        prev_size, layer_size, activation = [int(i) for i in data[idx].split()]
        list_prev_size.append(prev_size)
        list_layer_size.append(layer_size)
        list_activation.append(activation)
        
        # Loading weights
        idx += 1
        weight = []
        for j in range(prev_size + 1):
            weight.append([float(i) for i in data[idx].split()])
            idx += 1
        weights.append(weight)
    
    return weights, list_prev_size, list_layer_size, list_activation

#### Predict Output Function

In [329]:
def predict_output(x, net):
    '''
    Function to predict output from input data
    INPUT :     x-> input data
                net -> NeuralNetwork object

    OUTPUT :    out -> output of the model
                complete_out -> complete output of the model, visualization purpose
                x -> input data, visualization purpose
    '''
    # Predict output
    out, complete_out = net.predict(x)

    # Gather complete output
    n_complete_out = []
    for i in range(len(complete_out[0])):
        n_complete_out.append([complete_out[0][i], complete_out[-1][i]])

    complete_out = n_complete_out
    return out, complete_out

#### SSE Errors Function + microhelper

In [330]:
def compute_out_and_errors(output_data, out):
    '''
    Function to calculate output error
    INPUT :     output_data -> output data from output file
                out -> output of the model, obtained from predict_output function

    OUTPUT :    out_pred -> output of the model (flattened)
                out_true -> output from output_data (flattened)
                sse_error -> sum squared error
                sse_error <= max_sse -> boolean value, True if sse_error <= max_sse, False otherwise
    '''
    # Assign y_train from output_data
    parsed_output = [[float(j) for j in i.split()] for i in output_data]
    out_pred = out.flatten()
    out_true = np.array(parsed_output[:-1]).flatten()
    max_sse = parsed_output[-1][0]

    return out_pred, out_true, sse(out_true, out_pred), sse(out_true, out_pred) <= max_sse

## **Load** Section

Entrypoint

In [115]:
MODEL_FOLDER = '../test/model/'
INPUT_FOLDER = '../test/input/'
OUTPUT_FOLDER = '../test/output/'
# read file from test folder
def read_file(folder_path, file_name):
    with open(folder_path + file_name, 'r') as file:
        data = [i.rstrip("\n") for i in file.readlines()]
    return data

filename = input('Enter test case name (with extension): ')
data = read_file(MODEL_FOLDER, filename)
input_data = read_file(INPUT_FOLDER, filename)

try: 
    output_data = read_file(OUTPUT_FOLDER, filename)
except:
    output_data = None

model_load

In [349]:
weights, list_prev_size, list_layer_size, list_activation = model_load(data)

print('---Model Information---')
print('Number of layers :', len(list_prev_size) + 1)
print('Input size :', list_prev_size[0])
print('Output size :', list_layer_size[-1])

print()
print('Weights :', weights)
print('Previous layer size :', list_prev_size)
print('Current layer size :', list_layer_size)
print('Activation function :', list_activation)

---Model Information---
Number of layers : 3
Input size : 2
Output size : 2

Weights : [[[0.35, 0.35], [0.15, 0.25], [0.2, 0.3]], [[0.6, 0.6], [0.4, 0.5], [0.45, 0.55]]]
Previous layer size : [2, 2]
Current layer size : [2, 2]
Activation function : [2, 2]


Build NN

In [350]:
# Build NN model
act = {0: linear, 1: ReLU, 2: sigmoid, 3: softmax}
net = NeuralNetwork()
for i in range (int(data[0]) - 1):
    net.add(ConnectedLayer(list_prev_size[i], list_layer_size[i], weights[i]))
    net.add(ActivationLayer(act[list_activation[i]]))

## **Predict** Section

Prepare x_train

In [342]:
# Assign x_train from input_data
x_train = []
for i in range(len(input_data)):
    x_train.append([float(i) for i in input_data[i].split()])
x_train = np.array(x_train)

In [343]:
out, complete_out = predict_output(x_train, net)

# Print the information, complete with brief verbose
print('---Prediction Information---')
print('Input data :', x_train)
print('Output data :', out.flatten())

---Prediction Information---
Input data : [[1.   0.05 0.1 ]]
Output data : [0.75136507 0.77292847]


In [351]:
net.fit(x_train,out_true)

0
[[0.75136507 0.77292847]]
1
[[0.75136507 0.77292847]]
2
[[0.75136507 0.77292847]]
3
[[0.75136507 0.77292847]]
4
[[0.75136507 0.77292847]]


## Compute Error with **Sum Squared Error (SSE)**

Sum Squared Error (SSE) is a mathematical function used in statistics and machine learning to measure the difference between predicted and actual values. It is commonly used as a cost function in various optimization algorithms, such as gradient descent.

The SSE is calculated by taking the difference between each predicted value and its corresponding actual value, squaring the difference, and then summing all of the squared differences:

$$SSE = \sum_{i=1}^{n}(y_i - \hat{y}_i)^2$$

Where:
- $n$ is the number of data points
- $y_i$ is the actual value of the i-th data point
- $\hat{y}_i$ is the predicted value of the i-th data point

The SSE gives an indication of how well the model fits the data. A lower SSE indicates that the model is a better fit for the data.

In machine learning, the SSE is often used as a cost function to be minimized during training of a model. The goal is to find the set of model parameters that minimizes the SSE, thus improving the accuracy of the model's predictions.

In [118]:
out_pred, out_true, sse_err, isLessThanMaxSSE = compute_out_and_errors(output_data, out)

print('Output prediction :', out_pred)
print('Output true :', out_true)
print('SSE  : ', sse_err)
print("sse <= max_sse  :", isLessThanMaxSSE)

Output prediction : [0.75136507 0.77292847]
Output true : [0.01 0.99]
SSE  :  0.5967422175200054
sse <= max_sse  : False


## Neural Network Visualization with **Pyvis**

Pyvis is a Python library that provides an easy-to-use interface for visualizing complex networks, including neural networks. Implementation of the visualization is enlisted below:

In [119]:
def visualize_network(data, complete_out) -> Network:
    n = int(data[0])

    # Constant
    XSTEP, YSTEP, SIZE = 300, 300, 10

    # Nodes
    nodes = []
    node_i = 1

    # Nodes Value
    value = []
    x_val = 0
    y_val = 0

    # Position
    x = []
    y = []

    # Styling + Text
    label = []
    color = []
    edge = []
    title = []

    # Indexing
    src_idx = 0
    idx = 1

    for i_layer in range(n - 1):
        n_curr, n_next, _ = [int(i) for i in data[idx].split()]

        if i_layer == 0:  # means that this is the first layer, hence construct input
            for i_node in range(n_curr + 1):
                if i_node == 0:  # bias
                    color.append("#dd4b39")
                    label.append("Input[bias]")
                    temp = ""
                    for i in range (len(input_data)):
                        temp += str(x_train[i][0])
                        if i < len(input_data) - 1:
                            temp += ", "
                    title.append(temp)
                else:
                    color.append("#162347")
                    label.append("Input[{}]".format(i_node))
                    temp = ""
                    for i in range (len(input_data)):
                        temp += str(x_train[i][i_node])
                        if i < len(input_data) - 1:
                            temp += ", "
                    title.append(temp)
                value.append(SIZE)
                x.append(x_val)
                y.append(y_val)
                y_val += YSTEP
                nodes.append(node_i)
                node_i += 1
            x_val += XSTEP

        y_val = 0
        # always construct the next layer
        for i_node in range(n_next + 1):
            if i_node == 0:
                if i_layer == n - 2:
                    continue
                color.append("#dd4b39")
                label.append("HL{}[bias]".format(i_layer + 1))
                temp = ""
                for i in range (len(input_data)):
                    temp +=  "1"
                    if i < len(input_data) - 1:
                        temp += ", "
                title.append(temp)
            else:
                color.append("#162347")
                if i_layer == n - 2:
                    label.append("Output[{}]".format(i_node))
                else:
                    label.append("HL{}[{}]".format(i_layer + 1, i_node))
                temp = ""
                for i in range (len(input_data)):
                    temp += str(complete_out[i][i_layer][i_node - 1])
                    if i < len(input_data) - 1:
                        temp += ", "
                title.append(temp)
            value.append(SIZE)
            x.append(x_val)
            y.append(y_val)
            y_val += YSTEP
            nodes.append(node_i)
            node_i += 1
        x_val += XSTEP

        idx += 1
        for origin in range(n_curr + 1):
            dst_idx = -1
            for w in reversed(data[idx].split()):
                edge.append((nodes[src_idx], nodes[dst_idx], w))
                dst_idx -= 1
            src_idx += 1
            idx += 1

    g = Network(notebook=True, cdn_resources="remote")
    g.add_nodes(nodes,title = title,value = value,x=x,y=y,label = label,color = color)

    for e in edge:
        g.add_edge(e[0], e[1], title = e[2], color="#162347")

    for n in g.nodes:
        n.update({'physics': False})
    
    return g

Visualize using Helper Function

In [None]:
g = visualize_network(data, complete_out)
g.show("./tmp/network.html")

In [341]:
net.fit(x_train, out_true)

0


ValueError: shapes (3,) and (2,2) not aligned: 3 (dim 0) != 2 (dim 0)

## Backpropagation Section

#### **Output** Delta

In [312]:
def delta_output(out_pred, out_true, predecessor_out, act_type):
    dE_dO = -(out_true - out_pred)
    dO_dNet = None
    dNet_dW = predecessor_out

    if act_type == 0: #linear
        dO_dNet = 1
    elif act_type == 1: #relu
        dO_dNet = np.where(out_pred < 0, 0, 1)
    elif act_type == 2: #sigmoid
        dO_dNet = out_pred * (1 - out_pred)

    dE_dNet = dE_dO * dO_dNet
    if act_type == 3: # softmax
        dE_dNet = np.where(out_pred == out_true, out_pred, -(1 - out_pred))
    
    delta = []
    for i in dNet_dW: # TODO: Change for batch
        cur_w = []
        for j in dE_dNet:
            cur_w.append(i * j)
        delta.append(cur_w)
    
    return delta, dE_dNet

#### **Hidden** Delta

In [313]:
def error_succ(succ_dE_dNet,weight):
    err_total = []
    for w in weight[1:]:  # skip the bias
        err_total.append(np.sum(succ_dE_dNet * np.array(w)))
    
    return err_total

def delta_hidden(predecessor_out, succ_out, succ_dE_dNet, succ_weight, act_type):
    dEtotal_dH = error_succ(succ_dE_dNet, succ_weight)
    dH_dNet = None
    dNet_dW = predecessor_out

    if act_type == 0: #linear
        dH_dNet = 1
    elif act_type == 1: #relu
        dH_dNet = np.where(succ_out < 0, 0, 1)
    elif act_type == 2: #sigmoid
        dH_dNet = succ_out * (1 - succ_out)

    dEtotal_dNet = dEtotal_dH * dH_dNet
    # if act_type == 3: #softmax
    #     dEtotal_dNet = np.where(prev_out == out_true, prev_out, -(1 - prev_out))

    delta = []
    for i in dNet_dW.flatten(): # TODO: change for batch
        cur_w = []
        for j in dEtotal_dNet:
            cur_w.append(i * j)
        delta.append(cur_w)
    return delta, dEtotal_dNet

#### **Update** Weight

In [314]:
def update(weights, deltas, learning_rate):
    np_weights = np.array(weights) - learning_rate * np.array(deltas)
    return np_weights.tolist()

### Single Iteration Implementation

total deltas (gradient) for each layer

In [315]:
deltas = []

Output deltas computation

In [316]:
out_pred, out_true, _, _ = compute_out_and_errors(output_data, out)

# Without bias TODO: change for batch
predecessor_out = np.array([i[-2] for i in complete_out]).flatten()
# Concatenate with bias TODO : change for batch
predecessor_out = np.concatenate((np.array([1]), predecessor_out))

delta, succ_dE_dNet = delta_output(out_pred, out_true, predecessor_out, 2)
deltas.append(delta)

Hidden deltas computation

In [317]:
succ_out = np.array([i[-2] for i in complete_out]).flatten()
delta, succ_dE_dNet = delta_hidden(x_train, succ_out, succ_dE_dNet, weights[-1], 2)
deltas.append(delta)

Update Weights

In [318]:
weights = update(weights, deltas[::-1], 0.5)

In [319]:
weights

[[[0.34196709387931556, 0.3407865766027151],
  [0.14959835469396576, 0.24953932883013577],
  [0.19919670938793158, 0.29907865766027153]],
 [[0.461501438371443, 0.6380982365165563],
  [0.3178329594357693, 0.522602540477475],
  [0.3673323721524668, 0.5727402422159782]]]