In [1]:
!pip install stable-baselines3
!pip install metadrive-simulator

Collecting stable-baselines3
  Downloading stable_baselines3-2.3.2-py3-none-any.whl.metadata (5.1 kB)
Collecting gymnasium<0.30,>=0.28.1 (from stable-baselines3)
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium<0.30,>=0.28.1->stable-baselines3)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading stable_baselines3-2.3.2-py3-none-any.whl (182 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 182.3/182.3 kB 3.9 MB/s eta 0:00:00
Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 953.9/953.9 kB 24.8 MB/s eta 0:00:00
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, stable-baselines3
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1 stable-baselines3-2.3.2

In [2]:
# %load_ext tensorboard
# %tensorboard --logdir ./ppo_metadrive_tensorboard/

### Custom reward example

In [None]:
# Import necessary libraries
import gym
import numpy as np
import torch
from metadrive import MetaDriveEnv
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.utils import set_random_seed
from functools import partial

# --- Custom Environment Class ---
class CustomMetaDriveEnv(MetaDriveEnv):
    """MetaDriveEnv subclass with custom sensor settings and a shaped reward.

    On top of the parent class's reward, ``reward_function`` subtracts a
    penalty proportional to how far the vehicle's speed exceeds
    ``speed_limit`` and adds a small bonus while the vehicle is on the first
    reference lane reported by its navigation module.
    """

    @classmethod
    def default_config(cls):
        """Return MetaDriveEnv's default config with this class's overrides.

        Returns:
            The config object produced by ``MetaDriveEnv.default_config()``,
            updated with image observations and rendering disabled and with
            the vehicle sensor / speed settings below.
        """
        config = MetaDriveEnv.default_config()
        # Disable image observations and rendering
        config.update({
            "image_observation": False,
            "use_render": False,
            # "num_agents": 1,
            # "is_multi_agent": False,
            # "manual_control": False,
            # "agent_policy": None,
            # Other necessary configurations
        })
        # Configure vehicle sensors
        config["vehicle_config"].update({
            "lidar": {
                "num_lasers": 72,
                "distance": 50,
                "num_others": 0,
            },
            "side_detector": {"num_lasers": 0},
            "lane_line_detector": {"num_lasers": 0},
            "use_navigation": True,
            # Read back in __init__ and used as the speed limit of the reward penalty.
            "max_speed": 20.0,
        }, allow_add_new_key=True)
        return config

    def __init__(self, config=None):
        """Create the environment.

        Args:
            config: Configuration passed to ``MetaDriveEnv.__init__``. When
                ``None``, ``default_config()`` of this class is used.
        """
        if config is None:
            config = self.default_config()
        super().__init__(config)
        # NOTE(review): `max_speed` and `vehicle.speed` are compared directly in
        # reward_function — confirm both use the same unit in the installed
        # MetaDrive version.
        self.speed_limit = self.config["vehicle_config"]["max_speed"]

    def reset(self, **kwargs):
        """Reset the environment and return whatever the parent ``reset`` returns.

        Args:
            **kwargs: Forwarded unchanged to the parent ``reset``.
        """
        obs = super().reset(**kwargs)
        # Fallback: if the engine reports no agents after the parent reset,
        # rebuild the engine/world and register an agent manually.
        # NOTE(review): this path relies on `setup_engine`, `engine.setup_world`
        # and `spawn_object` existing with these signatures — confirm against
        # the installed MetaDrive version.
        if len(self.engine.agents) == 0:
            self.setup_engine()
            self.engine.setup_world()
            self._add_agents()
        return obs

    def _add_agents(self):
        """Spawn one vehicle and register it in the engine as ``"default_agent"``."""
        self.default_agent = self.spawn_object(
            self.config["vehicle_config"]["vehicle_class"],
            vehicle_config=self.config["vehicle_config"]
        )
        self.agent_ids = ["default_agent"]
        self.engine.agents["default_agent"] = self.default_agent

    def reward_function(self, vehicle_id: str):
        """Compute the shaped reward for one vehicle.

        Args:
            vehicle_id: Key of the vehicle in ``self.engine.agents``.

        Returns:
            A ``(total_reward, reward_info)`` tuple. ``total_reward`` is the
            parent reward plus ``speed_penalty`` and ``lane_reward``;
            ``reward_info`` is the parent's info dict with those two terms
            added under the keys ``'speed_penalty'`` and ``'lane_reward'``.
        """
        # Start from the default reward and reward_info of the base class
        base_reward, reward_info = super().reward_function(vehicle_id)
        vehicle = self.engine.agents[vehicle_id]

        # --- Speed Limit Penalty ---
        # 0.1 per unit of speed above the limit; nothing at or below the limit.
        overspeed = vehicle.speed - self.speed_limit
        speed_penalty = -overspeed * 0.1 if overspeed > 0 else 0.0

        # --- Lane Keeping Reward ---
        # `hasattr` alone is not enough: the attribute may exist but be None,
        # in which case reading `.current_ref_lanes` would raise AttributeError.
        navigation = getattr(vehicle, "navigation", None)
        ref_lanes = navigation.current_ref_lanes if navigation is not None else None
        lane_reward = 0.05 if ref_lanes and vehicle.lane == ref_lanes[0] else 0.0

        total_reward = base_reward + speed_penalty + lane_reward

        # Expose the custom terms alongside the base class's reward info
        reward_info['speed_penalty'] = speed_penalty
        reward_info['lane_reward'] = lane_reward

        return total_reward, reward_info

# --- Environment Creation Function ---
def create_env(seed=0):
    """Build a Monitor-wrapped ``CustomMetaDriveEnv`` on the predefined "Cr" map.

    Args:
        seed: Forwarded to the environment config as ``start_seed``. Previously
            this argument was accepted but ignored.
            NOTE(review): MetaDrive's documented default for ``start_seed`` is 0,
            so the default call is expected to behave as before — confirm for
            the installed version.

    Returns:
        The environment wrapped in ``Monitor``.
    """
    env = CustomMetaDriveEnv({
        # Map Configuration:
        "map": "Cr",  # Use a predefined map
        # Seed Configuration:
        "start_seed": seed,
        # Traffic Configuration:
        # "traffic_density": 0.1,
        # # Accident Probability:
        # "accident_prob": 0,
        # # Episode Horizon:
        # "horizon": 1000,
        # # Spawn Configuration:
        # "random_spawn_lane_index": False,
        # # Traffic Mode:
        # "traffic_mode": "trigger",
        # # Environment Variety:
        # "num_scenarios": 100,
        # Agent Configuration:
        # "num_agents": 1,
        # "is_multi_agent": False,
        # "manual_control": False,
        # "agent_policy": None,
    })
    return Monitor(env)

# --- Reproducibility ---
# Fix the random seed up front, before the environment is built.
set_random_seed(0)

# --- Rollout Sizing ---
num_envs = 1  # Since we're using DummyVecEnv
steps = 2048  # Number of steps per environment per update

# --- Vectorized Training Environment ---
# One environment factory per requested environment.
train_env = DummyVecEnv([partial(create_env) for _ in range(num_envs)])

# --- Observation Sanity Check ---
# Reset once and print what the policy network will receive.
obs = train_env.reset()
print("Observation shape:", obs.shape)
print("Observation space:", train_env.observation_space)
print("Observation dtype:", obs.dtype)

# --- Define Policy Keyword Arguments ---
# Stable-Baselines3 >= 1.8 removed shared layers from the MLP extractor:
# `net_arch` must be a list of ints or a single dict with "pi"/"vf" keys.
# The previous list-of-dicts form is reduced by SB3 to its first element
# ({"shared": [...]}), which has no "pi"/"vf" keys, so the policy silently
# ended up with no hidden layers. The former shared trunk (256, 256) is
# therefore prepended to each head; the two heads no longer share weights.
policy_kwargs = dict(
    net_arch=dict(
        pi=[256, 256, 128, 64],  # Policy network: former trunk + policy head
        vf=[256, 256, 128, 64],  # Value network: former trunk + value head
    )
)

# --- Instantiate, Train and Save the PPO Model ---
# Everything that uses `train_env` runs inside try/finally so the environment
# is always closed, even when model creation or training raises or is
# interrupted. Without this, a failed run leaves the environment open and it
# has to be closed by hand before the cell can be re-run.
try:
    model = PPO(
        policy="MlpPolicy",
        env=train_env,
        policy_kwargs=policy_kwargs,
        n_steps=steps,
        batch_size=steps * num_envs // 2,  # Two minibatches per rollout
        learning_rate=3e-4,
        gamma=0.99,
        gae_lambda=0.95,
        ent_coef=0.0,
        clip_range=0.2,
        verbose=1,
        tensorboard_log="./ppo_metadrive_tensorboard/",
        device="cuda" if torch.cuda.is_available() else "cpu",
    )

    # --- Train the Agent ---
    model.learn(total_timesteps=1_000_000)

    # --- Save the Trained Model ---
    model.save("ppo_metadrive_agent")
finally:
    # --- Close the Environment ---
    train_env.close()


In [None]:
# train_env.close()


### Advanced Exploration Techniques (not yet working)

In [None]:
# pip install --upgrade gymnasium


In [None]:
# # Import necessary libraries
# import numpy as np
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import gym
# import gymnasium as gymn
# from stable_baselines3 import PPO
# from stable_baselines3.common.vec_env import DummyVecEnv
# from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.utils import set_random_seed
# from stable_baselines3.common.callbacks import BaseCallback
# from functools import partial
# import warnings

# from metadrive import MetaDriveEnv

# # Suppress potential warnings for cleaner output
# warnings.filterwarnings("ignore")

# # --- Define RND Module ---
# class RNDModule(nn.Module):
#     def __init__(self, obs_dim, hidden_size=256, device='cpu'):
#         super(RNDModule, self).__init__()
#         self.device = device
#         # Fixed Target Network
#         self.target = nn.Sequential(
#             nn.Linear(obs_dim, hidden_size),
#             nn.ReLU(),
#             nn.Linear(hidden_size, hidden_size),
#             nn.ReLU()
#         ).to(self.device)
#         for param in self.target.parameters():
#             param.requires_grad = False  # Freeze target network

#         # Predictor Network
#         self.predictor = nn.Sequential(
#             nn.Linear(obs_dim, hidden_size),
#             nn.ReLU(),
#             nn.Linear(hidden_size, hidden_size),
#             nn.ReLU()
#         ).to(self.device)

#     def forward(self, state):
#         state = state.to(self.device)
#         target_features = self.target(state)
#         pred_features = self.predictor(state)
#         return pred_features, target_features

# # --- Define Intrinsic Reward Callback ---
# class IntrinsicRewardCallback(BaseCallback):
#     """
#     Custom callback for logging intrinsic and total rewards.
#     """
#     def __init__(self, verbose=0):
#         super(IntrinsicRewardCallback, self).__init__(verbose)

#     def _on_step(self) -> bool:
#         # Access the last infos
#         for info in self.locals.get('infos', []):
#             if 'intrinsic_reward' in info:
#                 self.logger.record('intrinsic_reward', info['intrinsic_reward'])
#             if 'total_reward' in info:
#                 self.logger.record('total_reward', info['total_reward'])
#         return True

# # --- Define Custom Wrapper ---
# class GymnasiumToGymWrapper(gym.Env):
#     """
#     A custom wrapper to convert a Gymnasium environment to a Gym-compatible environment.
#     """
#     def __init__(self, gymnasium_env):
#         super(GymnasiumToGymWrapper, self).__init__()
#         self.gymn_env = gymnasium_env

#         # Convert observation_space from gymnasium to gym
#         if isinstance(self.gymn_env.observation_space, gymn.spaces.Box):
#             self.observation_space = gym.spaces.Box(
#                 low=self.gymn_env.observation_space.low,
#                 high=self.gymn_env.observation_space.high,
#                 dtype=self.gymn_env.observation_space.dtype
#             )
#         else:
#             raise NotImplementedError("Only Box observation spaces are supported.")

#         # Convert action_space from gymnasium to gym
#         if isinstance(self.gymn_env.action_space, gymn.spaces.Discrete):
#             self.action_space = gym.spaces.Discrete(self.gymn_env.action_space.n)
#         elif isinstance(self.gymn_env.action_space, gymn.spaces.Box):
#             self.action_space = gym.spaces.Box(
#                 low=self.gymn_env.action_space.low,
#                 high=self.gymn_env.action_space.high,
#                 dtype=self.gymn_env.action_space.dtype
#             )
#         else:
#             raise NotImplementedError("Only Discrete and Box action spaces are supported.")

#     def reset(self, **kwargs):
#         """
#         Resets the environment and returns the initial observation.
#         """
#         obs, info = self.gymn_env.reset(**kwargs)
#         return obs

#     def step(self, action):
#         """
#         Takes an action and returns the next observation, reward, done, and info.
#         """
#         obs, reward, terminated, truncated, info = self.gymn_env.step(action)
#         done = terminated or truncated
#         return obs, reward, done, info

#     def render(self, mode='human'):
#         """
#         Renders the environment.
#         """
#         return self.gymn_env.render(mode=mode)

#     def close(self):
#         """
#         Closes the environment.
#         """
#         self.gymn_env.close()

# # --- Define RND Wrapper ---
# class RNDWrapper(gymn.Env):
#     """
#     A custom wrapper to integrate Random Network Distillation (RND) intrinsic rewards.
#     """
#     def __init__(self, gymnasium_env, alpha=0.1, hidden_size=256, device='cpu'):
#         super(RNDWrapper, self).__init__()
#         self.gymn_env = gymnasium_env
#         self.alpha = alpha
#         self.device = torch.device(device)
#         self.obs_dim = int(np.prod(self.gymn_env.observation_space.shape))
#         self.rnd = RNDModule(obs_dim=self.obs_dim, hidden_size=hidden_size, device=self.device)
#         self.rnd.to(self.device)
#         self.rnd_optimizer = optim.Adam(self.rnd.predictor.parameters(), lr=1e-3)

#     def reset(self, **kwargs):
#         """
#         Resets the environment and returns the initial observation.
#         """
#         obs, info = self.gymn_env.reset(**kwargs)
#         return obs

#     def step(self, action):
#         """
#         Takes an action and returns the next observation, reward, done, and info.
#         """
#         obs, reward, terminated, truncated, info = self.gymn_env.step(action)
#         done = terminated or truncated

#         # Convert observation to tensor
#         state_tensor = torch.tensor(obs, dtype=torch.float32).to(self.device)
#         with torch.no_grad():
#             _, target_features = self.rnd(state_tensor)
#         pred_features, _ = self.rnd(state_tensor)

#         # Calculate intrinsic reward
#         intrinsic_reward = nn.MSELoss()(pred_features, target_features.detach())
#         intrinsic_reward = intrinsic_reward.item() * self.alpha

#         # Update total reward
#         total_reward = reward + intrinsic_reward

#         # Add intrinsic reward to info
#         info['intrinsic_reward'] = intrinsic_reward
#         info['total_reward'] = total_reward

#         # Compute loss and optimize predictor
#         rnd_loss = nn.MSELoss()(pred_features, target_features.detach())
#         self.rnd_optimizer.zero_grad()
#         rnd_loss.backward()
#         self.rnd_optimizer.step()

#         return obs, total_reward, done, info

#     def render(self, mode='human'):
#         """
#         Renders the environment.
#         """
#         return self.gymn_env.render(mode=mode)

#     def close(self):
#         """
#         Closes the environment.
#         """
#         self.gymn_env.close()

# # --- Environment Creation Function ---
# def create_env_with_rnd(seed=0):
#     """
#     Creates the MetaDrive environment wrapped with RND and GymnasiumToGymWrapper.
#     """
#     # Initialize the custom MetaDrive environment
#     env = MetaDriveEnv({
#         # Map Configuration:
#         "map": "Cr",  # Use a predefined map
#         # Traffic Configuration:
#         "traffic_density": 0.1,
#         # Accident Probability:
#         "accident_prob": 0,
#         # Episode Horizon:
#         "horizon": 1000,
#         # Spawn Configuration:
#         "random_spawn_lane_index": False,
#         # Seed Configuration:
#         "start_seed": seed,
#         # Traffic Mode:
#         "traffic_mode": "trigger",
#         # Environment Variety:
#         "num_scenarios": 100,
#         # Vehicle Configuration:
#         "vehicle_config": {
#             "lidar": {
#                 "num_lasers": 72,
#                 "distance": 50,
#                 "num_others": 0,
#             },
#             "side_detector": {"num_lasers": 0},
#             "lane_line_detector": {"num_lasers": 0},
#             # "max_speed": 20.0,
#         },
#         # Additional configurations if necessary
#     })

#     # Wrap with RNDWrapper
#     env = RNDWrapper(env, alpha=0.1, hidden_size=256, device="cuda" if torch.cuda.is_available() else "cpu")

#     # Wrap with GymnasiumToGymWrapper
#     env = GymnasiumToGymWrapper(env)

#     # Optionally, wrap with Monitor for logging
#     env = Monitor(env)

#     return env

# # --- Set Random Seed for Reproducibility ---
# set_random_seed(0)

# # --- Number of Parallel Environments ---
# num_envs = 1  # Adjust based on your system's capacity

# steps = 2048  # Number of steps per environment per update

# # --- Create the Vectorized Environment ---
# try:
#     train_env = DummyVecEnv([partial(create_env_with_rnd, seed=i) for i in range(num_envs)])
#     print("Vectorized environment created successfully.")
# except NotImplementedError as e:
#     print("Failed to create vectorized environment:", e)
#     # Optionally, handle the error or exit
#     raise e

# # --- Verify the Observation Space ---
# obs = train_env.reset()
# print("Observation shape:", obs.shape)
# print("Observation space:", train_env.observation_space)
# print("Observation dtype:", obs.dtype)

# # --- Define Policy Keyword Arguments (Using Default Architecture) ---
# policy_kwargs = dict(
#     net_arch=[256, 256, dict(pi=[128], vf=[128])],
# )

# # --- Instantiate the PPO Model ---
# model = PPO(
#     policy="MlpPolicy",
#     env=train_env,
#     policy_kwargs=policy_kwargs,  # Use your custom policy architecture
#     n_steps=steps,
#     batch_size=steps * num_envs // 2,  # Keeping batch_size = steps * num_envs // 2
#     learning_rate=3e-4,
#     gamma=0.99,
#     gae_lambda=0.95,
#     ent_coef=0.01,  # Increased entropy coefficient to encourage exploration
#     clip_range=0.2,
#     verbose=1,
#     tensorboard_log="./ppo_metadrive_tensorboard/",
#     device="cuda" if torch.cuda.is_available() else "cpu",
# )

# # --- Instantiate the Callback ---
# intrinsic_reward_callback = IntrinsicRewardCallback()

# # --- Train the Agent ---
# print("Starting training...")
# try:
#     model.learn(total_timesteps=100_000, callback=intrinsic_reward_callback)  # Start with 100k
#     print("Initial training phase completed.")
#     model.learn(total_timesteps=900_000, callback=intrinsic_reward_callback)  # Continue to 1M
#     print("Training completed.")
# except Exception as e:
#     print("An error occurred during training:", e)

# # --- Save the Trained Model ---
# try:
#     model.save("ppo_metadrive_agent_with_rnd")
#     print("Model saved as 'ppo_metadrive_agent_with_rnd'.")
# except Exception as e:
#     print("Failed to save the model:", e)

# # --- Close the Environment ---
# try:
#     train_env.close()
#     print("Environment closed.")
# except Exception as e:
#     print("Failed to close the environment:", e)


In [None]:
# train_env.close()


### Video download (if running on Google Colab)

In [None]:
# from stable_baselines3.common.vec_env import VecVideoRecorder

# # Record a video of the agent’s performance
# video_length = 1000  # Number of steps to record

# eval_env = DummyVecEnv([create_env])
# eval_env = VecVideoRecorder(
#     eval_env, "videos/", record_video_trigger=lambda x: x == 0, video_length=video_length
# )

# obs = eval_env.reset()
# for _ in range(video_length):
#     action, _states = model.predict(obs, deterministic=True)
#     obs, rewards, dones, info = eval_env.step(action)

# eval_env.close()


### Evaluate

In [None]:
# # Create a single evaluation environment
# eval_env = MetaDriveEnv({
#     "use_render": True,  # Enable rendering for visual evaluation
#     "map": "C",
#     "traffic_density": 0,
#     "accident_prob": 0,
#     "horizon": 500
# })

# # Reset the environment and evaluate the trained agent
# obs = eval_env.reset()
# for _ in range(1000):
#     action, _ = model.predict(obs, deterministic=True)
#     obs, reward, done, info = eval_env.step(action)
#     eval_env.render()  # Render the environment for visual feedback
#     if done:
#         obs = eval_env.reset()

# eval_env.close()


### Model that takes time into account (recurrent policy)

In [None]:
# pip install stable-baselines3[extra]
# pip install sb3-contrib
# from sb3_contrib import RecurrentPPO

# # Use RecurrentPPO instead of PPO
# model = RecurrentPPO(
#     "MlpLstmPolicy",
#     train_env,
#     verbose=1,
#     tensorboard_log="./ppo_metadrive_tensorboard/",
#     device="cuda",
#     n_steps=steps,
#     batch_size=steps * num_envs,
#     policy_kwargs=dict(
#         lstm_hidden_size=256,  # Size of the LSTM hidden state
#         net_arch=[dict(vf=[256], pi=[256])]  # Define policy and value networks
#     )
# )
