## Update variables

In [1]:
# list all tables in spark db

from pyspark.sql import SparkSession

for table in spark.catalog.listTables():
    print(table.name)

StatementMeta(SparkPool01, 0, 1, Finished, Available)

In [2]:
# drop tables

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.sql('drop table tb2')

StatementMeta(SparkPool01, 0, 2, Finished, Available)

AnalysisException: Table or view not found: tb2;

In [8]:
storage_account_name = 'asastgssuaefdbhdg2dbc4'
container_name = 'raw'
#sql_pool_name = 'SQLPool01'

StatementMeta(SparkPool01, 0, 8, Finished, Available)

## Load datasets

In [11]:
# load 2018-2021 SDUD data from ADLS2 into Spark dataframe and join

df_18 = spark.read.format('csv').options(header='true', inferschema='true').load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/SDUD/state-drug-utilization-data-2018.csv")
df_19 = spark.read.format('csv').options(header='true', inferschema='true').load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/SDUD/state-drug-utilization-data-2019.csv")
df_20 = spark.read.format('csv').options(header='true', inferschema='true').load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/SDUD/state-drug-utilization-data-2020.csv")
df_21 = spark.read.format('csv').options(header='true', inferschema='true').load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/SDUD/state-drug-utilization-data-2021.csv")


df = df_18.union(df_19).union(df_20).union(df_21)

StatementMeta(SparkPool01, 0, 11, Finished, Available)

## Explore the data

In [12]:
# display joined dataframe

display(df.limit(10))

StatementMeta(SparkPool01, 0, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, cc949b13-180d-4a60-97ee-254fac42c0e5)

In [13]:
# how many rows, columns

print((df.count(), len(df.columns)))

StatementMeta(SparkPool01, 0, 13, Finished, Available)

(15841411, 15)

### Print schema and modify

In [14]:
# current schema

df.printSchema()

StatementMeta(SparkPool01, 0, 14, Finished, Available)

root
 |-- Utilization Type: string (nullable = true)
 |-- State: string (nullable = true)
 |-- NDC: long (nullable = true)
 |-- Labeler Code: integer (nullable = true)
 |-- Product Code: integer (nullable = true)
 |-- Package Size: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Suppression Used: boolean (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Units Reimbursed: double (nullable = true)
 |-- Number of Prescriptions: integer (nullable = true)
 |-- Total Amount Reimbursed: double (nullable = true)
 |-- Medicaid Amount Reimbursed: double (nullable = true)
 |-- Non Medicaid Amount Reimbursed: double (nullable = true)

In [15]:
# cast Number_of_Prescriptions from string to int

df1 = df.withColumn("Number of Prescriptions", df["Number of Prescriptions"].cast('int'))

StatementMeta(SparkPool01, 0, 15, Finished, Available)

In [41]:
# rename columns

oldColumns = df1.schema.names
newColumns = ["Utilization_Type", "State", "NDC", "Labeler_Code", "Product_Code",
       "Package_Size", "Year", "Quarter", "Supression_Used", "Product_Name",
       "Units_Reimbursed", "Number_of_Prescriptions",
       "Total_Amount_Reimbursed", "Medicaid_Amount_Reimbursed",
       "Non_Medicaid_Amount_Reimbursed"]

from functools import reduce

df2 = reduce(lambda df1, idx: df1.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df1)

StatementMeta(SparkPool01, 0, 41, Finished, Available)

In [42]:
# new schema

df2.printSchema()


StatementMeta(SparkPool01, 0, 42, Finished, Available)

root
 |-- Utilization_Type: string (nullable = true)
 |-- State: string (nullable = true)
 |-- NDC: long (nullable = true)
 |-- Labeler_Code: integer (nullable = true)
 |-- Product_Code: integer (nullable = true)
 |-- Package_Size: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Supression_Used: boolean (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Units_Reimbursed: double (nullable = true)
 |-- Number_of_Prescriptions: integer (nullable = true)
 |-- Total_Amount_Reimbursed: double (nullable = true)
 |-- Medicaid_Amount_Reimbursed: double (nullable = true)
 |-- Non_Medicaid_Amount_Reimbursed: double (nullable = true)

### Create temp table for downstream querying

In [43]:
#df2.write.saveAsTable("tb1")

StatementMeta(SparkPool01, 0, 43, Finished, Available)

AnalysisException: Table `tb1` already exists.;

In [44]:
# temp view

df2.createOrReplaceTempView("tb1")

StatementMeta(SparkPool01, 0, 44, Finished, Available)

### Get top prescriptions by product from 2018-2021

In [45]:
# top Rx by product - 2018-2020

display(spark.sql("select Product_Name, sum(Number_of_Prescriptions) \
    from tb1 \
    where Product_Name <> 'UNKNOWN' \
    group by Product_Name \
    order by sum(Number_of_Prescriptions) desc").limit(25))

StatementMeta(SparkPool01, 0, 45, Finished, Available)

SynapseWidget(Synapse.DataFrame, dd40c89d-e63d-4209-bef0-4f48aa84d16d)

### Note that Albuterol (respiratory inhaler) had one of the highest prespription counts between 2018-2021.  It was a common drug used to treat COVID-19 symptoms, such as breathing difficulties.  Let's try to find additional trends by looking a little closer at the top prescriptions in 2020.

In [46]:
# top Rx by product - 2020

display(spark.sql("select Product_Name, sum(Number_of_Prescriptions) \
    from tb1 \
    where Year = '2020' \
    and Product_Name <> 'UNKNOWN' \
    group by Product_Name \
    order by sum(Number_of_Prescriptions) desc").limit(50))

StatementMeta(SparkPool01, 0, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, 24d8da50-058c-4060-869f-d30a734cab02)

### In addition to using Albuterol to treat common COVID-19 symptoms, Corticosteroids (e.g., Dexamethasone, Prednisone, Methylprednisolone, Hydrocortisone) were also used to control inflammation in severe cases*.  Let's look at these drug trends from 2018-2021.  

In [61]:
# top Rx by product by year (Albuterol + Corticosteroids) - 2018-2021

# methyprednisolone - Brand Names: A-Methapred, DEPO-Medrol, SOLU-Medrol. See https://www.drugs.com/mtm/methylprednisolone-injection.html
# hydrocortisone - Brand Names: Solu-CORTEF.  See https://www.drugs.com/mtm/solu-cortef-injection.html

display(spark.sql("select Product_Name, Year, sum(Number_of_Prescriptions) \
                   from tb1 \
                   where Product_Name LIKE 'ALBUTEROL%' \
                   OR \
                   Product_Name LIKE ('DEXAMETHA%') \
                   OR \
                   Product_Name LIKE ('PREDNISON%') \
                   OR \
                   Product_Name LIKE ('A-METHA%') \
                   OR \
                   Product_Name LIKE ('SOLU-MEDR%') \
                   OR \
                   Product_Name LIKE ('DEPO-MEDR%') \
                   OR \
                   Product_Name LIKE ('SOLU-CORTE%') \
                   group by Product_Name, Year \
                   order by Year asc, Product_Name asc, sum(Number_of_Prescriptions) desc"))

StatementMeta(SparkPool01, 0, 62, Finished, Available)

SynapseWidget(Synapse.DataFrame, 43e0c4a1-6a2d-495c-a69f-ee53555f1b3b)

### Albuterol + Corticosteroid trends for 2020 only.

In [48]:
# top Rx by product by year quarter (Albuterol + Corticosteroids) - 2020

display(spark.sql("select Product_Name, Year, Quarter, sum(Number_of_Prescriptions) \
                  from tb1 \
                  where Year = '2020' \
                  and Product_Name LIKE ('ALBUTER%') \
                        OR \
                        Product_Name LIKE ('DEXAMETH%') \
                        OR \
                        Product_Name LIKE ('PREDNISON%') \
                        OR \
                        Product_Name LIKE ('A-METHA%') \
                        OR \
                        Product_Name LIKE ('SOLU-MEDR%') \
                        OR \
                        Product_Name LIKE ('DEPO-MEDR%') \
                        OR \
                        Product_Name LIKE ('SOLU-CORTE%') \
                  group by Product_Name, Year, Quarter \
                  order by Product_Name asc, sum(Number_of_Prescriptions) desc"))

StatementMeta(SparkPool01, 0, 48, Finished, Available)

SynapseWidget(Synapse.DataFrame, cbadd871-48ad-4138-8c59-d3b0c5eb84e1)

## Pre-process the data

### We've noted some interesting Rx trends among Albuterol + Corticosteroid drugs throughout 2020.   Note the drop in Albuterol Rx from Q1 to Q4.  Prednisone was also more widely prescribed vs other Corticosteroids.  We will consolidate these product groups for cleaner reporting.

In [49]:
# check for similar Corticosteroid Product_Names

display(spark.sql("select DISTINCT Product_Name, NDC \
                    from tb1 \
                    where Product_Name LIKE ('ALBUTER%') \
                        OR \
                        Product_Name LIKE ('DEXAMETHA%') \
                        OR \
                        Product_Name LIKE ('PREDNISON%') \
                        OR \
                        Product_Name LIKE ('A-METHA%') \
                        OR \
                        Product_Name LIKE ('SOLU-MEDR%') \
                        OR \
                        Product_Name LIKE ('DEPO-MEDR%') \
                        OR \
                        Product_Name LIKE ('SOLU-CORTE%') \
                    order by Product_Name asc" ))

StatementMeta(SparkPool01, 0, 49, Finished, Available)

SynapseWidget(Synapse.DataFrame, fed65717-fe00-4490-aaff-c8b892574545)

### Consolidate Product Names

In [53]:
# write spark table out as dataframe and consolidate Product_Name

from pyspark.sql.functions import when

df2 = spark.read.table("tb1")

df3 = df2.withColumn('Product_Name', 
      when(df2.Product_Name.startswith("DEXAMETHA"), "DEXAMETHASONE") \
      .when(df2.Product_Name.startswith("PREDNISON"), "PREDNISONE") \
      .when(df2.Product_Name == "A-METHAPRED", "METHYLPREDNISOLONE") \
      .when(df2.Product_Name == "DEPO-MEDRO", "METHYLPREDNISOLONE") \
      .when(df2.Product_Name == "SOLU-MEDRO", "METHYLPREDNISOLONE") \
      .when(df2.Product_Name == "SOLU-CORTE", "HYDROCORTISONE") \
      .otherwise(df2.Product_Name))

#display(df3)


StatementMeta(SparkPool01, 0, 53, Finished, Available)

In [55]:
# temporary view

df3.createOrReplaceTempView("tb2")

StatementMeta(SparkPool01, 0, 55, Finished, Available)

In [56]:
# view after operation

display(spark.sql("select DISTINCT Product_Name \
                   from tb2 \
                   where Product_Name =='ALBUTEROL' \
                         OR \
                         Product_Name =='DEXAMETHASONE' \
                         OR \
                         Product_Name == 'PREDNISONE' \
                         OR \
                         Product_Name == 'METHYLPREDNISOLONE' \
                         OR \
                         Product_Name == 'HYDROCORTISONE' \
                    order by Product_Name asc" ))

StatementMeta(SparkPool01, 0, 56, Finished, Available)

SynapseWidget(Synapse.DataFrame, d36d2568-7293-421a-92e9-66565e502d5d)

### Re-run query (Albuterol + Corticosteroid trends for 2020 only.)

In [57]:
# run query again

display(spark.sql("select Product_Name, Year, Quarter, sum(Number_of_Prescriptions) \
                   from tb2 \
                   where Year = '2020' \
                   and Product_Name =='ALBUTEROL' \
                         OR \
                         Product_Name =='DEXAMETHASONE' \
                         OR \
                         Product_Name == 'PREDNISONE' \
                         OR \
                         Product_Name == 'METHYLPREDNISOLONE' \
                         OR \
                         Product_Name == 'HYDROCORTISONE' \
                   group by Product_Name, Year, Quarter \
                   order by Product_Name asc, sum(Number_of_Prescriptions) desc"))

StatementMeta(SparkPool01, 0, 57, Finished, Available)

SynapseWidget(Synapse.DataFrame, 60eb29c9-133a-47b7-a69c-991a432fd9ac)

In [None]:
# save to ADLS2 as parquet

df_all_mod.write.parquet(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/SDUD/State_Drug_Utilization_Data_All.parquet")

StatementMeta(, , , Cancelled, )

## Write Spark dataframe into SQL Pool table

In [58]:
%%spark
//create Scala dataframe - use %%spark magic command

val df_scala = spark.sql("SELECT * FROM tb2")

StatementMeta(SparkPool01, 0, 59, Finished, Available)

df_scala: org.apache.spark.sql.DataFrame = [Utilization_Type: string, State: string ... 13 more fields]


In [59]:
%%spark
// write the dataframe into sql pool 

import org.apache.spark.sql.SqlAnalyticsConnector._
import com.microsoft.spark.sqlanalytics.utils.Constants

val sql_pool_name = "SQLPool01"

df_scala.write.sqlanalytics(s"$sql_pool_name.dbo.mergeSDUD", Constants.INTERNAL)


StatementMeta(SparkPool01, 0, 60, Finished, Available)

import org.apache.spark.sql.SqlAnalyticsConnector._
import com.microsoft.spark.sqlanalytics.utils.Constants
sql_pool_name: String = SQLPool01
