In [1]:
# ────────────────────────────────────────────────────────────────────────────────
# CMS Ownership Ingest (robust) → Clean monthlies → Combine (normalized)
#    - Works with monthly inner zips named like _MM_YYYY, _YYYY_MM, OR just "NH_Ownership"
#    - Filename filter + header-sniff fallback (e.g., 2020-10 case)
# ────────────────────────────────────────────────────────────────────────────────
import os, re, zipfile
from io import BytesIO
from pathlib import Path
from datetime import datetime
import pandas as pd
from tqdm.auto import tqdm

# ----------------------------- Config / Paths ---------------------------------
PROJECT_ROOT = Path.cwd()
while not (PROJECT_ROOT / "src").is_dir() and PROJECT_ROOT != PROJECT_ROOT.parent:
    PROJECT_ROOT = PROJECT_ROOT.parent

RAW_DIR    = Path(os.getenv("NH_DATA_DIR", PROJECT_ROOT / "data" / "raw"))
NH_ZIP_DIR = RAW_DIR / "nh-compare"          # where nh_archive_*.zip live
OWN_DIR    = RAW_DIR / "ownership-files"     # flat cleaned monthlies + combined
OWN_DIR.mkdir(parents=True, exist_ok=True)

OUT_COMBINED_CSV = OWN_DIR / "ownership_combined.csv"
OUT_COMBINED_PQ  = OWN_DIR / "ownership_combined.parquet"  # optional

# Optional knobs
SKIP_EXISTING_MONTHS = True
SPEED_MODE            = False
MIN_YEAR, MIN_MONTH   = 2017, 1

print(f"[paths] RAW_DIR={RAW_DIR}")
print(f"[paths] NH_ZIP_DIR={NH_ZIP_DIR}")
print(f"[paths] OWN_DIR={OWN_DIR}")

# ----------------------------- Header Map --------------------------------------
COLUMN_MAP = {
    "cms certification number (ccn)": "cms_certification_number",
    "federal provider number":        "cms_certification_number",
    "provnum":                        "cms_certification_number",
    "provider name":                  "provider_name",
    "provname":                       "provider_name",
    "role played by owner or manager in facility": "role",
    "role played by owner in facility":            "role",
    "role of owner or manager":                    "role",
    "owner role":                                  "role",
    "role_desc":                                   "role",
    "role desc":                                   "role",
    "owner type":                 "owner_type",
    "owner name":                 "owner_name",
    "ownership percentage":       "ownership_percentage",
    "owner percentage":           "ownership_percentage",
    "association date":           "association_date",
    "processing date":            "processing_date",
    "processingdate":             "processing_date",
    "process date":               "processing_date",
    "processdate":                "processing_date",
    "filedate":                   "processing_date",
}
EXPECTED = [
    "cms_certification_number","provider_name","role","owner_type","owner_name",
    "ownership_percentage","association_date","processing_date"
]

# ----------------------------- Helpers -----------------------------------------
def safe_read_csv(raw: bytes, nrows=None) -> pd.DataFrame:
    for enc in ("utf-8", "latin-1"):
        try:
            return pd.read_csv(BytesIO(raw), dtype=str, encoding=enc,
                               low_memory=False, engine="c", nrows=nrows)
        except Exception:
            pass
    return pd.read_csv(BytesIO(raw), dtype=str, encoding="utf-8",
                       encoding_errors="replace", low_memory=False,
                       engine="c", nrows=nrows)

def norm_header(h: str) -> str:
    s = str(h or "").strip().lower().replace("_", " ")
    s = re.sub(r"\s+", " ", s)
    return s

def map_headers(cols):
    return [COLUMN_MAP.get(norm_header(c), norm_header(c)) for c in cols]

def parse_month_year_from_zipname(name: str):
    """
    Supports ..._MM_YYYY.zip or ..._YYYY_MM.zip.
    Returns (month:int, year:int) or (None, None) if not present in name.
    """
    s = Path(name).stem
    toks = [t for t in re.split(r"[^0-9]+", s) if t]
    if len(toks) >= 2:
        a, b = toks[-2], toks[-1]
        if len(a) == 4 and 1 <= int(b) <= 12:   # a=YYYY, b=MM
            return int(b), int(a)
        if len(b) == 4 and 1 <= int(a) <= 12:   # b=YYYY, a=MM
            return int(a), int(b)
        # fallback: prefer 4-digit as year
        aa, bb = int(a), int(b)
        if len(a) == 4:  # a=year
            return max(1, min(bb, 12)), aa
        if len(b) == 4:  # b=year
            return max(1, min(aa, 12)), bb
    return None, None

def parse_year_from_archive(outer_name: str):
    m = re.search(r"(20\d{2})", outer_name)
    return int(m.group(1)) if m else None

def is_ownership_file(filename: str) -> bool:
    """
    Looser, case-insensitive match. Includes 'Ownership_Display' and other variants.
    """
    f = filename.lower()
    if not f.endswith(".csv"):
        return False
    return (
        ("ownership" in f and "download" in f)  # typical
        or ("ownership" in f and "display" in f)  # one-off
        or f.startswith("nh_ownership_")  # other variants
        or f == "nh_ownership.csv"  # ultra-minimal name case
    )

def looks_like_ownership_csv_bytes(b: bytes) -> bool:
    """Filename-agnostic: sniff headers to decide if this is an ownership CSV."""
    try:
        probe = safe_read_csv(b, nrows=5)
    except Exception:
        return False
    cols = [norm_header(c) for c in probe.columns]
    keys = set(cols)
    needed_any = [
        {"cms certification number (ccn)", "federal provider number", "provnum"},
        {"provider name", "provname"},
        {"owner name"},
        {"ownership percentage", "owner percentage"},
        {"role", "role desc", "role_desc",
         "role played by owner in facility", "role of owner or manager",
         "role played by owner or manager in facility"},
    ]
    hits = sum(1 for group in needed_any if keys & group)
    return hits >= 3  # lenient

def infer_month_year_from_inner_csvs(zipfile_obj: zipfile.ZipFile, default_year=None):
    """
    Inspect CSVs inside the inner monthly zip to infer processing month/year
    from 'processing_date' column values. Returns (month, year) or (None, None).
    """
    candidates = []
    for entry in zipfile_obj.namelist():
        if not entry.lower().endswith(".csv"):
            continue
        try:
            df = safe_read_csv(zipfile_obj.read(entry), nrows=1000)
        except Exception:
            continue
        df.columns = map_headers(df.columns)
        if "processing_date" in df.columns:
            s = pd.to_datetime(df["processing_date"], errors="coerce")
            s = s.dropna()
            if not s.empty:
                dt = s.iloc[0]
                candidates.append((int(dt.month), int(dt.year)))
    if candidates:
        # mode by frequency
        sr = pd.Series(candidates).value_counts()
        (m, y) = sr.index[0]
        return m, y
    # fallback: at least return year if we have it
    return (None, int(default_year)) if default_year else (None, None)

def read_csv_bytes_month(b: bytes, source_name: str, month: int, year: int) -> pd.DataFrame:
    df = safe_read_csv(b)
    df.columns = map_headers(df.columns)
    # ensure expected columns
    for col in EXPECTED:
        if col not in df.columns:
            df[col] = pd.NA

    # Vectorized normalization
    # CCN
    df["cms_certification_number"] = (
        df["cms_certification_number"].astype(str)
          .str.replace(r"\D", "", regex=True)
          .str.zfill(6)
    )

    # Role → only classify real ownership, not DIRECTOR/OFFICER
    role = df["role"].fillna("").str.upper()
    df["role"] = "OTHER"
    df.loc[role.str.contains("INDIRECT OWNERSHIP"), "role"] = "INDIRECT"
    df.loc[role.str.contains("DIRECT OWNERSHIP"),   "role"] = "DIRECT"
    df.loc[role.str.contains("OPERATIONAL/MANAGERIAL CONTROL") |
           role.str.contains("MANAGING EMPLOYEE"),  "role"] = "MANAGER"

    # Ownership % → float with NaN when missing
    pct = df["ownership_percentage"].fillna("")
    pct = pct.str.replace("%","", regex=False).str.replace(",","", regex=False).str.strip()
    pct = pct.mask(pct.eq("") | pct.str.contains("NO PERCENTAGE", case=False))
    df["ownership_percentage"] = pd.to_numeric(pct, errors="coerce")

    # processing_date: prefer column, else synthesize first-of-month
    df["processing_date"] = pd.to_datetime(df.get("processing_date"), errors="coerce")
    synth = pd.Timestamp(year=int(year), month=int(month), day=1)
    df.loc[df["processing_date"].isna(), "processing_date"] = synth

    # association_date left as string here (parse after concat)
    df["month"] = int(month)
    df["year"]  = int(year)
    df["date"]  = synth
    df["source_file"] = source_name

    # De-dup within month
    dedup_keys = [
        "cms_certification_number","provider_name","role","owner_type","owner_name",
        "ownership_percentage","association_date","year","month"
    ]
    df = df.sort_values(["cms_certification_number","owner_name"]).drop_duplicates(dedup_keys)

    # Guarantee role normalization succeeded this month
    assert set(df["role"].dropna().unique()) <= {"DIRECT","INDIRECT","MANAGER","OTHER"}, \
        f"Monthly role values not normalized in {source_name}"

    keep = [
        "cms_certification_number","provider_name","role","owner_type","owner_name",
        "ownership_percentage","association_date","processing_date",
        "source_file","month","year","date"
    ]
    return df[[c for c in keep if c in df.columns]]

def yield_csvs_from_zip(zpath: Path):
    """Yield (inner_name, bytes) for CSVs in a zip; also walks nested zips."""
    with zipfile.ZipFile(zpath, "r") as z:
        for info in z.infolist():
            name = info.filename
            if name.lower().endswith(".zip"):
                with z.open(info) as inner:
                    with zipfile.ZipFile(BytesIO(inner.read()), "r") as z2:
                        for info2 in z2.infolist():
                            if info2.filename.lower().endswith(".csv"):
                                yield (f"{zpath.name}::{info2.filename}", z2.read(info2))
            elif name.lower().endswith(".csv"):
                yield (f"{zpath.name}::{name}", z.read(info))

# ----------------------------- Ingest & Write Monthlies ------------------------
summary_rows = []
written_monthlies = set()
archives = sorted(NH_ZIP_DIR.glob("nh_archive_*.zip"))
print(f"[scan] Found {len(archives)} yearly nh_archive_*.zip files")

already = {p.name for p in OWN_DIR.glob("ownership_download_*.csv")} if SKIP_EXISTING_MONTHS else set()

for yearly_zip in tqdm(archives, desc="Yearly archives"):
    outer_year = parse_year_from_archive(yearly_zip.name)
    with zipfile.ZipFile(yearly_zip, "r") as yz:
        inner_zips = sorted(n for n in yz.namelist() if n.lower().endswith(".zip"))
        for mzip in inner_zips:
            # 1) Try to parse from inner zip name
            month, year = parse_month_year_from_zipname(mzip)

            # 2) If missing (e.g., "NH_Ownership"), infer from CSV contents
            if not month or not year:
                try:
                    with zipfile.ZipFile(BytesIO(yz.read(mzip)), "r") as mz:
                        month, year = infer_month_year_from_inner_csvs(mz, default_year=outer_year)
                        if not month and year:
                            # we found only year; log and skip (can't synthesize month reliably)
                            print(f"[warn] Could not infer month from {mzip}; year={year}. Skipping.")
                            continue
                        if not month or not year:
                            print(f"[warn] Could not infer month/year from {mzip}; skipping.")
                            continue
                except KeyError:
                    print(f"[warn] Missing {mzip} inside {yearly_zip.name}; skipping.")
                    continue

            if SPEED_MODE:
                if (year < MIN_YEAR) or (year == MIN_YEAR and month < MIN_MONTH):
                    continue

            out_name = f"ownership_download_{year:04d}_{month:02d}.csv"
            if SKIP_EXISTING_MONTHS and out_name in already:
                written_monthlies.add((year, month))
                continue

            print(f"[month] {year:04d}-{month:02d} → scanning ownership CSVs...")
            try:
                with zipfile.ZipFile(BytesIO(yz.read(mzip)), "r") as mz:
                    rows_accum, num_found = [], 0

                    # Pass 1: filename filter
                    for entry in mz.namelist():
                        fname = Path(entry).name
                        if not is_ownership_file(fname):
                            continue
                        num_found += 1
                        try:
                            df = read_csv_bytes_month(
                                mz.read(entry),
                                source_name=f"{year:04d}-{month:02d}:{fname}",
                                month=month, year=year
                            )
                            rows_accum.append(df)
                        except Exception as e:
                            print(f"[warn] Failed member {fname} in {mzip}: {e}")

                    # Pass 2: header-sniff fallback if nothing matched names
                    if num_found == 0 and not rows_accum:
                        sniffed = 0
                        for entry in mz.namelist():
                            if not entry.lower().endswith(".csv"):
                                continue
                            blob = mz.read(entry)
                            if not looks_like_ownership_csv_bytes(blob):
                                continue
                            sniffed += 1
                            try:
                                df = read_csv_bytes_month(
                                    blob,
                                    source_name=f"{year:04d}-{month:02d}:{Path(entry).name}",
                                    month=month, year=year
                                )
                                rows_accum.append(df)
                            except Exception as e:
                                print(f"[warn] Sniffed member failed {entry} in {mzip}: {e}")
                        print(f"[month] {year:04d}-{month:02d} → sniffed {sniffed} ownership file(s)")

                    print(f"[month] {year:04d}-{month:02d} → {num_found} ownership file(s)")
                    if rows_accum:
                        out_df = pd.concat(rows_accum, ignore_index=True)
                        out_path = OWN_DIR / out_name
                        out_df.to_csv(out_path, index=False)
                        written_monthlies.add((year, month))
                        summary_rows.append({
                            "file": out_path.name,
                            "rows": len(out_df),
                            "pct_null_role": out_df["role"].isna().mean() * 100.0,
                            "pct_null_ownership_pct": out_df["ownership_percentage"].isna().mean() * 100.0,
                        })
            except KeyError:
                print(f"[warn] Missing {mzip} inside {yearly_zip.name}; continuing")

# ----------------------------- Coverage (last 18 months) -----------------------
from pandas import PeriodIndex
if written_monthlies:
    cov = (pd.DataFrame(list(written_monthlies), columns=["year","month"])
             .sort_values(["year","month"])
             .assign(ym=lambda d: PeriodIndex.from_fields(
                 year=d["year"].to_numpy(), month=d["month"].to_numpy(), freq="M"
             ).astype(str)))
    print("\n[coverage written] last 18 months:")
    print(cov.tail(18).to_string(index=False))
else:
    print("\n[coverage written] No monthly outputs written. Check patterns and zip contents.")

# ----------------------------- Combine (post-concat normalize) -----------------
all_monthlies = sorted(OWN_DIR.glob("ownership_download_*.csv"))
if not all_monthlies:
    raise RuntimeError("No monthly ownership_download_YYYY_MM.csv files found to combine.")

frames = []
for p in tqdm(all_monthlies, desc="Combining monthly CSVs"):
    frames.append(pd.read_csv(
        p,
        dtype={
            "cms_certification_number":"string",
            "provider_name":"string",
            "role":"string",
            "owner_type":"string",
            "owner_name":"string",
            "ownership_percentage":"string",
            "source_file":"string",
        },
        low_memory=False
    ))

combined = pd.concat(frames, ignore_index=True)

# Post-concat normalization (preserve canonical roles if already present)
combined["cms_certification_number"] = (
    combined["cms_certification_number"].astype(str)
      .str.replace(r"\D","", regex=True)
      .str.zfill(6)
)

role_raw = combined["role"].fillna("").str.upper()
# Stage 0: keep canonical where already present
combined["role"] = role_raw.where(role_raw.isin({"DIRECT","INDIRECT","MANAGER","OTHER"}), None)

# Stage 1: map long labels to canonical where not already canonical
mask = combined["role"].isna()
r = role_raw[mask]
mapped = pd.Series("OTHER", index=r.index, dtype="object")
mapped[r.str.contains("INDIRECT OWNERSHIP")] = "INDIRECT"
mapped[r.str.contains("DIRECT OWNERSHIP")]   = "DIRECT"
mapped[r.str.contains("OPERATIONAL/MANAGERIAL CONTROL") |
       r.str.contains("MANAGING EMPLOYEE")]  = "MANAGER"
combined.loc[mask, "role"] = mapped

# Ownership % to numeric
pct = combined["ownership_percentage"].fillna("")
pct = pct.str.replace("%","", regex=False).str.replace(",","", regex=False).str.strip()
pct = pct.mask(pct.eq("") | pct.str.contains("NO PERCENTAGE", case=False))
combined["ownership_percentage"] = pd.to_numeric(pct, errors="coerce")

# Dates (specify formats where safe)
if "date" in combined.columns:
    combined["date"] = pd.to_datetime(combined["date"], errors="coerce", format="%Y-%m-%d")
if "processing_date" in combined.columns:
    combined["processing_date"] = pd.to_datetime(combined["processing_date"], errors="coerce", format="%Y-%m-%d")
if "association_date" in combined.columns:
    combined["association_date"] = pd.to_datetime(combined["association_date"], errors="coerce")

combined = combined.sort_values(["cms_certification_number","date","owner_name"])

# Write outputs
combined.to_csv(OUT_COMBINED_CSV, index=False)
print(f"\n[save] wrote combined CSV: {OUT_COMBINED_CSV} ({len(combined):,} rows)")

# Optional Parquet
try:
    combined2 = combined.convert_dtypes(dtype_backend="pyarrow")
    combined2.to_parquet(OUT_COMBINED_PQ, index=False)
    print(f"[save] wrote combined Parquet: {OUT_COMBINED_PQ}")
except Exception as e:
    print(f"[info] Parquet not written (install pyarrow to enable): {e}")

# ----------------------------- QC Summaries ------------------------------------
null_df = pd.DataFrame(summary_rows)
if not null_df.empty:
    print("\n--- ROLE NULL % (by file) ---")
    print(null_df["pct_null_role"].describe().round(2))
    print("\n--- OWNERSHIP % NULL % (by file) ---")
    print(null_df["pct_null_ownership_pct"].describe().round(2))

role_nulls = (
    combined.assign(is_null_pct = combined["ownership_percentage"].isna())
            .groupby("role")["is_null_pct"]
            .agg(total="size", nulls="sum")
)
role_nulls["pct_null"] = (role_nulls["nulls"] / role_nulls["total"] * 100.0).round(2)
print("\n--- OWNERSHIP % NULL (by normalized role) ---")
print(role_nulls[["total","nulls","pct_null"]].sort_values("pct_null", ascending=False))

[paths] RAW_DIR=C:\Users\Owner\OneDrive\NursingHomeData
[paths] NH_ZIP_DIR=C:\Users\Owner\OneDrive\NursingHomeData\nh-compare
[paths] OWN_DIR=C:\Users\Owner\OneDrive\NursingHomeData\ownership-files
[scan] Found 9 yearly nh_archive_*.zip files


Yearly archives:   0%|          | 0/9 [00:00<?, ?it/s]

[month] 2020-10 → scanning ownership CSVs...
[month] 2020-10 → 1 ownership file(s)

[coverage written] last 18 months:
 year  month      ym
 2024      1 2024-01
 2024      2 2024-02
 2024      3 2024-03
 2024      4 2024-04
 2024      5 2024-05
 2024      6 2024-06
 2024      7 2024-07
 2024      8 2024-08
 2024      9 2024-09
 2024     10 2024-10
 2024     11 2024-11
 2024     12 2024-12
 2025      2 2025-02
 2025      3 2025-03
 2025      4 2025-04
 2025      5 2025-05
 2025      6 2025-06
 2025      7 2025-07


Combining monthly CSVs:   0%|          | 0/200 [00:00<?, ?it/s]

  combined["association_date"] = pd.to_datetime(combined["association_date"], errors="coerce")



[save] wrote combined CSV: C:\Users\Owner\OneDrive\NursingHomeData\ownership-files\ownership_combined.csv (31,284,982 rows)
[save] wrote combined Parquet: C:\Users\Owner\OneDrive\NursingHomeData\ownership-files\ownership_combined.parquet

--- ROLE NULL % (by file) ---
count    1.0
mean     0.0
std      NaN
min      0.0
25%      0.0
50%      0.0
75%      0.0
max      0.0
Name: pct_null_role, dtype: float64

--- OWNERSHIP % NULL % (by file) ---
count     1.00
mean     86.79
std        NaN
min      86.79
25%      86.79
50%      86.79
75%      86.79
max      86.79
Name: pct_null_ownership_pct, dtype: float64

--- OWNERSHIP % NULL (by normalized role) ---
             total     nulls  pct_null
role                                  
MANAGER    7887189   7887189    100.00
OTHER     13155001  13155001    100.00
INDIRECT   2942638   2383910     81.01
DIRECT     7300154   3684661     50.47
