In [1]:
!pip install leuvenmapmatching gpxpy shapely plotly pyproj rtree

You should consider upgrading via the '/Users/mkharytonau/Projects/infographics/.venv/bin/python3 -m pip install --upgrade pip' command.[0m


In [2]:
import json
import gpxpy
import plotly.graph_objects as go
from leuvenmapmatching.map.inmem import InMemMap
from leuvenmapmatching.matcher.distance import DistanceMatcher
from shapely.geometry import LineString, Point
import rtree

In [None]:
# --- 1. Load reference GPX ---
gpx_file_path = "./testdata/tsna.gpx"
with open(gpx_file_path, 'r') as gpx_file:
    gpx = gpxpy.parse(gpx_file)
ref_coords = [(pt.latitude, pt.longitude) for trk in gpx.tracks for seg in trk.segments for pt in seg.points]

In [25]:
# --- 2. Load GPS trace ---
with open("./testdata/tsna.json", "r") as file:
	data = json.load(file)

gps_coords = [
	(entry["latitude"], entry["longitude"]) 
	for entry in data
]

def is_valid_coord(lat, lon):
    try:
        return (
            lat is not None and lon is not None and
            isinstance(lat, (int, float)) and isinstance(lon, (int, float))
        )
    except:
        return False

bad_gps_coords = [
    (lat, lon)
    for lat, lon in gps_coords
    if not is_valid_coord(lat, lon)
]
filtered_gps_coords = [
    (lat, lon)
    for lat, lon in gps_coords
    if is_valid_coord(lat, lon)
]

print(f"Filtered valid GPS points: {len(filtered_gps_coords)} of {len(gps_coords)}")
print(f"Bad GPS points: {bad_gps_coords}")

Filtered valid GPS points: 39 of 274
Bad GPS points: [(None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, None), (None, N

In [26]:
# --- 3. Build InMemMap from GPX ---
map_con = InMemMap("reference_map", use_rtree=True, index_edges=True)
for i, (lat, lon) in enumerate(ref_coords):
    map_con.add_node(i, (lat, lon))
    if i > 0:
        map_con.add_edge(i-1, i)

In [27]:
# --- 4. Run LeuvenMapMatching ---
matcher = DistanceMatcher(
    map_con,
    max_dist=500,
    obs_noise=4,
    obs_noise_ne=10,
    non_emitting_states=False,
    only_edges=True
)

# Input as (lat, lon) tuples for matcher
gps_input = [(lat, lon) for lat, lon in filtered_gps_coords]
matched_states, _ = matcher.match(gps_input)

In [28]:
# --- 5. Extract matched node coordinates ---
matched_node_coords = [
    map_con.node_coordinates(edge[0])
    for edge in matched_states
] + [map_con.node_coordinates(matched_states[-1][1])] # append last target

In [29]:
# --- 6. Snap GPS points to matched edges, calculate distances ---

from shapely.geometry import LineString, Point
from pyproj import Transformer
from functools import lru_cache

# Cache EPSG code determination
@lru_cache(maxsize=1000)
def get_epsg_code(lat: float, lon: float) -> int:
    """
    Returns the UTM EPSG code for a given WGS84 latitude and longitude.
    """
    zone_number = int((lon + 180) / 6) + 1
    is_northern = lat >= 0
    return 32600 + zone_number if is_northern else 32700 + zone_number

# Cache the transformers per EPSG zone
@lru_cache(maxsize=10)
def get_transformers_for_epsg(epsg_code: int):
    """
    Returns transformers to/from WGS84 <-> UTM for a given EPSG zone.
    """
    to_utm = Transformer.from_crs("EPSG:4326", f"EPSG:{epsg_code}", always_xy=True)
    to_wgs = Transformer.from_crs(f"EPSG:{epsg_code}", "EPSG:4326", always_xy=True)
    return to_utm, to_wgs

def get_transformers_for_point(lat: float, lon: float):
    """
    Returns (to_utm, to_wgs) transformers for the UTM zone corresponding to the given lat/lon.
    """
    epsg = get_epsg_code(lat, lon)
    return get_transformers_for_epsg(epsg)

# --- Step 1: Precompute cumulative distance along reference route in UTM ---
reference_utm_coords = [get_transformers_for_point(lat, lon)[0].transform(lon, lat) for lat, lon in ref_coords]

cumulative_distances = {0: 0.0}
for i in range(1, len(ref_coords)):
    prev = reference_utm_coords[i - 1]
    curr = reference_utm_coords[i]
    dist = LineString([prev, curr]).length
    cumulative_distances[i] = cumulative_distances[i - 1] + dist

# --- Step 2: Project GPS points onto matched edges and calculate distances ---
matched_coords = []
monotonic_distances = []
max_distance = 0.0

for i, (gps_lat, gps_lon) in enumerate(gps_input):
    state = matched_states[i]

    node1, node2 = state
    coord1 = map_con.node_coordinates(node1)
    coord2 = map_con.node_coordinates(node2)

    # Convert all points to UTM
    # use 1st point of reference edge to find correct UTM zone
    (transformer_to_utm, transformer_to_wgs) = get_transformers_for_point(coord1[0], coord1[1])
    pt1_utm = transformer_to_utm.transform(coord1[1], coord1[0])
    pt2_utm = transformer_to_utm.transform(coord2[1], coord2[0])
    gps_utm = transformer_to_utm.transform(gps_lon, gps_lat)

    # Project GPS point to reference edge
    edge_line = LineString([pt1_utm, pt2_utm])
    point = Point(gps_utm)
    projection_on_edge = edge_line.project(point)
    projected_point = edge_line.interpolate(projection_on_edge)

    # Convert projected point back to WGS84
    proj_lon, proj_lat = transformer_to_wgs.transform(projected_point.x, projected_point.y)
    matched_coords.append((proj_lat, proj_lon))  # (lat, lon)

    # Compute cumulative distance on reference
    base_distance = cumulative_distances.get(node1, 0.0)
    total_distance = base_distance + projection_on_edge

    # Enforce monotonicity
    max_distance = max(max_distance, total_distance)
    monotonic_distances.append(max_distance)

In [30]:
print(f"Reference GPS points: {len(ref_coords)}")
print(f"Original GPS points: {len(gps_input)}")
print(f"Matched states: {len(matched_states)}")
print(f"Matched points: {len(matched_coords)}")
print(f"Distances: {len(monotonic_distances)}")

Reference GPS points: 28
Original GPS points: 39
Matched states: 39
Matched points: 39
Distances: 39


## Visualize

In [31]:
import plotly.graph_objects as go

fig = go.Figure()

# 1. Original GPS trace
fig.add_trace(go.Scattermap(
    lon=[y for x, y in gps_coords],
    lat=[x for x, y in gps_coords],
    mode="markers+lines",
    name="Original GPS",
    marker=dict(color="blue", size=6)
))

# 2. Reference route (GPX)
fig.add_trace(go.Scattermap(
    lon=[y for x, y in ref_coords],
    lat=[x for x, y in ref_coords],
    mode="lines+markers",
    name="Reference Route",
    marker=dict(color="green", size=5),
    line=dict(width=2, color="green")
))

# 3. Matched node sequence (from map matcher)
fig.add_trace(go.Scattermap(
    lon=[y for x, y in matched_node_coords],
    lat=[x for x, y in matched_node_coords],
    mode="markers+lines",
    name="Matched Nodes",
    marker=dict(color="orange", size=6, symbol="circle")
))

# Create debug lines between GPS input and matched points
debug_lines = []
for (lat1, lon1), (lat2, lon2) in zip(gps_input, matched_coords):
    debug_lines.append(
        go.Scattermap(
            lon=[lon1, lon2],
            lat=[lat1, lat2],
            mode='lines',
            line=dict(width=1, color='grey'),
            showlegend=False
        )
    )

dist_labels = [f"{int(d)} m" for d in monotonic_distances]

# 4. Interpolated matched GPS points (on edges)
fig.add_trace(go.Scattermap(
    lon=[y for x, y in matched_coords],
    lat=[x for x, y in matched_coords],
    mode="markers+text",
    name="Matched Points on Route",
    marker=dict(color="red", size=4, symbol="circle"),
    text=dist_labels,
    textposition="top right"
))

# 5. Add debug lines
for line in debug_lines[:50]:
    fig.add_trace(line)

fig.update_layout(
    map_style="open-street-map",
    map_zoom=15,
    map_center={"lat": gps_coords[0][0], "lon": gps_coords[0][1]},
    margin={"l": 0, "r": 0, "t": 30, "b": 0},
    title="Map Matching Debug View"
)

fig.show()