In [None]:
path_for_joined_geotiff_df = '' 
path_for_folder_to_save_models = ''
#Path for tensorflow 128x128 dataset
path_for_tf_dataset = ''
#epochs to train gan for
epochs = 1000

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
#from google.colab import drive
import pandas as pd
#drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#Parameters from generator_18 from the random search
generator_18_params = {'alpha': 0.8,
 'conv2d_layers': 3,
 'dense': True,
 'filter_growth': 'decreasing',
 'filter_size': 4,
 'kernel_growth': 'increasing',
 'kernel_size': 4,
 'l2': 	0.000001,
 'learning_rate': 0.01,
 'momentum': 0.3,
 'steps_per_lr_decay': 35}

In [None]:
df = pd.read_csv(path_for_joined_geotiff_df)

In [None]:
#These Rows (from yoesmetie) had random points with elevation in the billions, so I just exluded them
outliers = [964, 970, 975, 976, 977, 982, 983, 984, 985, 992, 993, 994, 995]
df = df[~df.index.isin(outliers)]

In [None]:
image_size = 128
batch_size = 32

In [None]:
dataset = tf.data.experimental.load(path_for_tf_dataset)

In [None]:
steps_per_epoch = int(np.ceil(len(dataset)/batch_size))
num_classes = len(np.unique(df.BIOME_NAME))
latent_dim = ((16*16)*num_classes)-num_classes
generator_input = latent_dim + num_classes
discriminator_in_channels = 1 + num_classes

In [None]:
def build_discriminator():
  discriminator = keras.Sequential(
      [
          keras.layers.InputLayer((image_size, image_size, discriminator_in_channels)),
          layers.BatchNormalization(momentum=0.5),
          layers.Conv2D(64, (4, 4), strides=(2, 2), padding="same" 
                        , activity_regularizer=keras.regularizers.L2(.00001)),
          layers.BatchNormalization(momentum=0.5),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(128, (4, 4), strides=(2, 2), padding="same" 
                        , activity_regularizer=keras.regularizers.L2(.00001)),
          layers.BatchNormalization(momentum=0.5),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(256, (4, 4), strides=(2, 2), padding="same" 
                        , activity_regularizer=keras.regularizers.L2(.00001)),
          layers.BatchNormalization(momentum=0.5),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(512, (4, 4), strides=(2, 2), padding="same" ,
                        activity_regularizer=keras.regularizers.L2(.00001)),
          layers.BatchNormalization(momentum=0.5),
          layers.LeakyReLU(alpha=0.2),
          layers.Conv2D(1024, (3, 3), strides=(2, 2), padding="same", 
                         activity_regularizer=keras.regularizers.L2(.00001)),
          layers.LeakyReLU(alpha=0.2),
          layers.GlobalMaxPooling2D(),
          layers.Dense(1, activation='sigmoid')
      ],
      name="discriminator",
  )
  return discriminator

In [None]:
class ConditionalGAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(ConditionalGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = keras.metrics.Mean(name="discriminator_loss")

    @property
    def metrics(self):
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(ConditionalGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, data):
        #load data
        real_images, one_hot_labels = data

        # Add dimension for the labels
        image_one_hot_labels = one_hot_labels[:, :, None, None]
        image_one_hot_labels = tf.repeat(
            image_one_hot_labels, repeats=[image_size * image_size]
        )
        image_one_hot_labels = tf.reshape(
            image_one_hot_labels, (-1, image_size, image_size, num_classes)
        )

        # Generate random points in the latent space.
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        #Add the labels
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )
        #Generate fake terrain with generator
        generated_images = self.generator(random_vector_labels)

        #Combine terrain with labels
        fake_image_and_labels = tf.concat([generated_images, image_one_hot_labels], -1)
        real_image_and_labels = tf.concat([real_images, image_one_hot_labels], -1)
        #Real and Fake Terrain
        combined_images = tf.concat(
            [fake_image_and_labels, real_image_and_labels], axis=0
        )

        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )

        # Train the discriminator.
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )
        
        #Sample random points in the latent space.
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Assemble labels that say "all real images".
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator
        with tf.GradientTape() as tape:
            fake_images = self.generator(random_vector_labels)
            fake_image_and_labels = tf.concat([fake_images, image_one_hot_labels], -1)
            predictions = self.discriminator(fake_image_and_labels)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        #Track loss
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)

        return{
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
        }

In [None]:
#Returns 2 lists of kernel and filters sizes for each conv2d_layer
def get_kernels_and_filters(first_filter_size, filter_growth, first_kernel_size, kernel_growth, conv2d_layers):
  #Used to constrain the size of the model when increasing and decreasing growth for kernel and filter size
  max_filter = 2048
  min_filter = 4
  max_kernel = 32
  min_kernel = 4
  #Creates list of filter sizes for each conv2D layer depending on if the filters are increasing, constant, or decreasing in size
  if filter_growth == 'constant':
    filters = [first_filter_size for i in range(conv2d_layers)]
  elif filter_growth == 'increasing':
    filters = [first_filter_size*2**i for i in range(conv2d_layers)]
  else:
    filters = [first_filter_size*2**-i for i in range(conv2d_layers)]
  #Creates list of kernel sizes for each conv2D layer depending on if the kernels are increasing, constant, or decreasing in size
  if kernel_growth == 'constant':
    kernels = [first_kernel_size for i in range(conv2d_layers)]
  elif kernel_growth == 'increasing':
    kernels = [first_kernel_size*2**i for i in range(conv2d_layers)]
  else:
    kernels = [first_kernel_size*2**-i for i in range(conv2d_layers)]
  kernels =  np.asarray(kernels)
  filters = np.asarray(filters)
  #if filter or kernel size is greater/less than the max/min filter or kernel size then it is set to the max/min
  kernels[kernels < min_kernel] = min_kernel
  kernels[kernels > max_kernel] = max_kernel
  filters[filters > max_filter] = max_filter
  filters[filters < min_filter] = min_filter
  return kernels, filters

In [None]:
#returns list of strides for each conv2d layer
def get_strides(conv2d_layers):
  #Every models last two conv2d layers will have (2,2) strides
  strides = [(2,2), (2,2)]
  for i in range(conv2d_layers-3):
    strides.insert(0,(1,1))
  return strides

In [None]:
class Custom_Callback(keras.callbacks.Callback):
   def on_epoch_end(self, epoch, logs=None):
    #Saves generator and discriminator every 20 epochs
    if (epoch%20 == 0) and (epoch != 0):
       cond_gan.generator.save(path_for_folder_to_save_models + '/generator_18_' + str(epoch))
       cond_gan.discriminator.save(path_for_folder_to_save_models + '/discriminator_' + str(epoch))
    #Stops training if discriminator loss or generator loss gets close to zero
    if (logs.get('d_loss') < .0001) or (logs.get('g_loss') < .0001):
         self.model.stop_training = True

In [None]:
#Builds generator with the random params
def build_generator(choosen_params):
  strides = get_strides(choosen_params['conv2d_layers'])
  #Gets lists of kernel sizes, and filter sizes
  kernels, filters = get_kernels_and_filters(first_filter_size=choosen_params['filter_size'], filter_growth=choosen_params['filter_growth'], 
                          first_kernel_size=choosen_params['kernel_size'], kernel_growth=choosen_params['kernel_growth'], conv2d_layers=choosen_params['conv2d_layers'])
  generator = keras.Sequential()
  generator.add(keras.layers.InputLayer(generator_input,))
  #Maps latent space to 2d space using dense layers
  if choosen_params['dense']:
    generator.add(layers.Dense(16*16*num_classes, activity_regularizer=keras.regularizers.L2(choosen_params['l2'])))
    generator.add(layers.LeakyReLU(alpha=choosen_params['alpha']))
    generator.add(layers.BatchNormalization(momentum=choosen_params['momentum']))
    generator.add(layers.Reshape((16, 16, num_classes)))
  else:
    #If dense layer isn't used then latent space is reshaped to 2d
    generator.add(layers.Reshape((16, 16, num_classes)))
  for i in range(choosen_params['conv2d_layers']-1):
    generator.add(layers.Conv2DTranspose(filters[i], (int(kernels[i]), int(kernels[i])),
                                           padding='same', activity_regularizer=keras.regularizers.L2(choosen_params['l2']), strides=strides[i]))
    generator.add(layers.LeakyReLU(alpha=choosen_params['alpha']))
    #If last layer batch normalization is not used
    if i != choosen_params['conv2d_layers'] - 1:
      generator.add(layers.BatchNormalization(momentum=choosen_params['momentum']))
  generator.add(layers.Conv2DTranspose(1, (int(kernels[-1]), int(kernels[-1])),
                                           padding='same', activity_regularizer=keras.regularizers.L2(choosen_params['l2']), strides=(2,2)))
  generator.add(layers.LeakyReLU(alpha=choosen_params['alpha']))
  return generator

In [None]:
#Builds generator with random parameters
generator_18 = build_generator(generator_18_params)
    
  #builds discriminator with same params every time
discriminator = build_discriminator()

In [None]:
  #generator learning rate is random, discriminator learning rate is constant
g_initial_learning_rate = generator_18_params['learning_rate']
d_initial_learning_rate = 0.001


#Learning rate decays at rate initial_learning_rate * .9986^(decay_steps/steps_per_epoch)
#generator learning rate decay
g_lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
g_initial_learning_rate,
decay_steps=generator_18_params['steps_per_lr_decay'],
decay_rate=.9986
)
  
d_lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
d_initial_learning_rate,
decay_steps=steps_per_epoch,
decay_rate=.9986
)

In [None]:
#creates condtional gan
cond_gan = ConditionalGAN(
  discriminator=discriminator, generator=generator_18, latent_dim=latent_dim)
#Compiles GAN    
cond_gan.compile(
  d_optimizer=keras.optimizers.Adam(learning_rate=d_lr_schedule),
  g_optimizer=keras.optimizers.Adam(learning_rate=g_lr_schedule),
  loss_fn=keras.losses.BinaryCrossentropy(),
)
#Creates the callback
custom_checkpoint = Custom_Callback()

In [1]:
history = cond_gan.fit(dataset, epochs=epochs, callbacks=[custom_checkpoint], verbose=2)