# Deep Conditional Convolutional Generative Adversarial Network

In [None]:
import tensorflow as tf

In [None]:
tf.__version__

In [None]:
# Generación de GIFs
!pip install -q imageio

In [None]:
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
from tensorflow.keras import layers
from tensorflow.keras import Model
import time

from IPython import display

### Carga y prepara el dataset

Vas a utilizar el dataset fashion MNIST para entrenar el generador y el discriminador. El generador, por tanto, aprenderá a generar imágenes del mismo tipo que las presentes en fashion MNIST.

In [None]:
import tensorflow_datasets as tfds
import tensorflow as tf

In [None]:
tfds.list_builders()
builder = tfds.builder("fashion_mnist")
builder.download_and_prepare()

In [None]:
print(builder.info)

In [None]:
(train_dataset_raw) = builder.as_dataset(split="train", as_supervised=True)

In [None]:
def format_example(image, label):
  image = tf.cast(image, tf.float32)
  # scale from [0,255] to [-1,1]
  image = (image - 127.5) / 127.5
  image = tf.image.resize(image, (28, 28))
  return image, label

In [None]:
train_dataset = train_dataset_raw.map(format_example)

In [None]:
i = 0
for image, label in train_dataset.take(49):
  # define subplot
  plt.subplot(7, 7, 1 + i)
  # turn off axis
  plt.axis('off')
  # plot raw pixel data
  image = (image + 1.0) / 2.0
  plt.imshow(image[:,:,0], cmap='gray')
  i = i + 1
  print(label)
plt.show()

## Creación de modelos

Tanto el generador como el discriminador van a ser definidos usando la [Keras Sequential API](https://www.tensorflow.org/guide/keras#sequential_model).

### El generador

El generador utiliza capas `tf.keras.layers.Conv2DTranspose` (upsampling) para producir una imágen a partir de una semilla (ruido aleatorio). Comienza con una capa `Dense`que toma la semilla como entrada, después la escala (upsample) varias veces hasta alcanzar el tamaño de imágen deseado de 28x28x1. Utilizaremos capas de activación `tf.keras.layers.LeakyReLU`, excepto para la capa de salida, que usará tanh.

In [None]:
def make_generator_model(n_classes):
    # TODO 1: haz tu propio modelo de generador
      # Capa input para label
      in_label = layers.Input(shape=(1,))
      # Generar el embedding pata categorizar las label
      li = layers.Embedding(n_classes, 50)(in_label)
      # Generamos capa dense con tantas neuronas como dimensiones puede tener la imagen
      # que empezamos a crear desde el vesctor de ruido.
      n_nodes = 7 * 7
      li = layers.Dense(n_nodes)(li)
      # Se hace un reshape con las dimensiones de inicial
      li = layers.Reshape((7, 7, 1))(li)

      # Capa input para el vector de ruido, que tiene una longitud de 100
      in_lat = layers.Input(shape=(100,))

      # Partimos de una imagen de 7x7
      n_nodes = 256 * 7 * 7
      gen = layers.Dense(n_nodes)(in_lat)
      gen = layers.LeakyReLU(alpha=0.2)(gen)
      gen = layers.Reshape((7, 7, 256))(gen)

      # Fusión de las dos capas input
      merge = layers.Concatenate()([gen, li])

      # Escala tamaño al formato 14x14
      gen = layers.Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(merge)
      gen = layers.LeakyReLU(alpha=0.2)(gen)
      # Escala tamaño al formato 28x28
      gen = layers.Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(gen)
      gen = layers.LeakyReLU(alpha=0.2)(gen)
      # Capa de salida de la imagen de 28x28x1
      out_layer = layers.Conv2DTranspose(1, (7,7), activation='tanh', padding='same')(gen)

      model = Model([in_lat, in_label], out_layer)

      return model

Utiliza el generador (aún sin entrenar) para crear una imágen.

In [None]:
n_classes = 10
generator = make_generator_model(n_classes)
tf.keras.utils.plot_model(generator, show_shapes=True, to_file='generator.png')

In [None]:
#generator.summary()
noise = tf.random.normal([1, 100])
from numpy.random import randint
noisy_label = randint(0, n_classes, 1)
generated_image = generator([noise, noisy_label], training=False)
plt.imshow((generated_image[0, :, :, 0] + 1) / 2, cmap='gray')

### El discriminador

El discriminador es un clasificador de imágenes basado en CNNs.

In [None]:
def make_discriminator_model(n_classes):

    #TODO: Haz tu propio discriminador
    # Capa input para label
    in_label = layers.Input(shape=(1,))
    # Generar el embedding pata categorizar las label
    li = layers.Embedding(n_classes, 50)(in_label)
    # Generamos capa dense con tantas neuroans como dimensiones puede tener la imagen
    n_nodes = 28*28*1
    li = layers.Dense(n_nodes)(li)
    # Se hace un reshape con las dimensiones de la imagen incluyendo el canal
    li = layers.Reshape((28,28, 1))(li)

    # Capa input para las imagenes
    in_image = layers.Input(shape=(28,28,1))

    # Capa que concatena los dos inputs
    merge = layers.Concatenate()([in_image, li])
   
    fe = layers.Conv2D(128, (3,3), strides=(2,2), padding='same')(merge)
    fe = layers.LeakyReLU(alpha=0.2)(fe)
    
    fe = layers.Conv2D(128, (3,3), strides=(2,2), padding='same')(fe)
    fe = layers.LeakyReLU(alpha=0.2)(fe)

    # Aplana el mapa de características 
    fe = layers.Flatten()(fe)
    
    fe = layers.Dropout(0.4)(fe)

    # Capa Dense de 1 neurona para hacer la discriminación Real o Fake
    out_layer = layers.Dense(1)(fe)
    
    model = Model([in_image, in_label], out_layer)
    
    return model


In [None]:
n_classes = 10
discriminator = make_discriminator_model(n_classes)
tf.keras.utils.plot_model(discriminator, show_shapes=True, to_file='discriminator.png')

Utiliza el discriminador (aún sin entrenar) para clasificar las imágenes como reales o fake. El modelo se entrenará para sacar valores positivos con imágenes reales y valores negativos con imágenes fake.

In [None]:
#discriminator.summary()
from numpy.random import randint
label = randint(0, n_classes, 1)
print(label)
decision = discriminator([generated_image, label])
print (decision)

## Definición de la función de coste y el optimizador

Definimos la función de coste y los optimizadores para ambos modelos.

In [None]:
# Este método devuelve una función helper para calcular cross entropy loss. 
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

### Función de coste del Discriminador

Este método cuantifica cuán bien el dicriminador es capaz de distinguir entre imágenes reales y falsas. Va a comparar las predicciones de imágenes reales con un array de 1s, y las predicciones de imágenes falsas con un array de 0s.

In [None]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

### Coste del generador

La función de coste del generador cuantifica cuán bien es capaz de engañar al discriminador. De forma intuitiva, si el generador funciona bien, el descriminador se equivocará. Aquí vamos a comparar las decisiones del discriminador en las imágenes falsas con un array de 1s.

In [None]:
def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

El discriminador y el generador tendrán optimizadores separados ya que vamos a entrenar las redes de forma separada.

In [None]:
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
metricsAcc=tf.keras.metrics.BinaryAccuracy()

### Guardar checkpoints

Este notebook también demuestra cómo guardar y recuperar modelos, lo que puede ser muy útil en caso de que el entrenamiento se pause o queramos utilizarlo a posteriori.

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

## Definiendo el bucle de entrenamiento

In [None]:
noise_dim = 100
num_examples_to_generate = 100

# Vamos a reutilizar esta semilla para visualizar mejor el progreso en el GIF.
from numpy import asarray
seed_images = tf.random.normal([num_examples_to_generate, noise_dim])
seed_labels = asarray([x for _ in range(10) for x in range(10)])
print(seed_labels)

El bucle de entrenamiento comienza con el generador recibiendo una semilla aleatoria como entrada. Esta semilla se usa para producir una imagen. El discriminador se utiliza después para clasificar imágenes reales (del dataset) e imágenes falsas (producidas por el generador). El coste se calcula para cada una de las redes por separado y se actualizan los pesos mediante descenso por gradiente.

In [None]:
# Aquí utilizamos `tf.function`
# Esto hace que la función se "compile".
@tf.function
def train_step(images, labels):
    noise_image = tf.random.normal([BATCH_SIZE, noise_dim])
    noise_labels = label = randint(0, n_classes, BATCH_SIZE)
    #print("coming into the function")
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:

      real_output = discriminator([images, labels], training=True)
      #print("salida con imagenes reales {:.5f}".format(tf.math.reduce_mean(real_output)))
      generated_images = generator([noise_image, noise_labels], training=True)
      fake_output = discriminator([generated_images, noise_labels], training=True)
      #print("salida con imagenes falsas {:.5f}".format(tf.math.reduce_mean(fake_output)))
      
      gen_loss = generator_loss(fake_output)
      disc_loss = discriminator_loss(real_output, fake_output)
      
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

In [None]:
def train(dataset, epochs):
  for epoch in range(epochs):
    start = time.time()
    j = 0
    for image_batch, label_batch in dataset:
      #print(j)
      train_step(image_batch, label_batch)
      #j = j + 1

    print("A NEW EPOCH")

    # Producimos imágenes para el GIF sobre la marcha
    display.clear_output(wait=True)
    generate_and_save_images(generator, epoch + 1, seed_images, seed_labels)

    # Guardamos el modelo cada 15 épocas
    if (epoch + 1) % 15 == 0:
      checkpoint.save(file_prefix = checkpoint_prefix)

    print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

  # Generación tras cada época
  display.clear_output(wait=True)
  generate_and_save_images(generator, epochs, seed_images, seed_labels)

**Generación y guardado de imágenes**


In [None]:
def generate_and_save_images(model, epoch, test_input_images, test_input_labels):
  # Notice `training` is set to False.
  # This is so all layers run in inference mode (batchnorm).
  predictions = model([test_input_images, test_input_labels], training=False)

  fig = plt.figure(figsize=(10,10))

  for i in range(predictions.shape[0]):
      plt.subplot(10, 10, i+1)
      # scale [-1, 1] to [0, 1]
      image = (predictions[i, :, :, 0] + 1.0) / 2.0
      plt.imshow(image, cmap='gray')
      plt.axis('off')

  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
  plt.show()

## Entrenamos el modelo

Llamamos a la función `train()` definida anteriormente para enrtenar el generador y el discriminador de forma simultanea. Entrenar GANs es complejo, es importante que el generador y el discriminador no aventajen mucho al adversario (ambos deberían entrenar a un ritmo parecido).

Al principio del entrenamiento, las imágenes generadas van a parecer ruido aleatorio. A medida que el entrenamiento progrese, los dígitos generados irán pareciendo cada vez más reales. Tras unas 50 épocas, ya son como dígitos de MNIST.

In [None]:
BUFFER_SIZE = 60000
BATCH_SIZE = 256
EPOCHS = 100
train_dataset_shuffled = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
train(train_dataset_shuffled, EPOCHS)

Recuperamos el último checkpoint

In [None]:
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

## Creación del GIF


In [None]:
# Muestra de una imágen según el número de época
def display_image(epoch_no):
  return PIL.Image.open('image_at_epoch_{:04d}.png'.format(epoch_no))

In [None]:
display_image(EPOCHS)

Usamos `imageio` para crear un gif animado con estas imágenes.

In [None]:
anim_file = 'dcgan.gif'

with imageio.get_writer(anim_file, mode='I') as writer:
  filenames = glob.glob('image*.png')
  filenames = sorted(filenames)
  last = -1
  for i,filename in enumerate(filenames):
    frame = 2*(i**0.5)
    if round(frame) > round(last):
      last = frame
    else:
      continue
    image = imageio.imread(filename)
    writer.append_data(image)
  image = imageio.imread(filename)
  writer.append_data(image)

import IPython
if IPython.version_info > (6,2,0,''):
  display.Image(filename=anim_file)

Si estás trabajando en Colab, puedes descargar la animación con el siguiente código.

In [None]:
try:
  from google.colab import files
except ImportError:
   pass
else:
  files.download(anim_file)