Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
367 changes: 367 additions & 0 deletions tests/test_cpp_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -7441,3 +7441,370 @@ def test_route_choice_no_cyclic_routing_cpp():
assert trip_completed_cyclic == 60840
assert trip_completed_nocyclic == 60840
assert eq_tol(ttt_cyclic, ttt_nocyclic)


###########################################
# Coverage improvement tests (2026-04-02)
###########################################

# --- Vehicle abort, log access, route manipulation ---

def test_coverage_vehicle_abort_and_all_logs():
W = World(cpp=True, deltan=5, tmax=300, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("orig", 0, 0)
W.addNode("deadend", 500, 0)
W.addNode("orig2", 0, 500)
W.addNode("dest", 500, 500)

W.addLink("orig_deadend", "orig", "deadend", length=500, free_flow_speed=20)
W.addLink("orig2_dest", "orig2", "dest", length=500, free_flow_speed=20)

for t in range(0, 50, 10):
W.addVehicle("orig", "dest", departure_time=t, trip_abort=1, auto_rename=True)
for t in range(0, 50, 10):
W.addVehicle("orig2", "dest", departure_time=t, auto_rename=True)

W.exec_simulation()

aborted = [v for v in W.VEHICLES.values() if v.orig == W.get_node("orig")]
ended = [v for v in W.VEHICLES.values() if v.orig == W.get_node("orig2")]

for veh in aborted:
assert veh.state == "abort"
assert veh.arrival_time == -1
assert veh.travel_time == -1
assert veh.flag_trip_aborted == True

for veh in ended:
assert veh.state == "end"

for veh in list(W.VEHICLES.values()):
repr(veh)
assert len(veh.log_t) > 0
assert len(veh.log_x) > 0
assert len(veh.log_v) > 0
assert len(veh.log_s) > 0
assert len(veh.log_lane) > 0
assert len(veh.log_state) > 0
assert len(veh.log_link) > 0
assert len(veh.log_t_link) > 0
veh.traveled_route()

veh0 = list(W.VEHICLES.values())[0]
with pytest.raises(AttributeError):
_ = veh0.nonexistent_attr


def test_coverage_vehicle_route_manipulation():
W = World(cpp=True, deltan=5, tmax=500, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("orig", 0, 0)
W.addNode("mid", 500, 0)
W.addNode("dest1", 1000, 0)
W.addNode("dest2", 1000, 500)
W.addNode("dest3", 1000, -500)

link_om = W.addLink("orig_mid", "orig", "mid", length=500, free_flow_speed=20)
link_md1 = W.addLink("mid_dest1", "mid", "dest1", length=500, free_flow_speed=20)
link_md2 = W.addLink("mid_dest2", "mid", "dest2", length=500, free_flow_speed=20)
link_md3 = W.addLink("mid_dest3", "mid", "dest3", length=500, free_flow_speed=20)

W.addVehicle("orig", "dest1", departure_time=0, auto_rename=True)
W.addVehicle("orig", "dest2", departure_time=50, auto_rename=True)
W.addVehicle("orig", "dest1", departure_time=100, auto_rename=True)

vehs = sorted(W.VEHICLES.values(), key=lambda v: v.departure_time)
veh1, veh2, veh3 = vehs[0], vehs[1], vehs[2]

veh1.enforce_route([link_om, link_md1])
veh2.set_links_prefer([link_md2])
veh3.set_links_avoid([link_md3])

W.exec_simulation()

route1 = veh1.traveled_route()[0]
assert route1[-1] == link_md1


# --- Link mid-simulation queries, setters, edie_dx ---

def test_coverage_link_mid_simulation_queries():
W = World(cpp=True, deltan=5, tmax=300, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("A", 0, 0)
W.addNode("B", 1000, 0)
link = W.addLink("AB", "A", "B", length=1000, free_flow_speed=20)
W.adddemand("A", "B", 0, 200, flow=0.5)

W.exec_simulation(until_t=100)

assert link.speed >= 0
assert link.density >= 0
assert link.flow >= 0
assert link.arrival_count(0) >= 0
assert link.departure_count(0) >= 0

for t in [50, -1, 99999]:
itt = link.instant_travel_time(t)
att = link.actual_travel_time(t)
assert itt >= 0 or itt == -1
assert att >= 0 or att == -1

W.exec_simulation()


def test_coverage_link_setters_and_eular_dx():
W = World(cpp=True, deltan=5, tmax=300, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("n1", 0, 0)
W.addNode("n2", 1, 0)
link = W.addLink("l1", "n1", "n2", length=1000, free_flow_speed=20, eular_dx=200)
W.adddemand("n1", "n2", 0, 200, 0.5)

W.exec_simulation(until_t=100)

link.capacity_in_remain = 100.0
assert link.capacity_in_remain == pytest.approx(100.0)
link.capacity_out_remain = 100.0
assert link.capacity_out_remain == pytest.approx(100.0)

assert isinstance(link.cum_arrival, np.ndarray)
assert isinstance(link.cum_departure, np.ndarray)
assert isinstance(link.traveltime_instant, np.ndarray)
assert isinstance(link.traveltime_actual, np.ndarray)
assert link.edie_dx == 200

W.exec_simulation()


# --- World misc methods, NotImplementedError stubs ---

def test_coverage_misc_world_methods():
W = World(cpp=True, deltan=5, tmax=200, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
n1 = W.addNode("n1", 0, 0)
n2 = W.addNode("n2", 1, 0)
link = W.addLink("l1", "n1", "n2", length=500, free_flow_speed=20)
W.adddemand("n1", "n2", 0, 100, 0.5)
W.exec_simulation()

assert W.get_node(n1).name == "n1"
assert W.get_node("n2").name == "n2"
assert W.get_node(None) is None
with pytest.raises(Exception):
W.get_node(99999)

assert W.get_link(link).name == "l1"
assert W.get_link("l1").name == "l1"
assert W.get_link(None) is None
with pytest.raises(Exception):
W.get_link(99999)

with pytest.raises(NotImplementedError):
W.save_scenario("dummy")
with pytest.raises(NotImplementedError):
W.load_scenario("dummy")
with pytest.raises(NotImplementedError):
W.copy()
with pytest.raises(NotImplementedError):
W.save("dummy")

result = W.on_time(50)
assert result is True or result is False
W.change_print_mode(0)


# --- Node signal, rename ---

def test_coverage_node_signal_and_rename():
W = World(cpp=True, deltan=5, tmax=200, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
n1 = W.addNode("sig_node", 0, 0, signal=[60, 30])
W.addNode("n2", 1, 0)
W.addLink("l1", "sig_node", "n2", length=500, free_flow_speed=20, signal_group=[0])

with pytest.raises((ValueError, RuntimeError)):
W.addNode("sig_node", 2, 0, auto_rename=True)
with pytest.raises(ValueError):
W.addNode("sig_node", 3, 0)

W.adddemand("sig_node", "n2", 0, 100, 0.3)
W.exec_simulation()

assert "sig_node" in repr(n1)
sp = n1.signal_phase
assert isinstance(sp, (int, np.integer))
st = n1.signal_t
assert isinstance(st, (int, float, np.floating))
n1.signal_phase = 0
n1.signal_t = 0


# --- Route dunder methods ---

def test_coverage_route_dunder_methods():
W = World(cpp=True, deltan=5, tmax=200, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("orig", 0, 0)
W.addNode("mid", 1, 0)
W.addNode("dest", 2, 0)
l1 = W.addLink("l1", "orig", "mid", length=500, free_flow_speed=20)
l2 = W.addLink("l2", "mid", "dest", length=500, free_flow_speed=20)

route = W.defRoute(["l1", "l2"])
assert "Route" in repr(route)
assert list(route) == [l1, l2]
assert len(route) == 2
assert route[0] == l1


# --- addVehicle: taxi NotImplementedError, links_avoid ---

def test_coverage_addVehicle_taxi_and_links_avoid():
W = World(cpp=True, deltan=5, tmax=200, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("orig", 0, 0)
W.addNode("mid", 1, 0)
W.addNode("dest", 2, 0)
link1 = W.addLink("link1", "orig", "mid", length=500, free_flow_speed=20)
W.addLink("link2", "mid", "dest", length=500, free_flow_speed=20)

with pytest.raises(NotImplementedError):
W.addVehicle("orig", "dest", 0, mode="taxi")

W.addVehicle("orig", "dest", 0, links_avoid=[link1])
W.exec_simulation()
assert len(W.VEHICLES) >= 1


# --- Demand methods: point2point, area2area, area2area2, nodes2nodes2 ---

def test_coverage_demand_methods():
import warnings as _w
W = World(cpp=True, deltan=5, tmax=500, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
nodes = {}
for i in range(3):
for j in range(3):
nodes[(i, j)] = W.addNode(f"n{i}{j}", i, j)
for i in range(3):
for j in range(3):
if i + 1 < 3:
W.addLink(f"l{i}{j}_{i+1}{j}", f"n{i}{j}", f"n{i+1}{j}", length=500, free_flow_speed=20)
W.addLink(f"l{i+1}{j}_{i}{j}", f"n{i+1}{j}", f"n{i}{j}", length=500, free_flow_speed=20)
if j + 1 < 3:
W.addLink(f"l{i}{j}_{i}{j+1}", f"n{i}{j}", f"n{i}{j+1}", length=500, free_flow_speed=20)
W.addLink(f"l{i}{j+1}_{i}{j}", f"n{i}{j+1}", f"n{i}{j}", length=500, free_flow_speed=20)

W.adddemand_point2point(0, 0, 2, 2, 0, 300, flow=0.5)
with _w.catch_warnings():
_w.simplefilter("ignore", FutureWarning)
W.adddemand_area2area(0, 0, 1.5, 2, 2, 1.5, 0, 300, flow=0.2)
W.adddemand_area2area2(0, 0, 1.5, 2, 2, 1.5, 0, 300, volume=50)
W.adddemand_nodes2nodes2([nodes[(0,0)], nodes[(0,1)]], [nodes[(2,1)], nodes[(2,2)]], 0, 300, volume=50)

W.exec_simulation()
assert len(W.VEHICLES) > 0


# --- Link auto_rename ---

def test_coverage_link_auto_rename():
W = World(cpp=True, deltan=5, tmax=100, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("A", 0, 0)
W.addNode("B", 1, 0)
link1 = W.addLink("duplink", "A", "B", length=500, free_flow_speed=20)
assert link1.name == "duplink"
link2 = W.addLink("duplink", "A", "B", length=500, free_flow_speed=20, auto_rename=True)
assert link2.name.startswith("duplink_renamed")
with pytest.raises(ValueError):
W.addLink("duplink", "A", "B", length=500, free_flow_speed=20, auto_rename=False)


def test_coverage_vehicle_logs_and_analysis():
"""Access vehicle log data and analyzer through public API.
Internally triggers build_all_vehicle_logs_flat_compact, build_enter_log_data, etc."""
W = World(cpp=True, deltan=5, tmax=300, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
W.addNode("orig", 0, 0)
W.addNode("deadend", 1, 0)
W.addNode("orig2", 0, 1)
W.addNode("dest", 1, 1)
link_dead = W.addLink("link_dead", "orig", "deadend", length=500, free_flow_speed=20)
link_ok = W.addLink("link_ok", "orig2", "dest", length=500, free_flow_speed=20)

for t in [0, 10, 20]:
W.addVehicle("orig", "dest", departure_time=t, trip_abort=1, auto_rename=True)
for t in [0, 10, 20, 30, 40]:
W.addVehicle("orig2", "dest", departure_time=t, auto_rename=True)

W.exec_simulation()
# print_simple_stats triggers simulation_terminated → batch log build + enter_log build
W.analyzer.print_simple_stats()

# Access all vehicle log properties through public API
for veh in W.VEHICLES.values():
assert veh.state in ("home", "wait", "run", "end", "abort")
assert len(veh.log_t) > 0
assert len(veh.log_x) > 0
assert len(veh.log_v) > 0
assert len(veh.log_s) > 0
assert len(veh.log_lane) > 0
assert len(veh.log_state) > 0
assert len(veh.log_link) > 0
assert len(veh.log_t_link) > 0
veh.traveled_route()
_ = veh.departure_time
_ = veh.travel_time

# Access vehicles_enter_log through link (triggers build_enter_log_data internally)
assert isinstance(link_ok.vehicles_enter_log, dict)

# Verify abort and end states
aborted = [v for v in W.VEHICLES.values() if v.state == "abort"]
ended = [v for v in W.VEHICLES.values() if v.state == "end"]
assert len(aborted) > 0
assert len(ended) > 0


def test_coverage_simulation_lifecycle():
"""Test simulation lifecycle methods through public API.
Covers check_simulation_ongoing, iterative exec, and print_scenario_stats."""
W = World(cpp=True, deltan=5, tmax=300, print_mode=0, save_mode=0, show_mode=0, random_seed=42)
orig = W.addNode("orig", 0, 0)
dest = W.addNode("dest", 1, 0)
link1 = W.addLink("link1", "orig", "dest", length=500, free_flow_speed=20)

W.adddemand("orig", "dest", 0, 200, 0.5)

# Iterative execution
W.exec_simulation(until_t=100)
assert W.check_simulation_ongoing() == True

W.exec_simulation()
assert W.check_simulation_ongoing() == False

# All vehicles should have ended
for veh in W.VEHICLES.values():
assert veh.state == "end"

# Print methods through public API
W.analyzer.print_simple_stats()


def test_coverage_signal_capacity_enforce_route():
"""Test signals, explicit capacity, and enforce_route through public API."""
W = World(cpp=True, deltan=5, tmax=200, print_mode=0, save_mode=0, show_mode=0, random_seed=42)

n1 = W.addNode("n1", 0, 0, signal=[60, 30])
n2 = W.addNode("n2", 1, 0)
n3 = W.addNode("n3", 2, 0)

link1 = W.addLink("l1", "n1", "n2", length=500, free_flow_speed=20,
capacity_out=0.8, capacity_in=0.8, signal_group=[0])
link2 = W.addLink("l2", "n2", "n3", length=500, free_flow_speed=20)

W.addVehicle("n1", "n3", 0, auto_rename=True)
vehs = list(W.VEHICLES.values())
vehs[0].enforce_route([link1, link2])

W.adddemand("n1", "n3", 0, 100, 0.3)
W.exec_simulation()

ended_count = len([v for v in W.VEHICLES.values() if v.state == "end"])
assert ended_count > 0

# Verify the enforced-route vehicle used the specified path
route = vehs[0].traveled_route()[0]
assert route[-1] == link2
Loading