## Importing libs

In [1]:
import os
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
%matplotlib inline
# default (width, height) used for every figure created from here on
plt.rcParams['figure.figsize'] = [2, 2]
# apply the seaborn defaults to plotted figures (e.g. theme, scaling, color palette), instead of matplotlib's
sns.set()

## Functions and derivatives [DEPRECATED]

In [3]:
def linear(z):
    ''' Identity activation: hands back z itself, untouched (no copy is made) '''
    return z

def relu(z):
    ''' Rectified linear unit: element-wise max(0, z) '''
    floor = 0
    rectified = np.maximum(floor, z)
    return rectified

def sigmoid(z, limit=500):
    ''' Logistic function 1 / (1 + e^{-z}), applied element-wise.

        z:     scalar or array of pre-activation values
        limit: z is clipped to [-limit, limit] before exponentiating so that
               np.exp cannot overflow; pass None to disable the clipping.
               May be a scalar or an array broadcastable against z.
    '''
    # identity test: `limit != None` would be evaluated element-wise (and then
    # fail inside the `if`) when limit is an array
    if limit is not None:
        z = np.clip(z, -limit, limit) # avoid overflow
    return 1 / (1 + np.exp(-z))

def softmax(y_pred, axis=-1):
    ''' Softmax of y_pred along `axis`: e^{x_i} / sum_j e^{x_j}.

        y_pred: array of scores (logits)
        axis:   axis holding the classes; the output sums to 1 along it

        The maximum along `axis` is subtracted before exponentiating. This does
        not change the result (e^{x+c} / sum(e^{x+c}) == e^x / sum(e^x)) but
        keeps np.exp from overflowing, which would otherwise yield inf/inf == nan.
    '''
    y_pred = np.asarray(y_pred)
    shifted = y_pred - np.max(y_pred, axis=axis, keepdims=True)
    exp = np.exp(shifted)
    return exp / np.sum(exp, axis=axis, keepdims=True)

''' y.shape == y_pred.shape == (m, C), where:
    - m is the number of examples
    - C is the number of classes 
    Thus, each row of y and y_pred is a one-hot encoded vector of shape (1, C)
'''    

def cross_entropy(y, y_pred, axis=-1, eps=1e-12):
    ''' Cross entropy between targets y and predictions y_pred.

        y, y_pred: arrays of the same shape; the first dimension of y_pred is
                   taken as the number of examples m
        axis:      axis that is summed over (the classes, by default)
        eps:       y_pred is clipped to [eps, 1 - eps] so that log(0) is never
                   evaluated; pass None to disable the clipping

        Returns -sum(y * log(y_pred), axis) / m, i.e. with the default axis one
        value per example, each already divided by m (their sum is the mean loss).
    '''
    if eps is not None:
        y_pred = np.clip(y_pred, eps, 1 - eps) # avoid log(0)
    m = y_pred.shape[0]
    return -np.sum(y * np.log(y_pred), axis=axis) / m

def xent(y, y_pred):
    ''' Total (un-normalized) cross entropy: -sum(y * log(y_pred)) over all entries.

        No clipping is applied, so y_pred must not contain zeros.
    '''
    return -np.sum(y * np.log(y_pred))

In [4]:
def grad_linear(z):
    ''' Derivative of the identity activation: a float array of ones shaped like z '''
    return np.full(z.shape, 1.0)

def grad_relu(z):
    ''' Derivative of ReLU: 1 where z is strictly positive, 0 everywhere else '''
    is_positive = z > 0
    return np.where(is_positive, 1, 0)

def grad_sigmoid(z):
    ''' Derivative of the sigmoid w.r.t. z: sigmoid(z) * (1 - sigmoid(z)) '''
    s = sigmoid(z)
    complement = 1 - s
    return s * complement

def grad_softmax(y_pred):
    ''' Jacobian of the softmax for ONE example.

        y_pred is the softmax output; it is flattened to a column of C values
        and the (C, C) matrix J is returned, where
            J[i][j] =  y_pred[i] * (1 - y_pred[i])   if i == j
            J[i][j] = -y_pred[i] * y_pred[j]         if i != j
    '''
    column = y_pred.reshape(-1, 1)
    outer = column @ column.T    # y_pred[i] * y_pred[j] for every pair (i, j)
    return np.diagflat(column) - outer

def grad_cross_entropy(y, y_pred, axis=-1):
    ''' Returns the element-wise difference y_pred - y (`axis` is not used).

        FIXME (kept from the original): this is the gradient of softmax followed
        by cross entropy w.r.t. the softmax input, not w.r.t. y_pred itself.
    '''
    difference = y_pred - y
    return difference

## Neural Network

Let $m$ be the number of samples in a batch. From layer $k-1$ to layer $k$ we have:
- Weights $\mathbb{W}^{(k)} \in \mathbb{R}^{n_{k-1} \times n_k}$
- Biases $\mathbf{b}^{(k)} \in \mathbb{R}^{n_k}$
- Activations $\mathbb{A}^{(k)} = g_k(\mathbb{Z}^{(k)}) \in \mathbb{R}^{m \times n_k}$, where $g_k(\mathbb{Z}^{(k)})$ is the activation function of the $k^{\text{th}}$ layer and $\mathbb{Z}^{(k)} = \mathbb{A}^{(k-1)} \mathbb{W}^{(k)} + \mathbf{b}^{(k)}$

(Xavier initialization: [[1]](https://prateekvjoshi.com/2016/03/29/understanding-xavier-initialization-in-deep-neural-networks/))

For the first layer, the activation is the input itself: $\mathbb{A}^{(1)} = \mathbb{X} \in \mathbb{R}^{m \times n_1}$, where $n_1$ is the input size (3072)  
For the middle layers ($2 \leq k < L$), the activation function is the sigmoid: $\mathbb{A}^{(k)} = g_k(\mathbb{Z}^{(k)}) = sigmoid(\mathbb{Z}^{(k)})$  
For the last layer, we have the predicted value with softmax activation: $\mathbb{A}^{(L)} = g_L(\mathbb{Z}^{(L)}) = softmax(\mathbb{Z}^{(L)}) \in \mathbb{R}^{m \times n_L}$, where $n_L$ is the output size (10)  
(i.e. the hypothesis function $a^{(L)} = h_{W, b}(x) = y_{\text{pred}} \approx y$)

Note: the network has $L$ layers in total: $1$ input layer + $L-2$ hidden layers + $1$ output layer

In [3]:
import warnings

# RANDOM_SEED = 886

In [4]:
class ActivationFunction:
    ''' Interface shared by every activation function.

        Calling the object maps the pre-activation Z to the activation A.
        Note that derivative() takes A (the OUTPUT of the call), not Z:

        A == __call__(Z) and derivative(A) == derivative(__call__(Z)),
        so calling derivative(Z) will often yield WRONG results
    '''
    def __call__(self, Z):
        ''' Maps Z to A; Z.shape=(n_examples, layer_output_size) '''
        raise NotImplementedError

    def derivative(self, A):
        ''' Derivative evaluated from A; A.shape=(n_examples, layer_output_size) '''
        raise NotImplementedError

class Linear(ActivationFunction):
    ''' Identity activation: A == Z '''
    def __call__(self, Z):
        ''' Returns Z itself (no copy) '''
        return Z

    def derivative(self, A):
        ''' The slope is 1 everywhere: an array of ones with A's shape and dtype '''
        return np.full_like(A, 1)

class Sigmoid(ActivationFunction):
    ''' Logistic activation: A = 1 / (1 + e^{-Z}) '''

    # Z is clipped to [-Z_LIMIT, Z_LIMIT] before exponentiating so that np.exp
    # cannot overflow; at |Z| == 500 the sigmoid is within ~1e-217 of 0 or 1
    Z_LIMIT = 500

    def __call__(self, Z):
        ''' Z.shape=(n_examples, layer_output_size); returns A with the same shape '''
        Z = np.clip(Z, -self.Z_LIMIT, self.Z_LIMIT) # avoid overflow
        return 1 / (1 + np.exp(-Z))

    def derivative(self, A):
        ''' Expects A == Sigmoid(Z) and returns Sigmoid(Z) * (1 - Sigmoid(Z)) '''
        return A * (1 - A)

class ReLU(ActivationFunction):
    ''' Rectified linear unit: A = max(0, Z), element-wise '''
    def __call__(self, Z):
        ''' Negative entries of Z become 0, the others pass through '''
        floor = 0
        return np.maximum(floor, Z)

    def derivative(self, A):
        ''' 1 where the activation A is strictly positive, 0 elsewhere '''
        is_active = A > 0
        return np.where(is_active, 1, 0)

class SoftMax(ActivationFunction):
    ''' Row-wise softmax: every row of the output is non-negative and sums to 1 '''
    def __call__(self, Z):
        ''' Z.shape == (n_examples, output_size) '''
        # shifting each row by its maximum leaves the result unchanged:
        # e^{x+c} / sum(e^{x+c}) == (e^x * e^c) / (e^c * sum(e^x)) == e^x / sum(e^x)
        row_max = Z.max(axis=1, keepdims=True)
        exponentials = np.exp(Z - row_max)
        return exponentials / exponentials.sum(axis=1, keepdims=True)

    def derivative(self, A):
        ''' Not implemented yet (FIXME).

            A == SoftMax(Z), A.shape == (n_examples, output_size).
            The element-wise form A * (1 - A) was tried and discarded.
            ref.: https://medium.com/@aerinykim/how-to-implement-the-softmax-derivative-independently-from-any-loss-function-ae6d44363a9d
        '''
        raise NotImplementedError


In [None]:
class CostFunction:
    ''' Interface shared by every cost function.

        Calling the object with Y (the target values) and Ypred gives a scalar cost.
        derivative() takes the same two arguments and gives the derivative of the
        cost w.r.t. Ypred, a tensor of shape (n_examples, last_layer_output_size)

        obs.: Ypred is the last layer's activation values: last_layer.A == last_layer.g(last_layer.Z)
    '''
    def __call__(self, Y, Ypred):
        ''' J(Y, Ypred); Y.shape == Ypred.shape == (n_examples, last_layer_output_size) '''
        raise NotImplementedError

    def derivative(self, Y, Ypred):
        ''' dJ/dYpred; Y.shape == Ypred.shape == (n_examples, last_layer_output_size) '''
        raise NotImplementedError

class CrossEntropy(CostFunction):
    ''' Mean cross entropy over the examples: J = mean_i( -sum_c Y[i, c] * log(Ypred[i, c]) ) '''

    # lower bound applied to Ypred before taking the log: a predicted probability
    # of exactly 0 would give log(0) == -inf, and 0 * -inf == nan for the classes
    # whose target is 0, turning the whole cost into nan
    EPS = 1e-12

    def __call__(self, Y, Ypred):
        ''' Y.shape == Ypred.shape == (n_examples, last_layer_output_size); returns a scalar '''
        log_pred = np.log(np.maximum(Ypred, self.EPS))
        return np.mean( -(Y * log_pred).sum(axis=1) )

    def derivative(self, Y, Ypred):
        ''' Not implemented yet (FIXME).

            Candidates left by the original author: Y / Ypred and
            (Ypred - Y) / Ypred.shape[0]
        '''
        raise NotImplementedError
    

In [5]:
class Layer:
    ''' Fully connected layer: Z = X @ W + b and A = g(Z)

        A.shape == (n_examples, output_size)
        Z.shape == (n_examples, output_size)
        W.shape == (input_size, output_size)
        b.shape == (output_size, )
        obs.:
            input_size == prev_layer.output_size
            output_size == next_layer.input_size
    '''
    def __init__(self, input_size, output_size, activation_function, 
                 weight_initialization='xavier'):
        ''' input_size:  number of values each example has when entering the layer
            output_size: number of values each example has when leaving the layer
            activation_function: an ActivationFunction instance (g)
            weight_initialization: one of
                'xavier'     -> W ~ N(0, 1 / input_size),                  b ~ N(0, 1)
                'xavier avg' -> W ~ N(0, 2 / (input_size + output_size)),  b ~ N(0, 1)
                '-1 to 1'    -> W and b drawn uniformly from [-1, 1)
            Raises ValueError for any other weight_initialization value
        '''
        assert(isinstance(activation_function, ActivationFunction)), "Invalid type for activation_function"
        
        self.input_size  = input_size
        self.output_size = output_size
        
        # activation function
        self.g = activation_function # g_prime == activation_function.derivative
        
        # values cached by feedforward and read back by backprop
        self.X = None # the layer's input (prev_layer.A)
        self.A = None # self.A == self.g(self.Z)
        self.Z = None # prev_layer.A @ self.W + self.b
        
        if weight_initialization == 'xavier':
            stddev = np.sqrt(1 / input_size)
            self.W = stddev * np.random.randn(input_size, output_size)
            self.b = np.random.randn(output_size, )
        elif weight_initialization == 'xavier avg':
            stddev = np.sqrt(2 / (input_size + output_size))
            self.W = stddev * np.random.randn(input_size, output_size)
            self.b = np.random.randn(output_size, )
        elif weight_initialization == '-1 to 1':
            # rand is uniform on [0, 1), so 2 * rand - 1 is uniform on [-1, 1)
            # (randn would give a normal distribution with mean -1 instead)
            self.W = 2 * np.random.rand(input_size, output_size) - 1
            self.b = 2 * np.random.rand(output_size, ) - 1
        else:
            raise ValueError(f"Invalid weight_initialization value: '{weight_initialization}'")
    
    @property
    def params_count(self):
        ''' Number of trainable values: every entry of W plus every entry of b '''
        return self.W.size + self.b.size
    
    # receives the activation values of the previous layer (i.e. this layer's input)
    # returns the activation values of the current layer (i.e. next layer's input)
    def feedforward(self, X):
        ''' X.shape == (n_examples, self.input_size)
            Stores X, Z and A on the layer (backprop needs them) and returns A
        '''
        assert(X.shape[1] == self.input_size)
        self.X = X
        # (n_examples, output_size) = (n_examples, input_size) @ (input_size, output_size) + (output_size, )
        self.Z = self.X @ self.W + self.b
        self.A = self.g(self.Z)
        return self.A
    
    # receives the derivative of the cost function w.r.t. the activation values of the current layer (i.e. next layer's input)
    # returns the derivative of the cost function w.r.t. the activation values of the previous layer (i.e. this layer's input)
    def backprop(self, dA, learning_rate):
        ''' dA.shape == (n_examples, self.output_size)
            Must be called after feedforward, since it reads self.X and self.A.
            Stores the gradients in self.dW and self.db, applies one gradient
            descent step to W and b, and returns dJ/dX with shape
            (n_examples, self.input_size)
        '''
        assert(dA.shape[1] == self.output_size)        
        # (n_examples, output_size) = (n_examples, output_size) * (n_examples, output_size)
        # (input_size, output_size) = (input_size, n_examples)  @ (n_examples, output_size)
        # (output_size, )           = (n_examples, output_size).sum(axis=0)
        delta = dA * self.g.derivative(self.A) # [dJ/dZ = dJ/dA . dA/dZ]
        self.dW = (self.X).T @ delta           # [dJ/dW = dJ/dZ . dZ/dW]
        self.db = delta.sum(axis=0)            # [dJ/db = dJ/dZ . dZ/db]
        
        # the gradient w.r.t. the input must use the weights that produced Z,
        # so it is computed BEFORE W is updated
        # (n_examples, input_size)  = (n_examples, output_size) @ (output_size, input_size), input_size==prev_layer.output_size
        dX = delta @ (self.W).T                # [dJ/dX = dJ/dZ . dZ/dX]
        
        self.__update_params(learning_rate)
        return dX
    
    def __update_params(self, learning_rate):
        ''' One gradient descent step using the gradients stored by backprop '''
        self.W += -learning_rate * self.dW
        self.b += -learning_rate * self.db

In [6]:
class NN:
    ''' Feed-forward neural network trained with mini-batch gradient descent.

        layers[0] plays the role of the input layer: its activation is the
        network's input itself, so its W and b are never used. The remaining
        layers are applied in order, each one fed with the previous activation.
    '''
    def __init__(self, layers, cost_function, cost_function_derivative):
        ''' layers: non-empty sequence of layers in which each layer's input_size
                    equals the previous layer's output_size (ValueError otherwise)
            cost_function:            callable (Y, Ypred) -> cost
            cost_function_derivative: callable (Y, Ypred) -> dJ/dYpred, with
                                      shape (n_examples, last layer's output_size)
        '''
        if len(layers) == 0:
            raise ValueError("At least one layer is required")
        
        self.J = cost_function # cost_function(Y, Ypred)
        self.dJ_dYpred = cost_function_derivative # derivative of J w.r.t. the last layer's activation values [dJ/dYpred]
        # obs.: Ypred == self.layers[-1].A, thus self.dJ_dYpred is the input (dA) for the last layer's backprop
        
        self.layers = [layers[0]] # input layer
        for l in range(1, len(layers)):
            if layers[l-1].output_size != layers[l].input_size:
                raise ValueError(
                    f"Invalid input shape at the {l}-th layer"
                    f"\n{l}-th layer's input size: {layers[l].input_size}"
                    f"\n{l-1}-th layer's output size: {layers[l-1].output_size}"
                )
            self.layers.append(layers[l])
        
        # "loss" receives one entry per processed batch (see backprop)
        self.history = { "loss": [], "val_loss": [] }
    
    # note that we use zero-based indexing here, so
    # the 1st layer is self.layers[0] and the last is self.layers[len(self.layers) - 1]
    
    def predict(self, X):
        ''' X.shape == (n_examples, self.layers[0].input_size)
            Returns Ypred without storing anything on the layers
        '''
        assert(X.shape[1] == self.layers[0].input_size)
        activation = X # network's input
        for layer in self.layers[1:]:
            Z = activation @ layer.W + layer.b
            activation = layer.g(Z)
        return activation # network's output (Ypred)
    
    def feedforward(self, X):
        ''' X.shape     == (n_examples, self.layers[0].input_size)
            Ypred.shape == (n_examples, self.layers[-1].output_size)
            Unlike predict, every layer keeps its input and activation so that
            backprop can use them afterwards
        '''
        assert(X.shape[1] == self.layers[0].input_size)
        self.layers[0].A = X # input
        for l in range(1, len(self.layers)):
            self.layers[l].feedforward(self.layers[l-1].A)
        Ypred = self.layers[-1].A # output
        return Ypred
    
    def backprop(self, X, Y, learning_rate):
        ''' X.shape == (n_examples, self.layers[0].input_size)
            Y.shape == (n_examples, self.layers[-1].output_size)
            Runs one forward pass, appends the cost to self.history["loss"] and
            back-propagates the gradient from the last layer down to layers[1]
            (each layer updates its own parameters while doing so)
        '''
        assert(X.shape[0] == Y.shape[0])
        assert(X.shape[1] == self.layers[0].input_size)
        assert(Y.shape[1] == self.layers[-1].output_size)
        
        Ypred = self.feedforward(X) # == self.layers[-1].A
        cost = self.J(Y, Ypred)
        self.history["loss"].append(cost)
        
        dA = self.dJ_dYpred(Y, Ypred)
        # layers[0] is the input layer: it never ran feedforward and has no
        # parameters in use, so the backward pass stops at layers[1]
        for l in reversed(range(1, len(self.layers))):
            dA = self.layers[l].backprop(dA, learning_rate)
    
    def __get_batches(self, X, Y, batch_size):
        ''' Splits X and Y (row-wise, in order) into consecutive batches of
            batch_size rows. If the number of rows is not a multiple of
            batch_size a warning is issued and the last batch is smaller.
            Returns (list of X batches, list of Y batches)
        '''
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
        m = X.shape[0] # == Y.shape[0]
        if m % batch_size != 0:
            warnings.warn(f"\nbatch_size={batch_size} does not result in an equal division for shapes: " +
                          f"{X.shape} of X, and {Y.shape} of Y. The last batch will have size {m % batch_size}")
        # row indices where a new batch starts (the first batch starts at row 0)
        split_points = np.arange(batch_size, m, batch_size)
        return np.split(X, split_points), np.split(Y, split_points)
    
    # training data
    def fit(self, X_train, Y_train, learning_rate, n_epochs, batch_size):
        ''' X_train.shape == (n_total_examples, self.layers[0].input_size)
            Y_train.shape == (n_total_examples, self.layers[-1].output_size)
            
            For each iteration we'll have:
              n_examples = batch_size
              X.shape == (n_examples, self.layers[0].input_size)
              Y.shape == (n_examples, self.layers[-1].output_size)
            Thus, each epoch has ceil(n_total_examples / batch_size) iterations (batches)
            (obs.: X and Y are rows of X_train and Y_train)
            (obs.: if n_total_examples is not divisible by batch_size the last batch is smaller)
        '''
        assert(X_train.shape[0] == Y_train.shape[0])
        assert(X_train.shape[1] == self.layers[0].input_size)
        assert(Y_train.shape[1] == self.layers[-1].output_size)
        
        # the data is not shuffled between epochs, so the batches are built only once
        X_batches, Y_batches = self.__get_batches(X_train, Y_train, batch_size)
        
        for epoch in range(n_epochs):
            for X, Y in zip(X_batches, Y_batches):
                self.backprop(X, Y, learning_rate)
    
    # test data
    def evaluate(self, X_test, Y_test, learning_rate=None, n_epochs=None, batch_size=None):
        ''' X_test.shape == (n_examples, self.layers[0].input_size)
            Y_test.shape == (n_examples, self.layers[-1].output_size)
            Returns the cost J(Y_test, predict(X_test)); the network is not changed.
            learning_rate, n_epochs and batch_size are accepted for backward
            compatibility only and are ignored
        '''
        assert(X_test.shape[0] == Y_test.shape[0])
        assert(X_test.shape[1] == self.layers[0].input_size)
        assert(Y_test.shape[1] == self.layers[-1].output_size)
        
        return self.J(Y_test, self.predict(X_test))