In [1]:
# lex_lts_osm_aadt.py
# Usage: python lex_lts_osm_aadt.py

from pathlib import Path
import warnings, math, re, webbrowser
import numpy as np
import pandas as pd
import geopandas as gpd
from pyogrio import read_dataframe, write_dataframe
try:
    from pyogrio import list_layers as _list_layers
except Exception:
    _list_layers = None
import osmnx as ox
import shapely
import folium
from folium.plugins import LocateControl, MeasureControl, Fullscreen, MousePosition

# OSMnx 2.x moved project_gdf:
try:
    from osmnx.projection import project_gdf as _project_gdf
except Exception:
    _project_gdf = getattr(ox, "project_gdf", None)

In [2]:
# Loading data
GEOJSON_IN = Path("lexbike.geojson")           # your bike-lane file (GeoJSON/GPKG/Shapefile ok)
CITY       = "Lexington, Kentucky, USA"        # OSM place query
OUT_ENRICH = Path("lexbike_with_speed_aadt.geojson")
OUT_MAP    = Path("lexbike_LTS_map.html")

# OSM → segment snapping
NEAREST_MAX_DIST_M = 20   # increase to 35–50 if needed

# OPTIONAL: AADT data (KYTC or city open data)
# Supported: point or line layer (GeoJSON/GPKG/SHP). If CSV, include lon/lat cols set below.
AADT_PATH =  None # Path("Traffic_Station_Counts.geojson") e.g., Path("k ytc_aadt_2023.geojson")
CSV_LON   = "lon" # change if you use CSV
CSV_LAT   = "lat"
AADT_MAX_DIST_M = 50  # nearest distance for AADT match

# If your bike lane file has facility info, set column aliases here (best guess fallback below)
FACILITY_CANDIDATES   = ["Type_Facility","class","Type_Road","infra","infra_type","facility_t","fac_type"]
CENTERLINE_CANDIDATES = ["_centerline","center_line","center"]
NAME_CANDIDATES       = ["Name_Facility", "name","street","label","road","corridor"]

# Used for rounding to nearest common mph
COMMON_MPH = np.array([15,20,25,30,35,40,45,50,55])

In [3]:
gpd.options.io_engine = "pyogrio"  # use pyogrio for IO

def pick_col(cols, candidates):
    for c in candidates:
        if c in cols: return c
    return None

def mph_from_kph(k): return k * 0.621371

def normalize_fac(val: str) -> str:
    s = (str(val) if val is not None else "").lower().strip()
    if any(k in s for k in ["protected","separated","cycletrack","cycle track","pbl"]): return "protected"
    if any(k in s for k in ["shared-use","shared use","multiuse","multi-use","path","trail","mup","sup"]): return "path"
    if any(k in s for k in ["buffer","bbl","buffered"]): return "buffered_lane"
    if "no lane" in s: return "mixed"
    if any(k in s for k in ["bike lane","bikelane"," lane", " bl", "(bl)","painted"]): return "lane"
    if any(k in s for k in ["boulevard","greenway","neighborhood"]): return "boulevard"
    if any(k in s for k in ["mixed","shared lane","sharrow","slm"]): return "mixed"
    return "unknown"


def compute_lts(row):
    fac = row.get("__fac_cat","unknown")
    speed = row.get("speed_mph", np.nan)
    aadt  = row.get("aadt", np.nan)
    centerline = bool(row.get("__centerline", False))

    speed_known = not np.isnan(speed)
    aadt_known  = not np.isnan(aadt)

    # Clear low-stress categories
    if fac in ["protected","path"]: return 1
    if fac == "boulevard" and speed_known and speed <= 25 and (not aadt_known or aadt <= 1500): return 1

    if fac == "buffered_lane":
        if speed_known and speed <= 30 and (not aadt_known or aadt <= 8000): return 2
        return 3

    if fac == "lane":
        if speed_known and speed <= 30 and (not aadt_known or aadt <= 6000): return 2
        if speed_known and speed <= 35 and (not aadt_known or aadt <= 12000): return 3
        return 4

    if fac in ["mixed","unknown"]:
        if speed_known and speed <= 25 and aadt_known and aadt <= 1500 and not centerline: return 2
        if speed_known and speed <= 30 and (not aadt_known or aadt <= 3000): return 3
        return 4

    return 4

def ensure_4326(gdf):
    return gdf.set_crs(4326) if gdf.crs is None else gdf.to_crs(4326)


def sanitize_aadt(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")          # force numeric, non-numeric -> NaN
    s = s.replace([np.inf, -np.inf], np.nan)

    # Remove obviously impossible values (domain guardrails)
    # Typical KY AADT: 0–200k; interstates can be higher, but 300k is a safe hard cap
    hard_cap = 300_000
    s[(s < 0) | (s > hard_cap)] = np.nan

    # Optional: dynamic cap to catch a few extreme outliers in otherwise clean data
    if s.notna().sum() >= 20:                            # only if enough data
        p99 = np.nanpercentile(s, 99)
        dyn_cap = max(100_000, p99 * 1.5)                # keep plausible heavy corridors
        s[s > dyn_cap] = np.nan

    return s

# --- Robust OSM speed prep (no direct edges["maxspeed"] access) ---
def project_to_local(gdf):
    return _project_gdf(gdf) if _project_gdf else gdf.to_crs(32616)  # UTM 16N ~ KY

# parse explicit maxspeed if present; else all-NaN
def _parse_maxspeed_to_mph(val):
    if val is None or (isinstance(val, float) and np.isnan(val)): return np.nan
    if isinstance(val, (list, tuple, set)):
        vals = [_parse_maxspeed_to_mph(v) for v in val]
        vals = [v for v in vals if not np.isnan(v)]
        return min(vals) if vals else np.nan
    s = str(val).strip().lower()
    if s in ("signals","walk","none"): return np.nan
    m = __import__("re").findall(r"(\d+(?:\.\d+)?)\s*(km/h|kph|mph)?", s)
    if m:
        mphs = [ (float(num)*0.621371 if unit in ("km/h","kph") else float(num)) for num, unit in m ]
        return min(mphs) if mphs else np.nan
    try:
        return float(s)
    except:
        return np.nan

def norm_highway(v):
    if isinstance(v, (list, tuple)):
        return str(v[0]) if v else ""
    return str(v)

def prepare_osm_edges_with_mph(G):
    # always get OSMnx defaults
    G = ox.add_edge_speeds(G)  # adds speed_kph
    e = ox.graph_to_gdfs(G, nodes=False, fill_edge_geometry=True)

    # normalize highway
    e["highway_norm"] = e["highway"].apply(norm_highway) if "highway" in e.columns else ""

    # explicit maxspeed (if exists); otherwise all NaN
    if "maxspeed" in e.columns:
        e["maxspeed_mph_raw"] = e["maxspeed"].apply(parse_maxspeed_to_mph)
    else:
        e["maxspeed_mph_raw"] = pd.Series(np.nan, index=e.index, dtype="float64")

    # fallback mph from OSMnx's speed_kph
    if "speed_kph" not in e.columns:
        G2 = ox.add_edge_speeds(G)
        e2 = ox.graph_to_gdfs(G2, nodes=False, fill_edge_geometry=True)
        e["speed_kph"] = e2["speed_kph"]
    e["speed_mph_fallback"] = e["speed_kph"] * 0.621371

    # final mph + flag
    e["speed_mph"]    = e["maxspeed_mph_raw"].where(~e["maxspeed_mph_raw"].isna(), e["speed_mph_fallback"])
    e["has_maxspeed"] = e["maxspeed_mph_raw"].notna()

    # keep only drivable roads (avoid footway/cycleway tiny defaults)
    ROAD_WHITELIST = {
        "motorway","trunk","primary","secondary","tertiary",
        "motorway_link","trunk_link","primary_link","secondary_link","tertiary_link",
        "residential","unclassified","living_street","service"
    }
    e = e[e["highway_norm"].isin(ROAD_WHITELIST)].copy()
    return e

def two_pass_nearest(segments_m: gpd.GeoDataFrame, edges_m: gpd.GeoDataFrame, max_dist_m=20):
    # Pass 1: prefer explicit signs
    e1 = edges_m[edges_m["has_maxspeed"]]
    hit = gpd.sjoin_nearest(
        segments_m, e1[["geometry","speed_mph","highway_norm","has_maxspeed"]],
        how="left", max_distance=max_dist_m, distance_col="__dist_speed"
    )
    # Pass 2: fill misses with any edge (defaults)
    need2 = hit["speed_mph"].isna()
    if need2.any():
        e2 = edges_m[["geometry","speed_mph","highway_norm","has_maxspeed"]]
        fill = gpd.sjoin_nearest(
            segments_m[need2], e2, how="left",
            max_distance=max_dist_m, distance_col="__dist_speed"
        )
        hit.loc[need2, ["speed_mph","highway_norm","has_maxspeed","__dist_speed"]] = \
            fill[["speed_mph","highway_norm","has_maxspeed","__dist_speed"]].values
    return hit

def round_to_common_mph(v):
    if v is None or np.isnan(v): return v
    v = float(v)
    if v < 10 or v > 60: return v
    return float(COMMON_MPH[np.argmin(np.abs(COMMON_MPH - v))])

In [4]:
# -----------------------------
# 1) Load your bike segments
# -----------------------------
gdf = read_dataframe(GEOJSON_IN)
gdf = ensure_4326(gdf)

# Drop columns with unuseful information
columns_to_drop = ['From_', 'To_', 'Status', 'Status_Notes', 'YearComplete','ProjectCost','FundSource','Maintenance'
                  , 'Greenway', 'Notes','Notes_Temp','created_by','created_date','last_edited_by','last_edited_date']
gdf = gdf.drop(columns_to_drop, axis=1)

# Guess optional columns
facility_col   = pick_col(gdf.columns, FACILITY_CANDIDATES)
centerline_col = pick_col(gdf.columns, CENTERLINE_CANDIDATES)
name_col       = pick_col(gdf.columns, NAME_CANDIDATES)

gdf["__fac_cat"] = gdf[facility_col].map(normalize_fac) if facility_col else "unknown"
gdf["__centerline"] = gdf[centerline_col].astype(str).str.lower().isin(["yes","true","1"]) if centerline_col else False


In [12]:
BIKE_GDF = gdf  # <-- change if your variable is named differently

# 1) projection helper (OSMnx 2.x moved project_gdf)
try:
    from osmnx.projection import project_gdf as _project_gdf
except Exception:
    _project_gdf = None

def _proj(gdf):
    return _project_gdf(gdf) if _project_gdf else gdf.to_crs(32616)  # UTM 16N (KY)


#This code is still cooked

In [96]:
# AADT Code is still not working (needs to be fixed)

In [104]:
# -----------------------------
# 5) Compute LTS (speed + optional AADT)
# -----------------------------
joined_speed["LTS"] = joined_speed.apply(compute_lts, axis=1).astype(int)

# -----------------------------
# 6) Save + interactive map (auto open)
# -----------------------------
write_dataframe(joined_speed, OUT_ENRICH, driver="GeoJSON")
print(f"Saved enriched GeoJSON -> {OUT_ENRICH}")

m = folium.Map(location=(38.0406, -84.5037), zoom_start=12, tiles="CartoDB positron")
LocateControl(auto_start=False, flyTo=True).add_to(m)
MeasureControl(primary_length_unit="meters").add_to(m)
Fullscreen().add_to(m)
MousePosition().add_to(m)
folium.TileLayer("OpenStreetMap").add_to(m)

LTS_COLORS = {1: "#2ECC71", 2: "#4682B4", 3: "#F39C12", 4: "#E74C3C"}
def style_by_lts(feature):
    lts = feature["properties"].get("LTS", 4)
    return {"color": LTS_COLORS.get(int(lts), "#E74C3C"), "weight": 3, "opacity": 0.9}

tooltip_fields = [c for c in [name_col, facility_col] if c] + ["speed_mph","aadt","LTS"]

folium.GeoJson(
    joined.to_json(),
    style_function=style_by_lts,
    tooltip=folium.GeoJsonTooltip(fields=tooltip_fields),
    highlight_function=lambda _: {"weight": 5},
    name="Bike segments (LTS by OSM speed + AADT)"
).add_to(m)

# Simple legend
m.get_root().html.add_child(folium.Element("""
<div style="position: fixed; bottom: 20px; left: 20px; z-index: 9999;
            background: white; padding: 10px 12px; border-radius: 8px;
            box-shadow: 0 2px 8px rgba(0,0,0,0.2); font-size: 14px;">
<b>Level of Traffic Stress</b><br>
<span style="display:inline-block;width:14px;height:14px;background:#2ECC71;"></span> LTS 1 (very low)<br>
<span style="display:inline-block;width:14px;height:14px;background:#4682B4;"></span> LTS 2 (low)<br>
<span style="display:inline-block;width:14px;height:14px;background:#F39C12;"></span> LTS 3 (moderate)<br>
<span style="display:inline-block;width:14px;height:14px;background:#E74C3C;"></span> LTS 4 (high)
</div>
"""))

if not joined_speed.empty:
    minx, miny, maxx, maxy = joined_speed.total_bounds
    m.fit_bounds([[miny, minx], [maxy, maxx]])

m.save(str(OUT_MAP))
print(f"Saved map -> {OUT_MAP}")
webbrowser.open_new(Path(OUT_MAP).resolve().as_uri())

Saved enriched GeoJSON -> lexbike_with_speed_aadt.geojson
Saved map -> lexbike_LTS_map.html


True

Opening in existing browser session.


Current problems:
1. Speeds on certain roads are still not rounding correctly.
2. AADT can't be right. They are giving incredibly large numbers such as 3.4e74
3. Graphics are not loading very quickly.