### Pedro Jorge Oliveira Câmara - Álgebra Linear Aplicada - Autoencoders

In [None]:
import numpy as np
from numpy.linalg import *

import matplotlib.pyplot as plt

import torch
import torch.nn as nn 
import torch.optim as optim

from typing import *

# para passar as instâncias do pytorch para a GPU caso tenha uma 
# (no meu caso tem)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Funções auxiliares 

In [None]:
def plot_signal(sample_points: np.ndarray, signals: np.ndarray, label: str = "Sinais com ruído", color=None) -> None:
    """
    Plota um conjunto de sinais.

    Args:
        sample_points (np.ndarray): lista de pontos do domínio
        signals (np.ndarray): matriz ou vetor de sinais com amostragens para os valores do sinal em cada ponto
        label (str, optional): Título do plot. Defaults to "Sinais com ruído".
    """
    plt.plot(sample_points, signals)
    plt.title(label)
    plt.xlabel("Tempo")
    plt.ylabel("Amplitude")
    plt.show()
    

def plot_signal_comparison(sample_points: np.ndarray, signals: dict, label: str, colors = List) -> None:
    for signal_name, color in zip(signals.keys(), colors):
        plt.plot(sample_points, signals[signal_name], label=signal_name, color = color)
    
    plt.title(label)
    plt.xlabel("Tempo")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.show()

In [None]:
def get_f(m: float = 0, s: float = 1.5):
    """
    Retorna aleatoriamente uma função `f` trigonométrica ou combinadas de funções trigonométricas.
    Para gerar mais variações nos sinais, as funções causam uma perturbação na entrada. Por exemplo: sin(x*0.4), cos(x*(-0.2)). 
    A perturbação é dada por uma amostra de uma distribuição normal de média `m` e desvio padrão `s`, gerada a cada vez que uma função é retornada.

    Returns:
        Callable: uma função f 
    """
    r = np.random.normal
    
    functions = (
        lambda x: np.sin(x*r(m, s)),
        lambda x: np.cos(x*r(m, s)),
        lambda x: np.sin(x*r(m, s)) + np.cos(x*r(m, s)),
        lambda x: np.sin(x*r(m, s)) - np.cos(x*r(m, s)),
        lambda x: np.sin(x*r(m, s)) * np.cos(x*r(m, s)),
    )
    
    f = np.random.choice(functions)
    
    return f

In [None]:
def add_noise(signals: np.ndarray, level: float = 0.1, mean: float = 0, std: float = 1) -> float:
    """
    Adiciona ruído a um sinal. O ruído é uma amostra de uma distribuição normal de média `mean` e desvio padrão `std`.

    Args:
        signal_vector (np.ndarray): vetor ou matrix em que será adicionado o ruído
        level (float, optional): nível do ruído. Defaults to 1.
        mean (float, optional): média da distribuição normal. Defaults to 0.
        std (float, optional): desvio padrão da distribuição normal. Defaults to 1.

    Returns:
        float: _description_
    """
    return signals + (np.random.normal(mean, std, size=signals.shape) * level)


def generate_signal_vector(f: Callable, sample_points: np.ndarray) -> np.ndarray:
    """
    Gera um vetor de sinais. Cada idx é um "instante de tempo" e cada v[idx] é o valor correspondente do sinal.

    Args:
        f (Callable): função para aplicar aos pontos
        sample_points (np.ndarray): domínio da amostra

    Returns:
        np.ndarray: vetor de sinais 
    """
    return f(sample_points)
    
    
def generate_signal_matrix(n_signals: int,  sample_points: np.ndarray) -> np.ndarray:
    """
    Gera uma matriz de sinais. Cada linha é um vetor de sinais.
    Dada uma linha L, cada idx é um "instante de tempo" e cada L[idx] é o valor correspondente do sinal.

    Args:
        n_signals (int): número de sinais a serem amostrados. (número de linhas da matriz)
        sample_points (np.ndarray): domínio da amostra

    Returns:
        np.ndarray: matriz de sinais
    """
    n_points = sample_points.size
    
    m = np.zeros((n_signals, n_points))
    for i in range(n_signals):
        f = get_f()
        m[i, :] = generate_signal_vector(f, sample_points)
    
    return m


def generate_sample_points(start: float, stop: float, step: float) -> np.ndarray:
    """
    Gera o domínio da amostra

    Args:
        start (float): menor valor do domínio
        stop (float): maior valor do domínio
        step (float): tamanho do passo entre o menor e o maior valor

    Returns:
        np.ndarray: amostra de pontos no intervalo [`start`, `stop`) com passo `step`
    """
    return np.arange(start, stop, step)

### Definição do Autoencoder

In [None]:
class Autoencoder(nn.Module):
    def __init__(self, activation: Callable):
        """
        Args:
            activation (Callable): função de ativação
        """
        super(Autoencoder, self).__init__()
        self.activation_name = activation.__name__
        
        self.encoder = nn.Sequential(
            nn.Linear(100, 50),
            activation(),
            nn.Linear(50, 25),
            activation(),
            nn.Linear(25, 10),
            #activation(),
            #nn.Linear(10, 5),
            #activation(),
            #nn.Linear(5, 1)
        )
        
        self.decoder = nn.Sequential(
            #nn.Linear(1, 5),
            #activation(),
            #nn.Linear(5, 10),
            #activation(),
            nn.Linear(10, 25),
            activation(),
            nn.Linear(25, 50),
            activation(),
            nn.Linear(50, 100)
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
   
    def reconstruct(self, signals):
        signals = torch.tensor(signals, dtype=torch.float32).to(device)
        self.eval()
        with torch.no_grad():
            return self.forward(signals).detach().cpu().numpy()

### Instanciação e treinamento do Autoencoder

In [None]:
def train(signals_np: np.ndarray, n_epochs: int, activation, lr):
    activation = activation 
    lr = lr 
    
    autoencoder = Autoencoder(activation = activation).to(device)
    criterion = nn.MSELoss().to(device)
    optimizer = optim.Adam(autoencoder.parameters(), lr=lr)
    signals = torch.tensor(signals_np, dtype=torch.float32).to(device)
    
    for epoch in range(n_epochs):
        outputs = autoencoder(signals)
        loss = criterion(outputs, signals)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (epoch+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{n_epochs}], Loss: {loss.item():.4f}')

    return autoencoder

In [None]:
start = 0
stop = 10
step = 0.1

n_samples = 100000
n_epochs = 10000

sample_points = generate_sample_points(start, stop, step)
signals_matrix = generate_signal_matrix(n_samples, sample_points)

In [None]:
params = dict(activation = None, lr = 0.005) 

activations = (nn.ReLU, nn.Sigmoid, nn.Tanh, nn.LeakyReLU, nn.PReLU, nn.GELU,)
autoencoders = []

for activation in activations:
    params["activation"] = activation
    autoencoders.append(train(signals_matrix, n_epochs, **params))

### Singular Value Decomposition

In [None]:
def svd_reconstruction(signal_matrix: np.ndarray, n_components: int) -> np.ndarray:
    """
    Reconstrói uma matriz até o posto `n_components`.

    Args:
        U (np.ndarray): matriz U do SVD
        S (np.ndarray): matriz S do SVD
        V (np.ndarray): matriz V do SVD
        n_components (int): posto da matriz reconstruída

    Returns:
        np.ndarray: matriz original reconstruída com posto igual a n_components
    """
    U, S, V = svd(signal_matrix)
    
    U = U[:, :n_components]
    S = np.diag(S[:n_components])
    V = V[:n_components, :]
    
    return U @ S @ V

### Experimentos

In [None]:
def generate_noisy_signal_matrix(f: Callable, n_signals: int,  sample_points: np.ndarray, noisy = True) -> np.ndarray:
    """
    Gera uma matriz de sinais. Cada linha é um vetor de sinais.
    Dada uma linha L, cada idx é um "instante de tempo" e cada L[idx] é o valor correspondente do sinal.

    Args:
        n_signals (int): número de sinais a serem amostrados. (número de linhas da matriz)
        sample_points (np.ndarray): domínio da amostra

    Returns:
        np.ndarray: matriz de sinais
    """
    n_points = sample_points.size
    
    m = np.zeros((n_signals, n_points))
    for i in range(n_signals):
        m[i, :] = generate_signal_vector(f, sample_points)
        if noisy:
            m[i, :] = add_noise(m[i, :], level = 1)
    
    return m

In [None]:
def plot_svd_experiments(sample_points, noisy_signals, clean_signals, rank, signal_label):
    plot_signal(sample_points, noisy_signals.T, f"{signal_label} com ruído")
    
    reconstruction = svd_reconstruction(noisy_signals.T, rank)
    
    plot_signal(sample_points, reconstruction, f"{signal_label} reconstruído - SVD posto 1")
    plot_signal(sample_points, reconstruction.T.mean(axis=0), f"{signal_label} reconstruído - SVD posto 1")
    
    signals = dict(Original = clean_signals.T, Reconstrução = reconstruction.T.mean(axis=0))
    plot_signal_comparison(sample_points, signals, label = f"{signal_label} reconstruído - SVD posto {rank}", colors = ["red", "blue"])
    
    
def plot_autoencoder_experiments(autoencoder, sample_points, noisy_signals, clean_signals, dimension, signal_label):
    #plot_signal(sample_points, noisy_signals.T, f"{signal_label} com ruído")

    reconstruction = autoencoder.reconstruct(sin)

    plot_signal(sample_points, reconstruction.T, f"{signal_label} reconstruído - Autoencoder - Espaço latente {dimension}d - Ativação: {autoencoder.activation_name}")
    plot_signal(sample_points, reconstruction.mean(axis=0), f"{signal_label} reconstruído - Autoencoder - Espaço latente {dimension}d - Ativação: {autoencoder.activation_name}")

    signals = dict(Original = clean_signals.T, Reconstrução = reconstruction.mean(axis=0))
    plot_signal_comparison(sample_points, signals, label = f"{signal_label} reconstruído - Autoencoder - Espaço latente {dimension}d - Ativação: {autoencoder.activation_name}", colors = ["red", "blue"])


In [None]:
n_signals = 10

f = lambda x: np.sin(0.5*x) * np.cos(2*x)

sin_clean = generate_noisy_signal_matrix(f, 1, sample_points, noisy = False)
sin = generate_noisy_signal_matrix(f, n_signals, sample_points)

label = "F"

In [None]:
dimension = 10

for autoencoder in autoencoders:
    plot_autoencoder_experiments(autoencoder, sample_points, sin, sin_clean, dimension, label)

In [None]:
plot_signal(sample_points, sin_clean.T, "F sem ruído")

In [None]:
rank = 1
plot_svd_experiments(sample_points, sin, sin_clean, rank, label)

In [None]:
norm(sin_clean - (autoencoder.reconstruct(sin).mean(axis=0)))

In [None]:
norm(sin_clean - svd_reconstruction(sin, 1).mean(axis=0))