# **Practical 3**: GANs
# Student: José Javier Gutiérrez Gil
# Part 2_2: Extra exercises on simple GANs


In [None]:
!pip install -q git+https://github.com/tensorflow/examples.git

### EXTRA exercise:
**Find an interesting dataset and train the pix2pix model:**



An interesting dataset for training the pix2pix model is **CamVid**. It is an image segmentation dataset containing driving-scene images and their segmentation annotations, recorded from a camera mounted on a moving car.

The dataset consists of 701 images of 960x720 pixels in PNG format, each with a matching 960x720 PNG segmentation image. The original footage was captured at 30 fps while driving through several areas of Cambridge, United Kingdom. The segmentation images were annotated by hand and contain labels for 32 classes, including roads, buildings, sidewalks, vehicles, pedestrians, trees, and traffic signs.

Each segmentation image assigns every pixel one of the 32 categories. The goal of training pix2pix on this dataset would be a model that takes a raw image as input and generates the corresponding segmentation image, in which every pixel is labelled with one of the 32 categories. Note that the `load` function further down, as written, feeds the label image to the generator and uses the photo as the target, which is the opposite direction.
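Because pix2pix produces an RGB image rather than class indices, a generated segmentation has to be mapped back to categories before it can be scored. The following is a minimal sketch of nearest-colour matching. The three-colour `PALETTE` and the helper `rgb_to_class_ids` are hypothetical names introduced for illustration; the real CamVid palette has 32 entries and should be read from the dataset's class list.

```python
import numpy as np

# Hypothetical three-entry palette for illustration only; the real CamVid
# palette has 32 colours and should come from the dataset's class list.
PALETTE = np.array([
    [128, 64, 128],   # hypothetical class 0
    [128, 0, 0],      # hypothetical class 1
    [64, 0, 128],     # hypothetical class 2
], dtype=np.float32)


def rgb_to_class_ids(label_rgb, palette=PALETTE):
    """Map an (H, W, 3) colour image to (H, W) class indices by nearest colour."""
    pixels = label_rgb.astype(np.float32)[:, :, None, :]               # (H, W, 1, 3)
    distances = np.sum((pixels - palette[None, None]) ** 2, axis=-1)   # (H, W, K)
    return np.argmin(distances, axis=-1)


# A 1x3 "image": two exact palette colours and one slightly noisy colour.
toy = np.array([[[128, 64, 128], [128, 0, 0], [60, 5, 130]]], dtype=np.uint8)
class_ids = rgb_to_class_ids(toy)
print(class_ids)
```

Nearest-colour matching tolerates the small colour errors a generator typically makes, whereas an exact colour lookup would leave such pixels unlabelled.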

This dataset is useful for training image segmentation models in the context of autonomous driving, and it can be used to build pix2pix models that generate accurate segmentations of images from a car-mounted camera.

To download the **CamVid** dataset, visit its website (http://www0.cs.ucl.ac.uk/staff/G.Brostow/); a description of the dataset is available at (http://mi.eng.cam.ac.uk/research/projects/VideoRec/CamVid//).

To train the pix2pix model on **CamVid**, we follow the same steps used in earlier sections:

1. Dataset preprocessing: we convert the images to tensors and normalize them to the range [-1, 1]. We also apply data augmentation, such as resizing followed by random crops and random mirroring, to increase the variability of the dataset.

2. Model definition: we build the pix2pix model with our own functions. To capture finer detail, the idea is to increase the depth of the generator network. Note that the images fed to the network here have a resolution of 1024x768x3. The extra depth should improve quality for images of this kind at the cost of a small increase in computation, and it also means we have to watch for possible overfitting.

3. Loss functions: we define the loss functions used to train the model. pix2pix combines an adversarial loss with an L1 reconstruction loss between the generated image and the target. The generator is trained on both terms, the discriminator on the adversarial term alone.

4. Training: we train the model on **CamVid** with the loss functions defined above. During training, the weights of the generator and the discriminator are updated from the gradients of their respective losses on each batch.
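The normalization in step 1 can be checked in isolation. The sketch below uses NumPy instead of TensorFlow so it runs anywhere; `to_signed_unit_range` and `to_uint8` are hypothetical helper names, not functions from this notebook. It shows the [0, 255] to [-1, 1] mapping and its inverse, which is what is needed to display generator outputs.

```python
import numpy as np


def to_signed_unit_range(image_uint8):
    """Map pixel values from [0, 255] to [-1, 1]."""
    return image_uint8.astype(np.float32) / 127.5 - 1.0


def to_uint8(image_float):
    """Inverse mapping from [-1, 1] back to [0, 255]."""
    restored = np.rint((image_float + 1.0) * 127.5)
    return np.clip(restored, 0, 255).astype(np.uint8)


pixels = np.array([0, 64, 128, 255], dtype=np.uint8)
scaled = to_signed_unit_range(pixels)
restored = to_uint8(scaled)
print(scaled.min(), scaled.max())
print(restored)
```

The range [-1, 1] matches the `tanh` activation on the generator's last layer, so real and generated images live on the same scale.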

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import os
import pathlib
import time
import datetime

from matplotlib import pyplot as plt
from IPython import display

### DATA

In [None]:
PATH = 'camVid'

In [None]:
def load (image_file, label_file):
    # Read and decode the photo and its label image to uint8 tensors
    image = tf.io.read_file (image_file)
    image = tf.io.decode_png (image, channels = 3)

    label = tf.io.read_file (label_file)
    label = tf.io.decode_png (label, channels = 3)

    # As written, the label image is the generator input and the photo is the target
    input_image = label
    real_image  = image

    # Convert both images to float32 tensors
    input_image = tf.cast (input_image, tf.float32)
    real_image  = tf.cast (real_image, tf.float32)

    return input_image, real_image

inp, re = load (PATH + '/train/0001TP_009210.png', PATH + '/train_labels/0001TP_009210_L.png')
print (inp.shape)
print (re.shape)
# Scale to [0, 1] so matplotlib can display the float images
plt.figure ()
plt.imshow (inp / 255.0)
plt.figure ()
plt.imshow (re / 255.0)

In [None]:
# Shuffle buffer size; larger than the 701 images in CamVid
BUFFER_SIZE = 1042
# The batch size of 1 produced better results for the U-Net in the original pix2pix experiment
BATCH_SIZE  = 1
# CamVid frames are 960 pixels wide and 720 pixels high
IMG_WIDTH   = 960
IMG_HEIGHT  = 720

# Size the images are enlarged to before random cropping
RE_IMG_WIDTH   = 1042
RE_IMG_HEIGHT  = 820

# Crop size fed to the network; both sides are multiples of 256,
# which the 8-level U-Net needs for its skip connections to line up
M_IMG_WIDTH   = 1024
M_IMG_HEIGHT  = 768

OUTPUT_CHANNELS = 3

In [None]:
def resize(input_image, real_image, width, height):
    input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    return input_image, real_image

def random_crop(input_image, real_image, width, height):
    # Stack both images so the same random crop window is applied to each
    stacked_image = tf.stack([input_image, real_image], axis=0)
    cropped_image = tf.image.random_crop(
                              stacked_image, size=[2, height, width, 3])

    return cropped_image[0], cropped_image[1]

# Normalizing the images to [-1, 1]
def normalize(input_image, real_image):
    input_image = (input_image / 127.5) - 1
    real_image = (real_image / 127.5) - 1

    return input_image, real_image

@tf.function()
def random_jitter (input_image, real_image):
    # Enlarge to RE_IMG_WIDTH x RE_IMG_HEIGHT
    input_image, real_image = resize(input_image, real_image, RE_IMG_WIDTH, RE_IMG_HEIGHT)
    
    # Randomly crop back to the network input size, M_IMG_WIDTH x M_IMG_HEIGHT
    input_image, real_image = random_crop(input_image, real_image, M_IMG_WIDTH, M_IMG_HEIGHT)
    
    if tf.random.uniform(()) > 0.5:
        # Random mirroring
        input_image = tf.image.flip_left_right (input_image)
        real_image  = tf.image.flip_left_right (real_image)

    return input_image, real_image

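# Draw a fresh random jitter for every panel so the augmentation is visible
plt.figure(figsize=(6, 6))
for i in range(4):
    rj_inp, rj_re = random_jitter (inp, re)
    plt.subplot(2, 2, i + 1)
    plt.imshow(rj_inp / 255.0)
    plt.axis('off')
plt.show()

plt.figure(figsize=(6, 6))
for i in range(4):
    rj_inp, rj_re = random_jitter (inp, re)
    plt.subplot(2, 2, i + 1)
    plt.imshow(rj_re / 255.0)
    plt.axis('off')
plt.show()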

In [None]:
def load_image_train (image_file, label_file):
    input_image, real_image = load (image_file, label_file)
    input_image, real_image = random_jitter (input_image, real_image)
    input_image, real_image = normalize (input_image, real_image)

    return input_image, real_image

def load_image_test (image_file, label_file):
    input_image, real_image = load (image_file, label_file)
    # Resize straight to the network input size (no random jitter at test time)
    input_image, real_image = resize (input_image, real_image,
                                       M_IMG_WIDTH, M_IMG_HEIGHT)
    input_image, real_image = normalize (input_image, real_image)

    return input_image, real_image



In [None]:
# Training image and training label directories
train_dir       = PATH + '/train/'
train_label_dir = PATH + '/train_labels/'

# Collect the paths of all training images; both lists are sorted so that
# each image is paired with its own label (os.listdir returns arbitrary order)
train_image_paths       = [os.path.join(train_dir, fname) for fname in sorted(os.listdir(train_dir))]
train_label_image_paths = [os.path.join(train_label_dir, fname) for fname in sorted(os.listdir(train_label_dir))]

# Build a TensorFlow dataset from the image paths
train_dataset = tf.data.Dataset.from_tensor_slices((train_image_paths, train_label_image_paths))

# Shuffle the (cheap) path pairs first, then load and preprocess every image
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.map(load_image_train)
train_dataset = train_dataset.batch(BATCH_SIZE)


try:
    test_dir       = PATH + '/test/'
    test_label_dir = PATH + '/test_labels/'

    # Collect the paths of all test images, sorted so images and labels stay paired
    test_image_paths       = [os.path.join(test_dir, fname) for fname in sorted(os.listdir(test_dir))]
    test_label_image_paths = [os.path.join(test_label_dir, fname) for fname in sorted(os.listdir(test_label_dir))]

except FileNotFoundError:
    # os.listdir raises FileNotFoundError when there is no test split; fall back to validation
    test_dir       = PATH + '/val/'
    test_label_dir = PATH + '/val_labels/'

    test_image_paths       = [os.path.join(test_dir, fname) for fname in sorted(os.listdir(test_dir))]
    test_label_image_paths = [os.path.join(test_label_dir, fname) for fname in sorted(os.listdir(test_label_dir))]

# Build a TensorFlow dataset from the image paths
test_dataset = tf.data.Dataset.from_tensor_slices((test_image_paths, test_label_image_paths))
# Note: this reuses the training preprocessing, so test images are also randomly jittered
test_dataset = test_dataset.map(load_image_train)
test_dataset = test_dataset.batch(BATCH_SIZE)

### PIX2PIX

In [None]:
def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
          tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                                     kernel_initializer=initializer, use_bias=False)
    )

    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())

    result.add(tf.keras.layers.LeakyReLU())

    return result

In [None]:
down_model  = downsample (3, 4)
down_result = down_model (tf.expand_dims (inp, 0))
print (down_result.shape)

In [None]:
def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                            padding='same',
                                            kernel_initializer=initializer,
                                            use_bias=False)
    )

    result.add(tf.keras.layers.BatchNormalization())

    if apply_dropout:
        result.add(tf.keras.layers.Dropout(0.5))

    result.add(tf.keras.layers.ReLU())

    return result

In [None]:
up_model  = upsample (3, 4)
up_result = up_model (down_result)
print (up_result.shape)

In [None]:
def Generator():
    '''
    U-Net generator with the same encoder/decoder layout as the original
    pix2pix model: eight downsampling blocks followed by seven upsampling
    blocks with skip connections. The shape comments below assume a 256x256
    input; larger inputs scale every spatial size proportionally.
    '''
    inputs = tf.keras.layers.Input(shape=[None, None, 3])
    num_filters = 64 
 
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),  # (batch_size, 128, 128, 64)
        downsample(128, 4),  # (batch_size, 64, 64, 128)
        downsample(256, 4),  # (batch_size, 32, 32, 256)
        downsample(512, 4),  # (batch_size, 16, 16, 512)
        downsample(512, 4),  # (batch_size, 8, 8, 512)
        downsample(512, 4),  # (batch_size, 4, 4, 512)
        downsample(512, 4),  # (batch_size, 2, 2, 512)
        downsample(512, 4),  # (batch_size, 1, 1, 512)
    ]

    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 8, 8, 1024)
        upsample(512, 4),  # (batch_size, 16, 16, 1024)
        upsample(256, 4),  # (batch_size, 32, 32, 512)
        upsample(128, 4),  # (batch_size, 64, 64, 256)
        upsample(64, 4),  # (batch_size, 128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh')  # 

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])

   # x = tf.keras.layers.Conv2DTranspose(num_filters, 4, strides=1, padding='same',
   #                                      kernel_initializer=tf.keras.initializers.RandomNormal(0., 0.02))(x)
   #  x = tf.keras.layers.BatchNormalization()(x)
   #  x = tf.keras.layers.ReLU()(x)
    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)
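The generator halves the spatial size eight times and concatenates each upsampled tensor with the matching encoder output, so the skip connections only line up when every intermediate size is even. A simple sufficient condition is that height and width are multiples of 2^8 = 256, which is presumably why the images are cropped to 768 and 1024 rather than kept at 720 and 960. The sketch below checks this; `fits_unet` and `next_valid_size` are hypothetical helper names.

```python
NUM_DOWNSAMPLES = 8  # length of down_stack in the generator above


def fits_unet(height, width, num_downsamples=NUM_DOWNSAMPLES):
    """True when both sides can be halved num_downsamples times without remainder."""
    factor = 2 ** num_downsamples
    return height % factor == 0 and width % factor == 0


def next_valid_size(size, num_downsamples=NUM_DOWNSAMPLES):
    """Smallest multiple of 2**num_downsamples that is >= size."""
    factor = 2 ** num_downsamples
    return ((size + factor - 1) // factor) * factor


print(fits_unet(720, 960))      # native CamVid frame size
print(fits_unet(768, 1024))     # crop size used in this notebook
print(next_valid_size(720), next_valid_size(960))
```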

In [None]:
def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)

    inp = tf.keras.layers.Input (shape=[None, None, 3], name='input_image')
    tar = tf.keras.layers.Input (shape=[None, None, 3], name='target_image')

    x = tf.keras.layers.concatenate([inp, tar])  # (batch_size, 256, 256, channels*2)

    down1 = downsample(64, 4, False)(x)  # (batch_size, 128, 128, 64)
    down2 = downsample(128, 4)(down1)  # (batch_size, 64, 64, 128)
    down3 = downsample(256, 4)(down2)  # (batch_size, 32, 32, 256)

    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)  # (batch_size, 34, 34, 256)
    conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 31, 31, 512)

    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    zero_pad2  = tf.keras.layers.ZeroPadding2D()(leaky_relu)  # (batch_size, 33, 33, 512)

    last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 30, 30, 1)

    return tf.keras.Model(inputs=[inp, tar], outputs=last)
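The shape comments in the discriminator refer to a 256x256 input, where the PatchGAN output is a 30x30 grid of logits. For other input sizes the grid can be worked out by following the layers one by one. The helper below is a sketch of that arithmetic (`patchgan_output_size` is a hypothetical name); it handles one spatial dimension at a time.

```python
def patchgan_output_size(size):
    """Follow one spatial dimension through the discriminator defined above."""
    for _ in range(3):       # three stride-2 convolutions with 'same' padding
        size = (size + 1) // 2
    size = size + 2          # ZeroPadding2D adds one pixel on each side
    size = size - 4 + 1      # 4x4 convolution, stride 1, 'valid' padding
    size = size + 2          # second ZeroPadding2D
    size = size - 4 + 1      # final 4x4 convolution
    return size


print(patchgan_output_size(256))
print(patchgan_output_size(768), patchgan_output_size(1024))
```

Each output logit judges one overlapping patch of the input, so a larger input yields more patch decisions rather than a larger receptive field.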

In [None]:
generator = Generator ()
 
tf.keras.utils.plot_model (generator, show_shapes=True, dpi=64)

rj_inp, rj_re = random_jitter (inp, re)
gen_output = generator (rj_inp [tf.newaxis, ...], training = False)
plt.imshow (gen_output [0, ...])
plt.show ()

discriminator = Discriminator ()
 
disc_out = discriminator ([rj_inp [tf.newaxis, ...], gen_output], training = False)
plt.imshow (disc_out [0, ..., -1], vmin = -20, vmax = 20, cmap = 'RdBu_r')
plt.colorbar ()
plt.show () 

### LOSS

In [None]:
LAMBDA = 100
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True) 

In [None]:
def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

    # Mean absolute error
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

    total_gen_loss = gan_loss + (LAMBDA * l1_loss)

    return total_gen_loss, gan_loss, l1_loss

In [None]:
def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

    total_disc_loss = real_loss + generated_loss

    return total_disc_loss
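Written out, the two loss functions above correspond to the objectives below. The notation is introduced here for illustration: $x$ is the input image, $y$ the target, $G$ the generator, $D$ the discriminator (which returns logits, hence the sigmoid $\sigma$), and $\lambda$ is `LAMBDA` = 100. Averages over patches and pixels are left implicit in $\mathrm{BCE}$ and $\operatorname{mean}$.

```latex
\begin{aligned}
\mathcal{L}_G &= \mathrm{BCE}\big(\mathbf{1},\ \sigma(D(x, G(x)))\big)
                 + \lambda \, \operatorname{mean}\,\lvert y - G(x) \rvert \\
\mathcal{L}_D &= \mathrm{BCE}\big(\mathbf{1},\ \sigma(D(x, y))\big)
                 + \mathrm{BCE}\big(\mathbf{0},\ \sigma(D(x, G(x)))\big)
\end{aligned}
```

With $\lambda = 100$ the L1 term dominates early in training, which pushes the generator towards the target before the adversarial term starts to sharpen details.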

## Define the optimizers and a checkpoint-saver

In [None]:
generator_optimizer     = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)
discriminator_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)

In [None]:
checkpoint_dir    = './training_camvid_checkpoints'
checkpoint_prefix = os.path.join (checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint (generator_optimizer = generator_optimizer,
                                    discriminator_optimizer = discriminator_optimizer,
                                    generator     = generator,
                                    discriminator = discriminator)

## Generate images

In [None]:
def generate_images (model, test_input, tar):
    # training=True keeps batch normalization using the statistics of the current batch
    prediction = model (test_input, training=True)
    plt.figure(figsize = (15, 15))
    display_list = [test_input [0], tar [0], prediction [0]]
    title = ['Input Image', 'Ground Truth', 'Predicted Image']

    for i in range (3):
        plt.subplot (1, 3, i+1)
        plt.title (title [i])
        # Getting the pixel values in the [0, 1] range to plot.
        plt.imshow (display_list [i] * 0.5 + 0.5)
        plt.axis ('off')
    plt.show ()

In [None]:
for example_input, example_target in test_dataset.take (1):
    generate_images (generator, example_input, example_target)

## Training

In [None]:
log_dir="logs/"

summary_writer = tf.summary.create_file_writer(
  log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
@tf.function
def train_step(input_image, target, step):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)

        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_total_loss,
                                          generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))

    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step=step//1000)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step//1000)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step//1000)
        tf.summary.scalar('disc_loss', disc_loss, step=step//1000)

In [None]:
def fit (train_ds, test_ds, steps):
    example_input, example_target = next (iter (test_ds.take (1)))
    start = time.time()

    for step, (input_image, target) in train_ds.repeat ().take (steps).enumerate ():
        if (step) % 1000 == 0:
            display.clear_output (wait=True)

            if step != 0:
                print (f'Time taken for 1000 steps: {time.time () - start:.2f} sec\n')

            start = time.time ()

            generate_images (generator, example_input, example_target)
            print (f"Step: {step//1000}k")

        train_step (input_image, target, step)

        # Training step
        if (step+1) % 10 == 0:
            print ('.', end = '', flush=True)


        # Save (checkpoint) the model every 5k steps
        if (step + 1) % 5000 == 0:
            checkpoint.save (file_prefix = checkpoint_prefix)
 

In [None]:
fit (train_dataset, test_dataset, steps = 40000)
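Since `fit` counts steps rather than epochs, it can help to translate the step budget into passes over the training set. The sketch below does that conversion; `steps_to_epochs` is a hypothetical helper, and the image count of 400 is a placeholder to be replaced with `len(train_image_paths)`.

```python
def steps_to_epochs(steps, num_images, batch_size=1):
    """Number of passes over the training set covered by a step budget."""
    steps_per_epoch = -(-num_images // batch_size)  # ceiling division
    return steps / steps_per_epoch


# Placeholder image count; use len(train_image_paths) in the notebook.
epochs = steps_to_epochs(steps=40000, num_images=400, batch_size=1)
print(f"{epochs:.1f} epochs")
```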

## Generate some images using the test set

In [None]:
# Run the trained model on a few examples from the test set
for inp, tar in test_dataset.take(15):
    generate_images(generator, inp, tar)

### EXTRA exercise:
Find an interesting dataset and train the CycleGAN model.

One interesting idea is to train a CycleGAN to turn Van Gogh paintings into photos and vice versa. For this we can use the "vangogh2photo" dataset available in TensorFlow.

We follow the same steps as in the CycleGAN model evaluation section:

- Load and preprocess the images.

- Build the CycleGAN model.

- Set up a checkpoint to save the trained network's weights, so they can be reloaded from where training last stopped.

- Train the network on the training datasets.

- Evaluate the trained model on the test datasets.

In [None]:
import tensorflow as tf

import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix

import os
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output
 
import pathlib
 
import datetime

AUTOTUNE = tf.data.AUTOTUNE

In [None]:
dataset_name = 'vangogh2photo'  # @param [apple2orange, summer2winter_yosemite, horse2zebra, monet2photo, cezanne2photo, ukiyoe2photo, vangogh2photo, maps, cityscapes, facades, iphone2dslr_flower, ae_photos]

In [None]:
_URL = f'https://people.eecs.berkeley.edu/~taesung_park/CycleGAN/datasets/{dataset_name}.zip'

# Alternative source for the pix2pix datasets:
#_URL = f'http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/{dataset_name}.tar.gz'

# The archive is a zip file, so the cached file name uses the same extension
path_to_zip = tf.keras.utils.get_file (fname = f"{dataset_name}.zip",
                                        origin  = _URL,
                                        extract = True)

path_to_zip  = pathlib.Path (path_to_zip)
PATH = path_to_zip.parent/dataset_name

In [None]:
print (PATH)
list  (PATH.parent.iterdir())

In [None]:
sample_image = tf.io.read_file(str(PATH / 'trainA/00001.jpg'))
sample_image = tf.io.decode_jpeg(sample_image)

print (sample_image.shape)

plt.figure ()
plt.imshow (sample_image)

sample_image = tf.io.read_file(str(PATH / 'trainB/2013-12-02 11_26_22.jpg'))
sample_image = tf.io.decode_jpeg(sample_image)

print (sample_image.shape)

plt.figure ()
plt.imshow (sample_image)

In [None]:
# Training directories for domain A (paintings) and domain B (photos)
trainA_dir  = PATH / 'trainA'
trainB_dir  = PATH / 'trainB'

# List the training image files of each domain
trainA_s = tf.data.Dataset.list_files (str (trainA_dir / '*.jpg'))
trainB_s = tf.data.Dataset.list_files (str (trainB_dir / '*.jpg'))

print(len(trainA_s))
print(len(trainB_s))

# Test directories for domain A and domain B
testA_dir = PATH / 'testA'
testB_dir = PATH / 'testB'

# List the test image files of each domain
testA_s = tf.data.Dataset.list_files (str (testA_dir / '*.jpg'))
testB_s = tf.data.Dataset.list_files (str (testB_dir / '*.jpg'))

In [None]:
def load (image_file):
    # Read and decode an image file to a uint8 tensor
    image = tf.io.read_file (image_file)
    image = tf.io.decode_jpeg (image, channels = 3)

    # Convert the image to a float32 tensor
    input_image = tf.cast (image, tf.float32)

    return input_image

print ("------------- TRAIN Images ----------")

i_TA = load (str (PATH / 'trainA/00001.jpg') )
i_TB = load (str (PATH / 'trainB/2013-12-02 11_26_22.jpg') )
print (i_TA.shape)
print (i_TB.shape)
# Scale to [0, 1] so matplotlib can display the float images
plt.figure ()
plt.imshow (i_TA / 255.0)
plt.figure ()
plt.imshow (i_TB / 255.0)
plt.show()
plt.close ()

print ("------------- TEST Images ----------")

i_TA = load (str (PATH / 'testA/00001.jpg') )
i_TB = load (str (PATH / 'testB/2014-08-01 17_41_55.jpg') )
print (i_TA.shape)
print (i_TB.shape)
# Scale to [0, 1] so matplotlib can display the float images
plt.figure ()
plt.imshow (i_TA / 255.0)

plt.figure ()
plt.imshow (i_TB / 255.0)
plt.show()
plt.close ()

In [None]:
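BUFFER_SIZE_TRAIN = 400
BUFFER_SIZE_TEST  = 6287

BATCH_SIZE  = 1

IMG_WIDTH   = 256
IMG_HEIGHT  = 256

R_IMG_WIDTH   = 286
R_IMG_HEIGHT  = 286

def random_crop (image):
    cropped_image = tf.image.random_crop (
                          image, size = [IMG_HEIGHT, IMG_WIDTH, 3])

    return cropped_image

# normalizing the images to [-1, 1]
def normalize (image):
    image = tf.cast (image, tf.float32)
    image = (image / 127.5) - 1
    
    return image

# Assumption: the cells below use trainA, trainB, testA and testB, which are
# never built in this notebook. The pipeline here follows the usual CycleGAN
# preprocessing: enlarge, random crop, random mirror, normalize.
def random_jitter (image):
    image = tf.image.resize (image, [R_IMG_HEIGHT, R_IMG_WIDTH],
                             method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    image = random_crop (image)
    return tf.image.random_flip_left_right (image)

def preprocess_image_train (image_file):
    return normalize (random_jitter (load (image_file)))

def preprocess_image_test (image_file):
    image = tf.image.resize (load (image_file), [IMG_HEIGHT, IMG_WIDTH])
    return normalize (image)

trainA = trainA_s.map (preprocess_image_train, num_parallel_calls = AUTOTUNE).shuffle (BUFFER_SIZE_TRAIN).batch (BATCH_SIZE)
trainB = trainB_s.map (preprocess_image_train, num_parallel_calls = AUTOTUNE).shuffle (BUFFER_SIZE_TRAIN).batch (BATCH_SIZE)
testA  = testA_s.map (preprocess_image_test, num_parallel_calls = AUTOTUNE).batch (BATCH_SIZE)
testB  = testB_s.map (preprocess_image_test, num_parallel_calls = AUTOTUNE).batch (BATCH_SIZE)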

In [None]:
import numpy as np

OUTPUT_CHANNELS = 3

generator_g = pix2pix.unet_generator (OUTPUT_CHANNELS, norm_type = 'instancenorm')
generator_f = pix2pix.unet_generator (OUTPUT_CHANNELS, norm_type = 'instancenorm')

discriminator_x = pix2pix.discriminator (norm_type = 'instancenorm', target=False)
discriminator_y = pix2pix.discriminator (norm_type = 'instancenorm', target=False)

sample_A = next(iter(trainA))
sample_B = next(iter(trainB))
 
# Generator outputs are already in [-1, 1] (tanh), so they are not rescaled here
to_B = generator_g (sample_A)
to_A = generator_f (sample_B)

plt.figure(figsize=(8, 8))
contrast = 8

imgs = [sample_A, to_B, sample_B, to_A]
title = ['picture', 'To photo', 'photo', 'To picture']

for i in range(len(imgs)):
    plt.subplot(2, 2, i+1)
    plt.title(title[i])
    if i % 2 == 0:
        # Real samples: map [-1, 1] to [0, 1] for display
        plt.imshow(imgs[i][0] * 0.5 + 0.5)
    else:
        # Untrained generator outputs sit close to zero, so boost the contrast
        plt.imshow(imgs[i][0] * 0.5 * contrast + 0.5)
plt.show()

plt.figure(figsize=(8, 8))

plt.subplot(121)
plt.title('Is a real photo?')
plt.imshow(discriminator_y(sample_B)[0, ..., -1], cmap='RdBu_r')

plt.subplot(122)
plt.title('Is a real picture?')
plt.imshow(discriminator_x(sample_A)[0, ..., -1], cmap='RdBu_r')

plt.show ()

### LOSS

In [None]:
LAMBDA = 10

loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real, generated):
    real_loss = loss_obj(tf.ones_like(real), real)

    generated_loss = loss_obj(tf.zeros_like(generated), generated)

    total_disc_loss = real_loss + generated_loss

    return total_disc_loss * 0.5

def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

def calc_cycle_loss(real_image, cycled_image):
    loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))

    return LAMBDA * loss1

def identity_loss(real_image, same_image):
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return LAMBDA * 0.5 * loss

generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
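To see how `LAMBDA` weights the cycle-consistency and identity terms, the two L1 losses can be evaluated on toy arrays. The sketch below re-implements them in NumPy so it runs without TensorFlow; `cycle_loss_np` and `identity_loss_np` are hypothetical names that mirror `calc_cycle_loss` and `identity_loss` above.

```python
import numpy as np

LAMBDA = 10  # same weight as in the notebook


def cycle_loss_np(real_image, cycled_image, lam=LAMBDA):
    """LAMBDA times the mean absolute error between an image and its round trip."""
    return lam * np.mean(np.abs(real_image - cycled_image))


def identity_loss_np(real_image, same_image, lam=LAMBDA):
    """Half-weighted L1 penalty for changing an image already in the target domain."""
    return lam * 0.5 * np.mean(np.abs(real_image - same_image))


real = np.zeros((2, 2, 3), dtype=np.float32)
off_by_tenth = np.full((2, 2, 3), 0.1, dtype=np.float32)

print(cycle_loss_np(real, off_by_tenth))
print(identity_loss_np(real, off_by_tenth))
```

For the same pixel error the identity term contributes half as much as the cycle term, which matches the `LAMBDA * 0.5` factor in the notebook.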

### Checkpoint

In [None]:
checkpoint_path = "./checkpoints/vangogh2photo/train"

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# if a checkpoint exists, restore the latest checkpoint.
if ckpt_manager.latest_checkpoint:
  ckpt.restore(ckpt_manager.latest_checkpoint)
  print ('Latest checkpoint restored!!')

### Train

In [None]:
EPOCHS = 50

def generate_images(model, test_input):
    prediction = model(test_input)
    contrast = 8
    plt.figure(figsize=(12, 12))
    
    display_list = [test_input[0]* 0.5 + 0.5, prediction[0]* 0.5 * contrast + 0.5]
    title = ['Input Image', 'Predicted Image']

    for i in range (2):
        plt.subplot(1, 2, i+1)
        plt.title(title[i])
        # normalize pixel values to [0, 1]
        img = display_list[i]
        img = (img - np.min(img)) / (np.max(img) - np.min(img))
        plt.imshow(img)
        plt.axis('off')
    plt.show()

@tf.function
def train_step(real_x, real_y):
    # persistent is set to True because the tape is used more than
    # once to calculate the gradients.
    with tf.GradientTape(persistent=True) as tape:
        # Generator G translates X -> Y
        # Generator F translates Y -> X.

        fake_y = generator_g(real_x, training=True)
        cycled_x = generator_f(fake_y, training=True)

        
        fake_x = generator_f(real_y, training=True)
        cycled_y = generator_g(fake_x, training=True)

        # same_x and same_y are used for identity loss.
        same_x = generator_f(real_x, training=True)
        same_y = generator_g(real_y, training=True)

        disc_real_x = discriminator_x(real_x, training=True)
        disc_real_y = discriminator_y(real_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True)
        disc_fake_y = discriminator_y(fake_y, training=True)

        # calculate the loss
        gen_g_loss = generator_loss(disc_fake_y)
        gen_f_loss = generator_loss(disc_fake_x)

        total_cycle_loss = calc_cycle_loss(real_x, cycled_x) + calc_cycle_loss(real_y, cycled_y)

        # Total generator loss = adversarial loss + cycle loss
        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x)

        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)
  
    # Calculate the gradients for generator and discriminator
    generator_g_gradients = tape.gradient(total_gen_g_loss, 
                                        generator_g.trainable_variables)
    generator_f_gradients = tape.gradient(total_gen_f_loss, 
                                        generator_f.trainable_variables)

    discriminator_x_gradients = tape.gradient(disc_x_loss, 
                                            discriminator_x.trainable_variables)
    discriminator_y_gradients = tape.gradient(disc_y_loss, 
                                            discriminator_y.trainable_variables)

    # Apply the gradients to the optimizer
    generator_g_optimizer.apply_gradients(zip(generator_g_gradients, 
                                            generator_g.trainable_variables))

    generator_f_optimizer.apply_gradients(zip(generator_f_gradients, 
                                            generator_f.trainable_variables))

    discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients,
                                                discriminator_x.trainable_variables))

    discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients,
                                                discriminator_y.trainable_variables))

for epoch in range(EPOCHS):
    start = time.time()

    n = 0
    for image_x, image_y in tf.data.Dataset.zip((trainA, trainB)):
        train_step (image_x, image_y)
        if n % 10 == 0:
            print ('.', end='')
        n += 1

    clear_output (wait=True)
    # Using a consistent image (sample_A) so that the progress of the model
    # is clearly visible.
    generate_images (generator_g, sample_A)

    if (epoch + 1) % 5 == 0:
        ckpt_save_path = ckpt_manager.save()
        print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                                 ckpt_save_path))

    print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1,
                                                              time.time()-start))
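One detail of the loop above is worth spelling out: `tf.data.Dataset.zip` stops as soon as the shorter dataset is exhausted, so an epoch only covers as many pairs as the smaller domain provides. The sketch below shows the resulting step count; `paired_steps_per_epoch` is a hypothetical helper, and the sizes 400 and 6287 are taken from the `BUFFER_SIZE_TRAIN` and `BUFFER_SIZE_TEST` constants on the assumption that they mirror the sizes of `trainA` and `trainB`.

```python
def paired_steps_per_epoch(num_a, num_b, batch_size=1):
    """Steps in one epoch when two datasets are zipped and batched alike."""
    batches_a = -(-num_a // batch_size)  # ceiling division
    batches_b = -(-num_b // batch_size)
    return min(batches_a, batches_b)


# Assumed domain sizes; check them with len(trainA_s) and len(trainB_s).
steps = paired_steps_per_epoch(400, 6287)
print(steps)
```

Under that assumption most of the larger domain is never seen within a single epoch, although reshuffling between epochs varies which of its images are drawn.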

### Test

In [None]:
# Run the trained model on the test dataset
for inp in testA.take (5):
    generate_images (generator_g, inp)

In [None]:
# Run the trained model on the test dataset
for inp in testB.take(5):
    generate_images(generator_f, inp)

### EXTRA exercise:
Find an interesting dataset, train both the pix2pix and CycleGAN models on it, and compare the results.

To compare the results of pix2pix and CycleGAN, we can use the "**cityscapes**" dataset, which is included in the TensorFlow examples for CycleGAN. We also keep a local directory with the images, which is then used to implement the pix2pix model.

# 1) Test with pix2pix

In [None]:
import tensorflow as tf

import os
import pathlib
import time
import datetime

from matplotlib import pyplot as plt
from IPython import display

## Data: Load Cityscapes

  - Dataset stored in the notebook directory:

        path/
            cityscapes/
                train/
                    1.jpg
                    ...
                val/
                    1.jpg
                    ...
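
A quick way to confirm that this layout is in place before building the `tf.data` pipeline is to count the files in each split. The sketch below uses only the standard library; `count_images` is a hypothetical helper, and the `*.jpg` pattern and the `train`/`val` folder names are taken from the tree above.

```python
from pathlib import Path


def count_images(root, splits=("train", "val"), pattern="*.jpg"):
    """Return {split: number of files matching `pattern`} under `root`."""
    root = Path(root)
    counts = {}
    for split in splits:
        split_dir = root / split
        # A missing folder is reported as 0 instead of raising an error.
        if split_dir.is_dir():
            counts[split] = len(list(split_dir.glob(pattern)))
        else:
            counts[split] = 0
    return counts


# 'cityscapes' is the relative path used by the notebook (PATH).
print(count_images("cityscapes"))
```

A count of 0 for either split usually means the notebook is being run from a different working directory than the one that contains `cityscapes/`.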
                     

In [None]:
PATH = 'cityscapes'

In [None]:
def load(image_file):
    # Read and decode an image file to a uint8 tensor
    image = tf.io.read_file(image_file)
    image = tf.io.decode_jpeg(image)

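    # Split each side-by-side image tensor into two halves along the width.
    # Assuming the usual layout of the Berkeley pix2pix Cityscapes files:
    # - left half  (real_image):  the street-scene photo
    # - right half (input_image): the colour-coded label map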
    w = tf.shape(image)[1]
    w = w // 2
    input_image = image[:, w:, :]
    real_image = image[:, :w, :]

    # Convert both images to float32 tensors
    input_image = tf.cast(input_image, tf.float32)
    real_image = tf.cast(real_image, tf.float32)

    return input_image, real_image

inp, re = load(str(PATH + '/train/1.jpg'))
print (inp.shape)
print (re.shape)
# Scale to [0, 1] so that matplotlib can display the float images
plt.figure()
plt.imshow(inp / 255.0)
plt.figure()
plt.imshow(re / 255.0)

In [None]:
# The Cityscapes training set consists of 2975 images
BUFFER_SIZE = 2975
# The batch size of 1 produced better results for the U-Net in the original pix2pix experiment
BATCH_SIZE  = 1
# Each image is 256x256 in size
IMG_WIDTH   = 256
IMG_HEIGHT  = 256

# resize
RE_IMG_WIDTH   = 286  
RE_IMG_HEIGHT  = 286  

In [None]:
def resize(input_image, real_image, height, width):
    input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    return input_image, real_image

def random_crop(input_image, real_image):
    stacked_image = tf.stack([input_image, real_image], axis=0)
    cropped_image = tf.image.random_crop(
                                  stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 3])

    return cropped_image[0], cropped_image[1]

# Normalizing the images to [-1, 1]
def normalize(input_image, real_image):
    input_image = (input_image / 127.5) - 1
    real_image  = (real_image / 127.5) - 1

    return input_image, real_image

@tf.function()
def random_jitter(input_image, real_image):
    # Resizing to 286x286 (resize expects height first, then width)
    input_image, real_image = resize(input_image, real_image, RE_IMG_HEIGHT, RE_IMG_WIDTH)

    # Random cropping back to 256x256
    input_image, real_image = random_crop(input_image, real_image)

    if tf.random.uniform(()) > 0.5:
        # Random mirroring
        input_image = tf.image.flip_left_right(input_image)
        real_image = tf.image.flip_left_right(real_image)

    return input_image, real_image

plt.figure(figsize=(6, 6))
for i in range(4):
    rj_inp, rj_re = random_jitter(inp, re)
    plt.subplot(2, 2, i + 1)
    plt.imshow(rj_inp / 255.0)
    plt.axis('off')
plt.show()


plt.figure(figsize=(6, 6))
for i in range(4):
    rj_inp, rj_re = random_jitter(inp, re)
    plt.subplot(2, 2, i + 1)
    plt.imshow(rj_re / 255.0)
    plt.axis('off')
plt.show()


In [None]:
def load_image_train(image_file):
    input_image, real_image = load(image_file)
    input_image, real_image = random_jitter(input_image, real_image)
    input_image, real_image = normalize(input_image, real_image)

    return input_image, real_image

def load_image_test(image_file):
    input_image, real_image = load(image_file)
    input_image, real_image = resize(input_image, real_image,
                                   IMG_HEIGHT, IMG_WIDTH)
    input_image, real_image = normalize(input_image, real_image)

    return input_image, real_image 

In [None]:
train_dataset = tf.data.Dataset.list_files(str(PATH + '/train/*.jpg'))
train_dataset = train_dataset.map(load_image_train,
                                  num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)


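try:
    test_dataset = tf.data.Dataset.list_files(str(PATH + '/val/*.jpg'))
except tf.errors.InvalidArgumentError:
    # list_files raises when the pattern matches no files. Without a dataset
    # the lines below cannot run, so re-raise after printing the message.
    print('Error loading the validation images')
    raise
test_dataset = test_dataset.map(load_image_test)
test_dataset = test_dataset.batch(BATCH_SIZE)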

## pix2pix

In [None]:
OUTPUT_CHANNELS = 3

def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
      tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False)
    )

    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())

    result.add(tf.keras.layers.LeakyReLU())

    return result

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                        padding='same',
                                        kernel_initializer=initializer,
                                        use_bias=False)
    )

    result.add(tf.keras.layers.BatchNormalization())

    if apply_dropout:
        result.add(tf.keras.layers.Dropout(0.5))

    result.add(tf.keras.layers.ReLU())

    return result

In [None]:
down_model = downsample(3, 4)
down_result = down_model(tf.expand_dims(inp, 0))
print (down_result.shape)

up_model  = upsample(3, 4)
up_result = up_model(down_result)
print (up_result.shape)

In [None]:
def Generator():
    inputs = tf.keras.layers.Input(shape=[256, 256, 3])

    down_stack = [
        downsample(64, 4, apply_batchnorm=False),  # (batch_size, 128, 128, 64)
        downsample(128, 4),  # (batch_size, 64, 64, 128)
        downsample(256, 4),  # (batch_size, 32, 32, 256)
        downsample(512, 4),  # (batch_size, 16, 16, 512)
        downsample(512, 4),  # (batch_size, 8, 8, 512)
        downsample(512, 4),  # (batch_size, 4, 4, 512)
        downsample(512, 4),  # (batch_size, 2, 2, 512)
        downsample(512, 4),  # (batch_size, 1, 1, 512)
    ]

    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 8, 8, 1024)
        upsample(512, 4),  # (batch_size, 16, 16, 1024)
        upsample(256, 4),  # (batch_size, 32, 32, 512)
        upsample(128, 4),  # (batch_size, 64, 64, 256)
        upsample(64, 4),  # (batch_size, 128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                             strides=2,
                                             padding='same',
                                             kernel_initializer=initializer,
                                             activation='tanh')  # (batch_size, 256, 256, 3)

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

generator = Generator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)

In [None]:
rj_inp, rj_re = random_jitter(inp, re)
print(rj_inp.shape)
gen_output = generator(rj_inp[tf.newaxis, ...], training=False)
plt.imshow(gen_output[0, ...])

In [None]:
def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)

    inp = tf.keras.layers.Input(shape=[256, 256, 3], name='input_image')
    tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')

    x = tf.keras.layers.concatenate([inp, tar])  # (batch_size, 256, 256, channels*2)

    down1 = downsample(64, 4, False)(x)  # (batch_size, 128, 128, 64)
    down2 = downsample(128, 4)(down1)  # (batch_size, 64, 64, 128)
    down3 = downsample(256, 4)(down2)  # (batch_size, 32, 32, 256)

    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)  # (batch_size, 34, 34, 256)
    conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 31, 31, 512)

    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)  # (batch_size, 33, 33, 512)

    last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 30, 30, 1)

    return tf.keras.Model(inputs=[inp, tar], outputs=last)

discriminator = Discriminator()
tf.keras.utils.plot_model(discriminator, show_shapes=True, dpi=64)

In [None]:
rj_inp, rj_re = random_jitter(inp, re)
disc_out = discriminator([rj_inp[tf.newaxis, ...], gen_output], training=False)
plt.imshow(disc_out[0, ..., -1], vmin=-20, vmax=20, cmap='RdBu_r')
plt.colorbar()
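
The shape comments in `Discriminator()` can be checked with a little arithmetic: each value of the 30x30 output map scores one patch of the input pair. The sketch below is plain Python that follows the layer sequence of the cell above; `conv_out` and `patchgan_output_size` are hypothetical helper names, and the formulas are the standard ones for 'same' and 'valid' convolutions.

```python
def conv_out(size, kernel, stride, padding):
    """Output length of a convolution along one spatial axis."""
    if padding == "same":
        return -(-size // stride)          # ceil(size / stride)
    return (size - kernel) // stride + 1   # 'valid' padding


def patchgan_output_size(input_size=256):
    size = input_size
    for _ in range(3):                     # three stride-2 downsample blocks
        size = conv_out(size, 4, 2, "same")
    size += 2                              # ZeroPadding2D adds 1 pixel per side
    size = conv_out(size, 4, 1, "valid")   # Conv2D(512, 4, strides=1)
    size += 2                              # second ZeroPadding2D
    size = conv_out(size, 4, 1, "valid")   # Conv2D(1, 4, strides=1)
    return size


print(patchgan_output_size(256))  # 30
```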

## LOSS

In [None]:
LAMBDA = 100
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

    # Mean absolute error
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

    total_gen_loss = gan_loss + (LAMBDA * l1_loss)

    return total_gen_loss, gan_loss, l1_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

    total_disc_loss = real_loss + generated_loss

    return total_disc_loss
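
To see how the two terms of the generator loss interact, here is a small NumPy sketch that mirrors `generator_loss` above outside TensorFlow. `bce_from_logits` is a hypothetical stand-in for `BinaryCrossentropy(from_logits=True)`, written with the usual numerically stable formula; the input arrays are made-up values chosen only to keep the arithmetic easy to follow.

```python
import numpy as np

LAMBDA = 100  # same weight as in the notebook


def bce_from_logits(labels, logits):
    # Stable sigmoid cross-entropy, averaged over all elements.
    per_element = (np.maximum(logits, 0) - logits * labels
                   + np.log1p(np.exp(-np.abs(logits))))
    return float(np.mean(per_element))


def generator_loss(disc_generated_logits, gen_output, target):
    # Adversarial term: the generator wants the discriminator to say "real".
    gan_loss = bce_from_logits(np.ones_like(disc_generated_logits),
                               disc_generated_logits)
    # Reconstruction term: mean absolute error against the ground truth.
    l1_loss = float(np.mean(np.abs(target - gen_output)))
    return gan_loss + LAMBDA * l1_loss, gan_loss, l1_loss


# Toy inputs: an undecided discriminator (all logits 0) and a prediction
# that is off by 0.1 at every pixel.
logits = np.zeros((1, 30, 30, 1))
target = np.zeros((1, 4, 4, 3))
prediction = np.full((1, 4, 4, 3), 0.1)

total, gan, l1 = generator_loss(logits, prediction, target)
print(f"{total:.4f}")  # 10.6931
```

With `LAMBDA = 100`, even a small per-pixel error dominates the adversarial term (about 10 versus about 0.69 here), which is why the generator is pushed first towards outputs that are close to the target.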

## Optimizer

In [None]:
generator_optimizer     = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)
discriminator_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)

## Checkpoints

In [None]:
checkpoint_dir = './checkpoints/city/raining'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

## Logs

In [None]:
log_dir="logs/"

summary_writer = tf.summary.create_file_writer (
                                  log_dir + "fit_maps/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
                    )

## Train

In [None]:
def generate_images(model, test_input, tar):
    prediction = model(test_input, training=True)
    plt.figure(figsize=(15, 15))

    display_list = [test_input[0], tar[0], prediction[0]]
    title = ['Input Image', 'Ground Truth', 'Predicted Image']

    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        # Getting the pixel values in the [0, 1] range to plot.
        plt.imshow(display_list[i] * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

for example_input, example_target in test_dataset.take(1):
    generate_images(generator, example_input, example_target)
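
The `* 0.5 + 0.5` in `generate_images` is the inverse of the `normalize` step used when loading the data. A minimal NumPy sketch of the round trip (the function names mirror the notebook, but this version is a standalone illustration, not the notebook's code):

```python
import numpy as np


def normalize(image):
    # Map pixel values from [0, 255] to [-1, 1], matching the tanh output range.
    return image / 127.5 - 1.0


def denormalize(image):
    # Map values from [-1, 1] back to [0, 1] so plt.imshow accepts them.
    return image * 0.5 + 0.5


pixels = np.array([0.0, 127.5, 255.0])
print(normalize(pixels).tolist())               # [-1.0, 0.0, 1.0]
print(denormalize(normalize(pixels)).tolist())  # [0.0, 0.5, 1.0]
```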

In [None]:
@tf.function
def train_step(input_image, target, step):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)

        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_total_loss,
                                          generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))

    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step=step//1000)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step//1000)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step//1000)
        tf.summary.scalar('disc_loss', disc_loss, step=step//1000)

In [None]:
def fit(train_ds, test_ds, steps):
    example_input, example_target = next(iter(test_ds.take(1)))
    start = time.time()

    for step, (input_image, target) in train_ds.repeat().take(steps).enumerate():
        # Every 1000 steps: report the timing and show the current prediction
        if step % 1000 == 0:
            display.clear_output(wait=True)

            if step != 0:
                print(f'Time taken for 1000 steps: {time.time()-start:.2f} sec\n')

            start = time.time()

            generate_images(generator, example_input, example_target)
            print(f"Step: {step//1000}k")

        # Training step (must run inside the loop, once per batch)
        train_step(input_image, target, step)

        if (step + 1) % 10 == 0:
            print('.', end='', flush=True)

        # Save (checkpoint) the model every 5k steps
        if (step + 1) % 5000 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
%load_ext tensorboard
%tensorboard --logdir {log_dir}

In [None]:
fit(train_dataset, test_dataset, steps = 40000)
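
Because pix2pix is trained here by step count while the CycleGAN part of this notebook is trained by epochs, it helps to put both on the same scale before comparing them. A small sketch, assuming the 2975 training images and batch size 1 stated earlier (`steps_to_epochs` and `epochs_to_steps` are hypothetical helpers):

```python
def steps_to_epochs(steps, num_images, batch_size=1):
    """Number of passes over the data covered by `steps` updates."""
    return steps * batch_size / num_images


def epochs_to_steps(epochs, num_images, batch_size=1):
    """Number of updates performed in `epochs` full passes."""
    steps_per_epoch = -(-num_images // batch_size)  # ceiling division
    return epochs * steps_per_epoch


print(round(steps_to_epochs(40_000, 2975), 2))  # 13.45
print(epochs_to_steps(150, 2975))               # 446250
```

Under these assumptions, 40000 pix2pix steps correspond to roughly 13 passes over the training set, whereas 150 CycleGAN epochs would be about 446000 generator updates.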

In [None]:
!ls {checkpoint_dir}

In [None]:
# Run the trained model on a few examples from the test set
for inp, tar in test_dataset.take(10):
    generate_images(generator, inp, tar)


# 2) CycleGAN

In [None]:
import os
import pathlib
import time
import datetime

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix

import matplotlib.pyplot as plt
from IPython import display
from IPython.display import clear_output

AUTOTUNE = tf.data.AUTOTUNE

### DATA

In [None]:
dataset_name = 'cityscapes'
_URL = f'http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/{dataset_name}.tar.gz'

path_to_zip = tf.keras.utils.get_file(
    fname=f"{dataset_name}.tar.gz",
    origin=_URL,
    extract=True)

path_to_zip  = pathlib.Path(path_to_zip)

PATH = path_to_zip.parent/dataset_name

In [None]:
list(PATH.parent.iterdir())

In [None]:
sample_image = tf.io.read_file(str(PATH /'train/1.jpg'))
sample_image = tf.io.decode_jpeg(sample_image, channels=3)
print(sample_image.shape)
plt.figure()
plt.imshow(sample_image)

In [None]:
BUFFER_SIZE_TRAIN = 400
BUFFER_SIZE_TEST  = 6287

BATCH_SIZE  = 1

IMG_WIDTH   = 256
IMG_HEIGHT  = 256

R_IMG_WIDTH   = 286
R_IMG_HEIGHT  = 286

PATH      = 'cityscapes'
train_dir = 'cityscapes' + '/train'
test_dir  = 'cityscapes' + '/val'

In [None]:
# Function to load the images
def load (image_file, bool_img = False):
    # Read and decode an image file to a uint8 tensor
    image = tf.io.read_file (image_file)
    image = tf.io.decode_jpeg(image, channels=3)
    
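    # Split each side-by-side image tensor into two halves along the width.
    # Assuming the usual layout of the Berkeley pix2pix Cityscapes files:
    # - left half  (real_image):  the street-scene photo
    # - right half (input_image): the colour-coded label map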
    w = tf.shape (image)[1]
    w = w // 2
    input_image = image [:, w:, :]
    real_image  = image [:, :w, :]
    # Convert both images to float32 tensors
    input_image = tf.cast (input_image, tf.float32)
    real_image  = tf.cast (real_image, tf.float32)
    
    if bool_img:
        # Scale to [0, 1] so that matplotlib can display the float images
        plt.figure()
        plt.imshow(input_image / 255.0)
        plt.figure()
        plt.imshow(real_image / 255.0)
  
    return input_image, real_image

inp, re = load (str(PATH + '/train/1.jpg'), bool_img = True)

 

# Functions to build the dataset


In [None]:
def random_jitter (image):
    # resizing to 286 x 286 x 3
    image = tf.image.resize (image, [R_IMG_HEIGHT, R_IMG_WIDTH],
                                      method = 'nearest')
    # randomly cropping to 256 x 256 x 3
    image = random_crop(image)

    # random mirroring
    image = tf.image.random_flip_left_right(image)  # Mirror at random to augment the number of samples

    return image

def random_crop(image):
    cropped_image = tf.image.random_crop(
                      image, size=[IMG_HEIGHT, IMG_WIDTH, 3])

    return cropped_image

def normalize (image):
    image = tf.cast(image, tf.float32)
    image = (image / 127.5) - 1
    return image


def resize (input_image, real_image, height, width):
    input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    return input_image, real_image

In [None]:
def load_image_train (image_file):
    input_image, real_image = load (image_file)
    
    input_image  = random_jitter (input_image)
    real_image   = random_jitter (real_image)
    
    input_image  = normalize (input_image)
    real_image   = normalize (real_image)
 
    
    return input_image, real_image

def load_image_test (image_file):
    input_image, real_image = load (image_file)
 
    #input_image, real_image = resize (input_image, real_image,IMG_HEIGHT, IMG_WIDTH)
    
    input_image = normalize (input_image)
    real_image  = normalize (real_image)
    
    
    
    return input_image, real_image

In [None]:
# Training and validation directories
train_dir  = PATH + '/train'
test_dir   = PATH +  '/val'

# Collect the paths of all training and validation images
train_image_paths = [os.path.join(train_dir, fname) for fname in os.listdir (train_dir)]
test_image_paths  = [os.path.join(test_dir, fname) for fname in os.listdir (test_dir)]


# Build the TensorFlow training datasets from the image paths:
# every file is loaded eagerly and split into a domain A and a domain B image
lstA = []
lstB = []
for f in train_image_paths:
    imgA, imgB = load_image_train (f)
    lstA.append (imgA)
    lstB.append (imgB)

# Convert the lists of images to tensors
dtsA = tf.convert_to_tensor(lstA)
dtsB = tf.convert_to_tensor(lstB)

train_datasetA = tf.data.Dataset.from_tensor_slices(dtsA).batch (BATCH_SIZE)
train_datasetB = tf.data.Dataset.from_tensor_slices(dtsB).batch (BATCH_SIZE)



# Build the TensorFlow test datasets from the image paths in the same way
lstA = []
lstB = []
for f in test_image_paths:
    imgA, imgB = load_image_test(f)
    lstA.append(imgA)
    lstB.append(imgB)

# Convert the lists of images to tensors
dtsA = tf.convert_to_tensor(lstA)
dtsB = tf.convert_to_tensor(lstB)

test_datasetA = tf.data.Dataset.from_tensor_slices(dtsA).batch (BATCH_SIZE)
test_datasetB = tf.data.Dataset.from_tensor_slices(dtsB).batch (BATCH_SIZE)

In [None]:
sample_dtsA = next(iter(train_datasetA))
sample_dtsB = next(iter(train_datasetB))

In [None]:
import matplotlib.pyplot as plt

# Get an iterator over the first 5 training images of domain A
subset_iter = iter(train_datasetA.take(5))

fig, axs = plt.subplots(1, 5, figsize=(10, 10))
for i, img in enumerate(subset_iter):
    img = (img + 1) / 2.0  # Rescale the pixel values to the [0, 1] range
    axs[i].imshow(img[0])
plt.show()

# Get an iterator over the first 5 training images of domain B
subset_iter = iter(train_datasetB.take(5))

fig, axs = plt.subplots(1, 5, figsize=(10, 10))
for i, img in enumerate(subset_iter):
    img = (img + 1) / 2.0  # Rescale the pixel values to the [0, 1] range
    axs[i].imshow(img[0])
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Get an iterator over the first 5 test images of domain A
subset_iter = iter(test_datasetA.take(5))

fig, axs = plt.subplots(1, 5, figsize=(10, 10))
for i, img in enumerate(subset_iter):
    img = (img + 1) / 2.0  # Rescale the pixel values to the [0, 1] range
    axs[i].imshow(img[0])
plt.show()

# Get an iterator over the first 5 test images of domain B
subset_iter = iter(test_datasetB.take(5))

fig, axs = plt.subplots(1, 5, figsize=(10, 10))
for i, img in enumerate(subset_iter):
    img = (img + 1) / 2.0  # Rescale the pixel values to the [0, 1] range
    axs[i].imshow(img[0])
plt.show()

## CycleGAN model

Import the generator and the discriminator used in [pix2pix](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py) via the installed [tensorflow_examples](https://github.com/tensorflow/examples) package.

The model architecture used in this tutorial is very similar to what was used in [pix2pix](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py). Some of the differences are:

* CycleGAN uses instance normalization instead of batch normalization (`norm_type='instancenorm'` in the code of this section).
* There are 2 generators (G and F) and 2 discriminators (X and Y) being trained here.

* Generator `G` learns to transform image `X` to image `Y`. $(G: X \to Y)$
* Generator `F` learns to transform image `Y` to image `X`. $(F: Y \to X)$
* Discriminator `D_X` learns to differentiate between image `X` and generated image `X` (`F(Y)`).
* Discriminator `D_Y` learns to differentiate between image `Y` and generated image `Y` (`G(X)`).
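
The roles of `G` and `F` listed above imply a cycle-consistency check: translating an image to the other domain and back should return something close to the original. Below is a minimal NumPy sketch of that idea. `G`, `F_exact` and `F_biased` are toy affine functions standing in for the networks (they are assumptions for illustration, not the notebook's models), and `LAMBDA = 10` is the weight used by the notebook's cycle loss.

```python
import numpy as np

LAMBDA = 10  # cycle-loss weight used in the notebook


def cycle_loss(real_image, cycled_image):
    # Weighted mean absolute error between an image and its round trip.
    return LAMBDA * float(np.mean(np.abs(real_image - cycled_image)))


def G(x):            # toy "X -> Y" generator
    return 2.0 * x + 1.0


def F_exact(y):      # toy "Y -> X" generator that inverts G exactly
    return (y - 1.0) / 2.0


def F_biased(y):     # toy "Y -> X" generator with a constant error of 0.1
    return (y - 1.0) / 2.0 + 0.1


x = np.array([-1.0, 0.0, 0.5])
print(cycle_loss(x, F_exact(G(x))))   # 0.0: the round trip is perfect
print(cycle_loss(x, F_biased(G(x))))  # close to 1.0: error 0.1 times LAMBDA
```

The loss is zero only when `F` undoes `G`, which is the constraint that lets CycleGAN train without paired images.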

- The generator takes an input of shape (256, 256, 3) through an input layer.
- Several convolutional layers with LeakyReLU activation and normalization (batch normalization in pix2pix, instance normalization in the CycleGAN configuration used here) then encode (downsample) the input.
- Several transposed-convolution layers with ReLU activation and normalization then decode (upsample) the encoded input.
- Dropout layers are used to regularize the model and help prevent overfitting.
- Concatenation layers (skip connections) merge the features of each transposed-convolution layer with those of the matching convolution layer from the encoder.
- The output layer uses a 'tanh' activation so that the generated pixel values fall in the range [-1, 1].
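
The encoder/decoder structure described above can be traced with plain arithmetic. The sketch below uses the filter counts of the `Generator()` defined in the pix2pix part of this notebook; it is an assumption that the imported `pix2pix.unet_generator` follows the same layout, and `unet_shapes` is a hypothetical helper.

```python
def unet_shapes(input_size=256,
                down_filters=(64, 128, 256, 512, 512, 512, 512, 512),
                up_filters=(512, 512, 512, 512, 256, 128, 64)):
    """Return (encoder, decoder) lists of (spatial_size, channels)."""
    size = input_size
    encoder = []
    for filters in down_filters:
        size //= 2                      # each stride-2 convolution halves the size
        encoder.append((size, filters))

    # The bottleneck output is not used as a skip connection.
    skips = encoder[:-1][::-1]

    decoder = []
    for filters, (skip_size, skip_filters) in zip(up_filters, skips):
        size *= 2                       # each transposed convolution doubles it
        assert size == skip_size        # sizes must match to concatenate
        decoder.append((size, filters + skip_filters))
    return encoder, decoder


encoder, decoder = unet_shapes()
print(encoder[-1])   # (1, 512): the bottleneck
print(decoder[0])    # (2, 1024): first upsample output plus its skip
print(decoder[-1])   # (128, 128): input to the final tanh layer
```

The final transposed convolution then doubles 128 to 256 and reduces the channels to 3, giving an output of the same shape as the input image.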

In [None]:
OUTPUT_CHANNELS = 3

generator_g = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')
generator_f = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')

discriminator_x = pix2pix.discriminator(norm_type='instancenorm', target=False)
discriminator_y = pix2pix.discriminator(norm_type='instancenorm', target=False)

In [None]:
to_dtsA = generator_g (sample_dtsA )
to_dtsB = generator_f (sample_dtsB)
plt.figure(figsize=(8, 8))
contrast = 8

imgs = [sample_dtsA, to_dtsB, sample_dtsB, to_dtsA]
title = ['dtsA', 'To dtsB', 'dtsB', 'To dtsA']

for i in range(len(imgs)):
    plt.subplot(2, 2, i+1)
    plt.title(title[i])
    if i % 2 == 0:
        plt.imshow(imgs[i][0] * 0.5 + 0.5)
    else:
        plt.imshow(imgs[i][0] * 0.5 * contrast + 0.5)
plt.show()

In [None]:
plt.figure(figsize=(8, 8))

plt.subplot(121)
plt.title('Is a real domain B image?')
plt.imshow(discriminator_y(sample_dtsB)[0, ..., -1], cmap='RdBu_r')

plt.subplot(122)
plt.title('Is a real domain A image?')
plt.imshow(discriminator_x(sample_dtsA)[0, ..., -1], cmap='RdBu_r')

plt.show()

## Loss Functions

In [None]:
LAMBDA = 10

In [None]:
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def discriminator_loss(real, generated):
    real_loss = loss_obj(tf.ones_like(real), real)

    generated_loss = loss_obj(tf.zeros_like(generated), generated)

    total_disc_loss = real_loss + generated_loss

    return total_disc_loss * 0.5

In [None]:
def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

#### Cycle loss

In [None]:
def calc_cycle_loss(real_image, cycled_image):
    loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))

    return LAMBDA * loss1

In [None]:
def identity_loss(real_image, same_image):
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return LAMBDA * 0.5 * loss

## Optimizers

Initialize the optimizers for all the generators and the discriminators.

In [None]:
generator_g_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)
generator_f_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)

discriminator_x_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam (2e-4, beta_1 = 0.5)

# Checkpoints

In [None]:
checkpoint_path = "./checkpoints/cityscapes/train"

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# if a checkpoint exists, restore the latest checkpoint.
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

# Training

In [None]:
EPOCHS = 150

In [None]:
def generate_images(model, test_input):
    prediction = model(test_input)

    plt.figure(figsize=(12, 12))

    display_list = [test_input[0], prediction[0]]
    title = ['Input Image', 'Predicted Image']

    for i in range(2):
        plt.subplot(1, 2, i+1)
        plt.title(title[i])
        # getting the pixel values between [0, 1] to plot it.
        plt.imshow(display_list[i] * 0.5 + 0.5)
        plt.axis('off')
    plt.show()

In [None]:
@tf.function
def train_step(real_x, real_y):
    # persistent is set to True because the tape is used more than
    # once to calculate the gradients.
    with tf.GradientTape(persistent=True) as tape:
        # Generator G translates X -> Y
        # Generator F translates Y -> X.

        fake_y = generator_g(real_x, training=True)
        cycled_x = generator_f(fake_y, training=True)

        fake_x = generator_f(real_y, training=True)
        cycled_y = generator_g(fake_x, training=True)

        # same_x and same_y are used for identity loss.
        same_x = generator_f(real_x, training=True)
        same_y = generator_g(real_y, training=True)

        disc_real_x = discriminator_x(real_x, training=True)
        disc_real_y = discriminator_y(real_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True)
        disc_fake_y = discriminator_y(fake_y, training=True)

        # calculate the loss
        gen_g_loss = generator_loss(disc_fake_y)
        gen_f_loss = generator_loss(disc_fake_x)

        total_cycle_loss = calc_cycle_loss(real_x, cycled_x) + calc_cycle_loss(real_y, cycled_y)

        # Total generator loss = adversarial loss + cycle loss + identity loss
        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x)

        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)
  
    # Calculate the gradients for generator and discriminator
    generator_g_gradients = tape.gradient(total_gen_g_loss, 
                                        generator_g.trainable_variables)
    generator_f_gradients = tape.gradient(total_gen_f_loss, 
                                        generator_f.trainable_variables)

    discriminator_x_gradients = tape.gradient(disc_x_loss, 
                                            discriminator_x.trainable_variables)
    discriminator_y_gradients = tape.gradient(disc_y_loss, 
                                            discriminator_y.trainable_variables)
  
    # Apply the gradients to the optimizer
    generator_g_optimizer.apply_gradients(zip(generator_g_gradients, 
                                            generator_g.trainable_variables))

    generator_f_optimizer.apply_gradients(zip(generator_f_gradients, 
                                            generator_f.trainable_variables))

    discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients,
                                                discriminator_x.trainable_variables))

    discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients,
                                                discriminator_y.trainable_variables))

In [None]:
for epoch in range(EPOCHS):
    start = time.time()

    n = 0
    for image_x, image_y in tf.data.Dataset.zip((train_datasetA, train_datasetB)):
        train_step(image_x, image_y)
        if n % 10 == 0:
            print ('.', end='')
        n += 1

    clear_output(wait=True)
    # Use the same sample image (sample_dtsA) every epoch so that the progress
    # of the model is clearly visible.
    generate_images(generator_g, sample_dtsA)

    if (epoch + 1) % 5 == 0:
        ckpt_save_path = ckpt_manager.save()
        print ('Saving checkpoint for epoch {} at {}'.format(epoch+1,
                                                             ckpt_save_path))

    print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1,
                                                      time.time()-start))

# Generate using the training and test datasets

In [None]:
# Run the trained model on the training dataset
for inp in train_datasetA.take(25):
    generate_images(generator_g, inp)

In [None]:
for inp in train_datasetB.take(25):
    generate_images(generator_f, inp)

In [None]:
# Run the trained model on the test dataset
for inp in test_datasetA.take(25):
    generate_images(generator_g, inp)

In [None]:
# Run the trained model on the test dataset
for inp in test_datasetB.take(25):
    generate_images(generator_f, inp)

# CONCLUSION

Both pix2pix and CycleGAN are popular and effective algorithms for image-to-image translation. For street-scene segmentation, both can be useful and give good results, depending on how the model is configured and trained.

Pix2pix learns a direct mapping between two image domains, which means it needs a dataset of paired input and output images for training. CycleGAN, on the other hand, can learn the relationship between two image domains without paired images, which makes it more flexible in terms of training data.

In our case, the pix2pix model for Cityscapes appears to give better results with fewer training epochs than the CycleGAN model. Even so, as noted above, both models are well suited to street segmentation and object detection for autonomous driving.
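
Since the Cityscapes validation images come in pairs, one possible way to make this comparison less subjective is to measure the mean absolute error between each model's prediction and the ground truth. The sketch below only illustrates the metric: `mean_abs_error` is a hypothetical helper, and the arrays are placeholders, not results from this notebook.

```python
import numpy as np


def mean_abs_error(prediction, target):
    """Mean absolute pixel difference between two images of equal shape."""
    return float(np.mean(np.abs(prediction - target)))


# Placeholder data standing in for normalized images in [-1, 1].
target = np.zeros((4, 4, 3))
prediction_a = target + 0.1   # stand-in for the output of one model
prediction_b = target + 0.3   # stand-in for the output of the other model

print(mean_abs_error(prediction_a, target))
print(mean_abs_error(prediction_b, target))
```

In practice the two predictions would come from `generator` (pix2pix) and `generator_g` (CycleGAN) on the same validation inputs, averaged over the whole validation set; a lower value means the output is closer to the ground truth pixel by pixel.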