# Week 3: Variational Autoencoders on Anime Faces

## Imports

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

import matplotlib.pyplot as plt
import numpy as np

import os
import zipfile
import urllib.request
import random
from IPython import display

2024-07-30 13:47:56.809042: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-30 13:47:56.809223: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-30 13:47:56.954724: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


## Parameters

In [None]:
# Seed every random number generator this notebook relies on so that runs are
# repeatable: NumPy, Python's `random` module (used to shuffle the image
# paths) and TensorFlow's random ops.
SEED = 51
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# parameters for building the model and training
BATCH_SIZE = 2000   # images per batch
LATENT_DIM = 512    # size of the latent vector
IMAGE_SIZE = 64     # images are resized to IMAGE_SIZE x IMAGE_SIZE

## Download the Dataset

In [None]:
# make the data directory; an already existing directory is fine, while any
# other failure (e.g. missing permissions) is raised instead of being hidden
download_dir = '/tmp/anime/'
os.makedirs(download_dir, exist_ok=True)

# download the zipped dataset into the data directory
data_url = "https://storage.googleapis.com/learning-datasets/Resources/anime-faces.zip"
data_file_name = os.path.join(download_dir, "animefaces.zip")
urllib.request.urlretrieve(data_url, data_file_name)

# extract the zip file; the context manager closes the archive even if
# extraction fails part-way through
with zipfile.ZipFile(data_file_name, 'r') as zip_ref:
  zip_ref.extractall(download_dir)

## Prepare the Dataset

In [None]:
# Data Preparation Utilities

def get_dataset_slice_paths(image_dir):
  '''Lists the entries of a directory as paths.

  Args:
    image_dir -- directory whose entries are listed

  Returns:
    list with one path per directory entry, each prefixed with `image_dir`
  '''
  collected = []
  for entry in os.listdir(image_dir):
    collected.append(os.path.join(image_dir, entry))
  return collected

In [None]:
def map_image(image_filename):
  '''Loads one image file and preprocesses it for the model.

  Args:
    image_filename -- path of a JPEG image file

  Returns:
    float32 tensor of shape (IMAGE_SIZE, IMAGE_SIZE, 3) scaled by 1/255
  '''
  img_raw = tf.io.read_file(image_filename)

  # Always decode to 3 channels: without this a grayscale JPEG decodes to a
  # single channel and the fixed-shape reshape below fails.
  image = tf.image.decode_jpeg(img_raw, channels=3)

  image = tf.cast(image, dtype=tf.float32)
  image = tf.image.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
  image = image / 255.0
  # the reshape pins the static shape so downstream layers see known dimensions
  image = tf.reshape(image, shape=(IMAGE_SIZE, IMAGE_SIZE, 3))

  return image

In [None]:
# collect the path of every image in the extracted dataset, in random order
paths = get_dataset_slice_paths("/tmp/anime/images/")
random.shuffle(paths)

# the first 80% of the shuffled paths train the model, the rest validate it
paths_len = len(paths)
train_paths_len = int(paths_len * 0.8)
train_paths, val_paths = paths[:train_paths_len], paths[train_paths_len:]

# training pipeline: paths -> preprocessed images -> shuffled -> batched
training_dataset = (
    tf.data.Dataset.from_tensor_slices(train_paths)
    .map(map_image)
    .shuffle(1000)
    .batch(BATCH_SIZE)
)

# validation pipeline: same preprocessing, batched but not shuffled
validation_dataset = (
    tf.data.Dataset.from_tensor_slices(val_paths)
    .map(map_image)
    .batch(BATCH_SIZE)
)

print(f'number of batches in the training set: {len(training_dataset)}')
print(f'number of batches in the validation set: {len(validation_dataset)}')

## Display Utilities

In [None]:
def display_faces(dataset, size=9):
  '''Takes a sample from a batched dataset and plots it in a 3-column grid.

  Args:
    dataset -- batched dataset of images; it is unbatched before sampling
    size -- number of images to plot
  '''
  dataset = dataset.unbatch().take(size)
  n_cols = 3
  # ceiling division: exactly as many rows as needed, with no spare empty row
  # when `size` is a multiple of `n_cols`
  n_rows = -(-size // n_cols)
  plt.figure(figsize=(5, 5))
  # subplot positions are 1-based
  for position, image in enumerate(dataset, start=1):
    # use the configured image size rather than a hard-coded 64
    disp_img = np.reshape(image, (IMAGE_SIZE, IMAGE_SIZE, 3))
    plt.subplot(n_rows, n_cols, position)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(disp_img)

In [None]:
def display_one_row(disp_images, offset, shape=(28, 28)):
  '''Plots a sequence of images side by side in a 3 x 10 subplot grid.

  Args:
    disp_images -- iterable of images to draw
    offset -- number of grid cells to skip before the first image
    shape -- shape each image is reshaped to before drawing
  '''
  cell = offset
  for raw_image in disp_images:
    # subplot cells are 1-based, so advance before drawing
    cell += 1
    plt.subplot(3, 10, cell)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(np.reshape(raw_image, shape))

In [None]:
def display_results(disp_input_images, disp_predicted):
  '''Plots input images and the model's predictions for them in one figure.

  Args:
    disp_input_images -- images fed to the model
    disp_predicted -- the corresponding model outputs
  '''
  image_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)
  plt.figure(figsize=(15, 5))
  # inputs start at grid cell 1, predictions at grid cell 21 of the 3 x 10 grid
  for images, start in ((disp_input_images, 0), (disp_predicted, 20)):
    display_one_row(images, start, shape=image_shape)

In [None]:
# preview the first 12 images of the validation set in a grid
display_faces(validation_dataset, size=12)

## Build the Model

### Sampling Class

In [None]:
class Sampling(tf.keras.layers.Layer):
  def call(self, inputs):
    """Draws a latent vector from the distribution described by the encoder.

    Args:
      inputs -- pair (mu, sigma) of encoder outputs; `sigma` is used as a
                log variance, since exp(0.5 * sigma) scales the noise

    Returns:
      mu + exp(0.5 * sigma) * epsilon, with epsilon drawn from a standard
      normal distribution
    """
    mu, sigma = inputs
    mu_shape = tf.shape(mu)
    # one noise value per (sample, latent dimension)
    noise = tf.keras.backend.random_normal(shape=(mu_shape[0], mu_shape[1]))
    std_dev = tf.exp(0.5 * sigma)
    return mu + std_dev * noise

### Encoder Layers

In [None]:
# Function to define the layers of the encoder
def encoder_layers(inputs, latent_dim):
    """Stacks the encoder layers on top of `inputs`.

    Args:
      inputs -- input tensor holding the image batch
      latent_dim -- dimensionality of the latent space

    Returns:
      mu -- output of the `latent_mu` dense layer
      sigma -- output of the `latent_sigma` dense layer
      shape of the feature maps right before flattening
    """
    layers = tf.keras.layers

    # two stride-2 convolutions, each followed by batch normalization
    features = layers.Conv2D(32, 3, strides=2, padding="same",
                             activation='relu', name="encode_conv1")(inputs)
    features = layers.BatchNormalization()(features)
    features = layers.Conv2D(64, 3, strides=2, padding='same',
                             activation='relu', name="encode_conv2")(features)
    batch_2 = layers.BatchNormalization()(features)

    # flatten the feature maps and squeeze them through a small dense layer
    hidden = layers.Flatten(name="encode_flatten")(batch_2)
    hidden = layers.Dense(20, activation='relu', name="encode_dense")(hidden)
    hidden = layers.BatchNormalization()(hidden)

    # two parallel heads produce the parameters of the latent distribution
    mu = layers.Dense(latent_dim, name='latent_mu')(hidden)
    sigma = layers.Dense(latent_dim, name='latent_sigma')(hidden)

    return mu, sigma, batch_2.shape

### Encoder Model

In [None]:
# Function to build the encoder model
def encoder_model(latent_dim, input_shape):
    """Builds the encoder as a Keras model.

    Args:
      latent_dim -- dimensionality of the latent space
      input_shape -- shape of one input image

    Returns:
      model -- encoder model with outputs [mu, sigma, z]
      conv_shape -- shape of the feature maps before flattening
    """
    image_input = tf.keras.layers.Input(shape=input_shape)
    mu, sigma, conv_shape = encoder_layers(image_input, latent_dim=latent_dim)

    # draw the latent vector from the predicted distribution
    sampled = Sampling()((mu, sigma))

    return tf.keras.Model(image_input, outputs=[mu, sigma, sampled]), conv_shape

### Decoder Layers

In [None]:
# Function to define the layers of the decoder
def decoder_layers(inputs, conv_shape, output_channels=3):
    """Stacks the decoder layers on top of `inputs`.

    Args:
      inputs -- latent tensor fed to the decoder
      conv_shape -- shape of the encoder feature maps before flattening;
                    entries 1..3 (height, width, channels) are used
      output_channels -- number of channels of the reconstructed image;
                         defaults to 3 because the dataset images are RGB

    Returns:
      tensor holding the reconstructed images, with values in [0, 1]
    """
    # number of units needed to rebuild the encoder's feature maps
    units = conv_shape[1] * conv_shape[2] * conv_shape[3]
    x = tf.keras.layers.Dense(units, activation='relu', name="decode_dense1")(inputs)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Reshape((conv_shape[1], conv_shape[2], conv_shape[3]), name="decode_reshape")(x)

    # two stride-2 transposed convolutions undo the encoder's two stride-2 convolutions
    x = tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=3, strides=2, padding='same', activation='relu', name="decode_conv2d_2")(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=3, strides=2, padding='same', activation='relu', name="decode_conv2d_3")(x)
    x = tf.keras.layers.BatchNormalization()(x)

    # The output needs one filter per image channel. A single filter would
    # produce a grayscale image that cannot be compared with the RGB inputs.
    x = tf.keras.layers.Conv2DTranspose(filters=output_channels, kernel_size=3, strides=1, padding='same', activation='sigmoid', name="decode_final")(x)
    return x

### Decoder Model

In [None]:
def decoder_model(latent_dim, conv_shape):
  """Builds the decoder as a Keras model and prints its summary.

  Args:
    latent_dim -- dimensionality of the latent space
    conv_shape -- shape of the features before flattening

  Returns:
    model -- the decoder model
  """
  latent_input = tf.keras.layers.Input(shape=(latent_dim,))
  model = tf.keras.Model(latent_input, decoder_layers(latent_input, conv_shape))
  model.summary()
  return model

### Kullback–Leibler Divergence

In [None]:
# Define a custom KLLossLayer to compute the KL divergence loss
class KLLossLayer(tf.keras.layers.Layer):
    """Pass-through layer that registers the KL divergence as a layer loss."""

    def call(self, inputs):
        """Adds the KL loss for (mu, sigma) and returns `inputs` unchanged.

        Args:
          inputs -- pair (mu, sigma); `sigma` is used as a log variance
        """
        mu, sigma = inputs
        # per-element KL terms, averaged over every element and scaled by -0.5
        kl_terms = 1 + sigma - tf.square(mu) - tf.math.exp(sigma)
        self.add_loss(tf.reduce_mean(kl_terms) * -0.5)
        return inputs

### Putting it all together

In [None]:
# Function to build the VAE model
def vae_model(encoder, decoder, input_shape):
    """Connects the encoder and the decoder into one trainable VAE.

    Args:
      encoder -- model returning (mu, sigma, z) for a batch of images
      decoder -- model mapping a latent vector to a reconstructed image
      input_shape -- shape of one input image

    Returns:
      the VAE model; its `losses` property carries the KL divergence term
    """
    inputs = tf.keras.layers.Input(shape=input_shape)
    mu, sigma, _ = encoder(inputs)

    # A functional model only keeps layers that lie on a path from its inputs
    # to its outputs. Calling KLLossLayer and discarding its result leaves the
    # layer outside the model, so the KL term never reaches `model.losses`.
    # Routing mu and sigma through the layer and sampling z from its outputs
    # puts the layer on the path to `reconstructed`.
    mu, sigma = KLLossLayer()([mu, sigma])
    z = Sampling()((mu, sigma))

    reconstructed = decoder(z)
    model = tf.keras.Model(inputs=inputs, outputs=reconstructed)
    return model

In [None]:
# Function to get the encoder, decoder, and VAE models
def get_models(input_shape, latent_dim):
    """Builds the encoder, the decoder and the VAE that combines them.

    Args:
      input_shape -- shape of one input image
      latent_dim -- dimensionality of the latent space

    Returns:
      tuple (encoder, decoder, vae)
    """
    # the encoder reports its pre-flatten feature-map shape, which the decoder
    # needs to mirror it
    enc, feature_shape = encoder_model(latent_dim=latent_dim, input_shape=input_shape)
    dec = decoder_model(latent_dim=latent_dim, conv_shape=feature_shape)
    return enc, dec, vae_model(enc, dec, input_shape=input_shape)

In [None]:
# Get the encoder, decoder and 'master' model (called vae).
# The input shape must match what the dataset pipeline produces: RGB images
# of IMAGE_SIZE x IMAGE_SIZE pixels (not the 28 x 28 x 1 shape of MNIST).
encoder, decoder, vae = get_models(input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3), latent_dim=LATENT_DIM)

## Train the Model

In [None]:
# loss functions available for the reconstruction term
bce_loss = tf.keras.losses.BinaryCrossentropy()
mse_loss = tf.keras.losses.MeanSquaredError()

# running mean of the loss values reported during training
loss_metric = tf.keras.metrics.Mean()

# optimizer that applies the weight updates
optimizer = tf.keras.optimizers.Adam(learning_rate=0.002)

In [None]:
def generate_and_save_images(model, epoch, step, test_input):
  """Plots the model's predictions in a 4 x 4 grid, saves and shows the figure.

  Args:

  model -- the decoder model
  epoch -- current epoch number during training
  step -- current step number during training
  test_input -- random tensor with shape (16, LATENT_DIM)
  """
  predictions = model.predict(test_input)

  fig = plt.figure(figsize=(4,4))

  # subplot positions are 1-based
  for position, prediction in enumerate(predictions, start=1):
      plt.subplot(4, 4, position)
      # scale the predicted values up to integer pixel intensities
      pixels = (prediction * 255).astype('int32')
      plt.imshow(pixels)
      plt.axis('off')

  # label the figure and write it to a file named after the epoch and step
  fig.suptitle("epoch: {}, step: {}".format(epoch, step))
  plt.savefig('image_at_epoch_{:04d}_step{:04d}.png'.format(epoch, step))
  plt.show()

In [None]:
# Training loop.

# generate random vector as test input to the decoder
random_vector_for_generation = tf.random.normal(shape=[16, LATENT_DIM])

# number of epochs
epochs = 100

# Number of values in one image. The binary cross-entropy below is a mean over
# every value, so multiplying by this count turns it into a per-image
# reconstruction loss (the former factor 784 was the size of a 28 x 28 image).
values_per_image = IMAGE_SIZE * IMAGE_SIZE * 3

# initialize the helper function to display outputs from an untrained model
generate_and_save_images(decoder, 0, 0, random_vector_for_generation)

for epoch in range(epochs):
  print('Start of epoch %d' % (epoch,))

  # iterate over the batches of the training dataset
  for step, x_batch_train in enumerate(training_dataset):
    with tf.GradientTape() as tape:

      # Feed a batch to the VAE model. training=True makes the
      # BatchNormalization layers use the statistics of the current batch and
      # update their moving averages; without it they run in inference mode.
      reconstructed = vae(x_batch_train, training=True)

      # compute reconstruction loss
      flattened_inputs = tf.reshape(x_batch_train, shape=[-1])
      flattened_outputs = tf.reshape(reconstructed, shape=[-1])
      loss = bce_loss(flattened_inputs, flattened_outputs) * values_per_image

      # add KLD regularization loss
      loss += sum(vae.losses)

    # get the gradients and update the weights
    grads = tape.gradient(loss, vae.trainable_weights)
    optimizer.apply_gradients(zip(grads, vae.trainable_weights))

    # compute the loss metric
    loss_metric(loss)

    # display outputs every 100 steps
    if step % 100 == 0:
      display.clear_output(wait=False)
      generate_and_save_images(decoder, epoch, step, random_vector_for_generation)
      print('Epoch: %s step: %s mean loss = %s' % (epoch, step, loss_metric.result().numpy()))