<a href="https://colab.research.google.com/github/ra4ola/APA-Assigment2/blob/main/Parte%201/Parte1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [56]:
from torch.nn.modules.flatten import Flatten
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import os
from google.colab import drive
import torch.nn.functional as F
from sklearn.manifold import TSNE


# Mount Drive

In [57]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Definição de Variáveis

In [58]:
PATH = '/content/drive/MyDrive/APA/Assigment2/Parte3'

In [59]:
TRAIN = True

In [60]:
image_size = 28
lr = 0.001
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)

<torch._C.Generator at 0x7871909d1a10>

# Modelos


In [61]:
# @title AE

class AE(nn.Module):
    def __init__(self):
        super(AE, self).__init__()
        self.encoder = nn.Sequential(

            nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(8),
            nn.ELU(),

            nn.Conv2d(in_channels=8, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ELU(),

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ELU(),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ELU(),

            nn.Conv2d(in_channels=128, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ELU()
        )
        self.decoder = nn.Sequential(

         nn.ConvTranspose2d(in_channels=16, out_channels=128, kernel_size=3, stride=1, padding=1),
         nn.BatchNorm2d(128),
         nn.ELU(),

         nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3, stride=1, padding=1),
         nn.BatchNorm2d(64),
         nn.ELU(),

         nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=3, stride=1, padding=1),
         nn.BatchNorm2d(32),
         nn.ELU(),

         nn.ConvTranspose2d(in_channels=32, out_channels=8, kernel_size=3, stride=1, padding=1),
         nn.BatchNorm2d(8),
         nn.ELU(),

         nn.Upsample(scale_factor=2, mode='nearest'),  # Upsample
         nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=3, stride=1, padding=1),
         nn.BatchNorm2d(1),
         nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [62]:
# @title VAE

class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()

        # Encoder
        self.encoder = AE()

        # Latent space parameters
        self.fc_mean = nn.Linear(3136, 16)
        self.fc_logvar = nn.Linear(3136, 16)

        # Decoder
        self.decoder_fc = nn.Linear(16,3136)
        self.decoder = AE()

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        epsilon = torch.randn_like(std)
        return mean + epsilon * std

    def forward(self, x):
        # Encoder
        x = self.encoder.encoder(x)
        x = x.view(x.size(0), -1)

        # Latent space parameters
        mean, logvar = self.fc_mean(x), self.fc_logvar(x)

        # Reparameterization trick
        z = self.reparameterize(mean, logvar)

        z = self.decoder_fc(z)


        z = z.view(z.size(0),16, 14, 14)

        x_recon = self.decoder.decoder(z)

        return x_recon, mean, logvar



In [63]:
# @title DAE

class DAE(nn.Module):
    def __init__(self):
      super(DAE, self).__init__()
      self.encoder = AE()

    def forward(self, x):
      x = self.encoder(x)
      return x

# Definição de Funções

### Funções de Construção das String para os Caminhos

In [64]:
def ModelDirStr(modelName,path = PATH):
  return f"{path}/{modelName}"
def pathModelStr(modelName,epoch, path = PATH):
  return f"{ModelDirStr(modelName,path)}/{modelName}_epoch={epoch}"

### Função para adicionar Noise

In [65]:
# @title Noise Fuction DAE

def add_noise(data, noise_factor=0.5):
    noisy_data = data + noise_factor * torch.randn_like(data)
    return torch.clamp(noisy_data, 0., 1.)




### Função de Treino


In [85]:
def vae_loss(recon_x, x, mu, logvar,criterion):
    BCE = criterion(recon_x, x)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

In [67]:
# @title Train Fuction

def train(model, criterion, optimizer, train_loader, epochs, modelName, scheduler=None, path=PATH):
    os.makedirs(ModelDirStr(modelName=modelName, path=path), exist_ok=True)

    for epoch in range(epochs):
        model.train()  # Set the model to training mode

        epoch_loss = 0

        for inputs, _ in train_loader:
            inputs = inputs.to(device)
            optimizer.zero_grad()

            if isinstance(model, VAE):
                outputs, mean, log_var = model(inputs)
                loss = vae_loss(outputs,input,mean,log_var,criterion)
            else:
                outputs = model(inputs)
                loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()

            epoch_loss = epoch_loss =+ loss.item()

        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

        if scheduler is not None:
            scheduler.step()

        torch.save(model, pathModelStr(modelName=modelName, path=path, epoch=epoch))


### Função de Implementação das Perdas

In [68]:
# @title Evalute Training
def evaluate_training(model, training_loader, loss_fn, num_epochs, modelName, path=PATH):
    all_losses = []
    all_kl_loss = []

    model.eval()

    for epoch in range(num_epochs):
        running_loss = 0.0
        running_kld = 0.0

        with torch.no_grad():
            for batch in training_loader:
                inputs, targets = batch
                inputs = inputs.to(device)

                if isinstance(model, VAE):
                    outputs, mean, log_var = model(inputs)
                    outputs, mean, log_var = model(inputs)
                    loss = vae_loss(outputs,input,mean,log_var)
                    running_kld += kld_loss.item()
                else:
                    outputs = model(inputs)
                    loss = loss_fn(outputs, inputs)
                running_loss += loss.item()

        all_losses.append(running_loss)
        all_kl_loss.append(running_kld)

        print(f'Epoch {epoch}: Epoch Loss: {running_loss/len(training_loader):.4f}', end='')
        if isinstance(model, VAE):
            print(f', KL Divergence: {running_kld:.4f}')
        else:
            print('')

    return all_losses, all_kl_loss



### Função Display Images

In [69]:
# @title Plot Losses

def plot_loss_accuracy(losses, Tittle = 'Traning Loss', curve_Label = 'Loss',X_axis_label = 'Epochs', Y_axis_Label = 'Loss'):
    epochs = range(1, len(losses) + 1)

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, losses, 'b', label=curve_Label)
    plt.title(Tittle)
    plt.xlabel(X_axis_label)
    plt.ylabel(Y_axis_Label)
    plt.legend()

    plt.tight_layout()
    plt.show()

In [70]:
# @title Image Reconstruction

def reconstruction_images_plot(model,test_loader, Title = 'Reconstruction'):
  with torch.no_grad():
    for data in test_loader:
      inputs, _ = data
      inputs = inputs.to(device)

      if isinstance(model,VAE):
        outputs,_,_ = model(inputs)
      else:
        outputs = model(inputs)

      print(outputs.shape)
      n = 10
      plt.figure(figsize=(20, 4))
      plt.title = Title
      for i in range(n):
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(inputs[i].cpu().view(28, 28).numpy(), cmap='gray')
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(outputs[i].cpu().view(28, 28).numpy(), cmap='gray')
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
      plt.show()
      break

# Importação do DataSet

In [71]:
# @title DataSet Import

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=128, shuffle=False)

# Implementação dos Modelos

## Inicialização do Modelo

In [75]:
# @title AE Initialization

epochs = 20
ae = AE().to(device)

criterion = nn.MSELoss()

optimizerAE = optim.Adam(ae.parameters(), lr = lr)

## AE Treino

In [73]:
# @title Treino AE
train(
      model = ae,
      criterion = criterion,
      optimizer = optimizerAE,
      train_loader = train_loader,
      epochs = epochs,
      modelName = "AE",
      scheduler=None,
      path = PATH
    )

Epoch [1/50], Loss: 1.3421
Epoch [2/50], Loss: 1.1832


KeyboardInterrupt: ignored

In [74]:
# @title AE Losses Calculation

loss,_ =evaluate_training(
    training_loader = train_loader,
    loss_fn = criterion,
    num_epochs = epochs,
    modelName = "AE",
    path = PATH
    )


TypeError: ignored

In [None]:
# @title AE Plot Losses
plot_loss_accuracy(losses = loss, Tittle = 'AE Loss')

## DAE Treino

In [76]:
# @title DAE Initialization

dae = DAE().to(device)

optimizerDAE = optim.Adam(dae.parameters(), lr = lr)

In [77]:
# @title Treino DAE

train(
    model = dae,
    criterion = criterion,
    optimizer = optimizerDAE,
    train_loader = train_loader,
    epochs = epochs,
    modelName = "DAE",
    scheduler=None,
    path = PATH
  )

KeyboardInterrupt: ignored

In [78]:
# @title DAE Losses Calculation

loss,_ =evaluate_training(
    training_loader = train_loader,
    loss_fn = criterion,
    num_epochs = epochs,
    modelName = "DAE",
    path = PATH
  )

TypeError: ignored

In [79]:
# @title DAE Plot Losses
plot_loss_accuracy(losses = loss, "DAE Loss")

SyntaxError: ignored

## VAE Treino

In [87]:
# @title Initialization VAE
vae = VAE().to(device)
criterion = nn.MSELoss(reduction = "sum")
epochs = 10
optimizerVAE = optim.Adam(vae.parameters(), lr = lr)

In [89]:
# @title VAE Treino
train(
    model = vae,
    criterion = criterion,
    optimizer = optimizerVAE,
    train_loader = train_loader,
    epochs = epochs,
    modelName = "VAE",
    scheduler=None,
    path = PATH
  )

AttributeError: ignored

In [None]:
# @title VAE Calculate Losses

loss,kld_loss =evaluate_training(
    training_loader = train_loader,
    loss_fn = criterion,
    num_epochs = epochs,
    modelName = "VAE",
    path = PATH
  )

In [None]:
# @title VAE Plot Losses

plot_loss_accuracy(losses= loss, Tittle= "VAE Loss")

In [None]:
# @title Reconstrução das Imagens
reconstruction_images_plot(vae,test_loader)

# Latent Space

In [None]:
# @title Latent Space Calculation  and Plot Fuction
def extract_latent_space_and_labels_with_tsne(model, training_loader, device):
    latent_space = []
    labels = []

    with torch.no_grad():
        for batch_idx, data in enumerate(training_loader):

            model.eval()
            _,mu_ = model.encoder(data.to(device))
            mu = mu.view(mu.size(0), -1)[:, :2]

            latent_space.append(mu)
            labels.append(data)

    latent_space = torch.cat(latent_space, dim=0).cpu().numpy()
    labels = torch.cat(labels, dim=0).cpu().numpy()

    # Apply t-SNE
    tsne = TSNE(n_components=2, random_state=42)
    latent_tsne = tsne.fit_transform(latent_space)

    # Plot the t-SNE visualization
    plt.scatter(latent_tsne[:, 0], latent_tsne[:, 1], c=labels, cmap='viridis')
    plt.colorbar()
    plt.title("t-SNE Visualization of Latent Space")
    plt.show()

    return latent_space, labels, latent_tsne

In [None]:
latent_spacedae, labelsdae, latent_tsnedae = extract_latent_space_and_labels_with_tsne(dae, test_dataset, device)