### Importing the libraries

In [1]:
import tensorflow as tf

import os
import time
import numpy as np 
import glob
import matplotlib.pyplot as plt
import PIL
import imageio

from IPython import display

### TensorFlow and Python version

In [2]:
print(str(tf.__version__))

2.1.0


In [3]:
import platform
print(platform.python_version())

3.7.6


### TensorFlow 2 Eager execution

Recall that in TensortFlow 2 eager execution is enabled by default. With eager execution operations are evaluated immediately without building graphs. Operations return concrete values instead of constructing a computational graph to run later.

In [5]:
tf.executing_eagerly()

True

### Loading the MNIST dataset

First we load the MNIST dataset directly from `tf.keras.datasets`.  We won't need the labels, thus we just keep the training images and test images.

By printing the shape of `train_images` and `test_images` we see that we have 60000 training images and 10000 test images. Each MNIST picture consists of 28x28 pixels. 

In [6]:
(train_images, _), (test_images, _) = tf.keras.datasets.mnist.load_data()

print()
print("Type: {}".format(type(train_images)))
print("train_images shape: {}".format(train_images.shape))
print("test_images shape: {}".format(test_images.shape))


Type: <class 'numpy.ndarray'>
train_images shape: (60000, 28, 28)
test_images shape: (10000, 28, 28)


### Reshaping and normalizing the dataset

In [7]:
# reshaping dataset
train_images = train_images.reshape(train_images.shape[0],28,28,1).astype("float32")
test_images = test_images.reshape(test_images.shape[0],28,28,1).astype("float32")

# normalize dataset: each MNIST image consists of 28x28 integers between 0-255
train_images /= 255.
test_images /= 255.

# binarization: pixel values >= 0.5 become 1 | pixel values < 0.5 become 0
train_images[train_images >= .5] = 1.
train_images[train_images < .5] = 0.

test_images[test_images >= .5] = 1.
test_images[test_images < .5] = 0.


### Shuffle dataset and create batches

**(Stochastic) Gradient Descent** works properly when the instances in the training set are **i.i.d**. An easy way to ensure this, is to shuffle the training data. We can do so by first creating a `TensorSliceDataset` object of `train_images` and `test_images` respectively and then applying the`shuffle()` method on them. 

We will also split our training and test data into **batches** by applying the `batch()` method.

For the sake of reproducibility we set `seed=42` in the method `shuffle()`.

In [8]:
# initialize varibles for shuffling dataset and creating batches
TRAIN_BUF = 60000
BATCH_SIZE = 100
TEST_BUF = 10000

# shuffle dataset and creat batches
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(TRAIN_BUF,seed=42).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices(test_images).shuffle(TEST_BUF,seed=42).batch(BATCH_SIZE)

The last code cell shuffled our dataset and split it into batches of size `BATCH_SIZE`. When we iterate through **the first 3 items** in `train_dataset`, we see the shape of our batches is indeed what we wanted them to be.

In [17]:
for item in iter(train_dataset.take(3)):
    print(item.shape)

(100, 28, 28, 1)
(100, 28, 28, 1)
(100, 28, 28, 1)


### How Variational Autoencoders work in a nutshell

### Network Architecture

### Refresher on Transposed Convolution
In case you are not familiar with **Transposed Convolution**, I recommend looking at these [visualizations](https://github.com/vdumoulin/conv_arithmetic) on github.

In [45]:
# define convolutional variational autoencoder model class
class CVAE(tf.keras.Model):
    def __init__(self,latent_dim):
        super(CVAE,self).__init__()
        self.latent_dim = latent_dim
        self.inference_net = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(28,28,1)),
                tf.keras.layers.Conv2D(
                    filters=32, kernel_size=3, strides=(2,2), activation='relu'),
                tf.keras.layers.Conv2D(
                    filters=64, kernel_size=3, strides=(2,2), activation='relu'),
                tf.keras.layers.Flatten(),
                # no activation since mean and log_var can be either pos. or neg.
                tf.keras.layers.Dense(latent_dim + latent_dim)

            ]
        )
        
        self.generative_net = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
                tf.keras.layers.Reshape(target_shape=(7,7,32)),
                tf.keras.layers.Conv2DTranspose(
                    filters=64,
                    kernel_size=3,
                    strides=(2,2),
                    padding="SAME",
                    activation='relu'),
                tf.keras.layers.Conv2DTranspose(
                    filters=32,
                    kernel_size=3,
                    strides=(2,2),
                    padding="SAME",
                    activation='relu'),
                # no activation
                tf.keras.layers.Conv2DTranspose(
                    filters=1, kernel_size=3, strides=(1,1), padding="SAME")

            ]
        )
    
    # here only decorate sample method with @tf.functionthe 
    @tf.function
    def sample(self, eps=None):
        if eps is None:
            eps = tf.random.normal(shape=(BATCH_SIZE, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)
    
    #methods below will be decorated with @tf.function in trainining loop
    def encode(self, x):
        mean, logvar =tf.split(self.inference_net(x), num_or_size_splits=2, axis=1)
        return mean, logvar
    
    def reparametrize(self, mean, logvar):
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar *.5) + mean
    
    def decode(self, z, apply_sigmoid=False):
        logits = self.generative_net(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        
        return logits

### The Loss Function

Variational Autoencoders are trained to maximize the evidence lower bound **(ELBO)** which is a lower bound to the marginal log likelihood, i.e. 

$\log p(x) \geq ELBO = \mathbb{E}_{q(z|x)}\bigg[\frac{p(x,z)}{q(z|x)}\bigg] = \mathbb{E}_{q(z|x)} \bigg[ \log p(x|z) + \log p(z) - \log q(z|x) \bigg]$.

We have to estimate the expectation above and will do this with a simple Monte Carlo estimate:

$\log p(x|z) + \log p(z) - \log q(z|x)$

where $z$ is sampled from $q(z|x)$.

**Note:**

The term 

$\mathbb{E}_{q(z|x)} \bigg[ \log p(z) - \log q(z|x) \bigg]$

is the **KL-Divergence** of two gaussians and has an analytic solution that we could use but for simplicity we will use a simple Monte Carlo estimate.

In [50]:
# use adam optimizer
optimizer = tf.keras.optimizers.Adam(1e-4)

# computes log of normal density
def log_normal_pdf(sample, mean, logvar, raxis=1):
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(
        -.5 * ((sample -mean)**2. * tf.exp(-logvar) + logvar + log2pi), axis=raxis
    )

# computes loss of model for batch x
@tf.function
def compute_loss(model, x):
    mean, logvar = model.encode(x)
    z = model.reparametrize(mean, logvar)
    x_logit = model.decode(z)
    
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[1,2,3])
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)

# computes and applies gradients of loss with resoect to model weights
@tf.function
def compute_apply_gradients(model, x, optimizer):
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

### Trainining
* We will train for **100 epochs**
* 

In [51]:
# hyperparameters of model and training
EPOCHS = 100
latent_dim = 50

# instantiate model
model = CVAE(latent_dim)

### Generating Images
Our model can generate **NEW IMAGES**. We would like to see what kind of images our model would generate after every epoch. We do this by:
* specifying the **numbers of examples** we want to generate in total at each epoch (in `num_examples_to_generate`)
* **sampling the latent vectors** form unit gaussian prior $p(z)$ (reference to these samples is `random_vector_for_generation`)
* after a certain epoch **feeding the generator** of the model each $z$ in `random_vector_for_generation`  and turn it into logits of $p(x|z)$ which is a Bernoulli Distribution on the pixels of an image
* **plotting the resulting Bernoulli distribution** $p(x|z)$ for each latent vecotr $z$

**Recall:** The Bernoulli distribution $=p(x|z)$ assigns each pixel $i$ a probabilty $\theta_i$ (the probabilty that the pixel is white). The expected image is  $\mathbb{E}_{p(x|z)}[ x ] = \theta$. We can thus see the expected image by plotting the generated probabilities $\theta$. The generator $p(x|z)$ outputs these probabilities $\theta$.

In [52]:
# initialize random latent vector for image generation (after training)
num_examples_to_generate = 16
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate,latent_dim])

# generates and saves
def generate_and_save_images(model, epoch, latent_vectors):
    predictions = model.sample(latent_vectors)
    fig = plt.figure(figsize=(4,4))
    
    for i in range(predictions.shape[0]):
        plt.subplot(4,4, i+1)
        plt.imshow(predictions[i, :, :, 0], cmap="gray")
        plt.axis("off")
    
    plt.savefig("image_at_epoch_{:04d}.png".format(epoch))
    plt.show()

In [None]:
generate_and_save_images(model, 0, random_vector_for_generation)

for epoch in range(1,EPOCHS +1):
    #perform one training epoch
    start_time = time.time()
    for train_x in train_dataset:
        compute_apply_gradients(model, train_x, optimizer)
    end_time = time.time()
    
    # print metrics and generate and save images
    if epoch % 1 == 0:
        loss = tf.keras.metrics.Mean()
        for test_x in test_dataset:
            loss(compute_loss(model, test_x))
        elbo = -loss.result()
        display.clear_output(wait=False)
        print("Epoch: {}, Test set ELBO: {}, "
              "Time passed for current epoch: {}".format(epoch, elbo, end_time-start_time)
             )
        
        generate_and_save_images(model, epoch, random_vector_for_generation)

Epoch: 3, Test set ELBO: -111.88827514648438, time passed for current epoch 31.5055570602417
