# Assignment 4a - Variational Auto-Encoders
## Deep Learning Course - Vrije Universiteit Amsterdam, 2022

#### Instructions on how to use this notebook:

This notebook is hosted on Google Colab. To be able to work on it, you have to create your own copy. Go to *File* and select *Save a copy in Drive*.

You can also avoid using Colab entirely, and download the notebook to run it on your own machine. If you choose this, go to *File* and select *Download .ipynb*.

The advantage of using Colab is that you can use a GPU. You can complete this assignment with a CPU, but it will take a bit longer. Furthermore, we encourage you to train using the GPU not only for faster training, but also to get experience with this setting. This includes moving models and tensors to the GPU and back. This experience is very valuable because for various models and large datasets (like large CNNs for ImageNet, or Transformer models trained on Wikipedia), training on GPU is the only feasible way.

The default Colab runtime does not have a GPU. To change this, go to *Runtime - Change runtime type*, and select *GPU* as the hardware accelerator. The GPU that you get changes according to what resources are available at the time, and its memory can go from a 5GB, to around 18GB if you are lucky. If you are curious, you can run the following in a code cell to check:

```sh
!nvidia-smi
```

Note that despite the name, Google Colab does  not support collaborative work without issues. When two or more people edit the notebook concurrently, only one version will be saved. You can choose to do group programming with one person sharing the screen with the others, or make multiple copies of the notebook to work concurrently.

**Submission:** Upload your notebook in .ipynb format to Canvas. The code and answers to the questions in the notebook are sufficient, no separate report is expected. 

In [1]:
!nvidia-smi

Fri Dec 20 19:37:38 2024       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 561.17                 Driver Version: 561.17         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 4060 ...  WDDM  |   00000000:01:00.0 Off |                  N/A |
| N/A   45C    P8              5W /   43W |       0MiB /   8188MiB |      7%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

## Introduction

In this assignment, we are going to implement a Variational Auto-Encoder (VAE). A VAE is a likelihood-based deep generative model that consists of a stochastic encoder (a variational posterior over latent variables), a stochastic decoder, and a marginal distribution over latent variables (a.k.a. a prior). The model was originally proposed in two concurrent papers:
- [Kingma, D. P., & Welling, M. (2013). Auto-encoding variational bayes. arXiv preprint arXiv:1312.6114.](https://arxiv.org/abs/1312.6114)
- [Rezende, Danilo Jimenez, Shakir Mohamed, and Daan Wierstra. "Stochastic backpropagation and approximate inference in deep generative models." International conference on machine learning. PMLR, 2014.](https://proceedings.mlr.press/v32/rezende14.html)

You can read more about VAEs in Chapter 4 of the following book:
- [Tomczak, J.M., "Deep Generative Modeling", Springer, 2022](https://link.springer.com/book/10.1007/978-3-030-93158-2)

In particular, the goals of this assignment are the following:

- Understand how VAEs are formulated
- Implement components of VAEs using PyTorch
- Train and evaluate a model for image data

### Theory behind VAEs

VAEs are latent variable models trained with variational inference. In general, the latent variable models define the following generative process:
\begin{align}
1.\ & \mathbf{z} \sim p_{\lambda}(\mathbf{z}) \\
2.\ & \mathbf{x} \sim p_{\theta}(\mathbf{x}|\mathbf{z})
\end{align}

In plain words, we assume that for observable data $\mathbf{x}$, there are some latent (hidden) factors $\mathbf{z}$. Then, the training objective is log-likelihood function of the following form:
$$
\log p_{\vartheta}(\mathbf{x})=\log \int p_\theta(\mathbf{x} \mid \mathbf{z}) p_\lambda(\mathbf{z}) \mathrm{d} \mathbf{z} .
$$

The problem here is the intractability of the integral if the dependencies between random variables $\mathbf{x}$ and $\mathbf{z}$ are non-linear and/or the distributions are non-Gaussian.

By introducing variational posteriors $q_{\phi}(\mathbf{z}|\mathbf{x})$, we get the following lower bound (the Evidence Lower Bound, ELBO):
$$
\log p_{\vartheta}(\mathbf{x}) \geq \mathbb{E}_{\mathbf{z} \sim q_\phi(\mathbf{z} \mid \mathbf{x})}\left[\log p_\theta(\mathbf{x} \mid \mathbf{z})\right]-\mathrm{KL}\left(q_\phi(\mathbf{z} \mid \mathbf{x}) \| p_\lambda(\mathbf{z})\right) .
$$

Note that we want to *maximize* this objective, therefore, in the code you are going to have to implement NELBO (negative ELBO) as a loss function (i.e., a minimization task). 

## IMPORTS

In [2]:
# DO NOT REMOVE!
import os
import math
import numpy as np
import matplotlib.pyplot as plt

import torch

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

import torchvision
from torchvision.datasets import MNIST

In [3]:
# Check if GPU is available and determine the device
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

print(f'The available device is {device}')

The available device is cpu


In [4]:
# mount drive: WE NEED IT FOR SAVING IMAGES!
# from google.colab import drive
# drive.mount('/content/gdrive')

In [5]:
# PLEASE CHANGE IT TO YOUR OWN GOOGLE DRIVE!
# images_dir = '/content/gdrive/My Drive/Colab Notebooks/Results/'

## Auxiliary functions

Let us define some useful log-distributions:

In [6]:
# DO NOT REMOVE
PI = torch.from_numpy(np.asarray(np.pi))
EPS = 1.e-5


def log_categorical(x, p, num_classes=256, reduction=None, dim=None):
    x_one_hot = F.one_hot(x.long(), num_classes=num_classes)
    log_p = x_one_hot * torch.log(torch.clamp(p, EPS, 1. - EPS))
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_bernoulli(x, p, reduction=None, dim=None):
    pp = torch.clamp(p, EPS, 1. - EPS)
    log_p = x * torch.log(pp) + (1. - x) * torch.log(1. - pp)
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_normal_diag(x, mu, log_var, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * log_var - 0.5 * torch.exp(-log_var) * (x - mu)**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_standard_normal(x, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * x**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p

## Implementing VAEs

The goal of this assignment is to implement four classes:
- `Encoder`: this class implements the encoder (variational posterior), $q_{\phi}(\mathbf{z}|\mathbf{x})$.
- `Decoder`: this class implements the decoded (the conditional likelihood), $p_{\theta}(\mathbf{x}|\mathbf{z})$.
- `Prior`: this class implements the marginal over latents (the prior), $p_{\lambda}(\mathbf{z})$.
- `VAE`: this class combines all components.

#### Question 0: (3 pt) 
**Fully-connected Neural Networks (MLPs) or Convolutional Neural Networks**

This is not a real question but rather a comment. You are asked to implement your VAE using fully connected neural networks (MLPs) or convolutional neural networks (ConvNets). 

There is a difference in grading of this assignment based on your decision:
- **If you decide to implement your VAE with MLPs and the model works properly, you get 1 pt.**
- **If you decide to implement your VAE with ConvNets and the model works properly, you get 3 pts.**

### Encoder
We start with `Encoder`. Please remember that we assume the Gaussian variational posterior with a diagonal covariance matrix.

Feel free to add other methods to the class as well as arguments to the class initialization.

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self, z_dim):
        super(Encoder, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1)    # Output: (32, 14, 14)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)   # Output: (64, 7, 7)
        # Fully connected layers for mean and log variance
        self.fc_mu = nn.Linear(64 * 7 * 7, z_dim)
        self.fc_log_var = nn.Linear(64 * 7 * 7, z_dim)

    def encode(self, x):
        h = F.relu(self.conv1(x))    # Shape: (batch_size, 32, 14, 14)
        h = F.relu(self.conv2(h))    # Shape: (batch_size, 64, 7, 7)
        h = h.view(h.size(0), -1)    # Flatten: (batch_size, 64*7*7)
        mu = self.fc_mu(h)           # Shape: (batch_size, z_dim)
        log_var = self.fc_log_var(h) # Shape: (batch_size, z_dim)
        return mu, log_var

    @staticmethod
    def reparameterization(mu, log_var):
        std = torch.exp(0.5 * log_var)
        epsilon = torch.randn_like(std)
        z = mu + std * epsilon
        return z


Please answer the following questions:


#### Question 1 (0.5 pt)

Please explain the reparameterization trick and provide a mathematical formula.

The **reparameterization trick** is a technique used in Variational
Autoencoders (VAEs) to enable backpropagation through stochastic
variables when optimizing the model with gradient-based methods. In
VAEs, we sample latent variables $\mathbf{z}$ from a probability
distribution $q_{\phi}(\mathbf{z}|\mathbf{x})$ parameterized by the
encoder's output. Directly sampling $\mathbf{z}$ makes the operation
non-differentiable with respect to the network parameters $\phi$,
hindering the learning process.

To address this, the reparameterization trick expresses the random
variable $\mathbf{z}$ as a deterministic function of $\boldsymbol{\mu}$,
$\boldsymbol{\sigma}$, and an auxiliary noise variable
$\boldsymbol{\epsilon}$ that is independent of $\phi$. Specifically, for
a Gaussian distribution, the trick is formulated as:

$$\mathbf{z} = \boldsymbol{\mu} + \boldsymbol{\sigma} \odot \boldsymbol{\epsilon}$$

where:

\- $\boldsymbol{\mu}$ is the mean vector output by the encoder
network. - $\boldsymbol{\sigma}$ is the standard deviation vector,
computed as
$\boldsymbol{\sigma} = \exp\left(\tfrac{1}{2} \log \boldsymbol{\sigma}^2\right)$. -
$\boldsymbol{\epsilon}$ is a noise vector sampled from a standard normal
distribution,
$\boldsymbol{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})$. -
$\odot$ denotes element-wise (Hadamard) multiplication.

By reparameterizing $\mathbf{z}$ in this manner, we transform the
sampling operation into a differentiable function with respect to
$\boldsymbol{\mu}$ and $\boldsymbol{\sigma}$. This allows gradients to
flow through $\mathbf{z}$ during backpropagation, enabling the
optimization of the encoder parameters $\phi$ using standard gradient
descent methods.


#### Question 2 (0.25 pt)

Please write down mathematically the log-probability of the encoder (variational posterior).

To mathematically express the **log-probability** of the encoder
(variational posterior) $q_\phi(\mathbf{z}|\mathbf{x})$ in a Variational
Autoencoder (VAE), we assume that $q_\phi(\mathbf{z}|\mathbf{x})$ is a
multivariate Gaussian distribution with a diagonal covariance matrix.
The log-probability is given by:

$$\log q_\phi(\mathbf{z}|\mathbf{x}) = -\frac{1}{2} \sum_{i=1}^d \left[ \log(2\pi \sigma_i^2) + \frac{(z_i - \mu_i)^2}{\sigma_i^2} \right]$$

where:

\- $d$ is the dimensionality of the latent space. -
$\mathbf{z} = [z_1, z_2, \dots, z_d]$ is the latent variable vector. -
$\boldsymbol{\mu} = [\mu_1, \mu_2, \dots, \mu_d]$ is the mean vector
output by the encoder network for input $\mathbf{x}$. -
$\boldsymbol{\sigma}^2 = [\sigma_1^2, \sigma_2^2, \dots, \sigma_d^2]$ is
the variance vector (diagonal elements of the covariance matrix).

This expression can also be rewritten as:

$$\log q_\phi(\mathbf{z}|\mathbf{x}) = -\frac{d}{2} \log(2\pi) - \frac{1}{2} \sum_{i=1}^d \left[ \log \sigma_i^2 + \frac{(z_i - \mu_i)^2}{\sigma_i^2} \right]$$

This formula computes the log-probability of the latent variable
$\mathbf{z}$ under the variational posterior distribution given the
input $\mathbf{x}$. It accounts for each dimension $i$ of the latent
space by summing over the individual Gaussian components, assuming
independence between latent dimensions.


### Decoder

The decoder is the conditional likelihood, i.e., $p(\mathbf{x}|\mathbf{z})$. Please remember that we must decide on the form of the distribution (e.g., Bernoulli, Gaussian, Categorical). Please discuss it with a TA or a lecturer if you are in doubt.

In [8]:
class Decoder(nn.Module):
    def __init__(self, z_dim):
        super(Decoder, self).__init__()
        # Fully connected layer to expand z to a suitable size
        self.fc = nn.Linear(z_dim, 64 * 7 * 7)
        # Transposed convolutional layers
        self.deconv1 = nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1)  # Output: (32, 14, 14)
        self.deconv2 = nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1)   # Output: (1, 28, 28)

    def decode(self, z):
        h = self.fc(z)
        h = h.view(-1, 64, 7, 7)         # Shape: (batch_size, 64, 7, 7)
        h = F.relu(self.deconv1(h))      # Shape: (batch_size, 32, 14, 14)
        h = self.deconv2(h)              # Shape: (batch_size, 1, 28, 28)
        return h

    def forward(self, z, x):
        x_logits = self.decode(z)
        # Flatten tensors for loss computation
        x_flat = x.view(x.size(0), -1)                # Shape: (batch_size, 784)
        x_logits_flat = x_logits.view(x.size(0), -1)  # Shape: (batch_size, 784)
        # Compute binary cross-entropy loss
        bce = F.binary_cross_entropy_with_logits(x_logits_flat, x_flat, reduction='none')
        log_px_given_z = -torch.sum(bce, dim=1)
        return log_px_given_z

    def sample(self, z):
        x_logits = self.decode(z)
        x_probs = torch.sigmoid(x_logits)
        pi = torch.clamp(x_probs, min=1e-6, max=1 - 1e-6)
        x_sample = torch.bernoulli(pi)
        return x_sample


Please answer the following questions:

#### Question 3 (0.5 pt)

Please explain your choice of the distribution for image data used in this assignment. Additionally, please write it down mathematically (if you think that presenting it as the log-probability, then please do it).

I chose the Bernoulli distribution to model the conditional likelihood
$p(\mathbf{x}|\mathbf{z})$ in the decoder because it is appropriate for
the image data used in this assignment, such as the MNIST dataset where
images consist of grayscale pixel values normalized between 0 and 1.
Each pixel value can be interpreted as the probability of the pixel
being \"on\" or \"off,\" which aligns well with the Bernoulli
distribution that models binary outcomes. Assuming that pixels are
conditionally independent given the latent variable $\mathbf{z}$, the
Bernoulli distribution simplifies the computation of the likelihood. The
mathematical formulation of the conditional likelihood under the
Bernoulli distribution is:

$$p(\mathbf{x}|\mathbf{z}) = \prod_{i=1}^{D} \pi_i^{x_i} (1 - \pi_i)^{(1 - x_i)},$$

where $D$ is the dimensionality (number of pixels) of the image,
$x_i \in \{0, 1\}$ is the pixel value at position $i$, and
$\pi_i = \sigma(f_i(\mathbf{z}))$ represents the probability of pixel
$x_i$ being active, modeled by the decoder network with $\sigma(\cdot)$
being the sigmoid function to ensure $\pi_i \in (0, 1)$. Taking the
logarithm of the conditional likelihood yields the log-probability:

$$\log p(\mathbf{x}|\mathbf{z}) = \sum_{i=1}^{D} \left[ x_i \log \pi_i + (1 - x_i) \log (1 - \pi_i) \right],$$

which is equivalent to the negative binary cross-entropy loss between
the true image $\mathbf{x}$ and the reconstructed probabilities
$\boldsymbol{\pi}$. This choice simplifies the loss computation and is
advantageous in training Variational Autoencoders (VAEs) because it
aligns with common loss functions used in deep learning frameworks,
facilitating efficient optimization. The Bernoulli distribution
effectively models the normalized image data by interpreting pixel
intensities as probabilities, and its assumption of pixel independence
given $\mathbf{z}$ simplifies both the mathematical formulation and
computational implementation of the model.


#### Question 4 (0.5 pt)

Please explain how one can sample from the distribution chosen by you. Please be specific and formal (i.e., provide mathematical formulae). If applicable, please provide a code snippet.

To sample from the **Bernoulli distribution** chosen for the
conditional likelihood $p(\mathbf{x}|\mathbf{z})$ in the decoder, we
follow a formal and specific procedure. Given a latent variable
$\mathbf{z}$, the decoder network outputs the parameters
$\boldsymbol{\pi} = \sigma(f(\mathbf{z}))$, where $\sigma$ is the
sigmoid function ensuring that each $\pi_i$ lies in the interval (0, 1).
Each element $\pi_i$ represents the probability of the corresponding
pixel $x_i$ being active (i.e., having a value of 1).

Mathematically, sampling from the Bernoulli distribution for
each pixel is expressed as:

$$x_i \sim \text{Bernoulli}(\pi_i) \quad \text{for each } i = 1, 2, \dots, D$$

where: - $D$ is the dimensionality of the image (number of pixels). -
$x_i \in \{0, 1\}$ is the sampled value for pixel $i$. -
$\pi_i = \sigma(f_i(\mathbf{z}))$ is the probability parameter for pixel
$i$, obtained from the decoder.

This means that each pixel $x_i$ is independently sampled to be 1 with
probability $\pi_i$ and 0 otherwise.

**In code**, particularly using PyTorch, this sampling process can
be implemented as follows:

```python 
def sample_bernoulli(pi):
    """
    Sample from a Bernoulli distribution parameterized by pi.

    Parameters:
    - pi (torch.Tensor): Tensor of probabilities with values in (0, 1).

    Returns:
    - x (torch.Tensor): Sampled binary tensor with values 0 or 1.
    """
    # Ensure numerical stability
    pi = torch.clamp(pi, min=1e-6, max=1 - 1e-6)
    # Sample from Bernoulli distribution
    x = torch.bernoulli(pi)
    return x
```

Clamping Probabilities: The 'torch.clamp' function ensures that the
probability values $\pi_i$ are within a numerically stable range,
avoiding exact 0 or 1, which can cause issues during training. 2.

Sampling: 'torch.bernoulli(pi)' performs element-wise sampling
from the Bernoulli distribution based on the probabilities $\pi_i$. Each
element in the output tensor 'x' will be either 0 or 1, sampled
according to the corresponding probability in 'pi'.

This sampling method allows the decoder to generate binary images by
independently determining the state of each pixel based on the learned
probabilities from the latent representation $\mathbf{z}$. By leveraging
the Bernoulli distribution, the model can effectively reconstruct binary
or normalized grayscale images, facilitating the training and generation
processes within the Variational Autoencoder framework.


### Prior

The prior is the marginal distribution over latent variables, i.e., $p(\mathbf{z})$. It plays a crucial role in the generative process and also in synthesizing images of a better quality.

In this assignment, you are asked to implement a prior that is learnable (e.g., parameterized by neural networks). If you decide to implement the standard Gaussian prior only, then please be aware that you will not get any points.


For the learnable prior you can choose one of the following options:


*   Mixture of Gaussians
*   Normalizing Flow


In [9]:
class Prior(nn.Module):
    def __init__(self, z_dim, K=10, device='cuda'):
        super(Prior, self).__init__()
        self.device = device
        self.z_dim = z_dim
        self.num_components = K

        # Mixture weights (unnormalized). We'll apply softmax to get valid probabilities.
        self.pi_logits = nn.Parameter(torch.randn(K))

        # Means of the Gaussian components
        self.mu = nn.Parameter(torch.randn(K, z_dim))

        # Log variances of the Gaussian components
        self.log_var = nn.Parameter(torch.randn(K, z_dim))

    def sample(self, batch_size):
        """
        Sample z ~ p(z), where p(z) is a mixture of Gaussians.

        batch_size: int, number of samples to generate

        return:
        z: torch.tensor, with dimensionality (batch_size, z_dim)
        """
        # Compute mixture weights
        pi = F.softmax(self.pi_logits, dim=0)  # Shape: (num_components,)

        # Sample component indices according to the mixture weights
        component_indices = torch.multinomial(pi, batch_size, replacement=True)  # Shape: (batch_size,)

        # Gather the parameters for the selected components
        mu = self.mu[component_indices]           # Shape: (batch_size, z_dim)
        log_var = self.log_var[component_indices] # Shape: (batch_size, z_dim)
        std = torch.exp(0.5 * log_var)            # Shape: (batch_size, z_dim)

        # Sample from the selected Gaussian components
        epsilon = torch.randn(batch_size, self.z_dim).to(self.device)
        z = mu + std * epsilon                    # Shape: (batch_size, z_dim)
        return z

    def forward(self, z):
        """
        Compute the log-probability log p(z), where p(z) is a mixture of Gaussians.

        z: torch.tensor, with dimensionality (batch_size, z_dim)

        return:
        log_p_z: torch.tensor, with dimensionality (batch_size,)
        """

        # Expand z to compute log probabilities under each component
        z = z.unsqueeze(1)  # Shape: (batch_size, 1, z_dim)

        # Compute mixture weights
        pi = F.softmax(self.pi_logits, dim=0)  # Shape: (num_components,)

        # Expand parameters for vectorized computation
        mu = self.mu.unsqueeze(0)              # Shape: (1, num_components, z_dim)
        log_var = self.log_var.unsqueeze(0)    # Shape: (1, num_components, z_dim)
        var = torch.exp(log_var)               # Shape: (1, num_components, z_dim)

        # Compute log probability under each Gaussian component
        # log N(z; mu_k, var_k) = -0.5 * [log(2π) + log var_k + (z - mu_k)^2 / var_k]
        log_prob = -0.5 * (log_var + ((z - mu) ** 2) / var + torch.log(torch.tensor(2.0 * math.pi, device=z.device)))        # Sum over z_dim to get log_prob for each component
        log_prob = torch.sum(log_prob, dim=2)  # Shape: (batch_size, num_components)

        # Add log mixture weights
        log_pi = torch.log(pi + 1e-8)  # Add small value to prevent log(0)
        log_prob += log_pi.unsqueeze(0)  # Shape: (batch_size, num_components)

        # Compute log p(z) = log ∑_k [π_k * N(z; μ_k, Σ_k)] using log-sum-exp trick
        log_p_z = torch.logsumexp(log_prob, dim=1)  # Shape: (batch_size,)

        return log_p_z


#### Question 5 (2 pts max)

**Option 1 (0 pt):  Standard Gaussian**

**NOTE: *If you decide to use the standard Gaussian prior, please indicate it in your answer. However, you will get 0 pt for this question.***

**Option 2 (0.5 pt): Mixture of Gaussains**

Please do the following:
- (0.25 pt) Please explain your prior and write it down mathematically
- (0.15 pt) Please write down its sampling procedure (if necessary, please add a code snippet).
- (0.1 pt) Please write down its log-probability (a mathematical formula).

**Option 3 (2 pts): Normalizing Flow**

Please do the following:
- (1 pt) Please explain your prior and write it down mathematically
- (0.5 pt) Please write down its sampling procedure (if necessary, please add a code snippet).
- (0.5 pt) Please write down its log-probability (a mathematical formula).

**Option 3: Normalizing Flow Prior**

**Explanation and Mathematical Formulation:**

In this assignment, I implemented a prior $p(\mathbf{z})$ using a
**Normalizing Flow**. A normalizing flow constructs a complex
probability distribution by transforming a simple base distribution
through a sequence of invertible and differentiable transformations.
This approach allows the prior to capture more intricate structures in
the latent space, enhancing the model's generative capabilities and
leading to better synthesis of images.

Mathematically, we begin with a base distribution $p_0(\mathbf{z}_0)$,
typically a standard multivariate Gaussian:

$$p_0(\mathbf{z}_0) = \mathcal{N}(\mathbf{z}_0 \mid \mathbf{0}, \mathbf{I}).$$

We then apply a sequence of $K$ invertible and differentiable
transformations $f_k$, parameterized by $\boldsymbol{\theta}_k$, to
obtain the final latent variable $\mathbf{z}$:

$$\begin{align*}
\mathbf{z}_1 &= f_1(\mathbf{z}_0; \boldsymbol{\theta}_1), \\
\mathbf{z}_2 &= f_2(\mathbf{z}_1; \boldsymbol{\theta}_2), \\
&\vdots \\
\mathbf{z}_K &= f_K(\mathbf{z}_{K-1}; \boldsymbol{\theta}_K).
\end{align*}$$

The overall transformation from $\mathbf{z}_0$ to
$\mathbf{z} = \mathbf{z}_K$ defines the complex prior distribution
$p(\mathbf{z})$.

Using the change of variables formula, the log-density of $\mathbf{z}$
is given by:

$$\log p(\mathbf{z}) = \log p_0(\mathbf{z}_0) - \sum_{k=1}^{K} \log \left\lvert \det \left( \frac{\partial f_k}{\partial \mathbf{z}_{k-1}} \right) \right\rvert.$$

Here:

\- $\log p_0(\mathbf{z}_0)$ is the log-density of the base
distribution. - $\frac{\partial f_k}{\partial \mathbf{z}_{k-1}}$ is the
Jacobian matrix of the transformation $f_k$. - The determinant accounts
for the change in volume introduced by each transformation.

This formulation allows us to compute the exact log-probability of
$\mathbf{z}$ under the transformed distribution $p(\mathbf{z})$.

**Sampling Procedure:**

To sample from the normalizing flow prior $p(\mathbf{z})$, we perform
the following steps:

1\. **Sample from the Base Distribution:**

$$\mathbf{z}_0 \sim p_0(\mathbf{z}_0) = \mathcal{N}(\mathbf{0}, \mathbf{I}).$$

2\. **Apply the Sequence of Transformations:**

Sequentially apply the invertible transformations to obtain
$\mathbf{z}$:

$$\mathbf{z}_k = f_k(\mathbf{z}_{k-1}; \boldsymbol{\theta}_k), \quad \text{for } k = 1, 2, \dots, K.$$

3\. **Obtain the Final Sample:**

The final sample from the prior is:

$$\mathbf{z} = \mathbf{z}_K.$$

**Log-Probability Computation:**

The log-probability $\log p(\mathbf{z})$ under the normalizing flow
prior is calculated using the change of variables formula:

$$\log p(\mathbf{z}) = \log p_0(\mathbf{z}_0) - \sum_{k=1}^{K} \log \left\lvert \det \left( \frac{\partial f_k}{\partial \mathbf{z}_{k-1}} \right) \right\rvert.$$

**Components:**

1\. **Base Distribution Log-Probability:**

For a standard normal distribution, the log-probability is:

$$\log p_0(\mathbf{z}_0) = -\frac{1}{2} \sum_{i=1}^{d} \left( z_{0i}^2 + \log 2\pi \right),$$

where $d$ is the dimensionality of $\mathbf{z}_0$, and $z_{0i}$ is the
$i$-th component of $\mathbf{z}_0$.

2\. **Log Determinant of Jacobian:**

For specific transformations like **Planar Flows**, the Jacobian
determinant can be computed efficiently. A planar flow is defined as:

$$f_k(\mathbf{z}_{k-1}) = \mathbf{z}_{k-1} + \mathbf{u}_k \cdot h\left( \mathbf{w}_k^\top \mathbf{z}_{k-1} + b_k \right),$$

where:

\- $\mathbf{u}_k, \mathbf{w}_k \in \mathbb{R}^d$ are learnable
parameters. - $b_k \in \mathbb{R}$ is a scalar bias term. - $h$ is an
element-wise non-linear activation function (e.g., hyperbolic tangent
$\tanh$).

The log-determinant of the Jacobian for a planar flow is:

$$\log \left\lvert \det \left( \frac{\partial f_k}{\partial \mathbf{z}_{k-1}} \right) \right\rvert = \log \left\lvert 1 + \mathbf{u}_k^\top \psi_k(\mathbf{z}_{k-1}) \right\rvert,$$

where:

$$\psi_k(\mathbf{z}_{k-1}) = h'\left( \mathbf{w}_k^\top \mathbf{z}_{k-1} + b_k \right) \mathbf{w}_k,$$

and $h'$ denotes the derivative of the activation function $h$.

3\. **Total Log-Probability:**

Combining the base log-density and the sum of log-determinants:

$$\log p(\mathbf{z}) = -\frac{1}{2} \sum_{i=1}^{d} \left( z_{0i}^2 + \log 2\pi \right) - \sum_{k=1}^{K} \log \left\lvert 1 + \mathbf{u}_k^\top \psi_k(\mathbf{z}_{k-1}) \right\rvert.$$

**Conclusion:**

By utilizing a normalizing flow prior, we significantly increase the
flexibility and expressiveness of the prior distribution
$p(\mathbf{z})$. This allows the model to capture complex, multimodal
distributions in the latent space, better aligning the prior with the
posterior distribution learned by the encoder. Consequently, this leads
to improved generative performance, producing higher-quality
reconstructions and samples in the Variational Autoencoder.


In [20]:
class PlanarFlow(nn.Module):
    def __init__(self, z_dim):
        super(PlanarFlow, self).__init__()
        self.u = nn.Parameter(torch.randn(z_dim))
        self.w = nn.Parameter(torch.randn(z_dim))
        self.b = nn.Parameter(torch.randn(1))

    def forward(self, z):
        # Compute the linear term
        linear = torch.matmul(z, self.w) + self.b
        # Apply the transformation
        print('u:', self.u.shape)
        print('w:', self.w.shape)
        print('b:', self.b.shape)
        print('z:', z.shape)
        print('linear:', linear.shape)
        f_z = z + self.u * torch.tanh(linear)
        return f_z

    def inverse(self, z):
        # Inverting planar flow analytically is not straightforward
        # Numerical methods may be required
        # For simplicity, assume inverse is approximated
        raise NotImplementedError("Inverse of PlanarFlow is not implemented.")

    def log_abs_det_jacobian(self, z):
        linear = torch.matmul(z, self.w) + self.b
        psi = (1 - torch.tanh(linear) ** 2) * self.w
        det_jacobian = 1 + torch.matmul(self.u, psi)
        return torch.log(torch.abs(det_jacobian) + 1e-8)

# Class NormalizingFlowPrior
class NormalizingFlowPrior(nn.Module):
    def __init__(self, z_dim, K):
        super(NormalizingFlowPrior, self).__init__()
        self.z_dim = z_dim
        self.K = K
        self.flows = nn.ModuleList([PlanarFlow(z_dim) for _ in range(K)])

    def sample(self, batch_size=32):
        # Sample from base distribution
        z = torch.randn(batch_size, self.z_dim)
        # Apply flows sequentially
        for flow in self.flows:
            z = flow(z)
        return z

    def forward(self, z):
        z_k = z
        # Compute log(2*pi) and move to the correct device
        log_2pi_value = math.log(2 * math.pi)
        log_2pi = torch.tensor(log_2pi_value).to(z_k.device)
        # Compute log probability of the base distribution
        log_p0 = -0.5 * torch.sum(z_k ** 2 + log_2pi, dim=1)
        sum_log_abs_det_jacobians = 0
        # Apply inverse flows and accumulate log determinants
        for flow in reversed(self.flows):
            # Assuming invertibility, apply inverse flow
            z_k = flow(z_k)
            sum_log_abs_det_jacobians += flow.log_abs_det_jacobian(z_k)
        log_p = log_p0 - sum_log_abs_det_jacobians
        return log_p



### Complete VAE

The last class is `VAE` tha combines all components. Please remember that this class must implement the **Negative ELBO** in `forward`, as well as `sample` (*hint*: it is a composition of `sample` functions from the prior and the decoder).

In [21]:
class VAE(nn.Module):
    def __init__(self, z_dim, device='cpu'):
        super(VAE, self).__init__()
        self.device = device
        self.z_dim = z_dim
        # Initialize Encoder, Decoder, and Prior
        self.encoder = Encoder(z_dim).to(device)
        self.decoder = Decoder(z_dim).to(device)
        self.prior = NormalizingFlowPrior(z_dim, K=10).to(device)  # If using normalizing flows

    def sample(self, batch_size):
        z = self.prior.sample(batch_size).to(self.device)
        x_samples = self.decoder.sample(z)
        return x_samples

    def forward(self, x, reduction='mean'):
        batch_size = x.size(0)
        # Encode x to obtain the parameters of q(z|x)
        mu, log_var = self.encoder.encode(x)
        # Sample z from q(z|x)
        z = self.encoder.reparameterization(mu, log_var)
        # Compute log q(z|x)
        log_2pi = torch.log(torch.tensor(2 * torch.pi).to(mu.device))
        log_q_z_given_x = -0.5 * (log_var + (z - mu) ** 2 / torch.exp(log_var) + log_2pi)
        log_q_z_given_x = torch.sum(log_q_z_given_x, dim=1)
        # Compute log p(z)
        log_p_z = self.prior.forward(z)
        # Compute log p(x|z)
        log_p_x_given_z = self.decoder.forward(z, x)
        # Compute Negative ELBO
        NELBO = (log_q_z_given_x - log_p_z) - log_p_x_given_z
        # Apply reduction
        if reduction == 'sum':
            return NELBO.sum()
        else:
            return NELBO.mean()


In [22]:
# Import necessary libraries
import torch

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the latent dimension
z_dim = 20

# Instantiate the model components
encoder = Encoder(z_dim=z_dim).to(device)
decoder = Decoder(z_dim=z_dim).to(device)

# Create a dummy input
batch_size = 32
dummy_input = torch.randn(batch_size, 1, 28, 28).to(device)

# Test the encoder
mu, log_var = encoder.encode(dummy_input)
z = encoder.reparameterization(mu, log_var)

# Test the decoder
reconstructed_x = decoder.decode(z)

# Print shapes to verify
print("Input shape:", dummy_input.shape)
print("Encoded mu shape:", mu.shape)
print("Encoded log_var shape:", log_var.shape)
print("Latent vector z shape:", z.shape)
print("Reconstructed x shape:", reconstructed_x.shape)


Input shape: torch.Size([32, 1, 28, 28])
Encoded mu shape: torch.Size([32, 20])
Encoded log_var shape: torch.Size([32, 20])
Latent vector z shape: torch.Size([32, 20])
Reconstructed x shape: torch.Size([32, 1, 28, 28])


#### Question 6 (0.5 pt)

Please write down mathematically the **Negative ELBO** and provide a code snippet.

ANSWER: [Please fill in]

### Evaluation and training functions

**Please do not remove or modify them.**

In [23]:
# DO NOT REMOVE
def evaluation(test_loader, name=None, model_best=None, epoch=None):
    # EVALUATION
    if model_best is None:
        # load best performing model
        model_best = torch.load(name + '.model')

    model_best.eval()
    loss = 0.
    N = 0.
    for indx_batch, (test_batch, _) in enumerate(test_loader):
        test_batch = test_batch.to(device)
        loss_t = model_best.forward(test_batch, reduction='sum')
        loss = loss + loss_t.item()
        N = N + test_batch.shape[0]
    loss = loss / N

    if epoch is None:
        print(f'FINAL LOSS: nll={loss}')
    else:
        print(f'Epoch: {epoch}, val nll={loss}')

    return loss


def samples_real(name, test_loader, shape=(28,28)):
    # real images-------
    num_x = 4
    num_y = 4
    x, _ = next(iter(test_loader))
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name+'_real_images.pdf', bbox_inches='tight')
    plt.close()


def samples_generated(name, data_loader, shape=(28,28), extra_name=''):
    x, _ = next(iter(data_loader))
    x = x.to('cpu').detach().numpy()

    # generations-------
    model_best = torch.load(name + '.model')
    model_best.eval()

    num_x = 4
    num_y = 4
    x = model_best.sample(num_x * num_y)
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name + '_generated_images' + extra_name + '.pdf', bbox_inches='tight')
    plt.close()


def plot_curve(name, nll_val):
    plt.plot(np.arange(len(nll_val)), nll_val, linewidth='3')
    plt.xlabel('epochs')
    plt.ylabel('nll')
    plt.savefig(name + '_nll_val_curve.pdf', bbox_inches='tight')
    plt.close()

In [24]:
# DO NOT REMOVE

def training(name, max_patience, num_epochs, model, optimizer, training_loader, val_loader, shape=(28,28)):
    nll_val = []
    best_nll = 1000.
    patience = 0

    # Main loop
    for e in range(num_epochs):
        # TRAINING
        model.train()
        for indx_batch, (batch, _) in enumerate(training_loader):
            batch = batch.to(device)
            loss = model.forward(batch, reduction='mean')

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation
        loss_val = evaluation(val_loader, model_best=model, epoch=e)
        nll_val.append(loss_val)  # save for plotting

        if e == 0:
            print('saved!')
            torch.save(model, name + '.model')
            best_nll = loss_val
        else:
            if loss_val < best_nll:
                print('saved!')
                torch.save(model, name + '.model')
                best_nll = loss_val
                patience = 0

                samples_generated(name, val_loader, shape=shape, extra_name="_epoch_" + str(e))
            else:
                patience = patience + 1

        if patience > max_patience:
            break

    nll_val = np.asarray(nll_val)

    return nll_val

### Setup

**NOTE: *Please comment your code! Especially if you introduce any new variables (e.g., hyperparameters).***

In the following cells, we define `transforms` for the dataset. Next, we initialize the data, a directory for results and some fixed hyperparameters.

In [25]:
import torchvision.transforms as transforms

transforms_train = transforms.Compose([
        transforms.ToTensor(),  # Convert images to PyTorch tensors
    ])

transforms_test = transforms.Compose([
    transforms.ToTensor(),
])

Please do not modify the code in the next cell.

In [26]:
# DO NOT REMOVE
#-dataset
dataset = MNIST('./files/', train=True, download=True,
                      transform=transforms_train
                )

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [50000, 10000], generator=torch.Generator().manual_seed(14))

test_dataset = MNIST('./files/', train=False, download=True,
                      transform=transforms_test
                     )
#-dataloaders
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

#-creating a dir for saving results
name = 'vae'
result_dir = "." + '/results/' + name + '/'
if not(os.path.exists(result_dir)):
    os.mkdir(result_dir)

#-hyperparams (please do not modify them for the final report)RuntimeError: The size of tensor a (32) must match the size of tensor b (16) at non-singleton dimension 0

num_epochs = 1000 # max. number of epochs
max_patience = 20 # an early stopping is used, if training doesn't improve for longer than 20 epochs, it is stopped

In the next cell, please initialize the model. Please remember about commenting your code!

In [27]:
# YOUR CODE COMES HERE:
model = VAE(z_dim=32, device=device)

Please initialize the optimizer

In [28]:
# PLEASE DEFINE YOUR OPTIMIZER
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

#### Question 7 (0.5 pt)

Please explain the choice of the optimizer, and comment on the choice of the hyperparameters (e.g., the learing reate value).

ANSWER: [Please fill in]

### Training and final evaluation

In the following two cells, we run the training and the final evaluation.

In [29]:
# DO NOT REMOVE OR MODIFY
# Training procedure
nll_val = training(name=result_dir + name, max_patience=max_patience, 
                   num_epochs=num_epochs, model=model, optimizer=optimizer,
                   training_loader=train_loader, val_loader=val_loader,
                   shape=(28,28))

u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch.Size([32])
b: torch.Size([1])
z: torch.Size([32, 32])
linear: torch.Size([32])
u: torch.Size([32])
w: torch

RuntimeError: The size of tensor a (32) must match the size of tensor b (16) at non-singleton dimension 0

In [None]:
# DO NOT REMOVE OR MODIFY
# Final evaluation
test_loss = evaluation(name=result_dir + name, test_loader=test_loader)
f = open(result_dir + name + '_test_loss_lr_0.005.txt', "w")
f.write(str(test_loss))
f.close()

samples_real(result_dir + name, test_loader)
samples_generated(result_dir + name, test_loader, extra_name='lr_0.005')

plot_curve(result_dir + name, nll_val)

### Results and discussion

After a successful training of your model, we would like to ask you to present your data and analyze it. Please answer the following questions.


#### Question 8 (1 pt)

Please select the real data, and the final generated data and include them in this report. Please comment on the following:
- (0.5 pt) Do you think the model was trained properly by looking at the generations? Please motivate your answer well.
- (0.5 pt) What are potential problems with evaluating a generative model by looking at generated data? How can we evalute generative models (ELBO or NLL do not count as an answer)?

ANSWER: [Please fill in]

#### Question 9 (1.25 pt)

Please include the plot of the negative ELBO. Please comment on the following:
- (0.25 pt) Is the training of your VAE stable or unstable? Why?
- (1 pt) What is the influence of the optimizer on your model? Do the hyperparameter values of the optimizer important and how do they influence the training? Motivate well your answer (e.g., run the script with more than one learning rate and present two plots here).

ANSWER: [Please fill in]

# Grading (10pt)

- Question 0: 3pt
- Question 1: 0.5pt 
- Question 2: 0.25pt 
- Question 3: 0.5pt 
- Question 4: 0.5pt 
- Question 5: 2pt 
- Question 6: 0.5pt
- Question 7: 0.5pt
- Question 8: 1pt
- Question 9: 1.25pt