# Variational Autoencoders

In [None]:
# Utility libraries
import os
from datetime import datetime

# Core libraries
import numpy as np

from keras.datasets import mnist
from keras.models import Model  # Model used by Functional API
from keras.layers import Dense, Lambda, Input  # Lambda and Input used by Functional API
from keras import losses
from keras import optimizers

# For variational auto-encoder
from keras import backend as K

import matplotlib.pyplot as plt # For plotting purposes

from keras.callbacks import TensorBoard


In [None]:
# Load the TensorBoard notebook extension (for visualization purposes)
%load_ext tensorboard

In [None]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

## Let's define a Variational Autoencoder

![VAE](https://miro.medium.com/v2/resize:fit:1400/1*Qd1xKV9o-AnWtfIDhhNdFg@2x.png)

Now, let's define a function that provides the epsilon of the picture and multiplies it with Sigma (the variance associated with the latent representation). We are not interested in the math behind it here.

In [None]:
# Reparameterization trick (no need to know the details)
def sampling(args):
    """
    Reparameterization trick by sampling from an isotropic unit Gaussian.
    
    Arguments
        args (tensor): mean and log of variance of Q(z|X)
    Returns
        z (tensor): sampled latent vector
    """
    z_mean, z_log_var = args
    batch = K.shape(z_mean)[0]  # get dimension of mini-batch
    dim = K.int_shape(z_mean)[1]  # get dimension of each z

    # by default, random_normal has mean = 0 and std = 1.0
    epsilon = K.random_normal(shape=(batch, dim))

    return z_mean + K.exp(0.5 * z_log_var) * epsilon

### Definition of VAE and its hyperparameters

__Hyperparameters__

In [None]:
intermediate_dim = 256  # Hidden units of the MLP
batch_size = 128
latent_dim = 2  # Dimension of the compressed input encoding our digit
epochs = 20
learning_rate = 1e-3
input_shape = (784,)

__Encoder__

In [None]:
inputs = Input(shape=input_shape, name='encoder_input')  #  The image is the input of our Encoder 
h = Dense(intermediate_dim, activation='relu')(inputs)  # A Dense layer compresses the input
# First Output of the encoder
z_mean = Dense(latent_dim, activation='linear', name='z_mean')(h)  # Further compression into latent space for mean

# Second Output of the encoder
z_log_var = Dense(latent_dim, name='z_log_var')(h)  # Further compression into latent space for log_var

# Third Output of the encoder
# note that "output_shape" isn't necessary with the TensorFlow backend
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# Instantiate the Encoder as a Model, by specifying its inputs and outputs
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()

__Decoder__

In [None]:
latent_inputs = Input(shape=(latent_dim,), name='z')
h = Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = Dense(input_shape[0], activation='sigmoid')(h)

# Instantiate the Decoder as a Model, by specifying its inputs and outputs
decoder = Model(latent_inputs, outputs, name='decoder')
decoder.summary()

__VAE__

In [None]:
# Instantiate VAE model 
outputs = decoder(encoder(inputs)[2])

vae = Model(inputs, outputs, name='vae_mlp')
vae.summary()

### Custom Loss definition

In [None]:
def add_vae_loss(vae, encoder_inputs, decoder_outputs, z_mean, z_log_var, original_dim):
    
    reconstruction_loss = losses.mse(inputs, outputs)  # Start with the Mean Squared Error
    reconstruction_loss *= original_dim  # we will average later! This is now the "Squared Error"
    
    # Compute the KL divergence (no need to know the math details here), which is our additional regularization term
    kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
    kl_loss = K.sum(kl_loss, axis=-1)
    kl_loss *= -0.5
    
    # Recompute the mean (over the examples) of the reconstruction error + regularization term
    vae_loss = K.mean(reconstruction_loss + kl_loss)
    
    # Add the loss to the model before compiling it
    vae.add_loss(vae_loss)    

## Prepare the model for training

In [None]:
# Let's prepare the VAE loss, which is a reconstruction error + a regularization term
add_vae_loss(vae, inputs, outputs, z_mean, z_log_var, original_dim=input_shape[0])
vae.compile(optimizer=optimizers.Adam(lr=learning_rate))  # Optimizer

In [None]:
# Clear any logs from previous runs
!rm -rf ./logs_vae

# Set up a log folder in which we will store the output to be displayed on TensorBoard
logdir = "logs_vae/fit/" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=logdir)

# Chekpoint path for storing our model
checkpoint_path = "checkpoints/vae/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Train the Model!
# Note: fit has also the chance to specify a validation split percentage
print('# Fit model on training data')
history = vae.fit(x_train,
                  epochs=epochs,
                  batch_size=batch_size,
                  validation_split=0.1,
                  callbacks=[tensorboard_callback])

In [None]:
# Visualize your model
%tensorboard --logdir {logdir}

In [None]:
def plot_results(models,
                 data,
                 batch_size=128,
                 model_name="vae_mnist"):
    """
    Plots labels and MNIST digits as a function of the 2D latent vector
    Args:
        models (tuple): encoder and decoder models
        data (tuple): test data and label
        batch_size (int): prediction batch size
        model_name (string): which model is using this function
    """

    encoder, decoder = models
    x_test, y_test = data
    os.makedirs(model_name, exist_ok=True)

    filename = os.path.join(model_name, "vae_mean.png")
    # display a 2D plot of the digit classes in the latent space
    z_mean, _, _ = encoder.predict(x_test,
                                   batch_size=batch_size)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=y_test)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.savefig(filename)
    plt.show()

    filename = os.path.join(model_name, "digits_over_latent.png")
    # display a 30x30 2D manifold of digits
    n = 30
    digit_size = 28
    figure = np.zeros((digit_size * n, digit_size * n))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    grid_x = np.linspace(-4, 4, n)
    grid_y = np.linspace(-4, 4, n)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = np.array([[xi, yi]])
            x_decoded = decoder.predict(z_sample)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[i * digit_size: (i + 1) * digit_size,
                   j * digit_size: (j + 1) * digit_size] = digit

    plt.figure(figsize=(10, 10))
    start_range = digit_size // 2
    end_range = (n - 1) * digit_size + start_range + 1
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure, cmap='Greys_r')
    plt.savefig(filename)
    plt.show()

In [None]:
plot_results((encoder, decoder),
             (x_test, y_test),
             batch_size=batch_size,
             model_name="vae_mlp")

# Reconstruction Loss to perform anomaly detection
__If reconstruction loss is above a certain threshold, then we consider the input as an anomaly.__

In [None]:
reconstructed_test = vae.predict(x_test)  # let's reconstruct the test input

In [None]:
# Compute the squared error for each example
squared_error = np.sum(np.power((x_test - reconstructed_test), 2), axis=1)/(input_shape[0])

# Plot the histogram of the reconstruction errors per image
plt.hist(squared_error)

For some reason (i.e., model selection), we know that 0.07 is a good threshold to decide whether an input is an anomaly or not.

In [None]:
# Set the desired threshold
threshold = 0.07

# Now let's perturbate a test example
sample = x_test[0]
plt.imshow(np.reshape(sample, (28, 28)), cmap='gray')

#perturbated_sample = sample + np.random.randint(0,2, size=(mnist_img_rows*mnist_img_cols))
perturbated_sample = sample

plt.figure()
plt.imshow(np.reshape(perturbated_sample, (28, 28)), cmap='gray')

# Add the "batch" dimension
perturbated_sample = np.expand_dims(perturbated_sample, axis=0)

In [None]:
reconstructed_sample = vae.predict(perturbated_sample)  # let's reconstruct the test input

# Compute the squared error for each example
squared_error = np.sum(np.power((perturbated_sample - reconstructed_sample), 2), axis=1)/(input_shape[0])

print(f'Reconstruction error is {squared_error[0]}, is this an anomaly? --> {bool(squared_error > threshold)}')

plt.imshow(np.reshape(reconstructed_sample, (28, 28)), cmap='gray')

# Exercise #1: Change the model above to implement a Deep Neural Autoencoder

![DeepAE](https://upload.wikimedia.org/wikipedia/commons/2/28/Autoencoder_structure.png)