In this tutorial, we'll demonstrate the Argoverse 2.0 map API, and visualize some of the map data.

In [1]:
from argparse import Namespace
from pathlib import Path

In [2]:
import argparse
from pathlib import Path
from typing import List

import matplotlib

import matplotlib.pyplot as plt
import numpy as np

from mpl_toolkits.axes_grid1 import make_axes_locatable

import av2.geometry.polyline_utils as polyline_utils
import av2.rendering.vector as vector_plotting_utils
from av2.datasets.sensor.av2_sensor_dataloader import AV2SensorDataLoader
from av2.map.map_api import ArgoverseStaticMap, LaneSegment

  UINT8_BITS: Final[np.uint8] = np.log2(UINT8_MAX + 1).astype(np.uint8)
  UINT8_BITS: Final[np.uint8] = np.log2(UINT8_MAX + 1).astype(np.uint8)
  UINT8_BITS: Final[np.uint8] = np.log2(UINT8_MAX + 1).astype(np.uint8)


First, we'll plot the lane graph and crosswalks, with crosswalks colored purple, and the lanes colored green.

In [3]:
import logging
import os
import sys
from pathlib import Path
from typing import Final

import click
import cv2
import numpy as np

import av2.rendering.color as color_utils
import av2.rendering.rasterize as raster_rendering_utils
import av2.rendering.video as video_utils
import av2.utils.io as io_utils
import av2.utils.raster as raster_utils
from av2.datasets.sensor.av2_sensor_dataloader import AV2SensorDataLoader
from av2.datasets.sensor.constants import RingCameras
from av2.map.map_api import ArgoverseStaticMap
from av2.rendering.color import GREEN_HEX, RED_HEX
from av2.utils.typing import NDArrayByte, NDArrayFloat, NDArrayInt
import av2

In [4]:
def generate_egoview_overlaid_lidar(
    data_root: Path,
    output_dir: Path,
    log_id: str,
    render_ground_pts_only: bool,
    dump_single_frames: bool,
) -> None:
    """Render LiDAR points from a particular camera's viewpoint (color by ground surface, and apply ROI filtering).

    Args:
        data_root: path to directory where the logs live on disk.
        output_dir: path to directory where renderings will be saved.
        log_id: unique ID for AV2 scenario/log.
        render_ground_pts_only: whether to only render LiDAR points located close to the ground surface.
        dump_single_frames: Whether to save to disk individual RGB frames of the rendering, in addition to generating
            the mp4 file.

    Raises:
        RuntimeError: If vehicle log data is not present at `data_root` for `log_id`.
    """
    loader = AV2SensorDataLoader(data_dir=data_root, labels_dir=data_root)

    log_map_dirpath = data_root / log_id / "map"
    avm = ArgoverseStaticMap.from_map_dir(log_map_dirpath, build_raster=True)
    # print(avm)

    # repeat red to green colormap every 50 m.
    colors_arr_rgb = color_utils.create_colormap(
        color_list=[RED_HEX, GREEN_HEX], n_colors=NUM_RANGE_BINS
    )
    colors_arr_rgb = (colors_arr_rgb * 255).astype(np.uint8)
    colors_arr_bgr: NDArrayByte = np.fliplr(colors_arr_rgb)

    print(list(RingCameras))
    for _, cam_name in enumerate(list(RingCameras)):
        print(type(cam_name))
        cam_im_fpaths = loader.get_ordered_log_cam_fpaths(log_id, cam_name.value)
        num_cam_imgs = len(cam_im_fpaths)

        video_list = []
        # print(cam_im_fpaths)
        for i, im_fpath in enumerate(cam_im_fpaths):
            if i % 50 == 0:
                logging.info(
                    f"\tOn file {i}/{num_cam_imgs} of camera {cam_name} of {log_id}"
                )

            cam_timestamp_ns = int(im_fpath.stem)
            city_SE3_ego = loader.get_city_SE3_ego(log_id, cam_timestamp_ns)
            
            if city_SE3_ego is None:
                logger.exception("missing LiDAR pose")
                continue

            # load feather file path, e.g. '315978406032859416.feather"
            lidar_fpath = loader.get_closest_lidar_fpath((data_root / log_id).name, cam_timestamp_ns)
            # print(lidar_fpath)
            # print(cam_timestamp_ns)
            # print(loader._sdb.get_closest_lidar_timestamp(cam_timestamp_ns, (data_root / log_id).name))
            if lidar_fpath is None:
                # logger.info(
                #     "No LiDAR sweep found within the synchronization interval for %s, so skipping...",
                #     cam_name,
                # )
                continue

            img_bgr = io_utils.read_img(im_fpath, channel_order="BGR")

            lidar_points_ego = io_utils.read_lidar_sweep(lidar_fpath, attrib_spec="xyz")
            lidar_timestamp_ns = int(lidar_fpath.stem)

            # put into city coords, then prune away ground and non-RoI points
            lidar_points_city = city_SE3_ego.transform_point_cloud(lidar_points_ego)
            lidar_points_city = avm.remove_non_drivable_area_points(lidar_points_city)
            is_ground_logicals = avm.get_ground_points_boolean(lidar_points_city)
            # lidar_points_city = lidar_points_city[
            #     is_ground_logicals if render_ground_pts_only else ~is_ground_logicals
            # ]
            lidar_points_ego = city_SE3_ego.inverse().transform_point_cloud(
                lidar_points_city
            )

            # motion compensate always
            (
                uv,
                points_cam,
                is_valid_points,
            ) = loader.project_ego_to_img_motion_compensated(
                points_lidar_time=lidar_points_ego,
                cam_name=cam_name,
                cam_timestamp_ns=cam_timestamp_ns,
                lidar_timestamp_ns=lidar_timestamp_ns,
                log_id=log_id,
            )

            if is_valid_points is None or uv is None or points_cam is None:
                continue

            if is_valid_points.sum() == 0:
                continue

            uv_int: NDArrayInt = np.round(uv[is_valid_points]).astype(np.int32)
            points_cam = points_cam[is_valid_points]
            pt_ranges: NDArrayFloat = np.linalg.norm(points_cam[:, :3], axis=1)
            color_bins: NDArrayInt = np.round(pt_ranges).astype(np.int32)
            # account for moving past 100 meters, loop around again
            color_bins = color_bins % NUM_RANGE_BINS
            uv_colors_bgr = colors_arr_bgr[color_bins]

            img_empty = np.full_like(img_bgr, fill_value=0)
            # print(uv_int)
            # print(uv_colors_bgr)
            # print(img_empty.shape, uv_int.shape, uv_colors_bgr.shape)
            # print(img_empty[uv_int])
            # img_empty[uv_int] = uv_colors_bgr
            # print(img_empty)
            # img_empty = raster_rendering_utils.draw_points_xy_in_img(
            #     img_empty, uv_int, uv_colors_bgr, diameter=5, with_anti_alias=False, sigma=1.0, alpha=0.0
            # )
            # blended_bgr = raster_utils.blend_images(img_bgr, img_empty)
            # frame_rgb = blended_bgr[:, :, ::-1]

            blended_bgr = raster_rendering_utils.draw_points_xy_in_img(
                img_bgr, uv_int, uv_colors_bgr, diameter=5, with_anti_alias=False, sigma=1.0, alpha=0.0
            )
            # blended_bgr = raster_utils.blend_images(img_bgr, img_empty)
            frame_rgb = blended_bgr[:, :, ::-1]

            if dump_single_frames:
                save_dir = output_dir / log_id / cam_name
                os.makedirs(save_dir, exist_ok=True)
                cv2.imwrite(
                    str(save_dir / f"{cam_name}_{lidar_timestamp_ns}.jpg"), blended_bgr
                )

            video_list.append(frame_rgb)

        if len(video_list) == 0:
            raise RuntimeError(
                "No video frames were found; log data was not found on disk."
            )

        video: NDArrayByte = np.stack(video_list).astype(np.uint8)
        video_output_dir = Path(output_dir) / Path("videos")
        video_utils.write_video(
            video=video,
            dst=video_output_dir / f"{log_id}_{cam_name}.mp4",
            fps=RING_CAMERA_FPS,
        )

In [5]:
# path to where the logs live
dataroot = Path("av2-api/tests/unit/test_data/sensor/test")

# unique log identifier
log_id = Path("7fab2350-7eaf-3b7e-a39d-6937a4c1bede")

NUM_RANGE_BINS: Final[int] = 50
RING_CAMERA_FPS: Final[int] = 20

generate_egoview_overlaid_lidar(
        data_root=dataroot,
        output_dir="./av2_out",
        log_id=log_id,
        render_ground_pts_only=False,
        dump_single_frames=False,
    )

[<RingCameras.RING_REAR_LEFT: 'ring_rear_left'>, <RingCameras.RING_SIDE_LEFT: 'ring_side_left'>, <RingCameras.RING_FRONT_LEFT: 'ring_front_left'>, <RingCameras.RING_FRONT_CENTER: 'ring_front_center'>, <RingCameras.RING_FRONT_RIGHT: 'ring_front_right'>, <RingCameras.RING_SIDE_RIGHT: 'ring_side_right'>, <RingCameras.RING_REAR_RIGHT: 'ring_rear_right'>]
<enum 'RingCameras'>


RuntimeError: No video frames were found; log data was not found on disk.