<div style="line-height:1.2;">

<h1 style="color:#FF7C00; margin-bottom: 0.3em;">Variational Autoencoders in TensorFlow </h1>

<h4 style="margin-top: 0.3em; margin-bottom: 1em;"> Examples of VAEs using a Bayesian approach with a prior.</h4>

<div style="line-height:1.4; margin-bottom: 0.5em;">
    <h3 style="color: lightblue; display: inline; margin-right: 0.5em;">Keywords:</h3> 
    tf reduce_mean + tfp distributions + tf.keras.layers + tf.GradientTape()
</div>

</div>

In [None]:
import os
# Lower the verbosity of TensorFlow's native logging. This runs in its own
# cell, ahead of the cell that imports tensorflow (presumably so the setting is
# already in place when the library loads -- confirm against the TF docs).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow.keras.datasets import mnist

<h3 style="color:#FF7C00  "> Recap: </h3>
<div style="margin-top: -20px;">
Variational Autoencoders (VAEs) are a type of generative model for unsupervised learning. <br>
VAE is a probabilistic approach to autoencoders, allowing dimensionality reduction and latent space exploration. <br>
=> Goal: assign the highest likelihood to our data set. <br>
</div>

In [3]:
# Short alias for the TensorFlow Probability distributions module; it is used
# in place of the deprecated tf.contrib.distributions.
tfd = tfp.distributions

In [4]:
def make_encoder(data, code_size):
    """Encode a batch of inputs into a diagonal-Gaussian posterior (TF1-style layers).

    This is the functional-layer variant of the encoder. It goes through
    ``tf.compat.v1.layers`` because the bare ``tf.layers`` namespace is not
    available in TensorFlow 2; this mirrors how the notebook already reaches
    ``tf.compat.v1.placeholder``. A Keras-layer variant with the same name is
    defined in the following cell and replaces this one once that cell runs.

    Args:
        data: Batch of inputs; flattened to one vector per example.
        code_size: Number of latent dimensions.

    Returns:
        A ``tfd.MultivariateNormalDiag`` whose mean and standard deviation are
        each produced by a dense layer on top of two 200-unit ReLU layers.
    """
    layers = tf.compat.v1.layers

    # Flatten, then two identical fully connected ReLU layers.
    hidden = layers.flatten(data)
    for _ in range(2):
        hidden = layers.dense(hidden, 200, tf.nn.relu)

    # Two heads on the shared features: the mean is unconstrained, while the
    # standard deviation goes through softplus so that it stays positive.
    loc = layers.dense(hidden, code_size)
    scale = layers.dense(hidden, code_size, tf.nn.softplus)

    return tfd.MultivariateNormalDiag(loc=loc, scale_diag=scale)

In [5]:
def make_encoder(data, code_size):
    """Map a batch of inputs to a diagonal-Gaussian distribution over latent codes.

    The layers are instantiated inside this function, so every call creates a
    new, independently initialised set of layers.

    Args:
        data: Batch of inputs; flattened to one vector per example.
        code_size: Number of latent dimensions.

    Returns:
        A ``MultivariateNormalDiag`` parameterised by the two output heads.
    """
    # Shared trunk: flatten, then two 200-unit ReLU layers.
    hidden = tf.keras.layers.Flatten()(data)
    for _ in range(2):
        hidden = tf.keras.layers.Dense(200, activation=tf.nn.relu)(hidden)

    # Output heads: an unconstrained mean and a softplus-activated standard
    # deviation (softplus keeps the scale positive).
    mean_head = tf.keras.layers.Dense(code_size)
    stddev_head = tf.keras.layers.Dense(code_size, activation=tf.nn.softplus)

    return tfp.distributions.MultivariateNormalDiag(
        loc=mean_head(hidden),
        scale_diag=stddev_head(hidden),
    )


In the context of a VAE, the Prior distribution represents the assumptions about the latent space before any data is observed. <br>
It is used to regularize the learning process by providing a source of information for the latent space.    
The Prior is fixed and defines what distribution of codes we would expect.     
=> Usually => Normal distribution with zero mean and unit variance.        
Using a prior means encouraging the learned latent space to follow the assumed prior distribution, promoting meaningful and smooth representations.



In [6]:
def make_prior(code_size):
    """Build the fixed prior over latent codes.

    Args:
        code_size: Number of latent dimensions.

    Returns:
        A ``tfd.MultivariateNormalDiag`` with an all-zeros mean and an all-ones
        diagonal scale, i.e. a standard normal in ``code_size`` dimensions.
    """
    return tfd.MultivariateNormalDiag(
        loc=tf.zeros(code_size),        # zero mean in every dimension
        scale_diag=tf.ones(code_size),  # unit standard deviation in every dimension
    )

<h2 style="color:#FF7C00  ">  Decoder </h2>
The decoder's goal is to reconstruct images: it maps the given code to a distribution over images that are plausible for that code.

In [7]:
def make_decoder(code, data_shape):
    """Decode latent codes into a Bernoulli distribution over images (TF1-style layers).

    This is the functional-layer variant of the decoder. It goes through
    ``tf.compat.v1.layers`` because the bare ``tf.layers`` namespace is not
    available in TensorFlow 2. A Keras-layer variant with the same name is
    defined in the following cell and replaces this one once that cell runs.

    Args:
        code: Batch of latent codes.
        data_shape: Shape of a single data item, e.g. ``[28, 28]``. Any
            sequence (list or tuple) is accepted.

    Returns:
        A ``tfd.Independent`` wrapping a per-pixel ``tfd.Bernoulli``. All
        ``len(data_shape)`` trailing dimensions are treated as one event, so
        ``log_prob`` scores a whole image rather than individual pixels.
    """
    data_shape = list(data_shape)
    layers = tf.compat.v1.layers

    # Two fully connected ReLU layers transform the latent code.
    hidden = code
    for _ in range(2):
        hidden = layers.dense(hidden, 200, tf.nn.relu)

    # One logit (pre-sigmoid value) per pixel, reshaped back to image layout.
    logit = layers.dense(hidden, int(np.prod(data_shape)))
    logit = tf.reshape(logit, [-1] + data_shape)

    # For a [28, 28] image this reinterprets the 2 trailing dims (rows and
    # columns) as a single event.
    return tfd.Independent(
        tfd.Bernoulli(logits=logit),
        reinterpreted_batch_ndims=len(data_shape),
    )

In [8]:
def make_decoder(code, data_shape):
    """Decode latent codes into a Bernoulli distribution over images.

    The layers are instantiated inside this function, so every call creates a
    new, independently initialised set of layers.

    Args:
        code: Batch of latent codes.
        data_shape: Shape of a single data item, e.g. ``[28, 28]``. Any
            sequence (list or tuple) is accepted.

    Returns:
        A ``tfp.distributions.Independent`` wrapping a per-pixel ``Bernoulli``.
        All ``len(data_shape)`` trailing dimensions are treated as one event,
        so ``log_prob`` scores a whole image rather than individual pixels.
    """
    # Normalise to a list so the concatenation below also works for tuples.
    data_shape = list(data_shape)

    # Two 200-unit ReLU layers transform the latent code.
    hidden = code
    for _ in range(2):
        hidden = tf.keras.layers.Dense(200, activation=tf.nn.relu)(hidden)

    # One logit per pixel; int() turns the NumPy scalar from np.prod into a
    # plain Python int for the layer's unit count.
    logit = tf.keras.layers.Dense(int(np.prod(data_shape)))(hidden)
    logit = tf.reshape(logit, [-1] + data_shape)

    bernoulli_distribution = tfp.distributions.Bernoulli(logits=logit)
    # Reinterpret every per-item dimension as part of the event (2 for [28, 28]).
    independent_distribution = tfp.distributions.Independent(
        bernoulli_distribution,
        reinterpreted_batch_ndims=len(data_shape),
    )

    return independent_distribution

<div style="line-height:0.3">
<h2 style="color:#FF7C00  "> Loss </h2>
</div>

- Evidence lower bound (ELBO), an approximation to the data likelihood
- ELBO combines the likelihood and the KL divergence, and maximizing it helps learn meaningful latent representations 

In [None]:
""" Calculate Loss.
N.B. 
The eager_execution need to be disabled to make this code running without errors, 
but it makes difficult to implement the right code for next cells!
"""
# Switch TensorFlow to graph mode for the rest of the session. This is global
# state: every later cell that builds tensors is affected by it.
tf.compat.v1.disable_eager_execution()

# Symbolic input for a batch of 28x28 images; the leading dimension is left
# open (None). tf.compat.v1.placeholder is used in place of the TF1 spelling
# tf.placeholder.
data = tf.compat.v1.placeholder(tf.float32, [None, 28, 28])

# Fixed prior over a 2-dimensional latent code.
prior = make_prior(code_size=2)
# Posterior over the latent code, obtained by encoding the input placeholder.
posterior = make_encoder(data, code_size=2)

# One latent code drawn from the posterior.
code = posterior.sample()

In [9]:
# Log-likelihood of the input under the decoder distribution for the sampled code.
# NOTE(review): the decoder distribution built here is not bound to a name, so
# later cells cannot refer back to this particular decoder.
likelihood = make_decoder(code, [28, 28]).log_prob(data)

# KL divergence between the posterior and the prior.
divergence = tfd.kl_divergence(posterior, prior)

# Evidence Lower Bound (ELBO): likelihood minus divergence, averaged with
# tf.reduce_mean.
elbo = tf.reduce_mean(likelihood - divergence)

<h2 style="color:#FF7C00  "> Optimizer </h2>

**Recap tf.GradientTape():** <br>
Record operations: inside a `tf.GradientTape()` context, TensorFlow "records" the operations that are performed on tensors. <br> 
Any operation performed within this context is tracked, so that their gradients can be computed later. <br>

Compute Gradients: TensorFlow computes gradients through the computational graph formed by the recorded operations, <br> 
using the backpropagation algorithm. <br>

In [10]:
### Collect the variables to optimise from the prior, the posterior and a decoder.
# NOTE(review): `prior` and `posterior` are distribution objects, not the layers
# that produced their parameters -- verify that `.trainable_variables` on them
# actually exposes the Dense weights.
prior_trainable_vars = prior.trainable_variables
posterior_trainable_vars = posterior.trainable_variables
# NOTE(review): this is a second call to make_decoder, separate from the one
# used to compute `likelihood`. make_decoder creates its layers inside the
# function, so this decoder does not share layers with the one inside `elbo`.
decoder_trainable_vars = make_decoder(code, [28, 28]).trainable_variables

# Concatenate the three collections into one.
all_trainable_vars = prior_trainable_vars + posterior_trainable_vars + decoder_trainable_vars

# Adam optimizer with a learning rate of 0.001.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

## Use a GradientTape to compute gradients
# NOTE(review): `elbo` was computed in an earlier cell, outside this tape
# context; only the negation below happens while the tape is open. Confirm the
# tape can produce gradients for operations it did not observe.
with tf.GradientTape() as tape:
    loss = -elbo                # Negative ELBO, because the ELBO is to be maximised

# Gradients of the loss with respect to the collected variables.
gradients = tape.gradient(loss, all_trainable_vars)

# Hand the (gradient, variable) pairs to the optimizer. This runs once; there
# is no loop in this cell.
optimizer.apply_gradients(zip(gradients, all_trainable_vars))

<tf.Variable 'iteration:0' shape=() dtype=int64>

In [11]:
# Mean of the decoder distribution for 10 codes drawn from the prior.
# NOTE(review): make_decoder creates its layers inside the function, so this
# call builds a new decoder instead of reusing the one from the loss cell.
samples = make_decoder(prior.sample(10), [28, 28]).mean()

In [12]:
def plot_codes(codes, feed_dict=None):
    """Scatter-plot the first two dimensions of a batch of latent codes.

    In graph mode the tensor is evaluated in a fresh session. Variables are
    initialised in that session first, so tensors that depend on layer weights
    can be evaluated too; the values plotted are therefore those produced by
    the variables' initial values, not by any other session's state.

    Args:
        codes: Tensor or array indexed as ``codes[:, 0]`` and ``codes[:, 1]``.
        feed_dict: Optional mapping of placeholders to values, forwarded to
            ``Session.run`` in graph mode. Ignored in eager mode.
    """
    if tf.executing_eagerly():
        # Eager tensors (and plain arrays) convert directly; no session needed.
        codes_values = np.asarray(codes)
    else:
        with tf.compat.v1.Session() as sess:
            # A new session starts with uninitialised variables.
            sess.run(tf.compat.v1.global_variables_initializer())
            codes_values = sess.run(codes, feed_dict=feed_dict)

    fig, ax = plt.subplots()
    ax.scatter(codes_values[:, 0], codes_values[:, 1], c='b', marker='o')
    ax.set_xlabel('Code Dimension 1')
    ax.set_ylabel('Code Dimension 2')
    ax.set_title('Latent Space Visualization')
    plt.show()

In [13]:
""" Check eager mode """
# Displays whether eager execution is currently enabled.
tf.executing_eagerly()

False

In [14]:
%%script echo Skipping, tf.enable_eager_execution must be called at program startup to avoid the specific ValueError
# Not executed as Python: the %%script magic above only echoes its message, so
# the call below is kept for reference only.
tf.compat.v1.enable_eager_execution()

skipping since tf.enable_eager_execution must be called at program startup to avoid the specific ValueError


In [15]:
""" Check again eager mode """
# Displays the eager-execution state again, after the skipped re-enable cell.
tf.compat.v1.executing_eagerly()

False

The following snippet does not work; it fails with the error: <br>
RuntimeError: `tf.data.Dataset` only supports Python-style iteration in eager mode or within tf.function. <br>
The eager mode was disabled before! <br>

In [16]:
%%script echo skipping due to error
# NOTE(review): this whole cell is disabled by the %%script magic on its first
# line and is kept for reference only.

# Number of outer iterations and the mini-batch size.
num_steps = 10000
batch_size = 32

# Load MNIST; the labels are discarded.
(train_images, _), (test_images, _) = mnist.load_data()

# Add a trailing channel axis (N, 28, 28, 1) and scale pixel values to [0, 1].
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype("float32") / 255
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1).astype("float32") / 255

# Binarize in place with a 0.5 threshold. The order matters: pixels set to 1.0
# by the first assignment are not matched by the "< 0.5" mask that follows.
train_images[train_images >= 0.5] = 1.0
train_images[train_images < 0.5] = 0.0
test_images[test_images >= 0.5] = 1.0
test_images[test_images < 0.5] = 0.0

# Shuffle with a 60000-element buffer, then batch.
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(60000).batch(batch_size)

""" Training loop """
# NOTE(review): the inner loop walks the entire dataset for every value of
# `step`, so each "step" is a full pass over the data and the total work is
# num_steps full passes.
for step in range(1, num_steps + 1):
    # Iterate over every batch of the dataset.
    for batch_x in train_dataset:
        # Record the forward pass so gradients can be taken afterwards.
        with tf.GradientTape() as tape:
            # Encode the batch and draw one latent code.
            # NOTE(review): make_encoder / make_decoder create their layers
            # inside the function, so each iteration builds new layers.
            posterior = make_encoder(batch_x, code_size=2)
            code = posterior.sample()

            # Log-likelihood of the batch under the decoder distribution.
            # NOTE(review): batch_x carries a trailing channel axis
            # (28, 28, 1) while the decoder is asked for [28, 28] -- confirm
            # the shapes are compatible.
            likelihood = make_decoder(code, [28, 28]).log_prob(batch_x)
            
            # KL divergence between the posterior and the prior; `prior` is
            # expected to exist from an earlier cell.
            divergence = tfd.kl_divergence(posterior, prior)
            # Negative ELBO as the loss.
            loss = -tf.reduce_mean(likelihood - divergence)

        # Gradients with respect to every variable the tape watched.
        gradients = tape.gradient(loss, tape.watched_variables())

        # Apply them with `optimizer`, which is expected to exist from an
        # earlier cell.
        optimizer.apply_gradients(zip(gradients, tape.watched_variables()))

    # Every 1000 outer iterations: report the last batch's loss and plot samples.
    if step % 1000 == 0:
        print(f"Step {step}, Loss: {loss}")

        # Decoder mean for 10 codes drawn from the prior.
        samples = make_decoder(prior.sample(10), [28, 28]).mean()

        # Show the 10 samples in a 2x5 grid.
        # NOTE(review): the index below assumes a 4-D `samples`
        # (N, H, W, C); confirm against the decoder's [28, 28] output.
        for i in range(10):
            plt.subplot(2, 5, i + 1)
            plt.imshow(samples[i, :, :, 0], cmap="gray")
            plt.axis("off")
        plt.show()

skipping due to error


<div style="line-height:0.5">
<h3 style="color:#FF7C00  "> Note: </h3>
</div>

The following code works instead ... => NB: huge RAM consumption! <br> 
Therefore the number of steps and the batch size are kept minimal (num_steps = 100, batch_size = 5).

In [None]:
## Number of training steps and the mini-batch size (kept small here).
num_steps = 100   # originally 10000
batch_size = 5    # originally 32

# Load MNIST; the labels are discarded.
(train_images, _), (test_images, _) = mnist.load_data()

## Add a trailing channel axis (N, 28, 28, 1) and scale pixel values to [0, 1].
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype("float32") / 255
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1).astype("float32") / 255

#### Binarize in place with a 0.5 threshold. The order matters: pixels set to
# 1.0 by the first assignment are not matched by the "< 0.5" mask that follows.
train_images[train_images >= 0.5] = 1.0
train_images[train_images < 0.5] = 0.0
test_images[test_images >= 0.5] = 1.0
test_images[test_images < 0.5] = 0.0

# Shuffle with a 60000-element buffer, batch, and repeat indefinitely so the
# iterator below never runs out.
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(60000).batch(batch_size).repeat()

# One-shot iterator from the TF1 compatibility API.
train_iterator = tf.compat.v1.data.make_one_shot_iterator(train_dataset)

# Prior over the latent code.
# NOTE(review): this rebinds `prior` to a scalar Normal(0, 1); the encoder
# returns a MultivariateNormalDiag -- confirm the two are meant to be compared.
prior = tfp.distributions.Normal(loc=0.0, scale=1.0)

## Decoder
# NOTE(review): this rebinds make_decoder; from this cell on the name refers to
# the function below.
def make_decoder(code, output_shape):
    """Return a Bernoulli distribution that uses `code` directly as its logits.

    `output_shape` is accepted but not used, and no layers are created here.
    """
    return tfp.distributions.Bernoulli(logits=code)

# Adam optimizer with its default settings.
optimizer = tf.keras.optimizers.Adam()

# Fetch the first batch before the training loop starts. The author's stated
# reason is that calling "Iterator.get_next()" inside the training loop can
# lead to resource exhaustion.
batch_x = train_iterator.get_next()

<h3 style="color:#FF7C00  "> => Train </h3>

In [17]:
""" Training loop """
for step in range(1, num_steps + 1):
    # The batch for this step is fetched at the end of the previous iteration
    # (or before the loop, for the first step), not here.

    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        ## Encode the batch and draw one latent code.
        # NOTE(review): make_encoder is called on every iteration; if it
        # creates its layers internally, each step gets a new encoder.
        posterior = make_encoder(batch_x, code_size=2)
        code = posterior.sample()

        ## Log-likelihood of the batch under the decoder distribution.
        # NOTE(review): confirm that the decoder's output shape for `code` is
        # compatible with the shape of batch_x.
        decoder = make_decoder(code, [28, 28])
        likelihood = decoder.log_prob(batch_x)

        ####### Monte Carlo estimate of the KL divergence: the mean difference
        # of log-densities over `num_samples` draws from the posterior.
        # NOTE(review): verify that posterior.log_prob and prior.log_prob
        # return tensors of matching shape for these samples.
        num_samples = 10
        samples = posterior.sample(num_samples)
        log_probs_posterior = posterior.log_prob(samples)
        log_probs_prior = prior.log_prob(samples)
        kl_estimate = tf.reduce_mean(log_probs_posterior - log_probs_prior)

        # Negative ELBO as the loss.
        loss = -tf.reduce_mean(likelihood - kl_estimate)

    # Collect the variables to optimise.
    # NOTE(review): these are distribution objects, not the layers that
    # produced their parameters -- verify that `.trainable_variables` on them
    # actually exposes any weights.
    trainable_vars = prior.trainable_variables + posterior.trainable_variables + decoder.trainable_variables

    # Gradients of the loss with respect to the collected variables.
    gradients = tape.gradient(loss, trainable_vars)

    # Hand the (gradient, variable) pairs to the optimizer.
    optimizer.apply_gradients(zip(gradients, trainable_vars))

    # Report the loss and plot samples every 1000 steps.
    # NOTE(review): with num_steps = 100 (as set in the setup cell) this
    # condition is never true, so the branch below does not run.
    if step % 1000 == 0:
        print(f"Step {step}, Loss: {loss}")

        # Draw 10 samples from the decoder distribution; this rebinds
        # `samples`, which held the posterior draws above.
        samples = decoder.sample(10)

        # Show the 10 samples in a 2x5 grid.
        # NOTE(review): the index below assumes a 4-D `samples` -- confirm.
        for i in range(10):
            plt.subplot(2, 5, i + 1)
            plt.imshow(samples[i, :, :, 0], cmap="gray")
            plt.axis("off")
        plt.show()

    # Fetch the batch for the next step.
    # NOTE(review): this is a get_next() call inside the loop, which the setup
    # cell's comment says it wants to avoid -- confirm this is intended.
    batch_x = train_iterator.get_next()

