# Backpropagation

Tasks:

1. Try out the given examples. Change parameters such as the learning rate and the number of epochs, and observe the NN's behavior during training.
2. Change the parameters and the topology in the overfitting example and try to find the simplest NN that still fits the data without overfitting.
3. Implement ReLU as a new activation function, then create and train an NN to regress the data in the file "regression_example_2.csv".

In [None]:
import matplotlib.pyplot as plt #library for visualizing data
%matplotlib widget 
#setting for jupyter lab
plt.rcParams['figure.figsize'] = [10, 5] #setting figure size (plots)

import pandas as pd  # (software library for data analysis and manipulation, https://pandas.pydata.org/docs/)
import numpy as np  # (software library for matrix multiplications, https://numpy.org/doc/)
import statistics as stats  # (python module for statistic calculations, https://docs.python.org/3/library/statistics.html)
import time #python time module
import random #module for generation of random values

def lin(x):
    """Linear (identity) activation: hand the input back unchanged."""
    return x

def lin_deriv(x):
    """Derivative of the linear activation.

    The slope of the identity is constant, so the scalar 1 is returned
    regardless of ``x`` (it broadcasts when multiplied with arrays).
    """
    slope = 1
    return slope


def sigmoid(x):
    """Logistic sigmoid 1 / (1 + e^(-x)), applied element-wise via numpy."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator


def sigmoid_deriv(x):
    """Derivative of the sigmoid, expressed in terms of the sigmoid's OUTPUT.

    NeuralNet.fit evaluates ``activation_prime`` on the stored layer
    activations (values that already went through the activation function),
    the same convention ``tanh_deriv`` uses with ``1 - x ** 2``. For
    ``a = sigmoid(z)`` the derivative with respect to ``z`` is ``a * (1 - a)``,
    so the sigmoid must not be applied a second time here.

    Parameters:
        x: sigmoid output(s), scalar or numpy array.

    Returns:
        ``x * (1 - x)``, element-wise.
    """
    return x * (1.0 - x)


def tanh(x):
    """Hyperbolic tangent activation (thin wrapper around ``np.tanh``)."""
    activated = np.tanh(x)
    return activated


def tanh_deriv(x):
    """Return ``1 - x ** 2``.

    This equals the derivative of tanh when ``x`` is already a tanh output
    (``a = tanh(z)`` gives ``d/dz = 1 - a ** 2``); NeuralNet.fit calls it
    with layer activations.
    """
    squared = x ** 2
    return 1 - squared



class NeuralNet:
    """Fully connected feed-forward network trained by per-sample backpropagation.

    The same activation function is applied to every layer, including the
    output layer. ``fit`` and ``predict`` prepend a column of ones to the
    inputs (bias input), and every hidden layer's weight matrix has one
    additional output column, so hidden layers carry one extra unit.

    Attributes:
        layers: layer sizes as passed to the constructor.
        weights: list with one 2-D weight matrix per pair of consecutive layers.
        activation: activation function applied to every weighted sum.
        activation_prime: derivative of the activation; it is called with
            layer activations (outputs of ``activation``), not with the
            weighted sums.
        history: rows ``[epoch, mse, mae, mse_val, mae_val]`` of the most
            recent ``fit`` call (empty before training).
        layer_topology: unused by this class; kept as an empty list.
    """

    def __init__(self, layers, activation_function='tanh', print_model_info=True):
        """Create the network and draw random initial weights.

        Parameters:
            layers: sequence of layer sizes, e.g. ``[2, 3, 1]`` for 2 inputs,
                one hidden layer with 3 nodes and 1 output.
            activation_function: one of ``'sigmoid'``, ``'tanh'``, ``'lin'``.
            print_model_info: print a summary of the model after creation.

        Raises:
            ValueError: if ``activation_function`` is not a known name.
        """
        # Built here (not at class level) so the lookup happens at call time.
        activations = {
            'sigmoid': (sigmoid, sigmoid_deriv),
            'tanh': (tanh, tanh_deriv),
            'lin': (lin, lin_deriv),
        }
        if activation_function not in activations:
            raise ValueError(
                f'Unknown activation function {activation_function!r}; '
                f'expected one of {sorted(activations)}'
            )
        self.activation, self.activation_prime = activations[activation_function]

        self.layers = layers
        # Per-instance state; class-level lists would be shared by all instances.
        self.history = []
        self.layer_topology = []

        # Set weights for each layer
        self.weights = []
        for i in range(0, len(layers) - 1):
            # hidden layers get one extra output column, the output layer does not
            additional_column = 1 if i < len(layers) - 2 else 0
            # "+ 1" row for the bias input / the extra unit of the previous layer
            r = np.random.randint(-500, 500, (layers[i] + 1, layers[i + 1] + additional_column)) * 0.001 #set to values between -0.5 and 0.5
            self.weights.append(r)

        if print_model_info:
            self.print_model_info()

    def print_model_info(self):
        """Print an ASCII sketch of the layers, the activation and the weight count."""
        print('Model: \n')

        layers = self.layers
        max_nodes = max(layers)
        last_index = len(layers) - 1
        for i, layer in enumerate(layers):
            diff = abs(max_nodes - layer)
            # every node is drawn as ' O' (two characters), so `diff` leading
            # spaces centre the row relative to the widest layer
            output = ' ' * diff + ' O' * layer + ' ' * (diff + 5)
            if i == 0:
                output += f'{layer} Input(s)'
            elif i == last_index:
                output += f'{layer} Output(s)'
            else:
                output += f'{layer} Node(s)'
            print(output)

        print('\nActivation:', str(self.activation))

        weight_count = sum(weight.shape[0] * weight.shape[1] for weight in self.weights)

        print(f'{len(layers)} Layers')
        print(f'{weight_count} trainable parameters.\n\n')

    def fit(self, X, y, l_rate=0.2, epochs=100000, print_freq=100, validation_x=None, validation_y=None):
        """Train the network with per-sample (online) gradient descent.

        Parameters:
            X: 2-D array of training inputs, one sample per row.
            y: training targets, one entry/row per sample.
            l_rate: learning rate applied to every weight update.
            epochs: number of passes over the training data.
            print_freq: print progress every ``print_freq`` epochs; a falsy
                value (0 / None) disables progress output.
            validation_x: optional validation inputs; when given, validation
                MSE/MAE are recorded after every epoch.
            validation_y: validation targets, required with ``validation_x``.

        Returns:
            List with one row ``[epoch, mse, mae, mse_val, mae_val]`` per
            epoch (validation entries are None without validation data).
            The same list is stored in ``self.history``.

        Raises:
            ValueError: if ``validation_x`` is given without ``validation_y``.
        """
        if validation_x is not None and validation_y is None:
            raise ValueError('validation_y is required when validation_x is given')

        t_0 = time.perf_counter()
        y = np.asarray(y)

        ones = np.atleast_2d(np.ones(X.shape[0]))
        X = np.hstack((ones.T, X))  # stack ones for bias
        X_plain = X[:, 1:]  # inputs without the bias column, as predict() expects

        history = []
        t_last_print = t_0
        epochs_since_print = 0

        for k in range(epochs):
            for i, row in enumerate(X):  # loop through every training data sample
                self._train_on_sample(row, y[i], l_rate)

            # performance control (on this instance, not on a global variable)
            mse, mae = self._evaluate(X_plain, y)

            mse_v = mae_v = None
            if validation_x is not None:
                mse_v, mae_v = self._evaluate(validation_x, validation_y)

            history.append([k, mse, mae, mse_v, mae_v])

            # print output
            epochs_since_print += 1
            if print_freq and k % print_freq == 0:
                now = time.perf_counter()
                # average over the epochs actually run since the last report
                mean_time_delta_epochs_ms = round(((now - t_last_print) / epochs_since_print) * 1000, 4)
                print(f'Epochs: {k}  -  MSE: {round(mse,4)}  -  MAE: {round(mae,4)}  -  Time dur. per epoch: {mean_time_delta_epochs_ms} ms')
                t_last_print = now
                epochs_since_print = 0

        self.history = history
        print(f'\nTraining finished. Time consumed: {round(time.perf_counter() - t_0, 2)} s')
        return history

    def _train_on_sample(self, row, target, l_rate):
        """Run one forward/backward pass for a single sample and update the weights."""
        # 1 FEED FORWARD - the (bias-extended) input is the first "activation"
        layer_activations = [row]
        for weight_matrix in self.weights:
            weighted_sum = np.matmul(layer_activations[-1], weight_matrix)
            layer_activations.append(self.activation(weighted_sum))

        # 2: Output layer - the last activation is the network output
        error = target - layer_activations[-1]
        deltas = [error * self.activation_prime(layer_activations[-1])]

        # 3: Backpropagation through hidden layers (last hidden layer first)
        for la_i in range(len(layer_activations) - 2, 0, -1):
            propagated = np.matmul(deltas[-1], self.weights[la_i].T)
            deltas.append(propagated * self.activation_prime(layer_activations[la_i]))
        deltas.reverse()  # deltas[w_i] now belongs to self.weights[w_i]

        # 4: Gradient calculation and weight update; all deltas were computed
        # with the old weights before any matrix is modified
        for w_i in range(len(self.weights)):
            layer_activation = np.atleast_2d(layer_activations[w_i])
            delta = np.atleast_2d(deltas[w_i])
            gradient = np.matmul(layer_activation.T, delta)
            self.weights[w_i] += l_rate * gradient

    def _evaluate(self, X, y):
        """Return ``(mse, mae)`` of this network's predictions for ``X`` against ``y``."""
        y = np.asarray(y)
        # -1 keeps one column for single-output targets and also accepts
        # multi-output targets
        error = y.reshape(len(y), -1) - self.predict(X)
        return (error ** 2).mean(), np.abs(error).mean()

    def predict(self, X):
        """Compute the network output for one sample or a 2-D batch of samples.

        Parameters:
            X: inputs WITHOUT bias column; a 1-D array is treated as a
                single sample.

        Returns:
            2-D array with one row of outputs per sample.
        """
        X = np.atleast_2d(X)
        ones = np.atleast_2d(np.ones(X.shape[0]))
        prediction = np.hstack((ones.T, X))  # prepend the bias input

        for weight_matrix in self.weights:
            prediction = self.activation(np.matmul(prediction, weight_matrix))
        return prediction


In [None]:
#logic gate examples: truth tables with the inputs x1, x2 and the target y
gate_columns = ['x1', 'x2', 'y']
xor_gate = pd.DataFrame([[0, 0, 0], [1, 0, 1], [0, 1, 1], [1, 1, 0]], columns=gate_columns)
and_gate = pd.DataFrame([[1, 1, 1], [1, 0, 0], [0, 1, 0], [0, 0, 0]], columns=gate_columns)
or_gate = pd.DataFrame([[1, 1, 1], [1, 0, 1], [0, 1, 1], [0, 0, 0]], columns=gate_columns)

#choose the gate to learn
gate = or_gate
X = gate[['x1', 'x2']].to_numpy()
y = gate[['y']].to_numpy()

#2 inputs connected directly to 1 output, no hidden layer
nn = NeuralNet([2, 1], activation_function='tanh')
history = nn.fit(X, y, l_rate=0.95, epochs=200, print_freq=5)

In [None]:
#query the trained gate network for a few input combinations
for gate_input in ([0, 0], [1, 0], [1, 1]):
    print(nn.predict(np.array(gate_input)))

In [None]:
#regression example
data = pd.read_csv('./regression_example_1.csv')

#random split: 80 % of the rows for training, the remaining rows for validation
training_data = data.sample(frac=0.8)
validation_data = data.drop(index=training_data.index)

#column vectors (n, 1) for the network
X, y = training_data[['input']].to_numpy(), training_data[['output']].to_numpy()
X_val, y_val = validation_data[['input']].to_numpy(), validation_data[['output']].to_numpy()

#1 input, two hidden layers (2 and 3 nodes), 1 output; default activation
nn = NeuralNet([1, 2, 3, 1])

history = nn.fit(
    X,
    y,
    l_rate=0.075,
    epochs=1000,
    print_freq=250,
    validation_x=X_val,
    validation_y=y_val,
)

In [None]:
#overfitting example

#set seed for repeatable results
seed = 24253
np.random.seed(seed)
random.seed(seed)

data = pd.read_csv('./regression_example_2.csv')

#the split below is redrawn until it contains these two inputs; without them
#in the data the loop could never terminate, so fail fast instead
if not ((data['input'] == 0.12).any() and (data['input'] == 0.13).any()):
    raise ValueError('regression_example_2.csv must contain the inputs 0.12 and 0.13')

#redraw the 80/20 split until input 0.12 is part of the training data and
#input 0.13 is part of the validation data
#(presumably so the over-fitted region is covered by both sets - TODO confirm)
#validation_data is derived from THIS split before it is checked, so the cell
#no longer depends on a validation_data left over from another cell
while True:
    training_data = data.sample(frac=0.8)
    validation_data = data.drop(training_data.index)
    if (training_data['input'] == 0.12).any() and (validation_data['input'] == 0.13).any():
        break

X = training_data[['input']].to_numpy()
y = training_data[['output']].to_numpy()
X_val = validation_data[['input']].to_numpy()
y_val = validation_data[['output']].to_numpy()

#deliberately oversized topology for the overfitting demonstration
nn = NeuralNet([1,3,6,6,19,14,6,6,3,1], activation_function='tanh')

history = nn.fit(X, y, l_rate=0.09, epochs = 8000, print_freq=250, validation_x = X_val, validation_y = y_val)

In [None]:
#plot history: training vs. validation MSE over the epochs
plt.close('all')
history_df = pd.DataFrame(history, columns=['epoch', 'MSE', 'MAE', 'MSE_val', 'MAE_val'])
mse_columns = ['MSE', 'MSE_val']

#complete training run
history_df.plot(x='epoch', y=mse_columns)

#zoom: only the last two thirds of the epochs
tail_length = int(len(history_df.index) / 1.5)
history_df.tail(tail_length).plot(x='epoch', y=mse_columns)

In [None]:
def _input_output_frame(inputs, outputs):
    """Build a two-column DataFrame ('input', 'output') from two arrays."""
    frame = pd.DataFrame([inputs.flatten(), outputs.flatten()]).T
    frame.columns = ['input', 'output']
    return frame


#create test data: 1001 evenly spaced inputs from 0 to 1 (both ends included)
X_test = np.linspace(0, 1, num=1001, endpoint=True).reshape(-1, 1)
Y = nn.predict(X_test)

predictions = _input_output_frame(X_test, Y)
training_data = _input_output_frame(X, y)
validation_data = _input_output_frame(X_val, y_val)

#display predictions on top of the training and validation samples
ax = training_data.plot(kind='scatter', x='input', y='output', label='training data')
validation_data.plot(kind='scatter', x='input', y='output', ax=ax, color='red', label='validation data')
predictions.plot(kind='line', x='input', y='output', ax=ax, color='orange')

