# Hazard Detection for self-driving vehicles using a monocular camera



## Installation

TensorFlow Datasets (TFDS)

* `tfds-nightly`: Released every day, contains the latest versions of the datasets.


Note: TFDS requires `tensorflow` (or `tensorflow-gpu`) to be already installed. TFDS supports TF >= 1.15.

This notebook uses `tfds-nightly` and TF 2.


In [None]:
!pip install -U tfds-nightly --user

Install prerequisites: 

In [None]:
!pip install tf_slim pycocotools

### Imports

In [None]:
import numpy as np
import os
import six
import six.moves.urllib as urllib
import sys
import tarfile
import tensorflow as tf
import zipfile

from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
from PIL import Image
from IPython.display import display



This fetches the tensorflow/models repository from GitHub, compiles the protobufs, and installs the object_detection package.

In [None]:
import os
import pathlib


if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.

In [None]:
%%bash 
cd models/research
pip install .

Importing the tensorflow object detection API

In [None]:
from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

In [None]:
import tensorflow_datasets as tfds


In [None]:
!gcloud config set account application-default

I'm hosting a clone of the KITTI dataset in Google Cloud Storage.

It can be used seamlessly without having to download the ~11.36GB dataset (recommended).

If you would like a local copy of the dataset, change the `data_dir` parameter to a local folder and change the `download` argument to `True`.

In [None]:
(ds_train, ds_test), ds_info = tfds.load(
    'kitti',
    split=['train', 'test'],
    with_info=True,
    download=False,
#     data_dir='./tensorflow_datasets'
    data_dir="gs://kitti-dataset-1"
)


## Model preparation

By default we use a "Faster R-CNN ResNet50 Low proposals" model here. See the [detection model zoo](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md) for a list of other models that can be run out-of-the-box with varying speeds and accuracies.

### Loader

In [None]:
def load_model(model_name):
  base_url = 'http://download.tensorflow.org/models/object_detection/'
  model_file = model_name + '.tar.gz'
  model_dir = tf.keras.utils.get_file(
    fname=model_name, 
    origin=base_url + model_file,
    untar=True)

  model_dir = pathlib.Path(model_dir)/"saved_model"

  model = tf.saved_model.load(str(model_dir))
  model = model.signatures['serving_default']

  return model

In [None]:

model_name = 'faster_rcnn_resnet50_lowproposals_coco_2018_01_28'
# model_name = 'faster_rcnn_resnet101_lowproposals_coco_2018_01_28'

detection_model = load_model(model_name)


### Loading Label map

Label maps map indices to category names. For example, with the COCO label map loaded here, a prediction of 3 from our convolutional network corresponds to car. Here we use internal utility functions, but anything that returns a dictionary mapping integers to appropriate string labels would be fine.

In [None]:
# List of the strings that is used to add correct label for each box.

# PATH_TO_LABELS = 'models/research/object_detection/data/kitti_label_map.pbtxt'
PATH_TO_LABELS = 'models/research/object_detection/data/mscoco_label_map.pbtxt'
category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)
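
To illustrate the point that any integer-to-label dictionary would do, here is a minimal sketch of a hand-written category index. The names `coco_names`, `manual_category_index` and `label_for` are hypothetical, and only a handful of COCO classes are listed.

```python
# Hypothetical hand-built label map covering a few COCO classes.
coco_names = {1: 'person', 2: 'bicycle', 3: 'car', 8: 'truck'}

# Same shape as a category index: class ID -> {'id': ..., 'name': ...}.
manual_category_index = {
    class_id: {'id': class_id, 'name': name}
    for class_id, name in coco_names.items()
}

def label_for(class_id, index=manual_category_index):
    """Return the display name for a class ID, or 'unknown' if unmapped."""
    return index.get(class_id, {}).get('name', 'unknown')

print(label_for(3))
```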


In [None]:
ds_train = ds_train.cache()
ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:

ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
print(detection_model.inputs)


In [None]:
detection_model.output_dtypes

In [None]:
detection_model.output_shapes

## Output format


- Output classes are always integers in the range [0, num_classes). Any mapping of these integers to semantic labels is to be handled outside of this class. We never explicitly emit a "background class", so 0 is the first non-background class, and any logic for predicting and removing implicit background classes must be handled internally by the implementation.


- Detected boxes are to be interpreted as being in (y_min, x_min, y_max, x_max) format and normalized relative to the image window.
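
As a small illustration of the box format, the sketch below converts normalized `(y_min, x_min, y_max, x_max)` boxes to pixel coordinates. The helper name `boxes_to_pixels` is hypothetical, and the 375x1242 frame size is only an example.

```python
import numpy as np

def boxes_to_pixels(boxes, image_height, image_width):
    """Convert normalized (y_min, x_min, y_max, x_max) boxes to pixel units."""
    boxes = np.asarray(boxes, dtype=np.float64).reshape(-1, 4)
    # y values scale with the height, x values with the width.
    scale = np.array([image_height, image_width, image_height, image_width])
    return boxes * scale

# One box covering the right half of a 375x1242 frame.
pixels = boxes_to_pixels([[0.0, 0.5, 1.0, 1.0]], 375, 1242)
print(pixels)
```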

## Utilities



Helper function that runs inference on a single image with the selected model. It wraps the model call and cleans up the outputs:



In [None]:
def run_inference_for_single_image(model, image):
  image = np.asarray(image)
  # The input needs to be a tensor, convert it using `tf.convert_to_tensor`.
  input_tensor = tf.convert_to_tensor(image)
  # The model expects a batch of images, so add an axis with `tf.newaxis`.
  input_tensor = input_tensor[tf.newaxis,...]

  # Run inference
  output_dict = model(input_tensor)

  # All outputs are batched tensors.
  # Convert to numpy arrays, and take index [0] to remove the batch dimension.
  # We're only interested in the first num_detections.
  num_detections = int(output_dict.pop('num_detections'))
  output_dict = {key:value[0, :num_detections].numpy() 
                 for key,value in output_dict.items()}
  output_dict['num_detections'] = num_detections

  # detection_classes should be ints.
  output_dict['detection_classes'] = output_dict['detection_classes'].astype(np.int64)
   
  # Handle models with masks:
  if 'detection_masks' in output_dict:
    # Reframe the bbox mask to the image size.
    detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
              output_dict['detection_masks'], output_dict['detection_boxes'],
               image.shape[0], image.shape[1])      
    detection_masks_reframed = tf.cast(detection_masks_reframed > 0.5,
                                       tf.uint8)
    output_dict['detection_masks_reframed'] = detection_masks_reframed.numpy()
    
  return output_dict

Normalizes a table of pixel-coordinate bounding boxes to the [0, 1] range using the KITTI image size (1242x375), flipping the y-axis in the process:

In [None]:
def norm(data):
    height= 375
    width = 1242
    
    data.loc[:,'xmin'] = data['xmin'] / width 
    data.loc[:,'xmax'] = data['xmax'] / width
    data.loc[:,'ymax'],data.loc[:,'ymin']  = (height - data['ymin']) / height , (height - data['ymax']) / height
    
    return data

Maps COCO class IDs to KITTI class IDs

In [None]:
def transform_to_kitti(classes):
    # COCO ID -> KITTI ID; anything unmapped falls back to 8.
    coco_to_kitti = { 3:1 , 8:3, 1:4, 7:7 }
    return [ coco_to_kitti.get(c, 8) for c in classes ]
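
As a quick sanity check of the mapping, the standalone sketch below applies the same dictionary to a few COCO IDs. The names `COCO_TO_KITTI`, `MISC_ID` and `map_coco_to_kitti` are hypothetical and only mirror the function above.

```python
# COCO ID -> KITTI ID, mirroring the dictionary used above.
COCO_TO_KITTI = {3: 1, 8: 3, 1: 4, 7: 7}
MISC_ID = 8  # fallback for every COCO class without a mapped counterpart

def map_coco_to_kitti(class_ids):
    """Map an iterable of COCO class IDs to KITTI class IDs."""
    return [COCO_TO_KITTI.get(int(c), MISC_ID) for c in class_ids]

# COCO: 3 = car, 1 = person, 18 = dog (no mapping, so it becomes 8).
print(map_coco_to_kitti([3, 1, 18]))
```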

Removes detections whose score is below a minimum threshold, as well as detections whose class falls outside the KITTI classes of interest, since COCO has many classes we don't care about.

In [None]:
def filter_detections(output_dict, min_threshold=0.5):
    # Remap COCO class IDs to KITTI IDs; unmapped classes become 8.
    classes = np.array(transform_to_kitti(output_dict['detection_classes']))
    scores = output_dict['detection_scores']
    misc_ids = (8, 9)

    # Keep detections that are confident enough and not in a misc class.
    keep = (scores >= min_threshold) & ~np.isin(classes, misc_ids)
    output_dict['detection_boxes'] = output_dict['detection_boxes'][keep]
    output_dict['detection_classes'] = classes[keep]
    output_dict['detection_scores'] = scores[keep]

    return output_dict

The next two helper functions format bounding boxes for the SORT algorithm, which takes a numpy array of detections in the format `[[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]`.

In [None]:
def reorder_for_sort(array):

     return np.array( [array[1] ,array[0], array[3], array[2]])

In [None]:
def format_detections_for_mot(outputdict):
    
    detections = [ np.append( reorder_for_sort(outputdict['detection_boxes'][i]) , outputdict['detection_scores'][i])  for i in range( len(outputdict['detection_classes'])) ] 
    
    return np.asarray(detections)  if len(detections) else np.empty((0, 5))
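
The same reordering can be sketched in vectorized form. This is an illustrative alternative rather than the code the notebook runs, and `to_sort_detections` is a hypothetical name.

```python
import numpy as np

def to_sort_detections(boxes, scores):
    """Stack (y_min, x_min, y_max, x_max) boxes and scores as [x1, y1, x2, y2, score]."""
    boxes = np.asarray(boxes, dtype=np.float64).reshape(-1, 4)
    scores = np.asarray(scores, dtype=np.float64).reshape(-1, 1)
    if len(boxes) == 0:
        return np.empty((0, 5))
    # Reorder columns y1, x1, y2, x2 -> x1, y1, x2, y2, then append the score.
    return np.hstack((boxes[:, [1, 0, 3, 2]], scores))

dets = to_sort_detections([[0.1, 0.2, 0.5, 0.6]], [0.9])
print(dets)
```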


KITTI measures its coordinates from the top-left as opposed to the bottom-left, hence we need to re-format the boxes (flip the y-axis) to the prevailing format in this application.

In [None]:
def format_boxes( boxes):
    
    result = boxes.numpy()
    for box in result:
        box[0]=1-box[0]
        box[2]=1-box[2]
        box[0],box[2] = box[2], box[0]
    return tf.convert_to_tensor(result)
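
For reference, the y-axis flip can also be written without a Python loop. The sketch below uses plain numpy arrays instead of tensors, and `flip_y` is a hypothetical name.

```python
import numpy as np

def flip_y(boxes):
    """Flip normalized (y_min, x_min, y_max, x_max) boxes vertically."""
    boxes = np.asarray(boxes, dtype=np.float64).reshape(-1, 4)
    flipped = boxes.copy()
    flipped[:, 0] = 1.0 - boxes[:, 2]  # new y_min comes from the old y_max
    flipped[:, 2] = 1.0 - boxes[:, 0]  # new y_max comes from the old y_min
    return flipped

flipped_example = flip_y([[0.25, 0.2, 0.5, 0.4]])
print(flipped_example)
```

Applying the flip twice returns the original boxes, which is a convenient check when debugging coordinate conventions.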

In [None]:
def get_tracked_color(label):
    
    return vis_util.STANDARD_COLORS[label %len(vis_util.STANDARD_COLORS)]

Calculates the length of a line

In [None]:
from math import sqrt

def length(x1,y1,x2,y2):
    # Euclidean distance between the points (x1, y1) and (x2, y2).
    return sqrt( (x2-x1)**2 + (y2-y1)**2)

## 3D Location estimator

Training for this model is performed in the LocNet notebook, and the trained model is saved to disk in the `generated_files` folder. Here we load that trained model for inference.

In [None]:

from tensorflow.keras.models import model_from_json

MODEL = "model@1597671595"
WEIGHTS = "model@1597671595"


# load json and create model
with open('generated_files/{}.json'.format(MODEL), 'r') as json_file:
    loaded_model_json = json_file.read()
loaded_model = model_from_json( loaded_model_json )

# load weights into new model
loaded_model.load_weights("generated_files/{}.h5".format(WEIGHTS))
print("Loaded model from disk")

loaded_model.compile(loss='mean_squared_error', optimizer='adam')


In [None]:

categories = [{
        'id': 1,
        'name': 'car'
    }, {
        'id': 2,
        'name': 'van'
    }, {
        'id': 3,
        'name': 'truck'
    }, {
        'id': 4,
        'name': 'pedestrian'
    }]

cat_index  = {i+1: val for i,val in enumerate(categories) }

In [None]:

def run_locnet( bboxes):
  
    if len(bboxes)==0 or len(bboxes)==1 and not any(bboxes[0]):
        return []
    y_pred = loaded_model.predict(bboxes)
  
    return np.hstack((bboxes,y_pred))
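
The layout of the stacked array can be sketched with made-up numbers. This assumes, as the slicing used later for visualization suggests, that the location model predicts three values per box; the prediction values below are invented for illustration.

```python
import numpy as np

# Two hypothetical boxes (y_min, x_min, y_max, x_max) and made-up predictions.
bboxes = np.array([[0.4, 0.1, 0.6, 0.3],
                   [0.5, 0.6, 0.9, 0.8]])
fake_pred = np.array([[-3.0, 1.5, 20.0],
                      [2.0, 1.6, 8.0]])  # assumed three values per box

locations = np.hstack((bboxes, fake_pred))  # shape (2, 7)

# Columns 0-3 hold the box; columns 4 and 6 are the values plotted later.
box = locations[0][:4]
first_and_third = locations[0][4:7:2]
print(locations.shape, first_and_third)
```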


In [None]:
!pip install filterpy

Initialize the SORT object

Refer to [SORT documentation](https://github.com/abewley/sort) for internals

In [None]:

from sort import *
mot_tracker = Sort( max_age=8, iou_threshold=0.3)


## Model Inference and Visualizations

This is the main block. It performs one of three operations:
1. Object detection
2. Multiple object tracking and trajectory prediction
3. 3D location estimation

The operation is selected by the `mode` argument: `'tracking'`, `'locations'`, or anything else (including the default `None`) for plain object detection. The function takes an image as input, runs the underlying model, draws the visualizations onto a copy of the image, and returns it.

In [None]:
import copy
import cv2
import PIL.Image as Image
import PIL.ImageColor as ImageColor
import PIL.ImageDraw as ImageDraw
import PIL.ImageFont as ImageFont
from math import sqrt

def process_image(image, objects=None, groundtruth=None, mode=None):
  

    image_np = np.copy(image)
    
    output_dict = run_inference_for_single_image(detection_model, image_np)
    
    output_dict =filter_detections( output_dict)

    max_age=25
    global frame
    global warning_ids
    thickness=1
    if mode =='tracking':
        
        detections = format_detections_for_mot(output_dict)

        tracked_objects = mot_tracker.update(detections) #Run SORT algorithm for all detections made per frame
        
        #Accessing each KalmanTrackerBox internally to run the trajectory prediction steps outside of the Multiple Object Tracker. We filter to get only active trackers
        live_trackers = (i for i in mot_tracker.trackers if i.id+1 in (int(j[4]) for j in tracked_objects) )
        
        for kt in live_trackers:
            
            k = copy.deepcopy(kt) 
            center = convert_bbox_to_z(k.get_state()[0]).T[0] #Gets the center coordinates of the tracker

            for x in range(60): # Run prediction for 60 steps (equivalent to 60 frames in the future sequentially)
                predicted = k.predict()

            xmin,ymin,xmax,ymax = predicted[0] 

            predicted_center = convert_bbox_to_z(predicted[0]).T[0] #Center of the predicted point of the object
            
            if center[2] < 0.4: #Filters out detections that are too large; usually the dashboard of the car or window frames are caught incorrectly.

                image = Image.fromarray(image_np)
                draw = ImageDraw.Draw(image)
                im_width, im_height = image.size


                (x1,y1) = center[:2]
                (x2,y2) = predicted_center[:2]

                (x1_absolute, x2_absolute, y1_absolute, y2_absolute) = (x1 * im_width, x2 * im_width,
                                      y1 * im_height, y2 * im_height)

                #Draws the trajectory line from the prediction made with the Kalman Filter
                draw.line([(x1_absolute, y1_absolute), (x2_absolute, y2_absolute)],
                          width=3,
                          fill='red')
                
                np.copyto(image_np, np.array(image))
            
                #Hazard detection step. Checks if object falls within set thresholds
                if 0.2 < predicted_center[0] < 0.8 and 0.75 < predicted_center[1] and length(x1,y1,x2,y2) > sqrt(center[2])*0.6:
                    warning_ids.append( (k.id+1,frame+max_age))
            
            
        warning_ids = [i for i in warning_ids if i[1]>frame ] #Warnings should persist for max_age frames, filter expired warnings here
        
        for obj in tracked_objects:
            xmin,ymin,xmax,ymax = obj[:4]
            track_id = int(obj[4])
            n= track_id%40 +1
#             n= track_id
            label = f'object{n:03}'
            
            color = get_tracked_color(track_id)
            thickness = 1 #Reset per object so an earlier warning does not thicken later boxes
            
            #Visualize Boxes and WARNING for an object that has been detected as a hazard
            if convert_bbox_to_z(obj).T[0][2] < 0.4:
                if track_id in (i[0] for i in warning_ids ):
                    color = 'red'
                    label = 'WARNING!'
                    thickness=4
                vis_util.draw_bounding_box_on_image_array(image_np, ymin, xmin, ymax, xmax, thickness=thickness, display_str_list=[label],color=color)


    elif mode=='locations':
        #Runs the 3D location estimator and prints visualizations
        locations = run_locnet( output_dict['detection_boxes'])

        i=0
        while i < len(objects['location']) and i < len(locations):
            ymin,xmin,ymax,xmax = locations[i][:4]
            coords = locations[i][4:7:2]
            groundtruth_coords = objects['location'].numpy()[i][0:3:2]
            label = "predicted:(%.1f,%.1f), actual:(%.1f,%.1f)" % tuple(np.append(coords  , groundtruth_coords).tolist())
            vis_util.draw_bounding_box_on_image_array(image_np, ymin, xmin, ymax, xmax, thickness=1, display_str_list=[label],color='green')
            i+=1

            
    else:
        # Visualization of the results of object detection.
        # If the groundtruth flag is set, visualize groundtruth boxes in addition
        vis_util.visualize_boxes_and_labels_on_image_array(
          image_np,
          output_dict['detection_boxes'],
          output_dict['detection_classes'],
          output_dict['detection_scores'],
          cat_index,
          use_normalized_coordinates=True,
          line_thickness=2)


        if groundtruth:
            groundtruth_boxes = format_boxes(objects['bbox']).numpy()
            groundtruth_classes = objects['type'].numpy()+1
            vis_util.visualize_boxes_and_labels_on_image_array(
          image_np,
          groundtruth_boxes,
          groundtruth_classes,
            None,
          cat_index,
          use_normalized_coordinates=True,
          line_thickness=1,
            groundtruth_box_visualization_color='blue')
        
    frame +=1
    return image_np
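
The hazard rule used in the tracking branch can be isolated as a small predicate, which makes the thresholds easier to experiment with. This is a sketch only: `is_hazard` and its parameter names are hypothetical, and the inputs are assumed to be the `(x, y, area, ...)` values that `convert_bbox_to_z` produces for normalized boxes.

```python
from math import hypot, sqrt

def is_hazard(center, predicted_center,
              x_range=(0.2, 0.8), y_min=0.75, length_factor=0.6):
    """Return True when a predicted trajectory ends in the danger zone.

    `center` is (x, y, area, ...) for the current box and `predicted_center`
    starts with the (x, y) of the extrapolated box, all normalized.
    """
    x1, y1, area = center[:3]
    x2, y2 = predicted_center[:2]
    # The predicted position must land in the lower middle of the frame.
    in_zone = x_range[0] < x2 < x_range[1] and y_min < y2
    # The trajectory must be long relative to the object's size.
    long_enough = hypot(x2 - x1, y2 - y1) > sqrt(area) * length_factor
    return in_zone and long_enough

# An object drifting from the upper left toward the bottom centre.
print(is_hazard((0.3, 0.4, 0.01), (0.5, 0.9)))
```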

In [None]:

# Load the TensorBoard notebook extension
%load_ext tensorboard

%tensorboard --logdir logs


Run the model on an image and show the results:

In [None]:
def show_inference(model, tensor, objects, groundtruth=None, mode=None):
  
    image_np = np.array(tensor)
    image =process_image(image_np, objects, groundtruth, mode)
    display(Image.fromarray(image))
    return

## Experiments

### 3D Location prediction experiments

_(GPU required)_

In [None]:
tracked_ids=None
warning_ids = []
frame = 0
for example in ds_test.take(5):  # example is a dict with 'image' and 'objects' entries
    image = example["image"]
    objects = example["objects"]
  
    print('bbox:' ,objects['bbox'])
    print('location:', objects['location'])
    print('type:', objects['type'])
    show_inference(detection_model, image, objects, mode='locations')
    

In [None]:
print(len(ds_test))

### Inference Time experiments

_Takes a while._

In [None]:
import time

total = 0
for example in ds_test:  # example is a dict with 'image' and 'objects' entries
    image = example["image"]
    objects = example["objects"]
  
    start = time.time()
    image_np = np.array(image)
    run_inference_for_single_image(detection_model, image_np)
    end = time.time()
    total += (end - start)
    
print( total/len(ds_test))
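
The first call to a model often includes one-off setup cost, which can skew the average. A minimal sketch of a timing helper that skips warm-up calls is shown below; `mean_latency` is a hypothetical name and the lambda is only a stand-in workload, not the detector.

```python
import time

def mean_latency(fn, inputs, warmup=1):
    """Average wall-clock seconds per call of `fn`, skipping warm-up calls."""
    timings = []
    for i, item in enumerate(inputs):
        start = time.perf_counter()
        fn(item)
        elapsed = time.perf_counter() - start
        if i >= warmup:  # ignore the first `warmup` calls
            timings.append(elapsed)
    return sum(timings) / len(timings) if timings else float('nan')

# Stand-in workload; in this notebook `fn` would wrap the detector call.
print(mean_latency(lambda n: sum(range(n)), [10_000] * 5))
```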



In [None]:
cat_index

### Qualitative Evaluation Steps

Shows side-by-side comparisons of object detections against the ground truth.

In [None]:

from object_detection import eval_util
from object_detection.core import standard_fields as fields
from object_detection.metrics import coco_evaluation
from object_detection.protos import eval_pb2
from object_detection.utils import test_case
from object_detection.utils import tf_version

input_data_fields = fields.InputDataFields
detection_fields = fields.DetectionResultFields


for example in ds_test.take(5):  # example is a dict with 'image' and 'objects' entries
    image = example["image"]
    objects = example["objects"]
#     show_inference(detection_model, image)
    image_np = np.array(image)
    # Actual detection.
    output_dict = run_inference_for_single_image(detection_model, image_np)


    output_dict = filter_detections( output_dict)

    batch_size = 1
    key=tf.constant('image1')
    
    groundtruth_boxes = format_boxes(objects['bbox'])
    groundtruth_classes = objects['type']+1
    groundtruth = {
        input_data_fields.groundtruth_boxes: groundtruth_boxes,
        input_data_fields.groundtruth_classes: groundtruth_classes,      
    }
    
    num_detections = tf.constant([len(output_dict['detection_classes'])])
    
    detections = {
        detection_fields.detection_boxes: tf.constant([output_dict['detection_boxes']]) ,
        detection_fields.detection_scores: tf.constant([output_dict['detection_scores']]),
        detection_fields.detection_classes: tf.constant([output_dict['detection_classes']-1]),
        detection_fields.num_detections: num_detections
       
    }
    

    image = tf.constant([image_np])
    
    
    result_dict = eval_util.result_dict_for_single_example(image, key,detections, groundtruth)
    
    side_by_side_img =  vis_util.draw_side_by_side_evaluation_image(result_dict,cat_index)[0][0].numpy()
    display(Image.fromarray(side_by_side_img))


### Quantitative Evaluation Steps (PASCAL VOC metric)

In [None]:
from object_detection.core import standard_fields
from object_detection.utils import object_detection_evaluation


categories = [{
        'id': 1,
        'name': 'car'
    }, {
        'id': 4,
        'name': 'pedestrian'
    }]

pascal_evaluator = object_detection_evaluation.PascalDetectionEvaluator(
        categories,matching_iou_threshold=0.5)

for index, example in enumerate(ds_test):  
    image = example["image"]
    objects = example["objects"]
  
    image_np = np.array(image)
    # Actual detection.
    output_dict = run_inference_for_single_image(detection_model, image_np)
    output_dict = filter_detections( output_dict)
    
    image_key = 'img'+str(index)
    
    groundtruth_boxes = format_boxes(objects['bbox']).numpy()
    groundtruth_classes = objects['type'].numpy()+1


    pascal_evaluator.add_single_ground_truth_image_info(
        image_key,
        {standard_fields.InputDataFields.groundtruth_boxes: groundtruth_boxes,
         standard_fields.InputDataFields.groundtruth_classes:
         groundtruth_classes,
        })

    if len(output_dict['detection_classes']):
        pascal_evaluator.add_single_detected_image_info(
                image_key,
                {standard_fields.DetectionResultFields.detection_boxes: output_dict['detection_boxes'],
                 standard_fields.DetectionResultFields.detection_scores:
                 output_dict['detection_scores'],
                 standard_fields.DetectionResultFields.detection_classes:
                 output_dict['detection_classes']
                })
    

# Compute the metrics once, after every image has been added.
metrics = pascal_evaluator.evaluate()
    

In [None]:
metrics
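
The evaluator above counts a detection as correct when its overlap with a ground-truth box reaches the matching IoU threshold of 0.5. As a reminder of what that quantity is, here is a minimal sketch of intersection over union for two boxes; `iou` is a hypothetical helper, not the evaluator's internal implementation.

```python
def iou(box_a, box_b):
    """Intersection over union of two (y_min, x_min, y_max, x_max) boxes."""
    ya1, xa1, ya2, xa2 = box_a
    yb1, xb1, yb2, xb2 = box_b
    # Overlap along each axis, clamped at zero when the boxes are disjoint.
    inter_h = max(0.0, min(ya2, yb2) - max(ya1, yb1))
    inter_w = max(0.0, min(xa2, xb2) - max(xa1, xb1))
    inter = inter_h * inter_w
    union = (ya2 - ya1) * (xa2 - xa1) + (yb2 - yb1) * (xb2 - xb1) - inter
    return inter / union if union > 0 else 0.0

# Two boxes of equal size overlapping by half their width.
print(iou((0.0, 0.0, 1.0, 2.0), (0.0, 1.0, 1.0, 3.0)))
```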

In [None]:
!pip install moviepy proglog

Uncomment the line below if the accident_compilation file doesn't exist in the project folder (especially if you downloaded this notebook from my GitHub).

In [None]:
# !gsutil cp gs://hazard-detection-test-videos/accident_compilation.mp4 accident_compilation.mp4

## Running the model on an actual video

In [None]:
from moviepy.editor import VideoFileClip
import proglog
proglog.notebook()

mot_tracker = Sort( max_age=8, iou_threshold=0.5)

tracked_ids=None
warning_ids = []
frame = 0
write_output = 'output_' + model_name + str(time.time())+ '.mp4'
## To speed up the testing process you may want to try your pipeline on a shorter subclip of the video
## To do so add .subclip(start_second,end_second) to the end of the line below
## Where start_second and end_second are integer values representing the start and end of the subclip
## You may also uncomment the following line for a subclip of the first n seconds
# clip1 = VideoFileClip("test.mp4").subclip(0,1)

clip1 = VideoFileClip("accident_compilation.mp4")


white_clip = clip1.fl_image(process_image) 

white_clip.write_videofile(write_output, audio=False, verbose=False)

clip1.close()
