# Baltimore Analytics — Data Ingestion Pipeline v2
**Project:** `baltimore-analytics`  
**Dataset:** `raw_data`  
**API:** ArcGIS REST (replaces Socrata SODA — portal migrated post-2021)  
**Author:** Spencer  

---
### Architecture
```
baltimore-analytics (GCP Project)
├── raw_data          ← partitioned source tables (this notebook)
├── analytics         ← cleaned, enriched, joined tables (future)
└── views             ← Looker Studio-facing pre/post 2021 views (created in Section 7)
```
### Pre/Post 2021 Design
- **Crime:** Baltimore split this for us — Legacy SRS (through 12/31/2024) vs current NIBRS (2021+)
- **311:** One yearly FeatureServer hosts 2004-2022 as numbered layers, unioned into a single table; the consolidated 2023+ endpoint is currently unavailable
- **All others:** Date-partitioned; `_period` column (`pre_2021` / `post_2021`) added at ingestion
- **Views:** Auto-created in `views` dataset at end of this notebook
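
The `_period` rule from the list above can be sketched in plain Python. This is a minimal illustration rather than the notebook's pandas implementation; the helper name `period_flag` is hypothetical, and returning `None` for a missing date is an assumption of this sketch.

```python
from datetime import datetime
from typing import Optional

CUTOFF_YEAR = 2021  # boundary year used throughout this notebook


def period_flag(ts: Optional[datetime]) -> Optional[str]:
    """Return 'pre_2021' or 'post_2021' for a timestamp, or None if it is missing."""
    if ts is None:
        # Assumption: a record without a date is left unflagged
        return None
    return "pre_2021" if ts.year < CUTOFF_YEAR else "post_2021"


print(period_flag(datetime(2020, 12, 31, 23, 59)))  # last minute before the cutoff
print(period_flag(datetime(2021, 1, 1)))            # first instant after the cutoff
print(period_flag(None))                            # missing date
```

Keeping the cutoff in one constant means the ingestion flag and the views can be checked against the same boundary.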

### Partition Strategy
All date-partitioned tables use **MONTH** partitioning (not DAY) to stay under BigQuery's 4,000 partition limit.
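
To see why MONTH is the safer choice, the arithmetic can be checked directly. The sketch below counts daily and monthly partitions for the 2004-2022 range covered by the 311 table; the 4,000 limit is the figure quoted above, and `partition_counts` is a hypothetical helper used only for illustration.

```python
from datetime import date

PARTITION_LIMIT = 4000  # figure quoted in the text above


def partition_counts(start: date, end: date) -> tuple:
    """Return (daily, monthly) partition counts for an inclusive date range."""
    daily = (end - start).days + 1
    monthly = (end.year - start.year) * 12 + (end.month - start.month) + 1
    return daily, monthly


daily, monthly = partition_counts(date(2004, 1, 1), date(2022, 12, 31))
print(f"DAY:   {daily:,} partitions, fits: {daily <= PARTITION_LIMIT}")
print(f"MONTH: {monthly:,} partitions, fits: {monthly <= PARTITION_LIMIT}")
```

Nineteen years of data needs 6,940 daily partitions but only 228 monthly ones, so only the monthly layout stays under the quoted limit.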

---

## 0. Install Dependencies

In [None]:
# Uncomment and run once
# %pip install requests google-cloud-bigquery google-cloud-bigquery-storage pandas pyarrow db-dtypes

## 1. Configuration

In [2]:
import requests
import time
import logging
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s — %(levelname)s — %(message)s')
log = logging.getLogger(__name__)

# ─────────────────────────────────────────────
# PROJECT CONFIG
# ─────────────────────────────────────────────
GCP_PROJECT      = "baltimore-analytics"           # GCP Project ID
BQ_DATASET       = "raw_data"                      # BigQuery dataset
GCP_REGION       = "us-east1"                      # Closest region to Baltimore
CREDENTIALS_PATH = "service_account.json"          # Update this path

# ArcGIS standard query parameters
BASE_PARAMS = {
    "where":          "1=1",
    "outFields":      "*",
    "f":              "json",
    "returnGeometry": "true",
    "outSR":          "4326",  # WGS84 — standard for BigQuery GIS
}

PAGE_SIZE = 1000  # Conservative; 311 Yearly service MaxRecordCount is 2000

# ─────────────────────────────────────────────
# 311 LAYER INDEX
# Single FeatureServer hosts all years as numbered layers.
# Layer 0 = 2022, Layer 1 = 2021, ..., Layer 18 = 2004
# 2023-present consolidated endpoint is currently unavailable.
# ─────────────────────────────────────────────
CSR_311_BASE = "https://services1.arcgis.com/UWYHeuuJISiGmgXx/ArcGIS/rest/services/311_Customer_Service_Requests_Yearly/FeatureServer"

# ─────────────────────────────────────────────
# DATASET REGISTRY
# url:            Primary ArcGIS FeatureServer query endpoint
# date_col:       Used for BigQuery partitioning and _period flag
# partition_type: MONTH for long historical datasets (avoids 4000 partition limit)
# union_with:     Additional endpoints to union into the same table
# ─────────────────────────────────────────────
DATASETS = {

    # ── PUBLIC SAFETY ──────────────────────────────────────────────────────
    "crime_incidents_legacy": {
        "url": "https://services1.arcgis.com/UWYHeuuJISiGmgXx/arcgis/rest/services/Part1_Crime_Beta/FeatureServer/0/query",
        "description": "BPD Part 1 Crime — Legacy SRS through 12/31/2024. Pre-2021 analytical anchor.",
        "date_col": "CrimeDateTime",
        "partition_type": "MONTH",
        "union_with": [],
        "quality_note": "SRS hierarchy rule — only most serious offense per incident. Primary source for pre-2021 analysis."
    },
    "crime_incidents_current": {
        "url": "https://services1.arcgis.com/UWYHeuuJISiGmgXx/arcgis/rest/services/NIBRS_GroupA_Crime_Data/FeatureServer/0/query",
        "description": "BPD NIBRS Group A Crime — current post-2021 dataset. Updated weekly.",
        "date_col": "CrimeDateTime",
        "partition_type": "MONTH",
        "union_with": [],
        "quality_note": "NIBRS reports all offenses per incident — not directly comparable to Legacy SRS counts."
    },
    "bpd_arrests": {
        "url": "https://egis.baltimorecity.gov/egis/rest/services/GeoSpatialized_Tables/Arrest/FeatureServer/0/query",
        "description": "BPD Arrests. Updated weekly.",
        "date_col": "ArrestDateTime",   # Note: actual column is arrestdatetime (no separate arrestdate)
        "partition_type": "MONTH",
        "union_with": [],
        "quality_note": "~41% of records missing geo coordinates (older records never geocoded)."
    },

    # ── HOUSING / VACANCY ──────────────────────────────────────────────────
    "vacant_building_notices": {
        "url": "https://egisdata.baltimorecity.gov/egis/rest/services/Housing/DHCD_Open_Baltimore_Datasets/FeatureServer/1/query",
        "description": "Active vacant building notices citywide. Updated daily.",
        "date_col": "DateNotice",
        "partition_type": "MONTH",
        "union_with": [],
        "quality_note": None
    },
    "vacant_building_rehabs": {
        "url": "https://egisdata.baltimorecity.gov/egis/rest/services/Housing/DHCD_Open_Baltimore_Datasets/FeatureServer/2/query",
        "description": "Rehab permits issued for vacant buildings. Includes HousingMarketTypology.",
        "date_col": "DateIssued",
        "partition_type": "MONTH",
        "union_with": [],
        "quality_note": None
    },

    # ── PERMITS / INVESTMENT ───────────────────────────────────────────────
    "building_permits": {
        "url": "https://services1.arcgis.com/UWYHeuuJISiGmgXx/arcgis/rest/services/Housing_and_Building_Permits__2015_to_2018/FeatureServer/0/query",
        "description": "Building permits 2015-present. Union of historical and current endpoints.",
        "date_col": "IssuedDate",       # Lowercases to issueddate, the column name after normalization
        "partition_type": "MONTH",
        "union_with": [
            "https://egisdata.baltimorecity.gov/egis/rest/services/Housing/DHCD_Open_Baltimore_Datasets/FeatureServer/3/query"
        ],
        "quality_note": None
    },

    # ── 311 SERVICE REQUESTS ───────────────────────────────────────────────
    # 2023-2026 consolidated endpoint unavailable — coverage is 2004-2022 only.
    "service_requests_311": {
        "url": f"{CSR_311_BASE}/0/query",
        "description": "311 CSR 2004-2022 from yearly FeatureServer layers.",
        "date_col": "CreatedDate",
        "partition_type": "MONTH",
        "union_with": [f"{CSR_311_BASE}/{i}/query" for i in range(1, 19)],
        "quality_note": "2023-2026 data excluded — consolidated endpoint returning query error. Coverage is 2004-2022."
    },

    # ── PROPERTY ───────────────────────────────────────────────────────────
    "real_property": {
        "url": "https://geodata.baltimorecity.gov/egis/rest/services/CityView/Realproperty_OB/FeatureServer/0/query",
        "description": "Real property — 200k+ parcels, ownership, assessed value. Full refresh (no date partition).",
        "date_col": None,
        "union_with": [],
        "quality_note": None
    },

    # ── REFERENCE / SPATIAL ────────────────────────────────────────────────
    "neighborhood_boundaries": {
        "url": "https://geodata.baltimorecity.gov/egis/rest/services/CityView/Neighborhoods/FeatureServer/0/query",
        "description": "Baltimore NSA polygon boundaries. Primary spatial join key. Polygon geometry captured as geo_polygon_wkt.",
        "date_col": None,
        "union_with": [],
        "quality_note": None
    },
}

print(f"✓ Config loaded. {len(DATASETS)} datasets registered.")

✓ Config loaded. 9 datasets registered.


## 2. Initialize Clients

In [3]:
credentials = service_account.Credentials.from_service_account_file(
    CREDENTIALS_PATH,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bq = bigquery.Client(project=GCP_PROJECT, credentials=credentials)

for dataset_name in [BQ_DATASET, "views"]:
    ds = bigquery.Dataset(f"{GCP_PROJECT}.{dataset_name}")
    ds.location = GCP_REGION
    bq.create_dataset(ds, exists_ok=True)
    log.info(f"✓ Dataset '{GCP_PROJECT}.{dataset_name}' ready.")

2026-02-26 16:43:25,365 — INFO — ✓ Dataset 'baltimore-analytics.raw_data' ready.
2026-02-26 16:43:25,563 — INFO — ✓ Dataset 'baltimore-analytics.views' ready.


## 3. Helper Functions

In [8]:
import time
import requests
import pandas as pd
from google.cloud import bigquery
import pyarrow as pa


def fetch_arcgis_layer(url: str, page_size: int = PAGE_SIZE) -> pd.DataFrame:
    """
    Paginate through an ArcGIS FeatureServer layer.
    Handles both point geometry (x/y) and polygon geometry (rings).
    Includes retry logic with exponential backoff for rate limiting.
    """
    all_records = []
    offset = 0
    max_retries = 5

    try:
        count_resp = requests.get(
            url, params={"where": "1=1", "returnCountOnly": "true", "f": "json"}, timeout=15
        )
        total = count_resp.json().get("count", "unknown")
        log.info(f"  Total records: {total}")
    except Exception:
        log.info("  Could not get total count — proceeding with pagination.")

    while True:
        params = {**BASE_PARAMS, "resultOffset": offset, "resultRecordCount": page_size}

        # Retry loop with exponential backoff
        for attempt in range(max_retries):
            try:
                resp = requests.get(url, params=params, timeout=60)
                resp.raise_for_status()
                data = resp.json()
                break  # success
            except Exception as e:
                if attempt < max_retries - 1:
                    wait = 2 ** attempt * 3  # 3, 6, 12, 24 seconds
                    log.warning(f"  Attempt {attempt+1} failed ({e}). Retrying in {wait}s...")
                    time.sleep(wait)
                else:
                    raise

        if "error" in data:
            raise ValueError(f"ArcGIS error: {data['error']}")

        features = data.get("features", [])
        if not features:
            break

        for feature in features:
            row = feature.get("attributes", {})
            geom = feature.get("geometry")
            if geom:
                if "x" in geom and "y" in geom:
                    # Point geometry
                    row["_longitude"] = geom.get("x")
                    row["_latitude"]  = geom.get("y")
                elif "rings" in geom:
                    # Polygon geometry — capture outer ring as WKT
                    rings = geom["rings"]
                    if rings:
                        coords = ", ".join(f"{x} {y}" for x, y in reversed(rings[0]))
                        row["geo_polygon_wkt"] = f"POLYGON(({coords}))"
            all_records.append(row)

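        log.info(f"  {len(all_records):,} rows fetched...")
        # Advance by the rows actually returned: the server may cap a page below page_size
        offset += len(features)

        # A short page is final only if the server does not report more rows
        if len(features) < page_size and not data.get("exceededTransferLimit", False):
            break

        time.sleep(0.3)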

    df = pd.DataFrame.from_records(all_records)
    log.info(f"  ✓ {len(df):,} total rows.")
    return df


def fetch_all_sources(config: dict) -> pd.DataFrame:
    """Fetch primary URL + any union_with URLs, concatenate into one DataFrame."""
    all_dfs = []
    log.info("  Fetching primary source...")
    all_dfs.append(fetch_arcgis_layer(config["url"]))

    for i, union_url in enumerate(config.get("union_with", [])):
        log.info(f"  Fetching union source {i+1}/{len(config['union_with'])}...")
        try:
            all_dfs.append(fetch_arcgis_layer(union_url))
        except Exception as e:
            log.warning(f"  ⚠ Union source {i+1} failed (skipping): {e}")

    combined = pd.concat(all_dfs, ignore_index=True)
    log.info(f"  ✓ Combined: {len(combined):,} rows from {len(all_dfs)} source(s).")
    return combined


def clean_dataframe(df: pd.DataFrame, config: dict) -> pd.DataFrame:
    """Normalize columns, parse dates, build geo WKT, add metadata."""
    df.columns = [c.lower().strip().replace(" ", "_") for c in df.columns]

    date_col = config.get("date_col")
    date_col_norm = date_col.lower() if date_col else None

    if date_col_norm and date_col_norm in df.columns:
        if pd.api.types.is_numeric_dtype(df[date_col_norm]):
            df[date_col_norm] = pd.to_datetime(df[date_col_norm], unit="ms", utc=True, errors="coerce")
        else:
            df[date_col_norm] = pd.to_datetime(df[date_col_norm], utc=True, errors="coerce")

        # Use Int64 (nullable) to avoid int32 pyarrow conversion issues
        df["_year"]   = df[date_col_norm].dt.year.astype("Int64")
        df["_month"]  = df[date_col_norm].dt.month.astype("Int64")
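        # Rows with a missing or unparseable date get no period instead of defaulting to post_2021
        df["_period"] = df["_year"].apply(
            lambda y: None if pd.isna(y) else ("pre_2021" if y < 2021 else "post_2021")
        )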

    if "_latitude" in df.columns and "_longitude" in df.columns:
        df["_latitude"]  = pd.to_numeric(df["_latitude"],  errors="coerce")
        df["_longitude"] = pd.to_numeric(df["_longitude"], errors="coerce")
        df["geo_point_wkt"] = df.apply(
            lambda r: f"POINT({r['_longitude']} {r['_latitude']})"
            if pd.notna(r["_latitude"]) and pd.notna(r["_longitude"]) else None,
            axis=1
        )

    # Force mixed-type columns to string to avoid pyarrow conversion errors.
    # Missing values stay None so they load as NULL rather than the literal "nan".
    if "council_district" in df.columns:
        df["council_district"] = df["council_district"].map(
            lambda v: None if pd.isna(v) else str(v)
        )

    # Use string for _ingested_at to avoid pyarrow UTC datetime issues
    df["_ingested_at"] = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%d %H:%M:%S UTC")
    df["_source_url"]  = config["url"]
    return df


def build_bq_schema(df: pd.DataFrame, date_col: str = None) -> list:
    """Auto-generate BigQuery schema. Handles nullable int types and geo."""
    dtype_map = {
        "object":              "STRING",
        "int64":               "INT64",
        "Int64":               "INT64",   # nullable integer
        "float64":             "FLOAT64",
        "bool":                "BOOL",
        "datetime64[ns]":      "TIMESTAMP",
        "datetime64[ns, UTC]": "TIMESTAMP",
        "datetime64[us, UTC]": "TIMESTAMP",
    }
    schema = []
    for col, dtype in df.dtypes.items():
        if col == "geo_point_wkt":
            schema.append(bigquery.SchemaField(col, "GEOGRAPHY"))
        elif col == "geo_polygon_wkt":
            schema.append(bigquery.SchemaField(col, "STRING"))
        elif date_col and col == date_col.lower():
            schema.append(bigquery.SchemaField(col, "TIMESTAMP"))
        else:
            bq_type = dtype_map.get(str(dtype), "STRING")
            schema.append(bigquery.SchemaField(col, bq_type))
    return schema


def load_to_bigquery(df: pd.DataFrame, table_name: str, date_col: str = None, partition_type: str = "DAY") -> None:
    """
    Load DataFrame to BigQuery. Always WRITE_TRUNCATE.
    Partitions on date_col by DAY unless partition_type="MONTH" is passed;
    use MONTH for long historical datasets.
    Partitioned + clustered only if date_col is provided.
    """
    table_ref  = f"{GCP_PROJECT}.{BQ_DATASET}.{table_name}"
    schema     = build_bq_schema(df, date_col=date_col)
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    if date_col:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.MONTH if partition_type == "MONTH" else bigquery.TimePartitioningType.DAY,
            field=date_col.lower()
        )
        cluster_cols = [c for c in ["_period", "neighborhood", "district", "new_district"] if c in df.columns][:4]
        if cluster_cols:
            job_config.clustering_fields = cluster_cols

    log.info(f"  Loading to {table_ref}...")
    bq.load_table_from_dataframe(df, table_ref, job_config=job_config).result()
    log.info(f"  ✓ {bq.get_table(table_ref).num_rows:,} rows → {table_ref}")


print("✓ Helper functions defined.")

✓ Helper functions defined.


## 4. Validate Endpoints
Run this before the full ingestion. Confirms each endpoint is reachable and returns a record count.  
Fix any ❌ before proceeding to Cell 5.

In [5]:
print("Validating endpoints...\n")

for table_name, config in DATASETS.items():
    all_urls = [config["url"]] + config.get("union_with", [])
    for i, url in enumerate(all_urls):
        label = table_name if i == 0 else f"  └─ union {i}"
        try:
            resp  = requests.get(url, params={"where": "1=1", "returnCountOnly": "true", "f": "json"}, timeout=15)
            data  = resp.json()
            if "error" in data:
                print(f"❌ {label:45} API error: {data['error'].get('message', '')}")
            else:
                count = data.get('count', 'unknown')
                print(f"✅ {label:45} {count:>10,} records" if isinstance(count, int) else f"✅ {label:45} {count} records")
        except Exception as e:
            print(f"❌ {label:45} {str(e)[:50]}")

print("\nDone. Fix any ❌ URLs in Cell 1 before running the ingestion.")

Validating endpoints...

✅ crime_incidents_legacy                           644,737 records
✅ crime_incidents_current                          239,435 records
✅ bpd_arrests                                      393,475 records
✅ vacant_building_notices                           11,990 records
✅ vacant_building_rehabs                            12,123 records
✅ building_permits                                 155,802 records
✅   └─ union 1                                     274,367 records
✅ service_requests_311                           1,009,100 records
✅   └─ union 1                                     987,498 records
✅   └─ union 2                                     852,787 records
✅   └─ union 3                                     767,903 records
✅   └─ union 4                                     792,440 records
✅   └─ union 5                                     671,777 records
✅   └─ union 6                                     698,099 records
✅   └─ union 7                       

## 5. Run Ingestion Pipeline
Fetches all datasets and loads to BigQuery. Uses `WRITE_TRUNCATE` — safe to re-run.  
**Expected runtime: 3-5 hours** (311 service requests is ~9M rows across 19 layers).
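
A rough back-of-envelope check of the fetch portion of that estimate is sketched below. The 1.1 seconds per page is an assumption, read off the `real_property` log timestamps later in this notebook (roughly one 1,000-row page per second, including the 0.3 s sleep); it excludes retries, cleaning, and the BigQuery load. `estimate_fetch_hours` is a hypothetical helper.

```python
import math


def estimate_fetch_hours(total_rows: int, page_size: int, seconds_per_page: float) -> float:
    """Estimate paginated fetch time in hours for a given row count."""
    pages = math.ceil(total_rows / page_size)
    return pages * seconds_per_page / 3600


# ~9M rows for 311, PAGE_SIZE of 1000, assumed 1.1 s per page
hours = estimate_fetch_hours(9_000_000, 1000, 1.1)
print(f"Estimated 311 fetch time: {hours:.2f} hours")
```

Under these assumptions the 311 fetch alone takes a little under three hours, which is consistent with the 3-5 hour total once the other datasets and load jobs are included.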

In [None]:
ingestion_log = []

for table_name, config in DATASETS.items():
    log.info(f"\n{'='*60}")
    log.info(f"Processing: {table_name}")
    if config.get("quality_note"):
        log.warning(f"⚠ {config['quality_note']}")

    result = {
        "table": table_name, "status": None, "rows_ingested": None,
        "null_rate_date_col": None, "null_rate_geo": None,
        "date_range_min": None, "date_range_max": None,
        "error": None, "quality_note": config.get("quality_note")
    }

    try:
        df            = fetch_all_sources(config)
        df            = clean_dataframe(df, config)
        date_col      = config.get("date_col")
        date_col_norm = date_col.lower() if date_col else None
        partition_type = config.get("partition_type", "DAY")

        result["rows_ingested"] = len(df)
        if date_col_norm and date_col_norm in df.columns:
            result["null_rate_date_col"] = round(df[date_col_norm].isna().mean(), 4)
            result["date_range_min"]     = str(df[date_col_norm].min())
            result["date_range_max"]     = str(df[date_col_norm].max())
        if "geo_point_wkt" in df.columns:
            result["null_rate_geo"] = round(df["geo_point_wkt"].isna().mean(), 4)

        load_to_bigquery(df, table_name, date_col=date_col_norm, partition_type=partition_type)
        result["status"] = "SUCCESS"

    except Exception as e:
        log.error(f"✗ Failed: {table_name} — {e}")
        result["status"] = "FAILED"
        result["error"]  = str(e)

    ingestion_log.append(result)

log.info("\n✓ Ingestion pipeline complete.")

## 6. Ingestion Audit Report

In [None]:
audit_df = pd.DataFrame(ingestion_log)
pd.set_option("display.max_colwidth", 60)

print("\n" + "="*80)
print("INGESTION AUDIT REPORT")
print(f"Run at: {pd.Timestamp.now(tz='UTC').strftime('%Y-%m-%d %H:%M:%S')} UTC")
print("="*80)
display(audit_df[["table", "status", "rows_ingested", "date_range_min",
                   "date_range_max", "null_rate_date_col", "null_rate_geo", "quality_note"]])

# Append audit log to BigQuery for run history
audit_df["run_at"] = pd.Timestamp.now(tz="UTC")
bq.load_table_from_dataframe(
    audit_df,
    f"{GCP_PROJECT}.{BQ_DATASET}._ingestion_log",
    job_config=bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        autodetect=True
    )
).result()
print(f"\n✓ Audit log saved to {GCP_PROJECT}.{BQ_DATASET}._ingestion_log")

## 7. Create Pre/Post 2021 Views

In [None]:
PARTITIONED_TABLES = [
    ("crime_incidents_legacy",  "crimedatetime"),
    ("crime_incidents_current", "crimedatetime"),
    ("bpd_arrests",             "arrestdatetime"),   # Note: not arrestdate
    ("vacant_building_notices", "datenotice"),
    ("vacant_building_rehabs",  "dateissued"),
    ("building_permits",        "issueddate"),        # Note: not issuedate
    ("service_requests_311",    "createddate"),
]

for table_name, date_col in PARTITIONED_TABLES:
    for period, operator, year in [("pre_2021", "<", 2021), ("post_2021", ">=", 2021)]:
        view_ref = f"{GCP_PROJECT}.views.{table_name}_{period}"
        view     = bigquery.Table(view_ref)
        view.view_query = f"""
            SELECT * FROM `{GCP_PROJECT}.{BQ_DATASET}.{table_name}`
            WHERE EXTRACT(YEAR FROM {date_col}) {operator} {year}
        """
        bq.delete_table(view_ref, not_found_ok=True)
        bq.create_table(view)
        log.info(f"  ✓ {view_ref}")

print("\n✓ All pre/post 2021 views created in 'views' dataset.")

## 8. BigQuery GIS Verification

In [None]:
# Geo coverage by period for each partitioned table
for table_name in [t for t in DATASETS if DATASETS[t].get("date_col")]:
    try:
        result = bq.query(f"""
            SELECT _period,
                   COUNT(*) AS total,
                   COUNTIF(geo_point_wkt IS NOT NULL) AS with_geo,
                   ROUND(COUNTIF(geo_point_wkt IS NOT NULL) / COUNT(*) * 100, 1) AS geo_pct
            FROM `{GCP_PROJECT}.{BQ_DATASET}.{table_name}`
            GROUP BY _period ORDER BY _period
        """).to_dataframe()
        print(f"\n{table_name}:")
        display(result)
    except Exception as e:
        print(f"  ⚠ {table_name}: {e}")

# Verify neighborhood_boundaries has polygon geometry
print("\nVerifying neighborhood_boundaries polygon geometry:")
try:
    result = bq.query(f"""
        SELECT
            COUNT(*) as total,
            COUNTIF(geo_polygon_wkt IS NOT NULL) as with_polygon
        FROM `{GCP_PROJECT}.{BQ_DATASET}.neighborhood_boundaries`
    """).to_dataframe()
    row = result.iloc[0]
    print(f"  {int(row['with_polygon'])} / {int(row['total'])} neighborhoods have polygon geometry.")
except Exception as e:
    print(f"  ⚠ {e}")

In [9]:
bq.delete_table(f"{GCP_PROJECT}.{BQ_DATASET}.real_property", not_found_ok=True)
config = DATASETS["real_property"]
df = fetch_all_sources(config)
df = clean_dataframe(df, config)
load_to_bigquery(df, "real_property", date_col=None)

2026-02-26 16:57:29,185 — INFO —   Fetching primary source...
2026-02-26 16:57:29,544 — INFO —   Total records: 238496
2026-02-26 16:57:30,378 — INFO —   1,000 rows fetched...
2026-02-26 16:57:31,376 — INFO —   2,000 rows fetched...
2026-02-26 16:57:32,467 — INFO —   3,000 rows fetched...
2026-02-26 16:57:33,545 — INFO —   4,000 rows fetched...
2026-02-26 16:57:34,589 — INFO —   5,000 rows fetched...
2026-02-26 16:57:35,640 — INFO —   6,000 rows fetched...
2026-02-26 16:57:36,755 — INFO —   7,000 rows fetched...
2026-02-26 16:57:37,890 — INFO —   8,000 rows fetched...
2026-02-26 16:57:38,993 — INFO —   9,000 rows fetched...
2026-02-26 16:57:40,108 — INFO —   10,000 rows fetched...
2026-02-26 16:57:41,207 — INFO —   11,000 rows fetched...
2026-02-26 16:57:42,251 — INFO —   12,000 rows fetched...
2026-02-26 16:57:43,285 — INFO —   13,000 rows fetched...
2026-02-26 16:57:44,514 — INFO —   14,000 rows fetched...
2026-02-26 16:57:45,642 — INFO —   15,000 rows fetched...
2026-02-26 16:57:46,

---
## Next Steps
Once all datasets show `SUCCESS` and geo coverage looks reasonable:

1. **`02_spatial_enrichment.ipynb`** — Spatial join all point tables to NSA polygons, standardize `neighborhood` as universal join key
2. **`03_feature_engineering.ipynb`** — Aggregate to NSA level, build Neighborhood Vitality Index feature matrix
3. **`04_clustering.ipynb`** — K-means segmentation of neighborhoods into cohorts
4. **`05_looker_studio.md`** — Dashboard connection guide

**If 311 annual union endpoints fail:**  
Search `data.baltimorecity.gov` for `311 2015` (etc.), grab the URL, click the API tab to get the correct FeatureServer endpoint, and update `union_with` in Cell 1.
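
As an alternative to searching the portal by hand, the layer list can be read from the service itself: an ArcGIS FeatureServer root requested with `f=json` returns a `layers` array whose entries carry an `id` and a `name`. The sketch below rebuilds the query URLs from such a response. The helper name `layer_query_urls` and the sample metadata are hypothetical; the real layer names are not reproduced here.

```python
from typing import Dict, List

CSR_311_BASE = (
    "https://services1.arcgis.com/UWYHeuuJISiGmgXx/ArcGIS/rest/services/"
    "311_Customer_Service_Requests_Yearly/FeatureServer"
)


def layer_query_urls(service_base: str, service_info: Dict) -> List[str]:
    """Build one <base>/<id>/query URL per layer listed in the service metadata."""
    layers = sorted(service_info.get("layers", []), key=lambda layer: layer["id"])
    base = service_base.rstrip("/")
    return [f"{base}/{layer['id']}/query" for layer in layers]


# Hypothetical, trimmed metadata standing in for the real f=json response
sample_info = {"layers": [{"id": 1, "name": "layer_b"}, {"id": 0, "name": "layer_a"}]}
urls = layer_query_urls(CSR_311_BASE, sample_info)
print(urls[0])
```

In the notebook, `service_info` would come from `requests.get(CSR_311_BASE, params={"f": "json"}, timeout=15).json()`; the first URL would then serve as `url` and the rest as `union_with`.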

**Known data quality issues:**
- `bpd_arrests`: ~41% of records missing geo (older records never geocoded by BPD)
- `service_requests_311`: ~13% missing geo (older pre-2010 records)
- `service_requests_311`: 2023-2026 data excluded — consolidated endpoint unavailable

**BigQuery GIS quick reference:**
```sql
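-- Point in polygon (geo_polygon_wkt is loaded as STRING; wrap it in ST_GEOGFROMTEXT first)
ST_WITHIN(point_geo, polygon_geo)

-- Distance between points (meters)
ST_DISTANCE(point_a, point_b)

-- TRUE when two geographies share any point; combine with COUNT(*) or COUNTIF to count points per polygon
ST_INTERSECTS(point_geo, polygon_geo)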
```
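
The functions above can be combined into a points-per-neighborhood query. The sketch below only builds the SQL string; it assumes the point table has the `geo_point_wkt` GEOGRAPHY column and that `neighborhood_boundaries` stores polygons as WKT strings in `geo_polygon_wkt`, as loaded by this notebook. The helper name and the `name` column for the neighborhood label are assumptions to adjust against the actual schema.

```python
def points_per_neighborhood_sql(
    project: str, dataset: str, point_table: str, name_col: str = "name"
) -> str:
    """Build a point-in-polygon count query against neighborhood_boundaries."""
    return f"""
        SELECT n.{name_col} AS neighborhood, COUNT(*) AS n_points
        FROM `{project}.{dataset}.{point_table}` AS p
        JOIN `{project}.{dataset}.neighborhood_boundaries` AS n
          ON ST_WITHIN(p.geo_point_wkt, ST_GEOGFROMTEXT(n.geo_polygon_wkt))
        GROUP BY neighborhood
        ORDER BY n_points DESC
    """


sql = points_per_neighborhood_sql("baltimore-analytics", "raw_data", "bpd_arrests")
print(sql)
```

The resulting string can be passed to `bq.query(sql).to_dataframe()`. Rows without a point geometry drop out of the join, so counts for tables with low geo coverage will understate totals.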