In [1]:
from pathlib import Path
import re
import pandas as pd
import numpy as np

ENROLL_DIR = Path("../ma-data/ma/enrollment/Extracted Data")
YEAR = 2018

In [2]:
enroll_files = sorted(ENROLL_DIR.glob(f"CPSC_Enrollment_Info_{YEAR}_??.csv"))
contract_files = sorted(ENROLL_DIR.glob(f"CPSC_Contract_Info_{YEAR}_??.csv"))

print("ENROLL_DIR exists:", ENROLL_DIR.exists())
print("Enrollment files:", len(enroll_files))
print("Contract files:", len(contract_files))

if len(enroll_files) > 0:
    print("First enroll:", enroll_files[0].name)
    print("Last  enroll:", enroll_files[-1].name)
if len(contract_files) > 0:
    print("First contract:", contract_files[0].name)
    print("Last  contract:", contract_files[-1].name)

ENROLL_DIR exists: True
Enrollment files: 12
Contract files: 12
First enroll: CPSC_Enrollment_Info_2018_01.csv
Last  enroll: CPSC_Enrollment_Info_2018_12.csv
First contract: CPSC_Contract_Info_2018_01.csv
Last  contract: CPSC_Contract_Info_2018_12.csv


In [3]:
CONTRACT_COLS = [
    "contractid","planid","org_type","plan_type","partd","snp","eghp",
    "org_name","org_marketing_name","plan_name","parent_org","contract_date"
]
CONTRACT_DTYPES = {
    "contractid": "string",
    "planid": "float64",
    "org_type": "string",
    "plan_type": "string",
    "partd": "string",
    "snp": "string",
    "eghp": "string",
    "org_name": "string",
    "org_marketing_name": "string",
    "plan_name": "string",
    "parent_org": "string",
    "contract_date": "string",
}

ENROLL_COLS = ["contractid","planid","ssa","fips","state","county","enrollment"]
ENROLL_DTYPES = {
    "contractid": "string",
    "planid": "float64",
    "ssa": "float64",
    "fips": "float64",
    "state": "string",
    "county": "string",
    "enrollment": "float64",
}

def read_contract_month(path: Path) -> pd.DataFrame:
    df = pd.read_csv(
        path,
        skiprows=1,
        header=None,
        names=CONTRACT_COLS,
        dtype=CONTRACT_DTYPES,
        encoding="latin1",
        low_memory=False,
    )
    return df.drop_duplicates(subset=["contractid","planid"], keep="first")

In [4]:
CHUNK = 150_000  # if memory is tight, change to 50_000

monthly_small = []

for ef in enroll_files:
    m = re.search(rf"{YEAR}_(\d\d)\.csv$", ef.name).group(1)
    cf = ENROLL_DIR / f"CPSC_Contract_Info_{YEAR}_{m}.csv"

    print("month", m, "| loading contract")
    contract = read_contract_month(cf)

    print("month", m, "| streaming enrollment")
    reader = pd.read_csv(
        ef,
        skiprows=1,
        header=None,
        names=ENROLL_COLS,
        dtype=ENROLL_DTYPES,
        na_values=["*"],
        encoding="latin1",
        low_memory=False,
        chunksize=CHUNK,
    )

    per_chunk = []
    for chunk in reader:
        tmp = contract.merge(chunk, on=["contractid","planid"], how="left")
        tmp["year"] = YEAR
        tmp["month"] = int(m)

        per_chunk.append(
            tmp.groupby(["contractid","planid","fips","year","month"], dropna=False, as_index=False)
               .agg(
                   enroll_month=("enrollment","sum"),
                   state=("state","last"),
                   county=("county","last"),
                   plan_type=("plan_type","last"),
                   org_type=("org_type","last"),
                   partd=("partd","last"),
                   snp=("snp","last"),
                   eghp=("eghp","last"),
               )
        )

    month_panel = (
        pd.concat(per_chunk, ignore_index=True)
          .groupby(["contractid","planid","fips","year","month"], dropna=False, as_index=False)
          .agg(
              enroll_month=("enroll_month","sum"),
              state=("state","last"),
              county=("county","last"),
              plan_type=("plan_type","last"),
              org_type=("org_type","last"),
              partd=("partd","last"),
              snp=("snp","last"),
              eghp=("eghp","last"),
          )
    )

    print("month", m, "| month_panel:", month_panel.shape)
    monthly_small.append(month_panel)

enroll_2018_plan_county_month = pd.concat(monthly_small, ignore_index=True)
print("plan-county-month stacked:", enroll_2018_plan_county_month.shape)
enroll_2018_plan_county_month.head()

month 01 | loading contract
month 01 | streaming enrollment
month 01 | month_panel: (2313897, 13)
month 02 | loading contract
month 02 | streaming enrollment
month 02 | month_panel: (2309940, 13)
month 03 | loading contract
month 03 | streaming enrollment
month 03 | month_panel: (2310803, 13)
month 04 | loading contract
month 04 | streaming enrollment
month 04 | month_panel: (2299065, 13)
month 05 | loading contract
month 05 | streaming enrollment
month 05 | month_panel: (2297258, 13)
month 06 | loading contract
month 06 | streaming enrollment
month 06 | month_panel: (2293698, 13)
month 07 | loading contract
month 07 | streaming enrollment
month 07 | month_panel: (2298911, 13)
month 08 | loading contract
month 08 | streaming enrollment
month 08 | month_panel: (2298123, 13)
month 09 | loading contract
month 09 | streaming enrollment
month 09 | month_panel: (2302195, 13)
month 10 | loading contract
month 10 | streaming enrollment
month 10 | month_panel: (2300828, 13)
month 11 | loading c

Unnamed: 0,contractid,planid,fips,year,month,enroll_month,state,county,plan_type,org_type,partd,snp,eghp
0,90091,,,2018,1,0.0,,,HCPP - 1833 Cost,HCPP - 1833 Cost,No,No,No
1,E0654,801.0,1001.0,2018,1,0.0,AL,Autauga,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes
2,E0654,801.0,1003.0,2018,1,13.0,AL,Baldwin,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes
3,E0654,801.0,1005.0,2018,1,0.0,AL,Barbour,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes
4,E0654,801.0,1007.0,2018,1,0.0,AL,Bibb,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes


In [6]:
# Keep only columns needed
df = enroll_2018_plan_county_month[
    ["contractid","planid","fips","year","month","enroll_month",
     "state","county","plan_type","org_type","partd","snp","eghp"]
].copy()

df = df.dropna(subset=["contractid","planid","fips"])

df = df.sort_values(["contractid","planid","fips","year","month"], kind="mergesort")


g = df.groupby(["contractid","planid","fips","year"], dropna=False)

yearly = g.agg(
    n_months=("enroll_month","count"),
    sum_enrollment=("enroll_month","sum"),
    min_enrollment=("enroll_month","min"),
    max_enrollment=("enroll_month","max"),
    state=("state","last"),
    county=("county","last"),
    plan_type=("plan_type","last"),
    org_type=("org_type","last"),
    partd=("partd","last"),
    snp=("snp","last"),
    eghp=("eghp","last"),
).reset_index()

# first/last monthly enrollment (after sorting)
first_last = g["enroll_month"].agg(first_enrollment="first", last_enrollment="last").reset_index()

# Merge
enroll_2018_plan_county_year = yearly.merge(first_last, on=["contractid","planid","fips","year"], how="left")

# Average monthly enrollment
enroll_2018_plan_county_year["avg_enrollment"] = (
    enroll_2018_plan_county_year["sum_enrollment"] / enroll_2018_plan_county_year["n_months"]
)

print("enroll_2018_plan_county_year shape:", enroll_2018_plan_county_year.shape)
enroll_2018_plan_county_year.head()

enroll_2018_plan_county_year shape: (2473062, 18)


Unnamed: 0,contractid,planid,fips,year,n_months,sum_enrollment,min_enrollment,max_enrollment,state,county,plan_type,org_type,partd,snp,eghp,first_enrollment,last_enrollment,avg_enrollment
0,E0654,801.0,1001.0,2018,12,0.0,0.0,0.0,AL,Autauga,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,0.0,0.0,0.0
1,E0654,801.0,1003.0,2018,12,166.0,13.0,15.0,AL,Baldwin,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,13.0,13.0,13.833333
2,E0654,801.0,1005.0,2018,12,0.0,0.0,0.0,AL,Barbour,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,0.0,0.0,0.0
3,E0654,801.0,1007.0,2018,12,0.0,0.0,0.0,AL,Bibb,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,0.0,0.0,0.0
4,E0654,801.0,1009.0,2018,12,0.0,0.0,0.0,AL,Blount,Employer/Union Only Direct Contract PDP,Employer/Union Only Direct Contract PDP,Yes,No,Yes,0.0,0.0,0.0


In [7]:
enroll_2018_plan_county_year.to_csv("enroll_2018_plan_county_year.csv", index=False)
print("saved enroll_2018_plan_county_year.csv")

# free big memory objects from the monthly build
try:
    del enroll_2018_plan_county_month
    del monthly_small
except NameError:
    pass

saved enroll_2018_plan_county_year.csv


In [8]:
SAREA_DIR = Path("../ma-data/ma/service-area/Extracted Data")

sarea_files = sorted(SAREA_DIR.glob(f"MA_Cnty_SA_{YEAR}_??.csv"))
print("Service area files:", len(sarea_files))
print("First:", sarea_files[0].name)
print("Last :", sarea_files[-1].name)

Service area files: 12
First: MA_Cnty_SA_2018_01.csv
Last : MA_Cnty_SA_2018_12.csv


In [9]:
SAREA_CONTRACT_IDX = 0
SAREA_PLAN_IDX     = 6
SAREA_FIPS_IDX     = 7

def read_sarea_keys(path: Path) -> pd.DataFrame:
    raw = pd.read_csv(path, skiprows=1, header=None, dtype=str, encoding="latin1", low_memory=False)
    out = pd.DataFrame({
        "contractid": raw[SAREA_CONTRACT_IDX].astype(str).str.strip(),
        "planid": pd.to_numeric(raw[SAREA_PLAN_IDX], errors="coerce"),
        "fips": pd.to_numeric(raw[SAREA_FIPS_IDX], errors="coerce"),
        "year": YEAR
    })
    return out.dropna(subset=["contractid","planid","fips"])[["contractid","planid","fips","year"]].drop_duplicates()

sarea_parts = []
for f in sarea_files:
    sarea_parts.append(read_sarea_keys(f))

sarea_2018 = pd.concat(sarea_parts, ignore_index=True).drop_duplicates()
print("sarea_2018 shape:", sarea_2018.shape)
sarea_2018.head()

sarea_2018 shape: (332302, 4)


Unnamed: 0,contractid,planid,fips,year
0,H0022,36110.0,39023.0,2018
1,H0022,36170.0,39035.0,2018
2,H0022,36260.0,39051.0,2018
3,H0022,36280.0,39055.0,2018
4,H0022,36290.0,39057.0,2018


In [11]:
# enrollment: fips -> 5-digit string
enroll_2018_plan_county_year["fips5"] = (
    enroll_2018_plan_county_year["fips"]
    .astype("Int64")
    .astype(str)
    .str.zfill(5)
)

# service area: fips -> 5-digit string
sarea_2018["fips5"] = (
    sarea_2018["fips"]
    .astype("Int64")
    .astype(str)
    .str.zfill(5)
)

print(enroll_2018_plan_county_year[["fips","fips5"]].head())
print(sarea_2018[["fips","fips5"]].head())

     fips  fips5
0  1001.0  01001
1  1003.0  01003
2  1005.0  01005
3  1007.0  01007
4  1009.0  01009
      fips  fips5
0  39023.0  39023
1  39035.0  39035
2  39051.0  39051
3  39055.0  39055
4  39057.0  39057


In [12]:
ma2018_inner = enroll_2018_plan_county_year.merge(
    sarea_2018.drop(columns=["fips"]),
    on=["contractid","planid","fips5","year"],
    how="inner",
    validate="m:1"
)

print("ma2018_inner shape:", ma2018_inner.shape)
ma2018_inner.head()

ma2018_inner shape: (0, 19)


Unnamed: 0,contractid,planid,fips,year,n_months,sum_enrollment,min_enrollment,max_enrollment,state,county,plan_type,org_type,partd,snp,eghp,first_enrollment,last_enrollment,avg_enrollment,fips5


In [13]:
ssa_maps = []

for ef in enroll_files:  # you already have enroll_files from earlier
    m = re.search(rf"{YEAR}_(\d\d)\.csv$", ef.name).group(1)

    reader = pd.read_csv(
        ef,
        skiprows=1,
        header=None,
        names=ENROLL_COLS,
        dtype=ENROLL_DTYPES,
        na_values=["*"],
        encoding="latin1",
        low_memory=False,
        chunksize=200_000,
    )

    for chunk in reader:
        sub = chunk[["contractid","planid","fips","ssa"]].dropna(subset=["contractid","planid","fips","ssa"])
        ssa_maps.append(sub.drop_duplicates())

    print("done month", m)

ssa_map = pd.concat(ssa_maps, ignore_index=True).drop_duplicates(subset=["contractid","planid","fips"])
print("ssa_map shape:", ssa_map.shape)
ssa_map.head()

done month 01
done month 02
done month 03
done month 04
done month 05
done month 06
done month 07
done month 08
done month 09
done month 10
done month 11
done month 12
ssa_map shape: (2473062, 4)


Unnamed: 0,contractid,planid,fips,ssa
0,E0654,801.0,1001.0,1000.0
1,E0654,801.0,1003.0,1010.0
2,E0654,801.0,1005.0,1020.0
3,E0654,801.0,1007.0,1030.0
4,E0654,801.0,1009.0,1040.0


In [14]:
enroll_2018_plan_county_year = enroll_2018_plan_county_year.merge(
    ssa_map,
    on=["contractid","planid","fips"],
    how="left",
    validate="m:1"
)

print("missing SSA rows:", enroll_2018_plan_county_year["ssa"].isna().sum())
enroll_2018_plan_county_year[["contractid","planid","fips","ssa"]].head()

missing SSA rows: 0


Unnamed: 0,contractid,planid,fips,ssa
0,E0654,801.0,1001.0,1000.0
1,E0654,801.0,1003.0,1010.0
2,E0654,801.0,1005.0,1020.0
3,E0654,801.0,1007.0,1030.0
4,E0654,801.0,1009.0,1040.0


In [15]:
SAREA_SSA_IDX  = 6
SAREA_FIPS_IDX = 7
SAREA_CONTRACT_IDX = 0

def read_sarea_keys_correct(path: Path) -> pd.DataFrame:
    raw = pd.read_csv(path, skiprows=1, header=None, dtype=str, encoding="latin1", low_memory=False)
    out = pd.DataFrame({
        "contractid": raw[SAREA_CONTRACT_IDX].astype(str).str.strip(),
        "ssa": pd.to_numeric(raw[SAREA_SSA_IDX], errors="coerce"),
        "fips": pd.to_numeric(raw[SAREA_FIPS_IDX], errors="coerce"),
        "year": YEAR,
    })
    return out.dropna(subset=["contractid","ssa","fips"])[["contractid","ssa","fips","year"]].drop_duplicates()

sarea_parts = [read_sarea_keys_correct(f) for f in sarea_files]
sarea_2018_fix = pd.concat(sarea_parts, ignore_index=True).drop_duplicates()

print("sarea_2018_fix shape:", sarea_2018_fix.shape)
sarea_2018_fix.head()

sarea_2018_fix shape: (332302, 4)


Unnamed: 0,contractid,ssa,fips,year
0,H0022,36110.0,39023.0,2018
1,H0022,36170.0,39035.0,2018
2,H0022,36260.0,39051.0,2018
3,H0022,36280.0,39055.0,2018
4,H0022,36290.0,39057.0,2018


In [16]:
# make fips and ssa comparable (strings are safest)
enroll_2018_plan_county_year["fips5"] = (
    enroll_2018_plan_county_year["fips"].astype("Int64").astype(str).str.zfill(5)
)
enroll_2018_plan_county_year["ssa_str"] = (
    enroll_2018_plan_county_year["ssa"].astype("Int64").astype(str)
)

sarea_2018_fix["fips5"] = sarea_2018_fix["fips"].astype("Int64").astype(str).str.zfill(5)
sarea_2018_fix["ssa_str"] = sarea_2018_fix["ssa"].astype("Int64").astype(str)

ma2018_inner = enroll_2018_plan_county_year.merge(
    sarea_2018_fix[["contractid","ssa_str","fips5","year"]].drop_duplicates(),
    on=["contractid","ssa_str","fips5","year"],
    how="inner",
    validate="m:m"
)

print("ma2018_inner shape:", ma2018_inner.shape)
ma2018_inner.head()

ma2018_inner shape: (1366487, 21)


Unnamed: 0,contractid,planid,fips,year,n_months,sum_enrollment,min_enrollment,max_enrollment,state,county,...,org_type,partd,snp,eghp,first_enrollment,last_enrollment,avg_enrollment,fips5,ssa,ssa_str
0,H0022,1.0,39023.0,2018,12,7181.0,558.0,638.0,OH,Clark,...,Demo,Yes,No,No,558.0,622.0,598.416667,39023,36110.0,36110
1,H0022,1.0,39035.0,2018,12,43836.0,3549.0,3829.0,OH,Cuyahoga,...,Demo,Yes,No,No,3596.0,3657.0,3653.0,39035,36170.0,36170
2,H0022,1.0,39051.0,2018,12,1390.0,107.0,126.0,OH,Fulton,...,Demo,Yes,No,No,107.0,126.0,115.833333,39051,36260.0,36260
3,H0022,1.0,39055.0,2018,12,928.0,68.0,84.0,OH,Geauga,...,Demo,Yes,No,No,80.0,80.0,77.333333,39055,36280.0,36280
4,H0022,1.0,39057.0,2018,12,6853.0,539.0,618.0,OH,Greene,...,Demo,Yes,No,No,539.0,601.0,571.083333,39057,36290.0,36290


In [17]:
ma2018 = ma2018_inner.drop(columns=["ssa_str"])  # keep ssa numeric, drop helper
print("ma2018 shape:", ma2018.shape)
ma2018.head()

ma2018 shape: (1366487, 20)


Unnamed: 0,contractid,planid,fips,year,n_months,sum_enrollment,min_enrollment,max_enrollment,state,county,plan_type,org_type,partd,snp,eghp,first_enrollment,last_enrollment,avg_enrollment,fips5,ssa
0,H0022,1.0,39023.0,2018,12,7181.0,558.0,638.0,OH,Clark,Medicare-Medicaid Plan HMO/HMOPOS,Demo,Yes,No,No,558.0,622.0,598.416667,39023,36110.0
1,H0022,1.0,39035.0,2018,12,43836.0,3549.0,3829.0,OH,Cuyahoga,Medicare-Medicaid Plan HMO/HMOPOS,Demo,Yes,No,No,3596.0,3657.0,3653.0,39035,36170.0
2,H0022,1.0,39051.0,2018,12,1390.0,107.0,126.0,OH,Fulton,Medicare-Medicaid Plan HMO/HMOPOS,Demo,Yes,No,No,107.0,126.0,115.833333,39051,36260.0
3,H0022,1.0,39055.0,2018,12,928.0,68.0,84.0,OH,Geauga,Medicare-Medicaid Plan HMO/HMOPOS,Demo,Yes,No,No,80.0,80.0,77.333333,39055,36280.0
4,H0022,1.0,39057.0,2018,12,6853.0,539.0,618.0,OH,Greene,Medicare-Medicaid Plan HMO/HMOPOS,Demo,Yes,No,No,539.0,601.0,571.083333,39057,36290.0


In [18]:
ma2018.to_csv("ma2018_plan_county_year_inner.csv", index=False)
print("saved ma2018_plan_county_year_inner.csv")

saved ma2018_plan_county_year_inner.csv
