In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory tree and print the full path of every file in it.
input_tree = os.walk('/kaggle/input')
for dirname, _, filenames in input_tree:
    for filename in filenames:
        file_path = os.path.join(dirname, filename)
        print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
from numpy import zeros,ones
from numpy.random import randint
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
# from keras.models import Input
from tensorflow.keras.layers import Input
from keras.layers import Conv2D,Conv2DTranspose,LeakyReLU,Activation,Concatenate,Dropout,BatchNormalization
from matplotlib import pyplot as plt
from tensorflow.keras.utils import plot_model

In [None]:
def define_discriminator(image_shape):
    """Build and compile the conditional discriminator.

    The model receives a source image and a target image of the same shape,
    concatenates them (Keras ``Concatenate`` default axis) and runs the result
    through a stack of 4x4 convolutions, ending in a single-channel
    sigmoid-activated map.

    Args:
        image_shape: shape tuple handed to ``Input`` for both images.

    Returns:
        A Keras ``Model`` with inputs ``[source, target]``, compiled with
        binary cross-entropy (loss weight 0.5) and Adam.
    """
    # 0.02 matches the initializer used by the encoder/decoder/generator blocks
    # in this notebook (the original 0.2 was inconsistent with them).
    init = RandomNormal(stddev = 0.02)
    in_src_image = Input(shape = image_shape)
    in_target_image = Input(shape = image_shape)
    merged = Concatenate()([in_src_image,in_target_image])

    # Four stride-2 stages; the first one must consume the merged inputs.
    d = merged
    for n_filters in (64,128,256,512):
        d = Conv2D(n_filters,(4,4),strides = (2,2),padding = 'same',kernel_initializer = init)(d)
        d = BatchNormalization()(d)
        d = LeakyReLU(alpha = 0.2)(d)

    # Second-to-last stage keeps the spatial size (default stride of 1).
    d = Conv2D(512,(4,4),padding = 'same',kernel_initializer = init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha = 0.2)(d)

    # Single-channel output squashed to (0, 1).
    d = Conv2D(1,(4,4),padding = 'same',kernel_initializer = init)(d)
    patch_out = Activation('sigmoid')(d)

    # defining the model
    model = Model([in_src_image,in_target_image],patch_out)
    # compile model; `learning_rate` replaces the deprecated `lr` keyword
    opt = Adam(learning_rate = 0.0002,beta_1 = 0.5)
    model.compile(loss = 'binary_crossentropy',optimizer = opt,loss_weights = [0.5])
    return model

In [None]:
def define_encoder_block(layer_in,n_filters,batchnorm = True):
    """Add one downsampling stage: stride-2 4x4 conv, optional batch norm, LeakyReLU.

    Args:
        layer_in: tensor the block is applied to.
        n_filters: number of convolution filters.
        batchnorm: when True, insert a ``BatchNormalization`` layer after the
            convolution (called with ``training = True``).

    Returns:
        The output tensor of the block.
    """
    init = RandomNormal(stddev = 0.02)
    # The convolution must be applied to the incoming tensor.
    g = Conv2D(n_filters,(4,4),strides = (2,2),padding = 'same',kernel_initializer = init)(layer_in)
    if batchnorm:
        g = BatchNormalization()(g,training = True)
    # leaky ReLU activation
    g = LeakyReLU(alpha = 0.2)(g)
    return g

In [None]:
def decoder_block(layer_in,skip_in,n_filters,dropout = True):
    """Add one upsampling stage with a skip connection.

    Applies a stride-2 4x4 transposed convolution and batch normalization to
    ``layer_in``, optionally applies 50% dropout, concatenates the result with
    ``skip_in`` and finishes with a ReLU.

    Args:
        layer_in: tensor the block is applied to.
        skip_in: tensor to concatenate with the upsampled output; it must be
            shape-compatible with it for ``Concatenate``.
        n_filters: number of transposed-convolution filters.
        dropout: when True, apply ``Dropout(0.5)``.

    Returns:
        The output tensor of the block.
    """
    init = RandomNormal(stddev = 0.02)
    # The transposed convolution must be applied to the incoming tensor.
    g = Conv2DTranspose(n_filters,(4,4),strides = (2,2),padding = 'same',kernel_initializer = init)(layer_in)
    g = BatchNormalization()(g,training = True)
    if dropout:
        # training = True keeps dropout active whenever the layer is called
        g = Dropout(0.5)(g,training = True)

    # merge with the skip connection, then apply ReLU
    g = Concatenate()([g,skip_in])
    g = Activation('relu')(g)
    return g

In [None]:
def define_generator(image_shape = (256,256,3)):
    """Build the encoder-decoder generator with skip connections.

    Seven encoder blocks and a stride-2 bottleneck convolution downsample the
    input; seven decoder blocks and a final stride-2 transposed convolution
    upsample it again, each decoder block being concatenated with the encoder
    output of matching depth. The output has ``image_shape[2]`` channels and a
    tanh activation.

    Args:
        image_shape: shape tuple handed to ``Input``. With eight stride-2
            downsampling stages, the default 256x256 input reaches 1x1 at the
            bottleneck.

    Returns:
        An uncompiled Keras ``Model`` mapping an input image to a generated image.
    """
    init = RandomNormal(stddev = 0.02)
    in_image = Input(shape = image_shape)

    # encoder: no batch norm on the first block
    e1 = define_encoder_block(in_image,64,batchnorm = False)
    e2 = define_encoder_block(e1,128)
    e3 = define_encoder_block(e2,256)
    e4 = define_encoder_block(e3,512)
    e5 = define_encoder_block(e4,512)
    e6 = define_encoder_block(e5,512)
    e7 = define_encoder_block(e6,512)

    # bottleneck: applied to the last encoder output
    b = Conv2D(512,(4,4),strides = (2,2),padding = 'same',kernel_initializer = init)(e7)
    b = Activation('relu')(b)

    # decoder: dropout only on the first three blocks
    d1 = decoder_block(b,e7,512)
    d2 = decoder_block(d1,e6,512)
    d3 = decoder_block(d2,e5,512)
    d4 = decoder_block(d3,e4,512,dropout = False)
    d5 = decoder_block(d4,e3,256,dropout = False)
    d6 = decoder_block(d5,e2,128,dropout = False)
    d7 = decoder_block(d6,e1,64,dropout = False)

    # output: applied to the last decoder output, one filter per image channel
    g = Conv2DTranspose(image_shape[2],(4,4),strides = (2,2),padding = 'same',kernel_initializer = init)(d7)
    out_image = Activation('tanh')(g)

    model = Model(in_image,out_image)
    return model

In [None]:
def define_gan(g_model,d_model,image_shape):
    """Build and compile the composite model used to update the generator.

    Every discriminator layer except ``BatchNormalization`` layers is marked
    non-trainable, then the generator and discriminator are chained: the source
    image feeds the generator, and the source plus the generated image feed the
    discriminator.

    Args:
        g_model: generator model.
        d_model: discriminator model taking ``[source, target]``.
        image_shape: shape tuple handed to ``Input`` for the source image.

    Returns:
        A compiled Keras ``Model`` mapping the source image to
        ``[discriminator output, generated image]``, with binary cross-entropy
        and MAE losses weighted 1 and 100.
    """
    for layer in d_model.layers:
        if not isinstance(layer,BatchNormalization):
            layer.trainable = False
    in_src = Input(shape = image_shape)
    gen_out = g_model(in_src)
    dis_out = d_model([in_src,gen_out])
    model = Model(in_src , [dis_out,gen_out])
    # `learning_rate` replaces the deprecated `lr` keyword. The rate is aligned
    # with the discriminator's 0.0002 (the original 0.002 was 10x larger).
    opt = Adam(learning_rate = 0.0002,beta_1 = 0.5)
    model.compile(loss = ['binary_crossentropy','mae'],optimizer = opt,loss_weights = [1,100])
    return model

In [None]:
def generate_real_samples(dataset,n_samples,patch_shape):
    """Draw a random batch of paired real images together with "real" labels.

    Args:
        dataset: pair ``(trainA, trainB)`` of arrays indexed along axis 0.
        n_samples: number of pairs to draw (indices are sampled with
            replacement from ``[0, trainA.shape[0])``).
        patch_shape: side length of the square label map.

    Returns:
        ``([X1, X2], y)`` where ``X1``/``X2`` are the rows of ``trainA``/``trainB``
        at the same sampled indices and ``y`` is an array of ones with shape
        ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    sources, targets = dataset
    # the same indices are used for both arrays so the pairs stay aligned
    picks = randint(0, sources.shape[0], n_samples)
    real_labels = ones((n_samples, patch_shape, patch_shape, 1))
    return [sources[picks], targets[picks]], real_labels

In [None]:
def generate_fake_samples(g_model,samples,patch_shape):
    """Run the generator on a batch and pair its output with "fake" labels.

    Args:
        g_model: object exposing ``predict(samples)``.
        samples: batch passed to ``g_model.predict``.
        patch_shape: side length of the square label map.

    Returns:
        ``(X, y)`` where ``X`` is the prediction and ``y`` is an array of zeros
        with shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    generated = g_model.predict(samples)
    n_generated = len(generated)
    fake_labels = zeros((n_generated, patch_shape, patch_shape, 1))
    return generated, fake_labels

In [None]:
def summarize_performance(step,g_model,dataset,n_samples = 3):
    """Plot a few source/generated/target triplets and save the plot and the generator.

    Draws ``n_samples`` random real pairs, generates a translation for each
    source image, and lays them out in a 3-row grid (sources, generated images,
    targets). The figure is written to ``plot_<step+1>.png`` and the generator
    to ``model_<step+1>.h5`` in the current working directory.

    Args:
        step: zero-based training step; ``step + 1`` is used in the filenames.
        g_model: generator model (needs ``predict`` and ``save``).
        dataset: pair ``(trainA, trainB)`` as expected by ``generate_real_samples``.
        n_samples: number of columns in the grid.
    """
    [X_realA,X_realB],_ = generate_real_samples(dataset,n_samples,1)
    X_fakeB, _ = generate_fake_samples(g_model,X_realA,1)
    # Map pixel values from [-1, 1] to [0, 1] for imshow; the generator ends in
    # tanh, and the dataset is assumed to be scaled the same way (TODO confirm).
    X_realA = (X_realA+1)/2.0
    X_realB = (X_realB+1)/2.0
    X_fakeB = (X_fakeB + 1)/2.0

    # row 0: real source images, row 1: generated images, row 2: real target images
    for row, images in enumerate((X_realA,X_fakeB,X_realB)):
        for i in range(n_samples):
            plt.subplot(3,n_samples,1 + n_samples*row + i)
            plt.axis('off')
            plt.imshow(images[i])

    # save the plot to file, then release the figure so repeated calls do not
    # keep drawing into (and accumulating) the same figure
    plot_filename = 'plot_%06d.png' % (step+1)
    plt.savefig(plot_filename)
    plt.close()

    # save the generator under a step-stamped name so checkpoints are not overwritten
    model_filename = 'model_%06d.h5' % (step+1)
    g_model.save(model_filename)
    print('saved files')

Now train the pix2pix model

In [None]:
def train(d_model,g_model,gan_model,dataset,n_epochs = 100,n_batch = 1):
    """Alternate discriminator and generator updates for ``n_epochs`` epochs.

    Each step draws a random real batch, generates the matching fake batch,
    updates the discriminator on both, then updates the generator through the
    composite model. Every 10 epochs' worth of steps (including the very last
    step when ``n_epochs`` is a multiple of 10) ``summarize_performance`` is
    called to save a sample plot and the generator.

    Args:
        d_model: compiled discriminator; ``output_shape[1]`` gives the label
            map side length.
        g_model: generator model.
        gan_model: compiled composite model returned by ``define_gan``.
        dataset: pair ``(trainA, trainB)`` of image arrays.
        n_epochs: number of passes, each made of ``len(trainA) // n_batch`` steps.
        n_batch: number of samples per step.
    """
    n_patch = d_model.output_shape[1]
    trainA, _ = dataset
    # number of steps per epoch (truncated to an integer)
    bat_per_epo = int(len(trainA)/n_batch)

    n_steps = bat_per_epo * n_epochs
    summary_interval = bat_per_epo*10
    for i in range(n_steps):
        [X_realA,X_realB] , y_real = generate_real_samples(dataset,n_batch,n_patch)
        # generate a batch of fake samples
        X_fakeB,y_fake = generate_fake_samples(g_model,X_realA,n_patch)

        # update discriminator on real pairs, then on generated pairs
        # (train_on_batch returns the loss values; they are not used here)
        d_model.train_on_batch([X_realA,X_realB],y_real)
        d_model.train_on_batch([X_realA,X_fakeB],y_fake)

        # update the generator through the composite model
        gan_model.train_on_batch(X_realA,[y_real,X_realB])

        # (i+1) so the checkpoint fires after each block of 10 epochs has
        # completed, rather than at step 0 before any meaningful training
        if (i+1) % summary_interval == 0:
            summarize_performance(i,g_model,dataset)