# Car Crash Detection

For each image frame, this application detects cars that have been stopped for more than 10 seconds and reports an accident if such a car is damaged.

Without the stopped-state check, a damaged car that is still being driven could be wrongly recognized as an accident.

I used a pre-trained base model for car detection and trained the car damage detection model myself. The training code is the same as in the car fire module, so it is not included here.

In [None]:
import os
import pathlib
import numpy as np
import os
import six.moves.urllib as urllib
import sys
import tarfile
import tensorflow as tf
import zipfile
import time
from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
from PIL import Image
from IPython.display import display
from object_detection.utils import ops as utils_ops
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
import cv2
import pandas as pd
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array

# Point the object_detection `utils.ops` module at the TF1 compatibility API.
utils_ops.tf = tf.compat.v1
# Expose tf.io.gfile under the legacy `tf.gfile` name
# (presumably still referenced by the object_detection utilities - confirm).
tf.gfile = tf.io.gfile

# Base directory that project-relative paths below are built from.
root = "."
# Directory that is scanned for the input *.jpg frames.
image_path = '/images'
# Directory prefix used when the main loop writes its output images.
output_dir = './output_dir'

def load_model_local(model_name):
  """Load a detection model that is stored locally and return its serving function.

  Args:
    model_name: name of the sub-directory of ./model/ that holds the
      exported `saved_model` folder.

  Returns:
    The `serving_default` signature of the loaded SavedModel.
  """
  saved_model_dir = './model/' + model_name + '/saved_model'
  loaded = tf.compat.v2.saved_model.load(str(saved_model_dir), None)
  # Only the default serving signature is used for inference.
  return loaded.signatures['serving_default']

# Label map used to attach the correct class name to each detected box.
# The path is built with os.path.join so that a separator is placed between
# `root` and the project folder (plain concatenation produced
# "._car_crash_detection/..." for root = ".").
PATH_TO_LABELS = os.path.join(
    root, '_car_crash_detection', 'object_detection', 'data',
    'kitti_label_map.pbtxt')
category_index = label_map_util.create_category_index_from_labelmap(PATH_TO_LABELS, use_display_name=True)

# Input frames: every *.jpg directly inside image_path, in sorted path order.
# NOTE(review): the sort is lexicographic, so "frame10.jpg" comes before
# "frame2.jpg" unless the file names are zero-padded.
PATH_TO_TEST_IMAGES_DIR = pathlib.Path(image_path)
TEST_IMAGE_PATHS = sorted(PATH_TO_TEST_IMAGES_DIR.glob("*.jpg"))

# model for object detection (car)
model_name = 'faster_rcnn_resnet101_kitti_2018_01_28'
selected_model = load_model_local(model_name)

# model for car crash check
damage_model = load_model('./model/dense201_20epochs_6202_2071_model.h5')

# this code block is referenced from tensorflow api
def run_inference_for_single_image(model, image):
  """Run the detection model on one image and return its outputs as numpy arrays.

  Args:
    model: callable detection signature; it is called with a single batched
      tensor and must return a dict that contains 'num_detections'.
    image: the image to analyse (anything np.asarray accepts).

  Returns:
    A dict with one numpy array per model output, each cut down to the
    detections reported for this image, plus:
      'num_detections'           - the detection count as an int
      'detection_classes'        - cast to int64
      'detection_masks_reframed' - only when the model returns masks
  """
  image = np.asarray(image)
  # The model is fed a batch, so add a leading batch axis.
  batched_input = tf.convert_to_tensor(image)[tf.newaxis, ...]
  raw_outputs = model(batched_input)

  # Keep only the first `detected` entries of batch element 0 for every output.
  detected = int(raw_outputs.pop('num_detections'))
  output_dict = {}
  for name, tensor in raw_outputs.items():
    output_dict[name] = tensor[0, :detected].numpy()
  output_dict['num_detections'] = detected

  # Class ids are used as integer lookups, so convert them to ints.
  class_ids = output_dict['detection_classes']
  output_dict['detection_classes'] = class_ids.astype(np.int64)

  # Models with masks: reframe the box-relative masks to the image size.
  if 'detection_masks' in output_dict:
    img_height, img_width = image.shape[0], image.shape[1]
    full_size_masks = utils_ops.reframe_box_masks_to_image_masks(
        output_dict['detection_masks'], output_dict['detection_boxes'],
        img_height, img_width)
    binary_masks = tf.cast(full_size_masks > 0.5, tf.uint8)
    output_dict['detection_masks_reframed'] = binary_masks.numpy()

  return output_dict


# count input images
# Only the *.jpg files collected in TEST_IMAGE_PATHS are ever fed to the
# detector, so count those. Counting every file in the directory made the
# counter larger than the list whenever a non-jpg file was present, which
# broke index-based iteration over TEST_IMAGE_PATHS.
count = len(TEST_IMAGE_PATHS)
print(count)

# Directory where crops of stopped cars are written before classification.
CROPPED_DIR = './cropped_images'


def _stationary_indices(prev_output, cur_output, min_score=0.5, max_shift=0.01):
  """Return indices of current detections that did not move since the previous frame.

  A current box counts as stationary when its score is above `min_score` and
  its first two (normalized) coordinates are both within `max_shift` of the
  same coordinates of some previous-frame box that also scored above
  `min_score`. Each index is reported once, in ascending order.
  """
  prev_boxes = [
      prev_output['detection_boxes'][k]
      for k in range(prev_output['num_detections'])
      if prev_output['detection_scores'][k] > min_score
  ]
  stationary = []
  for cur_idx in range(cur_output['num_detections']):
    if cur_output['detection_scores'][cur_idx] > min_score:
      cur_box = cur_output['detection_boxes'][cur_idx]
      for prev_box in prev_boxes:
        shift = abs(cur_box - prev_box)
        if shift[0] < max_shift and shift[1] < max_shift:
          stationary.append(cur_idx)
          break  # one match is enough; avoids duplicate indices
  return stationary


def _predict_damage_class(model, batch):
  """Return the predicted class index for a batch that holds one image.

  Equivalent to the removed `Sequential.predict_classes`: argmax for a
  multi-unit output, a 0.5 threshold for a single-unit output.
  """
  proba = np.asarray(model.predict(batch))
  if proba.shape[-1] > 1:
    return int(np.argmax(proba, axis=-1)[0])
  return int(proba.reshape(-1)[0] > 0.5)


def _load_class_info(path):
  """Parse `path` into {class_index: [class_name, (c0, c1, c2)]}.

  Each non-blank line is split on ', ' into five fields; the first character
  of the third field and the last character of the fifth are dropped
  (presumably the parentheses around the color tuple - confirm).
  """
  parsed = {}
  with open(path, 'r') as f:
    for line in f:
      if not line.strip():
        continue
      info = line.split(', ')
      class_index = int(info[0])
      class_name = info[1]
      color = (int(info[2][1:]), int(info[3]), int(info[4].strip()[:-1]))
      parsed[class_index] = [class_name, color]
  return parsed


check = 0  # number of frames in the current window that contained a stationary car
# NOTE(review): accident_flag is never updated in this section; it is kept
# only in case code outside this view reads it.
accident_flag = 0
prevoutput = None

# Make sure the directories written to below exist.
os.makedirs(CROPPED_DIR, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

# load label (this code block is referenced from tensorflow api)
# The file does not change between frames, so it is read once, before the loop.
class_info = _load_class_info(
    os.path.join(root, '_car_crash_detection', 'class_info.txt'))

# Iterate over the images themselves so the loop can never index past the list.
for i, frame_path in enumerate(TEST_IMAGE_PATHS):
  # feed test image
  feed_image = np.array(Image.open(frame_path))
  output = run_inference_for_single_image(selected_model, feed_image)

  # The first frame has nothing to be compared with.
  if prevoutput is None:
    last_idx = []
  else:
    last_idx = _stationary_indices(prevoutput, output)

  if last_idx:
    check += 1

  if i % 10 == 0:  # check every 10 steps
    if check > 9:  # a stationary car was seen in every one of the last 10 steps
      print('stopped')
      last_img = Image.open(frame_path)
      width, height = last_img.size
      print(width, height)
      print(last_idx)

      for idx in last_idx:
        # Boxes are normalized (ymin, xmin, ymax, xmax); scale to pixels.
        ymin, xmin, ymax, xmax = output['detection_boxes'][idx]
        left = int(xmin * width)
        top = int(ymin * height)
        right = int(xmax * width)
        bottom = int(ymax * height)
        area = (left, top, right, bottom)
        size = (bottom - top) * (right - left)
        if size > 8000:  # exclude objects that are too small because they are not recognized
          print(area)
          # Save and re-read the crop through ONE path; the original wrote to
          # ./cropped_images/ but read from a different directory.
          crop_path = os.path.join(
              CROPPED_DIR, 'croppedimage' + str(i) + '_' + str(idx) + '.jpg')
          last_img.crop(area).save(crop_path)

          test_img = cv2.imread(crop_path)
          if test_img is None:
            print('could not read cropped image: ' + crop_path)
            continue
          test_img = cv2.resize(test_img, (160, 160), interpolation=cv2.INTER_CUBIC)
          test_img = np.expand_dims(np.array(test_img), axis=0)
          test_img = test_img / 255

          # predict car crash with cropped image
          accident = _predict_damage_class(damage_model, test_img)

          # if car crash is detected (class 0)
          if accident == 0:
            print("accident is detected !!!")

    check = 0

  prevoutput = output
  print(i, check)

  # NOTE(review): the original call used an undefined `output_name` and passed
  # the keras `image` module as the pixel data, so it could never run. Writing
  # the current frame under its own file name is the assumed intent - confirm.
  output_name = frame_path.stem
  if feed_image.ndim == 3 and feed_image.shape[2] == 3:
    # PIL delivers RGB, OpenCV writes BGR.
    frame_to_write = cv2.cvtColor(feed_image, cv2.COLOR_RGB2BGR)
  else:
    frame_to_write = feed_image
  cv2.imwrite(os.path.join(output_dir, output_name + '.jpg'), frame_to_write)

