# Deep learning - project 1
## Creating and training neural network from scratch

In [10]:
import numpy as np
from typing import List, Callable, Iterable
from abc import ABC, abstractmethod 
from IPython.core.debugger import set_trace

In [11]:
"""
Abstract classes for loss function and activation function.
"""

class Loss(ABC):
    """
    Base class for a loss function of a network
    # implements: operator(), derivative()
    """
    @abstractmethod
    def __call__(self, y_predicted: np.ndarray, y_true: np.ndarray):
        pass
    
    @abstractmethod
    def derivative(self, y_predicted: np.ndarray, y_true: np.ndarray):
        pass

class Activation(ABC):
    """Base class for an activation function"""
    @abstractmethod
    def __call__(self, x: np.ndarray) -> np.ndarray:
        """Activation (pointwise) function"""
        pass
    
    @abstractmethod
    def derivative(self, x: np.ndarray) -> np.ndarray:
        """
        Derivative method of the activation function.
        """
        pass

class Layer:
    """
    One layer of a neural network. Used only when creating neural networks.
    # attributes: number of neurons
    # implements: operator() which depends on activation function, derivative()
    """
    def __init__(self, n_neurons: int, activation: Activation, *, verbose=False):
        self.n_neurons = n_neurons
        self.output_size = n_neurons
        self.activation = activation
        self.verbose = verbose
        self.state = None
        self.input_state = None
    
    def initalize(self, input_size, *, sd=0.1):
        self.input_size = input_size
        self.W = np.random.normal(0, sd, size=[input_size, self.output_size])  # weights
        self.b = np.random.normal(0, sd, size=[1, self.output_size])  # biases
        self.derivative = np.zeros([self.input_size, self.output_size])
        
    def verbose_print(self, s: str):
        if self.verbose:
            print(s)
    
    def __call__(self, input_vector: np.array):
        self.verbose_print("calculating next vector from sizes {}x{} with matrix {}x{}".format(*input_vector.shape, self.input_size, self.output_size))
        # Remembering previous and current state
        self.input_state = input_vector.copy()
        self.state = self.activation(input_vector.dot(self.W) + self.b)
        # each pass through a layer changes the value of self.input_state and self.state
        return self.state
    
    def __str__(self):
        return "Layer(input_size={}, output_size={})".format(self.input_size, self.output_size)
    
    def __repr__(self):
        return str(self)
    
    def _set_derivative_weights(self, state: np.ndarray, cumulated_jacobian: np.ndarray):
        # Let H be the dimansionality of the input state, W be H x K matrix, n number of samples
        n = state.shape[0]
        next_state = state.dot(self.W) # n x K
        outer_der = self.activation.derivative(next_state) # n x K x K
        inner_der = np.zeros([n, self.output_size, self.input_size, self.output_size])  # n x K x (H x K)
        for i, (row, tensor) in enumerate(zip(state, inner_der)):  # tensor: K x H x K
            for j, matrix in enumerate(tensor):  # matrix: H x K
                matrix[:, j] = row.flatten()  # flatten because row is a 1 x H array
        derivative = (inner_der.reshape([n, self.output_size, -1]) @ outer_der).reshape([n, self.output_size, self.input_size, self.output_size]) # TODO: ??
        set_trace()
        self.derivative = cumulated_jacobian @ derivative 
        
        
    def set_derivative_weights(self, state: np.ndarray, cumulated_jacobian: np.ndarray):
        n = state.shape[0]
        
    
    def derivative_state(self):
        """Partial derivative of a layer with respect to prevoius hidden state. Needed for calculating cumulative jacobian."""
        state = self.input_state
        next_state = state.dot(self.W)  # TODO: optimize: no need to caluclate it in backward pass
        return self.W @ self.activation.derivative(next_state)

In [34]:
"""
Actual implementation of specific loss functions and activation functions.
"""

class QuadraticLoss(Loss):
    """Loss for simple regression: mean squared error."""
    def __call__(self, y_predicted: np.ndarray, y_true: np.ndarray):
        if len(y_predicted) != len(y_true):
            raise IndexError("length of y_predicted ({}) has to be the same as lenght of y_true ({})".format(len(y_predicted), len(y_true)))
        return np.linalg.norm(y_predicted - y_true) / len(y_true)
    
    def derivative(self, y_predicted: np.ndarray, y_true: np.ndarray):
        n = len(y_predicted)
        assert len(y_true) == n
        return (2 / n * (y_predicted - y_true)).reshape([-1, 1, 1])  # be careful with this reshaping
        
    
class BernLoss(Loss):
    """
    Loss for binary classification (negative binomial likelihood),
    also known as cross-entropy between the empirical and model distribution (binomial).
    """
    def __call__(self, y_predicted: np.ndarray, y_true: np.ndarray):
        return -np.mean(
            np.array(
                [np.log(p) if y == 1 else np.log(1-p) 
                 for p, y in zip(y_predicted, y_true)]
            )
        )

# TODO: should this be a class? There is no state in it
class Sigmoid(Activation):
    def __call__(self, x: np.array) -> np.ndarray:
        return 1 / (1 +  np.exp(-x))
    
    def derivative(self, x: np.array):
        der = self(x) * (1 - self(x))
        return np.array([np.diag(row) for row in der])  # TODO: this is sparse matrix: possible memory and time optimization
    
class Identity(Activation):
    def __call__(self, x: np.array) -> np.ndarray:
        return x
    
    def derivative(self, x: np.array):  # x is n x D
        dim = x.shape[1]
        print(np.array([np.identity(dim) for row in x]).shape)
        return np.array([np.identity(dim) for row in x])


In [35]:
class NNet:
    """Feedforwad (classical) neural network"""
    def __init__(self, input_size: int, layers: List[Layer], loss: Loss):
        layers[0].initalize(input_size)
        for i, layer in enumerate(layers):
            if i == 0:
                continue
            layer.initalize(input_size=layers[i-1].output_size)
        self.layers = layers
        self.n_layers = len(self.layers)
        self.loss = loss
        
    def __call__(self, x: np.ndarray):
        """Forwad pass"""
        y = x
        for layer in self.layers:
            y = layer(y)
        self.output = y.copy()
        return y
    
    def all_layers_passed(self):
        return all([layer.state is not None for layer in self.layers])
    
    def backward(self, y_true: np.ndarray):
        """One centralized function responsible to computing derivatives of a whole network."""
        jacobian = self.loss.derivative(self.output, y_true)
        print(jacobian.shape)
        
        for i in range(self.n_layers - 1, -1, -1):
            print("layer {}".format(i))
            layer = self.layers[i]
            layer.set_derivative_weights(layer.input_state, jacobian)
            print("dupa")
            if i != 0:
                jacobian @= layer.derivative_state()      

In [36]:
nnet = NNet(
    10,
    [Layer(3, Sigmoid()), Layer(1, Identity())],
    QuadraticLoss()
)

In [37]:
x = np.random.normal(0, 100, [100, 10])
y_true = np.zeros([100, 1])  # important: output has to have 2 dimension (for each sample we output a vector)
y_pred = nnet(x)

In [38]:
nnet(x)[:5]

array([[ 0.05899351],
       [ 0.14360027],
       [-0.03498361],
       [ 0.20830139],
       [-0.08557915]])

In [39]:
nnet.backward(y_true)

(100, 1, 1)
layer 1
(100, 1, 1)


ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 1 is different from 3)

In [66]:
a = np.ones([10, 2])

In [68]:
a.transpose()

array([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]])