In [16]:
#!/usr/bin/env python3
from __future__ import annotations

import json
import logging
import os
import re
from typing import Dict, Optional, Tuple

import geopandas as gpd
import googlemaps
import numpy as np
import pandas as pd
from shapely.geometry import Point
from tqdm import tqdm

# ---------------------- Config ----------------------
INPUT_MD   = "SoFlaRecordsScraper - MiamiDade.csv"
INPUT_PBC  = "SoFlaRecordsScraper - PalmBeachCounty.csv"
INPUT_BRD  = "SoFlaRecordsScraper - Broward.csv"

OUTPUT_GEOJSON  = "map_data.geojson"
UNGEOCODED_CSV  = "ungeocoded.csv"
GEO_CACHE_PATH  = "geocode_cache.json"
DEFAULT_CRS     = "EPSG:4326"

# ---- Load Google Maps API key: %store first, env fallback ----
try:
    get_ipython().run_line_magic("store", "-r google_maps_API_Key")
except Exception:
    pass

if "google_maps_API_Key" not in globals() or not google_maps_API_Key:
    google_maps_API_Key = os.getenv("GOOGLE_MAPS_API_KEY", "")

if not google_maps_API_Key:
    raise ValueError("No Google Maps API key found. Set `%store google_maps_API_Key` or env var GOOGLE_MAPS_API_KEY.")

# Columns we care about (we won’t fail if some are missing)
COLUMNS_TO_KEEP = [
    "ScrapeDate","Doc Type","Instrument_Num","Record Date","Record Date Search",
    "Seller","Buyer","Consideration","Folio","Use Code Description","Building Sq. Ft",
    "Lot Size","Date of Previous Sale","Previous Owner Name","Previous Sale Price",
    "Physical Address","Mailing Address","Municipality","PropAppraiserURL",
    "Sunbiz Doc URL First Party","Sunbiz Doc URL Second Party",
    "First Party Registered Agent Name & Address","First Party Document Number",
    "First Party FEI/EIN Number","First Party Mailing Address","First Party Principal Address",
    "First Party State","First Party Date Filed","Second Party Registered Agent Name & Address",
    "Second Party Status","Second Party Document Number","Second Party FEI/EIN Number",
    "Second Party Mailing Address","Second Party Principal Address","Second Party State",
    "Second Party Date Filed",
]

RES_CLASSES = ["RESIDENTIAL","CONDOMINIUM","CONDO","FAMILY","RV PARK"]
COM_CLASSES = [
    "OFFICE","MANUFACTURING","COMMERCIAL","HOTEL","MOTEL","INDUSTRIAL","HEAVY IND",
    "GOLF COURSE","RETAIL","WAREH/DIST TERM","WAREHOUSE","STORAGE","MULTIFAMILY",
    "SCHOOL","RESTAURANTS","SHOPPING CENTER","MULTI-FAMILY","SERVICE STATION",
    "DRUG STORE","RELIGIOUS","WAREHOUSING","NIGHTCLUBS","PARKING LOT",
]

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("sofla-geo")

# ---------------------- Helpers ----------------------
def _keep_columns(df: pd.DataFrame) -> pd.DataFrame:
    avail = [c for c in COLUMNS_TO_KEEP if c in df.columns]
    return df[avail].copy() if avail else df.copy()

def format_md_folio(folio: str) -> str:
    s = str(folio)
    return f"{s[0:2]}-{s[2:6]}-{s[6:9]}-{s[9:13]}" if (s.isdigit() and len(s) == 13) else s

def extract_broward_folio(url: str) -> Optional[str]:
    if isinstance(url, str) and "Folio=" in url:
        return url.split("Folio=")[1]
    return None

def make_anchor(text: str, url: Optional[str]) -> str:
    if pd.isna(url) or url in {"No matching document found","No Prop Appraiser URL Found","Data Not Found"}:
        return str(text)
    return f'<a href="{url}" target="_blank">{text}</a>'

def normalize_pbc_addresses(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    for col in ["Physical Address","Municipality"]:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip().replace({"Data Not Found": ""}).fillna("")
    has_addr = df.get("Physical Address","") != ""
    is_uninc = df.get("Municipality","").eq("UNINCORPORATED") if "Municipality" in df.columns else False
    df.loc[has_addr & is_uninc, "Physical Address"] = (
        df.loc[has_addr & is_uninc, "Physical Address"] + " IN UNINCORPORATED PALM BEACH COUNTY"
    )
    df.loc[has_addr & ~is_uninc, "Physical Address"] = (
        df.loc[has_addr & ~is_uninc, "Physical Address"].str.cat(
            df.loc[has_addr & ~is_uninc, "Municipality"], sep=" IN "
        )
    )
    if "Municipality" in df.columns:
        df = df.drop(columns="Municipality")
    df["Physical Address"] = df["Physical Address"].str.replace(r" in $","Data Not Found",regex=True).str.strip(", ")
    return df

def split_zip_from_address(df: pd.DataFrame, addr_col: str) -> pd.DataFrame:
    """Capture ZIP to ZipCode column; DO NOT alter any 'Full Address'."""
    df = df.copy()
    z = df[addr_col].astype(str).str.extract(r"(\b\d{5}(?:-\d{4})?\b)", expand=False)
    df["ZipCode"] = z
    df[addr_col] = df[addr_col].astype(str).str.replace(
        r"\b\d{5}(?:-\d{4})?\b","",regex=True
    ).str.strip().str.strip(",")
    return df

def classify_use_code(series: pd.Series) -> pd.Series:
    s = series.fillna("").astype(str).str.upper()
    res_pat = "|".join(map(re.escape, RES_CLASSES))
    com_pat = "|".join(map(re.escape, COM_CLASSES))
    out = pd.Series("OTHER", index=s.index)
    out[s.str.contains(res_pat, na=False)] = "RESIDENTIAL"
    out[s.str_contains(com_pat, na=False)] = "COMMERCIAL"
    return out

def classify_use_code(series: pd.Series) -> pd.Series:  # (fixed: older pandas compat)
    s = series.fillna("").astype(str).str.upper()
    res_pat = "|".join(map(re.escape, RES_CLASSES))
    com_pat = "|".join(map(re.escape, COM_CLASSES))
    out = pd.Series("OTHER", index=s.index)
    out[s.str.contains(res_pat, na=False)] = "RESIDENTIAL"
    out[s.str.contains(com_pat, na=False)] = "COMMERCIAL"
    return out

def normalize_doc_types(s: pd.Series) -> pd.Series:
    s = s.fillna("").astype(str)
    for pat, repl in {
        r"\bDEE\b":"DEED",
        r"\bDeed Transfers of Real Property\b":"DEED",
        r"\bMOR\b":"MORTGAGE",
        r"Mortgage/ Modifications & Assumptions":"MORTGAGE",
    }.items():
        s = s.str.replace(pat, repl, regex=True)
    return s

def clean_sale_price(s: pd.Series) -> pd.Series:
    s = s.replace(["None","No price found!","Data Not Found","No results."], np.nan)
    s = s.astype(str).str.replace("$","",regex=False).str.replace(",","",regex=False)
    return pd.to_numeric(s, errors="coerce")

def add_anchors(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if {"Sunbiz Doc URL First Party","Seller"}.issubset(df.columns):
        df["Seller"] = [make_anchor(t,u) for t,u in zip(df["Seller"].astype(str), df["Sunbiz Doc URL First Party"])]
    if {"Sunbiz Doc URL Second Party","Buyer"}.issubset(df.columns):
        df["Buyer"] = [make_anchor(t,u) for t,u in zip(df["Buyer"].astype(str), df["Sunbiz Doc URL Second Party"])]
    if {"PropAppraiserURL","Folio"}.issubset(df.columns):
        df["Folio"] = [make_anchor(str(t),u) for t,u in zip(df["Folio"], df["PropAppraiserURL"])]
    return df

# ---------------------- Geocoding w/ cache ----------------------
def load_cache(path: str) -> Dict[str, Tuple[float,float]]:
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            return {}
    return {}

def save_cache(path: str, cache: Dict[str, Tuple[float,float]]) -> None:
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(cache, f)
    os.replace(tmp, path)

zipcode_re    = re.compile(r"\b\d{5}(?:-\d{4})?\b")
zip4_zeros_re = re.compile(r"(\b\d{5})-0000\b")

def geocode_addresses(
    df: pd.DataFrame,
    addr_col: str,
    cache_path: str,
    api_key: str,
    show_progress: bool = True,
) -> pd.DataFrame:
    """Geocode unique addresses using Google Maps with FL/US bias.
       Keep address string (incl. county suffix) as provided.
       Fallback: drop ZIP; try '{no_zip}, FL, USA'.
    """
    df = df.copy()
    cache = load_cache(cache_path)
    gmaps = googlemaps.Client(key=api_key)

    def _geo_one(addr: str) -> Tuple[Optional[float], Optional[float]]:
        if not addr:
            return (None, None)
        if addr in cache:
            return cache[addr]
        try:
            # slight normalization: collapse '-0000' in ZIP9
            addr_try = zip4_zeros_re.sub(r"\1", addr).strip()
            result = gmaps.geocode(
                addr_try,
                components={"administrative_area": "FL", "country": "US"},
                region="us",
            )
            if not result:
                # fallback: drop ZIP entirely and try with explicit FL, USA
                no_zip = zipcode_re.sub("", addr_try).replace("  ", " ").strip(", ").strip()
                if no_zip:
                    result = gmaps.geocode(
                        f"{no_zip}, FL, USA",
                        components={"administrative_area": "FL", "country": "US"},
                        region="us",
                    )
            if result:
                lat = result[0]["geometry"]["location"]["lat"]
                lng = result[0]["geometry"]["location"]["lng"]
                cache[addr] = (lat, lng)  # cache against the original string
                return (lat, lng)
        except Exception as e:
            log.warning("Geocode failed for '%s': %s", addr, e)
        return (None, None)

    if addr_col not in df.columns:
        # Guarantee output columns exist even if addr_col is missing
        df["latitude"] = np.nan
        df["longitude"] = np.nan
        return df

    addrs = df[addr_col].fillna("").astype(str)
    unique_addrs = addrs.unique().tolist()
    log.info("Unique addresses to resolve: %d", len(unique_addrs))

    iterator = tqdm(unique_addrs, desc="Geocoding", unit="addr") if show_progress else unique_addrs
    for a in iterator:
        if a and a not in cache:
            _geo_one(a)
    save_cache(cache_path, cache)

    coords = addrs.map(lambda a: cache.get(a, (None, None)))
    df["latitude"]  = coords.map(lambda t: t[0])
    df["longitude"] = coords.map(lambda t: t[1])
    return df

# ---------------------- Main ----------------------
def main() -> None:
    # Start clean (optional)
    if os.path.exists(OUTPUT_GEOJSON):
        os.remove(OUTPUT_GEOJSON)
        log.info("Deleted old %s", OUTPUT_GEOJSON)

    # 1) Load CSVs (full sheets)
    md_df  = pd.read_csv(INPUT_MD,  encoding="utf-8-sig")
    pbc_df = pd.read_csv(INPUT_PBC, encoding="utf-8-sig")
    brd_df = pd.read_csv(INPUT_BRD, encoding="utf-8-sig")

    md_df, pbc_df, brd_df = _keep_columns(md_df), _keep_columns(pbc_df), _keep_columns(brd_df)

    # 2) County-specific tweaks
    if "Folio" in md_df.columns:
        md_df["Folio"] = md_df["Folio"].astype(str).map(format_md_folio)
    if "PropAppraiserURL" in brd_df.columns:
        brd_df["Folio"] = brd_df["PropAppraiserURL"].map(extract_broward_folio)

    # Full Address + County (for geocoding & display) — build BEFORE any ZIP edits
    if "Physical Address" in md_df.columns:
        md_df["Full Address"] = md_df["Physical Address"].astype(str) + " MIAMI-DADE"
        md_df["County"] = "Miami-Dade"
    if "Physical Address" in pbc_df.columns:
        pbc_df["Full Address"] = pbc_df["Physical Address"].astype(str) + " PALM BEACH COUNTY"
        pbc_df["County"] = "Palm Beach"
    if "Physical Address" in brd_df.columns:
        brd_df["Full Address"] = brd_df["Physical Address"].astype(str) + " BROWARD COUNTY"
        brd_df["County"] = "Broward"

    # PBC municipality logic
    pbc_df = normalize_pbc_addresses(pbc_df)

    # Optional ZIP extraction for MD & Broward (does not alter Full Address)
    if "Physical Address" in md_df.columns:
        md_df = split_zip_from_address(md_df, "Physical Address")
    if "Physical Address" in brd_df.columns:
        brd_df = split_zip_from_address(brd_df, "Physical Address")

    # 3) Concatenate (keep everything)
    df = pd.concat([md_df, pbc_df, brd_df], ignore_index=True, sort=False)

    # 4) Cleaning / normalization (NO row drops for policy safety)
    if "Consideration" in df.columns:
        df = df.rename(columns={"Consideration": "Sale Price"})
        df["Sale Price"] = clean_sale_price(df["Sale Price"])

    if "Doc Type" in df.columns:
        df["Doc Type"] = normalize_doc_types(df["Doc Type"])

    if "Use Code Description" in df.columns:
        df["Use Code Description"] = df["Use Code Description"].astype(str).str.upper()
        df["Simple Classification"] = classify_use_code(df["Use Code Description"])

    # Replace cells containing NOT FOUND with empty string (keep the row)
    df = df.apply(lambda c: c.mask(c.astype(str).str.contains("NOT FOUND", case=False, na=False), ""))

    # Anchors
    df = add_anchors(df)

    # 5) Geocoding on Full Address (keep all rows; geometry may be null)
    if "Full Address" not in df.columns:
        df["Full Address"] = df.get("Physical Address","")
    df = geocode_addresses(df, addr_col="Full Address", cache_path=GEO_CACHE_PATH, api_key=google_maps_API_Key, show_progress=True)

    # Ensure geocode columns exist (defensive)
    if "latitude" not in df.columns:
        df["latitude"] = np.nan
    if "longitude" not in df.columns:
        df["longitude"] = np.nan

    # Export ungeocoded rows for triage (they stay in GeoJSON as geometry=null)
    miss_mask = df[["latitude","longitude"]].isna().any(axis=1)
    try:
        df.loc[miss_mask].to_csv(UNGEOCODED_CSV, index=False)
        log.info("Wrote %d ungeocoded rows to %s", int(miss_mask.sum()), UNGEOCODED_CSV)
    except Exception as e:
        log.warning("Could not write %s: %s", UNGEOCODED_CSV, e)

    # 6) Bundle mortgages with deeds (enrich deeds) — keep mortgages too
    if {"Doc Type","Seller","Buyer","Record Date"}.issubset(df.columns):
        deeds     = df[df["Doc Type"].eq("DEED")].copy()
        mortgages = df[df["Doc Type"].eq("MORTGAGE")].copy()
        if not deeds.empty and not mortgages.empty:
            mortgages["TransactionID"] = mortgages["Seller"].astype(str) + mortgages["Record Date"].astype(str)
            deeds["TransactionID"]     = deeds["Buyer"].astype(str)  + deeds["Record Date"].astype(str)
            if "Buyer" in mortgages.columns:
                mortgages = mortgages.rename(columns={"Buyer":"Lender"})
            if "Sale Price" in mortgages.columns:
                mortgages = mortgages.rename(columns={"Sale Price":"Loan Amount"})
            deeds_enriched = deeds.merge(
                mortgages[["TransactionID","Lender","Loan Amount"]],
                on="TransactionID", how="left"
            )
            # keep enriched deeds + all non-deed rows (including mortgages)
            others = df[~df["Doc Type"].eq("DEED")]
            df = pd.concat([deeds_enriched, others], ignore_index=True, sort=False)

    # 7) GeoDataFrame (geometry can be None)
    lons = df["longitude"]
    lats = df["latitude"]
    geometry = [Point(lon, lat) if pd.notna(lat) and pd.notna(lon) else None for lon, lat in zip(lons, lats)]
    gdf = gpd.GeoDataFrame(df, geometry=geometry, crs=DEFAULT_CRS)

    # Fill NaNs in properties for readability (leave geometry None for misses)
    for col in gdf.columns:
        if col == "geometry":
            continue
        if pd.api.types.is_numeric_dtype(gdf[col]):
            gdf[col] = gdf[col].fillna(0)
        else:
            gdf[col] = gdf[col].fillna("Data Not Found")

    # 8) Write GeoJSON
    gdf.to_file(OUTPUT_GEOJSON, driver="GeoJSON")
    total = len(gdf)
    geocoded = int(gdf["geometry"].notna().sum())
    log.info("Wrote %d rows to %s (%d geocoded, %d ungeocoded)", total, OUTPUT_GEOJSON, geocoded, total - geocoded)

if __name__ == "__main__":
    main()


2025-09-25 02:43:32 | INFO | API queries_quota: 60
2025-09-25 02:43:32 | INFO | Unique addresses to resolve: 2683
Geocoding: 100%|██████████████████████████| 2683/2683 [07:09<00:00,  6.25addr/s]
2025-09-25 02:50:42 | INFO | Wrote 2310 ungeocoded rows to ungeocoded.csv
2025-09-25 02:50:43 | INFO | Created 5,742 records
2025-09-25 02:50:43 | INFO | Wrote 5742 rows to map_data.geojson (3430 geocoded, 2312 ungeocoded)
