## João Pedro Rodrigues Freitas - 11316552

In [86]:
import numpy as np
import matplotlib.pyplot as plt

## Data
Esta classe realiza o pre-processamento dos dados, permitindo:
- Permite definir uma porcentagem de dados para treino
- Embaralhar as linhas
- Normalizar os dados (para cada coluna, de acordo com o max e min de cada coluna)
- Obter o conjunto de treino
- Obter o conjunto de teste
- Obter o número de atributos (numero de colunas)

In [87]:
class Data():
    def __init__(self, X: np.ndarray) -> None:
        self.data = X


    def getData(self) -> np.ndarray:
        return self.data
    

    def setTrainRatio(self, ratio: float) -> None:
        self.train_ratio = ratio


    def getTrainRatio(self) -> float:
        return self.train_ratio
    
    
    def getAttrSize(self) -> int:
        return self.data.shape[1]
    
    
    def shuffleData(self) -> None:
        self.data = np.random.permutation(self.data)


    def getTrainData(self) -> np.ndarray:
        return self.data[:int(self.data.shape[0] * self.train_ratio)]
    
    
    def getTestData(self) -> np.ndarray:
        return self.data[int(self.data.shape[0] * self.train_ratio):]
    

    def normalize(self) -> None:
        '''Normaliza cada atributo para o intervalo [0, 1]'''
        for i in range(self.getAttrSize()):
            self.data[:,i] = (self.data[:,i] - np.min(self.data[:,i])) / (np.max(self.data[:,i]) - np.min(self.data[:,i]))

            

## Função de ativação

Foi utilizada a função de ativação sigmoide.

A função dada por 1 / (1 + np.exp(-Z)) é sensível a overflow. Desse modo, ela
foi adaptada (mesma função utilizada pelo pytorch).

In [88]:
def sigmoid(x, derivative=False):
    if derivative:
        return sigmoid(x) * (1 - sigmoid(x))

    positives = x >= 0
    negatives = ~positives
    
    exp_x_neg = np.exp(x[negatives])
    
    y = x.copy()
    y[positives] = 1 / (1 + np.exp(-x[positives]))
    y[negatives] = exp_x_neg / (1 + exp_x_neg)
    
    return y

## Layer
Os layers contém as informações das camadas.

Os pesos e bias são inicializados de acordo com uma distribuição uniforme
no intervalo de -0.5 a 0.5

O layer permite realizar o forward de si próprio. 

A função de ativação utilizada como padrão é a sigmoide.

In [89]:
class Layer():
    # Construtor da classe
    def __init__(self, n_neurons: int, inputs = None, lastLayer: bool = False) -> None:
        self.n_neurons = n_neurons

        self.W = None
        self.b = None
        self.out = None # Saída da camada
        self.actv = None # Saída com função de ativação

        self.inputs = inputs # Entradas da camada

        self.lastLayer = lastLayer


    # Inicializa os pesos e bias da camada
    def initLayer(self) -> None:
        if self.W is None and self.inputs is not None:
            # Inicializa os pesos e bias com distribuição uniforme
            # entre -0.5 e 0.5
            self.W = np.random.rand(self.inputs.shape[-1], self.n_neurons) - 0.5
            # self.W = np.random.rand(self.shape[0], self.shape[1]) - 0.5
            self.b = np.random.rand(self.n_neurons) - 0.5


    # Realiza o forward da camada
    # Processando a entrada e aplicando a função de ativação
    def selfForward(self, inputs) -> None:
        self.inputs = inputs

        self.initLayer() # Inicializa os pesos e bias, caso n tenham sido inicializados

        self.out = np.dot(self.inputs, self.W) + self.b

        # Se for a última camada, não aplica a função de ativação
        if not self.lastLayer:
            self.actv = sigmoid(self.out)
        else:
            self.actv = self.out


    # Retorna a saída da camada (Z)
    def getOutput(self):
        return self.out
    
    
    # Retorna a saída da camada com a função de ativação (A)
    def getActivation(self):
        return self.actv
        


## Autoencoder

Esta classe permite a gestão do modelo, viabilizando adicionar camadas ao modelo,
o forward, o backward, o treinamento com base em um conjunto de treinamento, a predição
com base em um conjunto de testes e, ainda, permite obter a ativação da dimensão
latente.

In [90]:
class Autoencoder():
    # Construtor da classe
    def __init__(self) -> None:
        self.layers = np.array([])

        self.epochs = None
        self.lr = None

        self.inputs = None # X
        self.targets = None # X

        self.outputs = []
        self.activations = [] # X_Hat
        

    # Realiza o forward do autoencoder
    # Recebe o input e passa por todas as camadas
    # de modo que a saída de uma camada seja a entrada da próxima
    def forward(self, inputs):
        self.activations = [] # Limpa as ativações
        self.outputs = [] # Limpa as saídas

        self.activations.append(inputs)
        self.outputs.append(inputs)

        for layer in self.layers:
            layer.selfForward(inputs)
            inputs = layer.getActivation() # Saída da camada é a entrada da próxima
            output = layer.getOutput()

            self.outputs.append(output) # Salva a saída da camada
            self.activations.append(inputs) # Salva a saída com a função de ativação

        return inputs
    
    
    # Realiza o backward do autoencoder
    def backward(self) -> None:
        i = self.layers.size # Número de camadas
        n = self.inputs.shape[0] # Número de amostras

        err = 2 * (self.targets - self.activations[-1]) # 2 * (x - x_hat)

        # Para cada layer e output(z), calcula o delta e atualiza os pesos e bias
        for layer, z in zip(self.layers[::-1], self.outputs[::-1]):
            delta = err

            # Não calcula a derivada ativação da última camada
            if (i != self.layers.size):
                # Calcula o delta para a próxima iteração
                delta *= sigmoid(z, derivative=True)

            err = np.dot(delta, layer.W.T) # Atualiza o erro para a prox iteracao

            if i > 0: # Evita index out of range
                prev_act = self.activations[i-1] # Ativação da camada anterior
                dW = np.dot(prev_act.T, delta) / n
                db = delta.mean()

                layer.W += self.lr * dW
                layer.b += self.lr * db


            i -= 1

        
    # Treina o modelo, realizando o forward e backward epochs vezes
    # e printando o erro quadrático médio a cada 10 épocas
    def fit(self, inputs, targets, lr: float = 0.01, epochs: int = 100) -> None:
        self.inputs = inputs
        self.targets = targets
        self.lr = lr

        for epoch in range(epochs):
            self.forward(inputs)
            self.backward()

            if epoch % 10 == 0:
                print(f'Epoch: {epoch}')
                print(f'Loss: {np.mean(np.square(self.targets - self.activations[-1]))}')


    # Adiciona uma camada ao autoencoder
    def addLayer(self, layer: Layer) -> None:
        self.layers = np.append(self.layers, layer)


    # Retorna a predição do input (X_hat) com base no modelo treinado
    def predict(self, inputs) -> np.ndarray:
        self.forward(inputs)
        return self.activations[-1]
    
    
    # Retorna a ativação da dimensão latente
    def getLatentSpace(self) -> np.ndarray:
        return self.layers[self.layers.size // 2 - 1].getActivation()
    

## Criação do modelo
Esta função instancia a classe autoencoder.

Ela recebe a dimensão dos atributos de entrada, as camadas ocultas (na parte do
encoder) e a função de ativação.

Esta função irá instanciar os layers de forma que seja espelhado, ou seja:

Se inputDim = 10, e as camadas ocultas têm os tamanhos layers=[5, 3], então
a camada com 3 neurônios será a dimensão latente, sendo tudo antes dela espelhado.

Assim, ficará no formato: [10, 5, 3, 5, 10].

In [91]:
def createModel(inputDim, layers):
    model = Autoencoder()

    # Adiciona as camadas do encoder
    for i in range(len(layers)):
        model.addLayer(Layer(layers[i]))

    # Adiciona as camadas do decoder
    for i in range(len(layers)-2, -1, -1):
        model.addLayer(Layer(layers[i]))

    # Adiciona a camada de saída
    model.addLayer(Layer(inputDim, lastLayer = True))

    return model

Nesta seção, os dados serão gerados e pré-processados, criando-se modelo
e definindo a arquitetura

In [92]:
X = np.vstack([np.random.normal(1, 0.5, [100,32]),
                np.random.normal(-2, 1, [100,32]),
                np.random.normal(3, 0.75, [100,32])])

data = Data(X) # Cria um objeto Data com os dados X

data.normalize() # Normaliza os dados
data.shuffleData() # Embaralha as linhas
data.setTrainRatio(0.8) # Define a proporção de dados de treino

inputDim = data.getAttrSize() # Dimensão de entrada (número de atributos)

# Número de neurônios em cada camada escondida
# A última camada dessa lista é a dimensão latente
# O espelhamento é feito automaticamente
# Exemplo:
# Para um X com 32 atributos, se hiddenLayers = [16, 8], a rede terá a seguinte estrutura:
# [32, 16, 8, 16, 32]
# Em que o primeiro valor é a dimensão de entrada e o último é a dimensão de saída
hiddenLayers = [16, 8]

# Cria o modelo do autoencoder
autoencoder = createModel(inputDim, hiddenLayers)

# Treina o autoencoder
autoencoder.fit(data.getTrainData(), data.getTrainData(), lr=0.01, epochs=10000)

Epoch: 0
Loss: 1.0869104801430287
Epoch: 10
Loss: 0.20429825788712913
Epoch: 20
Loss: 0.08087524324377922
Epoch: 30
Loss: 0.05945896150788768
Epoch: 40
Loss: 0.05571821272817437
Epoch: 50
Loss: 0.05503974844952127
Epoch: 60
Loss: 0.05489724153298014
Epoch: 70
Loss: 0.054850797933948404
Epoch: 80
Loss: 0.054822125442479074
Epoch: 90
Loss: 0.054796961427108735
Epoch: 100
Loss: 0.05477265021264395
Epoch: 110
Loss: 0.05474868102894885
Epoch: 120
Loss: 0.05472494532952147
Epoch: 130
Loss: 0.05470141097565309
Epoch: 140
Loss: 0.054678060653408966
Epoch: 150
Loss: 0.05465488013466582
Epoch: 160
Loss: 0.05463185601814948
Epoch: 170
Loss: 0.0546089752844451
Epoch: 180
Loss: 0.05458622519982417
Epoch: 190
Loss: 0.05456359328756695
Epoch: 200
Loss: 0.054541067312580495
Epoch: 210
Loss: 0.05451863526884084
Epoch: 220
Loss: 0.05449628536762815
Epoch: 230
Loss: 0.05447400602615512
Epoch: 240
Loss: 0.054451785856503204
Epoch: 250
Loss: 0.05442961365484246
Epoch: 260
Loss: 0.054407478390921686
Epoch: 

# Testar o modelo

In [97]:
X = data.getTestData() # Dados de teste
X_hat = autoencoder.predict(X) # Dados preditos pelo modelo com base nos dados de teste

# Calcula o erro quadrático médio
err = np.mean(np.square(X - X_hat))

print(f'Erro quadrático médio: {err * 100:.3f}%')

# autoencoder.getLatentSpace()


Erro quadrático médio: 0.704%
