#Introduction

**Notebook setup**
Set the following variables to True or False to run certain cells in the notebook.

In [None]:
#Explore data
print_orig_images = False
compute_stats = True

#Data Augmentation
augment = False
use_augmented_data = True
load_augmented_data = True
a = ('a' if use_augmented_data else '')

#Model to use
unet = False
deeplab = True
dl = ('dl' if deeplab else '')

#Freeze encoder layers or not
freeze_layers = False
f = ('f' if freeze_layers else '')

#Train model
train = False

#Create a new dataframe to store models metrics (only set to True once)
create_new_df = False

#Load trained model from drive (Use if train = False)
load_model = True

#Print predicted masks
plot_predictions = True

**Install and import libraries**

In [None]:
pip install --quiet -U albumentations

In [None]:
pip install --quiet torchmetrics

In [None]:
pip install --quiet segmentation-models-pytorch

In [None]:
!pip install --quiet tqdm

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import matplotlib as mpl
from matplotlib.colors import ListedColormap

from google.colab import drive
import os
import glob

import albumentations as A
from PIL import Image, ImageDraw
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
from torch.utils.data import DataLoader
import torchvision.transforms.functional as tf
from torchvision.transforms import ToTensor
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from torchmetrics.classification import MulticlassJaccardIndex, MulticlassF1Score, MulticlassAccuracy
from torchmetrics import Accuracy

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay

from scipy.signal import convolve2d

from albumentations.pytorch import ToTensorV2
from albumentations.augmentations.transforms import Normalize

import segmentation_models_pytorch as smp

from tqdm.notebook import tqdm_notebook
import pickle as pkl
import shutil

**Check GPU and setup device**

In [None]:
# This function checks whether GPU is available. If yes, sets uo device = 'cuda:0' (GPU), otherwise device = 'cpu'
train_on_gpu = torch.cuda.is_available()
if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
else:
    print('CUDA is available!  Training on GPU ...')
device = torch.device("cuda:0" if train_on_gpu else "cpu")
print(device)

**Mount drive**

In [None]:
drive.mount('/content/drive')

%ls

#Define a directory to export files
drive_dir = ''

#Directory to save models' data
plots_folder = '/Plots/'

**Configure environment for Kaggle dataset**

In [None]:
os.environ['KAGGLE_USERNAME'] = ''
os.environ['KAGGLE_KEY'] = ''

**Download dataset**

In [None]:
!kaggle datasets download -d bulentsiyah/semantic-drone-dataset

In [None]:
#Unzip file
!unzip -q '/content/semantic-drone-dataset.zip'

## Define useful functions

**Functions for data inspection** \\
This function plots the colors used for plotting of each class in the RGB masks.

In [None]:
def print_label_colors():
  n = 1
  for idx in range(len(labels)):
      plt.subplot(6,4,n)
      (r,g,b)=labels.iloc[idx].values[1:]
      lab = np.array([[[r,g,b],[r,g,b],[r,g,b],[r,g,b]]])
      plt.title(f'{idx}: {labels["name"].iloc[idx]}', fontsize = 10)
      plt.imshow(lab)
      plt.axis('off')

      n += 1

The following functions can be used to inspect the data: \\
**0.** Computes the minimum and maximum class in all the masks of a dataset.\\

**1.** Counts the occurrence of all classes in the dataset and the percentage of a each class present in each image.\\

**2.** Creates a histogram with the distribution of pixels per class in the whole dataset.\\

**3.** Makes an histogram with the distribution of class percentage in the dataset. \\




In [None]:
#-------------------------------------------------------------------------------
# 0. This functions counts occurrences of classes in the dataset
#Use paths dataframe to only open masks

def min_max(paths_df):
  #One array to store cumulative frequency of each class
  tot_count = np.zeros(23)
  #List of arrays to store individual frequencies
  count_per_image = np.zeros((len(paths_df), 23))
  max = []
  min = []
  for i in range(len(paths_df)):
    mask = np.array(Image.open(paths_df['Mask'].iloc[i]))
    max += [np.max(mask)]
    min += [np.min(mask)]

  min = np.min(min)
  max = np.max(max)

  return min, max

#-------------------------------------------------------------------------------
# 1. This functions counts occurrences of classes in the dataset
#Use paths dataframe to only open masks

def count_classes(paths_df):
  #One array to store cumulative frequency of each class
  tot_count = np.zeros(23)
  #List of arrays to store individual frequencies
  count_per_image = np.zeros((len(paths_df), 23))
  max = []
  min = []
  for i in range(len(paths_df)):
    mask = np.array(Image.open(paths_df['Mask'].iloc[i]))
    max += [np.max(mask)]
    min += [np.min(mask)]

    # Count occurrences of each class in the mask
    mask_count = np.bincount(mask.flatten(), minlength = 23)   #i = index, 1 = mask
    tot_count += mask_count
    count_per_class[i] = mask_count

  return tot_count, count_per_class

#-------------------------------------------------------------------------------
# 2. This function makes a histogram for class frequencies

def histogram(count,len, ds_type = ''):
  plt.figure(figsize = (10,5))
  r = range(23)
  width = 0.5

  plt.bar(r, count*100/(4000*6000*len), color = 'b',
          width = width, edgecolor = 'black',
          )
  #plt.ylim(0,101)
  plt.xlabel("Class", fontsize = 12)
  plt.ylabel("% of pixels")
  plt.title(f"% of pixels per class in {ds_type} set")

  plt.xticks(r, labels.name[:-1], rotation = 90)
  plt.grid(alpha = 0.5)

  plt.show()

#-------------------------------------------------------------------------------
# 3. Make three histograms for different classes
def class_hist(count,len):
  plt.subplots(8,3,figsize = (20,30))

  n = 1
  #Create one histogram per class
  for i in range(23):
    plt.subplot(8,3,n)
    plt.hist(count[:,i]*100/(4000*6000), bins = 100, range = (-.5,100.5))
    plt.xlabel('% pixels')
    plt.ylabel('N. images')
    plt.ylim(0,len)
    plt.xlim(-.5,100.5)
    plt.grid(alpha = 0.5)
    plt.title(f'Class: {labels.name.iloc[i]}')

    n += 1

  plt.tight_layout()
  plt.show()

**Functions for the models** \\
This function takes as input the output of the model and applies a softmax2d activation function to convert the output to probabilities. For each pixel, the segmentation mask contains the index of the layer with highest probability (which also corresponds to class 0, 1, or 2). The function returns the mask, moved the cpu, as a numpy array and as a tensor.

In [None]:
# This function transforms into one channel the output of the model by taking the maximum value per pixel among all channels
def output_to_mask(output):
  output = output.squeeze(0)
  softmax = nn.Softmax2d()
  output = softmax(output)
  segm = np.argmax(output.cpu().detach().numpy(), axis = 0, keepdims = True)
  segm = segm.squeeze(0)
  segm_tensor = torch.tensor(segm).to(device)
  return segm, segm_tensor

This function can be used to define a model. \
Takes as input the encoder name, the architecture to use (UNet or DeepLab), whether to freeze the encoder layers and whether augmented data is being used.

In [None]:
def define_model(encoder, unet = True, freeze = False, use_augmented = use_augmented_data):
  if unet:
    model = smp.Unet(
            encoder_name = encoder,           # choose encoder between mobilenet_v2 or efficientnet-b2
            encoder_weights = "imagenet",     # use `imagenet` pre-trained weights for encoder initialization
            encoder_depth = 5,
            in_channels = 3,                  # model input channels (1 for gray-scale images, 3 for RGB)
            classes = 23,                     # model output channels (number of classes in your dataset)
            activation = 'sigmoid'
            )

    #Variable for DeepLab model
    dl = ''

  else:
    model = smp.DeepLabV3(
          encoder_name = encoder,           # choose encoder
          encoder_weights = "imagenet",     # use `imagenet` pre-trained weights for encoder initialization
          encoder_depth = 3,
          in_channels = 3,                  # model input channels (1 for gray-scale images, 3 for RGB)
          classes = 23,                     # model output channels (number of classes in your dataset)
          activation = 'sigmoid'
          )
    #Variable for DeepLab model
    dl = 'dl'

  f = ''
  if freeze:
    f = 'f'
    ## Iteration to freeze first layers and only train the last ones
    for key, value in dict(model.named_children()).items():
      if "encoder" in key:
        for param in value.parameters():
            param.requires_grad = False
      else:
        for param in value.parameters():
            param.requires_grad = True

  a = ''
  if use_augmented:
    a = 'a'

  model_name = encoder + f + dl + a

  return model, model_name

This function is used to load a trained model.

In [None]:
def load_model(model, model_name):
  checkpoint = torch.load(drive_dir + '/Models/'+ 'AerialSegmentation_' + model_name + '_best.pth', map_location = device)
  model.load_state_dict(checkpoint['model'])
  model.to(device)

  print('Loaded model:', model_name)
  return model

# Prepare dataset and dataloader

In [None]:
# Define data directories
images_dir = '/content/dataset/semantic_drone_dataset/original_images'
labels_dir = '/content/dataset/semantic_drone_dataset/label_images_semantic'
rgb_labels_dir = '/content/RGB_color_image_masks/RGB_color_image_masks'


#Create a dataset containing paths for images, masks and rgb masks
paths = {'Image': [], 'Mask': [], 'RGB Mask': []}
paths['Image'] += sorted(glob.glob(images_dir + '/*'))
paths['Mask'] += sorted(glob.glob(labels_dir + '/*'))
paths['RGB Mask'] += sorted(glob.glob(rgb_labels_dir + '/*'))

paths = pd.DataFrame(paths)

**Split data into train, test, validation**

In [None]:
#First split create test set
train_dir, test_dir = train_test_split(paths, test_size = 0.10, random_state = 42, shuffle = True)

#Second create train and validation sets
train_dir, val_dir = train_test_split(train_dir, test_size = 0.15, random_state = 42, shuffle = True)

print('Size of train set: %i' % len(train_dir))
print('Size of validation set: %i' % len(val_dir))
print('Size of test set: %i' % len(test_dir))

## Dataset

**Dataset class** \\
* This class function receives as root directory of images and labels, possible transformations and whether to open the corresponding rgb mask.
* It contains a function which returns the length of the dataset and a function which returns the image, mask at a certain index.

In [None]:
#--Define dataset class--
class Dataset(torch.utils.data.Dataset):

  # Initialize dataset class, default transforms = None, default val = False
  def __init__(self, root_dir, transforms = None, rgb = False):
        #Save all image paths and transforms
        self.root_dir = root_dir
        self.transform = transforms
        self.rgb = rgb

  def __len__(self):
      #Return the number of samples in the dataset
      return len(self.root_dir)

  def __getitem__(self, idx):
      #Open image at index idx following folder paths
      img = np.array(Image.open(self.root_dir['Image'].iloc[idx]))
      #Open corresponding mask
      mask = np.array(Image.open(self.root_dir['Mask'].iloc[idx]))

      #If any transformation is passed to the Dataset, apply transformations on image and mask
      if self.transform != None:
        #First normalize the image (not the mask)
        img = normalize(image = img)['image']
        #Apply transformations
        transf = self.transform(image = img, mask = mask)
        img = transf['image']
        mask = transf['mask']

      if self.rgb:
        #Open RGB mask
        rgb_mask = np.array(Image.open(self.root_dir['RGB Mask'].iloc[idx]))
        return img, mask, rgb_mask

      return img, mask

# Explore data

**Visualize sample images**

In [None]:
#Import .csv file with labels and corresponding RGB colors
labels = pd.read_csv('class_dict_seg.csv')
labels.columns = ['name','r','g','b']

In [None]:
print_label_colors()

In [None]:
#Visualize original images and corresponding RGB masks
if print_orig_images:
  i = 0
  for img, mask, rgb in Dataset(train_dir, rgb = True):

    fig = plt.subplots(1,2, figsize = (10,5))
    plt.subplot(1,2,1)
    plt.imshow(img)

    plt.subplot(1,2,2)
    plt.imshow(rgb)

    plt.tight_layout()

    i += 1

    if i == 5:
      break

**Compute dataset statistics** \\


In [None]:
if compute_stats:
  #Find minimum and maximum class in the dataset
  min_lab, max_lab = min_max(paths)
  print('Lowest class: %i' % min_lab)
  print('Highest class: %i' % max_lab)

  #Count pixel occurrences and percentages in the datasets
  tot_train, img_count_train = count_classes(train_dir)
  tot_test, img_count_test = count_classes(test_dir)
  tot_val, img_count_val = count_classes(val_dir)

In [None]:
if compute_stats:
  #Visualize distribution of pixels per class in each dataset
  histogram(tot_train, len(train_dir), ds_type = 'train')

  histogram(tot_test, len(test_dir), ds_type = 'test')

  histogram(tot_val, len(val_dir), ds_type = 'validation')

In [None]:
if compute_stats:
  #Visualize the distribution of percentage of each class in the dataset
  class_hist(img_count_train, len(test_dir))

# Data augmentation

💀 **Dangerous function: do not set to 'True'** 💀 \\

* When ext_augment = True, this function will create a new training set with
augmented images.
* Seven augmentations are applied to enlargen the dataset. The first transformations are geometric and are applied to both image and mask; the second set of transformations are color-related so they should be applied only to images. \\
* In this function, augmented images are saved in the original images folder. \\

🕓 Estimated running time is 1 hour and 45 minutes.

In [None]:
if augment:
  n = 1

  #List of classes to augment excluding paved areas and grass
  classes_to_augment = np.delete(labels_above_10, [1,3])

  #7 transformations to apply to images
  #List of geometric transformations to apply to images and masks
  transforms_all = A.Compose([A.Compose([A.RandomCrop(width= 5400, height= 3600, p = 1),
                               A.HorizontalFlip(p=0.5),
                               A.VerticalFlip(p=0.5)], additional_targets = {'mask1': 'mask', 'mask2': 'image'}),
                    A.Compose([A.HorizontalFlip(p=1),
                               A.VerticalFlip(p=1)], additional_targets = {'mask1': 'mask', 'mask2': 'image'}),
                    A.Compose(A.VerticalFlip(p=1), additional_targets = {'mask1': 'mask', 'mask2': 'image'})])

  #List of color transformations to apply to images
  transforms_img = A.Compose([A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.5, p =1),
                              A.HueSaturationValue(hue_shift_limit=0, sat_shift_limit=20, val_shift_limit = 10, p =1),
                              A.GaussianBlur(blur_limit = (9,19), p =1),
                              A.Compose([A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.5, p =1),
                                         A.HueSaturationValue(hue_shift_limit=0, sat_shift_limit=20, val_shift_limit = 10, p =1),
                                         A.GaussianBlur(blur_limit = (9,19), p =1)])])

  #Source directory where training images to be augmented are found
  to_augment = Dataset(train_dir, rgb = True)

  #Create local directory to store new data
  os.mkdir(os.path.join('/content/augmented_data/'))
  os.mkdir('/content/augmented_data/images')
  os.mkdir('/content/augmented_data/masks')
  os.mkdir('/content/augmented_data/rgb_masks')

  #progress bar
  tqdm_images = tqdm_notebook(total = to_augment.__len__(), desc ='Augmented')

  for idx in range(len(to_augment)):

    #Open image and masks
    image, mask, rgb_mask = to_augment[idx]
    #Which classes are present in the image
    classes_mask = np.unique(mask)

    #Apply augmentations if any of the classes to augment is present in the image
    if any(item in classes_to_augment for item in classes_mask):
      for t in transforms_all:
        #Apply geometric transformations
        t = transforms_all(image = image, mask1 = mask, mask2 = rgb_mask)
        img = tf.to_pil_image(t['image'])
        msk = tf.to_pil_image(t['mask1'])
        rgb_msk = tf.to_pil_image(t['mask2'])
        img.save(f'/content/augmented_data/images/{n}.jpg')
        msk.save(f'/content/augmented_data/masks/{n}.png')
        rgb_msk.save(f'/content/augmented_data/rgb_masks/{n}.png')

        n += 1

      mask = Image.fromarray(mask)
      rgb_mask = Image.fromarray(rgb_mask)
      for t in transforms_img:
        #Apply color transformations
        t = transforms_img(image = image)
        img = tf.to_pil_image(t['image'])
        img.save(f'/content/augmented_data/images/{n}.jpg')
        mask.save(f'/content/augmented_data/masks/{n}.png')
        rgb_mask.save(f'/content/augmented_data/rgb_masks/{n}.png')

        n += 1

    tqdm_images.update(1)

  tqdm_images.close()

  #Save externally zip file with augmented data
  shutil.make_archive(f'{drive_dir}/augmented_data', 'zip', '/content/augmented_data')

**Download and add augmented data to train set**

In [None]:
if use_augmented_data:
  if load_augmented_data:
    !gdown '1jZyemsqNhfZVH-FgD28-1qhi3H5AhQOH'
    !unzip -q '/content/augmented_data.zip' -d '/content/augmented_data/'

  augm_images_dir = '/content/augmented_data/images'
  augm_labels_dir = '/content/augmented_data/masks'
  augm_rgb_labels_dir = '/content/augmented_data/rgb_masks'

  #Extract paths of augmented images and masks and put everything in a dataframe
  augm_paths = {'Image': [], 'Mask': [], 'RGB Mask': []}
  augm_paths['Image'] += sorted(glob.glob(augm_images_dir + '/*'))
  augm_paths['Mask'] += sorted(glob.glob(augm_labels_dir + '/*'))
  augm_paths['RGB Mask'] += sorted(glob.glob(augm_rgb_labels_dir + '/*'))
  augm_paths = pd.DataFrame(augm_paths)

  #Add augmented data paths to the train paths
  train_dir = pd.concat([train_dir, augm_paths])
  train_dir = train_dir.sample(frac = 1, random_state = 42, axis = 0, ignore_index = True)
  print('New size of training set: %i' % len(train_dir))

In [None]:
#Display class distribution in augmented train set
if use_augmented_data:
  if compute_stats:
    tot_augm, img_count_augm = count_classes(train_dir)

    histogram(tot_augm, len(train_dir), ds_type = 'augmented train')

# UNet model

**Define segmentation model** \\
This function defines the segmentation model by using the smp library where:
* encoder is pretrained MobileNet V2 or EfficientNet B2
* encoder used pretrained weights on ImageNet
* output classes of the net is 23

In [None]:
# Define pre-trained classification models to use as encoders
encoder1 = "mobilenet_v2"
encoder2 = "efficientnet-b2"

encoder = encoder1

In [None]:
#Define the model to use
model, model_name = define_model(encoder, unet = True, freeze = False)

# Net is moved to device (can be cpu or gpu/cuda)
model.to(device)

# Dataloaders

**Define transformations** \\
Normalization is defined separately because it is only applied to the image and not to the mask.

In [None]:
#--Transformations--
#This function returns a transformation based on the encoder specified
def transform_resize(encoder):
  if encoder == encoder1:
    # Resized to 224x224 pixels and converted to tensors for MobileNet
    transform = A.Compose([
                            A.Resize(224, 224, interpolation = cv2.INTER_NEAREST),
                            ToTensorV2()
                            ],is_check_shapes=False)

  else:
    # Resized to 288x288 pixels and converted to tensors for EfficientNet
    transform = A.Compose([
                            A.Resize(288, 288, interpolation = cv2.INTER_NEAREST),
                            ToTensorV2()
                            ],is_check_shapes=False)

  return transform

In [None]:
#Assign transformation
transform = transform_resize(encoder)

#--Normalize--
# Data is normalized for the pretrained model
normalize = A.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])

**Datasets**

In [None]:
#Import training, testing and validation datasets to be used for training the model (apply transformations)
trainset = Dataset(train_dir, transforms = transform, rgb = False)
valset = Dataset(val_dir, transforms = transform, rgb = False)
testset = Dataset(test_dir, transforms = transform, rgb = False)

**Create dataloaders**

In [None]:
#--Create dataloaders--
batch_size = 32

trainloader = DataLoader(trainset,
                         batch_size = batch_size,
                         shuffle = True,
                         drop_last = True,
                         num_workers = 0)
testloader = DataLoader(testset,
                        batch_size = 1,
                        shuffle = False,
                        drop_last = False,
                        num_workers = 0)
valloader = DataLoader(valset,
                        batch_size = 1,
                        shuffle = False,
                        drop_last = False,
                        num_workers = 0)

# Train and evaluate model

**Define function to validate the model** \\
This function is used to validate the model by computing micro and macro IoU score and Dice score for segmentation image.

In [None]:
# Create validation routine

def validate(net, valloader, device, per_class = False):

    # Get final scores for micro and macro Dice and IoU scores
    iou_score_micro = MulticlassJaccardIndex(num_classes= 23, average = 'micro')
    dice_score_micro = MulticlassF1Score(num_classes= 23, average = 'micro')

    iou_score_macro = MulticlassJaccardIndex(num_classes= 23, average = 'macro')
    dice_score_macro = MulticlassF1Score(num_classes= 23, average = 'macro')

    # Move metrics to device
    iou_score_micro = iou_score_micro.to(device)
    dice_score_micro = dice_score_micro.to(device)

    iou_score_macro = iou_score_macro.to(device)
    dice_score_macro = dice_score_macro.to(device)

    #Compute Micro and Macro Dice and IoU scores per class
    if per_class:
      iou_score_c = MulticlassJaccardIndex(num_classes= 23, average = None)
      dice_score_c = MulticlassF1Score(num_classes= 23, average = None)

      iou_score_c = iou_score_c.to(device)
      dice_score_c = dice_score_c.to(device)

    # Set network in eval mode
    net.eval()

    n=0

    tqdm_val = tqdm_notebook(total= len(valloader), desc='Validation progress', unit='Image')

    # At the end of epoch, validate model
    for inp, mask in valloader:

        # Move batch to gpu
        inp = inp.to(device)
        mask = mask.to(device)

        # Get output mask
        with torch.no_grad():
            outmask = net(inp)

        # Create a segmentation mask as numpy array and one as tensor
        segm_mask, segm_mask_tensor = output_to_mask(outmask)


        # Update metrics for each item
        iou_score_micro.update(segm_mask_tensor, mask.squeeze(0))
        dice_score_micro.update(segm_mask_tensor, mask.squeeze(0))
        iou_score_macro.update(segm_mask_tensor, mask.squeeze(0))
        dice_score_macro.update(segm_mask_tensor, mask.squeeze(0))

        if per_class:
          iou_score_c.update(segm_mask_tensor, mask.squeeze(0))
          dice_score_c.update(segm_mask_tensor, mask.squeeze(0))

        tqdm_val.update(1)
        n += 1

    # Compute metrics for segmentation tensor vs. original mask
    iou_score_micro = iou_score_micro.compute().cpu().numpy()
    dice_score_micro = dice_score_micro.compute().cpu().numpy()
    iou_score_macro = iou_score_macro.compute().cpu().numpy()
    dice_score_macro = dice_score_macro.compute().cpu().numpy()

    if per_class:
      iou_score_c = iou_score_c.compute().cpu().numpy()
      dice_score_c = dice_score_c.compute().cpu().numpy()
      return iou_score_micro, dice_score_micro, iou_score_macro, dice_score_macro, iou_score_c, dice_score_c

    # set network in training mode
    net.train()

    return iou_score_micro, dice_score_micro, iou_score_macro, dice_score_macro

In [None]:
#This function saves externally the validation scores, loss and current epoch at the end of each training-validation epoch
#Takes as input
#par_dir = parent directory
#exp_name = name of the experiment

def save_data(par_dir, exp_name):
  model_dict = {'dice_micro': micro_dice_list,
                'dice_macro': macro_dice_list,
                'iou_micro': micro_iou_list,
                'iou_macro': macro_iou_list,
                'loss': loss_list,
                'epochs': cur_epoch-1}

  for key in model_dict.keys():
    if key != 'epochs':
      elem = model_dict[key]
      for i in range(len(elem)):
        if torch.is_tensor(elem[i]):
          elem[i] = elem[i].cpu().numpy()

  with open(drive_dir + par_dir + exp_name +'_data.pkl', 'wb') as f:
    pkl.dump(model_dict, f)

## Training

###**Setup tensorboard**
* Define name of the experiment to store model training.
* Launch tensorboard

In [None]:
experiment_name = 'AerialSegmentation_' + model_name
print('Experiment name: %s' % experiment_name)

In [None]:
import shutil

if train:
  #%load_ext tensorboard
  %reload_ext tensorboard
  %tensorboard --logdir={experiment_name}

###**Launch training**
* When train = True, this cells trains the network. \\
* The loss function used for training the Cross Entropy Loss. The model is trained and optimized only based on the segmentation prediction. \\
* Validation is run at the end of each epoch and the model with best IoU and Dice score is saved as best model.


In [None]:
if train:

  cross_entropy = nn.CrossEntropyLoss()
  model = model.to(device)
  learning_rate = 0.001

  # define Adam optimizer
  optimizer = torch.optim.Adam(params=model.parameters(), lr= learning_rate)

  #Initialize list to store loss values
  loss_list = []

  # define summary writer
  writer = SummaryWriter(experiment_name)

  # initialize iteration number
  n_iter = 0

  # define best validation value
  best_val_dice = 0
  best_val_iou = 0

  #Lists to store dice and iou scores
  micro_dice_list = []
  micro_iou_list = []
  macro_dice_list = []
  macro_iou_list = []

  # number of epoch
  n_epoch = 30
  total_batches = len(trainset)//batch_size

  #Progress bar for epochs
  tqdm_epochs = tqdm_notebook(total=n_epoch, desc='Epochs')

  for cur_epoch in range(n_epoch):
      # plot current epoch
      writer.add_scalar("epoch", cur_epoch, n_iter)

      # Progress bar for batches
      tqdm_batches = tqdm_notebook(total= total_batches, desc=f'Epoch {cur_epoch}')

      for inp, mask in trainloader:
          # move batch to gpu
          inp = inp.to(device)
          mask = mask.to(device)

          # reset gradients
          optimizer.zero_grad()
          # get output
          outmask = model(inp)

          # compute loss
          loss = nn.CrossEntropyLoss().to(device)
          loss = loss(outmask, mask.long())
          loss.backward()

          # update weights
          optimizer.step()

          #Plot
          writer.add_scalar("Loss",loss.item(), n_iter)

          #Update progress bar
          tqdm_batches.update(1)
          n_iter += 1

      loss_list += [loss.item()]

      # At the end, validate model
      # Validate the model with IoU, Dice, micro and macro accuracy
      cur_iou_micro, cur_dice_micro, cur_iou_macro, cur_dice_macro = validate(model, valloader, device)

      #Add current scores to the lists
      micro_dice_list += [cur_dice_micro]
      micro_iou_list += [cur_iou_micro]
      macro_dice_list += [cur_dice_macro]
      macro_iou_list += [cur_iou_macro]

      # plot validation scores
      writer.add_scalar("Dice", cur_dice_micro, cur_epoch)
      writer.add_scalar("IoU", cur_iou_micro, cur_epoch)

      #Save current data to external file
      save_data(plots_folder, experiment_name)

      # Check if it is the best model so far
      if (best_val_dice is None or cur_dice_micro >= best_val_dice) and (best_val_iou is None or cur_iou_micro >= best_val_iou):
          # define new best val
          best_val_dice = cur_dice_micro
          best_val_iou = cur_iou_micro

          # save current model as best
          torch.save({
              'model': model.state_dict(),
              'opt': optimizer.state_dict(),
              'epoch': cur_epoch
          }, f'{drive_dir}/Models/' + experiment_name + '_best.pth')

      # save last model
      torch.save({
          'model': model.state_dict(),
          'opt': optimizer.state_dict(),
          'epoch': cur_epoch
      },f'{drive_dir}/Models/' + experiment_name + '_last.pth')

      tqdm_batches.close()
      tqdm_epochs.update(1)

  tqdm_epochs.close()


###**Compare training history**

In [None]:
#Create a dictionary to store training history of each dataset
models_dict = {encoder1 : dict(), encoder2: dict(), encoder2 +'f': dict(), encoder2 + 'a': dict(), encoder1 + 'dl': dict(), encoder2 + 'fdl': dict()}

In [None]:
#Import traning history of all datasets
for key in models_dict.keys():
    pkl_file = 'AerialSegmentation_' + key + '_data.pkl'
    with open(drive_dir + plots_folder + pkl_file, 'rb') as file:
      models_dict[key] = pkl.load(file)

In [None]:
#Visualize keys of a dictionary
models_dict[encoder1].keys()

In [None]:
#Labels for legend
model_labels = ['UNet-MobileNet',
          'UNet-EfficientNet',
          'UNet-EfficientNet-f',
          'UNet-EfficientNet-a',
          'DL-MobileNet',
          'DL-EfficientNet-f']

In [None]:
#Loss function for six models
loss_fig = plt.figure(figsize = (10,5))
for key in models_dict.keys():
  data = models_dict[key]['loss']
  plt.plot(data, label = key)
  print('Minimum loss of ' + key + ': %.3f' % (np.min(data)))

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.xticks(range(30), range(1,31))
plt.title('Cross-Entropy Loss')
plt.grid(alpha = 0.5)
plt.legend(model_labels)
plt.show()

In [None]:
#Micro dice score for six models
fig = plt.figure(figsize = (10,5))
for key in models_dict.keys():
  data = models_dict[key]['dice_micro']
  plt.plot(data, label = key)
  print('Max Micro Dice Score of ' + key + ': %.3f' % np.max(data) + ' at epoch: %i' % (np.argmax(data)+1))

plt.xlabel('Epoch')
plt.ylabel('Dice score')
plt.xticks(range(30), range(1,31))
plt.title('Micro Dice score')
plt.grid(alpha = 0.5)
plt.legend(model_labels,loc = 'lower right')
plt.show()

In [None]:
#Macro Dice score for six models
fig2 = plt.figure(figsize = (10,5))
for key in models_dict.keys():
  data = models_dict[key]['dice_macro']
  plt.plot(data, label = key)
  print('Max Macro Dice Score of ' + key + ': %.3f' % np.max(data) + ' at epoch: %i' % (np.argmax(data)+1))

plt.xlabel('Epoch')
plt.ylabel('Dice score')
plt.xticks(range(30), range(1,31))
plt.title('Macro Dice score')
plt.grid(alpha = 0.5)
plt.legend(model_labels, loc = 'lower right')
plt.show()

In [None]:
#Micro IoU score for six models
fig3 = plt.figure(figsize = (10,5))
for key in models_dict.keys():
  data = models_dict[key]['iou_micro']
  plt.plot(data, label = key)
  print('Max Micro Iou Score of ' + key + ': %.3f' % np.max(data) + ' at epoch: %i' % (np.argmax(data)+1))

plt.xlabel('Epoch')
plt.ylabel('IoU score')
plt.xticks(range(30), range(1,31))
plt.title('Micro IoU score')
plt.grid(alpha = 0.5)
plt.legend(model_labels,loc = 'lower right')
plt.show()

In [None]:
#Macro IoU score for six models
fig4 = plt.figure(figsize = (10,5))
for key in models_dict.keys():
  data = models_dict[key]['iou_macro']
  plt.plot(data, label = key)
  print('Max Macro Iou Score of ' + key + ': %.3f' % np.max(data) + ' at epoch: %i' % (np.argmax(data)+1))

plt.xlabel('Epoch')
plt.ylabel('IoU score')
plt.xticks(range(30), range(1,31))
plt.title('Macro IoU score')
plt.grid(alpha = 0.5)
plt.legend(model_labels, loc = 'lower right')
plt.show()

# Test model on test set

In this section the accuracy scores are computed for the test set and store in a common dataframe.

In [None]:
if create_new_df:
  #Create a dataframe to store models values
  test_metrics = pd.DataFrame(columns = ['model','iou_micro', 'dice_micro', 'iou_macro', 'dice_macro','iou_class', 'dice_class'])

else:
  #Load metrics dataset
  test_metrics = pd.read_csv(drive_dir + '/test_metrics.csv')

In [None]:
# Import a model to evaluate
import_model = True
encoder = encoder1

if import_model:
  model_structure, model_name = define_model(encoder, unet = True)
  model = load_model(model_structure, model_name)

**Compute metrics for test set** \\
The validation function is also used to validate the model with the test set. return_labels = True to compute confusion matrix.

In [None]:
#Evaluate the model and add it to the dataframe if it is not present yet
if model_name not in test_metrics['model'].tolist():
  row_val = pd.DataFrame([model_name] + list(validate(model, testloader, device, per_class = True))).T
  row_val.columns = test_metrics.columns
  test_metrics = pd.concat([test_metrics, row_val], ignore_index = True)

  #Export the dataframe
  test_metrics.to_csv(drive_dir + '/test_metrics.csv', index = False)

In [None]:
#Accuracy scores for all models
for idx in test_metrics.index:
  row = test_metrics.iloc[idx]
  print('Results for experiment: %s' % row['model'])
  print('Micro dice score on test set: %.3f' % row['dice_micro'])
  print('Macro dice score on test set: %.3f' % row['dice_macro'])
  print('Micro iou score on test set: %.3f' % row['iou_micro'])
  print('Macro iou score on test set: %.3f \n' % row['iou_macro'])

In [None]:
#Labels for barplots
model_labels2 = ['DL-MobileNet',
                  'UNet-MobileNet',
                  'UNet-EfficientNet',
                  'UNet-EfficientNet-f',
                  'UNet-EfficientNet-a',
                  'DL-EfficientNet-f']

#Function to make a barplot for a per-class variable
def barplot(data, column, variable_name):
  #Number of bins (one per class)
  r = np.arange(23)
  width = 0.12

  #Colors to plot each model
  colors = ['purple', 'blue', 'orange', 'green', 'red',  'yellow']

  plt.figure(figsize = (15,6))
  for i in range(len(data['model'])):
    data_col = data[column][i]
    plt.bar(r + i*width, data_col,
            width = width, color = colors[i],
            edgecolor = 'black',
            label=data['model'][i])

  plt.xlabel("Class", fontsize = 12)
  plt.ylabel(variable_name)
  plt.title(variable_name + " per class")

  plt.ylim(0,1.01)
  plt.yticks(np.arange(0,1.1,0.1))
  plt.xticks(r + ((len(data)*width)-width)/2, labels.name[:-1], rotation = 90)
  plt.legend(model_labels2)
  plt.grid(alpha = 0.5)

  plt.show()

In [None]:
#--IoU per class--
barplot(test_metrics, 'iou_class', 'IoU')

In [None]:
#--Dice score per class--
barplot(test_metrics, 'dice_class', 'Dice score')

**Visualize test results** \\

In [None]:
#This function is used to convert the 2d mask into rgb mask using the labels-colors .csv file
def convert_mask_to_rgb(mask, colors):

  mask_classes = np.unique(mask)
  rgb_mask_or = np.array([mask, mask, mask])
  rgb_mask = np.array([mask, mask, mask])
  rgb_mask_or = np.transpose(rgb_mask_or, (1,2,0))
  rgb_mask = np.transpose(rgb_mask, (1,2,0))

  for cl in mask_classes:
    rgb_colors = [colors['r'].iloc[cl], colors['g'].iloc[cl], colors['b'].iloc[cl]]
    rgb_mask = np.where(rgb_mask_or == cl, rgb_colors, rgb_mask)

  return rgb_mask

In [None]:
if plot_predictions:
  i = 0

  for img, msk in testloader:
    #Set model to evaluation mode
    model.eval()
    #Predict output mask and convert output to np.array
    outmask = model(img.to(device))
    segm_mask, _ = output_to_mask(outmask)

    #Convert 2D mask to RGB mask, increase size and restore original aspect of the image
    colored_mask = convert_mask_to_rgb(segm_mask, labels)
    colored_mask = np.array(Image.fromarray(colored_mask.astype(np.uint8)).resize((768, 512), resample=Image.NEAREST))

    #Open original image (not normalized) and rgb mask (not normalized or resized)
    or_img = np.array(Image.open(test_dir['Image'].iloc[i]).resize((768, 512), resample=Image.NEAREST))
    or_mask = np.array(Image.open(test_dir['RGB Mask'].iloc[i]).resize((768, 512), resample=Image.NEAREST))

    #Plot original image, rgb mask and predicted rgb mask
    plt.subplots(1, 3, figsize = (20,5))

    plt.subplot(1,3,1)
    plt.imshow(or_img)
    plt.title('Original Image')

    plt.subplot(1,3,2)
    plt.imshow(or_mask)
    plt.title('Original Mask')

    plt.subplot(1,3,3)
    plt.imshow(colored_mask)
    plt.title('Predicted Mask')


    i += 1

    if i == 3:
      break

**Function to count predictions made for each class**

In [None]:
#This function works per image
def predicted_classes(mask, pred_mask):
  #Create a 2D array to store predictions for each class
  predictions = np.zeros((23,23))
  #Store predictions made for each label
  for label in range(23):
    #Find true location of the label inside the ground truth image
    idx = np.argwhere(mask == label)
    idx_mask = []
    for elem in idx:
      row, col = elem
      pixel_value = pred_mask[row,col]
      idx_mask.append(pixel_value)
    #Count predicted classes for that label and store in the 2D array
    predictions[label] = np.bincount(idx_mask, minlength = 23)

  return predictions

In [None]:
#Array to store predicted classes for each label in the whole dataset
total_predictions = np.zeros((23,23))

for img, msk in testloader:
  #Set model to evaluation mode
  model.eval()
  #Predict output mask and convert output to np.array
  outmask = model(img.to(device))
  segm_mask, _ = output_to_mask(outmask)

  #Add predictions for each image to the total predictions
  msk = np.array(msk.squeeze(0))
  total_predictions += predicted_classes(msk, segm_mask)

In [None]:
#Divide the predictions made for each class by the total number of class pixels/predictions
for i in range(total_predictions.shape[0]):
  total_predictions[i] /= np.sum(total_predictions[i])

In [None]:
#Visualize predictions matrix
plt.figure(figsize = (6,6))
im = plt.imshow(total_predictions)

plt.xticks(range(23), labels['name'][:-1], rotation = 90)
plt.yticks(range(23), labels['name'][:-1])

#Highlight 'paved_area' class
plt.axvline(1, c = 'r')

#Highlight 'paved_area' class
plt.axvline(3, c = 'pink')


plt.colorbar(im, label = 'Fraction', fraction = 0.046, pad = 0.04)
plt.clim(0,1)

plt.xlabel('Predicted')
plt.ylabel('True')

plt.title('Fraction of true vs. predicted classes (DL-EfficientNet-f)')
plt.grid(alpha = 0.5)

plt.show()

In [None]:
#Classes that are predicted as paved area or grass in more than 10% of cases
labels_above_10 = np.argwhere(((total_predictions[:,1] >= 0.1)|(total_predictions[:,3] >= 0.1)))[:,0]
print(f'Classes predicted as paved areas or grass more than 10% of the time are: {list(labels_above_10)}')

#Land drone

This section is used for findind available landing area and a target landing spot.

In [None]:
# Import model to use (set to DL-MobileNet)
import_model = True
encoder = encoder1

if import_model:
  model_structure, model_name = define_model(encoder, unet = False)
  model = load_model(model_structure, model_name)

In [None]:
#--Define transformations for the encoder--
transform = transform_resize(encoder)

#--Normalize--
# Data is normalized for the pretrained model
normalize = A.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])

In [None]:
#Function to find possible landing areas on certain classes (+50% margin)
#Arguments:
#mask = Segmented mask to inspect
#drone_size = Size of the drone to land
#label_list = Classes where drone can land
def convolution(mask, drone_size = (40,40), label_list = [1,3]):
  h = int(drone_size[0] + 0.5*drone_size[0])
  w = int(drone_size[1] + 0.5*drone_size[1])
  kernel = np.ones((h,w))
  result = np.zeros(mask.shape)
  for lab in label_list:
    binary_mask = np.where(mask == lab, 1, 0)
    conv_res = convolve2d(binary_mask, kernel, mode = 'same')
    result = np.where(conv_res == h*w, 1, result)

  return result

In [None]:
#Randomly choose a pair of coordinates from the available landing area
def random_point_selection(array):
  target_idx = np.argwhere(array == 1)
  rand_idx = np.random.randint(0, len(target_idx))
  target_spot = target_idx[rand_idx]
  return target_spot

In [None]:
#Choose the middle pair of coordinates from the available landing area
def middle_point_selection(array):
    target_idx = np.argwhere(array == 1)
    middle_idx = len(target_idx)//2
    target_spot = target_idx[middle_idx]
    return target_spot

In [None]:
#--Function to compute the landing area and landing spot from segmentation result, and to draw a rectangle around the landing spot on the original image and mask
#Pass image and masks as PIL image
def landing_spot(img, msk, rgb_msk, net, target_classes = [1,3], method ='rp', drone_size = (40, 40)):
  #Create a resized version of original image, mask and RGB mask
  img_r = Image.fromarray(img).resize((768, 512), resample=Image.NEAREST)
  rgb_msk_r = Image.fromarray(rgb_msk).resize((768, 512), resample=Image.NEAREST)
  msk_r = Image.fromarray(msk).resize((768, 512), resample=Image.NEAREST)

  #Apply transformations and normalization for model
  img_n = normalize(image = img)['image']
  img_t = transform(image = img_n)['image']

  #Set model to evaluation mode and predict output
  net.eval()
  output = net(img_t.unsqueeze(0))
  #Transform output to mask
  segm_mask, _ = output_to_mask(output)

  #Upsample mask
  segm_mask = np.array(Image.fromarray(segm_mask.astype(np.uint8)).resize((768, 512), resample=Image.NEAREST))

  #Find possible landing areas in the mask
  sample_res = convolution(segm_mask, drone_size, target_classes)

  if method == 'rp':
    target_y, target_x = random_point_selection(sample_res)

  elif method == 'mp':
    target_y, target_x = middle_point_selection(sample_res)

  #Split drone size in width and height
  w, h = drone_size
  #Shape of the rectangle = (x0,y0), (x1,y1)
  shape = [(target_x - w/2, target_y + h/2), (target_x + w/2, target_y - h/2)]
  #Draw rectangle around target spot on original image
  img_target = ImageDraw.Draw(img_r)
  img_target.rectangle(shape, fill = None, outline ='red', width = 3)

  #Draw rectangle on the original RGB mask
  segm_target = ImageDraw.Draw(rgb_msk_r)
  segm_target.rectangle(shape, fill = None, outline ="red", width = 3)

  return sample_res, img_r, msk_r, rgb_msk_r, [target_y, target_x]

In [None]:
#Visualize segmented output in rgb
print_label_colors()

In [None]:
#--Drone landing with Random Point Selection--
#List to store the ground truth class of the target spot found by Random Point Selection
random_target_class = []

#Define the test dataset
testset = Dataset(test_dir, rgb = True)

#Open one image at a time from the test set
for i in range(len(testset)):
  sample_img, sample_mask, sample_rgb_mask = testset[i]
  #Find landing spot
  landing_area, target_sample, resized_mask, target_mask, target_spot = landing_spot(sample_img, sample_mask, sample_rgb_mask, model)

  #Add ground truth class of the target spot
  random_target_class += [np.array(resized_mask)[target_spot[0], target_spot[1]]]

  #plot first ten images
  if i < 10:
    plt.subplots(1,3, figsize = (15,5))
    plt.subplot(1,3,1)
    custom_cmap = ListedColormap(['#FF0000','#7CFC00'])
    plt.imshow(target_sample)
    im = plt.imshow(landing_area, cmap = custom_cmap, alpha = 0.5)
    plt.colorbar(im, ticks = [0,1], fraction = 0.03, pad = 0.04)
    plt.title('Possible landing area')

    plt.subplot(1,3,2)
    plt.imshow(target_sample)
    plt.title(f'Target coordinates: x,y = {target_spot[1],target_spot[0]}')

    plt.subplot(1,3,3)
    plt.imshow(target_sample)
    plt.imshow(target_mask, alpha = 0.7)
    plt.title(f'Target coordinates: x,y = {target_spot[1],target_spot[0]}')


In [None]:
middle_target_class = []

for i in range(len(testset)):
  sample_img, sample_mask, sample_rgb_mask = testset[i]
  #Find target spot
  landing_area, target_sample, resized_mask, target_mask, target_spot = landing_spot(sample_img, sample_mask, sample_rgb_mask, model, method = 'mp')

  #Add true class of the target spot
  middle_target_class += [np.array(resized_mask)[target_spot[0], target_spot[1]]]

  #Plot first ten images
  if i < 10:
    plt.subplots(1,3, figsize = (15,5))
    plt.subplot(1,3,1)
    plt.imshow(target_sample)
    custom_cmap = ListedColormap(['#FF0000','#7CFC00'])
    im = plt.imshow(landing_area, cmap = custom_cmap, alpha = 0.5)
    plt.colorbar(im, ticks = [0,1], fraction = 0.03, pad = 0.04)
    plt.title('Possible landing area')

    plt.subplot(1,3,2)
    plt.imshow(target_sample)
    plt.title(f'Target coordinates: x,y = {target_spot[1],target_spot[0]}')

    plt.subplot(1,3,3)
    plt.imshow(target_sample)
    plt.imshow(target_mask, alpha = 0.7)
    plt.title(f'Target coordinates: x,y = {target_spot[1],target_spot[0]}')

In [None]:
#--Landing on wrong object for random point--
wrong_rand = np.sum(np.where((random_target_class != 1)&(random_target_class != 3)))
frac_rand = wrong_rand/len(random_target_class)
print('Fraction of wrong landing out of 40 with Random Point Selection: %.3f' %frac_rand)

#--Landing on wrong object for middle point--
wrong_mid = np.sum(np.where((middle_target_class != 1)&(middle_target_class != 3)))
frac_mid= wrong_rand/len(middle_target_class)
print('Fraction of wrong landing out of 40 with Middle Point Selection: %.3f' %frac_mid)