In [1]:
import pyspark
from pyspark import SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.host","localhost").appName("testing").getOrCreate()
conf = pyspark.SparkConf()
spark_context = SparkSession.builder.config(conf=conf).getOrCreate()
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
claims = spark.read.options(header=True).csv(r"C:\Users\kring\test\DE1_0_2008_to_2010_Outpatient_Claims_Sample_20.csv")

In [3]:
summary = spark.read.options(header=True).csv(r"C:\Users\kring\test\DE1_0_2009_Beneficiary_Summary_File_Sample_20.csv")

In [4]:
# Convert chronic illness cols to single, concatenating multiple diagnoses

In [5]:
diagnoses_cols = ["SP_ALZHDMTA", "SP_CHF", "SP_CHRNKIDN", "SP_CNCR", "SP_COPD", "SP_DEPRESSN", "SP_DIABETES", "SP_ISCHMCHT", "SP_OSTEOPRS", "SP_RA_OA", "SP_STRKETIA"]

In [6]:
new_summary = summary

In [7]:
for column in new_summary.columns:
    if column in diagnoses_cols:
        new_summary = new_summary.withColumn(column, F.when(new_summary[column] == "2", "0").otherwise(new_summary[column]))

In [8]:
new_summary = new_summary.withColumn("Conditions_Total", F.expr("+".join(diagnoses_cols)))

In [9]:
new_summary.show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 DESYNPUF_ID              | 000002F7E0A96C32 
 BENE_BIRTH_DT            | 19190701         
 BENE_DEATH_DT            | NULL             
 BENE_SEX_IDENT_CD        | 2                
 BENE_RACE_CD             | 2                
 BENE_ESRD_IND            | 0                
 SP_STATE_CODE            | 05               
 BENE_COUNTY_CD           | 400              
 BENE_HI_CVRAGE_TOT_MONS  | 0                
 BENE_SMI_CVRAGE_TOT_MONS | 0                
 BENE_HMO_CVRAGE_TOT_MONS | 0                
 PLAN_CVRG_MOS_NUM        | 00               
 SP_ALZHDMTA              | 0                
 SP_CHF                   | 0                
 SP_CHRNKIDN              | 0                
 SP_CNCR                  | 0                
 SP_COPD                  | 0                
 SP_DEPRESSN              | 0                
 SP_DIABETES              | 0                
 SP_ISCHMCHT              | 0                
 SP_OSTEOPRS              | 0     

In [10]:
# If a member has 3 or more diagnoses, categorize as multiple

In [11]:
new_summary = new_summary.withColumn("Conditions_Total", F.when(F.col("Conditions_Total") >= 3, "Multiple").otherwise(F.col("Conditions_Total")))

In [12]:
new_summary.show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 DESYNPUF_ID              | 000002F7E0A96C32 
 BENE_BIRTH_DT            | 19190701         
 BENE_DEATH_DT            | NULL             
 BENE_SEX_IDENT_CD        | 2                
 BENE_RACE_CD             | 2                
 BENE_ESRD_IND            | 0                
 SP_STATE_CODE            | 05               
 BENE_COUNTY_CD           | 400              
 BENE_HI_CVRAGE_TOT_MONS  | 0                
 BENE_SMI_CVRAGE_TOT_MONS | 0                
 BENE_HMO_CVRAGE_TOT_MONS | 0                
 PLAN_CVRG_MOS_NUM        | 00               
 SP_ALZHDMTA              | 0                
 SP_CHF                   | 0                
 SP_CHRNKIDN              | 0                
 SP_CNCR                  | 0                
 SP_COPD                  | 0                
 SP_DEPRESSN              | 0                
 SP_DIABETES              | 0                
 SP_ISCHMCHT              | 0                
 SP_OSTEOPRS              | 0     

In [13]:
# Join claims and benefit data

In [14]:
joined = new_summary.join(claims, "DESYNPUF_ID", "full")

In [15]:
joined.show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 DESYNPUF_ID                    | 00028CFDA8612B87 
 BENE_BIRTH_DT                  | 19431001         
 BENE_DEATH_DT                  | NULL             
 BENE_SEX_IDENT_CD              | 2                
 BENE_RACE_CD                   | 1                
 BENE_ESRD_IND                  | 0                
 SP_STATE_CODE                  | 14               
 BENE_COUNTY_CD                 | 740              
 BENE_HI_CVRAGE_TOT_MONS        | 12               
 BENE_SMI_CVRAGE_TOT_MONS       | 12               
 BENE_HMO_CVRAGE_TOT_MONS       | 0                
 PLAN_CVRG_MOS_NUM              | 00               
 SP_ALZHDMTA                    | 0                
 SP_CHF                         | 0                
 SP_CHRNKIDN                    | 0                
 SP_CNCR                        | 0                
 SP_COPD                        | 0                
 SP_DEPRESSN                    | 1                
 SP_DIABETES

In [16]:
# What is the distribution of races

In [17]:
df = joined.withColumn("BENE_RACE_CD", F.when(F.col("BENE_RACE_CD") == "1", "White").otherwise(F.col("BENE_RACE_CD")))
df = df.withColumn("BENE_RACE_CD", F.when(F.col("BENE_RACE_CD") == "2", "Black").otherwise(F.col("BENE_RACE_CD")))
df = df.withColumn("BENE_RACE_CD", F.when(F.col("BENE_RACE_CD") == "3", "Others").otherwise(F.col("BENE_RACE_CD")))
df = df.withColumn("BENE_RACE_CD", F.when(F.col("BENE_RACE_CD") == "5", "Hispanic").otherwise(F.col("BENE_RACE_CD")))

In [18]:
df.select("BENE_RACE_CD").groupBy("BENE_RACE_CD").count().orderBy("count", ascending=False).show()

+------------+------+
|BENE_RACE_CD| count|
+------------+------+
|       White|688357|
|       Black| 81907|
|      Others| 29811|
|    Hispanic| 18017|
|        NULL|  1995|
+------------+------+



In [19]:
# What is the most common chronic illness combination

In [20]:
df3b = df.select("DESYNPUF_ID", *diagnoses_cols)

In [21]:
df3b = df3b.withColumn("SP_ALZHDMTA", F.when(F.col("SP_ALZHDMTA") == "1", "Alzheimer").otherwise(None))
df3b = df3b.withColumn("SP_CHF", F.when(F.col("SP_CHF") == "1", "Heart Failure").otherwise(None))
df3b = df3b.withColumn("SP_CHRNKIDN", F.when(F.col("SP_CHRNKIDN") == "1", "Chronic Kidney Disease").otherwise(None))
df3b = df3b.withColumn("SP_CNCR", F.when(F.col("SP_CNCR") == "1", "Cancer").otherwise(None))
df3b = df3b.withColumn("SP_COPD", F.when(F.col("SP_COPD") == "1", "COPD").otherwise(None))
df3b = df3b.withColumn("SP_DEPRESSN", F.when(F.col("SP_DEPRESSN") == "1", "Depression").otherwise(None))
df3b = df3b.withColumn("SP_DIABETES", F.when(F.col("SP_DIABETES") == "1", "Diabetes").otherwise(None))
df3b = df3b.withColumn("SP_ISCHMCHT", F.when(F.col("SP_ISCHMCHT") == "1", "Ischemic Heart Disease").otherwise(None))
df3b = df3b.withColumn("SP_OSTEOPRS", F.when(F.col("SP_OSTEOPRS") == "1", "Osteoporosis").otherwise(None))
df3b = df3b.withColumn("SP_RA_OA", F.when(F.col("SP_RA_OA") == "1", "RA/OA").otherwise(None))
df3b = df3b.withColumn("SP_STRKETIA", F.when(F.col("SP_STRKETIA") == "1", "Stroke").otherwise(None))
df3b = df3b.distinct()

In [22]:
result = df3b \
          .withColumn("temp", F.array(*diagnoses_cols)) \
            .withColumn("Illness_List", F.expr("FILTER(temp, x -> x is not null)")) \
                .drop("temp")

In [23]:
df3b = result.select("DESYNPUF_ID", "Illness_List").distinct()

In [24]:
df3b.select("Illness_List").groupBy("Illness_List").count().orderBy("count", ascending=False).show(10, truncate=False)

+-------------------------------------------------+-----+
|Illness_List                                     |count|
+-------------------------------------------------+-----+
|[]                                               |37545|
|[Ischemic Heart Disease]                         |4167 |
|[Diabetes, Ischemic Heart Disease]               |2043 |
|[Diabetes]                                       |1928 |
|[Heart Failure]                                  |1490 |
|[Depression]                                     |1485 |
|[Heart Failure, Ischemic Heart Disease]          |1469 |
|[Heart Failure, Diabetes, Ischemic Heart Disease]|1417 |
|[Osteoporosis]                                   |1163 |
|[Alzheimer]                                      |1159 |
+-------------------------------------------------+-----+
only showing top 10 rows



In [25]:
rejoined = df.join(df3b, "DESYNPUF_ID", "left")
rejoined = rejoined.distinct()

In [26]:
# Which chronic illness combo has highest total cost

In [27]:
cost_cols = ["MEDREIMB_IP", "BENRES_IP", "PPPYMT_IP", "MEDREIMB_OP", "BENRES_OP", "PPPYMT_OP", "MEDREIMB_CAR", "BENRES_CAR", "PPPYMT_CAR"]

In [28]:
df3c = rejoined.select("DESYNPUF_ID", *cost_cols, "Illness_List").distinct()

In [29]:
df3c = df3c.withColumn('Total_Cost', F.expr('+'.join(cost_cols)))

In [31]:
df3c.groupBy("Illness_List").agg(F.sum("Total_Cost")).orderBy("sum(Total_Cost)", ascending=False).show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------+---------------+
|Illness_List                                                                                                               |sum(Total_Cost)|
+---------------------------------------------------------------------------------------------------------------------------+---------------+
|[]                                                                                                                         |1.298727E7     |
|[Ischemic Heart Disease]                                                                                                   |9047010.0      |
|[Alzheimer, Heart Failure, Chronic Kidney Disease, COPD, Depression, Diabetes, Ischemic Heart Disease]                     |9037316.0      |
|[Heart Failure, Chronic Kidney Disease, Diabetes, Ischemic Heart Disease]                                                  |8757578.0      |
|[Diab

In [32]:
# Which chronic illness combo has highest cost per member

In [34]:
df3d = df3c.select("DESYNPUF_ID", "Illness_List", "Total_Cost")

In [35]:
w = Window.partitionBy("Illness_List")
df3d = df3d.withColumn("Total_Cost_Sum", F.sum("Total_Cost").over(w))

In [36]:
df3d = df3d.withColumn("Illness_Count", F.count("Illness_List").over(w))

In [38]:
df3d.groupby("Illness_List", "Total_Cost_Sum", "Illness_Count") \
    .agg((F.col("Total_Cost_Sum") / F.col("Illness_Count")).alias("Cost_Per_Mem")) \
        .orderBy("Cost_Per_Mem", ascending=False).show(20, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------
 Illness_List   | [Chronic Kidney Disease, Cancer, COPD, Ischemic Heart Disease, Osteoporosis]                                                                
 Total_Cost_Sum | 85524.0                                                                                                                                     
 Illness_Count  | 1                                                                                                                                           
 Cost_Per_Mem   | 85524.0                                                                                                                                     
-RECORD 1-----------------------------------------------------------------------------------------------------------------------------------------------------
 Illness_List   | [Alzheimer, Chronic Kidney D

In [39]:
spark.stop()