In [None]:
import os
os.environ["L5KIT_DATA_FOLDER"] = "/home/ubuntu/level5_data"

import matplotlib.pyplot as plt

import numpy as np

from l5kit.data import ChunkedDataset, LocalDataManager
from l5kit.dataset import EgoDataset, AgentDataset

from l5kit.rasterization import build_rasterizer
from l5kit.configs import load_config_data
from l5kit.visualization import draw_trajectory, TARGET_POINTS_COLOR
from l5kit.geometry import transform_points
from tqdm import tqdm
from collections import Counter
from l5kit.data import PERCEPTION_LABELS
from prettytable import PrettyTable

from l5kit.visualization.visualizer.zarr_utils import zarr_to_visualizer_scene
from l5kit.visualization.visualizer.visualizer import visualize
from bokeh.io import output_notebook, show
from l5kit.data import MapAPI

# get config
cfg = load_config_data("./gym_config.yaml")

# load dataset
dm = LocalDataManager()
dataset_path = dm.require(cfg["val_data_loader"]["key"])
zarr_dataset = ChunkedDataset(dataset_path)
zarr_dataset.open()
print(zarr_dataset)

## Turning Categorization: Left, Right and Straight

In [None]:
from l5kit.geometry import rotation33_as_yaw
turn_thresh = 1.0   ## ~~ 60 degrees
time_duration = 50   ## 5 secs
num_scenes = 100
sc = zarr_dataset.scenes[:num_scenes]

## Loop Over Scenes
for sc_id, scene_data in enumerate(sc):
    frame_ids = scene_data["frame_index_interval"]
    frames = zarr_dataset.frames[frame_ids[0]:frame_ids[1]]
    yaws = np.zeros(len(frames),)
    
    ## Within scenes
    for idx_coord, frame in enumerate(frames):
        yaws[idx_coord] = rotation33_as_yaw(frame["ego_rotation"])
    
    ## Determine Turn
    turn = False
    yaw_diff = yaws[time_duration:] - yaws[:-time_duration]
    if np.sum(yaw_diff >= turn_thresh):
        turn = True
        print("Scene ID ", sc_id, ": Turning Left")
    if np.sum(yaw_diff <= -turn_thresh):
        turn = True
        print("Scene ID ", sc_id, ": Turning Right")
    
#     if not turn:
#         print("Scene ID ", sc_id, ": Going Straight")
        

In [None]:
scene_indices = [11, 16, 20]
output_notebook()
mapAPI = MapAPI.from_cfg(dm, cfg)
for scene_idx in scene_indices:
    out = zarr_to_visualizer_scene(zarr_dataset.get_scene_dataset(scene_idx), mapAPI)
    out_vis = visualize(scene_idx, out)
    show(out_vis)

## Speed Categorization: Constant, Acceleration, Deceleration

In [None]:
accn_thresh = 2      ## 1.25 ms2
time_duration = 16   ## 1.6 secs
num_scenes = 1000
sc = zarr_dataset.scenes[:num_scenes]
# sc = zarr_dataset.scenes[361:362]

## Loop Over Scenes
for sc_id, scene_data in enumerate(sc):
    frame_ids = scene_data["frame_index_interval"]
    frames = zarr_dataset.frames[frame_ids[0]:frame_ids[1]]
    speed = np.zeros(len(frames)-1,)
    
    ## Within scenes
    for idx_coord, (curr_frame, prev_frame) in enumerate(zip(frames[1:], frames[:-1])):
        curr_velocity = curr_frame["ego_translation"][:2] - prev_frame["ego_translation"][:2]
        speed[idx_coord] = np.linalg.norm(curr_velocity) * 10
    
    ## Determine Turn
    accn = False
    speed_diff = speed[time_duration:] - speed[:-time_duration]
    if np.sum(speed_diff >= accn_thresh):
        accn = True
        print("Scene ID ", sc_id, ": Accelerating")
    if np.sum(speed_diff <= -accn_thresh):
        accn = True
        print("Scene ID ", sc_id, ": Decelerating")
    
#     if not accn:
#         print("Scene ID ", sc_id, ": Constant Speed")

In [None]:
scene_indices = [9, 18, 25, 36]
output_notebook()
mapAPI = MapAPI.from_cfg(dm, cfg)
for scene_idx in scene_indices:
    out = zarr_to_visualizer_scene(zarr_dataset.get_scene_dataset(scene_idx), mapAPI)
    out_vis = visualize(scene_idx, out)
    show(out_vis)

## Lane Identification


In [None]:
from typing import List, Tuple

from l5kit.data.filter import (filter_agents_by_frames, filter_agents_by_labels, filter_tl_faces_by_frames,
                               filter_tl_faces_by_status)
from l5kit.visualization.visualizer.common import (AgentVisualization, CWVisualization, EgoVisualization,
                                                   FrameVisualization, LaneVisualization, TrajectoryVisualization)
from l5kit.visualization.visualizer.zarr_utils import _get_frame_trajectories, _get_frame_data
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

def get_nearest_lane_to_ego(frame_vis: FrameVisualization) -> int:
    """Given a FrameVisualization, get the nearest lane id to the ego.

    :param frame_vis: a FrameVisualization
    :return: the index of the nearest lane
    """

    ego = frame_vis.ego
    ego_center = np.array([[ego.center_x, ego.center_y]])

    lanes = frame_vis.lanes
    concat_lanes = [np.array(list(zip(lane.xs, lane.ys))) for lane in lanes]
    min_dist = 100000 * np.ones((len(concat_lanes),))
    alpha = 1.0
    for idx, lane in enumerate(concat_lanes):
        polygon = Polygon(lane)
        point = Point(ego.center_x, ego.center_y)
        # Ego should be present within lane box
        if polygon.contains(point):
            # Ego closest to center of lane box + Give preference to polygon area
            lane_centroid = np.mean(lane, axis=0)
            min_dist[idx] = np.linalg.norm(lane_centroid - ego_center) - alpha * polygon.area
            # min_dist[idx] = np.min(np.linalg.norm(lane - ego_center, axis=1))

    nearest_lane_index = np.argmin(min_dist)
    return nearest_lane_index


In [None]:
def zarr_to_visualizer_ego_lane(scene_dataset: ChunkedDataset, mapAPI: MapAPI,
                                with_trajectories: bool = True) -> List[FrameVisualization]:
    """Convert a zarr scene into a list of FrameVisualization which can be used by the visualiser

    :param scene_dataset: a scene dataset. This must contain a single scene
    :param mapAPI: mapAPI object
    :param with_trajectories: if to enable trajectories or not
    :return: a list of FrameVisualization objects
    """
    if len(scene_dataset.scenes) != 1:
        raise ValueError(f"we can convert only a single scene, found {len(scene_dataset.scenes)}")

    frames = scene_dataset.frames
    agents_frames = filter_agents_by_frames(frames, scene_dataset.agents)
    tls_frames = filter_tl_faces_by_frames(frames, scene_dataset.tl_faces)

    frames_vis: List[FrameVisualization] = []
    for frame_idx in range(len(frames)):
        frame = frames[frame_idx]
        tls_frame = tls_frames[frame_idx]

        agents_frame = agents_frames[frame_idx]
        agents_frame = filter_agents_by_labels(agents_frame, 0.1)

        frame_vis = _get_frame_data(mapAPI, frame, agents_frame, tls_frame)

        if with_trajectories:
            traj_vis = _get_frame_trajectories(frames, agents_frames, agents_frame["track_id"], frame_idx)
            frame_vis = FrameVisualization(ego=frame_vis.ego, agents=frame_vis.agents,
                                           lanes=frame_vis.lanes, crosswalks=frame_vis.crosswalks,
                                           trajectories=traj_vis)
        
        # Get and Plot Nearest Lane in Blue
        nearest_lane_idx = get_nearest_lane_to_ego(frame_vis)
        frame_vis.lanes.append(LaneVisualization(frame_vis.lanes[nearest_lane_idx].xs, frame_vis.lanes[nearest_lane_idx].ys, 'blue'))
        frames_vis.append(frame_vis)

    return frames_vis

In [None]:
scene_indices = [4, 16]
output_notebook()
mapAPI = MapAPI.from_cfg(dm, cfg)
for scene_idx in scene_indices:
    out = zarr_to_visualizer_ego_lane(zarr_dataset.get_scene_dataset(scene_idx), mapAPI)
    out_vis = visualize(scene_idx, out)
    show(out_vis)

## BackUp: Extra

In [None]:
# frames = zarr_dataset.frames[::100]
# coords = np.zeros((len(frames), 2))
# for idx_coord, idx_data in enumerate(tqdm(range(len(frames)), desc="getting centroid to plot trajectory")):
#     frame = zarr_dataset.frames[idx_data]
#     coords[idx_coord] = frame["ego_translation"][:2]


# plt.scatter(coords[:, 0], coords[:, 1], marker='.')
# axes = plt.gca()
# axes.set_xlim([-2500, 1600])
# axes.set_ylim([-2500, 1600])

In [None]:
## Numpy convolve
def moving_average(x, w=15):
    return np.convolve(x, np.ones(w), 'valid')