<a href="https://colab.research.google.com/github/pranavkantgaur/training_materials/blob/master/intro_to_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install gymnasium highway-env stable-baselines3 scikit-optimize torch

Collecting highway-env
  Downloading highway_env-1.10.1-py3-none-any.whl.metadata (16 kB)
Collecting stable-baselines3
  Downloading stable_baselines3-2.6.0-py3-none-any.whl.metadata (4.8 kB)
Collecting scikit-optimize
  Downloading scikit_optimize-0.10.2-py2.py3-none-any.whl.metadata (9.7 kB)
Collecting pyaml>=16.9 (from scikit-optimize)
  Downloading pyaml-25.1.0-py3-none-any.whl.metadata (12 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.me

In [2]:
import gymnasium
from gymnasium.wrappers import RecordVideo
import highway_env
from stable_baselines3 import DQN
import os

# Training
env = gymnasium.make("highway-v0")
model = DQN('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=5e-4,
            buffer_size=15000,
            learning_starts=200,
            batch_size=32,
            gamma=0.8,
            train_freq=1,
            gradient_steps=1,
            target_update_interval=50,
            verbose=1,
            tensorboard_log="highway_dqn/")
model.learn(int(2e2))
model.save("highway_dqn/model")

# Testing with Video
env = gymnasium.make("highway-v0", render_mode='rgb_array')
env = RecordVideo(env, "highway_dqn/videos", episode_trigger=lambda x: True)
os.makedirs("highway_dqn/videos", exist_ok=True)

obs, info = env.reset()
done = truncated = False
while not (done or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, done, truncated, _ = env.step(action)
env.close()

# Show video
from IPython.display import HTML
from base64 import b64encode

video_path = "highway_dqn/videos/rl-video-episode-0.mp4"
mp4 = open(video_path, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}"></video>""")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to highway_dqn/DQN_1
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 8.25     |
|    ep_rew_mean      | 6.14     |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 2        |
|    time_elapsed     | 13       |
|    total_timesteps  | 33       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 14.9     |
|    ep_rew_mean      | 11.2     |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2        |
|    time_elapsed     | 48       |
|    total_timesteps  | 119      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 15.2     |
|

  """


In [3]:
import gymnasium
from gymnasium.wrappers import RecordVideo
import highway_env
import numpy as np
import os

def rule_based_policy(obs):
    ego_speed = obs[0, 3]
    front_vehicle = obs[1:][np.argwhere(obs[1:, 1] == obs[0, 1])]
    if front_vehicle.size > 0 and front_vehicle[0, 0] < 20:
        if obs[0, 1] > 0: return 0  # Lane left
        else: return 4  # Slow down
    return 3 if ego_speed < 25 else 1  # Accelerate

# Testing
env = gymnasium.make("highway-v0", render_mode='rgb_array')
env = RecordVideo(env, "rule_based/videos", episode_trigger=lambda x: True)
os.makedirs("rule_based/videos", exist_ok=True)

obs, info = env.reset()
done = truncated = False
while not (done or truncated):
    action = rule_based_policy(obs)
    obs, _, done, truncated, _ = env.step(action)
env.close()

# Show video
from IPython.display import HTML
from base64 import b64encode

video_path = "rule_based/videos/rl-video-episode-0.mp4"
mp4 = open(video_path, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}"></video>""")

In [None]:
import gymnasium
from gymnasium.wrappers import RecordVideo
import highway_env
from skopt import gp_minimize
from skopt.space import Real, Integer
import numpy as np
import os

space = [Integer(0, 2), Real(10, 30)]  # Action mode, safety distance

def evaluate(params):
    env = gymnasium.make("highway-v0")
    total_reward = 0
    obs, _ = env.reset()
    done = truncated = False
    while not (done or truncated):
        action = 3 if obs[0, 3] < 25 else 1  # Simple speed control
        if params[0] == 0 and obs[0, 1] > 0: action = 0
        elif params[0] == 1: action = 2
        obs, reward, done, truncated, _ = env.step(action)
        total_reward += reward
    return -total_reward

result = gp_minimize(evaluate, space, n_calls=2, random_state=0)
best_params = result.x

# Test best params
env = gymnasium.make("highway-v0", render_mode='rgb_array')
env = RecordVideo(env, "bo_agent/videos", episode_trigger=lambda x: True)
os.makedirs("bo_agent/videos", exist_ok=True)

obs, _ = env.reset()
done = truncated = False
while not (done or truncated):
    action = 3 if obs[0, 3] < 25 else 1
    if best_params[0] == 0 and obs[0, 1] > 0: action = 0
    elif best_params[0] == 1: action = 2
    obs, _, done, truncated, _ = env.step(action)
env.close()

# Show video
from IPython.display import HTML
from base64 import b64encode

video_path = "bo_agent/videos/rl-video-episode-0.mp4"
mp4 = open(video_path, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}"></video>""")

In [None]:
import gymnasium
from gymnasium.wrappers import RecordVideo
import highway_env
from stable_baselines3 import PPO
import os

# Training
env = gymnasium.make("highway-v0")
model = PPO('MlpPolicy', env,
            policy_kwargs=dict(net_arch=[256, 256]),
            learning_rate=3e-4,
            n_steps=512,
            batch_size=64,
            n_epochs=10,
            gamma=0.9,
            verbose=1)
model.learn(int(2e1))
model.save("highway_ppo/model")

# Testing
env = gymnasium.make("highway-v0", render_mode='rgb_array')
env = RecordVideo(env, "highway_ppo/videos", episode_trigger=lambda x: True)
os.makedirs("highway_ppo/videos", exist_ok=True)

obs, info = env.reset()
done = truncated = False
while not (done or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, done, truncated, _ = env.step(action)
env.close()

# Show video
from IPython.display import HTML
from base64 import b64encode

video_path = "highway_ppo/videos/rl-video-episode-0.mp4"
mp4 = open(video_path, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}"></video>""")

In [None]:
import gymnasium
from gymnasium.wrappers import RecordVideo
import highway_env
from stable_baselines3 import PPO
from torch import nn
import torch
import os

class TransformerPolicy(nn.Module):
    def __init__(self, feature_dim, action_dim):
        super().__init__()
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=64, nhead=4),
            num_layers=2
        )
        self.fc = nn.Linear(feature_dim, 64)
        self.head = nn.Linear(64, action_dim)

    def forward(self, x):
        x = self.fc(x)
        x = self.transformer(x.unsqueeze(0)).squeeze(0)
        return self.head(x)

# Training
env = gymnasium.make("highway-v0")
policy_kwargs = dict(
    features_extractor_class=TransformerPolicy,
    features_extractor_kwargs=dict(feature_dim=25, action_dim=5)
)
model = PPO('MlpPolicy', env,
            policy_kwargs=policy_kwargs,
            learning_rate=3e-4,
            verbose=1)
model.learn(int(2e4))
model.save("transformer_agent/model")

# Testing
env = gymnasium.make("highway-v0", render_mode='rgb_array')
env = RecordVideo(env, "transformer_agent/videos", episode_trigger=lambda x: True)
os.makedirs("transformer_agent/videos", exist_ok=True)

obs, info = env.reset()
done = truncated = False
while not (done or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, _, done, truncated, _ = env.step(action)
env.close()

# Show video
from IPython.display import HTML
from base64 import b64encode

video_path = "transformer_agent/videos/rl-video-episode-0.mp4"
mp4 = open(video_path, 'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""<video width=400 controls><source src="{data_url}"></video>""")

## OLD

In [None]:
!pip install gymnasium>=0.29.1 highway-env stable-baselines3 pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

In [None]:
import gymnasium
import highway_env
from stable_baselines3 import DQN

env = gymnasium.make("highway-v0")
model = DQN('MlpPolicy', env,
              policy_kwargs=dict(net_arch=[256, 256]),
              learning_rate=5e-4,
              buffer_size=15000,
              learning_starts=200,
              batch_size=32,
              gamma=0.8,
              train_freq=1,
              gradient_steps=1,
              target_update_interval=50,
              verbose=1,
              tensorboard_log="highway_dqn/")
model.learn(int(2e1))
model.save("highway_dqn/model")

# Load and test saved model
model = DQN.load("highway_dqn/model")
while True:
  done = truncated = False
  obs, info = env.reset()
  while not (done or truncated):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    env.render()


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to highway_dqn/DQN_3


  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger.warn(
  gym.logger

KeyboardInterrupt: 

In [None]:
TODO: https://github.com/Farama-Foundation/HighwayEnv/blob/master/scripts/