# Code Snippet 2. Geocoding pipeline

# In [3]:
from __future__ import annotations

import io
import csv
import re
from typing import Optional

import numpy as np
import pandas as pd
import requests


# ---------------------------
# 0) Inputs / outputs
# ---------------------------
# Input table, the S3 workbook read for borough hints, and the CSV written at the end.
# NOTE(review): the "/content/" prefix looks like a hosted-notebook path — confirm before running elsewhere.
IN_HARMONIZED = "/content/harmonized_core.csv"
S3_PATH = "/content/S3_Gwendolyn Stegall data.xlsx"
OUT_GEOCODED = "harmonized_geocoded.csv"

# Batch endpoint plus the benchmark / vintage query parameters sent with each request.
CENSUS_BATCH_URL = "https://geocoding.geo.census.gov/geocoder/locations/addressbatch"
CENSUS_BENCHMARK = "Public_AR_Current"
CENSUS_VINTAGE = "Current_Current"

# State code submitted for every address in the batch.
STATE_DEFAULT = "NY"
# Five digits with an optional "-NNNN" suffix; only the five-digit part is captured.
ZIP_RE = re.compile(r"\b(\d{5})(?:-\d{4})?\b")


# ---------------------------
# 1) Address cleaning / classification
# ---------------------------
def norm_ws(x: Optional[str]) -> Optional[str]:
    """Return *x* as a string with whitespace collapsed, or ``None`` if missing/blank.

    "Missing" means ``None`` or any scalar pandas treats as NA (float NaN,
    ``pd.NA``, ``pd.NaT``).  Checking only for float NaN would let ``pd.NA``
    through and stringify it to the literal text ``"<NA>"``.

    Args:
        x: Raw cell value; non-string scalars are converted with ``str()``.

    Returns:
        The stripped value with every whitespace run replaced by one space,
        or ``None`` when nothing but whitespace remains.
    """
    if x is None:
        return None
    # is_scalar guards pd.isna, which returns an array for list-like input.
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return None
    collapsed = re.sub(r"\s+", " ", str(x).strip())
    return collapsed or None


def strip_parentheses(s: str) -> str:
    """Remove every ``(...)`` group from *s*, together with the whitespace around it.

    Each group is replaced by a single space and the result is stripped at
    both ends.
    """
    paren_group = re.compile(r"\s*\([^)]*\)\s*")
    without_groups = paren_group.sub(" ", s)
    return without_groups.strip()


def extract_zip(addr: Optional[str]) -> Optional[str]:
    """Return the five-digit ZIP code found in *addr*, or ``None``.

    The last ZIP-shaped token is used rather than the first, because a
    five-digit house number at the start of the string would otherwise be
    returned even when the real ZIP follows it.  A ZIP-shaped token that opens
    a longer string is treated as a house number and ignored.

    Args:
        addr: Raw address text (may be ``None``/NaN).

    Returns:
        The five-digit ZIP (any ``-NNNN`` suffix is dropped) or ``None``.
    """
    a = norm_ws(addr)
    if not a:
        return None

    candidates = list(ZIP_RE.finditer(a))
    if not candidates:
        return None

    last = candidates[-1]
    # e.g. "12345 Main St": the only candidate is the leading house number.
    if last.start() == 0 and last.end() < len(a):
        return None
    return last.group(1)


def classify_address_kind(addr: Optional[str]) -> str:
    """Label a raw address string by the shape of its text.

    Parenthesised notes are removed and the text is lower-cased before the
    checks run.  The first matching rule wins:

    * ``"missing"``        - empty / NaN input
    * ``"between_phrase"`` - contains the word "between"
    * ``"multi_address"``  - contains " and later ", ";", " or " or " later "
    * ``"intersection"``   - contains " and " or " & "
    * ``"street_address"`` - starts with a digit
    * ``"descriptive"``    - anything else

    Args:
        addr: Raw address text (may be ``None``/NaN).

    Returns:
        One of the labels listed above.
    """
    a = norm_ws(addr)
    if not a:
        return "missing"

    a = strip_parentheses(a).lower()

    if re.search(r"\bbetween\b", a):
        return "between_phrase"
    # Must be tested before the generic " and " rule, otherwise it can never
    # match: every " and later " string also contains " and ".
    if " and later " in a:
        return "multi_address"
    if " and " in a or " & " in a:
        return "intersection"
    if ";" in a or " or " in a or " later " in a:
        return "multi_address"
    if re.match(r"^\d", a):
        return "street_address"

    return "descriptive"


def clean_street_for_geocode(addr: Optional[str]) -> Optional[str]:
    """Reduce a raw address to the street part suitable for the geocoder.

    Returns ``None`` when the input is missing, when it contains "between",
    " and ", " & ", ";", " or " or " later ", or when nothing is left after
    cleaning.  Otherwise parenthesised notes are dropped, typographic dashes
    become "-", whitespace is collapsed, and a trailing ", <city/borough>..."
    or ", NY..." tail is cut off.
    """
    normalized = norm_ws(addr)
    if not normalized:
        return None

    street = strip_parentheses(normalized)
    for dash in ("‐", "–", "—"):
        street = street.replace(dash, "-")
    street = re.sub(r"\s+", " ", street).strip()

    lowered = street.lower()
    if re.search(r"\bbetween\b", lowered):
        return None
    rejected_markers = (" and ", " & ", ";", " or ", " later ", " and later ")
    if any(marker in lowered for marker in rejected_markers):
        return None

    # Cut the locality tail first, then any remaining ", NY ..." tail.
    tail_patterns = (
        r",\s*(New York|NYC|Manhattan|Brooklyn|Queens|Bronx|Staten Island)\b.*$",
        r",\s*NY\b.*$",
    )
    for pattern in tail_patterns:
        street = re.sub(pattern, "", street, flags=re.I).strip()

    return street or None


def borough_to_city(borough: Optional[str]) -> str:
    """Map a borough label to the city name sent to the geocoder.

    Matching is case-insensitive and ignores surrounding whitespace and a
    leading "the " (so "The Bronx" resolves to "Bronx").  Anything that is not
    a recognisable borough, including ``None``, NaN and other non-string
    values, falls back to "New York".

    Args:
        borough: Borough label, or a missing value.

    Returns:
        "New York", "Brooklyn", "Queens", "Bronx" or "Staten Island".
    """
    # Non-string missing markers (NaN, pd.NA) cannot be stripped or truth-tested safely.
    if not isinstance(borough, str):
        return "New York"

    b = borough.strip().lower()
    if b.startswith("the "):
        b = b[4:].lstrip()

    prefix_to_city = (
        ("manhattan", "New York"),
        ("brooklyn", "Brooklyn"),
        ("queens", "Queens"),
        ("bronx", "Bronx"),
    )
    for prefix, city in prefix_to_city:
        if b.startswith(prefix):
            return city
    if "staten" in b:
        return "Staten Island"
    return "New York"


# ---------------------------
# 2) Optional enrichment: bring S3 borough back in (improves city field)
# ---------------------------
def enrich_s3_borough(harm: pd.DataFrame, s3_path: Optional[str]) -> pd.DataFrame:
    """Add a ``borough_hint`` column taken from the S3 workbook's ``Borough`` column.

    S3 rows of *harm* are matched to workbook rows through ``source_record_id``:
    workbook row *k* (1-based) gets the id ``k`` zero-padded to the length of
    the longest S3 id present in *harm*.  Only rows whose ``source_dataset`` is
    "S3" receive a hint, so an equal ``source_record_id`` in another dataset
    does not pick up an S3 borough.  ``source_record_id`` itself is left
    untouched, so missing ids stay missing instead of becoming text.

    The function is best-effort: if there is no path, no S3 row, the workbook
    cannot be read, or a required column is absent, ``borough_hint`` is all-NA.

    Args:
        harm: Harmonized table; modified in place (the column is added).
        s3_path: Path to the S3 Excel workbook, or ``None`` to skip enrichment.

    Returns:
        The same ``harm`` object with the ``borough_hint`` column set.
    """
    import warnings

    # Start from "no hint" so every exit path leaves the column in place.
    harm["borough_hint"] = pd.NA

    if not s3_path:
        return harm
    if "source_dataset" not in harm.columns or "source_record_id" not in harm.columns:
        return harm

    record_ids = harm["source_record_id"]
    # fillna: comparing a nullable string column yields NA for missing datasets.
    is_s3 = (harm["source_dataset"] == "S3").fillna(False)
    eligible = (is_s3 & record_ids.notna()).to_numpy(dtype=bool)
    ids = record_ids[eligible].astype(str)
    if ids.empty:
        return harm

    try:
        s3 = pd.read_excel(s3_path).reset_index(drop=True)
    except Exception as exc:  # best-effort: unreadable workbook means no hints
        warnings.warn(f"Could not read S3 workbook {s3_path!r}: {exc}", RuntimeWarning)
        return harm

    if "Borough" not in s3.columns:
        warnings.warn(f"S3 workbook {s3_path!r} has no 'Borough' column", RuntimeWarning)
        return harm

    width = int(ids.str.len().max())
    lookup = {
        f"{position:0{width}d}": borough
        for position, borough in enumerate(s3["Borough"], start=1)
    }

    # Positional assignment keeps row order and does not depend on the index.
    hints = np.full(len(harm), pd.NA, dtype=object)
    hints[eligible] = ids.map(lookup).to_numpy(dtype=object)
    harm["borough_hint"] = hints
    return harm


# ---------------------------
# 3) Census batch geocoding
# ---------------------------
# Column layout of the frame returned by census_batch_geocode (also used when it is empty).
_CENSUS_RESULT_COLUMNS = [
    "id",
    "census_match",
    "census_match_type",
    "census_matched_address",
    "census_lon",
    "census_lat",
    "census_tigerline_id",
    "census_side",
]


def _parse_census_batch_response(text: str) -> pd.DataFrame:
    """Turn the CSV text returned by the batch endpoint into one record per row."""
    records = []
    for row in csv.reader(io.StringIO(text)):
        # Per the Census batch output format, unmatched rows carry only id,
        # input address and status.  Requiring six columns would silently drop
        # every No_Match / Tie row, so only the first three are mandatory.
        if len(row) < 3:
            continue

        cells = [cell.strip() for cell in row]
        cells += [""] * (8 - len(cells))
        rid, _query, match, match_type, matched_addr, coords, tiger, side = cells[:8]

        # Coordinates are read as "lon,lat"; anything else stays NaN.
        lon = lat = np.nan
        parts = [p.strip() for p in coords.split(",")]
        if len(parts) == 2:
            try:
                lon, lat = float(parts[0]), float(parts[1])
            except ValueError:
                lon = lat = np.nan

        records.append(
            {
                "id": rid,
                "census_match": match,
                "census_match_type": match_type or pd.NA,
                "census_matched_address": matched_addr or pd.NA,
                "census_lon": lon,
                "census_lat": lat,
                "census_tigerline_id": tiger or pd.NA,
                "census_side": side or pd.NA,
            }
        )

    return pd.DataFrame(records, columns=_CENSUS_RESULT_COLUMNS)


def census_batch_geocode(batch_df: pd.DataFrame) -> pd.DataFrame:
    """Geocode a batch of addresses with the Census batch endpoint.

    Args:
        batch_df: One row per address with columns ``id``, ``street``,
            ``city``, ``state`` and optionally ``zip``.

    Returns:
        A frame with the columns in ``_CENSUS_RESULT_COLUMNS``, one row per
        response row (matched or not).  The columns are present even when the
        frame is empty.

    Raises:
        requests.HTTPError: if the service answers with an error status.
        requests.RequestException: on connection problems or the 60 s timeout.

    NOTE(review): everything is sent in one request; the Census service
    documents a per-file record limit — confirm before using on large batches.
    """
    if batch_df.empty:
        return pd.DataFrame(columns=_CENSUS_RESULT_COLUMNS)

    payload = io.StringIO()
    writer = csv.writer(payload, lineterminator="\n")
    for rec in batch_df.to_dict(orient="records"):
        zip_code = rec.get("zip")
        # NaN is truthy, so "or ''" alone would write the text "nan".
        if zip_code is None or (pd.api.types.is_scalar(zip_code) and pd.isna(zip_code)):
            zip_code = ""
        writer.writerow([rec["id"], rec["street"], rec["city"], rec["state"], zip_code or ""])

    files = {"addressFile": ("batch.csv", payload.getvalue(), "text/csv")}
    params = {"benchmark": CENSUS_BENCHMARK, "vintage": CENSUS_VINTAGE}

    resp = requests.post(CENSUS_BATCH_URL, params=params, files=files, timeout=60)
    resp.raise_for_status()

    return _parse_census_batch_response(resp.text)


# ---------------------------
# 4) Run geocoding: add coords where missing, keep provenance, no merging
# ---------------------------
# Load the harmonized table with the identifier columns forced to the nullable string dtype.
harm = pd.read_csv(
    IN_HARMONIZED,
    dtype=dict.fromkeys(("entry_id", "source_dataset", "source_record_id"), "string"),
)

# source_dataset is the part of entry_id before the first ":"; fill it in
# wherever it is absent (whole column, or only the missing cells).
if "source_dataset" in harm.columns and harm["source_dataset"].notna().any():
    dataset_unknown = harm["source_dataset"].isna()
    harm.loc[dataset_unknown, "source_dataset"] = (
        harm.loc[dataset_unknown, "entry_id"].astype(str).str.split(":").str[0]
    )
else:
    harm["source_dataset"] = harm["entry_id"].astype(str).str.split(":").str[0]

harm = enrich_s3_borough(harm, S3_PATH)

# Classify each raw address, then record why rows without coordinates will not be geocoded.
harm["address_kind"] = harm["address_raw"].map(classify_address_kind)

coords_missing = harm["lat"].isna() | harm["lon"].isna()

harm["geocode_attempted"] = False
harm["geocode_skip_reason"] = pd.NA

# Key off the classification rather than a raw NaN test: blank / whitespace-only
# strings are also classified "missing" and would otherwise get no skip reason.
harm.loc[coords_missing & harm["address_kind"].eq("missing"), "geocode_skip_reason"] = "missing_address"

# Kinds that cannot be sent as a single street address; the kind doubles as the skip reason.
non_geocodable = coords_missing & harm["address_kind"].isin(
    ["intersection", "between_phrase", "multi_address", "descriptive"]
)
harm.loc[non_geocodable, "geocode_skip_reason"] = harm.loc[non_geocodable, "address_kind"]

needs_geocode = coords_missing & harm["address_raw"].notna()

# Build the geocoder batch from rows that still need coordinates and look like
# a single street address.
batch_rows = []
attempted_idx = []
street_candidates = harm.loc[needs_geocode & harm["address_kind"].eq("street_address")]
for idx, r in street_candidates.iterrows():
    street = clean_street_for_geocode(r["address_raw"])
    if not street:
        # Address the row by its index label instead of scanning the whole
        # entry_id column on every iteration (assumes harm's index is unique,
        # as it is straight after read_csv / merge).
        if pd.isna(harm.at[idx, "geocode_skip_reason"]):
            harm.at[idx, "geocode_skip_reason"] = "unusable_address_string"
        continue

    borough = norm_ws(r.get("borough_hint"))
    city = borough_to_city(borough)
    z = extract_zip(r["address_raw"])

    batch_rows.append(
        {
            "id": str(r["entry_id"]),
            "street": street,
            "city": city,
            "state": STATE_DEFAULT,
            "zip": z or "",
        }
    )
    attempted_idx.append(idx)

# Explicit columns keep the frame well-formed even when no row qualified.
batch_df = pd.DataFrame(batch_rows, columns=["id", "street", "city", "state", "zip"])

if not batch_df.empty:
    # Flag exactly the rows that were submitted.
    harm.loc[attempted_idx, "geocode_attempted"] = True

results = census_batch_geocode(batch_df)

# Provenance columns, filled only for rows that go through the geocoder.
for provenance_col in (
    "geocode_provider",
    "geocode_status",
    "geocode_match_type",
    "geocode_matched_address",
    "geocode_query",
):
    harm[provenance_col] = pd.NA

res_map = results.set_index("id").to_dict(orient="index") if not results.empty else {}

# Query string per submitted id, built once instead of filtering batch_df for
# every result row; setdefault keeps the first row if an id repeats.
query_by_id = {}
if not batch_df.empty:
    for q in batch_df.to_dict(orient="records"):
        query_by_id.setdefault(
            str(q["id"]),
            f'{q["street"]}, {q["city"]}, {q["state"]} {q["zip"]}'.strip(),
        )

for i, r in harm.iterrows():
    # Rows that already had coordinates are left untouched.
    if not (pd.isna(r["lat"]) or pd.isna(r["lon"])):
        continue

    rid = str(r["entry_id"])
    # bool(): an identity test against True fails for numpy booleans, which is
    # what a bool column hands back through .at.
    attempted = bool(r.get("geocode_attempted", False))

    rr = res_map.get(rid)
    if rr is None:
        # Submitted but absent from the response: record it as an explicit No_Match.
        if attempted:
            harm.at[i, "geocode_provider"] = "US Census Geocoder"
            harm.at[i, "geocode_status"] = "No_Match"
        continue

    harm.at[i, "geocode_provider"] = "US Census Geocoder"
    harm.at[i, "geocode_status"] = rr.get("census_match", pd.NA)
    harm.at[i, "geocode_match_type"] = rr.get("census_match_type", pd.NA)
    harm.at[i, "geocode_matched_address"] = rr.get("census_matched_address", pd.NA)

    if rid in query_by_id:
        harm.at[i, "geocode_query"] = query_by_id[rid]

    if str(rr.get("census_match", "")).lower() == "match":
        harm.at[i, "lon"] = rr.get("census_lon", np.nan)
        harm.at[i, "lat"] = rr.get("census_lat", np.nan)
        harm.at[i, "space_basis"] = "geocoded_census"
        harm.at[i, "spatial_precision"] = "point"
        harm.at[i, "geocode_skip_reason"] = pd.NA
    elif attempted and pd.isna(harm.at[i, "geocode_skip_reason"]):
        # The geocoder answered for this id but did not return a match.
        harm.at[i, "geocode_skip_reason"] = "geocoder_returned_no_match"

# Post-pass (A method): any attempted row whose status is "no_match" and that
# still has no skip reason gets an explicit one, so failures are auditable.
was_attempted = harm["geocode_attempted"].eq(True)
status_is_no_match = harm["geocode_status"].astype(str).str.lower().eq("no_match")
reason_unset = harm["geocode_skip_reason"].isna()
mask_no_match = was_attempted & status_is_no_match & reason_unset
harm.loc[mask_no_match, "geocode_skip_reason"] = "geocoder_no_match"

# Write the result and report the row count.
harm.to_csv(OUT_GEOCODED, index=False, encoding="utf-8-sig")
print(f"Wrote: {OUT_GEOCODED} (rows={len(harm)})")


# Output: Wrote: harmonized_geocoded.csv (rows=542)
