Notes: an intersection should have what
- Red Light time (int)
- Green Light time (int)
- Congestion rate: (1 value, percent, float)
- Queue Length (buffer, int)
- Time of day (str)
- Intersection ID (int/str)

A road should have what
- 

In [None]:
class Intersection:
    def __init__(self, time_of_day, intersection_id, traffic_input, traffic_output):
        self.intersection_id = intersection_id
        self.time_of_day = time_of_day
        self.traffic_input = traffic_input      # Incoming traffic
        self.traffic_output = traffic_output    # Outgoing traffic
        
        self.traffic_light = {
            "turning": {
                "n-w": 10,       
                "e-n": 10,       
                "s-e": 10,
                "w-s": 10,
            },
            "straight": {
                "n-s": 30,       
                "s-n": 30,
                "w-e": 30,
                "e-w": 30,
            } 
        }

        self.edges = []  # List of roads connected to this intersection
        
    def __lt__(self, other):
        """Defines how to compare two Intersection objects based on total traffic light duration"""
        self_time = sum(self.traffic_light["straight"].values()) + sum(self.traffic_light["turning"].values())
        other_time = sum(other.traffic_light["straight"].values()) + sum(other.traffic_light["turning"].values())
        return self_time < other_time
        
    def add_edge(self, road):
        self.edges.append(road)
        
    def get_state(self):
        return {
            "time_of_day": self.time_of_day,
            "intersection_id": self.intersection_id,
            "traffic_input": self.traffic_input,
            "traffic_output": self.traffic_output,
            "edges": [road.road_id for road in self.edges],
        }

class Road:
    def __init__(self, direction, road_id, max_capacity, edge_data):
        self.direction = direction
        self.road_id = road_id
        self.max_capacity = max_capacity
        self.edge_data = edge_data  # Probability of directional turn

        self.current_capacity = 0

    def add_car_to_road(self, num_to_add=1):
        self.current_capacity = min(self.current_capacity + num_to_add, self.max_capacity)

    def remove_car_from_road(self, num_to_remove=1):
        self.current_capacity = max(self.current_capacity - num_to_remove, 0)
        
    def get_congestion(self):
        return self.current_capacity / self.max_capacity

    def get_state(self):
        return {
            "road_id": self.road_id,
            "current_capacity": self.current_capacity,
            "congestion": self.get_congestion(),
            "edge_data": self.edge_data,
        }

In [120]:
class TrafficSystem:
    def __init__(self, intersections):
        self.intersections = intersections  # List of Intersection objects

    def simulate_congestion(self, **timings):
        """
        Simulates congestion based on given traffic light durations.
        Returns an aggregated congestion score (lower is better).
        """
        total_congestion = 0

        # Update traffic light timings
        for intersection in self.intersections:
            for key in intersection.traffic_light["straight"]:
                intersection.traffic_light["straight"][key] = timings.get(key, 30)
            for key in intersection.traffic_light["turning"]:
                intersection.traffic_light["turning"][key] = timings.get(key, 10)

        # Simulate each intersection processing
        for intersection in self.intersections:
            for road in intersection.edges:
                road.remove_car_from_road(road.current_capacity // 2)  # Assume partial clearance
                total_congestion += road.get_congestion()

        return -total_congestion  # Negative because BO maximizes by default

    def optimize_traffic_lights(self):
        """
        Uses Bayesian Optimization to find the best traffic light durations.
        """

        # Define the search space (e.g., min 5s, max 60s for each phase)
        pbounds = {
            "n-s": (5, 60), "s-n": (5, 60),
            "w-e": (5, 60), "e-w": (5, 60),
            "n-w": (5, 30), "e-n": (5, 30),
            "s-e": (5, 30), "w-s": (5, 30),
        }

        optimizer = BayesianOptimization(
            f=self.simulate_congestion,
            pbounds=pbounds,
            random_state=42,
        )

        optimizer.maximize(
            init_points=5,  # Random samples
            n_iter=15,      # Optimization iterations
        )

        print("\nOptimal Traffic Light Timings:")
        print(optimizer.max)

        return optimizer.max["params"]  # Return best light durations
    
    def process_intersections(self):
        print("\n--- Processing Traffic System ---\n")
        for intersection in self.intersections:
            print(f"Processing {intersection.intersection_id} at {intersection.time_of_day}")

            for road in intersection.edges:
                print(f"Before: {road.road_id} congestion: {road.get_congestion():.2f}")
                road.remove_car_from_road(road.current_capacity // 2)  # Simulated clearance
                print(f"After: {road.road_id} congestion: {road.get_congestion():.2f}\n")


In [121]:
# Define Roads
road_s1 = Road("S", "road_s1", 30, [0.4, 0.2, 0.2, 0.2], )  # South to Intersection 3
road_n1 = Road("N", "road_n1", 30, [0.4, 0.2, 0.2, 0.2])  # North to Intersection 1
road_e2 = Road("E", "road_e2", 30, [0.4, 0.2, 0.2, 0.2])  # East into Intersection 3
road_s3 = Road("S", "road_s3", 30, [0.4, 0.2, 0.2, 0.2])  # Southbound within Intersection 3
road_n3 = Road("N", "road_n3", 30, [0.4, 0.2, 0.2, 0.2])  # Northbound within Intersection 3
road_nw3 = Road("NW", "road_nw3", 30, [0.4, 0.2, 0.2, 0.2])  # North-West turn to Intersection 2

# Define Intersections
intersection_1 = Intersection("7AM", "Intersection 1", [road_s1], [road_n1])  # Connects to Intersection 3 (South → North)
intersection_2 = Intersection("7AM", "Intersection 2", [road_e2], [road_s3])  # Connects via e-s turn to Intersection 3
intersection_3 = Intersection("7AM", "Intersection 3", [road_n1, road_s3], [road_n3, road_nw3])  # 3-way

# Connect Roads to Intersections
intersection_1.add_edge(road_s1)
intersection_1.add_edge(road_n1)

intersection_2.add_edge(road_e2)
intersection_2.add_edge(road_s3)

intersection_3.add_edge(road_n1)
intersection_3.add_edge(road_s3)
intersection_3.add_edge(road_n3)
intersection_3.add_edge(road_nw3)

# Run Traffic System
traffic_system = TrafficSystem([intersection_1, intersection_2, intersection_3])
traffic_system.process_intersections()  



--- Processing Traffic System ---

Processing Intersection 1 at 7AM
Before: road_s1 congestion: 0.00
After: road_s1 congestion: 0.00

Before: road_n1 congestion: 0.00
After: road_n1 congestion: 0.00

Processing Intersection 2 at 7AM
Before: road_e2 congestion: 0.00
After: road_e2 congestion: 0.00

Before: road_s3 congestion: 0.00
After: road_s3 congestion: 0.00

Processing Intersection 3 at 7AM
Before: road_n1 congestion: 0.00
After: road_n1 congestion: 0.00

Before: road_s3 congestion: 0.00
After: road_s3 congestion: 0.00

Before: road_n3 congestion: 0.00
After: road_n3 congestion: 0.00

Before: road_nw3 congestion: 0.00
After: road_nw3 congestion: 0.00



In [123]:
# Define Roads
road_s1 = Road("S", "road_s1", 30, [0.4, 0.2, 0.2, 0.2])  
road_n1 = Road("N", "road_n1", 30, [0.4, 0.2, 0.2, 0.2])  
road_e2 = Road("E", "road_e2", 30, [0.4, 0.2, 0.2, 0.2])  
road_s3 = Road("S", "road_s3", 30, [0.4, 0.2, 0.2, 0.2])  
road_n3 = Road("N", "road_n3", 30, [0.4, 0.2, 0.2, 0.2])  
road_nw3 = Road("NW", "road_nw3", 30, [0.4, 0.2, 0.2, 0.2])  
for road in [road_s1, road_n1, road_e2, road_s3, road_n3, road_nw3]:
    road.add_car_to_road(15)  # Start with 50% road capacity


# Define Intersections
intersection_1 = Intersection("7AM", "Intersection 1", [road_s1], [road_n1])  
intersection_2 = Intersection("7AM", "Intersection 2", [road_e2], [road_s3])  
intersection_3 = Intersection("7AM", "Intersection 3", [road_n1, road_s3], [road_n3, road_nw3])  

# Connect Roads to Intersections
intersection_1.add_edge(road_s1)
intersection_1.add_edge(road_n1)

intersection_2.add_edge(road_e2)
intersection_2.add_edge(road_s3)

intersection_3.add_edge(road_n1)
intersection_3.add_edge(road_s3)
intersection_3.add_edge(road_n3)
intersection_3.add_edge(road_nw3)

# Define the Traffic System
traffic_system = TrafficSystem([intersection_1, intersection_2, intersection_3])
traffic_system.process_intersections()



--- Processing Traffic System ---

Processing Intersection 1 at 7AM
Before: road_s1 congestion: 0.50
After: road_s1 congestion: 0.27

Before: road_n1 congestion: 0.50
After: road_n1 congestion: 0.27

Processing Intersection 2 at 7AM
Before: road_e2 congestion: 0.50
After: road_e2 congestion: 0.27

Before: road_s3 congestion: 0.50
After: road_s3 congestion: 0.27

Processing Intersection 3 at 7AM
Before: road_n1 congestion: 0.27
After: road_n1 congestion: 0.13

Before: road_s3 congestion: 0.27
After: road_s3 congestion: 0.13

Before: road_n3 congestion: 0.50
After: road_n3 congestion: 0.27

Before: road_nw3 congestion: 0.50
After: road_nw3 congestion: 0.27

