<a href="https://colab.research.google.com/github/njits030/Odor_from_SMILES/blob/master/assignment_4a.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 4a - Variational Auto-Encoders
## Deep Learning Course - Vrije Universiteit Amsterdam, 2022

### Notes by Marvin:

- We were asked to implement encoder and decoder as FCNN or CNN, I did CNN because we get more points. Mind that we use a less common layer called "deconv", which is like the opposite of the convolution. We need it for increasing dimensions in the decoding process. Also note that the input dimensions in the layers are not fitting to our data, because I haven't gotten that far.

 - In Question 3, they ask us to choose a conditional lieklihood distribution. I think this is the correct one since our data is continuous. But if you can confirm that this is true.

- In Question 5, they ask us to choose a prior distribution. Since these are a bit tricky, I am not sure which one to take here. Feel like I shouldn't decide by myself.

#### Instructions on how to use this notebook:

This notebook is hosted on Google Colab. To be able to work on it, you have to create your own copy. Go to *File* and select *Save a copy in Drive*.

You can also avoid using Colab entirely, and download the notebook to run it on your own machine. If you choose this, go to *File* and select *Download .ipynb*.

The advantage of using Colab is that you can use a GPU. You can complete this assignment with a CPU, but it will take a bit longer. Furthermore, we encourage you to train using the GPU not only for faster training, but also to get experience with this setting. This includes moving models and tensors to the GPU and back. This experience is very valuable because for various models and large datasets (like large CNNs for ImageNet, or Transformer models trained on Wikipedia), training on GPU is the only feasible way.

The default Colab runtime does not have a GPU. To change this, go to *Runtime - Change runtime type*, and select *GPU* as the hardware accelerator. The GPU that you get changes according to what resources are available at the time, and its memory can go from a 5GB, to around 18GB if you are lucky. If you are curious, you can run the following in a code cell to check:

```sh
!nvidia-smi
```

Note that despite the name, Google Colab does  not support collaborative work without issues. When two or more people edit the notebook concurrently, only one version will be saved. You can choose to do group programming with one person sharing the screen with the others, or make multiple copies of the notebook to work concurrently.

**Submission:** Upload your notebook in .ipynb format to Canvas. The code and answers to the questions in the notebook are sufficient, no separate report is expected.

In [None]:
!nvidia-smi

Mon Dec 23 14:42:23 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   54C    P8              12W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

## Introduction

In this assignment, we are going to implement a Variational Auto-Encoder (VAE). A VAE is a likelihood-based deep generative model that consists of a stochastic encoder (a variational posterior over latent variables), a stochastic decoder, and a marginal distribution over latent variables (a.k.a. a prior). The model was originally proposed in two concurrent papers:
- [Kingma, D. P., & Welling, M. (2013). Auto-encoding variational bayes. arXiv preprint arXiv:1312.6114.](https://arxiv.org/abs/1312.6114)
- [Rezende, Danilo Jimenez, Shakir Mohamed, and Daan Wierstra. "Stochastic backpropagation and approximate inference in deep generative models." International conference on machine learning. PMLR, 2014.](https://proceedings.mlr.press/v32/rezende14.html)

You can read more about VAEs in Chapter 4 of the following book:
- [Tomczak, J.M., "Deep Generative Modeling", Springer, 2022](https://link.springer.com/book/10.1007/978-3-030-93158-2)

In particular, the goals of this assignment are the following:

- Understand how VAEs are formulated
- Implement components of VAEs using PyTorch
- Train and evaluate a model for image data

### Theory behind VAEs

VAEs are latent variable models trained with variational inference. In general, the latent variable models define the following generative process:
\begin{align}
1.\ & \mathbf{z} \sim p_{\lambda}(\mathbf{z}) \\
2.\ & \mathbf{x} \sim p_{\theta}(\mathbf{x}|\mathbf{z})
\end{align}

In plain words, we assume that for observable data $\mathbf{x}$, there are some latent (hidden) factors $\mathbf{z}$. Then, the training objective is log-likelihood function of the following form:
$$
\log p_{\vartheta}(\mathbf{x})=\log \int p_\theta(\mathbf{x} \mid \mathbf{z}) p_\lambda(\mathbf{z}) \mathrm{d} \mathbf{z} .
$$

The problem here is the intractability of the integral if the dependencies between random variables $\mathbf{x}$ and $\mathbf{z}$ are non-linear and/or the distributions are non-Gaussian.

By introducing variational posteriors $q_{\phi}(\mathbf{z}|\mathbf{x})$, we get the following lower bound (the Evidence Lower Bound, ELBO):
$$
\log p_{\vartheta}(\mathbf{x}) \geq \mathbb{E}_{\mathbf{z} \sim q_\phi(\mathbf{z} \mid \mathbf{x})}\left[\log p_\theta(\mathbf{x} \mid \mathbf{z})\right]-\mathrm{KL}\left(q_\phi(\mathbf{z} \mid \mathbf{x}) \| p_\lambda(\mathbf{z})\right) .
$$

Note that we want to *maximize* this objective, therefore, in the code you are going to have to implement NELBO (negative ELBO) as a loss function (i.e., a minimization task).

## IMPORTS

In [None]:
# DO NOT REMOVE!
import os

import numpy as np
import matplotlib.pyplot as plt

import torch

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

import torchvision
from torchvision.datasets import MNIST

In [None]:
# Check if GPU is available and determine the device
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

print(f'The available device is {device}')

The available device is cuda


In [None]:
# mount drive: WE NEED IT FOR SAVING IMAGES!
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# PLEASE CHANGE IT TO YOUR OWN GOOGLE DRIVE!
images_dir = '/content/gdrive/My Drive/DLresults/'

## Auxiliary functions

Let us define some useful log-distributions:

In [None]:
# DO NOT REMOVE
PI = torch.from_numpy(np.asarray(np.pi))
EPS = 1.e-5


def log_categorical(x, p, num_classes=256, reduction=None, dim=None):
    x_one_hot = F.one_hot(x.long(), num_classes=num_classes)
    log_p = x_one_hot * torch.log(torch.clamp(p, EPS, 1. - EPS))
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_bernoulli(x, p, reduction=None, dim=None):
    pp = torch.clamp(p, EPS, 1. - EPS)
    log_p = x * torch.log(pp) + (1. - x) * torch.log(1. - pp)
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_normal_diag(x, mu, log_var, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * log_var - 0.5 * torch.exp(-log_var) * (x - mu)**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_standard_normal(x, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * x**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p

## Implementing VAEs

The goal of this assignment is to implement four classes:
- `Encoder`: this class implements the encoder (variational posterior), $q_{\phi}(\mathbf{z}|\mathbf{x})$.
- `Decoder`: this class implements the decoded (the conditional likelihood), $p_{\theta}(\mathbf{x}|\mathbf{z})$.
- `Prior`: this class implements the marginal over latents (the prior), $p_{\lambda}(\mathbf{z})$.
- `VAE`: this class combines all components.

#### Question 0: (3 pt)
**Fully-connected Neural Networks (MLPs) or Convolutional Neural Networks**

This is not a real question but rather a comment. You are asked to implement your VAE using fully connected neural networks (MLPs) or convolutional neural networks (ConvNets).

There is a difference in grading of this assignment based on your decision:
- **If you decide to implement your VAE with MLPs and the model works properly, you get 1 pt.**
- **If you decide to implement your VAE with ConvNets and the model works properly, you get 3 pts.**

### Encoder
We start with `Encoder`. Please remember that we assume the Gaussian variational posterior with a diagonal covariance matrix.

Feel free to add other methods to the class as well as arguments to the class initialization.

In [None]:
class CNNEncoder(nn.Module):
    def __init__(self, input_channels, hidden_channels, z_dim):
        super(CNNEncoder, self).__init__()
        # Convolutional layers
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, hidden_channels, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, hidden_channels * 2, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )
        # Fully connected layers
        self.flatten_size = hidden_channels * 2 * 7 * 7
        self.fc_mu = nn.Linear(self.flatten_size, z_dim)
        self.fc_log_var = nn.Linear(self.flatten_size, z_dim)

    def reparameterization(self, mu, log_var):
        # Sample z using the reparameterization trick
        std = torch.exp(0.5 * log_var)
        epsilon = torch.randn_like(std)
        return mu + epsilon * std

    def forward(self, x):
        """
        Forward pass of the encoder.
        x: torch.tensor, with shape (batch_size, input_channels, height, width)
        Returns: mu, log_var
        """
        x = self.encoder(x).view(x.size(0), -1)  # Flatten
        mu = self.fc_mu(x)
        log_var = self.fc_log_var(x)
        return mu, log_var

    def sample(self, x=None, mu=None, log_var=None):
        """
        Sample from the encoder. If `x` is provided, compute `mu` and `log_var`.
        """
        if x is not None:
            mu, log_var = self.forward(x)
        return self.reparameterization(mu, log_var)


Please answer the following questions:

*italicized text*
#### Question 1 (0.5 pt)

Please explain the reparameterization trick and provide a mathematical formula.

ANSWER: The encoder of a variational autoencoder maps input X to a latent distribution. Its outputs are the parameters to a gaussian distribution: The mean ($μ$) and variance ($\sigma$). During training a latent vector $z$ is sampled from this distribution. $$ z \sim N(μ, σ^2) $$

However, the sampling introduces randomness and the randomness does not allow us to compute the derivatives to update the network weights. So instead of sampling from this unknown distribution $N(μ, σ^2)$ we sample from a distriution that we do know: $ ϵ \sim N(0, 1)$. We can than reparameterize the sample that we drew from the standard normal distribution using the μ and σ from the unknown distribution with the following formula: $z = μ + ϵ * σ$. By doing this we have isolated the randomness in epsilon which is independent of the model and μ and σ are now deterministic variables.

After sampling a latent representation $z$ from $q(z|x)$ with specific mean vector $\mu$ and covariance matrix $\Sigma$, we can test how likely said $z$ is under $q(z|x)$ by passing it through the probability density function of the multivariate Gaussian $q(z|x)$. The probability density function is:

$$
p(z|x) = \frac{1}{\sqrt{(2\pi)^d |\Sigma|}} \exp\left(-\frac{1}{2} (z - \mu)^T \Sigma^{-1} (z - \mu)\right),
$$

where $d$ is the dimensionality of $z$, $|\Sigma|$ is the determinant of the covariance matrix $\Sigma$, and $\Sigma^{-1}$ is the inverse of the covariance matrix.

Taking the logarithm, we arrive at:

$$
\log p(z|x) = -\frac{d}{2} \log(2\pi) - \frac{1}{2} \log |\Sigma| - \frac{1}{2} (z - \mu)^T \Sigma^{-1} (z - \mu).
$$



### Decoder

The decoder is the conditional likelihood, i.e., $p(\mathbf{x}|\mathbf{z})$. Please remember that we must decide on the form of the distribution (e.g., Bernoulli, Gaussian, Categorical). Please discuss it with a TA or a lecturer if you are in doubt.

In [None]:
class CNNDecoder(nn.Module):
    def __init__(self, z_dim, hidden_channels, output_channels, output_size):
        super(CNNDecoder, self).__init__()
        self.hidden_channels = hidden_channels
        self.fc = nn.Linear(z_dim, hidden_channels * 7 * 7)

        # Deconvolutional layers
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(hidden_channels, hidden_channels // 2, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(hidden_channels // 2, output_channels, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid()  # Output in range [0, 1] for Bernoulli
        )

    def sample(self, z):
        """
        For a given latent code, compute Bernoulli probabilities and sample x ~ p(x|z).
        """
        x = self.fc(z).view(-1, self.hidden_channels, 7, 7)
        x = self.decoder(x)
        return torch.bernoulli(x)  # Sample binary data from Bernoulli distribution

    def forward(self, z, x):
        """
        Compute the log probability: log p(x|z).
        """
        x_reconstructed = self.fc(z).view(-1, self.hidden_channels, 7, 7)
        x_reconstructed = self.decoder(x_reconstructed)

        # Compute log-probability using binary cross-entropy
        log_prob = -F.binary_cross_entropy(x_reconstructed, x, reduction="none")
        log_prob = log_prob.sum(dim=[1, 2, 3])  # Sum over all dimensions except batch

        return log_prob


Please answer the following questions:

#### Question 3 (0.5 pt)

Please explain your choice of the distribution for image data used in this assignment. Additionally, please write it down mathematically (if you think that presenting it as the log-probability, then please do it).

ANSWER:

Since the MNIST dataset is grayscale, our data is continuous due to the fact that values can range from 0 to 1. Because of this, we reasoned that a Gaussian distribution might be a fitting distribution, as compared to other discrete distributions such as Bernoulli. The equation for the log probability given our chosen conditional likelihood estimator is based on the PDF of the multivariate Gaussian distribution. However, we interchange $x$ and $z$ as compared to our variational posterior.

$$
p(x|z) = \frac{1}{(2\pi)^{d/2} |\Sigma(z)|^{1/2}} \exp\left(-\frac{1}{2} (x - \mu(z))^T \Sigma(z)^{-1} (x - \mu(z))\right).
$$

In order to get the log-probability, we take the log on both sides, arriving at:

$$
\log p(x|z) = -\frac{d}{2} \log(2\pi) - \frac{1}{2} \log |\Sigma(z)| - \frac{1}{2} (x - \mu(z))^T \Sigma(z)^{-1} (x - \mu(z)).
$$

Note that, in our code, the formula for the log-probability looks slightly different. The difference arises because the code assumes a diagonal covariance matrix, simplifying the multivariate Gaussian's quadratic term and determinant. Specifically, the quadratic term $(x - \mu)^T \Sigma^{-1} (x - \mu)$ reduces to an element-wise operation $\frac{(x - \mu)^2}{\sigma^2}$, and $\log |\Sigma|$ simplifies to $\sum \log(\sigma^2)$, making computations more efficient.


#### Question 4 (0.5 pt)

Please explain how one can sample from the distribution chosen by you. Please be specific and formal (i.e., provide mathematical formulae). If applicable, please provide a code snippet.

ANSWER:

Just like when sampling $z$ from the variational posterior, we again utilize the reparametrization trick, meaning we first sample an instance $x$ from our conditional likelihood distribution, the parameters of which are given by a forward pass through the decoder model. We then express $x$ as conditioned on $z$ by making it dependent on the mean and variance of the conditional likelihood:

$$
x = \mu (z) + \sigma (z) \cdot ϵ'
$$

where $$\epsilon' \sim N(0, 1).$$



### Prior

The prior is the marginal distribution over latent variables, i.e., $p(\mathbf{z})$. It plays a crucial role in the generative process and also in synthesizing images of a better quality.

In this assignment, you are asked to implement a prior that is learnable (e.g., parameterized by neural networks). If you decide to implement the standard Gaussian prior only, then please be aware that you will not get any points.


For the learnable prior you can choose one of the following options:


*   Mixture of Gaussians
*   Normalizing Flow


In [None]:
# YOUR CODE GOES HERE
# NOTES:
# (i) Implementing the standard Gaussian prior does not give you any points!
# (ii) The function "sample" must be implemented.
# (iii) The function "forward" must return the log-probability, i.e., log p(z)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
from torch.distributions.transforms import Transform, ComposeTransform

class PlanarFlow(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.u = nn.Parameter(torch.randn(1, dim))
        self.w = nn.Parameter(torch.randn(1, dim))
        self.b = nn.Parameter(torch.randn(1))
        self.scale = nn.Parameter(torch.ones(1))

    def forward(self, z):
        """Forward transform with improved stability."""
        linear = F.linear(z, self.w, self.b)
        return z + self.scale * (self.u * torch.tanh(linear))

    def log_abs_det_jacobian(self, z):
        """Directly compute log det Jacobian without inverse."""
        linear = F.linear(z, self.w, self.b)
        psi = (1.0 - torch.tanh(linear).pow(2)) * self.w
        det = 1.0 + self.scale * torch.sum(self.u * psi, dim=-1)
        return torch.log(torch.abs(det) + 1e-6)

class FlowPrior(nn.Module):
    def __init__(self, z_dim=2, num_flows=5, device='cuda'):
        super().__init__()
        self.z_dim = z_dim
        self.device = device
        self.flows = nn.ModuleList([PlanarFlow(z_dim) for _ in range(num_flows)])
        self.base_dist = Normal(torch.zeros(z_dim).to(device), torch.ones(z_dim).to(device))

    def sample(self, batch_size):
        """Draw z0 from a standard Normal, then transform it: zK = f(...f(z0))."""
        z = self.base_dist.sample((batch_size,))
        for flow in self.flows:
            z = flow(z)
        return z

    def forward(self, z):
        """More efficient forward pass using direct computation."""
        log_det_sum = 0
        for flow in reversed(self.flows):
            log_det_sum += flow.log_abs_det_jacobian(z)
            z = flow(z)
        return self.base_dist.log_prob(z).sum(-1) + log_det_sum

#### Question 5 (2 pts max)

**Option 1 (0 pt):  Standard Gaussian**

**NOTE: *If you decide to use the standard Gaussian prior, please indicate it in your answer. However, you will get 0 pt for this question.***

**Option 2 (0.5 pt): Mixture of Gaussains**

Please do the following:
- (0.25 pt) Please explain your prior and write it down mathematically
- (0.15 pt) Please write down its sampling procedure (if necessary, please add a code snippet).
- (0.1 pt) Please write down its log-probability (a mathematical formula).

**Option 3 (2 pts): Normalizing Flow**

Please do the following:
- (1 pt) Please explain your prior and write it down mathematically
- (0.5 pt) Please write down its sampling procedure (if necessary, please add a code snippet).
- (0.5 pt) Please write down its log-probability (a mathematical formula).

Our prior is a normalizing flow that sarts from a base distribution of $z_0 \sim N(0, I)$ in $\mathbb{R}^d$ and sequentially applies a chain of invertible transformations $f_1, f_2, \ldots, f_K$, so that the final variable $z=f_K \circ f_{K-1} \circ \cdots \circ f_1\left(z_0\right)$ can model a more flexible distribution.

To sample from this prior distribution, we first draw $z_0$ from the base normal distribution, then apply each transformation in order $z=f_K\left(f_{K-1}\left(\ldots f_1\left(z_0\right) \ldots\right)\right)$, to eventually return $z$. The PyTorch code snippet would look like,

```python
    def sample(self, batch_size):
        """
        Draw z0 from a standard Normal, then transform it: zK = f(...f(z0)).
        """
        z0 = self.base_dist.sample((batch_size,))  # shape [batch_size, z_dim]
        z = z0
        for transform in self.flow_transforms:
            z = transform(z)  # apply forward transform
        return z
```

The log-density of any point $z$ under the flow is,

$$
\log p(z)=\log p\left(z_0\right) \sum_{k=1}^K \log \left|\operatorname{det} \frac{\partial f_k}{\partial z_{k-1}}\right|, \\
$$

where $z_0=f_1^{-1}(f_2^{-1}(\ldots f_K^{-1}(z) \ldots))$.

### Complete VAE

The last class is `VAE` tha combines all components. Please remember that this class must implement the **Negative ELBO** in `forward`, as well as `sample` (*hint*: it is a composition of `sample` functions from the prior and the decoder).

In [None]:
# YOUR CODE GOES HERE
# This class combines Encoder, Decoder and Prior.
# NOTES:
# (i) The function "sample" must be implemented.
# (ii) The function "forward" must return the negative ELBO. Please remember to add an argument "reduction" that is either "mean" or "sum".

import torch
import torch.nn as nn
import torch.nn.functional as F
from math import pi

class VAE(nn.Module):
    def __init__(self, encoder, decoder, prior, device='cuda'):
        """
        encoder: CNNEncoder instance
            Must implement forward(x) -> (mu, logvar)
        decoder: CNNDecoder instance
            Must implement forward(z, x) -> log p(x|z) and sample(z) -> x_sample
        prior: FlowPrior (or another prior) instance
            Must implement sample(batch_size) -> z and forward(z) -> log p(z)
        """
        super(VAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.prior = prior
        self.device = device

    def sample(self, batch_size=64):
        """
        1) Sample z ~ prior
        2) Decode x ~ p(x|z)
        """
        with torch.no_grad():
            # Sample latent codes from the flow prior
            z = self.prior.sample(batch_size)  # shape [batch_size, z_dim]
            # Use the decoder's 'sample' method to sample x
            x_sample = self.decoder.sample(z)  # shape [batch_size, output_channels, H, W]
        return x_sample

    def forward(self, x, reduction='mean'):
        """
        Compute the Negative ELBO = -E_{q(z|x)}[log p(x|z)] + KL(q(z|x) || p(z)).

        Input:
            x: input images, shape [batch_size, input_channels, H, W]
            reduction: 'mean' or 'sum'
        Returns:
            nelbo: negative ELBO, either averaged or summed over batch.
        """
        batch_size = x.size(0)

        # Encode x -> (mu, logvar)
        mu, logvar = self.encoder(x)  # each shape [batch_size, z_dim]

        # Reparameterize to get z ~ q(z|x)
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + std * eps  # shape [batch_size, z_dim]

        # Reconstruction term: - E_{q(z|x)} [ log p(x|z) ]
        #    We can use the decoder's forward pass, which returns log p(x|z).
        log_p_x_given_z = self.decoder(z, x)  # shape [batch_size]
        recon_loss = -log_p_x_given_z  # negative log-likelihood

        # KL term: KL[q(z|x) || p(z)]
        #    = E_{q(z|x)} [ log q(z|x) - log p(z) ]
        #
        #    log q(z|x) for a diagonal Gaussian:
        #    = sum_{i=1}^d -0.5 [ log(2π) + log σ_i^2 + (z_i - μ_i)^2 / σ_i^2 ]
        #
        #    log p(z) is from the FlowPrior, which computes log p(z) = log p(z0) + ...
        # Compute log q(z|x)
        z_dim = mu.shape[1]
        log_q_z_given_x = -0.5 * (
            z_dim * torch.log(torch.tensor(2.0 * pi)) +
            torch.sum(logvar, dim=1) +
            torch.sum((z - mu)**2 / torch.exp(logvar), dim=1)
        )

        # Compute log p(z)
        log_p_z = self.prior(z)  # shape [batch_size]

        # KL = E_{q(z|x)} [ log q(z|x) - log p(z) ]
        kl = log_q_z_given_x - log_p_z

        # Negative ELBO per sample
        #    NELBO = recon_loss + kl
        nelbo = recon_loss + kl  # shape [batch_size]

        # Reduction
        if reduction == 'sum':
            return nelbo.sum()
        else:
            return nelbo.mean()

#### Question 6 (0.5 pt)

Please write down mathematically the **Negative ELBO** and provide a code snippet.

$$
\text{NELBO}
= - \mathbb{E}_{q(z \mid x)} \bigl[ \log p(x \mid z) \bigr]
	⋅	\mathrm{KL}\bigl(q(z \mid x) \,\|\, p(z)\bigr)
$$

### Evaluation and training functions

**Please do not remove or modify them.**

In [None]:
# DO NOT REMOVE

def evaluation(test_loader, name=None, model_best=None, epoch=None):
    # EVALUATION
    if model_best is None:
        # load best performing model
        model_best = torch.load(name + '.model')

    model_best.eval()
    loss = 0.
    N = 0.
    for indx_batch, (test_batch, _) in enumerate(test_loader):
        test_batch = test_batch.to(device)
        loss_t = model_best.forward(test_batch, reduction='sum')
        loss = loss + loss_t.item()
        N = N + test_batch.shape[0]
    loss = loss / N

    if epoch is None:
        print(f'FINAL LOSS: nll={loss}')
    else:
        print(f'Epoch: {epoch}, val nll={loss}')

    return loss


def samples_real(name, test_loader, shape=(28,28)):
    # real images-------
    num_x = 4
    num_y = 4
    x, _ = next(iter(test_loader))
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name+'_real_images.pdf', bbox_inches='tight')
    plt.close()


def samples_generated(name, data_loader, shape=(28,28), extra_name=''):
    x, _ = next(iter(data_loader))
    x = x.to('cpu').detach().numpy()

    # generations-------
    model_best = torch.load(name + '.model')
    model_best.eval()

    num_x = 4
    num_y = 4
    x = model_best.sample(num_x * num_y)
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name + '_generated_images' + extra_name + '.pdf', bbox_inches='tight')
    plt.close()


def plot_curve(name, nll_val):
    plt.plot(np.arange(len(nll_val)), nll_val, linewidth='3')
    plt.xlabel('epochs')
    plt.ylabel('nll')
    plt.savefig(name + '_nll_val_curve.pdf', bbox_inches='tight')
    plt.close()

In [None]:
# DO NOT REMOVE

def training(name, max_patience, num_epochs, model, optimizer, training_loader, val_loader, shape=(28,28)):
    nll_val = []
    best_nll = 1000.
    patience = 0

    # Main loop
    for e in range(num_epochs):
        print(f"e - {e}")
        # TRAINING
        model.train()
        for indx_batch, (batch, _) in enumerate(training_loader):
            batch = batch.to(device)
            loss = model.forward(batch, reduction='mean')
            if indx_batch % 100 == 0:
              print(f"e - {e}, batch - {indx_batch}, loss - {loss.item()}")

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation
        loss_val = evaluation(val_loader, model_best=model, epoch=e)
        print(f"e - {e}, val loss - {loss_val}")
        nll_val.append(loss_val)  # save for plotting

        if e == 0:
            print('saved!')
            torch.save(model, name + '.model')
            best_nll = loss_val
        else:
            if loss_val < best_nll:
                print('saved!')
                torch.save(model, name + '.model')
                best_nll = loss_val
                patience = 0

                samples_generated(name, val_loader, shape=shape, extra_name="_epoch_" + str(e))
            else:
                patience = patience + 1

        if patience > max_patience:
            break

    nll_val = np.asarray(nll_val)

    return nll_val

### Setup

**NOTE: *Please comment your code! Especially if you introduce any new variables (e.g., hyperparameters).***

In the following cells, we define `transforms` for the dataset. Next, we initialize the data, a directory for results and some fixed hyperparameters.

In [None]:
# PLEASE DEFINE APPROPRIATE TRANFORMS FOR THE DATASET
from torchvision import transforms

# NEED TO CHANGE TO PROPER TRANSFORMS

transforms_train = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL Image to tensor
])

transforms_test = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL Image to tensor
])



```
# This is formatted as code
```

Please do not modify the code in the next cell.

In [None]:
# DO NOT REMOVE
#-dataset
dataset = MNIST('/files/', train=True, download=True,
                      transform=transforms_train
                )

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [50000, 10000], generator=torch.Generator().manual_seed(14))

test_dataset = MNIST('/files/', train=False, download=True,
                      transform=transforms_test
                     )
#-dataloaders
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

#-creating a dir for saving results
name = 'vae'
result_dir = os.path.join(images_dir, 'results', name)
if not(os.path.exists(result_dir)):
    os.mkdir(result_dir)

#-hyperparams (please do not modify them for the final report)
num_epochs = 1000 # max. number of epochs
max_patience = 20 # an early stopping is used, if training doesn't improve for longer than 20 epochs, it is stopped

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to /files/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9.91M/9.91M [00:00<00:00, 18.1MB/s]


Extracting /files/MNIST/raw/train-images-idx3-ubyte.gz to /files/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to /files/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28.9k/28.9k [00:00<00:00, 506kB/s]


Extracting /files/MNIST/raw/train-labels-idx1-ubyte.gz to /files/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to /files/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1.65M/1.65M [00:00<00:00, 4.58MB/s]


Extracting /files/MNIST/raw/t10k-images-idx3-ubyte.gz to /files/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to /files/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4.54k/4.54k [00:00<00:00, 5.16MB/s]


Extracting /files/MNIST/raw/t10k-labels-idx1-ubyte.gz to /files/MNIST/raw



In the next cell, please initialize the model. Please remember about commenting your code!

In [None]:
# Instantiate the components
encoder = CNNEncoder(input_channels=1, hidden_channels=32, z_dim=4).to(device) # Move encoder to device
decoder = CNNDecoder(z_dim=4, hidden_channels=32, output_channels=1, output_size=28).to(device) # Move decoder to device
flow_prior = FlowPrior(z_dim=4, num_flows=5, device=device).to(device) # Move prior to device

# Combine into a VAE model
model = VAE(encoder=encoder, decoder=decoder, prior=flow_prior, device=device).to(device) # Move VAE to device
model

VAE(
  (encoder): CNNEncoder(
    (encoder): Sequential(
      (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (1): ReLU()
      (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (3): ReLU()
    )
    (fc_mu): Linear(in_features=3136, out_features=4, bias=True)
    (fc_log_var): Linear(in_features=3136, out_features=4, bias=True)
  )
  (decoder): CNNDecoder(
    (fc): Linear(in_features=4, out_features=1568, bias=True)
    (decoder): Sequential(
      (0): ConvTranspose2d(32, 16, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (1): ReLU()
      (2): ConvTranspose2d(16, 1, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
      (3): Sigmoid()
    )
  )
  (prior): FlowPrior(
    (flows): ModuleList(
      (0-4): 5 x PlanarFlow()
    )
  )
)

Please initialize the optimizer

In [None]:
# PLEASE DEFINE YOUR OPTIMIZER
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer

Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)

#### Question 7 (0.5 pt)

Please explain the choice of the optimizer, and comment on the choice of the hyperparameters (e.g., the learing reate value).

ANSWER: We chose Adam as our optimizer because it dynamically adapts learning rates for each parameter making the training more stable and efficient. VAEs optimize both the reconstruction loss and KL divergence which can have differing magnitudes and gradients. Adam balances these components by smoothing noisy gradients using its momentum terms and scaling updates adaptively. This is important due to the stochastic nature of VAEs introduced by the reparameterization trick. Furthermore, Adam performs well in non-stationary loss landscapes and requires minimal hyperparameter tuning, making it a good choice for VAEs.



### Training and final evaluation

In the following two cells, we run the training and the final evaluation.

In [None]:
# DO NOT REMOVE OR MODIFY
# Training procedure
nll_val = training(name=result_dir + name, max_patience=max_patience,
                   num_epochs=num_epochs, model=model, optimizer=optimizer,
                   training_loader=train_loader, val_loader=val_loader,
                   shape=(28,28))

e - 0
e - 0, batch - 0, loss - 563.719482421875
e - 0, batch - 100, loss - 217.77554321289062
e - 0, batch - 200, loss - 193.1779022216797
e - 0, batch - 300, loss - 182.01242065429688
e - 0, batch - 400, loss - 155.80377197265625
e - 0, batch - 500, loss - 177.58950805664062
e - 0, batch - 600, loss - 166.4644317626953
e - 0, batch - 700, loss - 161.02545166015625
e - 0, batch - 800, loss - 172.87841796875
e - 0, batch - 900, loss - 153.3182830810547
e - 0, batch - 1000, loss - 159.89297485351562
e - 0, batch - 1100, loss - 160.169189453125
e - 0, batch - 1200, loss - 167.84274291992188
e - 0, batch - 1300, loss - 137.34913635253906
e - 0, batch - 1400, loss - 138.3892364501953
e - 0, batch - 1500, loss - 144.32330322265625
Epoch: 0, val nll=149.1629515625
e - 0, val loss - 149.1629515625
saved!
e - 1
e - 1, batch - 0, loss - 153.97715759277344
e - 1, batch - 100, loss - 161.44317626953125
e - 1, batch - 200, loss - 153.38687133789062
e - 1, batch - 300, loss - 144.59136962890625
e - 

  model_best = torch.load(name + '.model')


e - 2
e - 2, batch - 0, loss - 147.32858276367188
e - 2, batch - 100, loss - 143.14723205566406
e - 2, batch - 200, loss - 134.77880859375
e - 2, batch - 300, loss - 139.90391540527344
e - 2, batch - 400, loss - 146.57115173339844
e - 2, batch - 500, loss - 135.63015747070312
e - 2, batch - 600, loss - 140.24977111816406
e - 2, batch - 700, loss - 145.89456176757812
e - 2, batch - 800, loss - 142.03213500976562
e - 2, batch - 900, loss - 148.29544067382812
e - 2, batch - 1000, loss - 137.85372924804688
e - 2, batch - 1100, loss - 143.29632568359375
e - 2, batch - 1200, loss - 124.9944839477539
e - 2, batch - 1300, loss - 155.89694213867188
e - 2, batch - 1400, loss - 154.27859497070312
e - 2, batch - 1500, loss - 127.33628845214844
Epoch: 2, val nll=140.27544786376953
e - 2, val loss - 140.27544786376953
saved!
e - 3
e - 3, batch - 0, loss - 145.2783966064453
e - 3, batch - 100, loss - 130.11331176757812
e - 3, batch - 200, loss - 125.09447479248047
e - 3, batch - 300, loss - 130.85833

In [None]:
# DO NOT REMOVE OR MODIFY
# Final evaluation
test_loss = evaluation(name=result_dir + name, test_loader=test_loader)
f = open(result_dir + name + '_test_loss.txt', "w")
f.write(str(test_loss))
f.close()

samples_real(result_dir + name, test_loader)
samples_generated(result_dir + name, test_loader, extra_name='_FINAL')

plot_curve(result_dir + name, nll_val)

### Results and discussion

After a successful training of your model, we would like to ask you to present your data and analyze it. Please answer the following questions.


#### Question 8 (1 pt)

Please select the real data, and the final generated data and include them in this report. Please comment on the following:
- (0.5 pt) Do you think the model was trained properly by looking at the generations? Please motivate your answer well.
- (0.5 pt) What are potential problems with evaluating a generative model by looking at generated data? How can we evalute generative models (ELBO or NLL do not count as an answer)?

ANSWER: [Please fill in]

#### Question 9 (1.25 pt)

Please include the plot of the negative ELBO. Please comment on the following:
- (0.25 pt) Is the training of your VAE stable or unstable? Why?
- (1 pt) What is the influence of the optimizer on your model? Do the hyperparameter values of the optimizer important and how do they influence the training? Motivate well your answer (e.g., run the script with more than one learning rate and present two plots here).

ANSWER: [Please fill in]

# Grading (10pt)

- Question 0: 3pt
- Question 1: 0.5pt
- Question 2: 0.25pt
- Question 3: 0.5pt
- Question 4: 0.5pt
- Question 5: 2pt
- Question 6: 0.5pt
- Question 7: 0.5pt
- Question 8: 1pt
- Question 9: 1.25pt