<!--NAVIGATION-->
<a href="https://colab.research.google.com/github/marcoteran/deeplearningmodule/blob/main/05_generativeadversialnetworks/05_generativeadversialnetworks.ipynb" target="_blank"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Abrir en Colab" title="Abrir y ejecutar en Google Colaboratory"></a>

## Ejemplos de código
# Deep Learning: Generative Adversarial Networks

- *Name:* Marco Teran
- *E-mail:* marco.teran@usa.edu.co
- [Website](http://marcoteran.github.io/), [Github](https://github.com/marcoteran), [LinkedIn](https://www.linkedin.com/in/marcoteran/).


[**Slide del tema** ](https://github.com/marcoteran/deeplearningmodule/raw/main/05_deeplearning_generativeadversialnetworks.pdf)

# Programando una GAN

A continuación se presentarán los pasos para implementar una GAN que permite entrenar un *Generador* que sintetiza dígitos escritos a mano que parecen reales usando *Deep Convolutional Generative Adversarial Networks (DCGAN)*

Capturas de las imágenes producidas por el Generador en diferentes *epochs* durante su proceso de entrenamiento (se indica en qué número de epoch se ha generado cada imagen).

* Se usa el Generador para generar 100 imágenes
* Se podrá observar fácilmente que al inicio del entrenamiento las imágenes aparecen como ruido aleatorio
* A medida que va avanzando el entrenamiento (va avanzando el número de *epochs*) los dígitos generados se parecen cada vez más a dígitos reales escritos a mano.
* El objetivo final en un caso real es que el Generador sintetice imágenes de 28 × 28 que se confundan como datos reales del conjunto MNIST (conjunto de imágenes que hemos considerado como «reales»)

### Preparación del entorno y descarga de datos

Empezamos con las importaciones estándar para preparar el entorno.

#### Importar TensorFlow 2.0

In [None]:
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)

Importar todos los paquetes necesarios para ejecutar el modelo propuesto

In [None]:
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os

#### Descarga de los datos  y preprocesado de los datos

Descargar y preparar el conjunto de datos con el que entrenaremos la GAN.
Ahora ya podemos descargar las imágenes del conjunto de datos **MNIST** de dígitos escritos a mano, que serán las imágenes que consideraremos *«reales»* para este ejemplo. Podemos hacerlo directamente desde ``keras.datasets`` y preparar las imágenes para ser usadas por las redes con el siguiente código

In [None]:
(train_images, _), (_, _) = tf.keras.datasets.mnist.load_data()

train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype('float32')

En este caso solo interesan las imágenes (no se descargan las labels ni los datos de prueba)

Las imágenes se normalizan en el rango ``[-1, 1]`` para poder usar como función de activación en la capa final del Generador la función ``tanh``:

In [None]:
train_images = (train_images - 127.5) / 127.5 

Se barajan y preparan los datos en lotes con el siguiente código:

In [None]:
BUFFER_SIZE = 60000
BATCH_SIZE = 256
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

## Creación de los modelos

A continuación, se pasa a crear las redes neuronales que actuarán de Generador y Discriminador.

### Generador

Siguiendo el esquema que había descrito, el Generador recibe como entrada ruido, que puede obtener por ejemplo con ``tf.random.normal``.
De este ruido debe crear una imagen de $28×28$ píxeles

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Reshape, Conv2DTranspose, BatchNormalization, LeakyReLU

In [None]:
def make_generator_model():
    model = Sequential()
    model.add(Dense(7*7*256, use_bias=False, input_shape=(100,)))

    model.add(Reshape((7, 7, 256)))

    model.add(Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))

    model.add(Conv2DTranspose(64, (5, 5), strides=(1, 1), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))

    model.add(Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', activation='tanh'))

    return model

In [None]:
generator = make_generator_model()
generator.summary ()

Descripción del modelo:
1. Capa densa que recoge el vector de ruido de entrada y lo transforma en un tensor tridimensional
2. En las capas sucesivas se va transformando hasta llegar a una salida de 28 × 28 × 1.
3. La red aumenta los tamaños de los mapas de características con la capa Conv2DTranspose.
    * La capa convolución transpuesta se usa generalmente para aumentar el mapa de características.
    * La convolución transpuesta funciona insertando ceros entre los elementos de los mapas de características de entrada y, después, aplicando una convolución normal.
4. La capa ``LeakyReLU`` es una versión modificada de la función de activación ReLU que tiene una pequeña pendiente para valores negativos, en lugar de cero como la ReLU (determinada por el argumento).
    * Esta se utiliza en cada capa excepto en la última capa.
    * Esta capa es una funciones de activación avanzadas en Keras y solo está disponible como capa y no como funciones de activación.
5. En la última capa se ha usado una función de activación ``tanh``:
    * ``tanh`` tiende a producir imágenes más nítidas que la sigmoide (rango típico de 0 a1)
6. Se usa la capa ``BatchNormalization`` para normalizar las entradas de la capa

Podemos comprobar que la red funciona como se espera con el siguiente código, en el que el Generador genera una instancia de datos fake: 

In [None]:
noise_dim = 100
noise = tf.random.normal([1, noise_dim])
generated_image = generator(noise, training=False)

plt.imshow(generated_image[0, :, :, 0], cmap='gray')

### Discriminador

Por otra parte, el Discriminator recibe imágenes de $28×28×1$ píxeles y saca una probabilidad que indica si esta imagen de entrada la considera real (en lugar de fake)

In [None]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Flatten, BatchNormalization, LeakyReLU, Conv2D

In [None]:
def make_discriminator_model():
    model = Sequential()
    model.add(Conv2D(32, (5, 5), strides=(2, 2), padding='same',
                                     input_shape=[28, 28, 1]))
    model.add(LeakyReLU(alpha=0.01))
    
    model.add(Conv2D(64, (5, 5), strides=(2, 2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    
    model.add(Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    
    model.add(Conv2D(128*2, (5, 5), strides=(2, 2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    
    model.add(Conv2D(128*3, (5, 5), strides=(2, 2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))

    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    return model

In [None]:
discriminator = make_discriminator_model()
discriminator.summary ()

Esta red se construye con tres capas convolucionales de 32, 64 y 128 neuronas, con función de activación ``LeakyReLU`` y ``BatchNormalization``. La última capa es una capa densa con una función de activación sigmoide.

El Discriminador se usará para clasificar las imágenes generadas como reales o fake, generando valores:
* Próximos al 1 para imágenes que considera reales
* Próximos a 0 para imágenes que considera fake.

Podemos comprobar que el Discriminador funciona como creemos con el siguiente código:

In [None]:
decision = discriminator(generated_image)
print (decision)

## Funciones de *Loss* y optimizadores


Una vez los modelos del Generador y Discriminador fueron creados, el siguiente paso para entrenar las redes es establecer la función de pérdida y el optimizador que se usará para el proceso de entrenamiento.
* Anteriormente esto se ha indicado con los argumentos del método ``compile()`` que luego se usan cuando se entrena el modelo al ejecutar el método ``fit()`` 
* Ahora se tinen **dos redes neuronales**, se requieren dos funciones de pérdida y dos optimizadores.
* Los parámetros de ambas redes están interrelacionados en el cálculo de la función de pérdida.

### Funciones de pérdida

Para las funciones de pérdida de ambas redes neuronales se utilizá la *binary cross entropy*, que es una medida de la diferencia entre las probabilidades calculadas y las probabilidades reales de predicciones en los casos donde solo hay **dos clases posibles** en las que pueden ser clasificados los datos de entrada.

In [None]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

### Discriminator loss

Con la función auxiliar ``cross_entropy`` se definirá la función ``discriminator_loss()`` para cuantificar una *loss* del Discriminador que nos indicará cómo de bien el Discriminador consigue distinguir imágenes reales de imágenes fake en una iteración
* La función recibe en el primer argumento (real_output) la predicción que ha hecho el Discriminador de un batch de imágenes reales
* En el segundo argumento (fake_output) la predicción que ha hecho de un batch una imagen fake.

Si la predicción fuera la correcta, para una imagen real la predicción debería ser 1 y para una imagen fake debería ser 0.
* Esta función compara el batch de imágenes reales predecidas por el Discriminador con un array de unos ``(tf.ones_like(real_output))`` y el batch de imágenes fake con un array de ceros ``(tf.zeros_like(real_output))``.

La loss para esta iteración está compuesta tanto por los errores de predicción de imágenes reales como por los errores de las imágenes fake; por tanto, se deben sumar.

In [None]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

### Generator loss

La manera de construir la función ``generator_loss`` para cuantificar una loss del Generador será muy parecida:
* La loss del Generador deberá cuantificar cómo de bien fue capaz de engañar al Discriminador
* Se comparan las decisiones del Discriminador sobre las imágenes generadas por el Generador con una matriz de unos

In [None]:
def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

### Optimizadores

Los optimizadores del Discriminador y del Generador son diferentes y se requieren dos, ya que se entrenarán las dos redes por separado:

In [None]:
generator_optimizer = tf.keras.optimizers.Adam(1e-4)

In [None]:
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

Para la optimización de ambas redes, es sescoge el algoritmo *Adam*. *Adam* se ha convertido en el optimizador para la mayoría de las implementaciones de GAN porque se ha demostrado que en la práctica tiene un rendimiento superior a otros métodos de optimización en este tipo de redes. 

## Entrenamiento con la API de bajo nivel de TensorFlow

Anteriormente se ha utilizado el objeto ``tf.keras.Model``, a través de sus métodos ``fit()`` y ``compile()``, para entrenar a los modelos.
* Muy útil, ya que permite ahorrar escribir el código de bucle de entrenamiento 
* Da suficiente control con *callbacks*, *métricas*, etc.

**¿Cómo se puede especificar con el método ``fit()`` que esta red neuronal tiene dos funciones de pérdida y dos optimizadores?** La respuesta es ¡no se puede!

En **TensorFlow 2.0** se puede utilizar la API de bajo nivel para escribir bucles de entrenamiento personalizados. Se utilizará el ``GradientTape`` de la API de bajo nivel, que  permite más control para personalizar el bucle de entrenamiento:
* Entrenar a la vez las dos redes neuronales que se requieren para una GAN

En general, los códigos utilizarán mayormente las API de alto nivel (especialmente ``tf.keras`` y ``tf.data``), pero cuando se necesita más flexibilidad, se utiliza la API de Python de nivel inferior.

### Entrenamiento de las redes GAN 

Es necesario crear un bucle de entrenamiento personalizado utilizando la API de bajo nivel de TensorFlow. Se recreará una función ``train()`` a ala cual se le pasarán en los argumentos los datos de entrenamiento y el número de epochs que se quieren ejecutar para entrenar simultaneamente el Generador y el Discriminador.

La función ``train_step`` llamada dentro de ``train()`` representa un paso del bucle de entrenamiento (en sustitución del método ``fit()`` usando la API de bajo nivel de TensorFlow 2). Se define en 4 pasos:
1. Creación de un conjunto de semillas de ruido con el que el Generador pueda generar las imágenes fake correspondientes (recordemos que estamos procesando las imágenes por lotes de tamaño BATCH_SIZE y que, por tanto, genera un vector de semillas, no una sola semilla)
```Python
noise = tf.random.normal([BATCH_SIZE, noise_dim])
```
Con este vector de ruido el Generador genera un batch de imágenes fake:
```Python
generated_images = generator(noise, training=True)
```
Se usa el Discriminador para clasificar un batch de imágenes reales extraídas del conjunto MNIST (recibidas como argumento) y un batch de imágenes fake acabadas de producir por el Generador:
```Python
real_output = discriminator(images, training=True)
fake_output = discriminator(generated_images, training=True)
```
2. Cuando ya se tienen los valores obtenidos por el Generador y el Discriminador, se calcula la loss de ambos modelos con las funciones que se han definido anteriormente
```Python
gen_loss = generator_loss(fake_output)
disc_loss = discriminator_loss(real_output, fake_output)
```
    * Los pasos anteriores se ejecutan dentro del contexto (context) de ``tf.GradientTape`` indicados por ``with``.
    * En concreto, se están considerando dos contextos, uno para la información del Generador (``gen_tape``) y otro para la información del Discriminador (``disc_tape``).
    * ``tf.GradientTape`` permite que se *«graben»* en un objeto las operaciones ejecutadas en el contexto para permitir obtener los gradientes con respecto a la loss
3. Se debe propagar hacia atrás la loss para que llegue a todas las variables que conforman los parámetros (entrenables) para cada una de las dos redes neuronales. Esto es fácil una vez grabadas las operaciones en los respectivos contextos ``gen_tape`` y ``disc_tape``, aplicando el método ``gradient()`` para obtener los gradientes con respecto a la función de pérdida para las dos redes:
```Python
gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
```
4. Utilizar la información de los gradientes propagada para actualizar con el algoritmo de descenso del gradiente las variables correspondientes a los parámetros entrenables de cada una de las redes neuronales. Para ello simplemente hace falta usar el método ``apply_gradients()`` de los optimizadores de ambas redes
```Python
generator_optimizer.apply_gradients(zip(gradients_of_generator,
                                        generator.trainable_variables))
discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator,
                                            discriminator.trainable_variables))
```

In [None]:
@tf.function # Decorador: timizada a nivel interno para poder ser acelerada en el hardware disponible.
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])
    
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)
        
        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)
        
        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)
    
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

#### Training loop

Este método ejecuta un doble bucle, como se ve en el siguiente código:
* El primer bucle tiene tantas iteraciones como epochs hemos indicado en el argumento, y el segundo itera para todos los batch del dataset.

* El cuerpo del bucle simplemente llama al método ``train_step``,que realiza todo el trabajo requerido para calcular las loss y actualizar los parámetros de las dos redes, para todas las imágenes de un batch en una epoch.

In [None]:
import time

grid_size_x= 10
grid_size_y= 10
seed = tf.random.normal([grid_size_x*grid_size_y , noise_dim])

def train(dataset, epochs):
    for epoch in range(epochs):
        start = time.time()

    for image_batch in dataset:
        train_step(image_batch)

    generate_images(generator,seed)
    print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))
    
    generate_images(generator, seed)

Después de cada epoch se invoca a la función ``generate_images`` que visualiza por pantalla predicciones generadas por el Generador, y para ello usa las variables ``seed``, ``grid_size_x`` y ``grid_size_y``

#### Visualización de las imagenes 
Función auxiliar para visualizar las imagenes que genera el Generador para ver que va aprendiendo

In [None]:
def generate_images(model, test_input):
    
    predictions = model(test_input, training=False)
    
    fig = plt.figure(figsize=(grid_size_x,grid_size_y))
    
    for i in range(predictions.shape[0]):
        plt.subplot(grid_size_x, grid_size_y, i+1)
        plt.imshow(predictions[i, :, :, 0] * 127.5 + 127.5, cmap='gray')
        plt.axis('off')
    
    plt.show()

### Entrenamiento del modelo

In [None]:
## %%time
EPOCHS = 12000
train(train_dataset, EPOCHS)