# global variables

In [1]:
# detect whether the notebook runs inside a kaggle kernel
from os import environ
IS_KAGGLE_ENVIRONMENT = 'KAGGLE_KERNEL_RUN_TYPE' in environ

# folder where models are stored and name used for the model file
if IS_KAGGLE_ENVIRONMENT:
    MODELS_PATH = '/kaggle/working/models/'
else:
    MODELS_PATH = 'data/models/'
MODEL_NAME = 'mobilenetv2-deeplabv3plus-ssdlite'

# data options
INPUT_IMAGE_SHAPE = (480, 640, 3)
LABEL_CODE_BACKGROUND = 0
LABELS_CODES = [0, 1, 2, 3]
NUMBER_OF_CLASSES = len(LABELS_CODES)

# object detection options
STANDARD_DEVIATIONS_CENTROIDS_OFFSETS = (0.1, 0.1, 0.2, 0.2)

# labels conversions (the background label has no description and no color)
LABEL_CODE_TO_DESC = {1: 'monorail', 2: 'person', 3: 'forklift'}
LABEL_CODE_TO_COLOR = {1: 'red', 2: 'green', 3: 'blue'}

# tensorflow options
SEED = 1993
BATCH_SIZE = 16

# kaggle setup

## clone repository and setup

In [2]:
if IS_KAGGLE_ENVIRONMENT:
    # start from the kaggle working directory and remove a previous clone of the
    # repository, if there is one, so that the clone below starts from a clean state
    %cd '/kaggle/working/'
    import os
    if os.path.exists('ssd-segmentation'):
        !rm -r 'ssd-segmentation'

    # clone the github repository
    !git clone 'https://github.com/matteo-stat/ssd-segmentation.git'

    # move into the cloned repository folder, the following cells use paths relative to it
    %cd '/kaggle/working/ssd-segmentation'

    # optionally switch to another branch
    # !git checkout 'main'

    # show the working directory content
    !ls

# dependencies

In [3]:
# tensorflow is imported first, so that the keras custom objects registry is
# cleared before ssdseglib is imported at the end of this cell
# NOTE(review): presumably this avoids clashes with objects already registered
# by ssdseglib when the cell is run again - confirm
import tensorflow as tf
tf.keras.saving.get_custom_objects().clear()
# seed the tensorflow random generator
tf.random.set_seed(SEED)

# seed the python random generator
import random
random.seed(SEED)

# metadata files, arrays, plots and images
import json
import csv
import numpy as np
from matplotlib import pyplot as plt, patches
from PIL import Image
# library with boxes, data encoding, losses, metrics, models and evaluators used by this notebook
import ssdseglib

# default bounding boxes

In [4]:
# options used to generate the default bounding boxes
boxes_default_options = dict(
    feature_maps_shapes=((30, 40), (15, 20), (13, 18), (7, 9), (4, 5)),
    feature_maps_aspect_ratios=(1, 2, 3, 4, 5, 6, 1/2, 1/3, 1/4),
    centers_padding_from_borders_percentage=(0.025, 0.05, 0.1, 0.125, 0.2),
    boxes_scales=(0.18, 0.95),
    additional_square_box=True,
)

# create default bounding boxes
boxes_default = ssdseglib.boxes.DefaultBoundingBoxes(**boxes_default_options)

# rescale default bounding boxes using the first two dimensions of the input image shape
input_image_shape_2d = INPUT_IMAGE_SHAPE[:2]
boxes_default.rescale_boxes_coordinates(image_shape=input_image_shape_2d)

# data encoder

In [5]:
# corners coordinates of the default bounding boxes, requested with the 'ssd' coordinates style
xmin_boxes_default = boxes_default.get_boxes_coordinates_xmin(coordinates_style='ssd')
ymin_boxes_default = boxes_default.get_boxes_coordinates_ymin(coordinates_style='ssd')
xmax_boxes_default = boxes_default.get_boxes_coordinates_xmax(coordinates_style='ssd')
ymax_boxes_default = boxes_default.get_boxes_coordinates_ymax(coordinates_style='ssd')

# object used to read and encode the training samples, with horizontal flip augmentation enabled
data_reader_encoder = ssdseglib.datacoder.DataEncoderDecoder(
    num_classes=NUMBER_OF_CLASSES,
    image_shape=INPUT_IMAGE_SHAPE[:2],
    xmin_boxes_default=xmin_boxes_default,
    ymin_boxes_default=ymin_boxes_default,
    xmax_boxes_default=xmax_boxes_default,
    ymax_boxes_default=ymax_boxes_default,
    iou_threshold=0.5,
    standard_deviations_centroids_offsets=STANDARD_DEVIATIONS_CENTROIDS_OFFSETS,
    augmentation_horizontal_flip=True,
)

# input data

## load metadata

In [6]:
def _load_metadata(path_file_json):
    """
    Read a json metadata file.

    Args:
        path_file_json (str): path of the json file to read.

    Returns:
        the deserialized json content, in this notebook each item is unpacked as
        (image path, mask path, labels boxes path).
    """
    with open(path_file_json, 'r') as f:
        return json.load(f)


def _to_kaggle_path(path, path_data_kaggle):
    """
    Point a local data path to the kaggle input directory.

    Only the first 'data/' occurrence is rewritten, so a later path component that
    contains 'data/' (for example 'metadata/') is not corrupted.

    Args:
        path (str): local path, expected to start with 'data/'.
        path_data_kaggle (str): kaggle input directory, with a trailing slash.

    Returns:
        str: the path relocated under the kaggle input directory.
    """
    return path.replace('data/', f'{path_data_kaggle}data/', 1)


# training metadata files
# the training set is small and the validation set is even smaller, so small that
# metrics computed on it would probably not be reliable, for this reason the
# validation set (eval-persons-forklifts) is used as additional training data
path_files_metadata_train = [
    'data/train.json',
    'data/train-additional-persons.json',
    'data/train-additional-forklifts.json',
    'data/eval-persons-forklifts.json',
]

# training
data = []
for path_file_metadata in path_files_metadata_train:
    data.extend(_load_metadata(path_file_metadata))

# unpack train metadata into separate lists
path_files_images_train, path_files_masks_train, path_files_labels_boxes_train = map(list, zip(*data))

# test
path_files_images_test, path_files_masks_test, path_files_labels_boxes_test = map(list, zip(*_load_metadata('data/test.json')))

# replace local data directory with kaggle input directory
if IS_KAGGLE_ENVIRONMENT:
    path_data_kaggle = '/kaggle/input/ssd-segmentation-dataset/'
    path_files_images_train = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_images_train]
    path_files_masks_train = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_masks_train]
    path_files_labels_boxes_train = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_labels_boxes_train]

    path_files_images_test = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_images_test]
    path_files_masks_test = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_masks_test]
    path_files_labels_boxes_test = [_to_kaggle_path(path, path_data_kaggle) for path in path_files_labels_boxes_test]

## tensorflow datasets

In [7]:
# training: paths -> shuffle -> read and encode -> batch -> rgb channels augmentation -> prefetch
ds_train = tf.data.Dataset.from_tensor_slices(
    (path_files_images_train, path_files_masks_train, path_files_labels_boxes_train)
)
# the shuffle buffer is as large as the whole training set
ds_train = ds_train.shuffle(buffer_size=len(path_files_images_train))
ds_train = ds_train.map(data_reader_encoder.read_and_encode, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(batch_size=BATCH_SIZE)
# the rgb channels augmentation is mapped after batching
ds_train = ds_train.map(ssdseglib.datacoder.augmentation_rgb_channels, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.prefetch(buffer_size=tf.data.AUTOTUNE)

# test: only the images are read, no shuffle, so the order follows path_files_images_test
ds_test = tf.data.Dataset.from_tensor_slices(path_files_images_test)
ds_test = ds_test.map(ssdseglib.datacoder.read_image, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size=BATCH_SIZE)
ds_test = ds_test.prefetch(buffer_size=tf.data.AUTOTUNE)

# weighted losses for model training

In [8]:
# dice loss for semantic segmentation, here every class gets the same weight
dice_classes_weights = (1.0, 1.0, 1.0, 1.0)
dice_loss = ssdseglib.losses.dice(classes_weights=dice_classes_weights)

# weighted metrics for model training

In [9]:
# classes weights shared by the weighted metrics: zero for the first class, equal weights for the others
metrics_classes_weights = (0., 1/3, 1/3, 1/3)

# weighted metric for semantic segmentation
jaccard_iou_segmentation_masks_metric = ssdseglib.metrics.jaccard_iou_segmentation_masks(
    classes_weights=metrics_classes_weights
)

# weighted metric for boxes classification
categorical_accuracy_metric = ssdseglib.metrics.categorical_accuracy(
    classes_weights=metrics_classes_weights
)

# metric for boxes regression, built from the default boxes exposed by the data encoder
jaccard_iou_bounding_boxes_options = {
    'center_x_boxes_default': data_reader_encoder.center_x_boxes_default,
    'center_y_boxes_default': data_reader_encoder.center_y_boxes_default,
    'width_boxes_default': data_reader_encoder.width_boxes_default,
    'height_boxes_default': data_reader_encoder.height_boxes_default,
    'standard_deviations_centroids_offsets': STANDARD_DEVIATIONS_CENTROIDS_OFFSETS,
}
jaccard_iou_bounding_boxes_metric = ssdseglib.metrics.jaccard_iou_bounding_boxes(**jaccard_iou_bounding_boxes_options)

# model

## architecture

In [10]:
# number of default boxes for each feature map:
# one box per aspect ratio, plus one when the additional square box is enabled
additional_boxes_per_point = 1 if boxes_default.additional_square_box else 0
number_of_boxes_per_point = []
for aspect_ratios in boxes_default.feature_maps_aspect_ratios:
    number_of_boxes_per_point.append(len(aspect_ratios) + additional_boxes_per_point)

# model builder, default boxes are passed as centroids coordinates with the 'ssd' coordinates style
model_builder = ssdseglib.models.MobileNetV2SsdSegBuilder(
    input_image_shape=INPUT_IMAGE_SHAPE,
    number_of_boxes_per_point=number_of_boxes_per_point,
    number_of_classes=NUMBER_OF_CLASSES,
    center_x_boxes_default=boxes_default.get_boxes_coordinates_center_x(coordinates_style='ssd'),
    center_y_boxes_default=boxes_default.get_boxes_coordinates_center_y(coordinates_style='ssd'),
    width_boxes_default=boxes_default.get_boxes_coordinates_width(coordinates_style='ssd'),
    height_boxes_default=boxes_default.get_boxes_coordinates_height(coordinates_style='ssd'),
    standard_deviations_centroids_offsets=STANDARD_DEVIATIONS_CENTROIDS_OFFSETS,
)

In [11]:
# architectures are selected by name, together with the dilation rates for segmentation
training_model_options = {
    'segmentation_architecture': 'deeplabv3plus',
    'object_detection_architecture': 'ssdlite',
    'segmentation_dilation_rates': (2, 4, 8),
}

# model for training
model = model_builder.get_model_for_training(**training_model_options)

# alternative: load an already trained model and continue the training
# model = tf.keras.models.load_model('/kaggle/working/mobilenetv2-ssdseg.keras', compile=False)

# uncomment to print the model summary
# model.summary()

## optimizer

In [16]:
# number of optimizer steps (batches) needed to complete one epoch
# the last batch can be smaller, so the division is rounded up
steps_per_epoch = max(1, (len(path_files_images_train) + BATCH_SIZE - 1) // BATCH_SIZE)

# epochs at which the learning rate is lowered
learning_rate_boundaries_epochs = [40, 70, 100, 120]

# learning rate scheduler
# the schedule is evaluated on the optimizer step, which increases at every batch and not
# at every epoch, so the boundaries expressed in epochs must be converted to steps,
# otherwise the lowest learning rate would be reached after only 120 batches
learning_rate_scheduler = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[epoch * steps_per_epoch for epoch in learning_rate_boundaries_epochs],
    values=[0.001, 0.0005, 0.0001, 0.00005, 0.00001]
)

# optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_scheduler)

## compile

In [17]:
# each output has its own loss, loss weight and metric, matched by output name
losses_per_output = {
    'output-mask': dice_loss,
    'output-labels': ssdseglib.losses.confidence_loss,
    'output-boxes': ssdseglib.losses.localization_loss,
}

# the three losses contribute equally to the total loss
loss_weights_per_output = {
    'output-mask': 1.0,
    'output-labels': 1.0,
    'output-boxes': 1.0,
}

metrics_per_output = {
    'output-mask': jaccard_iou_segmentation_masks_metric,
    'output-labels': categorical_accuracy_metric,
    'output-boxes': jaccard_iou_bounding_boxes_metric,
}

model.compile(
    optimizer=optimizer,
    loss=losses_per_output,
    loss_weights=loss_weights_per_output,
    metrics=metrics_per_output,
)

## early stopping

In [18]:
# stop the training when the training loss stops improving
early_stopping = tf.keras.callbacks.EarlyStopping(
    # there is no validation set, so the training loss is monitored
    monitor='loss',
    # smaller changes of the monitored loss are not counted as improvements
    min_delta=0.1,
    # number of epochs without improvements tolerated before stopping
    patience=25,
    # the check is active from the first epoch
    start_from_epoch=0,
    # go back to the weights of the best epoch
    restore_best_weights=True,
    verbose=1,
)

## training model

In [19]:
# train the model on the training dataset, early stopping is the only callback
training_callbacks = [early_stopping]
history = model.fit(ds_train, epochs=140, callbacks=training_callbacks)

Epoch 1/140


NotFoundError: Graph execution error:

/kaggle/input/ssd-segmentation-data/data/train/2370_mask.png; No such file or directory
	 [[{{node ReadFile_1}}]]
	 [[IteratorGetNext]] [Op:__inference_train_function_33666]

### training history

In [None]:
# plot training loss and, when available, validation loss
# 'val_loss' is recorded only when validation data is passed to model.fit,
# reading it unconditionally raises a KeyError when the model is trained without it
loss_history = history.history
has_validation_loss = 'val_loss' in loss_history

plt.figure(figsize=(10, 6))
plt.plot(loss_history['loss'], label='Training Loss')
if has_validation_loss:
    plt.plot(loss_history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss' if has_validation_loss else 'Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

## save weights

In [None]:
# save model
# the models folder is not created anywhere else in this notebook,
# so make sure it exists before writing the model file
import os
os.makedirs(MODELS_PATH, exist_ok=True)
model.save(f'{MODELS_PATH}{MODEL_NAME}.keras')

## inference model

In [None]:
# load the trained model from the file written by the save cell, without compiling it
path_file_model_trained = f'{MODELS_PATH}{MODEL_NAME}.keras'
model_trained = tf.keras.models.load_model(path_file_model_trained, compile=False)

# options for the boxes returned by the inference model
inference_model_options = {
    'max_number_of_boxes_per_class': 15,
    'max_number_of_boxes_per_sample': 25,
    'boxes_iou_threshold': 0.5,
    'labels_probability_threshold': 0.6,
    'suppress_background_boxes': False,
}

# build the inference model starting from the trained one
model_inference = model_builder.get_model_for_inference(
    model_trained=model_trained,
    **inference_model_options
)

# uncomment to print the model summary
# model_inference.summary()

## evaluation

### evaluation dataset

In [None]:
# position of each value along the last axis of the object detection output:
# label, confidence, then the boxes coordinates
index_label = 0
index_confidence = 1
index_boxes_start = 2

# get predictions for the whole test set
segmentation_pred_batch, detection_pred_batch = model_inference.predict(ds_test)

# cast the segmentation output to the type required by the evaluators
segmentation_pred_batch = segmentation_pred_batch.astype(np.float32)

# split the object detection output and cast each part to the type required by the evaluators
labels_pred_batch = detection_pred_batch[:, :, index_label].astype(np.int32)
confidences_pred_batch = detection_pred_batch[:, :, index_confidence].astype(np.float32)
boxes_pred_batch = detection_pred_batch[:, :, index_boxes_start:].astype(np.float32)

### jaccard iou

In [None]:
# intersection over union for each class, computed on the test set masks
iou_per_class = ssdseglib.evaluators.jaccard_iou_semantic_segmentation(
    label_code_background=LABEL_CODE_BACKGROUND,
    labels_codes=LABELS_CODES,
    path_files_masks=path_files_masks_test,
    masks_pred_batch=segmentation_pred_batch,
)

# width used to right-align the labels descriptions
length_longest_label = max(map(len, LABEL_CODE_TO_DESC.values()))

# report header
print('\n****************')
print('***   IoU    ***')
print('****************')

# one line per class
for label, iou in iou_per_class.items():
    label_description = LABEL_CODE_TO_DESC[label]
    print(f'> {label_description:>{length_longest_label}}: {iou:2.2f}')

# mean over the classes
print('----------------')
mean_iou = sum(iou_per_class.values()) / len(iou_per_class)
print(f'> {"mIoU@":>{length_longest_label}}: {mean_iou:.2f}')

### average precision

In [None]:
# iou thresholds used to evaluate the average precision in object detection
iou_thresholds_object_detection = [0.5, 0.75]

# width used to right-align the labels descriptions
length_longest_label = max(map(len, LABEL_CODE_TO_DESC.values()))

# AP for each class and mAP, one report per iou threshold
for iou_threshold in iou_thresholds_object_detection:
    average_precision_per_class = ssdseglib.evaluators.average_precision_object_detection(
        labels_pred_batch=labels_pred_batch,
        confidences_pred_batch=confidences_pred_batch,
        boxes_pred_batch=boxes_pred_batch,
        iou_threshold=iou_threshold,
        path_files_labels_boxes=path_files_labels_boxes_test,
        labels_codes=LABELS_CODES,
        label_code_background=LABEL_CODE_BACKGROUND,
    )

    # threshold with two decimals and without the leading zero, for example '.50'
    iou_threshold_text = f'{iou_threshold:.2f}'.lstrip('0')
    mean_label = f'mAP@{iou_threshold_text}'

    # report header
    print('\n****************')
    print(f'***  AP@{iou_threshold_text}  ***')
    print('****************')

    # one line per class
    for label, average_precision in average_precision_per_class.items():
        label_description = LABEL_CODE_TO_DESC[label]
        print(f'> {label_description:>{length_longest_label}}: {average_precision:2.2f}')

    # mean over the classes
    print('----------------')
    mean_average_precision = sum(average_precision_per_class.values()) / len(average_precision_per_class)
    print(f'> {mean_label:>{length_longest_label}}: {mean_average_precision:.2f}')

# predict

## plot some predictions

In [None]:
def _draw_labelled_box(ax, label, xmin, ymin, xmax, ymax, text):
    """
    Draw a bounding box and its caption on a matplotlib axes.

    Args:
        ax (matplotlib.axes.Axes): axes where the box is drawn.
        label (int): label code, used to pick the color from LABEL_CODE_TO_COLOR.
        xmin (float): left coordinate of the box.
        ymin (float): top coordinate of the box.
        xmax (float): right coordinate of the box.
        ymax (float): bottom coordinate of the box.
        text (str): caption written at the top-left corner of the box.
    """
    color = LABEL_CODE_TO_COLOR[label]
    rect = patches.Rectangle((xmin, ymin), xmax - xmin + 1, ymax - ymin + 1, linewidth=1, edgecolor=color, facecolor='none')
    ax.add_patch(rect)
    ax.text(xmin, ymin, text, fontsize=8, color=color, verticalalignment='top')


number_of_samples = 8
fig_size_width = 12
sample_indices = list(range(len(path_files_images_test)))

# random.sample raises a ValueError when asked for more samples than available
number_of_samples = min(number_of_samples, len(sample_indices))

for i in random.sample(sample_indices, number_of_samples):

    # extract the sample
    path_file_image = path_files_images_test[i]
    path_file_mask = path_files_masks_test[i]
    path_file_labels_boxes = path_files_labels_boxes_test[i]

    # create the needed subplots and set figure size
    # first row for object detection, second row for segmentation,
    # left column for ground truth, right column for model predictions
    fig, ((ax1, ax3), (ax2, ax4)) = plt.subplots(nrows=2, ncols=2)
    fig.set_size_inches(fig_size_width, int(fig_size_width / (INPUT_IMAGE_SHAPE[1] / INPUT_IMAGE_SHAPE[0])))

    # --------------------------------------------------------------------------------
    # read - image sample
    # --------------------------------------------------------------------------------
    # read image
    image = Image.open(path_file_image)

    # float32 image with the batch dimension, used as model input
    # the batch dimension must be added to the float32 array and not to the original
    # image, otherwise the cast is lost and the model receives the raw image values type
    image_batch = np.array(image).astype(np.float32)
    image_batch = np.expand_dims(image_batch, axis=0)

    # convert to array of integers, used for plotting
    image = np.array(image)
    image = image.astype(np.int32)

    # --------------------------------------------------------------------------------
    # read - segmentation mask sample
    # --------------------------------------------------------------------------------
    # read mask
    mask = Image.open(path_file_mask)

    # keep the 3 classes on rgb channels, the first channel of the one hot encoding is dropped
    mask = tf.slice(tf.one_hot(mask, depth=4, dtype=tf.float32), begin=[0, 0, 1], size=[-1, -1, 3])

    # --------------------------------------------------------------------------------
    # read - labels boxes sample
    # --------------------------------------------------------------------------------
    # read ground truth labels boxes from csv file
    with open(path_file_labels_boxes, 'r') as f:
        labels_boxes = list(csv.reader(f))

    # --------------------------------------------------------------------------------
    # plot - ground truth
    # --------------------------------------------------------------------------------
    # plot the image, same values range used for the predictions plot
    ax1.imshow(image, vmin=0, vmax=255)
    ax1.set_axis_off()
    ax1.set_title('ground truth - object detection')

    # plot ground truth boxes
    for row in labels_boxes:
        # csv.reader returns an empty list for blank lines
        if not row:
            continue
        label, xmin, ymin, xmax, ymax = row
        label = int(label)
        _draw_labelled_box(ax1, label, float(xmin), float(ymin), float(xmax), float(ymax), LABEL_CODE_TO_DESC[label])

    # plot ground truth mask
    ax2.imshow(mask, vmin=0, vmax=1)
    ax2.set_axis_off()
    ax2.set_title('ground truth - segmentation mask')

    # --------------------------------------------------------------------------------
    # plot - model predictions
    # --------------------------------------------------------------------------------
    # get predictions from the model
    output_mask, output_object_detection = model_inference(image_batch, training=False)
    if output_object_detection.ndim > 2:
        output_object_detection = tf.squeeze(output_object_detection, axis=0)

    # keep the 3 classes on rgb channels, the first channel of the one hot encoding is dropped
    output_mask = tf.math.argmax(tf.squeeze(output_mask, axis=0), axis=-1)
    output_mask = tf.one_hot(output_mask, depth=4, axis=2)
    output_mask = tf.slice(output_mask, begin=[0, 0, 1], size=[-1, -1, 3])

    # plot the image
    ax3.imshow(image, vmin=0, vmax=255)
    ax3.set_axis_off()
    ax3.set_title('model - object detection')

    # plot predicted boxes, background boxes are skipped
    for label, probability, xmin, ymin, xmax, ymax in output_object_detection:
        if label == LABEL_CODE_BACKGROUND:
            continue
        label = int(label)
        probability = int(probability * 100)
        _draw_labelled_box(ax3, label, float(xmin), float(ymin), float(xmax), float(ymax), f'{LABEL_CODE_TO_DESC[label]} {probability}%')

    # plot predicted mask
    ax4.imshow(output_mask, vmin=0, vmax=1)
    ax4.set_axis_off()
    ax4.set_title('model - segmentation mask')

    # show the plot, then release the figure to avoid keeping all of them in memory
    plt.show()
    plt.close(fig)