-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
82 lines (66 loc) · 2.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import logging
import os
import sys
from collections import deque
from logging.handlers import RotatingFileHandler
from drone_cab import Package, Pickup, Vehicle, Warehouse
from drone_cab.assign import assign_package_pickup, assign_package_vehicle
# When SUMO_HOME is set, add SUMO's "tools" directory to the import path
# before importing traci, so the copy shipped with that SUMO installation can
# be found.
if "SUMO_HOME" in os.environ:
    sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import traci

# Module-level logger; handlers and level are configured in main().
logger = logging.getLogger(__name__)
def _try_assign(package, pickup_list, warehouse) -> bool:
    """Try to give ``package`` both a pickup and a vehicle.

    Args:
        package: The ``Package`` to place.
        pickup_list: Candidate pickups, forwarded to ``assign_package_pickup``.
        warehouse: The ``Warehouse`` forwarded to ``assign_package_vehicle``.

    Returns:
        ``True`` when both assignment calls returned a non-``None`` result;
        ``False`` when either returned ``None`` or raised ``AssertionError``.
    """
    try:
        # Explicit None checks instead of `assert`: asserts are stripped under
        # `python -O`, which would silently treat a failed assignment as success.
        pickup = assign_package_pickup(package, pickup_list)
        if pickup is None:
            logger.debug("Failed to assign %s to any pickup", package)
            return False

        vehicle = assign_package_vehicle(
            package, Vehicle.get_vehicle_list(), warehouse
        )
        if vehicle is None:
            logger.debug("Failed to assign %s to any vehicle", package)
            return False
    except AssertionError:
        # An AssertionError escaping the assignment helpers was previously
        # handled as a retryable failure as well; keep that contract.
        logger.debug("AssertionError", exc_info=True)
        return False
    return True


def main():
    """Run the drone-cab simulation in sumo-gui for 400 steps.

    Configures file logging, starts SUMO through TraCI using
    ``data/config.sumocfg`` next to this file, registers every pickup and its
    drone as TraCI step listeners, and then steps the simulation. At step 30
    three packages are queued; queued packages are assigned to a pickup and a
    vehicle in FIFO order, and a package that cannot be assigned yet is retried
    on the following simulation step. The TraCI connection is closed on exit,
    including when the loop raises.
    """
    logging.basicConfig(
        handlers=[
            # NOTE(review): CPython's RotatingFileHandler appears to force
            # append mode whenever maxBytes > 0, so mode="w" likely does not
            # truncate the log between runs -- confirm before relying on it.
            RotatingFileHandler(
                "drone_cab.log", mode="w", maxBytes=1024 * 1024 * 256, backupCount=1
            )
        ],
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        level=logging.DEBUG,
    )

    traci.start(
        [
            "sumo-gui",
            "-c",
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "data",
                "config.sumocfg",
            ),
            # presumably the GUI delay between steps in ms -- see SUMO options
            "-d",
            "150",
        ]
    )
    logger.info("traci.start()")

    # Everything after a successful start runs under try/finally so the TraCI
    # connection is released even if setup or the loop fails.
    try:
        warehouse = Warehouse()
        pickup_list = Pickup.create_pickup_list()
        for pickup in pickup_list:
            traci.addStepListener(pickup.drone)
            traci.addStepListener(pickup)

        package_queue: deque[Package] = deque()

        for step in range(400):
            logger.info("Simulation step=%d", step)

            for vehicle in Vehicle.get_vehicle_list():
                vehicle.step()

            if step == 30:
                # Hard-coded delivery destinations (presumably IDs from the
                # network in data/ -- verify against the SUMO config).
                for destination_id in ["234807099", "239713538", "359039090"]:
                    package_queue.append(Package(destination_id))

            while package_queue:
                package = package_queue.popleft()
                if not _try_assign(package, pickup_list, warehouse):
                    # Put it back at the head to preserve FIFO order, then stop
                    # for this step: retrying immediately without advancing the
                    # simulation would spin on the same package.
                    package_queue.appendleft(package)
                    break

            traci.simulationStep()
            logger.info("traci.simulationStep()")
    finally:
        traci.close()
        logger.info("traci.close()")
# Start the simulation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()