# Autoencoders

## Import Dependencies and Packages

In [9]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import tensorflow as tf
from tensorflow.keras import layers, models, backend as K, metrics
from tensorflow.keras.datasets import mnist
from typing import Tuple

# Print the installed TensorFlow version (informational only; nothing is checked or enforced)
print(f"TensorFlow Version: {tf.__version__}")

TensorFlow Version: 2.18.0


## Setting Hyper-Parameters 

In [10]:
# Define key parameters
batch_size = 100        # Samples per gradient update passed to `fit`
original_dim = 784      # 28x28 images flattened
latent_dim = 2          # Dimensionality of the latent space
intermediate_dim = 256  # Width of the hidden dense layer in both encoder and decoder
epochs = 50             # Number of passes over the training set
epsilon_std = 1.0       # Standard deviation of the noise drawn in `sampling`
random_seed = 42        # Seed for every random number generator used below

# Weight initialisation, the latent noise and batch shuffling are all random.
# Seeding Python, NumPy and TensorFlow in one call makes a fresh
# "Restart & Run All" repeatable (GPU kernels may still introduce small
# run-to-run differences).
tf.keras.utils.set_random_seed(random_seed)

## Sampling Helper Function


In [11]:
# Define the sampling function with type hints
def sampling(args: Tuple[tf.Tensor, tf.Tensor]) -> tf.Tensor:
    """Draw a latent sample using the reparameterization trick.

    Args:
        args: Pair ``(z_mean, z_log_var)`` holding the mean and the
            log-variance of the latent distribution. Both tensors are
            expected to have the same shape.

    Returns:
        ``z_mean + exp(z_log_var / 2) * epsilon`` where ``epsilon`` is
        normal noise with mean 0 and standard deviation ``epsilon_std``.
    """
    z_mean, z_log_var = args
    # Take the noise shape from z_mean itself rather than from the global
    # `latent_dim`, so the helper keeps working if the encoder's latent
    # width is changed independently of that constant.
    epsilon = K.random_normal(shape=K.shape(z_mean), mean=0.0, stddev=epsilon_std)
    # exp(log_var / 2) is the standard deviation
    return z_mean + K.exp(z_log_var / 2) * epsilon

## Defining the Encoder

In [12]:
# Encoder: maps an input of width `original_dim` to a latent mean, a latent
# log-variance, and a sample drawn from those two.
encoder_input = layers.Input(name="encoder_input", shape=(original_dim,))

# Hidden representation shared by both latent heads
encoder_hidden = layers.Dense(
    units=intermediate_dim,
    activation="relu",
    name="encoder_hidden",
)(encoder_input)

# Two linear heads (no activation): latent mean and latent log-variance
z_mean = layers.Dense(units=latent_dim, name="z_mean")(encoder_hidden)
z_log_var = layers.Dense(units=latent_dim, name="z_log_var")(encoder_hidden)

# Stochastic node: turn the two heads into a sample via the `sampling` helper
z = layers.Lambda(function=sampling, name="z")([z_mean, z_log_var])

# Expose all three tensors so callers can use the statistics as well as the sample
encoder = models.Model(
    inputs=encoder_input,
    outputs=[z_mean, z_log_var, z],
    name="encoder",
)
encoder.summary()

## Defining the Decoder

In [13]:
# Decoder: maps a latent vector of width `latent_dim` back to `original_dim` values
decoder_input = layers.Input(name="decoder_input", shape=(latent_dim,))

# Hidden layer mirroring the encoder's width
decoder_hidden = layers.Dense(
    units=intermediate_dim,
    activation="relu",
    name="decoder_hidden",
)(decoder_input)

# Sigmoid keeps every output in (0, 1)
decoder_output = layers.Dense(
    units=original_dim,
    activation="sigmoid",
    name="decoder_output",
)(decoder_hidden)

# Wrap the graph in a standalone model and show its layout
decoder = models.Model(inputs=decoder_input, outputs=decoder_output, name="decoder")
decoder.summary()

## Define the Variational Autoencoder (VAE)

In [14]:
# Define a custom VAE class
class VAE(models.Model):
    """Variational autoencoder built from a separate encoder and decoder.

    The encoder must return a triple ``(z_mean, z_log_var, z)`` and the
    decoder must map ``z`` back to the input space. The loss minimised is
    the batch mean of (binary cross-entropy summed over axis 1) plus the
    KL term computed from ``z_mean`` and ``z_log_var``.

    Args:
        encoder: Model returning ``(z_mean, z_log_var, z)`` for a batch.
        decoder: Model reconstructing the input from ``z``.
        **kwargs: Forwarded to ``models.Model.__init__``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported in the progress bar and in `history`
        self.total_loss_tracker = metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers owned by this model: total, reconstruction and KL loss."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    @staticmethod
    def _vae_inputs(data):
        """Return the input batch; when `data` is a tuple, use its first element."""
        return data[0] if isinstance(data, tuple) else data

    def _vae_losses(self, x):
        """Run encoder and decoder on `x` and compute the loss terms.

        Shared by `train_step` and `test_step` so both always use the same
        objective.

        Returns:
            Tuple ``(total_loss, reconstruction_loss, kl_loss)``.
            ``total_loss`` is the batch mean of the summed terms; the other
            two are the values after reduction over axis 1.
        """
        z_mean, z_log_var, z = self.encoder(x)
        x_decoded = self.decoder(z)
        # Element-wise binary cross-entropy, then summed over axis 1 (features)
        reconstruction_loss = K.sum(K.binary_crossentropy(x, x_decoded), axis=1)
        # KL term: -0.5 * sum(1 + log_var - mean^2 - exp(log_var)) over axis 1
        kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
        kl_loss = -0.5 * K.sum(kl_loss, axis=1)
        total_loss = K.mean(reconstruction_loss + kl_loss)
        return total_loss, reconstruction_loss, kl_loss

    def _vae_track(self, total_loss, reconstruction_loss, kl_loss):
        """Update the three trackers and return their current running means."""
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(K.mean(reconstruction_loss))
        self.kl_loss_tracker.update_state(K.mean(kl_loss))
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def train_step(self, data):
        """Run one optimisation step on a batch and return the running losses.

        Args:
            data: Input batch, or a tuple whose first element is the batch.

        Returns:
            Dict with keys ``"loss"``, ``"reconstruction_loss"`` and
            ``"kl_loss"``.
        """
        x = self._vae_inputs(data)
        # The forward pass must happen inside the tape so gradients can be taken
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self._vae_losses(x)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return self._vae_track(total_loss, reconstruction_loss, kl_loss)

    def test_step(self, data):
        """Evaluate the losses on a batch without updating any weights.

        Args:
            data: Input batch, or a tuple whose first element is the batch.

        Returns:
            Dict with keys ``"loss"``, ``"reconstruction_loss"`` and
            ``"kl_loss"``.
        """
        x = self._vae_inputs(data)
        total_loss, reconstruction_loss, kl_loss = self._vae_losses(x)
        return self._vae_track(total_loss, reconstruction_loss, kl_loss)

    def call(self, inputs):
        """Encode `inputs`, then decode the sampled latent vector."""
        z_mean, z_log_var, z = self.encoder(inputs)
        return self.decoder(z)

## Instantiate the VAE

In [15]:
# Combine the encoder and decoder built above into one trainable model
vae = VAE(encoder=encoder, decoder=decoder)

# No `loss=` is passed here: the VAE class computes its loss inside its own
# train_step / test_step
vae.compile(optimizer="rmsprop")
vae.summary()

## Define the Dataset - MNIST

In [16]:
# Fetch the MNIST train/test split (the labels are loaded alongside the images)
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Flatten every image to a vector of length `original_dim`, cast to float32
# and divide by 255 so the values lie in [0, 1]
x_train = x_train.reshape((-1, original_dim)).astype("float32") / 255.0
x_test = x_test.reshape((-1, original_dim)).astype("float32") / 255.0

## Train the Variational Autoencoder

In [17]:
# Fit on the training images alone (no targets are supplied). The test images
# are passed with a `None` target for per-epoch validation.
history = vae.fit(
    x=x_train,
    validation_data=(x_test, None),
    shuffle=True,
    batch_size=batch_size,
    epochs=epochs,
)

Epoch 1/50
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 3ms/step - kl_loss: 12.4837 - loss: 222.6394 - reconstruction_loss: 210.1556 - val_loss: 173.3471 - val_reconstruction_loss: 168.5719 - val_kl_loss: 4.7752
Epoch 2/50
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - kl_loss: 4.7466 - loss: 172.4040 - reconstruction_loss: 167.6574 - val_loss: 168.6217 - val_reconstruction_loss: 163.6448 - val_kl_loss: 4.9769
Epoch 3/50
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 3ms/step - kl_loss: 4.9434 - loss: 168.2033 - reconstruction_loss: 163.2599 - val_loss: 165.8716 - val_reconstruction_loss: 160.9848 - val_kl_loss: 4.8868
Epoch 4/50
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - kl_loss: 5.0539 - loss: 165.2634 - reconstruction_loss: 160.2095 - val_loss: 163.7493 - val_reconstruction_loss: 158.6940 - val_kl_loss: 5.0553
Epoch 5/50
[1m600/600[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1