In [2]:
import os
from os import listdir

import numpy as np
import cv2
import keras
import tensorflow as tf
import h5py

from numpy import load
from numpy import zeros
from numpy import ones
from numpy import expand_dims
from numpy.random import randint
from numpy import vstack
from numpy import asarray
from numpy import savez_compressed

from matplotlib import pyplot as plt

from keras import Sequential
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.models import load_model
from keras.optimizers import Adam
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import ReLU

# Data Preprocessing

In [3]:
def load_images(path, size=(256,512)):
    """Load every paired image in a directory and split it into source/target halves.

    Each file holds two pictures side by side. The file is resized to ``size``
    on load and then cut down the middle of its width: the left half becomes
    the source image, the right half the target image.

    Args:
        path: directory holding the paired images (with or without a trailing
            separator).
        size: (height, width) each file is resized to. With the default
            (256, 512) each half is 256*256.

    Returns:
        A two-element list ``[src_images, tar_images]`` of stacked arrays, in
        sorted file-name order.
    """
    # Derive the split column from `size` so non-default sizes are cut in the middle too
    half_width = size[1] // 2

    src_list, tar_list = [], []

    # listdir() returns entries in arbitrary order; sorting makes the dataset reproducible
    for filename in sorted(listdir(path)):
        # load and resize the image
        pixels = load_img(os.path.join(path, filename), target_size=size)
        # convert to numpy array
        pixels = img_to_array(pixels)
        # split into source (left half) and target (right half)
        src_list.append(pixels[:, :half_width])
        tar_list.append(pixels[:, half_width:])
        print(filename)

    print('Done')
    return [asarray(src_list), asarray(tar_list)]

def compress_images(src_images, tar_images, path_dest):
    """Store the source and target image arrays together in one compressed .npz file.

    Args:
        src_images: source image array.
        tar_images: target image array.
        path_dest: path of the .npz file to write.

    Both arrays are handed over positionally, so the archive stores them under
    numpy's default keys 'arr_0' (source) and 'arr_1' (target).
    """
    shapes = (src_images.shape, tar_images.shape)
    print("Loaded: ", *shapes)

    arrays = (src_images, tar_images)
    savez_compressed(path_dest, *arrays)

    print('Compressed and saved successfully!')

def plot_figures(src_images, tar_images, plot_dest, n_samples=3):
    """Save a two-row preview of the dataset: source images on top, targets below.

    Args:
        src_images: source image array, indexed along its first axis.
        tar_images: target image array, indexed along its first axis.
        plot_dest: path the figure is saved to.
        n_samples: number of leading samples to plot. If fewer images are
            available, only the available ones are plotted.
    """
    # Clamp so a small dataset does not raise IndexError
    n_cols = min(n_samples, len(src_images), len(tar_images))

    # A dedicated figure keeps this plot from being drawn on top of whatever
    # pyplot figure happened to be current
    fig = plt.figure()

    # Top row: source images
    for i in range(n_cols):
        ax = fig.add_subplot(2, n_cols, 1 + i)
        ax.axis('off')
        ax.imshow(src_images[i].astype('uint8'))

    # Bottom row: the matching target images
    for i in range(n_cols):
        ax = fig.add_subplot(2, n_cols, 1 + n_cols + i)
        ax.axis('off')
        ax.imshow(tar_images[i].astype('uint8'))

    fig.savefig(plot_dest)

    print('Images saved successfully')

def convert_to_hdf5(src_images, tar_images, path_dest):
    """Write the source and target image arrays into an HDF5 file.

    Args:
        src_images: source image array.
        tar_images: target image array.
        path_dest: path of the .hdf5 file to create (opened in 'w' mode, so an
            existing file is overwritten).

    Both arrays are stored together in a single dataset named 'dataset_1'.
    NOTE(review): the two arrays are passed as one list, so they presumably
    have to share a shape - confirm before using with unequal arrays.
    """
    print('Loaded: ', src_images.shape, tar_images.shape)

    # The context manager releases the file handle even if create_dataset raises
    with h5py.File(path_dest, 'w') as h5f:
        h5f.create_dataset('dataset_1', data=[src_images, tar_images])


In [None]:
# USAGE: turn the raw maps training images into a compressed dataset plus a preview plot

# Directory holding the paired (side-by-side) training images
path_src = './assets/datasets/maps/train/'
# Output path of the compressed .npz dataset
path_dest = 'file.npz'
# Output path of the preview figure
plot_dest = 'img.png'

# Load once, then reuse the same arrays for compression and for the preview
[src_images, tar_images] = load_images(path_src)
compress_images(src_images, tar_images, path_dest)
plot_figures(src_images, tar_images, plot_dest)

# Generator

In [None]:
def encoder(ip_layer, n_filters, batchNorm=True):
    """Build one encoder stage: Conv2D -> (BatchNormalization) -> LeakyReLU.

    Args:
        ip_layer: tensor fed into the convolution.
        n_filters: number of filters of the convolution.
        batchNorm: when True, batch normalisation is inserted after the
            convolution. generator() switches it off only for its first
            stage (C64).

    Returns:
        The output tensor of the LeakyReLU activation (slope 0.2).
    """
    # Gaussian weight initialisation: mean 0, standard deviation 0.02
    weight_init = RandomNormal(stddev=0.02)

    # Every stage uses a 4*4 kernel with a 2*2 stride
    conv = Conv2D(n_filters, (4, 4), strides=(2, 2), padding='same', kernel_initializer=weight_init)
    features = conv(ip_layer)

    if batchNorm:
        # training=True is passed explicitly on the call
        normalise = BatchNormalization()
        features = normalise(features, training=True)

    activation = LeakyReLU(alpha=0.2)
    return activation(features)

def decoder(ip_layer, skip_connection, n_filters, dropout=True):
    """Build one decoder stage with its skip connection.

    The stage applies Conv2DTranspose -> BatchNormalization -> (Dropout),
    concatenates the result with ``skip_connection`` and finishes with a ReLU.

    Args:
        ip_layer: tensor fed into the transposed convolution.
        skip_connection: encoder tensor that is concatenated onto this stage.
        n_filters: number of filters of the transposed convolution (i.e.
            before the concatenation adds the skip channels).
        dropout: when True, Dropout(0.5) follows the batch normalisation.

    Returns:
        The output tensor of the ReLU activation.
    """
    # Gaussian weight initialisation: mean 0, standard deviation 0.02
    weight_init = RandomNormal(stddev=0.02)

    # Every stage uses a 4*4 kernel with a 2*2 stride
    upsample = Conv2DTranspose(n_filters, (4, 4), strides=(2, 2), padding='same', kernel_initializer=weight_init)
    features = upsample(ip_layer)

    # training=True is passed explicitly to both batch norm and dropout
    features = BatchNormalization()(features, training=True)
    if dropout:
        features = Dropout(0.5)(features, training=True)

    # Skip connection first, activation afterwards
    merged = Concatenate()([features, skip_connection])

    # ReLUs in the decoder are not leaky - Ref. 6.1.1.
    return Activation('relu')(merged)

def generator(image_size = (256, 256, 3)):
    """Assemble the U-Net generator.

    Seven encoder stages feed a bottleneck convolution; seven decoder stages
    follow, each one concatenated with the encoder stage of matching depth
    (last encoder output goes to the first decoder stage, and so on). A final
    transposed convolution with tanh maps to 3 output channels.

    Args:
        image_size: shape of the input image.

    Returns:
        An uncompiled Keras Model mapping the input image to the output image.
    """
    # Gaussian weight initialisation: mean 0, standard deviation 0.02
    kernel_init = RandomNormal(stddev = 0.02)

    ip_image = Input(shape = image_size)

    # Layers built directly in this function use a 4*4 kernel and a 2*2 stride
    kernel_size = (4, 4)
    stride = (2, 2)

    # Encoder: C64-C128-C256-C512-C512-C512-C512, no batch norm on the first stage
    encoder_filters = (64, 128, 256, 512, 512, 512, 512)
    skips = []
    features = ip_image
    for depth, n_filters in enumerate(encoder_filters):
        features = encoder(features, n_filters, batchNorm=(depth > 0))
        skips.append(features)

    # Bottleneck layer, connecting encoder and decoder
    features = Conv2D(512, kernel_size, strides=stride, padding='same', kernel_initializer=kernel_init, activation='relu')(features)

    # Decoder: (filters, dropout) per stage; only the first three stages use dropout
    decoder_plan = (
        (512, True),
        (512, True),
        (512, True),
        (512, False),
        (256, False),
        (128, False),
        (64, False),
    )
    # Deepest encoder output pairs with the first decoder stage
    for (n_filters, use_dropout), skip in zip(decoder_plan, reversed(skips)):
        features = decoder(features, skip, n_filters, dropout=use_dropout)

    # Convolution is applied to map to the number of output channels, followed by a tanh function - Ref. 6.1.1.
    op_image = Conv2DTranspose(3, kernel_size, strides=stride, padding='same', kernel_initializer=kernel_init, activation='tanh')(features)

    return Model(ip_image, op_image)

In [None]:
# Build the generator with its default 256*256*3 input shape and print its layer summary
model = generator()
model.summary()

# Discriminator

In [None]:
def discriminator(image_shape=(256, 256, 3)):
    """Build and compile the patch discriminator.

    The model takes the source image and a target image, concatenates them
    channel-wise and runs them through:
        * C64 (no batch norm), then C128-C256-C512, all with stride 2*2;
        * one more 512-filter Convolution-BatchNorm-LeakyReLU stage with the
          default stride;
        * a 1-filter convolution followed by a sigmoid, giving one score per
          output patch.
    The original notes quote a 70*70 receptive field for this design
    (Ref 6.1.2.) - not re-derived here.

    Args:
        image_shape: shape of each of the two input images. By default 256*256*3.

    Returns:
        A Keras Model compiled with binary cross-entropy (loss weight 0.5) and
        Adam(lr=0.0002, beta_1=0.5).
    """
    # Gaussian weight initialisation: mean 0, standard deviation 0.02 (Ref 6.2.)
    weight_init = RandomNormal(stddev=0.02)

    # Two inputs: the source image and the (real or generated) target image
    src_image_inp = Input(shape=image_shape)
    target_image_inp = Input(shape=image_shape)

    # concatenate images channel-wise
    features = Concatenate()([src_image_inp, target_image_inp])

    kernel = (4, 4)
    downsample = (2, 2)

    # First layer : C64, without batch normalization
    features = Conv2D(64, kernel, strides=downsample, padding='same', kernel_initializer=weight_init)(features)
    features = LeakyReLU(alpha=0.2)(features)

    # Subsequent layers : C128-C256-C512 (Convolution-BatchNorm-LeakyReLU, slope 0.2)
    for n_filters in (128, 256, 512):
        features = Conv2D(n_filters, kernel, strides=downsample, padding='same', kernel_initializer=weight_init)(features)
        features = BatchNormalization()(features)
        features = LeakyReLU(alpha=0.2)(features)

    # Extra 512-filter stage that keeps the default stride
    features = Conv2D(512, kernel, padding='same', kernel_initializer=weight_init)(features)
    features = BatchNormalization()(features)
    features = LeakyReLU(alpha=0.2)(features)

    # After the last layer a convolution maps to a 1-channel output
    features = Conv2D(1, kernel, padding='same', kernel_initializer=weight_init)(features)
    patch_out = Activation('sigmoid')(features)

    # define and compile the model
    model = Model([src_image_inp, target_image_inp], patch_out)
    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(lr=0.0002, beta_1=0.5),
        loss_weights=[0.5],
    )
    return model

In [None]:
# Build the discriminator with its default input shape and show its layers.
# summary() prints the table itself and returns None, so it is not wrapped in
# print() - that only appended a stray "None" line to the output.
model = discriminator()
model.summary()

# Composite Model - GAN

In [None]:
def GAN(g_model, d_model, input_shape=(256, 256, 3)):
    """Chain generator and discriminator into the composite model used to train the generator.

    The composite takes a source image, runs it through the generator and
    feeds (source image, generated image) to the discriminator.

    Args:
        g_model: generator model.
        d_model: discriminator model. Its ``trainable`` attribute is set to
            False here (the passed object is modified).
        input_shape: shape of the source image.

    Returns:
        A compiled Keras Model with two outputs, [discriminator output,
        generated image], trained with
        loss = binary_crossentropy * 1 + mae * 100.
    """
    # The discriminator must stay fixed while the composite updates the generator
    d_model.trainable = False

    ip_image = Input(shape=input_shape)
    generated = g_model(ip_image)
    validity = d_model([ip_image, generated])

    composite = Model(ip_image, [validity, generated])

    # loss = Adversarial_loss + lambda * L1_loss, with lambda = 100;
    # optimizer parameters specified in 3.3.
    composite.compile(
        loss=['binary_crossentropy', 'mae'],
        optimizer=Adam(lr=0.0002, beta_1=0.5, beta_2=0.999),
        loss_weights=[1, 100],
    )

    return composite


In [None]:
# Build both sub-models with their default input shape, wire them into the
# composite model and print the composite's layer summary
g_model = generator()
d_model = discriminator()
GAN_model = GAN(g_model, d_model)
GAN_model.summary()

## Utility functions

In [None]:
def generate_random_training_samples(data, n_samples, n_patch):
    """Draw a random batch of real (source, target) pairs together with 'real' labels.

    Args:
        data: pair ``(source_images, target_images)`` of arrays indexed along
            their first axis.
        n_samples: number of pairs to draw.
        n_patch: side length of the label map (16 in our case).

    Returns:
        ``[X1, X2], y`` where X1/X2 are the selected source/target images and
        y is an array of ones of shape (n_samples, n_patch, n_patch, 1).
    """
    sources, targets = data

    # One shared index vector keeps every source aligned with its own target
    picks = randint(0, sources.shape[0], n_samples)

    # Real samples are labelled with ones
    real_labels = ones((n_samples, n_patch, n_patch, 1))

    return [sources[picks], targets[picks]], real_labels

def generate_fake_samples(generator_model, samples, n_patch):
    """Run the generator on a batch and pair its output with 'fake' labels.

    Args:
        generator_model: model whose ``predict`` produces the fake images.
        samples: batch of source images passed to ``predict``.
        n_patch: side length of the label map (16 in our case).

    Returns:
        ``X, y`` where X is the generated batch and y is an array of zeros of
        shape (len(X), n_patch, n_patch, 1).
    """
    # No shape print here: this runs once per training step and the debug
    # output only flooded the notebook
    X = generator_model.predict(samples)

    # Fake samples are labelled with zeros
    y = zeros((len(X), n_patch, n_patch, 1))
    return X, y

def load_real_samples(filename):
    """Load the compressed dataset and rescale its pixel values for training.

    Args:
        filename: path of the .npz file; the source images are read from key
            'arr_0' and the target images from key 'arr_1'.

    Returns:
        A two-element list ``[X1, X2]`` with both arrays rescaled by
        (x - 127.5) / 127.5, i.e. from [0, 255] to [-1, 1].
    """
    # The archive keeps its file open until closed, so read both arrays inside
    # a with-block instead of leaving the handle dangling
    with load(filename) as data:
        X1, X2 = data['arr_0'], data['arr_1']

    # scale from [0,255] to [-1,1]
    X1 = (X1 - 127.5) / 127.5
    X2 = (X2 - 127.5) / 127.5

    return [X1, X2]

def save_model(step, g_model, d_model, gan_model, model_dest):
    """Save generator, discriminator and composite model as .h5 checkpoints.

    Args:
        step: zero-based training step that has just finished. The files are
            numbered ``step + 1`` (zero-padded to six digits), i.e. the number
            of completed steps.
        g_model, d_model, gan_model: models to save; each must offer ``save(path)``.
        model_dest: directory the checkpoints are written to. It is created if
            it does not exist yet.

    Files written: model_g_<n>.h5, model_d_<n>.h5 and model_gan_<n>.h5.
    """
    checkpoint_id = step + 1

    # An empty destination means "current directory"; makedirs('') would raise
    if model_dest:
        os.makedirs(model_dest, exist_ok=True)

    for tag, model in (('g', g_model), ('d', d_model), ('gan', gan_model)):
        filename = os.path.join(model_dest, 'model_%s_%06d.h5' % (tag, checkpoint_id))
        model.save(filename)

    # Report the same number that is used in the file names
    print('Models successfully saved at step: %d' % checkpoint_id)

# Training

In [None]:
def _random_jitter(source_batch, target_batch, pad=30):
    """Enlarge both batches by `pad` pixels per dimension and crop them back with one shared random window."""
    n_images, height, width, channels = source_batch.shape
    enlarged = [height + pad, width + pad]  # 256*256 -> 286*286 for the default dataset

    source_up = tf.image.resize(source_batch, enlarged, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    target_up = tf.image.resize(target_batch, enlarged, method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    # Stacking first means a single random crop cuts the same window out of
    # source and target, so the pair stays aligned
    stacked = tf.stack([source_up, target_up], axis=0)
    cropped = tf.image.random_crop(stacked, size=[2, n_images, height, width, channels])

    # convert from tensor back to numpy
    return keras.backend.eval(cropped[0]), keras.backend.eval(cropped[1])


def train(discriminator_model, generator_model, gan_model, data, model_dest, n_epochs=200, n_batch=1, n_patch=16, random_jitter=False, current_step=0):
    """Run the alternating discriminator / generator training loop.

    Args:
        discriminator_model: compiled discriminator model.
        generator_model: generator model.
        gan_model: compiled composite model (generator followed by discriminator).
        data: dataset as a pair of image arrays (source images, target images).
        model_dest: destination for the saved checkpoints.
        n_epochs: number of epochs.
        n_batch: batch size.
        n_patch: side length of the discriminator's output map.
        random_jitter: when true, each real batch is enlarged by 30 pixels per
            dimension and randomly cropped back to its original size.
        current_step: first step to execute; use the number of already
            completed steps when resuming from a checkpoint.
    """
    train_A, train_B = data

    # total number of steps required in training
    batches_per_epoch = len(train_A) // n_batch
    n_steps = batches_per_epoch * n_epochs

    print(n_steps, batches_per_epoch)

    for i in range(current_step, n_steps):

        # Get a batch of real images
        [X_real_A, X_real_B], y_real = generate_random_training_samples(data, n_batch, n_patch)

        # The crop size follows the actual batch, so jitter also works for
        # n_batch > 1 and for image sizes other than 256*256*3
        if random_jitter:
            X_real_A, X_real_B = _random_jitter(X_real_A, X_real_B)

        # Generate a batch of fake images
        X_fake, y_fake = generate_fake_samples(generator_model, X_real_A, n_patch)

        # Update the discriminator on the real pair, then on the generated pair
        discriminator_loss_real = discriminator_model.train_on_batch([X_real_A, X_real_B], y_real)
        discriminator_loss_generated = discriminator_model.train_on_batch([X_real_A, X_fake], y_fake)

        # Update the generator through the composite model; the first returned
        # value is the total loss, the two per-output losses are not reported
        generator_loss, _, _ = gan_model.train_on_batch(X_real_A, [y_real, X_real_B])

        print('%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, discriminator_loss_real, discriminator_loss_generated, generator_loss))

        # Save model state every 10 epochs
        if (i+1) % (batches_per_epoch * 10) == 0:
            save_model(i, generator_model, discriminator_model, gan_model, model_dest)

In [None]:
def start_training(dataset_url, model_dest):
    """Load the dataset, build fresh models and train them from step 0.

    Args:
        dataset_url: path to the compressed dataset.
        model_dest: destination path to save models.
    """
    dataset = load_real_samples(dataset_url)
    sources, targets = dataset
    print('Dataset Loaded', sources.shape, targets.shape)

    # The per-image shape (everything after the sample axis) sizes all three models
    image_shape = sources.shape[1:]

    d_model = discriminator(image_shape)
    g_model = generator(image_shape)
    # composite model used to update the generator
    gan_model = GAN(g_model, d_model, image_shape)

    train(d_model, g_model, gan_model, dataset, model_dest)

def resume_training(step, dataset_url, d_model_src, g_model_src, gan_model_src, model_dest):
    """Resume training from saved generator and discriminator checkpoints.

    Args:
        step: step from which training has to be resumed.
        dataset_url: path to the compressed dataset.
        d_model_src: path of the saved discriminator.
        g_model_src: path of the saved generator.
        gan_model_src: path of the saved composite model. Kept for backward
            compatibility but no longer loaded - see below.
        model_dest: destination path to save models.

    Every load_model() call builds an independent model, so a composite loaded
    from its own file shares no weights with the generator and discriminator
    loaded next to it: training it would never change ``g_model``. The
    composite is therefore rebuilt around the loaded models, exactly as
    start_training() does.
    NOTE(review): this restarts the composite model's optimizer state.
    """
    d_model = load_model(d_model_src)
    g_model = load_model(g_model_src)

    dataset = load_real_samples(dataset_url)
    # per-image shape, everything after the sample axis
    image_shape = dataset[0].shape[1:]

    # Rebuild the composite so it trains the very generator that train() samples from and saves
    gan_model = GAN(g_model, d_model, image_shape)

    train(d_model, g_model, gan_model, dataset, model_dest, current_step=step)

In [None]:
# USAGE

# Call to start training
dataset_url = './assets/datasets/maps/compressed/maps_256.npz'
model_dest = './models/maps/'
start_training(dataset_url, model_dest)

# Call to resume training from step 153440 (140 epochs on the maps dataset)
step = 153440
# save_model() zero-pads the step to six digits in the checkpoint names, so the
# names are built the same way here (str(step) would miss smaller steps)
d_model_src = model_dest + 'model_d_%06d.h5' % step
g_model_src = model_dest + 'model_g_%06d.h5' % step
gan_model_src = model_dest + 'model_gan_%06d.h5' % step
resume_training(step, dataset_url, d_model_src, g_model_src, gan_model_src, model_dest)

# Results

In [5]:
def plot_images(src_img, gen_img, tar_img, dest):
    """Save a one-row figure comparing source, generated and expected image.

    Args:
        src_img: source image batch holding exactly one image, scaled to [-1, 1].
        gen_img: generated image batch holding exactly one image, scaled to [-1, 1].
        tar_img: expected image batch holding exactly one image, scaled to [-1, 1].
        dest: path the figure is saved to.

    Raises:
        ValueError: if the three inputs do not stack to exactly three images.
    """
    images = vstack((src_img, gen_img, tar_img))
    titles = ['Source', 'Generated', 'Expected']

    # Three panels, three titles: reject larger batches up front instead of
    # failing halfway through the plotting loop
    if len(images) != len(titles):
        raise ValueError('plot_images expects one image per input, got %d images in total' % len(images))

    # scaling from [-1,1] to [0,1]
    images = (images + 1) / 2.0

    fig = plt.figure(figsize=(10, 20))

    for position, (image, title) in enumerate(zip(images, titles), start=1):
        ax = fig.add_subplot(1, 3, position)
        ax.axis('off')
        ax.imshow(image)
        ax.set_title(title)

    # Save first, report afterwards, so the message is only shown on success
    fig.savefig(dest)
    print('Figure saved successfully.')
 
def generate_prediction(models, dataset, dest):
    """Pick one random sample from the dataset and plot every model's prediction for it.

    Args:
        models: sequence of models used for prediction.
        dataset: loaded dataset as a pair ``[source_images, target_images]``.
        dest: path prefix the plots are written under; the file name is
            appended directly, so a directory needs its trailing separator.

    With a single model the plot is saved as ``<dest>train_<index>.jpg``. With
    several models ``_model<k>`` is added before the extension so the plots do
    not overwrite each other.
    """
    [X1, X2] = dataset

    # A plain int keeps the file name clean ('train_12.jpg', not 'train_[12].jpg')
    ix = int(randint(0, len(X1)))

    # Slicing (rather than indexing) keeps the leading batch axis predict() expects
    src_image, tar_image = X1[ix:ix + 1], X2[ix:ix + 1]

    # generate an image for every model
    for position, model in enumerate(models):
        suffix = '' if len(models) == 1 else '_model%d' % position
        out_path = dest + 'train_' + str(ix) + suffix + '.jpg'

        gen_image = model.predict(src_image)
        plot_images(src_image, gen_image, tar_image, out_path)

def prediction(model, img_url, size=(256, 512)):
    """Generate the model's output for the source half of one paired image file.

    Args:
        model: model whose ``predict`` is applied to the source image.
        img_url: path to the paired image (source on the left, target on the right).
        size: (height, width) the file is resized to on load; it is split in
            the middle of its width.

    Returns:
        ``(generated, target)``: the generated image rescaled with
        (gen + 1) / 2, and the target half exactly as loaded (not rescaled).
    """
    # load image and convert to numpy array
    pix = load_img(img_url, target_size=size)
    pix = img_to_array(pix)

    # Split at half the requested width so non-default sizes are handled too
    half_width = size[1] // 2
    s_img, t_img = pix[:, :half_width], pix[:, half_width:]

    # scale from [0,255] to [-1,1] and add the batch axis
    s = (s_img - 127.5) / 127.5
    s = expand_dims(s, 0)

    # generate an image
    gen = model.predict(s)
    gen = (gen + 1) / 2

    return gen[0], t_img

def load_and_plot(img_url, models, dest, size=(256, 512)):
    """Plot source / generated / expected image for several models, one row per model.

    Used to compare the images generated by models trained for a different
    number of epochs.

    Args:
        img_url: path to the paired image (source on the left, target on the right).
        models: sequence of models used for image generation.
        dest: path the figure is saved to.
        size: (height, width) the file is resized to on load; it is split in
            the middle of its width.
    """
    # load and convert the image into numpy array
    pix = load_img(img_url, target_size=size)
    pix = img_to_array(pix)

    # Split at half the requested width so non-default sizes are handled too
    half_width = size[1] // 2
    s_img, t_img = pix[:, :half_width], pix[:, half_width:]

    # scale from [0,255] to [-1,1] and add the batch axis
    # (the original referenced an undefined name `sat` here)
    s = (s_img - 127.5) / 127.5
    s = expand_dims(s, 0)

    fig = plt.figure(figsize=(10, 20))
    n_rows = len(models)

    for row, model in enumerate(models):
        gen = model.predict(s)
        gen = (gen + 1) / 2

        panels = (
            ('Source', s_img.astype('uint8')),
            ('Generated', gen[0]),
            ('Expected', t_img.astype('uint8')),
        )
        for col, (title, image) in enumerate(panels, start=1):
            ax = fig.add_subplot(n_rows, 3, row * 3 + col)
            ax.set_title(title)
            ax.axis('off')
            ax.imshow(image)

    # Save first, report afterwards, so the message is only shown on success
    fig.savefig(dest)

    print('Figure saved successfully at destination: %s' % dest)

In [None]:
# USAGE

# Test on the maps validation set, comparing two generator checkpoints
model_1 = load_model('./models/maps/model_g_153440.h5')
model_2 = load_model('./models/maps/model_g_021920.h5')

for i in range(10):
    generate_random_map_image = randint(1, 1099)
    img_url = './assets/datasets/maps/val/' + str(generate_random_map_image) + '.jpg'
    dest_val = './assets/plots/maps/maps_' + str(generate_random_map_image) + '_val.jpg'

    load_and_plot(img_url, models=[model_2, model_1], dest=dest_val)

# Test on a random image of the maps training set
dest_dir = './assets/plots/maps/'
dataset = load_real_samples('./assets/datasets/maps/compressed/maps_256.npz')
# This cell never defines `model`; use the later maps checkpoint loaded above
# instead of whatever an earlier cell happened to leave under that name
generate_prediction(models=[model_1], dataset=dataset, dest=dest_dir)

# Test on the cityscapes validation set, comparing two generator checkpoints
model_1 = load_model('./models/cityscapes/model_g_059500.h5')
model_2 = load_model('./models/cityscapes/model_g_297500.h5')

for i in range(10):
    generate_random_map_image = randint(1, 501)
    img_url = './assets/datasets/cityscapes/val/' + str(generate_random_map_image) + '.jpg'
    dest_val_1 = './assets/plots/cityscapes/cityscapes_' + str(generate_random_map_image) + '_val.jpg'

    load_and_plot(img_url, models=[model_1, model_2], dest=dest_val_1)

# Test on a random image of the cityscapes training set
dest_dir = './assets/plots/cityscapes/'
dataset = load_real_samples('./assets/datasets/cityscapes/compressed/cityscapes_256.npz')
# Same fix as above: use the later cityscapes checkpoint rather than the undefined `model`
generate_prediction(models=[model_2], dataset=dataset, dest=dest_dir)
