In [None]:
# all counting of vehicles across all directions has been removed.

In [1]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error, r2_score

In [None]:
# PCU flow in every direction - example

In [2]:
pcu_flow = {'north': 4380.0, 'east': 2730.0, 'south': 5580.0, 'west': 4020.0}

In [5]:
# webster method and find green timings for each leg
# here, pcu_flow is in dictionary
# lanes per approach sholuld also be inputted in dictionary form similar to pcu_flow eg: {"North": 2, "South": 2, "East": 1, "West": 1}
# output for any time is in seconds
# These values are gotten from only 1 cycle

In [3]:
def webster_method(pcu_flow, lanes_per_approach, lost_time_per_phase=2.0):

    DEFAULT_SAT_FLOW = 1900  # pcu/hr/lane
    y_ratios = {}
    
    # flow ratio for each approach
    for approach, q in pcu_flow.items():
        s = DEFAULT_SAT_FLOW * lanes_per_approach.get(approach, 1)
        y_ratios[approach] = q / s

    # total flow ratio
    Y = sum(y_ratios.values())  
    num_phases = len(pcu_flow)
    L = lost_time_per_phase * num_phases  # total lost time

    # Webster's formula for cycle length
    # oversaturated condition
    if Y >= 1.0:  
        C = 120

    # not an oversaturated condition 
    else:
        C = (1.5 * L + 5) / (1 - Y)
        C = max(20, min(C, 180))  # limit between 20–180s

    # green calculation per phase
    greens = {}
    effective_green = C - L
    for approach, y in y_ratios.items():
        greens[approach] = (y / Y) * effective_green if Y > 0 else effective_green / num_phases

    return {"cycle": round(C, 2), "lost_time": L, "greens": greens, "flow_ratios": y_ratios}

In [4]:
lanes_per_approach = {"North": 2, "South": 2, "East": 1, "West": 1}

In [5]:
webster_method(pcu_flow,lanes_per_approach,2)

{'cycle': 120,
 'lost_time': 8,
 'greens': {'north': 29.357271095152605,
  'east': 18.29802513464991,
  'south': 37.400359066427285,
  'west': 26.9443447037702},
 'flow_ratios': {'north': 2.305263157894737,
  'east': 1.436842105263158,
  'south': 2.9368421052631577,
  'west': 2.1157894736842104}}

In [None]:
# getting yellow and red timings

In [6]:
webster_output = webster_method(pcu_flow, lanes_per_approach, lost_time_per_phase=2.0)

In [7]:
def add_clearance_times(webster_output, yellow_time=3.0, all_red_time=2.0, min_green=6.0):
    
    final = {}
    C = webster_output["cycle"]  # total cycle time

    for approach, g in webster_output["greens"].items():
        green = max(g, min_green)
        total_phase_time = green + yellow_time + all_red_time
        total_red = max(0.0, C - total_phase_time)  # remainder of cycle

        final[approach] = {
            "green": round(green, 2),
            "yellow": yellow_time,
            "all_red": all_red_time,
            "total_red": round(total_red, 2)
        }

    return {"cycle": C, "phases": final}

In [29]:
# total_red	Duration when this direction’s signal is fully red (no movement) - When other approaches are active

In [8]:
add_clearance_times(webster_output,3.0,2.0,6.0)

{'cycle': 120,
 'phases': {'north': {'green': 29.36,
   'yellow': 3.0,
   'all_red': 2.0,
   'total_red': 85.64},
  'east': {'green': 18.3, 'yellow': 3.0, 'all_red': 2.0, 'total_red': 96.7},
  'south': {'green': 37.4, 'yellow': 3.0, 'all_red': 2.0, 'total_red': 77.6},
  'west': {'green': 26.94, 'yellow': 3.0, 'all_red': 2.0, 'total_red': 88.06}}}

In [9]:
# yolo should give outputs like this
#{
  #"north": {"car": 10, "bus": 2, "bike": 5},
  #"east": {"car": 5, "bus": 1, "bike": 3},
  #"south": {"car": 15, "bus": 3, "bike": 8},
  #"west": {"car": 8, "bus": 1, "bike": 4}
#}

# save this in .json file eg : traffic_counts.json

In [None]:
import json, time

with open("pcu_output.json") as f:
    pcu_values = json.load(f) 

In [None]:
print("PCU values from YOLO:", pcu_values)

In [None]:
# directly applying webster

In [None]:
webster_output = webster_method(pcu_values, lanes_per_approach, lost_time_per_phase=2.0)
final_timings = add_clearance_times(webster_output,3.0,2.0,6.0)

In [None]:
print("Signal timings:", final_timings)
apply_signal_timings(final_timings)

In [None]:
# Loop it for live operation - If YOLO keeps updating that JSON file (say every few seconds or minutes), wrap it in a loop

In [None]:
while True:
    with open("pcu_output.json") as f:
        pcu_values = json.load(f)
    
    webster_output = webster_method(pcu_values, lanes)
    final_timings = add_clearance_times(webster_output)
    apply_signal_timings(final_timings)
    
    time.sleep(final_timings["cycle"])