In [None]:
import tensorflow as tf
import keras
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import os
import random
import time
import datetime

from keras import layers, losses, utils, optimizers, callbacks, metrics
from keras.applications import VGG16
from keras.preprocessing import image
from tqdm import tqdm
from base64 import b64encode

### Get KITTI dataset

In [None]:
# directory with the training road images
TRAIN_DATA_DIR = "/input/kittiroadsegmentation/training/image_2/"

# directory with the ground-truth segmentation masks
TRAIN_GT_DIR = "/input/kittiroadsegmentation/training/gt_image_2/"

# directory with the test data
TEST_DATA_DIR = "/input/kittiroadsegmentation/testing/"

# split sizes: 80% training, 10% validation, the remainder for testing
# (the directory is listed once and the count reused for all three sizes)
_n_train_dir_entries = len(os.listdir(TRAIN_DATA_DIR))
TRAINSET_SIZE = int(_n_train_dir_entries * 0.8)
VALIDATION_SIZE = int(_n_train_dir_entries * 0.1)
TESTSET_SIZE = int(_n_train_dir_entries - TRAINSET_SIZE - VALIDATION_SIZE)

### Constants & Hyperparams

In [None]:
# side length (pixels) that images and masks are resized to
IMG_SIZE = 128
# channel count of the model input
N_CHANNELS = 3
# number of filters of the final 1x1 sigmoid convolution (one road-probability map)
N_CLASSES = 1
# seed used for file listing and for shuffling the training pipeline
SEED = 123

# number of samples per batch in every split
BATCH_SIZE = 32
# shuffle buffer size of the training pipeline
BUFFER_SIZE = 1000
# passed as num_parallel_calls / prefetch buffer size so tf.data tunes them itself
AUTOTUNE = tf.data.AUTOTUNE

### Dataset preprocessing

#### Parse each sample into an image/mask dict and generate the dataset splits

In [None]:
def ParseImages(image_path: str) -> dict:
    """Load one road image together with its binary road mask.

    Args:
        image_path: path (string or scalar string tensor) of a PNG image
            located in an ``image_2`` directory.

    Returns:
        A dict with two entries:
        ``"image"``: uint8 tensor of shape (H, W, 3) with the decoded image;
        ``"segmentation_mask"``: uint8 tensor of shape (H, W, 1) that is 1
        where the ground-truth pixel equals the road colour (255, 0, 255)
        and 0 everywhere else.
    """
    # the inputs are PNG files, so decode them as PNG; the decoded tensor is
    # already uint8, hence no dtype conversion is required
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)

    # derive the ground-truth path from the image path:
    # "image_2" -> "gt_image_2" and "<prefix>_" -> "<prefix>_road_" for the
    # three file-name prefixes (um, umm, uu)
    mask_path = tf.strings.regex_replace(image_path, "image_2", "gt_image_2")
    mask_path = tf.strings.regex_replace(mask_path, "um_", "um_road_")
    mask_path = tf.strings.regex_replace(mask_path, "umm_", "umm_road_")
    mask_path = tf.strings.regex_replace(mask_path, "uu_", "uu_road_")

    # read and decode the colour-coded ground truth
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)

    # colour coding of the ground truth: (255, 0, 255) = road,
    # (255, 0, 0) = non-road, (0, 0, 0) = other road; only "road" is positive
    road_label = tf.constant([255, 0, 255], dtype=tf.uint8)

    # a pixel is road when all three channels match the road colour;
    # keepdims retains the trailing channel axis -> shape (H, W, 1)
    mask = tf.reduce_all(tf.equal(mask, road_label), axis=-1, keepdims=True)
    mask = tf.cast(mask, tf.uint8)

    return {"image": image, "segmentation_mask": mask}

# generate dataset splits: test, train, val
# List the files in a fixed order and shuffle them exactly once. The built-in
# shuffle of list_files() reshuffles on every iteration, which would move
# files between the take()/skip() splits from one epoch to the next and leak
# validation/test samples into training.
dataset = tf.data.Dataset.list_files(TRAIN_DATA_DIR + "*.png", shuffle=False)
dataset = dataset.shuffle(
    # buffer covers every file so the single shuffle is a full permutation
    buffer_size=max(TRAINSET_SIZE + VALIDATION_SIZE + TESTSET_SIZE, 1),
    seed=SEED,
    reshuffle_each_iteration=False,
)
dataset = dataset.map(ParseImages)

# splitting into three disjoint, consecutive ranges: [train | val | test]
train_ds = dataset.take(TRAINSET_SIZE)
validation_ds = dataset.skip(TRAINSET_SIZE).take(VALIDATION_SIZE)
# the test split starts after BOTH the training and the validation samples
test_ds = dataset.skip(TRAINSET_SIZE + VALIDATION_SIZE)

### Normalizing and Data Augmentation

In [None]:
@tf.function
def Normalize(input_image, input_mask) -> tuple:
    """Return the image cast to float32 and divided by 255, with the mask unchanged."""
    scaled_image = tf.divide(tf.cast(input_image, tf.float32), 255.0)
    return scaled_image, input_mask

@tf.function
def LoadTrainImages(datapoint: dict) -> tuple:
    """Prepare one training sample: resize, randomly flip, normalize.

    Args:
        datapoint: dict with the entries ``"image"`` and
            ``"segmentation_mask"`` as returned by ``ParseImages``.

    Returns:
        Tuple ``(image, mask)``: the image resized to (IMG_SIZE, IMG_SIZE) and
        passed through ``Normalize``; the mask resized to the same size as
        float32 with its original label values preserved.
    """
    target_size = (IMG_SIZE, IMG_SIZE)

    # resize the image (default bilinear interpolation)
    input_image = tf.image.resize(datapoint["image"], target_size)

    # resize the mask with nearest-neighbour so labels stay exactly 0/1;
    # bilinear interpolation would create fractional labels at road borders.
    # The cast keeps the float32 dtype the bilinear resize used to return.
    input_mask = tf.image.resize(
        datapoint["segmentation_mask"], target_size, method="nearest"
    )
    input_mask = tf.cast(input_mask, tf.float32)

    # data augmentation: flip image and mask together with probability 0.5
    # (tf.random.uniform requires a shape; `()` draws a single scalar)
    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)

    # normalize images
    input_image, input_mask = Normalize(input_image, input_mask)

    return input_image, input_mask

@tf.function
def LoadTestImages(datapoint: dict) -> tuple:
    """Prepare one evaluation sample: resize and normalize, no augmentation.

    Args:
        datapoint: dict with the entries ``'image'`` and
            ``'segmentation_mask'`` as returned by ``ParseImages``.

    Returns:
        Tuple ``(image, mask)``: the image resized to (IMG_SIZE, IMG_SIZE) and
        passed through ``Normalize``; the mask resized to the same size as
        float32 with its original label values preserved.
    """
    target_size = (IMG_SIZE, IMG_SIZE)

    # resize the image (default bilinear interpolation)
    input_image = tf.image.resize(datapoint['image'], target_size)

    # resize the mask with nearest-neighbour so labels stay exactly 0/1;
    # bilinear interpolation would create fractional labels at road borders.
    # The cast keeps the float32 dtype the bilinear resize used to return.
    input_mask = tf.image.resize(
        datapoint['segmentation_mask'], target_size, method="nearest"
    )
    input_mask = tf.cast(input_mask, tf.float32)

    # normalizing
    input_image, input_mask = Normalize(input_image, input_mask)

    return input_image, input_mask



#### Process training and testing data for performance

In [None]:
# Input pipelines for all three splits, keyed by split name.
dataset = {
    # training: preprocess/augment in parallel, shuffle, repeat endlessly,
    # batch and prefetch
    "train": (
        train_ds
        .map(LoadTrainImages, num_parallel_calls=AUTOTUNE)
        .shuffle(buffer_size=BUFFER_SIZE, seed=SEED)
        .repeat()
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE)
    ),
    # test: a single pass over the preprocessed, batched samples
    "test": (
        test_ds
        .map(LoadTestImages)
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE)
    ),
    # validation: like test, but repeated endlessly
    "val": (
        validation_ds
        .map(LoadTestImages)
        .repeat()
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=AUTOTUNE)
    ),
}

# show the element specs of the three pipelines, one per line
print(dataset["train"], dataset["test"], dataset["val"], sep="\n")

#### Display some dataset samples

In [None]:
def DisplaySamples(display_list):
    """Plot the given images side by side in one row of a single figure.

    Panels are titled by position: input image, true segmentation,
    model prediction.

    Args:
        display_list: sequence of image-like arrays; each one is converted
            with ``utils.array_to_img`` before being shown.
    """
    titles = ["Input Image", "True Segmentation", "Model Prediction"]
    panel_count = len(display_list)

    plt.figure(figsize=(18, 18))

    # one subplot per entry, laid out left to right
    for position, sample in enumerate(display_list):
        plt.subplot(1, panel_count, position + 1)
        plt.title(titles[position])
        plt.imshow(utils.array_to_img(sample))

    plt.show()



In [None]:
# Grab one training batch for visual inspection. The loop variables are
# deliberately not called `image`: that name is bound to the imported
# keras.preprocessing `image` module and must not be overwritten.
for batch_images, batch_masks in dataset["train"].take(1):
    sample_image, sample_mask = batch_images, batch_masks

# show the first image of the batch next to its mask
DisplaySamples([sample_image[0], sample_mask[0]])

### Defining the FCN Road Segmentation Model

In [None]:
# Instantiate VGG16 with its default arguments and print the layer summary;
# layers of VGG16 (e.g. "block3_pool") are looked up by name when the
# segmentation model is built.
vgg16_model = VGG16()
vgg16_model.summary()

Input Shape for model

In [None]:
# (height, width, channels) of a single model input, without the batch axis
INPUT_SHAPE = (IMG_SIZE, IMG_SIZE, N_CHANNELS)

#### Model architecture

In [None]:
def RoadSegmentationModel():
    """Build the FCN road-segmentation network on top of a VGG16 encoder.

    The three deepest VGG16 pooling outputs are fused with skip connections:
    the deepest map is upsampled and concatenated with the next shallower one,
    twice, and the result is upsampled back to the input resolution.
    INPUT_SHAPE's height and width must be divisible by 32 so that every
    2x upsampling matches the skip connection it is concatenated with.

    Returns:
        A ``keras.Model`` named "RDS_FCN" that maps inputs of shape
        INPUT_SHAPE to a sigmoid map with N_CLASSES channels at the input's
        spatial resolution.
    """
    # input layer
    inputs = layers.Input(INPUT_SHAPE)

    # VGG16 convolutional base with ImageNet weights, wired to our input
    vgg16_model = VGG16(include_top=False, weights="imagenet", input_tensor=inputs)

    # encoder feature maps at 1/8, 1/16 and 1/32 of the input resolution
    c1 = vgg16_model.get_layer("block3_pool").output
    c2 = vgg16_model.get_layer("block4_pool").output
    c3 = vgg16_model.get_layer("block5_pool").output

    # decoder: each 2x upsampling is fused with the encoder map of equal size
    u1 = layers.UpSampling2D((2, 2), interpolation="bilinear")(c3)   # 1/32 -> 1/16
    ct1 = layers.Concatenate()([u1, c2])
    u2 = layers.UpSampling2D((2, 2), interpolation="bilinear")(ct1)  # 1/16 -> 1/8
    ct2 = layers.Concatenate()([u2, c1])

    # final upsampling: 1/8 -> full input resolution
    u3 = layers.UpSampling2D((8, 8), interpolation="bilinear")(ct2)

    # per-pixel road probability
    outputs = layers.Conv2D(N_CLASSES, 1, activation="sigmoid")(u3)

    # build model
    return keras.Model(inputs, outputs, name="RDS_FCN")



### Compiling & Training

In [None]:
# The model emits sigmoid probabilities. MeanIoU expects integer class ids and
# would truncate almost every probability to class 0, so use BinaryIoU, which
# thresholds the predictions first. Averaging over both target classes gives
# the mean IoU; the explicit name keeps the metric key "mean_io_u".
mean_iou = metrics.BinaryIoU(target_class_ids=[0, 1], threshold=0.5, name="mean_io_u")
segmentation_model = RoadSegmentationModel()

segmentation_model.compile(optimizer=optimizers.Adam(), loss=losses.BinaryCrossentropy(), metrics=[mean_iou])
# render the layer graph including tensor shapes
utils.plot_model(segmentation_model, show_shapes=True)