## Multiclass semantic segmentation using DeepLabV3+ on XView2

**Author:** [Kayla Akyüz](https://github.com/kaylaa0)<br>
**Date created:** 2023/06/14<br>
**Last modified:** 2023/06/17<br>
**Description:** Implement DeepLabV3+ architecture for Multi-class Semantic Segmentation on XView2 dataset with test and evaluation methods. The notebook is derived from [Soumik Rakshit's](http://github.com/soumik12345) [Notebook](https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/vision/ipynb/deeplabv3_plus.ipynb) and [Keras Article](https://keras.io/examples/vision/deeplabv3_plus/)

## Introduction

Semantic segmentation, whose goal is to assign a semantic label to every pixel in an image,
is an essential computer vision task. In this example, we implement
the **DeepLabV3+** model for multi-class semantic segmentation, a fully-convolutional
architecture that performs well on semantic segmentation benchmarks.

### References:

- [Encoder-Decoder with Atrous Separable Convolution for Semantic Image Segmentation](https://arxiv.org/pdf/1802.02611.pdf)
- [Rethinking Atrous Convolution for Semantic Image Segmentation](https://arxiv.org/abs/1706.05587)
- [DeepLab: Semantic Image Segmentation with Deep Convolutional Nets, Atrous Convolution, and Fully Connected CRFs](https://arxiv.org/abs/1606.00915)

## Downloading the data

We will use the [xView2 xBD dataset](https://xview2.org/download-links)
for training our model.

In [None]:
#@title Import necessary libraries. { display-mode: "form" }
import os
import cv2
import numpy as np
from glob import glob
from scipy.io import loadmat
import matplotlib.pyplot as plt
import imageio
import shutil
import json
from PIL import Image, ImageDraw
from datetime import datetime
import pandas as pd
import random
from random import randrange

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.callbacks import CSVLogger

In [None]:
#@title Define data directory. { display-mode: "form" }
# Root directory for the dataset and checkpoints. Later cells build paths by
# plain string concatenation (colab_drive + "..."), so keep the trailing slash.
colab_drive = "./"

In [None]:
#@title Select to use only CPU, avoids OOM but slower. { display-mode: "form" }
CPU_ONLY = False #@param ["False", "True"] {type:"raw"}
if CPU_ONLY:
    # Make no GPU visible to TensorFlow, then expose only the first CPU device.
    tf.config.set_visible_devices([], 'GPU')
    cpus = tf.config.experimental.list_physical_devices('CPU')
    tf.config.set_visible_devices(cpus[0], 'CPU')

In [None]:
#@title Mount `Google Drive`. { display-mode: "form" }
# Colab-only: mount the user's Google Drive at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

# Redirect the data root to the mounted Drive. The trailing slash matters
# because later cells concatenate sub-paths directly onto this string.
colab_drive = "/content/gdrive/My Drive/"

In [None]:
#@title Insert the download link from `XView2.org` to save a temp directory. { display-mode: "form" }
link = 'Insert download link' #@param {type:"string"}
!wget {link} -P "/content/Xview2/root" -O "train_images_labels_targets.tar.gz"

In [None]:
#@title Unzip the contents. { display-mode: "form" }
!tar -xzvf "/content/Xview2/root/train_images_labels_targets.tar.gz" -C "/content/train/"

In [None]:
#@title Process train images and save to `Google Drive`. { display-mode: "form" }
# When True, `.tif` files found in input_dir are converted to PNG in output_dir.
convertTif = False #@param {type:"boolean"}
# Folder holding the extracted training images.
input_dir = "/content/train/train/images/"
# Destination for converted PNGs, relative to the data root.
output_dir = "Xview2/imagesdeeplab/" #@param {type:"string"}
output_dir = colab_drive+output_dir
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

def tiff_to_png(input_path, output_dir):
    """Convert a single TIFF file to a PNG stored in ``output_dir``.

    The PNG keeps the base name of ``input_path`` with a ``.png`` extension.
    ``output_dir`` is created when it does not exist yet.
    """
    # Pixel values are cast to uint8 as-is (no rescaling is applied).
    pixels = imageio.imread(input_path).astype(np.uint8)

    os.makedirs(output_dir, exist_ok=True)

    stem, _ = os.path.splitext(os.path.basename(input_path))
    destination = os.path.join(output_dir, f"{stem}.png")
    imageio.imwrite(destination, pixels)

# Either convert the TIFF images to PNG, or copy the image folder unchanged.
if convertTif:
    for filename in os.listdir(input_dir):
        # Accept both common TIFF extensions, regardless of letter case.
        if filename.lower().endswith(('.tif', '.tiff')):
            tiff_to_png(os.path.join(input_dir, filename), output_dir)
else:
    # input_dir is a directory: shutil.copy() handles single files only and
    # raises on a directory, so copy the whole tree. dirs_exist_ok lets the
    # cell be re-run after the destination has been created.
    shutil.copytree(input_dir, colab_drive+"Xview2/images/", dirs_exist_ok=True)


In [None]:
#@title Process train labels and save to `Google Drive`. { display-mode: "form" }
# Set input and output directories
# input_dir holds the JSON label files; output_dir (relative to the data
# root) receives one PNG mask per label file.
input_dir = '/content/train/train/labels/'
output_dir = "Xview2/labelsdeeplab"  #@param {type:"string"}
output_dir = colab_drive+output_dir

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Map feature types and damage subtypes to integer class numbers

def get_class_number(feature_type, subtype=None):
    """Return the integer class id for a labelled feature.

    When ``subtype`` is given it alone decides the class (2-6); the feature
    type is ignored in that case. Without a subtype the feature type decides
    (``'building'`` -> 1). Anything unrecognised maps to 0.
    """
    if subtype is None:
        # No damage annotation: classify by feature type only.
        type_to_class = {'building': 1}
        return type_to_class.get(feature_type, 0)

    # Damage annotation present: it takes precedence over the feature type.
    damage_to_class = {
        'no-damage': 2,
        'minor-damage': 3,
        'destroyed': 4,
        'major-damage': 5,
        'un-classified': 6,
    }
    return damage_to_class.get(subtype, 0)

def _polygon_exterior_points(wkt):
    """Return the exterior ring of a WKT ``POLYGON`` as ``(x, y)`` float tuples.

    Only the first (outer) ring is read; any interior rings are ignored.
    An empty list is returned when the text has no coordinate list
    (e.g. ``POLYGON EMPTY``).
    """
    if '(' not in wkt:
        return []
    # Strip the leading "POLYGON ((" and cut at the end of the first ring.
    ring = wkt[wkt.index('('):].lstrip('( ').split(')', 1)[0]
    points = []
    for pair in ring.split(','):
        values = pair.split()
        if len(values) >= 2:
            points.append((float(values[0]), float(values[1])))
    return points


# Rasterize every JSON label file in the input directory into a PNG mask
# whose pixel values are class numbers.
for filename in os.listdir(input_dir):
    if not filename.endswith('.json'):
        continue

    # Load JSON data from file
    with open(os.path.join(input_dir, filename), 'r') as file:
        data = json.load(file)

    # Mask dimensions come from the label metadata.
    image_width = data['metadata']['width']
    image_height = data['metadata']['height']

    # Single-channel mask pre-filled with 0, i.e. the "no class" value that
    # get_class_number also uses for unknown features.
    image = Image.new('L', (image_width, image_height), 0)
    draw = ImageDraw.Draw(image)

    # Paint each annotated polygon with its class number.
    for feature in data['features']['xy']:
        points = _polygon_exterior_points(feature['wkt'])
        if len(points) < 3:
            # Not a drawable polygon (empty or degenerate geometry).
            continue
        properties = feature['properties']
        class_number = get_class_number(properties['feature_type'], properties.get('subtype'))
        draw.polygon(points, fill=class_number)

    # Save the mask with the same base name as the label file
    output_filename = os.path.splitext(filename)[0] + '.png'
    output_path = os.path.join(output_dir, output_filename)
    image.save(output_path)

In [None]:
#@title Flush `Google Drive` to save processed dataset. { display-mode: "form" }
# Colab-only: flush and unmount the Drive mounted earlier so that, per this
# cell's title, the processed dataset is saved before moving on.
drive.flush_and_unmount()

## Creating a TensorFlow Dataset

Training on the entire xView2 dataset might take a lot of time, hence we can define a subset.

In [None]:
#@title Create a subset of the dataset. { display-mode: "form" }
IMAGE_SIZE = 1024
BATCH_SIZE = 2 # Max 2, 3 gives OOM with 15GB VRAM
NUM_CLASSES = 7
DATA_DIR = "Xview2/" #@param {type:"string"}
DATA_DIR = colab_drive+DATA_DIR
NUM_TRAIN_IMAGES = 3998 #@param {type:"number"}
NUM_VAL_IMAGES = 600 #@param {type:"number"}
NUM_TEST_IMAGES = 1000 #@param {type:"number"}
# Fixed seed so the train/val/test split is the same after a runtime restart
# (e.g. when a saved model is reloaded and evaluated on the "test" images).
SHUFFLE_SEED = 42 #@param {type:"integer"}

images_directories = sorted(glob(os.path.join(DATA_DIR, "images/*")))
label_directories = sorted(glob(os.path.join(DATA_DIR, "labelsdeeplab/*")))

# Images and masks are paired purely by their position in the sorted lists,
# so both folders must be non-empty and line up one-to-one.
if not images_directories:
    raise ValueError(f"No images found under {os.path.join(DATA_DIR, 'images')}")
if len(images_directories) != len(label_directories):
    raise ValueError(
        f"Found {len(images_directories)} images but {len(label_directories)} "
        "masks; they cannot be paired by position."
    )

discard_before_images = True #@param {type:"boolean"}
shuffle_dataset = True #@param {type:"boolean"}

if discard_before_images:
    # Keep only every second image/mask pair (odd positions of the sorted
    # lists) and halve the requested subset sizes to match.
    # NOTE(review): presumably meant to drop the pre-disaster ("before")
    # image of each pair - confirm that odd sorted positions are the half
    # that should be kept.
    images_directories = images_directories[1::2]
    label_directories = label_directories[1::2]
    NUM_TRAIN_IMAGES //= 2
    NUM_VAL_IMAGES //= 2
    NUM_TEST_IMAGES //= 2

if shuffle_dataset:
    # Shuffle the image and label paths in sync, reproducibly.
    combined = list(zip(images_directories, label_directories))
    random.Random(SHUFFLE_SEED).shuffle(combined)
    images_directories = [image_path for image_path, _ in combined]
    label_directories = [mask_path for _, mask_path in combined]

# Consecutive, non-overlapping slices: train | validation | test.
train_end = NUM_TRAIN_IMAGES
val_end = train_end + NUM_VAL_IMAGES
test_end = val_end + NUM_TEST_IMAGES

train_images = images_directories[:train_end]
train_masks = label_directories[:train_end]
val_images = images_directories[train_end:val_end]
val_masks = label_directories[train_end:val_end]
test_images = images_directories[val_end:test_end]
test_masks = label_directories[val_end:test_end]


def read_image(image_path, mask=False):
    """Load a PNG file as a float32 tensor resized to IMAGE_SIZE x IMAGE_SIZE.

    Args:
        image_path: Path of the PNG file to read.
        mask: When True the file is decoded as a single-channel mask of class
            ids; otherwise it is decoded as RGB and passed through the
            ResNet50 input preprocessing.

    Returns:
        A float32 tensor with 1 channel (mask) or 3 channels (image).
    """
    image = tf.io.read_file(image_path)
    if mask:
        image = tf.image.decode_png(image, channels=1)
        image.set_shape([None, None, 1])
        # Nearest-neighbour keeps every pixel a real class id; bilinear
        # resizing would blend neighbouring ids into classes that are not
        # actually present at that pixel.
        image = tf.image.resize(
            images=image, size=[IMAGE_SIZE, IMAGE_SIZE], method="nearest"
        )
        # Nearest-neighbour preserves the uint8 input dtype; cast so the
        # result stays float32 as it was with the default resize method.
        image = tf.cast(image, tf.float32)
    else:
        image = tf.image.decode_png(image, channels=3)
        image.set_shape([None, None, 3])
        image = tf.image.resize(images=image, size=[IMAGE_SIZE, IMAGE_SIZE])
        image = tf.keras.applications.resnet50.preprocess_input(image)
    return image


def load_data(image_list, mask_list):
    """Read one image path and its mask path into an ``(image, mask)`` pair.

    Despite the parameter names, each argument is a single path: this is the
    per-element function applied by ``data_generator`` via ``Dataset.map``.
    """
    return read_image(image_list), read_image(mask_list, mask=True)


def data_generator(image_list, mask_list):
    """Build a batched ``tf.data`` pipeline from parallel path lists.

    Paths are decoded with ``load_data`` and grouped into batches of
    ``BATCH_SIZE``; a final incomplete batch is dropped.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    decoded = path_pairs.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    return decoded.batch(BATCH_SIZE, drop_remainder=True)


# Build one pipeline per split. Each path list is sorted on its own.
# NOTE(review): presumably images and masks share base names, so sorting the
# two lists independently keeps the pairs aligned - confirm.
train_dataset = data_generator(sorted(train_images), sorted(train_masks))
val_dataset = data_generator(sorted(val_images), sorted(val_masks))
test_dataset = data_generator(sorted(test_images), sorted(test_masks))

for split_name, split_dataset in (
    ("Train", train_dataset),
    ("Val", val_dataset),
    ("Test", test_dataset),
):
    print(f"{split_name} Dataset:", split_dataset)

## Building the DeepLabV3+ model

DeepLabv3+ extends DeepLabv3 by adding an encoder-decoder structure. The encoder module
processes multiscale contextual information by applying dilated convolution at multiple
scales, while the decoder module refines the segmentation results along object boundaries.

![](https://github.com/lattice-ai/DeepLabV3-Plus/raw/master/assets/deeplabv3_plus_diagram.png)

**Dilated convolution:** With dilated convolution, as we go deeper in the network, we can keep the
stride constant but with larger field-of-view without increasing the number of parameters
or the amount of computation. Besides, it enables larger output feature maps, which is
useful for semantic segmentation.

The reason for using **Dilated Spatial Pyramid Pooling** is that it was shown that as the
sampling rate becomes larger, the number of valid filter weights (i.e., weights that
are applied to the valid feature region, instead of padded zeros) becomes smaller.

In [None]:
#@title Defining the Dilated Convolution
def convolution_block(
    block_input,
    num_filters=256,
    kernel_size=3,
    dilation_rate=1,
    padding="same",
    use_bias=False,
):
    """Apply Conv2D -> BatchNormalization -> ReLU to ``block_input``.

    Args:
        block_input: Input feature tensor.
        num_filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        dilation_rate: Dilation rate of the convolution.
        padding: Padding mode forwarded to ``Conv2D``.
        use_bias: Whether the convolution has a bias term.

    Returns:
        The ReLU-activated, batch-normalized convolution output.
    """
    x = layers.Conv2D(
        num_filters,
        kernel_size=kernel_size,
        dilation_rate=dilation_rate,
        # Forward the caller's choice; it used to be hard-coded to "same",
        # which silently ignored the `padding` argument.
        padding=padding,
        use_bias=use_bias,
        kernel_initializer=keras.initializers.HeNormal(),
    )(block_input)
    x = layers.BatchNormalization()(x)
    return tf.nn.relu(x)


def DilatedSpatialPyramidPooling(dspp_input):
    """Build the pyramid-pooling head on top of ``dspp_input``.

    Five branches are concatenated along the channel axis and fused by a 1x1
    convolution block: an image-level pooling branch, a 1x1 branch, and three
    3x3 branches with dilation rates 6, 12 and 18.
    """
    height, width = dspp_input.shape[-3], dspp_input.shape[-2]

    # Image-level branch: the pooling window spans the whole feature map;
    # the result is projected and upsampled back to the input resolution.
    pooled = layers.AveragePooling2D(pool_size=(height, width))(dspp_input)
    pooled = convolution_block(pooled, kernel_size=1, use_bias=True)
    out_pool = layers.UpSampling2D(
        size=(height // pooled.shape[1], width // pooled.shape[2]),
        interpolation="bilinear",
    )(pooled)

    # Convolutional branches, created in order of increasing dilation.
    branches = [
        out_pool,
        convolution_block(dspp_input, kernel_size=1, dilation_rate=1),
    ]
    for rate in (6, 12, 18):
        branches.append(
            convolution_block(dspp_input, kernel_size=3, dilation_rate=rate)
        )

    merged = layers.Concatenate(axis=-1)(branches)
    return convolution_block(merged, kernel_size=1)


The encoder features are first bilinearly upsampled by a factor 4, and then
concatenated with the corresponding low-level features from the network backbone that
have the same spatial resolution. For this example, we
use a ResNet50 pretrained on ImageNet as the backbone model: the output of its
`conv4_block6_2_relu` layer feeds the Dilated Spatial Pyramid Pooling encoder, and
the low-level features are taken from its `conv2_block3_2_relu` layer.

In [None]:
#@title Creating the model
def DeeplabV3Plus(image_size, num_classes):
    """Assemble a DeepLabV3+ Keras model on an ImageNet ResNet50 backbone.

    Args:
        image_size: Side length of the square RGB input.
        num_classes: Number of output channels (one score per class).

    Returns:
        A ``keras.Model`` mapping ``(image_size, image_size, 3)`` inputs to
        per-pixel class scores; no activation is applied to the output.
    """
    model_input = keras.Input(shape=(image_size, image_size, 3))
    backbone = keras.applications.ResNet50(
        weights="imagenet", include_top=False, input_tensor=model_input
    )

    # Encoder: pyramid pooling over a deep backbone feature map, upsampled
    # to a quarter of the input resolution.
    encoder = DilatedSpatialPyramidPooling(
        backbone.get_layer("conv4_block6_2_relu").output
    )
    quarter = image_size // 4
    input_a = layers.UpSampling2D(
        size=(quarter // encoder.shape[1], quarter // encoder.shape[2]),
        interpolation="bilinear",
    )(encoder)

    # Low-level features from an early backbone layer, reduced to 48 channels.
    low_level = backbone.get_layer("conv2_block3_2_relu").output
    input_b = convolution_block(low_level, num_filters=48, kernel_size=1)

    # Decoder: fuse both streams, refine twice, upsample to full resolution.
    decoder = layers.Concatenate(axis=-1)([input_a, input_b])
    for _ in range(2):
        decoder = convolution_block(decoder)
    decoder = layers.UpSampling2D(
        size=(image_size // decoder.shape[1], image_size // decoder.shape[2]),
        interpolation="bilinear",
    )(decoder)

    model_output = layers.Conv2D(
        num_classes, kernel_size=(1, 1), padding="same"
    )(decoder)
    return keras.Model(inputs=model_input, outputs=model_output)


# Instantiate the network for the configured resolution and class count,
# then print its layer-by-layer summary.
model = DeeplabV3Plus(num_classes=NUM_CLASSES, image_size=IMAGE_SIZE)
model.summary()

## Training

We train the model using sparse categorical crossentropy as the loss function, and
Adam as the optimizer.

In [None]:
#@title Define the history plotting
def plot_history(history):
    """Plot training/validation loss and accuracy, one figure per metric.

    ``history`` is indexed by metric name, so both a Keras ``history.history``
    dict and a DataFrame with the same column names work.
    """
    panels = (
        ("loss", "Training Loss"),
        ("accuracy", "Training Accuracy"),
        ("val_loss", "Validation Loss"),
        ("val_accuracy", "Validation Accuracy"),
    )
    for metric, title in panels:
        plt.plot(history[metric])
        plt.title(title)
        plt.ylabel(metric)
        plt.xlabel("epoch")
        plt.show()

In [None]:
#@title Train the model
epoch = 20 #@param {type:"number"}

# The model outputs raw scores (no softmax), hence from_logits=True.
loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=loss,
    metrics=["accuracy"],
)

# Each run writes into its own timestamped checkpoint folder.
now = datetime.now()
dt_string = now.strftime("%y-%m-%d-%H-%M-%S")

# NOTE: this only redirects the outputs to an existing folder; no weights are
# loaded in this cell, so load the model beforehand when resuming.
continue_training_from_folder = False #@param {type:"boolean"}
if continue_training_from_folder:
    continue_folder = "folder name" #@param {type:"string"}
    dt_string = continue_folder

initial_epoch = 0 #@param {type:"integer"}

checkpoint_dir = os.path.join(colab_drive, 'checkpoints', dt_string)
os.makedirs(checkpoint_dir, exist_ok=True)

# append=True keeps earlier rows when training continues in the same folder.
csv_logger = CSVLogger(os.path.join(checkpoint_dir, 'history.csv'), append=True)

# save_freq="epoch" writes one file per epoch. An integer save_freq counts
# batches, so save_freq=1 saved (and overwrote) the model after every batch.
mc = keras.callbacks.ModelCheckpoint(os.path.join(checkpoint_dir, 'model{epoch:08d}.h5'),
                                     save_weights_only=False, save_freq="epoch")

history = model.fit(train_dataset, validation_data=val_dataset, initial_epoch=initial_epoch, epochs=epoch, callbacks=[mc, csv_logger])

# convert the history.history dict to a pandas DataFrame:
hist_df = pd.DataFrame(history.history)

# save to json (inside the run's checkpoint folder):
hist_json_file = os.path.join(checkpoint_dir, 'complete_history.json')
with open(hist_json_file, mode='w') as f:
    hist_df.to_json(f)

# or save to csv:
hist_csv_file = os.path.join(checkpoint_dir, 'complete_history.csv')
with open(hist_csv_file, mode='w') as f:
    hist_df.to_csv(f)

plot_history(history.history)

In [None]:
#@title Manually save the model
# Save locally first, then copy into this run's checkpoint folder. The local
# path is resolved from the actual working directory instead of assuming
# /content, and the destination is joined with a proper path separator.
local_model_path = os.path.abspath("my_model.h5")
model.save(local_model_path)
last_model_dir = os.path.join(colab_drive, 'checkpoints', dt_string)
os.makedirs(last_model_dir, exist_ok=True)
shutil.copy(local_model_path, os.path.join(last_model_dir, 'last_model.h5'))

In [None]:
#@title Disconnect colab after training. { display-mode: "form" }
# Colab-only: release the runtime once training has finished.
# NOTE(review): presumably anything not yet copied to Drive is lost when the
# runtime is unassigned - confirm the save cell above has been run first.
from google.colab import runtime
runtime.unassign()

In [None]:
#@title Load the train history and visualize
# Path of a training-history CSV, relative to the data root.
history_path = "checkpoints/history.csv" #@param {type:"string"}
loaded_history = pd.read_csv(colab_drive+history_path)

# plot_history looks metrics up by name, which works on DataFrame columns.
plot_history(loaded_history)

In [None]:
#@title Load the model
checkpoint_path = "checkpoints/use_final.h5" #@param {type:"string"}

# Same loss as used for training: the network outputs raw class scores.
loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Load without the saved compile state, then compile explicitly below.
model = tf.keras.models.load_model(colab_drive+checkpoint_path, compile=False)
model.compile(
    loss=loss,
    metrics=["accuracy"],
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
)

# Print the restored architecture.
model.summary()


## Inference using Colormap Overlay

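The raw predictions from the model are per-pixel class scores (logits), one channel per class; taking the `argmax` over the class axis gives the predicted label of each pixel.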
In order to visualize the results, we plot them as RGB segmentation masks where each pixel is represented by a unique color corresponding to the particular label predicted. We would also plot an overlay of the RGB segmentation mask on the input image as this further helps us to identify the different categories present in the image more intuitively.

In [None]:
#@title Define inference functions

# RGB colour used to draw each class: row i is the colour of class index i.
colormap = np.array(
    [
        (255, 255, 255),  # White
        (128, 128, 128),  # Gray
        (0, 128, 0),      # Green
        (255, 165, 0),    # Orange
        (0, 255, 255),    # Cyan
        (255, 0, 0),      # Red
        (255, 192, 203),  # Pink
    ],
    dtype=np.uint8,
)


def infer(model, image_tensor):
    """Predict a class-index map for a single image.

    The image is wrapped in a batch of one, passed to ``model.predict``, and
    the scores are reduced with an argmax over axis 2 (the class axis once
    the batch dimension has been squeezed away).
    """
    batch = np.expand_dims((image_tensor), axis=0)
    scores = np.squeeze(model.predict(batch))
    return np.argmax(scores, axis=2)


def decode_segmentation_masks(mask, colormap, n_classes):
    """Turn a class-index mask into a uint8 RGB image.

    Every pixel whose value is a class id in ``range(n_classes)`` receives
    that class's row of ``colormap``; all other pixels stay black.
    """
    channels = [np.zeros_like(mask).astype(np.uint8) for _ in range(3)]
    for class_id in range(0, n_classes):
        selected = mask == class_id
        for channel_index, channel in enumerate(channels):
            channel[selected] = colormap[class_id, channel_index]
    return np.stack(channels, axis=2)


def get_overlay(image, colored_mask):
    """Blend a coloured mask over an image (35% image, 65% mask).

    The image is first converted to a PIL image and then to a uint8 array so
    it can be mixed with ``colored_mask`` by ``cv2.addWeighted``.
    """
    pil_image = tf.keras.utils.array_to_img(image)
    background = np.array(pil_image).astype(np.uint8)
    return cv2.addWeighted(background, 0.35, colored_mask, 0.65, 0)


def plot_samples_matplotlib(display_list, figsize=(5, 3)):
    """Show the given arrays side by side in a single row.

    Args:
        display_list: Arrays to display. Items whose last dimension is 3 are
            converted with ``array_to_img`` first; others are shown as-is.
        figsize: Size of the matplotlib figure.
    """
    if not display_list:
        # Nothing to draw; plt.subplots cannot create zero columns.
        return

    # squeeze=False keeps `axes` two-dimensional even for a single panel,
    # where plt.subplots would otherwise return a bare, non-indexable Axes.
    _, axes = plt.subplots(
        nrows=1, ncols=len(display_list), figsize=figsize, squeeze=False
    )
    for axis, item in zip(axes[0], display_list):
        if item.shape[-1] == 3:
            axis.imshow(tf.keras.utils.array_to_img(item))
        else:
            axis.imshow(item)
    plt.show()


def plot_predictions(images_list, colormap, model):
    """Run the model on each image file and plot input, overlay and mask.

    For every path in ``images_list`` the image is loaded, segmented, coloured
    with ``colormap`` and displayed as three panels in one row.
    """
    for image_file in images_list:
        image_tensor = read_image(image_file)
        class_map = infer(model=model, image_tensor=image_tensor)
        colored = decode_segmentation_masks(class_map, colormap, NUM_CLASSES)
        panels = [image_tensor, get_overlay(image_tensor, colored), colored]
        plot_samples_matplotlib(panels, figsize=(18, 14))


In [None]:
#@title Inference on Train Images

# Random start of a window of 8 consecutive images. Valid starts are
# 0 .. len-8 inclusive, hence len-7 as the exclusive bound; max(1, ...) keeps
# the call valid for splits with fewer than 8 images (the slices below then
# simply come up short).
ind = randrange(max(1, len(train_images) - 7))
plot_predictions(train_images[ind:ind+4], colormap, model=model)
plot_predictions(train_images[ind+4:ind+8], colormap, model=model)

In [None]:
#@title Inference on Validation Images
# Random start of a window of 8 consecutive images. Valid starts are
# 0 .. len-8 inclusive, hence len-7 as the exclusive bound; max(1, ...) keeps
# the call valid for splits with fewer than 8 images.
val_ind = randrange(max(1, len(val_images) - 7))
plot_predictions(val_images[val_ind:val_ind+4], colormap, model=model)
plot_predictions(val_images[val_ind+4:val_ind+8], colormap, model=model)

In [None]:
#@title Inference on Test Images
# Random start of a window of 8 consecutive images. Valid starts are
# 0 .. len-8 inclusive, hence len-7 as the exclusive bound; max(1, ...) keeps
# the call valid for splits with fewer than 8 images.
test_ind = randrange(max(1, len(test_images) - 7))
plot_predictions(test_images[test_ind:test_ind+4], colormap, model=model)
plot_predictions(test_images[test_ind+4:test_ind+8], colormap, model=model)