In [1]:
from __future__ import annotations

import re
import gc
from pathlib import Path

import numpy as np
import pandas as pd

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

YEARS = list(range(2014, 2020))

CACHE_DIR = Path("data/cache")
OUT_DIR   = Path("data/processed")
CACHE_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

CHUNKSIZE = 300_000

CANDIDATE_MA_ROOTS = [
    Path("/scion/5261/econ470001/ma-data/ma"),
    Path("/econ470/a0/work/ma-data/ma"),
    Path.cwd().parent / "ma-data" / "ma",
]

def pick_existing(paths: list[Path]) -> Path | None:
    for p in paths:
        if p.exists():
            return p
    return None

MA_ROOT = pick_existing(CANDIDATE_MA_ROOTS)
if MA_ROOT is None:
    raise FileNotFoundError("Could not find ma-data/ma. Update CANDIDATE_MA_ROOTS if needed.")

ENROLL_EXTRACTED = MA_ROOT / "enrollment" / "Extracted Data"
SAREA_EXTRACTED  = MA_ROOT / "service-area" / "Extracted Data"
PEN_EXTRACTED    = MA_ROOT / "penetration" / "Extracted Data"

print("MA_ROOT:", MA_ROOT)
print("ENROLL_EXTRACTED exists:", ENROLL_EXTRACTED.exists())
print("SAREA_EXTRACTED exists:", SAREA_EXTRACTED.exists())
print("PEN_EXTRACTED exists:", PEN_EXTRACTED.exists())

MA_ROOT: /scion/5261/econ470001/ma-data/ma
ENROLL_EXTRACTED exists: True
SAREA_EXTRACTED exists: True
PEN_EXTRACTED exists: True


In [2]:
def norm_colname(s: str) -> str:
    s = str(s).strip().lower()
    s = re.sub(r"\s+", "_", s)
    s = re.sub(r"[^a-z0-9_]", "", s)
    return s

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [norm_colname(c) for c in df.columns]
    return df

def first_existing_col(cols: list[str], candidates: list[str]) -> str | None:
    s = set(cols)
    for c in candidates:
        if c in s:
            return c
    return None

def coerce_id_series(x: pd.Series, width: int | None = None) -> pd.Series:
    s = x.astype(str).str.strip()
    s = s.replace({"nan": np.nan, "None": np.nan, "": np.nan})
    s = s.str.replace(r"\.0$", "", regex=True)
    if width is not None:
        s = s.str.zfill(width)
    return s

def to_numeric_clean(x: pd.Series) -> pd.Series:
    s = x.astype(str).str.replace(",", "", regex=False).str.strip()
    s = s.replace({"nan": np.nan, "None": np.nan, "": np.nan, "*": np.nan, "$-": np.nan, "-": np.nan})
    s = s.str.replace("$", "", regex=False)
    return pd.to_numeric(s, errors="coerce")

def _digits_zfill(x: pd.Series, width: int) -> pd.Series:
    s = x.astype(str).str.strip()
    s = s.replace({"nan": np.nan, "None": np.nan, "": np.nan})
    s = s.str.replace(r"\.0$", "", regex=True)
    s = s.str.replace(r"[^0-9]", "", regex=True)
    s = s.where(s.str.len() > 0, np.nan)
    return s.str.zfill(width)

def coerce_fips(df: pd.DataFrame) -> pd.Series:
    cols = list(df.columns)

    fips_col = first_existing_col(
        cols,
        [
            "fips",
            "fips_state_county_code",
            "fipsstatecountycode",
            "county_fips",
            "countyfips",
            "cnty_fips",
            "fipscounty",
            "fipscnty",
        ],
    )
    if fips_col is not None:
        return _digits_zfill(df[fips_col], 5)

    st_col = first_existing_col(cols, ["state_fips", "statefips", "fipsst", "fips_state"])
    ct_col = first_existing_col(cols, ["county_fips", "countyfips", "fipscnty", "fips_county"])

    if st_col is not None and ct_col is not None:
        st = _digits_zfill(df[st_col], 2)
        ct = _digits_zfill(df[ct_col], 3)
        out = (st.fillna("") + ct.fillna("")).replace({"": np.nan})
        return out

    return pd.Series([np.nan] * len(df), index=df.index, dtype="object")

In [3]:
def find_month_file(root: Path, year: int, month: int, kind: str) -> Path:
    m2 = f"{month:02d}"
    files = [p for p in root.rglob("*") if p.is_file()]

    if kind == "enroll":
        pats = [rf".*{year}.*{m2}.*enroll.*\.(csv|txt)$", rf".*enroll.*{year}.*{m2}.*\.(csv|txt)$"]
    elif kind == "sarea":
        pats = [
            rf".*{year}.*{m2}.*(service|sa|sarea|cnty_sa|service_area).*?\.(csv|txt)$",
            rf".*(service|sa|sarea|cnty_sa|service_area).*{year}.*{m2}.*?\.(csv|txt)$",
        ]
    else:
        raise ValueError("unknown kind")

    for pat in pats:
        rx = re.compile(pat, flags=re.I)
        hits = [p for p in files if rx.match(p.name)]
        if hits:
            hits.sort(key=lambda p: len(p.name))
            return hits[0]

    rx = re.compile(rf".*{year}.*{m2}.*\.(csv|txt)$", flags=re.I)
    hits = [p for p in files if rx.match(p.name)]
    if hits:
        hits.sort(key=lambda p: len(p.name))
        return hits[0]

    raise FileNotFoundError(f"Could not find {kind} file for {year}-{m2} under {root}")

def find_year_file(root: Path, year: int, kind: str) -> Path:
    files = [p for p in root.rglob("*") if p.is_file()]
    if kind != "penetration":
        raise ValueError("unknown kind")

    pats = [rf".*{year}.*penet.*\.(csv|txt)$", rf".*penet.*{year}.*\.(csv|txt)$"]
    for pat in pats:
        rx = re.compile(pat, flags=re.I)
        hits = [p for p in files if rx.match(p.name)]
        if hits:
            hits.sort(key=lambda p: len(p.name))
            return hits[0]

    rx = re.compile(rf".*{year}.*\.(csv|txt)$", flags=re.I)
    hits = [p for p in files if rx.match(p.name)]
    if hits:
        hits.sort(key=lambda p: len(p.name))
        return hits[0]

    raise FileNotFoundError(f"Could not find {kind} file for {year} under {root}")

def read_csv_any(path: Path, usecols=None) -> pd.DataFrame:
    return pd.read_csv(
        path,
        dtype=str,
        na_values=["*", "NA", "N/A", ""],
        keep_default_na=True,
        encoding_errors="replace",
        low_memory=False,
        usecols=usecols,
    )

In [4]:
def read_enroll_min(path: Path) -> pd.DataFrame:
    want = {
        "contract_number",
        "contract_id",
        "plan_id",
        "fips_state_county_code",
        "enrollment",
        "snp",
        "snp_type",
        "partc",
        "part_c",
        "plan_type",
        "contract_type",
    }

    df = read_csv_any(path, usecols=lambda c: norm_colname(c) in want)
    df = normalize_columns(df)

    contract_col = first_existing_col(list(df.columns), ["contract_id", "contract_number"])
    if contract_col is None:
        raise KeyError("Enrollment file missing contract column")

    df["contract_id"] = coerce_id_series(df[contract_col], None)
    df["plan_id"] = coerce_id_series(df["plan_id"], 3)
    df["fips"] = _digits_zfill(df.get("fips_state_county_code", pd.Series([np.nan] * len(df))), 5)

    df["enrollment"] = to_numeric_clean(df["enrollment"])

    keep = ["contract_id", "plan_id", "fips", "enrollment"]
    for c in ["snp", "snp_type", "partc", "part_c", "plan_type", "contract_type"]:
        if c in df.columns:
            keep.append(c)
    return df[keep]

def read_sarea_min(path: Path) -> pd.DataFrame:
    keep = {
        "contract_id",
        "contract_number",
        "plan_id",
        "plan",
        "planid",
        "plan_number",
        "fips",
        "fips_state_county_code",
        "fipscounty",
        "fipscnty",
        "county_fips",
        "cnty_fips",
    }

    df = read_csv_any(path, usecols=lambda c: norm_colname(c) in keep)
    df = normalize_columns(df)

    cols = list(df.columns)
    contract_col = first_existing_col(cols, ["contract_id", "contract_number", "contract"])
    if contract_col is None:
        raise KeyError(f"Service area missing contract column. Sample cols: {sorted(cols)[:60]}")
    df["contract_id"] = coerce_id_series(df[contract_col], None)

    plan_col = first_existing_col(cols, ["plan_id", "planid", "plan", "plan_number"])
    if plan_col is None:
        df["plan_id"] = np.nan
    else:
        df["plan_id"] = coerce_id_series(df[plan_col], 3)

    df["fips"] = coerce_fips(df)

    return df[["contract_id", "plan_id", "fips"]]

In [5]:
def flag_exclusions(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    df["plan_id"] = df["plan_id"].astype(str).str.strip().str.zfill(3)
    df["is_800_series"] = df["plan_id"].str.startswith("8")

    cols = list(df.columns)

    snp_col = first_existing_col(cols, ["snp", "snp_type"])
    if snp_col is None:
        df["is_snp"] = False
    else:
        s = df[snp_col].astype(str).str.lower().str.strip()
        df["is_snp"] = s.isin(["y", "yes", "1", "true"]) | (s.notna() & (s != "") & ~s.isin(["n", "no", "0", "false", "nan", "none"]))

    partc_col = first_existing_col(cols, ["partc", "part_c"])
    if partc_col is not None:
        s = df[partc_col].astype(str).str.lower().str.strip()
        df["is_pdp_only"] = s.isin(["n", "no", "0", "false"])
    else:
        plan_type_col = first_existing_col(cols, ["plan_type", "contract_type"])
        if plan_type_col is None:
            df["is_pdp_only"] = False
        else:
            s = df[plan_type_col].astype(str).str.lower().str.strip()
            df["is_pdp_only"] = s.str.contains(r"\bpdp\b", regex=True) | s.str.contains("prescription drug", regex=False)

    df["drop_hw2"] = df["is_snp"] | df["is_800_series"] | df["is_pdp_only"]
    return df

In [6]:
def build_plan_county_year(year: int) -> Path:
    out_path = CACHE_DIR / f"plan_county_year_{year}.csv"
    if out_path.exists():
        return out_path

    accum = None

    for m in range(1, 13):
        enroll_path = find_month_file(ENROLL_EXTRACTED, year, m, "enroll")
        sarea_path  = find_month_file(SAREA_EXTRACTED,  year, m, "sarea")

        enroll = read_enroll_min(enroll_path)
        sarea  = read_sarea_min(sarea_path)

        enroll = enroll.dropna(subset=["contract_id", "plan_id", "fips"])
        sarea  = sarea.dropna(subset=["contract_id", "fips"])

        has_plan = sarea["plan_id"].notna().any()
        if has_plan:
            sarea = sarea.dropna(subset=["plan_id"])
            merged = enroll.merge(sarea, on=["contract_id", "plan_id", "fips"], how="inner")
        else:
            merged = enroll.merge(sarea[["contract_id", "fips"]], on=["contract_id", "fips"], how="inner")

        merged["year"] = int(year)

        merged = flag_exclusions(merged)

        g = merged.groupby(["contract_id", "plan_id", "fips", "year"], as_index=False).agg(
            enroll_sum=("enrollment", "sum"),
            is_snp=("is_snp", "max"),
            is_800_series=("is_800_series", "max"),
            is_pdp_only=("is_pdp_only", "max"),
            drop_hw2=("drop_hw2", "max"),
        )

        g["months_observed"] = 1
        g["dec_enrollment"] = g["enroll_sum"] if m == 12 else np.nan

        if accum is None:
            accum = g
        else:
            keys = ["contract_id", "plan_id", "fips", "year"]
            accum = accum.merge(g, on=keys, how="outer", suffixes=("", "_new"))

            accum["enroll_sum"] = accum["enroll_sum"].fillna(0) + accum["enroll_sum_new"].fillna(0)
            accum["months_observed"] = accum["months_observed"].fillna(0) + accum["months_observed_new"].fillna(0)

            for c in ["is_snp", "is_800_series", "is_pdp_only", "drop_hw2"]:
                accum[c] = np.maximum(
                    pd.to_numeric(accum[c], errors="coerce").fillna(0),
                    pd.to_numeric(accum[f"{c}_new"], errors="coerce").fillna(0),
                ).astype(int)

            accum["dec_enrollment"] = accum["dec_enrollment"].combine_first(accum["dec_enrollment_new"])

            dropcols = [c for c in accum.columns if c.endswith("_new")]
            accum = accum.drop(columns=dropcols)

        del enroll, sarea, merged, g
        gc.collect()

    accum["avg_enrollment"] = accum["enroll_sum"] / accum["months_observed"].replace({0: np.nan})

    out = accum[
        [
            "contract_id",
            "plan_id",
            "fips",
            "year",
            "avg_enrollment",
            "dec_enrollment",
            "months_observed",
            "is_snp",
            "is_800_series",
            "is_pdp_only",
            "drop_hw2",
        ]
    ].copy()

    out["contract_id"] = out["contract_id"].astype(str).str.strip()
    out["plan_id"] = out["plan_id"].astype(str).str.strip().str.zfill(3)
    out["fips"] = out["fips"].astype(str).str.strip().str.zfill(5)
    out["year"] = pd.to_numeric(out["year"], errors="coerce").astype(int)

    for c in ["is_snp", "is_800_series", "is_pdp_only", "drop_hw2"]:
        out[c] = pd.to_numeric(out[c], errors="coerce").fillna(0).astype(int)

    for c in ["avg_enrollment", "dec_enrollment", "months_observed"]:
        out[c] = pd.to_numeric(out[c], errors="coerce")

    out.to_csv(out_path, index=False)

    del accum, out
    gc.collect()
    return out_path

In [7]:
def build_county_plan_counts_2014_2019() -> Path:
    out_path = OUT_DIR / "county_plan_counts_2014_2019.csv"
    if out_path.exists():
        return out_path

    counts = {}

    for y in YEARS:
        print("Counting plans. Year:", y)
        yp = build_plan_county_year(y)

        usecols = ["fips", "year", "contract_id", "plan_id", "drop_hw2"]
        for chunk in pd.read_csv(yp, dtype=str, usecols=usecols, chunksize=CHUNKSIZE):
            drop = chunk["drop_hw2"].astype(str).str.lower().str.strip().isin(["1", "true", "t", "yes", "y"])
            chunk = chunk.loc[~drop, ["fips", "year", "contract_id", "plan_id"]].dropna()

            chunk["plan_id"] = chunk["plan_id"].astype(str).str.zfill(3)
            chunk["plan_key"] = chunk["contract_id"].astype(str).str.strip() + "-" + chunk["plan_id"].astype(str).str.strip()

            chunk = chunk.drop_duplicates(subset=["fips", "year", "plan_key"])

            for (fips, year), g in chunk.groupby(["fips", "year"]):
                counts[(fips, year)] = counts.get((fips, year), 0) + int(g["plan_key"].nunique())

            del chunk
            gc.collect()

        gc.collect()

    out = pd.DataFrame([(k[0], k[1], v) for k, v in counts.items()], columns=["fips", "year", "plan_count"])
    out = out.sort_values(["year", "fips"]).reset_index(drop=True)
    out.to_csv(out_path, index=False)
    return out_path

In [8]:
def read_penetration_year(year: int) -> pd.DataFrame:
    path = find_year_file(PEN_EXTRACTED, year, "penetration")
    df = read_csv_any(path)
    df = normalize_columns(df)

    df["fips"] = coerce_fips(df)

    cols = list(df.columns)
    ma_col = first_existing_col(cols, ["ma_enrollment", "ma_enroll", "enrolled", "enroll"])
    elig_col = first_existing_col(cols, ["eligibles", "total_eligibles", "medicare_eligibles", "tot_eligibles"])

    if ma_col is None or elig_col is None:
        raise KeyError(f"Penetration file missing enrolled or eligibles. Col sample: {sorted(cols)[:100]}")

    out = df[["fips", ma_col, elig_col]].copy()
    out = out.rename(columns={ma_col: "ma_enrollment", elig_col: "eligibles"})
    out["ma_enrollment"] = to_numeric_clean(out["ma_enrollment"])
    out["eligibles"] = to_numeric_clean(out["eligibles"])
    out["year"] = int(year)
    out = out.dropna(subset=["fips"])
    out["fips"] = out["fips"].astype(str).str.zfill(5)
    return out

pen = pd.concat([read_penetration_year(y) for y in YEARS], ignore_index=True)
print("Penetration shape:", pen.shape)
print(pen.head())

Penetration shape: (19494, 4)
    fips  ma_enrollment  eligibles  year
0  01001         3000.0       9438  2014
1  01003        13183.0      41640  2014
2  01005          859.0       6004  2014
3  01007         1665.0       4599  2014
4  01009         4680.0      11193  2014


In [9]:
def build_county_hhi_ma_share_2014_2019(pen: pd.DataFrame) -> Path:
    out_path = OUT_DIR / "county_hhi_ma_share_2014_2019.csv"
    if out_path.exists():
        return out_path

    pen2 = pen.copy()
    pen2["year"] = pen2["year"].astype(int)
    pen2["fips"] = pen2["fips"].astype(str).str.zfill(5)

    rows = []

    for y in YEARS:
        print("HHI and MA share. Year:", y)
        yp = build_plan_county_year(y)

        usecols = ["fips", "year", "contract_id", "plan_id", "dec_enrollment", "avg_enrollment", "drop_hw2"]
        pieces = []

        for chunk in pd.read_csv(yp, dtype=str, usecols=usecols, chunksize=CHUNKSIZE):
            drop = chunk["drop_hw2"].astype(str).str.lower().str.strip().isin(["1", "true", "t", "yes", "y"])
            chunk = chunk.loc[~drop].copy()

            dec = pd.to_numeric(chunk["dec_enrollment"], errors="coerce")
            avg = pd.to_numeric(chunk["avg_enrollment"], errors="coerce")
            e = dec.fillna(avg).fillna(0.0)

            chunk["e"] = e
            chunk["e2"] = e * e

            tmp = (
                chunk.groupby(["fips", "year"], as_index=False)
                     .agg(total_enroll=("e", "sum"), sumsq_enroll=("e2", "sum"))
            )
            pieces.append(tmp)

            del chunk, tmp
            gc.collect()

        out = pd.concat(pieces, ignore_index=True)
        out = out.groupby(["fips", "year"], as_index=False).agg(
            total_enroll=("total_enroll", "sum"),
            sumsq_enroll=("sumsq_enroll", "sum"),
        )

        out["hhi"] = out["sumsq_enroll"] / (out["total_enroll"] * out["total_enroll"])
        out = out.rename(columns={"total_enroll": "ma_total_from_plans"})
        out["fips"] = out["fips"].astype(str).str.zfill(5)
        out["year"] = out["year"].astype(int)

        out = out.merge(pen2, on=["fips", "year"], how="left")
        out["ma_share"] = out["ma_enrollment"] / out["eligibles"]

        rows.append(out)

        del out, pieces
        gc.collect()

    final = pd.concat(rows, ignore_index=True).sort_values(["year", "fips"]).reset_index(drop=True)
    final.to_csv(out_path, index=False)
    return out_path

In [10]:
county_counts_path = build_county_plan_counts_2014_2019()
county_hhi_path = build_county_hhi_ma_share_2014_2019(pen)

print("Wrote:", county_counts_path)
print(pd.read_csv(county_counts_path, dtype=str, nrows=5))

print("Wrote:", county_hhi_path)
print(pd.read_csv(county_hhi_path, dtype=str, nrows=5))

Wrote: data/processed/county_plan_counts_2014_2019.csv
    fips  year plan_count
0  01001  2014        386
1  01003  2014        431
2  01005  2014        370
3  01007  2014        384
4  01009  2014        387
Wrote: data/processed/county_hhi_ma_share_2014_2019.csv
    fips  year ma_total_from_plans sumsq_enroll                  hhi ma_enrollment eligibles             ma_share
0  01001  2014              3027.0    1223071.0  0.13348326683022055        3000.0    9438.0   0.3178639542275906
1  01003  2014             13627.0   27941489.0  0.15046969271875732       13183.0   41640.0   0.3165946205571566
2  01005  2014               863.0     252081.0   0.3384687064042676         859.0    6004.0  0.14307128580946035
3  01007  2014              1661.0     484639.0  0.17566251443952183        1665.0    4599.0  0.36203522504892366
4  01009  2014              4653.0    3412299.0  0.15760898558544553        4680.0   11193.0   0.4181184668989547


In [11]:
gitignore_path = Path(".gitignore")
existing = gitignore_path.read_text().splitlines() if gitignore_path.exists() else []

rules = [
    "data/cache/",
]

new_lines = existing[:]
for r in rules:
    if r not in new_lines:
        new_lines.append(r)

gitignore_path.write_text("\n".join(new_lines) + "\n")

print("Updated .gitignore")
print("Keep these for HW2 analysis:")
print(" ", OUT_DIR / "county_plan_counts_2014_2019.csv")
print(" ", OUT_DIR / "county_hhi_ma_share_2014_2019.csv")
print("Do not push data/cache")

Updated .gitignore
Keep these for HW2 analysis:
  data/processed/county_plan_counts_2014_2019.csv
  data/processed/county_hhi_ma_share_2014_2019.csv
Do not push data/cache
