<a href="https://colab.research.google.com/github/kalebsampaco/Ejercicios-en-google-colab/blob/master/Redes_adversas_generativas_pixtwopix.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:

from __future__ import print_function, division
import scipy
from glob import glob
import numpy as np
import matplotlib.pyplot as plt
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
import datetime
from google.colab import drive
drive.mount('/content/drive')
import sys
sys.path.append('/content/drive/My Drive/pix2pix/')
import os
import cv2 as cv

#CLase preparada para carga de datos
class Cargador_Datos():
    def __init__(self, nombre_dataset, resolucion_imagen=(128, 128)):
        self.nombre_dataset = nombre_dataset
        self.resolucion_imagen = resolucion_imagen
    #Funcion de carga de datos
    def carga_datos(self, tamano_batch=1, is_testing=False):
        tipo_datos = "train" if not is_testing else "test"
        ruta = glob('/content/drive/My Drive/pix2pix/datasets/%s/%s/*' % (self.nombre_dataset, tipo_datos))

        batch_de_imagenes = np.random.choice(ruta, size=tamano_batch)

        imagenes_A = []
        imagenes_B = []
        for ruta_imagen in batch_de_imagenes:
            imagen = self.imread(ruta_imagen)

            alto, ancho, _ = imagen.shape
            _ancho = int(ancho/2)
            imagen_A, imagen_B = imagen[:, :_ancho, :], imagen[:, _ancho:, :]

            imagen_A = cv.resize(imagen_A, self.resolucion_imagen)
            imagen_B = cv.resize(imagen_B, self.resolucion_imagen)

            # Aumento de datos solo en entrenamiento
            if not is_testing and np.random.random() < 0.5:
                imagen_A = np.fliplr(imagen_A)
                imagen_B = np.fliplr(imagen_B)

            imagenes_A.append(imagen_A)
            imagenes_B.append(imagen_B)

        imagenes_A = np.array(imagenes_A)/127.5 - 1.
        imagenes_B = np.array(imagenes_B)/127.5 - 1.

        return imagenes_A, imagenes_B
      
    #Funcion de ccarga de batches
    def carga_batch(self, tamano_batch=1, is_testing=False):
        tipo_datos = "train" if not is_testing else "val"
        ruta = glob('/content/drive/My Drive/pix2pix/datasets/%s/%s/*' % (self.nombre_dataset, tipo_datos))

        self.numero_batches = int(len(ruta) / tamano_batch)

        for i in range(self.numero_batches-1):
            batch = ruta[i*tamano_batch:(i+1)*tamano_batch]
            imagenes_A, imagenes_B = [], []
            for imagen in batch:
                imagen = self.imread(imagen)
                alto, ancho, _ = imagen.shape
                _ancho = int(ancho/2)
                imagen_A = imagen[:, :_ancho, :]
                imagen_B = imagen[:, _ancho:, :]
              
                imagen_A = cv.resize(imagen_A, self.resolucion_imagen)
                imagen_B = cv.resize(imagen_B, self.resolucion_imagen)

                if not is_testing and np.random.random() > 0.5:
                        imagen_A = np.fliplr(imagen_A)
                        imagen_B = np.fliplr(imagen_B)

                imagenes_A.append(imagen_A)
                imagenes_B.append(imagen_B)

            imagenes_A = np.array(imagenes_A)/127.5 - 1.
            imagenes_B = np.array(imagenes_B)/127.5 - 1.

            yield imagenes_A, imagenes_B


    def imread(self, ruta):
        return cv.imread(ruta)

#Definicion de Clase GAN Pix2Pix
class Pix2Pix():
    def __init__(self):
        # Tamaño de entrada de imagenes
        self.filas_imagen = 256
        self.columnas_imagen = 256
        self.canales = 3
        self.tamano_imagen = (self.filas_imagen, self.columnas_imagen, self.canales)

        # Configuracion de base de datos y cargador de datos
        self.nombre_dataset = 'facades'
        self.cargador_datos = Cargador_Datos(nombre_dataset=self.nombre_dataset,resolucion_imagen=(self.filas_imagen, self.columnas_imagen))


        # Calculo de salidas del discriminador
        parche = int(self.filas_imagen / 2**4)
        self.parches_discriminador = (parche, parche, 1)

        # Numero de filtros iniciales de discriminador y generador
        self.gf = 64
        self.df = 64

        optimizador = Adam(0.0002, 0.5)

        # Construccion y compilación del discriminador
        self.discriminador = self.crea_discriminador()
        self.discriminador.compile(loss='mse',optimizer=optimizador,metrics=['accuracy'])

        # Construccion de generador
        self.generador = self.crea_generador()

        # Entradas y condiciones de entrada
        imagen_A = Input(shape=self.tamano_imagen)
        imagen_B = Input(shape=self.tamano_imagen)

        # Condicionado mediante B para generar una falsa A
        falsa_A = self.generador(imagen_B)

        # Congelado del discriminador en el modelo conjunto
        self.discriminador.trainable = False

        # Validez de imagenes calculadas a través del discriminador
        valido = self.discriminador([falsa_A, imagen_B])

        self.combinado = Model(inputs=[imagen_A, imagen_B], outputs=[valido, falsa_A])
        self.combinado.compile(loss=['mse', 'mae'],loss_weights=[1, 100],optimizer=optimizador)

    #Funcion de creacion del generador
    def crea_generador(self):   
        
        #definicion de convolucion+Leaky Relu
        def conv2d(layer_input, filters, f_size=4, bn=True):
            d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
            d = LeakyReLU(alpha=0.2)(d)
            if bn:
                d = BatchNormalization(momentum=0.8)(d)
            return d
        
        #definicion de aproximacion a deconvolucion
        def deconv2d(layer_input, skip_input, filters, f_size=4, dropout_rate=0):
            u = UpSampling2D(size=2)(layer_input)
            u = Conv2D(filters, kernel_size=f_size, strides=1, padding='same', activation='relu')(u)
            if dropout_rate:
                u = Dropout(dropout_rate)(u)
            u = BatchNormalization(momentum=0.8)(u)
            u = Concatenate()([u, skip_input])
            return u

        # Entrada
        d0 = Input(shape=self.tamano_imagen)

        # Encoder
        d1 = conv2d(d0, self.gf, bn=False)
        d2 = conv2d(d1, self.gf*2)
        d3 = conv2d(d2, self.gf*4)
        d4 = conv2d(d3, self.gf*8)
        d5 = conv2d(d4, self.gf*8)
        d6 = conv2d(d5, self.gf*8)
        d7 = conv2d(d6, self.gf*8)

        # Decoder
        u1 = deconv2d(d7, d6, self.gf*8)
        u2 = deconv2d(u1, d5, self.gf*8)
        u3 = deconv2d(u2, d4, self.gf*8)
        u4 = deconv2d(u3, d3, self.gf*4)
        u5 = deconv2d(u4, d2, self.gf*2)
        u6 = deconv2d(u5, d1, self.gf)
        u7 = UpSampling2D(size=2)(u6)
        imagen_salida = Conv2D(self.canales, kernel_size=4, strides=1, padding='same', activation='tanh')(u7)

        return Model(d0, imagen_salida)

     #Funcion de creacion del discriminador
    def crea_discriminador(self):

        #definicion de capa de discriminacion
        def d_layer(layer_input, filters, f_size=4, bn=True):
            d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
            d = LeakyReLU(alpha=0.2)(d)
            if bn:
                d = BatchNormalization(momentum=0.8)(d)
            return d
         
        #definicion de entradas
        imagen_A = Input(shape=self.tamano_imagen)
        imagen_B = Input(shape=self.tamano_imagen)

        # Concatenacion de imagen y condicion en canales
        combined_imgs = Concatenate(axis=-1)([imagen_A, imagen_B])

        d1 = d_layer(combined_imgs, self.df, bn=False)
        d2 = d_layer(d1, self.df*2)
        d3 = d_layer(d2, self.df*4)
        d4 = d_layer(d3, self.df*8)
        
        #obtencion de validez a traves del discriminador
        validez = Conv2D(1, kernel_size=4, strides=1, padding='same')(d4)

        return Model([imagen_A, imagen_B], validez)

    # Funcion de entrenamiento  
    def train(self, epocas, tamano_batch=1, intervalo_muestra=50):

        inicio = datetime.datetime.now()

        # Definicion de etiquetas validas y falsas para entrenar
        valido = np.ones((tamano_batch,) + self.parches_discriminador)
        falsa = np.zeros((tamano_batch,) + self.parches_discriminador)
        for epoca in range(epocas):
            for batch_i, (imagenes_A, imagenes_B) in enumerate(self.cargador_datos.carga_batch(tamano_batch)):


                # Generacion de imagen a traves de condicion B
                falsa_A = self.generador.predict(imagenes_B)

                # Entrenamiento del discriminador
                perdidas_discriminador_real = self.discriminador.train_on_batch([imagenes_A, imagenes_B], valido)
                perdidas_discriminador_falsa = self.discriminador.train_on_batch([falsa_A, imagenes_B], falsa)
                perdidas_discriminador = 0.5 * np.add(perdidas_discriminador_real, perdidas_discriminador_falsa)

                #Entrenamiento del generador
                perdidas_generador = self.combinado.train_on_batch([imagenes_A, imagenes_B], [valido, imagenes_A])

                tiempo_final = datetime.datetime.now() - inicio


                # Guardado y ploteo cada intervalo de muestra
                if batch_i % intervalo_muestra == 0:
                    print ("[epoca %d/%d] [Batch %d/%d] [perdidas discriminador: %f, precision: %3d%%] [perdidas generador: %f] tiempo: %s" % (epoca, epocas,
                                                                batch_i, self.cargador_datos.numero_batches,perdidas_discriminador[0], 100*perdidas_discriminador[1],
                                                                perdidas_generador[0],tiempo_final))
                    self.combinado.save_weights('/content/drive/My Drive/srgan/saved_model/combined.h5')
                    self.discriminador.save_weights('/content/drive/My Drive/srgan/saved_model/discriminator.h5')
                    self.generador.save_weights('/content/drive/My Drive/srgan/saved_model/generator.h5')
                    self.imagenes_muestra(epoca, batch_i)

    #Funcion de ploteo de imagenes de muestra
    def imagenes_muestra(self, epoca, batch_i):
        r, c = 3, 3

        imagenes_A, imagenes_B = self.cargador_datos.carga_datos(tamano_batch=3, is_testing=True)
        falsa_A = self.generador.predict(imagenes_B)

        imagenes_generadas = np.concatenate([imagenes_B, falsa_A, imagenes_A])

        imagenes_generadas = 0.5 * imagenes_generadas + 0.5

        titulos = ['Condicion', 'Generada', 'Original']
        fiigura, ejes = plt.subplots(r, c)
        contador = 0
        for i in range(r):
            for j in range(c):
                ejes[i,j].imshow(imagenes_generadas[contador])
                ejes[i, j].set_title(titulos[i])
                ejes[i,j].axis('off')
                contador += 1
        plt.plot()


gan = Pix2Pix()

gan.train(epocas=200, tamano_batch=5, intervalo_muestra=100)