In [4]:
# --- Robust load + assign distance bands + write balanced/unbalanced ---
import pandas as pd
import numpy as np
import geopandas as gpd
import pyogrio
from shapely import wkb, wkt
from shapely.geometry import Point
from pathlib import Path

# ===== 路径 =====
P_PANEL_LONG = "data/processed/voa/panel_long_onspd_enriched_ct_ptal_imd.parquet"
P_STATIONS   = "data/interim/tfl_elizabeth_line_stations_2022_27700.gpkg"

OUT_PARQ_ALL = "data/clean/voa_panel_with_station_band.parquet"
OUT_GPKG_ALL = "data/clean/voa_panel_with_station_band_27700.gpkg"

OUT_PARQ_BAL = "data/clean/voa_panel_with_station_band_balanced.parquet"
OUT_PARQ_UNB = "data/clean/voa_panel_with_station_band_unbalanced.parquet"
OUT_GPKG_BAL = "data/clean/voa_panel_with_station_band_balanced_27700.gpkg"
OUT_GPKG_UNB = "data/clean/voa_panel_with_station_band_unbalanced_27700.gpkg"

# ===== 小工具 =====
def read_first_layer(path):
    layer = pyogrio.list_layers(path)[0][0]
    return gpd.read_file(path, layer=layer, engine="pyogrio")

def ensure_27700(gdf):
    if gdf.crs is None:
        gdf.set_crs(27700, inplace=True)
    elif gdf.crs.to_epsg() != 27700:
        gdf = gdf.to_crs(27700)
    return gdf

def load_panel_as_gdf(parq_path):
    df = pd.read_parquet(parq_path)  # 用 pandas 读，绕过 geo 元数据限制

    geom = None
    # 1) 优先尝试从 geom_27700 解析
    if "geom_27700" in df.columns:
        s = df["geom_27700"]
        if len(s) and pd.notna(s.iloc[0]):
            v0 = s.iloc[0]
            try:
                if isinstance(v0, (bytes, bytearray, memoryview)):
                    geom = gpd.GeoSeries([wkb.loads(v) if pd.notna(v) else None for v in s], crs=27700)
                elif isinstance(v0, (list, tuple)):  # 整数列表 → bytes → WKB
                    geom = gpd.GeoSeries([wkb.loads(bytes(v)) if isinstance(v, (list,tuple)) else None for v in s], crs=27700)
                elif isinstance(v0, str):
                    # 尝试当作 WKT
                    geom = gpd.GeoSeries([wkt.loads(v) if isinstance(v, str) else None for v in s], crs=27700)
            except Exception:
                geom = None

    # 2) 退而求其次：用 Easting/Northing 生成点
    if geom is None or geom.isna().all():
        if {"onspd_easting","onspd_northing"}.issubset(df.columns):
            x = pd.to_numeric(df["onspd_easting"], errors="coerce")
            y = pd.to_numeric(df["onspd_northing"], errors="coerce")
            geom = gpd.GeoSeries(gpd.points_from_xy(x, y, crs=27700))
        else:
            raise ValueError("找不到可用几何（geom_27700 或 onspd_easting/onspd_northing）。")

    gdf = gpd.GeoDataFrame(df, geometry=geom, crs=27700)
    # 清理无效或空几何
    gdf = gdf[gdf.geometry.notna() & ~gdf.geometry.is_empty].copy()
    return gdf

# ===== 1) 读车站与长表 =====
stns = ensure_27700(read_first_layer(P_STATIONS)).reset_index(drop=True)
name_col = next((c for c in ["NAME","FULL_NAME","FullName","Station","station",
                             "station_name","name","TITLE","Title"] if c in stns.columns), None)
stns["station_id"] = (stns.get("OBJECTID", pd.Series(range(len(stns))))).astype(str)
stns["station_name"] = stns[name_col].astype(str) if name_col else ("station_" + (stns.index+1).astype(str))
stns = stns[["station_id","station_name","geometry"]].copy()

panel = load_panel_as_gdf(P_PANEL_LONG)
panel = ensure_27700(panel)

# ===== 2) 最近邻距离 + 前闭后开分箱 =====
out = gpd.sjoin_nearest(panel, stns, how="left", distance_col="dist_to_sta_m")
out = out.drop(columns=[c for c in out.columns if c.endswith("_left") or c.endswith("_right")], errors="ignore")

bins   = [0, 100, 400, 800, np.inf]
labels = ["b0_100","b100_400","b400_800","beyond_800"]
out["station_band"] = pd.cut(out["dist_to_sta_m"], bins=bins, labels=labels, right=False)

name_map = {
    "b0_100": "in_station_0_100",
    "b100_400": "in_station_100_400",
    "b400_800": "in_station_400_800",
    "beyond_800": "beyond_800"
}
for lab, col in name_map.items():
    out[col] = (out["station_band"] == lab).astype("int8")

# ===== 3) 若缺 n_t，则尝试用 uarn×avd 推出；否则跳过 =====
if "n_t" not in out.columns and {"uarn","avd"}.issubset(out.columns):
    out["n_t"] = out.groupby("uarn")["avd"].transform("nunique").astype("int16")

# ===== 4) 写出全集 =====
Path(OUT_PARQ_ALL).parent.mkdir(parents=True, exist_ok=True)
out.to_parquet(OUT_PARQ_ALL, index=False)
try:
    Path(OUT_GPKG_ALL).unlink()
except FileNotFoundError:
    pass
pyogrio.write_dataframe(out, OUT_GPKG_ALL, layer="voa_with_station_band_27700", driver="GPKG")

# ===== 5) 写出子集（balanced: n_t==3；unbalanced: n_t>=2）=====
if "n_t" in out.columns:
    balanced   = out.loc[out["n_t"] == 3].copy()
    unbalanced = out.loc[out["n_t"] >= 2].copy()   # 注意：此定义包含 balanced

    balanced.to_parquet(OUT_PARQ_BAL, index=False)
    unbalanced.to_parquet(OUT_PARQ_UNB, index=False)

    for fp, gdf, layer in [
        (OUT_GPKG_BAL, balanced,   "balanced_27700"),
        (OUT_GPKG_UNB, unbalanced, "unbalanced_27700"),
    ]:
        try:
            Path(fp).unlink()
        except FileNotFoundError:
            pass
        pyogrio.write_dataframe(gdf, fp, layer=layer, driver="GPKG")

    print(f"Done. all={len(out)}, balanced={len(balanced)}, unbalanced={len(unbalanced)}")
else:
    print(f"Done. all={len(out)}。未找到 n_t，已写出全集，子集未生成。")

print(out[["dist_to_sta_m","station_band","in_station_0_100","in_station_100_400","in_station_400_800","beyond_800"]].head())


Done. all=270229, balanced=234294, unbalanced=257276
   dist_to_sta_m station_band  in_station_0_100  in_station_100_400  \
0     508.430171     b400_800                 0                   0   
1    2385.569224   beyond_800                 0                   0   
2    1239.263748   beyond_800                 0                   0   
3    1588.461475   beyond_800                 0                   0   
4    1587.996266   beyond_800                 0                   0   

   in_station_400_800  beyond_800  
0                   1           0  
1                   0           1  
2                   0           1  
3                   0           1  
4                   0           1  


In [7]:
# --- Add switch_uncertain from existing n_t (0 if n_t==3 else 1) ---
import pandas as pd
from pathlib import Path

# 想处理哪个就放进来；不存在的会自动跳过
PATHS = [
    "data/clean/voa_panel_with_station_band.parquet",
    "data/clean/voa_panel_with_station_band_balanced.parquet",
    "data/clean/voa_panel_with_station_band_unbalanced.parquet",
]

OVERWRITE = False  # 若想原地覆盖，改为 True

for p in PATHS:
    p = Path(p)
    if not p.exists():
        print(f"[skip] not found: {p}")
        continue

    df = pd.read_parquet(p)

    if "n_t" not in df.columns:
        raise KeyError(f"{p} 缺少 n_t 列，无法生成 switch_uncertain。")

    # 等于3记0，其它（含NaN）记1
    df["switch_uncertain"] = (df["n_t"].fillna(-1) != 3).astype("int8")

    # 写出
    if OVERWRITE:
        outp = p
    else:
        outp = p.with_name(p.stem + "_with_switch" + p.suffix)

    df.to_parquet(outp, index=False)
    share = float(df["switch_uncertain"].mean())
    print(f"[done] {outp} | rows={len(df)} | switch_uncertain=1 share={share:.4f}")


[done] data/clean/voa_panel_with_station_band_with_switch.parquet | rows=270229 | switch_uncertain=1 share=0.1330
[done] data/clean/voa_panel_with_station_band_balanced_with_switch.parquet | rows=234294 | switch_uncertain=1 share=0.0000
[done] data/clean/voa_panel_with_station_band_unbalanced_with_switch.parquet | rows=257276 | switch_uncertain=1 share=0.0893


In [14]:
# --- Robust QC merge: use borough_i code as primary; sjoin only for QC/fill ---
import geopandas as gpd, pandas as pd, numpy as np
from pathlib import Path
from shapely import wkb, wkt
import binascii
import pyogrio

POINT_FILES = [
    "data/clean/voa_panel_with_station_band_with_switch.parquet",
    "data/clean/voa_panel_with_station_band_balanced_with_switch.parquet",
    "data/clean/voa_panel_with_station_band_unbalanced_with_switch.parquet",
]
BORO_GPKG = "data/interim/gla_boroughs_2024_27700.gpkg"
OUT_DIR   = "data/clean/sjoin_borough_qc"
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

# 代码 → 名称映射
BORO_MAP = {
    "5030":"City of London","5060":"Barking & Dagenham","5090":"Barnet","5120":"Bexley",
    "5150":"Brent","5180":"Bromley","5210":"Camden","5240":"Croydon","5270":"Ealing",
    "5300":"Enfield","5330":"Greenwich","5360":"Hackney","5390":"Hammersmith & Fulham",
    "5420":"Haringey","5450":"Harrow","5480":"Havering","5510":"Hillingdon",
    "5540":"Hounslow","5570":"Islington","5600":"Kensington & Chelsea",
    "5630":"Kingston upon Thames","5660":"Lambeth","5690":"Lewisham","5720":"Merton",
    "5750":"Newham","5780":"Redbridge","5810":"Richmond upon Thames","5840":"Southwark",
    "5870":"Sutton","5900":"Tower Hamlets","5930":"Waltham Forest","5960":"Wandsworth",
    "5990":"Westminster City"
}

def _to_geom(v):
    if v is None or (isinstance(v, float) and np.isnan(v)): return None
    # bytes / memoryview -> WKB
    if isinstance(v, (bytes, bytearray, memoryview)):
        return wkb.loads(bytes(v))
    # 整数列表 -> bytes -> WKB
    if isinstance(v, (list, tuple)) and len(v) and isinstance(v[0], (int, np.integer)):
        return wkb.loads(bytes(v))
    # 字符串：WKT 或 HEX-WKB
    if isinstance(v, str):
        s = v.strip()
        if s.upper().startswith(("POINT","LINESTRING","POLYGON","MULTI")):
            return wkt.loads(s)
        # 可能是十六进制的WKB
        try:
            return wkb.loads(binascii.unhexlify(s))
        except Exception:
            return None
    return None

def load_points(path: str) -> gpd.GeoDataFrame:
    """从 Parquet 稳健读取为 GeoDataFrame，自动解析 bytes/WKT。"""
    df = pd.read_parquet(path)
    geom = None
    # 优先使用已存在的 geometry 列
    if "geometry" in df.columns:
        gs = df["geometry"].apply(_to_geom)
        if gs.notna().any(): geom = gpd.GeoSeries(gs, crs=27700)
    # 备选：geom_27700
    if geom is None and "geom_27700" in df.columns:
        gs = df["geom_27700"].apply(_to_geom)
        if gs.notna().any(): geom = gpd.GeoSeries(gs, crs=27700)
    # 最后兜底：东/北坐标
    if geom is None and {"onspd_easting","onspd_northing"}.issubset(df.columns):
        x = pd.to_numeric(df["onspd_easting"], errors="coerce")
        y = pd.to_numeric(df["onspd_northing"], errors="coerce")
        geom = gpd.points_from_xy(x, y, crs=27700)
    if geom is None:
        raise ValueError(f"{path}: 找不到可解析的几何（geometry/geom_27700/onspd_easting+northing）")
    gdf = gpd.GeoDataFrame(df, geometry=geom, crs=27700)
    # 去除空/无效几何（极少数情况）
    gdf = gdf[gdf.geometry.notna() & ~gdf.geometry.is_empty].copy()
    return gdf

def norm_name(s):
    if pd.isna(s): return s
    t = str(s).strip().upper().replace("&","AND")
    aliases = {"WESTMINSTER CITY":"WESTMINSTER","CITY OF WESTMINSTER":"WESTMINSTER"}
    return aliases.get(t, t)

# 读 borough 面
bor = gpd.read_file(BORO_GPKG)
if bor.crs is None or bor.crs.to_epsg()!=27700:
    bor = bor.to_crs(27700)
name_col = next((c for c in ["borough","BOROUGH","NAME","name","GSS_NAME","NAME_SHORT"] if c in bor.columns), None)
if not name_col:
    raise KeyError("Borough底图缺少可用名称列（尝试 borough/BOROUGH/NAME/name/GSS_NAME/NAME_SHORT）")
bor = bor[[name_col,"geometry"]].rename(columns={name_col:"borough_from_poly"})
bor["borough_from_poly_norm"] = bor["borough_from_poly"].map(norm_name)

summ = []
for p in POINT_FILES:
    fp = Path(p)
    if not fp.exists():
        print(f"[skip] {fp} not found"); continue

    pts = load_points(p)  # ← 修复：把 bytes/WKT 解析为 Shapely
    n_in = len(pts)

    # 1) 代码主列：borough_i → 名称
    if "borough_i" not in pts.columns:
        raise KeyError(f"{fp.name} 缺少 'borough_i' 列")
    code = (pts["borough_i"].astype(str).str.extract(r"(\d+)")[0]).str.zfill(4)
    pts["borough_from_code"] = code.map(BORO_MAP)
    pts["code_valid"] = pts["borough_from_code"].notna().astype("int8")
    pts["borough_from_code_norm"] = pts["borough_from_code"].map(norm_name)

    # 2) 空间联结（边界算在内）
    joined = gpd.sjoin(pts, bor[["borough_from_poly","borough_from_poly_norm","geometry"]],
                       how="left", predicate="covered_by").drop(columns=["index_right"])
    n_out = len(joined)

    # 3) 质检标记与“最终列”（仅修补缺失/明显错误）
    joined["borough_spatial_ok"] = joined["borough_from_poly"].notna().astype("int8")
    both = joined["borough_from_code"].notna() & joined["borough_from_poly"].notna()
    joined["borough_mismatch"] = np.where(
        both & (joined["borough_from_code_norm"] != joined["borough_from_poly_norm"]), 1, 0
    ).astype("int8")

    joined["borough_final"] = joined["borough_from_code"].copy()
    joined.loc[joined["borough_from_code"].isna(), "borough_final"] = joined.loc[
        joined["borough_from_code"].isna(), "borough_from_poly"
    ]

    matched_poly = int(joined["borough_spatial_ok"].sum())
    mismatch_cnt = int(joined["borough_mismatch"].sum())

    outp = Path(OUT_DIR) / (fp.stem + "_boroughqc.parquet")
    joined.drop(columns=["borough_from_code_norm","borough_from_poly_norm"], inplace=True)
    joined.to_parquet(outp, index=False)

    print(f"[done] {fp.name} -> {outp} | rows_in={n_in} rows_out={n_out} | "
          f"poly_matched={matched_poly} ({matched_poly/n_out:.2%}) | "
          f"mismatch={mismatch_cnt} ({mismatch_cnt/n_out:.2%})")

    summ.append((fp.name, n_in, n_out, matched_poly, mismatch_cnt, outp))

print("\n=== Summary ===")
for name, n0, n1, mpoly, mmis, outp in summ:
    print(f"{name}: 保留 {n1} 行（原 {n0}），空间匹配 {mpoly}，不一致 {mmis} → {outp}")


[done] voa_panel_with_station_band_with_switch.parquet -> data/clean/sjoin_borough_qc/voa_panel_with_station_band_with_switch_boroughqc.parquet | rows_in=270229 rows_out=270229 | poly_matched=270207 (99.99%) | mismatch=651 (0.24%)
[done] voa_panel_with_station_band_balanced_with_switch.parquet -> data/clean/sjoin_borough_qc/voa_panel_with_station_band_balanced_with_switch_boroughqc.parquet | rows_in=234294 rows_out=234294 | poly_matched=234276 (99.99%) | mismatch=528 (0.23%)
[done] voa_panel_with_station_band_unbalanced_with_switch.parquet -> data/clean/sjoin_borough_qc/voa_panel_with_station_band_unbalanced_with_switch_boroughqc.parquet | rows_in=257276 rows_out=257276 | poly_matched=257256 (99.99%) | mismatch=608 (0.24%)

=== Summary ===
voa_panel_with_station_band_with_switch.parquet: 保留 270229 行（原 270229），空间匹配 270207，不一致 651 → data/clean/sjoin_borough_qc/voa_panel_with_station_band_with_switch_boroughqc.parquet
voa_panel_with_station_band_balanced_with_switch.parquet: 保留 234294 行（原

In [15]:
import pandas as pd
from pathlib import Path

DIR = Path("data/clean/sjoin_borough_qc")
FILES = [
    DIR / "voa_panel_with_station_band_with_switch_boroughqc.parquet",
    DIR / "voa_panel_with_station_band_balanced_with_switch_boroughqc.parquet",
    DIR / "voa_panel_with_station_band_unbalanced_with_switch_boroughqc.parquet",
]

OUT_DIR = Path("data/clean/borough_final")
OUT_DIR.mkdir(parents=True, exist_ok=True)

for fp in FILES:
    df = pd.read_parquet(fp)
    name = fp.stem.replace("_boroughqc","")

    # 1) 简报
    n = len(df)
    matched = int(df["borough_from_poly"].notna().sum())
    mism   = int(df.get("borough_mismatch", 0).sum())
    print(f"\n[{name}] rows={n} | poly_matched={matched} ({matched/n:.2%}) | mismatch={mism} ({mism/n:.2%})")

    # 2) 生成最终列（若上一轮已生成就跳过）
    if "borough_final" not in df.columns:
        # 以代码为主；代码缺失时用空间兜底
        df["borough_final"] = df["borough_from_code"].combine_first(df["borough_from_poly"])

    # 3) 输出几个检查表
    # 未匹配到任何 borough 的（极少数）
    no_poly = df[df["borough_from_poly"].isna()]
    no_poly.to_parquet(OUT_DIR / f"{name}_no_poly.parquet", index=False)

    # 不一致的 pair 分布（看具体错在哪些 borough）
    if "borough_mismatch" in df.columns:
        mm = df[df["borough_mismatch"] == 1][["uarn","borough_from_code","borough_from_poly"]]
        mm.to_parquet(OUT_DIR / f"{name}_mismatch_pairs.parquet", index=False)
        print(f"  saved mismatch list: {OUT_DIR / (name + '_mismatch_pairs.parquet')} (rows={len(mm)})")

    # 4) 写“最终版”表（仅多一列 borough_final，原列保留）
    outp = OUT_DIR / f"{name}_boroughfinal.parquet"
    df.to_parquet(outp, index=False)
    print(f"  wrote: {outp}")



[voa_panel_with_station_band_with_switch] rows=270229 | poly_matched=270207 (99.99%) | mismatch=651 (0.24%)
  saved mismatch list: data/clean/borough_final/voa_panel_with_station_band_with_switch_mismatch_pairs.parquet (rows=651)
  wrote: data/clean/borough_final/voa_panel_with_station_band_with_switch_boroughfinal.parquet

[voa_panel_with_station_band_balanced_with_switch] rows=234294 | poly_matched=234276 (99.99%) | mismatch=528 (0.23%)
  saved mismatch list: data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_mismatch_pairs.parquet (rows=528)
  wrote: data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_boroughfinal.parquet

[voa_panel_with_station_band_unbalanced_with_switch] rows=257276 | poly_matched=257256 (99.99%) | mismatch=608 (0.24%)
  saved mismatch list: data/clean/borough_final/voa_panel_with_station_band_unbalanced_with_switch_mismatch_pairs.parquet (rows=608)
  wrote: data/clean/borough_final/voa_panel_with_station_band_unbala

In [17]:
# === 环境 ===
import os, re
from pathlib import Path
import pandas as pd
import geopandas as gpd
import numpy as np
import pyogrio

# 兼容不同版本 shapely 的 WKB 解析
try:
    from shapely import from_wkb as _from_wkb
    def wkb_to_geom(b):
        return _from_wkb(bytes(b)) if isinstance(b, (bytes, bytearray, memoryview)) else b
except Exception:
    from shapely import wkb as _wkb
    def wkb_to_geom(b):
        return _wkb.loads(bytes(b)) if isinstance(b, (bytes, bytearray, memoryview)) else b

# ---------- 参数 ----------
P_RING = "data/interim/net_band/output_realistic_walking/elizabeth_all_realistic_rings.gpkg"
IN_FILES = [
    "data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_boroughfinal.parquet",
    "data/clean/borough_final/voa_panel_with_station_band_with_switch_boroughfinal.parquet",
    "data/clean/borough_final/voa_panel_with_station_band_unbalanced_with_switch_boroughfinal.parquet",
]
DEF_CRS = "EPSG:27700"
def out_path(p): return str(Path(p).with_suffix("").as_posix() + "_with_walkbands.parquet")

# ---------- 1) 读取并标准化环带 ----------
def read_rings(p_ring: str) -> gpd.GeoDataFrame:
    rings = gpd.read_file(p_ring, engine="pyogrio")
    if rings.crs is None:
        rings = rings.set_crs(DEF_CRS, allow_override=True)

    # 找到环带字段
    cands = [c for c in rings.columns if c.lower() in
             {"band","ring","dist_band","walk_band","distance_band","ring_name","band_name","label","name"}]
    band_col = cands[0] if cands else None
    if band_col is None:
        obj_cols = [c for c in rings.columns if rings[c].dtype == "object"]
        hit = next((c for c in obj_cols
                    if rings[c].astype(str).str.contains("0.?100|100.?400|400.?800|beyond|800\\+", case=False, na=False).any()), None)
        if hit is None:
            raise ValueError("无法识别环带字段，请检查 GPKG。")
        band_col = hit

    def std_band(v: str) -> str:
        s = str(v).lower().replace("–","-").replace("—","-").replace("_","-").replace(" ", "")
        if "beyond" in s or "800+" in s or "gt800" in s: return "beyond_800"
        if "0-100" in s: return "0_100"
        if "100-400" in s: return "100_400"
        if "400-800" in s: return "400_800"
        m = re.findall(r"(\d+)\s*-\s*(\d+)", s)
        if m:
            a,b = m[0]
            if (a,b) in {("0","100"),("100","400"),("400","800")}:
                return f"{a}_{b}"
        return "unknown"

    rings = rings.copy()
    rings["band_std"] = rings[band_col].map(std_band)
    rings = rings[rings["band_std"].isin(["0_100","100_400","400_800","beyond_800"])].copy()
    rings["rank"] = rings["band_std"].map({"0_100":0,"100_400":1,"400_800":2,"beyond_800":3})
    return rings[["band_std","rank","geometry"]]

RINGS = read_rings(P_RING)

# ---------- 2) 读取点（支持：GeoParquet、WKB列、easting/northing、lon/lat） ----------
def read_points_any(pq_path: str) -> gpd.GeoDataFrame:
    # 先尝试 GeoPandas 直接读（若是 GeoParquet 会自动还原几何）
    try:
        gdf = gpd.read_parquet(pq_path)
        if gdf.crs is None:
            gdf = gdf.set_crs(DEF_CRS, allow_override=True)
        return gdf
    except Exception:
        pass

    # 用 pandas 读 + 手动解 WKB
    df = pd.read_parquet(pq_path)

    if "geometry" in df.columns:
        df = df.copy()
        df["geometry"] = df["geometry"].apply(wkb_to_geom)
        gdf = gpd.GeoDataFrame(df, geometry="geometry")
        if gdf.crs is None:
            gdf = gdf.set_crs(DEF_CRS, allow_override=True)
        return gdf

    # 无 geometry：尝试坐标列
    if {"onspd_easting","onspd_northing"}.issubset(df.columns):
        gdf = gpd.GeoDataFrame(df,
            geometry=gpd.points_from_xy(df["onspd_easting"], df["onspd_northing"]),
            crs=DEF_CRS,
        )
        return gdf
    if {"lon","lat"}.issubset(df.columns):
        gdf = gpd.GeoDataFrame(df,
            geometry=gpd.points_from_xy(df["lon"], df["lat"]),
            crs="EPSG:4326",
        )
        return gdf.to_crs(DEF_CRS)

    raise ValueError("未找到可用几何：缺少 geometry / onspd_easting+northing / lon+lat。")

# ---------- 3) 空间连接（含边界），并唯一化到最近环带 ----------
def sjoin_covered_by(pts: gpd.GeoDataFrame, polys: gpd.GeoDataFrame) -> pd.DataFrame:
    try:
        j = gpd.sjoin(pts, polys, how="left", predicate="covered_by")
    except Exception:
        # 旧版兼容：within ∪ touches
        j1 = gpd.sjoin(pts, polys, how="left", predicate="within")
        j2 = gpd.sjoin(pts, polys, how="left", predicate="touches")
        j  = pd.concat([j1, j2], ignore_index=True).drop_duplicates(subset=[j1.index.name or "index_left","index_right"])
    return j

def label_walk_bands(gdf_points: gpd.GeoDataFrame, rings: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    pts = gdf_points.to_crs(rings.crs)
    j = sjoin_covered_by(pts.reset_index(names="_pt_idx"), rings.reset_index(drop=True))

    # 若一个点命中多个环带，按 rank（距离近优先）唯一化
    j = (j.sort_values(["_pt_idx","rank"])
           .drop_duplicates(subset=["_pt_idx"], keep="first")
           .set_index("_pt_idx"))

    out = pts.copy()
    for col in ["walk_0_100","walk_100_400","walk_400_800","walk_beyond_800"]:
        out[col] = False

    band = j["band_std"].reindex(out.index)
    out.loc[band.eq("0_100"), "walk_0_100"] = True
    out.loc[band.eq("100_400"), "walk_100_400"] = True
    out.loc[band.eq("400_800"), "walk_400_800"] = True
    if "beyond_800" in rings["band_std"].unique():
        out.loc[band.eq("beyond_800"), "walk_beyond_800"] = True

    # 补集：前三者全 False 则 beyond_800 = True
    none_hit = ~out[["walk_0_100","walk_100_400","walk_400_800"]].any(axis=1)
    out.loc[none_hit, "walk_beyond_800"] = True

    # 互斥性检查
    s = out[["walk_0_100","walk_100_400","walk_400_800","walk_beyond_800"]].sum(axis=1)
    dup, miss = int((s>1).sum()), int((s==0).sum())
    print(f"[check] 多重命中(>1)：{dup}；未命中(=0)：{miss}")
    assert dup == 0, "发现同一点命中多个环带，请检查环带拓扑或唯一化逻辑。"

    return out

# ---------- 4) 批处理并写出 ----------
def process_one(parquet_in: str, rings: gpd.GeoDataFrame):
    print(f"\n=== 处理: {parquet_in} ===")
    gdf = read_points_any(parquet_in)
    gdf = label_walk_bands(gdf, rings)

    Path(out_path(parquet_in)).parent.mkdir(parents=True, exist_ok=True)
    gdf.to_parquet(out_path(parquet_in), index=False)

    summary = (gdf[["walk_0_100","walk_100_400","walk_400_800","walk_beyond_800"]]
               .agg(["sum"]).T.rename(columns={"sum":"count"}))
    print(summary)

for p in IN_FILES:
    process_one(p, RINGS)
print("\n完成。输出文件已追加 _with_walkbands.parquet。")



=== 处理: data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_boroughfinal.parquet ===
[check] 多重命中(>1)：0；未命中(=0)：0
                  count
walk_0_100          834
walk_100_400       8616
walk_400_800      16356
walk_beyond_800  208488

=== 处理: data/clean/borough_final/voa_panel_with_station_band_with_switch_boroughfinal.parquet ===
[check] 多重命中(>1)：0；未命中(=0)：0
                  count
walk_0_100         1011
walk_100_400      10678
walk_400_800      20907
walk_beyond_800  237633

=== 处理: data/clean/borough_final/voa_panel_with_station_band_unbalanced_with_switch_boroughfinal.parquet ===
[check] 多重命中(>1)：0；未命中(=0)：0
                  count
walk_0_100          938
walk_100_400       9942
walk_400_800      18832
walk_beyond_800  227564

完成。输出文件已追加 _with_walkbands.parquet。


In [8]:
# === Δlnrv for BALANCED only (08–15, 15–21, 08–21) ===
import pandas as pd, numpy as np
from pathlib import Path

# 1) 读取 balanced 子集（优先带 switch 的那个）
CANDIDATES = [
    "data/clean/voa_panel_with_station_band_balanced_with_switch.parquet",
    "data/clean/voa_panel_with_station_band_balanced.parquet",
]
for P_IN in CANDIDATES:
    try:
        df = pd.read_parquet(P_IN); print(f"Loaded: {P_IN}"); break
    except Exception: 
        df = None
if df is None:
    raise FileNotFoundError("找不到 balanced 子集文件。请检查路径。")

# 2) 基础清洗（只用到 uarn/avd/ln_rv/n_t）
df["avd"]   = pd.to_numeric(df["avd"], errors="coerce").astype("Int64")
df["ln_rv"] = pd.to_numeric(df["ln_rv"], errors="coerce")
if "n_t" not in df.columns: 
    raise KeyError("balanced 表缺少 n_t 列。")

# 宽容地把 08/15/21 映射为 2008/2015/2021
df.loc[df["avd"].isin([8,15,21,23]), "avd"] = df.loc[df["avd"].isin([8,15,21,23]), "avd"].map(
    {8:2008, 15:2015, 21:2021, 23:2023}
)

PAIRS = [(2008,2015), (2015,2021), (2008,2021)]
YEARS = sorted({y for p in PAIRS for y in p})

# 3) 先“配对”：uarn×year 透视成每年一列
wide_year = (df[df["avd"].isin(YEARS)]
             .dropna(subset=["uarn","avd","ln_rv"])
             .drop_duplicates(["uarn","avd"])
             .pivot(index="uarn", columns="avd", values="ln_rv")
             .reindex(columns=YEARS))

# 4) 计算各跨度 Δ，并在该跨度内部 winsorize（1%/2%）
def winsorize(s, p=0.01):
    s = s.copy()
    if s.dropna().empty: return s
    lo, hi = s.quantile([p, 1-p])
    return s.clip(lo, hi)

dcols = {"uarn": wide_year.index}
report = []
nt_by_u = df.groupby("uarn")["n_t"].max(min_count=1)

for t0, t1 in PAIRS:
    d = wide_year[t1] - wide_year[t0]   # 只有两端都有时非空
    key = f"dlnrv_{t0%100:02d}_{t1%100:02d}"
    dcols[key]       = d.values
    dcols[key+"_w1"] = winsorize(d, 0.01).values
    dcols[key+"_w2"] = winsorize(d, 0.02).values

    # 报告：N 与 balanced 占比（balanced 子集应≈1.0）
    u_keep = d.index[~d.isna()]
    N = len(u_keep)
    bal_share = (nt_by_u.loc[u_keep] == 3).mean() if N else 0.0
    report.append((key, N, round(bal_share, 4)))

delta_wide = pd.DataFrame(dcols)

# 5) 另给 long 版（更方便分组/绘图）
delta_long = (delta_wide
              .melt(id_vars="uarn", var_name="span", value_name="dlnrv")
              .assign(winsor=lambda x: x["span"].str.extract(r"_(w[12])$").fillna("raw"),
                      span=lambda x: x["span"].str.replace(r"_w[12]$","",regex=True)))

# 6) 写出（不覆盖原 balanced 表）
BASE = "data/clean"
Path(BASE).mkdir(parents=True, exist_ok=True)
delta_wide.to_parquet(f"{BASE}/voa_balanced_deltas_wide.parquet", index=False)
delta_long.to_parquet(f"{BASE}/voa_balanced_deltas_long.parquet", index=False)

print("Spans summary (BALANCED only):", report)
print("Wrote:",
      f"{BASE}/voa_balanced_deltas_wide.parquet",
      "and",
      f"{BASE}/voa_balanced_deltas_long.parquet")


Loaded: data/clean/voa_panel_with_station_band_balanced_with_switch.parquet
Spans summary (BALANCED only): [('dlnrv_08_15', 78098, 1.0), ('dlnrv_15_21', 78098, 1.0), ('dlnrv_08_21', 78098, 1.0)]
Wrote: data/clean/voa_balanced_deltas_wide.parquet and data/clean/voa_balanced_deltas_long.parquet


In [20]:
# —— 只检查新表的列名（Parquet/GPKG 都可）——
import pandas as pd
from pathlib import Path
import pyogrio

# 如需改路径，在这里改；默认检查 ALL / BALANCED / UNBALANCED 三个 Parquet
PARQUETS = [
    "data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_boroughfinal_with_walkbands.parquet"
]

# 可选：若你想检查 GPKG，也把路径放进这里
GPKGS = [
    # "data/clean/voa_panel_with_station_band_27700.gpkg",
    # "data/clean/voa_panel_with_station_band_balanced_27700.gpkg",
    # "data/clean/voa_panel_with_station_band_unbalanced_27700.gpkg",
]

pd.set_option("display.max_columns", 200)

def inspect_parquet(p):
    print(f"\n=== {p} ===")
    if not Path(p).exists():
        print("  (not found)")
        return
    df = pd.read_parquet(p)   # 用 pandas 读取，绕过 GeoParquet 元数据问题
    print(f"  shape: {df.shape}")
    print("  columns:\n ", list(df.columns))
    print("\n  dtypes:")
    print(df.dtypes.sort_index())
    # 常用关键列的快照（只显示存在的）
    keys = ["uarn","avd","year_norm","ln_rv","dist_to_sta_m","station_band",
            "in_station_0_100","in_station_100_400","in_station_400_800",
            "beyond_800","n_t","n_t_fix","panel_bucket","borough",
            "in_caz","in_town_centre","geometry"]
    keep = [c for c in keys if c in df.columns]
    if keep:
        print("\n  head (key cols):")
        print(df[keep].head(5))

def inspect_gpkg(p):
    print(f"\n=== {p} (GPKG) ===")
    if not Path(p).exists():
        print("  (not found)")
        return
    layers = pyogrio.list_layers(p)
    print("  layers:", [lyr[0] for lyr in layers])
    # 看每个图层的列
    for lyr, *_ in layers:
        g = pyogrio.read_dataframe(p, layer=lyr)
        print(f"  - layer: {lyr}, shape={g.shape}")
        print("    columns:\n   ", list(g.columns))
        print("    dtypes:")
        print(g.dtypes.sort_index())
        print("    head:")
        print(g.head(3))
        print()

for fp in PARQUETS:
    inspect_parquet(fp)

for gp in GPKGS:
    inspect_gpkg(gp)



=== data/clean/borough_final/voa_panel_with_station_band_balanced_with_switch_boroughfinal_with_walkbands.parquet ===
  shape: (234294, 55)
  columns:
  ['avd', 'uarn', 'postcode', 'borough_i', 'scat', 'scat3', 'scat_grp', 'rv', 'ln_rv', 'n_t', 'balanced_3', 'pair_0815', 'pair_1521', 'pair_0821', 'single_period', 'scat_grp_lag', 'scat3_lag', 'switch_it', 'switch_code_it', 'pc_ns', 'onspd_lat', 'onspd_lon', 'onspd_easting', 'onspd_northing', 'geom_27700', 'in_caz', 'in_town_centre', 'ptal_2015_num', 'ptal_2015_band', 'ai_2015', 'ptal_dist_m', 'pc_nospace', 'lsoa11cd', 'imd2019_rank', 'imd2019_decile', 'geometry', 'station_id', 'station_name', 'dist_to_sta_m', 'station_band', 'in_station_0_100', 'in_station_100_400', 'in_station_400_800', 'beyond_800', 'switch_uncertain', 'borough_from_code', 'code_valid', 'borough_from_poly', 'borough_spatial_ok', 'borough_mismatch', 'borough_final', 'walk_0_100', 'walk_100_400', 'walk_400_800', 'walk_beyond_800']

  dtypes:
ai_2015                floa