In [1]:
import gymnasium as gym
from stable_baselines3 import PPO,DQN
from config import config
from stable_baselines3.common.env_util import make_vec_env
import optuna
from stable_baselines3.common.callbacks import EvalCallback,BaseCallback

# Directory passed to PPO as `tensorboard_log`.
log_dir = "logs"
# Environment id handed to `make_vec_env` / `gym.make`.
env_id = "intersection-v0"
# Number of parallel environments used for the training vec env.
num_cpu = 4




In [None]:
class DoneCallback(BaseCallback):
    """Count finished episodes during training and periodically checkpoint the model.

    Args:
        check_freq: Print the running episode count every ``check_freq`` callback calls.
        save_freq: Save the model every ``save_freq`` callback calls.
        save_path: Path prefix for checkpoints; ``num_timesteps`` is appended to it.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq: int, save_freq: int, save_path: str, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_freq = save_freq
        self.save_path = save_path
        # Total number of episode terminations seen so far (summed over all envs).
        self.done_count = 0

    def _on_step(self) -> bool:
        """Update the episode counter, log it and checkpoint when due.

        Returns:
            Always ``True`` so that training is never interrupted by this callback.
        """
        # Stable-Baselines3 rollout collection stores the vectorized env's
        # per-environment flags under "dones"; "done" is kept as a fallback
        # for loops that expose a single flag.
        dones = self.locals.get("dones", self.locals.get("done"))
        if dones is not None:
            try:
                # One flag per parallel environment: count each finished episode.
                self.done_count += sum(bool(flag) for flag in dones)
            except TypeError:
                # Scalar flag (plain bool or 0-d array) cannot be iterated.
                self.done_count += int(bool(dones))

        if self.n_calls % self.check_freq == 0:
            print(f"Step: {self.num_timesteps} Done count: {self.done_count}")

        if self.n_calls % self.save_freq == 0:
            self.model.save(self.save_path + str(self.num_timesteps))

        return True

In [None]:
def optimize_ppo(trial):
    """Sample the PPO hyperparameters to tune for one Optuna trial.

    Args:
        trial: Optuna trial object used to draw the values.

    Returns:
        dict: Keyword arguments for ``PPO`` with the keys ``n_steps``, ``gamma``,
        ``learning_rate``, ``ent_coef``, ``clip_range`` and ``n_epochs``.
    """
    return {
        # Integer-valued parameters are sampled directly as integers (log scale,
        # bounds inclusive) so the value stored in the study is the value used
        # for training, instead of a float that is truncated afterwards.
        'n_steps': trial.suggest_int('n_steps', 32, 2048, log=True),
        'gamma': trial.suggest_categorical('gamma', [0.9, 0.95, 0.98, 0.99, 0.999, 0.9999]),
        # `suggest_float` replaces the deprecated `suggest_loguniform` /
        # `suggest_uniform` helpers.
        'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True),
        'ent_coef': trial.suggest_float('ent_coef', 1e-8, 0.1, log=True),
        'clip_range': trial.suggest_float('clip_range', 0.1, 0.4),
        'n_epochs': trial.suggest_int('n_epochs', 1, 10, log=True),
    }

def objective(trial):
    """Train PPO with the trial's hyperparameters and score it for Optuna.

    Args:
        trial: Optuna trial supplying the hyperparameters (via ``optimize_ppo``)
            and the trial number used to keep per-trial output separate.

    Returns:
        The best mean evaluation reward recorded by the ``EvalCallback`` during
        training.
    """
    train_env = make_vec_env(env_id, n_envs=num_cpu, env_kwargs={"config": config})
    eval_env = make_vec_env(env_id, n_envs=1, env_kwargs={"config": config})

    # Each trial writes to its own locations; otherwise every trial overwrites
    # the checkpoints and best model saved by the previous ones.
    trial_tag = f"trial_{trial.number}"

    try:
        model = PPO('CnnPolicy', train_env, verbose=0, tensorboard_log=log_dir,
                    **optimize_ppo(trial))

        callback = DoneCallback(check_freq=128, save_freq=5000,
                                save_path=f"./models/hypertuning_{trial_tag}_")
        # Evaluation callback, to evaluate the model during training
        eval_callback = EvalCallback(eval_env,
                                     best_model_save_path=f'./logs/{trial_tag}/',
                                     log_path=f'./logs/{trial_tag}/', eval_freq=500,
                                     deterministic=True, render=False)

        model.learn(total_timesteps=512*50, callback=[callback, eval_callback],
                    progress_bar=True)

        # Retrieve the best reward
        return eval_callback.best_mean_reward
    finally:
        # Release the environments even if training fails, so that repeated
        # trials do not accumulate open environments.
        train_env.close()
        eval_env.close()

# Run the hyperparameter search, maximizing the value returned by `objective`.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=30)

# Report the best trial: its objective value, then one "name: value" line
# per sampled hyperparameter.
trial = study.best_trial
print(trial.value)
for key, value in trial.params.items():
    print(f"{key}: {value}")

In [None]:
# Evaluate a trained model

# Number of evaluation episodes; bounded so the cell terminates when the
# notebook is run top to bottom.
N_EVAL_EPISODES = 10

model = DQN.load("models/dqn_cnn20000.zip")
eval_env = gym.make(env_id, render_mode="rgb_array", config=config)

try:
    for episode in range(N_EVAL_EPISODES):
        done = truncated = False
        episode_reward = 0.0
        obs, info = eval_env.reset()
        while not (done or truncated):
            action, _states = model.predict(obs, deterministic=True)
            obs, reward, done, truncated, info = eval_env.step(action)
            episode_reward += reward
            eval_env.render()
        print(f"Episode {episode + 1}/{N_EVAL_EPISODES}: reward = {episode_reward:.2f}")
finally:
    # Always release the environment, including on interruption or error.
    eval_env.close()