# Mini-project n° 2 – Conditional VAEs

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
from torch.utils.data import DataLoader
import torchvision
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import datasets, transforms


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


## Listes des hyperparamètres

Les définir + justifier les valeurs

In [None]:
batch_size = 128
latent_dim = 2
learning_rate = 1e-3
epochs = 30
beta = 1
kl_weights = [1, 10, 100]

## Chargement des données

In [None]:
# Data loading
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.MNIST(root='../../data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='../../data', train=False, transform=transform, download=True)

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


## Création d'un modèle d'AutoEndodeur Variationnel Conditionnel (Conditionnal VAE)

Le modèle de VAE conditionnel utilisé par la suite se construit de la manière suivante :

- **Un Encodeur** : 
    - Une couche de convolution composée de 32 filtres, un noyau de taille 4, un stride de 2 and un padding de 1.
    - Une couche BatchNorm qui conserve le même nombre de caractérstiques.
    - Une fonction d'activation ReLu.
    - Une couche de convolution composée de 64 filtres, un noyau de taille 4, un stride de 2 and un padding de 1.
    - Une couche BatchNorm qui conserve le même nombre de caractérstiques.
    - Une fonction d'activation ReLu.
    - Une couche de convolution composée de 128 filtres, un noyau de taille 3, un stride de 2 and un padding de 1..
    - Une couche BatchNorm qui conserve le même nombre de caractérstiques.
    - Une fonction d'activation ReLu.

- **Un espace latent** :

- **Un Décodeur.** 
    - Une couche de déconvolution composée de 64 filtres, un noyau de taille 3, un stride de 2 and un padding de 1.
    - Une couche BatchNorm qui conserve le même nombre de caractérstiques
    - Une fonction d'activation ReLu
    - Une couche de déconvolution composée de 32 filtres, un noyau de taille 4, un stride de 2 and un padding de 1.
    - Une couche BatchNorm qui conserve le même nombre de caractérstiques
    - Une fonction d'activation ReLu
    - Une couche de déconvolution composée de 1 filtre, un noyau de taille 4, un stride de 2 and un padding de 1.
    - Une fonction d'activation Sigmoïde

Définir ce qu'est/ le rôle : noyau, stride, padding, fonction d'activation, espace latent

c sera à expliciter = condition

In [None]:
class ConvVAE(nn.Module):
    def __init__(self, latent_dim=10):
        super(ConvVAE, self).__init__()
        self.latent_dim = latent_dim
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),  # Output: (32, 14, 14)
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),  # Output: (64, 7, 7)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # Output: (128, 4, 4)
            nn.BatchNorm2d(128),
            nn.ReLU(),
        )

        # Fully connected layers for mean and log variance
        self.fc_mu = nn.Linear(128 * 4 * 4, latent_dim) # (128*4*4) est la taille de sortie de la couche précédente
        self.fc_logvar = nn.Linear(128 * 4 * 4, latent_dim) 
        self.fc_decode = nn.Linear(latent_dim, 128 * 4 * 4) 
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1),  # Output: (64, 8, 8)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),  # Output: (32, 16, 16)
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),  # Output: (1, 28, 28)
            nn.Sigmoid()
        )
    
    def encode(self, x,c ):
        x = self.encoder(x)
        x = torch.cat((x,c),dim=1)
        x = x.view(-1, 128 * 4 * 4) # Flatten the output of the convolutional layers
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar
    

    def latent_sample(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z, c):
        x = torch.cat((x,c),dim=1)
        x = self.fc_decode(z)
        x = x.view(-1, 128, 4, 4)  # Reshape to (128, 4, 4) for the decoder
        x = self.decoder(x)
        return x

    def forward(self, x, c):
        mu, logvar = self.encode(x, c)
        z = self.latent_sample(mu, logvar)
        return self.decode(z, c), mu, logvar

## Création de la fonction de perte (loss function)

In [None]:
def loss_function(recon_x, x, mu, logvar, beta=1):
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum') # terme de reconstruction 
    # on utilise binary entropy pour forcer les contrastes (sinon on pourrait travailler avec MSE)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) # terme de regularisation (on veut se rapprocher loi normale)
    return BCE + beta * KLD # l'objectif est d'avoir un bon compromis entre reconstruction et regularisation

## Apprentissage du modèle de VAE conditionnel

In [None]:
# Initialize the VAE model and the Adam optimizer
vae = ConvVAE(latent_dim=latent_dim)
vae.to(device)
optimizer = optim.Adam(vae.parameters(), lr=learning_rate)

# Train the model for the given number of epochs
# At the end of each epoch, print the training loss
for epoch in range(1, epochs + 1):
    vae.train()
    running_loss = 0.0
    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = vae(data)
        loss = loss_function(recon_batch, data, mu, logvar)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f'Epoch {epoch}, Training loss: {epoch_loss:.4f}')

## Génération des images

In [None]:
def generate_sample(num_samples=10, digit):
# digit est la valeur conditionelle
    vae.eval()
    with torch.no_grad():

        # Échantillonage selon une loi normale
        z = torch.randn(num_samples, latent_dim).to(device)  # Sample random latent vectors

        # On ajoute la condition ie la valeur que l'on veut générer
        condition = torch.zeros(100,10, dtype=int)
        condition[:,digit] = 1 # indique que c'est la valeur digit que l'on cherche à générer

        # On génère l'image
        samples = vae.decode(z, condition) # Decode the latent vectors
        samples = samples.cpu().view(num_samples, 1, 28, 28) # Reshape the samples

        fig, ax = plt.subplots(1, num_samples, figsize=(15, 2))
        for i in range(num_samples):
            ax[i].imshow(samples[i].squeeze(0), cmap='gray')
            ax[i].axis('off')
        plt.show()

generate_sample()

## Visualisation de l'espace latent

def loss_function(recon_x, x, mu, logvar, beta=1):
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + beta * KLD, BCE, KLD

In [None]:
# Training and plotting function
def train_and_plot(kl_weight):
    model = ConvVAE(latent_dims).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    model.train()
    train_losses = []
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        bce_loss = 0
        kld_loss = 0
        for batch_idx, (data, _) in enumerate(train_loader):
            data = data.to(device)
            optimizer.zero_grad()
            x_recon, mu, logvar = model(data)
            loss, bce, kld = loss_function(x_recon, data, mu, logvar, kl_weight)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            bce_loss += bce.item()
            kld_loss += kld.item()
        
        average_loss = epoch_loss / len(train_loader.dataset)
        average_bce = bce_loss / len(train_loader.dataset)
        average_kld = kld_loss / len(train_loader.dataset)
        print(f'Epoch {epoch+1}: Average Loss: {average_loss:.4f}, BCE: {average_bce:.4f}, KLD: {average_kld:.4f}')
    
    # Plot latent space
    plot_latent_space(model, kl_weight)

# Function to plot latent space
def plot_latent_space(model, kl_weight):
    model.eval()
    with torch.no_grad():
        test_loader = DataLoader(dataset=test_dataset, batch_size=10000, shuffle=False)
        data, labels = next(iter(test_loader))
        data = data.to(device)
        mu, logvar = model.encode(data)
        z = mu  # For visualization, we use the mean
        z = z.cpu().numpy()
        labels = labels.numpy()
        
        plt.figure(figsize=(8,6))
        scatter = plt.scatter(z[:, 0], z[:, 1], c=labels, cmap='tab10', alpha=0.7)
        plt.colorbar(scatter, ticks=range(10))
        plt.clim(-0.5, 9.5)
        plt.title(f'Latent Space with KL Weight = {kl_weight}')
        plt.xlabel('Z1')
        plt.ylabel('Z2')
        plt.show()

# Run training and plotting for different KL weights
for kl_weight in kl_weights:
    print(f'\nTraining VAE with KL Weight = {kl_weight}')
    train_and_plot(kl_weight)