In [3]:
import overpy
import re
import math

# Per-state inputs for the traffic estimate, keyed by two-letter state code.
# Each entry carries:
#   population    - resident count used in the state weight
#   veh_ownership - vehicle-ownership ratio (reported alongside results)
#   full_name     - display name of the state
#   bbox          - bounding box already formatted for an Overpass QL filter,
#                   i.e. "(south, west, north, east)" in decimal degrees
#   urbanization  - urbanization ratio used in the state weight
STATE_DATA = {
    "ES": dict(
        population=3_833_712,
        veh_ownership=0.533,
        full_name="Espírito Santo",
        bbox="(-21.5, -42.0, -17.5, -38.0)",
        urbanization=0.852,
    ),
    "RS": dict(
        population=10_882_965,
        veh_ownership=0.683,
        full_name="Rio Grande do Sul",
        bbox="(-34.1, -57.8, -27.0, -49.7)",
        urbanization=0.844,
    ),
    "DF": dict(
        population=2_817_381,
        veh_ownership=0.800,
        full_name="Distrito Federal",
        bbox="(-16.2, -48.4, -15.3, -47.3)",
        urbanization=0.964,
    ),
}

def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance between two points, in kilometres.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in decimal degrees.
        lat2, lon2: Latitude and longitude of the second point, in decimal degrees.

    Returns:
        The haversine distance in km on a sphere of radius 6371 km.
    """
    R = 6371  # sphere radius (km) used for the distance
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2
    )
    # Floating-point rounding can leave `a` a hair outside [0, 1] for
    # (near-)antipodal points; without the clamp sqrt(1 - a) raises ValueError.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

def _build_overpass_pattern(highway_name):
    """Turn a highway designation into a regex that is safe inside an Overpass QL string.

    The name is split on whitespace and hyphens and the pieces are re-joined
    with ".?", so "BR-101", "BR 101" and "BR101" all yield a pattern matching
    any of those spellings. Regex metacharacters inside the pieces are escaped,
    then backslashes and double quotes are escaped for the QL string literal.
    Returns an empty string when the name contains no usable characters.
    """
    tokens = [tok for tok in re.split(r"[\s\-]+", highway_name.strip()) if tok]
    escaped = [re.sub(r"([.^$*+?()\[\]{}|\\])", r"\\\1", tok) for tok in tokens]
    pattern = ".?".join(escaped)
    return pattern.replace("\\", "\\\\").replace('"', '\\"')


def _parse_lanes(lanes_tag):
    """Return the lane count from a `lanes` tag value, or None if missing or unusable."""
    if not lanes_tag:
        return None
    try:
        # Multi-valued tags such as "2;3" use the first value.
        lanes = int(lanes_tag.split(";")[0])
    except ValueError:
        return None
    return lanes if lanes >= 1 else None


def _parse_maxspeed_kmh(maxspeed_tag):
    """Return the speed limit in km/h from a `maxspeed` tag value, or None if unusable."""
    if not maxspeed_tag:
        return None
    text = maxspeed_tag.lower()
    if "mph" in text:
        # Accept "50 mph" as well as "50mph"; convert to km/h and truncate.
        match = re.search(r"\d+(?:\.\d+)?", text)
        if match is None:
            return None
        speed = int(float(match.group()) * 1.60934)
    else:
        match = re.search(r"\d+", text)
        if match is None:
            return None
        speed = int(match.group())
    return speed if speed > 0 else None


def _polyline_length_km(nodes):
    """Sum the haversine distance between consecutive nodes (0 for fewer than two nodes)."""
    return sum(
        haversine(float(a.lat), float(a.lon), float(b.lat), float(b.lon))
        for a, b in zip(nodes, nodes[1:])
    )


def estimate_daily_traffic(highway_name, state):
    """Estimate a traffic figure for a highway inside one state's bounding box.

    Ways tagged motorway/trunk/primary whose `ref` or `name` matches
    `highway_name` are fetched from the Overpass API within the state's bbox.
    Every way that has usable `lanes` and `maxspeed` tags contributes
    lanes * speed * class priority * length_km to `base_capacity`, which is then
    multiplied by the state's population * urbanization.

    Note: the returned "daily_vehicles" is the raw product described above. No
    calibration constant is applied, so it is best read as a relative score
    rather than an observed vehicle count. `veh_ownership` is reported in the
    result but does not enter the calculation.

    Args:
        highway_name: Highway designation, e.g. "BR-101", "BR 101" or "BR101".
        state: Key of STATE_DATA (case-insensitive), e.g. "ES".

    Returns:
        A dict. On failure it holds a single "error" key (unknown state, empty
        highway name, or failed Overpass request). When no way has complete
        data, "estimated_class" is "unknown" and the numeric fields are 0.
        Otherwise it holds the estimate, the dominant highway class by length,
        the total length, the segment count and the state parameters used.
    """
    state = state.upper()
    if state not in STATE_DATA:
        return {"error": f"Invalid state code '{state}'. Use 'ES', 'RS', or 'DF'."}

    name_number = _build_overpass_pattern(highway_name)
    if not name_number:
        # An empty pattern would match every major road in the bounding box.
        return {"error": "Highway name is empty."}

    state_params = STATE_DATA[state]
    bbox = state_params["bbox"]
    api = overpy.Overpass()

    query = f'''
        [out:json];
        (
            way["highway"~"motorway|trunk|primary"]["ref"~"{name_number}", i]{bbox};
            way["highway"~"motorway|trunk|primary"]["name"~"{name_number}", i]{bbox};
        );
        (._;>;);
        out body;
    '''

    try:
        result = api.query(query)
    except Exception as e:
        # Best-effort: any client or network failure is reported to the caller.
        return {"error": f"Overpass query failed: {str(e)}"}

    # Relative weight of each OSM highway class in the capacity sum.
    priority_map = {
        "motorway": 1.0, "motorway_link": 0.95,
        "trunk": 0.85, "trunk_link": 0.80,
        "primary": 0.70, "primary_link": 0.65,
        "secondary": 0.60, "secondary_link": 0.55,
        "tertiary": 0.50, "tertiary_link": 0.45,
        "residential": 0.30, "unclassified": 0.20,
        "service": 0.10
    }

    base_capacity = 0.0
    total_length = 0.0
    valid_segments = 0
    # Total usable length (km) per highway class, used to pick the dominant class.
    length_by_class = {}

    for way in result.ways:
        highway_type = way.tags.get("highway", "")
        if highway_type not in priority_map:
            continue

        lanes = _parse_lanes(way.tags.get("lanes"))
        speed = _parse_maxspeed_kmh(way.tags.get("maxspeed"))
        if lanes is None or speed is None:
            # Segments without both tags cannot contribute to the capacity sum.
            continue

        length_km = _polyline_length_km(way.nodes)
        if length_km <= 0:
            continue

        valid_segments += 1
        length_by_class[highway_type] = length_by_class.get(highway_type, 0.0) + length_km
        base_capacity += lanes * speed * priority_map[highway_type] * length_km
        total_length += length_km

    if valid_segments == 0:
        return {
            "estimated_class": "unknown",
            "daily_vehicles": 0,
            "length_km": 0,
            "segments": 0,
            "state": state_params["full_name"],
            "message": (
                f"No valid highway segments with complete data found for "
                f"'{highway_name}' in {state_params['full_name']}"
            )
        }

    # Length-weighted mode: the highway class covering the most kilometres.
    estimated_class = max(length_by_class, key=length_by_class.get)

    state_weight = state_params["population"] * state_params["urbanization"]
    daily_vehicles = base_capacity * state_weight

    return {
        "estimated_class": estimated_class,
        "daily_vehicles": round(daily_vehicles),
        "base_capacity": round(base_capacity, 2),
        "length_km": round(total_length, 2),
        "segments": valid_segments,
        "state": state_params["full_name"],
        "population": state_params["population"],
        "parameters": {
            "urbanization": state_params["urbanization"],
            "veh_ownership": state_params["veh_ownership"],
            "state_weight": state_weight
        },
        "message": (
            "Daily vehicles estimated by scaling base_capacity "
            "with (population × urbanization) for state. "
            "Estimated highway class is based on length-weighted mode."
        )
    }

In [5]:
# Federal highways (BR-xxx) to evaluate, grouped by the state code they are
# queried in. Only the route numbers are listed; the "BR-" prefix is added here.
highways_by_state = {
    state_code: [f"BR-{route_number}" for route_number in route_numbers]
    for state_code, route_numbers in (
        ("ES", ("101", "259", "262", "393", "447", "482", "484")),
        ("RS", ("101", "290", "386")),
        ("DF", ("010", "020", "060", "070", "080", "251")),
    )
}

def test_highways():
    """Estimate traffic for every entry of `highways_by_state` and print a ranked table.

    Each highway/state pair is passed to `estimate_daily_traffic`. Pairs that
    return an "error" are reported and left out of the ranking. The remaining
    "daily_vehicles" values are min-max scaled to a 0-100 "Traffic Score" and
    printed in descending order. Nothing is returned; all output goes to stdout.
    """
    results = []
    total_tests = sum(len(highways) for highways in highways_by_state.values())
    current = 1

    print(f"Starting traffic estimation for {total_tests} highway-state combinations...\n")

    for state, highways in highways_by_state.items():
        for highway in highways:
            print(f"[{current}/{total_tests}] Processing {highway} in {state}...")
            result = estimate_daily_traffic(highway, state)

            if "error" in result:
                print(f"    ⚠️ Error: {result['error']}")
            else:
                print(f"    ✅ Completed: {result['daily_vehicles']} traffic score estimated.")
                results.append({
                    "Highway": highway,
                    "State": state,
                    "Estimated Class": result["estimated_class"],
                    "Daily Vehicles": result["daily_vehicles"],  # Keep internally for normalization
                    "Length (km)": result["length_km"],
                    "Segments": result["segments"]
                })

            current += 1

    print("\nAll estimations complete.\n")

    if not results:
        # Every request failed (e.g. Overpass unreachable); min()/max() on an
        # empty list would raise ValueError, so stop before normalizing.
        print("No successful estimations; nothing to rank.")
        return

    # Normalize daily vehicle counts using Min-Max scaling
    daily_vals = [r["Daily Vehicles"] for r in results]
    min_val = min(daily_vals)
    value_span = max(daily_vals) - min_val
    for r in results:
        # When all values are equal there is no spread to scale; score them 0.
        norm = (r["Daily Vehicles"] - min_val) / value_span if value_span else 0.0
        r["Traffic Score"] = round(norm * 100, 2)

    # Sort by normalized traffic score
    results.sort(key=lambda x: x["Traffic Score"], reverse=True)

    # Print the final result table (without Daily Vehicles)
    print(f"{'Highway':<10} {'State':<5} {'Class':<15} {'Traffic Score (0-100)':<20} {'Length (km)':<12} {'Segments':<9}")
    print("-" * 85)
    for r in results:
        print(f"{r['Highway']:<10} {r['State']:<5} {r['Estimated Class']:<15} "
              f"{r['Traffic Score']:<20} {r['Length (km)']:<12} {r['Segments']:<9}")

# Run the full batch. Every highway goes through estimate_daily_traffic, which
# sends a request to the Overpass API, so this cell needs network access.
test_highways()

Starting traffic estimation for 16 highway-state combinations...

[1/16] Processing BR-101 in ES...
    ✅ Completed: 253083090482 traffic score estimated.
[2/16] Processing BR-259 in ES...
    ✅ Completed: 23165873679 traffic score estimated.
[3/16] Processing BR-262 in ES...
    ✅ Completed: 16647496242 traffic score estimated.
[4/16] Processing BR-393 in ES...
    ✅ Completed: 13045606 traffic score estimated.
[5/16] Processing BR-447 in ES...
    ✅ Completed: 0 traffic score estimated.
[6/16] Processing BR-482 in ES...
    ✅ Completed: 22699432194 traffic score estimated.
[7/16] Processing BR-484 in ES...
    ✅ Completed: 7365561566 traffic score estimated.
[8/16] Processing BR-101 in RS...
    ✅ Completed: 411554010667 traffic score estimated.
[9/16] Processing BR-290 in RS...
    ✅ Completed: 808630632869 traffic score estimated.
[10/16] Processing BR-386 in RS...
    ✅ Completed: 239856756037 traffic score estimated.
[11/16] Processing BR-010 in DF...
    ✅ Completed: 29890166008