<img src="media/nuplan.png" alt="Drawing" style="width: 700px;"/>

# Creating a new planner in nuPlan <a name="introduction"></a>


Welcome to nuPlan! In this tutorial we provide an example of implementing a new planner and running it in simulation.

## Setup

To be able to access all resources within this notebook, make sure Jupyter is launched at the root of this repo. The path of the notebook should be `~/nuplan-devkit/nuplan/tutorials

In [None]:
# Increase notebook width
from IPython.core.display import display, HTML
display(HTML("<style>.output_result { max-width:100% !important; }</style>"))
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
# Useful imports
import os
from pathlib import Path
import tempfile

import hydra

# 1) Creating the planner <a name="simulation"></a>

We use the example of the SimplePlanner class to demonstrate creating a planner in nuPlan.

Inheriting from AbstractPlanner, we implement methods for initialization, planner naming, and declaring the type of observations expected by compute_trajectory. We additionally implement a compute_trajectory method, the core behavior of the planner. 

In the case of SimplePlanner, we will just have the trajectory go straight. To accomplish this, we rely on a helper function, &lowbar;f_dot_kinematic_car, for computing the motion model.

In [None]:
from typing import List, Type

import numpy as np
import numpy.typing as npt

from nuplan.actor_state.ego_state import DynamicCarState, EgoState
from nuplan.actor_state.state_representation import StateSE2, StateVector2D, TimePoint
from nuplan.actor_state.vehicle_parameters import VehicleParameters, get_pacifica_parameters
from nuplan.maps.abstract_map import AbstractMap
from nuplan.planning.simulation.history.simulation_history_buffer import SimulationHistoryBuffer
from nuplan.planning.simulation.observation.observation_type import Detections, Observation
from nuplan.planning.simulation.planner.abstract_planner import AbstractPlanner
from nuplan.planning.simulation.simulation_manager.simulation_iteration import SimulationIteration
from nuplan.planning.simulation.trajectory.interpolated import InterpolatedTrajectory
from nuplan.planning.simulation.trajectory.trajectory import AbstractTrajectory


def _f_dot_kinematic_car(state: EgoState, vehicle: VehicleParameters) -> EgoState:
    """
    Compute x_dot = f(x) for a kinematic car

    :param state for which to compute motion model
    :param vehicle parameters
    """
    lf = vehicle.front_length - vehicle.cog_position_from_rear_axle
    lr = vehicle.rear_length + vehicle.cog_position_from_rear_axle
    beta = np.arctan2(lr * np.tan(state.tire_steering_angle), (lr + lf))

    car_speed = np.hypot(state.dynamic_car_state.rear_axle_velocity_2d.x,
                         state.dynamic_car_state.rear_axle_velocity_2d.y)

    return EgoState.from_raw_params(
        pose=StateSE2(x=car_speed * np.cos(beta + state.rear_axle.heading),
                      y=car_speed * np.sin(beta + state.rear_axle.heading),
                      heading=(car_speed / lr) * np.sin(beta)),
        velocity_2d=state.dynamic_car_state.rear_axle_acceleration_2d,
        acceleration_2d=StateVector2D(0.0, 0.0),
        tire_steering_angle=0.0,
        time_point=state.time_point)


class SimplePlanner(AbstractPlanner):
    """
    Planner going straight
    """

    def __init__(self,
                 horizon_seconds: float = 10.0,
                 sampling_time: float = 0.25,
                 acceleration: npt.NDArray[np.float32] = [0.0, 0.0],  # x (longitudinal), y (lateral)
                 max_velocity: float = 5.0,
                 steering_angle: float = 0.0):
        self.horizon_seconds = horizon_seconds
        self.sampling_time = sampling_time
        self.acceleration = StateVector2D(acceleration[0], acceleration[1])
        self.max_velocity = max_velocity
        self.steering_angle = steering_angle

    def initialize(self,
                   expert_goal_state: StateSE2,
                   mission_goal: StateSE2,
                   map_name: str,
                   map_api: AbstractMap) -> None:
        """ Inherited, see superclass. """
        pass

    def name(self) -> str:
        """ Inherited, see superclass. """
        return self.__class__.__name__

    def observation_type(self) -> Type[Observation]:
        """ Inherited, see superclass. """
        return Detections  # type: ignore

    def compute_trajectory(self, iteration: SimulationIteration,
                           history: SimulationHistoryBuffer) -> AbstractTrajectory:
        """
        Implement a trajectory that goes straight.
        Inherited, see superclass.
        """
        ego_state = history.ego_states[-1]
        state = EgoState(car_footprint=ego_state.car_footprint,
                         dynamic_car_state=DynamicCarState(ego_state.car_footprint.rear_axle_to_center_dist,
                                                           ego_state.dynamic_car_state.rear_axle_velocity_2d,
                                                           self.acceleration),
                         tire_steering_angle=self.steering_angle,
                         time_point=ego_state.time_point)
        trajectory: List[EgoState] = [state]
        for time_in_future in np.arange(iteration.time_us + self.sampling_time * 1e6,
                                        iteration.time_us + self.horizon_seconds * 1e6,
                                        self.sampling_time * 1e6):

            state_dot = _f_dot_kinematic_car(state, get_pacifica_parameters())
            next_point_x = state.rear_axle.x + state_dot.rear_axle.x * self.sampling_time
            next_point_y = state.rear_axle.y + state_dot.rear_axle.y * self.sampling_time
            next_point_heading = state.rear_axle.heading + state_dot.rear_axle.heading * self.sampling_time

            if state.dynamic_car_state.speed > self.max_velocity:
                state_dot.velocity_x = 0.0
                state_dot.velocity_y = 0.0

            next_point_velocity_x = np.fmax(0, state.dynamic_car_state.rear_axle_velocity_2d.x +
                                            state_dot.dynamic_car_state.rear_axle_acceleration_2d.x *
                                            self.sampling_time)
            next_point_velocity_y = np.fmax(0,
                                            state.dynamic_car_state.rear_axle_velocity_2d.y +
                                            state_dot.dynamic_car_state.rear_axle_acceleration_2d.y *
                                            self.sampling_time)
            state = EgoState.from_raw_params(
                StateSE2(next_point_x, next_point_y, next_point_heading),
                velocity_2d=StateVector2D(next_point_velocity_x,
                                          next_point_velocity_y),
                acceleration_2d=state.dynamic_car_state.rear_axle_acceleration_2d,
                tire_steering_angle=state.tire_steering_angle,
                time_point=TimePoint(time_in_future))

            trajectory.append(state)

        return InterpolatedTrajectory(trajectory)

# 2) Simulating the planner <a name="simulation"></a>

To run this planner in simulation, we prepare a simulation config file,

In [None]:
# Location of paths with all simulation configs
BASE_CONFIG_PATH = os.path.join(os.getenv('NUPLAN_TUTORIAL_PATH', ''), '../nuplan/planning/script')
COMMON_DIR = "file://" + os.path.join(BASE_CONFIG_PATH, 'config', 'common')
EXPERIMENT_DIR = "file://" + os.path.join(BASE_CONFIG_PATH, 'experiments')
CONFIG_PATH = os.path.join(BASE_CONFIG_PATH, 'config', 'simulation')
CONFIG_NAME = 'default_simulation'

# Create a temporary directory to store the simulation artifacts
SAVE_DIR = tempfile.mkdtemp()

# Select simulation parameters
EGO_CONTROLLER = 'log_play_back_controller'  # [log_play_back_controller, perfect_tracking_controller]
OBSERVATION = 'box_observation'  # [box_observation, idm_agents_observation, lidar_pc_observation]
DATASET_PARAMS = [
    'scenario_builder=nuplan_mini',  # use nuplan mini database (2.5h of 8 autolabeled logs in Las Vegas)
    'scenario_builder/nuplan/scenario_filter=one_continuous_log',  # simulate only one log
    'scenario_builder.nuplan.scenario_filter.log_names="[2021.05.27.14.27.08_g1p-veh-2035]"',  # select one log from test set of nuplan mini
    'scenario_builder.nuplan.scenario_filter.limit_scenarios_per_type=50',  # use 50 scenarios per type
    'scenario_builder.nuplan.scenario_filter.subsample_ratio=0.05',  # subsample 20s scenario from 20Hz to 1Hz
]

# Initialize configuration management system
hydra.core.global_hydra.GlobalHydra.instance().clear()  # reinitialize hydra if already initialized
hydra.initialize(config_path=CONFIG_PATH)

# Compose the configuration
cfg = hydra.compose(config_name=CONFIG_NAME, overrides=[
    f'group={SAVE_DIR}',
    f'experiment_name=planner_tutorial',
    f'ego_controller={EGO_CONTROLLER}',
    f'observation={OBSERVATION}',
    f'hydra.searchpath=[{COMMON_DIR}, {EXPERIMENT_DIR}]',
    *DATASET_PARAMS,
])

...instantiate the planner,

In [None]:
planner = SimplePlanner()

...and launch the simulation.

In [None]:
from nuplan.planning.script.run_simulation import run_simulation as main_simulation


# Run the simulation loop (real-time visualization not yet supported, see next section for visualization)
main_simulation(cfg, planner)

# Fetch the filesystem location of the simulation results file for visualization in nuBoard (next section)
results_dir = list(list(Path(SAVE_DIR).iterdir())[0].iterdir())[0]  # get the child dir 2 levels in
simulation_file = [str(file) for file in results_dir.iterdir() if file.is_file() and file.suffix == '.nuboard'][0]

# 3) Visualization <a name="dashboard"></a>

To visualize and evaluate the planner using nuBoard, we first prepare the nuBoard config,

In [None]:
# Location of path with all nuBoard configs
CONFIG_PATH = os.path.join(BASE_CONFIG_PATH, 'config/nuboard')
CONFIG_NAME = 'default_nuboard'


# Initialize configuration management system
hydra.core.global_hydra.GlobalHydra.instance().clear()  # reinitialize hydra if already initialized
hydra.initialize(config_path=CONFIG_PATH)

# Compose the configuration
cfg = hydra.compose(config_name=CONFIG_NAME, overrides=[
    'scenario_builder=nuplan_mini',  # set the database (same as simulation) used to fetch data for visualization
    f'simulation_path={simulation_file}',  # nuboard file path, if left empty the user can open the file inside nuBoard
    # Make sure that the notebook working directory is "/notebooks" and that Jupyter was launched at the root of the repo
    'resource_prefix=/notebooks/nuplan/planning/nuboard/',  # hack to pass CSS resources to the notebook
    f'hydra.searchpath=[{COMMON_DIR}, {EXPERIMENT_DIR}]',
])

...and then launch nuBoard.

In [None]:
from bokeh.io import show, output_notebook
from nuplan.planning.script.run_nuboard import initialize_nuboard
from nuplan.planning.script.run_nuboard import main as main_nuboard

EMBED_NUBOARD = True  # set to False to launch in separate window

# embed nuBoard in notebook
if EMBED_NUBOARD:
    # Run the nuBoard
    output_notebook()
    nuboard = initialize_nuboard(cfg)
    show(nuboard.main_page)

# launch nuBoard in separate window
else:
    # Run nuBoard
    main_nuboard(cfg)