# Exercises Dimensionality reduction


In this exercise we will implement a variational auto-encoder (VAE). An auto-encoder encodes some input into a new and usually more compact representation which can be used to reconstruct the input data again. A VAE makes the assumption that the compact representation follows a probabilistic distribution (usually Gaussian) which makes it possible to sample new points and decode them into new data from a trained variational auto-encoder. The "variational" part comes from the fact that these models are training through variational inference.

The mathematical details of the training can be a bit challenging. However, we believe that probabilistic deep learning will be an important part of future machine learning, which is why we find it important to introduce the concepts.

As background material we recommend reading Tutorial on Variational Autoencoder. For the implementation of the model you can read the article "Auto-Encoding Variational Bayes", Kingma & Welling, ICLR 2014: http://arxiv.org/pdf/1312.6114v10.pdf and "Stochastic Backpropagation and Approximate Inference in Deep Generative Models", Rezende et al, ICML 2014: http://arxiv.org/pdf/1401.4082v3.pdf.

# VAE crash course

Like the simple auto-encoder, VAEs consist of two parts as seen in the figure below where all arrows are non-linear mappings through a neural network. The two parts are the:

 * **Encoder** : Maps the input data into a probabilistic latent space, z, by defining the mean and variance parameters of a Gaussian distribution as non-linear functions of the input data x like:
     - $q(z|x) = \mathcal{N}(z|\mu_\theta(x), \sigma_\phi(x))$, which is called the approximate posterior or latent distribution. The parameters $\mu_\theta(x)$ (mean) and $\log \sigma_\phi(x)^2$ (log-variance) are outputs from a hidden layer each.
 * **Decoder** (also known as generative part of the model): Conditioned on samples drawn from $z \sim q(z|x)$ in the encoder the input data is reconstructed through the:
     - $p(x|z)$, which is the conditional likelihood (generative distribution). This is the decoder that will reconstruct the data from the latent space z.

### Training a VAE
The VAE is similar to a deterministic autoencoder except that we assume that the latent units follows a distribution. Usually we just assume that the units are independent standard normally distributed (i.i.d.).

Above we defined a lower bound on the log-likelihood of the data. We can train the model by maximising the lower bound w.r.t. the model parameters, weight matrices, through the stochastic gradient descent algorithm.  Feasible approximations of the expectations in the lower bound, $\mathcal{L}(x)$, are obtained by evaluating the inside with samples drawn from the latent distribution, $z \sim q(z|x) = \mathcal{N}(z|\mu_\theta(x), \sigma_\phi(x)I)$ and dividing by the number of samples drawn. By using the _reparameterization trick_, $ \mu_\theta(x) + \sigma_\phi(x) \cdot \epsilon$, for the sampling procedure we can directly backpropogate gradients through the latent bottleneck and optimize the parameters w.r.t. the lower bound.

### Setting up the network

We define the network like an auto-encoder except that the bottleneck is the __sample_layer__ which samples the latent units.

In [1]:
# prerequisites
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable
from torchvision.utils import save_image

bs = 100 # Batch-size
# MNIST Dataset
train_dataset = datasets.MNIST(root='./mnist_data/', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='./mnist_data/', train=False, transform=transforms.ToTensor(), download=False)

# Data Loader (Input Pipeline)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=bs, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=bs, shuffle=False)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./mnist_data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:02<00:00, 4073103.11it/s]


Extracting ./mnist_data/MNIST/raw/train-images-idx3-ubyte.gz to ./mnist_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./mnist_data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 338585.39it/s]


Extracting ./mnist_data/MNIST/raw/train-labels-idx1-ubyte.gz to ./mnist_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./mnist_data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 2242934.58it/s]


Extracting ./mnist_data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./mnist_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./mnist_data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 13626987.67it/s]

Extracting ./mnist_data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./mnist_data/MNIST/raw






In [2]:
class VAE(nn.Module):
    def __init__(self, x_dim, h_dim1, h_dim2, z_dim):
        super(VAE, self).__init__()

        # encoder part
        self.fc1 = nn.Linear(x_dim, h_dim1)
        self.fc2 = nn.Linear(h_dim1, h_dim2)
        self.fc31 = nn.Linear(h_dim2, z_dim)
        self.fc32 = nn.Linear(h_dim2, z_dim)
        # decoder part
        self.fc4 = nn.Linear(z_dim, h_dim2)
        self.fc5 = nn.Linear(h_dim2, h_dim1)
        self.fc6 = nn.Linear(h_dim1, x_dim)

    def encoder(self, x):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(h))
        return self.fc31(h), self.fc32(h) # mu, log_var

    def sampling(self, mu, log_var):
        std = torch.exp(0.5*log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu) # return z sample

    def decoder(self, z):
        h = F.relu(self.fc4(z))
        h = F.relu(self.fc5(h))
        return F.sigmoid(self.fc6(h))

    def forward(self, x):
        mu, log_var = self.encoder(x.view(-1, 784))
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var

# build model
vae = VAE(x_dim=784, h_dim1= 512, h_dim2=256, z_dim=2)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae.to(device)


VAE(
  (fc1): Linear(in_features=784, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=256, bias=True)
  (fc31): Linear(in_features=256, out_features=2, bias=True)
  (fc32): Linear(in_features=256, out_features=2, bias=True)
  (fc4): Linear(in_features=2, out_features=256, bias=True)
  (fc5): Linear(in_features=256, out_features=512, bias=True)
  (fc6): Linear(in_features=512, out_features=784, bias=True)
)

In [3]:
import torch.nn.functional as F
import matplotlib.pyplot as plt

# We define the optimizer
optimizer = optim.Adam(vae.parameters())

# Assuming loss_function returns reconstruction loss and KL divergence as well
def loss_function(recon_x, x, mu, log_var):
    # Flatten the tensors if necessary
    recon_x = recon_x.view(-1, 784)
    x = x.view(-1, 784)
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return BCE + KLD, BCE, KLD

# Plotting function oif the loss terms
def plot_losses(train_losses, recon_losses, kl_divergences):
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label='Train Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Total Loss over Epochs')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, recon_losses, label='Reconstruction Loss')
    plt.plot(epochs, kl_divergences, label='KL Divergence')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Reconstruction Loss and KL Divergence over Epochs')
    plt.legend()

    plt.tight_layout()
    plt.show()

# Training function
def train(epoch, device):
    vae.train()
    train_loss = 0
    recon_loss_total = 0
    kl_loss_total = 0

    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)
        optimizer.zero_grad()

        recon_batch, mu, log_var = vae(data)
        loss, recon_loss, kl_loss = loss_function(recon_batch, data, mu, log_var)

        loss.backward()
        train_loss += loss.item()
        recon_loss_total += recon_loss.item()
        kl_loss_total += kl_loss.item()
        optimizer.step()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item() / len(data)))

    average_loss = train_loss / len(train_loader.dataset)
    average_recon_loss = recon_loss_total / len(train_loader.dataset)
    average_kl_loss = kl_loss_total / len(train_loader.dataset)

    print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, average_loss))
    print('====> Epoch: {} Average reconstruction loss: {:.4f}'.format(epoch, average_recon_loss))
    print('====> Epoch: {} Average KL divergence: {:.4f}'.format(epoch, average_kl_loss))

    return average_loss, average_recon_loss, average_kl_loss


In [4]:
def test(epoch, device):
    vae.eval()  # Set the model to evaluation mode
    test_loss = 0
    recon_loss_total = 0
    kl_loss_total = 0

    with torch.no_grad():  # Disable gradient computation
        for data, _ in test_loader:
            data = data.to(device)

            recon_batch, mu, log_var = vae(data)
            loss, recon_loss, kl_loss = loss_function(recon_batch, data, mu, log_var)

            test_loss += loss.item()
            recon_loss_total += recon_loss.item()
            kl_loss_total += kl_loss.item()

    average_test_loss = test_loss / len(test_loader.dataset)
    average_recon_loss = recon_loss_total / len(test_loader.dataset)
    average_kl_loss = kl_loss_total / len(test_loader.dataset)

    print('====> Test Epoch: {} Average loss: {:.4f}'.format(epoch, average_test_loss))
    print('====> Test Epoch: {} Average reconstruction loss: {:.4f}'.format(epoch, average_recon_loss))
    print('====> Test Epoch: {} Average KL divergence: {:.4f}'.format(epoch, average_kl_loss))

    return average_test_loss, average_recon_loss, average_kl_loss


In [None]:
# Lists to store losses for each epoch
train_losses = []
recon_losses = []
kl_divergences = []

# Lists to store test losses for each epoch
test_losses = []
test_recon_losses = []
test_kl_divergences = []

# Training loop
num_epochs = 15 # Set the number of epochs
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


for epoch in range(1, num_epochs + 1):
    # Train
    avg_loss, avg_recon_loss, avg_kl_loss = train(epoch, device)
    train_losses.append(avg_loss)
    recon_losses.append(avg_recon_loss)
    kl_divergences.append(avg_kl_loss)
    #Test
    avg_test_loss, avg_test_recon_loss, avg_test_kl_loss = test(epoch, device)
    test_losses.append(avg_test_loss)
    test_recon_losses.append(avg_test_recon_loss)
    test_kl_divergences.append(avg_test_kl_loss)



====> Epoch: 1 Average loss: 179.5390
====> Epoch: 1 Average reconstruction loss: 175.5682
====> Epoch: 1 Average KL divergence: 3.9708
====> Test Epoch: 1 Average loss: 161.5768
====> Test Epoch: 1 Average reconstruction loss: 156.5050
====> Test Epoch: 1 Average KL divergence: 5.0717
====> Epoch: 2 Average loss: 157.4206
====> Epoch: 2 Average reconstruction loss: 151.9717
====> Epoch: 2 Average KL divergence: 5.4489
====> Test Epoch: 2 Average loss: 154.6701
====> Test Epoch: 2 Average reconstruction loss: 149.3267
====> Test Epoch: 2 Average KL divergence: 5.3434
====> Epoch: 3 Average loss: 152.9693
====> Epoch: 3 Average reconstruction loss: 147.2508
====> Epoch: 3 Average KL divergence: 5.7186
====> Test Epoch: 3 Average loss: 151.7023
====> Test Epoch: 3 Average reconstruction loss: 146.0322
====> Test Epoch: 3 Average KL divergence: 5.6701


In [None]:
# Plot the losses
plot_losses(train_losses, recon_losses, kl_divergences)
plot_losses(test_losses, test_recon_losses, test_kl_divergences)

In [None]:
!pip install umap-learn

In [None]:
from sklearn.manifold import TSNE
import umap
import numpy as np
"""
def plot_latent_space(vae, data_loader, device, num_samples=1000):
    vae.eval()

    latent_vars = []
    labels = []

    with torch.no_grad():
        for data, label in data_loader:
            data = data.to(device)
            recon_batch, mu, log_var = vae(data)
            latent_vars.append(mu.cpu().numpy())
            labels.append(label.numpy())

            # Break after collecting enough samples
            if len(np.concatenate(latent_vars)) >= num_samples:
                break

    # Concatenate all collected latent variables and labels
    latent_vars = np.concatenate(latent_vars)[:num_samples]
    labels = np.concatenate(labels)[:num_samples]

    # Perform t-SNE for dimensionality reduction
    tsne = TSNE(n_components=2, random_state=0)
    latent_2d = tsne.fit_transform(latent_vars)

    # Plotting
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=labels, cmap='viridis', s=5)
    plt.colorbar(scatter)
    plt.title('Latent Space Visualization')
    plt.xlabel('t-SNE 1')
    plt.ylabel('t-SNE Component 2')
    plt.show()
"""

# This is the umap function in case the installation does not work use t-SNE
def plot_latent_space(vae, data_loader, device, num_samples=1000):
    vae.eval()

    latent_vars = []
    labels = []

    with torch.no_grad():
        for data, label in data_loader:
            data = data.to(device)
            recon_batch, mu, log_var = vae(data)
            latent_vars.append(mu.cpu().numpy())
            labels.append(label.numpy())

            # Break after collecting enough samples
            if len(np.concatenate(latent_vars)) >= num_samples:
                break

    # Concatenate all collected latent variables and labels
    latent_vars = np.concatenate(latent_vars)[:num_samples]
    labels = np.concatenate(labels)[:num_samples]

    # Perform UMAP for dimensionality reduction
    umap_model = umap.UMAP(n_components=2, random_state=0)
    latent_2d = umap_model.fit_transform(latent_vars)

    # Plotting
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=labels, cmap='viridis', s=5)
    plt.colorbar(scatter)
    plt.title('Latent Space Visualization (UMAP)')
    plt.xlabel('UMAP Component 1')
    plt.ylabel('UMAP Component 2')
    plt.show()

# Plot the latent space for training
plot_latent_space(vae, train_loader, device)


In [None]:
# Plot latent space for test
plot_latent_space(vae, test_loader, device)

## Exercises
###0) Training : Find a good fit

Determine how many epochs are necessary to get a latent space that represents the data well. You can do this visually by checking that the latent space presents cluster of classes that are distinct. If you want to quantify yoyu can do this by running a K-means clustering algorithm and finding optimal cluster assignment based on the amount of epochs.

### 1) Experiment with architecture
Experiment with the number of layers and their units in order to improve the reconstructions and latent representation. What solution did you find the best and why? (HINT: you will need to change the class VAE(nn.Module))

### 2) Dimensionality of the latent space
Increase the number of units in the latent layer. Does it increase the models representational power and how can you see and explain this? How does this affect the quality of the reconstructions? HINT: You can visualize the latent space in 2D by transforming z to a lower dimensional representation with PCA or UMAP.

### 3) Visualizing the latent space
 Do you see any differences in the latent space when doing the visualization with UMAP or PCA ? Why do you think this is ?

### 4) EXTRA : Using the latent space for prediction
 Use the latent space as features to predict the the number of the observation compare it to using the whole data. If you did a k-means clustering in the first question you can use it here to quantify the quality of the latent space. If you prefer it you can use the latent space as features in a neural network. Below you will find a nn that works on the MNIST dataset, it's up to you to make it work on the latent space as an input.

### 1) Experiment with architecture
Experiment with the number of layers and their units in order to improve the reconstructions and latent representation. What solution did you find the best and why? (HINT: you will need to change the class VAE(nn.Module))

###2) Dimensionality of the latent space
Increase the number of units in the latent layer. Does it increase the models representational power and how can you see and explain this? How does this affect the quality of the reconstructions? HINT: You can visualize the latent space in 2D by transforming z to a lower dimensional representation with PCA or UMAP.

### 3) Visualizing the latent space
 Do you see any differences in the latent space when doing the visualization with UMAP or PCA ? Why do you think this is ?

### 4) EXTRA : Using the latent space for prediction
 Use the latent space as features to predict the the number of the observation compare it to using the whole data. If you did a k-means clustering in the first question you can use it here to quantify the quality of the latent space. If you prefer it you can use the latent space as features in a neural network. Below you will find a nn that works on the MNIST dataset, it's up to you to make it work on the latent space as an input.

In [None]:
# Question 2 (help)

# Example code for PCA.
# from sklearn.decomposition import PCA
# pca = PCA(n_components=2)
# pca.fit(X)
# pca.transform(X)

# You will need to incorporate the PCA piece in the plotting function of the latent space

In [None]:
# This is the code to predict the class from the MNIST dataset, try to build a NN that predicts this from the latent space instead of the whole feature space and compare the results

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

# 1. Load and Preprocess Data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# 2. Define the Neural Network
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)  # Fully connected layer (input: 784, output: 128)
        self.fc2 = nn.Linear(128, 64)     # Fully connected layer (input: 128, output: 64)
        self.fc3 = nn.Linear(64, 10)      # Fully connected layer (input: 64, output: 10)

    def forward(self, x):
        x = x.view(-1, 28*28)  # Flatten the input
        x = torch.relu(self.fc1(x))  # Apply ReLU activation function
        x = torch.relu(self.fc2(x))  # Apply ReLU activation function
        x = self.fc3(x)  # Output layer
        return x

# Create an instance of the network
model = SimpleNN()

# 3. Define the Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()  # Loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Optimizer

# 4. Train the Model
def train(num_epochs=5):
    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0

        for i, (images, labels) in enumerate(train_loader, 0):
            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Print statistics
            running_loss += loss.item()
            if (i+1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {running_loss/100:.4f}')
                running_loss = 0.0

# 5. Evaluate the Model
def evaluate():
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        correct = 0
        total = 0

        for images, labels in test_loader:
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f'Accuracy of the network on the 10000 test images: {100 * correct / total:.2f}%')

# Train and evaluate the model
train(num_epochs=5)
evaluate()

# Plot some test images with their predictions
def plot_predictions():
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        data_iter = iter(test_loader)
        images, labels = next(data_iter)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)

        # Plot the first 10 images, their predicted labels, and the true labels
        fig = plt.figure(figsize=(10, 5))
        for i in range(10):
            ax = fig.add_subplot(2, 5, i+1)
            ax.imshow(images[i].numpy().squeeze(), cmap='gray')
            ax.set_title(f'Pred: {predicted[i].item()}\nTrue: {labels[i].item()}')
            ax.axis('off')

        plt.show()

# Plot predictions
plot_predictions()