## Estimating Free Drivable Road Space

#### Environment setup
Import dependencies, define constant values

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import os
import cv2
import time
import numpy as np

# Root directory; the directory names below are joined onto it with os.path.join.
WORKING_DIR = '.'
MODELS_DIR = 'model'      # model files (read by load_retinanet_model, written by training)
IMAGES_DIR = 'img'        # still images used for single-image inference
VIDEO_DIR = 'video'       # input videos
TEMP_DIR = 'tmp'          # annotated frames written during video processing

# Dataset locations: <WORKING_DIR>/<DATASETS_DIR>/<BDD100K_DIR>/...
DATASETS_DIR = 'dataset'
BDD100K_DIR = 'bdd100k'

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   # see issue #152
# os.environ["CUDA_VISIBLE_DEVICES"] = "1"

Set TensorFlow as Keras back-end

In [None]:
import tensorflow as tf
#tf.keras.backend.clear_session()

def get_session():
    """Create a TF1-compatibility session.

    The session config has device-placement logging switched on and
    `gpu_options.allow_growth` enabled.

    Returns:
        A new `tf.compat.v1.Session`.
    """
    session_config = tf.compat.v1.ConfigProto(log_device_placement = True)
    session_config.gpu_options.allow_growth = True

    session = tf.compat.v1.Session(config = session_config)
    return session


def setTensorFlowBackend():
    """Create a session with get_session() and register it as the Keras session.

    The previous body called `session.run()` without any fetches, which raises
    a TypeError and never attached the session to Keras.
    """
    session = get_session()
    # Use the compat.v1 entry point to match the compat.v1 session created above.
    tf.compat.v1.keras.backend.set_session(session)

# tf.test.gpu_device_name()
#get_session()
#tf.test.is_gpu_available()
# tf.test.is_built_with_cuda()
# setTensorFlowBackend()

#### Hardware Setup
Detect physical GPU

In [None]:
from tensorflow.python.client import device_lib

def get_available_gpus():
    """Return the `physical_device_desc` of every local device whose type is 'GPU'.

    Returns:
        A list with one description per GPU device reported by
        `device_lib.list_local_devices()`; empty when none is reported.
    """
    gpu_descriptions = []
    for device in device_lib.list_local_devices():
        if device.device_type == 'GPU':
            gpu_descriptions.append(device.physical_device_desc)

    return gpu_descriptions

# Print the descriptions of the GPU devices found (an empty list when there are none).
gpu = get_available_gpus()
print(gpu)

#### Debug Helper Methods

In [None]:
import matplotlib.pyplot as plt

def show_image(image, size = (15, 15), show_axis = 'off'):
    """Display an image in a new matplotlib figure.

    Args:
        image: image data accepted by `imshow`.
        size: figure size passed to `plt.figure(figsize=...)`.
        show_axis: axis option forwarded to `axis()`; 'off' hides the axes.
    """
    figure = plt.figure(figsize = size)
    axes = figure.gca()
    axes.axis(show_axis)
    axes.imshow(image)
    plt.show()

### Object Detection
#### Pre-trained RetinaNet Model Loading
Load a pre-trained RetinaNet model and, if requested, convert it to an inference model

In [None]:
from keras_retinanet import models

def load_retinanet_model(model_name,
                         backbone_name,
                         should_convert_to_inference_model = False,
                         should_print_summary = False):
    """Load a RetinaNet model stored under <WORKING_DIR>/<MODELS_DIR>.

    Args:
        model_name: file name of the model inside the models directory.
        backbone_name: backbone identifier forwarded to `models.load_model`.
        should_convert_to_inference_model: when true, the loaded model is passed
            through `models.convert_model` before being returned.
        should_print_summary: when true, the model summary is printed.

    Returns:
        The loaded (and optionally converted) model.
    """
    model_path = os.path.join(WORKING_DIR, MODELS_DIR, model_name)
    model = models.load_model(model_path, backbone_name = backbone_name)

    if should_convert_to_inference_model:
        model = models.convert_model(model)

    if should_print_summary:
        # summary() prints by itself and returns None; wrapping it in print()
        # only appended a stray "None" line to the output.
        model.summary()

    return model

def get_name_for_label(label):
    """Look up the class name for a numeric label in `labels_to_names`.

    Raises:
        KeyError: if the label is not a key of `labels_to_names`.
    """
    name = labels_to_names[label]
    return name

# Class id -> class name lookup (ids 0-79) used by get_name_for_label.
# NOTE(review): presumably the COCO classes of the pre-trained detector loaded below - confirm.
labels_to_names = {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink', 72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors', 77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'}

#### Object Detection Pipeline Methods

In [None]:
import cv2
import time
import numpy as np

from keras_retinanet.utils.colors import label_color
from keras_retinanet.utils.visualization import draw_box, draw_caption
from keras_retinanet.utils.image import preprocess_image, resize_image

def apply_region_of_interest_to_image(image):
    """Placeholder for region-of-interest masking; currently returns its input unchanged."""
    # TODO: Implement
    roi_image = image
    return roi_image

def prepare_frame_for_detection(image):
    """Preprocess, ROI-mask and resize a frame before it is fed to the detector.

    Returns:
        (resized_image, scale) exactly as produced by `resize_image`.
    """
    preprocessed = preprocess_image(image)
    masked = apply_region_of_interest_to_image(preprocessed)
    resized, resize_scale = resize_image(masked)

    return resized, resize_scale

def detect_objects_from_prepared_image(image, image_scale, model, verbose):
    """Run the detector on one prepared image.

    Args:
        image: prepared image; a batch axis is added in front before prediction.
        image_scale: factor the boxes are divided by (in place) after prediction.
        model: object exposing `predict_on_batch` returning (boxes, scores, labels).
        verbose: when true, the elapsed time is printed.

    Returns:
        The (boxes, scores, labels) triple, with boxes divided by `image_scale`.
    """
    started_at = time.time()

    batch = np.expand_dims(image, axis = 0)
    boxes, scores, labels = model.predict_on_batch(batch)
    boxes /= image_scale

    if verbose:
        elapsed = time.time() - started_at
        print("image processing time: ", elapsed)

    return boxes, scores, labels
    
def compute_distance_to_box_with_homography(box, homography):
    """Compute the distance to a box using the entries of a homography mapping.

    Args:
        box: box forwarded to `compute_distance_to_box`.
        homography: mapping with the keys 'width_warped', 'height_warped',
            'H' and 'pixels_per_meter'.

    Returns:
        Whatever `compute_distance_to_box` returns for the box.
    """
    # Reference point in the warped view: horizontal centre, at y = warped height.
    reference_x = homography['width_warped'] // 2
    reference_y = homography['height_warped']
    ego_lane_midpoint_warped = (reference_x, reference_y)

    transform = homography['H']
    pixels_per_meter = homography['pixels_per_meter']

    return compute_distance_to_box(box, transform, ego_lane_midpoint_warped, pixels_per_meter)
    
def compute_distance_to_box(box, H, ego_lane_midpoint_warped, pixels_per_meter):
    """Estimate the distance from a reference point to the bottom-centre of a box.

    The bottom-centre of the box is projected with the 3x3 matrix `H`, truncated
    to integer pixel coordinates, and its Euclidean distance to
    `ego_lane_midpoint_warped` is divided by `pixels_per_meter`.

    Args:
        box: indexable with four entries, used as (x1, y1, x2, y2).
        H: matrix multiplied with the homogeneous point (x, y, 1).
        ego_lane_midpoint_warped: (x, y) reference point in the warped view.
        pixels_per_meter: scale of the warped view.

    Returns:
        (ground_mid_x, ground_mid_y, distance_in_meters), where the first two
        values are the un-warped bottom-centre of the box.
    """
    left, top, right, bottom = box[0], box[1], box[2], box[3]
    ground_mid_x = left + ((right - left) // 2)
    ground_mid_y = top + (bottom - top)

    # Homogeneous coordinates -> projective transform -> back to 2D.
    homogeneous_point = np.array([ground_mid_x, ground_mid_y, 1.0])
    projected = np.matmul(H, homogeneous_point)
    warped_xy = (projected / projected[-1])[:-1]
    warped_x, warped_y = int(warped_xy[0]), int(warped_xy[1])

    offset = np.array([warped_x, warped_y]) - np.asarray(ego_lane_midpoint_warped)
    distance_in_pixels = np.sqrt(np.sum(np.square(offset)))

    return ground_mid_x, ground_mid_y, distance_in_pixels / pixels_per_meter

def visualize_object_detections(boxes, scores, labels, image, verbose, score_threshold = 0.5, homography = None):
    """Draw boxes, captions and distance estimates for the detections of one image.

    Args:
        boxes, scores, labels: batched detector outputs; only index 0 is used.
        image: BGR image the detections belong to; it is not modified.
        verbose: when true, the estimated distance of every kept detection is printed.
        score_threshold: detections scoring below this value are skipped.
        homography: optional mapping with the keys read by
            `compute_distance_to_box_with_homography`. When omitted, the arrays
            are loaded from "./homography.npz".

    Returns:
        An annotated RGB copy of `image`.
    """
    annotated_image = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2RGB)

    if homography is None:
        # Read the arrays once and close the archive: the NpzFile returned by
        # np.load keeps the file open and re-reads an array on every lookup.
        with np.load("./homography.npz") as homography_file:
            homography = {key: homography_file[key] for key in homography_file.files}

    # Bottom-centre of the image, the end point of every distance line.
    image_bottom_center = (annotated_image.shape[1] // 2, annotated_image.shape[0])

    for box, score, label in zip(boxes[0], scores[0], labels[0]):
        if score < score_threshold:
            continue

        x, y, distance = compute_distance_to_box_with_homography(box, homography)
        x, y = int(x), int(y)

        if verbose:
            print("Distance to {} is {} meters".format(get_name_for_label(label), distance))

        cv2.line(annotated_image, (x, y), image_bottom_center,
                 color = (200, 200, 200), thickness = 2, lineType = 4)

        box_type = box.astype(int)
        draw_box(annotated_image, box_type, color = label_color(label))
        caption = "{} {:.3f}".format(get_name_for_label(label), score)
        draw_caption(annotated_image, box_type, caption)

        cv2.circle(annotated_image, center = (x, y), radius = 5, color = (255, 255, 255), thickness = cv2.FILLED)
        cv2.putText(annotated_image, "{0:.2f} m".format(distance), (x, y + 10),
                    cv2.FONT_HERSHEY_PLAIN, fontScale = 1.5, color = (255, 255, 255), thickness = 3)

    return annotated_image

def detect_objects_in_frame(frame, model, verbose):
    """Run preparation, detection and visualization on one frame.

    Returns:
        The annotated image produced by `visualize_object_detections`.
    """
    prepared_image, scale = prepare_frame_for_detection(frame)
    det_boxes, det_scores, det_labels = detect_objects_from_prepared_image(prepared_image, scale, model, verbose)

    return visualize_object_detections(det_boxes, det_scores, det_labels, frame, verbose)

#### Object Detection on Video Capture

In [None]:
def run_objects_detection_from_capture(capture, model, verbose = False):
    """Run object detection on every frame of a capture and save the annotated frames.

    Frames are written to <WORKING_DIR>/<TEMP_DIR>/img%08d.jpg, numbered from 0.
    The capture is released when reading ends or an error occurs.

    Args:
        capture: object exposing `read()` -> (ok, frame) and `release()`.
        model: detection model forwarded to `detect_objects_in_frame`.
        verbose: when true, every annotated frame is also displayed.
    """
    output_dir = os.path.join(WORKING_DIR, TEMP_DIR)
    # cv2.imwrite reports failure only through its return value, so make sure
    # the target directory exists up front.
    os.makedirs(output_dir, exist_ok = True)

    frames_read = 0

    try:
        while True:
            retval, frame = capture.read()
            if not retval:
                # The previous `return` here skipped the release below.
                break

            annotated_image = detect_objects_in_frame(frame, model, verbose)
            annotated_image_path = os.path.join(output_dir, 'img%08d.jpg' % frames_read)
            # The annotated frame is RGB (see visualize_object_detections), while
            # cv2.imwrite expects BGR; convert back so the saved colours are right.
            cv2.imwrite(annotated_image_path, cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))

            if verbose:
                show_image(annotated_image)

            frames_read += 1
    finally:
        # No HighGUI window is opened by this function, so only the capture
        # needs to be cleaned up.
        capture.release()

#### Object Detection on Images

In [None]:
from keras_retinanet.utils.image import read_image_bgr

def run_object_detection_for_image_named(image_name, model, verbose = False):
    """Run object detection on an image stored under <WORKING_DIR>/<IMAGES_DIR>.

    The annotated image is displayed only when `verbose` is true; nothing is returned.
    """
    bgr_image = read_image_bgr(os.path.join(WORKING_DIR, IMAGES_DIR, image_name))
    result = detect_objects_in_frame(bgr_image, model, verbose)

    if not verbose:
        return

    show_image(result)

### Drivable Area Segmentation

#### Image Data Generators

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def combine_generator(gen1, gen2):
    """Yield `(item_from_gen1, item_from_gen2)` pairs from two iterators in lockstep.

    Uses the iterator protocol rather than the Keras-specific `.next()` method,
    so any iterator is accepted. Iteration ends cleanly when either input is
    exhausted.
    """
    yield from zip(gen1, gen2)
        
def make_data_generator(data_generator_args, images_directory, masks_directory, seed, target_size, batch_size):
    """Build a generator yielding (image_batch, mask_batch) pairs.

    Images and masks are read by two separate `ImageDataGenerator` flows that
    share the same arguments, seed, target size and batch size.

    NOTE(review): the masks receive the same `data_generator_args` as the images
    (including any `rescale`) - confirm this is intended for label masks.

    Returns:
        The generator produced by `combine_generator` over both flows.
    """
    def flow_from(directory):
        # One ImageDataGenerator per directory, configured identically.
        augmenter = ImageDataGenerator(**data_generator_args)
        return augmenter.flow_from_directory(directory,
                                             class_mode = None,
                                             target_size = target_size,
                                             seed = seed,
                                             batch_size = batch_size)

    image_batches = flow_from(images_directory)
    mask_batches = flow_from(masks_directory)

    return combine_generator(image_batches, mask_batches)
    
def make_train_data_generator(images_directory, masks_directory, seed = 1, target_size = (512, 512), batch_size = 4):
    """Build the training generator: rescaling plus shear, zoom and horizontal-flip augmentation."""
    augmentation_args = {
        'rescale': 1. / 255,
        'shear_range': 0.2,
        'zoom_range': 0.2,
        'horizontal_flip': True,
    }

    return make_data_generator(augmentation_args,
                               images_directory,
                               masks_directory,
                               seed,
                               target_size,
                               batch_size)

def make_val_data_generator(images_directory, masks_directory, seed = 1, target_size = (512, 512), batch_size = 4):
    """Build the validation generator: rescaling only, no augmentation."""
    rescale_only_args = {'rescale': 1. / 255}

    return make_data_generator(rescale_only_args,
                               images_directory,
                               masks_directory,
                               seed,
                               target_size,
                               batch_size)

#### DeepLabV3+ Model Training

In [None]:
from deeplab_v3_model import Deeplabv3

from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

def train_deeplabv3_model(train_data_generator,
                          train_images_count,
                          val_data_generator,
                          val_images_count,
                          epochs,
                          monitor,
                          mode,
                          classes,
                          batch_size):
    """Train a DeepLabV3 model with a 'mobilenetv2' backbone and save the result.

    Args:
        train_data_generator / val_data_generator: generators passed to `fit_generator`.
        train_images_count / val_images_count: image counts; integer-divided by
            `batch_size` to get the steps per epoch and the validation steps.
        epochs: number of epochs passed to `fit_generator`.
        monitor, mode: forwarded to both ModelCheckpoint and EarlyStopping.
        classes: number of classes passed to `Deeplabv3`.
        batch_size: also scales the SGD learning rate (0.01 / 16 * batch_size).

    Returns:
        The trained model. Weights (".h5"), the model and a "-SavedModel" export
        are written under <WORKING_DIR>/<MODELS_DIR> with a timestamped name.
    """
    backbone = 'mobilenetv2'
    model = Deeplabv3(weights = 'cityscapes', backbone = backbone, classes = classes)

    learning_rate = 0.01 / 16 * batch_size
    model.compile(optimizer = tf.keras.optimizers.SGD(lr = learning_rate, momentum = 0.9, decay = 0.0005),
                  loss = tf.keras.losses.categorical_crossentropy,
                  metrics = ['accuracy'])

    # NOTE(review): the checkpoint target is the models directory path itself,
    # not a file inside it - confirm this is the intended location.
    checkpoint_target = os.path.join(WORKING_DIR, MODELS_DIR)
    best_weights_checkpoint = ModelCheckpoint(checkpoint_target,
                                              monitor = monitor,
                                              verbose = 1,
                                              save_best_only = True,
                                              save_weights_only = True,
                                              mode = mode)

    stop_when_stalled = EarlyStopping(monitor = monitor,
                                      verbose = 1,
                                      min_delta = 0.01,
                                      patience = 3,
                                      mode = mode)

    model.fit_generator(generator = train_data_generator,
                        epochs = epochs,
                        steps_per_epoch = (train_images_count // batch_size),
                        validation_data = val_data_generator,
                        validation_steps = (val_images_count // batch_size),
                        callbacks = [best_weights_checkpoint, stop_when_stalled],
                        use_multiprocessing = True,
                        workers = 2)

    # The timestamp is taken after training, so it marks the end of the run.
    finished_at = int(time.time())
    model_path = os.path.join(WORKING_DIR, MODELS_DIR, "DeepLabV3-{}-{}".format(backbone, finished_at))

    model.save_weights("{}.h5".format(model_path))
    model.save(model_path)
    tf.keras.experimental.export_saved_model(model, "{}-SavedModel".format(model_path))

    return model

#### Road Space Segmentation Pipeline Methods

In [None]:
def prepare_frame_for_segmentation(image):
    """Preprocess, ROI-mask and resize a frame before segmentation.

    Performs the same steps as `prepare_frame_for_detection`.

    Returns:
        (resized_image, scale) exactly as produced by `resize_image`.
    """
    preprocessed = preprocess_image(image)
    masked = apply_region_of_interest_to_image(preprocessed)
    resized, resize_scale = resize_image(masked)

    return resized, resize_scale

def segment_road_space_in_prepared_image(image, image_scale, model, verbose):
    """Predict a per-pixel class-index map for one prepared image.

    The previous body was copied from the detection pipeline and referenced
    undefined `boxes`, `scores` and `labels`, so it always raised NameError.

    Args:
        image: prepared image; a batch axis is added in front before prediction.
        image_scale: unused; kept so the signature stays unchanged. The label map
            is resized to the frame in `visualize_road_space_segmentation`.
        model: object exposing `predict` that returns per-class scores on the last axis.
            NOTE(review): assumes the output has shape (1, rows, cols, classes) - confirm.
        verbose: when true, the elapsed time is printed.

    Returns:
        A 2D array of class indices (argmax over the last axis of the prediction).
    """
    start_time = time.time()

    prediction = model.predict(np.expand_dims(image, axis = 0))
    # Index the single batch entry instead of squeeze(), which would also drop
    # any other axis that happens to have length 1.
    segmentations = np.argmax(prediction[0], axis = -1)

    if (verbose):
        print("image processing time: ", time.time() - start_time)

    return segmentations

def visualize_road_space_segmentation(segmentations, frame, score_threshold = 0.5):
    """Overlay a class-index map on a frame.

    The previous body referenced undefined `image` and `detections` and drew
    boxes; it now renders the label map passed in as `segmentations`.

    Args:
        segmentations: 2D array of class indices; it is stretched to the frame
            size with nearest-neighbour sampling so no new class ids are created.
        frame: BGR image; it is not modified.
        score_threshold: unused; kept for backward compatibility.

    Returns:
        An RGB copy of the frame, blended 50/50 with a per-class colour layer.
    """
    annotated_image = cv2.cvtColor(frame.copy(), cv2.COLOR_BGR2RGB)
    frame_height, frame_width = annotated_image.shape[:2]

    label_map = np.asarray(segmentations)
    map_height, map_width = label_map.shape[:2]

    # Nearest-neighbour resize of the label map to the frame size.
    row_index = np.arange(frame_height) * map_height // frame_height
    col_index = np.arange(frame_width) * map_width // frame_width
    label_map = label_map[row_index[:, np.newaxis], col_index]

    # Every class gets a colour, because which index means "background" is not
    # known here.
    overlay = annotated_image.copy()
    for label in np.unique(label_map):
        overlay[label_map == label] = label_color(int(label))

    blended = (overlay.astype(np.float32) + annotated_image.astype(np.float32)) / 2
    return blended.astype(np.uint8)

def segment_road_space_in_frame(frame, model, verbose):
    """Run preparation, segmentation and visualization on one frame.

    Returns:
        The image produced by `visualize_road_space_segmentation`.
    """
    prepared_image, scale = prepare_frame_for_segmentation(frame)
    segmentation_result = segment_road_space_in_prepared_image(prepared_image, scale, model, verbose)

    return visualize_road_space_segmentation(segmentation_result, frame)
        
def run_road_space_segmentation_for_image_named(image_name, model, verbose = False):
    """Segment an image stored under <WORKING_DIR>/<IMAGES_DIR>.

    The result is displayed only when `verbose` is true; nothing is returned.
    """
    bgr_image = read_image_bgr(os.path.join(WORKING_DIR, IMAGES_DIR, image_name))
    result = segment_road_space_in_frame(bgr_image, model, verbose)

    if not verbose:
        return

    show_image(result)

In [None]:
from PIL import Image

def _segment_road_space_in_image(image, model, verbose):
    """Segment an image with a model that takes a 512x512 input.

    The image is scaled so its longer side is 512 pixels, mapped to [-1, 1],
    zero-padded at the bottom and right to 512x512, and passed to the model.
    The resulting label map is cropped to remove the padding and resized back
    to the original resolution.

    Args:
        image: array of shape (rows, cols, channels).
        model: object exposing `predict`.
            NOTE(review): assumes a (1, 512, 512, classes) output - confirm.
        verbose: unused.

    Returns:
        A uint8 array of shape (rows, cols) holding one class index per pixel.
    """
    trained_image_width = 512
    mean_subtraction_value = 127.5

    # ndarray shapes are (rows, cols, channels); PIL sizes are (width, height).
    height, width, _ = image.shape
    ratio = float(trained_image_width) / np.max([height, width])
    resized_image = np.array(Image.fromarray(image.astype('uint8')).resize((int(ratio * width), int(ratio * height))))
    resized_image = (resized_image / mean_subtraction_value) - 1.

    pad_rows = int(trained_image_width - resized_image.shape[0])
    pad_cols = int(trained_image_width - resized_image.shape[1])
    resized_image = np.pad(resized_image, ((0, pad_rows), (0, pad_cols), (0, 0)), mode = 'constant')

    res = model.predict(np.expand_dims(resized_image, 0))
    labels = np.argmax(res.squeeze(), -1)

    # Strip the padding again before scaling back up.
    if pad_rows > 0:
        labels = labels[:-pad_rows]
    if pad_cols > 0:
        labels = labels[:, :-pad_cols]

    # Class indices must not be interpolated: the default filter for this image
    # mode can blend neighbouring ids into values that are not valid classes.
    labels = np.array(Image.fromarray(labels.astype('uint8')).resize((width, height), resample = Image.NEAREST))

    return labels

def _run_road_space_segmentation_for_image_named(image_name, model, verbose = False):
    """Segment an image stored under <WORKING_DIR>/<IMAGES_DIR> via `_segment_road_space_in_image`.

    The label map is displayed only when `verbose` is true; nothing is returned.
    """
    image_path = os.path.join(WORKING_DIR, IMAGES_DIR, image_name)

    # Close the file once decoded, and force three channels so grayscale or
    # RGBA files still give the (rows, cols, 3) array the segmentation expects.
    with Image.open(image_path) as image_file:
        image = np.array(image_file.convert('RGB'))

    segmented_image = _segment_road_space_in_image(image, model, verbose)
    if (verbose):
        show_image(segmented_image)

#### DeepLabV3+ Model Training using BDD100K Dataset

In [None]:
# Sub-paths inside the BDD100K dataset directory.
BDD100K_IMG_DIR = 'images/100k'
BDD100K_MASK_DIR = 'drivable_maps/labels'
BDD100K_TRAIN_DIR = 'train'
BDD100K_VAL_DIR = 'val'

# Image directories: <WORKING_DIR>/<DATASETS_DIR>/<BDD100K_DIR>/<BDD100K_IMG_DIR>/{train,val}
BDD100K_TRAIN_IMG_PATH = os.path.join(WORKING_DIR, DATASETS_DIR, BDD100K_DIR, BDD100K_IMG_DIR, BDD100K_TRAIN_DIR)
BDD100K_VAL_IMG_PATH = os.path.join(WORKING_DIR, DATASETS_DIR, BDD100K_DIR, BDD100K_IMG_DIR, BDD100K_VAL_DIR)

# Mask directories: <WORKING_DIR>/<DATASETS_DIR>/<BDD100K_DIR>/<BDD100K_MASK_DIR>/{train,val}
BDD100K_TRAIN_MASK_PATH = os.path.join(WORKING_DIR, DATASETS_DIR, BDD100K_DIR, BDD100K_MASK_DIR, BDD100K_TRAIN_DIR)
BDD100K_VAL_MASK_PATH = os.path.join(WORKING_DIR, DATASETS_DIR, BDD100K_DIR, BDD100K_MASK_DIR, BDD100K_VAL_DIR)

def train_deeplabv3_bdd100k_model(epochs = 30, monitor = 'val_loss', mode = 'auto', batch_size = 16):
    """Train the DeepLabV3 model on the BDD100K data with 3 classes.

    Args:
        epochs: number of training epochs.
        monitor: metric watched by the checkpoint and early-stopping callbacks.
        mode: 'auto', 'min' or 'max'; direction in which `monitor` improves.
            The default used to be 'max', which combined with the default
            'val_loss' monitor kept the worst weights and stopped training once
            the loss stopped rising. 'auto' lets Keras infer the direction from
            the monitor name.
        batch_size: batch size used by both generators and by training.

    Returns:
        The trained model returned by `train_deeplabv3_model`.
    """
    train_data_generator = make_train_data_generator(BDD100K_TRAIN_IMG_PATH, BDD100K_TRAIN_MASK_PATH, batch_size = batch_size)
    # Images are counted in the '0' sub-directory of each image path.
    train_images_count = len(os.listdir(os.path.join(BDD100K_TRAIN_IMG_PATH, '0')))

    val_data_generator = make_val_data_generator(BDD100K_VAL_IMG_PATH, BDD100K_VAL_MASK_PATH, batch_size = batch_size)
    val_images_count = len(os.listdir(os.path.join(BDD100K_VAL_IMG_PATH, '0')))

    model = train_deeplabv3_model(train_data_generator,
                                  train_images_count,
                                  val_data_generator,
                                  val_images_count,
                                  epochs,
                                  monitor,
                                  mode,
                                  classes = 3,
                                  batch_size = batch_size)
    return model

### Distance Estimation

#### Detected Objects Distance Estimation

### Deployment

#### Model instantiation

In [None]:
# Load the pre-trained RetinaNet detector from <WORKING_DIR>/<MODELS_DIR> and print its summary.
object_detection_model = load_retinanet_model(
    model_name = 'resnet50_coco_best_v2.1.0.h5',
    backbone_name = 'resnet50',
    should_print_summary = True
)

In [None]:
# Train the drivable-area segmentation model on BDD100K with a batch size of 8
# (all other arguments keep their defaults).
drivable_area_segmentation_model = train_deeplabv3_bdd100k_model(batch_size = 8)

#### Object Detection

In [None]:
# NOTE(review): the file name is left empty, so video_path is the video directory
# itself; fill in a video file name before running this cell.
video_path = os.path.join(WORKING_DIR, VIDEO_DIR, '')
video_capture = cv2.VideoCapture(video_path)

# Annotated frames are written to <WORKING_DIR>/<TEMP_DIR>.
run_objects_detection_from_capture(video_capture, object_detection_model)

In [None]:
# File names (relative to <WORKING_DIR>/<IMAGES_DIR>) to run object detection on.
# NOTE(review): the only entry is an empty placeholder; add real file names before running.
image_names = [
    ''
]

for image_name in image_names:
    run_object_detection_for_image_named(image_name, object_detection_model, verbose = True)

#### Drivable Area Segmentation

In [None]:
# File names (relative to <WORKING_DIR>/<IMAGES_DIR>) to run drivable-area segmentation on.
# NOTE(review): the only entry is an empty placeholder; add real file names before running.
image_names = [
    ''
]

for image_name in image_names:
    _run_road_space_segmentation_for_image_named(image_name, drivable_area_segmentation_model, verbose = True)