## Dependencies

In [None]:
!pip install -q gymnasium stable_baselines3

In [None]:
!git clone https://gitlab.unimelb.edu.au/ai4r/ai4r-gym.git

In [None]:
%cd ai4r-gym

## Library Imports

In [None]:
import ai4rgym
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from stable_baselines3 import PPO, SAC, DDPG
from utils import ensure_dir, ensure_dirs, eval_model, evaluate_policy
from utils import plot_rewards, plot_and_animate_trajectory

## Environment Settings

### Specify the Vehicle Parameters

Dictionary with car specifications,
in the form of a dynamic bicycle model

In [None]:
# SPECIFY THE VEHCILE PARAMETERS
bicycle_model_parameters = {
    "Lf" : 0.55*2.875,
    "Lr" : 0.45*2.875,
    "m"  : 2000.0,
    "Iz" : (1.0/12.0) * 2000.0 * (4.692**2+1.850**2),
    "Cm" : (1.0/100.0) * (1.0 * 400.0 * 9.0) / 0.2286,
    "Cd" : 0.5 * 0.24 * 2.2204 * 1.202,
    "delta_offset" : 0 * np.pi/180,
    "delta_request_max" : 45 * np.pi/180,
    "Ddelta_lower_limit" : -45 * np.pi/180,
    "Ddelta_upper_limit" :  45 * np.pi/180,
    "v_transition_min" : 500.0 / 3.6,
    "v_transition_max" : 600.0 / 3.6,
    "body_len_f" : (0.55*2.875) * 1.5,
    "body_len_r" : (0.45*2.875) * 1.5,
    "body_width" : 2.50,
}

### Specify the Road


Specified as a list of dictionaries, where each
element in the list specifies a segment of the road.
Example segment dictionaries:
```Python
{"type":"straight", "length":3.0}
{"type":"curved", "curvature":1/50.0, "angle_in_degrees":45.0}
{"type":"curved", "curvature":1/50.0, "length":30.0}
```

In [None]:
road_elements_list = [
    {"type":"straight", "length":100.0, "v_max_kph":60.0},
    {"type":"curved", "curvature":1/800.0, "angle_in_degrees":15.0, "v_max_kph":50.0},
    {"type":"straight", "length":100.0, "v_max_kph":60.0},
    {"type":"curved", "curvature":-1/400.0, "angle_in_degrees":30.0, "v_max_kph":40.0},
    {"type":"straight", "length":100.0, "v_max_kph":60.0},
]

### Specify the Numerical Integration Details

The options available for the numerical
integration method are:
`["euler", "huen", "midpoint", "rk4", "rk45"]`


In [None]:
numerical_integration_parameters = {
    "method" : "rk4",
    "Ts" : 0.05,
    "num_steps_per_Ts" : 1,
}

###  Specify the Initial State Distribution

The initial state is sampled from a uniform
distribution between the minimum and maximum
(i.e., between lower and upper bounds)
> Note: a factor of (1/3.6) converts from units of [km/h] to [m/s]

In [None]:
py_init_min = -1.0
py_init_max =  1.0

v_init_min_in_kmh = 55.0
v_init_max_in_kmh = 65.0

py_init_min = -1.0
py_init_max =  1.0

v_init_min_in_kmh = 55.0
v_init_max_in_kmh = 65.0

initial_state_bounds = {
    "px_init_min" : 0.0,
    "px_init_max" : 0.0,
    "py_init_min" : py_init_min,
    "py_init_max" : py_init_max,
    "theta_init_min" : 0.0,
    "theta_init_max" : 0.0,
    "vx_init_min" : v_init_min_in_kmh * (1.0/3.6),
    "vx_init_max" : v_init_max_in_kmh * (1.0/3.6),
    "vy_init_min" : 0.0,
    "vy_init_max" : 0.0,
    "omega_init_min" : 0.0,
    "omega_init_max" : 0.0,
    "delta_init_min" : 0.0,
    "delta_init_max" : 0.0,
}

### Specify the Observation Parameters

In [None]:
# SPECIFY THE OBSERVATION PARAMETERS
observation_parameters = {
    "should_include_ground_truth_px"                       :  "info",
    "should_include_ground_truth_py"                       :  "info",
    "should_include_ground_truth_theta"                    :  "info",
    "should_include_ground_truth_vx"                       :  "info",
    "should_include_ground_truth_vy"                       :  "info",
    "should_include_ground_truth_omega"                    :  "info",
    "should_include_ground_truth_delta"                    :  "info",
    "should_include_road_progress_at_closest_point"        :  "info",
    "should_include_vx_sensor"                             :  "info",
    "should_include_distance_to_closest_point"             :  "obs",
    "should_include_heading_angle_relative_to_line"        :  "obs",
    "should_include_heading_angular_rate_gyro"             :  "info",
    "should_include_closest_point_coords_in_body_frame"    :  "info",
    "should_include_look_ahead_line_coords_in_body_frame"  :  "info",
    "should_include_road_curvature_at_closest_point"       :  "obs",
    "should_include_look_ahead_road_curvatures"            :  "info",
    "should_include_speed_limit"                           :  "obs",
    "should_include_recommended_speed"                     :  "obs",
    

    "scaling_for_ground_truth_px"                       :  1.0,
    "scaling_for_ground_truth_py"                       :  1.0,
    "scaling_for_ground_truth_theta"                    :  1.0,
    "scaling_for_ground_truth_vx"                       :  1.0,
    "scaling_for_ground_truth_vy"                       :  1.0,
    "scaling_for_ground_truth_omega"                    :  1.0,
    "scaling_for_ground_truth_delta"                    :  1.0,
    "scaling_for_road_progress_at_closest_point"        :  1.0,
    "scaling_for_vx_sensor"                             :  1.0,
    "scaling_for_distance_to_closest_point"             :  1.0,
    "scaling_for_heading_angle_relative_to_line"        :  1.0,
    "scaling_for_heading_angular_rate_gyro"             :  1.0,
    "scaling_for_closest_point_coords_in_body_frame"    :  1.0,
    "scaling_for_look_ahead_line_coords_in_body_frame"  :  1.0,
    "scaling_for_road_curvature_at_closest_point"       :  1.0,
    "scaling_for_look_ahead_road_curvatures"            :  1.0,
    "scaling_for_speed_limit"                           :  1.0,
    "scaling_for_recommended_speed"                     :  1.0,

    "vx_sensor_bias"    : 0.0,
    "vx_sensor_stddev"  : 0.1,

    "distance_to_closest_point_bias"    :  0.0,
    "distance_to_closest_point_stddev"  :  0.01,

    "heading_angle_relative_to_line_bias"    :  0.0,
    "heading_angle_relative_to_line_stddev"  :  0.01,

    "heading_angular_rate_gyro_bias"    :  0.0,
    "heading_angular_rate_gyro_stddev"  :  0.01,

    "closest_point_coords_in_body_frame_bias"    :  0.0,
    "closest_point_coords_in_body_frame_stddev"  :  0.0,

    "look_ahead_line_coords_in_body_frame_bias"    :  0.0,
    "look_ahead_line_coords_in_body_frame_stddev"  :  0.0,

    "road_curvature_at_closest_point_bias"    :  0.0,
    "road_curvature_at_closest_point_stddev"  :  0.0,

    "look_ahead_road_curvatures_bias"    :  0.0,
    "look_ahead_road_curvatures_stddev"  :  0.0,

    "look_ahead_line_coords_in_body_frame_distance"    :  100.0,
    "look_ahead_line_coords_in_body_frame_num_points"  :  10,
}

### Specify Termination Parameters

In [None]:
termination_parameters = {
    "speed_lower_bound"  :  0.0,
    "speed_upper_bound"  :  (200.0/3.6),
    "distance_to_closest_point_upper_bound"  :  20.0,
    "reward_speed_lower_bound"  :  0.0,
    "reward_speed_upper_bound"  :  0.0,
    "reward_distance_to_closest_point_upper_bound"  :  0.0,
}

## Environment Definition

### Initialise the Autonomous Driving Environment

Options available for the "render_mode" are:
`["matplotlib", None]`

In [None]:
env = gym.make(
    "ai4rgym/autonomous_driving_env",
    render_mode=None,
    bicycle_model_parameters=bicycle_model_parameters,
    road_elements_list=road_elements_list,
    numerical_integration_parameters=numerical_integration_parameters,
    termination_parameters=termination_parameters,
    initial_state_bounds=initial_state_bounds,
    observation_parameters=observation_parameters,
)

### Additional Setup

In [None]:
# > Time increment per simulation step (units: seconds)
Ts_sim = 0.05

# Specify the integration method to simulate
integration_method = "rk4"

# Set the integration method and Ts of the gymnasium
env.unwrapped.set_integration_method(integration_method)
env.unwrapped.set_integration_Ts(Ts_sim)
# Set the road condition
env.unwrapped.set_road_condition(road_condition="wet")

env = gym.wrappers.RescaleAction(env, min_action=-1, max_action=1)

### Verify Environment

In [None]:
# Reset the gymnasium
# > which also returns the first observation
observation, info_dict = env.reset()

In [None]:
observation

In [None]:
info_dict

In [None]:
env.observation_space

In [None]:
env.action_space

In [None]:
random_action = env.action_space.sample()

In [None]:
observation, reward, terminated, truncated, info = env.step(random_action)

In [None]:
observation

In [None]:
reward

In [None]:
info

### Reward Wrapper

In [None]:
from typing import Optional


class CustomRewardWrapper(gym.Wrapper):
    """
    A custom wrapper that reconstructs the environment's previous reward logic
    (progress, deviation from line, speed shaping, and termination rewards)
    while the base environment returns zero reward. This keeps reward shaping
    out of the env and makes it configurable/replaceable.
    """

    def __init__(self, env: gym.Env, cfg: Optional[dict] = None):
        super().__init__(env)
        self.cfg = cfg or {}
        # Coefficients for previous env reward composition
        # Previous env used: 0.0*progress + 100.0*deviation + 1.0*speed + termination
        # Maintain same defaults here; allow override via cfg
        self.k_progress = self.cfg.get("k_progress", 0.0)
        self.k_deviation = self.cfg.get("k_deviation", 100.0)
        self.k_speed = self.cfg.get("k_speed", 1.0)
        # Bonus when finished (not present previously, default 0)
        self.finished_bonus = self.cfg.get("finished_bonus", 0.0)
        # Timeout penalty if a TimeLimit truncation occurs
        self.timeout_penalty = self.cfg.get("timeout_penalty", 0.0)

    # Standalone re-implementation of previous reward shaping terms
    @staticmethod
    def reward_for_distance_to_line(d: float) -> float:
        """
        Mirror AutonomousDrivingEnv.compute_default_reward_for_distance_to_line
        d >= 0: distance magnitude to center line.
        """
        a = 3.0
        b = 1.0
        c = 2.0
        if d < 0.5:
            return a * (1 - d**2)
        elif 0.5 <= d < 2:
            return b * (2 - d) ** 2
        else:
            return -c * (d - 2) ** 3

    @staticmethod
    def reward_for_speed(speed_mps: float, recommended_speed_mps: float) -> float:
        """
        Quadratic speed reward centered on the road’s recommended speed.

        - Inputs and output use SI units (m/s).
        - Returns a high reward near the recommended speed and decreases
          quadratically as the measured speed deviates from it.
        
        NOTE: you could optionally factor in the speed limit to apply 
        an additional penalty for exceeding it.
        """
        # If no recommendation is available, return neutral reward (0.0)
        if not (recommended_speed_mps is not None) or recommended_speed_mps <= 0:
            return 0.0
        a = 6.0 / 1000.0  # ≈ 0.006, a simple fraction that works well
        b = 300.0
        dv = float(speed_mps) - float(recommended_speed_mps)
        return -a * (dv ** 2) + b

    def step(self, action):
        obs, base_r, terminated, truncated, info = self.env.step(action)
        # Base env reward is intentionally zero; we reconstruct externally
        r = 0.0

        # We need values used by the original env reward terms.
        # These are available via env.current_ground_truth and the car state.
        # Access safely; if missing, default to zeros.
        gt = getattr(self.env, "current_ground_truth", {}) or {}
        progress_at_closest_point = float(gt.get("road_progress_at_closest_point", 0.0))
        distance_to_closest_point = float(abs(gt.get("distance_to_closest_point", 0.0)))

        # The env used previous_progress_at_closest_p internally; in the wrapper
        # we track our own previous progress to compute progress_reward.
        if not hasattr(self, "_prev_progress"):
            self._prev_progress = progress_at_closest_point
        progress_reward = progress_at_closest_point - self._prev_progress
        self._prev_progress = progress_at_closest_point

        # Speed shaping (m/s) relative to road recommended speed (m/s)
        vx_mps = float(getattr(getattr(self.env, "car", None), "vx", 0.0))
        recommended_speed_mps = float(gt.get("recommended_speed_at_closest_point", 0.0))
        # recommended_speed_mps = 60.0 / 3.6

        deviation_reward = self.reward_for_distance_to_line(distance_to_closest_point)
        speed_reward = self.reward_for_speed(vx_mps, recommended_speed_mps)

        r += self.k_progress * progress_reward
        r += self.k_deviation * deviation_reward
        r += self.k_speed * speed_reward

        # Termination-based rewards/penalties
        term = info.get("termination", {}) if isinstance(info, dict) else {}
        term_cfg = info.get("termination_rewards", {}) if isinstance(info, dict) else {}
        if terminated:
            if term.get("speed_high"):
                r += float(term_cfg.get("speed_upper_bound", 0.0))
            if term.get("speed_low"):
                r += float(term_cfg.get("speed_lower_bound", 0.0))
            if term.get("off_track"):
                r += float(term_cfg.get("off_track", 0.0))
            if term.get("finished"):
                r += float(self.finished_bonus)

        # Time-limit truncation (Gymnasium's TimeLimit sets this flag in info)
        if truncated and isinstance(info, dict) and info.get("TimeLimit.truncated", False):
            r += float(self.timeout_penalty)

        return obs, r, terminated, truncated, info

    def reset(self, **kwargs):
        res = self.env.reset(**kwargs)
        # Reset wrapper progress tracker
        self._prev_progress = 0.0
        # After reset, try to align with current env ground-truth if available
        gt = getattr(self.env, "current_ground_truth", {}) or {}
        self._prev_progress = float(gt.get("road_progress_at_closest_point", 0.0))
        return res

Create a new environment and wrap it with the Reward Wrapper

In [None]:
env = gym.make(
    "ai4rgym/autonomous_driving_env",
    render_mode=None,
    bicycle_model_parameters=bicycle_model_parameters,
    road_elements_list=road_elements_list,
    numerical_integration_parameters=numerical_integration_parameters,
    termination_parameters=termination_parameters,
    initial_state_bounds=initial_state_bounds,
    observation_parameters=observation_parameters,
)

# > Time increment per simulation step (units: seconds)
Ts_sim = 0.05

# Specify the integration method to simulate
integration_method = "rk4"

# Set the integration method and Ts of the gymnasium
env.unwrapped.set_integration_method(integration_method)
env.unwrapped.set_integration_Ts(Ts_sim)
# Set the road condition
env.unwrapped.set_road_condition(road_condition="wet")

env = gym.wrappers.RescaleAction(env, min_action=-1, max_action=1)

cfg = { 'k_deviation': 100.0, 'k_speed': 1.0}
env = CustomRewardWrapper(env, cfg=cfg)

Let's check the reward now:

In [None]:
observation, info = env.reset()
random_action = env.action_space.sample()
observation, reward, terminated, truncated, info = env.step(random_action)
print(reward)

## Model Definition and Setup

In [None]:
model_name = "PPO_SL"

TIMESTEPS_PER_EPOCH = 50000
N_EPOCHS = 10

logdir = "logs"
models_dir = f"models/{model_name}"
figs_dir = f"models/{model_name}/figs"

ensure_dirs([logdir, models_dir, figs_dir])

In [None]:
model = PPO("MultiInputPolicy", env, verbose = 1)
#model = SAC("MultiInputPolicy", env, verbose = 1)
#model = DDPG("MultiInputPolicy", env, verbose = 1)
#model = TD3("MultiInputPolicy", env, verbose = 1)
#model = A2C("MultiInputPolicy", env, verbose = 1)

## Training Loop

In [None]:
model = PPO("MultiInputPolicy", env, verbose = 1, tensorboard_log=logdir)
epoch = 1
N_EPOCHS = 20

for i in range(epoch, N_EPOCHS + 1):
    model.learn(
        total_timesteps=TIMESTEPS_PER_EPOCH,
        reset_num_timesteps=False,
        tb_log_name=f"{model_name}"
    )
    model.save(f"{models_dir}/{TIMESTEPS_PER_EPOCH * i}")
    eval_model(env, model, figs_dir, TIMESTEPS_PER_EPOCH * i)
    epoch += 1

## Evaluating the Policy

In [None]:
## TODO: THESE CELLS NEED TO BE REPLACED WITH THE NEW TESTING CODE

model_name = "PPO"
model_idx = 500000

Ts = numerical_integration_parameters["Ts"]
path_for_saving_figures = f"models/{model_name}/eval"
ensure_dir(path_for_saving_figures)

# Model Definition
#model = PPO("MultiInputPolicy", env, verbose = 1)

# Load Model
#model_path = f'models/{model_name}/{str(model_idx)}.zip'
#print("Loading saved model ..")
#model = PPO.load(model_path, env = env)

### Test Reward Wrapper

In [None]:
## TODO: THESE CELLS NEED TO BE REPLACED WITH THE NEW TESTING CODE

class TestRewardWrapper(gym.Wrapper):
    def __init__(self, env):
        super(TestRewardWrapper, self).__init__(env)

    def step(self, action):
        observation, reward, terminated, truncated, info = self.env.step(action)

        # Create your Custom Test reward here
        distance_to_line = abs(observation["distance_to_closest_point"])
        # Reward = 1 only if the car is within 1 meters of the line
        if distance_to_line < 1:
            reward = 1.0
        else:
            reward = 0

        return observation, reward, terminated, truncated, info

env = TestRewardWrapper(env)

### Evaluation

In [None]:
## TODO: THESE CELLS NEED TO BE REPLACED WITH THE NEW TESTING CODE

print("Evaluating model .. ")
trajectory, rewards = evaluate_policy(
    env, model, max_steps = 5000, return_rewards = True)

total_reward = sum(rewards)
avg_reward = total_reward/len(rewards)

print(f"Evaluation Complete. Total Reward for Episode: {total_reward:.0f}, Avg Reward/Step: {avg_reward:.2f}")
fig = plot_rewards(rewards)
fig.savefig(f"{path_for_saving_figures}/test_rewards.png")

print("Plotting and animating trajectory .. ")
ani = plot_and_animate_trajectory(
    env, trajectory, Ts, path_for_saving_figures
)

In [None]:
from IPython.display import HTML
HTML(ani.to_jshtml())

## Training Policy with Internal Lane Keeping Enabled

In [None]:
# Create a fresh env with ONLY internal lane keeping enabled
internal_policy_config = {
    "enable_lane_keep": True,
    "enable_cruise_control": False,
}

env_lk = gym.make(
    "ai4rgym/autonomous_driving_env",
    render_mode=None,
    bicycle_model_parameters=bicycle_model_parameters,
    road_elements_list=road_elements_list,
    numerical_integration_parameters=numerical_integration_parameters,
    termination_parameters=termination_parameters,
    initial_state_bounds=initial_state_bounds,
    observation_parameters=observation_parameters,
    internal_policy_config=internal_policy_config,
)

# Match earlier setup
Ts_sim = 0.05
integration_method = "rk4"
env_lk.unwrapped.set_integration_method(integration_method)
env_lk.unwrapped.set_integration_Ts(Ts_sim)
env_lk.unwrapped.set_road_condition(road_condition="wet")

# Action scaling and reward shaping wrapper (same config as before)
env_lk = gym.wrappers.RescaleAction(env_lk, min_action=-1, max_action=1)
reward_cfg = {"k_deviation": 100.0, "k_speed": 1.0}
env_lk = CustomRewardWrapper(env_lk, cfg=reward_cfg)

# Rename model/run for this experiment
model_name_lk = "PPO_LK"
models_dir_lk = f"models/{model_name_lk}"
figs_dir_lk = f"{models_dir_lk}/figs"
ensure_dirs([models_dir_lk, figs_dir_lk])

# Build model and train using the same epoch/timestep structure
model_lk = PPO("MultiInputPolicy", env_lk, verbose=1, tensorboard_log=logdir)

start_epoch = 1
N_EPOCHS_LK = 10  # follow previous pattern; adjust if desired

for i in range(start_epoch, N_EPOCHS_LK + 1):
    model_lk.learn(
        total_timesteps=TIMESTEPS_PER_EPOCH,
        reset_num_timesteps=False,
        tb_log_name=f"{model_name_lk}",
    )
    step_count = TIMESTEPS_PER_EPOCH * i
    model_lk.save(f"{models_dir_lk}/{step_count}")
    eval_model(env_lk, model_lk, figs_dir_lk, step_count)
    print(f"[LK] Finished epoch {i}/{N_EPOCHS_LK} | saved at step {step_count}")

## Training Policy with Internal Cruise Control Enabled

In [None]:
# Create a fresh env with ONLY internal cruise control enabled
internal_policy_config_cc = {
    "enable_lane_keep": False,
    "enable_cruise_control": True,
    "cruise_use_recommended_speed": True,  # Use recommended speed for cruise control
}

env_cc = gym.make(
    "ai4rgym/autonomous_driving_env",
    render_mode=None,
    bicycle_model_parameters=bicycle_model_parameters,
    road_elements_list=road_elements_list,
    numerical_integration_parameters=numerical_integration_parameters,
    termination_parameters=termination_parameters,
    initial_state_bounds=initial_state_bounds,
    observation_parameters=observation_parameters,
    internal_policy_config=internal_policy_config_cc,
)

# Match earlier setup
Ts_sim = 0.05
integration_method = "rk4"
env_cc.unwrapped.set_integration_method(integration_method)
env_cc.unwrapped.set_integration_Ts(Ts_sim)
env_cc.unwrapped.set_road_condition(road_condition="wet")

env_cc = gym.wrappers.RescaleAction(env_cc, min_action=-1, max_action=1)
reward_cfg = {"k_deviation": 100.0, "k_speed": 1.0}
env_cc = CustomRewardWrapper(env_cc, cfg=reward_cfg)

# Rename model/run for this experiment
model_name_cc = "PPO_CC"
models_dir_cc = f"models/{model_name_cc}"
figs_dir_cc = f"{models_dir_cc}/figs"
ensure_dirs([models_dir_cc, figs_dir_cc])

# Build model and train using the same epoch/timestep structure
model_cc = PPO("MultiInputPolicy", env_cc, verbose=1, tensorboard_log=logdir)

start_epoch = 1
N_EPOCHS_CC = 10  # follow previous pattern; adjust if desired

for i in range(start_epoch, N_EPOCHS_CC + 1):
    model_cc.learn(
        total_timesteps=TIMESTEPS_PER_EPOCH,
        reset_num_timesteps=False,
        tb_log_name=f"{model_name_cc}",
    )
    step_count = TIMESTEPS_PER_EPOCH * i
    model_cc.save(f"{models_dir_cc}/{step_count}")
    eval_model(env_cc, model_cc, figs_dir_cc, step_count)
    print(f"[CC] Finished epoch {i}/{N_EPOCHS_CC} | saved at step {step_count}")