In [10]:
import matplotlib.pyplot as plt

import numpy as np


from l5kit.data import ChunkedDataset, LocalDataManager
from l5kit.dataset import EgoDataset, AgentDataset
from l5kit.geometry.transform import *

from l5kit.rasterization import build_rasterizer
from l5kit.configs import load_config_data
from l5kit.geometry import transform_points, rotation33_as_yaw
from tqdm import tqdm
from collections import Counter
from l5kit.rasterization.rasterizer_builder import _load_metadata
from prettytable import PrettyTable

import os

# from l5kit.visualization.visualizer.zarr_utils import zarr_to_visualizer_scene
# from l5kit.visualization.visualizer.visualizer import visualize
from bokeh.io import output_notebook, show
from l5kit.data import MapAPI
from IPython.display import display, clear_output
import PIL

from scenario import get_lanes, lane_check, current_lane


import matlab
import matlab.engine


from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

In [3]:
# set env variable for data
os.environ["L5KIT_DATA_FOLDER"] = "C:\\Users\\zheng\\Desktop\\UMich\\Independent Study\\prediction-dataset"
# get config
cfg = load_config_data("C:\\Users\\zheng\\Desktop\\UMich\\Independent Study\\Codes\\python codes\\visualisation_config.yaml")
print(cfg)

dm = LocalDataManager()
dataset_path = dm.require(cfg["val_data_loader"]["key"])
zarr_dataset = ChunkedDataset(dataset_path)
zarr_dataset.open()
print(zarr_dataset)

rast = build_rasterizer(cfg, dm)
ego_dataset = EgoDataset(cfg, zarr_dataset, rast)
# agent_dataset = AgentDataset(cfg, zarr_dataset, rast)
# Obatin the information from semantic map

semantic_map_filepath = dm.require(cfg["raster_params"]["semantic_map_key"])
dataset_meta = _load_metadata(cfg["raster_params"]["dataset_meta_key"], dm)
world_to_ecef = np.array(dataset_meta["world_to_ecef"], dtype=np.float64)
Map_Api = MapAPI(semantic_map_filepath, world_to_ecef)

{'format_version': 4, 'model_params': {'model_architecture': 'resnet50', 'history_num_frames': 0, 'future_num_frames': 50, 'step_time': 0.1}, 'raster_params': {'raster_size': [320, 320], 'pixel_size': [0.5, 0.5], 'ego_center': [0.6, 0.5], 'map_type': 'py_semantic', 'satellite_map_key': 'aerial_map/aerial_map.png', 'semantic_map_key': 'semantic_map/semantic_map.pb', 'dataset_meta_key': 'meta.json', 'filter_agents_threshold': 0.5, 'disable_traffic_light_faces': False, 'set_origin_to_bottom': True}, 'val_data_loader': {'key': 'scenes/train.zarr', 'batch_size': 12, 'shuffle': False, 'num_workers': 16}}
+------------+------------+------------+---------------+-----------------+----------------------+----------------------+----------------------+---------------------+
| Num Scenes | Num Frames | Num Agents | Num TR lights | Total Time (hr) | Avg Frames per Scene | Avg Agents per Frame | Avg Scene Time (sec) | Avg Frame frequency |
+------------+------------+------------+---------------+------

## Now check the lane-junction relation

In [338]:
class Scene(object):
    def __init__(self, dataset, Map_Api):
        self.dataset = dataset
        self.frames = self.dataset.frames
        self.scene_num = len(self.dataset.scenes)
        self.yaw_th = 1.25
        self.distance_th = 10
        self.eng = matlab.engine.start_matlab()
        self.eng.cd(r'D:\GitHub\Clone\planning\utils') 
        self.turning_scenes = []
        self.turning_yaw_diff = {}
        self.tunring_frames = []
        self.map_api = Map_Api
        self.all_junctions = None
        self.all_lanes = None
        self.Junction_Lane = {}
        self.Lane = None
        self.Junctions = None
        self.junction_scene = {}
        self.junction_turning_scene = {}
        self.error_filtered_scene = []
        
        
    
    def is_element(self, elem, element_name):
        return elem.element.HasField(element_name)

    def get_elements(self, element_name):
        return [elem for elem in self.map_api.elements if self.is_element(elem, element_name)]

    
    def lane_check(self, centroid, lane_id):
        point = Point(centroid[0],centroid[1])
        boundary = self.map_api.get_lane_coords(lane_id)
        left_boundary = boundary['xyz_left']
        right_boundary = boundary['xyz_right']
        left_boundary_point = [(line[0],line[1])for line in left_boundary]
        right_boundary_point = [(line[0],line[1])for line in reversed(right_boundary)]
        polygon = Polygon(left_boundary_point+right_boundary_point)

        return polygon.contains(point)


    def current_lane(self, centroid):

        for key, value in self.Lane.items():
            if self.lane_check(centroid, key):
                return key
    

    def generate_info_from_MAP(self):
        self.all_junctions = self.get_elements("junction")
        self.all_lanes = self.get_elements("lane")
        self.Lane = {self.map_api.id_as_str(lane.id):lane.element.lane for lane in self.all_lanes}
        self.Junction = {self.map_api.id_as_str(junction.id):junction for junction in self.all_junctions}
        
        for junction in self.all_junctions:
            self.Junction_Lane[self.map_api.id_as_str(junction.id)] = []

            for lane in junction.element.junction.lanes:
                self.Junction_Lane[self.map_api.id_as_str(junction.id)].append(self.map_api.id_as_str(lane))

            self.Junction_Lane[self.map_api.id_as_str(junction.id)] = set(self.Junction_Lane[self.map_api.id_as_str(junction.id)])
        
        self.junction_scene = dict.fromkeys(list(self.Junction.keys()), [])
        self.junction_turning_scene = dict.fromkeys(list(self.Junction.keys()), {})
        for key in self.Junction.keys():
            self.junction_turning_scene[key] = {'Turning Left': [], 'Turning Right': []}
        
        
    def trajectory_visualize(self):
        plt.figure(figsize=(18,18))
        plt.scatter(self.frames["ego_translation"][:,0], self.frames["ego_translation"][:,1], marker='.')
        plt.scatter(self.frames["ego_translation"][self.tunring_frames,0], self.frames["ego_translation"][self.tunring_frames,1], marker='.', color='r')
        plt.axis("equal")
        plt.grid(which='both')
        axes = plt.gca()
        
    
     def scene_visualize(self, scene_idx):
        
        indexes = ego_dataset.get_scene_indices(scene_idx)
        images = []

        for idx in indexes:

            data = ego_dataset[idx]
            im = data["image"].transpose(1, 2, 0)
            im = ego_dataset.rasterizer.to_rgb(im)
            target_positions_pixels = transform_points(data["target_positions"], data["raster_from_agent"])
            center_in_pixels = np.asarray(cfg["raster_params"]["ego_center"]) * cfg["raster_params"]["raster_size"]
        #     draw_trajectory(im, target_positions_pixels, TARGET_POINTS_COLOR, yaws=data["target_yaws"])
            clear_output(wait=True)
            display(PIL.Image.fromarray(im))
                     
            
    def junction_visualize(self, junction_id):
        
        plt.figure(figsize=(18,18))
        plt.scatter(SCENE.frames["ego_translation"][:,0], SCENE.frames["ego_translation"][:,1], marker='.')
        
        lane_list = self.Junction_Lane[junction_id]
        
        for lane in lane_list:
            plt.scatter(self.map_api.get_lane_coords(lane)['xyz_left'][:,0], self.map_api.get_lane_coords(lane)['xyz_left'][:,1], marker='.', color='k')
            plt.scatter(self.map_api.get_lane_coords(lane)['xyz_right'][:,0], self.map_api.get_lane_coords(lane)['xyz_right'][:,1], marker='.', color='k')

        plt.axis("equal")
        plt.grid(which='both')
        axes = plt.gca()
    
        
    def turning_scenes_finding(self):
        
        bar = tqdm(range(self.scene_num))
        bar.set_description('Filtering the turning scenes: ')
        for scene in bar:

            start_frame = self.frames[self.dataset.scenes[scene]["frame_index_interval"][0]]
            finish_frame = self.frames[self.dataset.scenes[scene]["frame_index_interval"][1]-1]

            start_yaw = rotation33_as_yaw(start_frame["ego_rotation"])
            finish_yaw = rotation33_as_yaw(finish_frame["ego_rotation"])


            if abs(start_yaw-finish_yaw) > self.yaw_th:
                self.turning_scenes.append(scene)
                self.tunring_frames += list(range(self.dataset.scenes[scene]["frame_index_interval"][0],self.dataset.scenes[scene]["frame_index_interval"][1]))
                
    
   
    def chgpt_index_find(self, chgpt_loc, chgpt_dist, chgpt_num):
        rel_d = np.diff(chgpt_loc)
        
        while True in (rel_d < self.distance_th):
            for idx, d in enumerate(rel_d):
                if d < self.distance_th:
                    index = chgpt_loc[idx] if chgpt_dist[chgpt_loc[idx]]< chgpt_dist[chgpt_loc[idx+1]] else chgpt_loc[idx+1]
                    chgpt_dist[index] = 0
            chgpt_loc = np.argpartition(chgpt_dist, -chgpt_num)[-chgpt_num:]
            chgpt_loc.sort()
            # find the adjacent distance
            rel_d = np.diff(chgpt_loc)
            
            
        return chgpt_loc

    
            
    def scene_turning_frames(self, scene_idx):
        
        frames = list(range(self.dataset.scenes[scene_idx]["frame_index_interval"][0],self.dataset.scenes[scene_idx]["frame_index_interval"][1]))
        yaw = [rotation33_as_yaw(rotation) for rotation in self.dataset.frames['ego_rotation'][frames]]
        
        # extract key info from MATLAB results
        output_yaw = self.eng.mat2py(matlab.double(yaw), nargout=2)

        chgpt_dist_yaw = np.array(output_yaw[0])[0]
        chgpt_num_dist_yaw = np.array(output_yaw[1]).flatten()

        chgpt_num = np.argmax(chgpt_num_dist_yaw)
        
        
        if chgpt_num != 2:
            self.error_filtered_scene.append(scene_idx)
            
            return []
            
            
        else:
            chgpt_index = np.argpartition(chgpt_dist_yaw, -chgpt_num)[-chgpt_num:]
            chgpt_index.sort()

            chgpt_loc = self.chgpt_index_find(chgpt_index, chgpt_dist_yaw, chgpt_num)

            self.turning_yaw_diff[scene_idx] = yaw[chgpt_loc[0]] - yaw[chgpt_loc[1]]


            return list(range(chgpt_loc[0]+frames[0],chgpt_loc[1]+frames[0],3))
    
    
    
    
    def lanes_at_turning(self, scene_turning_frames):
        
        lanes = []

        for frame in scene_turning_frames:
            lanes.append(self.current_lane(self.frames[frame]['ego_translation'][:2]))

        lanes = set(lanes)

        return lanes
    
    
    
    
    def junction_id_find(self, lanes):
        junction_id = []

        
        for key in self.Junction_Lane.keys():

            if len(self.Junction_Lane[key]&lanes) > 0:
                junction_id.append(key)
                return junction_id
                
                
        return junction_id
    
    
    
    
    def junction_scene_find(self, scene_idx, junction_id):
    
        self.junction_scene[junction_id] = self.junction_scene[junction_id] + [scene_idx]
        
        if self.turning_yaw_diff[scene_idx] > 0:
            self.junction_turning_scene[junction_id]['Turning Right'] += [scene_idx]
        else:
            self.junction_turning_scene[junction_id]['Turning Left'] += [scene_idx]
            


In [357]:
# declare the object
SCENE = Scene(zarr_dataset, Map_Api)

# generate all necessary information from semantic MAP, including dictionaries of lane, junction and the lanes connected to junction
SCENE.generate_info_from_MAP()

# Filter the scenes containing turning decision based on the difference of yaws at starting and ending frame of ego
SCENE.turning_scenes_finding()

# For each turning scene, we first find out the frame interval that ego actually made its turn, and then extracted the lanes id covered
# during this frame interval. Compare the lane set with the groudtruth set of each junction and its lanes, then we can know which junction 
# that ego was passing if it was passing a junction 

for turning_scene in tqdm(SCENE.turning_scenes):


    scene_turning_frames = SCENE.scene_turning_frames(turning_scene)

    if len(scene_turning_frames) > 0:

        lanes = SCENE.lanes_at_turning(scene_turning_frames)
        junction_id = SCENE.junction_id_find(lanes)

        if len(junction_id) > 0:
            SCENE.junction_scene_find(turning_scene, junction_id[0])
            
    else:
        continue
    

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1661/1661 [3:08:33<00:00,  6.81s/it]
