# 0. Install Dependencies

In [2]:
!pip install tensorflow
!pip install keras
!pip install gym
!pip install gymnasium
!pip install keras-rl2
!pip install stable-baselines3
!pip install traci

Collecting tensorflow
  Downloading tensorflow-2.18.0-cp311-cp311-win_amd64.whl.metadata (3.3 kB)
Collecting tensorflow-intel==2.18.0 (from tensorflow)
  Downloading tensorflow_intel-2.18.0-cp311-cp311-win_amd64.whl.metadata (4.9 kB)
Collecting absl-py>=1.0.0 (from tensorflow-intel==2.18.0->tensorflow)
  Using cached absl_py-2.1.0-py3-none-any.whl.metadata (2.3 kB)
Collecting astunparse>=1.6.0 (from tensorflow-intel==2.18.0->tensorflow)
  Using cached astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow-intel==2.18.0->tensorflow)
  Using cached flatbuffers-24.3.25-py2.py3-none-any.whl.metadata (850 bytes)
Collecting gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1 (from tensorflow-intel==2.18.0->tensorflow)
  Using cached gast-0.6.0-py3-none-any.whl.metadata (1.3 kB)
Collecting google-pasta>=0.1.1 (from tensorflow-intel==2.18.0->tensorflow)
  Using cached google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from te

# 1. Build Environment with OpenAI Gymnasium

In [None]:
# import libraries
import numpy as np
import random
import gymnasium
import traci
import sumolib
import time

In the following cell we define our SumoEnv which acts as a structure that Stable Baselines3 is able to easily understand and interact with!

Note that our SumoEnv class inherits from (i.e. is a subclass of) the parent class, gymnasium.Env

The 4 MANDATORY methods of any gymnasium.Env are `__init__`, `step`, `render`, and `reset`

Do **not** change the input parameters or the return values of any of these, as Stable Baselines3 expects a strict format!

We can implement additional functions within this class, such as the arbitrary "perform_action()" function that I suggest below. This will help ensure modularity of our program.

In [None]:
class SumoEnv(gymnasium.Env):
  """Gymnasium environment that controls one SUMO traffic light through TraCI.

  Action space: ``Discrete(2)``
    0 -> give green to East/West, 1 -> give green to North/South.

  Observation space: flat ``float32`` vector of length ``1 + 3 * max_cars``
    ``[traffic_light_phase, x0, y0, x1, y1, ..., speed0, speed1, ...]``.
    Slots of absent vehicles are zero-padded; vehicles beyond ``max_cars``
    are dropped so the vector always matches the declared shape.

  Traffic-light phases of the controlled signal:
    (0) E & W = green,  N & S = red
    (1) E & W = yellow, N & S = red
    (2) E & W = red,    N & S = green
    (3) E & W = red,    N & S = yellow
  """

  PHASE_EW_GREEN = 0
  PHASE_EW_YELLOW = 1
  PHASE_NS_GREEN = 2
  PHASE_NS_YELLOW = 3

  YELLOW_STEPS = 3       # simulation steps spent in yellow on every switch
  HOLD_DURATION = 99999  # long enough that SUMO never advances a green by itself

  def __init__(self, use_gui=False, max_cars=20, max_time=1000,
               sumo_config="../Networks/demo_net/demo.sumocfg"):
    """Configure the spaces and SUMO launch settings (SUMO is not started here).

    Args:
      use_gui: launch ``sumo-gui`` and sleep between steps when True,
        otherwise run headless ``sumo``.
      max_cars: number of vehicle slots encoded in the observation.
      max_time: simulation time at which an episode is truncated.
      sumo_config: path to the ``.sumocfg`` file to simulate.
    """
    super().__init__() # Initializes the parent class

    # Only one default TraCI connection can exist; drop a stale one if present
    if traci.isLoaded():
      traci.close()

    # choices are E & W = green, or N & S = green
    self.action_space = gymnasium.spaces.Discrete(2)

    self.max_cars = max_cars
    self.max_time = max_time

    # np array structure: [traffic_light_phase][positions][speeds]
    # Bounds are built as float32 so they match the declared dtype exactly.
    low = np.array([0] + [-np.inf] * (2 * max_cars) + [0] * max_cars, dtype=np.float32)
    high = np.array([3] + [np.inf] * (2 * max_cars) + [np.inf] * max_cars, dtype=np.float32)
    self.observation_space = gymnasium.spaces.Box(low=low, high=high, dtype=np.float32)

    self.use_gui = use_gui
    self.sumo_binary = sumolib.checkBinary('sumo-gui' if use_gui else 'sumo')
    self.sumo_config = sumo_config

    # The simulation is launched lazily by reset() (or by step() as a fallback)
    self.started = False

    # Define consistent pause time for sumo-gui visualization
    self.pause_time = 0.1

  # ------------------------------------------------------------------
  # Simulation life-cycle helpers
  # ------------------------------------------------------------------
  def _start_simulation(self):
    """Launch SUMO, connect TraCI and pin the light to the initial green."""
    traci.start([self.sumo_binary, "--start", "-c", self.sumo_config, "--no-step-log"])
    self.started = True
    self._hold_initial_phase()

  def _hold_initial_phase(self):
    """Set phase 0 (E/W green) and hold it until the agent decides otherwise."""
    light_id = traci.trafficlight.getIDList()[0]
    traci.trafficlight.setPhase(light_id, self.PHASE_EW_GREEN)
    traci.trafficlight.setPhaseDuration(light_id, self.HOLD_DURATION)

  def _ensure_started(self):
    """Start the simulation if this env has no live TraCI connection."""
    if not (self.started and traci.isLoaded()):
      self._start_simulation()

  def _time_limit_reached(self):
    """True once the simulation clock has reached ``max_time``."""
    return traci.simulation.getTime() >= self.max_time

  def _network_empty(self):
    """True when no vehicle is on the road AND none is still waiting to depart.

    Checking only the vehicles currently on the road would end the episode
    during any momentary gap in traffic (or before the first departure).
    """
    return traci.simulation.getMinExpectedNumber() == 0

  # ------------------------------------------------------------------
  # Gymnasium API
  # ------------------------------------------------------------------
  def step(self, action):
    """Apply ``action``, advance the simulation and return the transition.

    Returns:
      ``(observation, reward, terminated, truncated, info)`` where
      ``terminated`` means no vehicles are left or expected and
      ``truncated`` means the ``max_time`` limit was hit.
    """
    # Fallback for callers that step without calling reset() first
    self._ensure_started()

    # Perform the action (may itself advance the simulation through yellow)
    self.perform_action(action)

    # Advance the simulation by one step
    traci.simulationStep()
    if self.use_gui: # pause in between steps to slow down if in 'simulation mode'
      time.sleep(self.pause_time)
    print("Step: " + str(traci.simulation.getTime()))

    observation = np.array(self.get_state(), dtype=np.float32)
    reward = float(self.calculate_reward())

    # A time limit is a truncation, not a natural end of the episode
    terminated = self._network_empty()
    truncated = self._time_limit_reached()
    info = {}

    # Return step information (MUST follow this order of variables!!!)
    return observation, reward, terminated, truncated, info

  def render(self):
    """No-op: visualisation is handled by sumo-gui itself when ``use_gui`` is set."""
    pass

  def reset(self, seed=None, options=None):
    """Restart the simulation and return ``(observation, info)``.

    Starts SUMO on the first call. On later calls the headless simulation is
    reloaded in place; sumo-gui is closed and relaunched because
    ``traci.load()`` does not work for it.
    """
    # resets the gymnasium.Env parent class (seeds self.np_random)
    super().reset(seed=seed)

    if not (self.started and traci.isLoaded()):
      self._start_simulation()
    elif self.use_gui:
      traci.close()
      self.started = False
      self._start_simulation()
    else:
      traci.load(["-c", self.sumo_config, "--no-step-log"])
      # Reloading restores the default signal program, so pin phase 0 again
      self._hold_initial_phase()

    observation = np.array(self.get_state(), dtype=np.float32)

    # return 'observation' and 'info' --> MUST be in this form
    return observation, {}

  def close(self):
    """Shut down the TraCI connection (and with it the SUMO process)."""
    if traci.isLoaded():
      traci.close()
    self.started = False

  # ------------------------------------------------------------------
  # Environment logic
  # ------------------------------------------------------------------
  def get_state(self):
    """Build the observation vector ``[phase, positions..., speeds...]``."""
    traffic_light_ids = traci.trafficlight.getIDList()
    traffic_light_phase = traci.trafficlight.getPhase(traffic_light_ids[0]) # only 1 in this network

    # Limit to max_cars so the vector never exceeds the observation space
    vehicle_ids = traci.vehicle.getIDList()[:self.max_cars]

    positions = []
    speeds = []
    for v_id in vehicle_ids:
      positions.extend(traci.vehicle.getPosition(v_id))  # (x, y) tuple
      speeds.append(traci.vehicle.getSpeed(v_id))

    # Zero-pad the slots of vehicles that are not present
    missing_cars = self.max_cars - len(vehicle_ids)
    positions.extend([0.0, 0.0] * missing_cars)
    speeds.extend([0.0] * missing_cars)

    return np.array([traffic_light_phase] + positions + speeds, dtype=np.float32)

  def perform_action(self, action):
    """Switch the green direction, passing through the proper yellow phase.

    Action 0 requests E/W green, action 1 requests N/S green. If the requested
    direction is already green nothing happens. Otherwise the direction that
    is about to lose its green is shown yellow for ``YELLOW_STEPS`` simulation
    steps (N/S yellow before E/W green, E/W yellow before N/S green), and the
    new green is then held indefinitely. Any other action value is ignored.
    """
    light_id = traci.trafficlight.getIDList()[0]
    current_phase = traci.trafficlight.getPhase(light_id)

    if action == 0:
      target_green, yellow_phase = self.PHASE_EW_GREEN, self.PHASE_NS_YELLOW
    elif action == 1:
      target_green, yellow_phase = self.PHASE_NS_GREEN, self.PHASE_EW_YELLOW
    else:
      return

    if current_phase == target_green:
      return

    traci.trafficlight.setPhase(light_id, yellow_phase)  # transition to yellow
    traci.trafficlight.setPhaseDuration(light_id, self.YELLOW_STEPS)
    for _ in range(self.YELLOW_STEPS):  # simulate the yellow interval
      traci.simulationStep()
      if self.use_gui:
        time.sleep(self.pause_time)
    traci.trafficlight.setPhase(light_id, target_green)
    traci.trafficlight.setPhaseDuration(light_id, self.HOLD_DURATION)  # Hold this phase indefinitely

  def calculate_reward(self):
    """Combine the traffic metrics into one scalar reward.

    Congestion, average waiting time and halting vehicles are penalised;
    average speed is rewarded. If TraCI rejects one of the queries the reward
    falls back to 0 (best effort); any other error is propagated.
    """
    lane_ids = traci.lane.getIDList()
    vehicle_ids = traci.vehicle.getIDList()
    try:
      congestion = self.calculate_congestion(vehicle_ids)
      wait_time = self.calculate_avg_wait_time(lane_ids)
      stops = self.calculate_total_stops(lane_ids)
      avg_speed = self.calculate_avg_speed(vehicle_ids) # maximised, hence the positive weight
    except traci.exceptions.TraCIException:
      return 0.0

    return -2.5*congestion + -2*wait_time + -1*stops + 0.7*avg_speed

  def is_done(self):
    """True when the episode is over (time limit reached or network empty)."""
    return self._time_limit_reached() or self._network_empty()

  # METRICS:
  def calculate_congestion(self, vehicle_ids):
    """Count stopped vehicles, ignoring those within one second of departure."""
    congestion = 0
    current_time = traci.simulation.getTime()

    for vehicle_id in vehicle_ids:
      departure_time = traci.vehicle.getDeparture(vehicle_id)
      speed = traci.vehicle.getSpeed(vehicle_id)

      # A vehicle that has only just been inserted is not "congested" yet
      just_departed = abs(current_time - departure_time) <= 1
      if speed == 0 and not just_departed:
        congestion += 1

    return congestion

  def calculate_avg_wait_time(self, lane_ids):
    """Average waiting time over all vehicles on the given lanes (0 if none)."""
    wait_times = [
      traci.vehicle.getWaitingTime(vehicle_id)
      for lane_id in lane_ids
      for vehicle_id in traci.lane.getLastStepVehicleIDs(lane_id)
    ]
    return sum(wait_times) / len(wait_times) if wait_times else 0

  def calculate_total_stops(self, lane_ids):
    """Total number of halting vehicles over the given lanes."""
    return sum(traci.lane.getLastStepHaltingNumber(lane_id) for lane_id in lane_ids)

  def calculate_avg_speed(self, vehicle_ids):
    """Mean speed of the given vehicles (0 if there are none)."""
    if not vehicle_ids:
      return 0
    total_speed = sum(traci.vehicle.getSpeed(v_id) for v_id in vehicle_ids)
    return total_speed / len(vehicle_ids)


NameError: name 'gymnasium' is not defined

## Use the SumoEnv

In [None]:
# Instantiate the environment with its defaults (use_gui=False selects the
# headless 'sumo' binary). The constructor does not launch SUMO.
env = SumoEnv()

In [None]:
# Print samples of the action and observation spaces:
# env.<action_space_variable>.sample()
# env.<observation_space_variable>.sample()

In [None]:
# run sample episodes with random actions to get baseline score




# 2. Create Agent with Stable Baselines3

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv

In [None]:
# Keep the raw environment under its own name: check_env needs the unwrapped
# env as its argument, and the factory lambda must not close over a name that
# is rebound to the vectorized wrapper on the next line.
sumo_env = SumoEnv()
check_env(sumo_env) # checks that the custom environment is compatible with Stable Baselines3
env = DummyVecEnv([lambda: sumo_env])

In [None]:
# Select the RL algorithm and train the model


In [None]:
# run test episodes with model.predict()


# 3. Saving and Reloading Agent

In [None]:
# saves the model
# NOTE(review): `model` is not defined in any cell visible above (the training
# cell is still a placeholder) - it must be created there before this runs.
model.save("filename")

In [None]:
# Build a fresh vectorized environment; the SumoEnv class itself serves as the
# zero-argument factory that DummyVecEnv calls to create the instance.
env = DummyVecEnv([SumoEnv])

In [None]:
# load the trained model, making sure to pass in the SumoEnv
# ("filename" must match the name used with model.save; `env` is the
# DummyVecEnv-wrapped SumoEnv created in the previous cell)
model = PPO.load("filename", env=env)

In [None]:
# run test episodes with model.predict()