Neural Network: Multi Layer Perceptron
===

**Autor:** Mateus de Jesus Mendes

### Importações

In [None]:
import random
import matplotlib.pyplot as plt

### Definições Globais

In [None]:
RANDOM_SEED = 88
random.seed(RANDOM_SEED)

### Funções Auxiliares

Funções secundárias utilizadas nas etapas principais da metodologia. 

##### **Produto Interno**

In [None]:
def dot(a, b):
    """
    Cálculo do produto interno dos casos:
    - Vetor-vetor
    - Matriz-vetor
    - Matriz-matriz
    
    ### Parâmetros
    - `a`/`b`: Vetor ou matriz.

    ### Retorna
    Float (escalar) para produto vetor-vetor / Lista (vetor) para produto matriz-vetor / Matriz (lista de listas) para produto matriz-matriz.
    """

    # Funções auxiliares de identificação
    def is_vector(x):
        return isinstance(x[0], (int, float))

    def is_matrix(x):
        return isinstance(x[0], list)

    # Produto vetor-vetor -> escalar
    if is_vector(a) and is_vector(b):
        if len(a) != len(b):
            raise ValueError("Vetores devem ter o mesmo comprimento.")
        
        s = 0
        for i in range(len(a)):
            s += a[i] * b[i]
        return s

    # Produto matriz-vetor -> vetor
    if is_matrix(a) and is_vector(b):
        n_rows = len(a)
        n_cols = len(a[0])

        if n_cols != len(b):
            raise ValueError("Dimensões incompatíveis para produto matriz-vetor.")

        result = []
        for i in range(n_rows):
            sum = 0
            for j in range(n_cols):
                sum += a[i][j] * b[j]
            result.append(sum)
        return result

    # Produto matriz-matriz -> matriz
    if is_matrix(a) and is_matrix(b):
        n_rows_a = len(a)
        n_cols_a = len(a[0])
        n_rows_b = len(b)
        n_cols_b = len(b[0])

        if n_cols_a != n_rows_b:
            raise ValueError("Dimensões incompatíveis para produto matriz-matriz.")

        result = []
        for i in range(n_rows_a):
            row = []
            for j in range(n_cols_b):
                sum = 0
                for k in range(n_cols_a):
                    sum += a[i][k] * b[k][j]
                row.append(sum)
            result.append(row)
        return result

    # Caso inválido
    raise TypeError("Entradas devem ser vetores ou matrizes válidas.")

##### ***Holdout***

In [None]:
def holdout(X, y, train_size=0.8):
    """
    Realiza a partição *holdout* de um conjunto de dados supervisionado
    em subconjuntos de treinamento e teste.

    A função separa aleatoriamente o conjunto de dados original
    D = {(x_i, y_i)}_{i=1}^{N} em dois subconjuntos disjuntos:
    um conjunto de treinamento D_train e um conjunto de teste D_test,
    respeitando a proporção especificada por `train_size`.

    ### Parâmetros
    - `X` (list[list[float]]): Conjunto de N vetores de entrada, X = {x₁, x₂, …, x_N}, com x_i ∈ ℝᵈ.

    - `y` (list[float]): Vetor dos valores alvo associados às instâncias de entrada, y = (y₁, y₂, …, y_N).

    - `train_size` (float, opcional): Fração do conjunto de dados destinada ao treinamento, com 0 < `train_size` < 1.

    ### Retorno
    tuple[list, list, list, list]

        Tupla `(X_train, X_test, y_train, y_test)`, onde:
     - `X_train`: subconjunto dos vetores de entrada utilizados no treinamento;
     - `X_test`: subconjunto dos vetores de entrada utilizados na avaliação;
     - `y_train`: valores alvo correspondentes ao conjunto de treinamento;
     - `y_test`: valores alvo correspondentes ao conjunto de teste.

    ### Observações
    - A partição é realizada por embaralhamento aleatório dos índices,
      caracterizando um *holdout* simples.
    - Não há estratificação em relação à variável alvo.
    - Não há controle explícito de *seed*, o que implica não reprodutibilidade
      entre execuções distintas.
    - A função preserva a correspondência entre instâncias de entrada e seus
      respectivos valores alvo.
    """

    if not 0 < train_size < 1:
        raise ValueError('train_size deve estar no intervalo (0, 1).')
    
    if len(X) != len(y):
        raise ValueError('X e y devem ter o mesmo comprimento.')

    n = len(X)
    indices = list(range(n))
    random.shuffle(indices)

    split = int(n * train_size)

    train_idx = indices[:split]
    test_idx = indices[split:]

    X_train = [X[i] for i in train_idx]
    y_train = [y[i] for i in train_idx]

    X_test = [X[i] for i in test_idx]
    y_test = [y[i] for i in test_idx]

    return X_train, X_test, y_train, y_test

### Geração / Leitura de Dados

O conjunto de dados é gerado a partir do modelo linear com ruído aditivo:
$$
y_i = \mathbf{w}_{\text{true}}^{\top}\mathbf{x}_i + b_{\text{true}} + \varepsilon_i,
\quad i = 1, \dots, N
$$

Em que:
- $\mathbf{x}_i \in \mathbb{R}^d$: Vvetor de *features* da $i$-ésima amostra.
- $\mathbf{w}_{\text{true}} \in \mathbb{R}^d$: Vetor de pesos verdadeiro.
- $b_{\text{true}} \in \mathbb{R}$: Viés verdadeiro.
- $\varepsilon_i \sim \mathcal{N}(0, 1)$: Ruído Gaussiano Aditivo Independente.

Os vetores $\mathbf{x}_i$ também são amostrados de uma Distribuição Normal Padrão:
$$
\mathbf{x}_i \sim \mathcal{N}(\mathbf{0}, \mathbf{I})
$$

In [None]:
class SyntheticLinearData:
    def __init__(self, n_samples, n_features):
        self.n_samples = n_samples
        self.n_features = n_features
    
    def generate(self):
        self.w_true = [random.gauss() for _ in range(self.n_features)]
        self.b_true = random.uniform(0, 9)

        X, y = [], []

        for _ in range(self.n_samples):
            x_i = [random.gauss() for _ in range(self.n_features)]
            noise = random.gauss()
            y_i = dot(self.w_true, x_i) + self.b_true + noise

            X.append(x_i)
            y.append(y_i)

        return X, y

### Definição do Modelo

Classe base:

In [None]:
class Layer:
    def forward(self, X):
        # Subclasses implementam o método forward
        raise NotImplementedError
    
    def backward(self, grad_output):
        # Subclasses implementam o método backward
        raise NotImplementedError

*Fully Connected Layer*:

In [None]:
class Linear(Layer):
    def __init__(self, in_features, out_features):
        # Inicializa os pesos com valores aleatórios centrados em 0
        self.W = [
            [random.uniform(-1e-3, 1e-3) for _ in range(in_features)] for _ in range(out_features)
            ]
        
        # Inicializa os vieses como zeros
        self.b = [0.0] * out_features

    def forward(self, X):
        # Armazena a entrada para uso posterior no Bacwarkd Pass
        self.X = X

        # Transformação afim
        return [ [dot(self.W[i], x_i) + self.b[i] for i in range(len(self.W))] for x_i in X]

    def backward(self, grad_output):
        # Número de amostras no batch
        N = len(self.X)

        # Dimensão de entrada (número de features)
        in_dim = len(self.W[0])
        # Dimensão de saída (número de neurônios)
        out_dim = len(self.W)

        # Acumula os gradientes dos pesos, vieses e entradas
        grad_W = [[0.0] * in_dim for _ in range(out_dim)]
        grad_b = [0.0] * out_dim
        grad_input = [[0.0] * in_dim for _ in range(N)]

        # Loop sobre o batch, neurônios de saída e componentes de entrada
        for i in range(N): # Percorre as amostras
            for j in range(out_dim): # Percorre os neurônios de saída
                # Gradiente do viés
                grad_b[j] += grad_output[i][j]

                for k in range(in_dim): # Percorre as features de entrada
                    # ∂L/∂W_jk = ∂L/∂y_j * x_k
                    grad_W[j][k] += grad_output[i][j] * self.X[i][k]
                    
                    # ∂L/∂x_k = Σ_j (∂L/∂y_j * W_jk)
                    grad_input[i][k] == grad_output[i][j] * self.W[j][k]
        
        # Armazena os gradientes para uso pelo otimizador
        self.grad_W = grad_W
        self.grad_b = grad_b

        # Retorna o gradiente em relação à entrada da camada
        return grad_input

Função de ativação $\mathrm{ReLU}$:

In [None]:
class ReLU(Layer):
    def forward(self, X):
        # Armazena a entrada para uso posterior no Backward Pass
        self.X = X

        # Aplica a função ReLU element-wise
        return [[max(0, x_i) for x_i in sample] for sample in X]
    
    def backward(self, grad_output):
        # Para cada elemento do batch:
        # - Se a entrada original foi positiva, o gradiente é preservado
        # - Senão, o gradiente é anulado
        return [ [grad_output[i][j] if self.X[i][j] else 0.0 for j in range(len(self.X[0]))] for i in range(len(self.X))]

Composição de camadas:

In [None]:
class MLP:
    def __init__(self, layers):
        # Armazena a arquitetura da rede como uma lista de camadas
        self.layers = layers

    def forward(self, X):
        # Propaga os dados camada a camada, atualizando X a cada etapa
        for layer in self.layers:
            X = layer.forward(X)

        # Ao fim do loop, X contém a saída da última camada
        return X

    def backward(self, grad):
        # Percorre as camadas em ordem reversa, propagando os gradientes
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

*Loss* (MSE):

In [None]:
class MSELoss:
    def forward(self, y_pred, y_true):
      	# Armazena predições e rótulos verdadeiros para uso no cálculo do gradiente 
        self.y_pred = y_pred
        self.y_true = y_true

        # Número de amostras no batch
        N = len(y_true)

        # Calcula o Erro Quadrático Médio (MSE)
        return sum((y_pred[i][0] - y_true[i])**2 for i in range(N)) / N
    
    def backward(self):
      	# Número de amostras no batch
        N = len(self.y_true)

        # Para cada amostra, calcula o gradiente escalar e o encapsula numa lista
        return [ [2 * (self.y_pred[i][0] - self.y_true[i]) / N] for i in range(N) ]

Otimizador:

In [None]:
class SGD:
    def __init__(self, lr):
      	# Armazena a taxa de aprendizado
        self.lr = lr

    def step(self, model):
      	# Itera sobre todas as camadas do modelo
        for layer in model.layers:
          	            
            if hasattr(layer, 'W'): # Verifica se a camada tem pesos treináveis
              
              	# Para cada elemento da matriz W aplica-se o Gradiente Descendente
                for i in range(len(layer.W)):
                    for j in range(len(layer.W[0])):
                        layer.W[i][j] -= self.lr * layer.grad_W[i][j]
                
                # Cada componente do vetor b é atualizado analogamente aos pesos
                for i in range(len(layer.b)):
                    layer.b[i] -= self.lr * layer.grad_b[i]

*Loop* de treino genérico:

In [None]:
# Geração de conjunto de dados sintético
data = SyntheticLinearData(100, 5)
X, y = data.generate()

# Divisão em subconjuntos de treino e teste
X_train, X_test, y_train, y_test = holdout(X, y)
model = MLP([
    Linear(5, 100),
    ReLU(),
    Linear(100, 1)
])

# Definição da Loss Function
loss_fn = MSELoss()

# Definição do otimizador
optimizer = SGD(lr=0.001)

# Training Loop
for epoch in range(10_000):
  
  	# Forward Pass
    y_pred = model.forward(X_train)
    
    # Cálculo da Loss Function
    loss = loss_fn.forward(y_pred, y_train)

    # Backward Pass da Loss Function
    grad_loss = loss_fn.backward()
    # Backward Pass do modelo
    model.backward(grad_loss)

    # Otimização dos parâmetros
    optimizer.step(model)