# **Generative Models Lesson**
In today's tutorial you will learn how to use deep generative models to generate handwritten digits.

We will use the [**TensorFlow**](https://ekababisong.org/gcp-ml-seminar/tensorflow/) framework and the [**Keras**](https://keras.io/) open-source library to rapidly prototype deep neural networks.

# **Useful modules import**
First, import the modules used throughout the tutorial.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random
import time
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K
from tensorflow.keras.utils import to_categorical

# **Utility functions**
Execute the following code to define some utility functions used in the tutorial:
- **plot_2d_data** plots 2D labeled data;
- **plot_history** plots the loss over epochs for both the training and validation sets. If a metric is provided, its trend is drawn in the same graph;
- **plot_generated_images** visualizes a set of generated images;
- **plot_gan_losses** plots the generator and discriminator losses over epochs.

In [None]:
def plot_2d_data(data_2d, y, titles = None, figsize = (7, 7)):
  _, axs = plt.subplots(1, len(data_2d), figsize=figsize)

  for i in range(len(data_2d)):
    if titles is not None:
      axs[i].set_title(titles[i])
    scatter = axs[i].scatter(data_2d[i][:, 0], data_2d[i][:, 1], s = 1, c = y[i], cmap=plt.cm.Paired)
    axs[i].legend(*scatter.legend_elements())

def plot_history(history,metric = None):
  fig, ax1 = plt.subplots(figsize=(10, 8))

  epoch_count = len(history.history['loss'])

  line1, = ax1.plot(range(1, epoch_count + 1), history.history['loss'], label = 'train_loss', color = 'orange')
  ax1.plot(range(1, epoch_count + 1), history.history['val_loss'], label='val_loss', color = line1.get_color(), linestyle = '--')
  ax1.set_xlim([1, epoch_count])
  ax1.set_ylim([0, max(max(history.history['loss']), max(history.history['val_loss']))])
  ax1.set_ylabel('loss', color = line1.get_color())
  ax1.tick_params(axis = 'y', labelcolor = line1.get_color())
  ax1.set_xlabel('Epochs')
  _ = ax1.legend(loc = 'lower left')

  if metric is not None:
    ax2 = ax1.twinx()
    line2, = ax2.plot(range(1, epoch_count + 1), history.history[metric], label = 'train_' + metric)
    ax2.plot(range(1, epoch_count + 1), history.history['val_' + metric], label = 'val_' + metric, color = line2.get_color(), linestyle = '--')
    ax2.set_ylim([0, max(max(history.history[metric]), max(history.history['val_' + metric]))])
    ax2.set_ylabel(metric, color = line2.get_color())
    ax2.tick_params(axis = 'y', labelcolor=line2.get_color())
    _ = ax2.legend(loc = 'upper right')

def plot_generated_images(generated_images, nrows, ncols, no_space_between_plots = False, figsize = (10, 10)):
  _, axs = plt.subplots(nrows, ncols, figsize = figsize, squeeze = False)

  for i in range(nrows):
    for j in range(ncols):
      axs[i, j].axis('off')
      axs[i, j].imshow(generated_images[i][j], cmap='gray')

  if no_space_between_plots:
    plt.subplots_adjust(wspace = 0, hspace = 0)

  plt.show()

def plot_gan_losses(d_losses, g_losses):
  fig, ax1 = plt.subplots(figsize = (10, 8))

  epoch_count = len(d_losses)

  line1, = ax1.plot(range(1, epoch_count + 1), d_losses, label = 'discriminator_loss', color = 'orange')
  ax1.set_ylim([0, max(d_losses)])
  ax1.tick_params(axis = 'y', labelcolor = line1.get_color())
  _ = ax1.legend(loc = 'lower left')

  ax2 = ax1.twinx()
  line2, = ax2.plot(range(1, epoch_count + 1), g_losses, label = 'generator_loss')
  ax2.set_xlim([1, epoch_count])
  ax2.set_ylim([0, max(g_losses)])
  ax1.set_xlabel('Epochs')  # the x-axis of a twinx axes is hidden, so label ax1
  ax2.tick_params(axis='y', labelcolor = line2.get_color())
  _ = ax2.legend(loc = 'upper right')

# **Dataset**
The [**digits MNIST**](http://yann.lecun.com/exdb/mnist/) dataset, containing 28x28 grayscale images of the 10 digits, will be used.

The goal is to develop and train deep generative models that generate realistic images of handwritten digits.

The following code loads the dataset into memory.

In [None]:
category_count = 10 #Number of digit categories

(train_x, train_y), (test_x, test_y) = keras.datasets.mnist.load_data()

print('Train data shape: ', train_x.shape)
print('Train label shape: ', train_y.shape)
print('Test data shape: ', test_x.shape)
print('Test label shape: ', test_y.shape)

### **Visualization**
Randomly selected images can be shown by executing the following code.

In [None]:
image_count = 10

_, axs = plt.subplots(1, image_count,figsize = (15, 10))
for i in range(image_count):
  random_idx = random.randrange(train_x.shape[0])  # randint would include the out-of-range upper bound
  axs[i].imshow(train_x[random_idx], cmap = 'gray')
  axs[i].axis('off')
  axs[i].set_title(train_y[random_idx])

### **Split data into training and validation sets**
In order to avoid overfitting during training, it is necessary to have a separate dataset (called validation set), in addition to the training and test datasets, to choose the optimal value for the hyperparameters.

For this reason, *train_x* and *train_y* are divided into training and validation sets using the [**train_test_split**](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html) function provided by Scikit-learn.

The *val_size* variable is either the fraction (a float between 0 and 1) or the absolute number (an integer) of patterns to include in the validation set.

In [None]:
val_size = 10000

train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size = val_size,random_state = 1, shuffle = True)

print('Train data shape: ', train_x.shape)
print('Train label shape: ', train_y.shape)
print('Validation data shape: ', val_x.shape)
print('Validation label shape: ', val_y.shape)

### **Intensity range normalization**
Pixel intensity is usually represented as discrete values in the range [0;255]. 

In [None]:
print('Min value: ', train_x.min())
print('Max value: ', train_x.max())

Such values could produce math range errors with the activation function or make training unstable. To overcome these issues, a simple normalization step can be applied by dividing all values by 255 to get continuous values in the range [0;1].

In [None]:
train_x = train_x / 255.0
val_x = val_x / 255.0
test_x = test_x / 255.0

print('Min value: ', train_x.min())
print('Max value: ', train_x.max())
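
To make the "math range error" concern concrete, here is a minimal sketch that uses only the Python standard library. The `naive_sigmoid` helper and the weight value are hypothetical and chosen only for illustration; Keras activations are implemented in a numerically safer way, so this shows the kind of problem rather than what TensorFlow does internally.

```python
import math

def naive_sigmoid(z):
    # Textbook formula; math.exp overflows when -z is very large
    return 1.0 / (1.0 + math.exp(-z))

weight = -3.0                      # hypothetical weight, for illustration only
raw_pixel = 255                    # raw intensity in [0, 255]
scaled_pixel = raw_pixel / 255.0   # normalized intensity in [0, 1]

try:
    naive_sigmoid(weight * raw_pixel)   # needs exp(765), outside the float range
    raw_overflowed = False
except OverflowError:
    raw_overflowed = True

scaled_activation = naive_sigmoid(weight * scaled_pixel)

print('Raw input overflowed:', raw_overflowed)
print('Activation on normalized input:', scaled_activation)
```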

### **Image linearization**
The images need to be converted from 2D matrices to vectors before they can be used as input to fully connected (dense) networks.

The following code uses the NumPy function [**reshape**](https://numpy.org/doc/stable/reference/generated/numpy.reshape.html) to flatten the data.

In [None]:
original_image_shape = (train_x.shape[1], train_x.shape[2])

train_x_flatten = np.reshape(train_x, (train_x.shape[0], -1))
val_x_flatten = np.reshape(val_x, (val_x.shape[0], -1))
test_x_flatten = np.reshape(test_x, (test_x.shape[0], -1))

print('Train data flatten shape: ', train_x_flatten.shape)
print('Validation data flatten shape: ', val_x_flatten.shape)
print('Test data flatten shape: ', test_x_flatten.shape)
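
As a small illustration of what `reshape` does here, the sketch below flattens two tiny 2x3 arrays (stand-ins for the 28x28 digits, chosen only to keep the output readable) and then restores the 2D layout, which is how *original_image_shape* is used later when generated vectors are displayed as images.

```python
import numpy as np

# Two tiny 2x3 "images" standing in for the 28x28 MNIST digits
images = np.arange(12).reshape(2, 2, 3)

# -1 lets NumPy infer the flattened length (2 * 3 = 6)
flat = np.reshape(images, (images.shape[0], -1))

# The 2D layout can be restored from the saved image shape
image_shape = images.shape[1:]
restored = flat[0].reshape(image_shape)

print(flat.shape)   # (2, 6)
print(flat[0])      # [0 1 2 3 4 5]
```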

# **Variational autoencoder (VAE)**
In this section a variational autoencoder is trained to generate handwritten digits.

- Variational autoencoders (VAEs) are generative models that learn a compressed, low-dimensional representation of the input data and can be used to generate new data samples.

- A VAE is a neural network that learns to encode an input sample into a latent variable space and then to decode the latent variable back into a reconstruction of the original input. Unlike traditional autoencoders, VAEs introduce probabilistic modeling into the encoding process, allowing for the generation of new data samples by sampling from the learned latent variable distribution.

- VAEs have been successfully applied in various domains, such as image and speech generation, as well as in anomaly detection and data compression.

<img src=https://raw.githubusercontent.com/nderus/Generative-models-Lesson/main/images/autoencoders.png width="700">


## **Model definition**
The following function creates a variational autoencoder given:
- the number of input features (*input_count*);
- the number of neurons for each hidden layer (*neuron_count_per_hidden_layer*);
- the dimension of the latent space (*encoded_dim*);
- the string identifier of the activation function of the hidden layers (*hidden_activation*);
- the string identifier of the activation function of the output layer (*output_activation*).

The function returns the encoder and the decoder models as well as the whole autoencoder.

The following image shows the architecture of a generic VAE.

<img src=https://raw.githubusercontent.com/nderus/Generative-models-Lesson/main/images/vae_architecture.png width="500">

In this case, the Keras [**Sequential**](https://keras.io/guides/sequential_model/) class cannot be used because the last layer of the encoder is connected to both mean and variance layers. 

For these situations, Keras provides the [**Model**](https://keras.io/api/models/model/) class to group layers into an object with training and inference features. It can be created by passing the input and output layers to the constructor.

[**Keras layers API**](https://keras.io/api/layers/) offers a wide range of built-in layers ready for use, including:
- [**Input**](https://keras.io/api/layers/core_layers/input/) - the input of the model. Note that, you can also omit the **Input** layer. In that case the model doesn't have any weights until the first call to a training/evaluation method (since it is not yet built);
- [**Dense**](https://keras.io/api/layers/core_layers/dense/) - a fully-connected layer;
- [**Lambda**](https://keras.io/api/layers/core_layers/lambda/) - to wrap simple expressions as a Layer object.

<u>Note that, instead of variance, the encoder returns its natural logarithm to bring stability and ease of training. When needed, it will be transformed back to the original space using the following properties:</u>

$$\sigma^2=e^{\ln(\sigma^2)}$$
$$\sigma=e^{\frac{1}{2}\cdot \ln(\sigma^2)}$$
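
The two identities can be checked numerically with a few lines of standard-library Python. The value of `log_var` is a hypothetical encoder output picked for illustration. The sketch also shows one practical consequence: whatever real number the layer produces, the recovered variance is strictly positive.

```python
import math

log_var = -1.2                      # hypothetical encoder output, ln(sigma^2)

variance = math.exp(log_var)        # sigma^2 = e^{ln(sigma^2)}
std_dev = math.exp(0.5 * log_var)   # sigma   = e^{(1/2) * ln(sigma^2)}

# Both routes to the standard deviation agree
print(math.isclose(std_dev, math.sqrt(variance)))

# Even a very negative log-variance maps to a valid (positive) variance
tiny_variance = math.exp(-50.0)
print(tiny_variance > 0)
```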

In [None]:
def build_vae(input_count,neuron_count_per_hidden_layer,encoded_dim,hidden_activation,output_activation):
    #Encoder
    encoder_input = layers.Input(shape=(input_count,), name='encoder_input')
    
    prev_layer=encoder_input
    for neuron_count in neuron_count_per_hidden_layer:
      hidden_layer=layers.Dense(neuron_count,activation=hidden_activation)(prev_layer)
      prev_layer=hidden_layer
    
    mu = layers.Dense(encoded_dim, name='mu')(prev_layer)
    log_var = layers.Dense(encoded_dim, name='log_var')(prev_layer)
    
    encoder = keras.Model(encoder_input, [mu, log_var], name='encoder')

    #Decoder
    decoder_input = layers.Input(shape=(encoded_dim,), name='decoder_input')

    prev_layer=decoder_input
    for neuron_count in reversed(neuron_count_per_hidden_layer):
      hidden_layer=layers.Dense(neuron_count,activation=hidden_activation)(prev_layer)
      prev_layer=hidden_layer
    
    decoder_output_layer=layers.Dense(input_count,activation=output_activation, name='decoder_output')(prev_layer)

    decoder = keras.Model(decoder_input, decoder_output_layer, name='decoder')

    #Sampling layer
    s = layers.Lambda(sampling, output_shape=(encoded_dim,), name='s')([mu, log_var])

    #VAE
    vae=keras.Model(encoder.input, decoder(s),name='vae')
    
    return vae,encoder,decoder

The following sampling function (called by the **Lambda** layer) samples random points from the latent space using the *reparameterization trick*:

$$\mathbf{s} = \boldsymbol{\sigma}_x \odot \boldsymbol{\varepsilon} + \boldsymbol{\mu}_x, \quad \boldsymbol{\varepsilon} \sim \mathcal{N}(\mathbf{0},\mathbf{1})$$

In [None]:
def sampling(args):
    mu, log_var = args
    batch_size = K.shape(mu)[0]
    dim = K.int_shape(mu)[1]
    epsilon = K.random_normal(shape=(batch_size, dim), mean=0., stddev=1.0)
    return K.exp(0.5 * log_var) * epsilon + mu
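
The same computation can be sketched in plain NumPy to check that it behaves as expected. The helper name `sample_latent` and the values of `mu` and `log_var` are assumptions made for this illustration; the seed is fixed only for repeatability. Drawing many samples should give an empirical mean close to `mu` and an empirical standard deviation close to `exp(0.5 * log_var)`.

```python
import numpy as np

rng = np.random.default_rng(0)             # fixed seed, for repeatability only

mu = np.array([1.0, -2.0])                 # hypothetical encoder mean
log_var = np.array([0.0, np.log(0.25)])    # variances 1 and 0.25

def sample_latent(mu, log_var, count, rng):
    # epsilon ~ N(0, 1), one row per sample
    epsilon = rng.standard_normal((count, mu.shape[0]))
    # s = sigma * epsilon + mu, with sigma = exp(0.5 * log_var)
    return np.exp(0.5 * log_var) * epsilon + mu

samples = sample_latent(mu, log_var, 100_000, rng)

print('Empirical mean:', samples.mean(axis=0))
print('Empirical std: ', samples.std(axis=0))
```

Because the randomness is confined to `epsilon`, the result is a deterministic, differentiable function of `mu` and `log_var`, which is what allows gradients to reach the encoder.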

## **Model creation**
The following code creates a variational autoencoder by calling the **build_vae** function defined above.

In [None]:
vae, vae_encoder, vae_decoder = build_vae(train_x_flatten.shape[1], [256, 128], 2, 'sigmoid', 'sigmoid')

## **Model visualization**
A string summary of the network can be printed using the [**summary**](https://keras.io/api/models/model/#summary-method) method.

In [None]:
vae.summary()

The summary is useful for simple models, but can be confusing for complex models.

Function [**keras.utils.plot_model**](https://keras.io/api/utils/model_plotting_utils/) creates a plot of the neural network graph that can make more complex models easier to understand.

In [None]:
keras.utils.plot_model(vae, show_shapes=True, show_layer_names = True, expand_nested = True)

## **Loss function definition**
To train a VAE, an ad-hoc combined loss function must be defined. It is composed of:
- a *reconstruction* loss to measure the similarity between the input and the generated output;
- a *regularization* loss to evaluate how close the distribution returned by the encoder is to the standard normal distribution. It is computed as the KL divergence between the returned distribution and a standard normal distribution:

$$KL(\mathcal{N}(\boldsymbol{\mu}_x,\boldsymbol{\sigma}_x) \parallel \mathcal{N}(\mathbf{0},\mathbf{1}))=\frac{1}{2}\sum_{i=1}^{k}\left(\mu_i^2+\sigma_i^2-\ln{\sigma_i^2}-1\right)$$

<u>Note that:</u>
- *kl_coefficient* is a hyper-parameter to weight the contribution of the regularization term;
- unlike "traditional" losses, VAE loss not only depends on the model output but also on the model input and on the encoder output (*mu* and *log_var*);
- since the reconstruction loss is defined as the sum (and not the average) of the error of all generated elements (i.e., pixels), the [**mean_squared_error**](https://keras.io/api/losses/regression_losses/#mean_squared_error-function) output is multiplied by the number of elements.  

In [None]:
def vae_loss(vae_input,vae_output,mu,log_var,kl_coefficient):
  #Reconstruction loss
  reconstruction_loss = keras.losses.mean_squared_error(vae_input,vae_output) * train_x_flatten.shape[1]

  #Regularization loss
  kl_loss = 0.5 * K.sum(K.square(mu) + K.exp(log_var) - log_var - 1, axis = -1)

  #Combined loss
  return reconstruction_loss + kl_coefficient*kl_loss
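
The two terms can be sanity-checked on small hand-made vectors. The NumPy sketch below mirrors the formulas used in **vae_loss**; the helper names and the sample values are assumptions made for this illustration. It shows that the KL term is zero when the encoder returns exactly a standard normal distribution, and that the mean squared error multiplied by the number of elements equals the sum of squared errors.

```python
import numpy as np

def kl_to_standard_normal(mu, log_var):
    # 0.5 * sum(mu^2 + sigma^2 - ln(sigma^2) - 1) over the latent dimensions
    return 0.5 * np.sum(np.square(mu) + np.exp(log_var) - log_var - 1, axis=-1)

def summed_squared_error(x, x_hat):
    # mean squared error times the number of elements = sum of squared errors
    return np.mean(np.square(x - x_hat), axis=-1) * x.shape[-1]

# The KL term vanishes when the encoder returns exactly N(0, 1) ...
kl_zero = kl_to_standard_normal(np.zeros(2), np.zeros(2))
# ... and grows as the mean moves away from the origin
kl_shifted = kl_to_standard_normal(np.array([1.0, 0.0]), np.zeros(2))

x = np.array([0.0, 0.5, 1.0, 0.0])        # hypothetical 4-pixel input
x_hat = np.array([0.1, 0.5, 0.8, 0.0])    # hypothetical reconstruction
sse = summed_squared_error(x, x_hat)

print(kl_zero, kl_shifted, sse)
```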

## **Model compilation**
The compilation is the final step in configuring the model for training. 

The following code uses the [**compile**](https://keras.io/api/models/model_training_apis/#compile-method) method to compile the model.

When a loss function does not match the signature **loss=fn(y_true, y_pred)**, it cannot be passed directly to the **compile** method. In this case, the loss can be passed to the [**add_loss**](https://keras.io/api/losses/#the-addloss-api) method instead.

In [None]:
kl_coefficient = 1

#Information needed to compute the loss function
vae_input = vae.input
vae_output = vae.output
mu = vae.get_layer('mu').output
log_var=vae.get_layer('log_var').output

vae.add_loss(vae_loss(vae_input, vae_output, mu, log_var, kl_coefficient))
vae.compile(optimizer = 'adam')

## **Training**
Now we are ready to train our model by calling the [**fit**](https://keras.io/api/models/model_training_apis/#fit-method) method.

It trains the model for a fixed number of epochs (*epoch_count*) using the training set (*train_x_flatten*) divided into mini-batches of *batch_size* elements. During training, performance is evaluated on both the training and validation (*val_x_flatten*) sets.

Stopping training when a metric or the loss has stopped improving on the validation set helps to avoid overfitting.

For this purpose, Keras provides a class called [**EarlyStopping**](https://keras.io/api/callbacks/early_stopping/). Important class parameters are:
- *monitor* - the name of the metric or the loss to be observed; 
- *patience* - the number of epochs with no improvement after which training will be stopped;
- *restore_best_weights* - whether to restore model weights from the epoch with the best value of the monitored quantity.

Once an instance of the **EarlyStopping** class has been created, it can be passed to the **fit** method through the *callbacks* parameter.

In [None]:
epoch_count = 100
batch_size = 100
patience = 5

early_stop = keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = patience, restore_best_weights = True)

history = vae.fit(train_x_flatten, validation_data=(val_x_flatten, None), epochs = epoch_count, batch_size = batch_size, callbacks = [early_stop])
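
The role of *patience* can be illustrated with a simplified emulation in plain Python. This is a sketch of the idea, not the Keras implementation: the function name and the list of validation losses are hypothetical, and "improvement" is assumed to mean a strictly lower loss than the best one seen so far.

```python
def simulate_early_stopping(val_losses, patience):
    """Return (last_epoch_run, best_epoch) for a list of validation losses."""
    best_loss = float('inf')
    best_epoch = None
    epochs_without_improvement = 0

    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best_loss:
            # New best value: remember it and reset the counter
            best_loss, best_epoch = loss, epoch
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return epoch, best_epoch   # training would stop here

    return len(val_losses), best_epoch     # patience never ran out

# Hypothetical validation losses, one per epoch
val_losses = [5.0, 4.0, 3.0, 3.5, 3.2, 3.1, 3.3, 2.9]
stopped_at, best_epoch = simulate_early_stopping(val_losses, patience=3)
print(stopped_at, best_epoch)   # 6 3
```

In this made-up run, training stops at epoch 6 and never reaches the lower loss at epoch 8. With *restore_best_weights* enabled, the weights from epoch 3 would be the ones kept. A larger *patience* trades longer training for a lower risk of stopping too early.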

We can learn a lot about our model by observing the graph of its performance over time during training.

The **fit** method returns an object (*history*) containing loss and metrics values at successive epochs for both training and validation sets.

The following code calls the **plot_history** function defined above to plot the loss over epochs on both training and validation sets.

In [None]:
plot_history(history)

## **Performance evaluation on the test set**
The performance on the test set can be easily measured by calling the **evaluate** method of the autoencoder.

In [None]:
test_loss = vae.evaluate(test_x_flatten, None, batch_size = batch_size, verbose = 0)
print('Test loss: {:.3f}'.format(test_loss))

## **Reduced space visualization**
The [**predict**](https://keras.io/api/models/model_training_apis/#predict-method) method of the *encoder* can be used to obtain *mu* and *log_var* values representing training, validation and test sets in the latent space.

In [None]:
train_x_latent = vae_encoder.predict(train_x_flatten)
val_x_latent = vae_encoder.predict(val_x_flatten)
test_x_latent = vae_encoder.predict(test_x_flatten)

The following code visualizes training, validation and test sets in the latent space by plotting *mu* values.

In [None]:
plot_2d_data([train_x_latent[0], val_x_latent[0], test_x_latent[0]], [train_y,val_y,test_y], ['Train', 'Validation', 'Test'], (18, 6))

## **Generated images**
The **predict** method of the *decoder* can be used to generate a handwritten digit from random noise.

The following code visualizes a randomly generated image.

In [None]:
random_sample = np.array([[random.normalvariate(0, 1), random.normalvariate(0, 1)]])

print('Random sample: ', random_sample)

decoded_x = vae_decoder.predict(random_sample)
digit = decoded_x[0].reshape(original_image_shape)

plt.axis('off')
plt.imshow(digit, cmap = 'gray')
plt.show()

Running the code below will show a continuous distribution of the different digit classes, with each digit morphing into another across the 2D latent space.

In [None]:
n = 20 # number of images per row and column
limit = 3 # grid values are evenly spaced in the range [-limit,+limit]

grid_x = np.linspace(-limit, limit, n) 
grid_y = np.linspace(limit, -limit, n)

generated_images=[]
for i, yi in enumerate(grid_y):
  single_row_generated_images = []
  for j, xi in enumerate(grid_x):
    random_sample = np.array([[xi, yi]])
    decoded_x = vae_decoder.predict(random_sample)
    single_row_generated_images.append(decoded_x[0].reshape(original_image_shape))
  generated_images.append(single_row_generated_images)      

plot_generated_images(generated_images, n, n, True)
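
Calling **predict** once per grid point can be slow. A possible variation, sketched below, builds all grid points first so that they can be decoded in a single call. It assumes the decoder accepts a batch of shape (N, 2), as Keras models generally do. To keep the snippet self-contained, `stand_in_decoder` is a hypothetical placeholder for `vae_decoder.predict` that simply returns blank images.

```python
import numpy as np

n = 20
limit = 3
image_shape = (28, 28)

grid_x = np.linspace(-limit, limit, n)
grid_y = np.linspace(limit, -limit, n)

# All (x, y) pairs, row by row: shape (n * n, 2)
grid_points = np.array([[xi, yi] for yi in grid_y for xi in grid_x])

def stand_in_decoder(latent_points):
    # Hypothetical placeholder for vae_decoder.predict: returns one flat
    # 784-pixel image per latent point (all zeros here)
    return np.zeros((latent_points.shape[0], image_shape[0] * image_shape[1]))

decoded = stand_in_decoder(grid_points)

# Back to an n x n grid of 28x28 images, indexable as generated_images[row][col]
generated_images = decoded.reshape(n, n, *image_shape)

print(grid_points.shape, generated_images.shape)
```

The resulting array can be indexed as `generated_images[i][j]`, which is the access pattern used by **plot_generated_images**.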

# **Exercise 1: conditional variational autoencoder (CVAE)**
- A Conditional Variational AutoEncoder (CVAE) is a VAE with an extra input to both the encoder and the decoder, which conditions the entire generative process on that input.

- At training time, the input type (i.e., the class, label or category) is provided to both the encoder and decoder.

- Thanks to this extra input, CVAEs can capture and encode contextual information that traditional VAEs have no explicit way to use.

<img src=https://raw.githubusercontent.com/nderus/Generative-models-Lesson/main/images/CVAE.png width="700">

Define and train a conditional variational autoencoder to generate handwritten digits given the digit label as input type:
1. define a CVAE model implementing the **build_cvae** function;
2. execute the training process;
3. generate different handwritten digits using the CVAE decoder.

## **Digit labels one hot encoding**
To prevent the model from misinterpreting the digit labels (for example, as ordered quantities), the labels are converted into a one-hot encoding using the [**to_categorical**](https://keras.io/api/utils/python_utils/#to_categorical-function) function provided by Keras.

In [None]:
train_y_one_hot = to_categorical(train_y, category_count)
val_y_one_hot = to_categorical(val_y, category_count)
test_y_one_hot = to_categorical(test_y, category_count)

print('Train label one hot encoding shape: ', train_y_one_hot.shape)
print('Validation label one hot encoding shape: ', val_y_one_hot.shape)
print('Test label one hot encoding shape: ', test_y_one_hot.shape)
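
For intuition about the encoding itself, the following NumPy sketch produces a one-hot representation by indexing the identity matrix. It is an illustrative alternative, not how **to_categorical** is implemented, and the three labels are made up.

```python
import numpy as np

labels = np.array([3, 0, 9])   # hypothetical digit labels
category_count = 10

# Row i of the identity matrix is the one-hot vector of class i
one_hot = np.eye(category_count)[labels]

# argmax recovers the original integer labels
recovered = one_hot.argmax(axis=1)

print(one_hot.shape)   # (3, 10)
print(one_hot[0])      # 1.0 in position 3, 0.0 elsewhere
print(recovered)       # [3 0 9]
```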

## **Model definition**
Implement the following function to create a CVAE model given:
- the number of input features (*input_count*);
- the dimension of input type (*condition_count*);
- the number of neurons for each hidden layer (*neuron_count_per_hidden_layer*);
- the dimension of the latent space (*encoded_dim*);
- the string identifier of the activation function of the hidden layers (*hidden_activation*);
- the string identifier of the activation function of the output layer (*output_activation*).

It could be useful to start from the function used to build a VAE (**build_vae**).

As with VAEs, the Keras **Sequential** class cannot be used because the layers are not stacked and some of them receive multiple inputs or return multiple outputs. The **Model** class can be used instead.

Both the encoder and the decoder need to receive two inputs. For this purpose, the Keras [**Concatenate**](https://keras.io/api/layers/merging_layers/concatenate/) layer can be used to concatenate multiple inputs into a single tensor.
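
The effect of the concatenation on tensor shapes can be previewed with NumPy. This sketch only illustrates the shapes involved and is not part of the exercise solution; the batch of blank images and the latent values are placeholders.

```python
import numpy as np

batch_size = 4
image_batch = np.zeros((batch_size, 784))   # flattened 28x28 images (placeholders)
label_batch = np.eye(10)[[0, 1, 2, 3]]      # one-hot digit labels

# Encoder side: image and condition joined along the feature axis
encoder_in = np.concatenate([image_batch, label_batch], axis=1)

latent_batch = np.zeros((batch_size, 2))    # 2D latent samples (placeholders)
# Decoder side: latent sample and condition joined along the feature axis
decoder_in = np.concatenate([latent_batch, label_batch], axis=1)

print(encoder_in.shape)   # (4, 794)
print(decoder_in.shape)   # (4, 12)
```

The first dense layer of the encoder therefore sees 784 + 10 features, and the first dense layer of the decoder sees 2 + 10.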

In [None]:
def build_cvae(input_count, condition_count, neuron_count_per_hidden_layer, encoded_dim, hidden_activation, output_activation):
  # ...
  raise NotImplementedError('Exercise: implement build_cvae')

## **Model creation**
The following code creates a conditional VAE by calling the **build_cvae** function defined above.

In [None]:
cvae, cvae_encoder, cvae_decoder = build_cvae(train_x_flatten.shape[1], category_count, [256, 128], 2, 'sigmoid', 'sigmoid')

## **Model visualization**
A string summary of the network can be printed by executing the following code.

In [None]:
cvae.summary()

Alternatively, a plot of the neural network graph can be visualized.

In [None]:
keras.utils.plot_model(cvae, show_shapes = True, show_layer_names = True, expand_nested = True)

## **Model compilation**
The following code compiles the model as already done for the variational autoencoder.

In [None]:
kl_coefficient = 1

cvae_input = cvae.input[0]
cvae_output = cvae.output
mu = cvae.get_layer('mu').output
log_var = cvae.get_layer('log_var').output

cvae.add_loss(vae_loss(cvae_input, cvae_output, mu, log_var, kl_coefficient))
cvae.compile(optimizer = 'adam')

## **Training**
Now we are ready to train our model by calling the **fit** method.

In [None]:
epoch_count = 100
batch_size = 100
patience = 5

early_stop = keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = patience, restore_best_weights = True)

history = cvae.fit([train_x_flatten, train_y_one_hot],
                   validation_data = ([val_x_flatten, val_y_one_hot], None),
                   epochs = epoch_count,
                   batch_size = batch_size,
                   callbacks = [early_stop])

The following code calls the **plot_history** function defined above to plot the loss over epochs on both training and validation sets.

In [None]:
plot_history(history)

## **Performance evaluation on the test set**
The **evaluate** method of the autoencoder is used to measure the performance on the test set.

In [None]:
test_loss = cvae.evaluate([test_x_flatten, test_y_one_hot], None, batch_size = batch_size, verbose = 0)
print('Test loss: {:.3f}'.format(test_loss))

## **Reduced space visualization**
Obtain *mu* and *log_var* values representing training, validation and test sets in the latent space by calling the **predict** method of the CVAE *encoder*.

In [None]:
train_x_latent = cvae_encoder.predict([train_x_flatten,train_y_one_hot])
val_x_latent = cvae_encoder.predict([val_x_flatten,val_y_one_hot])
test_x_latent = cvae_encoder.predict([test_x_flatten,test_y_one_hot])

The following code visualizes training, validation and test sets in the latent space by plotting *mu* values.

In [None]:
plot_2d_data([train_x_latent[0], val_x_latent[0], test_x_latent[0]], [train_y, val_y, test_y], ['Train', 'Validation', 'Test'], (18, 6))

## **Generated images**
Use the **predict** method of the CVAE *decoder* to visualize a randomly generated handwritten digit (of a specific category: *digit_label*).

It could be useful to start from the code used to visualize a handwritten digit generated by a VAE.

In [None]:
digit_label=8

random_sample = np.array([[random.normalvariate(0,1), random.normalvariate(0,1)]])
digit_label_one_hot=to_categorical(digit_label, category_count).reshape(1,-1)

print('Random sample: ',random_sample)
print('Digit label one-hot encoding: ',digit_label_one_hot)

decoded_x = cvae_decoder.predict([random_sample,digit_label_one_hot])
digit = decoded_x[0].reshape(original_image_shape)

plt.axis('off')
plt.imshow(digit, cmap='gray')
plt.show()

Running the code below will explore the latent space by varying the first dimension while keeping the second one constant. Each row shows one digit class.

In [None]:
n = 10  # number of images per row
limit = 3 # grid values are evenly spaced in the range [-limit,+limit]
second_dim_const = 0  # constant value of the second latent dimension

grid_x = np.linspace(-limit, limit, n) 

generated_images = []
for digit_label in range(category_count):
  digit_label_one_hot = to_categorical(digit_label, category_count).reshape(1, -1)
  
  single_row_generated_images = []
  for i, xi in enumerate(grid_x):
    random_sample = np.array([[xi, second_dim_const]])
    decoded_x = cvae_decoder.predict([random_sample, digit_label_one_hot])
    single_row_generated_images.append(decoded_x[0].reshape(original_image_shape))
  generated_images.append(single_row_generated_images)      

plot_generated_images(generated_images, category_count, n)  # one row per digit class

Running the code below will explore the latent space by varying the second dimension while keeping the first one constant. Each row shows one digit class.

In [None]:
n = 10  # number of images per row
limit = 3 # grid values are evenly spaced in the range [-limit,+limit]
first_dim_const = 0  # constant value of the first latent dimension

grid_y = np.linspace(-limit, limit, n) 

generated_images = []
for digit_label in range(category_count):
  digit_label_one_hot = to_categorical(digit_label, category_count).reshape(1, -1)
  
  single_row_generated_images = []
  for i, yi in enumerate(grid_y):
    random_sample = np.array([[first_dim_const, yi]])
    decoded_x = cvae_decoder.predict([random_sample, digit_label_one_hot])
    single_row_generated_images.append(decoded_x[0].reshape(original_image_shape))
  generated_images.append(single_row_generated_images)      

plot_generated_images(generated_images, category_count, n)  # one row per digit class

# **Exercise 2**
Train a VAE and a CVAE to generate images similar to those in the [**fashion MNIST**](https://github.com/zalandoresearch/fashion-mnist) dataset.

# **Generative adversarial network (GAN)**
In this section a generative adversarial network is trained to generate handwritten digits.

<img src=https://raw.githubusercontent.com/nderus/Generative-models-Lesson/main/images/GAN.png width="700">

GAN training proceeds in alternating periods:
1. The discriminator 𝐷 trains for one or more epochs (keeping the generator constant);
2. The generator 𝐺 trains for one or more epochs (keeping the discriminator constant);
3. Repeat steps 1 and 2 to continue training the generator and discriminator.

- As the generator improves with training, the discriminator performance gets worse because the discriminator cannot easily tell the difference between real and fake.

- If the generator succeeds perfectly, then the discriminator has a 50% accuracy.

- The discriminator is trained on two different mini-batches (one of real and one of fake data).

## **Model definition**
The following function creates a generative adversarial network given:
- the size of the input noise (*input_noise_dim*);
- the number of neurons for each hidden layer (*neuron_count_per_hidden_layer*);
- the dimension of the output (*output_dim*);
- the string identifier of the activation function of the hidden layers (*hidden_activation*);
- the string identifier of the activation function of the output layer of the generator (*generator_output_activation*).

The function returns the generator and the discriminator models as well as the whole GAN.

In Keras, a sequential model is a stack of layers where each layer has exactly one input and one output. It can be created by passing a list of layers to the [**keras.Sequential**](https://keras.io/guides/sequential_model/) constructor.

To combine the generator and the discriminator into the GAN, the **Model** class provided by Keras is used. The input and output layers are passed to the constructor, which groups the layers into an object with training and inference features.

The GAN model will be used to train the model weights in the generator, using the output and error calculated by the discriminator model.

In [None]:
def build_gan(input_noise_dim, neuron_count_per_hidden_layer, output_dim, hidden_activation, generator_output_activation):
  #Generator
  generator = keras.Sequential(name = 'generator')
  generator.add(layers.Input(shape = (input_noise_dim,), name = 'generator_input'))
  for neuron_count in neuron_count_per_hidden_layer:
      generator.add(layers.Dense(neuron_count, activation = hidden_activation))
      
  generator.add(layers.Dense(output_dim, activation = generator_output_activation, name = 'generator_output'))

  #Discriminator
  discriminator = keras.Sequential(name = 'discriminator')
  discriminator.add(layers.Input(shape = (output_dim,), name = 'discriminator_input'))
  for neuron_count in reversed(neuron_count_per_hidden_layer):
      discriminator.add(layers.Dense(neuron_count, activation = hidden_activation))
      
  discriminator.add(layers.Dense(1, activation = 'sigmoid', name = 'discriminator_output'))

  #GAN
  gan = keras.Model(generator.input, discriminator(generator.output), name = 'gan')

  return gan,generator,discriminator

## **Model creation**
The following code creates a GAN by calling the **build_gan** function defined above.

In [None]:
input_noise_dim = 100

gan,gan_generator, gan_discriminator = build_gan(input_noise_dim, [256, 512, 1024], train_x_flatten.shape[1], 'relu', 'sigmoid')

## **Model visualization**
A string summary of the network can be printed by executing the following code.

In [None]:
gan.summary()

Alternatively, a plot of the neural network graph can be visualized.

In [None]:
keras.utils.plot_model(gan,show_shapes = True, show_layer_names = True, expand_nested = True)

## **Loss function definition**

### **Discriminator loss**
$$L_D = -y \cdot \log(\hat{y}) - (1 - y) \cdot \log(1 - \hat{y})$$

- where $\hat{y}$ and $y$ are the predicted and the desired outputs (0=fake and 1=real), respectively.

### **Generator loss**
$$L_G = \log(1 - D(G(z)))$$

- The generator is trained to minimize the probability that the discriminator classifies the data received from the generator as fake.
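
The two formulas can be evaluated on a few hand-picked discriminator outputs to see how they behave. The sketch below uses only the Python standard library; the function names and the probability values are assumptions made for this illustration.

```python
import math

def discriminator_loss(y, y_hat):
    # L_D = -y * log(y_hat) - (1 - y) * log(1 - y_hat)
    return -y * math.log(y_hat) - (1 - y) * math.log(1 - y_hat)

def generator_loss(d_of_g_z):
    # L_G = log(1 - D(G(z)))
    return math.log(1 - d_of_g_z)

# Real sample (y = 1): a confident, correct discriminator has a low loss
confident_right = discriminator_loss(1, 0.9)
# A discriminator that cannot decide (output 0.5) has loss ln(2)
undecided = discriminator_loss(1, 0.5)

# The generator loss is lower when the discriminator believes the fake is real
fooled = generator_loss(0.9)   # D(G(z)) close to 1
caught = generator_loss(0.1)   # D(G(z)) close to 0

print(confident_right, undecided)
print(fooled, caught)
```

An undecided discriminator, the situation reached when the generator succeeds perfectly, has a loss of ln(2), about 0.693, on every sample.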


## **Model compilation**
The following code compiles the discriminator and the whole generative adversarial network.

Because the discriminator model is trained separately, its weights are marked as not trainable in the GAN model to ensure that only the weights of the generator model are updated. This change to the trainability of the discriminator weights only has an effect when training the combined GAN model, not when training the discriminator standalone.

<u>Note that the *trainable* flag is evaluated when the model is compiled.</u>

In [None]:
gan_discriminator.compile(loss = 'binary_crossentropy', optimizer = 'sgd')

gan_discriminator.trainable = False
gan.compile(loss = 'binary_crossentropy', optimizer = 'sgd')

## **Training**
Training a GAN model is not easy: the discriminator and the generator must be updated alternately, so a single call to the **fit** method is not enough.

To simplify and generalize the training function, it is subdivided into different sub-functions.

### **Random selection of real batches**
At each iteration, a batch of real inputs from the dataset is required. This can be achieved by selecting a random sample of images from the dataset each time.

The following code defines some functions useful to randomly select batches of real images from the dataset at each iteration:
- **chunks** divides "on the fly" a *list* in subsets of length *n*;
- **get_random_batch_indices** returns batches of *batch_size* random indices given the cardinality of the dataset (*data_count*);
- **get_gan_real_batch** creates a batch with real images with the corresponding labels (1=real).

In [None]:
def chunks(list, n):
    for i in range(0, len(list), n):
        yield list[i:i + n]

def get_random_batch_indices(data_count, batch_size):
    list_indices = list(range(0, data_count))
    random.shuffle(list_indices)
    return list(chunks(list_indices, batch_size))

def get_gan_real_batch(dataset_x, batch_indices, label):
  batch_x = dataset_x[batch_indices]
  batch_y = np.full(len(batch_indices), label)

  return batch_x, batch_y
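
As a quick check of how the batches are formed, the sketch below re-declares the two index helpers (with the parameter renamed to *items*) and applies them to a hypothetical toy dataset of 10 samples. When the dataset size is not a multiple of the batch size, the last batch is simply smaller.

```python
import random
import numpy as np

def chunks(items, n):
    # Yield consecutive slices of length n (the last one may be shorter)
    for i in range(0, len(items), n):
        yield items[i:i + n]

def get_random_batch_indices(data_count, batch_size):
    indices = list(range(data_count))
    random.shuffle(indices)
    return list(chunks(indices, batch_size))

toy_x = np.arange(10 * 3).reshape(10, 3)  # hypothetical dataset: 10 samples, 3 features

batches = get_random_batch_indices(len(toy_x), 4)
batch_sizes = [len(b) for b in batches]
print(batch_sizes)  # [4, 4, 2]

# A real batch is obtained by indexing the dataset with one group of indices
batch_x = toy_x[batches[0]]
batch_y = np.full(len(batches[0]), 1)
print(batch_x.shape, batch_y)
```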

### **Generation of fake batches**
At each iteration, the discriminator is trained on two different mini-batches: real and fake.

The following code defines functions useful to generate batches of fake images:
- **get_gan_random_input** returns a tensor (*batch_size*$\times$*noise_dim*) of random noise to use as generator input;
- **get_gan_fake_batch** returns a batch of fake images, created using the generator, and the corresponding labels (0=fake).

In [None]:
def get_gan_random_input(batch_size, noise_dim, *_):
  return np.random.normal(0, 1, size = (batch_size, noise_dim))

def get_gan_fake_batch(generator, batch_size, generator_input):
  # verbose = 0 avoids printing a progress bar at every training iteration
  batch_x = generator.predict(generator_input, verbose = 0)
  batch_y = np.zeros(batch_size)

  return batch_x, batch_y

### **Concatenation of real and fake batches**
Before training the discriminator, real and fake batches need to be concatenated. The following function concatenates a real and a fake batch into a single batch.

In [None]:
def concatenate_gan_batches(real_batch_x, fake_batch_x):
  return np.concatenate((real_batch_x, fake_batch_x))
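
The shapes involved can be illustrated with random arrays. In this sketch the batch size (3), the image size (784) and the random stand-in for the generator output are assumptions chosen only to make the example runnable without a trained model.

```python
import numpy as np

# Stand-ins for a real batch and a generated batch of flattened images
real_batch_x = np.random.uniform(0, 1, size=(3, 784))
real_batch_y = np.full(3, 1)
fake_batch_x = np.random.normal(0, 1, size=(3, 784))
fake_batch_y = np.zeros(3)

# Images and labels are concatenated in the same order (real first, fake second)
discriminator_batch_x = np.concatenate((real_batch_x, fake_batch_x))
discriminator_batch_y = np.concatenate((real_batch_y, fake_batch_y))

print(discriminator_batch_x.shape)
print(discriminator_batch_y)
```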

### **Train function**
The following function trains a generative adversarial network given:
- the GAN model (*gan*);
- the generator model (*generator*);
- the discriminator model (*discriminator*);
- the training set (*train_x*);
- the number of examples contained in the training set (*train_data_count*);
- the size of the input noise (*input_noise_dim*);
- the number of epochs (*epoch_count*);
- the batch size (*batch_size*);
- the function to generate random noise to use as input for the generator (*get_random_input_func*);
- the function to create a real batch from the training set (*get_real_batch_func*);
- the function to generate a fake batch with the generator (*get_fake_batch_func*);
- the function to concatenate a real and a fake batch into a single batch (*concatenate_batches_func*);
- the dimension of additional information (*condition_count*). It will be used only to train conditional generative adversarial networks (see Exercises 3 and 5);
- a flag that enables *one-sided label smoothing* (*use_one_sided_labels*);
- the update frequency of example images (*plt_frq*);
- the number of examples visualized (*plt_example_count*);
- the original shape of the example images (*example_shape*).

At each iteration:
1. a batch of real images is randomly selected from the dataset (*real_batch_x* and *real_batch_y*);
2. a batch of fake images is generated using the generator (*fake_batch_x* and *fake_batch_y*);
3. the two batches are concatenated to form a single batch (*discriminator_batch_x* and *discriminator_batch_y*);
4. the discriminator is trained on this batch and its weights are updated using the [**train_on_batch**](https://keras.io/api/models/model_training_apis/#trainonbatch-method) method obtaining the corresponding training loss value (*d_loss*);
5. a batch of noise data is randomly generated to be used as input of the generator (*gan_batch_x* and *gan_batch_y*);
6. the generator is trained on this batch and its weights are updated using the **train_on_batch** method obtaining the corresponding training loss value (*g_loss*);
7. discriminator and generator losses are aggregated to compute the corresponding epoch average losses (*avg_d_loss* and *avg_g_loss*).

In [None]:
def train_gan(gan,generator, discriminator, train_x, train_data_count, input_noise_dim, epoch_count, batch_size,
              get_random_input_func, get_real_batch_func, get_fake_batch_func, concatenate_batches_func, condition_count = -1,
              use_one_sided_labels = False, plt_frq = None, plt_example_count = 10, example_shape = (28, 28)):
    # Round up so that a final, smaller batch is processed as well
    iteration_count = int(np.ceil(train_data_count / batch_size))
    
    print('Epochs: ', epoch_count)
    print('Batch size: ', batch_size)
    print('Iterations: ', iteration_count)
    print('')
    
    #Plot generated images
    if plt_frq is not None:
      print('Before training:')
      noise_to_plot = get_random_input_func(plt_example_count, input_noise_dim,condition_count)
      generated_output = generator.predict(noise_to_plot)
      generated_images = generated_output.reshape(plt_example_count, example_shape[0], example_shape[1])
      plot_generated_images([generated_images], 1, plt_example_count, figsize = (15, 5))
          
    d_epoch_losses = []
    g_epoch_losses = []
    for e in range(1, epoch_count + 1):
        start_time = time.time()
        avg_d_loss = 0
        avg_g_loss = 0

        # Training indices are shuffled and grouped into batches
        batch_indices = get_random_batch_indices(train_data_count, batch_size)

        for i in range(iteration_count):
            current_batch_size = len(batch_indices[i])

            # 1. create a batch with real images from the training set
            real_batch_x, real_batch_y = get_real_batch_func(train_x, batch_indices[i], 0.9 if use_one_sided_labels else 1)
                        
            # 2. create noise vectors for the generator and generate the images from the noise
            generator_input = get_random_input_func(current_batch_size, input_noise_dim, condition_count)
            fake_batch_x, fake_batch_y = get_fake_batch_func(generator, current_batch_size, generator_input)

            # 3. concatenate real and fake batches into a single batch
            discriminator_batch_x = concatenate_batches_func(real_batch_x, fake_batch_x)
            discriminator_batch_y = np.concatenate((real_batch_y, fake_batch_y))

            # 4. train discriminator
            d_loss = discriminator.train_on_batch(discriminator_batch_x, discriminator_batch_y)
            
            # 5. create noise vectors for the generator
            gan_batch_x = get_random_input_func(current_batch_size, input_noise_dim, condition_count)
            gan_batch_y = np.ones(current_batch_size)    #Flipped labels

            # 6. train generator
            g_loss = gan.train_on_batch(gan_batch_x, gan_batch_y)

            # 7. avg losses
            avg_d_loss += d_loss*current_batch_size
            avg_g_loss += g_loss*current_batch_size
            
        avg_d_loss /= train_data_count
        avg_g_loss /= train_data_count

        d_epoch_losses.append(avg_d_loss)
        g_epoch_losses.append(avg_g_loss)

        end_time = time.time()

        print('Epoch: {0} exec_time={1:.1f}s d_loss={2:.3f} g_loss={3:.3f}'.format(e, end_time - start_time, avg_d_loss, avg_g_loss))

        # Update the plots
        if plt_frq is not None and e % plt_frq == 0:
            generated_output = generator.predict(noise_to_plot)
            generated_images = generated_output.reshape(plt_example_count, example_shape[0], example_shape[1])
            plot_generated_images([generated_images], 1, plt_example_count, figsize = (15, 5))
    
    return d_epoch_losses, g_epoch_losses
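
Step 7 weights each batch loss by the batch size before dividing by the number of training examples. Assuming that **train_on_batch** returns the mean loss over the batch, this gives the per-example average over the epoch, which differs from a plain mean of the batch losses when batches have different sizes. The values below are made up for illustration.

```python
# Hypothetical batch sizes and mean losses returned for each batch
batch_sizes = [100, 100, 50]
batch_losses = [0.70, 0.60, 0.40]

# Weighted average, as computed in the training function
weighted_sum = sum(loss * size for loss, size in zip(batch_losses, batch_sizes))
epoch_avg = weighted_sum / sum(batch_sizes)

# Plain mean of batch losses, which over-weights the smaller last batch
plain_mean = sum(batch_losses) / len(batch_losses)

print(epoch_avg, plain_mean)
```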

### **Execute training**
The following code trains our model by calling the **train_gan** function defined above.

To reduce the time needed to execute each epoch, the validation set is used instead of the training set.

In [None]:
epoch_count = 10
batch_size = 100

d_epoch_losses, g_epoch_losses = train_gan(gan,
                                        gan_generator,
                                        gan_discriminator,
                                        val_x_flatten,
                                        val_x_flatten.shape[0],
                                        input_noise_dim,
                                        epoch_count,
                                        batch_size,
                                        get_gan_random_input,
                                        get_gan_real_batch,
                                        get_gan_fake_batch,
                                        concatenate_gan_batches,
                                        plt_frq = 1,
                                        plt_example_count = 15)

The following code calls the **plot_gan_losses** function defined above to draw in a graph the discriminator and generator loss trends over epochs.

In [None]:
plot_gan_losses(d_epoch_losses, g_epoch_losses)

## **Tips and tricks for training**
Training stable GAN models can be very challenging. The training process is inherently unstable because the generator and the discriminator are trained simultaneously in an adversarial game, so an improvement to one model comes at the expense of the other.

Training therefore aims at finding a point of equilibrium between the two competing models.

GANs suffer in particular from the following problems:
- non-convergence: the models do not converge and, worse, can become unstable;
- mode collapse: the generator produces only a limited variety of outputs (few modes of the data distribution);
- slow training: the gradient used to train the generator vanishes.

Practitioners use several tricks to improve the performance of GANs. It can be difficult to tell how effective some of these tricks are; many of them seem to help in some contexts and hurt in others.

This section reports some common practical tips for training stable GAN models.

### **Normalize the inputs**
Normalize inputs to the range [-1,1] and use *tanh* as activation function in the output layer of the generator.

In [None]:
train_x_flatten = (train_x_flatten * 2) - 1
val_x_flatten = (val_x_flatten * 2) - 1
test_x_flatten = (test_x_flatten * 2) - 1

generator_output_activation = 'tanh'
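
The rescaling above maps values from [0,1] to [-1,1], which is the output range of *tanh*. The sketch below assumes pixel values already scaled to [0,1] and also shows the inverse mapping, which can be useful to bring generated images back to [0,1].

```python
import numpy as np

pixels = np.array([0.0, 0.25, 0.5, 1.0])  # assumption: inputs already in [0, 1]

scaled = pixels * 2 - 1        # [0, 1] -> [-1, 1]
restored = (scaled + 1) / 2    # [-1, 1] -> [0, 1]

print(scaled)    # [-1.  -0.5  0.   1. ]
print(restored)  # [0.   0.25 0.5  1.  ]
```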

### **Avoid sparse gradients**
Sparse gradients harm the stability of GAN training. To reduce this problem, practitioners suggest using LeakyReLU (instead of ReLU), with a slope coefficient of 0.2, in the hidden layers of both generator and discriminator.

Keras provides the [**LeakyReLU**](https://keras.io/api/layers/activation_layers/leaky_relu/) class to implement such activation function.

In [None]:
hidden_activation = layers.LeakyReLU(alpha = 0.2)
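
To see why LeakyReLU avoids sparse gradients, the two activations can be compared with a small NumPy sketch. The helper functions below are written by hand for illustration and are not the Keras implementation: for negative inputs ReLU has zero output and zero gradient, while LeakyReLU keeps a slope of 0.2.

```python
import numpy as np

def relu(x):
    return np.maximum(0, x)

def leaky_relu(x, slope=0.2):
    # Negative inputs are scaled by the slope instead of being zeroed
    return np.where(x > 0, x, slope * x)

x = np.array([-2.0, -0.5, 0.0, 1.5])

relu_out = relu(x)
leaky_out = leaky_relu(x)

# Derivatives with respect to the input (taking the left derivative at 0)
relu_grad = (x > 0).astype(float)
leaky_grad = np.where(x > 0, 1.0, 0.2)

print(relu_out, leaky_out)
print(relu_grad, leaky_grad)
```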

### **One-sided label smoothing**
Deep networks may suffer from overconfidence: for example, a network may rely on very few features to classify an object.

If the discriminator depends on a small set of features to detect real images, the generator may learn to produce only those features to fool the discriminator, with no long-term benefit.

*One-sided label smoothing* can be used to avoid the problem by replacing the target of real examples with a value slightly less than 1, such as 0.9. This prevents extreme extrapolation behavior in the discriminator.

In [None]:
use_one_sided_labels = True
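
The effect of the smoothed target can be checked numerically. In the sketch below, the *bce* helper and the candidate predictions are assumptions for illustration. With a hard target of 1 the loss keeps decreasing as the prediction approaches 1, whereas with a target of 0.9 the loss is lowest at a prediction of 0.9 and grows again for more confident predictions.

```python
import numpy as np

def bce(target, pred):
    # Binary cross-entropy for a single target value
    return -target * np.log(pred) - (1 - target) * np.log(1 - pred)

# Hypothetical discriminator outputs on a real image
preds = np.array([0.5, 0.9, 0.999])

hard_loss = bce(1.0, preds)    # target 1 (no smoothing)
smooth_loss = bce(0.9, preds)  # target 0.9 (one-sided label smoothing)

print(hard_loss)
print(smooth_loss)
```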

### **Use Adam optimizer**
Adam with a learning rate of 0.0002 and a momentum ($\beta_1$) of 0.5 usually seems to work better than other optimizers.

In [None]:
optimizer = keras.optimizers.legacy.Adam(learning_rate = 0.0002, beta_1 = 0.5)
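
One way to read the lower $\beta_1$ is that the moving average of the gradients forgets past gradients faster. The sketch below computes only the bias-corrected first moment of Adam for a made-up gradient sequence whose sign flips at the last step; it is not a full optimizer, and the helper name is hypothetical.

```python
def bias_corrected_momentum(gradients, beta_1):
    # Exponential moving average of the gradients, as in Adam's first moment
    m = 0.0
    step = 0
    for g in gradients:
        step += 1
        m = beta_1 * m + (1 - beta_1) * g
    # Bias correction compensates for the zero initialization of m
    return m / (1 - beta_1 ** step)

gradients = [1.0, 1.0, 1.0, -1.0]  # hypothetical gradients: the sign flips at the end

m_hat_low_beta = bias_corrected_momentum(gradients, beta_1=0.5)
m_hat_high_beta = bias_corrected_momentum(gradients, beta_1=0.9)

print(m_hat_low_beta, m_hat_high_beta)
```

With $\beta_1 = 0.5$ the averaged gradient already follows the new sign, while with $\beta_1 = 0.9$ it still points in the old direction. This faster reaction may help when the loss surface keeps changing as the two models are updated in turn.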

### **Execute training**
The following code trains our model by calling the **train_gan** function defined above.

To reduce the time needed to execute each epoch, the validation set is used instead of the training set.

In [None]:
epoch_count = 30
batch_size = 100

gan, gan_generator, gan_discriminator = build_gan(input_noise_dim,
                                              [256, 512, 1024],
                                              train_x_flatten.shape[1],
                                              hidden_activation,
                                              generator_output_activation)

gan_discriminator.compile(loss = 'binary_crossentropy', optimizer = optimizer)

gan_discriminator.trainable = False
gan.compile(loss = 'binary_crossentropy', optimizer = optimizer)

d_epoch_losses,g_epoch_losses=train_gan(gan,
                                        gan_generator,
                                        gan_discriminator,
                                        val_x_flatten,
                                        val_x_flatten.shape[0],
                                        input_noise_dim,
                                        epoch_count,
                                        batch_size,
                                        get_gan_random_input,
                                        get_gan_real_batch,
                                        get_gan_fake_batch,
                                        concatenate_gan_batches,
                                        use_one_sided_labels=use_one_sided_labels,
                                        plt_frq = 1,
                                        plt_example_count = 15)

The following code calls the **plot_gan_losses** function defined above to draw in a graph the discriminator and generator loss trends over epochs.

In [None]:
plot_gan_losses(d_epoch_losses, g_epoch_losses)

## **Generated images**
The following code visualizes a randomly generated handwritten digit obtained by calling the **predict** method of the *generator*.

In [None]:
noise = np.random.normal(0, 1, size = (1, input_noise_dim))

generated_x = gan_generator.predict(noise)
digit = generated_x[0].reshape(original_image_shape)

plt.axis('off')
plt.imshow(digit, cmap = 'gray')
plt.show()

Running the code below will show handwritten digits randomly generated by the generator.

In [None]:
n = 10 # number of images per row and column

generated_images = []
for i in range(n):
  noise = np.random.normal(0, 1, size=(n, input_noise_dim))
  generated_x = gan_generator.predict(noise)
  generated_images.append([g.reshape(original_image_shape) for g in generated_x])

plot_generated_images(generated_images, n, n)

# **Exercise 3: conditional generative adversarial network (cGAN)**
Define and train a conditional generative adversarial network to generate handwritten digits given the digit label as additional information:
1. define a cGAN model implementing the **build_cgan** function;
2. execute the training process;
3. generate different handwritten digits using the cGAN generator.

## **Model definition**
Implement the following function to create a cGAN model given:
- the size of the input noise (*input_noise_dim*);
- the dimension of additional information (*condition_dim*);
- the number of neurons for each hidden layer (*neuron_count_per_hidden_layer*);
- the dimension of the output (*output_dim*);
- the string identifier of the activation function of the hidden layers (*hidden_activation*);
- the string identifier of the activation function of the output layer of the generator (*generator_output_activation*).

It could be useful to start from functions used to build a GAN (**build_gan**) and a cVAE (**build_cvae**).

In [None]:
def build_cgan(input_noise_dim,condition_dim,neuron_count_per_hidden_layer,output_dim,hidden_activation,generator_output_activation):
  # ...

## **Model creation**
Call the **build_cgan** function to create a cGAN model.

In [None]:
input_noise_dim = 100
hidden_activation = layers.LeakyReLU(alpha = 0.2)
generator_output_activation = 'tanh'

cgan, cgan_generator, cgan_discriminator = build_cgan(input_noise_dim,
                                                  category_count,
                                                  [256, 512, 1024],
                                                  train_x_flatten.shape[1],
                                                  hidden_activation,
                                                  generator_output_activation)

## **Model visualization**
A string summary of the network can be printed by executing the following code.

In [None]:
cgan.summary()

Alternatively, a plot of the neural network graph can be visualized.

In [None]:
keras.utils.plot_model(cgan, show_shapes = True, show_layer_names = True, expand_nested = True)

## **Model compilation**
Compile the model for training.

In [None]:
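# A new optimizer is created so that no state is shared with the previously trained GAN
optimizer = keras.optimizers.legacy.Adam(learning_rate = 0.0002, beta_1 = 0.5)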

cgan_discriminator.compile(loss='binary_crossentropy', optimizer = optimizer)

cgan_discriminator.trainable = False
cgan.compile(loss = 'binary_crossentropy', optimizer = optimizer)

## **Training**
The **train_gan** function defined previously can also be used to train the cGAN model.

Before that, some of the sub-functions defined for GANs need to be redefined, because the input of a cGAN contains additional information.

### **Random selection of real batches**
The following code defines a function to create a batch of real inputs with the corresponding labels (1=real). Each real input is composed of a real image and the one-hot encoding of its category (additional information).

In [None]:
def get_cgan_real_batch(dataset, batch_indices, label):
  dataset_input = dataset[0]
  dataset_condition_info = dataset[1]
  batch_x = [dataset_input[batch_indices], dataset_condition_info[batch_indices]]
  batch_y = np.full(len(batch_indices), label)

  return batch_x, batch_y

### **Generation of fake batches**
The following code defines functions useful to generate batches of fake inputs for cGANs:
- **get_cgan_random_input** returns a tensor (*batch_size*$\times$*noise_dim*) of random noise and corresponding additional information to be used as generator input;
- **get_cgan_fake_batch** returns a batch of fake inputs and the corresponding labels (0=fake). Each fake input is composed of a fake image, created by the generator, and a randomly generated one-hot vector representing its category (additional information).
  

In [None]:
def get_cgan_random_input(batch_size, noise_dim, condition_count):
  noise=np.random.normal(0, 1, size = (batch_size, noise_dim))
  condition_info= to_categorical(np.random.randint(0, condition_count, size = batch_size), condition_count)

  return [noise,condition_info]

def get_cgan_fake_batch(generator, batch_size, generator_input):
  # verbose = 0 avoids printing a progress bar at every training iteration
  batch_x = [generator.predict(generator_input, verbose = 0), generator_input[1]]
  batch_y = np.zeros(batch_size)

  return batch_x,batch_y
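
The structure of the conditional generator input can be inspected without TensorFlow. In the sketch below, the *one_hot* helper based on **np.eye** is a stand-in for **to_categorical**, and the batch size of 4 is an arbitrary choice for illustration.

```python
import numpy as np

def one_hot(labels, class_count):
    # Row i of the identity matrix is the one-hot vector of class i
    return np.eye(class_count)[labels]

batch_size, noise_dim, condition_count = 4, 100, 10

noise = np.random.normal(0, 1, size=(batch_size, noise_dim))
labels = np.random.randint(0, condition_count, size=batch_size)
condition_info = one_hot(labels, condition_count)

# The generator receives a list with two arrays: noise and condition
generator_input = [noise, condition_info]

print(generator_input[0].shape, generator_input[1].shape)
print(labels)
print(condition_info)
```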

### **Concatenation of real and fake batches**
The following function concatenates a real and a fake batch into a single batch. The resulting batch is composed of the images and the additional information, each concatenated separately.

In [None]:
def concatenate_cgan_batches(real_batch_x, fake_batch_x):
  batch_input = np.concatenate((real_batch_x[0], fake_batch_x[0]))
  batch_condition_info = np.concatenate((real_batch_x[1], fake_batch_x[1]))

  return [batch_input, batch_condition_info]

### **Execute training**
Execute the training by calling the **train_gan** function.

In [None]:
epoch_count = 50
batch_size = 100

d_epoch_losses,g_epoch_losses=train_gan(cgan,
                                        cgan_generator,
                                        cgan_discriminator,
                                        [val_x_flatten,val_y_one_hot],
                                        val_x_flatten.shape[0],
                                        input_noise_dim,
                                        epoch_count,
                                        batch_size,
                                        get_cgan_random_input,
                                        get_cgan_real_batch,
                                        get_cgan_fake_batch,
                                        concatenate_cgan_batches,
                                        condition_count = category_count,
                                        use_one_sided_labels = True,
                                        plt_frq = 1,
                                        plt_example_count = 15)

Call the **plot_gan_losses** function to draw the discriminator and generator loss trends over epochs.

In [None]:
plot_gan_losses(d_epoch_losses, g_epoch_losses)

## **Generated images**
The following code visualizes a randomly generated handwritten digit (of a specific category: *digit_label*) obtained by calling the **predict** method of the *generator*.

In [None]:
digit_label=0

noise = np.random.normal(0, 1, size=(1, input_noise_dim))
digit_label_one_hot = to_categorical(digit_label, category_count).reshape(1,-1)

generated_x = cgan_generator.predict([noise, digit_label_one_hot])
digit = generated_x[0].reshape(original_image_shape)

plt.axis('off')
plt.imshow(digit, cmap='gray')
plt.show()

Running the code below will show a row of randomly generated images for each digit category.

In [None]:
n = 10 # number of images per digit category

generated_images=[]
for digit_label in range(category_count):
  noise = np.random.normal(0, 1, size=(n, input_noise_dim))
  digit_label_one_hot = to_categorical(np.full(n, digit_label), category_count)
  generated_x = cgan_generator.predict([noise, digit_label_one_hot])
  generated_images.append([g.reshape(original_image_shape) for g in generated_x])

plot_generated_images(generated_images, category_count, n)