# Multi-sensor Rendering

[![Click and Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/metadriverse/metadrive/blob/main/documentation/source/simgen_render.ipynb)


In our [SimGen project](https://metadriverse.github.io/simgen/), we use MetaDrive's ScenarioEnv to generate multiple sensor rendering, namely the Semantic Map, Depth Image and RGB Image, to condition the SimGen model.

In this example, we will demonstrate the minimal example on how to achive this in MetaDrive's side.

The code working with SimGen can be found [here](https://github.com/metadriverse/SimGen/blob/0daf05b7f0a8d2a582cc4e054720b60f328fea50/metadrive_simgen.py).


### Utilities

In [1]:
import time

import cv2
import gymnasium as gym
import mediapy as media
import numpy as np
import tqdm
from PIL import Image
from PIL import ImageDraw, ImageFont
from metadrive.component.sensors.depth_camera import DepthCamera
from metadrive.component.sensors.rgb_camera import RGBCamera
from metadrive.component.sensors.semantic_camera import SemanticCamera
from metadrive.engine.asset_loader import AssetLoader
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.obs.image_obs import ImageObservation
from metadrive.obs.observation_base import BaseObservation
from metadrive.policy.replay_policy import ReplayEgoCarPolicy


def postprocess_semantic_image(image):
    """
    In order to align with the Segformer's output, we modify the output color of the semantic image from MetaDrive.
    """
    # customized
    old_LANE_LINE = (255, 255, 255)
    old_CROSSWALK = (55, 176, 189)

    new_LANE_LINE = (128, 64, 128)
    new_CROSSWALK = (128, 64, 128)

    # Change the color of the lane line and crosswalk
    assert image.dtype == np.uint8

    is_lane_line = (
            (image[..., 0] == old_LANE_LINE[0]) &
            (image[..., 1] == old_LANE_LINE[1]) &
            (image[..., 2] == old_LANE_LINE[2])
    )
    image[is_lane_line] = new_LANE_LINE

    is_crosswalk = (
            (image[..., 0] == old_CROSSWALK[0]) &
            (image[..., 1] == old_CROSSWALK[1]) &
            (image[..., 2] == old_CROSSWALK[2])
    )
    image[is_crosswalk] = new_CROSSWALK

    return image


### Customized Observation Class

In [3]:
class SimGenObservation(BaseObservation):
    def __init__(self, config):
        super(SimGenObservation, self).__init__(config)
        assert config["norm_pixel"] is False
        assert config["stack_size"] == 1
        self.seg_obs = ImageObservation(config, "seg_camera", config["norm_pixel"])
        self.rgb_obs = ImageObservation(config, "rgb_camera", config["norm_pixel"])
        self.depth_obs = ImageObservation(config, "depth_camera", config["norm_pixel"])

    @property
    def observation_space(self):
        os = dict(
            rgb=self.rgb_obs.observation_space,
            seg=self.seg_obs.observation_space,
            depth=self.depth_obs.observation_space,
        )
        return gym.spaces.Dict(os)

    def observe(self, vehicle):
        ret = {}

        seg_cam = self.engine.get_sensor("seg_camera").cam
        agent = seg_cam.getParent()
        original_position = seg_cam.getPos()
        heading, pitch, roll = seg_cam.getHpr()
        seg_img = self.seg_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert seg_img.ndim == 4
        assert seg_img.shape[-1] == 1
        assert seg_img.dtype == np.uint8
        # Do some postprocessing here
        seg_img = seg_img[..., 0]
        before = seg_img.copy()
        seg_img = postprocess_semantic_image(seg_img[..., ::-1]) # BGR -> RGB and postprocess
        ret["seg"] = seg_img

        depth_cam = self.engine.get_sensor("depth_camera").cam
        agent = depth_cam.getParent()
        original_position = depth_cam.getPos()
        heading, pitch, roll = depth_cam.getHpr()
        depth_img = self.depth_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert depth_img.ndim == 4
        assert depth_img.shape[-1] == 1
        assert depth_img.dtype == np.uint8
        depth_img = depth_img[..., 0]
        # before = depth_img.copy()
        depth_img = cv2.bitwise_not(depth_img)
        depth_img = depth_img[..., None]
        ret["depth"] = depth_img

        rgb_cam = self.engine.get_sensor("rgb_camera").cam
        agent = rgb_cam.getParent()
        original_position = rgb_cam.getPos()
        heading, pitch, roll = rgb_cam.getHpr()
        rgb_img = self.rgb_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert rgb_img.ndim == 4
        assert rgb_img.shape[-1] == 1
        assert rgb_img.dtype == np.uint8
        rgb_img = rgb_img[..., 0]
        # Change the color from BGR to RGB
        rgb_img = rgb_img[..., ::-1]
        ret["rgb"] = rgb_img

        return ret


### Setup ScenarioEnv

In [4]:
# ===== MetaDrive Setup =====
import os
sensor_size = (80, 45) if os.getenv('TEST_DOC') else (800, 450)

env = ScenarioEnv(
    {
        'agent_observation': SimGenObservation,

        # To enable onscreen rendering, set this config to True.
        "use_render": False,

        # !!!!! To enable offscreen rendering, set this config to True !!!!!
        "image_observation": True,

        "norm_pixel": False,
        "stack_size": 1,

        # ===== The scenario and MetaDrive config =====
        "agent_policy": ReplayEgoCarPolicy,
        "no_traffic": False,
        "sequential_seed": True,
        "reactive_traffic": False,
        "num_scenarios": 9,
        "horizon": 1000,
        "no_static_vehicles": False,
        "agent_configs": {
            "default_agent": dict(use_special_color=True, vehicle_model="varying_dynamics_bounding_box")
        },
        "vehicle_config": dict(
            show_navi_mark=False,
            show_line_to_dest=False,
            lidar=dict(num_lasers=120, distance=50),
            lane_line_detector=dict(num_lasers=0, distance=50),
            side_detector=dict(num_lasers=12, distance=50),
        ),
        # "use_bounding_box": True,
        "data_directory": AssetLoader.file_path("nuscenes", unix_style=False),
        "height_scale": 1,

        "set_static": True,

        # ===== Set some sensor and visualization configs =====
        "daytime": "08:10",
        "window_size": (sensor_size[0], sensor_size[1]),
        "camera_dist": 0.8,  # 0.8, 1.71
        "camera_height": 1.5,  # 1.5
        "camera_pitch": None,
        "camera_fov": 66,  # 60, 66
        "sensors": dict(
            depth_camera=(DepthCamera, sensor_size[0], sensor_size[1]),
            rgb_camera=(RGBCamera, sensor_size[0], sensor_size[1]),
            seg_camera=(SemanticCamera,sensor_size[0], sensor_size[1]),
        ),

        # ===== Remove useless items in the images =====
        "show_logo": False,
        "show_fps": False,
        "show_interface": True,
        "disable_collision": True,
        "force_destroy": True,
    }
)

[38;20m[INFO] Environment: ScenarioEnv[0m
[38;20m[INFO] MetaDrive version: 0.4.3[0m
[38;20m[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector(), depth_camera: DepthCamera(800, 450), rgb_camera: RGBCamera(800, 450), seg_camera: SemanticCamera(800, 450)][0m
[38;20m[INFO] Render Mode: offscreen[0m
[38;20m[INFO] Horizon (Max steps per agent): 1000[0m


### Rollout

In [5]:
skip_steps = 1
fps = 10

frames = []

try:
    env.reset()
    scenario = env.engine.data_manager.current_scenario
    scenario_id = scenario['id']
    print(
        "Current scenario ID {}, dataset version {}, len: {}".format(
            scenario_id, scenario['version'], scenario['length']
        )
    )
    horizon = scenario['length']
    for t in tqdm.trange(horizon):
        o, r, d, _, _ = env.step([1, 0.88])
        if t % skip_steps == 0:
            depth_img = Image.fromarray(o["depth"].repeat(3, axis=-1), mode="RGB")
            seg_img = Image.fromarray(o["seg"], mode="RGB")
            rgb_img = Image.fromarray(o["rgb"], mode="RGB")
    
            vis = cv2.hconcat([o["seg"], o["depth"].repeat(3, axis=-1)])
            h, w, _ = o["rgb"].shape
            vis_w = vis.shape[1]
            image = cv2.resize(o["rgb"], (vis_w, int(h * vis_w / w)))
            vis = cv2.vconcat([vis, image])
    
            # Quick visualization:
            # import matplotlib.pyplot as plt;plt.imshow(vis);plt.show()
            frames.append(vis)
finally:
    env.close()

[38;20m[INFO] Assets version: 0.4.3[0m
[38;20m[INFO] Known Pipes: glxGraphicsPipe[0m
[38;20m[INFO] Assets version: 0.4.3[0m
[38;20m[INFO] Known Pipes: glxGraphicsPipe[0m
[38;20m[INFO] Start Scenario Index: 0, Num Scenarios : 9[0m


Current scenario ID scene-0061, dataset version nuscenesv1.0-mini, len: 191


 87%|████████▋ | 167/191 [00:16<00:01, 14.44it/s][38;20m[INFO] Episode ended! Scenario Index: 0 Scenario id: scene-0061 Reason: arrive_dest.[0m
100%|██████████| 191/191 [00:17<00:00, 10.63it/s]


In [6]:
# Show video
media.show_video(frames, fps=fps, width=600)

0
This browser does not support the video tag.
