In [None]:
!pip install gymnasium
!pip install stable-baselines3
!pip install highway-env

In [None]:
import gymnasium as gym
from stable_baselines3 import PPO, A2C
from stable_baselines3.common.vec_env import DummyVecEnv
import numpy as np
import matplotlib.pyplot as plt
import warnings

# Suppress UserWarning messages originating from the `gymnasium.core` module
# so they do not clutter the notebook output; other warnings are unaffected.
warnings.filterwarnings("ignore", category=UserWarning, module='gymnasium.core')

class CustomCallback:
    """Collect per-step driving metrics and summarise them per episode.

    ``on_step`` is meant to be called once per environment step (either by a
    Stable-Baselines3 ``learn`` loop or manually). It buffers the reward,
    vehicle speed and crash flag of the first sub-environment and, whenever
    that sub-environment reports ``done``, folds the buffers into the
    ``episode_*`` lists. The remaining methods summarise and plot those lists.
    """

    def __init__(self):
        # Per-step buffers for the episode currently in progress.
        self.rewards = []
        self.collisions = []
        self.speeds = []
        # One entry per completed episode.
        self.episode_rewards = []
        self.episode_collisions = []
        self.episode_speeds = []
        self.episode_times = []
        # Not read or written by any method of this class.
        self.start_time = None

    def on_step(self, locals_, globals_):
        """Record the metrics of one environment step.

        Args:
            locals_: Mapping that must provide ``'env'`` (an object exposing
                ``get_attr('vehicle')``), ``'rewards'``, ``'dones'`` and
                ``'infos'``. Only index 0 of each sequence is used.
            globals_: Unused; accepted so the method matches the
                ``callback(locals, globals)`` calling convention.

        Returns:
            Always ``True`` (Stable-Baselines3 treats a falsy callback return
            value as a request to stop training).
        """
        env = locals_['env']
        reward = locals_['rewards']
        done = locals_['dones']
        info = locals_['infos']

        # Track rewards
        self.rewards.append(reward[0])

        # Track vehicle speed of the controlled vehicle.
        # NOTE(review): with an auto-resetting vectorized env, the vehicle
        # fetched on a terminal step may already belong to the next episode
        # -- confirm.
        vehicle = env.get_attr('vehicle')[0]
        self.speeds.append(vehicle.speed)

        # Track collisions; a missing 'crashed' key counts as "no collision".
        collision = info[0].get('crashed', False)
        self.collisions.append(1 if collision else 0)

        # On episode end, fold the step buffers into per-episode summaries.
        if done[0]:
            self.episode_rewards.append(np.sum(self.rewards))
            self.episode_collisions.append(np.sum(self.collisions))
            self.episode_speeds.append(np.mean(self.speeds))
            # Travel time is measured as the number of steps in the episode.
            self.episode_times.append(len(self.rewards))
            self.rewards = []
            self.collisions = []
            self.speeds = []

        return True

    def smooth_data(self, data, window_size=10):
        """Apply a rolling average to smooth data.

        Args:
            data: One-dimensional sequence of numbers.
            window_size: Number of consecutive samples averaged together.

        Returns:
            A float array of ``len(data) - window + 1`` rolling means, where
            ``window`` is ``window_size`` clamped to ``len(data)``. Empty
            input yields an empty array.

        Raises:
            ValueError: If ``window_size`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError("window_size must be a positive integer")
        values = np.asarray(data, dtype=float)
        if values.size == 0:
            return values
        # np.convolve(mode='valid') silently swaps its operands when the
        # kernel is longer than the data, which would return MORE points than
        # there are samples. Clamping the window keeps the output length
        # bounded by len(data).
        window = min(window_size, values.size)
        return np.convolve(values, np.ones(window) / window, mode='valid')

    def aggregate_data(self, data, bin_size=10):
        """Aggregate data by calculating the average in bins.

        Args:
            data: Sliceable sequence of numbers.
            bin_size: Number of consecutive samples per bin; the last bin may
                be shorter.

        Returns:
            A list with the mean of each bin (empty for empty input).

        Raises:
            ValueError: If ``bin_size`` is smaller than 1.
        """
        if bin_size < 1:
            raise ValueError("bin_size must be a positive integer")
        return [np.mean(data[i:i + bin_size]) for i in range(0, len(data), bin_size)]

    def get_stats(self):
        """Return mean/min/max of every per-episode metric.

        Returns:
            A dict keyed by ``'reward'``, ``'collision'``, ``'speed'`` and
            ``'travel_time'``; each value is a dict with ``'mean'``, ``'min'``
            and ``'max'``. If no episode has completed yet, all values are
            NaN instead of raising.
        """
        def compute_stats(data):
            if not data:
                nan = float('nan')
                return {'mean': nan, 'min': nan, 'max': nan}
            return {
                'mean': np.mean(data),
                'min': np.min(data),
                'max': np.max(data)
            }

        return {
            'reward': compute_stats(self.episode_rewards),
            'collision': compute_stats(self.episode_collisions),
            'speed': compute_stats(self.episode_speeds),
            'travel_time': compute_stats(self.episode_times)
        }

    def plot_metrics(self, title="Performance Metrics", window_size=10, bin_size=10):
        """Plot reward, collisions, speed and travel time per episode.

        Draws a 2x2 grid; every panel shows the rolling average, the binned
        average and a horizontal line at the overall mean of one metric.

        Args:
            title: Figure-level title.
            window_size: Rolling-average window passed to ``smooth_data``.
            bin_size: Bin width passed to ``aggregate_data``; also used as the
                x-spacing of the aggregated curve so both always agree.
        """
        episodes = range(len(self.episode_rewards))

        # (series, smoothed label, aggregated label, mean label,
        #  y-axis label, panel title, smoothed colour, aggregated colour)
        panels = [
            (self.episode_rewards,
             'Smoothed Reward ', 'Aggregated Reward ', 'Average Reward',
             'Total Reward', 'Reward per Episode', 'blue', 'cyan'),
            (self.episode_collisions,
             'Smoothed Collisions', 'Aggregated Collisions ', 'Average Collisions',
             'Number of Collisions', 'Collisions per Episode', 'green', 'lime'),
            (self.episode_speeds,
             'Smoothed Speed per episode', 'Aggregated Speed per episode', 'Average Speed',
             'Speed (m/s)', 'Speed per Episode', 'orange', 'gold'),
            (self.episode_times,
             'Smoothed Travel Time per episode', 'Aggregated Travel Time per episode',
             'Average Travel Time',
             'Travel Time (timesteps)', 'Travel Time per Episode', 'purple', 'violet'),
        ]

        plt.figure(figsize=(20, 10))

        for position, panel in enumerate(panels, start=1):
            (series, smooth_label, agg_label, mean_label,
             y_label, panel_title, smooth_color, agg_color) = panel

            smoothed = self.smooth_data(series, window_size)
            aggregated = self.aggregate_data(series, bin_size)

            plt.subplot(2, 2, position)
            plt.plot(episodes[:len(smoothed)], smoothed,
                     label=smooth_label, color=smooth_color)
            # Each aggregated point is placed at the first episode of its bin.
            plt.plot(range(0, len(aggregated) * bin_size, bin_size), aggregated,
                     label=agg_label, color=agg_color, linestyle='--')
            if series:
                # Skip the mean line when there is nothing to average.
                plt.axhline(np.mean(series), color='red', linestyle='--',
                            label=mean_label)
            plt.xlabel('Episode')
            plt.ylabel(y_label)
            plt.title(panel_title)
            plt.legend()
            plt.ylim(bottom=0)  # Set lower limit to 0 for better visibility

        plt.suptitle(title, fontsize=16, y=0.01)
        plt.tight_layout()
        plt.show()

# Create the environment and wrap it with DummyVecEnv for Stable Baselines3.
# The factory builds the environment itself instead of closing over the name
# `env`, which is rebound to the vectorized wrapper on this very line.
# NOTE(review): newer highway-env releases may only register 'intersection-v0'
# once `highway_env` has been imported -- confirm against the installed version.
env = DummyVecEnv([lambda: gym.make('intersection-v0')])

# Create the PPO and A2C models (both share the same vectorized environment)
ppo_model = PPO('MlpPolicy', env, verbose=1)
a2c_model = A2C('MlpPolicy', env, verbose=1)

# Create the custom callbacks (one metrics collector per algorithm).
# Note: the same objects are reused for evaluation further down, so their
# statistics cover training episodes as well as evaluation episodes.
ppo_callback = CustomCallback()
a2c_callback = CustomCallback()

# Train the models; the bound `on_step` method is passed as a plain callable
# that receives (locals, globals) on every step.
ppo_model.learn(total_timesteps=500, callback=ppo_callback.on_step)
a2c_model.learn(total_timesteps=500, callback=a2c_callback.on_step)

# Save the models
ppo_model.save("ppo_intersection")
a2c_model.save("a2c_intersection")

# Load the models (optional, if you want to use the saved models later)
ppo_model = PPO.load("ppo_intersection", env=env)
a2c_model = A2C.load("a2c_intersection", env=env)

# Evaluate the models for a fixed number of episodes
def evaluate_model(model, env, callback, num_episodes=50, deterministic=False):
    """Run ``model`` in ``env`` for ``num_episodes`` episodes, reporting each step.

    Args:
        model: Object with a ``predict(obs, deterministic=...)`` method that
            returns ``(action, state)``.
        env: Vectorized environment whose ``step`` returns
            ``(obs, rewards, dones, infos)`` and which resets a finished
            sub-environment on its own, as Stable-Baselines3 VecEnvs do.
        callback: Object with ``on_step(locals_, globals_)``; it is called
            after every step with the keys ``'rewards'``, ``'dones'``,
            ``'infos'`` and ``'env'``.
        num_episodes: Number of episodes of the first sub-environment to run.
        deterministic: Forwarded to ``model.predict``; ``False`` keeps the
            previous behaviour.
    """
    obs = env.reset()
    for _ in range(num_episodes):
        episode_over = False
        while not episode_over:
            action, _state = model.predict(obs, deterministic=deterministic)
            obs, rewards, dones, infos = env.step(action)
            callback.on_step(
                {'rewards': rewards, 'dones': dones, 'infos': infos, 'env': env},
                {},
            )
            # Follow the first sub-environment only, like the callback does.
            # Indexing explicitly avoids the ambiguous truth value of a
            # multi-element `dones` array.
            episode_over = bool(np.ravel(dones)[0])
            # No explicit reset here: the vectorized env has already reset
            # itself and `obs` is the first observation of the next episode.

# Run 50 evaluation episodes with the PPO model, recording into its callback
evaluate_model(model=ppo_model, env=env, callback=ppo_callback, num_episodes=50)
# Same for the A2C model
evaluate_model(model=a2c_model, env=env, callback=a2c_callback, num_episodes=50)


# One metrics figure per algorithm, PPO first
ppo_callback.plot_metrics("Performance Metrics for PPO")
a2c_callback.plot_metrics("Performance Metrics for A2C")


def calculate_normalized_score(stats, alpha1, alpha2, alpha3, alpha4, bounds=None):
    """Combine min-max normalized mean metrics into one weighted score.

    Reward and speed count positively; collisions and travel time count
    negatively (as ``1 - normalized``).

    Args:
        stats: Dict with keys ``'reward'``, ``'collision'``, ``'speed'`` and
            ``'travel_time'``, each mapping to a dict that has a ``'mean'``.
        alpha1: Weight of the normalized reward.
        alpha2: Weight of ``1 - normalized collisions``.
        alpha3: Weight of the normalized speed.
        alpha4: Weight of ``1 - normalized travel time``.
        bounds: Optional dict with the same four keys mapping to
            ``(min, max)`` pairs. When omitted, the module-level
            ``min_reward``/``max_reward``, ``min_collision``/``max_collision``,
            ``min_speed``/``max_speed`` and
            ``min_travel_time``/``max_travel_time`` are used.

    Returns:
        The weighted score.
    """
    if bounds is None:
        bounds = {
            'reward': (min_reward, max_reward),
            'collision': (min_collision, max_collision),
            'speed': (min_speed, max_speed),
            'travel_time': (min_travel_time, max_travel_time),
        }

    def normalize(value, min_val, max_val):
        span = max_val - min_val
        if span == 0:
            # Degenerate range (e.g. no collisions anywhere): every value is
            # identical, so map it to 0 instead of dividing by zero.
            return 0.0
        return (value - min_val) / span

    reward_norm = normalize(stats['reward']['mean'], *bounds['reward'])
    collision_norm = normalize(stats['collision']['mean'], *bounds['collision'])
    speed_norm = normalize(stats['speed']['mean'], *bounds['speed'])
    travel_time_norm = normalize(stats['travel_time']['mean'], *bounds['travel_time'])

    score = (
        (alpha1 * reward_norm)
        + (alpha2 * (1 - collision_norm))
        + (alpha3 * speed_norm)
        + (alpha4 * (1 - travel_time_norm))
    )
    return score


# Per-episode summary statistics (mean/min/max) gathered by each callback
ppo_stats = ppo_callback.get_stats()
a2c_stats = a2c_callback.get_stats()


# Shared normalization bounds: the extreme values seen by either algorithm.
# These module-level names are read by calculate_normalized_score.
min_reward = min(ppo_stats['reward']['min'], a2c_stats['reward']['min'])
max_reward = max(ppo_stats['reward']['max'], a2c_stats['reward']['max'])
min_collision = min(ppo_stats['collision']['min'], a2c_stats['collision']['min'])
max_collision = max(ppo_stats['collision']['max'], a2c_stats['collision']['max'])
min_speed = min(ppo_stats['speed']['min'], a2c_stats['speed']['min'])
max_speed = max(ppo_stats['speed']['max'], a2c_stats['speed']['max'])
min_travel_time = min(ppo_stats['travel_time']['min'], a2c_stats['travel_time']['min'])
max_travel_time = max(ppo_stats['travel_time']['max'], a2c_stats['travel_time']['max'])

# Hyperparameters for the scoring function (sum to 1)
alpha1, alpha2, alpha3, alpha4 = 0.3, 0.2, 0.3, 0.2

# Calculate scores
ppo_score = calculate_normalized_score(ppo_stats, alpha1, alpha2, alpha3, alpha4)
a2c_score = calculate_normalized_score(a2c_stats, alpha1, alpha2, alpha3, alpha4)

# Print the model with the higher score
if ppo_score > a2c_score:
    print(f"PPO has a higher score: {ppo_score}")
elif a2c_score > ppo_score:
    print(f"A2C has a higher score: {a2c_score}")
else:
    # Equal scores, or NaN scores that cannot be ordered: do not claim a winner.
    print(f"No clear winner: PPO score = {ppo_score}, A2C score = {a2c_score}")
