In [None]:
# Import libraries and make sure highway-env is installed properly
import gymnasium
import highway_env
import numpy as np
import matplotlib.pyplot as plt

# Smoke test: build the highway environment with array-based rendering and
# reset it to obtain the first observation.
env = gymnasium.make("highway-better-v1", render_mode="rgb_array")
obs, info = env.reset()

# Grab the frame rendered right after the reset and display it on an explicit
# Axes, with the axis decorations hidden.
frame = env.render()
fig, ax = plt.subplots()
ax.imshow(frame)
ax.set_axis_off()
ax.set_title("Initial Frame")
plt.show()

# Close the environment once the frame has been shown.
env.close()

In [None]:
from pprint import pprint
# Pretty-print the configuration stored on the unwrapped environment.
# NOTE: `env` is the object created in the first cell, which has already
# called `env.close()` on it; only its `config` attribute is read here.
print("Environment Information:")
pprint(env.unwrapped.config)

# Sanity check: confirm that TensorBoard is importable and show its version.
import tensorboard
print(tensorboard.__version__)

In [None]:
import gymnasium as gym
from stable_baselines3.common.monitor import Monitor

# === Environment factory (Monitor-wrapped) ===
def make_env(str_env=None, render_mode="rgb_array"):
    """Return a zero-argument factory that builds a Monitor-wrapped environment.

    Args:
        str_env: Short environment name. ``None`` and ``"highway"`` select
            ``"highway-better-v1"``, ``"intersection"`` selects
            ``"intersection-v1"`` and ``"racetrack"`` selects
            ``"racetrack-v0"``.
        render_mode: Value forwarded to ``gym.make``. Defaults to
            ``"rgb_array"``, the value that used to be hard-coded, so existing
            callers are unaffected.

    Returns:
        A callable taking no arguments that creates the environment and wraps
        it in ``Monitor``.

    Raises:
        ValueError: Raised by the returned factory (not by ``make_env``
            itself) when ``str_env`` is not one of the known names.
    """
    # Short name -> registered environment id.
    env_ids = {
        None: "highway-better-v1",
        "highway": "highway-better-v1",
        "intersection": "intersection-v1",
        "racetrack": "racetrack-v0",
    }

    def _init():
        try:
            env_id = env_ids[str_env]
        except (KeyError, TypeError):
            # TypeError covers unhashable values, which are unknown names too.
            raise ValueError(f"Unknown environment: {str_env}") from None

        env = gym.make(env_id, render_mode=render_mode)
        return Monitor(env)

    return _init


In [None]:
import optuna
import torch
import os
import json
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from optuna.pruners import MedianPruner

# === Global reward tracker ===
# Best mean reward seen so far, keyed first by environment name and then by
# tuning phase ("coarse" / "fine"). `objective` creates the per-environment
# entry on first use and compares each trial's mean reward against it to
# decide whether that trial's model should be saved.
best_rewards = {}

# === Optuna Callback for pruning ===
class OptunaCallback(BaseCallback):
    """Periodically evaluate the model and report the score to an Optuna trial.

    Every ``eval_freq`` callback calls, the current policy is evaluated for
    ``n_eval_episodes`` deterministic episodes. The mean reward is reported to
    the trial as an intermediate value, and ``optuna.exceptions.TrialPruned``
    is raised when the trial reports that it should be pruned.

    Args:
        trial: Optuna trial that receives the intermediate values.
        eval_freq: Number of callback calls between two evaluations (> 0).
        n_eval_episodes: Number of episodes per intermediate evaluation.
        verbose: Verbosity level forwarded to ``BaseCallback``.
        eval_env: Environment used for the intermediate evaluations. When
            ``None`` the training environment is used (the previous
            behaviour); evaluating there resets the environment while a
            rollout is being collected, so passing a dedicated environment is
            preferred.

    Raises:
        ValueError: If ``eval_freq`` is not positive.
    """

    def __init__(self, trial, eval_freq=2000, n_eval_episodes=3, verbose=0, eval_env=None):
        super().__init__(verbose)
        if eval_freq <= 0:
            raise ValueError(f"eval_freq must be positive, got {eval_freq}")
        self.trial = trial
        self.eval_freq = eval_freq
        self.n_eval_episodes = n_eval_episodes
        self.eval_env = eval_env

    def _on_step(self):
        """Evaluate and report on every ``eval_freq``-th call; always continue training."""
        if self.n_calls % self.eval_freq != 0:
            return True

        # Fall back to the training env only when no dedicated env was given.
        env = self.eval_env if self.eval_env is not None else self.training_env
        reward, _ = evaluate_policy(
            self.model,
            env,
            n_eval_episodes=self.n_eval_episodes,
            deterministic=True,
        )
        self.trial.report(reward, self.n_calls)
        if self.trial.should_prune():
            raise optuna.exceptions.TrialPruned()
        return True


# === Network architectures explored in the coarse phase ===
# Optuna categorical choices should be None/bool/int/float/str, so the hidden
# layer sizes are selected through string labels and mapped back to lists.
_NET_ARCH_CHOICES = {
    "64x64": [64, 64],
    "128x128": [128, 128],
    "256x256": [256, 256],
}


# === Objective Function for PPO ===
def objective(trial, phase, str_env, coarse_params=None, save_dir=None):
    """Train and score one PPO configuration for an Optuna trial.

    Args:
        trial: Optuna trial used to sample hyperparameters and report results.
        phase: ``"coarse"`` samples learning rate, gamma and network
            architecture; ``"fine"`` keeps ``coarse_params`` and samples the
            remaining PPO hyperparameters.
        str_env: Environment name understood by ``make_env``.
        coarse_params: Configuration produced by the coarse phase (needs the
            keys ``learning_rate``, ``gamma`` and ``net_arch``). Required
            when ``phase == "fine"``.
        save_dir: Directory for the best model and, in the coarse phase, the
            best parameters as JSON. When ``None`` nothing is written to disk.

    Returns:
        Mean reward over 10 deterministic evaluation episodes.

    Raises:
        ValueError: If ``phase`` is unknown, or if ``phase == "fine"`` and
            ``coarse_params`` is missing.
        optuna.exceptions.TrialPruned: Propagated from ``OptunaCallback`` when
            the trial is pruned during training.
    """
    if phase not in ("coarse", "fine"):
        raise ValueError(f"Unknown tuning phase: {phase!r} (expected 'coarse' or 'fine')")
    if phase == "fine" and coarse_params is None:
        raise ValueError("Need coarse params for fine tuning")

    # Init per-env reward tracking
    env_best = best_rewards.setdefault(
        str_env, {"coarse": -float("inf"), "fine": -float("inf")}
    )

    # === Coarse Phase ===
    if phase == "coarse":
        learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True)
        gamma = trial.suggest_float("gamma", 0.85, 0.999)
        arch_label = trial.suggest_categorical("net_arch", list(_NET_ARCH_CHOICES))

        config = {
            "learning_rate": learning_rate,
            "gamma": gamma,
            # Copy so the shared lookup table is never mutated downstream.
            "net_arch": list(_NET_ARCH_CHOICES[arch_label]),
        }

    # === Fine Phase ===
    else:
        config = {
            **coarse_params,
            "entropy_coef": trial.suggest_float("entropy_coef", 1e-4, 0.05),
            "clip_range": trial.suggest_float("clip_range", 0.1, 0.3),
            "gae_lambda": trial.suggest_float("gae_lambda", 0.8, 0.99),
            "vf_coef": trial.suggest_float("vf_coef", 0.3, 0.9),
            "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0),
            "n_steps": trial.suggest_categorical("n_steps", [64, 128, 256]),
            "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
        }

    # === Build Environments and Model ===
    # A dedicated evaluation env keeps intermediate evaluations from resetting
    # the training env in the middle of a rollout.
    train_env = make_vec_env(make_env(str_env), n_envs=1)
    eval_env = None
    try:
        eval_env = make_vec_env(make_env(str_env), n_envs=1)
        model = PPO(
            "MlpPolicy",
            train_env,
            learning_rate=config["learning_rate"],
            gamma=config["gamma"],
            policy_kwargs={"net_arch": config["net_arch"]},
            ent_coef=config.get("entropy_coef", 0.01),
            clip_range=config.get("clip_range", 0.2),
            gae_lambda=config.get("gae_lambda", 0.95),
            vf_coef=config.get("vf_coef", 0.5),
            max_grad_norm=config.get("max_grad_norm", 0.5),
            n_steps=config.get("n_steps", 128),
            batch_size=config.get("batch_size", 64),
            verbose=0,
            tensorboard_log=f"../tensorboard_logs/{str_env}/PPO_phase_{phase}",
            device="cpu"
        )

        model.learn(
            total_timesteps=10_000,
            callback=OptunaCallback(trial, eval_env=eval_env),
        )
        mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True)
    finally:
        # Runs on success, on pruning and on any error, so no env is leaked.
        train_env.close()
        if eval_env is not None:
            eval_env.close()

    mean_reward = float(mean_reward)
    trial.set_user_attr("mean_reward", mean_reward)

    # === Save Best Model If Improved ===
    if mean_reward > env_best[phase]:
        env_best[phase] = mean_reward
        if save_dir is not None:
            os.makedirs(save_dir, exist_ok=True)
            model.save(os.path.join(save_dir, f"PPO_best_{phase}.zip"))
            print(f"💾 Saved new best {phase} model (trial {trial.number}) for {str_env}")
            if phase == "coarse":
                coarse_params_path = os.path.join(save_dir, "PPO_best_coarse_params.json")
                with open(coarse_params_path, "w") as f:
                    json.dump(config, f, indent=2)
                print(f"✅ Coarse tuning params saved for {str_env} (trial {trial.number})")

    return mean_reward

# === COARSE PHASE ===
def run_coarse_phase(str_env):
    """Run the coarse Optuna study (15 trials, maximizing) for one environment.

    Ensures ``../trained_models/<str_env>/PPO/`` exists and then optimizes
    ``objective`` with ``phase="coarse"``, using a TPE sampler seeded with 42
    and a median pruner.

    Args:
        str_env: Environment name forwarded to ``objective``.
    """
    print(f"🔧 Starting COARSE tuning for {str_env}...")

    save_dir = f"../trained_models/{str_env}/PPO/"
    os.makedirs(save_dir, exist_ok=True)

    def coarse_objective(trial):
        # Bind the phase-specific arguments; Optuna only supplies the trial.
        return objective(trial, phase="coarse", str_env=str_env, save_dir=save_dir)

    tpe_sampler = optuna.samplers.TPESampler(seed=42)
    median_pruner = MedianPruner(n_startup_trials=3, n_warmup_steps=1)
    coarse_study = optuna.create_study(
        direction="maximize",
        sampler=tpe_sampler,
        pruner=median_pruner,
    )
    coarse_study.optimize(coarse_objective, n_trials=15)

# === FINE PHASE ===
def run_fine_phase(str_env):
    """Run the fine Optuna study (15 trials, maximizing) for one environment.

    Loads the coarse-phase parameters from
    ``../trained_models/<str_env>/PPO/PPO_best_coarse_params.json`` and then
    optimizes ``objective`` with ``phase="fine"``, using a TPE sampler seeded
    with 123 and a median pruner.

    Args:
        str_env: Environment name forwarded to ``objective``.

    Raises:
        FileNotFoundError: If the coarse-phase parameter file does not exist.
    """
    save_dir = f"../trained_models/{str_env}/PPO/"
    coarse_params_path = os.path.join(save_dir, "PPO_best_coarse_params.json")

    # The fine phase cannot start without the coarse results on disk.
    coarse_results_found = os.path.exists(coarse_params_path)
    if not coarse_results_found:
        raise FileNotFoundError(f"Missing coarse phase results for {str_env}. Run coarse phase first.")

    with open(coarse_params_path, "r") as params_file:
        coarse_params = json.load(params_file)

    print(f"🔬 Starting FINE tuning for {str_env}...")

    def fine_objective(trial):
        # Bind the phase-specific arguments; Optuna only supplies the trial.
        return objective(
            trial,
            phase="fine",
            str_env=str_env,
            coarse_params=coarse_params,
            save_dir=save_dir,
        )

    tpe_sampler = optuna.samplers.TPESampler(seed=123)
    median_pruner = MedianPruner(n_startup_trials=3, n_warmup_steps=1)
    fine_study = optuna.create_study(
        direction="maximize",
        sampler=tpe_sampler,
        pruner=median_pruner,
    )
    fine_study.optimize(fine_objective, n_trials=15)

    print(f"✅ Fine tuning complete for {str_env}.")

# === MAIN EXECUTION ===
# Switches for the two tuning phases.
run_coarse = True
run_fine = True

env_list = ["highway", "intersection", "racetrack"]

# Enabled phase runners, coarse before fine.
phase_runners = [
    runner
    for enabled, runner in ((run_coarse, run_coarse_phase), (run_fine, run_fine_phase))
    if enabled
]

for str_env in env_list:
    print(f"\n🚦 Running PPO tuning for environment: {str_env}")
    for runner in phase_runners:
        runner(str_env)


In [None]:
# import os
# from stable_baselines3 import PPO
# from stable_baselines3.common.env_util import make_vec_env
# from stable_baselines3.common.monitor import Monitor
# import gymnasium as gym

# SAVE_DIR = "../trained_models/highway/PPO/"
# # === Load trained Optuna model ===
# model_path = os.path.join(SAVE_DIR, "PPO_best_fine.zip")
# model = PPO.load(model_path)

# # === Environment for continued training ===
# def make_env():
#     env = gym.make("highway-better-v1")
#     return Monitor(env)

# train_env = make_vec_env(make_env, n_envs=1)

# # === Rebind environment in case original wasn't saved in model ===
# model.set_env(train_env)

# # === Training configuration ===
# total_timesteps = 40000
# save_interval = 10000
# timesteps_run = 0

# cp_log_dir = f"../checkpoints/highway/PPO_trained_model_tuned"
# os.makedirs(cp_log_dir, exist_ok=True)

# while timesteps_run < total_timesteps:
#     model.learn(
#         total_timesteps=save_interval,
#         reset_num_timesteps=False,
#         tb_log_name="highway_PPO_tuned",
#         log_interval=1,
#     )
#     timesteps_run += save_interval
#     model.save(f"{cp_log_dir}/{timesteps_run}")
#     print(f"✅ Saved checkpoint at {timesteps_run} timesteps")

# # === Save final model ===
# final_model_path = os.path.join(SAVE_DIR, "PPO_trained_tuned.zip")
# model.save(final_model_path)
# print(f"✅ Final model saved at {final_model_path}")


In [None]:
# import os
# import imageio
# from stable_baselines3 import PPO
# from stable_baselines3.common.env_util import make_vec_env
# from IPython.display import Video

# # === Configuration ===
# algo_name = "PPO"
# video_eval_dir = f"../tuned_videos/highway/{algo_name}_tuned/video_eval"
# os.makedirs(video_eval_dir, exist_ok=True)
# video_path = os.path.join(video_eval_dir, f"{algo_name}_eval.mp4")

# # === Load trained model ===
# model_path = "../trained_models/highway/PPO/PPO_trained_tuned.zip"
# model = PPO.load(model_path)

# env = make_vec_env(make_env(render_mode="rgb_array"), n_envs=1)
# # === Evaluate and collect frames ===
# frames = []
# num_episodes = 5  # Number of episodes to evaluate

# for i in range(num_episodes):
#     # === Reset the environment at the start of each episode ===
#     obs_stacked = env.reset()
#     done = False
#     while not done:
#         action, _ = model.predict(obs_stacked)
#         obs_stacked, _, done, _ = env.step(action)

#         frame = env.render()
#         frames.append(frame)
#         if done[0]:  
#             break

#     # Add a few idle frames for padding
#     for _ in range(10):
#         frames.append(frames[-1])

# # === Save video ===
# imageio.mimsave(video_path, frames, fps=30)

# # === Display video ===
# Video(video_path, embed=True, width=600, height=400)
