In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns 
%matplotlib inline
import tensorflow  as tf
from tensorflow import keras
#import cupy as cp

from sklearn.preprocessing import normalize

In [None]:
from __future__ import print_function, division
import tensorflow as tf

from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, multiply, Concatenate
from keras.layers import BatchNormalization, Activation, Embedding, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam, SGD
from keras.preprocessing.image import img_to_array

import matplotlib.pyplot as plt

import numpy as np

import sys
import os
import cv2
from google.colab.patches import cv2_imshow
from PIL import Image
from sklearn.preprocessing import LabelBinarizer
from skimage import filters

import imutils
from sklearn.utils import shuffle

# load, split and scale the maps dataset ready for training
from os import listdir
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed

from numpy import load
from numpy import zeros
from numpy import ones
from numpy.random import randint
from keras.optimizers import Adam, SGD
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from matplotlib import pyplot
from keras.losses import BinaryCrossentropy, binary_crossentropy

In [None]:
# define the discriminator model
def define_discriminator(image_shape_d=(256,256,1), image_shape_src=None):
    """Build and compile the conditional patch discriminator.

    The model takes a source image and a target image, concatenates them
    channel-wise and runs them through four stride-2 4x4 convolutions
    (64, 128, 256, 512 filters) followed by a single-filter 4x4 convolution
    with stride 1. No activation is applied to the final layer, so the model
    outputs raw per-patch scores and is trained with a mean-squared-error loss.

    Args:
        image_shape_d: shape (height, width, channels) of the target image input.
        image_shape_src: shape (height, width, channels) of the source image
            input. When omitted, the function falls back to a module-level
            variable named ``image_shape`` if one exists (this is what the
            original implementation read implicitly), and otherwise to
            ``image_shape_d``.

    Returns:
        A compiled Keras ``Model`` with inputs ``[source, target]`` and the
        patch score map as output.
    """
    if image_shape_src is None:
        # Legacy behaviour: the source shape used to come from the notebook
        # global `image_shape`. Keep honouring it so existing callers get the
        # same model, but no longer crash with NameError when it is undefined.
        image_shape_src = globals().get('image_shape', image_shape_d)

    # weight initialization shared by every convolution
    init = RandomNormal(stddev=0.02)
    # source image input
    in_src_image = Input(shape=image_shape_src)
    # target image input
    in_target_image = Input(shape=image_shape_d)
    # concatenate images channel-wise
    merged = Concatenate()([in_src_image, in_target_image])

    # C64: first block has no batch normalization
    d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
    d = LeakyReLU(alpha=0.2)(d)
    # C128, C256, C512: conv -> batch norm -> leaky relu
    for n_filters in (128, 256, 512):
        d = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
        d = BatchNormalization(momentum=0.8)(d)
        d = LeakyReLU(alpha=0.2)(d)

    # patch output: one raw score per patch, no sigmoid
    d = Conv2D(1, (4,4), strides=1, padding='same', kernel_initializer=init)(d)

    # define model
    model = Model([in_src_image, in_target_image], d)
    # compile model; the 0.5 loss weight halves the discriminator loss
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss='mse', optimizer=opt, loss_weights=[0.5])
    return model

In [None]:
# define an encoder block
def define_encoder_block(layer_in, n_filters, batchnorm=True):
    """Downsampling block: 4x4 stride-2 convolution, optional batch norm, LeakyReLU.

    Args:
        layer_in: tensor the block is applied to.
        n_filters: number of convolution filters.
        batchnorm: when true, a BatchNormalization layer (momentum 0.8, called
            with ``training=True``) is inserted after the convolution.

    Returns:
        The output tensor of the LeakyReLU(0.2) activation.
    """
    downsample = Conv2D(
        n_filters,
        (4,4),
        strides=(2,2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    features = downsample(layer_in)
    if batchnorm:
        # normalization is forced into training mode
        features = BatchNormalization(momentum=0.8)(features, training=True)
    return LeakyReLU(alpha=0.2)(features)

# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    """Upsampling block with a skip connection.

    Applies a 4x4 stride-2 transposed convolution, batch normalization and,
    optionally, dropout; the result is concatenated with ``skip_in`` and passed
    through a ReLU.

    Args:
        layer_in: tensor to upsample.
        skip_in: tensor concatenated with the upsampled features.
        n_filters: number of transposed-convolution filters.
        dropout: when true, Dropout(0.5) is applied, called with ``training=True``.

    Returns:
        The ReLU-activated concatenation of the upsampled features and ``skip_in``.
    """
    upsample = Conv2DTranspose(
        n_filters,
        (4,4),
        strides=(2,2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    features = upsample(layer_in)
    features = BatchNormalization(momentum=0.8)(features, training=True)
    if dropout:
        features = Dropout(0.5)(features, training=True)
    # merge with the skip connection before the activation
    joined = Concatenate()([features, skip_in])
    return Activation('relu')(joined)

In [None]:
# define the standalone generator model
def define_generator(image_shape=(256,256,1)):
    """Build the standalone (uncompiled) U-Net style generator.

    Seven encoder blocks feed a stride-2 bottleneck convolution; seven decoder
    blocks then upsample, each concatenating the output of the mirrored encoder
    block. A final stride-2 transposed convolution with one filter followed by
    tanh produces the output image.

    Args:
        image_shape: shape (height, width, channels) of the input image.

    Returns:
        A Keras ``Model`` mapping the input image to the tanh-activated output.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)

    # encoder: only the first block skips batch normalization
    skips = []
    features = in_image
    for depth, n_filters in enumerate((64, 128, 256, 512, 512, 512, 512)):
        features = define_encoder_block(features, n_filters, batchnorm=depth > 0)
        skips.append(features)

    # bottleneck, no batch norm and relu
    features = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(features)
    features = Activation('relu')(features)

    # decoder: (filters, dropout) per block, paired with encoder outputs deepest-first
    decoder_plan = (
        (512, True),
        (512, True),
        (512, True),
        (512, False),
        (256, False),
        (128, False),
        (64, False),
    )
    for (n_filters, use_dropout), skip in zip(decoder_plan, reversed(skips)):
        features = decoder_block(features, skip, n_filters, dropout=use_dropout)

    # output: single-channel image squashed by tanh
    upsampled = Conv2DTranspose(1, (4,4), padding='same', strides=(2,2), kernel_initializer=init)(features)
    out_image = Activation('tanh')(upsampled)
    return Model(in_image, out_image)

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model, image_shape):
    """Build and compile the composite model used to update the generator.

    The discriminator is marked non-trainable, then wired behind the generator:
    the source image goes through ``g_model`` and the pair (source, generated)
    goes through ``d_model``.

    Args:
        g_model: generator model.
        d_model: discriminator model; its ``trainable`` flag is set to False here.
        image_shape: shape of the source image input.

    Returns:
        A compiled Keras ``Model`` with the source image as input and
        ``[discriminator output, generated image]`` as outputs, trained with
        MSE on the first output and MAE on the second, weighted 1 : 100.
    """
    # freeze the discriminator inside the composite model
    d_model.trainable = False

    source = Input(shape=image_shape)
    generated = g_model(source)
    verdict = d_model([source, generated])

    composite = Model(source, [verdict, generated])
    composite.compile(
        loss=['mse', 'mae'],
        optimizer=Adam(lr=0.0002, beta_1=0.5),
        loss_weights=[1,100],
    )
    return composite

In [None]:
# load and prepare training images
def load_real_samples(filename):
    """Load the paired training arrays from an ``.npz`` archive and rescale them.

    The archive must contain the arrays ``arr_0`` and ``arr_1``. Each array is
    mapped with ``x / 127.5 - 1`` (i.e. [0, 255] becomes [-1, 1]).

    Args:
        filename: path of the ``.npz`` file to read.

    Returns:
        ``[X1, X2]``: the rescaled ``arr_0`` and ``arr_1`` arrays.
    """
    # Use the archive as a context manager so its file handle is closed as soon
    # as both arrays have been read into memory.
    with load(filename) as data:
        X1, X2 = data['arr_0'], data['arr_1']
    # scale from [0,255] to [-1,1]
    X1 = (X1 / 127.5) - 1.
    X2 = (X2 / 127.5) - 1.
    return [X1, X2]

In [None]:
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
    """Draw a random batch of paired samples together with 'real' labels.

    Args:
        dataset: pair ``(trainA, trainB)`` of arrays indexed along axis 0.
        n_samples: number of random indices to draw (with replacement).
        patch_shape: side length of the square label map.

    Returns:
        ``([X1, X2], y)`` where ``X1``/``X2`` are the rows of ``trainA``/``trainB``
        at the same random indices and ``y`` is an array of ones with shape
        ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    sources, targets = dataset
    # the same indices are used for both arrays so the pairs stay aligned
    picks = randint(0, sources.shape[0], n_samples)
    real_labels = ones((n_samples, patch_shape, patch_shape, 1))
    return [sources[picks], targets[picks]], real_labels

In [None]:
# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, samples, patch_shape):
    """Run the generator on ``samples`` and pair the result with 'fake' labels.

    Args:
        g_model: object exposing ``predict(samples)``.
        samples: batch passed to ``g_model.predict``.
        patch_shape: side length of the square label map.

    Returns:
        ``(X, y)`` where ``X`` is the prediction and ``y`` is an array of zeros
        with shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    generated = g_model.predict(samples)
    fake_labels = zeros((len(generated), patch_shape, patch_shape, 1))
    return generated, fake_labels

In [None]:
# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, dataset, n_samples=3):
    """Save a source / generated / expected comparison plot and the generator.

    A random batch of ``n_samples`` pairs is drawn from ``dataset``, the
    generator is run on the sources, and a 3-row grid (one column per sample)
    is written to a PNG. The generator is then saved as an ``.h5`` file. Both
    file names embed ``step + 1`` zero-padded to six digits.

    Args:
        step: zero-based training step used to tag the output files.
        g_model: generator model; must support ``predict`` and ``save``.
        dataset: pair of arrays passed to ``generate_real_samples``.
        n_samples: number of columns in the plot.
    """
    # labels are not needed here, hence the dummy patch size of 1
    [X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)
    X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)

    # rows in display order, each rescaled from [-1,1] to [0,1]
    rows = (
        ('Source', (X_realA + 1) / 2.0),
        ('Generated', (X_fakeB + 1) / 2.0),
        ('Expected', (X_realB + 1) / 2.0),
    )

    plt.figure(figsize=(15,15))
    for row, (title, images) in enumerate(rows):
        for col in range(n_samples):
            pyplot.subplot(3, n_samples, 1 + n_samples*row + col)
            pyplot.axis('off')
            # only the first channel is shown, in grayscale
            pyplot.imshow(images[col,:,:,0], cmap='gray')
            pyplot.title(title)

    tag = step + 1
    # save plot to file
    filename1 = '/content/drive/My Drive/lidar/output/d_bce_sigmoid_2_layers_U-Net_mydata_plot_%06d.png' % tag
    pyplot.savefig(filename1)
    pyplot.close()
    # save the generator model
    filename2 = '/content/drive/My Drive/lidar/model/d_bce_sigmoid_2_layers_U-Net_model_mydata_%06d.h5' % tag
    g_model.save(filename2)
    print('>Saved: %s and %s' % (filename1, filename2))

In [None]:
# create a line plot of loss for the gan and save to file
def plot_history_real_fake(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
    """Plot loss and accuracy histories on linear axes and save the figure.

    The top panel shows the three loss curves; the bottom panel shows the two
    accuracy curves. The figure is written to a fixed PNG path.

    Args:
        d1_hist: discriminator loss on real samples, one value per recorded step.
        d2_hist: discriminator loss on fake samples.
        g_hist: generator loss.
        a1_hist: accuracy series for real samples.
        a2_hist: accuracy series for fake samples.
    """
    fig = plt.figure(figsize=(15,15))

    # top panel: losses
    loss_ax = fig.add_subplot(2,1,1)
    loss_ax.plot(d1_hist, label='Discriminator Loss - Real')
    loss_ax.plot(d2_hist, label='Discriminator Loss - Fake')
    loss_ax.plot(g_hist, label='Generator Loss')
    loss_ax.legend()
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')

    # bottom panel: accuracies
    acc_ax = fig.add_subplot(2,1,2)
    acc_ax.plot(a1_hist, label='Accuracy of Discriminator Loss - Real')
    acc_ax.plot(a2_hist, label='Accuracy of Discriminator Loss - Fake')
    # The accuracy series is drawn on the y axis against its index, so the
    # labels were previously wrong ('Accuracy' on x, 'Loss' on y).
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    fig.savefig('/content/drive/My Drive/lidar/plot/d_bce_sigmoid_2_layers_U-Net_plot_line_plot_loss_real_fake.png')


In [None]:
# create a line plot of loss for the gan and save to file
def plot_history_real_fake_log(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
    """Plot loss histories on log-log axes plus accuracies, and save the figure.

    The top panel shows the three loss curves with both axes on a log scale;
    the bottom panel shows the two accuracy curves on linear axes. The figure
    is written to a fixed PNG path.

    Args:
        d1_hist: discriminator loss on real samples, one value per recorded step.
        d2_hist: discriminator loss on fake samples.
        g_hist: generator loss.
        a1_hist: accuracy series for real samples.
        a2_hist: accuracy series for fake samples.
    """
    fig = plt.figure(figsize=(15,15))

    # top panel: losses, log-log
    loss_ax = fig.add_subplot(2,1,1)
    loss_ax.plot(d1_hist, label='Discriminator Loss - Real')
    loss_ax.plot(d2_hist, label='Discriminator Loss - Fake')
    loss_ax.plot(g_hist, label='Generator Loss')
    loss_ax.legend()
    loss_ax.set_xscale('log')
    loss_ax.set_yscale('log')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')

    # bottom panel: accuracies
    acc_ax = fig.add_subplot(2,1,2)
    acc_ax.plot(a1_hist, label='Accuracy of Discriminator Loss - Real')
    acc_ax.plot(a2_hist, label='Accuracy of Discriminator Loss - Fake')
    # The accuracy series is drawn on the y axis against its index, so the
    # labels were previously wrong ('Accuracy' on x, 'Loss' on y).
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    fig.savefig('/content/drive/My Drive/lidar/plot/d_bce_sigmoid_2_layers_U-Net_plot_line_plot_loss_real_fake_log.png')


In [None]:
# create a line plot of loss for the gan and save to file
def plot_history(d_hist, g_hist, a1_hist, a2_hist): #d2_hist
    """Plot combined discriminator loss, generator loss and accuracies.

    The top panel shows the discriminator and generator losses with both axes
    on a log scale; the bottom panel shows the two accuracy curves on linear
    axes. The figure is written to a fixed PNG path.

    Args:
        d_hist: combined discriminator loss, one value per recorded step.
        g_hist: generator loss.
        a1_hist: accuracy series for real samples.
        a2_hist: accuracy series for fake samples.
    """
    fig = plt.figure(figsize=(15,15))

    # top panel: losses, log-log
    loss_ax = fig.add_subplot(2,1,1)
    loss_ax.plot(d_hist, label='Discriminator Loss')
    loss_ax.plot(g_hist, label='Generator Loss')
    loss_ax.legend()
    loss_ax.set_xscale('log')
    loss_ax.set_yscale('log')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')

    # bottom panel: accuracies
    acc_ax = fig.add_subplot(2,1,2)
    acc_ax.plot(a1_hist, label='Accuracy of Discriminator Loss - Real')
    acc_ax.plot(a2_hist, label='Accuracy of Discriminator Loss - Fake')
    # The accuracy series is drawn on the y axis against its index, so the
    # labels were previously wrong ('Accuracy' on x, 'Loss' on y).
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    fig.savefig('/content/drive/My Drive/lidar/plot/d_bce_sigmoid_2_layers_U-Net_plot_line_plot_loss.png')

 	# plot loss
	# pyplot.subplot(2, 1, 1) #
	# pyplot.plot(d1_hist, label='d-real')
	# pyplot.plot(d2_hist, label='d-fake')
	# pyplot.plot(g_hist, label='gen')
	# pyplot.yticks(np.arange(0,101,step=5))
	# pyplot.legend()
	# plot discriminator accuracy
	# pyplot.subplot(2, 1, 2)
	# pyplot.plot(a1_hist, label='acc-real')
	# pyplot.plot(a2_hist, label='acc-fake')
	#pyplot.yticks(np.arange(0,401,step=25))
	# pyplot.legend()
	# save plot to file
	# pyplot.savefig('/content/drive/My Drive/1.experiment/plot_line_plot_loss.png')
	# pyplot.close()

In [None]:
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
import matplotlib.pyplot as plt

def plot_history_exp(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
    """Plot the three loss curves on one figure, each with its own y axis.

    The generator loss uses the main axis; the two discriminator losses use
    twin y axes sharing the same x axis, the second one offset outward so the
    scales do not overlap. Each y-axis label is coloured like its curve. The
    figure is saved to a fixed PNG path and then shown.

    Args:
        d1_hist: discriminator loss on real samples.
        d2_hist: discriminator loss on fake samples.
        g_hist: generator loss.
        a1_hist: unused.
        a2_hist: unused.
    """
    fig = plt.figure(figsize=(10,10))
    gen_ax = fig.add_subplot(111)
    real_ax = gen_ax.twinx()
    fake_ax = gen_ax.twinx()

    gen_ax.set_xlabel("Epochs")

    # (axis, history, caption, colour) for each curve; the caption doubles as
    # y-axis label and legend entry
    curves = (
        (gen_ax, g_hist, "Generator Loss", plt.cm.viridis(0)),
        (real_ax, d1_hist, "Discriminator Loss - Real", plt.cm.viridis(0.5)),
        (fake_ax, d2_hist, "Discriminator Loss - Fake", plt.cm.viridis(.9)),
    )
    handles = []
    for axis, history, caption, shade in curves:
        axis.set_ylabel(caption)
        line, = axis.plot(history, color=shade, label=caption)
        axis.yaxis.label.set_color(line.get_color())
        handles.append(line)

    gen_ax.legend(handles=handles, loc='best')

    # push the third y axis outward so it does not sit on top of the second
    fake_ax.spines['right'].set_position(('outward', 60))
    # no x-ticks
    fake_ax.xaxis.set_ticks([])
    fake_ax.yaxis.set_ticks_position('right')

    fig.savefig('/content/drive/My Drive/lidar/plot/d_bce_sigmoid_2_layers_U-Net_plot2_line_plot_loss.png')
    plt.show()

In [None]:
# train pix2pix models
# module-level per-iteration logs; train() appends to these on every step
idx, d_loss1_array, d_loss2_array, d_loss_array, g_loss_array, a1_array, a2_array = [], [], [], [], [], [], []

def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):
    """Run the alternating discriminator / generator training loop.

    Every iteration draws a random real batch, generates the matching fake
    batch, updates the discriminator on each, then updates the generator
    through the composite model. Losses are printed, appended to the
    module-level log lists and to local histories; after the loop the
    histories are passed to the four plotting helpers. Every ten epochs'
    worth of iterations ``summarize_performance`` is called.

    Args:
        d_model: compiled discriminator.
        g_model: generator.
        gan_model: compiled composite model.
        dataset: pair ``(trainA, trainB)`` of training arrays.
        n_epochs: number of passes used to size the iteration count.
        n_batch: batch size.
    """
    d_hist, d1_hist, d2_hist, g_hist, a1_hist, a2_hist = ([] for _ in range(6))

    # side length of the discriminator's square output map
    n_patch = d_model.output_shape[1]

    trainA, _ = dataset
    bat_per_epo = int(len(trainA) / n_batch)
    print("the number of batches per training epoch: ", bat_per_epo)
    n_steps = bat_per_epo * n_epochs
    print("the number of training iterations: ", n_steps)

    # a snapshot is taken once per ten epochs' worth of iterations
    snapshot_every = bat_per_epo*10

    for step in range(1, n_steps + 1):
        # real batch and the corresponding generated batch
        [X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
        X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)

        # discriminator: real first, then fake
        d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
        d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
        d_loss = 0.5 * np.add(d_loss1, d_loss2)

        # generator, through the composite model
        g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])

        # module-level logs
        idx.append(step)
        d_loss1_array.append(d_loss1)
        d_loss2_array.append(d_loss2)
        d_loss_array.append(d_loss)
        g_loss_array.append(g_loss)
        a1_value = int(100*d_loss1)
        a1_array.append(a1_value)
        a2_value = int(100*d_loss2)
        a2_array.append(a2_value)
        print('>%d, d_real[%.3f] d_fake[%.3f] g[%.3f], a1=%d, a2=%d' % (step, d_loss1, d_loss2, g_loss, a1_value, a2_value))

        # local histories used for the plots below
        d_hist.append(d_loss)
        d1_hist.append(d_loss1)
        d2_hist.append(d_loss2)
        g_hist.append(g_loss)
        a1_hist.append(int(100*d_loss1))
        a2_hist.append(int(100*d_loss2))

        if step % snapshot_every == 0:
            # summarize_performance expects the zero-based step
            summarize_performance(step - 1, g_model, dataset)

    plot_history(d_hist, g_hist, a1_hist, a2_hist)
    plot_history_real_fake(d1_hist, d2_hist, g_hist, a1_hist, a2_hist)
    plot_history_real_fake_log(d1_hist, d2_hist, g_hist, a1_hist, a2_hist)
    plot_history_exp(d1_hist, d2_hist, g_hist, a1_hist, a2_hist)
	#sys.stdout = oldStdout	
	#file.close()
	#sys.stdout = open('/content/drive/My Drive/1.experiment/training2/log/log2_1st_experiment.txt', "w")

In [None]:
# pix2pix GAN training run: load the paired dataset, build the three models, train
# load image data (load_real_samples rescales both arrays with x / 127.5 - 1)
dataset = load_real_samples('/content/drive/My Drive/lidar/training_lidar_256.npz')
# print('Loaded', road_dataset[0].shape, condition_dataset[0].shape)
# define input shape based on the loaded dataset:
# per-sample shape of the first array, i.e. its shape without the leading sample axis
image_shape = dataset[0].shape[1:]
print("Image shape of generator: ", image_shape)
# image shape for discriminator:
image_shape_d = (256,256,1)
#print("Image shape of discriminator: ", image_shape_d)
# define the models
# NOTE(review): define_discriminator also reads the module-level `image_shape`
# assigned above for its source input, so this call must come after that assignment
d_model = define_discriminator(image_shape_d)
# summarize the discriminator model
#print("Summary of the discriminator model: ")
d_model.summary()
g_model = define_generator(image_shape)
# define the composite model
gan_model = define_gan(g_model, d_model, image_shape)
#print("gan model", gan_model)
# train model with train()'s defaults (n_epochs=100, n_batch=1)
train(d_model, g_model, gan_model, dataset)


Image shape of generator:  (256, 256, 1)
Model: "functional_55"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_37 (InputLayer)           [(None, 256, 256, 1) 0                                            
__________________________________________________________________________________________________
input_38 (InputLayer)           [(None, 256, 256, 1) 0                                            
__________________________________________________________________________________________________
concatenate_72 (Concatenate)    (None, 256, 256, 2)  0           input_37[0][0]                   
                                                                 input_38[0][0]                   
__________________________________________________________________________________________________
conv2d_106 (Conv2D)             (None, 128, 1

In [None]:
# Write the per-iteration training log as text: one row per step, one column per series
training_data_file = "/content/drive/My Drive/lidar/log/d_bce_sigmoid_2_layers_training_data.txt"
# each logged series paired with the printf format of its column
log_columns = (
    (idx, '%d'),
    (d_loss1_array, '%f'),
    (d_loss2_array, '%f'),
    (d_loss_array, '%f'),
    (g_loss_array, '%f'),
    (a1_array, '%d'),
    (a2_array, '%d'),
)
# stack the series as rows, then transpose so each step becomes a row
data = np.array([series for series, _ in log_columns]).T
with open(training_data_file, 'w+') as file:
    np.savetxt(file, data, fmt=[spec for _, spec in log_columns])

In [None]:
# load the prepared dataset
from numpy import load
from matplotlib import pyplot
# load the dataset
# read the two arrays stored in the archive
skeletons_satellite_data = load('/content/drive/My Drive/1.experiment/training2_maps_256.npz')
satellite_images = skeletons_satellite_data['arr_0']
roadbed_images = skeletons_satellite_data['arr_1']
print('Loaded: ', satellite_images.shape, roadbed_images.shape)

# preview grid: first row shows arr_0 samples, second row the matching arr_1 samples
n_samples = 3
fig = plt.figure(figsize=(15,15))
for i in range(n_samples):
    # top row: source image, all channels
    pyplot.subplot(2, n_samples, i + 1)
    pyplot.axis('off')
    source_view = satellite_images[i].astype('uint8')
    pyplot.imshow(source_view)
for i in range(n_samples):
    # bottom row: first channel of the target image, in grayscale
    pyplot.subplot(2, n_samples, n_samples + i + 1)
    pyplot.axis('off')
    target_view = roadbed_images[i, :, :, 0].astype('uint8')
    pyplot.imshow(target_view, cmap='gray')
pyplot.show()

KeyboardInterrupt: ignored