In [7]:
# 49035114000 (center)
# 49035980000 (airport)
# 49035110106 (ski)
# 49035101402 (U of U)

In [3]:
# ============================================================
# FAST + ROBUST VERSION
# Build OD-level annual statistics for Complete Trips
# OD defined strictly by Census Tract (GEOID + shapefile)
# Population-level statistics (NOT sampled)
# ============================================================

import pandas as pd
import numpy as np
import geopandas as gpd
import glob
import json
from datetime import datetime
import pygeohash as pgh


# =========================
# CONFIG
# =========================

BASE_DIR = "C:/Users/rli04/Villanova University/Complete-trip-coordinate - Documents/General"
PARQUET_DIR = f"{BASE_DIR}/Salt_Lake/delivery"

TRACT_SHP = (
    f"{BASE_DIR}/Manuscript/Figure/Visualization-RL/"
    f"2-OD patterns by census track/six_counties_track.shp"
)

ORIG_TRACT = "49035110106"
DEST_TRACT = "49035101402"
# 49035114000 (center)
# 49035980000 (airport)
# 49035110106 (ski)
# 49035101402 (U of U)
MONTHS = [
    "Jan","Feb","Mar","Apr","May","Jun",
    "Jul","Aug","Sep","Oct","Nov","Dec"
]

OUTPUT_STATS_JSON = f"{ORIG_TRACT}_to_{DEST_TRACT}.stats.json"

USE_COLS = [
    "linked_trip_id",
    "travel_mode",
    "local_datetime_start",
    "local_datetime_end",
    "geohash7_orig",
    "geohash7_dest"
]


# =========================
# 0️⃣ LOAD TRACTS
# =========================

tracts = gpd.read_file(TRACT_SHP).to_crs("EPSG:4326")
tracts["GEOID"] = tracts["GEOID"].astype(str)

orig_poly = tracts.loc[tracts["GEOID"] == ORIG_TRACT, ["geometry"]].copy()
dest_poly = tracts.loc[tracts["GEOID"] == DEST_TRACT, ["geometry"]].copy()

if orig_poly.empty or dest_poly.empty:
    raise ValueError("Origin/Destination tract GEOID not found in shapefile.")


# =========================
# 1️⃣ LOAD YEARLY DATA
# =========================

dfs = []
for m in MONTHS:
    files = glob.glob(f"{PARQUET_DIR}/Salt_Lake-{m}-2020/*.snappy.parquet")
    if not files:
        continue
    dfs.append(pd.concat(
        [pd.read_parquet(f, columns=USE_COLS) for f in files],
        ignore_index=True
    ))

if not dfs:
    raise ValueError("No parquet files found. Check PARQUET_DIR and MONTHS.")

df = pd.concat(dfs, ignore_index=True)

df["local_datetime_start"] = pd.to_datetime(df["local_datetime_start"], errors="coerce")
df["local_datetime_end"]   = pd.to_datetime(df["local_datetime_end"], errors="coerce")

df = df[df["local_datetime_end"] > df["local_datetime_start"]]

# 清理 geohash 空值/非字符串（避免 pgh.decode 崩）
df = df[df["geohash7_orig"].notna() & df["geohash7_dest"].notna()]
df["geohash7_orig"] = df["geohash7_orig"].astype(str)
df["geohash7_dest"] = df["geohash7_dest"].astype(str)


# =========================
# 2️⃣ FAST GEOHASH DECODE (UNIQUE CACHE)
# =========================

def _decode_unique_geohash(series: pd.Series) -> pd.DataFrame:
    """
    Decode geohash series into (lat, lon) using unique-cache mapping.
    Invalid geohash -> NaN.
    """
    uniq = series.unique()
    mapping = {}
    for gh in uniq:
        try:
            lat, lon = pgh.decode(gh)
            mapping[gh] = (lat, lon)
        except Exception:
            mapping[gh] = (np.nan, np.nan)

    latlon = series.map(mapping)
    out = pd.DataFrame(latlon.tolist(), index=series.index, columns=["lat", "lon"])
    return out

o_latlon = _decode_unique_geohash(df["geohash7_orig"])
d_latlon = _decode_unique_geohash(df["geohash7_dest"])

df["orig_lat"] = o_latlon["lat"]
df["orig_lon"] = o_latlon["lon"]
df["dest_lat"] = d_latlon["lat"]
df["dest_lon"] = d_latlon["lon"]

# 丢掉 decode 失败的点
df = df[df["orig_lat"].notna() & df["orig_lon"].notna() & df["dest_lat"].notna() & df["dest_lon"].notna()]


# =========================
# 3️⃣ OD FILTER (SPATIAL JOIN, FAST)
# =========================

# Origin points GeoDataFrame
gdf = gpd.GeoDataFrame(
    df,
    geometry=gpd.points_from_xy(df["orig_lon"], df["orig_lat"]),
    crs="EPSG:4326"
)

# sjoin origin within ORIG tract
gdf = gpd.sjoin(gdf, orig_poly, predicate="within", how="inner")
# 清理 join 产生的列，避免第二次 sjoin 冲突
for col in ["index_right"]:
    if col in gdf.columns:
        gdf = gdf.drop(columns=[col])

# Destination points GeoDataFrame（替换 geometry 为 dest 点）
gdf = gdf.drop(columns=["geometry"])
gdf = gpd.GeoDataFrame(
    gdf,
    geometry=gpd.points_from_xy(gdf["dest_lon"], gdf["dest_lat"]),
    crs="EPSG:4326"
)

# sjoin destination within DEST tract
gdf = gpd.sjoin(gdf, dest_poly, predicate="within", how="inner")
for col in ["index_right"]:
    if col in gdf.columns:
        gdf = gdf.drop(columns=[col])

print("After OD (tract-based) filter, rows:", len(gdf))
print("Unique linked trips:", gdf["linked_trip_id"].nunique())

# 如果 OD 没有任何记录，直接输出空 stats（不报错）
if gdf.empty:
    stats = {
        "schema": "nova.complete_trip.od_stats.v1",
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "od": {"origin": ORIG_TRACT, "destination": DEST_TRACT},
        "coverage": {"temporal": "year-2020", "spatial": "Salt Lake 6-county"},
        "counts": {"linked_trips": 0},
        "note": "No rows after tract-based OD filter"
    }
    with open(OUTPUT_STATS_JSON, "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2, allow_nan=False)
    print("Stats JSON written to:", OUTPUT_STATS_JSON)
    raise SystemExit


# =========================
# 4️⃣ COMPLETE TRIP FILTER (≥2 legs)  ——逻辑不变
# =========================

leg_counts = gdf.groupby("linked_trip_id").size()


if gdf.empty:
    stats = {
        "schema": "nova.complete_trip.od_stats.v1",
        "generated_at": datetime.utcnow().isoformat() + "Z",
        "od": {"origin": ORIG_TRACT, "destination": DEST_TRACT},
        "coverage": {"temporal": "year-2020", "spatial": "Salt Lake 6-county"},
        "counts": {"linked_trips": 0},
        "note": "No valid complete trips (linked_trip with >=2 legs) after filtering"
    }
    with open(OUTPUT_STATS_JSON, "w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2, allow_nan=False)
    print("Stats JSON written to:", OUTPUT_STATS_JSON)
    raise SystemExit


# =========================
# 5️⃣ LEG DURATION (VECTOR)
# =========================

gdf["duration_min"] = (
    gdf["local_datetime_end"] - gdf["local_datetime_start"]
).dt.total_seconds() / 60


# =========================
# 6️⃣ COMPLETE-TRIP AGGREGATION (FAST)
# =========================

trip_stats = (
    gdf
    .groupby("linked_trip_id")
    .agg(
        total_duration=("duration_min", "sum"),
        transfers=("duration_min", "size"),
        modes=("travel_mode", lambda x: set(m.lower().strip() for m in x))
    )
)

trip_stats["transfers"] = trip_stats["transfers"] - 1


# =========================
# 7️⃣ FINAL STATS (WITH EMPTY GUARD)
# =========================

dur = trip_stats["total_duration"].to_numpy()
mean_dur = float(dur.mean())
trf = trip_stats["transfers"].to_numpy()
modes = trip_stats["modes"].to_numpy()

def pct(a, q):
    return float(np.percentile(a, q))
# =========================
# 7️⃣b TRAVEL TIME DISTRIBUTION (HISTOGRAM)
# =========================

BIN_WIDTH = 5        # minutes
MAX_TIME  = 180       # minutes (cap long tail)

bins = np.arange(0, MAX_TIME + BIN_WIDTH, BIN_WIDTH)

# cap extreme values
dur_capped = np.clip(dur, 0, MAX_TIME)

hist_counts, bin_edges = np.histogram(dur_capped, bins=bins)

travel_time_hist = {
    "bin_width_min": BIN_WIDTH,
    "max_time_min": MAX_TIME,
    "bin_edges_min": bin_edges.tolist(),
    "counts": hist_counts.tolist()
}


stats = {
    "schema": "nova.complete_trip.od_stats.v1",
    "generated_at": datetime.utcnow().isoformat() + "Z",
    "od": {"origin": ORIG_TRACT, "destination": DEST_TRACT},
    "coverage": {"temporal": "year-2020", "spatial": "Salt Lake 6-county"},
    "counts": {"linked_trips": int(len(dur))},
    "trip_duration_min": {
        "min": float(dur.min()),
        "mean": mean_dur,
        "p25": pct(dur, 25),
        "median": pct(dur, 50),
        "p75": pct(dur, 75),
        "max": float(dur.max())
    },
    "transfers": {
        "avg": float(trf.mean()),
        "p75": int(pct(trf, 75)),
        "max": int(trf.max())
    },
    "mode_involvement": {
        "car": float(sum("car" in m for m in modes) / len(modes)),
        "bus": float(sum("bus" in m for m in modes) / len(modes)),
        "rail": float(sum("rail" in m for m in modes) / len(modes)),
        "walk": float(sum("walk" in m for m in modes) / len(modes))
    },
    "travel_time_distribution": travel_time_hist

}

with open(OUTPUT_STATS_JSON, "w", encoding="utf-8") as f:
    json.dump(stats, f, indent=2, allow_nan=False)

print("Stats JSON written to:", OUTPUT_STATS_JSON)


After OD (tract-based) filter, rows: 217
Unique linked trips: 217
Stats JSON written to: 49035110106_to_49035101402.stats.json


In [21]:
print(hist_counts)


[ 283 2034  987  229   76   24   13   11    5    4    7    4    3    3
    1    2    1   12]
