# MLP class

In [1]:
import numpy as np

In [2]:
import copy

class MLP:
    """Fully connected feed-forward network trained with hand-written backprop.

    ``weights[i]`` has shape ``(layer_sizes[i], layer_sizes[i + 1])`` and
    ``biases[i]`` has shape ``(layer_sizes[i + 1], 1)``; both are filled from
    ``np.random.rand`` when the network is constructed.

    Parameters
    ----------
    layer_sizes : sequence of int
        Number of neurons in every layer, input layer first.
    act_fun : str or callable
        ``'sigmoid'``, ``'relu'`` or ``'tanh'``, or a callable used as the
        hidden activation. For a callable no derivative is known
        (``act_fun_prime`` is ``None``), so the training methods cannot be
        used with it.
    out_act_fun_is_linear : bool
        If true the output layer is the identity, otherwise it reuses the
        hidden activation.
    """

    def __init__(self, layer_sizes, act_fun, out_act_fun_is_linear=True):
        self.layer_sizes = layer_sizes
        self.n_layers = len(layer_sizes)
        self.set_act_fun(act_fun)
        self.set_out_act_fun(out_act_fun_is_linear)

        self.weights = [None] * (self.n_layers - 1)
        self.biases = [None] * (self.n_layers - 1)
        self.initialize_weights()

    def initialize_weights(self):
        """Refill every weight matrix and bias column with uniform [0, 1) values."""
        for i in range(self.n_layers - 1):
            fan_in, fan_out = self.layer_sizes[i], self.layer_sizes[i + 1]
            self.weights[i] = np.random.rand(fan_in, fan_out)
            self.biases[i] = np.random.rand(fan_out, 1)

    def _forward(self, X, return_activations=False):
        """Propagate ``X`` through the network.

        ``X`` is promoted with ``np.atleast_2d`` and each of its rows is fed
        to the first layer as a ``(1, k)`` array multiplied by
        ``weights[0].T``. For a single-input network a 1-D ``X`` of N values
        therefore becomes one row whose N columns are N samples.
        NOTE(review): a row holding more than one *feature* does not match
        ``weights[0].T`` in this layout - confirm before using wider inputs.

        Returns the flattened outputs, plus (when ``return_activations`` is
        true) the list of pre-activation arrays of every layer, appended row
        after row.
        """
        X = np.atleast_2d(X)
        A = []
        # Start from an empty float array so the result is 1-D even when X
        # has no rows; pieces are joined once instead of re-allocating the
        # output on every row.
        pieces = [np.empty(0)]
        for row in X:
            signal = np.atleast_2d(row)
            for W, b in zip(self.weights[:-1], self.biases):
                A.append(W.T.dot(signal) + b)
                signal = self.act_fun(A[-1])
            A.append(self.weights[-1].T.dot(signal) + self.biases[-1])
            pieces.append(np.ravel(self.out_act_fun(A[-1])))
        Y = np.concatenate(pieces)
        return (Y, A) if return_activations else Y

    def predict(self, X):
        """Return the flattened network outputs for ``X`` (see ``_forward``)."""
        return self._forward(X)

    def _backward(self, x, y):
        """Backpropagate the error of one sample.

        Returns ``(D_weights, D_biases)``: per-layer gradients of the halved
        squared error, shaped like ``self.weights`` / ``self.biases``.
        """
        y = np.atleast_2d(y)
        x = np.atleast_2d(x)
        y_pred, A = self._forward(x, return_activations=True)

        n = len(self.weights)
        D_weights = [None] * n
        D_biases = [None] * n
        # Error signal of the output layer; walked towards the input below.
        delta = (y_pred - y) * self.out_act_fun_prime(A[-1])
        for i in reversed(range(n)):
            if i < n - 1:
                delta = np.dot(self.weights[i + 1], delta) * self.act_fun_prime(A[i])
            layer_input = x if i == 0 else self.act_fun(A[i - 1])
            D_weights[i] = np.outer(layer_input, delta)
            D_biases[i] = delta
        return D_weights, D_biases

    @staticmethod
    def _learning_rate(first_lr, lr_decay_rate, epoch):
        """Inverse-time decay: ``first_lr / (1 + epoch * lr_decay_rate)``."""
        return first_lr / (1 + epoch * lr_decay_rate)

    def _sum_gradients(self, X, Y, indices):
        """Sum the per-sample gradients of the samples selected by ``indices``."""
        D_weights = [np.zeros(w.shape) for w in self.weights]
        D_biases = [np.zeros(b.shape) for b in self.biases]
        for i in indices:
            sample_w, sample_b = self._backward(X[i], Y[i])
            for j in range(len(self.weights)):
                D_weights[j] += sample_w[j]
                D_biases[j] += sample_b[j]
        return D_weights, D_biases

    def _apply_gradients(self, D_weights, D_biases, learning_rate, n_samples=1):
        """Take one descent step along gradients summed over ``n_samples`` samples."""
        for j in range(len(self.weights)):
            self.weights[j] -= learning_rate * D_weights[j] / n_samples
            self.biases[j] -= learning_rate * D_biases[j] / n_samples

    def _end_epoch(self, X, Y, epoch, n_epochs_displayed, losses,
                   weights_over_epochs, biases_over_epochs):
        """Record the loss and a parameter snapshot, and print progress."""
        losses.append(self.mse(self.predict(X), Y))
        # Deep copies: the parameter arrays are updated in place, so storing
        # references would make every history entry show the final values.
        weights_over_epochs.append(copy.deepcopy(self.weights))
        biases_over_epochs.append(copy.deepcopy(self.biases))
        if epoch == 0 or (epoch + 1) % n_epochs_displayed == 0:
            print(f'Epoch {epoch + 1}: loss_fun={losses[-1]}')

    def fit_SGD(self, X, Y, first_lr=0.01, lr_decay_rate=0.01, epochs=100, n_epochs_displayed=100):
        """Train with one parameter update per sample, reshuffled every epoch.

        Returns ``(losses, weights_over_epochs, biases_over_epochs)``: the
        loss after every epoch and parameter snapshots, where index 0 holds
        the parameters before training.
        """
        X = np.array(X)
        Y = np.array(Y)
        losses = []
        weights_over_epochs = [copy.deepcopy(self.weights)]
        biases_over_epochs = [copy.deepcopy(self.biases)]
        for epoch in range(epochs):
            learning_rate = self._learning_rate(first_lr, lr_decay_rate, epoch)
            for i in np.random.permutation(X.shape[0]):
                D_weights, D_biases = self._backward(X[i], Y[i])
                self._apply_gradients(D_weights, D_biases, learning_rate)
            self._end_epoch(X, Y, epoch, n_epochs_displayed, losses,
                            weights_over_epochs, biases_over_epochs)
        return losses, weights_over_epochs, biases_over_epochs

    def fit_batch(self, X, Y, first_lr=0.01, lr_decay_rate=0.01, epochs=100, n_epochs_displayed=100):
        """Train with one update per epoch using the mean gradient of all samples.

        Returns the same triple as ``fit_SGD``.
        """
        X = np.array(X)
        Y = np.array(Y)
        losses = []
        weights_over_epochs = [copy.deepcopy(self.weights)]
        biases_over_epochs = [copy.deepcopy(self.biases)]
        for epoch in range(epochs):
            order = np.random.permutation(X.shape[0])
            D_weights, D_biases = self._sum_gradients(X, Y, order)
            learning_rate = self._learning_rate(first_lr, lr_decay_rate, epoch)
            self._apply_gradients(D_weights, D_biases, learning_rate, X.shape[0])
            self._end_epoch(X, Y, epoch, n_epochs_displayed, losses,
                            weights_over_epochs, biases_over_epochs)
        return losses, weights_over_epochs, biases_over_epochs

    def fit_minibatch(self, X, Y, first_lr=0.01, lr_decay_rate=0.01, epochs=100, n_epochs_displayed=100, batch_size=32):
        """Train with one update per shuffled mini-batch of ``batch_size`` samples.

        The last batch of an epoch may be shorter; its gradient is averaged
        over the samples it actually contains. Returns the same triple as
        ``fit_SGD``.
        """
        X = np.array(X)
        Y = np.array(Y)
        losses = []
        weights_over_epochs = [copy.deepcopy(self.weights)]
        biases_over_epochs = [copy.deepcopy(self.biases)]
        for epoch in range(epochs):
            permutation = np.random.permutation(X.shape[0])
            learning_rate = self._learning_rate(first_lr, lr_decay_rate, epoch)
            for start in range(0, X.shape[0], batch_size):
                # Slice the shuffled indices so batches differ between epochs.
                batch = permutation[start:start + batch_size]
                D_weights, D_biases = self._sum_gradients(X, Y, batch)
                self._apply_gradients(D_weights, D_biases, learning_rate, len(batch))
            self._end_epoch(X, Y, epoch, n_epochs_displayed, losses,
                            weights_over_epochs, biases_over_epochs)
        return losses, weights_over_epochs, biases_over_epochs

    def set_all_weights(self, weights):
        """Replace the whole list of weight matrices."""
        self.weights = weights

    def set_weights_for_layer(self, layer, weights):
        """Replace the weight matrix at index ``layer``."""
        self.weights[layer] = weights

    def set_weight(self, layer, from_neuron, to_neuron, value):
        """Set the single weight ``weights[layer][from_neuron][to_neuron]``."""
        self.weights[layer][from_neuron][to_neuron] = value

    def set_weigth(self, layer, from_neuron, to_neuron, value):
        """Misspelled original name of ``set_weight``, kept for existing callers."""
        self.set_weight(layer, from_neuron, to_neuron, value)

    def set_all_biases(self, biases):
        """Replace the whole list of bias columns."""
        self.biases = biases

    def set_biases_for_layer(self, layer, biases):
        """Replace the bias column at index ``layer``."""
        self.biases[layer] = biases

    def set_bias(self, layer, neuron, value):
        """Set the bias ``biases[layer][neuron]``."""
        self.biases[layer][neuron] = value

    @staticmethod
    def _sigmoid(x):
        """Logistic function."""
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def _sigmoid_prime(x):
        """Derivative of the logistic function.

        The derivative is even in ``x``, so it is evaluated at ``-|x|``: the
        exponential can then only underflow to 0 instead of overflowing to
        ``inf`` and producing ``inf / inf = nan``.
        """
        e = np.exp(-np.abs(x))
        return e / (1 + e) ** 2

    def set_act_fun(self, act_fun):
        """Select the hidden activation and its derivative.

        ``act_fun`` is ``'sigmoid'``, ``'relu'``, ``'tanh'`` or a callable;
        a callable is stored as given with ``act_fun_prime = None``.
        The output activation is not refreshed here; call
        ``set_out_act_fun`` again if it should follow the new activation.

        Raises
        ------
        ValueError
            If ``act_fun`` is a string that names no known activation.
        """
        act_fun_prime = None
        if isinstance(act_fun, str):
            known = {
                'sigmoid': (self._sigmoid, self._sigmoid_prime),
                'relu': (lambda x: np.maximum(0, x),
                         lambda x: np.where(x > 0, 1, 0)),
                'tanh': (lambda x: np.tanh(x),
                         lambda x: 1 - np.tanh(x) ** 2),
            }
            if act_fun not in known:
                raise ValueError(
                    f'unknown activation {act_fun!r}; expected one of {sorted(known)}'
                )
            act_fun, act_fun_prime = known[act_fun]

        self.act_fun = act_fun
        self.act_fun_prime = act_fun_prime

    def set_out_act_fun(self, out_act_fun_is_linear):
        """Make the output layer linear, or reuse the current hidden activation."""
        if not out_act_fun_is_linear:
            self.out_act_fun = self.act_fun
            self.out_act_fun_prime = self.act_fun_prime
            return
        self.out_act_fun = lambda x: x
        self.out_act_fun_prime = lambda x: 1

    def mse(self, y, y_pred):
        """Return half of the mean squared difference between ``y`` and ``y_pred``."""
        return np.mean((y - y_pred) ** 2) / 2

    

In [3]:
def normalize(data, min = None, max = None):
    """Min-max scale ``data`` and report the bounds that were used.

    ``min`` / ``max`` default to the smallest / largest value found in
    ``data``; pass the bounds of a training set to scale other data the
    same way.

    Returns ``(scaled, min, max)`` with ``scaled = (data - min) / (max - min)``.
    """
    values = np.array(data)
    lower = np.min(values) if min is None else min
    upper = np.max(values) if max is None else max
    scaled = (values - lower) / (upper - lower)
    return scaled, lower, upper

In [4]:
def denormalize(data, min, max):
    """Map min-max scaled ``data`` back to the ``[min, max]`` range."""
    span = max - min
    return np.array(data) * span + min

# Functions for visualisation

In [5]:
import matplotlib.pyplot as plt
import networkx as nx

In [6]:
def draw_network_for_epoch(weights, biases, epoch):
    """Plot one parameter snapshot of the network as a layered graph.

    Nodes are named ``'<layer>-<neuron>'`` and placed in one column per
    layer. Every edge carries the matching entry of ``weights``; it is
    coloured on the 'RdYlGn' colormap, scaled symmetrically around zero by
    the largest absolute weight, and drawn with width ``abs(weight) + 1``.
    Nodes of every layer except the first are labelled with their bias and
    edges with their weight, both with two decimals.

    Parameters
    ----------
    weights : sequence indexed as ``weights[layer][from_neuron][to_neuron]``
    biases : sequence indexed as ``biases[layer][neuron][0]``
    epoch : value shown in the figure title
    """
    layer_widths = [len(layer) for layer in weights]
    layer_widths.append(len(biases[-1]))
    layer_count = len(weights) + 1
    strongest = max([np.max(np.abs(layer)) for layer in weights])

    graph = nx.Graph()
    for layer in range(layer_count):
        for neuron in range(layer_widths[layer]):
            graph.add_node(f'{layer}-{neuron}')
    for layer in range(layer_count - 1):
        for src in range(layer_widths[layer]):
            for dst in range(layer_widths[layer + 1]):
                graph.add_edge(
                    f'{layer}-{src}',
                    f'{layer + 1}-{dst}',
                    weight=weights[layer][src][dst],
                )

    # One column per layer; the vertical offset depends on the widest layer.
    widest = max(layer_widths)
    pos = {
        f'{layer}-{neuron}': (layer, widest - layer_widths[layer] + 2 * neuron)
        for layer in range(layer_count)
        for neuron in range(layer_widths[layer])
    }

    cmap = plt.get_cmap('RdYlGn')
    norm = plt.Normalize(vmin=-strongest, vmax=strongest)
    edge_list = list(graph.edges())
    edge_weights = [graph[u][v]['weight'] for u, v in edge_list]
    edge_colors = [cmap(norm(w)) for w in edge_weights]

    plt.figure(figsize=(10, 10))
    plt.title(f'Epoch {epoch}')
    nx.draw(
        graph,
        pos,
        edge_color=edge_colors,
        node_size=3000,
        font_size=10,
        width=[abs(w) + 1 for w in edge_weights],
    )

    node_labels = {}
    for layer in range(layer_count - 1):
        for neuron in range(layer_widths[layer + 1]):
            node_labels[f'{layer + 1}-{neuron}'] = f'{biases[layer][neuron][0]:.2f}'
    nx.draw_networkx_labels(graph, pos, labels=node_labels)

    edge_labels = {edge: f'{w:.2f}' for edge, w in zip(edge_list, edge_weights)}
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
    plt.show()

In [7]:
def draw_networks_for_epochs(weights_over_epochs, biases_over_epochs, epochs):
    """Draw one network figure for every history index listed in ``epochs``.

    ``weights_over_epochs`` and ``biases_over_epochs`` are indexed by each
    entry of ``epochs``; that entry is also passed on as the figure's epoch.
    """
    for index in epochs:
        snapshot_weights = weights_over_epochs[index]
        snapshot_biases = biases_over_epochs[index]
        draw_network_for_epoch(snapshot_weights, snapshot_biases, index)

In [8]:
def train_and_draw_plots(model, X_train, Y_train, X_test, Y_test, method, first_lr, lr_decay_rate, epochs, n_epochs_displayed = 100, batch_size = 32):
    """Train ``model`` on min-max normalised data and plot the results.

    The training inputs and targets are normalised with their own bounds;
    the test inputs reuse the training input bounds. Predictions are mapped
    back to the original target range before plotting and scoring. Two
    figures are shown: training fit plus loss curve, and test fit plus the
    test score.

    Parameters
    ----------
    model : object providing ``fit_SGD``, ``fit_batch``, ``fit_minibatch``,
        ``predict`` and ``mse``.
    method : ``'sgd'``, ``'batch'`` or ``'minibatch'``.
    first_lr, lr_decay_rate, epochs, n_epochs_displayed : passed to the
        selected fit method.
    batch_size : passed to ``fit_minibatch`` only.

    Returns
    -------
    ``(losses, weights_over_epochs, biases_over_epochs)`` as returned by the
    selected fit method.

    Raises
    ------
    ValueError
        If ``method`` is not one of the three supported names.
    """
    # Reject a bad method before any work is done; otherwise the if/elif
    # chain below falls through and fails later on an unbound ``losses``.
    if method not in ('sgd', 'batch', 'minibatch'):
        raise ValueError(
            f"unknown training method {method!r}; expected 'sgd', 'batch' or 'minibatch'"
        )

    X_train_norm, x_min, x_max = normalize(X_train)
    Y_train_norm, y_min, y_max = normalize(Y_train)
    X_test_norm, _, _ = normalize(X_test, x_min, x_max)

    if method == 'sgd':
        losses, weights_over_epochs, biases_over_epochs = model.fit_SGD(X_train_norm, Y_train_norm, first_lr, lr_decay_rate, epochs, n_epochs_displayed)
    elif method == 'batch':
        losses, weights_over_epochs, biases_over_epochs = model.fit_batch(X_train_norm, Y_train_norm, first_lr, lr_decay_rate, epochs, n_epochs_displayed)
    else:
        losses, weights_over_epochs, biases_over_epochs = model.fit_minibatch(X_train_norm, Y_train_norm, first_lr, lr_decay_rate, epochs, n_epochs_displayed, batch_size)

    # Predict once per data set and reuse the result for the plot and score.
    train_pred = denormalize(model.predict(X_train_norm), y_min, y_max)

    fig, ax = plt.subplots(1, 2, figsize=(15, 5))

    ax[0].scatter(X_train, Y_train, color='blue', label='true')
    ax[0].scatter(X_train, train_pred, color='red', label='prediction')
    ax[0].set_xlabel('x')
    ax[0].set_ylabel('y')
    ax[0].set_title('Denormalised train data')
    ax[0].legend()

    ax[1].plot(range(epochs), losses)
    ax[1].set_xlabel('epoch')
    ax[1].set_title(f'Denormalised train set MSE={model.mse(Y_train, train_pred):.2f}')
    plt.show()

    test_pred = denormalize(model.predict(X_test_norm), y_min, y_max)

    fig, ax = plt.subplots(1, 2, figsize=(15, 5))

    ax[0].scatter(X_test, Y_test, color='blue', label='true')
    ax[0].scatter(X_test, test_pred, color='red', label='prediction')
    ax[0].set_xlabel('x')
    ax[0].set_ylabel('y')
    ax[0].set_title('Denormalised test data')
    ax[0].legend()

    # The second panel only carries the test score as text.
    ax[1].text(0.5, 0.5, f'Denormalised test set MSE={model.mse(Y_test, test_pred):.2f}', fontsize=15, ha='center')
    ax[1].axis('off')

    plt.show()

    return losses, weights_over_epochs, biases_over_epochs
    


# Square-simple

In [9]:
import pandas as pd

In [11]:
# Square-simple data set. Each CSV is parsed once (the first column is used
# as the index) and both the 'x' and 'y' columns are taken from that frame.
square_simple_train = pd.read_csv('data/regression/square-simple-training.csv', index_col=0)
square_simple_test = pd.read_csv('data/regression/square-simple-test.csv', index_col=0)
X_train, Y_train = square_simple_train['x'], square_simple_train['y']
X_test, Y_test = square_simple_test['x'], square_simple_test['y']

## SGD

In [12]:
# Square-simple, SGD: 1 input, 5 sigmoid hidden units, 1 linear output.
# The returned histories are kept for the network drawings in a later cell.
model = MLP(
    layer_sizes=[1, 5, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
losses, weights_over_epochs, biases_over_epochs = train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='sgd',
    first_lr=0.5,
    lr_decay_rate=0,
    epochs=1000,
    n_epochs_displayed=100,
)









In [13]:
# Draw the stored snapshots at these history indices (0 = before training).
draw_networks_for_epochs(
    weights_over_epochs,
    biases_over_epochs,
    [0, 1, 10, 100, 500, 1000],
)













## Mini-batch

In [14]:
# Echo the shape of the training inputs as the cell output
# (presumably to choose batch_size for the next cell - confirm).
X_train.shape



In [15]:
# Square-simple, mini-batch: same architecture and learning rate as the SGD
# run, with updates on batches of 10 samples.
model = MLP(
    layer_sizes=[1, 5, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='minibatch',
    first_lr=0.5,
    lr_decay_rate=0,
    epochs=1000,
    n_epochs_displayed=100,
    batch_size=10,
)









## Batch

In [16]:
# Square-simple, full batch: same architecture and learning rate as the SGD
# run, with one update per epoch.
model = MLP(
    layer_sizes=[1, 5, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='batch',
    first_lr=0.5,
    lr_decay_rate=0,
    epochs=1000,
    n_epochs_displayed=100,
)









Podsumowując, z tymi samymi hiperparametrami, szybkość zbieżności to $SGD < MiniBatch(batch\_size=10\%) < Batch$.

# Steps-small

In [10]:
# Steps-small data set. Each CSV is parsed once (with the default index) and
# both the 'x' and 'y' columns are taken from that frame.
steps_small_train = pd.read_csv('data/regression/steps-small-training.csv')
steps_small_test = pd.read_csv('data/regression/steps-small-test.csv')
X_train, Y_train = steps_small_train['x'], steps_small_train['y']
X_test, Y_test = steps_small_test['x'], steps_small_test['y']

## SGD

In [14]:
# Steps-small, SGD: 1 input, two hidden layers of 5 sigmoid units, 1 linear
# output. The returned histories hold one parameter snapshot per epoch and
# are kept for the network drawings in a later cell.
model = MLP(
    layer_sizes=[1, 5, 5, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
losses, weights_over_epochs, biases_over_epochs = train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='sgd',
    first_lr=0.9,
    lr_decay_rate=1e-5,
    epochs=200000,
    n_epochs_displayed=1000,
)









In [15]:
# Draw the stored snapshots at these history indices (0 = before training).
draw_networks_for_epochs(
    weights_over_epochs,
    biases_over_epochs,
    [0, 1, 1000, 10000, 100000, 150000],
)













# Multimodal-large

In [22]:
# Multimodal-large data set. Each CSV is parsed once (with the default
# index) and both the 'x' and 'y' columns are taken from that frame.
multimodal_large_train = pd.read_csv('data/regression/multimodal-large-training.csv')
multimodal_large_test = pd.read_csv('data/regression/multimodal-large-test.csv')
X_train, Y_train = multimodal_large_train['x'], multimodal_large_train['y']
X_test, Y_test = multimodal_large_test['x'], multimodal_large_test['y']

## SGD

In [23]:
# Multimodal-large, SGD: 1 input, two hidden layers of 10 sigmoid units,
# 1 linear output.
model = MLP(
    layer_sizes=[1, 10, 10, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='sgd',
    first_lr=0.1,
    lr_decay_rate=1e-3,
    epochs=1000,
    n_epochs_displayed=10,
)









## Mini-batch

In [24]:
# Multimodal-large, mini-batch: same architecture as the SGD run, with
# updates on batches of 32 samples and progress printed every epoch.
model = MLP(
    layer_sizes=[1, 10, 10, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='minibatch',
    first_lr=0.3,
    lr_decay_rate=0,
    epochs=200,
    n_epochs_displayed=1,
    batch_size=32,
)









## Batch

In [25]:
# Multimodal-large, full batch: same architecture as the SGD run, with one
# update per epoch and progress printed every epoch.
model = MLP(
    layer_sizes=[1, 10, 10, 1],
    act_fun='sigmoid',
    out_act_fun_is_linear=True,
)
train_and_draw_plots(
    model,
    X_train, Y_train,
    X_test, Y_test,
    method='batch',
    first_lr=0.1,
    lr_decay_rate=1e-2,
    epochs=200,
    n_epochs_displayed=1,
)







