In [1]:
from __future__ import annotations

import re
import json
import zipfile
from pathlib import Path

import numpy as np
import pandas as pd

pd.set_option("display.max_columns", 220)
pd.set_option("display.width", 180)

YEARS = list(range(2010, 2016))

HWK3_ROOT = Path.cwd()
CODE_DIR = HWK3_ROOT / "code"
CACHE_DIR = HWK3_ROOT / "data" / "cache"
PROCESSED_DIR = HWK3_ROOT / "data" / "processed"

CODE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# Rebuild switches
FORCE_REBUILD_ENROLL = False
FORCE_REBUILD_LAND   = False
FORCE_REBUILD_STARS  = True

OUT_PLAN_COUNTY_YEAR = PROCESSED_DIR / "hw3_plan_county_year_2010_2015.csv"
OUT_PLAN_YEAR        = PROCESSED_DIR / "hw3_plan_year_2010_2015.csv"
OUT_CONTRACT_RATINGS = PROCESSED_DIR / "hw3_contract_ratings_2010_2015.csv"
OUT_RATING_DIST      = PROCESSED_DIR / "hw3_rating_distribution_2010_2015.csv"
OUT_RD_2010          = PROCESSED_DIR / "hw3_rd_2010_threshold_samples.csv"
OUT_MANIFEST         = PROCESSED_DIR / "hw3_build_manifest.json"

print("HWK3_ROOT:", HWK3_ROOT)
print("YEARS:", YEARS)
print("PROCESSED_DIR:", PROCESSED_DIR)
print("CACHE_DIR:", CACHE_DIR)
print("FORCE_REBUILD_ENROLL:", FORCE_REBUILD_ENROLL)
print("FORCE_REBUILD_LAND:", FORCE_REBUILD_LAND)
print("FORCE_REBUILD_STARS:", FORCE_REBUILD_STARS)

manifest: dict = {"ma_root": None, "files_used": {}, "outputs": {}}

HWK3_ROOT: /home/rpat638/econ470/a0/work/hwk3
YEARS: [2010, 2011, 2012, 2013, 2014, 2015]
PROCESSED_DIR: /home/rpat638/econ470/a0/work/hwk3/data/processed
CACHE_DIR: /home/rpat638/econ470/a0/work/hwk3/data/cache
FORCE_REBUILD_ENROLL: False
FORCE_REBUILD_LAND: False
FORCE_REBUILD_STARS: True


In [2]:
def pick_existing(paths: list[Path]) -> Path:
    for p in paths:
        if p.exists():
            return p
    raise FileNotFoundError("Could not find MA_ROOT. Tried:\n" + "\n".join(map(str, paths)))

CANDIDATE_MA_ROOTS = [
    Path("/scion/5261/econ470001/ma-data/ma"),
    HWK3_ROOT.parent / "ma-data" / "ma",
    HWK3_ROOT.parent.parent / "ma-data" / "ma",
]

MA_ROOT = pick_existing(CANDIDATE_MA_ROOTS)

ENROLL_DIR    = MA_ROOT / "enrollment"
LANDSCAPE_DIR = MA_ROOT / "landscape"
STAR_DIR      = MA_ROOT / "star-ratings"

manifest["ma_root"] = str(MA_ROOT)

print("MA_ROOT:", MA_ROOT)
print("ENROLL_DIR exists:", ENROLL_DIR.exists(), ENROLL_DIR)
print("LANDSCAPE_DIR exists:", LANDSCAPE_DIR.exists(), LANDSCAPE_DIR)
print("STAR_DIR exists:", STAR_DIR.exists(), STAR_DIR)

MA_ROOT: /scion/5261/econ470001/ma-data/ma
ENROLL_DIR exists: True /scion/5261/econ470001/ma-data/ma/enrollment
LANDSCAPE_DIR exists: True /scion/5261/econ470001/ma-data/ma/landscape
STAR_DIR exists: True /scion/5261/econ470001/ma-data/ma/star-ratings


In [3]:
def norm_colname(s: str) -> str:
    s = str(s).strip().lower()
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

def _make_unique(cols: list[str]) -> list[str]:
    seen = {}
    out = []
    for c in cols:
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f"{c}_dup{seen[c]}")
    return out

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    cols = [norm_colname(c) for c in df.columns]
    df.columns = _make_unique(cols)
    return df

def to_num(x):
    if isinstance(x, pd.DataFrame):
        if x.shape[1] == 0:
            return pd.Series([np.nan] * len(x), index=x.index)
        x = x.iloc[:, 0]
    return pd.to_numeric(x, errors="coerce")

def to_num_clean(x):
    if isinstance(x, pd.DataFrame):
        if x.shape[1] == 0:
            return pd.Series([np.nan] * len(x), index=x.index)
        x = x.iloc[:, 0]
    s = x.astype(str)
    s = s.str.replace(r"[,%$]", "", regex=True)
    s = s.str.replace(r"[^0-9\.\-]", "", regex=True)
    return pd.to_numeric(s, errors="coerce")

def clean_contract_id(s: pd.Series) -> pd.Series:
    s = s.astype(str).str.strip().str.upper()
    s = s.str.replace(r"[^A-Z0-9]", "", regex=True)
    s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
    return s

def clean_plan_id(s: pd.Series) -> pd.Series:
    s = s.astype(str).str.strip()
    s = s.str.replace(r"[^0-9]", "", regex=True)
    s = s.replace({"": np.nan, "nan": np.nan, "NaN": np.nan})
    s = s.apply(lambda v: v.zfill(3) if isinstance(v, str) else v)
    s = s.where(s.str.len() == 3, np.nan)
    return s

def coerce_fips(s: pd.Series) -> pd.Series:
    s = s.astype(str).str.strip()
    s = s.str.replace(r"[^0-9]", "", regex=True)
    s = s.replace({"": np.nan, "nan": np.nan, "NaN": np.nan})
    s = s.apply(lambda v: v.zfill(5) if isinstance(v, str) else v)
    s = s.where(s.str.len() == 5, np.nan)
    return s

def safe_rglob(root: Path, pattern: str, limit: int = 200000) -> list[Path]:
    if not root.exists():
        return []
    out: list[Path] = []
    try:
        for p in root.rglob(pattern):
            out.append(p)
            if len(out) >= limit:
                break
    except Exception as e:
        print("rglob failed:", root, pattern, "err:", e)
    return sorted(out)

def parse_year_month_from_name(name: str) -> tuple[int | None, int | None]:
    n = name.lower()
    pats = [
        r"(?P<year>20\d{2})[_-](?P<month>0[1-9]|1[0-2])(?=[^0-9]|$)",
        r"(?P<year>20\d{2})(?P<month>0[1-9]|1[0-2])(?=[^0-9]|$)",
        r"(?P<month>0[1-9]|1[0-2])[_-](?P<year>20\d{2})(?=[^0-9]|$)",
    ]
    for pat in pats:
        m = re.search(pat, n)
        if m:
            return int(m.group("year")), int(m.group("month"))
    return None, None

def pick_col(cols: list[str], must: list[str], prefer: list[str] | None = None, avoid: list[str] | None = None) -> str | None:
    prefer = prefer or []
    avoid = avoid or []
    best, best_score = None, -10**9
    for c in cols:
        cl = c.lower()
        if any(a in cl for a in avoid):
            continue
        if not all(m in cl for m in must):
            continue
        score = 0
        for p in prefer:
            if p in cl:
                score += 2
        for m in must:
            if m in cl:
                score += 1
        if score > best_score:
            best_score = score
            best = c
    return best

In [4]:
CHUNKSIZE = 500_000

def _detect_delim_from_bytes(b: bytes) -> str:
    s = b.decode("latin1", errors="ignore")
    counts = {",": s.count(","), "\t": s.count("\t"), "|": s.count("|"), ";": s.count(";")}
    return max(counts, key=counts.get)

def safe_read_csv(path: Path, *, skiprows: int = 0, header: int | None = 0, nrows: int | None = None, usecols=None) -> pd.DataFrame:
    with open(path, "rb") as f:
        b = f.read(12000)
    sep = _detect_delim_from_bytes(b)

    for enc in ["utf-8", "cp1252", "latin1"]:
        try:
            return pd.read_csv(
                path,
                dtype=str,
                skiprows=skiprows,
                header=header,
                sep=sep,
                engine="c",
                encoding=enc,
                encoding_errors="replace",
                nrows=nrows,
                usecols=usecols,
            )
        except Exception:
            pass

    return pd.read_csv(
        path,
        dtype=str,
        skiprows=skiprows,
        header=header,
        sep=sep,
        engine="python",
        encoding="latin1",
        encoding_errors="replace",
        on_bad_lines="skip",
        nrows=nrows,
        usecols=usecols,
    )

def detect_header_row_csv(path: Path, nrows_scan: int = 60) -> int:
    try:
        raw = safe_read_csv(path, skiprows=0, header=None, nrows=nrows_scan)
    except Exception:
        return 0

    keys = ["contract", "plan", "pbp", "enroll", "enrollment", "fips", "county", "star", "rating"]
    bad  = ["dictionary", "layout", "readme"]

    best_i, best_score = 0, -10**9
    for i in range(min(len(raw), nrows_scan)):
        row = raw.iloc[i].astype(str).str.lower()
        score = 0
        for k in keys:
            score += row.str.contains(k, na=False).sum()
        for b in bad:
            score -= 10 * row.str.contains(b, na=False).sum()
        if score > best_score:
            best_score = score
            best_i = i

    return int(best_i)

def read_csv_autoheader(path: Path, nrows_scan: int = 60) -> pd.DataFrame:
    h = detect_header_row_csv(path, nrows_scan=nrows_scan)
    if h == 0:
        return safe_read_csv(path, skiprows=0, header=0)
    return safe_read_csv(path, skiprows=h, header=0)

def read_any(path: Path, nrows_scan: int = 60) -> pd.DataFrame:
    if path.suffix.lower() == ".csv":
        return read_csv_autoheader(path, nrows_scan=nrows_scan)
    if path.suffix.lower() == ".parquet":
        return pd.read_parquet(path)
    raise ValueError("Unsupported: " + str(path))

def read_first_csv_inside_zip(zip_path: Path) -> pd.DataFrame:
    with zipfile.ZipFile(zip_path, "r") as z:
        names = [n for n in z.namelist() if n.lower().endswith(".csv")]
        if not names:
            raise FileNotFoundError("No CSV inside zip: " + str(zip_path))
        names_sorted = sorted(names, key=lambda s: (("contract" not in s.lower()), ("star" not in s.lower()), len(s)))
        target = names_sorted[0]
        with z.open(target) as f:
            return pd.read_csv(f, dtype=str, encoding_errors="replace")

In [5]:
def _score_enroll_candidate(path: Path) -> int:
    nm = path.name.lower()
    if "contract_info" in nm or "dictionary" in nm or "layout" in nm or "readme" in nm:
        return -10**9

    score = 0
    if "enroll" in nm or "enrollment" in nm:
        score += 50
    if "monthly" in nm:
        score += 5

    try:
        raw = safe_read_csv(path, skiprows=0, header=None, nrows=35)
    except Exception:
        return -10**9

    best = -10**9
    for i in range(min(len(raw), 35)):
        row = raw.iloc[i].astype(str).str.lower()
        s = 0
        s += 4 * row.str.contains("contract", na=False).sum()
        s += 4 * row.str.contains("plan", na=False).sum()
        s += 8 * row.str.contains("enroll", na=False).sum()
        s += 2 * row.str.contains("pbp", na=False).sum()
        s += 2 * row.str.contains("fips", na=False).sum()
        if row.str.contains("info", na=False).sum() > 0:
            s -= 30
        best = max(best, s)

    if best < 10:
        return -10**9

    return int(score + best)

def find_monthly_enrollment_files(year: int) -> dict[int, Path]:
    files = safe_rglob(ENROLL_DIR, "*.csv", limit=200000) + safe_rglob(ENROLL_DIR, "*.parquet", limit=200000)

    buckets: dict[int, list[Path]] = {m: [] for m in range(1, 13)}
    for p in files:
        y, mo = parse_year_month_from_name(p.name)
        if y == year and mo is not None and 1 <= mo <= 12:
            buckets[mo].append(p)

    chosen: dict[int, Path] = {}
    for mo, hits in buckets.items():
        if not hits:
            continue
        scored = [(_score_enroll_candidate(p), p) for p in hits]
        scored.sort(key=lambda t: t[0], reverse=True)
        best_score, best_path = scored[0]
        if best_score <= -10**8:
            best_path = sorted(hits, key=lambda q: (len(q.name), str(q)))[0]
        chosen[mo] = best_path

    return chosen

picked_2010 = find_monthly_enrollment_files(2010)
print("2010 months found:", sorted(picked_2010.keys()))
print("2010 month 1 pick:", picked_2010.get(1))

2010 months found: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
2010 month 1 pick: /scion/5261/econ470001/ma-data/ma/enrollment/Extracted Data/CPSC_Enrollment_Info_2010_01.csv


In [6]:
def _detect_sep_and_encoding(path: Path) -> tuple[str, str]:
    with open(path, "rb") as f:
        b = f.read(12000)
    sep = _detect_delim_from_bytes(b)

    for enc in ["utf-8", "cp1252", "latin1"]:
        try:
            _ = pd.read_csv(path, dtype=str, sep=sep, engine="c", encoding=enc, encoding_errors="replace", nrows=5)
            return sep, enc
        except Exception:
            continue

    return sep, "latin1"

def _find_enrollment_cols_norm(norm_cols: list[str]) -> tuple[str, str, str, str]:
    c_contract = pick_col(norm_cols, must=["contract"], prefer=["id", "number"])
    c_plan = pick_col(norm_cols, must=["plan"], prefer=["id"], avoid=["name"])
    if c_plan is None:
        c_plan = pick_col(norm_cols, must=["pbp"], prefer=[], avoid=["name"])

    c_enroll = pick_col(norm_cols, must=["enroll"], prefer=[], avoid=["eligible", "eligib"])
    if c_enroll is None:
        c_enroll = pick_col(norm_cols, must=["enrollment"], prefer=[], avoid=["eligible", "eligib"])

    c_fips = None
    for cand in ["fips_state_county_code", "fips", "county_fips"]:
        if cand in norm_cols:
            c_fips = cand
            break
    if c_fips is None:
        c_fips = pick_col(norm_cols, must=["fips"], prefer=[], avoid=[])

    if c_fips is None:
        c_fips = "fips_missing"

    return c_contract, c_plan, c_enroll, c_fips

def load_enrollment_month(path: Path) -> pd.DataFrame:
    h = detect_header_row_csv(path, nrows_scan=80)
    skip = h if h > 0 else 0
    sep, enc = _detect_sep_and_encoding(path)

    head = pd.read_csv(
        path, dtype=str, sep=sep, engine="c", encoding=enc, encoding_errors="replace",
        skiprows=skip, header=0, nrows=5
    )

    orig_cols = list(head.columns)
    norm_cols = [norm_colname(c) for c in orig_cols]

    norm_to_idx = {}
    for i, nc in enumerate(norm_cols):
        if nc not in norm_to_idx:
            norm_to_idx[nc] = i

    c_contract, c_plan, c_enroll, c_fips = _find_enrollment_cols_norm(norm_cols)

    if c_contract is None or c_plan is None or c_enroll is None:
        raise KeyError(f"Enrollment file missing columns. file={path} norm_cols_sample={norm_cols[:80]}")

    idxs = [norm_to_idx[c_contract], norm_to_idx[c_plan], norm_to_idx[c_enroll]]
    has_fips = (c_fips in norm_to_idx)
    if has_fips:
        idxs.append(norm_to_idx[c_fips])

    agg: dict[tuple[str, str, str], float] = {}

    reader = pd.read_csv(
        path,
        dtype=str,
        sep=sep,
        engine="c",
        encoding=enc,
        encoding_errors="replace",
        skiprows=skip,
        header=0,
        usecols=idxs,
        chunksize=CHUNKSIZE,
    )

    for chunk in reader:
        chunk = normalize_columns(chunk)

        d = pd.DataFrame()
        d["contractid"] = clean_contract_id(chunk[c_contract])
        d["planid"] = clean_plan_id(chunk[c_plan])
        d["enrollment"] = to_num(chunk[c_enroll])

        if has_fips:
            d["fips"] = coerce_fips(chunk[c_fips])
        else:
            d["fips"] = np.nan

        d = d.dropna(subset=["contractid", "planid", "fips"])
        d = d[d["fips"] != "00000"]

        g = d.groupby(["contractid", "planid", "fips"], as_index=False)["enrollment"].sum()

        for row in g.itertuples(index=False):
            key = (row.contractid, row.planid, row.fips)
            agg[key] = agg.get(key, 0.0) + (0.0 if pd.isna(row.enrollment) else float(row.enrollment))

    out = pd.DataFrame([(k[0], k[1], k[2], v) for k, v in agg.items()],
                       columns=["contractid", "planid", "fips", "enrollment"])
    return out

p = picked_2010[1]
h = detect_header_row_csv(p, nrows_scan=80)
sep, enc = _detect_sep_and_encoding(p)
head = pd.read_csv(p, dtype=str, sep=sep, engine="c", encoding=enc, encoding_errors="replace", skiprows=h, header=0, nrows=3)
print("Picked file:", p.name)
print("Raw columns:", list(head.columns))
print("Normalized:", [norm_colname(c) for c in head.columns])

Picked file: CPSC_Enrollment_Info_2010_01.csv
Raw columns: ['Contract Number', 'Plan ID', 'SSA State County Code', 'FIPS State County Code', 'State', 'County', 'Enrollment']
Normalized: ['contract_number', 'plan_id', 'ssa_state_county_code', 'fips_state_county_code', 'state', 'county', 'enrollment']


In [7]:
def build_plan_county_year_enrollment(year: int) -> tuple[pd.DataFrame, dict]:
    files_by_month = find_monthly_enrollment_files(year)
    months = sorted(files_by_month.keys())

    meta = {
        "year": year,
        "months_found": months,
        "month_files": {m: str(files_by_month[m]) for m in months},
    }

    if not months:
        raise FileNotFoundError(f"No monthly enrollment files found for {year} under {ENROLL_DIR}")

    parts = []
    for m in months:
        p = files_by_month[m]
        d = load_enrollment_month(p)
        d["month"] = m
        parts.append(d)

    allm = pd.concat(parts, ignore_index=True)

    g = (
        allm.groupby(["contractid", "planid", "fips"], as_index=False)
           .agg(enroll_sum=("enrollment", "sum"),
                months_observed=("month", "nunique"))
    )
    g["avg_enrollment"] = g["enroll_sum"] / g["months_observed"].replace(0, np.nan)
    g["year"] = year

    g = g[["year", "contractid", "planid", "fips", "months_observed", "avg_enrollment"]].copy()
    return g, meta

In [8]:
CACHE_ENROLL = CACHE_DIR / "hw3_plan_county_year_enrollment_2010_2015.csv"
CACHE_META   = CACHE_DIR / "hw3_enrollment_file_manifest.json"

if FORCE_REBUILD_ENROLL:
    for p in [CACHE_ENROLL, CACHE_META]:
        if p.exists():
            p.unlink()
            print("Deleted:", p)

if CACHE_ENROLL.exists():
    enroll_cty = pd.read_csv(CACHE_ENROLL, dtype={"contractid": str, "planid": str, "fips": str})
    meta_enroll = json.loads(CACHE_META.read_text()) if CACHE_META.exists() else {}
    print("Loaded cached:", CACHE_ENROLL, "rows:", enroll_cty.shape[0])
else:
    parts = []
    meta_enroll = {}
    for y in YEARS:
        print("Building enrollment for year:", y)
        d, meta = build_plan_county_year_enrollment(y)
        parts.append(d)
        meta_enroll[str(y)] = meta
    enroll_cty = pd.concat(parts, ignore_index=True)
    enroll_cty.to_csv(CACHE_ENROLL, index=False)
    CACHE_META.write_text(json.dumps(meta_enroll, indent=2))
    print("Wrote cache:", CACHE_ENROLL)

manifest["files_used"]["enrollment"] = meta_enroll

print(enroll_cty.head())
print("Rows:", enroll_cty.shape[0], "| Years:", sorted(enroll_cty["year"].unique().tolist()))

Loaded cached: /home/rpat638/econ470/a0/work/hwk3/data/cache/hw3_plan_county_year_enrollment_2010_2015.csv rows: 14090286
   year contractid planid   fips  months_observed  avg_enrollment
0  2010      E0654    801  01001               12             0.0
1  2010      E0654    801  01003               12             0.0
2  2010      E0654    801  01005               12             0.0
3  2010      E0654    801  01007               12             0.0
4  2010      E0654    801  01009               12             0.0
Rows: 14090286 | Years: [2010, 2011, 2012, 2013, 2014, 2015]


In [9]:
enroll_cty = enroll_cty.copy()
enroll_cty["avg_enrollment"] = to_num(enroll_cty["avg_enrollment"])

county_totals = (
    enroll_cty.groupby(["year", "fips"], as_index=False)["avg_enrollment"]
              .sum()
              .rename(columns={"avg_enrollment": "avg_enrolled"})
)

plan_county = enroll_cty.merge(county_totals, on=["year", "fips"], how="left", validate="m:1")
plan_county["mkt_share"] = plan_county["avg_enrollment"] / plan_county["avg_enrolled"].replace(0, np.nan)

chk = plan_county.groupby(["year", "fips"])["mkt_share"].sum().reset_index()
print("Mean county share-sum by year (should be ~1):")
print(chk.groupby("year")["mkt_share"].mean().round(6))

Mean county share-sum by year (should be ~1):
year
2010    0.993812
2011    0.995668
2012    0.995359
2013    0.994740
2014    0.994431
2015    0.992583
Name: mkt_share, dtype: float64


In [10]:
def _score_landscape_candidate(path: Path) -> int:
    nm = path.name.lower()
    if any(bad in nm for bad in ["dictionary", "layout", "readme"]):
        return -10**9
    score = 0
    if "land" in nm or "landscape" in nm:
        score += 10

    try:
        dfh = safe_read_csv(path, skiprows=detect_header_row_csv(path, nrows_scan=120), header=0, nrows=40)
        dfh = normalize_columns(dfh)
        cols = list(dfh.columns)
    except Exception:
        return -10**9

    score += 20 if any("contract" in c for c in cols) else 0
    score += 20 if any(("plan" in c or "pbp" in c) for c in cols) else 0
    score += 5 if any("type" in c for c in cols) else 0
    score += 5 if any("partd" in c for c in cols) else 0

    return score

def find_landscape_file_for_year(year: int) -> Path:
    files = (
        safe_rglob(LANDSCAPE_DIR, "*.csv", limit=200000)
        + safe_rglob(LANDSCAPE_DIR, "*.parquet", limit=200000)
        + safe_rglob(LANDSCAPE_DIR, "*.zip", limit=200000)
    )
    y = str(year)
    cand = [p for p in files if y in str(p)]
    if not cand:
        raise FileNotFoundError(f"No landscape candidates found for year {year} under {LANDSCAPE_DIR}")
    scored = [(_score_landscape_candidate(p), p) for p in cand]
    scored.sort(key=lambda t: t[0], reverse=True)
    best_score, best_path = scored[0]
    if best_score <= -10**8:
        best_path = sorted(cand, key=lambda q: (len(q.name), str(q)))[0]
    return best_path

def load_landscape_year(year: int) -> tuple[pd.DataFrame, dict]:
    p = find_landscape_file_for_year(year)
    df = read_first_csv_inside_zip(p) if p.suffix.lower() == ".zip" else read_any(p, nrows_scan=120)
    df = normalize_columns(df)
    cols = list(df.columns)

    c_contract = pick_col(cols, must=["contract"], prefer=["id", "number"])
    c_plan = pick_col(cols, must=["plan"], prefer=["id"], avoid=["name"])
    if c_plan is None:
        c_plan = pick_col(cols, must=["pbp"], prefer=[], avoid=["name"])

    c_plan_type = pick_col(cols, must=["type"], prefer=["plan"], avoid=[])
    if c_plan_type is None:
        c_plan_type = pick_col(cols, must=["plan"], prefer=["type"], avoid=["name"])

    c_partd = None
    for cand in ["partd", "part_d", "partd_yn", "part_d_yn"]:
        if cand in cols:
            c_partd = cand
            break

    if c_contract is None or c_plan is None:
        raise KeyError(f"Landscape missing contract/plan. year={year} file={p} cols_sample={cols[:80]}")

    out = pd.DataFrame()
    out["year"] = int(year)
    out["contractid"] = clean_contract_id(df[c_contract])
    out["planid"] = clean_plan_id(df[c_plan])
    out["plan_type"] = df[c_plan_type].astype(str) if c_plan_type and c_plan_type in df.columns else ""
    out["partd"] = to_num_clean(df[c_partd]) if c_partd and c_partd in df.columns else np.nan

    pt = out["plan_type"].astype(str).str.upper()
    out["hmo"] = np.where(pt.str.contains("HMO", na=False), 1.0, 0.0)

    out = out.dropna(subset=["contractid", "planid"])
    out = out.drop_duplicates(subset=["year", "contractid", "planid"])
    return out, {"file": str(p)}

CACHE_LAND = CACHE_DIR / "hw3_landscape_plan_year_2010_2015.csv"
CACHE_LAND_META = CACHE_DIR / "hw3_landscape_file_manifest.json"

if FORCE_REBUILD_LAND:
    for p in [CACHE_LAND, CACHE_LAND_META]:
        if p.exists():
            p.unlink()
            print("Deleted:", p)

if CACHE_LAND.exists():
    land = pd.read_csv(CACHE_LAND, dtype={"contractid": str, "planid": str})
    land_meta = json.loads(CACHE_LAND_META.read_text()) if CACHE_LAND_META.exists() else {}
    print("Loaded cached landscape:", CACHE_LAND, "rows:", land.shape[0])
else:
    parts = []
    land_meta = {}
    for y in YEARS:
        print("Loading landscape for year:", y)
        d, meta = load_landscape_year(y)
        parts.append(d)
        land_meta[str(y)] = meta
    land = pd.concat(parts, ignore_index=True)
    land.to_csv(CACHE_LAND, index=False)
    CACHE_LAND_META.write_text(json.dumps(land_meta, indent=2))
    print("Wrote cache:", CACHE_LAND)

manifest["files_used"]["landscape"] = land_meta

print(land.head())
print("Rows:", land.shape[0], "| Years:", sorted(pd.Series(land["year"]).dropna().unique().tolist()))

Loaded cached landscape: /home/rpat638/econ470/a0/work/hwk3/data/cache/hw3_landscape_plan_year_2010_2015.csv rows: 6399
   year contractid planid  plan_type  partd  hmo
0   NaN      H0104    004  Local PPO    NaN  0.0
1   NaN      H0104    008  Local PPO    NaN  0.0
2   NaN      H0104    002  Local PPO    NaN  0.0
3   NaN      H2762    015     PFFS *    NaN  0.0
4   NaN      H2762    019     PFFS *    NaN  0.0
Rows: 6399 | Years: []


In [11]:
STAR_RE = re.compile(r"^[A-Z][0-9]{4}$")

def _star_year_candidates(year: int) -> list[Path]:
    y = str(year)
    candidates = []

    # common folder structure
    for folder in [
        STAR_DIR / "Extracted Star Ratings" / y,
        STAR_DIR / "Extracted Star Ratings" / y / "summary",
        STAR_DIR / "Extracted Star Ratings",
    ]:
        if folder.exists():
            candidates += safe_rglob(folder, "*.csv", limit=200000)

    if not candidates:
        candidates = [p for p in safe_rglob(STAR_DIR, "*.csv", limit=200000) if y in str(p)]

    # de-dup
    return sorted(set(candidates))

def _best_contract_col(df: pd.DataFrame) -> tuple[str | None, float]:
    cols = list(df.columns)
    contract_like = [c for c in cols if "contract" in c]
    best_c = None
    best_score = -1.0
    for c in contract_like:
        v = clean_contract_id(df[c])
        cov = float(v.notna().mean())
        m = float(v.dropna().str.match(STAR_RE).mean()) if cov > 0 else 0.0
        score = 0.7 * m + 0.3 * cov
        if score > best_score:
            best_score = score
            best_c = c
    return best_c, best_score

def _best_star_col(df: pd.DataFrame) -> str | None:
    cols = list(df.columns)
    for cand in ["partc_score", "star_rating", "star", "overall_star", "overall_rating"]:
        if cand in cols:
            return cand

    best = None
    best_score = -10**9
    for c in cols:
        cl = c.lower()
        if any(t in cl for t in ["contract", "org", "state", "county", "fips", "name", "ssa"]):
            continue
        x = to_num_clean(df[c])
        if x.notna().mean() < 0.25:
            continue
        uniq = x.nunique(dropna=True)
        if uniq < 2 or uniq > 14:
            continue
        vals = sorted(set(x.dropna().unique().tolist()))
        half_ok = all(abs(v * 2 - round(v * 2)) < 1e-6 for v in vals[: min(10, len(vals))])
        in_range = ((x.dropna() >= 0) & (x.dropna() <= 5.5)).mean() if x.notna().any() else 0.0
        score = 0
        score += 30 if half_ok else 0
        score += 20 if in_range >= 0.85 else 0
        score += 10 if ("star" in cl or "rating" in cl) else 0
        score += uniq
        if score > best_score:
            best_score = score
            best = c
    return best

def _best_raw_col(df: pd.DataFrame) -> str | None:
    cols = list(df.columns)
    best = None
    best_score = -10**9
    for c in cols:
        cl = c.lower()
        if any(t in cl for t in ["contract", "org", "state", "county", "fips", "name", "ssa"]):
            continue
        if "star" in cl or "rating" in cl:
            continue
        x = to_num_clean(df[c])
        cov = x.notna().mean()
        uniq = x.nunique(dropna=True)
        if cov < 0.25 or uniq < 10:
            continue
        x2 = x.dropna()
        in_range = ((x2 >= 0) & (x2 <= 5.5)).mean() if len(x2) else 0.0
        score = uniq
        if "raw" in cl:
            score += 600
        if "summary" in cl:
            score += 250
        if "score" in cl:
            score += 150
        if in_range >= 0.85:
            score += 400
        if score > best_score:
            best_score = score
            best = c
    return best

def _measure_cols_for_mean(df: pd.DataFrame) -> list[str]:
    cols = list(df.columns)
    keep = []
    for c in cols:
        cl = c.lower()
        if any(t in cl for t in ["contract", "org", "state", "county", "fips", "name", "ssa"]):
            continue
        if "star" in cl or "rating" in cl:
            continue
        x = to_num_clean(df[c])
        cov = x.notna().mean()
        uniq = x.nunique(dropna=True)
        if cov < 0.25 or uniq < 10:
            continue
        x2 = x.dropna()
        in_range = ((x2 >= 0) & (x2 <= 5.5)).mean() if len(x2) else 0.0
        if in_range >= 0.85:
            keep.append(c)
    return keep

def _score_star_file(path: Path) -> tuple[float, dict]:
    try:
        dfh = read_any(path, nrows_scan=200)
        dfh = normalize_columns(dfh)
    except Exception:
        return -10**9, {"file": str(path), "error": "read_fail"}

    c_contract, cscore = _best_contract_col(dfh)
    c_star = _best_star_col(dfh)
    c_raw = _best_raw_col(dfh)
    measures = _measure_cols_for_mean(dfh)

    # require a decent contract column
    if c_contract is None or cscore < 0.10:
        return -10**9, {"file": str(path), "error": "no_contract"}

    score = 0.0
    score += 2000 * cscore
    score += 200 if c_star is not None else 0
    score += 300 if c_raw is not None else 0
    score += min(300, 10 * len(measures))

    meta = {
        "file": str(path),
        "contract_col": c_contract,
        "contract_score": float(cscore),
        "star_col": c_star,
        "raw_col": c_raw,
        "n_measures": int(len(measures)),
    }
    return score, meta

def find_star_file_for_year(year: int) -> tuple[Path, dict]:
    cand = _star_year_candidates(year)
    if not cand:
        raise FileNotFoundError(f"No star CSV candidates for year {year} under {STAR_DIR}")

    scored = []
    for p in cand:
        sc, meta = _score_star_file(p)
        scored.append((sc, p, meta))
    scored.sort(key=lambda t: t[0], reverse=True)

    best_sc, best_p, best_meta = scored[0]
    if best_sc <= -10**8:
        raise RuntimeError(f"Could not find a usable star file for year {year}. Best score too low.")

    # keep top few for debugging
    best_meta["top3"] = [{"score": float(scored[i][0]), "file": str(scored[i][1])} for i in range(min(3, len(scored)))]
    return best_p, best_meta

def load_star_year(year: int) -> tuple[pd.DataFrame, dict]:
    p, meta_pick = find_star_file_for_year(year)

    df = read_any(p, nrows_scan=200)
    df = normalize_columns(df)

    c_contract, cscore = _best_contract_col(df)
    if c_contract is None:
        raise KeyError(f"Chosen star file has no contract column after load. year={year} file={p}")

    c_star = _best_star_col(df)
    c_raw = _best_raw_col(df)
    measures = _measure_cols_for_mean(df)

    out = pd.DataFrame()
    out["year"] = int(year)
    out["contractid"] = clean_contract_id(df[c_contract])

    out["star_rating"] = to_num_clean(df[c_star]) if (c_star is not None and c_star in df.columns) else np.nan

    raw_source = None
    if c_raw is not None and c_raw in df.columns:
        out["raw_rating"] = to_num_clean(df[c_raw])
        raw_source = f"raw_col:{c_raw}"
    else:
        if len(measures) >= 5:
            out["raw_rating"] = pd.concat([to_num_clean(df[c]) for c in measures], axis=1).mean(axis=1, skipna=True)
            raw_source = f"mean_of_measures:n={len(measures)}"
        else:
            out["raw_rating"] = np.nan
            raw_source = "missing_raw"

    out = out.dropna(subset=["contractid"])
    out = out.groupby(["year", "contractid"], as_index=False).agg(
        star_rating=("star_rating", "first"),
        raw_rating=("raw_rating", "first"),
    )

    meta = {
        "file": str(p),
        "contract_col": c_contract,
        "contract_score": float(cscore),
        "star_col": c_star,
        "raw_source": raw_source,
        "n_measures": int(len(measures)),
        "top3": meta_pick.get("top3", []),
    }
    return out, meta

CACHE_STAR = CACHE_DIR / "hw3_contract_ratings_2010_2015.csv"
CACHE_STAR_META = CACHE_DIR / "hw3_star_file_manifest.json"

if FORCE_REBUILD_STARS:
    for p in [CACHE_STAR, CACHE_STAR_META]:
        if p.exists():
            p.unlink()
            print("Deleted:", p)

if CACHE_STAR.exists():
    stars = pd.read_csv(CACHE_STAR, dtype={"contractid": str})
    star_meta = json.loads(CACHE_STAR_META.read_text()) if CACHE_STAR_META.exists() else {}
    print("Loaded cached stars:", CACHE_STAR, "rows:", stars.shape[0])
else:
    parts = []
    star_meta = {}
    for y in YEARS:
        print("Loading star ratings for year:", y)
        d, meta = load_star_year(y)
        parts.append(d)
        star_meta[str(y)] = meta
    stars = pd.concat(parts, ignore_index=True)
    stars.to_csv(CACHE_STAR, index=False)
    CACHE_STAR_META.write_text(json.dumps(star_meta, indent=2))
    print("Wrote cache:", CACHE_STAR)

manifest["files_used"]["star_ratings"] = star_meta

print(stars.head())
print("Rows:", stars.shape[0])
print("Star missing rate:", float(stars["star_rating"].isna().mean()) if len(stars) else np.nan)
print("Raw missing rate:", float(stars["raw_rating"].isna().mean()) if len(stars) else np.nan)

Deleted: /home/rpat638/econ470/a0/work/hwk3/data/cache/hw3_contract_ratings_2010_2015.csv
Deleted: /home/rpat638/econ470/a0/work/hwk3/data/cache/hw3_star_file_manifest.json
Loading star ratings for year: 2010


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Loading star ratings for year: 2011


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Loading star ratings for year: 2012


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Loading star ratings for year: 2013


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Loading star ratings for year: 2014


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Loading star ratings for year: 2015


  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})
  s = s.replace({"": np.nan, "NAN": np.nan, "NONE": np.nan, "NULL": np.nan})


Wrote cache: /home/rpat638/econ470/a0/work/hwk3/data/cache/hw3_contract_ratings_2010_2015.csv
Empty DataFrame
Columns: [year, contractid, star_rating, raw_rating]
Index: []
Rows: 0
Star missing rate: nan
Raw missing rate: nan
