# Load data

In [22]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from typing import Tuple, List, Callable

In [23]:
def load_file(path: str) -> Tuple[np.ndarray, np.ndarray]:
    """Loads the data from the file stored at :param path: and returns the 
    input values and the class labels.
    :param path: path of a CVS file with data
    :return: a tuple containing the input matrix of shape (n, p) and a line 
    vector with the m class labels in {0, ..., 9}
    """
    df = pd.read_csv(path, header = None)                   # citire date din fisierul dat de path
    X = df.values[:, 1:].transpose()                       # scrieti cod
    y = df.values[:, 0].reshape(-1,1).transpose()          # scrieti cod
    assert X.ndim ==  2, 'Matrix required for input values'
    assert y.ndim == 2, 'Column matrix required for labels'
    assert y.shape == (1, X.shape[1]), 'Same number of lines is required'
    return X, y

In [24]:
path_train = './data/mnist_train.csv'
path_test = './data/mnist_test.csv'

In [25]:
X_train, y_train = load_file(path_train)
assert X_train.shape == (784, 60000)
assert y_train.shape == (1, 60000)

X_test, y_test = load_file(path_test)
assert X_test.shape == (784, 10000)
assert y_test.shape == (1, 10000)

In [26]:
def scale_values(X: np.ndarray) -> np.ndarray:
    """Scales the values to range [0, 1].
    :param X: an (m, n) matrix with values between 0 and 255.
    :return: an (m, n) matrix containing values of :param X: scaled in [0, 1]
    """
    result = np.divide(X, np.max(X))                 # scrieti cod
    assert 0 <= np.min(result) <= np.max(result) <= 1, 'Scaled values should be in [0, 1]'
    assert X.shape == result.shape, 'Scaling preserves shape'
    return result

In [27]:
X_train = scale_values(X_train)
assert X_train.shape == (784, 60000)
X_test = scale_values(X_test)
assert X_test.shape == (784, 10000)

# Create model

## Define model's architecture

In [28]:
m = 10                          # number of classes
n, p = X_train.shape
architecture = [n, 100, m]      # list: [input_size, hidden1, hidden2, ..., output_size]

assert len(architecture) >= 3, 'At least one hidden layer'
assert architecture[0] == n
assert architecture[-1] == m

Ponderile sunt initializate conform strategiei lui Xavier Glorot. Pentru o matrice de ponderi $W^{[l]}$ de forma $n_{l} \times n_{l-1}$, ponderile pot fi initializate cu o distributie uniforma in intervalul 
$$
\left[-\frac{\sqrt{6}}{\sqrt{n_{l} + n_{l-1}}}, +\frac{\sqrt{6}}{\sqrt{n_{l} + n_{l-1}}}\right]
$$

Ponderile de bias se obisnuiesc a se initializa cu 0; intializarea aleatoare a ponderilor W este considerata suficienta pentru a obtine spargerea simetriei.

Ref: [Understanding the difficulty of training deep feedforward neural networks](http://proceedings.mlr.press/v9/glorot10a/glorot10a.pdf)

In [29]:
def create_weights(architecture: List[int], init_type:str='glorot_uniform') -> Tuple[List[np.array], List[np.array]]:
    """Creates the list of weights and biases for the given architecture.
    :param architecture: list of number of nodes in each layer 
    (including input and ouotput layers)
    :param init_type: name of initialization parameter. Defaults to 
    'glorot_uniform', add other supported initializtion strategies.
    :return: a tuple containing: list of weight matrices W, a list of bias 
    column vectors. The two lists have the same numer of elements, number of 
    layers - 1.
    """
    L = len(architecture)
    W, b = [], []
    # initializare de ponderi
    for n_lplus1, nl in zip(architecture[1:], architecture[:-1]):
        W.append(np.random.uniform(-np.sqrt(6)/(np.sqrt(nl + n_lplus1)), +np.sqrt(6)/(np.sqrt(nl + n_lplus1)), (n_lplus1, nl)))     # scrieti cod
    for n_l in architecture[1:]:
        b.append(np.zeros((n_l, 1)))                      # scrieti cod
    assert len(W) == len(b) == L-1
    for i, w in enumerate(W):
        assert w.shape == (architecture[i+1], architecture[i]), f'Shape of W[{i}] should be ({L[i+1], L[i]})'
    for i, _b in enumerate(b):
        assert _b.shape == (architecture[i+1], 1), f'Shape of b[{i}] should be ({L[i+1]}, 1)'
    if init_type == 'glorot_uniform':
        for i, w in enumerate(W):
            w_shape_sum = np.sum(w.shape)
            assert -np.sqrt(6)/np.sqrt(w_shape_sum) <= np.min(w) <= np.sqrt(6)/np.sqrt(w_shape_sum), f"Values of W[{i}] should be according to Glorot's initialization"
        for i, _b in enumerate(b):
            assert 0 == np.min(_b) == np.min(_b) == 0, f"Values of b[{i}] should be initialized to 0"
    return W, b

In [30]:
def sigmoid(z: np.array) -> np.array:
    """Computes sigmoid activation function"""
    return 1 / (1 + np.exp(-z))                                 # scrieti cod

def derivate_sigmoid(z: np.array) -> np.array:
    """Computes the derivatives for the sigmoid activation function"""
    return sigmoid(z) * (1 - sigmoid(z))                        # scrieti cod

def tanh(z: np.array) -> np.array:
    """Computes the tanh activation function"""
    return np.tanh(z)                                           # scrieti cod
    # (np.exp(z) - np.exp(-z)) / (np.exp(z) + np.exp(-z))

def derivate_tanh(z: np.array) -> np.array:
    """Computes the derivatives for the tanh activation function"""
    return 1 - np.square(tanh(z))                               # scrieti cod

def ReLU(z: np.array) -> np.array:
    """Computes the rectified linear unit activation function"""
    return np.max(0, z)                                         # scrieti cod

def derivative_ReLU(z: np.array) -> np.array:
    """Computes the derivatives of the rectified linear unit activation function"""
    return 0 if z < 0 else 1                                    # scrieti cod

In [31]:
def softmax(z, axis=0):
    """Applies softmax to a matrix z.
    :param z: np.array of shape (m, k)
    """
    # scrieti cod, posibil mai multe linii
    max_z = np.max(z, axis=axis, keepdims=True)
    exp_z = np.exp(z - max_z)                                                   # calcul de exponentiala; folositi trucul dat in curs, utilizand max_z
    sum_exp_z = (np.sum(exp_z, axis = axis))                                    # scrieti cod
    result = exp_z / sum_exp_z                                                  # scrieti cod; se face normalizarea valorilor; considerati ultimul asert
    assert np.allclose(np.sum(result, axis=axis), 1.0)
    return result

In [32]:
W, b = create_weights(architecture=architecture)

## Feedforward propagation

In [33]:
def can_multiply(a:np.array, b:np.array) -> bool:
    return a.ndim == b.ndim == 2 and a.shape[1] == b.shape[0]

def can_multiply_hadamard(a:np.array, b:np.array) -> bool:
    return a.shape == b.shape

In [34]:
def model(X:np.array, W:List[np.array], b:List[np.array], f:List[Callable]) -> np.array:
    """Computes the output produced by the MLP for the given input X
    :param X: np.array of shape (n, p). Each column of X is a datum from a set.
    :param W: a list of weight matrices
    :param b: a list of bias columns
    :param f: a list of activation functions
    :return: a matrix of output values produced by MLP, of shape: number of 
    predicted outputs (e.g. classes), number of input vectors p
    """
    assert len(W) == len(b) == len(f)
    p = X.shape[1]
    a = X
    for i, (_w, _b, _f) in enumerate(zip(W, b, f)):
        # variabila i poate fi folosita pentru debug
        assert can_multiply(_w, a)
        z = np.dot(_w, a) + _b                                 # scrieti cod
        assert z.shape == (_w.shape[0], p)
        a = _f(z)                                       # scrieti cod
        assert a.shape == z.shape
    assert a.shape == (W[-1].shape[0], p)
    return a

In [35]:
# f[0] = functia de activare pe primul strat ascuns; 
# f[1] = functia de activare pe al doilea strat ascuns etc.
f = [sigmoid, softmax]
# f_prim = [derivate_sigmoid, softmax]

y_hat = model(X_train, W, b, f)

assert y_hat.shape == (m, p)
assert np.allclose(y_hat.sum(axis=0), np.ones(p))

## Error function

In [43]:
def J(X, y, W, b, f, num_classes=10, _lambda=0.01):
    """Computes the error function for MLP
    :param X: np.array of shape (n, k)
    :param y: np.array of shape (1, k)
    :param W: list pf MLPs weights
    :param b: list pf MLPs biases
    :return: loss values, composed of cross entropy + penalty term
    """
    p = X.shape[1]
    EPS = 1e-5
    # computes a one hot encoding for the given classes:
    # if y[i]=c, 0 <= c <= 9 (here), then column i in one_hot_encoding is filled
    # in with 0, excepting line c where one finds value 1
    one_hot_encoding = np.zeros((num_classes, p), dtype=int)
    rows = np.arange(y.shape[1])
    one_hot_encoding[y[0, rows], rows] = 1                              # scrieti cod
    # one_hot_encoding = one_hot_encoding.transpose()
    assert np.all(one_hot_encoding.sum(axis=0) == 1)
    predicted = model(X, W, b, f)
    predicted = np.clip(predicted, EPS, 1-EPS)
    loss1 = -1 * np.sum(np.sum(one_hot_encoding * np.log(predicted)))    # scrieti cod
    loss2 = (_lambda / 2) * np.sum(np.sum(np.square(W[1:])))
    return loss1 + loss2

In [44]:
def accuracy(X:np.array, y:np.array, W: List[np.array], b: List[np.array], f:List[Callable]) -> float:
    """Computes the accuracy on a given input dataset X, with ground truth y
    :param X: np.array of shape (n, k)
    :param y: np.array of shape (1, k); each value is the index of a class
    :param W: list of MLP's weights
    :param b: list of MLP's biases
    :param f: list of activation functions. the last one must be softmax
    :return: ratio between correctly classified vectors and total number of cases
    """
    y_hat = model(X, W, b, f)                                           # scrieti cod
    y_predicted = np.argmax(y_hat, axis=0).reshape(y.shape[1], 1)       # scrieti cod
    return (y_predicted == y).sum() / X.shape[1]

# Train model

In [75]:
def train(X_train: np.array, y_train: np.array, X_test: np.array, y_test: np.array, num_classes, W: List[np.array], b:List[np.array], f:List[Callable], _lambda: float, alpha: float, max_delta_error:float=1e-4) -> Tuple[List[np.array], List[np.array], List[float], List[float], List[float]]:
    """Runs the training on the training dataset (X, y). Stops when  
    difference between  two succesive error values is lower than :param max_delta_error:
    :param X_train: np.array of shape (n, k), with training cases. Each column is a training case
    :param y_train: np.array of shape (1, k), containing labels (0=class 0, ...)
    :param X_test: np.array of shape (n, l), with test cases. Each column is a test vector
    :param y_test: np.array of shape (1, l), containing labels (0=class 0, ...)
    :param num_classes: number of classes
    :param W: list of MLP's weights
    :param b: list of MLP's biases
    :param f: list of activations functions; the last one must be softmax
    :param _lambda: coefficient >= for the L2 penalty term
    :param alpha: > 0, learning rate
    :max_delta_error: >0, a threshold for max absolute difference of succesive loss values
    :return: a tuple consisting of: list of weight matrices, list of biases, list of errors computed at each epoch on training set, 2 lists of accuracies on training and on test set at each epoch
    """
    errors = [J(X_train, y_train, W, b, f, num_classes, _lambda)]
    acc_train = [accuracy(X_train, y_train, W, b, f)]
    acc_test = [accuracy(X_test, y_test, W, b, f)]
    epoch = 0
    while True:
        epoch += 1
        print(epoch)
        # actualizare ponderi si biases W, b pentru fiecare pereche de date din setul de instruire *_test
        predicted = model(X_train, W, b, f)
        derivative2 = (1.0/X_train.shape[1]) * (predicted - y_train)
        print(derivative2.T.shape)
        print(predicted.shape)
        new_theta2 = predicted.T.dot(derivative2) + (_lambda / X_train.shape[1]) * W[0]
        new_bias2 = np.sum(derivative2, axis = 0, keepdims = True)
    
        derivative1 = derivative2.dot(W[1]) * derivate_sigmoid(predicted[0])
        new_theta1 = derivative1.T.dot(X_train) + (_lambda / X_train.shape[1]) * W[0]
        new_bias1 = np.sum(derivative1, axis = 0, keepdims = True)

        W[0] -= alpha * new_theta1
        b[0] -= alpha * new_bias1

        W[1] -= alpha * new_theta2
        b[1] -= alpha * new_bias2

        error = J(X_train, y_train, W, b, f, num_classes, _lambda) # scrieti cod
        errors.append(error)
        train_acc = accuracy(X_train, y_train, W, b, f) # scrieti cod
        acc_train.append(train_acc)
        test_acc = accuracy(X_test, y_test, W, b, f) # scrieti cod
        acc_test.append(test_acc)
        if epoch % 10 == 0:
            print(f'Epoch: {epoch}, error: {error}, train accuracy: {train_acc}, test accuracy: {test_acc}')
        if np.abs(errors[-1] - errors[-2]) < max_delta_error:
            acc_test.append(test_acc)
        if epoch % 10 == 0:
            print(f'Epoch: {epoch}, error: {error}, train accuracy: {train_acc}, test accuracy: {test_acc}')
        print(np.abs(errors[-1] - errors[-2]))
        if np.abs(errors[-1] - errors[-2]) < max_delta_error:
            break
        # plot de valore de eroare pe train si pe test
    return W, b, errors, acc_train, acc_test     # scrieti cod

In [76]:
W, b = create_weights(architecture)

W, b, errors, acc_train, acc_test = train(X_train, y_train, X_test, y_test, 10, W, b, f, 0.01, 0.5)   # scrieti cod

1
(60000, 10)
(10, 60000)


ValueError: operands could not be broadcast together with shapes (60000,60000) (100,784) 

In [None]:
plt.figure(figsize=(10, 8))
plt.plot(errors, label='Loss on train DS')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

In [None]:
# acc_train, acc_test
plt.plot(acc_train, label='Acc train')
plt.plot(acc_test, label='Acc test')

# Test model

In [None]:
print(f'Accuracy on test set: {accuracy(X_test, y_test, W, b, f)}')