# Variational Autoencoder

In this assignment, you will build a Variational Autoencoder, train it on the MNIST dataset, and play with its architecture and hyperparameters.

### Installation

You will need the ```numpy```, ```tensorflow```, ```keras```, and ```matplotlib``` libraries for this assignment. Note that the code below uses the TensorFlow 1.x API (e.g. ```tf.InteractiveSession```).

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
from pathlib import Path

In [None]:
import tensorflow as tf
import keras
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Lambda
from keras.layers import InputLayer
from keras.layers import concatenate
from keras.models import Model
from keras.models import Sequential
from keras.models import load_model
from keras import backend as K
from keras import metrics
from keras.datasets import mnist
from keras.utils import np_utils

In [None]:
from grader import Grader

### Grading
We will create a grader instance below and use it to collect your answers. Note that these outputs will be stored locally inside the grader and will be uploaded to the platform only after you run the submit function in the last part of this assignment. If you want to make a partial submission, you can run that cell at any time.

In [None]:
grader = Grader()

### Variational Autoencoder

Recall that a Variational Autoencoder is a probabilistic model of data based on a continuous mixture of distributions. In the lecture we covered the mixture of Gaussians case, but here we will apply the VAE to binary MNIST images (each pixel is either black or white). To better model binary data we will use a continuous mixture of binomial distributions: $p(x \mid w) = \int p(x \mid t, w) p(t) dt$, where the prior distribution on the latent code $t$ is standard normal, $p(t) = \mathcal{N}(0, I)$, and the probability that the $(i, j)$-th pixel is black equals the $(i, j)$-th output of the decoder neural network: $p(x_{i, j} \mid t, w) = \text{decoder}(t, w)_{i, j}$.

To train this model we would like to maximize the marginal log-likelihood of our dataset, $\max_w \log p(X \mid w)$, but this is very hard to do computationally. Instead we maximize the Variational Lower Bound w.r.t. both the original parameters $w$ and the variational distribution $q$. We define $q$ as an encoder neural network with parameters $\phi$ that takes an input image $x$ and outputs the parameters of the Gaussian distribution $q(t \mid x, \phi)$: $\log p(X \mid w) \geq \mathcal{L}(w, \phi) \rightarrow \max_{w, \phi}$.
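
For reference, the bound can be written out term by term. This is a sketch under the assumptions already stated in this notebook: independent per-pixel probabilities given by the decoder, a standard normal prior, and a Gaussian $q$ with diagonal covariance. The symbols $m_d$ and $\sigma_d^2$ (mean and variance of the $d$-th latent coordinate of $q$) are notation introduced here for illustration.

$$\mathcal{L}(w, \phi) = \sum_{n=1}^N \left[ \mathbb{E}_{q(t \mid x_n, \phi)} \log p(x_n \mid t, w) - \text{KL}\left(q(t \mid x_n, \phi) \,\|\, p(t)\right) \right]$$

$$\log p(x \mid t, w) = \sum_{i, j} \left[ x_{i, j} \log \text{decoder}(t, w)_{i, j} + (1 - x_{i, j}) \log\left(1 - \text{decoder}(t, w)_{i, j}\right) \right]$$

$$\text{KL}\left(\mathcal{N}(m, \text{diag}(\sigma^2)) \,\|\, \mathcal{N}(0, I)\right) = \frac{1}{2} \sum_{d} \left( \sigma_d^2 + m_d^2 - 1 - \log \sigma_d^2 \right)$$

The first term is the (negated) binary cross-entropy between the image and the decoder output, and the KL term has this closed form only because both distributions are Gaussian.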

So overall our model looks as follows: encoder takes an image $x$, produces a distribution over latent codes $q(t \mid x)$ which should approximate the posterior distribution $p(t \mid x)$ (at least after training), samples a point from this distribution $\widehat{t} \sim q(t \mid x, \phi)$, and finally feeds it into a decoder that outputs a distribution over images.

![](VAE.png)

In the lecture, we also discussed that the variational lower bound contains an expected value, which we are going to approximate with sampling. This is not trivial, since we need to differentiate through this approximation. However, we learned about the _reparametrization trick_: instead of sampling directly from the distribution $\widehat{t} \sim q(t \mid x, \phi)$, sample from a distribution which doesn't depend on any parameters, e.g. standard normal, and then deterministically transform this sample into the desired one: $\varepsilon \sim \mathcal{N}(0, I); ~~\widehat{t} = m(x, \phi) + \varepsilon \sigma(x, \phi)$ (the product is elementwise). This way we don't have to worry about our stochastic gradient being biased and can straightforwardly differentiate our loss w.r.t. all the parameters while treating the current sample $\varepsilon$ as constant.
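
The transformation can be illustrated outside of TensorFlow. The following is a minimal NumPy sketch, not the graded implementation: the function name `reparametrize` and the toy values of `t_mean` and `t_log_var` are assumptions made for illustration, and the encoder is assumed to output the logarithm of the variance (as it does later in this notebook).

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical encoder outputs for a batch of 3 images and a 2-d latent code.
t_mean = np.array([[0.0, 1.0], [2.0, -1.0], [0.5, 0.5]])
t_log_var = np.log(np.array([[1.0, 0.25], [4.0, 1.0], [0.01, 0.01]]))


def reparametrize(t_mean, t_log_var, rng):
    """Draw t_hat = m + eps * sigma with eps ~ N(0, I)."""
    # The noise does not depend on any model parameters.
    eps = rng.standard_normal(t_mean.shape)
    # sigma = exp(log(sigma^2) / 2)
    sigma = np.exp(0.5 * t_log_var)
    # Deterministic, elementwise transformation of the noise.
    return t_mean + eps * sigma


t_hat = reparametrize(t_mean, t_log_var, rng)
print(t_hat.shape)
```

All the randomness lives in `eps`, so in an automatic differentiation framework the gradient flows through `t_mean` and `t_log_var` only.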




### Variational Lower Bound

**Task 1** Derive and implement Variational Lower Bound for the continuous mixture of Binomial distributions.

**Note** that to pass the test, your code should work with any mini-batch size.

**Also note** that although we need a stochastic estimate of VLB: 
$$\text{VLB} = \sum_{i=1}^N \text{VLB}_i \approx \frac{N}{M}\sum_{s=1}^M \text{VLB}_{i_s}$$
where $N$ is the dataset size, $\text{VLB}_i$ is the term of the VLB corresponding to the $i$-th object, $M$ is the mini-batch size, and $i_1, \dots, i_M$ are the indices of the objects in the mini-batch; in the function below you need to return just the average across the mini-batch, $\frac{1}{M}\sum_{s=1}^M \text{VLB}_{i_s}$. People usually optimize this unscaled version of the VLB since it doesn't depend on the dataset size - you can write the VLB function once and use it for different datasets - and it doesn't affect optimization (it does affect the learning rate though). The correct value for this unscaled VLB should be around $100 - 170$.
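
A NumPy version of the mini-batch average can serve as a sanity check for the TensorFlow implementation. This is only a sketch: the name `neg_vlb_numpy`, the value of `EPS`, and the toy inputs are assumptions, and the function returns the bound with its sign flipped (reconstruction loss plus KL), i.e. a quantity to minimize.

```python
import numpy as np

EPS = 1e-7  # assumed small constant to avoid taking log(0)


def neg_vlb_numpy(x, x_decoded_mean, t_mean, t_log_var):
    """Mini-batch average of (reconstruction loss + KL), i.e. the negated VLB."""
    # Closed-form KL between N(t_mean, diag(exp(t_log_var))) and N(0, I), per image.
    kl = 0.5 * np.sum(np.exp(t_log_var) + t_mean ** 2 - 1.0 - t_log_var, axis=1)
    # Binary cross-entropy summed over pixels, per image.
    rec = -np.sum(x * np.log(x_decoded_mean + EPS)
                  + (1.0 - x) * np.log(1.0 - x_decoded_mean + EPS), axis=1)
    # Average over the mini-batch, so the result does not scale with batch size.
    return np.mean(rec + kl)


# Toy batch: 4 "images" of 6 binary pixels and a 2-d latent code.
rng = np.random.default_rng(0)
x = rng.integers(0, 2, size=(4, 6)).astype(float)
x_decoded_mean = np.full((4, 6), 0.5)  # uninformative decoder
t_mean = np.zeros((4, 2))              # q equals the prior ...
t_log_var = np.zeros((4, 2))           # ... so the KL term is zero

value = neg_vlb_numpy(x, x_decoded_mean, t_mean, t_log_var)
print(value, 6 * np.log(2))
```

In this toy case every pixel contributes $\log 2$ and the KL term vanishes, so the two printed numbers should agree up to the effect of `EPS`, whatever the batch size.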

In [None]:
EPS = K.epsilon()

In [None]:
def vlb_binomial(x, x_decoded_mean, t_mean, t_log_var):
    """
    Returns the value of the Variational Lower Bound with the sign flipped,
    i.e. reconstruction loss plus KL divergence (a loss to minimize)
    
    Parameters
    ----------
    x : tf.Tensor, shape (batch_size, number_of_pixels)
        The matrix with one image per row with zeros and ones
    x_decoded_mean : tf.Tensor, shape (batch_size, number_of_pixels) 
        Mean of the distribution p(x | t), real numbers from 0 to 1
    t_mean : tf.Tensor, shape (batch_size, LATENT_DIM)
        Mean vector of the (normal) distribution q(t | x)
    t_log_var : tf.Tensor (batch_size, LATENT_DIM)
        Logarithm of the variance vector of the (normal) distribution q(t | x)
    
    Returns
    -------
    tf.Tensor, scalar
        A tf.Tensor with one element (averaged across the batch), the negated VLB
    """
    
    # tf.reduce_sum(a, axis=1) <- sum over the second index (the columns).
    # Results in a 1-d vector with one value per row (i.e. per image)
    
    # Calculation of the Kullback-Leibler divergence KL(q(t | x) || p(t))
    # NOTE: From slide 5_gradients_I we see an expression for KL including -log(sigma)
    #       We are given t_log_var = log(sigma**2) = 2 log(sigma)
    #       so 
    #       log(sigma) = t_log_var/2 
    sigma_squared = tf.exp(t_log_var)
    kl = 0.5*tf.reduce_sum((-t_log_var + sigma_squared + tf.square(t_mean) - 1), axis=1)
    
    # Calculation of the reconstruction loss
    # We are here using the binary cross entropy (pushing the reconstruction as close to x as possible)
    # NOTE: Add EPS in order not to take the logarithm of 0
    reconstruction_loss = - tf.reduce_sum(
                                x * tf.log(x_decoded_mean + EPS)
                                + (1 - x) * tf.log(1 - x_decoded_mean + EPS),
                            axis=1)

    # tf.reduce_mean(a) <- taking the mean, and thus casting to a number
    return tf.reduce_mean(reconstruction_loss + kl)

In [None]:
# Start tf session so we can run code.
sess = tf.InteractiveSession()
# Connect keras to the created session.
K.set_session(sess)

In [None]:
grader.submit_vlb(sess, vlb_binomial)

## Encoder / decoder definition

**Task 2** Read the code below that defines encoder and decoder networks and implement sampling with reparametrization trick in the provided space.

In [None]:
batch_size = 100
ORIGINAL_DIM = 784 # Number of pixels in MNIST images.
LATENT_DIM = 16 # d, dimensionality of the latent code t.
INTERMEDIATE_DIM = 512 # Size of the hidden layer.
epochs = 20

x = Input(batch_shape=(None, ORIGINAL_DIM))

In [None]:
def create_encoder(input_dim):
    """
    Creates the encoder
    
    Parameters
    ----------
    input_dim : int
        Dimensionality of vector (i.e. the number of pixels)
    
    Returns
    -------
    encoder : Sequential
        The encoder model
    """
    
    # Encoder network.
    # We instantiate these layers separately so as to reuse them later
    encoder = Sequential(name='encoder')
    encoder.add(InputLayer([input_dim]))
    encoder.add(Dense(INTERMEDIATE_DIM, activation='relu'))
    encoder.add(Dense(INTERMEDIATE_DIM//2, activation='relu'))
    encoder.add(Dense(2 * LATENT_DIM))
    
    return encoder

In [None]:
encoder = create_encoder(ORIGINAL_DIM)

In [None]:
# NOTE: The output of the encoder will be the mean and log variance
#       These functions extract the two halves as Lambda layers
get_t_mean = Lambda(lambda h : h[:, :LATENT_DIM])
get_t_log_var = Lambda(lambda h : h[:, LATENT_DIM:])

# Apply the encoder and the extraction layers to the input
h = encoder(x)
t_mean = get_t_mean(h)
t_log_var = get_t_log_var(h)

In [None]:
# Sampling from the distribution 
#     q(t | x) = N(t_mean, exp(t_log_var))
# with reparametrization trick.
def sampling(args):
    """
    Returns a sample from the distribution N(t_mean, diag(exp(t_log_var)))
    The sample is computed with the reparametrization trick.
    
    Notes
    -----
    Both tensors are packed into the single argument `args` because
    the function is called through a Keras Lambda layer
    
    Parameters
    ----------
    t_mean : tf.Tensor, shape (batch_size, LATENT_DIM)
        Mean of the desired distribution
    t_log_var : tf.Tensor, shape (batch_size, LATENT_DIM)
        Logarithm of the variance vector of the desired distribution
    
    Returns
    -------
    t_hat : tf.Tensor, shape (batch_size, LATENT_DIM)
        The samples
    """
    
    t_mean, t_log_var = args
    
    eps = tf.random_normal(shape=tf.shape(t_mean), 
                           mean=0.0, 
                           stddev=1.0)
    
    # NOTE: Using log(sigma) = t_log_var/2 
    t_hat = eps*tf.exp(t_log_var*0.5) + t_mean

    return t_hat

In [None]:
t = Lambda(sampling)([t_mean, t_log_var])

In [None]:
def create_decoder(input_dim):
    """
    Creates the decoder
    
    Parameters
    ----------
    input_dim : int
        The dimensionality of the latent variable vector
        
    Returns
    -------
    decoder : Sequential
        The decoder model
    """
    
    # Decoder network
    # We instantiate these layers separately so as to reuse them later
    decoder = Sequential(name='decoder')
    decoder.add(InputLayer([input_dim]))
    decoder.add(Dense(INTERMEDIATE_DIM//2, activation='relu'))
    decoder.add(Dense(INTERMEDIATE_DIM, activation='relu'))
    decoder.add(Dense(ORIGINAL_DIM, activation='sigmoid'))
    
    return decoder

In [None]:
decoder = create_decoder(LATENT_DIM)
x_decoded_mean = decoder(t)

In [None]:
grader.submit_samples(sess, sampling)

## Training the model

**Task 3** Run the cells below to train the model with the default settings. Modify the parameters to get better results. Pay special attention to the encoder / decoder architectures (e.g. using more layers, maybe making them convolutional), the learning rate, and the number of epochs.

In [None]:
loss = vlb_binomial(x, x_decoded_mean, t_mean, t_log_var)

In [None]:
vae = Model(x, x_decoded_mean)
# Keras will provide input (x) and output (x_decoded_mean) to the function that
# should construct loss, but since our function also depends on other
# things (e.g. t_means), it is easier to build the loss in advance and pass
# a function that always returns it.
vae.compile(optimizer=keras.optimizers.RMSprop(lr=0.001), loss=lambda x, y: loss)

#### Load and prepare the data

In [None]:
# train the VAE on MNIST digits
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# One hot encoding.
y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

#### Train the model

In [None]:
# NOTE: We save only the weights here, as saving the whole model is difficult due to the
#       'An operation has `None` for gradient' error
#       I.e. there is no easy way to add the loss through the custom_objects argument in `load_model`

weights_name = 'vae_weights.h5'
history_name = 'hist.pkl'
if not Path(weights_name).is_file() or not Path(history_name).is_file():
    # NOTE: x_train is both the input and the target
    hist = vae.fit(x=x_train, y=x_train,
                   shuffle=True,
                   epochs=epochs,
                   batch_size=batch_size,
                   validation_data=(x_test, x_test),
                   verbose=1)
    vae.save_weights(weights_name)
    print(f'Saved weights to {weights_name}')
    with Path(history_name).open('wb') as f:
        pickle.dump(hist.history, f, pickle.HIGHEST_PROTOCOL)
        print(f'Saved history to {history_name}')
else:
    vae.load_weights(weights_name)
    print(f'Loaded weights from {weights_name}')
    with Path(history_name).open('rb') as f:
        history = pickle.load(f)
    hist = keras.callbacks.History()
    setattr(hist, 'history', history)
    print(f'Loaded history from {history_name}')

### Visualize reconstructions for train and validation data
In the picture below you can see the reconstruction ability of your network on training and validation data. In each of the two images, the left column shows MNIST images and the right column shows the corresponding images after passing through the autoencoder (or more precisely the mean of the binomial distribution over the output images).

Note that getting the best possible reconstruction is not the point of a VAE; the KL term of the objective specifically hurts the reconstruction performance. The reconstructions should still be reasonable, though, and they provide a visual debugging tool.

In [None]:
fig = plt.figure(figsize=(10, 10))
for fid_idx, (data, title) in enumerate(
            zip([x_train, x_test], ['Train', 'Validation'])):
    n = 10  # figure with 10 x 2 digits
    digit_size = 28
    figure = np.zeros((digit_size * n, digit_size * 2))
    decoded = sess.run(x_decoded_mean, feed_dict={x: data[:batch_size, :]})
    for i in range(10):
        figure[i * digit_size: (i + 1) * digit_size,
               :digit_size] = data[i, :].reshape(digit_size, digit_size)
        figure[i * digit_size: (i + 1) * digit_size,
               digit_size:] = decoded[i, :].reshape(digit_size, digit_size)
    ax = fig.add_subplot(1, 2, fid_idx + 1)
    ax.imshow(figure, cmap='Greys_r')
    ax.set_title(title)
    ax.axis('off')
plt.show()

### Sending the results of your best model as Task 3 submission

In [None]:
grader.submit_best_val_loss(hist)

## Hallucinating new data
**Task 4** Write code to generate new samples of images from your trained VAE. To do that you have to sample from the prior distribution $p(t)$ and then from the likelihood $p(x \mid t)$.

**Note** that the sampling you've written in Task 2 was for the variational distribution $q(t \mid x)$, while here you need to sample from the prior.

Recall that we model binary data with a continuous mixture of binomial distributions: $p(x \mid w) = \int p(x \mid t, w) p(t) dt$, where the prior distribution on the latent code $t$ is standard normal, $p(t) = \mathcal{N}(0, I)$, and the probability that the $(i, j)$-th pixel is black equals the $(i, j)$-th output of the decoder neural network: $p(x_{i, j} \mid t, w) = \text{decoder}(t, w)_{i, j}$.
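
The two sampling steps can be sketched in NumPy. Everything here is hypothetical and for illustration only: `toy_decoder` is an untrained stand-in (a fixed random linear map followed by a sigmoid), not the network trained above, and the sizes are chosen to match MNIST.

```python
import numpy as np

rng = np.random.default_rng(0)
n_samples, latent_dim, n_pixels = 4, 16, 784

# Stand-in for a trained decoder: fixed random weights, for illustration only.
W = rng.standard_normal((latent_dim, n_pixels)) * 0.1


def toy_decoder(t):
    """Map latent codes to per-pixel probabilities in (0, 1)."""
    return 1.0 / (1.0 + np.exp(-t @ W))


# Step 1: sample latent codes from the prior p(t) = N(0, I).
t_from_prior = rng.standard_normal((n_samples, latent_dim))
# Step 2a: the decoder gives the mean of p(x | t) for every pixel.
pixel_probs = toy_decoder(t_from_prior)
# Step 2b: draw binary pixels from those probabilities.
binary_images = (rng.random(pixel_probs.shape) < pixel_probs).astype(np.uint8)
```

In practice the per-pixel means (`pixel_probs` here) are usually displayed directly, since they look smoother than the binary draws.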

In [None]:
n_samples = 10  # To pass automatic grading please use at least 2 samples here.

# NOTE: Here we are sampling from the prior
#       Our prior is the standard normal distribution
t_from_prior = tf.random_normal(shape=(n_samples, LATENT_DIM), 
                                mean=0.0, 
                                stddev=1.0)

# sampled_im_mean is a tf.Tensor of size n_samples x 784 with n_samples random
# images sampled from the vae model.
# NOTE: We have that the likelihood p(x|t, w) = decoder(t, w),
#       where the w are already baked into the decoder after training
sampled_im_mean = decoder(t_from_prior)

In [None]:
# NOTE: We do not need any feed dict here, as we are not encoding
sampled_im_mean_np = sess.run(sampled_im_mean)
# Show the sampled images.
plt.figure()
for i in range(n_samples):
    ax = plt.subplot(n_samples // 5 + 1, 5, i + 1)
    plt.imshow(sampled_im_mean_np[i, :].reshape(28, 28), cmap='gray')
    ax.axis('off')
plt.show()

In [None]:
grader.submit_hallucinating(sess, sampled_im_mean)

# Conditional VAE

In the final task, you will modify your code to obtain a Conditional Variational Autoencoder [1]. The idea is very simple: to be able to control the samples you generate, we condition all the distributions on some additional information. In our case, this additional information will be the class label (the digit on the image, from 0 to 9).

![](CVAE.png)

So now both the likelihood and the variational distributions are conditioned on the class label: $p(x \mid t, \text{label}, w)$, $q(t \mid x, \text{label}, \phi)$.

The only thing you have to change in your code is to concatenate the input image $x$ with the (one-hot) label of this image before passing it into the encoder $q$, and to concatenate the latent code $t$ with the same label before passing it into the decoder $p$. Note that this is slightly harder to do with a convolutional encoder / decoder model.
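
The shapes involved in the two concatenations can be checked with a small NumPy sketch. The batch of three digits and the all-zero placeholder arrays are made up for illustration; only the dimensions (784 pixels, 16 latent dimensions, 10 labels) follow this notebook.

```python
import numpy as np

batch, n_pixels, latent_dim, n_labels = 3, 784, 16, 10

x = np.zeros((batch, n_pixels))     # placeholder images
t = np.zeros((batch, latent_dim))   # placeholder latent codes
digits = np.array([7, 0, 3])        # hypothetical class labels
one_hot = np.eye(n_labels)[digits]  # one row per image, shape (3, 10)

# The encoder sees the image together with its label ...
encoder_input = np.concatenate([x, one_hot], axis=1)  # shape (3, 794)
# ... and the decoder sees the latent code together with the same label.
decoder_input = np.concatenate([t, one_hot], axis=1)  # shape (3, 26)

print(encoder_input.shape, decoder_input.shape)
```

This is why the encoder and decoder input sizes grow by the number of labels in the conditional model.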

[1] Sohn, Kihyuk, Honglak Lee, and Xinchen Yan. “Learning Structured Output Representation using Deep Conditional Generative Models.” Advances in Neural Information Processing Systems. 2015.

## Final task

**Task 5.1** Implement the CVAE model. You may reuse the ```create_encoder``` and ```create_decoder``` functions defined previously (now you can see why they accept the input size as an argument ;) ). You may also need the `concatenate` Keras layer to concatenate labels with the input data and the latent code.

To finish this task, you should go to the `Conditionally hallucinate data` section and find Task 5.2 there.

In [None]:
# One-hot labels placeholder
n_labels = 10
x = Input(batch_shape=(batch_size, ORIGINAL_DIM))
label = Input(batch_shape=(batch_size, n_labels))

# NOTE: We are adding one hot encoded labels
cond_encoder = create_encoder(ORIGINAL_DIM + n_labels)
cond_decoder = create_decoder(LATENT_DIM + n_labels)

# Encoder output h for the image concatenated with its label
cond_h = cond_encoder(concatenate([x, label]))

# Mean of the latent code (without label) for cvae model
cond_t_mean = get_t_mean(cond_h) 
# Logarithm of the variance of the latent code (without label) for cvae model
cond_t_log_var = get_t_log_var(cond_h)

# Sampling the latent code from q(t | x, label) with the reparametrization trick
cond_t = Lambda(sampling)([cond_t_mean, cond_t_log_var])

# Final output of the cvae model
cond_x_decoded_mean = cond_decoder(concatenate([cond_t, label]))

## Define the loss and the model

In [None]:
conditional_loss = vlb_binomial(x, cond_x_decoded_mean, cond_t_mean, cond_t_log_var)
cvae = Model([x, label], cond_x_decoded_mean)
cvae.compile(optimizer=keras.optimizers.RMSprop(lr=0.001), loss=lambda x, y: conditional_loss)

### Train the model

In [None]:
weights_name = 'cvae_weights.h5'
history_name = 'c_hist.pkl'
if not Path(weights_name).is_file() or not Path(history_name).is_file():
    # NOTE: x_train is both the input and the target, and we add the label to the input
    hist = cvae.fit(x=[x_train, y_train],
                    y=x_train,
                    shuffle=True,
                    epochs=epochs,
                    batch_size=batch_size,
                    validation_data=([x_test, y_test], x_test),
                    verbose=1)
    cvae.save_weights(weights_name)
    print(f'Saved weights to {weights_name}')
    with Path(history_name).open('wb') as f:
        pickle.dump(hist.history, f, pickle.HIGHEST_PROTOCOL)
        print(f'Saved history to {history_name}')
else:
    cvae.load_weights(weights_name)
    print(f'Loaded weights from {weights_name}')
    with Path(history_name).open('rb') as f:
        history = pickle.load(f)
    hist = keras.callbacks.History()
    setattr(hist, 'history', history)
    print(f'Loaded history from {history_name}')

### Visualize reconstructions for train and validation data

In [None]:
fig = plt.figure(figsize=(10, 10))
for fid_idx, (x_data, y_data, title) in enumerate(
            zip([x_train, x_test], [y_train, y_test], ['Train', 'Validation'])):
    n = 10  # figure with 10 x 2 digits
    digit_size = 28
    figure = np.zeros((digit_size * n, digit_size * 2))
    decoded = sess.run(cond_x_decoded_mean,
                       feed_dict={x: x_data[:batch_size, :],
                                  label: y_data[:batch_size, :]})
    for i in range(10):
        figure[i * digit_size: (i + 1) * digit_size,
               :digit_size] = x_data[i, :].reshape(digit_size, digit_size)
        figure[i * digit_size: (i + 1) * digit_size,
               digit_size:] = decoded[i, :].reshape(digit_size, digit_size)
    ax = fig.add_subplot(1, 2, fid_idx + 1)
    ax.imshow(figure, cmap='Greys_r')
    ax.set_title(title)
    ax.axis('off')
plt.show()

## Conditionally hallucinate data
**Task 5.2** Implement the conditional sampling from the distribution $p(x \mid t, \text{label})$ by first sampling from the prior $p(t)$ and then sampling from the likelihood $p(x \mid t, \text{label})$.

In [None]:
n_samples = 5
n_labels = 10

# Prepare one hot labels of form
#   0 0 0 0 0 1 1 1 1 1 2 2 2 2 2 ...
# to sample five zeros, five ones, etc
curr_labels = np.eye(n_labels)
curr_labels = np.repeat(curr_labels, n_samples, axis=0)  # Its shape is 50 x 10.

# NOTE: Here we are sampling from the prior
#       Our prior is the standard normal distribution
t_from_prior = tf.random_normal(shape=(n_samples*n_labels, LATENT_DIM), 
                                mean=0.0, 
                                stddev=1.0)

# cond_sampled_im_mean is a tf.Tensor of size 50 x 784 with 5 random zeros,
# then 5 random ones, etc sampled from the cvae model.
cond_sampled_im_mean = cond_decoder(concatenate([t_from_prior,
                                                 tf.convert_to_tensor(curr_labels, dtype=tf.float32)]))

In [None]:
cond_sampled_im_mean_np = sess.run(cond_sampled_im_mean)
# Show the sampled images.
plt.figure(figsize=(10, 10))
global_idx = 0
for digit in range(10):
    for _ in range(5):
        ax = plt.subplot(10, 5, global_idx + 1)
        plt.imshow(cond_sampled_im_mean_np[global_idx, :].reshape(28, 28), cmap='gray')
        ax.axis('off')
        global_idx += 1
plt.show()

In [None]:
# Submit Task 5 (both 5.1 and 5.2).
grader.submit_conditional_hallucinating(sess, cond_sampled_im_mean)

# Authorization & Submission
To submit assignment parts to the Coursera platform, please enter your e-mail and token into the variables below. You can generate the token on this programming assignment page. <b>Note:</b> The token expires 30 minutes after generation.

In [None]:
STUDENT_EMAIL =  ''
STUDENT_TOKEN =  ''
grader.status()

In [None]:
grader.submit(STUDENT_EMAIL, STUDENT_TOKEN)

# Playtime (UNGRADED)
Once you have passed all the tests, modify the code to work with a mixture of Gaussian distributions (in contrast to the mixture of Binomial distributions), and redo the experiments with the CIFAR-10 dataset, which consists of full-color natural images.

In [None]:
from keras.datasets import cifar10
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

In [None]:
plt.imshow(x_train[7, :])
plt.show()