The following script is the environment created to simulate CACC behaviour, where there are $n$ number of follower cars in a line behind a leader car, and the goal is to train each car to follow a specific time headway between each other, avoiding collision.



To install the packages needed for this script, please refer to the README.md in the directory to instructions to create your virtual environment with *conda*. As well, to run this script itself, please select your virtual environment as the kernal. 

In [None]:
import numpy as np
import json
import random
import pygame
import functools

from pygame import gfxdraw
from gymnasium import spaces
from pettingzoo.utils.env import ParallelEnv
from pettingzoo.utils.conversions import parallel_wrapper_fn

Each of the *follower* cars are defined by an object of class *Car*. The *Car* object has the following properties: 
- Initial Position
- Initial Velocity
- Initial Acceleration 
- Position 
- Velocity 
- Acceleration 

Each of these properties help define the dynamics of the car, and are the same to the leader car dynamics. The leader car dynamics are defined in the *setup()* and *step()* function of the main environment class. 

Functions *__init__()* and *reset()* are used to create the object, define initial conditions, and to change the parameters back to the initial conditions. 


In [None]:
class Car:
    
    # Initializes position and velocity of car 
    def __init__(self, initial_position, initial_velocity): 
        self.initial_position = initial_position
        self.initial_velocity = initial_velocity
        self.previous_acceleration = 0
        self.reset()
    
    # Reset car state 
    def reset(self):
        self.position = self.initial_position
        self.velocity = self.initial_velocity
        self.acceleration = 0.0
        self.previous_acceleration = 0.0

Function *step()* applies the car dynamics per time step. Based on the environment time step *tau*, the car moves forward based on basic kinematics equations. The acceleration of the vehicles are determined by the action of the RL algorithm. This acceleration value is what you are trying to "train". The velocity is clipped to check for the bounds, and normalized to eliminate backwards movement.


$$ v_1 = v_0 + at $$  
$$ d = vt + \frac{1}{2}at^2 $$

In [None]:
# Step in environment 
    def step(self, action, tau):
        action = np.array(action).flatten()
        self.previous_acceleration = self.acceleration

        self.velocity = np.clip(self.velocity,0,33)
        self.acceleration = action[0] * 3
        self.velocity += self.acceleration*tau
        self.normalizedVelocity = max(0,self.velocity)
        self.position += self.normalizedVelocity * tau + 0.5 * self.acceleration * tau ** 2
    # Return state 
    def get_state(self):
        return np.array([self.position, self.velocity], dtype=np.float32)

Most multi-agent reinforcement learning problems use PettingZoo to establish their environments. PettingZoo is the interface that allows your RL agents to learn, improve, and eventually change based on your dynamics and rewards. The general structure of the environment follows the following tutorial: [Custom Environment Tutorial](https://pettingzoo.farama.org/tutorials/custom_environment/)


As well, there are several different types of multi-agent problems, and can be crudely described as either cooperative or competitive. This determines whether or not your agents will work together, or fight each other to achieve the best score. Our environment is cooperative, and the [Multiwalker](https://pettingzoo.farama.org/environments/sisl/multiwalker/) example was also used to model our environment. 

To start, all the required parameters need to be initialized in the environment. This includes the number of followers, the timestep *τ* and basic common real-life parameters such as the length of the car. The velocity profile of the leader car is based on a dataset from the U.S Department of Transportation, where the velocity of cars were tracked across the [I-80 highway](https://datahub.transportation.gov/stories/s/Next-Generation-Simulation-NGSIM-Open-Data/i5zb-xe34/) in California. Each iteration of the environment selects a car from this dataset, and the leader car takes on their velocity profile. 


As well, here you will see are your important spaces being defined: observation, action and agents. Note the upper and lower bounds of the actions and observations: they are normalized, ranging from 0 - 1.


In [None]:
# Start of environment 
class ParallelCarEnv(ParallelEnv):

    metadata = {"render_modes": ["human", "rgb_array", "debug"], "render_fps": 60}

    # Initializes the environment 
    def __init__(self, n_followers=2, render_mode=None): 
        super().__init__()

        # Initial parameters 
        self.n_followers = n_followers
        self.tau = 0.1
        self.position_threshold = 1000 # 1 km 
        self.car_length = 5 # Length of the car in meters

        self.render_mode = render_mode
        self.screen = None
        self.clock = None
        
        # Load velocity profiles
        import os
        data_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "velocityProfiles.json")
        with open(data_path, "r") as f:
            self.velocity_profiles = json.load(f)
        self.unique_vehicle_ids = list(self.velocity_profiles.keys())

        # Observation and Action Spaces
        obs_low = np.array([0,0,0,0,0], dtype=np.float32) # [follower velocity, leader velocity, distance headway]
        obs_high = np.array([1,1,1,1,1], dtype=np.float32) # [follower velocity, leader velocity, distance headway]

        self.observation_spaces = {
            f"follower_{i}": spaces.Box(low=obs_low, high=obs_high, dtype=np.float32)
            for i in range(self.n_followers)
        }
        self.action_spaces = {
            f"follower_{i}": spaces.Box(
                low=np.array([-1.0], dtype=np.float32),
                high=np.array([1.0], dtype=np.float32),
                dtype=np.float32
            )
            for i in range(self.n_followers)
        }

        # Generate agents (follower cars) 
        self._agents = [f"follower_{i}" for i in range(self.n_followers)]
        self.possible_agents = self._agents.copy()

    @property
    def agents(self):
        """Return the list of active agents."""
        return self._agents.copy()
    
    @agents.setter
    def agents(self, value):
        """Set the list of active agents."""
        self._agents = value

The *reset()* function serves to restart the environment after an episode is done. The conditions for an episode reset are in the *step()* function. All environment and leader car parameters are changed to their initial values, and a new velocity profile is chosen for the leader.

In [None]:
# Initialize / Reset the environment
    def reset(self, seed=None, options=None):

        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
        # Reset agents list to all possible agents
        self._agents = self.possible_agents.copy()
        
        # Initial parameters 
        self.time = 0
        self.leader_velocity_counter = 0
        self.vehicle_id = random.choice(self.unique_vehicle_ids)
        self.leader_velocity = self.velocity_profiles[self.vehicle_id]["velocity"][0] * 0.3048
        self.leader_position = random.uniform(250,300)

        # Reset followers 
        if hasattr(self, "followers") and len(self.followers) == self.n_followers:
            # If followers already exist, reset their positions and velocities
            for i, follower in enumerate(self.followers):
                follower.initial_position = self.leader_position - 50 * (i + 1) - random.uniform(-5, 5)
                follower.initial_velocity = np.clip(self.leader_velocity + random.uniform(-3, 3), 0, 33)
                follower.reset()
        else:
            # Create new followers if they don't exist 
            self.followers = [
                Car(
                    initial_position=self.leader_position - 50 * (i + 1) - random.uniform(-5, 5),
                    initial_velocity=np.clip(self.leader_velocity + random.uniform(-3, 3), 0, 33)
                ) for i in range(self.n_followers)
            ]       

        # Reset all reward parameters 
        self.rewards = {agent: 0.0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}

        observations = self._get_observations()
        return observations, self.infos

The *step()* function is where most of the actions happen in the environment. This is the part of the environment that takes actions from the RL agent to progress in the environment. From basic RL concepts, we know that the agent applies an action to take an environment from its current state to a new updated state. 

The environment is first moved forward by increasing environment time by the defined timestep. The first main action we take is updating the leader vehicle. This is done by selecting the next value in the velocity dataset. This works because the dataset also has a 10ms rate, the same value as our *τ* value. After the leader moves forward, the follower cars also move. These dynamics can be seen the the respective *step()* function in the *Car* class. 

In [None]:
def step(self, actions):

        self.time += self.tau # Move environment forward

        # Update leader
        if self.leader_velocity_counter < len(self.velocity_profiles[self.vehicle_id]["velocity"]):
            self.leader_velocity = self.velocity_profiles[self.vehicle_id]["velocity"][self.leader_velocity_counter] * 0.3048 
            self.leader_velocity_counter += 1
        # If we've reached the end, keep the last velocity
        self.leader_position += self.leader_velocity * self.tau

        # Update followers
        for i, agent in enumerate(self.agents):
            self.followers[i].step(actions[agent], self.tau)

Next, the rewards are defined in the environment. The rewards are a crucial component that provides the agent feedback as to its performance. You use negative reward to indicate bad behaviour, and the contrary for good behaviour. The main characteristics of the rewards lie in the distance headway and the time headway between each car. These properties help us determine if the car is at a safe distance away from the car, which will influence the reward. The distance headway is determined as the distance from the back of the leader car to the front of the follower car. The time headway is found by using this distance headway, and the velocity of the follower vehicle. It is important to note the structure of the *self.followers* array: the first element of this array is the follower car right behind the leader car.

In [None]:
# Compute rewards, terminations, truncations
        #[ follower[2] --> follower[1] --> follower[0] --> leader ]
        for i, agent in enumerate(self.agents):
            if i == 0: # if it is the follower car right behind the leader 
                front_position = self.leader_position
                velocity = self.leader_velocity
            else: # if it is a follower car right behind another follower car 
                front_position = self.followers[i-1].position
                velocity = self.followers[i-1].velocity

            distanceHeadway = front_position - self.followers[i].position - self.car_length
            timeHeadway = distanceHeadway / (self.followers[i].velocity + 1e-6) 
            relativeVelocity = self.followers[i].velocity - velocity

Another important characteristic is the time to collision (TTC) threshold between the follower car and the car in front of it. This determines the time needed for the cars to collide. Given that the follower car is faster than the car in front, the TTC can be determined as:  
$$ t_c(t) = \frac{distance headway}{\Delta v} $$

The specific value of the reward, based on the TTC, is determined by: 

$$ r_c(t) = log\left(\frac{TTC}{TTC Threshold}\right) $$


In [None]:
# Time To Collision (TTC) Penalty
            ttc_threshold = 4  # seconds
            if relativeVelocity > 0:  # If follower is faster than the car in front
                timeToCollision = distanceHeadway / relativeVelocity
                if 0 < timeToCollision <= ttc_threshold: 
                    ttcPenalty = np.log10(timeToCollision/ttc_threshold); 
                else: 
                    ttcPenalty = 0
            else:  # If follower is slower than the car in front 
                ttcPenalty = 0 

With the following characteristics, values can be assigned based on the values of these characteristics. The only positive reward is the *lognorm* reward, which rewards the follower cars that keep a certain time headway between itself and the vehicle in front of it. This uses a log-normal probability distributions, which helps train the agent for a specific time headway. The target time headway here is 1.5 seconds. The probability distribution and [plot](https://www.desmos.com/calculator/xnucpurjxa) are shown here: 

$$ \frac{1}{x\sigma \sqrt{2\pi}} exp\left(-\frac{(ln(x) - \mu)^2}{2\sigma^2}\right) $$

The rest of the components of the reward function are penalties, and weightings are assigned based on the severity of thse penalties: 
- **ttc**: time to collision
- **collision**: if two cars collide, high penalty
- **jerk**: if a car changes their acceleration by a large margin, apply a penalty. We want smooth movement
- **reverse**: we do not want any car moving backwards on a highway
- **gap**: encourage the follower car to not be too far away from the leader car by applying a penalty if they are too far


In [None]:
# Reward based on log-normal distribution (encouraging time headway ~ 1.5s)
            normalized_timeHeadway = min(abs(timeHeadway), 15) 
            mew = 0.4226
            sigma = 0.4365
            x = max(1e-6, abs(normalized_timeHeadway))

            # More balanced reward structure
            lognorm = (10) * (1/(x*sigma*np.sqrt(2*np.pi)))*np.exp(-((np.log(x)-mew)**2)/(2*(sigma**2)))
            ttc = (10) * ttcPenalty
            collision = (-100) * (distanceHeadway <= 0)  # Reduced from -100
            jerk = (-2) * abs(self.followers[i].previous_acceleration-self.followers[i].acceleration) / 6 
            reverse = (-100) * (self.followers[i].velocity < 0)  # Reduced from -100   
            gap = (-5) * (abs(timeHeadway) > 15 and distanceHeadway>100)
            

            reward = lognorm + ttc + collision + jerk + reverse + gap

            self.rewards[agent] = reward
            # Store reward breakdown in infos
            if self.render_mode == "debug":
                self.infos[agent] = {
                    "lognorm": lognorm,
                    "ttc": ttc,
                    "collision": collision,
                    "jerk": jerk,
                    "reverse": reverse,
                    "gap": gap
                }

Now the environment determines if the episode should end. There are two types of ending conditions: truncation and termination. Truncation applies to events that are outside of environment constraints, such as a leader finishing its velocity profile. A termination is caused by catastrophic behaviour, such as collisions. 

In [None]:
self.rewards[agent] = reward
            # Store reward breakdown in infos
            if self.render_mode == "debug":
                self.infos[agent] = {
                    "lognorm": lognorm,
                    "ttc": ttc,
                    "collision": collision,
                    "jerk": jerk,
                    "reverse": reverse,
                    "gap": gap
                }

            # Termination 
            if distanceHeadway <= 0: 
                self.terminations[agent] = True
            elif abs(timeHeadway) > 15 and distanceHeadway > 100: 
                self.terminations[agent] = True
            else:   
                self.terminations[agent] = False
                
        shared_reward = np.mean(list(self.rewards.values()))
        for agent in self.rewards:
           self.rewards[agent] = shared_reward

        # Global Truncation
        for agent in self.agents:
            if self.leader_velocity_counter >= len(self.velocity_profiles[self.vehicle_id]["velocity"]) or self.leader_position >= self.position_threshold:
                self.truncations[agent] = True
            else: 
                self.truncations[agent] = False 
        
        # Global Termination
        if any(self.terminations.values()) or any(self.truncations.values()):
            for agent in self.agents:
                self.terminations[agent] = True
                self.truncations[agent] = self.truncations.get(agent, False)
            self.agents = []

At the end of the *step()* function, you want to render the environment if needed, and return the proper data, based on the PettingZoo API. Note the observations that are returned: a normalization function is used for it.


In [None]:
# Render environment 
        if self.render_mode == "human":
            self.render()
        
        return self._get_observations(), self.rewards, self.terminations, self.truncations, self.infos

The *_get_observations()* function serves to normalize the observation values. As the observation values grow over time in the environment, it is good practice to keep them within a range for easier training. The values used to normalize the values are the max values for the distance and velocity. 

In [None]:
# Normalized observations for each agent
    def _get_observations(self):
        observations = {}
        for i, agent in enumerate(self.possible_agents):
            if agent in self.agents:
                front_position = self.leader_position if i == 0 else self.followers[i - 1].position
                front_velocity = self.leader_velocity if i == 0 else self.followers[i - 1].velocity
                state = self.followers[i].get_state()
                distanceHeadway = front_position - self.followers[i].position - self.car_length
                observations[agent] = np.array([
                    front_position/self.position_threshold, # leader position
                    front_velocity/33, # Max velocity is 33 m/s
                    state[0]/self.position_threshold, # follower position
                    state[1]/33, # Max velocity is 33 m/s
                    distanceHeadway/self.position_threshold
                ], dtype=np.float32) # normalized state
            else:
                # Return zeros for terminated/truncated agents
                observations[agent] = np.zeros(self.observation_spaces[agent].shape, dtype=np.float32)
        return observations

For better training, these functions allocate cache for observation and action spacing. 

In [None]:
@functools.lru_cache(maxsize=None)
    def observation_space(self, agent):
        return self.observation_spaces[agent]
    
    @functools.lru_cache(maxsize=None)
    def action_space(self, agent):
        return self.action_spaces[agent]

Lastly, the *render()* function is important for visualizing performance after training: during training, be sure to set the mode to None. After initializing the required *pygame* modules, the background and general bounds are defined. 



In [None]:
def render(self):
        if self.render_mode is None:
            # Default to rgb_array if not specified
            self.render_mode = "rgb_array"

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError as e:
            raise ImportError('pygame is not installed, run `pip install pygame`') from e

        # Initialize pygame
        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode((800, 600))
                pygame.display.set_caption("multiCarEnv")
            elif self.render_mode == "rgb_array" or self.render_mode == "debug":
                self.screen = pygame.Surface((800, 600))
        if self.clock is None:
            self.clock = pygame.time.Clock()

        font = pygame.font.Font(None, 24)
        world_width = self.position_threshold + 50
        car_height = 25
        car_width = 50
        road_y = 400 

        # Draw background
        surf = pygame.Surface((800, 600))
        surf.fill((255, 255, 255))
        pygame.draw.line(surf, (0, 0, 0), (0, road_y), (800, road_y), 2)

        meters_to_pixels = 800 / self.position_threshold
        car_width = int(self.car_length * meters_to_pixels)

Now the cars are generated. They are simple rectangles with a length of 5 (like in real life), and the coordinates of each car are the four corners of your car, where the "position" of the car is the front of the car. 

In [None]:
# Draw leader car (red)
        leader_x = int(self.leader_position * meters_to_pixels)
        leader_coords = [
            (leader_x - car_width, road_y - car_height), (leader_x - car_width, road_y - 2 * car_height),
            (leader_x, road_y - 2 * car_height), (leader_x, road_y - car_height)
        ]
        gfxdraw.aapolygon(surf, leader_coords, (0, 0, 0))
        gfxdraw.filled_polygon(surf, leader_coords, (255, 0, 0))

        # Draw followers (green)
        for i, follower in enumerate(self.followers):
            follower_x = int(follower.position * meters_to_pixels)
            follower_coords = [
                (follower_x - car_width, road_y - car_height), (follower_x - car_width, road_y - 2 * car_height),
                (follower_x, road_y - 2 * car_height), (follower_x, road_y - car_height)
            ]
            gfxdraw.aapolygon(surf, follower_coords, (0, 0, 0))
            gfxdraw.filled_polygon(surf, follower_coords, (0, 255, 0))

Lastly, different stats are shown on the simulation screen for easier testing. 


In [None]:
 # Draw follower stats
            display_follower_velocity = f"Follower {i} Velocity: {int(follower.velocity)} m/s"
            text_follower_velocity = font.render(display_follower_velocity, True, (0, 0, 0))
            surf.blit(text_follower_velocity, (500, 95 + 35 * i))

        # Draw leader stats
        display_leader_velocity = f"Leader Velocity: {int(self.leader_velocity)} m/s"
        text_leader_velocity = font.render(display_leader_velocity, True, (0, 0, 0))
        surf.blit(text_leader_velocity, (500, 25))

        display_leader_position = f"Leader Position: {int(self.leader_position)} m"
        text_leader_position = font.render(display_leader_position, True, (0, 0, 0))
        surf.blit(text_leader_position, (500, 60))

        # Flip and blit
        self.screen.blit(surf, (0, 0))

        if self.render_mode == "human":
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    raise KeyboardInterrupt  # This will stop training
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()
            return None  # Explicitly return None for human mode
        elif self.render_mode == "rgb_array" or self.render_mode == "debug":
            arr = np.transpose(
                np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
            )
            return arr.astype(np.uint8)

You will notice one last function in this environment. Traditionally for cooperative multi-agent environments, the PettingZoo Parallel API is used. For some PettingZoo utilities, conversion to their AEC API is needed, and a wrapper to do just this is needed. 



In [None]:
def parallel_env(*args, **kwargs):
    return ParallelCarEnv(*args, **kwargs)

# Agent API wrapper for PettingZoo compatibility
from pettingzoo.utils.conversions import parallel_to_aec
env = parallel_to_aec(ParallelCarEnv())