# Pix2Pix for Satellite-to-Map Image Translation - Model Training
Notebook for Pix2Pix Satellite-to-Map Image Translation model training. [Keras](https://keras.io/) is used for model development and training. The implementation follows this [paper](https://arxiv.org/abs/1611.07004) to work with color images of size 256x256 pixels.

Import all the necessary Python dependencies for model development and training.

In [None]:
from numpy import load
from numpy import zeros
from numpy import ones
from numpy.random import randint
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Model
from tensorflow.keras import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import LeakyReLU
from matplotlib import pyplot

### Model Development

Define the Discriminator. It is a CNN that performs image classification. It takes as input both the source (satellite image) and the target (Google Maps image) and predicts the likelihood that the target is real rather than a fake translation of the source.

In [None]:
# define the discriminator model
def define_discriminator(image_shape):
    """Build and compile the patch-based discriminator.

    The model takes a source image and a target image of identical shape,
    concatenates them channel-wise and maps the pair to a grid of per-patch
    sigmoid scores.

    Args:
        image_shape: Shape of one input image; used for both model inputs.

    Returns:
        A compiled Keras ``Model`` with inputs ``[source, target]`` and a
        single sigmoid patch output, trained with binary cross-entropy
        weighted by 0.5.
    """
    # weight initialization shared by every convolution
    init = RandomNormal(stddev=0.02)
    # source and target image inputs
    in_src_image = Input(shape=image_shape)
    in_target_image = Input(shape=image_shape)
    # concatenate images channel-wise
    merged = Concatenate()([in_src_image, in_target_image])
    # C64: the first stage has no batch normalization
    d = Conv2D(64, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(merged)
    d = LeakyReLU(alpha=0.2)(d)
    # C128, C256, C512: identical stride-2 stages that differ only in width
    for n_filters in (128, 256, 512):
        d = Conv2D(n_filters, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(d)
        d = BatchNormalization()(d)
        d = LeakyReLU(alpha=0.2)(d)
    # second last output layer (default stride, so no further downsampling)
    d = Conv2D(512, (4, 4), padding='same', kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
    # patch output: one sigmoid score per spatial location
    d = Conv2D(1, (4, 4), padding='same', kernel_initializer=init)(d)
    patch_out = Activation('sigmoid')(d)
    # define model
    model = Model([in_src_image, in_target_image], patch_out)
    # compile model; `learning_rate` replaces the deprecated `lr` keyword
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
    return model

The Generator is an encoder-decoder model based on a [U-Net](https://en.wikipedia.org/wiki/U-Net) architecture. It takes a satellite photo as source and generates a Google Maps image as target. 

Define the encoder block.

In [None]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):
    """Return one downsampling stage of the generator's encoder.

    Applies a 4x4 stride-2 convolution, optional batch normalization (always
    called with ``training=True``) and a LeakyReLU with slope 0.2.

    Args:
        layer_in: Tensor fed into the stage.
        n_filters: Number of convolution filters.
        batchnorm: Whether to insert the batch-normalization layer.

    Returns:
        The output tensor of the stage.
    """
    downsample = Conv2D(
        n_filters,
        (4, 4),
        strides=(2, 2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    features = downsample(layer_in)
    if batchnorm:
        features = BatchNormalization()(features, training=True)
    return LeakyReLU(alpha=0.2)(features)

Define the decoder block.

In [None]:
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    """Return one upsampling stage of the generator's decoder.

    Applies a 4x4 stride-2 transposed convolution and batch normalization,
    optionally dropout at rate 0.5, then concatenates the skip tensor and
    applies ReLU. Batch normalization and dropout are both called with
    ``training=True``.

    Args:
        layer_in: Tensor fed into the stage.
        skip_in: Encoder tensor merged in through the skip connection.
        n_filters: Number of transposed-convolution filters.
        dropout: Whether to insert the dropout layer.

    Returns:
        The output tensor of the stage.
    """
    upsample = Conv2DTranspose(
        n_filters,
        (4, 4),
        strides=(2, 2),
        padding='same',
        kernel_initializer=RandomNormal(stddev=0.02),
    )
    features = BatchNormalization()(upsample(layer_in), training=True)
    if dropout:
        features = Dropout(0.5)(features, training=True)
    # merge with the skip connection before the activation
    merged = Concatenate()([features, skip_in])
    return Activation('relu')(merged)

Define the generator.

In [None]:
def define_generator(image_shape=(256,256,3)):
    """Assemble the U-Net style generator.

    Seven encoder stages feed a stride-2 convolutional bottleneck; seven
    decoder stages then upsample, each merging the output of the mirrored
    encoder stage. A final transposed convolution with ``tanh`` produces a
    3-channel image.

    Args:
        image_shape: Shape of the input image.

    Returns:
        An uncompiled Keras ``Model`` mapping the input image to the output.
    """
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)

    # encoder: the first stage skips batch norm, the rest use the default
    skips = [define_encoder_block(in_image, 64, batchnorm=False)]
    for width in (128, 256, 512, 512, 512, 512):
        skips.append(define_encoder_block(skips[-1], width))

    # bottleneck, no batch norm and relu
    x = Conv2D(512, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(skips[-1])
    x = Activation('relu')(x)

    # decoder: (filters, dropout) per stage, paired with the encoder outputs
    # walked from deepest to shallowest
    decoder_plan = (
        (512, True),
        (512, True),
        (512, True),
        (512, False),
        (256, False),
        (128, False),
        (64, False),
    )
    for skip, (width, use_dropout) in zip(reversed(skips), decoder_plan):
        x = decoder_block(x, skip, width, dropout=use_dropout)

    # output
    x = Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', kernel_initializer=init)(x)
    out_image = Activation('tanh')(x)
    return Model(in_image, out_image)

Define the combined model (Generator + Discriminator).

In [None]:
def define_gan(g_model, d_model, image_shape):
    """Build and compile the composite model used to update the generator.

    The source image is passed through the generator; the source and the
    generated image are then passed to the discriminator. The composite
    model outputs both the discriminator's patch scores and the generated
    image, trained with binary cross-entropy and MAE weighted 1:100.

    Args:
        g_model: Generator model.
        d_model: Discriminator model; every layer except its
            ``BatchNormalization`` layers is marked non-trainable here.
        image_shape: Shape of the source image input.

    Returns:
        A compiled Keras ``Model`` mapping the source image to
        ``[discriminator_output, generated_image]``.
    """
    # make weights in the discriminator not trainable (batch norm layers are
    # deliberately left untouched)
    for layer in d_model.layers:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = False
    # define the source image
    in_src = Input(shape=image_shape)
    # connect the source image to the generator input
    gen_out = g_model(in_src)
    # connect the source input and generator output to the discriminator input
    dis_out = d_model([in_src, gen_out])
    # src image as input, generated image and classification output
    model = Model(in_src, [dis_out, gen_out])
    # compile model; `learning_rate` replaces the deprecated `lr` keyword
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1, 100])
    return model

Define a function to load the dataset previously saved in compressed NumPy array format. It will return a list of two NumPy arrays: the first holds the source images and the second the corresponding target images.

In [None]:
def load_real_samples(filename):
    """Load the paired image arrays from an ``.npz`` archive and rescale them.

    Args:
        filename: Path of the archive; it must contain the entries
            ``arr_0`` and ``arr_1``.

    Returns:
        A two-element list ``[X1, X2]`` holding ``arr_0`` and ``arr_1``
        after applying ``(x - 127.5) / 127.5``.
    """
    # the archive keeps a file handle open until closed, so scope it with
    # a context manager instead of leaking it
    with load(filename) as data:
        X1, X2 = data['arr_0'], data['arr_1']
    # scale from [0,255] to [-1,1]
    X1 = (X1 - 127.5) / 127.5
    X2 = (X2 - 127.5) / 127.5
    return [X1, X2]

Define a function to prepare a batch of random pairs of images from the training dataset and the corresponding discriminator label of class = 1 to indicate that they are real images.

In [None]:
def generate_real_samples(dataset, n_samples, patch_shape):
    """Draw a random batch of paired real images with 'real' labels.

    Args:
        dataset: Pair ``(trainA, trainB)`` of arrays indexed along axis 0.
        n_samples: Number of pairs to draw (indices may repeat).
        patch_shape: Side length of the square label grid.

    Returns:
        ``([X1, X2], y)`` where ``X1`` and ``X2`` are the rows of the two
        arrays at the same random indices and ``y`` is an array of ones
        with shape ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    sources, targets = dataset
    # the same indices are applied to both arrays so the pairs stay aligned
    picks = randint(0, sources.shape[0], n_samples)
    real_labels = ones((n_samples, patch_shape, patch_shape, 1))
    return [sources[picks], targets[picks]], real_labels

Define a function that passes a batch of real source images through the Generator to produce an equivalent batch of fake target images, together with the corresponding discriminator labels of class = 0 to indicate that they are fake images.

In [None]:
def generate_fake_samples(g_model, samples, patch_shape):
    """Run the generator on a batch and pair its output with 'fake' labels.

    Args:
        g_model: Object exposing ``predict``; called with ``samples``.
        samples: Batch passed to ``g_model.predict``.
        patch_shape: Side length of the square label grid.

    Returns:
        ``(X, y)`` where ``X`` is the prediction and ``y`` is an array of
        zeros with shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    fakes = g_model.predict(samples)
    fake_labels = zeros((len(fakes), patch_shape, patch_shape, 1))
    return fakes, fake_labels

GAN models do not converge, but an equilibrium is found between the Generator and Discriminator models. In order then to understand when training should stop, we can save the model and use it to generate sample image-to-image translations periodically during training, such as every 10 training epochs. We can then review the generated images at the end of training and use the image quality to choose a final model.  
Define a helper function for this.

In [None]:
def summarize_performance(step, g_model, dataset, n_samples=3):
    """Save a comparison plot and the current generator to disk.

    Draws ``n_samples`` random pairs, translates the source images with the
    generator and writes a three-row figure (source, generated, real target)
    to ``plot_<step+1>.png``; the generator is saved to
    ``model_<step+1>.h5``.

    Args:
        step: Zero-based training step; ``step + 1`` is used in file names.
        g_model: Generator model to evaluate and save.
        dataset: Pair of arrays accepted by ``generate_real_samples``.
        n_samples: Number of image columns in the figure.
    """
    # the patch size is irrelevant here because the labels are discarded
    [X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)
    X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)

    # rows in display order, each rescaled from [-1,1] to [0,1]
    rows = [(batch + 1) / 2.0 for batch in (X_realA, X_fakeB, X_realB)]
    for row, images in enumerate(rows):
        for col in range(n_samples):
            pyplot.subplot(3, n_samples, 1 + row * n_samples + col)
            pyplot.axis('off')
            pyplot.imshow(images[col])

    tag = step + 1
    # save plot to file
    plot_name = 'plot_%06d.png' % tag
    pyplot.savefig(plot_name)
    pyplot.close()
    # save the generator model
    model_name = 'model_%06d.h5' % tag
    g_model.save(model_name)
    print('>Saved: %s and %s' % (plot_name, model_name))

Define a function to train the pix2pix model.

In [None]:
def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):
    """Alternate discriminator and generator updates over the dataset.

    Each step draws a random real batch, generates the matching fake batch,
    updates the discriminator on both, then updates the generator through
    the composite model. Losses are printed after every step.
    ``summarize_performance`` runs after every 10 epochs' worth of steps and
    also after the final step, so a run whose ``n_epochs`` is not a multiple
    of 10 still saves its last generator.

    Args:
        d_model: Compiled discriminator.
        g_model: Generator.
        gan_model: Compiled composite model; its ``train_on_batch`` must
            return three values, the first being the total loss.
        dataset: Pair ``(trainA, trainB)`` of arrays.
        n_epochs: Number of passes, measured in steps of ``n_batch``.
        n_batch: Number of samples per step.
    """
    # determine the output square shape of the discriminator
    n_patch = d_model.output_shape[1]
    # only the source array is needed here, to size an epoch
    trainA, _ = dataset
    # number of batches per training epoch
    bat_per_epo = len(trainA) // n_batch
    # total number of training iterations
    n_steps = bat_per_epo * n_epochs
    checkpoint_every = bat_per_epo * 10
    # enumerate individual training steps (not epochs)
    for i in range(n_steps):
        # select a batch of real samples
        [X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
        # generate a batch of fake samples
        X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
        # update discriminator for real samples
        d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
        # update discriminator for generated samples
        d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
        # update the generator
        g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])
        # summarize performance
        step = i + 1
        print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (step, d_loss1, d_loss2, g_loss))
        # checkpoint every 10 epochs, and always at the very last step
        if step % checkpoint_every == 0 or step == n_steps:
            summarize_performance(i, g_model, dataset)

Mount Google Drive.

In [None]:
# Colab-specific module that exposes Google Drive to the runtime.
from google.colab import drive

# Mount Drive at /content/gdrive; later cells read and write under this path.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


Load the prepared data from Google Drive.

In [None]:
# Load the paired image arrays from the archive on the mounted Drive;
# load_real_samples returns them as a two-element list, rescaled to [-1,1].
dataset = load_real_samples('/content/gdrive/MyDrive/maps_256.npz')
# Print the shapes of both arrays as a quick sanity check.
print('Loaded', dataset[0].shape, dataset[1].shape)

Loaded (1096, 256, 256, 3) (1096, 256, 256, 3)


Define the input shape based on the loaded dataset.

In [None]:
# Drop the leading sample axis so only the per-image shape remains.
image_shape = dataset[0].shape[1:]

Build the model.

In [None]:
# Build the discriminator and generator for the dataset's image shape.
d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)
# The composite model chains the generator into the discriminator and is
# the model used to update the generator during training.
gan_model = define_gan(g_model, d_model, image_shape)

  "The `lr` argument is deprecated, use `learning_rate` instead.")


Start the training.

In [None]:
# Number of training epochs for this run.
epochs = 10
# `epochs` is passed positionally as n_epochs; n_batch keeps its default of 1.
train(d_model, g_model, gan_model, dataset, epochs)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
>5963, d1[0.190] d2[0.032] g[8.390]
>5964, d1[0.009] d2[0.079] g[16.522]
>5965, d1[0.091] d2[0.041] g[9.427]
>5966, d1[0.012] d2[0.687] g[10.002]
>5967, d1[0.547] d2[0.049] g[9.596]
>5968, d1[0.151] d2[0.060] g[10.166]
>5969, d1[0.062] d2[0.116] g[11.987]
>5970, d1[0.023] d2[0.083] g[11.934]
>5971, d1[0.254] d2[0.260] g[10.167]
>5972, d1[0.009] d2[0.076] g[14.536]
>5973, d1[0.022] d2[0.023] g[8.768]
>5974, d1[0.031] d2[0.126] g[12.421]
>5975, d1[0.017] d2[0.025] g[8.705]
>5976, d1[0.369] d2[0.095] g[7.683]
>5977, d1[0.011] d2[0.445] g[9.895]
>5978, d1[0.229] d2[0.027] g[9.967]
>5979, d1[0.820] d2[0.150] g[6.999]
>5980, d1[0.010] d2[0.329] g[9.433]
>5981, d1[0.009] d2[0.089] g[11.293]
>5982, d1[0.045] d2[0.060] g[11.629]
>5983, d1[0.008] d2[0.033] g[27.446]
>5984, d1[0.024] d2[0.023] g[7.327]
>5985, d1[0.121] d2[0.145] g[8.602]
>5986, d1[0.040] d2[0.076] g[9.426]
>5987, d1[0.149] d2[0.077] g[11.323]
>5988, d1[0.085] d2[0.0

Save the trained model to Google Drive.

In [None]:
import shutil

# Generator checkpoint written to the runtime's local disk during training,
# and the location on the mounted Drive where it should be kept.
source = '/content/model_010960.h5'
destination = '/content/gdrive/MyDrive/model_010960.h5'
try:
    shutil.copy(source, destination)
except OSError as err:
    # Catch only file-system failures (missing file, permissions, unmounted
    # Drive) and report the reason instead of hiding it; a bare `except`
    # would also swallow KeyboardInterrupt and programming errors.
    print("Error occurred while copying file:", err)
else:
    print("File copied successfully.")

File copied successfully.


Unmount Google Drive before stopping the runtime.

In [None]:
# Detach Google Drive; by its name this also flushes pending writes first,
# so run it before the runtime is stopped.
drive.flush_and_unmount()