In [2]:
# Import paths
from pathlib import Path
import re
import numpy as np
import pandas as pd

In [3]:
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

In [4]:
# Set up paths
BASE = Path("../../..")

ENROLL_DIR      = BASE / "data" / "input" / "enrollment_2018"
SERVICE_AREA_DIR = BASE / "data" / "input" / "service_area_2018"

OUT_DIR = BASE / "data" / "output"
OUT_DIR.mkdir(parents=True, exist_ok=True)

YEAR   = 2018
MONTHS = [f"{m:02d}" for m in range(1, 13)]

In [14]:
# Read enrollment and contract 2018 data
def _clean_colnames(cols):
    """Standardize column names: lower-case, strip, replace spaces/punct with _."""
    out = []
    for c in cols:
        c2 = re.sub(r"[^0-9a-zA-Z]+", "_", str(c).strip()).strip("_").lower()
        out.append(c2)
    return out


def read_contract(path: Path) -> pd.DataFrame:
    """Read a single monthly Contract Info file (CPSC_Contract_Info_2018_MM.csv)."""
    df = pd.read_csv(path, dtype=str, encoding="latin1")
    df.columns = _clean_colnames(df.columns)

    # Harmonize common column-name variants
    ren = {
        "contract_id": "contractid",
        "contract":    "contractid",
        "plan_id":     "planid",
        "plan":        "planid",
        "organization_type": "org_type",
        "plan_type":   "plan_type",
        "offers_part_d": "partd",
        "snp_plan":    "snp",
        "organization_name": "org_name",
        "organization_marketing_name": "org_marketing_name",
        "parent_organization": "parent_org",
        "contract_effective_date": "contract_date",
    }
    df = df.rename(columns={k: v for k, v in ren.items() if k in df.columns})

    if "planid" in df.columns:
        df["planid"] = pd.to_numeric(df["planid"], errors="coerce")

    return df

In [19]:
def read_enroll(path: Path) -> pd.DataFrame:
    """Read a single monthly Enrollment Info file (CPSC_Enrollment_Info_2018_MM.csv)."""
    df = pd.read_csv(path, dtype=str, na_values=["*"], encoding="latin1")
    df.columns = _clean_colnames(df.columns)

    # Harmonize common column-name variants
    ren = {
        "contract_number": "contractid",
        "contract_id":     "contractid",
        "contract":        "contractid",
        "plan_id":         "planid",
        "plan":            "planid",
        "county_name":     "county",
        "state_abbr":      "state",
        "fips_state_county_code": "fips",
        "ssa_state_county_code":  "ssa",
    }
    df = df.rename(columns={k: v for k, v in ren.items() if k in df.columns})

    if "planid" in df.columns:
        df["planid"] = pd.to_numeric(df["planid"], errors="coerce")
    if "fips" in df.columns:
        df["fips"] = pd.to_numeric(df["fips"], errors="coerce")

    return df


def load_month(m: str, y: int) -> pd.DataFrame:
    """Load contract + enrollment for one month and merge them."""
    # File names: CPSC_Contract_Info_2018_01.csv, CPSC_Enrollment_Info_2018_01.csv
    c_path = ENROLL_DIR / f"CPSC_Contract_Info_{y}_{m}.csv"
    e_path = ENROLL_DIR / f"CPSC_Enrollment_Info_{y}_{m}.csv"

    contract = read_contract(c_path)

    # Keep one row per (contractid, planid) — mirrors R distinct(..., .keep_all=TRUE)
    if {"contractid", "planid"}.issubset(contract.columns):
        contract = contract.drop_duplicates(subset=["contractid", "planid"], keep="first")

    enroll = read_enroll(e_path)

    # Merge contract info into enrollment
    if {"contractid", "planid"}.issubset(enroll.columns) and {"contractid", "planid"}.issubset(contract.columns):
        df = enroll.merge(contract, on=["contractid", "planid"], how="left", suffixes=("", "_contract"))
    else:
        df = enroll.copy()

    df["month"] = int(m)
    df["year"]  = int(y)
    return df

In [20]:
# Read all 12 months and stack
plan_year = pd.concat([load_month(m, YEAR) for m in MONTHS], ignore_index=True)

# Stable sort before forward/back fills
sort_cols = [c for c in ["contractid", "planid", "state", "county", "month"] if c in plan_year.columns]
plan_year = plan_year.sort_values(sort_cols).reset_index(drop=True)

# Fill missing FIPS within (state, county) — down then up
if {"state", "county", "fips"}.issubset(plan_year.columns):
    plan_year["fips"] = (
        plan_year.groupby(["state", "county"], dropna=False)["fips"]
        .transform(lambda s: s.ffill().bfill())
    )

# Fill plan-level descriptors within (contractid, planid)
fill_cols_planid = [c for c in ["plan_type", "partd", "snp", "eghp", "plan_name"] if c in plan_year.columns]
if fill_cols_planid and {"contractid", "planid"}.issubset(plan_year.columns):
    plan_year[fill_cols_planid] = (
        plan_year.groupby(["contractid", "planid"], dropna=False)[fill_cols_planid]
        .transform(lambda df: df.ffill().bfill())
    )

# Fill org-level descriptors within contractid
fill_cols_contract = [c for c in ["org_name", "org_marketing_name", "org_type", "parent_org", "contract_date"] if c in plan_year.columns]
if fill_cols_contract and "contractid" in plan_year.columns:
    plan_year[fill_cols_contract] = (
        plan_year.groupby(["contractid"], dropna=False)[fill_cols_contract]
        .transform(lambda df: df.ffill().bfill())
    )

plan_year.head()

Unnamed: 0,contractid,planid,ssa,fips,state,county,enrollment,org_type,plan_type,partd,snp,eghp,org_name,org_marketing_name,plan_name,parent_org,contract_date,month,year
0,E0654,801,2013,2013.0,AK,Aleutians East,,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00,1,2018
1,E0654,801,2013,2013.0,AK,Aleutians East,,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00,2,2018
2,E0654,801,2013,2013.0,AK,Aleutians East,,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00,3,2018
3,E0654,801,2013,2013.0,AK,Aleutians East,,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00,4,2018
4,E0654,801,2013,2013.0,AK,Aleutians East,,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,IBT VOLUNTARY EMPLOYEE BENEFITS TRUST,TEAMStar Medicare Part D Prescription Drug Pro...,IBT Voluntary Employee Benefits Trust (Employe...,IBT Voluntary Employee Benefits Trust,01/01/2007 0:00:00,5,2018


In [21]:
# Collapse enrollment 
group_cols = [c for c in ["contractid", "planid", "fips", "year"] if c in plan_year.columns]
if not group_cols:
    raise ValueError("Could not find grouping columns. Check raw files for contractid / planid / fips / year.")

plan_year = plan_year.sort_values([*group_cols, "month"]).reset_index(drop=True)

# Columns to carry forward (take the last month's value after sorting)
carry_cols = [c for c in [
    "state", "county", "org_type", "plan_type", "partd", "snp", "eghp",
    "org_name", "org_marketing_name", "plan_name", "parent_org", "contract_date",
] if c in plan_year.columns]

# Also grab any enrollment-count columns that may be present
enroll_like = [c for c in plan_year.columns
               if any(k in c for k in ["enroll", "enrollment", "benefici", "member", "eligible"])]
carry_cols += [c for c in enroll_like if c not in carry_cols and c not in group_cols and c != "month"]

final_plans = (
    plan_year.groupby(group_cols, dropna=False, as_index=False)
    .agg({c: "last" for c in carry_cols})
)

final_plans.head()

# Save plan data
final_plans.to_csv(OUT_DIR / "plan_data.csv", index=False)
print("Wrote:", OUT_DIR / "plan_data.csv")

Wrote: ../../../data/output/plan_data.csv


In [22]:
# Read service area data
def read_service_area(path: Path) -> pd.DataFrame:
    """Read a single monthly service-area file (MA_Cnty_SA_2018_MM.csv)."""
    df = pd.read_csv(path, dtype=str, na_values=["*"], encoding="latin1")
    df.columns = _clean_colnames(df.columns)

 # Harmonize common column-name variants
    ren = {
        "contract_id":        "contractid",
        "organization_name":  "org_name",
        "organization_type":  "org_type",
    }
    df = df.rename(columns={k: v for k, v in ren.items() if k in df.columns})

    # Type fixes
    if "partial" in df.columns:
        df["partial"] = df["partial"].map(
            lambda x: str(x).strip().lower() in {"true", "t", "1", "yes", "y"}
            if pd.notna(x) else np.nan
        )
    for col in ["ssa", "fips"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    return df


def load_month_sa(m: str, y: int) -> pd.DataFrame:
    """Load one month of service-area data."""
    # File name: MA_Cnty_SA_2018_01.csv
    path = SERVICE_AREA_DIR / f"MA_Cnty_SA_{y}_{m}.csv"
    df   = read_service_area(path)
    df["month"] = int(m)
    df["year"]  = int(y)
    return df

In [23]:
# Read all 12 months and stack 
service_year = pd.concat([load_month_sa(m, YEAR) for m in MONTHS], ignore_index=True)

# Stable sort before fills
sort_cols = [c for c in ["contractid", "fips", "state", "county", "month"] if c in service_year.columns]
service_year = service_year.sort_values(sort_cols).reset_index(drop=True)

# Fill missing fips within (state, county)
if {"state", "county", "fips"}.issubset(service_year.columns):
    service_year["fips"] = (
        service_year.groupby(["state", "county"], dropna=False)["fips"]
        .transform(lambda s: s.ffill().bfill())
    )

# Fill labels within contractid
fill_cols_sa = [c for c in ["org_name", "org_type", "plan_type", "partial", "eghp", "ssa", "notes"]
                if c in service_year.columns]
if fill_cols_sa and "contractid" in service_year.columns:
    service_year[fill_cols_sa] = (
        service_year.groupby(["contractid"], dropna=False)[fill_cols_sa]
        .transform(lambda df: df.ffill().bfill())
    )

service_year.head()

Unnamed: 0,contractid,org_name,org_type,plan_type,partial,eghp,ssa,fips,county,state,notes,month,year
0,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",1,2018
1,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",2,2018
2,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",3,2018
3,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",4,2018
4,90091,UNITED MINE WORKERS OF AMERICA HLTH & RETIREMENT,HCPP - 1833 Cost,HCPP - 1833 Cost,,,,,,,"Covers the entire US, all States and Counties",5,2018


In [24]:
# Collapse service area data to one row
group_cols_sa = [c for c in ["contractid", "fips", "year"] if c in service_year.columns]
service_year  = service_year.sort_values([*group_cols_sa, "month"]).reset_index(drop=True)

carry_cols_sa = [c for c in ["state", "county", "org_name", "org_type", "plan_type",
                              "partial", "eghp", "ssa", "notes"]
                 if c in service_year.columns]

final_service_area = (
    service_year.groupby(group_cols_sa, dropna=False, as_index=False)
    .agg({c: "last" for c in carry_cols_sa})
)

final_service_area.head()

# Save service-area data
final_service_area.to_csv(OUT_DIR / "service_area.csv", index=False)
print("Wrote:", OUT_DIR / "service_area.csv")

Wrote: ../../../data/output/service_area.csv


In [25]:
# Inner merge 
merged = final_plans.merge(
    final_service_area,
    on=["contractid", "fips", "year"],
    how="inner",                        # <-- inner merge as required
    suffixes=("_plan", "_sa"),
)

print(f"final_plans rows:        {len(final_plans)}")
print(f"final_service_area rows: {len(final_service_area)}")
print(f"merged rows (inner):     {len(merged)}")

merged.head()

# Save the final merged dataset
merged.to_csv(OUT_DIR / "merged_plan_service_area.csv", index=False)
print("Wrote:", OUT_DIR / "merged_plan_service_area.csv")

final_plans rows:        2475109
final_service_area rows: 331593
merged rows (inner):     1366535
Wrote: ../../../data/output/merged_plan_service_area.csv
