In [1]:
import gpxpy
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import haversine_distances
import geopandas as gpd
from shapely.geometry import LineString
import osmnx as ox
import folium

In [3]:
def read_simple_gpx(path):
    with open(path, 'r', encoding='utf-8') as f:
        gpx = gpxpy.parse(f)
    rows = []
    for track in gpx.tracks:
        for segment in track.segments:
            for ind, point in enumerate(segment.points):
                row = dict(
                    name=getattr(track, "name", None),
                    step=ind,
                    lat=point.latitude,
                    lon=point.longitude,
                    elevation_m=point.elevation,
                    time=point.time
                )
                if type(point.elevation) in [float, int]:
                    row['elevation_f'] = point.elevation * 3.28084
                else:
                    row['elevation_f'] = None
                rows.append(row)
    df = pd.DataFrame(rows)
    return df

In [37]:
def bearing_deg(lat1, lon1, lat2, lon2):
    """
    Bearing from point 1 to point 2, degrees in [0, 360).
    Inputs in degrees.
    """
    lat1 = np.deg2rad(lat1)
    lon1 = np.deg2rad(lon1)
    lat2 = np.deg2rad(lat2)
    lon2 = np.deg2rad(lon2)
    dlon = lon2 - lon1

    y = np.sin(dlon) * np.cos(lat2)
    x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
    brng = np.rad2deg(np.arctan2(y, x))
    return (brng + 360) % 360

def step_data(df):
    rad_coords = df[['lat', 'lon']].apply(np.deg2rad)
    step_meters = haversine_distances(rad_coords.iloc[:-1], rad_coords.iloc[1:]).diagonal() * 6371000
    lat = df["lat"].to_numpy()
    lon = df["lon"].to_numpy()
    brng = bearing_deg(lat[:-1], lon[:-1], lat[1:], lon[1:])
    df['step_bearing'] = np.array([*brng, np.nan])
    df['step_bearing'] = df['step_bearing'].replace({0:np.nan})
    df['step_turn'] = df['step_bearing'].diff().abs()
    df['step_dist_m'] = np.array([0, *step_meters])
    df['step_dist_f'] = df['step_dist_m'] * 3.28084
    df['step_elevation_m'] = df['elevation_m'].diff().fillna(0)
    df['step_elevation_f'] = df['elevation_f'].diff().fillna(0)
    df['step_grade'] = df['step_elevation_m'] / df['step_dist_m'].replace(0, np.nan)
    return df

In [38]:
def step_analysis(df, rolling_window=3):
    df['avg_step_grade'] = df['step_grade'].rolling(rolling_window).mean()
    df['avg_bearing_change'] = df['step_turn'].rolling(rolling_window).mean() 
    hazards = dict(
        light_descent=(df["step_grade"].round(2) < -0.03) | (df["avg_step_grade"].round(2) < -0.08),
        steep_descent=(df["step_grade"].round(2) < -0.1) | (df["avg_step_grade"].round(2) < -0.15),
        ultra_steep_descent=(df["step_grade"].round(2) < -0.2) | (df["avg_step_grade"].round(2) < -0.2),
        turn_on_descent=(
            # Sudden Turn coming off a steep descent
            (df["step_turn"].round() > 24) & (df["step_grade"].shift(-1).round(2) < -0.05)
        ) |
        (
            # Sudden Turn on a steep descent
            (df["step_turn"].round() > 24) & (df["step_grade"] < -0.05)
        ) |
        (
            # Sustained Turn
            (df["avg_bearing_change"].round() > 24) & (df["avg_step_grade"].round(2) < -0.05)
            ),
        turn_on_steep_descent=(
            # Sudden Turn coming off a steep descent
            (df["step_turn"].round() > 24) & (df["step_grade"].shift(-1).round(2) < -0.1)
        ) |
        (
            # Sudden Turn on a steep descent
            (df["step_turn"].round() > 24) & (df["step_grade"].round(2) < -0.1)
        ) |
        (
            # Sustained Turn
            (df["avg_bearing_change"].round() > 24) & (df["avg_step_grade"].round(2) < -0.1)
            )
    )
    df["hazard"] = "none"
    for h in list(hazards.keys()):
        df.loc[hazards[h], "hazard"] = h
    return df

In [12]:
hazard_colors = {
    "light_descent": "#fee08b",
    "steep_descent": "#f46d43",
    "ultra_steep_descent": "#9F0712",
    "turn_on_descent": "#4F39F6",
    "turn_on_steep_descent": "#8A0194",
    "none": "#31D492"
}

In [None]:
def points_to_segments_lonlat(gdf, lon="lon", lat="lat", sort_col=None):
    g = gdf.sort_values(sort_col) if sort_col else gdf.sort_index()
    x0, y0 = g[lon].to_numpy()[:-1], g[lat].to_numpy()[:-1]
    x1, y1 = g[lon].to_numpy()[1:],  g[lat].to_numpy()[1:]
    seg_geom = [LineString([(a,b),(c,d)]) for a,b,c,d in zip(x0,y0,x1,y1)]
    seg = gpd.GeoDataFrame(
        {"start_i": g.index[:-1], "end_i": g.index[1:]},
        geometry=seg_geom,
        crs=gdf.crs
    )
    seg = seg.merge(gdf.drop(columns='geometry'), left_on='end_i', right_on='step', how='left')
    return seg

In [77]:
paths = ["arlington_loop.gpx"]

In [78]:
dfs = []
segments = []
for path in paths:
    df = read_simple_gpx(path)
    df['name'] = path.split(".gpx")[0]
    df = step_data(df)
    df = step_analysis(df)
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326")
    gdf_segments = points_to_segments_lonlat(gdf)
    segments.append(gdf_segments)
    dfs.append(df)
df = pd.concat(dfs, ignore_index=True)
gdf_segments = gpd.GeoDataFrame(
    pd.concat(segments, ignore_index=True),
    crs=segments[0].crs,
)

In [79]:
df.head(10).columns

Index(['name', 'step', 'lat', 'lon', 'elevation_m', 'time', 'elevation_f',
       'step_bearing', 'step_turn', 'step_dist_m', 'step_dist_f',
       'step_elevation_m', 'step_elevation_f', 'step_grade', 'avg_step_grade',
       'avg_bearing_change', 'hazard'],
      dtype='str')

In [80]:
df[df['step_grade'] > 0].groupby('name')['step_grade'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
arlington_loop,248.0,0.039777,0.031916,0.000949,0.012527,0.03155,0.061102,0.150051


In [81]:
df[df['step_grade'] < 0].groupby('name')['step_grade'].describe()

Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
arlington_loop,304.0,-0.035763,0.025966,-0.133447,-0.052019,-0.030013,-0.015087,-0.002154


In [82]:
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326")

In [83]:
# points_to_segments_lonlat(gdf).explore(column='step_grade', style_kwds={"weight": 6}, cmap='Reds_r', vmin=-.25, vmax=0)

In [84]:
gdf_segments = points_to_segments_lonlat(gdf)

In [85]:
# gdf_segments.explore(column='hazard', cmap='Reds', style_kwds={"weight": 6})

In [86]:
m = gdf_segments.explore(column='hazard', categorical=True, cmap=list(hazard_colors.values()), categories=list(hazard_colors.keys()), legend=True,style_kwds={"weight": 6})
gdf.iloc[[0]].explore(m=m,  marker_type="marker",
    tooltip=False,
    marker_kwds={
        "icon": folium.Icon(
            icon="arrow-right",
            prefix="fa",   # Font Awesome
            color="green"
        )
    }
)
gdf.iloc[[-1]].explore(m=m,  marker_type="marker",
    tooltip=False,
    marker_kwds={
        "icon": folium.Icon(
            icon="stop",
            prefix="fa",   # Font Awesome
            color="red"
        )
    }
)
m


In [87]:
route_line = gdf_segments.union_all()
route_poly = (
    gpd.GeoSeries([route_line], crs=gdf_segments.crs)
    .to_crs(3857)
    .buffer(2)
    .to_crs(gdf_segments.crs)
    .iloc[0]
)

In [89]:
tags = {"highway": ["stop", "traffic_signals"]}
all_stops = ox.features_from_polygon(route_poly, tags=tags)

ReadTimeout: HTTPSConnectionPool(host='overpass-api.de', port=443): Read timed out. (read timeout=180)

In [None]:
lights = all_stops[all_stops["highway"] == "traffic_signals"]
stops  = all_stops[all_stops["highway"] == "stop"]

In [None]:
stops

Unnamed: 0_level_0,Unnamed: 1_level_0,geometry,highway,direction
element,id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
node,57807871,POINT (-122.28657 37.91931),stop,
node,57846732,POINT (-122.27811 37.90328),stop,
node,57870497,POINT (-122.30781 37.93468),stop,
node,57913392,POINT (-122.30435 37.92974),stop,
node,57929815,POINT (-122.30948 37.93233),stop,
node,9658059769,POINT (-122.29417 37.9254),stop,backward
node,9906917489,POINT (-122.28053 37.90827),stop,forward


In [None]:
if 'description' in lights.columns:
    lights = lights[lights['description'] != 'flashing yellow light']
lights

Unnamed: 0_level_0,Unnamed: 1_level_0,geometry,highway,direction
element,id,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
node,57797132,POINT (-122.28141 37.91326),traffic_signals,
node,263579538,POINT (-122.28139 37.91303),traffic_signals,


In [None]:
m = gdf_segments.explore(column='hazard', categorical=True, cmap=list(hazard_colors.values()), categories=list(hazard_colors.keys()), legend=True,style_kwds={"weight": 6})
gdf.iloc[[0]].explore(m=m,  marker_type="marker",
    tooltip=False,
    marker_kwds={
        "icon": folium.Icon(
            icon="arrow-right",
            prefix="fa",   # Font Awesome
            color="green"
        )
    }
)
gdf.iloc[[-1]].explore(m=m,  marker_type="marker",
    tooltip=False,
    marker_kwds={
        "icon": folium.Icon(
            icon="stop",
            prefix="fa",   # Font Awesome
            color="red"
        )
    }
)
stops.explore(
    m=m,
    tooltip=["highway"],
    marker_kwds={"radius": 4, "color": "red"},
)
lights.explore(
    m=m,
    tooltip=["highway"],
    marker_kwds={"radius": 6},
)
m


In [28]:
# import osmnx as ox
# import geopandas as gpd
# import numpy as np

# def controls_on_route_edges(
#     gdf_segments,
#     corridor_m=60,
#     ctrl_snap_max_m=10,
#     network_type="bike",
#     include_crossings=False,
# ):
#     """
#     Returns controls that apply to the route by:
#       - building a corridor polygon around your route
#       - downloading an OSMnx graph in that corridor
#       - snapping route segments to graph edges to get the route edge set
#       - snapping control points to graph edges
#       - keeping only controls whose snapped edge is in the route edge set

#     gdf_segments: GeoDataFrame of LineString route segments in EPSG:4326
#     """
#     # 1) Corridor polygon around the route (buffer in meters)
#     route_line = gdf_segments.union_all()
#     route_poly = (
#         gpd.GeoSeries([route_line], crs=gdf_segments.crs)
#         .to_crs(3857)
#         .buffer(corridor_m)
#         .to_crs(gdf_segments.crs)
#         .iloc[0]
#     )

#     # 2) Download and project graph
#     G = ox.graph_from_polygon(route_poly, network_type=network_type)
#     Gp = ox.project_graph(G, to_crs="EPSG:3857")

#     # 3) Snap route segments to nearest edges (use segment midpoints)
#     segs_3857 = gdf_segments.to_crs(3857).copy()
#     mids = segs_3857.geometry.interpolate(0.5, normalized=True)
#     uvk_route = ox.distance.nearest_edges(Gp, mids.x.to_numpy(), mids.y.to_numpy(), return_dist=False)
#     route_edge_set = set(uvk_route)

#     # 4) Fetch candidate controls (stop signs + traffic lights)
#     tags = {"highway": ["stop", "traffic_signals"]}
#     if include_crossings:
#         # optional: sometimes signals show up as crossing-related nodes
#         tags["highway"].append("crossing")

#     controls = ox.features_from_polygon(route_poly, tags=tags)
#     controls = controls[controls.geometry.type == "Point"].copy()

#     # 5) Snap controls to nearest edges and filter to route edges
#     controls_3857 = controls.to_crs(3857)
#     uvk_ctrl, d_ctrl = ox.distance.nearest_edges(
#         Gp, controls_3857.geometry.x.to_numpy(), controls_3857.geometry.y.to_numpy(), return_dist=True
#     )

#     controls_3857["u"] = [t[0] for t in uvk_ctrl]
#     controls_3857["v"] = [t[1] for t in uvk_ctrl]
#     controls_3857["key"] = [t[2] for t in uvk_ctrl]
#     controls_3857["snap_dist_m"] = d_ctrl

#     on_route = controls_3857[
#         controls_3857.apply(lambda r: (r["u"], r["v"], r["key"]) in route_edge_set, axis=1)
#         & (controls_3857["snap_dist_m"] <= ctrl_snap_max_m)
#         & (controls_3857["highway"].isin(["stop", "traffic_signals"]))  # keep only these two
#     ].copy()

#     return on_route.to_crs(gdf_segments.crs), controls.to_crs(gdf_segments.crs), G


In [29]:
# controls_on_route, controls_all, G = controls_on_route_edges(
#     gdf_segments,
#     corridor_m=60,
#     ctrl_snap_max_m=10,
#     network_type="bike",
# )

# # separate them
# stops  = controls_on_route[controls_on_route["highway"] == "stop"]
# lights = controls_on_route[controls_on_route["highway"] == "traffic_signals"]


In [30]:
# m = gdf_segments.explore(column='hazard', cmap='Reds', style_kwds={"weight": 6})
# controls_on_route.explore(
#     m=m,
#     tooltip=["highway", "snap_dist_m"],
#     marker_kwds={"radius": 6},
# )
# m

