# Variational Autoencoder (VAE) for MNIST Dataset

In this notebook, we implement a Variational Autoencoder (VAE) using TensorFlow/Keras.
We use the MNIST dataset to train our model, visualize the reconstructed images, and explore the latent space.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K

### VariationalAutoencoder class
The `VariationalAutoencoder` class wraps the whole model: it builds the encoder, the decoder, and the combined VAE, and provides methods for compiling, training, and plotting reconstructions. Keeping everything in one class makes it easy to experiment with settings such as the latent dimension, here applied to the MNIST dataset.

Let's break down the `VariationalAutoencoder` class step by step, explaining each method and its purpose:

### 1. `__init__` Method

```python
    def __init__(self, input_shape=(784,), latent_dim=2, intermediate_dim=512):
        self.input_shape = input_shape
        self.latent_dim = latent_dim
        self.intermediate_dim = intermediate_dim
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()
        self.vae = self.build_vae()
```

- **Purpose**: This method initializes the VAE with specified dimensions and builds its encoder, decoder, and VAE models.
  
- **Parameters**:
  - `input_shape`: Shape of the input data. Default is `(784,)` corresponding to MNIST images flattened to 784 dimensions.
  - `latent_dim`: Dimensionality of the latent space. Default is `2` for easy visualization.
  - `intermediate_dim`: Dimensionality of the intermediate layer in the encoder and decoder. Default is `512`.

- **Attributes**:
  - `self.input_shape`: Stores the input shape.
  - `self.latent_dim`: Stores the dimensionality of the latent space.
  - `self.intermediate_dim`: Stores the dimensionality of the intermediate layer.
  - `self.encoder`: Instance of the encoder model.
  - `self.decoder`: Instance of the decoder model.
  - `self.vae`: Instance of the VAE model.

### 2. `build_encoder` Method

```python
    def build_encoder(self):
        inputs = keras.Input(shape=self.input_shape)
        h = layers.Dense(self.intermediate_dim, activation='relu')(inputs)
        z_mean = layers.Dense(self.latent_dim)(h)
        z_log_var = layers.Dense(self.latent_dim)(h)

        def sampling(args):
            z_mean, z_log_var = args
            epsilon = K.random_normal(shape=(K.shape(z_mean)[0], self.latent_dim), mean=0., stddev=1.)
            return z_mean + K.exp(z_log_var / 2) * epsilon

        z = layers.Lambda(sampling)([z_mean, z_log_var])
        return Model(inputs, [z_mean, z_log_var, z], name='encoder')
```

- **Purpose**: Constructs the encoder model which maps inputs to the latent space (`z_mean`, `z_log_var`, `z`).

- **Details**:
  - `inputs`: Defines the input layer with shape `self.input_shape`.
  - `h`: Hidden layer of the encoder with `self.intermediate_dim` units and ReLU activation.
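  - `z_mean`: Dense layer (no activation) that outputs the mean of the approximate posterior for each latent dimension.
  - `z_log_var`: Dense layer (no activation) that outputs the log variance for each latent dimension. Predicting the log keeps the variance positive without constraining the layer's output.
  - `sampling`: Function, wrapped in a `Lambda` layer, that applies the reparameterization trick: it draws `epsilon` from a standard normal distribution and returns `z_mean + exp(z_log_var / 2) * epsilon`, so gradients can flow through `z_mean` and `z_log_var`.
  - `z`: The sampled latent point, returned alongside `z_mean` and `z_log_var`.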
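
The reparameterization step can be checked outside Keras. The following is a minimal NumPy sketch, not the notebook's implementation: `sample_latent` is a hypothetical helper that mirrors the `sampling` function, and the batch of identical encoder outputs is made up for illustration. With enough samples, the empirical mean and standard deviation of `z` should approach `z_mean` and `exp(z_log_var / 2)`.

```python
import numpy as np

def sample_latent(z_mean, z_log_var, rng):
    """Draw z = mean + std * eps with eps ~ N(0, I) (reparameterization trick)."""
    eps = rng.standard_normal(size=z_mean.shape)
    # exp(log_var / 2) converts a log variance into a standard deviation.
    return z_mean + np.exp(0.5 * z_log_var) * eps

rng = np.random.default_rng(0)

# Hypothetical encoder outputs: 100,000 copies of the same (mean, log variance).
z_mean = np.full((100_000, 2), 1.5)
z_log_var = np.full((100_000, 2), np.log(0.25))  # variance 0.25, i.e. std 0.5

z = sample_latent(z_mean, z_log_var, rng)
sample_mean = z.mean(axis=0)  # should be close to 1.5 in both dimensions
sample_std = z.std(axis=0)    # should be close to 0.5 in both dimensions
print(sample_mean, sample_std)
```

Because the randomness enters only through `eps`, the output is a deterministic, differentiable function of `z_mean` and `z_log_var`, which is what lets backpropagation reach the encoder weights.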

### 3. `build_decoder` Method

```python
    def build_decoder(self):
        latent_inputs = keras.Input(shape=(self.latent_dim,))
        h_decoded = layers.Dense(self.intermediate_dim, activation='relu')(latent_inputs)
        x_decoded_mean = layers.Dense(self.input_shape[0], activation='sigmoid')(h_decoded)
        return Model(latent_inputs, x_decoded_mean, name='decoder')
```

- **Purpose**: Constructs the decoder model which reconstructs inputs from the latent space.

- **Details**:
  - `latent_inputs`: Defines the input layer for the latent space points with shape `(self.latent_dim,)`.
  - `h_decoded`: Hidden layer of the decoder with `self.intermediate_dim` units and ReLU activation.
  - `x_decoded_mean`: Outputs the reconstructed data with `self.input_shape[0]` units and sigmoid activation.
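
To make the shape flow through the decoder concrete, here is a hedged NumPy sketch of the same two-layer computation. The function names and the randomly initialized weights are assumptions for illustration; they stand in for the trained Keras layers and are not taken from the notebook.

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def decoder_forward(z, w1, b1, w2, b2):
    """Two dense layers: ReLU hidden layer, then sigmoid output layer."""
    h = np.maximum(z @ w1 + b1, 0.0)  # Dense(intermediate_dim, activation='relu')
    return sigmoid(h @ w2 + b2)       # Dense(output_dim, activation='sigmoid')

rng = np.random.default_rng(0)
latent_dim, intermediate_dim, output_dim = 2, 512, 784

# Random weights stand in for trained ones (illustration only).
w1 = rng.normal(scale=0.05, size=(latent_dim, intermediate_dim))
b1 = np.zeros(intermediate_dim)
w2 = rng.normal(scale=0.05, size=(intermediate_dim, output_dim))
b2 = np.zeros(output_dim)

z = rng.standard_normal(size=(4, latent_dim))  # a batch of 4 latent points
x_hat = decoder_forward(z, w1, b1, w2, b2)
print(x_hat.shape)
```

The sigmoid keeps every output strictly between 0 and 1, which matches the normalized pixel range of the inputs.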

### 4. `build_vae` Method

```python
    def build_vae(self):
        input_x = keras.Input(shape=self.input_shape)
        z_mean, z_log_var, z = self.encoder(input_x)
        reconstructed_x = self.decoder(z)
        vae = Model(input_x, reconstructed_x, name='vae')

        # Add KL divergence regularization loss
        kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
        vae.add_loss(K.mean(kl_loss))

        return vae
```

- **Purpose**: Constructs the VAE model which combines the encoder and decoder.

- **Details**:
  - `input_x`: Input layer for the original data.
  - `z_mean`, `z_log_var`, `z`: Outputs of the encoder.
  - `reconstructed_x`: Outputs of the decoder, reconstructing the input.
  - `vae`: Model that takes `input_x` and outputs `reconstructed_x`.
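  - `kl_loss`: Closed-form KL divergence between the encoder's diagonal Gaussian and a standard normal prior, summed over the latent dimensions. Its batch mean is attached with `add_loss`, so it is added to the reconstruction loss chosen in `compile` and pulls the latent distribution toward a standard normal.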
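
The KL expression in `build_vae` can be evaluated by hand for small inputs. The sketch below re-implements the same formula in NumPy; `kl_to_standard_normal` is a hypothetical helper name, and the two example rows are chosen so the result is easy to verify.

```python
import numpy as np

def kl_to_standard_normal(z_mean, z_log_var):
    """KL(N(mean, diag(exp(log_var))) || N(0, I)), one value per sample."""
    return -0.5 * np.sum(
        1.0 + z_log_var - np.square(z_mean) - np.exp(z_log_var), axis=-1
    )

# Row 0 is exactly the prior; row 1 has unit variance but a shifted mean.
z_mean = np.array([[0.0, 0.0], [1.0, -1.0]])
z_log_var = np.zeros((2, 2))

kl = kl_to_standard_normal(z_mean, z_log_var)
print(kl)  # row 0 gives 0.0, row 1 gives 1.0
```

The first row matches the prior, so its divergence is zero. In the second row each latent dimension contributes `0.5 * mean**2 = 0.5`, giving a total of 1.0.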

### 5. `compile` Method

```python
    def compile(self, optimizer='adam', loss='binary_crossentropy'):
        self.vae.compile(optimizer=optimizer, loss=loss)
```

- **Purpose**: Compiles the VAE model with specified optimizer and loss function.

- **Parameters**:
  - `optimizer`: Optimizer algorithm to use during training. Default is `'adam'`.
  - `loss`: Reconstruction loss to minimize during training, in addition to the KL term added in `build_vae`. Default is `'binary_crossentropy'`.

### 6. `train` Method

```python
    def train(self, x_train, x_test, epochs=50, batch_size=128):
        history = self.vae.fit(x_train, x_train,
                               epochs=epochs,
                               batch_size=batch_size,
                               validation_data=(x_test, x_test))
        return history
```

- **Purpose**: Trains the VAE model on given training data.

- **Parameters**:
  - `x_train`: Training data.
  - `x_test`: Validation data.
  - `epochs`: Number of training epochs. Default is `50`.
  - `batch_size`: Batch size for training. Default is `128`.

- **Returns**: The Keras `History` object returned by `fit`; its `history` attribute holds the per-epoch training and validation loss.

### 7. `plot_results` Method

```python
    def plot_results(self, x_test, n=10):
        decoded_imgs = self.vae.predict(x_test)

        plt.figure(figsize=(20, 4))
        for i in range(n):
            # Display original images
            ax = plt.subplot(2, n, i + 1)
            plt.imshow(x_test[i].reshape(28, 28))
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

            # Display reconstructed images
            ax = plt.subplot(2, n, i + 1 + n)
            plt.imshow(decoded_imgs[i].reshape(28, 28))
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
        plt.show()
```

- **Purpose**: Plots original and reconstructed images to visualize model performance.

- **Parameters**:
  - `x_test`: Test data used for plotting.
  - `n`: Number of samples to display. Default is `10`.

- **Details**:
  - `decoded_imgs`: Reconstructed images obtained by predicting on `x_test`.
  - Displays two rows of `n` images using Matplotlib: the originals on top and the corresponding reconstructions below.


In [2]:
# Define Variational Autoencoder (VAE) class
class VariationalAutoencoder:
    def __init__(self, input_shape=(784,), latent_dim=2, intermediate_dim=512):
        self.input_shape = input_shape
        self.latent_dim = latent_dim
        self.intermediate_dim = intermediate_dim
        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()
        self.vae = self.build_vae()

    def build_encoder(self):
        inputs = keras.Input(shape=self.input_shape)
        h = layers.Dense(self.intermediate_dim, activation='relu')(inputs)
        z_mean = layers.Dense(self.latent_dim)(h)
        z_log_var = layers.Dense(self.latent_dim)(h)

        def sampling(args):
            z_mean, z_log_var = args
            epsilon = K.random_normal(shape=(K.shape(z_mean)[0], self.latent_dim), mean=0., stddev=1.)
            return z_mean + K.exp(z_log_var / 2) * epsilon

        z = layers.Lambda(sampling)([z_mean, z_log_var])
        return Model(inputs, [z_mean, z_log_var, z], name='encoder')

    def build_decoder(self):
        latent_inputs = keras.Input(shape=(self.latent_dim,))
        h_decoded = layers.Dense(self.intermediate_dim, activation='relu')(latent_inputs)
        x_decoded_mean = layers.Dense(self.input_shape[0], activation='sigmoid')(h_decoded)
        return Model(latent_inputs, x_decoded_mean, name='decoder')

    def build_vae(self):
        input_x = keras.Input(shape=self.input_shape)
        z_mean, z_log_var, z = self.encoder(input_x)
        reconstructed_x = self.decoder(z)
        vae = Model(input_x, reconstructed_x, name='vae')

        # Add KL divergence regularization loss
        kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
        vae.add_loss(K.mean(kl_loss))

        return vae

    def compile(self, optimizer='adam', loss='binary_crossentropy'):
        self.vae.compile(optimizer=optimizer, loss=loss)

    def train(self, x_train, x_test, epochs=50, batch_size=128):
        history = self.vae.fit(x_train, x_train,
                               epochs=epochs,
                               batch_size=batch_size,
                               validation_data=(x_test, x_test))
        return history

    def plot_results(self, x_test, n=10):
        decoded_imgs = self.vae.predict(x_test)

        plt.figure(figsize=(20, 4))
        for i in range(n):
            # Display original images
            ax = plt.subplot(2, n, i + 1)
            plt.imshow(x_test[i].reshape(28, 28))
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)

            # Display reconstructed images
            ax = plt.subplot(2, n, i + 1 + n)
            plt.imshow(decoded_imgs[i].reshape(28, 28))
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
        plt.show()

### Data Loading and Preprocessing Explanation

```python
# Load MNIST dataset
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Normalize pixel values between 0 and 1
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

# Flatten images into 784-dimensional vectors (28x28)
image_size = x_train.shape[1]
original_dim = image_size * image_size
x_train = np.reshape(x_train, [-1, original_dim])
x_test = np.reshape(x_test, [-1, original_dim])
```

#### Step-by-Step Explanation:

1. **Loading the MNIST Dataset**:
   - `mnist.load_data()`: This function is provided by TensorFlow/Keras and retrieves the MNIST dataset split into training and test sets.
   - **Outputs**:
     - `(x_train, y_train)`: Training images (`x_train`) and corresponding labels (`y_train`).
     - `(x_test, y_test)`: Test images (`x_test`) and corresponding labels (`y_test`).

2. **Normalizing Pixel Values**:
   - `x_train = x_train.astype('float32') / 255.`: Converts the pixel values of the training images to floats and normalizes them between 0 and 1 by dividing by 255.
   - `x_test = x_test.astype('float32') / 255.`: Similarly, normalizes the pixel values of the test images.

3. **Flattening Images**:
   - **Why Flatten?**: Each MNIST image is a 28x28 pixel grid. The encoder in this notebook starts with a fully connected (`Dense`) layer, which expects a 1-D feature vector per sample, so each image is flattened into a 784-dimensional vector.
   - **Reshape Operation**:
     - `image_size = x_train.shape[1]`: Retrieves the size of each image dimension (28 for MNIST).
     - `original_dim = image_size * image_size`: Calculates the total number of pixels per image (784 for MNIST).
     - `x_train = np.reshape(x_train, [-1, original_dim])`: Reshapes the training images (`x_train`) into a 2D array where each row represents one flattened image.
     - `x_test = np.reshape(x_test, [-1, original_dim])`: Reshapes the test images (`x_test`) similarly.

#### Summary:

- **Dataset Loading**: Loads the MNIST dataset using TensorFlow/Keras utility functions, providing training and test sets of images and labels.
- **Normalization**: Converts pixel values from integers (0 to 255) to floats between 0 and 1, aiding model convergence during training.
- **Flattening**: Transforms each 28x28 image into a flat vector of 784 elements, preparing the data for input into the VAE's fully connected layers.

This preprocessing puts the data in the format and value range the VAE expects: a float array of shape `(num_samples, 784)` with values in [0, 1], matching the sigmoid output of the decoder.
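
The same two steps can be tried without downloading MNIST. This is a small sketch on synthetic data: `preprocess` is a hypothetical helper, and the random `uint8` array only imitates the shape and dtype of the real images.

```python
import numpy as np

def preprocess(images):
    """Scale uint8 images to [0, 1] and flatten each one to a vector."""
    images = images.astype("float32") / 255.0
    return images.reshape(len(images), -1)

rng = np.random.default_rng(0)

# Synthetic stand-in for MNIST: 5 random 28x28 uint8 images.
fake_images = rng.integers(0, 256, size=(5, 28, 28), dtype=np.uint8)

flat = preprocess(fake_images)
print(flat.shape, flat.dtype, flat.min(), flat.max())
```

Reshaping a row back with `flat[0].reshape(28, 28)` recovers the original pixel layout, which is what `plot_results` relies on when displaying images.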

In [3]:
# Load and preprocess MNIST dataset
(x_train, _), (x_test, _) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = np.reshape(x_train, [-1, 784])
x_test = np.reshape(x_test, [-1, 784])

### 1. Instantiation of VAE Model

```python
# Instantiate VAE model
vae = VariationalAutoencoder(input_shape=(784,), latent_dim=2, intermediate_dim=512)
```

#### Explanation:

- **Purpose**: The instantiation step creates an instance of the `VariationalAutoencoder` class, which encapsulates the entire VAE architecture, including the encoder, decoder, and the VAE model itself.
  
- **Parameters**:
  - `input_shape`: Specifies the shape of the input data. Here, it's `(784,)`, corresponding to the flattened MNIST images.
  - `latent_dim`: Defines the dimensionality of the latent space. In this case, it's `2` for easy visualization.
  - `intermediate_dim`: Determines the size of the intermediate layer in the encoder and decoder. The default is `512`.

- **Instance Attributes**:
  - `vae.encoder`: Instance of the encoder model, responsible for mapping inputs to the latent space.
  - `vae.decoder`: Instance of the decoder model, responsible for reconstructing inputs from the latent space.
  - `vae.vae`: Instance of the combined VAE model, which integrates both encoder and decoder.
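
As a rough sanity check on the size of this architecture, the number of trainable parameters can be worked out by hand. The sketch assumes only what the class shows: every layer is a `Dense` layer with a bias, so each contributes `inputs * outputs + outputs` parameters. `dense_params` is a hypothetical helper.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out

input_dim, intermediate_dim, latent_dim = 784, 512, 2

# Encoder: one hidden layer, then two parallel heads (z_mean and z_log_var).
encoder_params = (
    dense_params(input_dim, intermediate_dim)
    + 2 * dense_params(intermediate_dim, latent_dim)
)

# Decoder: one hidden layer, then the 784-unit output layer.
decoder_params = (
    dense_params(latent_dim, intermediate_dim)
    + dense_params(intermediate_dim, input_dim)
)

total_params = encoder_params + decoder_params
print(encoder_params, decoder_params, total_params)
```

These totals should agree with the parameter counts that `vae.encoder.summary()` and `vae.decoder.summary()` report.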

### 2. Compiling the VAE Model

```python
# Compile VAE model
vae.compile(optimizer='adam', loss='binary_crossentropy')
```

#### Explanation:

- **Purpose**: The compilation step configures the VAE model for training by specifying the optimizer and loss function.

- **Parameters**:
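  - `optimizer`: Specifies the optimization algorithm to use during training. Here, `'adam'` is used, a popular default that adapts the step size per parameter.
  - `loss`: Defines the reconstruction loss. `'binary_crossentropy'` is normally a classification loss, but it also works here: each normalized pixel value in [0, 1] is treated as the target probability of a Bernoulli variable, and the decoder's sigmoid output is the predicted probability.

- **Compilation Details**:
  - `vae.compile(optimizer='adam', loss='binary_crossentropy')`: Configures the VAE model with the Adam optimizer and binary cross-entropy as the reconstruction loss. The KL term attached in `build_vae` is added to it automatically, so the total objective is reconstruction loss plus KL divergence. Note that Keras averages binary cross-entropy over the 784 pixels, while the KL term is summed over the latent dimensions, so the reconstruction term carries less weight here than in the usual VAE objective, which sums over pixels.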
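
To make the scale of the reconstruction term concrete, the sketch below computes binary cross-entropy per pixel in NumPy and compares the per-sample mean (the reduction Keras applies for `loss='binary_crossentropy'`) with the per-sample sum. `bce_per_pixel` is a hypothetical helper and the inputs are random placeholders, not model outputs.

```python
import numpy as np

def bce_per_pixel(x, x_hat, eps=1e-7):
    """Element-wise binary cross-entropy between targets x and predictions x_hat."""
    x_hat = np.clip(x_hat, eps, 1.0 - eps)  # avoid log(0)
    return -(x * np.log(x_hat) + (1.0 - x) * np.log(1.0 - x_hat))

rng = np.random.default_rng(0)
x = rng.uniform(size=(8, 784))                  # placeholder "images" in [0, 1]
x_hat = rng.uniform(0.05, 0.95, size=(8, 784))  # placeholder reconstructions

per_pixel = bce_per_pixel(x, x_hat)
mean_over_pixels = per_pixel.mean(axis=-1)  # averaged over the 784 pixels
sum_over_pixels = per_pixel.sum(axis=-1)    # summed over the 784 pixels

print(sum_over_pixels[0] / mean_over_pixels[0])  # ratio is the pixel count
```

The two reductions differ by a factor of 784. If the KL term appears to dominate training, one option (an assumption, not something this notebook does) is to pass a custom loss that multiplies the averaged cross-entropy by the number of pixels.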

### 3. Training the VAE Model

```python
# Train the VAE model
history = vae.train(x_train, x_test, epochs=50, batch_size=128)
```

#### Explanation:

- **Purpose**: The training step fits the VAE model to the training data (`x_train`) and evaluates its performance on the validation data (`x_test`).

- **Parameters**:
  - `x_train`: Training data, consisting of flattened MNIST images.
  - `x_test`: Validation data, used to monitor model performance during training.
  - `epochs`: Number of training epochs. Each epoch is one complete pass through the training set, so `epochs=50` means 50 passes.
  - `batch_size`: Number of samples per gradient update. Larger batch sizes can speed up training but require more memory.

- **Returns**:
  - `history`: Training history object containing recorded loss and metric values for each epoch.

#### Training Workflow:

- The `vae.train` method calls `fit` on the combined model, using the inputs as their own targets (`x_train` appears as both input and target).
- Within each epoch, the model processes the training data in batches, computing gradients and updating the weights once per batch. At the end of the epoch it evaluates the loss on the validation data.
- The `history` object stores the per-epoch training and validation loss, which can be used to analyze and plot training progress.
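
The relationship between `epochs`, `batch_size`, and the number of weight updates is simple arithmetic. The sketch below works it out for the values used here, given that the MNIST training split contains 60,000 images.

```python
import math

num_train_samples = 60_000  # size of the MNIST training split
batch_size = 128
epochs = 50

# One gradient update per batch; the last batch of an epoch may be smaller.
steps_per_epoch = math.ceil(num_train_samples / batch_size)
last_batch_size = num_train_samples - (steps_per_epoch - 1) * batch_size
total_updates = steps_per_epoch * epochs

print(steps_per_epoch, last_batch_size, total_updates)  # 469 96 23450
```

Doubling `batch_size` roughly halves the number of updates per epoch, which is one reason batch size and learning rate are often tuned together.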

### Summary:

- **Instantiation**: Creates an instance of the `VariationalAutoencoder` class, initializing the VAE model architecture.
- **Compilation**: Configures the VAE model with an optimizer (`'adam'`) and a loss function (`'binary_crossentropy'`), preparing it for training.
- **Training**: Fits the VAE model to the MNIST dataset, optimizing the model weights to minimize reconstruction error while regularizing the latent space distribution.

Together, these steps train a Variational Autoencoder on MNIST that can reconstruct images and whose latent space can be explored for generative modeling.

In [4]:
# Instantiate VAE model
vae = VariationalAutoencoder(input_shape=(784,), latent_dim=2, intermediate_dim=512)

# Compile VAE model
vae.compile(optimizer='adam', loss='binary_crossentropy')

# Train VAE model
history = vae.train(x_train, x_test, epochs=50, batch_size=128)

In [5]:
# Plot results (original vs reconstructed images)
vae.plot_results(x_test)
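
### Exploring the Latent Space

Because `latent_dim=2`, the latent space can be explored by decoding a regular grid of latent points and tiling the results into one image. The following is a sketch under stated assumptions rather than part of the original class: `latent_grid` and `tile_images` are hypothetical helpers, the range of ±3 is an arbitrary choice that covers most of a standard normal prior, and a zero array stands in for the decoder output so the code runs without a trained model.

```python
import numpy as np

def latent_grid(n=15, limit=3.0):
    """Return an (n*n, 2) array of points on a regular grid in [-limit, limit]^2."""
    axis = np.linspace(-limit, limit, n)
    xx, yy = np.meshgrid(axis, axis)
    return np.stack([xx.ravel(), yy.ravel()], axis=1)

def tile_images(flat_images, n, side=28):
    """Arrange n*n flattened images into one (n*side, n*side) canvas."""
    grid = flat_images.reshape(n, n, side, side)
    # Interleave grid rows with pixel rows, and grid columns with pixel columns.
    return grid.transpose(0, 2, 1, 3).reshape(n * side, n * side)

n = 15
z_grid = latent_grid(n)

# With a trained model this would be: decoded = vae.decoder.predict(z_grid)
# A zero array stands in here so the sketch runs on its own.
decoded = np.zeros((n * n, 784))

canvas = tile_images(decoded, n)
print(z_grid.shape, canvas.shape)
```

With real decoder output, `plt.imshow(canvas, cmap='gray')` would show how the generated digits change as the two latent coordinates vary.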