In [1]:
import numpy as np
 
print("My numpy version is: ", np.__version__)

My numpy version is:  2.2.5


In [2]:
import av2

In [3]:
from av2.evaluation import NUM_RECALL_SAMPLES

In [4]:
from av2.evaluation.detection.utils import (
    compute_objects_in_roi_mask,
    load_mapped_avm_and_egoposes,
)

In [5]:
from av2.map.map_api import ArgoverseStaticMap

In [6]:
from enum import Enum, unique
from pathlib import Path
from random import choices
from typing import Final

import click
from joblib import Parallel, delayed
from rich.progress import track

from av2.datasets.motion_forecasting import scenario_serialization
from av2.datasets.motion_forecasting.viz.scenario_visualization import (
    visualize_scenario,
)
from av2.map.map_api import ArgoverseStaticMap

_DEFAULT_N_JOBS: Final[int] = -2

In [9]:
@unique
class SelectionCriteria(str, Enum):
    """Valid criteria used to select Argoverse scenarios for visualization."""

    FIRST = "first"
    RANDOM = "random"

In [10]:
def generate_scenario_visualizations(
    argoverse_scenario_dir: Path,
    viz_output_dir: Path,
    num_scenarios: int,
    selection_criteria: SelectionCriteria,
    *,
    debug: bool = False,
) -> None:
    """Generate and save dynamic visualizations for selected scenarios within `argoverse_scenario_dir`.

    Args:
        argoverse_scenario_dir: Path to local directory where Argoverse scenarios are stored.
        viz_output_dir: Path to local directory where generated visualizations should be saved.
        num_scenarios: Maximum number of scenarios for which to generate visualizations.
        selection_criteria: Controls how scenarios are selected for visualization.
        debug: Runs preprocessing in single-threaded mode when enabled.
    """
    Path(viz_output_dir).mkdir(parents=True, exist_ok=True)
    all_scenario_files = sorted(argoverse_scenario_dir.rglob("*.parquet"))
    scenario_file_list = (
        all_scenario_files[:num_scenarios]
        if selection_criteria == SelectionCriteria.FIRST
        else choices(all_scenario_files, k=num_scenarios)
    )  # Ignoring type here because type of "choice" is partially unknown.

In [12]:
# Build inner function to generate visualization for a single scenario.
def generate_scenario_visualization(scenario_path: Path) -> None:
    """Generate and save dynamic visualization for a single Argoverse scenario.

    NOTE: This function assumes that the static map is stored in the same directory as the scenario file.

    Args:
        scenario_path: Path to the parquet file corresponding to the Argoverse scenario to visualize.
    """
    scenario_id = scenario_path.stem.split("_")[-1]
    static_map_path = (
        scenario_path.parents[0] / f"log_map_archive_{scenario_id}.json"
    )
    viz_save_path = viz_output_dir / f"{scenario_id}.mp4"

    scenario = scenario_serialization.load_argoverse_scenario_parquet(scenario_path)
    static_map = ArgoverseStaticMap.from_json(static_map_path)
    visualize_scenario(scenario, static_map, viz_save_path)

In [13]:
# Generate visualization for each selected scenario in parallel (except if running in debug mode)
if debug:
    for scenario_path in track(scenario_file_list):
        generate_scenario_visualization(scenario_path)
else:
    Parallel(n_jobs=_DEFAULT_N_JOBS)(
        delayed(generate_scenario_visualization)(scenario_path)
        for scenario_path in track(scenario_file_list)
    )

NameError: name 'debug' is not defined

In [17]:
for scenario_path in track(scenario_file_list):
        generate_scenario_visualization(scenario_path)

NameError: name 'scenario_file_list' is not defined

In [20]:
Parallel(n_jobs=_DEFAULT_N_JOBS)(
        delayed(generate_scenario_visualization)(scenario_path)
        for scenario_path in track(scenario_file_list)
)

NameError: name 'scenario_file_list' is not defined

In [14]:
@click.command(help="Generate visualizations from a directory of Argoverse scenarios.")
@click.option(
    "--argoverse-scenario-dir",
    required=True,
    help="Path to local directory where Argoverse scenarios are stored.",
    type=click.Path(exists=True),
)
@click.option(
    "--viz-output-dir",
    required=True,
    help="Path to local directory where generated visualizations should be saved.",
    type=click.Path(),
)
@click.option(
    "-n",
    "--num-scenarios",
    default=100,
    help="Maximum number of scenarios for which to generate visualizations.",
    type=int,
)
@click.option(
    "-s",
    "--selection-criteria",
    default="first",
    help="Controls how scenarios are selected for visualization - either the first available or at random.",
    type=click.Choice(["first", "random"], case_sensitive=False),
)
@click.option(
    "--debug",
    is_flag=True,
    default=False,
    help="Runs preprocessing in single-threaded mode when enabled.",
)

_IncompleteInputError: incomplete input (3756169282.py, line 33)

In [15]:
def run_generate_scenario_visualizations(
    argoverse_scenario_dir: str,
    viz_output_dir: str,
    num_scenarios: int,
    selection_criteria: str,
    debug: bool,
) -> None:
    """Click entry point for generation of Argoverse scenario visualizations."""
    generate_scenario_visualizations(
        Path(argoverse_scenario_dir),
        Path(viz_output_dir),
        num_scenarios,
        SelectionCriteria(selection_criteria.lower()),
        debug=debug,
    )

In [16]:
run_generate_scenario_visualizations()

TypeError: run_generate_scenario_visualizations() missing 5 required positional arguments: 'argoverse_scenario_dir', 'viz_output_dir', 'num_scenarios', 'selection_criteria', and 'debug'

In [7]:
from typing import Any, Dict, List, Tuple
import os

import numpy as np
import pandas as pd
import matplotlib.transforms as transforms
from matplotlib.patches import Ellipse
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.patches import Polygon
from pathlib import Path

from argoverse.map_representation.map_api import ArgoverseMap
#
from av2.map.map_api import ArgoverseStaticMap
from av2.map.lane_segment import LaneType, LaneMarkType
from av2.datasets.motion_forecasting.data_schema import ArgoverseScenario, ObjectState, ObjectType, Track, TrackCategory


class ArgoMapVisualizer:
    def __init__(self):
        self.argo_map = ArgoverseMap()

    def show_lanes(self, ax, city_name, lane_ids, clr='g', alpha=0.2, show_lane_ids=False):
        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for idx in lane_ids:
            lane_cl = seq_lane_props[idx].centerline
            ax.plot(lane_cl[:, 0], lane_cl[:, 1], color=clr, alpha=alpha, linewidth=5)

            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

    def show_map_with_lanes(self,
                            ax,
                            city_name,
                            position,
                            lane_ids,
                            map_size=np.array([150.0, 150.0]),
                            show_freespace=True,
                            show_lane_ids=False):
        x_min = position[0] - map_size[0] / 2
        x_max = position[0] + map_size[0] / 2
        y_min = position[1] - map_size[1] / 2
        y_max = position[1] + map_size[1] / 2

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)

        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for idx in lane_ids:
            lane_cl = seq_lane_props[idx].centerline
            lane_polygon = self.argo_map.get_lane_segment_polygon(
                idx, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color='gray',
                        alpha=0.1,
                        edgecolor=None))

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]
            ax.arrow(pt[0],
                     pt[1],
                     vec[0],
                     vec[1],
                     alpha=0.5,
                     color='grey',
                     width=0.1,
                     zorder=1)
            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                if (np.min(contour[:, 0]) < x_max
                        and np.min(contour[:, 1]) < y_max
                        and np.max(contour[:, 0]) > x_min
                        and np.max(contour[:, 1]) > y_min):
                    surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def show_surrounding_elements(self,
                                  ax,
                                  city_name,
                                  position,
                                  map_size=np.array([150.0, 150.0]),
                                  show_freespace=True,
                                  show_lane_ids=False):
        x_min = position[0] - map_size[0] / 2
        x_max = position[0] + map_size[0] / 2
        y_min = position[1] - map_size[1] / 2
        y_max = position[1] + map_size[1] / 2

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)

        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]
        surrounding_lanes = {}
        for lane_id, lane_props in seq_lane_props.items():
            lane_cl = lane_props.centerline
            if (np.min(lane_cl[:, 0]) < x_max and np.min(lane_cl[:, 1]) < y_max
                    and np.max(lane_cl[:, 0]) > x_min
                    and np.max(lane_cl[:, 1]) > y_min):
                surrounding_lanes[lane_id] = lane_cl

        for idx, lane_cl in surrounding_lanes.items():
            lane_polygon = self.argo_map.get_lane_segment_polygon(
                idx, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color='gray',
                        alpha=0.1,
                        edgecolor=None))

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]
            vec = vec / np.linalg.norm(vec) * 1.0
            # ax.arrow(pt[0],
            #          pt[1],
            #          vec[0],
            #          vec[1],
            #          alpha=0.5,
            #          color='grey',
            #          width=0.1,
            #          zorder=1)
            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                if (np.min(contour[:, 0]) < x_max
                        and np.min(contour[:, 1]) < y_max
                        and np.max(contour[:, 0]) > x_min
                        and np.max(contour[:, 1]) > y_min):
                    surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def show_all_map(self, ax, city_name, show_freespace=True):
        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for lane_id, lane_props in seq_lane_props.items():
            lane_cl = lane_props.centerline

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]

            under_control = self.argo_map.lane_has_traffic_control_measure(
                lane_id, city_name)

            in_intersection = self.argo_map.lane_is_in_intersection(
                lane_id, city_name)

            turn_dir = self.argo_map.get_lane_turn_direction(
                lane_id, city_name)

            cl_clr = 'grey'

            if in_intersection:
                cl_clr = 'orange'

            if turn_dir == 'LEFT':
                cl_clr = 'blue'
            elif turn_dir == 'RIGHT':
                cl_clr = 'green'

            ax.arrow(pt[0],
                     pt[1],
                     vec[0],
                     vec[1],
                     alpha=0.5,
                     color=cl_clr,
                     width=0.1,
                     zorder=1)

            if under_control:
                p_vec = vec / np.linalg.norm(vec) * 1.5
                pt1 = pt + np.array([-p_vec[1], p_vec[0]])
                pt2 = pt + np.array([p_vec[1], -p_vec[0]])
                ax.plot([pt1[0], pt2[0]], [pt1[1], pt2[1]],
                        color='tomato',
                        alpha=0.5,
                        linewidth=2)

            lane_polygon = self.argo_map.get_lane_segment_polygon(
                lane_id, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color=cl_clr,
                        alpha=0.1,
                        edgecolor=None))

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def transform_2d_gaussian(self, mu_x, mu_y, sig_x, sig_y, rho, rot_mat, pos):
        cov_xy = sig_x * sig_y * rho
        cov_mat = np.array([[sig_x**2, cov_xy], [cov_xy, sig_y**2]])

        _cov_mat = rot_mat.dot(cov_mat).dot(rot_mat.T)
        _sig_x = np.sqrt(_cov_mat[0, 0])
        _sig_y = np.sqrt(_cov_mat[1, 1])
        _cov_xy = _cov_mat[0, 1]
        _rho = _cov_xy / (_sig_x * _sig_y)

        _mu_tmp = rot_mat.dot(np.array([[mu_x], [mu_y]])).flatten()

        _mu_x = _mu_tmp[0] + pos[0]
        _mu_y = _mu_tmp[1] + pos[1]

        return _mu_x, _mu_y, _sig_x, _sig_y, _rho

    # Visualize 2d gaussian distribution
    def get_confidence_ellipse(self,
                               mu_x,
                               mu_y,
                               sig_x,
                               sig_y,
                               rho,
                               trans,
                               n_std=3,
                               facecolor='none',
                               edgecolor='red',
                               alpha=0.3):
        ell_radius_x = np.sqrt(1 + rho)
        ell_radius_y = np.sqrt(1 - rho)
        ellipse = Ellipse((0, 0),
                          width=ell_radius_x * 2,
                          height=ell_radius_y * 2,
                          facecolor=facecolor,
                          edgecolor=edgecolor,
                          alpha=alpha)

        # multiply stdandard deviation with the
        # given number of standard deviations.
        scale_x = sig_x * n_std
        scale_y = sig_y * n_std

        transf = transforms.Affine2D() \
            .rotate_deg(45) \
            .scale(scale_x, scale_y) \
            .translate(mu_x, mu_y)

        ellipse.set_transform(transf + trans)

        return ellipse


class AV2MapVisualizer:
    def __init__(self):
        self.dataset_dir = os.path.expanduser('~') + '/data/dataset/argoverse2/'

    def show_map(self,
                 ax,
                 split: str,
                 seq_id: str,
                 show_freespace=True):

        # ax.set_facecolor("grey")

        static_map_path = Path(self.dataset_dir + f"/{split}/{seq_id}" + f"/log_map_archive_{seq_id}.json")
        static_map = ArgoverseStaticMap.from_json(static_map_path)

        # ~ drivable area
        # print('num drivable areas: ', len(static_map.vector_drivable_areas),
        #       [x for x in static_map.vector_drivable_areas.keys()])
        for drivable_area in static_map.vector_drivable_areas.values():
            # ax.plot(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.5, linestyle='--')
            ax.fill(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.2)

        # ~ lane segments
        # print('num lane segs: ', len(static_map.vector_lane_segments),
        #       [x for x in static_map.vector_lane_segments.keys()])
        print('Num lanes: ', len(static_map.vector_lane_segments))
        for lane_segment in static_map.vector_lane_segments.values():
            # print('left pts: ', lane_segment.left_lane_boundary.xyz.shape,
            #       'right pts: ', lane_segment.right_lane_boundary.xyz.shape)

            if lane_segment.lane_type == 'VEHICLE':
                lane_clr = 'blue'
            elif lane_segment.lane_type == 'BIKE':
                lane_clr = 'green'
            elif lane_segment.lane_type == 'BUS':
                lane_clr = 'orange'
            else:
                assert False, "Wrong lane type"

            # if lane_segment.is_intersection:
            #     lane_clr = 'yellow'

            polygon = lane_segment.polygon_boundary
            ax.fill(polygon[:, 0], polygon[:, 1], color=lane_clr, alpha=0.1)

            for boundary in [lane_segment.left_lane_boundary, lane_segment.right_lane_boundary]:
                ax.plot(boundary.xyz[:, 0],
                        boundary.xyz[:, 1],
                        linewidth=1,
                        color='grey',
                        alpha=0.3)

            # cl = static_map.get_lane_segment_centerline(lane_segment.id)
            # ax.plot(cl[:, 0], cl[:, 1], linestyle='--', color='magenta', alpha=0.1)

        # ~ ped xing
        for pedxing in static_map.vector_pedestrian_crossings.values():
            edge = np.concatenate([pedxing.edge1.xyz, np.flip(pedxing.edge2.xyz, axis=0)])
            # plt.plot(edge[:, 0], edge[:, 1], color='orange', alpha=0.75)
            ax.fill(edge[:, 0], edge[:, 1], color='orange', alpha=0.2)
            # for edge in [ped_xing.edge1, ped_xing.edge2]:
            #     ax.plot(edge.xyz[:, 0], edge.xyz[:, 1], color='orange', alpha=0.5, linestyle='dotted')

    def show_map_clean(self,
                       ax,
                       split: str,
                       seq_id: str,
                       show_freespace=True):

        # ax.set_facecolor("grey")

        static_map_path = Path(self.dataset_dir + f"/{split}/{seq_id}" + f"/log_map_archive_{seq_id}.json")
        static_map = ArgoverseStaticMap.from_json(static_map_path)

        # ~ drivable area
        for drivable_area in static_map.vector_drivable_areas.values():
            # ax.plot(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.5, linestyle='--')
            ax.fill(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.2)

        # ~ lane segments
        print('Num lanes: ', len(static_map.vector_lane_segments))
        for lane_id, lane_segment in static_map.vector_lane_segments.items():
            lane_clr = 'grey'
            polygon = lane_segment.polygon_boundary
            ax.fill(polygon[:, 0], polygon[:, 1], color='whitesmoke', alpha=1.0, edgecolor=None, zorder=0)

            # centerline
            centerline = static_map.get_lane_segment_centerline(lane_id)[:, 0:2]  # use xy
            ax.plot(centerline[:, 0], centerline[:, 1], alpha=0.1, color='grey', linestyle='dotted', zorder=1)

            # lane boundary
            for boundary, mark_type in [(lane_segment.left_lane_boundary.xyz, lane_segment.left_mark_type),
                                        (lane_segment.right_lane_boundary.xyz, lane_segment.right_mark_type)]:

                clr = None
                width = 1.0
                if mark_type in [LaneMarkType.DASH_SOLID_WHITE,
                                 LaneMarkType.DASHED_WHITE,
                                 LaneMarkType.DOUBLE_DASH_WHITE,
                                 LaneMarkType.DOUBLE_SOLID_WHITE,
                                 LaneMarkType.SOLID_WHITE,
                                 LaneMarkType.SOLID_DASH_WHITE]:
                    clr = 'white'
                    zorder = 3
                    width = width
                elif mark_type in [LaneMarkType.DASH_SOLID_YELLOW,
                                   LaneMarkType.DASHED_YELLOW,
                                   LaneMarkType.DOUBLE_DASH_YELLOW,
                                   LaneMarkType.DOUBLE_SOLID_YELLOW,
                                   LaneMarkType.SOLID_YELLOW,
                                   LaneMarkType.SOLID_DASH_YELLOW]:
                    clr = 'gold'
                    zorder = 4
                    width = width * 1.1

                style = 'solid'
                if mark_type in [LaneMarkType.DASHED_WHITE,
                                 LaneMarkType.DASHED_YELLOW,
                                 LaneMarkType.DOUBLE_DASH_YELLOW,
                                 LaneMarkType.DOUBLE_DASH_WHITE]:
                    style = (0, (5, 10))  # loosely dashed
                elif mark_type in [LaneMarkType.DASH_SOLID_YELLOW,
                                   LaneMarkType.DASH_SOLID_WHITE,
                                   LaneMarkType.DOUBLE_SOLID_YELLOW,
                                   LaneMarkType.DOUBLE_SOLID_WHITE,
                                   LaneMarkType.SOLID_YELLOW,
                                   LaneMarkType.SOLID_WHITE,
                                   LaneMarkType.SOLID_DASH_WHITE,
                                   LaneMarkType.SOLID_DASH_YELLOW]:
                    style = 'solid'

                if (clr is not None) and (style is not None):
                    ax.plot(boundary[:, 0],
                            boundary[:, 1],
                            color=clr,
                            alpha=1.0,
                            linewidth=width,
                            linestyle=style,
                            zorder=zorder)

        # ~ ped xing
        for pedxing in static_map.vector_pedestrian_crossings.values():
            edge = np.concatenate([pedxing.edge1.xyz, np.flip(pedxing.edge2.xyz, axis=0)])
            ax.fill(edge[:, 0], edge[:, 1], color='yellow', alpha=0.1, edgecolor=None)

In [8]:
from typing import Any, Dict, List, Tuple
import os

import numpy as np
import pandas as pd
import matplotlib.transforms as transforms
from matplotlib.patches import Ellipse
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.patches import Polygon
from pathlib import Path

from argoverse.map_representation.map_api import ArgoverseMap
#
from av2.map.map_api import ArgoverseStaticMap
from av2.map.lane_segment import LaneType, LaneMarkType
from av2.datasets.motion_forecasting.data_schema import ArgoverseScenario, ObjectState, ObjectType, Track, TrackCategory


class ArgoMapVisualizer:
    def __init__(self):
        self.argo_map = ArgoverseMap()

    def show_lanes(self, ax, city_name, lane_ids, clr='g', alpha=0.2, show_lane_ids=False):
        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for idx in lane_ids:
            lane_cl = seq_lane_props[idx].centerline
            ax.plot(lane_cl[:, 0], lane_cl[:, 1], color=clr, alpha=alpha, linewidth=5)

            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

    def show_map_with_lanes(self,
                            ax,
                            city_name,
                            position,
                            lane_ids,
                            map_size=np.array([150.0, 150.0]),
                            show_freespace=True,
                            show_lane_ids=False):
        x_min = position[0] - map_size[0] / 2
        x_max = position[0] + map_size[0] / 2
        y_min = position[1] - map_size[1] / 2
        y_max = position[1] + map_size[1] / 2

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)

        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for idx in lane_ids:
            lane_cl = seq_lane_props[idx].centerline
            lane_polygon = self.argo_map.get_lane_segment_polygon(
                idx, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color='gray',
                        alpha=0.1,
                        edgecolor=None))

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]
            ax.arrow(pt[0],
                     pt[1],
                     vec[0],
                     vec[1],
                     alpha=0.5,
                     color='grey',
                     width=0.1,
                     zorder=1)
            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                if (np.min(contour[:, 0]) < x_max
                        and np.min(contour[:, 1]) < y_max
                        and np.max(contour[:, 0]) > x_min
                        and np.max(contour[:, 1]) > y_min):
                    surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def show_surrounding_elements(self,
                                  ax,
                                  city_name,
                                  position,
                                  map_size=np.array([150.0, 150.0]),
                                  show_freespace=True,
                                  show_lane_ids=False):
        x_min = position[0] - map_size[0] / 2
        x_max = position[0] + map_size[0] / 2
        y_min = position[1] - map_size[1] / 2
        y_max = position[1] + map_size[1] / 2

        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)

        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]
        surrounding_lanes = {}
        for lane_id, lane_props in seq_lane_props.items():
            lane_cl = lane_props.centerline
            if (np.min(lane_cl[:, 0]) < x_max and np.min(lane_cl[:, 1]) < y_max
                    and np.max(lane_cl[:, 0]) > x_min
                    and np.max(lane_cl[:, 1]) > y_min):
                surrounding_lanes[lane_id] = lane_cl

        for idx, lane_cl in surrounding_lanes.items():
            lane_polygon = self.argo_map.get_lane_segment_polygon(
                idx, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color='gray',
                        alpha=0.1,
                        edgecolor=None))

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]
            vec = vec / np.linalg.norm(vec) * 1.0
            # ax.arrow(pt[0],
            #          pt[1],
            #          vec[0],
            #          vec[1],
            #          alpha=0.5,
            #          color='grey',
            #          width=0.1,
            #          zorder=1)
            if show_lane_ids:
                m_pt = lane_cl[int(lane_cl.shape[0] / 2)]
                ax.text(m_pt[0], m_pt[1], idx, color='b')

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                if (np.min(contour[:, 0]) < x_max
                        and np.min(contour[:, 1]) < y_max
                        and np.max(contour[:, 0]) > x_min
                        and np.max(contour[:, 1]) > y_min):
                    surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def show_all_map(self, ax, city_name, show_freespace=True):
        seq_lane_props = self.argo_map.city_lane_centerlines_dict[city_name]

        for lane_id, lane_props in seq_lane_props.items():
            lane_cl = lane_props.centerline

            pt = lane_cl[0]
            vec = lane_cl[1] - lane_cl[0]

            under_control = self.argo_map.lane_has_traffic_control_measure(
                lane_id, city_name)

            in_intersection = self.argo_map.lane_is_in_intersection(
                lane_id, city_name)

            turn_dir = self.argo_map.get_lane_turn_direction(
                lane_id, city_name)

            cl_clr = 'grey'

            if in_intersection:
                cl_clr = 'orange'

            if turn_dir == 'LEFT':
                cl_clr = 'blue'
            elif turn_dir == 'RIGHT':
                cl_clr = 'green'

            ax.arrow(pt[0],
                     pt[1],
                     vec[0],
                     vec[1],
                     alpha=0.5,
                     color=cl_clr,
                     width=0.1,
                     zorder=1)

            if under_control:
                p_vec = vec / np.linalg.norm(vec) * 1.5
                pt1 = pt + np.array([-p_vec[1], p_vec[0]])
                pt2 = pt + np.array([p_vec[1], -p_vec[0]])
                ax.plot([pt1[0], pt2[0]], [pt1[1], pt2[1]],
                        color='tomato',
                        alpha=0.5,
                        linewidth=2)

            lane_polygon = self.argo_map.get_lane_segment_polygon(
                lane_id, city_name)
            ax.add_patch(
                Polygon(lane_polygon[:, 0:2],
                        color=cl_clr,
                        alpha=0.1,
                        edgecolor=None))

        if show_freespace:
            drivable_area = self.argo_map.get_da_contours(city_name)
            surrounding_contours = []
            for contour in drivable_area:
                surrounding_contours.append(contour)

            for contour in surrounding_contours:
                ax.add_patch(
                    Polygon(contour[:, 0:2],
                            color='darkgray',
                            alpha=0.1,
                            edgecolor=None))

    def transform_2d_gaussian(self, mu_x, mu_y, sig_x, sig_y, rho, rot_mat, pos):
        cov_xy = sig_x * sig_y * rho
        cov_mat = np.array([[sig_x**2, cov_xy], [cov_xy, sig_y**2]])

        _cov_mat = rot_mat.dot(cov_mat).dot(rot_mat.T)
        _sig_x = np.sqrt(_cov_mat[0, 0])
        _sig_y = np.sqrt(_cov_mat[1, 1])
        _cov_xy = _cov_mat[0, 1]
        _rho = _cov_xy / (_sig_x * _sig_y)

        _mu_tmp = rot_mat.dot(np.array([[mu_x], [mu_y]])).flatten()

        _mu_x = _mu_tmp[0] + pos[0]
        _mu_y = _mu_tmp[1] + pos[1]

        return _mu_x, _mu_y, _sig_x, _sig_y, _rho

    # Visualize 2d gaussian distribution
    def get_confidence_ellipse(self,
                               mu_x,
                               mu_y,
                               sig_x,
                               sig_y,
                               rho,
                               trans,
                               n_std=3,
                               facecolor='none',
                               edgecolor='red',
                               alpha=0.3):
        ell_radius_x = np.sqrt(1 + rho)
        ell_radius_y = np.sqrt(1 - rho)
        ellipse = Ellipse((0, 0),
                          width=ell_radius_x * 2,
                          height=ell_radius_y * 2,
                          facecolor=facecolor,
                          edgecolor=edgecolor,
                          alpha=alpha)

        # multiply stdandard deviation with the
        # given number of standard deviations.
        scale_x = sig_x * n_std
        scale_y = sig_y * n_std

        transf = transforms.Affine2D() \
            .rotate_deg(45) \
            .scale(scale_x, scale_y) \
            .translate(mu_x, mu_y)

        ellipse.set_transform(transf + trans)

        return ellipse


class AV2MapVisualizer:
    def __init__(self):
        self.dataset_dir = os.path.expanduser('~') + '/data/dataset/argoverse2/'

    def show_map(self,
                 ax,
                 split: str,
                 seq_id: str,
                 show_freespace=True):

        # ax.set_facecolor("grey")

        static_map_path = Path(self.dataset_dir + f"/{split}/{seq_id}" + f"/log_map_archive_{seq_id}.json")
        static_map = ArgoverseStaticMap.from_json(static_map_path)

        # ~ drivable area
        # print('num drivable areas: ', len(static_map.vector_drivable_areas),
        #       [x for x in static_map.vector_drivable_areas.keys()])
        for drivable_area in static_map.vector_drivable_areas.values():
            # ax.plot(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.5, linestyle='--')
            ax.fill(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.2)

        # ~ lane segments
        # print('num lane segs: ', len(static_map.vector_lane_segments),
        #       [x for x in static_map.vector_lane_segments.keys()])
        print('Num lanes: ', len(static_map.vector_lane_segments))
        for lane_segment in static_map.vector_lane_segments.values():
            # print('left pts: ', lane_segment.left_lane_boundary.xyz.shape,
            #       'right pts: ', lane_segment.right_lane_boundary.xyz.shape)

            if lane_segment.lane_type == 'VEHICLE':
                lane_clr = 'blue'
            elif lane_segment.lane_type == 'BIKE':
                lane_clr = 'green'
            elif lane_segment.lane_type == 'BUS':
                lane_clr = 'orange'
            else:
                assert False, "Wrong lane type"

            # if lane_segment.is_intersection:
            #     lane_clr = 'yellow'

            polygon = lane_segment.polygon_boundary
            ax.fill(polygon[:, 0], polygon[:, 1], color=lane_clr, alpha=0.1)

            for boundary in [lane_segment.left_lane_boundary, lane_segment.right_lane_boundary]:
                ax.plot(boundary.xyz[:, 0],
                        boundary.xyz[:, 1],
                        linewidth=1,
                        color='grey',
                        alpha=0.3)

            # cl = static_map.get_lane_segment_centerline(lane_segment.id)
            # ax.plot(cl[:, 0], cl[:, 1], linestyle='--', color='magenta', alpha=0.1)

        # ~ ped xing
        for pedxing in static_map.vector_pedestrian_crossings.values():
            edge = np.concatenate([pedxing.edge1.xyz, np.flip(pedxing.edge2.xyz, axis=0)])
            # plt.plot(edge[:, 0], edge[:, 1], color='orange', alpha=0.75)
            ax.fill(edge[:, 0], edge[:, 1], color='orange', alpha=0.2)
            # for edge in [ped_xing.edge1, ped_xing.edge2]:
            #     ax.plot(edge.xyz[:, 0], edge.xyz[:, 1], color='orange', alpha=0.5, linestyle='dotted')

    def show_map_clean(self,
                       ax,
                       split: str,
                       seq_id: str,
                       show_freespace=True):

        # ax.set_facecolor("grey")

        static_map_path = Path(self.dataset_dir + f"/{split}/{seq_id}" + f"/log_map_archive_{seq_id}.json")
        static_map = ArgoverseStaticMap.from_json(static_map_path)

        # ~ drivable area
        for drivable_area in static_map.vector_drivable_areas.values():
            # ax.plot(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.5, linestyle='--')
            ax.fill(drivable_area.xyz[:, 0], drivable_area.xyz[:, 1], color='grey', alpha=0.2)

        # ~ lane segments
        print('Num lanes: ', len(static_map.vector_lane_segments))
        for lane_id, lane_segment in static_map.vector_lane_segments.items():
            lane_clr = 'grey'
            polygon = lane_segment.polygon_boundary
            ax.fill(polygon[:, 0], polygon[:, 1], color='whitesmoke', alpha=1.0, edgecolor=None, zorder=0)

            # centerline
            centerline = static_map.get_lane_segment_centerline(lane_id)[:, 0:2]  # use xy
            ax.plot(centerline[:, 0], centerline[:, 1], alpha=0.1, color='grey', linestyle='dotted', zorder=1)

            # lane boundary
            for boundary, mark_type in [(lane_segment.left_lane_boundary.xyz, lane_segment.left_mark_type),
                                        (lane_segment.right_lane_boundary.xyz, lane_segment.right_mark_type)]:

                clr = None
                width = 1.0
                if mark_type in [LaneMarkType.DASH_SOLID_WHITE,
                                 LaneMarkType.DASHED_WHITE,
                                 LaneMarkType.DOUBLE_DASH_WHITE,
                                 LaneMarkType.DOUBLE_SOLID_WHITE,
                                 LaneMarkType.SOLID_WHITE,
                                 LaneMarkType.SOLID_DASH_WHITE]:
                    clr = 'white'
                    zorder = 3
                    width = width
                elif mark_type in [LaneMarkType.DASH_SOLID_YELLOW,
                                   LaneMarkType.DASHED_YELLOW,
                                   LaneMarkType.DOUBLE_DASH_YELLOW,
                                   LaneMarkType.DOUBLE_SOLID_YELLOW,
                                   LaneMarkType.SOLID_YELLOW,
                                   LaneMarkType.SOLID_DASH_YELLOW]:
                    clr = 'gold'
                    zorder = 4
                    width = width * 1.1

                style = 'solid'
                if mark_type in [LaneMarkType.DASHED_WHITE,
                                 LaneMarkType.DASHED_YELLOW,
                                 LaneMarkType.DOUBLE_DASH_YELLOW,
                                 LaneMarkType.DOUBLE_DASH_WHITE]:
                    style = (0, (5, 10))  # loosely dashed
                elif mark_type in [LaneMarkType.DASH_SOLID_YELLOW,
                                   LaneMarkType.DASH_SOLID_WHITE,
                                   LaneMarkType.DOUBLE_SOLID_YELLOW,
                                   LaneMarkType.DOUBLE_SOLID_WHITE,
                                   LaneMarkType.SOLID_YELLOW,
                                   LaneMarkType.SOLID_WHITE,
                                   LaneMarkType.SOLID_DASH_WHITE,
                                   LaneMarkType.SOLID_DASH_YELLOW]:
                    style = 'solid'

                if (clr is not None) and (style is not None):
                    ax.plot(boundary[:, 0],
                            boundary[:, 1],
                            color=clr,
                            alpha=1.0,
                            linewidth=width,
                            linestyle=style,
                            zorder=zorder)

        # ~ ped xing
        for pedxing in static_map.vector_pedestrian_crossings.values():
            edge = np.concatenate([pedxing.edge1.xyz, np.flip(pedxing.edge2.xyz, axis=0)])
            ax.fill(edge[:, 0], edge[:, 1], color='yellow', alpha=0.1, edgecolor=None)

In [12]:
ArgoMapVisualizer()

<__main__.ArgoMapVisualizer at 0x1409a5940>

In [13]:
 ArgoverseMap()

<argoverse.map_representation.map_api.ArgoverseMap at 0x140979f90>

In [9]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

In [10]:
_ESTIMATED_VEHICLE_LENGTH_M = 5.0
_ESTIMATED_VEHICLE_WIDTH_M = 2.0
_ESTIMATED_CYCLIST_LENGTH_M = 2.0
_ESTIMATED_CYCLIST_WIDTH_M = 0.7
_ESTIMATED_PEDESTRIAN_LENGTH_M = 0.3
_ESTIMATED_PEDESTRIAN_WIDTH_M = 0.5
_ESTIMATED_BUS_LENGTH_M = 7.0
_ESTIMATED_BUS_WIDTH_M = 2.1


class Visualizer():
    def __init__(self):
        self.map_vis = AV2MapVisualizer()

    def draw_once(self, post_out, data, eval_out, show_map=False, test_mode=False, split='val'):
        batch_size = len(data['SEQ_ID'])

        seq_id = data['SEQ_ID'][0]
        city_name = data['CITY_NAME'][0]
        orig = data['ORIG'][0]
        rot = data['ROT'][0]
        trajs_obs = data['TRAJS'][0]['TRAJS_POS_OBS']
        trajs_fut = data['TRAJS'][0]['TRAJS_POS_FUT']
        pads_obs = data['TRAJS'][0]['PAD_OBS']
        pads_fut = data['TRAJS'][0]['PAD_FUT']
        trajs_ctrs = data['TRAJS'][0]['TRAJS_CTRS']
        trajs_vecs = data['TRAJS'][0]['TRAJS_VECS']
        #
        trajs_type = data['TRAJS'][0]["TRAJS_TYPE"]
        trajs_tid = data['TRAJS'][0]["TRAJS_TID"]
        trajs_cat = data['TRAJS'][0]["TRAJS_CAT"]
        #
        lane_graph = data['LANE_GRAPH'][0]

        res_cls, res_reg, res_aux = post_out['out_raw']

        print("[Vis] seq_id: {}, city_name: {}".format(seq_id, city_name))
        _, ax = plt.subplots(figsize=(12, 12))
        ax.axis('equal')
        ax.set_title('{}-{}'.format(seq_id, city_name))

        if show_map:
            self.map_vis.show_map_clean(ax, split, seq_id)
            pass
        else:
            rot = torch.eye(2)
            orig = torch.zeros(2)

        # trajs hist
        for i, (traj_obs, pad_obs, ctr, vec) in enumerate(zip(trajs_obs, pads_obs, trajs_ctrs, trajs_vecs)):
            traj_cat = trajs_cat[i]
            zorder = 10
            if traj_cat == 'focal':
                clr = 'r'
                zorder = 20
            elif traj_cat == 'av':
                clr = 'cornflowerblue'
            elif traj_cat == 'score':
                clr = 'royalblue'
            else:
                clr = 'grey'

            theta = np.arctan2(vec[1], vec[0])
            act_rot = torch.Tensor([[np.cos(theta), -np.sin(theta)],
                                    [np.sin(theta), np.cos(theta)]])

            if not traj_cat in ['unscore', 'frag']:
                traj_obs = torch.matmul(traj_obs, act_rot.T) + ctr
                traj_obs = torch.matmul(traj_obs, rot.T) + orig
                ax.plot(traj_obs[:, 0], traj_obs[:, 1], marker='.', alpha=0.25, color=clr, zorder=zorder)

            # attrs
            traj_type = np.where(trajs_type[i][len(traj_obs) - 1])[0][0]
            if traj_type == 0:
                bbox_l = _ESTIMATED_VEHICLE_LENGTH_M
                bbox_w = _ESTIMATED_VEHICLE_WIDTH_M
            elif traj_type == 1:
                bbox_l = _ESTIMATED_PEDESTRIAN_LENGTH_M
                bbox_w = _ESTIMATED_PEDESTRIAN_WIDTH_M
            elif traj_type == 2 or traj_type == 3:
                bbox_l = _ESTIMATED_CYCLIST_LENGTH_M
                bbox_w = _ESTIMATED_CYCLIST_WIDTH_M
            elif traj_type == 4:
                bbox_l = _ESTIMATED_BUS_LENGTH_M
                bbox_w = _ESTIMATED_BUS_WIDTH_M
            elif traj_type == 5:
                bbox_l = 1.0  # unknown
                bbox_w = 1.0
            else:
                bbox_l = 0.5  # static
                bbox_w = 0.5
            bbox = torch.Tensor([[-bbox_l / 2, -bbox_w / 2],
                                 [-bbox_l / 2, bbox_w / 2],
                                 [bbox_l / 2, bbox_w / 2],
                                 [bbox_l / 2, -bbox_w / 2]])
            bbox = torch.matmul(bbox, act_rot.T) + ctr
            bbox = torch.matmul(bbox, rot.T) + orig
            ax.fill(bbox[:, 0], bbox[:, 1], color=clr, alpha=0.5, zorder=zorder)

            ctr_vis = torch.matmul(ctr, rot.T) + orig
            vec_vis = torch.matmul(vec, rot.T)
            ax.arrow(ctr_vis[0], ctr_vis[1], vec_vis[0], vec_vis[1], alpha=0.5, color=clr, width=0.05, zorder=zorder)

        # if not test mode, vis GT trajectories
        if not test_mode:
            for i, (traj_fut, pad_fut, ctr, vec) in enumerate(zip(trajs_fut, pads_fut, trajs_ctrs, trajs_vecs)):
                traj_cat = trajs_cat[i]
                zorder = 10
                if traj_cat == 'focal':
                    clr = 'deeppink'
                    zorder = 20
                elif traj_cat == 'av':
                    clr = 'deepskyblue'
                elif traj_cat == 'score':
                    clr = 'deepskyblue'
                else:
                    clr = 'grey'

                if traj_cat in ['unscore', 'frag']:
                    continue

                theta = np.arctan2(vec[1], vec[0])
                act_rot = torch.Tensor([[np.cos(theta), -np.sin(theta)],
                                        [np.sin(theta), np.cos(theta)]])

                traj_fut = torch.matmul(traj_fut, act_rot.T) + ctr
                traj_fut = torch.matmul(traj_fut, rot.T) + orig
                ax.plot(traj_fut[:, 0], traj_fut[:, 1], alpha=0.5, color=clr, zorder=zorder)

                # mk = '*' if torch.sum(pad_fut) == 60 else 's'
                mk = '*'
                ax.plot(traj_fut[-1, 0], traj_fut[-1, 1], marker=mk, alpha=0.5, color=clr, zorder=zorder, markersize=10)

        # traj pred all
        print('res_reg: ', [x.shape for x in res_reg])
        res_reg = res_reg[0].cpu().detach()
        res_cls = res_cls[0].cpu().detach()
        for i, (trajs, probs, ctr, vec) in enumerate(zip(res_reg, res_cls, trajs_ctrs, trajs_vecs)):
            traj_cat = trajs_cat[i]
            zorder = 10
            if traj_cat == 'focal':
                clr = 'r'
                zorder = 20
            elif traj_cat == 'av':
                clr = 'cornflowerblue'
            elif traj_cat == 'score':
                clr = 'royalblue'
            else:
                clr = 'grey'

            if traj_cat in ['unscore', 'frag']:
                continue

            theta = np.arctan2(vec[1], vec[0])
            act_rot = torch.Tensor([[np.cos(theta), -np.sin(theta)],
                                    [np.sin(theta), np.cos(theta)]])

            for traj, prob in zip(trajs, probs):
                if prob < 0.05 and (not i in [0]):
                    continue

                pos = torch.matmul(traj[..., 0:2], act_rot.T) + ctr
                pos = torch.matmul(pos[..., 0:2], rot.T) + orig

                ax.plot(pos[:, 0], pos[:, 1], alpha=0.3, color=clr, zorder=zorder, linestyle='--')
                ax.arrow(pos[-2, 0],
                         pos[-2, 1],
                         (pos[-1, 0] - pos[-2, 0]),
                         (pos[-1, 1] - pos[-2, 1]),
                         edgecolor=None,
                         color=clr,
                         alpha=0.3,
                         width=0.2,
                         zorder=zorder)
                ax.text(pos[-1, 0], pos[-1, 1], '{:.2f}'.format(prob),
                        color=clr, zorder=zorder, alpha=0.6)  # rotation=30

        # # lane graph
        # node_ctrs = lane_graph['node_ctrs']  # [196, 10, 2]
        # node_vecs = lane_graph['node_vecs']  # [196, 10, 2]
        # lane_ctrs = lane_graph['lane_ctrs']  # [196, 2]
        # lane_vecs = lane_graph['lane_vecs']  # [196, 2]

        # for ctrs_tmp, vecs_tmp, anch_pos, anch_vec in zip(node_ctrs, node_vecs, lane_ctrs, lane_vecs):
        #     anch_rot = torch.Tensor([[anch_vec[0], -anch_vec[1]],
        #                              [anch_vec[1], anch_vec[0]]])
        #     ctrs_tmp = torch.matmul(ctrs_tmp, anch_rot.T) + anch_pos
        #     ctrs_tmp = torch.matmul(ctrs_tmp, rot.T) + orig
        #     ax.plot(ctrs_tmp[:, 0], ctrs_tmp[:, 1], alpha=0.1, color='grey', linestyle='--')

        plt.tight_layout()
        plt.show()

In [11]:
Visualizer()

<__main__.Visualizer at 0x1348f5010>