# CMS Data Ingestion via API

## Project Summary

This notebook demonstrates a custom Python ingestion pipeline for downloading and preparing CMS healthcare data from multiple years via their public API. I implemented batching, pagination, and schema tagging to enable multi-year analysis. All data used is publicly available.

## Ingestion

In [None]:
import os
import requests
import pandas as pd
import numpy as np
pd.set_option('display.float_format', '{:.2f}'.format)


def download_cms_dataset(dataset_url: str, file_path: str, partition_key: str, 
                           file_format: str, version_mapping: dict) -> str:
    """
    Downloads the sample CMS dataset for all years using pagination and a mapping of year to UUID.
    For each year, it retrieves data in batches (with a maximum of 10,000 rows per year),
    adds a column for the year (using partition_key), and finally concatenates all the data.
    The combined dataset is saved locally as CSV or Parquet.

    Parameters:
        dataset_url (str): The base CMS API endpoint (e.g., "https://data.cms.gov/data-api/v1/dataset").
        file_path (str): Local file path where the combined data will be saved.
        partition_key (str): The column name to add for partitioning.
        file_format (str): The output file format: "csv" or "parquet".
        version_mapping (dict): Mapping from year to dataset UUID for that year.
    
    Returns:
        str: The local file path of the saved file.
    """
    all_data = []
    page_size = 5000  # per CMS FAQ
    max_rows = 10000  # maximum rows to retrieve per year, I am only using 10K per year

    # Loop over each year using the provided mapping
    # TO-DO: Can be read through CMS catalog - https://data.cms.gov/data.json which will avoid hardcoding years and UUIDs
    for year, uuid in version_mapping.items():
        print(f"Fetching data for year {year} (UUID: {uuid})")
        base_link = f"{dataset_url}/{uuid}/data"
        offset = 0
        year_data = []

        # fetch batches until no more data or we reach max_rows for that year
        while offset < max_rows:
            paged_url = f"{base_link}?size={page_size}&offset={offset}"
            response = requests.get(paged_url)
            try:
                data = response.json()
            except Exception as e:
                print(f"Error from {paged_url}: {e}")
                break

            if not data:
                break

            year_data.extend(data)
            if len(data) < page_size:  # No more data available for this year
                break
            offset += page_size

        if year_data:
            df_year = pd.DataFrame(year_data)
            # Tag each row with the current year
            df_year[partition_key] = int(year)
            # Limit to max_rows in case API returns more than expected
            if df_year.shape[0] > max_rows:
                df_year = df_year.iloc[:max_rows, :]
            all_data.append(df_year)
            print(f"Got {df_year.shape[0]} rows for {year}")
        else:
            print(f"No data found for year {year}.")

    if not all_data:
        raise ValueError("No data was downloaded for any year.")

    final_df = pd.concat(all_data, ignore_index=True)
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    if file_format.lower() == "csv":
        final_df.to_csv(file_path, index=False)
    elif file_format.lower() == "parquet":
        final_df.to_parquet(file_path, index=False)
    else:
        raise ValueError("Unsupported file format. Use 'csv' or 'parquet'.")

    print(f"Final dataset saved to: {file_path} | Total rows: {final_df.shape[0]}")
    return file_path

In [None]:
part_d_version_mapping = {
    "2022": "bed99012-c527-4d9d-92ea-67ec2510abea",
    "2021": "3f7ab9ce-6fb6-4e6b-9af3-b681f2d3a95e",
    "2020": "ed6548c1-c905-4fdf-84cc-1c897ca6210d",
    "2019": "007f61da-2c20-4c80-90a5-88be67c8d022",
    "2018": "a422bf49-e5b3-429a-862f-723db5e15704",
    "2017": "dc524dbb-6115-48c9-a4dc-aa4fb0d21b69",
    "2016": "9c3eb417-a310-42d2-8065-9e2129e6a680",
    "2015": "2aa87eb6-6555-42fa-a51d-4a332ab7d5e8",
    "2014": "8c3806d2-0692-4564-ab38-7bc19206521c",
    "2013": "93645ea4-1a3f-4444-95c5-c109e6a4b267"
}
part_b_version_mapping = {
    "2022": "e650987d-01b7-4f09-b75e-b0b075afbf98",
    "2021": "31dc2c47-f297-4948-bfb4-075e1bec3a02",
    "2020": "c957b49e-1323-49e7-8678-c09da387551d",
    "2019": "867b8ac7-ccb7-4cc9-873d-b24340d89e32",
    "2018": "fb6d9fe8-38c1-4d24-83d4-0b7b291000b2",
    "2017": "85bf3c9c-2244-490d-ad7d-c34e4c28f8ea",
    "2016": "7918e22a-fbfb-4a07-9f59-f8aab2b757d4",
    "2015": "f8cdb11a-d5f7-4fbe-aac4-05abc8ee2c83",
    "2014": "f63b48ae-946e-48f7-9f56-327a68da4e0b",
    "2013": "ad5e7548-98ab-4325-af4b-b2a7099b9351"
}

download_cms_dataset(
    dataset_url="https://data.cms.gov/data-api/v1/dataset",
    file_path="data/part_d_all_years.parquet",
    partition_key="year",
    file_format="parquet",
    version_mapping=part_d_version_mapping
)

download_cms_dataset(
    dataset_url="https://data.cms.gov/data-api/v1/dataset",
    file_path="data/part_b_all_years.parquet",
    partition_key="year",
    file_format="parquet",
    version_mapping=part_b_version_mapping
)

Fetching data for year 2022 (UUID: bed99012-c527-4d9d-92ea-67ec2510abea)
Got 10000 rows for 2022
Fetching data for year 2021 (UUID: 3f7ab9ce-6fb6-4e6b-9af3-b681f2d3a95e)
Got 10000 rows for 2021
Fetching data for year 2020 (UUID: ed6548c1-c905-4fdf-84cc-1c897ca6210d)
Got 10000 rows for 2020
Fetching data for year 2019 (UUID: 007f61da-2c20-4c80-90a5-88be67c8d022)
Got 10000 rows for 2019
Fetching data for year 2018 (UUID: a422bf49-e5b3-429a-862f-723db5e15704)
Got 10000 rows for 2018
Fetching data for year 2017 (UUID: dc524dbb-6115-48c9-a4dc-aa4fb0d21b69)
Got 10000 rows for 2017
Fetching data for year 2016 (UUID: 9c3eb417-a310-42d2-8065-9e2129e6a680)
Got 10000 rows for 2016
Fetching data for year 2015 (UUID: 2aa87eb6-6555-42fa-a51d-4a332ab7d5e8)
Got 10000 rows for 2015
Fetching data for year 2014 (UUID: 8c3806d2-0692-4564-ab38-7bc19206521c)
Got 10000 rows for 2014
Fetching data for year 2013 (UUID: 93645ea4-1a3f-4444-95c5-c109e6a4b267)
Got 10000 rows for 2013
Final dataset saved to: data/p

'data/part_b_all_years.parquet'

Each year’s dataset is published as a separate API endpoint, with a unique UUID that cannot be dynamically filtered using query parameters. For example:
	•	2022 Part B: https://data.cms.gov/data-api/v1/dataset/e650987d-01b7-4f09-b75e-b0b075afbf98/data
	•	2021 Part B: https://data.cms.gov/data-api/v1/dataset/31dc2c47-f297-4948-bfb4-075e1bec3a02/data

So I had to create a mapping of the years and UUID.
However, a better solution is to use the https://data.cms.gov/data.json, and obtain the UUIDs for each year and dataset (part D, part B).

To-Dos:
	1. Schema validation
	2. Retry Logic
	3. Save Batches to disk for memory
	4. Logging
	5. Use the CMS Catalog
	6. Multi-threading

## Analysis

### Part B

In [None]:
# Read the Parquet files
# Load only the columns we care about and the latest year
partb_cols = ["Rndrng_NPI", "Tot_Srvcs", "Tot_Benes", "Rndrng_Prvdr_Zip5", "HCPCS_Cd", "Place_Of_Srvc", "year"]
df_partb = pd.read_parquet("data/part_b_all_years.parquet", columns=partb_cols) # The last part of the question asks to compare year over year so I need to load all years.
# df_partb_latest = pd.read_parquet("data/part_b_all_years.parquet", filters=[("year", "==", 2022)])


pd.set_option( 'display.max_columns', None)
df_partb.head()



Unnamed: 0,Rndrng_NPI,Tot_Srvcs,Tot_Benes,Rndrng_Prvdr_Zip5,HCPCS_Cd,Place_Of_Srvc,year
0,1003000126,44,42,20817,99217,F,2022
1,1003000126,17,17,20817,99219,F,2022
2,1003000126,35,35,20817,99220,F,2022
3,1003000126,16,16,20817,99221,F,2022
4,1003000126,12,12,20817,99222,F,2022


In [None]:
df_partb.tail()

Unnamed: 0,Rndrng_NPI,Tot_Srvcs,Tot_Benes,Rndrng_Prvdr_Zip5,HCPCS_Cd,Place_Of_Srvc,year
99995,1003075755,23,23,8360,99204,O,2013
99996,1003075755,15,14,8360,99212,O,2013
99997,1003075755,22,21,8360,99213,O,2013
99998,1003075755,23,21,8360,99214,O,2013
99999,1003075755,28,22,8360,99232,F,2013


In [None]:
df_partb.shape

(100000, 7)

In [None]:
df_partb.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 7 columns):
 #   Column             Non-Null Count   Dtype 
---  ------             --------------   ----- 
 0   Rndrng_NPI         100000 non-null  object
 1   Tot_Srvcs          100000 non-null  object
 2   Tot_Benes          100000 non-null  object
 3   Rndrng_Prvdr_Zip5  100000 non-null  object
 4   HCPCS_Cd           100000 non-null  object
 5   Place_Of_Srvc      100000 non-null  object
 6   year               100000 non-null  int64 
dtypes: int64(1), object(6)
memory usage: 5.3+ MB


In [None]:
# -- For Part B (Providers) --
df_partb.columns = df_partb.columns.str.lower()

# Cast to correct data types
df_partb = df_partb.astype({
    "rndrng_npi": "string",          # NPI is a unique identifier but not for calculations
    "rndrng_prvdr_zip5": "string",   # Zip codes might have leading 0s
    "tot_srvcs": "float",            # Total services – numeric for aggregation
    "tot_benes": "int",              # Total beneficiaries – numeric
    "year": "int"                    # Year is an integer
})

# For geographic analysis, we need zip codes. Ensure the ZIP column is a string and 5 digits.
df_partb['rndrng_prvdr_zip5'] = df_partb['rndrng_prvdr_zip5'].astype(str).str.zfill(5)


In [None]:
# Filter for latest year (2022)
df_partb_latest = df_partb[df_partb["year"] == 2022]

In [None]:
def get_fill_rates(df):
    """
    Calculate the fill rates for each column in the DataFrame."
    """
    fill_rates = df.notnull().mean().sort_values(ascending=True) * 100
    return fill_rates.to_frame(name='Fill Rate (%)')

fill_rates_df = get_fill_rates(df_partb_latest)
fill_rates_df

Unnamed: 0,Fill Rate (%)
rndrng_npi,100.0
tot_srvcs,100.0
tot_benes,100.0
rndrng_prvdr_zip5,100.0
hcpcs_cd,100.0
place_of_srvc,100.0
year,100.0


In [None]:
df_partb_latest.describe()[["tot_benes", "tot_srvcs"]]


Unnamed: 0,tot_benes,tot_srvcs
count,10000.0,10000.0
mean,69.78,209.57
std,159.44,1884.87
min,11.0,11.0
25%,17.0,19.0
50%,30.0,38.0
75%,69.0,98.0
max,5064.0,122400.0


1. Min 11 aligns with CMS suppression of <11
2. The mean is much higher than the median, indicating a right-skewed distribution (some providers see a lot of patients)
3. TO - DO Outlier: One provider served 5,000+ beneficiaries (quick check)
4. Similar distribution with total services per NPI, hcpcs, POS.

In [None]:
df_partb_latest.groupby("hcpcs_cd")["tot_srvcs"].sum().sort_values(ascending=False).head(10)

# Tot_Srvcs
# Number of services provided; note that the metrics used to
# count the number provided can vary from service to
# service.
# A0425 - High ambulance mileage billed
# J0585 - Injection, Onabotulinumtoxina, 1 unit

hcpcs_cd
J0585   167555.00
Q0138   122400.00
99214    87991.00
A0425    69439.70
J3111    68670.00
99213    58712.00
97110    50701.00
J0897    49681.00
Q9967    44874.00
99232    41139.00
Name: tot_srvcs, dtype: float64

In [None]:
df_partb_latest.groupby("place_of_srvc")["tot_srvcs"].sum()

place_of_srvc
F    468457.70
O   1627239.50
Name: tot_srvcs, dtype: float64

In [None]:
# Assertions: Check that Rndrng_NPI is exactly 10 characters for every record.
npi_lengths = df_partb["rndrng_npi"].str.len()
assert (npi_lengths == 10).all(), f"NPIs not all length 10. Length counts: {npi_lengths.value_counts()}"

# Assertions: Check that Rndrng_Prvdr_Zip5 is exactly 5 characters for every record.
zip_lengths = df_partb["rndrng_prvdr_zip5"].str.len()
assert (zip_lengths == 5).all(), f"ZIP codes not all length 5. Length counts: {zip_lengths.value_counts()}"

# Assertions: Check that place_of_srvc is either F or O for every record.
valid_place_of_srvc = {"F", "O"}
place_of_srvc_values = set(df_partb["place_of_srvc"].unique())
assert place_of_srvc_values.issubset(valid_place_of_srvc), \
    f"Invalid Place of Service codes found: {place_of_srvc_values - valid_place_of_srvc}"

# Assert that each row is uniquely identified by place_of_srvc, rndrng_npi, hcpcs_cd and year
assert not df_partb.duplicated(subset=["place_of_srvc", "rndrng_npi", "hcpcs_cd", "year"]).any(), \
    "Duplicate rows found for the key combination ['place_of_srvc', 'rndrng_npi', 'hcpcs_cd']"

print("Checked for some assertions and they look good!")

Checked for some assertions and they look good!


### Part D

In [None]:
# Read the Parquet files
# Load only the columns we care about and the latest year
partd_cols = ["Prscrbr_NPI", "Tot_Clms", "Tot_Benes", "Tot_Drug_Cst", "year"]
df_partd = pd.read_parquet("data/part_d_all_years.parquet", columns=partd_cols) # The last part of the question asks to compare year over year so I need to load all years.
# df_partd_latest = pd.read_parquet("data/part_d_all_years.parquet", filters=[("year", "==", 2022)])


# We only need to load the latest year so we can filter the partitioned data early on.
pd.set_option( 'display.max_columns', None)
df_partd.head()



Unnamed: 0,Prscrbr_NPI,Tot_Clms,Tot_Benes,Tot_Drug_Cst,year
0,1003000126,413,155,36330.41,2022
1,1003000142,1520,502,64055.5,2022
2,1003000167,104,50,737.04,2022
3,1003000423,188,64,19684.11,2022
4,1003000480,27,14,391.36,2022


In [None]:
# -- For Part D (Providers) --

# Convert column names to lowercase first
df_partd.columns = df_partd.columns.str.lower()

# Define the columns that should be numeric
cols_to_convert = ["tot_drug_cst", "tot_benes", "tot_clms"]

# Replace "NA" strings with np.nan
df_partd[cols_to_convert] = df_partd[cols_to_convert].replace("NA", np.nan)

# Convert the columns to numeric (invalid parsing will be coerced to NaN)
df_partd[cols_to_convert] = df_partd[cols_to_convert].apply(pd.to_numeric, errors="coerce")


df_partd["prscrbr_npi"] = df_partd["prscrbr_npi"].astype("string")

df_partd.dtypes
# For geographic analysis, we need zip codes. Ensure the ZIP column is a string and 5 digits.
# df_partd['rndrng_prvdr_zip5'] = df_partd['rndrng_prvdr_zip5'].astype(str).str.zfill(5)


prscrbr_npi     string[python]
tot_clms                 int64
tot_benes              float64
tot_drug_cst           float64
year                     int64
dtype: object

In [None]:
# Filter for latest year (2022)
df_partd_latest = df_partd[df_partd["year"] == 2022]

In [None]:
pd.set_option( 'display.max_rows', None)
fill_rates_df = get_fill_rates(df_partd_latest)
fill_rates_df.head()

Unnamed: 0,Fill Rate (%)
tot_benes,90.47
prscrbr_npi,100.0
tot_clms,100.0
tot_drug_cst,100.0
year,100.0


One thing to note here is that total beneficiaries are null ~10%. The dictionary states that counts fewer than 11 are suppressed so this is expected.
However, based on the problem we are solving, we could utilize tot_benes from previous years to impute these missing values.

In [None]:
df_partd_latest.describe()[["tot_benes", "tot_clms", "tot_drug_cst"]]

Unnamed: 0,tot_benes,tot_clms,tot_drug_cst
count,9047.0,10000.0,10000.0
mean,164.77,1047.87,162776.66
std,604.97,2448.64,491784.47
min,11.0,11.0,24.53
25%,33.0,54.0,1864.85
50%,82.0,194.0,11721.27
75%,205.0,855.0,116574.67
max,53538.0,76247.0,15242189.12


In [None]:
# Check uniqueness: each provider (prscrbr_npi) should appear only once in 2022.
duplicates = df_partd_latest.duplicated(subset=["prscrbr_npi", "year"])
duplicate_count = duplicates.sum()
print("Duplicate count (prscrbr_npi, year):", duplicate_count)

Duplicate count (prscrbr_npi, year): 0


### 1. Approach
1. Only relevant columns were ingested from the raw Parquet files assuming the actual data in production would be large considering all years will be ingested. This elimates the need to read other columns.
2. Filtered for year 2022 early on to manage memory, but the final question asked for YOY, hence had to load all years.
3. TO-DO - Instead of heardcoding the years, read them from the CMS catalog along with the dataset UUID.
### Cleaning and standardization
1. All columns lowercased
2. Data types were read as objects so I changed them to relevant data type for downward processing.
3. Zip Codes are at times stripped off of leading 0s, so padded the 0s.
4. Ensuring the length of NPIs, HCPCS Codes, POS are as expected.
5. Checking the Unique Keys (NPI, HCPCS_CD & POS for Part B and NPI for Part D)
6. Found missing Beneficiaries - but data dictionary confirms that is expected since <11 counts were suppressed. These can be imputed based on other years values.
7. TO-DO - I saw other columns had weird values ("*", "NA") - but we're not using these columns so did not read these columns or did a QA on them. But this is something I'd like to do is check all important/ relevant columns to find any other weird values and nullify them. One way is to order the columns (asc and desc) and check for values that might be a different data type.
    a. Do all states use valid 2-letter abbreviations?
    b. Entity Codes expected?
8. TO - DO - Histograms/ KDE plots/ Boxplots can showcase the distribution of data; helps to figure out skewness/ outliers.



### 2. Top 5 Part B Providers per zipcode by total services rendered

In [None]:
def get_top_n_providers_by_zip(df, top_n: int, zip_col='rndrng_prvdr_zip5', 
                                provider_npi='rndrng_npi', 
                                services_col='tot_srvcs'):
    """
    Returns the top N providers per ZIP code based on total services rendered.

    Parameters:
        df (pd.DataFrame): The input dataframe.
        zip_col (str): Column name for ZIP code.
        provider_npi (str): Column name for Provider NPI.
        services_col (str): Column name for total services rendered.
        top_n (int): Number of top providers to return per ZIP.

    Returns:
        pd.DataFrame: Ranked providers per ZIP code.
    """

    # Aggregate: total services per provider per zip
    agg_df = (
        df.groupby([zip_col, provider_npi], as_index=False)[services_col]
        .sum()
    )

    # Rank within each ZIP
    agg_df['rank'] = (
        agg_df.groupby(zip_col)[services_col]
        .rank(method='dense', ascending=False)
    )

    # Filter top N
    top_df = agg_df[agg_df['rank'] <= top_n].sort_values([zip_col, 'rank'])
    top_df = top_df.drop(columns=['rank'])

    return top_df

top_n_providers = get_top_n_providers_by_zip(df_partb_latest, top_n=5)
top_n_providers.head()

Unnamed: 0,rndrng_prvdr_zip5,rndrng_npi,tot_srvcs
0,725,1003008517,765.0
2,907,1003049289,660.0
1,907,1003016775,178.0
3,966,1003023813,72.0
4,1007,1003062647,845.0


### 3. Calculate the min, max, and median total drug cost per beneficiary for each provider, and rank by the median
#### Assumption - The Part D medicare data is on a provider level, so that means that the min, max and median are all going to be the same value for a given year (2022). Given the question I am assuming that we are calculating this accross all years (not just 2022), which means that the provider would appear multiple times (once per year), and we are calculating the mix, max and median accross years for that provider.

In [None]:
# Assume df_partd_latest contains data for multiple years (not just 2022)
df_all = df_partd_latest.copy()

# Compute cost per beneficiary for each row (handling division by zero)
df_all["drug_cost_per_bene"] = df_all["tot_drug_cst"] / df_all["tot_benes"].replace(0, np.nan)

# Group by provider (prscrbr_npi) across all years and calculate the min, max, and median cost per beneficiary.
# assign rank to the median cost and sort by the rank
agg_df = (
    df_all.groupby("prscrbr_npi", as_index=False)
    .agg(
        min_cost=("drug_cost_per_bene", "min"),
        max_cost=("drug_cost_per_bene", "max"),
        median_cost=("drug_cost_per_bene", "median")
    )
    .assign(rank=lambda x: x["median_cost"].rank(method="dense", ascending=False))
    .sort_values("rank")
    .reset_index(drop=True)
)

print("Top providers by median drug cost per beneficiary (across all years):")
agg_df.head(10)

Top providers by median drug cost per beneficiary (across all years):


Unnamed: 0,prscrbr_npi,min_cost,max_cost,median_cost,rank
0,1003206392,69292.24,69292.24,69292.24,1.0
1,1003101429,47931.41,47931.41,47931.41,2.0
2,1003103383,47016.7,47016.7,47016.7,3.0
3,1003222647,43018.18,43018.18,43018.18,4.0
4,1003824210,42371.35,42371.35,42371.35,5.0
5,1003172529,37003.85,37003.85,37003.85,6.0
6,1003823535,35495.45,35495.45,35495.45,7.0
7,1003830399,35227.95,35227.95,35227.95,8.0
8,1003027764,32511.32,32511.32,32511.32,9.0
9,1003171778,31975.22,31975.22,31975.22,10.0


### 4. Joining the two data sets

#### Assumptions -
1. For this problem, I am assuming that we are interested in providers that have serviced both Part B and Part D beneficiaries, which is why I will be using an INNER JOIN for joining the two data sets. It is possible that we might not see all providers present in part B in Part D and vice-versa because it could be possible that the beneficiaries only have part A or part D coverages.
2. I would have used an OUTER JOIN if we were interested in using fields from both the datasets in a metric for evey single provider. ALso, I would have addresses any missing values. For example, I might assume that if a provider is missing from Part D, then part_d_beneficiaries and part_d_services are 0.
3. Also, Part B is at the NPI, HCPCS CD and POS level, while Part D is at the NPI level, which means NPI in Part B will repeat based on hcpcs and place of service. Hence, I will aggregate the Part B dataset before joining it with Part D.

In [None]:
# Use only necessary columns for the final output
df_partb_filtered = df_partb_latest[["rndrng_npi", "tot_srvcs", "tot_benes"]].copy()
df_partd_filtered = df_partd_latest[["prscrbr_npi", "tot_clms", "tot_benes"]].copy()

agg_partb = df_partb_filtered.groupby(["rndrng_npi"], as_index=False).agg(
    part_b_services=("tot_srvcs", "sum"),
    part_b_beneficiaries=("tot_benes", "sum")
)
# Rename 'rndrng_npi' to 'npi' for consistency.
agg_partb = agg_partb.rename(columns={"rndrng_npi": "npi"})
agg_partd = df_partd_filtered.rename(columns={"prscrbr_npi": "npi",
                                   "tot_clms": "part_d_services",
                                    "tot_benes": "part_d_beneficiaries"})


# Use a full outer join to include all providers nationally.
# This ensures that providers appearing in only one dataset are also included.
combined_provider = pd.merge(agg_partb, agg_partd, on=["npi"], how="outer")

# For providers missing in one dataset, fill missing numeric values with 0.
fill_cols = ["part_b_services", "part_b_beneficiaries", "part_d_services", "part_d_beneficiaries"]
combined_provider[fill_cols] = combined_provider[fill_cols].fillna(0)


# Total services rendered per provider per year across both parts.
combined_provider["total_services"] = combined_provider["part_b_services"] + combined_provider["part_d_services"]

# Calculate the percentage of services coming from Part B and Part D.
# avoid division by zero.
combined_provider["part_b_services_percent"] = np.where(
    combined_provider["total_services"] > 0,
    (combined_provider["part_b_services"] / combined_provider["total_services"]) * 100,
    0
)
combined_provider["part_d_services_percent"] = np.where(
    combined_provider["total_services"] > 0,
    (combined_provider["part_d_services"] / combined_provider["total_services"]) * 100,
    0
)

# Calculate percentiles for total services
combined_provider["percentile_total_services"] = (
    combined_provider["total_services"].rank(method="dense", pct=True) * 100
)

combined_provider.head(10)

Unnamed: 0,npi,part_b_services,part_b_beneficiaries,part_d_services,part_d_beneficiaries,total_services,part_b_services_percent,part_d_services_percent,percentile_total_services
0,1003000126,837.0,624.0,413.0,155.0,1250.0,66.96,33.04,37.06
1,1003000134,6760.0,3984.0,0.0,0.0,6760.0,100.0,0.0,87.47
2,1003000142,1531.0,985.0,1520.0,502.0,3051.0,50.18,49.82,64.3
3,1003000167,0.0,0.0,104.0,50.0,104.0,0.0,100.0,3.19
4,1003000423,76.0,76.0,188.0,64.0,264.0,28.79,71.21,8.63
5,1003000480,104.0,101.0,27.0,14.0,131.0,79.39,20.61,4.11
6,1003000530,1183.0,850.0,6655.0,465.0,7838.0,15.09,84.91,90.52
7,1003000597,2736.0,2172.0,1899.0,634.0,4635.0,59.03,40.97,77.92
8,1003000639,79.0,79.0,58.0,0.0,137.0,57.66,42.34,4.31
9,1003000704,105.0,100.0,0.0,0.0,105.0,100.0,0.0,3.23


##### Percentage change in services from previous year (percent_change_total_services_prev)
Had to do this separately, because all other metrics were based on the latest year 2022.

In [None]:
# Aggregate Part B
agg_partb = (
    df_partb
    .groupby(["rndrng_npi", "year"], as_index=False)["tot_srvcs"]
    .sum()
    .rename(columns={"rndrng_npi": "npi", "tot_srvcs": "part_b_services"})
)

# Aggregate Part D
agg_partd = (
    df_partd
    .groupby(["prscrbr_npi", "year"], as_index=False)["tot_clms"]
    .sum()
    .rename(columns={"prscrbr_npi": "npi", "tot_clms": "part_d_services"})
)

# Merge & calculate
combined_year = (
    pd.merge(agg_partb, agg_partd, on=["npi", "year"], how="outer")
    .fillna({"part_b_services": 0, "part_d_services": 0})
)

combined_year["total_services"] = (
    combined_year["part_b_services"] + combined_year["part_d_services"]
)

combined_year = combined_year.sort_values(["npi", "year"])
combined_year["percent_change_total_services_prev"] = (
    combined_year.groupby("npi")["total_services"].pct_change() * 100
)

In [None]:
combined_year.head(10)

Unnamed: 0,npi,year,part_b_services,part_d_services,total_services,percent_change_total_services_prev
0,1003000126,2013,1607.0,363.0,1970.0,
1,1003000126,2014,2728.0,675.0,3403.0,72.74
2,1003000126,2015,2751.0,825.0,3576.0,5.08
3,1003000126,2016,1450.0,545.0,1995.0,-44.21
4,1003000126,2017,1637.0,677.0,2314.0,15.99
5,1003000126,2018,1192.0,492.0,1684.0,-27.23
6,1003000126,2019,1367.0,589.0,1956.0,16.15
7,1003000126,2020,738.0,324.0,1062.0,-45.71
8,1003000126,2021,3678.0,1123.0,4801.0,352.07
9,1003000126,2022,837.0,413.0,1250.0,-73.96
