# **EP4 - Variational Autoencoder (VAE)**

*Nome (matrícula):* Pedro Semcovici (12745511)


Dando continuidade à atividade proposta no EP4, este notebook apresenta uma implementação de referência para um **Variational Autoencoder (VAE)**. O objetivo é fornecer um código base que facilite o início do trabalho, permitindo que vocês invistam menos tempo configurando aspectos básicos e tenham mais energia para explorar e experimentar, seguindo sua curiosidade.  

Para evitar problemas com tempos excessivos de treinamento, utilizaremos o dataset **Fashion MNIST**.  

## **Seções**
1. **Carregando os dados**  
2. **Função de perda (Loss)**  
3. **Construção do modelo**  
4. **Treinamento**  
5. **Amostragem de dados gerados**  
6. **Perguntas**  


## Referências
Este notebook foi desenvolvido com base em dois códigos-fonte. Do primeiro, foi extraída uma descrição básica do modelo em PyTorch. Do segundo, aproveitou a customização da função de perda (loss) e o mecanismo de amostragem de novos exemplos. Além disso, a revisão do texto e alguns comentários nos códigos foram realizados com o auxílio do ChatGPT.

[1] "PyTorch-Autoencoders/Autoencoders.ipynb at Master · Lharries/PyTorch-Autoencoders." GitHub. Disponível em: https://github.com/lharries/PyTorch-Autoencoders/blob/master/autoencoders.ipynb.

[2] "PyTorch-VAE/Models/Vanilla_vae.py at Master · AntixK/PyTorch-VAE." GitHub. Disponível em: https://github.com/AntixK/PyTorch-VAE/blob/master/models/vanilla_vae.py.

In [2]:
!pip3 install torch torchvision torchaudio





In [3]:
# Setup
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets, transforms
import matplotlib.pyplot as plt

# O código espera que você esteja utilizando uma GPU/CUDA.

In [5]:
# Verifica se CUDA está disponível
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device:", device)

Device: cpu


  return torch._C._cuda_getDeviceCount() > 0


## 1. **Carregando os dados:**

Carrega e organiza o dataset Fashion MNIST, aplicando transformações e criando dataloaders para os conjuntos de treinamento, validação e teste. 

1. Aplica transformações no dataset:
    - Converte imagens para tensores.
    - Normaliza os dados para o intervalo [-1, 1].
2. Baixa e carrega o dataset Fashion MNIST.
3. Divide o conjunto de treinamento em:
    - train_set: Dados para treinamento.
    - val_set: Dados para validação, com tamanho definido por validation_set_percent.
4. Exibe:
    - Informações sobre as dimensões e intervalos dos dados.
    - Exemplos visuais de imagens do dataset.
5. Retorna dataloaders para treinamento, validação e teste.

In [None]:
def carrega_dataset(validation_set_percent=0.1, batch_size=128, verbose=False):
  transform_list = transforms.Compose(
    [
      transforms.ToTensor(),
      transforms.Normalize((0.5), (0.5)),
    ]
  )

  training_data = datasets.FashionMNIST(root="data", train=True, download=True, transform=transform_list)
  test_data = datasets.FashionMNIST(root="data", train=False, download=True, transform=transform_list)

  # Particiona o conjunto de treinamento e validação
  n_val = int(len(training_data) * validation_set_percent)
  n_train = len(training_data) - n_val
  train_set, val_set = random_split(
    training_data, [n_train, n_val], generator=torch.Generator().manual_seed(0)
  )

  if verbose:
    print("Dataset carregado em Tensores com as seguintes dimensões:\n")
    print(f"all_training_data: X = {training_data.data.size()}, intervalo: [{training_data.data.min().item()}, {training_data.data.max().item()}]")
    print(f"                     training:   {len(train_set.indices):6} samples")
    print(f"                     validation: {len(val_set.indices):6} samples")
    print()
    print(f"test_data: X_test = {test_data.data.shape} [{test_data.data.min().item()}, {test_data.data.max().item()}]")

    figure = plt.figure(figsize=(20, 3))
    plt.suptitle("Exemplos de imagens do Fashion-MNIST")
    rows, cols = 3, 15
    for i in range(1, rows * cols + 1):
      sample_idx = torch.randint(len(training_data), size=(1,)).item()
      img, label = training_data[sample_idx]
      figure.add_subplot(rows, cols, i)
      plt.axis("off")
      plt.imshow(img.squeeze(), cmap="gray")
    plt.show()

  train_dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True, pin_memory=True)
  val_dataloader = DataLoader(val_set, batch_size=batch_size, shuffle=False, pin_memory=True)
  test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True, pin_memory=True)

  return (train_dataloader, val_dataloader, test_dataloader)

train_dataloader, val_dataloader, test_dataloader = carrega_dataset(validation_set_percent = 0.1, verbose=True)  

----
## 2. Função de perda (Loss)

A função de perda desempenha um papel fundamental no treinamento de um **Variational Autoencoder (VAE)**, pois combina termos que promovem a reconstrução precisa dos dados e a regularização da distribuição latente.  

Neste notebook, utilizamos uma função de perda composta pelos seguintes termos:
- **Mean Squared Error (MSE)**: Mede o erro de reconstrução entre os dados originais e os gerados pelo decoder.  
- **Kullback-Leibler Divergence (KL-Divergence)**: Garante que a distribuição latente gerada pelo encoder esteja próxima de uma distribuição normal padrão (𝒩(0,1)).  

### Ajuste do Termo de Regularização
Para evitar que o termo KL-Divergence domine a perda total, ele é ajustado com base no tamanho do batch em relação ao tamanho do dataset. Neste caso, o redutor utilizado é (256/60000).:  

Essa abordagem ajuda a balancear a contribuição da KL-Divergence na função de perda total, tornando o treinamento mais estável.  

In [None]:
def vae_loss(recons, input, mu, log_var, kld_weight):
    """
    Função para calcular a perda de um Variational Autoencoder (VAE).
    
    Parâmetros:
    - recons: Reconstrução produzida pelo decodificador do VAE.
    - input: Entrada original fornecida ao VAE.
    - mu: Vetor de médias da distribuição latente (gerado pelo codificador).
    - log_var: Vetor do logaritmo da variância da distribuição latente (gerado pelo codificador).
    - kld_weight: Fator de peso para ajustar a importância da perda KLD em relação à perda de reconstrução.
    
    Retorno:
    - Soma da perda de reconstrução e da perda KLD ponderada.
    """
    
    # Calcula a perda de reconstrução usando o erro médio quadrático (MSE)
    recons_loss = F.mse_loss(recons, input)
    
    # Calcula a perda de divergência Kullback-Leibler (KLD) entre a distribuição latente e uma normal padrão
    kld_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim=1), dim=0)
    
    # Retorna a soma da perda de reconstrução e da perda KLD ponderada
    return recons_loss + kld_weight * kld_loss


######################################
# COMENTAR SOBRE ISSO
######################################
def ae_loss(recons, input):
    return F.mse_loss(recons, input)


----
## 3. Variational Autoencoder (VAE)

Arquitetura é útil para geração de novos exemplos baseados em uma distribuição latente aprendida.


In [None]:
class VariationalAutoencoder(nn.Module):
    def __init__(self, input_size):
        super(VariationalAutoencoder, self).__init__()

        # Projeção inicial dos dados para 512 dimensões
        self.fc1 = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_size,512)
        )

        # Camadas para estimar os parâmetros da distribuição latente (μ e log(σ²))
        self.fc21 = nn.Linear(512, 10)  # Gera a média (μ)
        self.fc22 = nn.Linear(512, 10)  # Gera o logaritmo da variância (log(σ²))
        
        # Atributo para armazenar as embeddings latentes
        self.embeddings = None

        self.relu = nn.ReLU()
        
        # Camadas de decodificação para reconstruir os dados no espaço original
        self.fc3 = nn.Linear(10, 512)
        self.fc4 = nn.Linear(512, input_size)
    

    # Método de codificação: reduz os dados para o espaço latente
    def encode(self, x):
        x = self.relu(self.fc1(x))
        return self.fc21(x), self.fc22(x)
    
    # Método de decodificação: reconstrói os dados a partir do espaço latente
    def decode(self, z):
        z = self.relu(self.fc3(z))
        return torch.tanh(self.fc4(z))
        
    # Truque de reparametrização: amostra do espaço latente de forma diferenciável
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 *logvar)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu)
        
    # Forward: executa todo o fluxo do VAE
    def forward(self,x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        prediction = self.decode(z)
        self.embeddings = z
        return prediction, mu, logvar
    
    # Método para gerar amostras a partir do espaço latente
    def sample(self, num_samples:int):
        z = torch.randn(num_samples, 10)  # Gera vetores aleatórios no espaço latente
        z = z.cuda()  

        samples = self.decode(z) # Decodifica os vetores para gerar dados
        return samples
    
    
class SimpleAutoencoder(nn.Module):
    def __init__(self, input_size=784, latent_dim=10):
        super(SimpleAutoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_size, 512),
            nn.ReLU(),
            nn.Linear(512, latent_dim)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.ReLU(),
            nn.Linear(512, input_size),
            nn.Tanh()
        )
    def forward(self, x):
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed

# ### 2. Denoising Autoencoder (DAE)

# %%
class DenoisingAutoencoder(nn.Module):
    def __init__(self, input_size=784, latent_dim=10):
        super(DenoisingAutoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_size, 512),
            nn.ReLU(),
            nn.Linear(512, latent_dim)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.ReLU(),
            nn.Linear(512, input_size),
            nn.Tanh()
        )

    def forward(self, x):
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed

    def add_noise(self, x, noise_factor=0.3):
        x_noisy = x + noise_factor * torch.randn_like(x)
        x_noisy = torch.clip(x_noisy, -1.0, 1.0)
        return x_noisy

In [None]:
vae = VariationalAutoencoder(28*28).to('cuda')


----
## 4. Treinamento

Aqui são definidos os hiperâmetros e código de treinamento.

In [None]:
# Hiperparâmetros
epochs = 30                   
batch_size= 256
learning_rate = 3.5e-3          
loss_fn = vae_loss
optimizer = torch.optim.AdamW(vae.parameters(), lr=learning_rate) 

In [None]:
@torch.inference_mode()
def eval_loop(dataloader, model, loss_fn):
  model.eval()
  num_batches = len(dataloader)
  eval_loss = 0

  ground_truth = None
  prediction = None

  for X, y in dataloader:
    X = X.cuda()
    ground_truth = X
    prediction, mu, logvar = model(X)
    kld_weight = X.shape[0]/60000
    X = X.view(-1, 784)
    eval_loss += loss_fn(prediction, X, mu, logvar, kld_weight).item()

  eval_loss /= num_batches
  
  del X, prediction
  return eval_loss

def train_loop(dataloader, model, loss_fn,  optimizer):
  model.train()
  train_loss = 0
  num_batches = len(dataloader)

  for _, (X, _) in enumerate(dataloader):
    X = X.cuda()
    prediction, mu, logvar = model(X)
    X = X.view(-1, 784)
    kld_weight = X.shape[0]/60000
    loss = loss_fn(prediction, X, mu, logvar, kld_weight)
    
    with torch.no_grad():
      train_loss += loss.item()

    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    
  train_loss /= num_batches

  del X, prediction, loss
  return train_loss 

In [None]:
for t in range(epochs):
  print(f"Epoch {t+1:2}/{epochs}", end='')
  train_loss = train_loop(train_dataloader, vae, loss_fn, optimizer)
  val_loss = eval_loop(val_dataloader, vae, loss_fn)
  print(f"\t train_loss={train_loss:.3f}  val_loss={val_loss:.3f}")


print("Treinamento concluído!")

----
# 5. Gerando amostras

Este código gera uma imagem de exemplo do espaço latente do VAE e a exibe em escala de cinza, formatando-a como uma matriz 28×28, utilizada que o tamanho da imagem do Fashion MNIST.

In [None]:
exemplos_gerado = vae.sample(1).view(-1, 28, 28).detach().cpu().numpy()
plt.imshow(exemplos_gerado[0],  cmap="gray")
plt.axis('off')

----
## 6. **Perguntas:**  
   - Quais foram as dificuldades enfrentadas durante o desenvolvimento?  

   
   - Quais experimentos adicionais você realizou? O que aprendeu com eles?  