# Environment setup



In [None]:
# Cloud authentication
from google.colab import auth
auth.authenticate_user()

In [None]:
# Connect to Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Import modules
import tensorflow as tf
from tensorflow.keras.activations import softmax
import numpy as np
import os
from os import listdir
import datetime
import random

%pip install git+https://github.com/artemmavrin/focal-loss.git --quiet
from focal_loss import SparseCategoricalFocalLoss

%pip install rasterio --quiet
import rasterio
from sklearn.metrics import accuracy_score, precision_score, recall_score, cohen_kappa_score

# Tensorflow version check
print(tf.__version__)

In [None]:
# Set seed
def set_seed(seed: int = 42) -> None:
  random.seed(seed)
  np.random.seed(seed)
  tf.random.set_seed(seed)
  tf.experimental.numpy.random.seed(seed)
  os.environ['PYTHONHASHSEED'] = str(seed)
set_seed()

In [None]:
# Base folder
base = '/content/drive/My Drive/Thesis2'

In [None]:
# Define variables depending on the type of classes

## Binary
# Nclasses = 3
# train_base = base + '/binary_train'
# val_base = base + '/binary_val'
# Weight=[0.0, 1.01, 0.99]
# model_dir = base + '/UNetBin'
# lbl = '/Data/TestBinLbl.tif'
# labels = ['Forest', 'Non forest']

## Multi
Nclasses = 5
train_base = base + '/multi_train'
val_base = base + '/multi_val'
Weight=[0.00, 1.83, 2.29, 0.5, 0.99]
model_dir = base + '/UNetMulti'
lbl = '/Data/TestMultiLbl.tif'
labels = ['Water', 'Bare soil', 'Forest', 'Plantation']

# # Sub
# Nclasses = 7
# train_base = base + '/sub_train'
# val_base = base + '/sub_val'
# Weight=[0.00, 1.22, 1.53, 1.65, 0.41, 0.79, 4.09]
# model_dir = base + '/UNetSub'
# lbl = '/Data/TestSubLbl.tif'
# labels = ['Water', 'Bare soil', 'Regrowth forest', 'Dense forest', 'Mature plantation', 'Young plantation']

# Create dataset

In [None]:
# Specify the size and shape of patches expected by the model
kernel_size = 128
features = ['Capella', 'Label', 'Mask']

columns = [tf.io.FixedLenFeature(shape=[kernel_size, kernel_size], dtype=tf.float32) for k in features]
features_dict = dict(zip(features, columns))

# Sizes of the training and evaluation datasets.
train_size= int(100*120*12 *0.33) #47520
val_size = int(100*90*2 *0.33)    #5940

# Specify model training parameters
batch_size = 64
buffer_size = train_size

In [None]:
# Functions used to create dataset

class Augment(tf.keras.layers.Layer):
  def __init__(self, seed=42):
    super().__init__()
    ang_list = [0, 0.25, 0.5, 0.75]
    rdm_ang = ang_list[random.randrange(len(ang_list))]
    self.augment = tf.keras.Sequential([
                      tf.keras.layers.RandomFlip("horizontal_and_vertical"),
                      # tf.keras.layers.RandomRotation(factor=(0,1), fill_mode="reflect"),
                      tf.keras.layers.RandomRotation(factor=(rdm_ang,rdm_ang), fill_mode="reflect"),
                      ])
  def call(self, inputs, labels, masks):
    comb_tensor = tf.concat([inputs, labels, masks], axis=-1)
    augmented = self.augment(comb_tensor)
    inputs, labels, masks = tf.split(augmented, 3, axis=-1)
    return inputs, labels, masks

def parse_tfrecord(example_proto):
    return tf.io.parse_single_example(example_proto, features_dict)

# Disstack the data into image, label and mask
def to_tuple(inputs):
    inputsList = [inputs.get(key) for key in features]
    stacked = tf.stack(inputsList, axis=0)
    stacked = tf.transpose(stacked, [1, 2, 0])

    data = stacked[:,:,:1]
    label = stacked[:,:,1:2]
    # Convert nan to 0 in the label data
    label = tf.where(condition=tf.math.is_nan(label), x=tf.zeros_like(label), y=label)
    mask = stacked[:,:,2:3]

    return data, label, mask

def training_dataset():
    glob = train_base + '*'
    glob = tf.io.gfile.glob(glob)
    ds = tf.data.TFRecordDataset(glob, compression_type='GZIP').map(parse_tfrecord, num_parallel_calls=5)
    ds = ds.map(to_tuple, num_parallel_calls=5).map(Augment(), num_parallel_calls=5)
    ds = ds.shuffle(buffer_size).repeat().batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

def eval_dataset():
    glob = val_base + '*'
    glob = tf.io.gfile.glob(glob)
    ds = tf.data.TFRecordDataset(glob, compression_type='GZIP').map(parse_tfrecord, num_parallel_calls=5)
    ds = ds.map(to_tuple, num_parallel_calls=5).repeat().batch(1).prefetch(tf.data.AUTOTUNE)
    return ds

training = training_dataset()
evaluation = eval_dataset()

In [None]:
# Print the 777th iteration training data and time to print it
start = datetime.datetime.now()
print(iter(training.take(777)).next())
finish = datetime.datetime.now()
print(finish-start)

In [None]:
# Print the 99th iteration validation data and time to print it
start = datetime.datetime.now()
print(iter(evaluation.take(99)).next())
finish = datetime.datetime.now()
print(finish-start)

# Build a model

In [None]:
# Modified functions from https://github.com/Asad-Ismail/lane_detection/blob/0e2ca0c6796634a4a1d1945828445580dc6507e0/scripts/losses.py

def onehot(y_true, n_classes=Nclasses):
    #Squueeze if the tensor is already 4 dimneional
    if len(y_true.shape) == 4:
        y_true = tf.squeeze(y_true, -1)
    y_true = tf.one_hot(tf.cast(y_true, tf.int32), n_classes)
    y_true = tf.cast(y_true, tf.float32)
    return y_true

def sparse_weighted_crossentropy(weights):
    weights=tf.constant(weights,dtype=tf.float32)
    def loss(y_true, y_pred):
        y_true = onehot(y_true)
        y_pred = softmax(y_pred)
        y_pred = tf.clip_by_value(y_pred, tf.keras.backend.epsilon(), 1-tf.keras.backend.epsilon())
        loss = weights * ( y_true * tf.math.log(y_pred) + (tf.ones_like(y_true)-y_true) * tf.math.log(tf.ones_like(y_true)-y_pred) )
        loss = -1 * tf.keras.backend.sum(loss, axis=-1)
        loss = tf.keras.backend.mean(loss)
        return loss
    return loss

In [None]:
# Define parameters and loss function for U-Net
LearningRate = 0.001
Epochs =50

# Loss = tf.keras.losses.SparseCategoricalCrossentropy()
Loss = sparse_weighted_crossentropy(weights=Weight)
# Loss = SparseCategoricalFocalLoss(gamma=2, class_weight=Weight)

In [None]:
# Define a model

def conv_block(input_tensor, num_filters):
	encoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
	encoder = tf.keras.layers.BatchNormalization()(encoder)
	encoder = tf.keras.layers.Activation('relu')(encoder)
	encoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
	encoder = tf.keras.layers.BatchNormalization()(encoder)
	encoder = tf.keras.layers.Activation('relu')(encoder)
	return encoder

def encoder_block(input_tensor, num_filters):
	encoder = conv_block(input_tensor, num_filters)
	encoder_pool = tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
	encoder_pool = tf.keras.layers.Dropout(0.2)(encoder_pool)
	return encoder_pool, encoder

def decoder_block(input_tensor, concat_tensor, num_filters):
	decoder = tf.keras.layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
	decoder = tf.keras.layers.concatenate([concat_tensor, decoder], axis=-1)
	decoder = tf.keras.layers.BatchNormalization()(decoder)
	decoder = tf.keras.layers.Activation('relu')(decoder)
	decoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
	decoder = tf.keras.layers.BatchNormalization()(decoder)
	decoder = tf.keras.layers.Activation('relu')(decoder)
	decoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
	decoder = tf.keras.layers.BatchNormalization()(decoder)
	decoder = tf.keras.layers.Activation('relu')(decoder)
	return decoder


def get_unet():
	filter = 32
	inputs = tf.keras.layers.Input(shape=(None,None,1)) # None
	encoder0_pool, encoder0 = encoder_block(inputs, filter) # None/2
	encoder1_pool, encoder1 = encoder_block(encoder0_pool, filter*2) # None/4
	encoder2_pool, encoder2 = encoder_block(encoder1_pool, filter*4) # None/8
	encoder3_pool, encoder3 = encoder_block(encoder2_pool, filter*8) # None/16
	encoder4_pool, encoder4 = encoder_block(encoder3_pool, filter*16) # None/32
	center = conv_block(encoder4_pool, filter*32) # center
	decoder4 = decoder_block(center, encoder4, filter*16) # None/16
	decoder3 = decoder_block(decoder4, encoder3, filter*8) # None/8
	decoder2 = decoder_block(decoder3, encoder2, filter*4) # None/4
	decoder1 = decoder_block(decoder2, encoder1, filter*2) # None/2
	decoder0 = decoder_block(decoder1, encoder0, filter) # None
 	
	outputs = tf.keras.layers.Conv2D(Nclasses, (1, 1), padding='same')(decoder0)

	model = tf.keras.models.Model(inputs=[inputs], outputs=[outputs])

	model.compile(
		optimizer=tf.keras.optimizers.Adam(LearningRate), 
		loss=Loss,
		metrics=['accuracy'],
		run_eagerly=True
		)

	return model

In [None]:
model = get_unet()
model.summary()

# Training and validation

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir='unet_v2')

model.fit(
    x=training, 
    epochs=Epochs, 
    steps_per_epoch=train_size // batch_size, 
    validation_data=evaluation,
    validation_steps=val_size // 1 ,
		callbacks=[tensorboard_callback])

In [None]:
# Plot the accuracy and loss throughout the epochs
%load_ext tensorboard
%tensorboard --logdir 'unet_v2'

In [None]:
# Define a directry to save the trained model
# model_dir = '/UNetBin'
# model_dir = '/UNetMulti'
# model_dir = '/UNetSub'
# model_dir = '/weightedUNetBin'
# model_dir = '/weightedUNetMulti'
model_dir = '/weightedUNetSub'

# Save the trained model
model.save(base + model_dir, save_format='tf')

In [None]:
# Load the model
# model = tf.keras.models.load_model(base + model_dir)
model = tf.keras.models.load_model(base + model_dir, custom_objects={'loss': sparse_weighted_crossentropy(Weight)}) 

In [None]:
# Model evaluation
model.evaluate(evaluation,steps=val_size // 1,verbose=1)

# Optional: visualize dataset for multi-class

In [None]:
from matplotlib.colors import ListedColormap, Normalize
import matplotlib.pyplot as plt
%pip install earthpy --quiet
import earthpy.plot as ep

# Color map
class_colors_dict = {
    'Background':      [0.5, 0.5, 0.5, 1],  
    'Water':     [0,   0,   1,   1],   
    'Bare soil':     [1,   0,   1,   1],
    'Forest':     [0,   1,   0,   1],
    'Plantation':     [0.7,   1,   0.7,   1], 
}
class_cm = np.array([v for k,v in class_colors_dict.items()])
class_map = class_cm[np.unique([0., 1., 2., 3., 4.]).astype(int)]
cmap = ListedColormap(class_map)

In [None]:
# Plot the image, mask, true label and predicted label
def display(display_list):
  title = ['Input Image', 'Mask', 'Label']
  fig, (ax_img, ax_msk, ax_tru) = plt.subplots(1, 3, figsize=(10, 10))

  for i in range(len(display_list)):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    display_list[i] = np.squeeze(display_list[i])
    if i == 0:
      ax_img = plt.imshow(display_list[i], cmap='gray', vmin=0, vmax=0.1)
    if i == 1:
      ax_msk = plt.imshow(display_list[i], cmap=ListedColormap(['darkslategray', 'white']))
      # ep.draw_legend(ax_msk, titles=["background", "labels"])
    if i == 2:
      ax_tru = plt.imshow(display_list[i], cmap=cmap)
      # ep.draw_legend(ax_tru, titles=list(class_colors_dict.keys()))
  plt.show()
  return

for img, lbl, msk in training:
  display([np.squeeze(img), np.squeeze(msk), np.squeeze(lbl)])

In [None]:
Nclasses = 5
lbl = '/Data/TrainMultiLbl.tif'
Weight=[0.00, 1.83, 2.29, 0.5, 0.99]
model_dir = '/UNetMulti'
class_colors_dict = {
    'Background':      [0.5, 0.5, 0.5, 1],  
    'Water':     [0,   0,   1,   1],   
    'Bare soil':     [1,   0,   1,   1],
    'Forest':     [0,   1,   0,   1],
    'Plantation':     [0.7,   1,   0.7,   1], 
}
results = '/Results/PredMultiUNet.tif'

# Load image and make it readable as a numpy matrix
def open_raster(raster_path):
    with rasterio.open(raster_path, 'r') as ds:
        return ds.read() 

# Test data
image = open_raster(base + '/Data/TrainImg_spk.tif')
image[np.isnan(image)] = 0

label = open_raster(base + lbl)
label[np.isnan(label)] = 0
label[label<0] =0

mask = open_raster(base + '/Data/TrainMsk.tif')
mask[np.isnan(mask)] = 0
mask[mask<0] =0

# 3D to 4D
image = np.expand_dims(image, -1)
image = np.squeeze(image)
label = np.expand_dims(label, -1)
label = np.squeeze(label)
mask = np.expand_dims(mask, -1)
mask = np.squeeze(mask)

In [None]:
# Plot the image, mask, true label and predicted label
def display(display_list):
  title = ['Input Image', 'Mask', 'True Label']
  fig, (ax_img, ax_msk, ax_tru) = plt.subplots(1, 3, figsize=(15, 15))

  for i in range(len(display_list)):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    display_list[i] = np.squeeze(display_list[i])
    if i == 0:
      ax_img = plt.imshow(display_list[i], cmap='gray', vmin=0, vmax=0.6)
    if i == 1:
      ax_msk = plt.imshow(display_list[i], cmap=ListedColormap(['darkslategray', 'white']))
      # ep.draw_legend(ax_msk, titles=["background", "labels"])
    if i == 2:
      ax_tru = plt.imshow(display_list[i], cmap=cmap)
      ep.draw_legend(ax_tru, titles=list(class_colors_dict.keys()))
  plt.show()
  return

def create_mask(pred_mask):
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]

def predictions(img, msk, lbl):
    return display([img, msk, lbl])

predictions(image, mask, label)

In [None]:
for i in range(10):
  image = image[256:(256+128), 128*i:128*(i+1)]
  mask = mask[256:(256+128), 128*i:128*(i+1)]
  label = label[256:(256+128), 128*i:128*(i+1)]
  predictions(image, mask, label)