##

## Read CMS Provider Drug Cost flat table (Silver Layer) and populate Star Schema Tables (Gold Layer)

This notebook is the second step in the sample. It reads the *cms_provider_drug_costs* Delta Parquet table created in the first step as input and generates the star schema tables as output.

In [11]:
%run Utils

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 15, Finished, Available, Finished)

In [12]:
# Shared settings; get_config_dict / get_full_abfss_path are not defined in this
# notebook (presumably they come from the Utils notebook run above).
config_dict = get_config_dict()

# Silver-layer source table: its name, its lakehouse-relative folder, and the
# fully qualified path used to read it.
cms_cost_table_name = "cms_provider_drug_costs"
cms_cost_table_dir = "Tables/" + cms_cost_table_name
cms_cost_table_dir_full_path = get_full_abfss_path(
    config_dict['workspace_id'],
    config_dict['lakehouse_id'],
    cms_cost_table_dir,
)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 16, Finished, Available, Finished)

In [13]:
# Load the flat (silver) Delta table and expose it to Spark SQL.
cms_provider_drug_costs_df = spark.read.format("delta").load(cms_cost_table_dir_full_path)

# Register the view under cms_cost_table_name rather than a repeated literal:
# every later query references the source as {cms_cost_table_name}, so the view
# name and the queries can no longer drift apart if the table is renamed.
cms_provider_drug_costs_df.createOrReplaceTempView(cms_cost_table_name)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 17, Finished, Available, Finished)

In [14]:
# Year dimension: one row per distinct Year, plus a 'YYYY-01-01' string key
# (still a string at this point).
dim_year_query = f"SELECT DISTINCT Year, CONCAT(CAST(Year AS String), '-01-01') AS Year_Date_Key FROM {cms_cost_table_name}"
dim_year_df = spark.sql(dim_year_query)
display(dim_year_df)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7b6216d8-354b-4c16-a32c-eb3db87313cb)

In [15]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

# Year_Date_Key was built as a 'YYYY-01-01' string; convert it to a DATE column.
year_date_key_as_date = col('Year_Date_Key').cast(DateType())
dim_year_df = dim_year_df.withColumn("Year_Date_Key", year_date_key_as_date)
display(dim_year_df)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 01c2dbce-21c9-4c51-923b-eb4839e36634)

In [16]:
# Persist the year dimension under the lakehouse Tables/ area.
dim_year_table_name = "cms_provider_dim_year"
dim_year_table_dir = f"Tables/{dim_year_table_name}"
dim_year_table_full_path = get_full_abfss_path(config_dict['workspace_id'], config_dict['lakehouse_id'], dim_year_table_dir)
print(f'dim_year_table_full_path: {dim_year_table_full_path}')

# State the Delta format explicitly (as the fact-table write does) instead of
# relying on the session's default data source format.
dim_year_df.write.mode("overwrite").format("delta").save(dim_year_table_full_path)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 20, Finished, Available, Finished)

dim_year_table_full_path: abfss://c8f59358-83d0-4711-8b57-ebc3c414b0a1@onelake.dfs.fabric.microsoft.com/93c691f7-1409-44ef-ab45-fa3a3fb74289/Tables/cms_provider_dim_year


In [17]:
# Geography dimension: one row per distinct (city, city/state, state abbreviation,
# state FIPS) combination, with the first/last Year it appears in and a surrogate
# geo_key.
#
# The window ORDER BY lists every grouping column so that row_number() has no
# ties. This DataFrame is lazy and is evaluated more than once (when the
# dimension is written and again when the fact table joins to it); with ties,
# Spark may number the tied rows differently on each evaluation, leaving
# geo_key values in the fact table that point at the wrong dimension row.
#
# NOTE(review): a window without PARTITION BY is computed on a single partition;
# acceptable for a small dimension, but worth confirming for this data volume.
dim_geo_df = spark.sql(f'''SELECT Prscrbr_City, Prscrbr_City_State, Prscrbr_State_Abrvtn, Prscrbr_State_FIPS, MAX(Year) AS Max_Year, MIN(Year) AS Min_Year,
    row_number() OVER (ORDER BY Prscrbr_State_Abrvtn, Prscrbr_City_State, Prscrbr_City, Prscrbr_State_FIPS ASC) AS geo_key
    FROM {cms_cost_table_name}
    GROUP BY Prscrbr_City,Prscrbr_City_State,Prscrbr_State_Abrvtn,Prscrbr_State_FIPS ''')

display(dim_geo_df)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 64b2cb56-c898-4ca4-88a2-5064753ce3c5)

In [18]:
# Persist the geography dimension under the lakehouse Tables/ area.
dim_geo_table_name = "cms_provider_dim_geography"
dim_geo_table_dir = f"Tables/{dim_geo_table_name}"
dim_geo_table_full_path = get_full_abfss_path(config_dict['workspace_id'], config_dict['lakehouse_id'], dim_geo_table_dir)
print(f'dim_geo_table_full_path: {dim_geo_table_full_path}')

# State the Delta format explicitly (as the fact-table write does) instead of
# relying on the session's default data source format.
dim_geo_df.write.mode("overwrite").format("delta").save(dim_geo_table_full_path)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 22, Finished, Available, Finished)

dim_geo_table_full_path: abfss://c8f59358-83d0-4711-8b57-ebc3c414b0a1@onelake.dfs.fabric.microsoft.com/93c691f7-1409-44ef-ab45-fa3a3fb74289/Tables/cms_provider_dim_geography


In [19]:
# Provider dimension: one row per distinct prescriber attribute combination,
# with the first/last Year it appears in and a surrogate provider_key.
#
# The window ORDER BY ends with the two grouping columns the original ordering
# left out (first name, last/org name) so that row_number() has no ties. This
# DataFrame is lazy and is evaluated both when the dimension is written and when
# the fact table joins to it; with ties, the two evaluations could assign
# different provider_key values to the same provider.
dim_provider_df = spark.sql(f'''SELECT Prscrbr_First_Name
,Prscrbr_Full_Name
,Prscrbr_Last_Org_Name
,Prscrbr_NPI
,Prscrbr_Type
,Prscrbr_Type_Src
,MAX(Year) AS Max_Year
,MIN(Year) AS Min_Year
,row_number() OVER (ORDER BY Prscrbr_Full_Name,Prscrbr_NPI,Prscrbr_Type,Prscrbr_Type_Src,Prscrbr_First_Name,Prscrbr_Last_Org_Name ASC) AS provider_key
FROM {cms_cost_table_name}
GROUP BY Prscrbr_First_Name,Prscrbr_Full_Name,Prscrbr_Last_Org_Name,Prscrbr_NPI,Prscrbr_Type,Prscrbr_Type_Src''')

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 23, Finished, Available, Finished)

In [21]:
# Persist the provider dimension under the lakehouse Tables/ area.
dim_prov_table_name = "cms_provider_dim_provider"
dim_prov_table_dir = f"Tables/{dim_prov_table_name}"
dim_prov_table_full_path = get_full_abfss_path(config_dict['workspace_id'], config_dict['lakehouse_id'], dim_prov_table_dir)
print(f'dim_prov_table_full_path: {dim_prov_table_full_path}')

# State the Delta format explicitly (as the fact-table write does) instead of
# relying on the session's default data source format.
dim_provider_df.write.mode("overwrite").format("delta").save(dim_prov_table_full_path)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 25, Finished, Available, Finished)

dim_prov_table_full_path: abfss://c8f59358-83d0-4711-8b57-ebc3c414b0a1@onelake.dfs.fabric.microsoft.com/93c691f7-1409-44ef-ab45-fa3a3fb74289/Tables/cms_provider_dim_provider


In [22]:
# Drug dimension: one row per distinct (brand name, generic name) pair, with the
# first/last Year it appears in and a surrogate drug_key numbered in
# brand/generic name order.
dim_drug_query = f'''SELECT Brnd_Name
,Gnrc_Name
,MAX(Year) AS Max_Year
,MIN(Year) AS Min_Year
,row_number() OVER (ORDER BY Brnd_Name,Gnrc_Name ASC) AS drug_key
FROM {cms_cost_table_name}
GROUP BY Brnd_Name, Gnrc_Name'''
dim_drug_df = spark.sql(dim_drug_query)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 26, Finished, Available, Finished)

In [23]:
# Persist the drug dimension under the lakehouse Tables/ area.
dim_drug_table_name = "cms_provider_dim_drug"
dim_drug_table_dir = f"Tables/{dim_drug_table_name}"
dim_drug_table_full_path = get_full_abfss_path(config_dict['workspace_id'], config_dict['lakehouse_id'], dim_drug_table_dir)
print(f'dim_drug_table_full_path: {dim_drug_table_full_path}')

# State the Delta format explicitly (as the fact-table write does) instead of
# relying on the session's default data source format.
dim_drug_df.write.mode("overwrite").format("delta").save(dim_drug_table_full_path)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 27, Finished, Available, Finished)

dim_drug_table_full_path: abfss://c8f59358-83d0-4711-8b57-ebc3c414b0a1@onelake.dfs.fabric.microsoft.com/93c691f7-1409-44ef-ab45-fa3a3fb74289/Tables/cms_provider_dim_drug


In [24]:
# Expose each dimension DataFrame to Spark SQL under its table name so the fact
# query can join against them.
for dim_df, dim_view_name in (
    (dim_drug_df, dim_drug_table_name),
    (dim_geo_df, dim_geo_table_name),
    (dim_provider_df, dim_prov_table_name),
):
    dim_df.createOrReplaceTempView(dim_view_name)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 28, Finished, Available, Finished)

In [25]:
# Fact table: the measures and Year from the flat source table, plus the
# surrogate keys looked up from the drug, geography and provider dimensions.
#
# Each dimension is joined on ALL of the columns it was grouped by, using
# null-safe equality (IS NOT DISTINCT FROM):
#   * Joining on only part of a dimension's grouping key (e.g. geography on
#     Prscrbr_City_State alone) matches several dimension rows whenever the
#     remaining columns differ, duplicating fact rows and inflating the measures.
#   * Plain '=' never matches NULL to NULL, so source rows with a NULL in a key
#     column would get a NULL surrogate key even though the dimension (built
#     with GROUP BY, which groups NULLs together) contains a row for them.
# Because every dimension is derived from this same source table, joining on the
# complete grouping key yields exactly one dimension row per source row.
drug_costs_star_df = spark.sql(f'''SELECT a.GE65_Bene_Sprsn_Flag
,a.GE65_Sprsn_Flag
,a.GE65_Tot_30day_Fills
,a.GE65_Tot_Benes
,a.GE65_Tot_Clms
,a.GE65_Tot_Day_Suply
,a.GE65_Tot_Drug_Cst
,a.Tot_30day_Fills
,a.Tot_Benes
,a.Tot_Clms
,a.Tot_Day_Suply
,a.Tot_Drug_Cst
,a.Year
,b.drug_key
,c.geo_key
,d.provider_key
FROM {cms_cost_table_name} a
LEFT OUTER JOIN {dim_drug_table_name} b
    ON a.Brnd_Name IS NOT DISTINCT FROM b.Brnd_Name
    AND a.Gnrc_Name IS NOT DISTINCT FROM b.Gnrc_Name
LEFT OUTER JOIN {dim_geo_table_name} c
    ON a.Prscrbr_City IS NOT DISTINCT FROM c.Prscrbr_City
    AND a.Prscrbr_City_State IS NOT DISTINCT FROM c.Prscrbr_City_State
    AND a.Prscrbr_State_Abrvtn IS NOT DISTINCT FROM c.Prscrbr_State_Abrvtn
    AND a.Prscrbr_State_FIPS IS NOT DISTINCT FROM c.Prscrbr_State_FIPS
LEFT OUTER JOIN {dim_prov_table_name} d
    ON a.Prscrbr_First_Name IS NOT DISTINCT FROM d.Prscrbr_First_Name
    AND a.Prscrbr_Full_Name IS NOT DISTINCT FROM d.Prscrbr_Full_Name
    AND a.Prscrbr_Last_Org_Name IS NOT DISTINCT FROM d.Prscrbr_Last_Org_Name
    AND a.Prscrbr_NPI IS NOT DISTINCT FROM d.Prscrbr_NPI
    AND a.Prscrbr_Type IS NOT DISTINCT FROM d.Prscrbr_Type
    AND a.Prscrbr_Type_Src IS NOT DISTINCT FROM d.Prscrbr_Type_Src''')

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 29, Finished, Available, Finished)

In [26]:
# Persist the fact table as Delta under the lakehouse Tables/ area, replacing
# any previous contents.
fact_table_name = "cms_provider_drug_costs_star"
fact_table_dir = "Tables/" + fact_table_name
fact_table_full_path = get_full_abfss_path(
    config_dict['workspace_id'],
    config_dict['lakehouse_id'],
    fact_table_dir,
)
print(f'fact_table_full_path: {fact_table_full_path}')

fact_writer = drug_costs_star_df.write.format("delta").mode("overwrite")
fact_writer.save(fact_table_full_path)

StatementMeta(, dac634f9-9da8-4f3b-ab49-e3adcde93421, 30, Finished, Available, Finished)

fact_table_full_path: abfss://c8f59358-83d0-4711-8b57-ebc3c414b0a1@onelake.dfs.fabric.microsoft.com/93c691f7-1409-44ef-ab45-fa3a3fb74289/Tables/cms_provider_drug_costs_star
