In [None]:
import time

# Timer: wall-clock reference point, compared against `time.time()` in the
# final timing cell to report total notebook runtime.
start = time.time()

## Initial setup
Create a new Spark SQL session or get the session if it already exists.

In [2]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Reuse the session already running in this kernel if there is one,
# otherwise start a new one named "CMS-Refactor".
spark = (
    SparkSession.builder
    .appName("CMS-Refactor")
    .getOrCreate()
)

# Only surface ERROR-level (and above) Spark log messages from here on.
spark.sparkContext.setLogLevel("ERROR")

spark

24/12/12 13:47:00 WARN Utils: Your hostname, Akilan-Yoga resolves to a loopback address: 127.0.1.1; using 172.17.55.5 instead (on interface eth0)
24/12/12 13:47:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 13:47:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/12 13:47:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Paths for required files
All the public files (mentioned in **bold**) which are downloaded from the public sources are under `Data/Public`. 
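All the private filtering lists (mentioned in *italic*), which are used to filter the large datasets, are under `Data/Private`; the notebook reads them as individual sheets of `Data/Private/filtering_info.xlsx` (see `private_sheets` below). 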
#### Required files
- **CMS B** - `Medicare_Physician_Other_Practitioners_by_Provider_and_Service_2022.csv`
- **CMS D** - `MUP_DPR_RY24_P04_V10_DY22_NPIBN.csv`
- **Open Payments** - `OP_DTL_GNRL_PGYR2023_P06282024_06122024.csv`
- **NPPES** - `npidata_pfile_20050523-20241110.csv`
- **NUCC Taxonomy** - `nucc_taxonomy_241.csv`
- *Unique HCPCS* - `cms_b_unique_hcpcs.csv`
- *CMS D Generic Names* - `cms_d_gnrc_name.csv`
- *Openpay Drug Mappings* - `openpay_drug_mappings.csv`
- *Speciality Mappings* - `speciality_mappings.csv`

In [3]:
# Directory holding the downloaded public datasets, and the private workbook
# whose sheets hold the filtering lists.
public_dir = "Data/Public"
private_workbook = "Data/Private/filtering_info.xlsx"

# Dataset key -> CSV file name inside `public_dir`.
public_files = dict(
    cms_b="Medicare_Physician_Other_Practitioners_by_Provider_and_Service_2022.csv",
    cms_d="MUP_DPR_RY24_P04_V10_DY22_NPIBN.csv",
    openpay="OP_DTL_GNRL_PGYR2023_P06282024_06122024.csv",
    nppes="npidata_pfile_20050523-20241110.csv",
    nucc="nucc_taxonomy_241.csv",
)

# Dataset key -> zero-based sheet position passed to `pd.read_excel(sheet_name=...)`.
private_sheets = {
    key: position
    for position, key in enumerate(("cms_b", "cms_d", "openpay", "taxonomy"))
}

## CMS-B Data
The CMS-B dataset contains information about Medicare practitioners, by provider and service. 
This data has to be processed to get the information about the total beneficiaries and total claims relating to the drugs of interest.

### Filtering the data
The CMS-B dataset contains information about numerous Medicare procedures, supplies, and other services. 
The dataset can be filtered based on two criteria.
- Interested drug or service (Based on **HCPCS Code**)
- Interested area (Based on **NPI**)

In [4]:
def shape(dataframe: pyspark.sql.DataFrame):
    """
    Compute the (row_count, column_count) pair of a Spark DataFrame,
    mirroring pandas' ``DataFrame.shape``.

    The row count comes from ``count()``, a Spark action, so every call
    launches a job over the whole DataFrame.
    """
    n_rows = dataframe.count()
    n_cols = len(dataframe.columns)
    return n_rows, n_cols

# Load the CMS-B (provider & service) dataset; Spark infers the column types.
cms_b_path = f"{public_dir}/{public_files['cms_b']}"
cms_b = spark.read.csv(cms_b_path, header=True, inferSchema=True)

# HCPCS codes of interest come from the `cms_b` sheet of the private workbook.
# The inner join on `HCPCS_Cd` keeps only matching rows and appends the sheet's
# other columns (presumably including `BRAND`, which is selected below).
hcpcs_sheet = pd.read_excel(private_workbook, sheet_name=private_sheets['cms_b'])
interested_drugs = spark.createDataFrame(hcpcs_sheet)
cms_b_filtered = cms_b.join(interested_drugs, on='HCPCS_Cd', how='inner')

print(f"Shape after filtering based on drugs: {shape(cms_b_filtered)}")



Shape after filtering based on drugs: (3915, 31)


                                                                                

### Select the required columns
The CMS-B dataset contains a lot of information which is not required for this project.
Only a few columns are required for this project.
- Rndrng_NPI
- BRAND
- Tot_Benes
- Tot_Srvcs

In [5]:
# Select the required columns
cms_b_selected = cms_b_filtered.select(['Rndrng_NPI', 'BRAND', 'Tot_Benes', 'Tot_Srvcs'])
cms_b_selected.columns

['Rndrng_NPI', 'BRAND', 'Tot_Benes', 'Tot_Srvcs']

### Rename the columns
The columns are renamed for better understandability.
- Rndrng_NPI --> NPI
- BRAND --> Drug_Name
- Tot_Benes (*No changes*)
- Tot_Srvcs --> Tot_Clms

In [6]:
# Rename the columns
cms_b_renamed = (
    cms_b_selected
    .withColumnRenamed("Rndrng_NPI", "NPI")
    .withColumnRenamed("BRAND", "Drug_Name")
    .withColumnRenamed("Tot_Srvcs", "Tot_Clms")
)
cms_b_renamed.columns

['NPI', 'Drug_Name', 'Tot_Benes', 'Tot_Clms']

### Restructuring the data
- The **Tot_Clms** field is in float, but the total claims should be an integer. The **Tot_Clms** field is converted into an integer.
- Define a reusable function, `is_null`, that counts the null values in every field of a DataFrame (it is applied to the CMS-D data below).
- The data is grouped based on the **Drug_Name** and the **Tot_Benes** & **Tot_Clms** are summed.

In [7]:
# Change the data type of Tot_Clms to 'int'
cms_b_corrected = cms_b_renamed.withColumn(
    "Tot_Clms",
    cms_b_renamed.Tot_Clms.cast('int')
)
cms_b_corrected.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- Drug_Name: string (nullable = true)
 |-- Tot_Benes: integer (nullable = true)
 |-- Tot_Clms: integer (nullable = true)



In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, FloatType

def is_null(dataframe: pyspark.sql.DataFrame):
    """
    Count the missing values in every column of ``dataframe``.

    A value counts as missing when it is null, or when it is NaN in a
    float/double column. ``isnan`` is only applied to floating-point columns:
    on any other type Spark would first cast the value to double. That cast
    counts a literal "NaN" string as missing, and it can fail for non-numeric
    strings when ANSI SQL mode is enabled.

    :param dataframe: Spark DataFrame to inspect.
    :return: A one-row Spark DataFrame holding, under each original column
        name, that column's missing-value count. Nothing is printed; display
        the result (e.g. via ``.toPandas()``) to see it.
    """
    def _missing_count(field):
        # Index the frame directly rather than using the global `col`, which a
        # later cell may shadow with a loop variable.
        column = dataframe[field.name]
        is_missing = column.isNull()
        if isinstance(field.dataType, (FloatType, DoubleType)):
            is_missing = is_missing | isnan(column)
        return count(when(is_missing, field.name)).alias(field.name)

    return dataframe.select([_missing_count(field) for field in dataframe.schema.fields])

In [9]:
# Total beneficiaries and claims per drug across all CMS-B providers
# (`sum` here is pyspark.sql.functions.sum from the star import above).
cms_b_totals = [sum(metric).alias(metric) for metric in ("Tot_Benes", "Tot_Clms")]
cms_b_grouped = cms_b_corrected.groupby("Drug_Name").agg(*cms_b_totals)
cms_b_grouped.show()



+---------+---------+--------+
|Drug_Name|Tot_Benes|Tot_Clms|
+---------+---------+--------+
|  STELARA|      224|   34310|
|  ENTYVIO|     9307| 6101149|
|RENFLEXIS|     2156|  329964|
|  SIMPONI|    28219|17031175|
| REMICADE|    32759| 5713699|
|   CIMZIA|    21819|51914531|
|INFLECTRA|     1274|  188303|
|   AVSOLA|      342|   50012|
+---------+---------+--------+



                                                                                

## CMS-D Data
The CMS-D dataset contains information about Medicare prescribers, by provider and drug. 
This data has to be processed to get the information about the total beneficiaries and total claims relating to the drugs of interest.

### Filtering the data
The CMS-D dataset can be filtered based on,
- Brand name
- Interested speciality

Before starting to process the data, only the required columns are selected.
- Prscrbr_NPI
- Brnd_Name
- Gnrc_Name
- Tot_Clms
- Tot_Benes

Once the required columns are selected, all the values in the **Brnd_Name** column are converted to lowercase.
This conversion will ensure that the brand names match with the brand names in the file containing information about the drugs of interest.

In [10]:
# Load the CMS Part-D data
cms_d = spark.read.csv(
    f"{public_dir}/{public_files['cms_d']}",
    header=True,
    inferSchema=True
)

# Select the required columns
cms_d_selected = cms_d.select(["Prscrbr_NPI", "Brnd_Name", "Gnrc_Name", "Tot_Clms", "Tot_Benes"])

# Convert the 'Brnd_Name' field to lowercase
cms_d_lower = cms_d_selected.withColumn(
    "Brnd_Name",
    lower(cms_d_selected["Brnd_Name"])
)

                                                                                

### Filter the data
This data can be filtered based on two criteria:
- Brand name (*Given in a separate file*)
- Area (*Given in speciality_mappings*)

In [11]:
# Filter the data based on drug
interested_drugs = spark.createDataFrame(
    pd.read_excel(
        private_workbook,
        sheet_name=private_sheets['cms_d']
    )
)
cms_d_filtered = cms_d_lower.join(
    interested_drugs,
    "Brnd_Name",
    "inner"
)
print(f"Shape after filtering based on drugs: {shape(cms_d_filtered)}")



Shape after filtering based on drugs: (24285, 7)


                                                                                

### Select the required columns
Redundant and unwanted columns will exist after joining.
Only a few columns are required for this project.
- Prscrbr_NPI
- BRAND
- Tot_Benes
- Tot_Clms

In [12]:
# Discard the join leftovers; keep NPI, the `BRAND` name and the two metrics.
cms_d_keep = ["Prscrbr_NPI", "BRAND", "Tot_Benes", "Tot_Clms"]
cms_d_selected = cms_d_filtered.select(*cms_d_keep)
cms_d_selected.columns

['Prscrbr_NPI', 'BRAND', 'Tot_Benes', 'Tot_Clms']

### Rename the columns
The columns are renamed for better understandability.
- Prscrbr_NPI --> NPI
- BRAND --> Drug_Name
- Tot_Benes (*No changes*)
- Tot_Clms (*No changes*)

In [13]:
# Rename the columns
cms_d_renamed = (
    cms_d_selected
    .withColumnRenamed("Prscrbr_NPI", "NPI")
    .withColumnRenamed("BRAND", "Drug_Name")
)
cms_d_renamed.columns

['NPI', 'Drug_Name', 'Tot_Benes', 'Tot_Clms']

### Check for null values
Check for null values in every field of the data frame and count the number of null values in each field.
Null values in the **Tot_Benes** field are filled with 0.

In [14]:
# Check for null values
is_null(cms_d_renamed).toPandas()

# Replace null values with 0
cms_d_full = cms_d_renamed.fillna(
    value = 0,
    subset = "Tot_Benes"
)

                                                                                

### Group by drug name
The data is grouped based on the drug name and the values in the fields **Tot_Benes** & **Tot_Clms** are aggregated.

In [18]:
# Total beneficiaries and claims per drug across all CMS-D prescribers.
cms_d_grouped = (
    cms_d_full
    .groupby("Drug_Name")
    .agg(*[sum(name).alias(name) for name in ("Tot_Benes", "Tot_Clms")])
)
cms_d_grouped.show()



+---------+---------+--------+
|Drug_Name|Tot_Benes|Tot_Clms|
+---------+---------+--------+
|   CIMZIA|      189|   25341|
|   HUMIRA|    22531|  532317|
|  SIMPONI|      217|   18305|
|RENFLEXIS|        0|     335|
|INFLECTRA|       76|    1464|
| REMICADE|      338|    7148|
|  SKYRIZI|      413|   11756|
|  STELARA|     2715|   55854|
|  ZEPOSIA|        0|    2253|
|  ENTYVIO|        0|    3427|
+---------+---------+--------+



                                                                                

## Combining CMS-B and CMS-D
This block stacks the two CMS datasets (`cms_b_corrected` and `cms_d_full`) into `cms_combined` and groups the result by `NPI` and `Drug_Name` to aggregate total beneficiaries (`Tot_Benes`) and claims (`Tot_Clms`). The grouped data is pivoted on `Drug_Name` to create a wide-format table, `cms_pivot`, with sums for each drug. Missing values in the pivot are filled with zeros.

In [19]:
# Stack CMS-B and CMS-D. `unionByName` aligns the columns by name rather than
# by position, so a reordered select in either branch cannot silently pour,
# e.g., claims into the beneficiaries column.
cms_combined = cms_b_corrected.unionByName(cms_d_full)

# One row per (NPI, drug) with the CMS-B and CMS-D figures added together.
cms_grouped = cms_combined.groupBy("NPI", "Drug_Name").agg(
    sum("Tot_Benes").alias("Tot_Benes"),
    sum("Tot_Clms").alias("Tot_Clms")
)

# Wide format: one row per NPI and one `<drug>_sum(<metric>)` column per drug
# and metric. NPIs without data for a drug get 0 instead of null.
cms_pivot = (
    cms_grouped
    .groupBy("NPI")
    .pivot("Drug_Name")
    .sum("Tot_Benes", "Tot_Clms")
    .fillna(value=0)
)

                                                                                

## Open Payments Data
Only required columns are selected from the *Open Payments* data and the records containing **NULL** values in `Covered_Recipient_NPI` are dropped.
The data is filtered based on interested drugs and then columns are renamed for convenience. 
The payment amounts are pivoted twice: first on `Nature_of_Payment` for each NPI and drug, then on the drug name, giving one row per `Covered_Recipient_NPI`.

In [20]:
# Open open payments data and select only the required fields
open_payments = spark.read.csv(
    f"{public_dir}/{public_files['openpay']}",
    header=True,
    inferSchema=True
)
open_payments_selected = open_payments.select([
    "Covered_Recipient_NPI",
    "Total_Amount_of_Payment_USDollars",
    "Date_of_Payment",
    "Number_of_Payments_Included_in_Total_Amount",
    "Form_of_Payment_or_Transfer_of_Value",
    "Nature_of_Payment_or_Transfer_of_Value",
    "Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1"
])

# Drop entries with null values
open_payments_dropped = open_payments_selected.dropna(subset="Covered_Recipient_NPI")

                                                                                

In [21]:
# Filtering based on interested drugs
openpay_intd_drugs = spark.createDataFrame(
    pd.read_excel(
        private_workbook,
        sheet_name=private_sheets['openpay']
    )
)
open_payments_filtered = open_payments_dropped.join(
    openpay_intd_drugs,
    open_payments_dropped.Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1 == openpay_intd_drugs.Drug_Name,
    "inner"
)

In [22]:
# Drop unwanted fields and rename categories
open_payments_filtered = open_payments_filtered.drop("Drug_Name")

open_payments_mapped = open_payments_filtered.withColumn(
    "Nature_of_Payment",
    when(col("Nature_of_Payment_or_Transfer_of_Value") == "Food and Beverage", "FOOD&BEVERAGE")
    .when(col("Nature_of_Payment_or_Transfer_of_Value") == "Consulting Fee", "CONSULTING")
    .when(col("Nature_of_Payment_or_Transfer_of_Value") == "Travel and Lodging", "TRAVEL")
    .when(col("Nature_of_Payment_or_Transfer_of_Value") == "Education", "EDUCATION")
    .when(col("Nature_of_Payment_or_Transfer_of_Value").rlike("Compensation"), "SPEAKER")
    .otherwise("OTHERS_GENERAL")
)

In [23]:
# Convert 'Total_Amount_of_Payment_USDollars' to 'int'
open_payments_casted = open_payments_mapped.withColumn(
    "Total_Amount_of_Payment_USDollars",
    open_payments_mapped.Total_Amount_of_Payment_USDollars.cast('int')
)

# Pivot 'Nature_of_Payment' and fill null values with 0
open_payments_pivot_1 = (
    open_payments_casted
    .groupBy("Covered_Recipient_NPI", "Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1")
    .pivot("Nature_of_Payment")
    .sum("Total_Amount_of_Payment_USDollars")
    .fillna(value=0)
)

# Pivot 'Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1'
open_payments_pivot_2 = (
    open_payments_pivot_1
    .groupBy("Covered_Recipient_NPI")
    .pivot("Name_of_Drug_or_Biological_or_Device_or_Medical_Supply_1")
    .sum('CONSULTING','EDUCATION','FOOD&BEVERAGE','OTHERS_GENERAL','SPEAKER','TRAVEL')
)

                                                                                

## Merging CMS and OpenPayments
The *CMS* and *Open Payments* data are combined using an outer join on `NPI`.
The columns `Covered_Recipient_NPI` and `NPI` are combined since they both contain NPIs and the null values are filled with 0.

In [24]:
# Outer join based on 'NPI'
openpay_cms = open_payments_pivot_2.join(
    cms_pivot,
    open_payments_pivot_2.Covered_Recipient_NPI == cms_pivot.NPI,
    "outer"
)

# Combine both the NPI fields
openpay_cms_combined_npi = openpay_cms.withColumn(
    "Covered_Recipient_NPI",
    coalesce(
        openpay_cms['Covered_Recipient_NPI'],
        openpay_cms['NPI']
    )
)

# Drop the duplicate NPI field and fill null values with 0
openpay_cms_drop_NPI = openpay_cms_combined_npi.drop("NPI")
openpay_cms_final = openpay_cms_drop_NPI.fillna(value=0)

## NPPES Data
Only the required columns are selected from the *NPPES* data, and `Provider Last Name (Legal Name)` is renamed to `Provider Last Name` for convenience.

In [25]:
# Load the NPPES provider registry; Spark infers the column types.
nppes_path = f"{public_dir}/{public_files['nppes']}"
nppes = spark.read.csv(nppes_path, header=True, inferSchema=True)

# Provider identity, mailing address and the first three taxonomy codes.
nppes_columns = [
    'NPI',
    'Provider Last Name (Legal Name)',
    'Provider First Name',
    'Provider Middle Name',
    'Provider First Line Business Mailing Address',
    'Provider Business Mailing Address City Name',
    'Provider Business Mailing Address State Name',
    'Provider Business Mailing Address Postal Code',
    'Healthcare Provider Taxonomy Code_1',
    'Healthcare Provider Taxonomy Code_2',
    'Healthcare Provider Taxonomy Code_3',
]

# Select the required columns and shorten the last-name header.
nppes_selected = (
    nppes
    .select(*nppes_columns)
    .withColumnRenamed('Provider Last Name (Legal Name)', 'Provider Last Name')
)

                                                                                

## Taxonomy Data
Combine NPPES provider data with NUCC taxonomy data to enrich the provider records with classification and specialization details. The datasets are joined using taxonomy codes, and the resulting data includes up to three levels of taxonomy classifications and specializations, with unnecessary columns removed after each join.

In [26]:
# Read the taxonomy data
nucc = spark.read.csv(
    f"{public_dir}/{public_files['nucc']}",
    header=True,
    inferSchema=True
)

# Merging NUCC and NPPES data
nppes_nucc = nppes_selected.join(
    nucc,
    nppes_selected["Healthcare Provider Taxonomy Code_1"] == nucc.Code,
    'left'
)
nppes_nucc = nppes_nucc.drop('Code', 'grouping', 'display_name')
nppes_nucc = nppes_nucc.withColumnRenamed("classification", "Primary_Classification")
nppes_nucc = nppes_nucc.withColumnRenamed("specialization", "Primary_Specialization")

nppes_nucc = nppes_nucc.join(
    nucc,
    nppes_nucc["Healthcare Provider Taxonomy Code_2"] == nucc.Code,
    'left'
)
nppes_nucc = nppes_nucc.drop('Code', 'grouping', 'display_name')
nppes_nucc = nppes_nucc.withColumnRenamed("classification", "Secondary_Classification")
nppes_nucc = nppes_nucc.withColumnRenamed("specialization", "Secondary_Specialization")

nppes_nucc = nppes_nucc.join(
    nucc,
    nppes_nucc["Healthcare Provider Taxonomy Code_3"] == nucc.Code,
    'left'
)
nppes_nucc = nppes_nucc.drop('Code', 'grouping', 'display_name')
nppes_nucc = nppes_nucc.withColumnRenamed("classification", "Tertiary_Classification")
nppes_nucc = nppes_nucc.withColumnRenamed("specialization", "Tertiary_Specialization")

### Filtering and Cleaning
1. **Filter by Taxonomy Codes**:
   - Taxonomy codes of interest are loaded from the file and extracted into a list.
   - The NPPES dataset is filtered to include only records where at least one of the three taxonomy codes matches the required codes.
2. **Drop Unwanted Columns**:
   - After filtering, unnecessary columns such as taxonomy codes and intermediate fields (`Code`, `grouping`, `display_name`) are removed.
3. **Resulting Dataset**:
   - The final dataset includes only the relevant providers and retains essential classification and specialization details.

In [27]:
# Filter only the interested area based on the taxonomy codes
tax_codes = spark.createDataFrame(
    pd.read_excel(
        private_workbook,
        sheet_name=private_sheets['taxonomy']
    )
)
required_codes = list(
    map(
        lambda obj: obj.Code,
        tax_codes.select('Code').collect()
    )
)

# Only filter if taxonomy codes are given
if len(required_codes) > 0:
    tax_code_cond = (
        (nppes_nucc["Healthcare Provider Taxonomy Code_1"].isin(required_codes)) 
        | (nppes_nucc["Healthcare Provider Taxonomy Code_2"].isin(required_codes)) 
        | (nppes_nucc["Healthcare Provider Taxonomy Code_3"].isin(required_codes))
    )
    nppes_nucc_filtered = nppes_nucc.filter(tax_code_cond)
else:
    nppes_nucc_filtered = nppes_nucc

# Drop unwanted columns
nppes_nucc_dropped = nppes_nucc_filtered.drop(*[
    'Code',
    'grouping',
    'display_name',
    'Healthcare Provider Taxonomy Code_1',
    'Healthcare Provider Taxonomy Code_2',
    'Healthcare Provider Taxonomy Code_3'
])

## Merging with CMS
1. **Merge with CMS Data**:  
   - The filtered NPPES dataset is joined with CMS data on the `NPI` and `Covered_Recipient_NPI` columns to combine relevant information.
   - The duplicate `Covered_Recipient_NPI` column is dropped after the join.
2. **Rename Columns**:
   - In column names, `Tot_Benes` is replaced with `Patients`.
   - In column names, `Tot_Clms` is replaced with `Claims`.
   - The aggregate-function wrapper (e.g. `sum(...)`) is removed from column names for cleaner naming.

The resulting dataset is saved to `final_out.csv`.

In [28]:
# Merge with CMS data and drop the duplicate NPI column
final_joined = nppes_nucc_dropped.join(
    openpay_cms_final,
    nppes_nucc_dropped.NPI == openpay_cms_final.Covered_Recipient_NPI,
    'inner'
)
final = final_joined.drop("Covered_Recipient_NPI")

# Renaming the columns
for col in final.columns:
    if "Tot_Benes" in col:
        final=final.withColumnRenamed(col,col.replace("Tot_Benes","Patients"))
    if "Tot_Clms" in col:
        final=final.withColumnRenamed(col,col.replace("Tot_Clms","Claims"))

for col in final.columns:
    if "sum(" in col:
        final=final.withColumnRenamed(col,col.replace("sum(","").replace(")",""))
        
final.toPandas().to_csv("final_out.csv")
shape(final)

                                                                                

(10196, 72)

In [29]:
# Timer
# Timer: wall-clock seconds elapsed since `start` was recorded in the first cell.
end = time.time()
print(f"Completed execution in {end - start} seconds.")

Completed execution in 839.5360860824585 seconds.


In [30]:
# (rows, columns) of the two CMS inputs that are stacked before pivoting.
(shape(cms_b_corrected), shape(cms_d_full))

                                                                                

((3915, 4), (24285, 4))

In [31]:
# (rows, columns) of the final Open Payments pivot (one row per recipient NPI).
openpay_pivot_shape = shape(open_payments_pivot_2)
openpay_pivot_shape

                                                                                

(44338, 25)