In [1]:
import json
import os
import time

import gymnasium as gym
from stable_baselines3 import PPO, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from torch.utils.tensorboard import SummaryWriter

In [2]:
# Read the run arguments out of args.json and keep the list of scenario
# definitions handy for the cells below.
with open('args.json', 'r') as f:
    args = json.loads(f.read())
scenarios = args['scenarios']

In [3]:
# Training budget (environment steps) handed to each model's learn() call.
total_timesteps = 50_000

# Only scenarios whose 'id' appears in this list are processed.
scenarios_to_run = [1, 2, 3, 4]

In [4]:
# Function to create a video demonstration of the model

def create_video(scenario_id: int, model: str, type: str = "", n_steps: int = 200, fps: float = 20) -> None:
    """Roll out a saved agent in a scenario environment, rendering every step.

    The environment is created with ``render_mode='rgb_array'`` and ``render()``
    is called once per step; this function itself does not store the returned
    frames or write any file.

    Args:
        scenario_id: Scenario number; selects the ``scenario_{id}_env``
            environment and the checkpoint file name.
        model: Algorithm of the saved agent, ``'ppo'`` or ``'dqn'``.
        type: Checkpoint variant. ``"simple"`` and ``"confort"`` load the
            ``_simple`` / ``_confort`` checkpoint; any other value loads the
            un-suffixed checkpoint. (The name shadows the builtin but is kept
            so existing keyword callers keep working.)
        n_steps: Number of environment steps to play.
        fps: Target playback rate; the loop sleeps ``1 / fps`` seconds per step.

    Raises:
        ValueError: If ``model`` is neither ``'ppo'`` nor ``'dqn'``.
    """
    if model not in ['ppo', 'dqn']:
        raise ValueError("Model must be either 'ppo' or 'dqn'")

    # Only the two known variants carry a suffix; everything else maps to the
    # base checkpoint, exactly like the previous if/elif/else chain.
    variant_suffix = f"_{type}" if type in ("simple", "confort") else ""
    checkpoint = f"./trained_models/scenario_{scenario_id}_{model}{variant_suffix}"
    algorithm = PPO if model == 'ppo' else DQN

    video_env = gym.make(f"scenario_{scenario_id}_env", render_mode='rgb_array')
    try:
        video_model = algorithm.load(checkpoint)

        frame_delay = 1 / fps
        obs, info = video_env.reset()
        for _ in range(n_steps):
            action, _state = video_model.predict(obs, deterministic=True)
            obs, reward, done, truncated, info = video_env.step(action)
            video_env.render()
            time.sleep(frame_delay)  # pace the rollout at roughly `fps` steps per second
            # An episode ends on termination OR truncation; either way the
            # environment must be reset before it is stepped again.
            if done or truncated:
                obs, info = video_env.reset()
    finally:
        # Release the environment even if loading or stepping fails.
        video_env.close()
    

In [None]:
# Placeholders for best-reward tracking; nothing in this cell updates them.
best_ppo_reward = -float('inf')
best_dqn_reward = -float('inf')
# When False the loop still registers every selected environment and builds
# both models, but skips the learn / save / evaluate steps.
train = False

for scenario in scenarios:
    if scenario['id'] not in scenarios_to_run:
        continue

    # Clear the action.txt log file
    with open('action.txt', 'w') as f:
        f.write("")

    scenario_id = scenario['id']
    scenario_name = scenario['name']
    env_config = scenario['config']
    env_id = f"scenario_{scenario_id}_env"

    # Define log directories dynamically based on scenario ID and model type
    ppo_log_dir = f"./logs/scenario_{scenario_id}_ppo/"
    dqn_log_dir = f"./logs/scenario_{scenario_id}_dqn/"

    # Register the environment under env_id so that gym.make(env_id) works
    # here and in the other cells of this notebook.
    gym.envs.register(
        id=env_id,
        entry_point=scenario['env']['entry_point'],
        kwargs={'config': env_config}
    )

    # Create the environment and wrap it with Monitor. No reset is issued on
    # the bare environment: resets go through the wrapper once learn() or
    # evaluate_policy() starts an episode.
    env = Monitor(gym.make(env_id, render_mode='human'))
    try:
        # Create and train PPO model
        ppo_model = PPO("MlpPolicy",
                        env,
                        # Plain dict form: the list-of-dict form is deprecated
                        # since Stable-Baselines3 1.8.
                        policy_kwargs=dict(net_arch=dict(pi=[256, 256], vf=[256, 256])),
                        n_steps=32 * 12 // 6,
                        batch_size=32,
                        n_epochs=10,
                        learning_rate=5e-4,
                        gamma=0.8,
                        verbose=2,
                        tensorboard_log=ppo_log_dir,
                        # 'auto' uses the GPU when available and falls back
                        # to CPU instead of failing on machines without CUDA.
                        device='auto')
        if train:
            # Train the PPO model
            ppo_model.learn(total_timesteps)

            # Save the last PPO model
            ppo_model.save(f"./trained_models/scenario_{scenario_id}_ppo_last_{total_timesteps}_steps")
            print(f"Last PPO model saved at {total_timesteps} steps")

            # Evaluate the PPO model
            ppo_mean_reward, ppo_std_reward = evaluate_policy(ppo_model, env, n_eval_episodes=10)
            print(f"Scenario {scenario_id} - PPO Average reward: {ppo_mean_reward} ± {ppo_std_reward}")

        # Clear the action.txt log file again so the DQN run starts from an empty log
        with open('action.txt', 'w') as f:
            f.write("")

        # Create and train DQN model
        dqn_model = DQN("MlpPolicy",
                        env,
                        policy_kwargs=dict(net_arch=[256, 256]),
                        learning_rate=5e-4,
                        buffer_size=15000,
                        learning_starts=200,
                        batch_size=32,
                        gamma=0.8,
                        train_freq=1,
                        gradient_steps=1,
                        target_update_interval=50,
                        verbose=1,
                        tensorboard_log=dqn_log_dir,
                        device='auto')

        if train:
            # Train the DQN model
            dqn_model.learn(total_timesteps)

            # Save the last DQN model
            dqn_model.save(f"./trained_models/scenario_{scenario_id}_dqn_last_{total_timesteps}_steps")
            print(f"Last DQN model saved at {total_timesteps} steps")

            # Evaluate the DQN model
            dqn_mean_reward, dqn_std_reward = evaluate_policy(dqn_model, env, n_eval_episodes=10)
            print(f"Scenario {scenario_id} - DQN Average reward: {dqn_mean_reward} ± {dqn_std_reward}")
    finally:
        # Close the environment even if model construction or training raised
        env.close()

In [None]:
# Play back the PPO agent for scenario 4 using its 'confort' checkpoint.
create_video(scenario_id=4, model='ppo', type='confort')

In [None]:
# Play back the DQN agent for scenario 3 using its 'confort' checkpoint.
create_video(scenario_id=3, model='dqn', type='confort')

In [None]:
def test_scenario(scenario_id: int, model_str: str, attempt: int, type: str) -> None:
    
    if model_str not in ['ppo', 'dqn']:
        raise ValueError("Model must be either 'ppo' or 'dqn'")
    
    model = PPO.load(f"./trained_models/scenario_{scenario_id}_ppo_{type}") if model_str == 'ppo' else DQN.load(f"./trained_models/scenario_{scenario_id}_dqn_{type}")

    env = gym.make(f"scenario_{scenario_id}_env", render_mode='human')

    with open(f'./test_results/speeds/speed_{scenario_id}_{model_str}_{type}_{attempt}.txt', 'w') as f:
            f.write("")  

    obs, _ = env.reset()  
    for i in range(200):
        

        with open(f'./test_results/speeds/speed_{scenario_id}_{model_str}_{type}_{attempt}.txt', 'a') as f:
            f.write(f"{env.unwrapped._get_values()[0]}\n")
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncation, info = env.step(action)  
        env.render()
        print(scenario_id)
        if done:
            with open(f'./test_results/crash.txt', 'a') as f:
                f.write(f"{scenario_id}_{model_str}_{type}_{attempt}_{info['crashed']}\n")

            with open(f'./test_results/actions/action_{scenario_id}_{model_str}_{type}_{attempt}.txt', 'w') as f:
                with open('action.txt', 'r') as f2:
                    f.write(f2.read())
            obs, _ = env.reset()
            break
        
        if i == 199:
            with open(f'./test_results/crash.txt', 'a') as f:
                f.write(f"{scenario_id}_{model_str}_{type}_{attempt}_{info['crashed']}\n")
            with open(f'./test_results/actions/action_{scenario_id}_{model_str}_{type}_{attempt}.txt', 'w') as f:
                with open('action.txt', 'r') as f2:
                    f.write(f2.read())
            obs, _ = env.reset()
            break        
        
    env.close()

# Evaluate every selected scenario ten times. Within each attempt the order
# is DQN before PPO and, for each algorithm, 'simple' before 'confort'.
for i in scenarios_to_run:
    for j in range(10):
        for _algo in ('dqn', 'ppo'):
            for _variant in ('simple', 'confort'):
                test_scenario(i, _algo, j, _variant)