In [1]:
from pathlib import Path
import os
import sys
import shutil

import numpy as np
import pandas as pd

import traci  # make sure traci is installed: pip install traci

# --- Paths ---

PROJECT_DIR = Path.cwd()

NETWORK_FILE  = PROJECT_DIR / "my_network.net.xml"
ROUTE_FILE    = PROJECT_DIR / "my_routes.rou.xml"
CONFIG_FILE   = PROJECT_DIR / "my_config.sumocfg"

print("Working directory:", PROJECT_DIR)
print("Network exists? ", NETWORK_FILE.exists())
print("Routes exist?  ", ROUTE_FILE.exists())
print("Config exists? ", CONFIG_FILE.exists())

# --- SUMO_HOME ---

SUMO_HOME = os.environ.get("SUMO_HOME")
if SUMO_HOME is None:
    raise EnvironmentError("SUMO_HOME is not set. Please set it before running Notebook 2.")

print("SUMO_HOME:", SUMO_HOME)


def get_sumo_binary(gui: bool = False) -> str:
    """
    Find the SUMO binary (sumo or sumo-gui).
    1) Try in PATH
    2) Try in SUMO_HOME/bin
    """
    base_name = "sumo-gui" if gui else "sumo"

    # 1) PATH
    cmd = shutil.which(base_name)
    if cmd is not None:
        return cmd

    # 2) SUMO_HOME/bin
    bin_dir = Path(SUMO_HOME) / "bin"
    if sys.platform.startswith("win"):
        candidate = bin_dir / f"{base_name}.exe"
    else:
        candidate = bin_dir / base_name

    if not candidate.exists():
        raise FileNotFoundError(
            f"{base_name} not found in PATH or at {candidate}. "
            "Check SUMO installation."
        )

    return str(candidate)


Working directory: C:\Users\manda\OneDrive\Documents\AI Traffic - Jupyter
Network exists?  True
Routes exist?   True
Config exists?  True
SUMO_HOME: C:\Program Files (x86)\Eclipse\Sumo\


In [2]:
def traci_smoke_test(use_gui: bool = False):
    if traci.isLoaded():
        traci.close()

    sumo_bin = get_sumo_binary(gui=use_gui)
    cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]

    print("Starting SUMO for smoke test...")
    print("Command:", " ".join(cmd))

    traci.start(cmd)

    for i in range(10):
        traci.simulationStep()
    sim_time = traci.simulation.getTime()
    print("Simulation time after 10 steps:", sim_time)

    traci.close()
    print("TraCI smoke test OK.")


traci_smoke_test(use_gui=False)


Starting SUMO for smoke test...
Command: C:\Program Files (x86)\Eclipse\Sumo\bin\sumo.EXE -c C:\Users\manda\OneDrive\Documents\AI Traffic - Jupyter\my_config.sumocfg --step-length 1
Simulation time after 10 steps: 10.0
TraCI smoke test OK.


In [3]:
if traci.isLoaded():
    traci.close()

sumo_bin = get_sumo_binary(gui=False)
cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]

print("Starting SUMO for TLS discovery...")
traci.start(cmd)

tls_ids = traci.trafficlight.getIDList()
print("\nTotal Traffic Lights Found:", len(tls_ids))
print("TLS IDs:")
for t in tls_ids:
    print(" -", t)

traci.close()


Starting SUMO for TLS discovery...

Total Traffic Lights Found: 7
TLS IDs:
 - 1234828897
 - 1757353212
 - 1757353214
 - 1783045940
 - 1783045985
 - 1843356909
 - 7671039164


In [4]:
if traci.isLoaded():
    traci.close()

sumo_bin = get_sumo_binary(gui=False)
cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]

traci.start(cmd)

tls_lane_map = {}

for tls in tls_ids:
    lanes = traci.trafficlight.getControlledLanes(tls)
    # remove duplicates while preserving order
    lanes = list(dict.fromkeys(lanes))
    tls_lane_map[tls] = lanes

print("\nTLS → Controlled Lanes mapping:")
for tls, lanes in tls_lane_map.items():
    print(f"{tls}: {lanes}")

traci.close()



TLS → Controlled Lanes mapping:
1234828897: ['107445428#1_0', '-173540663#0_0', '-47195458#0_0']
1757353212: ['1173473098#2_0', '107445426#1_0', '173540663#2_0']
1757353214: ['800859756#1_0', '800859756#1_1']
1783045940: ['-821520600#3_0', '315129223#3_0', '315129223#3_1', '821520600#2_0']
1783045985: ['-821520600#2_0', '821520600#1_0', '324489280#3_0']
1843356909: ['223051913#1_0', '223051913#1_1']
7671039164: ['164073716#1_0', '164073716#1_1', '164073716#1_2']


In [5]:
def get_tls_state(tls_id: str, lane_map: dict) -> list:
    """
    Build the raw state vector for one TLS:
      [queue_len_lane1, ..., queue_len_laneK,
       wait_time_lane1, ..., wait_time_laneK,
       current_phase]
    """
    lane_ids = lane_map[tls_id]
    queue_lengths = []
    waiting_times = []

    for lane in lane_ids:
        q = traci.lane.getLastStepHaltingNumber(lane)  # vehicles with speed < 0.1 m/s
        w = traci.lane.getWaitingTime(lane)            # accumulated waiting time (s)
        queue_lengths.append(q)
        waiting_times.append(w)

    current_phase = traci.trafficlight.getPhase(tls_id)
    state = queue_lengths + waiting_times + [current_phase]
    return state


# Compute feature_size by sampling all TLS states once
if traci.isLoaded():
    traci.close()

sumo_bin = get_sumo_binary(gui=False)
cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]
traci.start(cmd)

# advance a few steps so there is some traffic
for _ in range(5):
    traci.simulationStep()

lengths = []
for tls in tls_ids:
    s = get_tls_state(tls, tls_lane_map)
    lengths.append(len(s))

feature_size = max(lengths)

print("State lengths per TLS:", lengths)
print("Chosen feature_size (padded):", feature_size)

traci.close()


State lengths per TLS: [7, 7, 5, 9, 7, 5, 7]
Chosen feature_size (padded): 9


In [6]:
from collections import defaultdict

if traci.isLoaded():
    traci.close()

sumo_bin = get_sumo_binary(gui=False)
cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]
traci.start(cmd)

# Start with empty neighbor sets (so isolated TLS still appear)
tls_adj = {tls: set() for tls in tls_ids}

for tls in tls_ids:
    controlled_links = traci.trafficlight.getControlledLinks(tls)
    # controlled_links is a list of lists of (inLane, outLane, via) tuples
    for link_group in controlled_links:
        for (incoming, outgoing, _) in link_group:
            for other_tls in tls_ids:
                if other_tls == tls:
                    continue
                if outgoing in tls_lane_map.get(other_tls, []):
                    tls_adj[tls].add(other_tls)
                    tls_adj[other_tls].add(tls)  # undirected

traci.close()

# Fallback chain if we found no edges at all
edge_count = sum(len(neigh) for neigh in tls_adj.values())
if edge_count == 0 and len(tls_ids) > 1:
    print("No adjacency found from network; using simple chain graph.")
    tls_list = list(tls_ids)
    for i in range(len(tls_list) - 1):
        a, b = tls_list[i], tls_list[i+1]
        tls_adj[a].add(b)
        tls_adj[b].add(a)

print("\nTLS adjacency list:")
for tls, neigh in tls_adj.items():
    print(f"{tls}: {sorted(list(neigh))}")

# Build adjacency matrix
num_nodes = len(tls_ids)
tls_index = {tls_id: i for i, tls_id in enumerate(tls_ids)}
adj_matrix = np.zeros((num_nodes, num_nodes), dtype=int)

for tls, neighbors in tls_adj.items():
    i = tls_index[tls]
    for nb in neighbors:
        if nb in tls_index:
            j = tls_index[nb]
            adj_matrix[i, j] = 1
            adj_matrix[j, i] = 1

print("\nAdjacency matrix shape:", adj_matrix.shape)
print("Example row 0:", adj_matrix[0])



TLS adjacency list:
1234828897: []
1757353212: []
1757353214: []
1783045940: ['1783045985']
1783045985: ['1783045940']
1843356909: []
7671039164: []

Adjacency matrix shape: (7, 7)
Example row 0: [0 0 0 0 0 0 0]


In [7]:
def compute_global_reward(tls_lane_map: dict) -> float:
    """
    Global reward = negative total waiting time across all controlled lanes,
    scaled down for numerical stability.
    """
    total_wait = 0.0
    for tls, lanes in tls_lane_map.items():
        for lane in lanes:
            total_wait += traci.lane.getWaitingTime(lane)
    return -total_wait / 1000.0  # scaling factor


if traci.isLoaded():
    traci.close()

sumo_bin = get_sumo_binary(gui=False)
cmd = [sumo_bin, "-c", str(CONFIG_FILE), "--step-length", "1"]
traci.start(cmd)

for _ in range(20):
    traci.simulationStep()

r = compute_global_reward(tls_lane_map)
print("Sample reward after 20 steps:", r)

traci.close()


Sample reward after 20 steps: -0.021
