# Shenzhen exposure-weight demo (grid / H3 hex)

This notebook computes **user × spatial-unit exposure weights** (grid or H3 hex) in lunch/dinner windows using `poi_visit_aggregator`.

Outputs (per city):
- grid: `user_grid_time_strict_filled_<city>.parquet` + `qa_summary_strict_filled_<city>.csv`
- hex: `user_hex_time_strict_filled_<city>.parquet` + `qa_summary_hex_strict_filled_<city>.csv`

Two common workflows:
1) **Visualize existing outputs (recommended)**: set `EXPO_DIR` to the folder that contains the two output files.
2) **Run export (slow)**: set `RUN_EXPORT=True` and configure `STAYPOINTS/UUID_TABLE` plus `GRID_META` (grid) or `HEX_META` (hex).

Tip: for stability/performance, put `tmp_root` and DuckDB temp on a local disk (Colab: `/tmp`, Windows: e.g. `C:\temp`).


In [None]:
from __future__ import annotations

from pathlib import Path
import os
import sys

import numpy as np
import pandas as pd

# --- Colab detection ---
# The import doubles as the probe: if `google.colab` can be imported we are on
# Colab and `drive` stays available for mounting later; any failure means we
# are running somewhere else.
try:
    from google.colab import drive  # type: ignore
except Exception:
    IN_COLAB = False
else:
    IN_COLAB = True


def _find_repo_root(start: Path) -> Path:
    p = start.resolve()
    for parent in [p, *p.parents]:
        if (parent / "pyproject.toml").exists() and (parent / "poi_visit_aggregator").exists():
            return parent
    return start.resolve()


# --- Colab bootstrap (Drive + optional clone) ---
# If you open this notebook directly in Colab (not opened from the repo),
# keep CLONE_REPO_IN_COLAB=True to clone the repo into Drive and add it to sys.path.
CLONE_REPO_IN_COLAB = True
COLAB_TARGET_DIR = Path("/content/drive/MyDrive/Script/Module")
COLAB_REPO_PATH = COLAB_TARGET_DIR / "poi_visit_aggregator"

if IN_COLAB and CLONE_REPO_IN_COLAB:
    drive.mount("/content/drive")
    COLAB_TARGET_DIR.mkdir(parents=True, exist_ok=True)
    os.chdir(str(COLAB_TARGET_DIR))

    if not COLAB_REPO_PATH.exists():
        # Clone straight into COLAB_REPO_PATH so the checkout always lands at
        # the path that the exists() check and sys.path use, even if the
        # constants above are edited. The path is quoted in case it ever
        # contains spaces.
        get_ipython().system(
            f'git clone https://github.com/weipengdeng/poi_visit_aggregator.git "{COLAB_REPO_PATH}"'
        )
    else:
        # Updating is best-effort: an existing checkout is still usable when
        # the pull cannot run, but say so instead of failing silently.
        try:
            get_ipython().system(f'git -C "{COLAB_REPO_PATH}" pull')
        except Exception as exc:
            print(f"[warn] git pull failed; using the existing checkout: {exc}")

    # Put the repo at the front of sys.path, dropping one stale entry so that
    # re-running this cell does not leave a duplicate behind.
    if str(COLAB_REPO_PATH) in sys.path:
        sys.path.remove(str(COLAB_REPO_PATH))
    sys.path.insert(0, str(COLAB_REPO_PATH))
    os.chdir(str(COLAB_REPO_PATH))

if not IN_COLAB:
    # VSCode/Jupyter sometimes sets CWD to `notebooks/`. Make repo imports work either way.
    REPO_ROOT = _find_repo_root(Path.cwd())
    os.chdir(str(REPO_ROOT))
    if str(REPO_ROOT) not in sys.path:
        sys.path.insert(0, str(REPO_ROOT))

try:
    import psutil  # type: ignore
except Exception:
    psutil = None


def mem_gb():
    """Report memory usage as ``(rss_gb, avail_gb, used_pct)``.

    ``rss_gb`` is this process's resident set size and ``avail_gb`` the
    system's available memory, both divided by 1e9; ``used_pct`` is
    ``psutil.virtual_memory().percent``. All three are NaN when ``psutil``
    is unavailable.
    """
    if psutil is None:
        return (np.nan,) * 3

    system = psutil.virtual_memory()
    own_rss = psutil.Process(os.getpid()).memory_info().rss
    rss_gb = float(own_rss) / 1e9
    avail_gb = float(system.available) / 1e9
    used_pct = float(system.percent)
    return (rss_gb, avail_gb, used_pct)


# Let wide frames render without column truncation or early line wrapping.
pd.options.display.max_columns = 200
pd.options.display.width = 140

# Short environment report: where we run and how much memory is in use.
print("IN_COLAB:", IN_COLAB)
print("CWD:", Path.cwd())
rss_gb, avail_gb, used_pct = mem_gb()
print(f"Memory: RSS={rss_gb:.2f} GB | avail={avail_gb:.2f} GB | used={used_pct:.0f}%")


In [None]:
# Install export deps (run once)
# If you rerun the notebook often, you can comment this cell after the first run.
#
# Colab:
# !pip -q install -e ".[export]"
#
# Local (VSCode/Jupyter):
# !{sys.executable} -m pip install -e ".[export]"
#
# Optional (viz):
# !{sys.executable} -m pip install plotly


In [None]:
from __future__ import annotations

from pathlib import Path
import json
import os
import tempfile

# Notebook configuration: city, spatial-unit mode, and every input/output path
# used by the cells below. Several values can be overridden with POI_VISIT_*
# environment variables so the notebook does not have to be edited per machine.

CITY = "Shenzhen"
# City code that matches staypoints `c_code`.
GRID_UID_CODE = "440300"  # used when UNIT_MODE='grid'
HEX_UID_CODE = "440300"   # used when UNIT_MODE='hex'

# Spatial unit: 'grid' or 'hex' (H3). Falls back to 'hex' when
# POI_VISIT_UNIT_MODE is not set.
UNIT_MODE = os.environ.get("POI_VISIT_UNIT_MODE", "hex").strip().lower()
if UNIT_MODE not in {"grid", "hex"}:
    raise ValueError("POI_VISIT_UNIT_MODE must be grid|hex")
UNIT_UID_COL = "grid_uid" if UNIT_MODE == "grid" else "hex_uid"

# Output file names depend only on the unit mode and the city. They are built
# once so the visualize-only and export locations cannot drift apart.
if UNIT_MODE == "grid":
    _OUT_FILE_NAME = f"user_grid_time_strict_filled_{CITY}.parquet"
    _QA_FILE_NAME = f"qa_summary_strict_filled_{CITY}.csv"
else:
    _OUT_FILE_NAME = f"user_hex_time_strict_filled_{CITY}.parquet"
    _QA_FILE_NAME = f"qa_summary_hex_strict_filled_{CITY}.csv"

# If you already have the outputs, keep RUN_EXPORT=False and set EXPO_DIR.
RUN_EXPORT = False

# --- Outputs location (visualize-only) ---
if IN_COLAB:
    DRIVE_ROOT = Path("/content/drive/MyDrive")
    EXPO_DIR = DRIVE_ROOT / "Project/202512_EFE/data/expo"
else:
    # Raw string with single backslashes: doubling them inside r"..." would
    # embed literal double separators in the path.
    EXPO_DIR = Path(os.environ.get("POI_VISIT_EXPO_DIR", r"E:\OneDrive\HKU\Project\202512_EFE\data\expo"))

OUT_CITY_DIR = EXPO_DIR
OUT_FILE = OUT_CITY_DIR / _OUT_FILE_NAME
QA_FILE = OUT_CITY_DIR / _QA_FILE_NAME

# --- Grid meta for map visualization (grid only) ---
GRID_META_VIZ = None
if UNIT_MODE == "grid":
    GRID_META_VIZ = os.environ.get("POI_VISIT_GRID_META_VIZ", "").strip()
    if GRID_META_VIZ:
        GRID_META_VIZ = Path(GRID_META_VIZ)

# --- Hex meta (H3) ---
# An explicit POI_VISIT_HEX_META always wins; otherwise look for the default
# file name in the current working directory.
_HEX_META_ENV = os.environ.get("POI_VISIT_HEX_META", "").strip()
if _HEX_META_ENV:
    HEX_META = Path(_HEX_META_ENV)
else:
    default_hex = Path.cwd() / f"hex_meta_{CITY}_{HEX_UID_CODE}.json"
    HEX_META = default_hex if default_hex.exists() else None

# --- (Optional) run export in this notebook ---
if RUN_EXPORT:
    if IN_COLAB:
        DRIVE_ROOT = Path("/content/drive/MyDrive")
        DATA_ROOT = DRIVE_ROOT / "Project/202512_EFE"

        UUID_TABLE = DATA_ROOT / "data/jike/uuid.csv"  # .csv or .parquet
        STAYPOINTS = [DATA_ROOT / "data/jike/track.csv"]

        # Recommended: keep temp files off Google Drive.
        TMP_ROOT = Path("/tmp/poi_visit_aggregator_tmp")
        DUCKDB_TEMP_DIR = Path("/tmp/duckdb_tmp")
    else:
        # TODO: edit these paths to your local disk (raw inputs).
        DATA_ROOT = Path(r"D:\Project\202512_EFE")
        UUID_TABLE = DATA_ROOT / "uuid.csv"  # .csv or .parquet
        STAYPOINTS = [DATA_ROOT / "staypoints.csv"]

        # Put temp on a fast local disk (avoid OneDrive folders).
        # TEMP is honoured when set; otherwise use the platform temp dir
        # rather than the current working directory.
        temp_dir = Path(os.environ.get("TEMP") or tempfile.gettempdir())
        TMP_ROOT = Path(os.environ.get("POI_VISIT_TMP_ROOT", str(temp_dir / "poi_visit_aggregator_tmp")))
        DUCKDB_TEMP_DIR = Path(os.environ.get("POI_VISIT_DUCKDB_TMP", str(temp_dir / "duckdb_tmp")))

    # Same layout under DATA_ROOT on Colab and on a local machine.
    GRID_META = DATA_ROOT / f"grid_meta_{CITY}.json"
    if not _HEX_META_ENV:
        # Only fall back to the DATA_ROOT default when the user did not point
        # POI_VISIT_HEX_META at a specific file.
        HEX_META = DATA_ROOT / f"hex_meta_{CITY}_{HEX_UID_CODE}.json"

    OUT_DIR = DATA_ROOT / "out/poi_visit_aggregator"

    # If staypoints contain nationwide users, keep this on for speed.
    FILTER_CITY_CODE = True
    CITY_CODE_COL = "c_code"
    CITY_CODE_VALUE = GRID_UID_CODE if UNIT_MODE == "grid" else HEX_UID_CODE

    # In export mode the results are looked up under OUT_DIR/<city>.
    OUT_CITY_DIR = OUT_DIR / CITY
    OUT_FILE = OUT_CITY_DIR / _OUT_FILE_NAME
    QA_FILE = OUT_CITY_DIR / _QA_FILE_NAME

print("UNIT_MODE:", UNIT_MODE)
print("UNIT_UID_COL:", UNIT_UID_COL)
print("OUT_FILE:", OUT_FILE)
print("QA_FILE:", QA_FILE)
print("GRID_META_VIZ:", GRID_META_VIZ)
print("HEX_META:", HEX_META)

# Echo a few header fields of the hex meta file so a wrong file is easy to spot.
if UNIT_MODE == "hex" and HEX_META is not None and HEX_META.exists():
    meta = json.loads(HEX_META.read_text(encoding="utf-8-sig"))
    print(
        "[hex_meta] city=", meta.get("city"),
        "code=", meta.get("code"),
        "res=", meta.get("h3_resolution"),
        "n=", meta.get("n_hexes"),
    )


In [None]:
# Optional column-name mapping, passed to the exporters as `schema_map`.
# Uncomment and edit only the entries whose names differ in your data;
# both sections may stay empty.
SCHEMA_MAP = dict(
    staypoints={
        # "uuid": "uuid",
        # "start_time": "start_ms",
        # "end_time": "end_ms",
        # one of (x,y) or (lon,lat) or location
        # "lon": "lon",
        # "lat": "lat",
        # "location": "location",
        # "source": "source",
        # "c_code": "c_code",
    },
    uuid_table={
        # "uuid": "uuid",
    },
)

SCHEMA_MAP


In [None]:
import os
import time
from contextlib import contextmanager

try:
    import psutil  # type: ignore
except Exception:
    psutil = None


def rss_mb() -> float:
    """Return this process's resident set size in MB (bytes / 1024 / 1024).

    Returns NaN when ``psutil`` is unavailable.
    """
    if psutil is not None:
        rss_bytes = psutil.Process(os.getpid()).memory_info().rss
        return rss_bytes / 1024 / 1024
    return float("nan")


@contextmanager
def step(name: str):
    """Time a block of work and report process memory around it.

    Prints a ``[START]`` line on entry and an ``[ END ]`` line on exit. The
    exit line shows the elapsed wall-clock seconds, the current RAM reading
    from ``rss_mb()`` and its change since entry. Because the exit line is
    printed from a ``finally`` clause it also appears when the wrapped block
    raises; the exception itself is not suppressed.

    Args:
        name: Label shown in both log lines.
    """
    t0 = time.perf_counter()
    m0 = rss_mb()
    print(f"[START] {name} (RAM={m0:,.0f} MB)")
    try:
        yield
    finally:
        dt = time.perf_counter() - t0
        m1 = rss_mb()
        # "Δ" labels the RAM change; the label had been mangled into "?".
        print(f"[ END ] {name} (dt={dt:,.1f}s, RAM={m1:,.0f} MB, Δ={m1 - m0:,.0f} MB)")


In [None]:
from pathlib import Path
import os

RESUME_STAGE2 = False  # set True to reuse existing Stage-1 parts
KEEP_INTERMEDIATE = True  # keep Stage-1 parts (useful for resume/debug)

if RUN_EXPORT:
    # Keyword arguments passed identically to the grid and the hex exporter.
    # Keeping them in one place prevents the two calls from drifting apart.
    _common_export_kwargs = dict(
        city=CITY,
        staypoints=[Path(p) for p in STAYPOINTS],
        staypoints_format="auto",  # or csv/parquet
        uuid_table=UUID_TABLE,
        out_dir=OUT_DIR,
        tmp_root=TMP_ROOT,
        duckdb_temp_dir=DUCKDB_TEMP_DIR,
        schema_map=SCHEMA_MAP,
        filter_city_code=FILTER_CITY_CODE,
        city_code_col=CITY_CODE_COL,
        city_code_value=CITY_CODE_VALUE,
        windows=["lunch", "dinner"],
        min_interval_minutes=5,
        point_source_filter=True,
        point_source_value="cell_appearance",
        drop_uuid_not_in_table=True,
        timestamps_are_utc=True,
        tz_offset_hours=8,
        epoch_unit="ms",
        uid64_hash_method="xxh64",
        buckets=256,
        batch_size=1_000_000,
        log_every_batches=500,
        overlap_rounding="floor",
        oob_mode="drop",
        # Leave one core free; assume 8 cores when the count is unknown.
        threads=max(1, (os.cpu_count() or 8) - 1),
        memory_limit="8GB",
        id_mode="uuid",  # uuid|uid64|both
        resume_stage2=RESUME_STAGE2,
        keep_intermediate=KEEP_INTERMEDIATE,
    )

    if UNIT_MODE == "grid":
        from poi_visit_aggregator.export_user_grid_time_strict_filled import (
            export_user_grid_time_strict_filled,
        )

        with step("export_user_grid_time_strict_filled"):
            export_user_grid_time_strict_filled(
                grid_meta_path=GRID_META,
                output_grid_uid=True,
                output_grid_id=False,
                grid_uid_code=GRID_UID_CODE,
                grid_uid_prefix="grid",
                grid_uid_order="col_row",
                coords_already_projected=False,
                **_common_export_kwargs,
            )
    else:
        from poi_visit_aggregator.export_user_hex_time_strict_filled import (
            export_user_hex_time_strict_filled,
        )

        # Fail before the slow export starts. Checking only for None is not
        # enough, because HEX_META may be a configured path that does not
        # exist on disk.
        if HEX_META is None or not Path(HEX_META).exists():
            raise FileNotFoundError(
                f"HEX_META not found: {HEX_META}. Point HEX_META (config cell / env var "
                "POI_VISIT_HEX_META) to your hex_meta_*.json path."
            )

        with step("export_user_hex_time_strict_filled"):
            export_user_hex_time_strict_filled(
                hex_meta_path=HEX_META,
                output_hex_uid=True,
                output_h3_id=True,
                output_h3_int=False,
                hex_uid_code=HEX_UID_CODE,
                hex_uid_prefix="hex",
                **_common_export_kwargs,
            )
else:
    print("RUN_EXPORT=False: skip export; visualize existing outputs.")

OUT_FILE


In [None]:
import os
import duckdb
import pandas as pd
from IPython.display import display

# Validate the inputs with real exceptions: `assert` is stripped under
# `python -O` and would report a missing file as an AssertionError.
if not OUT_FILE.exists():
    raise FileNotFoundError(f"Missing output parquet: {OUT_FILE}")
if not QA_FILE.exists():
    raise FileNotFoundError(f"Missing QA csv: {QA_FILE}")

# QA summary, transposed so each metric is shown on its own row.
qa = pd.read_csv(QA_FILE, encoding="utf-8-sig")
display(qa.T.head(120))

# In-memory DuckDB connection; it is left open so it can be reused afterwards.
con = duckdb.connect()
con.execute(f"PRAGMA threads={max(1, (os.cpu_count() or 8) - 1)}")

# Light preview (avoid loading the full parquet into memory)
df_head = con.execute("select * from read_parquet(?) limit 5", [str(OUT_FILE)]).fetchdf()
display(df_head)

# The unit column name comes from the notebook config (grid_uid or hex_uid),
# so it is interpolated into the SQL; the file path is bound as a parameter.
unit_col = UNIT_UID_COL
overall_sql = f"""
select
  count(*) as n_rows,
  count(distinct uuid) as n_users,
  count(distinct {unit_col}) as n_units,
  sum(tau_strict_min) as tau_strict_min_sum,
  sum(tau_fill_min) as tau_fill_min_sum,
  sum(tau_filled_min) as tau_filled_min_sum,
  sum(case when tau_fill_min > 0 then 1 else 0 end) as n_rows_tau_fill_pos
from read_parquet(?)
"""

overall = con.execute(overall_sql, [str(OUT_FILE)]).fetchdf()
display(overall.T)

# Totals per (window, is_weekend) combination.
by = con.execute(
    '''
    select
      "window",
      is_weekend,
      count(*) as n_rows,
      sum(tau_strict_min) as tau_strict_min_sum,
      sum(tau_fill_min) as tau_fill_min_sum,
      sum(tau_filled_min) as tau_filled_min_sum
    from read_parquet(?)
    group by 1,2
    order by 1,2
    ''',
    [str(OUT_FILE)],
).fetchdf()
# Ratio of the fill-minutes total to the filled-minutes total in each group.
by["fill_share"] = by["tau_fill_min_sum"] / by["tau_filled_min_sum"]
display(by)

# Top units by filled minutes
top_units = con.execute(
    f'''
    select
      {unit_col} as unit_uid,
      sum(tau_filled_min) as tau_filled_min_sum
    from read_parquet(?)
    group by 1
    order by 2 desc
    limit 20
    ''',
    [str(OUT_FILE)],
).fetchdf()
display(top_units)
