In [None]:
from numpy import load
from numpy import zeros
from numpy import ones
from numpy.random import randint
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.layers import LeakyReLU
from matplotlib import pyplot

## Discriminator model

In [None]:
# define the discriminator model
def define_discriminator(image_shape):
	"""Build and compile the conditional patch discriminator.

	The model takes a (source, target) image pair, concatenates the two
	images and classifies patches of the pair as real or fake. Four
	stride-2 convolutions shrink each spatial dimension by a factor of 16,
	so the output is a grid of per-patch sigmoid probabilities rather than
	a single score.

	Args:
		image_shape: shape of one input image, without the batch axis.
			Both inputs use this same shape.

	Returns:
		A Keras ``Model`` compiled with binary cross-entropy (weighted by
		0.5) and Adam.
	"""
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# source image input
	in_src_image = Input(shape=image_shape)
	# target image input
	in_target_image = Input(shape=image_shape)
	# concatenate images channel-wise
	merged = Concatenate()([in_src_image, in_target_image])
	# C64 (no batch normalization on the first layer)
	d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
	d = LeakyReLU(alpha=0.2)(d)
	# C128
	d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C256
	d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C512
	d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# second last output layer (stride 1, keeps the spatial size)
	d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
	d = BatchNormalization()(d)
	d = LeakyReLU(alpha=0.2)(d)
	# patch output: one real/fake probability per patch
	d = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
	patch_out = Activation('sigmoid')(d)
	# define model
	model = Model([in_src_image, in_target_image], patch_out)
	# compile model; `learning_rate` replaces the deprecated `lr` argument
	opt = Adam(learning_rate=0.0002, beta_1=0.5)
	# the 0.5 loss weight scales down the discriminator's loss relative to the generator's
	model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
	return model

## Encoder-decoder



### Encoder

In [None]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):
	"""Append one downsampling stage to the encoder.

	The stage is a 4x4 stride-2 convolution (halving the spatial size),
	an optional batch normalization, and a LeakyReLU with slope 0.2.

	Args:
		layer_in: tensor the stage is applied to.
		n_filters: number of convolution filters.
		batchnorm: when False the batch normalization layer is skipped.

	Returns:
		The output tensor of the stage.
	"""
	downsample = Conv2D(
		n_filters,
		(4,4),
		strides=(2,2),
		padding='same',
		kernel_initializer=RandomNormal(stddev=0.02),
	)
	features = downsample(layer_in)
	if batchnorm:
		# training=True: the layer is always called in training mode
		normalize = BatchNormalization()
		features = normalize(features, training=True)
	activation = LeakyReLU(alpha=0.2)
	return activation(features)

### Decoder

In [None]:
# define a decoder block
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
	"""Append one upsampling stage with a skip connection to the decoder.

	The stage is a 4x4 stride-2 transposed convolution (doubling the
	spatial size), batch normalization, optional 50% dropout, a
	concatenation with the skip tensor, and finally a ReLU.

	Args:
		layer_in: tensor to upsample.
		skip_in: encoder tensor concatenated onto the upsampled output.
		n_filters: number of transposed-convolution filters.
		dropout: when False the dropout layer is skipped.

	Returns:
		The output tensor of the stage.
	"""
	upsample = Conv2DTranspose(
		n_filters,
		(4,4),
		strides=(2,2),
		padding='same',
		kernel_initializer=RandomNormal(stddev=0.02),
	)
	upsampled = upsample(layer_in)
	# both layers below are always called in training mode (training=True)
	normalize = BatchNormalization()
	upsampled = normalize(upsampled, training=True)
	if dropout:
		drop = Dropout(0.5)
		upsampled = drop(upsampled, training=True)
	# the ReLU is applied after the merge, so it also acts on the skip tensor
	joined = Concatenate()([upsampled, skip_in])
	return Activation('relu')(joined)

## Generator

In [None]:
# define the standalone generator model
def define_generator(image_shape=(96,96,3)):
	"""Build the encoder-decoder (U-Net style) generator.

	Three encoder blocks plus a bottleneck each halve the spatial size
	(16x reduction overall); three decoder blocks plus the output layer
	double it again, with skip connections from the matching encoder
	stage. Fixed height and width must therefore be divisible by 16,
	otherwise the skip concatenations do not line up.

	Args:
		image_shape: shape of one input image, without the batch axis.
			The output image has the same number of channels as the
			input (``image_shape[-1]``).

	Returns:
		An uncompiled Keras ``Model`` mapping a source image to a
		generated image with values in [-1, 1] (tanh output).
	"""
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# image input
	in_image = Input(shape=image_shape)
	# encoder model (first block has no batch normalization)
	e1 = define_encoder_block(in_image, 64, batchnorm=False)
	e2 = define_encoder_block(e1, 128)
	e3 = define_encoder_block(e2, 256)
	# bottleneck, no batch norm and relu
	b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e3)
	b = Activation('relu')(b)
	# decoder model, mirroring the encoder; dropout is disabled in every block
	d5 = decoder_block(b, e3, 256, dropout=False)
	d6 = decoder_block(d5, e2, 128, dropout=False)
	d7 = decoder_block(d6, e1, 64, dropout=False)
	# output: match the input's channel count instead of hard-coding 3,
	# because the discriminator expects generated images of `image_shape`
	g = Conv2DTranspose(image_shape[-1], (3,3), strides=(2,2), padding='same', kernel_initializer=init)(d7)
	out_image = Activation('tanh')(g)
	# define model
	model = Model(in_image, out_image)
	return model

## Combined GAN

In [None]:
def define_gan(g_model, d_model, image_shape):
	"""Build and compile the composite model used to update the generator.

	The composite model feeds a source image through the generator and
	then passes (source, generated) to the discriminator. It is trained
	with binary cross-entropy on the discriminator output (weight 1) and
	mean absolute error between the generated and target image
	(weight 100).

	Note: this sets ``trainable = False`` on the layers of the ``d_model``
	object passed in (all layers except BatchNormalization), so the caller's
	discriminator instance is modified.
	NOTE(review): the standalone discriminator is expected to keep training
	because it was compiled before this call - confirm for the Keras
	version in use.

	Args:
		g_model: generator model.
		d_model: discriminator model taking ``[source, target]``.
		image_shape: shape of one source image, without the batch axis.

	Returns:
		A compiled Keras ``Model`` mapping a source image to
		``[discriminator_output, generated_image]``.
	"""
	# make weights in the discriminator not trainable
	for layer in d_model.layers:
		if not isinstance(layer, BatchNormalization):
			layer.trainable = False
	# define the source image
	in_src = Input(shape=image_shape)
	# connect the source image to the generator input
	gen_out = g_model(in_src)
	# connect the source input and generator output to the discriminator input
	dis_out = d_model([in_src, gen_out])
	# src image as input, generated image and classification output
	model = Model(in_src, [dis_out, gen_out])
	# compile model; `learning_rate` replaces the deprecated `lr` argument
	opt = Adam(learning_rate=0.0002, beta_1=0.5)
	model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1,100])
	return model

## Training images

### Load and prepare

In [None]:
def load_real_samples(filename_gw, filename_bw):
	"""Load two image arrays and rescale their values from [0,255] to [-1,1].

	Args:
		filename_gw: path of the array labelled "good weather" here.
		filename_bw: path of the array labelled "bad weather" here.

	Returns:
		A two-element list ``[X1, X2]``: the rescaled arrays loaded from
		``filename_gw`` and ``filename_bw``, in that order.
	"""
	# (x - 127.5) / 127.5 maps 0 -> -1 and 255 -> 1
	return [(load(path) - 127.5) / 127.5 for path in (filename_gw, filename_bw)]

### Real samples


In [None]:
def generate_real_samples(dataset, n_samples, patch_shape):
	"""Draw a random batch of paired real images with 'real' labels.

	Args:
		dataset: pair ``(trainA, trainB)`` of arrays indexed along axis 0.
		n_samples: number of pairs to draw (with replacement).
		patch_shape: side length of the square label map.

	Returns:
		``([X1, X2], y)`` where X1/X2 are the rows of trainA/trainB at the
		same random indices and ``y`` is an all-ones array of shape
		``(n_samples, patch_shape, patch_shape, 1)``.
	"""
	sources, targets = dataset
	# one shared index vector keeps source and target images paired
	picks = randint(0, sources.shape[0], n_samples)
	real_labels = ones((n_samples, patch_shape, patch_shape, 1))
	return [sources[picks], targets[picks]], real_labels

### Fake samples


In [None]:
# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, samples, patch_shape):
	"""Run the generator on a batch and pair the result with 'fake' labels.

	Args:
		g_model: object exposing ``predict(samples)``.
		samples: batch of source images passed to ``predict``.
		patch_shape: side length of the square label map.

	Returns:
		``(X, y)`` where X is the prediction and ``y`` is an all-zeros
		array of shape ``(len(X), patch_shape, patch_shape, 1)``.
	"""
	generated = g_model.predict(samples)
	n_generated = len(generated)
	fake_labels = zeros((n_generated, patch_shape, patch_shape, 1))
	return generated, fake_labels
 

## Train

In [None]:
# plot sample translations and checkpoint the generator
def summarize_performance(step, g_model, dataset, n_samples=3):
	"""Save a preview plot and a checkpoint of the generator.

	Draws ``n_samples`` random pairs, runs the generator on the source
	images and writes a 3-row figure (source / generated / target) to
	``plot_<step+1>.png`` plus the generator to ``model_<step+1>.h5`` in
	the current working directory. Assumes images that ``pyplot.imshow``
	can display (e.g. RGB).

	Args:
		step: zero-based training step, used in the file names.
		g_model: generator model.
		dataset: pair ``(trainA, trainB)`` as used by ``train``.
		n_samples: number of image columns in the figure.
	"""
	# the label arrays are not needed here, so a patch size of 1 is enough
	[X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)
	X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)
	# scale pixels from [-1,1] to [0,1] for display
	rows = [(images + 1) / 2.0 for images in (X_realA, X_fakeB, X_realB)]
	for row, images in enumerate(rows):
		for col in range(n_samples):
			pyplot.subplot(len(rows), n_samples, 1 + row * n_samples + col)
			pyplot.axis('off')
			pyplot.imshow(images[col])
	# save plot to file and release the figure
	plot_filename = 'plot_%06d.png' % (step+1)
	pyplot.savefig(plot_filename)
	pyplot.close()
	# save the generator model
	model_filename = 'model_%06d.h5' % (step+1)
	g_model.save(model_filename)
	print('>Saved: %s and %s' % (plot_filename, model_filename))

# train pix2pix models
def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):
	"""Alternate discriminator and generator updates for ``n_epochs`` epochs.

	Each step draws a random real batch, generates the matching fake
	batch, updates the discriminator on both, then updates the generator
	through the composite model. Losses are printed every step, and
	``summarize_performance`` is called every 10 epochs.

	Args:
		d_model: compiled discriminator.
		g_model: generator.
		gan_model: compiled composite model from ``define_gan``.
		dataset: pair ``(trainA, trainB)`` of image arrays.
		n_epochs: number of passes over ``trainA``.
		n_batch: number of image pairs per step.
	"""
	# determine the output square shape of the discriminator
	n_patch = d_model.output_shape[1]
	# unpack dataset (only the length of the first array is needed here)
	trainA, _ = dataset
	# calculate the number of batches per training epoch
	bat_per_epo = len(trainA) // n_batch
	# calculate the number of training iterations
	n_steps = bat_per_epo * n_epochs
	# manually enumerate training steps (batches), not epochs
	for i in range(n_steps):
		# select a batch of real samples
		[X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
		# generate a batch of fake samples
		X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
		# update discriminator for real samples
		d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
		# update discriminator for generated samples
		d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
		# update the generator; the first returned value is the total weighted loss
		g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])
		# summarize performance
		print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))
		# summarize model performance every 10 epochs
		if (i+1) % (bat_per_epo * 10) == 0:
			summarize_performance(i, g_model, dataset)

## Run Everything

In [None]:
# load image data
# The paths point into a mounted Google Drive, so the drive must be mounted
# before this cell runs.
# NOTE(review): the "snow" file is passed as filename_gw ("good weather") and
# the "no_snow" file as filename_bw ("bad weather") - confirm that the
# resulting translation direction (dataset[0] -> dataset[1]) is the intended one.
dataset = load_real_samples('/content/drive/MyDrive/Autonomous cars/images_rgb_snow.npy','/content/drive/MyDrive/Autonomous cars/images_rgb_no_snow.npy')
print('Loaded', dataset[0].shape, dataset[1].shape)
# define input shape based on the loaded dataset (drop the leading sample axis)
image_shape = dataset[0].shape[1:]
# define the models
d_model = define_discriminator(image_shape)
g_model = define_generator(image_shape)
# define the composite model
gan_model = define_gan(g_model, d_model, image_shape)
# train model (uses the defaults of train: n_epochs=100, n_batch=1)
train(d_model, g_model, gan_model, dataset)

Loaded (6000, 96, 96, 3) (6000, 96, 96, 3)


  "The `lr` argument is deprecated, use `learning_rate` instead.")


>1, d1[0.315] d2[1.064] g[39.911]
>2, d1[0.300] d2[0.559] g[37.723]
>3, d1[0.271] d2[0.310] g[36.839]
>4, d1[0.290] d2[0.230] g[35.106]
>5, d1[0.220] d2[0.239] g[33.791]
>6, d1[0.123] d2[0.147] g[32.013]
>7, d1[0.177] d2[0.101] g[31.828]
>8, d1[0.106] d2[0.119] g[32.083]
>9, d1[0.146] d2[0.176] g[28.258]
>10, d1[0.142] d2[0.228] g[28.330]
>11, d1[0.168] d2[0.071] g[27.091]
>12, d1[0.062] d2[0.061] g[26.242]
>13, d1[0.031] d2[0.047] g[24.896]
>14, d1[0.020] d2[0.036] g[24.957]
>15, d1[0.111] d2[0.196] g[22.761]
>16, d1[0.152] d2[0.035] g[22.608]
>17, d1[0.046] d2[0.064] g[22.057]
>18, d1[0.040] d2[0.050] g[22.725]
>19, d1[0.018] d2[0.035] g[23.072]
>20, d1[0.040] d2[0.058] g[22.862]
>21, d1[0.029] d2[0.028] g[20.768]
>22, d1[0.219] d2[0.657] g[22.103]
>23, d1[1.599] d2[0.095] g[18.189]
>24, d1[0.101] d2[0.376] g[18.831]
>25, d1[0.103] d2[0.116] g[17.164]
>26, d1[0.187] d2[0.328] g[17.473]
>27, d1[0.164] d2[0.043] g[19.160]
>28, d1[0.032] d2[0.030] g[15.645]
>29, d1[0.020] d2[0.039] g[14

KeyboardInterrupt: ignored

In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): the data-loading cell reads files under /content/drive, so
# this cell has to be executed before it; consider moving it to the top of
# the notebook so that "Restart & Run All" works.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
