In [None]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


The necessary libraries are imported.

In [None]:
import numpy as np
import pandas as pd
import findspark
from pyspark.sql import SparkSession

A Spark session is initialized.

In [None]:
# Build (or reuse) a local Spark session named 'Basics'; 'local[*]' is the
# master URL handed to the builder.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Basics')
    .getOrCreate()
)

Specify the file path of the CSV and print the first few rows to get a glimpse of the dataset.

In [None]:
# Read the claims CSV, taking column names from the file's header row.
data = (
    spark.read
    .option("header", "true")
    .csv('/content/drive/MyDrive/DE1_0_2008_to_2010_Inpatient_Claims_Sample_20.csv')
)
# Preview five rows. limit() is applied in Spark *before* toPandas() so only
# those rows are collected to the driver instead of the entire dataset.
data.limit(5).toPandas()

Unnamed: 0,DESYNPUF_ID,CLM_ID,SEGMENT,CLM_FROM_DT,CLM_THRU_DT,PRVDR_NUM,CLM_PMT_AMT,NCH_PRMRY_PYR_CLM_PD_AMT,AT_PHYSN_NPI,OP_PHYSN_NPI,...,HCPCS_CD_36,HCPCS_CD_37,HCPCS_CD_38,HCPCS_CD_39,HCPCS_CD_40,HCPCS_CD_41,HCPCS_CD_42,HCPCS_CD_43,HCPCS_CD_44,HCPCS_CD_45
0,0000F1EB530967F3,338091165532547,1,20080327,20080403,2302KU,4000.0,0.0,9833008208,1723547751.0,...,,,,,,,,,,
1,00036DA073115F08,338691165529304,1,20080829,20080902,3600CS,7000.0,0.0,5333645593,5333645593.0,...,,,,,,,,,,
2,00036DA073115F08,338881165476888,1,20080906,20080908,3600CS,6000.0,0.0,5333645593,,...,,,,,,,,,,
3,0003D0FBC87B8600,338161165483669,1,20091029,20091102,0300QJ,8000.0,0.0,9059626561,7308254980.0,...,,,,,,,,,,
4,000994C65A0D1143,338781165536222,1,20080429,20080509,1501ZA,7000.0,0.0,7234907498,5401024033.0,...,,,,,,,,,,


We will only be using the columns relating to ICD9 diagnosis and procedure codes, so the rest will be dropped.

In [None]:
# Every column whose name does not contain 'ICD9' is removed, leaving only the
# ICD9 diagnosis and procedure code columns.
columns_to_drop = [column for column in data.columns if 'ICD9' not in column]
data = data.drop(*columns_to_drop)
# Preview five rows; limit() before toPandas() avoids collecting the whole
# DataFrame to the driver just to display its head.
data.limit(5).toPandas()

Unnamed: 0,ADMTNG_ICD9_DGNS_CD,ICD9_DGNS_CD_1,ICD9_DGNS_CD_2,ICD9_DGNS_CD_3,ICD9_DGNS_CD_4,ICD9_DGNS_CD_5,ICD9_DGNS_CD_6,ICD9_DGNS_CD_7,ICD9_DGNS_CD_8,ICD9_DGNS_CD_9,ICD9_DGNS_CD_10,ICD9_PRCDR_CD_1,ICD9_PRCDR_CD_2,ICD9_PRCDR_CD_3,ICD9_PRCDR_CD_4,ICD9_PRCDR_CD_5,ICD9_PRCDR_CD_6
0,431,5070,E8769,V5869,38600,2724.0,496.0,V5489,9351,56400.0,,4311.0,34590.0,,,,
1,5933,5920,7856,V1301,V5869,,,,,,,5551.0,29680.0,,,,
2,5939,1981,5990,56409,60000,2724.0,311.0,4254,V1005,4019.0,,,,,,,
3,78605,49121,2662,2761,78791,4019.0,7140.0,2859,34590,36250.0,,481.0,29281.0,,,,
4,27651,5119,41400,V1251,72700,42731.0,79902.0,53081,412,3559.0,,3326.0,2766.0,,,,


All diagnosis codes will be prefixed with 'D_' and all procedure codes with 'P_', so that the two kinds of code are easy to tell apart.

In [None]:
from pyspark.sql.functions import col, when, lit, concat


def _prefixed(column, prefix):
    """Return the named column as a string Column with `prefix` prepended.

    Null values stay null. Values that already start with `prefix` are left
    unchanged, so re-running this cell does not produce 'D_D_...' codes.
    The result keeps the original column name.
    """
    code = col(column).cast("string")
    needs_prefix = code.isNotNull() & ~code.startswith(prefix)
    return (
        when(needs_prefix, concat(lit(prefix), code))
        .otherwise(code)
        .alias(column)
    )


# Pick columns by name rather than by position so the step does not depend on
# column order: names containing 'DGNS' are diagnosis codes, names containing
# 'PRCDR' are procedure codes, anything else passes through untouched.
# A single select() replaces the per-column withColumn() loop, which would add
# one projection to the query plan per column.
data = data.select(*[
    _prefixed(name, "D_") if "DGNS" in name
    else _prefixed(name, "P_") if "PRCDR" in name
    else col(name)
    for name in data.columns
])

We print the first few rows again to make sure that the columns have been properly filtered and the codes have been properly prefixed.

In [None]:
# Preview five rows; limit() runs in Spark so only those rows are collected
# to the driver, rather than converting the full DataFrame to pandas first.
data.limit(5).toPandas()

Unnamed: 0,ADMTNG_ICD9_DGNS_CD,ICD9_DGNS_CD_1,ICD9_DGNS_CD_2,ICD9_DGNS_CD_3,ICD9_DGNS_CD_4,ICD9_DGNS_CD_5,ICD9_DGNS_CD_6,ICD9_DGNS_CD_7,ICD9_DGNS_CD_8,ICD9_DGNS_CD_9,ICD9_DGNS_CD_10,ICD9_PRCDR_CD_1,ICD9_PRCDR_CD_2,ICD9_PRCDR_CD_3,ICD9_PRCDR_CD_4,ICD9_PRCDR_CD_5,ICD9_PRCDR_CD_6
0,D_431,D_5070,D_E8769,D_V5869,D_38600,D_2724,D_496,D_V5489,D_9351,D_56400,,P_4311,P_34590,,,,
1,D_5933,D_5920,D_7856,D_V1301,D_V5869,,,,,,,P_5551,P_29680,,,,
2,D_5939,D_1981,D_5990,D_56409,D_60000,D_2724,D_311,D_4254,D_V1005,D_4019,,,,,,,
3,D_78605,D_49121,D_2662,D_2761,D_78791,D_4019,D_7140,D_2859,D_34590,D_36250,,P_0481,P_29281,,,,
4,D_27651,D_5119,D_41400,D_V1251,D_72700,D_42731,D_79902,D_53081,D_412,D_3559,,P_3326,P_2766,,,,


A new column named 'items' is created; for each row it holds an array of all the non-null entries (i.e., only the ICD9 codes) in that row. Many rows list the same ICD9 code in more than one column, so the code would appear several times in the items array. Spark's FPGrowth expects the items within each transaction to be unique, so array_distinct is used to ensure that none of the codes are repeated.

In [None]:
# Namespaced import: pulling `filter` directly out of pyspark.sql.functions
# would shadow Python's builtin filter() for the rest of the notebook.
from pyspark.sql import functions as F

# For each row, gather every column into one array, keep only the non-null
# entries, then remove duplicates so each code appears once.
data_w_items = data.withColumn(
    "items",
    F.array_distinct(
        F.filter(
            F.array(*[F.col(name) for name in data.columns]),
            lambda code: code.isNotNull(),
        )
    ),
)

data_w_items.select('items').show(5, truncate=False)

+------------------------------------------------------------------------------------------------------+
|items                                                                                                 |
+------------------------------------------------------------------------------------------------------+
|[D_431, D_5070, D_E8769, D_V5869, D_38600, D_2724, D_496, D_V5489, D_9351, D_56400, P_4311, P_34590]  |
|[D_5933, D_5920, D_7856, D_V1301, D_V5869, P_5551, P_29680]                                           |
|[D_5939, D_1981, D_5990, D_56409, D_60000, D_2724, D_311, D_4254, D_V1005, D_4019]                    |
|[D_78605, D_49121, D_2662, D_2761, D_78791, D_4019, D_7140, D_2859, D_34590, D_36250, P_0481, P_29281]|
|[D_27651, D_5119, D_41400, D_V1251, D_72700, D_42731, D_79902, D_53081, D_412, D_3559, P_3326, P_2766]|
+------------------------------------------------------------------------------------------------------+
only showing top 5 rows



The minimum support and minimum confidence thresholds are set, and the DataFrame containing the 'items' column is passed to fp.fit() to train the model.

In [None]:
from pyspark.ml.fpm import FPGrowth

# Configure FP-Growth with the support/confidence thresholds; the transactions
# are read from the 'items' array column (spelled out here, same as the default).
fp = FPGrowth(
    itemsCol="items",
    minSupport=0.0001,
    minConfidence=0.0001,
)
# Fit on the DataFrame that carries the 'items' column.
fpm = fp.fit(data_w_items)
# Name the column used for predictions; the returned model is the cell output.
fpm.setPredictionCol("newPrediction")

FPGrowthModel: uid=FPGrowth_c72650c4267a, numTrainingRecords=66514

From here we can rank the association rules by confidence, lift, or support simply by changing the column name passed to `asc(...)` (use `desc(...)` instead to rank from highest to lowest).

In [None]:
from pyspark.sql.functions import asc

# Sort the rules by ascending support and keep the first 20. limit() is applied
# in Spark so only those 20 rows are collected to the driver, instead of
# converting every generated rule to pandas and then taking the head.
fpm.associationRules.orderBy(asc("support")).limit(20).toPandas()

Unnamed: 0,antecedent,consequent,confidence,lift,support
0,"[P_4111, D_496, D_4019]",[D_2724],0.4375,2.461502,0.000105
1,"[D_75612, D_72402, P_4019]",[P_8108],0.583333,157.084345,0.000105
2,"[D_1977, D_78959]",[D_25000],0.259259,1.444373,0.000105
3,"[P_3995, D_32723]",[D_2761],0.241379,3.475883,0.000105
4,"[D_481, P_9672]",[D_0389],0.636364,14.696907,0.000105
5,"[P_34590, D_4019]",[D_41401],0.148936,0.946616,0.000105
6,"[P_34590, D_4019]",[D_2449],0.148936,1.419043,0.000105
7,"[P_34590, D_4019]",[D_3051],0.148936,2.397469,0.000105
8,"[P_5070, D_4280]",[D_40391],0.152174,3.708939,0.000105
9,"[P_5070, D_4280]",[P_5849],0.152174,19.061574,0.000105



$\text{Support(A)} = \frac{\text{Number of claims containing itemset A}}{\text{Total number of claims}}$

$\text{Confidence}(X \longrightarrow Y) = \frac{\text{Support}(X \cup Y)}{\text{Support}(X)}$

$\text{Lift}(X \longrightarrow Y) = \frac{\text{Support}(X \cup Y)}{\text{Support}(X) \cdot \text{Support}(Y)}$
