<a href="https://colab.research.google.com/github/rdr453/ChainLink/blob/main/Assignments/Assignment4/Assignment_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 4: Variational Autoencoder for Face Image Synthesis


## Student Name: Bobby Rizzo


Objective: The goal of this assignment is to implement a Variational Autoencoder (VAE) using a deep neural network and train it on the CelebA dataset to generate synthetic face images. This exercise will provide hands-on experience with building and training a probabilistic generative model.

Random CelebA inhabitant: David Niven (1910-1983). One Academy Award and two Golden Globes for movies released about 70 years ago.

<img src=https://www.cs.toronto.edu/~lindell/teaching/420/slides/celebahq/37.jpg width=128 height=128>


Put all of your `import`s in the cell below.

In [None]:
# import all of the bits
import os
import sys
import gc
import random
import math
import tqdm.notebook as tqdm
import numpy as np
import zipfile
import gdown
import PIL
import datetime

import cv2
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as tt
import torchvision.datasets as td

import torchsummary
import matplotlib.pyplot as plt


# some housekeeping; random stuff & cuda
_ = torch.random.seed()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Executing NN code on {device}')


## Freebie: Dataset Preparation


You're getting code to prep the CelebA dataset for training use.  The CelebA images are all 218 rows x 178 columns x 3 channels.

The steps performed below are:
 1. Copy the zip file containing CelebA from my Google drive location to your runtime. Recycling some earlier code from Assignment 2.
 2. Create a `Dataset` for training (call it `celeba_train_ds`) , specifying the CelebA training partition. We don't care about the labels, because the "target" when training is the input itself.  Specify `torchvision.transforms.ToTensor()` as the transform -- that will take care of converting the PIL images in the zipfile to PyTorch tensors, with each component scaled to [0,1].
 3. Repeat step 2 for the validation partition. Call the `Dataset` by the name `celeba_val_ds`.
 4. As a sanity check, the code renders a random image from `celeba_train_ds` and a random image from `celeba_val_ds` using Matplotlib's `imshow()` method.  You can use subscripting to get the first sample from a data set, which returns a (data,target) tuple. Subscript that to get the data. Permute the dimensions of the data using `torch.permute()` to put the channels last, and send the result to `plt.imshow()`  In the depicted images, see how the eyes are in approximately the same positions in each image.  The CelebA data is _registered_, which means the faces are scaled and translated as needed to put the facial features in approximately the same location in all images.

In [None]:
# FREEBIE CODE: DATA

DATA_ROOT_DIR = "celeba"

if not os.path.exists(DATA_ROOT_DIR):
    print('Downloading ND copy of celeba.zip - this will take a minute or so.')
    gdown.download('https://drive.google.com/uc?id=1gyR8WSFURkc4TWil9zik3OP03Wj4D4bz','celeba.zip')
    print('Unzipping celeba.zip - this will take a few minutes.')
    with zipfile.ZipFile('celeba.zip', 'r') as z:
        z.extractall('.')
    print('Done!')

SIZE = 160

transform = tt.Compose([tt.ToTensor(),tt.CenterCrop(SIZE)])

celeba_train_ds = td.CelebA(".",split="train", transform=transform,download='False')
celeba_val_ds = td.CelebA(".",split="valid", transform=transform, download='False')

tmp_train = celeba_train_ds[0][0]
print(f'{tmp_train.shape}')
tmp_train = torch.permute(tmp_train,[1,2,0])

tmp_val = celeba_val_ds[0][0]
tmp_val = torch.permute(tmp_val,[1,2,0])

fig,(ax1,ax2) = plt.subplots(1,2)
ax1.imshow(tmp_train)
ax2.imshow(tmp_val)
plt.axis('off')
plt.show()

## Task 2. VAE Model Implementation

A VAE consists of:
 * an encoder that reduces the image to a _latent vector_ whose dimensionality you get to explore
 * a decoder that takes a latent vector and creates an image from it.

You'll implement an `Encoder` class and a `Decoder` class, as well as a `VAE` class which uses both of them.

When designing the VAE (as with most nets that use convolutions), the size of the input doesn't matter in the convolutional layers, but it does matter in the fully connected layers that are attached to those convolutional layers. Since the CelebA dataset images are fixed in size, and it's a pretty reasonable size, we're going to avoid making the network adapt to different image sizes.


### 2a. Encoder Network
Design and implement a convolutional neural network (CNN) to act as the encoder. The encoder will take a face image as input and output the parameters (mean $\mu$ and log variance $\log \sigma^2$) of a lower-dimensional latent Gaussian distribution $q(z|x, \theta)$ for the input image $x$, with encoder parameters $\theta$.

Your encoder class should be named `Encoder`, and should consist of the blocks listed below. Your implementation may split the functionality in these blocks between `__init__()` and `forward()`; the exact split is up to you.

* Your `__init__()` method must take two parameters:
 * `in_channels`, number of input channels, with a default of 3.
 * `latent_dim`, the dimensionality of the output latent spaces for $\mu$ and $\log \sigma^2$, with a default of 128.

* Your `forward()` method will receive a batch of images. This batch will be a `Tensor` of size `(bs, 3, 160,160)`, where `bs` is the batch size of your `DataLoader`. You will perform a forward pass with the batch, and output a tuple of two tensors:
 * `mu`, which will contain the mean of a probabilistic (multivariate Gaussian) model for the data, and
 * `logvar`, which will contain the natural logarithms of the variances of the components of the latent space.
Both `mu` and `logvar` will have shape `(bs,latent_dim)`

**Blocks:**
 * block 1:
  * a `Conv2d` with a 4x4 kernel, a `stride` of 2, a `padding` of 1, and 32 output channels.  It will transform the `(bs,3,160,160)` input to a `(bs,32,80,80)` output.
  * `BatchNorm2d`
  * `ReLU()`
 * block 2:
  * a `Conv2d` with a 4x4 kernel, a `stride` of 2, a `padding` of 1, and 64 output channels. `(bs,32,80,80)` --> `(bs,64,40,40)`
  * `BatchNorm2d`
  * `ReLU()`
 * block 3:
  * a `Conv2d` with a 4x4 kernel, a `stride` of 2, a `padding` of 1, and 128 output channels. `(bs, 64, 40, 40)` --> `(bs,128,20,20)`
  * `BatchNorm2d`
  * `ReLU()`
 * block 4:
  * a `Conv2d` with a 4x4 kernel, a `stride` of 2, a `padding` of 1, and 128 output channels. `(bs,128,20,20)` --> `(bs,256,10,10)`
  * `BatchNorm2d`
  * `ReLU()`
 * block 5:
  * a `Conv2d` with a 4x4 kernel, a `stride` of 2, a `padding` of 1, and 128 output channels. `(bs,256,10,10)` --> `(bs,512,5,5)`
  * `BatchNorm2d`
  * `ReLU()`
 * block 6:
  * a `view` to unravel the output of block 3 (of size `(bs,512,5,5)`) to a batch of 1d vectors of size `(bs,512*5*5)`
  * The result of the `view` operator will be provided to two layers _in parallel_:
     * a `Linear` layer `fc_mu` with `512*5*5` inputs and `latent_dim` outputs that outputs the estimated mean.
     * another `Linear` layer `fc_logvar` with `512*5*5` inputs and `latent_dim` outputs that outputs the log of the estimated variance.

    The figure below illustrates these two parallel units.

**Note**: the implementation of the `Encoder` should be _much, much_ shorter than the explanation above. 😀

In [None]:
# STUDENT CODE IS FILLED IN BELOW

# Encoder Definition
# input is (-1,3,160,160)
class Encoder(torch.nn.Module):
    """The encoder of an autoencoder."""
    def __init__(self, in_channels=3, latent_dim=128):
        """ set up encoder.
        in_channels is number of input channels (e.g., 3 for color, 1 for grayscale).
        latent_dim is the size of each vector in the elements of the output tuple.
        """
        super(Encoder, self).__init__()

        # set up layers, etc.

    def forward(self, x):
        """ generate output from input.
        input size (-1, 3, 160, 160).
        output is a tuple with sizes ((-1,latent_dim), (-1,latent_dim)).
        """
        # execute layers
        # ...
        # return a tuple of two batches of `latent_dim` sized vectors
        return mu, logvar

### 2b. Decoder Network

The second component of the VAE implementation is a decoder class, to be named `Decoder`. The decoder will take a sample $z$ from the latent space as input and output a reconstructed face image $y = g(z, \phi)$, where $\phi$ are the decoder parameters. The decoder should have a structure that is roughly the inverse of the encoder, using transposed convolutional layers, batch normalization, and activation functions. The final layer should output an image with the same dimensions and number of channels as the input images.

The inputs to the `Decoder` constructor are
 * `latent_dim`, default 128
 * `output_channels`, default 3.

The `forward` method's input is a batch of latent vectors, with size `(bs,latent_dim)`.
It produces an output of shape `(bs,output_channels,160,160)`

[_Note_: You might say "Whaaat? The encoder creates a _tuple_ with **each element** of size (batch size,`latent_dim`). How can that be coerced into a batch of `latent_dim`-dimensional vectors that the decoder needs?"  Good for you for noticing this seeming problem. The answer (at least the practical part) is below - it's called the _reparameterization trick_. For now, just assume that each latent vector going in is `latent_dim`-dimensional. ]

Given this latent input, the layers are:
 * a `Linear` layer that maps the latent vector batch `(bs,latent_dim)` into an output batch of size `(bs,5*5*512)`.
 * a `view` that converts the output batch above to size `(bs,512,5,5)`
 * a [`ConvTranspose2d`](https://pytorch.org/docs/stable/generated/torch.nn.ConvTranspose2d.html) layer with `kernel_size`=4, `stride`=2, and 256 outputs  (fractional stride will double the size of the output horizontally and
 vertically, so the shape change is `(bs,512,5,5)` --> `(bs,256,10,10)`).
 * ReLU
 * a `ConvTranspose2d` layer with `kernel_size`=4, `stride`=2, and 128 outputs. `(bs,256,10,10)` --> `(bs,128,20,20)`.
 * ReLU
 * a `ConvTranspose2d` layer with `kernel_size`=4, `stride`=2, and 64 outputs. `(bs,128,20,20)` --> `(bs,64,40,40)`.
 * ReLU
 * a `ConvTranspose2d` layer with `kernel_size`=4, `stride`=2, and 32 outputs. `(bs,64,40,40)` --> `(bs,32,80,80)`.
 * ReLU
 * a `ConvTranspose2d` layer with `kernel_size`=4, `stride`=2, and `output_channels` outputs. `(bs,32,80,80)` --> `(bs,output_channels,160,160)`.
 * (**no `ReLU` here!**)
 * Sigmoid

The output of the decoder is the output of the sigmoid.

In [None]:
# STUDENT CODE BELOW

# Decoder Definition
class Decoder(torch.nn.Module):
    def __init__(self, latent_dim=128, out_channels=3):
        super(Decoder, self).__init__()
        # specify the layers.

    def forward(self, z):
        # implement the forward pass
        #
        # ...
        #
        return reconstruction

### 2c. The VAE

Tie the encoder and decoder together into a VAE class. Call it `VAE`. Its constructor should take two parameters: `input_channels` which defaults to 3, and `latent_dim` (the dimensionality of the latent space) which defaults to 128. `input_channels` and `latent_dim` are supplied to the constructor for `Encoder`; `latent_dim` and `input_channels` are supplied to the constructor for `Decoder`; thus, the output batch should have the same size as the input batch.

The `__init__()` method should instantiate an Encoder and a Decoder with proper parameters and store them in the `VAE` instance; _e.g., `self.Encoder = Encoder( ... )`

The `forward()` method will:
 * take a batch of input images `x` (shape `(bs,input_channels,160,160)` as provided by the `DataLoader` you'll create shortly),
 * encode the batch (getting a tuple (`mu`,`logvar`)),
 * _reparametrize the tuple into a batch of latent vectors_ `z` (see below),
 * and provide `z` to the decoder (getting a batch of output images which will reconstruct `x`).
 That reconstructed batch, and `mu` and `logvar` as supplied by the decoder, are the output of `forward()`.  You have to output `mu` and `logvar` because they're needed by the loss function.

#### Latent Space Sampling ([Reparameterization Trick](https://en.wikipedia.org/wiki/Reparameterization_trick))
![](https://upload.wikimedia.org/wikipedia/commons/1/11/Reparameterized_Variational_Autoencoder.png)
Implement the reparameterization trick to enable backpropagation through the stochastic sampling process. This involves sampling a latent vector $z$ from the approximate posterior distribution $q(z|x, \theta)$ using the formula $z = \mu + \sigma \epsilon$, where $\epsilon$ is a sample from a standard normal distribution $N(0, I)$ and $\sigma = \exp(\frac{1}{2} \log \sigma^2)$.

To explain in a more [Torchy](https://pytorch.org) way ... implement a method `reparametrize` that takes `mu` and `logvar` as parameters, and does this:
 * compute standard deviation $\sigma$ from log variance (use `torch.exp()`): `std` = $e^{\mbox{logvar/2}}$
 * compute a normal deviate with zero mean and standard deviation `std` (use `std * torch.randn_like(std)` - the second use of `std` sets the size and the first use scales the output)
 * add `mu` to the scaled output of `torch.randn_like()`. That's the output of the method.

In [None]:
# STUDENT CODE

# VAE Model Definition
class VAE(torch.nn.Module):
    def __init__(self, input_channels=3, latent_dim=128):
        super(VAE, self).__init__()
        # define the pieces

    def reparameterize(self, mu, logvar):
        # return a batch the same shape as logvar or mu
        # it contains uniform random deviates scaled by exp(logvar/2)
        # and offset by mu
        return  .... something ...

    def forward(self, x):
        # define the forward operation
        # ...
        # ...
        return reconstruction, mu, logvar

## Task 3. Loss Function


The total loss function to be minimized in a VAE is the negative [Evidence Lower Bound (ELBO)](https://en.wikipedia.org/wiki/Evidence_lower_bound):
\begin{align}
L(\theta, \phi) = -E_{q(z|x, \theta)}[\log p(x|z, \phi)] + D_{KL}(q(z|x, \theta) || p(z))
\end{align}

The first term corresponds to the reconstruction loss, and the second to the Kullback-Liebler divergence.

Implement the VAE loss function, which consists of two terms:
 * Reconstruction Loss: This term measures how well the decoder reconstructs the input image from the latent representation. Use `torch.functional.mse_loss()` with `reduction=sum` for this.

* [Kullback-Liebler Divergence](https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence): This term measures the difference between the approximate posterior distribution $q(z|x, \theta) = N(\mu, \sigma^2 I)$ and the prior distribution $p(z)$, which is typically a standard normal distribution $N(0, I)$. The KL divergence between two Gaussian distributions $N(\mu_1, \sigma_1^2 I)$ and $N(0, I)$ has a closed-form expression:
\begin{align}
D_{KL}(N(\mu_1, \sigma_1^2 I) || N(0, I)) = \frac{1}{2} \sum_{j=1}^{D_z} (1 + \log(\sigma_{1j}^2) - \mu_{1j}^2 - \sigma_{1j}^2),
\end{align}
where $D_z$ is the dimensionality of the latent space.

To clarify something about this loss that's not obvious in the first equation in this cell: the loss function is MSE **minus** the KL Divergence.

In practice, we generally create a hyperparameter that allows the contributions of MSE and KL Divergence to be traded off against one another. We'll call this hyperparameter `alpha`, and it will be a multiplier on the  KL Divergence.


Thus: Implement a function `loss_vae(xpred,x,mu,logvar, alpha)` that computes and returns loss values as MSE - `alpha` * KL Divergence.

Note: although PyTorch provides a function called `KLDivLoss()`, you shouldn't use it here. Implement the second equation above, using `torch.sum()`, and the `.pow()` and `.exp()` methods of Torch tensors.

In [None]:
#STUDENT CODE

# Loss Function
def vae_loss(reconstruction, x, mu, logvar, alpha):
    """compute VAE loss as a weighted sum of MSE and KL Divergence."""
    # Reconstruction loss (e.g., MSE)
    recon_loss = F.mse_loss(reconstruction, x)

    # KL divergence loss
    # (implement it)
    # ...

    return recon_loss - alpha *  ... something ...

## Task 4. Training

Implement training in three cells of code below.

## 4a. Function: one iteration of training

Define a function  `train_epoch(epoch,model,loader,optimizer,loss_fn,alpha)` that performs one epoch of training of the specified `model`, whose training `DataLoader` is `loader`, using the supplied `optimizer`, and the supplied `loss_fn` which requires the supplied parameter `alpha`.`epoch` is the epoch index. Make sure the net is in `train()` mode when you are doing the training. Ths function should return the training loss for the epoch.

In [None]:
# STUDENT CODE

def train_epoch(epoch,model,loader,optimizer,loss_fn,alpha):
    """train a single epoch.
    Note: not generic, because it assumes VAE output (prediction,mu,logvar)
    and calls loss_fn accordingly"""

    # implement the loop

    return ... the loss you calculated ...

## 4b. Training with specified hyperparameters
Define a function `train(model,optimizer,latent_dim,batch_size,lr,epochs,alpha)` that trains and saves a model as guided by the optimizer and other parameters.

In your implementation of the function before training commences, choose a random image from the `celeba_train_ds` `Dataset`. Use `torch.permute()` and `torch.unsqueeze()` to turn that image (original shape `(160,160,3)` into a 1-sample batch of shape `(1,3,160,160)`.  We'll call this the "test image".

You'll need to instantiate the model and then create an Adam optimizer for it, providing it with the model parameters and the learning rate.

Your function should **use the VALIDATION partition of `CelebA` for training** (it's smaller and will train the net way faster than the training partition). Build a `DataLoader` for that dataset using the specified batch_size and supply it to the `train_epoch()` function. After each epoch of training, print out the loss for that epoch. Also, at the end of the epoch, switch the net to `eval()` mode, and pass the test image through the network. Use `squeeze()` and `permute()` to render the reconstructed output side-by-side with the input test image so you can track improvement in reconstruction. Compute and print the reconstruction loss using the `vae_loss()` function.

At the end of the cell, use `torch.save()` to save the model parameters using a unique name ending in `.pth`. More info on loading and saving models is [here](https://pytorch.org/tutorials/beginner/saving_loading_models.html#saving-loading-model-for-inference).  You should keep track of the hyperparameter settings associated with each saved model.

Make sure you download the `.pth` file to your local computer if you want to save it, as it will be removed when your Colab session is shut down.

In [None]:
# STUDENT CODE

def train(model,latent_dim,batch_size,lr,optimizer,epochs,alpha):
    """run training with specified model, optimizer, and parameters .
    """
    celeba_train_dl = torch.utils.data.DataLoader(celeba_train_ds,batch_size=batch_size)
    celeba_val_dl = torch.utils.data.DataLoader(celeba_val_ds,batch_size=batch_size)

    # stage a randomly chosen sample image from the training partition on 'device'
    test_img_tmp = random.choice(celeba_train_ds)[0] # (3,SIZE,SIZE)
    test_img = torch.unsqueeze(test_img_tmp,0) # (1,3,SIZE,SIZE)
    test_img = test_img.to(device) # --> GPU if we have one
    test_img_perm = torch.permute(test_img_tmp,[1,2,0]) # (SIZE,SIZE,3) for display

    # show sample image
    print("I'll use this sample image while training.")
    _ = plt.imshow(test_img_perm)
    plt.show()

    for epoch in tqdm.trange(epochs):
        # ... inner part of training loop ...

    # save model
    # ...


# Task 5. Experiments


## 5a. _Ad hoc_ hyperparameter search

Consider this set of hyperparameter values:
 * `lr` (learning rate): 0.1, 0.01, 0.001
 * `alpha`: 0.1, 0.01, 0.001
 * `latent_dim`: 64, 128, 256

Perform a hyperparameter search by training the VAE net for 20 epochs over four different combinations of these hyperparameters, as specified by student in the list below.  Since 4x7 is 28, one combination will be done by two students. Use a batch size of 128 samples.

* Matt (0.1, 0.1, 64) (0.1, 0.001, 128) (0.01, 0.01, 256) (0.001, 0.01, 64)
* Will (0.1, 0.1, 128)
(0.1, 0.001, 256)
(0.01, 0.001, 64)
(0.001, 0.01, 128)
* Joe  (0.1, 0.1, 256)
 (0.01, 0.1, 64)
 (0.01, 0.001, 128)
(0.001, 0.01, 256)
* Samara(0.001, 0.001, 64)
 (0.1, 0.01, 64)
(0.01, 0.1, 128)
 (0.01, 0.001, 256)
* Ryan (0.1, 0.01, 128)
 (0.001, 0.1, 64)
 (0.01, 0.1, 256)
(0.001, 0.001, 128)
* Bobby (0.1, 0.01, 256)
 (0.01, 0.01, 64)
 (0.001, 0.1, 128)
(0.001, 0.001, 256)
* Brisny (0.1, 0.001, 64)
 (0.01, 0.01, 128)
(0.001, 0.1, 256)
 (0.001, 0.1, 64)

In the text cell below, describe the results of your four tests. Report, for each test:
 * the hyperparameter values used
 * the training loss
 * your opinion of the visual quality of the results.  Rank the four sample output images by subjective visual quality (it's fine if you use a different test image for each combination of hyperparameter values).

In [None]:
# STUDENT CODE GOES HERE



STUDENT RESPONSE GOES HERE

## 5b. Reconstruction from random latent vectors

In the code cell below
* Use `torch.load()` to load the best-performing VAE from your experiments above.
* Create a batch of 16 latent vectors $z$ from the prior distribution $p(z) = N(0, I)$.  Use [`torch.randn()`](https://pytorch.org/docs/stable/generated/torch.randn.html) with a suitable size specification.

* Pass the batch of latent vectors through the VAE Decoder stage to get 16 output images.
* Display the synthesized images -- ideally  in a 4x4 grid of images (use `plt.subfigure()` to set up the grid).
* Comment on the visual quality and diversity (gender, skin tone, etc.) of the generated images. The results of any VAE trained on CelebA will be biased toward Causasian skin. Why?


In [None]:
# STUDENT CODE

PATH = '/content/name_of_pytorch_save_file.pth'

# ... load model and put it in train mode

NUM_IMAGES = 16

# Sampling from latent space. We're not starting with an image, so
# we can't assess the loss numerically - but we can characterize quality
# visually
with torch.no_grad():
    # create random latent vectors from N(0,I)
    # then feed them to the decoder, getting a result
    z_batch = torch.randn(NUM_IMAGES, 64).to(device)
    generated_image_batch = vae.decoder(z_batch)

# outside the no_grad scope...
# move images to cpu
#
# set up a 4x4 grid of subplots
# loop over the generated image batch
#   place that image in the proper cell in the grid of subplots


STUDENT COMMENTARY RESPONSE HERE