# Final Project: Segmentation of satellite images

## U-Net

In [None]:
# necessary imports for U-Net:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, MaxPool2D, Cropping2D, BatchNormalization, Activation, Dropout
from tensorflow.keras import Model, Input
import numpy as np
from tensorflow.keras.preprocessing.image import load_img
import random
import os
from PIL import Image

In [None]:
def EncoderBlock(prev_layer, filters, dropout_rate=0.0, max_pool=True):
    prev_layer = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal", bias_initializer="zeros")(prev_layer)
    prev_layer = BatchNormalization()(prev_layer)
    prev_layer = Activation("relu")(prev_layer)
    prev_layer = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal", bias_initializer="zeros")(prev_layer)
    prev_layer = BatchNormalization()(prev_layer)
    prev_layer = Activation("relu")(prev_layer)
    if dropout_rate > 0.0:
        prev_layer = Dropout(dropout_rate)(prev_layer)
    skip_layer = prev_layer
    if max_pool:
        prev_layer = MaxPool2D()(prev_layer)
    return prev_layer, skip_layer
    
def DecoderBlock(prev_layer, skip_layer, filters, dropout_rate = 0.0):
    prev_layer = Conv2DTranspose(filters, 2, strides=2, padding="same", kernel_initializer="he_normal", bias_initializer="zeros")(prev_layer)
    prev_layer = BatchNormalization()(prev_layer)
    prev_layer = Activation("relu")(prev_layer)
    prev_layer = tf.concat([skip_layer, prev_layer], axis=-1)
    prev_layer = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal", bias_initializer="zeros")(prev_layer)
    prev_layer = BatchNormalization()(prev_layer)
    prev_layer = Activation("relu")(prev_layer)
    prev_layer = Conv2D(filters, 3, padding="same", kernel_initializer="he_normal", bias_initializer="zeros")(prev_layer)
    prev_layer = BatchNormalization()(prev_layer)
    prev_layer = Activation("relu")(prev_layer)
    if dropout_rate > 0.0:
        prev_layer = Dropout(dropout_rate)(prev_layer)
    return prev_layer

In [None]:
def UNet(input_size=(128, 128, 3), n_classes=4, dropout_rate=0.0):
    inputs = Input(shape = input_size)
    enc1 = EncoderBlock(inputs, 64, dropout_rate=dropout_rate, max_pool=True)
    enc2 = EncoderBlock(enc1[0], 128, dropout_rate=dropout_rate, max_pool=True)
    enc3 = EncoderBlock(enc2[0], 256, dropout_rate=dropout_rate, max_pool=True)
    enc4 = EncoderBlock(enc3[0], 512, dropout_rate=dropout_rate, max_pool=True)
    enc5 = EncoderBlock(enc4[0], 1024, dropout_rate=dropout_rate, max_pool=False)
    dec1 = DecoderBlock(enc5[0], enc4[1], 512, dropout_rate=dropout_rate)
    dec2 = DecoderBlock(dec1, enc3[1], 256, dropout_rate=dropout_rate)
    dec3 = DecoderBlock(dec2, enc2[1], 128, dropout_rate=dropout_rate)
    dec4 = DecoderBlock(dec3, enc1[1], 64, dropout_rate=dropout_rate)
    conv = Conv2D(n_classes, 1, activation="softmax", padding="same", kernel_initializer="glorot_normal", bias_initializer="zeros")(dec4)
    model = Model(inputs=inputs, outputs=conv)
    return model

## Dataloader

In [None]:
# define a dataloader for the training pipe:
class SatelliteData(tf.keras.utils.Sequence):

    FOREST = (4, 135, 29)
    FIELDS = (231, 231, 25)
    URBAN = (229, 109, 109)
    WATER = (14, 10, 214)

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths, shuffle = False):
        self.batch_size = batch_size
        self.img_size = img_size
        if shuffle:
            indices = [*range(len(input_img_paths))]
            random.shuffle(indices)
            self.input_img_paths = np.array(input_img_paths)[indices]
            self.target_img_paths = np.array(target_img_paths)[indices]
        else:
            self.input_img_paths = np.array(input_img_paths)
            self.target_img_paths = np.array(target_img_paths)

    def __len__(self):
        return len(self.target_img_paths) // self.batch_size

    def __getitem__(self, idx):
        i = idx * self.batch_size
        batch_input_img_paths = self.input_img_paths[i : i + self.batch_size]
        batch_target_img_paths = self.target_img_paths[i : i + self.batch_size]
        x = np.zeros((self.batch_size,) + self.img_size + (3,), dtype="float32")
        for j, path in enumerate(batch_input_img_paths):
            img = load_img(path, target_size=self.img_size)
            x[j] = img
        y = np.zeros((self.batch_size,) + self.img_size + (4,), dtype="float32")
        for j, path in enumerate(batch_target_img_paths):
            img = np.array(load_img(path, target_size=self.img_size))
            y[j] = np.all(img[:, :, None] == (self.FOREST, self.FIELDS, self.URBAN, self.WATER), axis=-1)
        return x, y

## Validation

In [None]:
model_weights = ""  # set path to model
model = UNet()
model.load_weights(model_weights)

In [None]:
def predict(model, img, patch_size, padding):
  shift = patch_size - 2 * padding
  x_max = (img.shape[1] - 2 * padding) // shift
  y_max = (img.shape[0] - 2 * padding) // shift
  test_img = []
  test_pred = []
  for x in range(1, x_max-1):
      test_img_row = []
      test_pred_row = []
      for y in range(1, y_max-1):
          img_patch = img[y * shift:y * shift + patch_size, x * shift:x * shift + patch_size, :]
          test_img_row.append(img_patch[padding:patch_size - padding, padding:patch_size - padding])
          pred = model.predict(img_patch[None, :, :, :])
          pred = np.argmax(pred, axis=-1)
          pred_patch = np.zeros(pred.shape + (4,))
          X, Y, Z = np.ogrid[:pred.shape[0], :pred.shape[1], :pred.shape[2]]
          pred_patch[X, Y, Z, pred] = 1
          pred_patch = transform_to_rgb(pred_patch[0].reshape((patch_size, patch_size, 4)))
          test_pred_row.append(pred_patch[padding:patch_size - padding, padding:patch_size - padding])
      test_img.append(np.concatenate(test_img_row, axis=0))
      test_pred.append(np.concatenate(test_pred_row, axis=0))
  test_img = np.concatenate(test_img, axis=1)
  test_pred = np.concatenate(test_pred, axis=1)
  return test_img, test_pred

def transform_to_rgb(ann):
  FOREST = (4, 135, 29)
  FIELDS = (231, 231, 25)
  URBAN = (229, 109, 109)
  WATER = (14, 10, 214)
  colors = (FOREST, FIELDS, URBAN, WATER)
  rgb_ann = np.zeros(ann.shape[:-1] + (3,), dtype="uint8")
  for i in range(4):
    rgb_ann[ann[:, :, i].astype(bool)] = colors[i]
  return rgb_ann

In [None]:
test_img = ""  # set path to test image
img = load_img(os.path.join(test_img))
# img = img.resize((img.size[0], img.size[1]))  resize test image if needed
img = np.array(img)
patch_size = 128
padding = 32
test_img, test_pred = predict(model, img, patch_size, padding)
test_img = Image.fromarray(np.uint8(test_img), mode="RGB")
test_pred = Image.fromarray(np.uint8(test_pred), mode="RGB")

In [None]:
img = test_img.convert("RGBA")
pred = test_pred.convert("RGBA")

final = Image.blend(img, pred, 0.3)
final = final.resize((2000, 2000), Image.ANTIALIAS)
display(final)
final.save("")  # set output path for image

## Calculate test set errors

In [None]:
import tensorflow.keras.backend as K

class JaccardCoefficient:

  def __init__(self, name = "jaccard_coefficient", smooth = 1e-5):
    self.__name__ = name
    self.smooth = smooth

  def __call__(self, y_true, y_pred):
    intersection = K.sum(y_true * y_pred, axis=(0, 1, 2))
    y_true = K.sum(y_true, axis=(0, 1, 2))
    y_pred = K.sum(y_pred, axis=(0, 1, 2))
    jaccard = (intersection + self.smooth) / (y_true + y_pred - intersection + self.smooth) 
    jaccard = K.mean(jaccard)
    return jaccard

In [None]:
test_data_path = ""  # set path to test set
test_input_path = os.path.join(test_data_path, "images")
test_target_path = os.path.join(test_data_path, "annotations")

test_input_img_paths = [os.path.join(test_input_path, img) for img in os.listdir(test_input_path)]
test_target_img_paths = [os.path.join(test_target_path, img) for img in os.listdir(test_target_path)]

test_data = SatelliteData(64, (128, 128), test_input_img_paths, test_target_img_paths, shuffle = True)

In [None]:
cat = tf.keras.metrics.CategoricalAccuracy()
jac = JaccardCoefficient()

total_cat = 0.0
total_jac = 0.0

for batch in test_data:
    pred = model.predict(batch[0])
    total_cat += cat(batch[1], pred)
    total_jac += jac(batch[1], pred)

total_cat /= len(test_data)
total_jac /= len(test_data)

print(f"Categorical accuracy: {total_cat.item():.4f}")
print(f"Jaccard-coefficient: {total_jac.item():.4f}")

In [None]:
%load_ext tensorboard
%tensorboard --logdir "drive/MyDrive/final_project/model01/logs"