In [1]:
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.vec_env import DummyVecEnv

from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
from stable_baselines3.common.vec_env import DummyVecEnv, VecEnv, sync_envs_normalization
from stable_baselines3.common.monitor import Monitor
import gymnasium as gym

In [2]:
from gymnasium_env.envs.f1_env import RewardFunction
from gymnasium_env.envs.f1_env import RewardFunctionPerPositionAtFinalLap

from racesim.config import GridConfigSortedById
from racesim.config import GridConfig

In [3]:
class F1EnvEvalCallback(EvalCallback):
    """EvalCallback that rebuilds the F1 evaluation environment after each evaluation.

    The parent class runs the periodic evaluation. Right after an evaluation
    has taken place, this subclass throws away the environment that was just
    used and builds a fresh ``"F1Env/Basic-v0"`` instance from the stored seed,
    reward function and grid configuration, so the next evaluation starts from
    a newly constructed environment.

    All parameters up to ``warn`` are forwarded unchanged to ``EvalCallback``.

    :param seed_eval_env: ``seed`` keyword passed to ``gym.make`` when the
        evaluation environment is rebuilt.
    :param reward_function: ``reward_function`` keyword passed to ``gym.make``
        when the evaluation environment is rebuilt.
    :param grid_config: ``grid_config`` keyword passed to ``gym.make`` when the
        evaluation environment is rebuilt.
    """

    def __init__(
        self,
        eval_env: Union[gym.Env, VecEnv],
        callback_on_new_best: Optional[BaseCallback] = None,
        callback_after_eval: Optional[BaseCallback] = None,
        n_eval_episodes: int = 5,
        eval_freq: int = 10000,
        log_path: Optional[str] = None,
        best_model_save_path: Optional[str] = None,
        deterministic: bool = True,
        render: bool = False,
        verbose: int = 1,
        warn: bool = True,
        seed_eval_env: int = 0,
        reward_function: Optional[RewardFunction] = None,
        grid_config: Optional[GridConfig] = None,
    ):
        # Forward by keyword so the call does not depend on the parent's
        # positional parameter order.
        super().__init__(
            eval_env,
            callback_on_new_best=callback_on_new_best,
            callback_after_eval=callback_after_eval,
            n_eval_episodes=n_eval_episodes,
            eval_freq=eval_freq,
            log_path=log_path,
            best_model_save_path=best_model_save_path,
            deterministic=deterministic,
            render=render,
            verbose=verbose,
            warn=warn,
        )
        self.seed_eval_env = seed_eval_env
        self.reward_function = reward_function
        self.grid_config = grid_config

    def _on_step(self) -> bool:
        """Run the parent's step logic, then rebuild the eval env if an evaluation was due.

        :return: the parent's verdict on whether training should continue.
        """
        continue_training = super()._on_step()
        # Same condition as the one guarding the rebuild in the original code:
        # it holds on exactly the calls where an evaluation was scheduled.
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            self.set_env_for_next_eval()
        return continue_training

    def set_env_for_next_eval(self) -> None:
        """Replace ``self.eval_env`` with a freshly built evaluation environment.

        The new environment is wrapped in ``Monitor`` and ``DummyVecEnv`` so it
        has the same shape as the environment the parent constructor stores
        (a ``VecEnv``) and so SB3's evaluation does not warn about a missing
        ``Monitor`` wrapper. The previous environment is closed afterwards so
        its resources (e.g. ``SubprocVecEnv`` worker processes) are released
        instead of leaking on every evaluation.
        """
        previous_env = self.eval_env
        fresh_env = gym.make(
            "F1Env/Basic-v0",
            seed=self.seed_eval_env,
            reward_function=self.reward_function,
            grid_config=self.grid_config,
        )
        self.eval_env = DummyVecEnv([lambda: Monitor(fresh_env)])
        # Close only after the replacement exists, so a failing gym.make does
        # not leave the callback holding an already-closed environment.
        if previous_env is not None:
            previous_env.close()

In [4]:
# Experiment configuration shared by the cells below.
# NOTE(review): `seed` is not referenced by any visible cell; each training run
# is seeded with its loop index instead.
seed = 0
# Number of independent training runs; run i uses i as its seed.
n_seeds = 3
# Seed passed to the evaluation environment.
seed_eval = 50
# Reward function instance passed to the training and evaluation environments.
reward_function = RewardFunctionPerPositionAtFinalLap()

# Grid configuration instance passed to the training and evaluation environments.
grid_config = GridConfigSortedById()

In [5]:
# Keyword arguments used to construct the evaluation environment.
# (The name's spelling is kept as-is because the training cell refers to it.)
eval_env_kwards = dict(
    seed=seed_eval,
    reward_function=reward_function,
    grid_config=grid_config,
    render_mode=None,
)

In [6]:
# Train one A2C agent per seed. Every run gets its own log directory and saved
# model, and its environments are closed when the run ends.
for i in range(n_seeds):
    print(f"-----------------\n\n RUN {i}\n\n--------------------------")

    # Used for both the evaluation logs and the best-model checkpoint.
    run_log_dir = f"./logs_{i}/"

    env_kwargs = {"seed": i, "reward_function": reward_function, "grid_config": grid_config, "render_mode": None}
    env = make_vec_env("F1Env/Basic-v0", n_envs=2, vec_env_cls=SubprocVecEnv, env_kwargs=env_kwargs)
    env_to_eval = make_vec_env("F1Env/Basic-v0", n_envs=1, vec_env_cls=SubprocVecEnv, env_kwargs=eval_env_kwards)

    eval_callback = None
    try:
        # eval_freq counts callback calls, not timesteps: with 2 training envs
        # an evaluation runs every 2 * 12500 = 25000 timesteps.
        eval_callback = F1EnvEvalCallback(
            eval_env=env_to_eval,
            best_model_save_path=run_log_dir,
            log_path=run_log_dir,
            n_eval_episodes=20,
            eval_freq=12500,
            deterministic=True,
            render=False,
            seed_eval_env=seed_eval,
            reward_function=reward_function,
            grid_config=grid_config,
        )

        model_reward_position_change = A2C(
            "MlpPolicy",
            env,
            gamma=1,
            n_steps=64,
            learning_rate=0.00035,
            verbose=0,
            tensorboard_log="./a2c_r3/",
            seed=i,
        )
        # total_timesteps is documented as an int; 1e6 is a float.
        model_reward_position_change.learn(total_timesteps=1_000_000, callback=eval_callback)
        model_reward_position_change.save(path=f"./model_reward_position_change_{i}")
    finally:
        # Release environment resources (SubprocVecEnv worker processes) even
        # if training fails, so they do not pile up across runs.
        # The callback swaps its eval env during training, so the one it holds
        # at the end may differ from env_to_eval.
        if eval_callback is not None and eval_callback.eval_env is not env_to_eval:
            eval_callback.eval_env.close()
        env_to_eval.close()
        env.close()

-----------------

 RUN 0

--------------------------
Eval num_timesteps=25000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
New best mean reward!




Eval num_timesteps=50000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=75000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=100000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=125000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=150000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=175000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=200000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=225000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=250000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=275000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=300000, episode_reward=-1.00 +/- 0.00
Episode length: 77.00 +/- 0.00
Eval num_timesteps=325000, episode

In [7]:
# import numpy as np
# from tqdm import tqdm
# import torch
# from stable_baselines3.common.utils import obs_as_tensor
# env_to_test = gym.make("F1Env/Basic-v0")

In [8]:
# agent = model_reward_position_change
# stops = 0

# for episode in tqdm(range(1)):
#     t = 0
#     total_rw = 0
#     obs, info = env_to_test.reset()
#     print(env_to_test.our_driver.tyre.__dict__)
#     # print(obs)
#     # print("------")
#     print("START POSITION: ", env_to_test.our_driver.position)
#     done = False

#     # play one episode
#     while not done:
#         # display(env_to_test.race_simulation.to_df())
#         action, _ = agent.predict(obs, deterministic=True)
#         with torch.no_grad():
#             observation = obs.reshape((-1,) + agent.observation_space.shape)
#             observation = obs_as_tensor(observation, 'cpu')
#             q_values = agent.q_net(observation)
#             # print(q_values)
#         if action != 0:
#             stops+=1
#             print(action, " - laps to go: ", env_to_test.our_driver.laps_to_go, " - position: ", env_to_test.our_driver.position)
#             print(q_values)
#         next_obs, reward, terminated, truncated, info = env_to_test.step(action)
#         # print('REWARDA: ', reward)

#         total_rw += reward

#         done = terminated or truncated
#         obs = next_obs
#         # print(obs)
#         # print("------")
#         t+=1

#     # print("Reward: " , total_rw)
# # display(env_to_test.race_simulation.to_df())
# print("Reward: " , total_rw, " - ", "Stops: ", stops)
# env_to_test.race_simulation.driver_being_controlled.position

In [9]:
# env_to_test.race_simulation.to_df()