In [None]:
from google.colab import drive

drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import tensorflow as tf
tf.test.gpu_device_name()


''

In [None]:

from os import listdir
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed

In [None]:

def load_images(path, size=(256,512)):
	src_list, tar_list = list(), list()
	# enumerate filenames in directory, assume all are images
	for filename in listdir(path):
		# load and resize the image
		pixels = load_img(path + filename, target_size=size)
		# convert to numpy array
		pixels = img_to_array(pixels)
		# split into satellite and map
		sat_img, map_img = pixels[:, :256], pixels[:, 256:]
		src_list.append(sat_img)
		tar_list.append(map_img)
	return [asarray(src_list), asarray(tar_list)]

In [None]:
 path = 'drive/My Drive/maps/train/'
 [src_images, tar_images] = load_images(path)
 print('Loaded: ', src_images.shape, tar_images.shape)
 filename = 'maps_256.npz'
 savez_compressed(filename, src_images, tar_images)
 print('Saved dataset: ', filename)


Loaded:  (1096, 256, 256, 3) (1096, 256, 256, 3)
Saved dataset:  maps_256.npz


In [None]:
from numpy import load
from matplotlib import pyplot

In [None]:
data = load('maps_256.npz')
src_images, tar_images = data['arr_0'], data['arr_1']
print('Loaded: ', src_images.shape, tar_images.shape)

Loaded:  (1096, 256, 256, 3) (1096, 256, 256, 3)


In [None]:
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Concatenate
from keras.layers import Conv2D
from keras.layers import LeakyReLU
from keras.layers import BatchNormalization
from keras.layers import Activation
from keras.layers import Conv2DTranspose
from keras.layers import Dropout
from tensorflow.keras.optimizers import Adam

In [None]:
def define_discriminator(image_shape):
	
	init = RandomNormal(stddev=0.02)
	in_src_image = Input(shape=image_shape)

	in_target_image = Input(shape=image_shape)

	merged = Concatenate()([in_src_image, in_target_image])
	# C64
	d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
	d = LeakyReLU(alpha=0.2)(d)
	# C128
	d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C256
	d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C512
	d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# second last output layer
	d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# patch output
	d = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
	patch_out = Activation('sigmoid')(d)

	model = Model([in_src_image, in_target_image], patch_out)
 
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
	return model

In [None]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):

  init = RandomNormal(stddev=0.02)
  g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
  if batchnorm:
    g = BatchNormalization()(g, training=True)
  g = LeakyReLU(alpha=0.2)(g)
  return g

def decoder_block(layer_in, skip_in, n_filters, dropout=True):
  init = RandomNormal(stddev=0.02)
  g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
  g = BatchNormalization()(g, training=True)
  if dropout:
    g = Dropout(0.5)(g, training=True)
  g = Concatenate()([g, skip_in])
  g = Activation('relu')(g)
  return g

def define_generator(image_shape=(256,256,3)):
  init = RandomNormal(stddev=0.02)
  in_image = Input(shape=image_shape)
  e1 = define_encoder_block(in_image, 64, batchnorm=False)
  e2 = define_encoder_block(e1, 128)
  e3 = define_encoder_block(e2, 256)
  e4 = define_encoder_block(e3, 512)
  e5 = define_encoder_block(e4, 512)
  e6 = define_encoder_block(e5, 512)
  e7 = define_encoder_block(e6, 512)
  b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e7)
  b = Activation('relu')(b)
  d1 = decoder_block(b, e7, 512)
  d2 = decoder_block(d1, e6, 512)
  d3 = decoder_block(d2, e5, 512)
  d4 = decoder_block(d3, e4, 512, dropout=False)
  d5 = decoder_block(d4, e3, 256, dropout=False)
  d6 = decoder_block(d5, e2, 128, dropout=False)
  d7 = decoder_block(d6, e1, 64, dropout=False)
  g = Conv2DTranspose(3, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d7)
  out_image = Activation('tanh')(g)
  model = Model(in_image, out_image)

  return model

In [None]:
def define_gan(g_model, d_model, image_shape):

	d_model.trainable = False

	in_src = Input(shape=image_shape)

	gen_out = g_model(in_src)
	
	dis_out = d_model([in_src, gen_out])

	model = Model(in_src, [dis_out, gen_out])

	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1,100])
	return model

In [None]:
def load_real_samples(filename):

	data = load(filename)

	X1, X2 = data['arr_0'], data['arr_1']

	X1 = (X1 - 127.5) / 127.5
	X2 = (X2 - 127.5) / 127.5
	return [X1, X2]

In [None]:
from numpy.random import randint
from numpy import ones
def generate_real_samples(dataset, n_samples, patch_shape):

	trainA, trainB = dataset

	ix = randint(0, trainA.shape[0], n_samples)

	X1, X2 = trainA[ix], trainB[ix]

	y = ones((n_samples, patch_shape, patch_shape, 1))
	return [X1, X2], y

In [None]:
from numpy import zeros
def generate_fake_samples(g_model, samples, patch_shape):

	X = g_model.predict(samples)

	y = zeros((len(X), patch_shape, patch_shape, 1))
	return X, y

In [None]:
def summarize_performance(step, g_model, dataset, n_samples=3):

  [X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)

  X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)

  X_realA = (X_realA + 1) / 2.0
  X_realB = (X_realB + 1) / 2.0
  X_fakeB = (X_fakeB + 1) / 2.0

  for i in range(n_samples):
    pyplot.subplot(3, n_samples, 1 + i)
    pyplot.axis('off')
    pyplot.imshow(X_realA[i])

  for i in range(n_samples):
    pyplot.subplot(3, n_samples, 1 + n_samples + i)
    pyplot.axis('off')
    pyplot.imshow(X_fakeB[i])

  for i in range(n_samples):
    pyplot.subplot(3, n_samples, 1 + n_samples*2 + i)
    pyplot.axis('off')
    pyplot.imshow(X_realB[i])
	# save plot to file
  filename1 = 'drive/My Drive/maps/plot/plot2_%06d.png' % (step+1)
  pyplot.savefig(filename1)
  pyplot.close()
  # save the generator model
  filename2 = 'drive/My Drive/maps/models/model2_%06d.h5' % (step+1)
  g_model.save(filename2)
  print('>Saved: %s and %s' % (filename1, filename2))

In [None]:
# create a line plot of loss for the gan and save to file
def plot_history(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
	# plot loss
	pyplot.subplot(2, 1, 1)
	pyplot.plot(d1_hist, label='d-real')
	pyplot.plot(d2_hist, label='d-fake')
	pyplot.plot(g_hist, label='gen')
	pyplot.legend()
	# plot discriminator accuracy
	pyplot.subplot(2, 1, 2)
	pyplot.plot(a1_hist, label='acc-real')
	pyplot.plot(a2_hist, label='acc-fake')
	pyplot.legend()
	# save plot to file
	pyplot.savefig('drive/My Drive/maps/plot_line_plot_loss.png')
	pyplot.close()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
arr=[]

def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):

  n_patch = d_model.output_shape[1]
  trainA, trainB = dataset
  bat_per_epo = int(len(trainA) / n_batch)
  n_steps = bat_per_epo * n_epochs
  d1_hist, d2_hist, g_hist, a1_hist, a2_hist = list(), list(), list(), list(), list()

  for i in range(n_steps):
    
    [X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
    X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
    d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
    d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
    _,_,g_loss = gan_model.train_on_batch(X_realA, [y_real, X_realB])
    d1_hist.append(d_loss1)
    d2_hist.append(d_loss2)
    g_hist.append(g_loss)
   
		
    if i%100 == 0:
      
      print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))
      
		 

In [None]:
from numpy import load
dataset = load_real_samples('maps_256.npz')
print('Loaded', dataset[0].shape, dataset[1].shape)
image_shape = dataset[0].shape[1:]
d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)
gan_model = define_gan(g_model, d_model, image_shape)
train(d_model, g_model, gan_model, dataset)

Loaded (1096, 256, 256, 3) (1096, 256, 256, 3)


  super(Adam, self).__init__(name, **kwargs)


>1, d1[0.650] d2[0.543] g[0.755]
>101, d1[0.031] d2[0.073] g[0.116]
>201, d1[0.016] d2[0.007] g[0.090]
>301, d1[0.123] d2[0.022] g[0.071]
>401, d1[0.006] d2[0.022] g[0.079]
>501, d1[0.004] d2[0.009] g[0.195]
>601, d1[0.007] d2[0.005] g[0.086]
>701, d1[0.031] d2[0.003] g[0.096]
>801, d1[0.447] d2[0.038] g[0.079]
>901, d1[0.081] d2[0.004] g[0.104]
>1001, d1[0.007] d2[0.003] g[0.113]
>1101, d1[0.145] d2[0.792] g[0.083]
