In [9]:
from commonroad.common.file_reader import CommonRoadFileReader
import os
import matplotlib.pyplot as plt
from IPython import display
from commonroad.common.file_reader import CommonRoadFileReader
from commonroad.visualization.draw_dispatch_cr import draw_object
import numpy as np
import math
from commonroad.geometry.shape import Circle
from commonroad.scenario.obstacle import StaticObstacle, ObstacleType
from commonroad.scenario.trajectory import State
from commonroad_dc.collision.collision_detection.pycrcc_collision_dispatch import create_collision_checker, \
    create_collision_object
from commonroad_dc.boundary import boundary
import commonroad_dc.pycrcc as pycrcc
from commonroad_dc.collision.trajectory_queries import trajectory_queries
import cvxpy as cp
import pandas as pd
import triangle



## Longitudinal Distance

The functions below determine if the vehicle is keeping a safe longitudinal distance from other vehicles. The safe distance parameter can be changed depending on what is deemed a safe distance. One option is the 3 second rule, that any vehicle in front of the ego should be at least 3 seconds away. Although RSS interprets this rule using breaking and acceleration rates as well as the velocity of vehicles in front, the legal interpretation states that at a given velocity, the ego should treat all other objects as stationary at a given instant and assume constant velocity until that object is reached. Thus, no acceleration rates were considered and each dynamic obstacle's position was fixed for a given point in time.

In [3]:
from commonroad.scenario.obstacle import DynamicObstacle
from commonroad.scenario.trajectory import Trajectory
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.geometry.shape import Rectangle
from math import sqrt
from commonroad.geometry.shape import Rectangle
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.trajectory import State


def distance(x_0, x_1, y_0, y_1):
    return sqrt((x_1-x_0)**2 + (y_1-y_0)**2)

def create_ego_tvo_from_i(scenario, i):
    safe_dist_list = []
    
    ego = scenario.obstacle_by_id(-1)
    car_length = ego.obstacle_shape.length
    car_width = ego.obstacle_shape.width
    state_list = ego.prediction.trajectory.state_list
    tvo=pycrcc.TimeVariantCollisionObject(i)

    for j in range(i, len(state_list)-1):
        
        current_pos = state_list[j].position
        next_pos = state_list[j+1].position
        orientation = state_list[j].orientation
        
        x = current_pos[0]
        y = current_pos[1]
        x_1 = next_pos[0]
        y_1 = next_pos[1]
        
        v = distance(x, x_1, y, y_1)
        
        safe_dist = 3*v
        if i == 0:
            safe_dist_list.append(safe_dist)
        
        if (v > 0):
            length = (car_length/2) + safe_dist/2
            offset_magnitude = ((length*2) - car_length)/2
            offset_x = ((x_1-x)/v)*(offset_magnitude-1)
            offset_y = ((y_1-y)/v)*(offset_magnitude-1)


            #note that the collision obstacle created needs a half length input
            tvo.append_obstacle(pycrcc.RectOBB(length,car_width/2,orientation,x+offset_x,y+offset_y))
        else: 
            tvo.append_obstacle(pycrcc.RectOBB(car_length/2,car_width/2,orientation,x,y))
    return tvo, safe_dist_list
def create_triangle_mesh(vertices):
    # triangulate the polygon
    number_of_vertices = len(vertices)
    segments = list(zip(range(0, number_of_vertices-1), range(1, number_of_vertices)))
    segments.append((0, number_of_vertices-1))
    triangles = triangle.triangulate({'vertices': vertices, 'segments': segments}, opts='pqS2.4')
    # convert all triangles to pycrcc.Triangle
    mesh = list()
    for t in triangles['triangles']:
        v0 = triangles['vertices'][t[0]]
        v1 = triangles['vertices'][t[1]]
        v2 = triangles['vertices'][t[2]]
        mesh.append(pycrcc.Triangle(v0[0], v0[1],
                                    v1[0], v1[1],
                                    v2[0], v2[1]))
    return mesh


def create_tvo(obs, i, obs_obj_dict):
    length = obs.obstacle_shape.length
    width = obs.obstacle_shape.width
    state_list = obs.prediction.occupancy_set
    tvo = pycrcc.TimeVariantCollisionObject(i)
    
    for j in range(i, len(state_list)):
        pos = state_list[j].shape.center
        orientation = state_list[j].shape.orientation
        
        x = pos[0]
        y = pos[1]
        
        vertices = state_list[j].shape.vertices
        mesh = create_triangle_mesh(vertices)
        tvo.append_obstacle(pycrcc.Polygon(vertices,list(),mesh))
    obs_obj_dict[tvo] = obs
    return tvo


def get_shortest_distance(vert_1, vert_2):
    
    X = vert_1
    Y = vert_2

    # maximum thickness separating slab
    a, b = cp.Variable(2), cp.Variable()
    prob = cp.Problem(cp.Minimize(cp.norm2(a)), [a.T@X - b >= 1, a.T@Y - b <= -1])
    prob.solve()
    if a.value is None:
        return 0
    width_max = 2 / np.linalg.norm(a.value)
    return width_max


def long_dist(scenario):
    
    violations = []
    
    
    ego = scenario.obstacle_by_id(-1)
    
    car_length = ego.obstacle_shape.length
    car_width = ego.obstacle_shape.width
    
    state_list = ego.prediction.trajectory.state_list
    for i in range(0, len(state_list)-1):
        violations.append(0)
        current_pos = state_list[i].position
        next_pos = state_list[i+1].position
        orientation = state_list[i].orientation
        
        x = current_pos[0]
        y = current_pos[1]
        x_1 = next_pos[0]
        y_1 = next_pos[1]
        
        v = distance(x, x_1, y, y_1)
        safe_dist = 3*v
        center = np.array([x, y])
        length = car_length

        if (v > 0):
            length = car_length + safe_dist
            offset_magnitude = length - (car_length)
            offset_x = ((x_1-x)/v)*(offset_magnitude)
            offset_y = ((y_1-y)/v)*(offset_magnitude)
            center = np.array([x+offset_x, y+offset_y])
        
        ego_rect = Rectangle(length,car_width,center,orientation)
        
        ego_vertices = np.transpose(ego_rect.vertices[:-1])
        for obs in scenario.dynamic_obstacles:
            if obs.obstacle_id != -1 and obs.prediction.occupancy_at_time_step(i) is not None:
                obs_rect = obs.prediction.occupancy_at_time_step(i).shape
                obs_vertices = np.transpose(obs_rect.vertices[:-1])
                
                if(get_shortest_distance(ego_vertices, obs_vertices) == 0):
                    ego_vertices = np.transpose(ego.prediction.occupancy_at_time_step(i).shape.vertices[:-1])
                    actual_dist = get_shortest_distance(ego_vertices, obs_vertices)
                    
                    
                    violation = safe_dist - actual_dist
                    if violation < 0:
                        violation = 0
                    violations[i] = violation
        
    return violations

## Lane Tracking
Lane tracking is measured by generating a list of possible lanelets that the vehicle is occupied in and trying to maintain its position in. Then, for each time step, the distance traveled down the lanelet is calculated and the interpolated position down that lanelet is generated as a desired distance. The minimum distance from the actual position of the vehicle and the interpolated position down the exact centerline of the lanelet is then computed to be the error of that time step.

In [4]:
def lane_tracking(scenario):
    
    ego = scenario.obstacle_by_id(-1)
    state_list = ego.prediction.trajectory.state_list
    
    desired_state_list = []
    error_list = []
    for i in range(0, len(state_list)-1):
        desired_state_list.append(state_list[i].position)
        error_list.append(1000000.0)
    for i in range(0, len(state_list)-1):
        candidate_lanelets = []
        for lanelet in scenario.lanelet_network.lanelets:
            pos = state_list[i].position
            if (lanelet.convert_to_polygon().contains_point(pos)):
                candidate_lanelets.append(lanelet)
        for lanelet in candidate_lanelets:
            
            lanelet_start = lanelet.interpolate_position(0)[0]
            curr_pos = state_list[i].position
            init_dist = distance(lanelet_start[0], curr_pos[0], lanelet_start[1], curr_pos[1])
            
            next_pos = state_list[i+1].position
            next_dist = distance(next_pos[0], curr_pos[0], next_pos[1], curr_pos[1])
            
            if(init_dist + next_dist <= lanelet.distance[-1]):
                desired_pos = lanelet.interpolate_position(init_dist + next_dist)[0]
                error = distance(desired_pos[0], next_pos[0], desired_pos[1], next_pos[1])
            
                if (error < error_list[i]):
                    desired_state_list[i] = desired_pos
                    
                    error_list[i] = error
            else: 
                #edge case where we have reached the end of a lanelet and cannot calculate interpolated position
                desired_pos = lanelet.interpolate_position(lanelet.distance[-1])[0]
                error = distance(desired_pos[0], next_pos[0], desired_pos[1], next_pos[1])
            
                if (error < error_list[i]):
                    desired_state_list[i] = desired_pos
                    error_list[i] = error
    for i in range(0, len(desired_state_list)):
        desired_pos = desired_state_list[i]
        static_obstacle_id = scenario.generate_object_id()
        static_obstacle_type = ObstacleType.PARKED_VEHICLE
        static_obstacle_shape = Circle(radius = 0.25)
        static_obstacle_initial_state = State(position = desired_pos, orientation = 0, time_step = 0)

        # feed in the required components to construct a static obstacle
        static_obstacle = StaticObstacle(static_obstacle_id, static_obstacle_type, static_obstacle_shape, static_obstacle_initial_state)

        # add the static obstacle to the scenario
        scenario.add_objects(static_obstacle)
    return error_list

## Relative Speed
Here, the differential between the ego's current velocity and the average velocity of all other vehicles is compared. The function returns an array of the differential at each time step

In [5]:
def relative_speed(scenario):    
    avg_v = []
    differential_v = []
    
    ego = scenario.obstacle_by_id(-1)
    state_list = ego.prediction.trajectory.state_list
    count = len(scenario.dynamic_obstacles)
    for i in range(0, len(state_list)-1):
        total_v = 0
        for obs in scenario.dynamic_obstacles:
            if obs.obstacle_id != -1:
                if obs.prediction.occupancy_at_time_step(i+1) is not None and obs.prediction.occupancy_at_time_step(i) is not None:
                    pos_1 = obs.prediction.occupancy_at_time_step(i).shape.center
                    pos_2 = obs.prediction.occupancy_at_time_step(i+1).shape.center
                    obs_v = distance(pos_1[0], pos_2[0], pos_1[1], pos_2[1])
                    total_v = total_v + obs_v
        avg_v.append(total_v/count)
        ego_pos_1 = ego.prediction.occupancy_at_time_step(i).shape.center
        ego_pos_2 = ego.prediction.occupancy_at_time_step(i+1).shape.center
        ego_v = distance(ego_pos_1[0], ego_pos_2[0], ego_pos_1[1], ego_pos_2[1])
        diff_v = ego_v - (total_v/count)
        differential_v.append(diff_v)
    return differential_v

# Kinetic Energy Transfer
As our last metric, we calculate the kinetic energy transfer of the ego vehicle's trajectory upon collision. The velocity just prior to collision is taken for each vehicle involved in the collision.

In [68]:
from commonroad_dc.collision.trajectory_queries.trajectory_queries import trajectories_collision_dynamic_obstacles
from commonroad_dc import pycrcc

def get_shortest_distance_ke(obs_1, obs_2, i):
    
    X = np.transpose(obs_1.occupancy_at_time(i).shape.vertices[:-1])
    Y = np.transpose(obs_2.occupancy_at_time(i).shape.vertices[:-1])

    # maximum thickness separating slab
    a, b = cp.Variable(2), cp.Variable()
    prob = cp.Problem(cp.Minimize(cp.norm2(a)), [a.T@X - b >= 1, a.T@Y - b <= -1])
    prob.solve()
    if a.value is None:
        return 0
    width_max = 2 / np.linalg.norm(a.value)
    return width_max

def create_tvobstacle(traj_list,car_half_length,car_half_width, i):
    tvo=pycrcc.TimeVariantCollisionObject(i)
    for traj in traj_list:
        tvo.append_obstacle(pycrcc.RectOBB(car_half_length, car_half_width, traj.orientation,traj.position[0],traj.position[1]))
    return tvo


def ke_transfer(scenario):
    ego = scenario.obstacle_by_id(-1)
    traj = ego.prediction.trajectory.state_list
    length = 2.15
    width = 0.9
    co_dict = {}
    for obs in scenario.dynamic_obstacles:
        state_list = obs.prediction.trajectory.state_list
        obs_tvo = create_tvobstacle(state_list, length, width, 0)
        co_dict[obs_tvo] = obs
    co_list = list(co_dict.keys())
    ke_transfer_vals = []
    for i in range(0, len(traj)):
        ke_transfer_vals.append(0)
    tvo = create_tvobstacle(traj, length, width, 0)
    collision_found = False
    while (not collision_found):
        for tvo_obs in co_list:
            t = trajectories_collision_dynamic_obstacles([tvo], [tvo_obs])[0]
            if t>0:
                obs = co_dict[tvo_obs]
                if get_shortest_distance_ke(ego, obs, t) == 0:
                    collision_found = True
                    ke = 0.5*1500*(traj[t].velocity**2)
                    ke_transfer_vals[t] = ke
        tvo = create_tvobstacle(traj[t+1:], length, width, t+1)
    return ke_transfer_vals



# Final Step
Now, we run these evaluations to determine our metrics for each solution scenario.

In [10]:
import csv

def generate_safety_csv(scenarios_path, output_path):
    
    count = 0
    with open(output_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for filename in os.listdir(scenarios_path):
            if not filename.startswith('.') and os.path.isfile(os.path.join(scenarios_path, filename)):
                scenario, planning_problem_set = CommonRoadFileReader(scenarios_path + filename).open()
                print("running file " + filename)
                scenario, planning_problem_set = CommonRoadFileReader(scenarios_path + filename).open()
                
                long_dist_list = []
                try:
                    long_dist_list = long_dist(scenario)
                except AttributeError as error:
                    print('attribute error on long_dist scenario ' + filename)   
                print('done long dist')
                lane_tracking_list = []
                try:
                    lane_tracking_list = lane_tracking(scenario)
                except AttributeError as error:
                    print('attribute error on lane_track scenario ' + filename)   
                print('done lane track')
                relative_speed_list = []
                try:
                    relative_speed_list = relative_speed(scenario)
                except AttributeError as error:
                    print('attribute error on relative_speed scenario ' + filename)   
                print('done speed')
                ke_transfer_list = []
                try:
                    ke_transfer_list = ke_transfer(scenario)
                except AttributeError as error:
                    print('attribute error on ke_transfer scenario ' + filename)
                print('done ke transfer')
                
                long_dist_row = [str(filename)] + ['long_dist'] + long_dist_list
                lane_tracking_row = [str(filename)] + ['lane_tracking'] + lane_tracking_list
                relative_speed_row = [str(filename)] + ['relative_speed'] + relative_speed_list
                ke_transfer_row = [str(filename)] + ['ke_transfer'] + ke_transfer_list

                writer.writerow(long_dist_row)
                writer.writerow(lane_tracking_row)
                writer.writerow(relative_speed_row)
                writer.writerow(ke_transfer_row)

                count = count+1
                print("done with file " + str(count))

In [8]:
#uncomment to run


#scenarios_path = ''
#output_path = ''
#generate_safety_csv(scenarios_path, output_path)
