# Cell 1: imports and SUMO setup


In [1]:
import os
import sys
import numpy as np

# Make sure SUMO_HOME is set in your shell before launching Jupyter
if "SUMO_HOME" not in os.environ:
    raise EnvironmentError("Please set the SUMO_HOME environment variable before starting Jupyter")

SUMO_HOME = os.environ["SUMO_HOME"]
TOOLS_DIR = os.path.join(SUMO_HOME, "tools")
if TOOLS_DIR not in sys.path:
    sys.path.append(TOOLS_DIR)

import traci

# RL imports
import gym
from gym import spaces
from stable_baselines3 import DQN

print("SUMO_HOME:", SUMO_HOME)


Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


SUMO_HOME: /usr/share/sumo


# Cell 2: simple TraCI smoke test

In [2]:
CONFIG_PATH = "intersection.sumocfg"  # notebook is in sumo_net/

traci.start(["sumo-gui", "-c", CONFIG_PATH, "--no-step-log", "true", "--start"])
print("Connected to SUMO from notebook")

for step in range(100):
    traci.simulationStep()

traci.close()
print("Smoke test finished, SUMO closed.")

 Retrying in 1 seconds
Connected to SUMO from notebook
Smoke test finished, SUMO closed.


# Cell 3: define SumoTrafficEnv environment

In [3]:
class SumoTrafficEnv(gym.Env):
    """
    RL environment for a single SUMO intersection.

    - Observation: [queue_lane_0, ..., queue_lane_N-1, current_phase]
    - Action: phase index (0 .. num_phases-1)
    - Reward: - (total halting vehicles on controlled lanes)
    """

    metadata = {"render.modes": ["human"]}

    def __init__(
        self,
        sumocfg_path: str = "intersection.sumocfg",
        gui: bool = False,
        max_steps: int = 3600,
        delta_time: int = 5,
    ):
        super().__init__()

        self.sumocfg_path = sumocfg_path
        self.gui = gui
        self.max_steps = max_steps       # seconds of simulation per episode
        self.delta_time = delta_time     # seconds per RL step

        self.step_count = 0
        self._sumo_started = False

        # discover TLS, controlled lanes, and number of phases (headless)
        self._discover_network()

        # ---- Gym spaces ----
        self.n_lanes = len(self.controlled_lanes)
        self.action_space = spaces.Discrete(self.num_phases)

        # obs: queue for each lane + current phase
        low = np.zeros(self.n_lanes + 1, dtype=np.float32)
        high = np.full(self.n_lanes + 1, 100.0, dtype=np.float32)  # max queue guess
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

    # ---------- SUMO helpers ---------- #

    def _start_sumo(self, use_gui: bool | None = None):
        if use_gui is None:
            use_gui = self.gui
        sumo_binary = "sumo-gui" if use_gui else "sumo"
        traci.start(
            [sumo_binary, "-c", self.sumocfg_path, "--no-step-log", "true", "--start"]
        )
        self._sumo_started = True

    def _close_sumo(self):
        if self._sumo_started and traci.isLoaded():
            traci.close()
        self._sumo_started = False

    def _discover_network(self):
        """Start SUMO once (headless) to get TLS id, controlled lanes, and num phases."""
        # always headless for discovery
        self._start_sumo(use_gui=False)

        tls_ids = traci.trafficlight.getIDList()
        if not tls_ids:
            self._close_sumo()
            raise RuntimeError("No traffic lights found in the network.")

        self.tl_id = tls_ids[0]
        print("[ENV] TLS:", self.tl_id)

        # controlled lanes (deduplicated)
        raw_lanes = traci.trafficlight.getControlledLanes(self.tl_id)
        seen = set()
        self.controlled_lanes = []
        for l in raw_lanes:
            if l not in seen:
                seen.add(l)
                self.controlled_lanes.append(l)

        print("[ENV] Controlled lanes:", self.controlled_lanes)

        # number of phases
        prog_defs = traci.trafficlight.getCompleteRedYellowGreenDefinition(self.tl_id)
        phases = prog_defs[0].phases
        self.num_phases = len(phases)
        print("[ENV] Number of phases:", self.num_phases)

        self._close_sumo()

    # ---------- Gym API ---------- #

    def reset(self):
        """Start a fresh SUMO episode and return initial state."""
        self._close_sumo()
        self._start_sumo()   # uses self.gui
        self.step_count = 0
        state = self._get_state()
        return state

    def _get_state(self):
        # queues on each controlled lane
        queues = []
        for lane in self.controlled_lanes:
            q = traci.lane.getLastStepHaltingNumber(lane)
            queues.append(float(q))

        # current phase index
        phase = float(traci.trafficlight.getPhase(self.tl_id))
        state = np.array(queues + [phase], dtype=np.float32)
        return state

    def step(self, action):
        """Apply chosen phase, step SUMO, return (state, reward, done, info)."""
        self.step_count += 1

        # clip / wrap action to valid phase index
        phase_idx = int(action) % self.num_phases
        traci.trafficlight.setPhase(self.tl_id, phase_idx)

        # advance simulation for delta_time seconds
        for _ in range(self.delta_time):
            traci.simulationStep()

        # new state
        state = self._get_state()

        # reward = - total halting vehicles on controlled lanes
        total_halts = 0.0
        for lane in self.controlled_lanes:
            total_halts += traci.lane.getLastStepHaltingNumber(lane)
        reward = -total_halts

        # episode termination
        done = False
        if self.step_count * self.delta_time >= self.max_steps:
            done = True
        if traci.simulation.getMinExpectedNumber() == 0:
            done = True

        info = {"total_halts": total_halts}

        if done:
            self._close_sumo()

        return state, float(reward), done, info

    def render(self, mode="human"):
        # gui=True already shows the GUI
        pass

    def close(self):
        self._close_sumo()

# Cell 4: test environment with random actions (with GUI)

In [4]:
env = SumoTrafficEnv(
    sumocfg_path="intersection.sumocfg",
    gui=True,      # show GUI to visually verify
    max_steps=200,
    delta_time=2,
)

obs = env.reset()
print("Initial obs:", obs)

for t in range(50):
    action = env.action_space.sample()  # random phase
    obs, reward, done, info = env.step(action)
    print(f"t={t}, action={action}, reward={reward}, info={info}")
    if done:
        break

env.close()

 Retrying in 1 seconds
[ENV] TLS: J11
[ENV] Controlled lanes: ['-E7_0', '-E5_0', 'E6_0', 'E4_0']
[ENV] Number of phases: 4
 Retrying in 1 seconds


  prog_defs = traci.trafficlight.getCompleteRedYellowGreenDefinition(self.tl_id)


Initial obs: [0. 0. 0. 0. 0.]
t=0, action=3, reward=-0.0, info={'total_halts': 0.0}
t=1, action=1, reward=-0.0, info={'total_halts': 0.0}
t=2, action=2, reward=-0.0, info={'total_halts': 0.0}
t=3, action=3, reward=-0.0, info={'total_halts': 0.0}
t=4, action=3, reward=-1.0, info={'total_halts': 1.0}
t=5, action=3, reward=-4.0, info={'total_halts': 4.0}
t=6, action=1, reward=-5.0, info={'total_halts': 5.0}
t=7, action=1, reward=-8.0, info={'total_halts': 8.0}
t=8, action=3, reward=-8.0, info={'total_halts': 8.0}
t=9, action=3, reward=-12.0, info={'total_halts': 12.0}
t=10, action=0, reward=-9.0, info={'total_halts': 9.0}
t=11, action=3, reward=-7.0, info={'total_halts': 7.0}
t=12, action=1, reward=-8.0, info={'total_halts': 8.0}
t=13, action=2, reward=-8.0, info={'total_halts': 8.0}
t=14, action=3, reward=-6.0, info={'total_halts': 6.0}
t=15, action=1, reward=-9.0, info={'total_halts': 9.0}
t=16, action=3, reward=-10.0, info={'total_halts': 10.0}
t=17, action=1, reward=-13.0, info={'tota

# Cell 5: train DQN on the SUMO env (no GUI for speed)

In [None]:
train_env = SumoTrafficEnv(
    sumocfg_path="intersection.sumocfg",
    gui=False,      # training without GUI
    max_steps=3600, # 1 hour per episode
    delta_time=5,   # decision every 5s
)

model = DQN(
    "MlpPolicy",
    train_env,
    learning_rate=1e-3,
    buffer_size=50_000,
    learning_starts=1_000,
    batch_size=64,
    gamma=0.99,
    train_freq=4,
    target_update_interval=1_000,
    verbose=1,
)

model.learn(total_timesteps=200_000)

# optionally save
os.makedirs("models", exist_ok=True)
model_path = "models/dqn_sumo_traffic"
model.save(model_path)

train_env.close()
print("Training complete, model saved to", model_path)

# Cell 6: replay the trained model with GUI

In [5]:
# reload the model if needed
from stable_baselines3 import DQN

model_path = "models/dqn_sumo_traffic"
model = DQN.load(model_path)

eval_env = SumoTrafficEnv(
    sumocfg_path="intersection.sumocfg",
    gui=True,
    max_steps=600,
    delta_time=2,
)

obs = eval_env.reset()
done = False
step = 0

while not done:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = eval_env.step(action)
    step += 1
    if step % 20 == 0:
        print(f"step={step}, reward={reward}, info={info}")

eval_env.close()
print("Replay finished.")

FileNotFoundError: [Errno 2] No such file or directory: 'models/dqn_sumo_traffic.zip'