## D2RL data process

In [None]:
import matplotlib.pyplot as plt
import os
import json
import xml.etree.ElementTree as ET
import numpy as np
from shapely.geometry import Polygon, Point, LineString
import pandas as pd
import math
import multiprocessing as mp



def _get_region_coordinates(anchor_center_pt, direction_pt1, direction_pt2, distance=100):
    direction1 = np.arctan2(direction_pt1[1] - anchor_center_pt[1], direction_pt1[0] - anchor_center_pt[0])
    new_pt1 = [anchor_center_pt[0] + distance * np.cos(direction1), anchor_center_pt[1] + distance * np.sin(direction1)]

    direction2 = np.arctan2(direction_pt2[1] - anchor_center_pt[1], direction_pt2[0] - anchor_center_pt[0])
    new_pt2 = [anchor_center_pt[0] + distance * np.cos(direction2), anchor_center_pt[1] + distance * np.sin(direction2)]

    return anchor_center_pt, new_pt1, new_pt2


def relative_position_two_vehicles(anchor_v, determine_v):
    anchor_center_pt = [anchor_v.location.x, anchor_v.location.y]
    pt1, pt2, pt3, pt4 = anchor_v.realworld_4_vertices  # upper left clockwise

    determine_v_center = Point(determine_v.location.x, determine_v.location.y)

    relative_position = None
    # front
    front_area = Polygon(_get_region_coordinates(anchor_center_pt, pt1, pt2, distance=100))
    if front_area.contains(determine_v_center):
        relative_position = 'front'
        return relative_position

    # rear
    rear_area = Polygon(_get_region_coordinates(anchor_center_pt, pt3, pt4, distance=100))
    if rear_area.contains(determine_v_center):
        relative_position = 'rear'
        return relative_position

    # left
    left_area = Polygon(_get_region_coordinates(anchor_center_pt, pt1, pt4, distance=100))
    if left_area.contains(determine_v_center):
        relative_position = 'left'
        return relative_position

    # right
    right_area = Polygon(_get_region_coordinates(anchor_center_pt, pt2, pt3, distance=100))
    if right_area.contains(determine_v_center):
        relative_position = 'right'
        return relative_position


def _collision_type(relative_position, relative_heading):
    deg1 = 10.
    collision_type = 'other'
    if relative_position == 'rear' and relative_heading <= deg1:
        collision_type = 'rear_end'
    if relative_position == 'rear' and deg1 < relative_heading <= 90.:
        collision_type = 'angle'
    if relative_position == 'rear' and 90. < relative_heading:
        # collision_type = 'backing'
        # currently classify backing crash as other
        collision_type = 'other'

    if relative_position == 'front' and relative_heading <= deg1:
        collision_type = 'rear_end'
    if relative_position == 'front' and deg1 < relative_heading <= 90.:
        collision_type = 'angle'
    if relative_position == 'front' and 90. < relative_heading:
        # collision_type = 'head_on'
        # currently classify head_on crash as other
        collision_type = 'other'

    deg2 = 40.
    if (relative_position == 'left' or relative_position == 'right') and ((relative_heading <= deg2) or (relative_heading >= 180. - deg2)):
        collision_type = 'sideswipe'
    if (relative_position == 'left' or relative_position == 'right') and (deg2 < relative_heading < 180. - deg2):
        collision_type = 'angle'

    return collision_type


def relative_heading_two_vehicles(anchor_v, determine_v):
    """
    Return the relative angle between two vehicles heading.
    Return in degrees in [0, 180].
    """
    angle = abs(anchor_v.speed_heading - determine_v.speed_heading) % 360
    relative_heading = angle if (0 <= angle <= 180) else 360. - angle
    assert (0. <= relative_heading <= 180.)

    return relative_heading


def collision_type_classification(collision_vehicle_list):
    """
    Do collision type classification based on relative position and heading of colliding vehicles.
    Currently, we only consider two vehicle collision case.
    """
    collision_type = 'other'
    location_region = "not_in_ROI"
    relative_position, relative_heading = None, None
    if len(collision_vehicle_list) == 2:
        v1, v2 = collision_vehicle_list
        relative_position = relative_position_two_vehicles(anchor_v=v1, determine_v=v2)
        relative_heading = relative_heading_two_vehicles(anchor_v=v1, determine_v=v2)
        collision_type = _collision_type(relative_position, relative_heading)
    if len(collision_vehicle_list) > 2:
        collision_type = 'more_than_two_vehicles'
        raise ValueError('Crash with more than two vehicles!')
    return collision_type, location_region, relative_position, relative_heading

def read_SUMO_NADE_dataset(path="./non_crash_result/", id=0, read_only_json=False):
    with open(os.path.join(path, str(id) + ".json"), "r") as json_file:
        json_obj = json.load(json_file)

    if not read_only_json:
        xml_path = os.path.join(path, str(id) + ".fcd.xml")
        fcd_tree = ET.parse(xml_path)
    else:
        fcd_tree = None
    return json_obj, fcd_tree


def construct_v_object(vid, x, y, lane_id, heading, speed, v_width=1.8, v_length=4.8, v_height=1.5, mass=1):
    """
    The heading here is [deg] east:0, north:90, south:270
    """
    v = Vehicle()
    v.location.x, v.location.y = x, y
    v.id = vid
    v.speed_heading = heading
    v.speed = speed
    v.size = Size3d(width=v_width, length=v_length, height=v_height)
    v.update_poly_box_and_realworld_4_vertices()
    v.mass = mass
    v.lane_id = lane_id

    return v


class Location(object):
    def __init__(self, x=None, y=None, z=None):
        self.x = x
        self.y = y
        self.z = z


class Size3d(object):
    def __init__(self, width=None, length=None, height=None):
        self.width = width
        self.length = length
        self.height = height


def _rotate_pt(x, y, a):
    return np.cos(a) * x - np.sin(a) * y, np.sin(a) * x + np.cos(a) * y


def get_box_pts_from_center_heading(length, width, xc, yc, heading):
    l, w = length / 2.0, width / 2.0

    ## box
    x1, y1 = l, w
    x2, y2 = l, -w
    x3, y3 = -l, -w
    x4, y4 = -l, w

    ## rotation
    a = heading / 180. * np.pi
    x1_, y1_ = _rotate_pt(x1, y1, a)
    x2_, y2_ = _rotate_pt(x2, y2, a)
    x3_, y3_ = _rotate_pt(x3, y3, a)
    x4_, y4_ = _rotate_pt(x4, y4, a)

    ## translation
    pt1 = [x1_ + xc, y1_ + yc]
    pt2 = [x2_ + xc, y2_ + yc]
    pt3 = [x3_ + xc, y3_ + yc]
    pt4 = [x4_ + xc, y4_ + yc]

    return [pt1, pt2, pt3, pt4]


class Vehicle(object):
    def __init__(self):
        "Configuration for vehicle states"
        self.location = Location()
        self.size = Size3d()
        self.realworld_4_vertices = None
        self.id = '-1'
        self.speed = None
        self.speed_heading = None
        self.poly_box = None  # The rectangle of the vehicle using shapely.Polygon
        self.gt_realworld_4_vertices = None  # using gt_size
        self.gt_poly_box = None  # using gt_size
        self.mass = None  # mass of the vehicle. To calculate the crash severity
        self.lane_id = None  # the lane id the vehicle is at

    def update_poly_box_and_realworld_4_vertices(self):
        """
        Update the poly box and realworld 4 vertices based on current location (x,y) and speed heading
        Returns
        -------

        """
        length, width = self.size.length, self.size.width
        realworld_4_vertices = get_box_pts_from_center_heading(length=length, width=width, xc=self.location.x, yc=self.location.y, heading=self.speed_heading)
        new_poly_box = Polygon(realworld_4_vertices)
        self.poly_box = new_poly_box
        self.realworld_4_vertices = np.array(realworld_4_vertices)


def get_center_from_front_center(x_front_center, y_front_center, angle, VEH_LENGTH):
    def _rotate(x1, y1, x2, y2, angle=0.):
        x = (x1 - x2) * np.cos(angle) - (y1 - y2) * np.sin(angle) + x2
        y = (x1 - x2) * np.sin(angle) + (y1 - y2) * np.cos(angle) + y2
        return [x, y]

    x_center, y_center = _rotate(x_front_center - 0.5 * VEH_LENGTH, y_front_center, x_front_center, y_front_center, angle=-math.radians(angle - 90.))
    return x_center, y_center


def vec_speed_diff(v1_obj, v2_obj):
    v1_velocity, v1_heading = v1_obj.speed, v1_obj.speed_heading
    v2_velocity, v2_heading = v2_obj.speed, v2_obj.speed_heading
    v1_velocity_vec = np.array([v1_velocity * np.cos(math.radians(v1_heading)), v1_velocity * np.sin(math.radians(v1_heading))])
    v2_velocity_vec = np.array([v2_velocity * np.cos(math.radians(v2_heading)), v2_velocity * np.sin(math.radians(v2_heading))])
    speed_diff_norm = np.linalg.norm(v1_velocity_vec - v2_velocity_vec)
    return speed_diff_norm

import glob
import random

import glob
def co_sim_crash_type_severity_analysis(crash_json_list, path="NDE"):
    generate_crash_video = False
    result_path = "./"+path+"/crash_type_severity/"
    os.makedirs(result_path, exist_ok=True)
    json_path_list = crash_json_list

    worker_input_list = []
    from tqdm import tqdm
    for json_path in tqdm(json_path_list):
        if 1:
            try:
                fcd_path = json_path
                sim_id = int(os.path.splitext(json_path)[0].split("/")[-1])
                with open(json_path, "r") as json_file:
                    json_obj = json.load(json_file)
                assert (json_obj["collision_result"] == 1)
                sim_weight = json_obj['weight_episode']
                save_file_name = json_path
                worker_input_list.append([json_obj, sim_id, generate_crash_video, result_path, save_file_name])
            except Exception as e:
                print(e)
                continue
    print(len(worker_input_list))
    main_func_crash_type_severity_analysis(worker_input_list, result_path)


def main_func_crash_type_severity_analysis(worker_input_list, result_path):

    total_crash_num = 0
    crash_type_res = {"angle": {"num": 0, "weighted_num": 0, "sim_id_list": []},
                      "sideswipe": {"num": 0, "weighted_num": 0, "sim_id_list": []},
                      "rear_end": {"num": 0, "weighted_num": 0, "sim_id_list": []},
                      "head_on": {"num": 0, "weighted_num": 0, "sim_id_list": []},
                      "other": {"num": 0, "weighted_num": 0, "sim_id_list": []}}
    crash_type_location_res = {"highway": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "roundabout1": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "roundabout2": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "roundabout3": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "intersection1": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "intersection2": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "intersection3": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0},
                               "not_in_ROI": {"rear_end": 0, "sideswipe": 0, "angle": 0, "head_on": 0, "other": 0, "sum": 0}}
    deltaV_res = {"frontal_impact": {"deltaV": [], "weight": []}, "side_impact": {"deltaV": [], "weight": []}}
    speed_diff_norm_res = {"speed_diff_list": [], "weight": []}
    
    for worker_input in worker_input_list:
        res = worker_crash_type_and_severity(worker_input)
        collision_type, location_region, sim_weight, save_file_name, deltaV, speed_diff_norm, sim_id = res

        total_crash_num += 1
        crash_type_res[collision_type]['num'] += 1
        crash_type_res[collision_type]['weighted_num'] += sim_weight
        crash_type_res[collision_type]['sim_id_list'].append(save_file_name)
        
        speed_diff_norm_res["speed_diff_list"].append(speed_diff_norm)
        speed_diff_norm_res["weight"].append(sim_weight)
#         print(sim_id, speed_diff_norm)

        if collision_type == "rear_end" or collision_type == "head_on":
            deltaV_res["frontal_impact"]["deltaV"].append(deltaV)
            deltaV_res["frontal_impact"]["weight"].append(sim_weight)
        else:
            deltaV_res["side_impact"]["deltaV"].append(deltaV)
            deltaV_res["side_impact"]["weight"].append(sim_weight)

    # Crash Type
    crash_type_res_path = os.path.join(result_path, "crash_type")
    os.makedirs(crash_type_res_path, exist_ok=True)
    crash_type_file_name = "crash_type_res.json"
    with open(os.path.join(crash_type_res_path, crash_type_file_name), "w") as outfile:
        json.dump(crash_type_res, outfile)

    # Crash severity: speed diff norm
    speed_diff_norm_res_path = os.path.join(result_path, "speed_diff_norm")
    os.makedirs(speed_diff_norm_res_path, exist_ok=True)
    speed_diff_norm_file_name = "speed_diff_norm.json"
    with open(os.path.join(speed_diff_norm_res_path, speed_diff_norm_file_name), "w") as outfile:
        json.dump(speed_diff_norm_res, outfile)

        
def worker_crash_type_and_severity(input_data):
    VEH_LENGTH, VEH_WIDTH, VEH_HEIGHT = 4.8, 1.8, 1.5
    json_obj, sim_id, generate_crash_video, result_path, save_file_name = input_data
    crash_bv_id = json_obj['collision_id'][0] if json_obj['collision_id'][0]!= "CAV" else json_obj['collision_id'][1]
    sim_weight = json_obj['weight_episode']
    
    av_obs = json_obj["av_obs"]
    last_timestep = list(av_obs.keys())[-1]
    for key in av_obs[last_timestep]:
        vehicle = av_obs[last_timestep][key]
        if vehicle is None:
            continue
        vid, x_front_center, y_front_center, heading, speed, lane_id = vehicle["veh_id"], vehicle["position"][0], vehicle["position"][1], - vehicle["heading"] + 90., vehicle["velocity"], vehicle["lane_index"]
        x_center, y_center = get_center_from_front_center(x_front_center, y_front_center, angle=float(vehicle["heading"]), VEH_LENGTH=VEH_LENGTH)
        x, y = x_center, y_center
        
        if vehicle["veh_id"] == 'CAV':
            CAV_obj = construct_v_object(vid=vid, x=x, y=y, lane_id=lane_id, heading=heading, speed=speed, v_width=VEH_WIDTH, v_length=VEH_LENGTH, v_height=VEH_HEIGHT, mass=1)
        if vehicle["veh_id"] == crash_bv_id:
            BV_obj = construct_v_object(vid=vid, x=x, y=y, lane_id=lane_id, heading=heading, speed=speed, v_width=VEH_WIDTH, v_length=VEH_LENGTH, v_height=VEH_HEIGHT, mass=1)

        
    collision_vehicle_list = [CAV_obj, BV_obj]
    collision_type, _, _, _ = collision_type_classification(collision_vehicle_list=collision_vehicle_list)

    speed_diff_norm = vec_speed_diff(v1_obj=CAV_obj, v2_obj=BV_obj)
    speed_diff_norm = speed_diff_norm * 2.23694  # m/s to mph

    if generate_crash_video:
        replay_traj_path(traj_path=os.path.join(sub_crash_data_folder, str(sim_id) + ".fcd.xml"), file_name=save_file_name,
                         collision_type=collision_type, crash_location=location_region, save_path=result_path)
    return collision_type, 0, sim_weight, save_file_name, 0, speed_diff_norm, sim_id
    

In [None]:
worker_num = 8  # number of workers to use for multiprocessing.
root_folder = "../raw_data/D2RL/"
path_list = ["Experiment-2lane_400m_D2RL_2022-09-10"]
crash_json_list = []
for path in path_list:
    crash_path = root_folder + path + "/crash/"
    crash_json_list.extend(glob.glob(crash_path + "*.json"))
co_sim_crash_type_severity_analysis(crash_json_list, path="D2RL")


 35%|███▌      | 3565/10086 [01:28<02:24, 45.25it/s]

Expecting value: line 1 column 1 (char 0)


100%|██████████| 10086/10086 [04:03<00:00, 41.43it/s]


10085


## NDE Data Process

In [None]:
import glob
root_folder = "../raw_data/NDE/"
path_list = ["Experiment-2lane_400m_NDE_v1_2022-09-09", "Experiment-2lane_400m_NDE_v2_2022-09-09", "Experiment-2lane_400m_NDE_v3_2022-09-09"]
crash_json_list = []
for path in path_list:
    crash_path = root_folder + path + "/crash/"
    crash_json_list.extend(glob.glob(crash_path + "*.json"))
print(len(crash_json_list))

37


In [None]:
# print(crash_json_list)
co_sim_crash_type_severity_analysis(crash_json_list, path="NDE")

100%|██████████| 37/37 [00:01<00:00, 31.92it/s]


37
