# PySpark Data Wrangling

In [1]:
import os
import pandas as pd
from PatientJourney import PatientJourney
from ConstantsNamespace import PatientJourneyConstantsNamespace

In [2]:
from pyspark.sql import SparkSession, DataFrame
from functools import reduce
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType

In [3]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('PatientJourney')
    .getOrCreate()
)

23/07/11 14:06:25 WARN Utils: Your hostname, niranjan-Virtual-Machine resolves to a loopback address: 127.0.1.1; using 172.20.253.131 instead (on interface eth0)
23/07/11 14:06:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/11 14:06:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/11 14:06:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Instantiate the project constants namespace; later cells read their input
# file paths (the *_FILEPATH attributes) from this object.
constants = PatientJourneyConstantsNamespace()

### Read consumption data from separate tab-separated files (one per source and year)

In [5]:
def read_tsv(path):
    """Read a tab-separated file with a header row into a Spark DataFrame.

    No schema is supplied and schema inference is not enabled, so every
    column is loaded as a string.

    Parameters
    ----------
    path : str
        Location of the tab-separated file to load.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per data line, with column names taken from the header row.
    """
    # The raw string r'\t' is the two characters backslash + t; Spark's CSV
    # reader treats that escape sequence as a tab delimiter.
    return spark.read.csv(path=path, sep=r'\t', header=True)


# 2021 consumption extracts, one DataFrame per source file.
diagbh2021_df = read_tsv(constants.DIAGBH2021_FILEPATH)
diagmd2021_df = read_tsv(constants.DIAGMD2021_FILEPATH)
procmd2021_df = read_tsv(constants.PROCMD2021_FILEPATH)
rx2021_df     = read_tsv(constants.RX2021_FILEPATH)
servbh2021_df = read_tsv(constants.SERVBH2021_FILEPATH)
servmd2021_df = read_tsv(constants.SERVMD2021_FILEPATH)

# 2022 consumption extracts, same sources as above.
diagbh2022_df = read_tsv(constants.DIAGBH2022_FILEPATH)
diagmd2022_df = read_tsv(constants.DIAGMD2022_FILEPATH)
procmd2022_df = read_tsv(constants.PROCMD2022_FILEPATH)
rx2022_df     = read_tsv(constants.RX2022_FILEPATH)
servbh2022_df = read_tsv(constants.SERVBH2022_FILEPATH)
servmd2022_df = read_tsv(constants.SERVMD2022_FILEPATH)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [6]:
# Sanity check: preview diagbh2021_df (show() prints the first 20 rows by default).
diagbh2021_df.show()

+-------+----------+---------+
|  memid|dimMonthID|featureid|
+-------+----------+---------+
|1794474|  20210301|      d19|
|  88972|  20210401|      d19|
|1654929|  20211201|      d19|
| 414065|  20211001|      d19|
|1384890|  20210101|      d19|
| 983331|  20210301|      d19|
|1654929|  20210501|      d19|
|1823456|  20210501|       d6|
|1560573|  20210801|      d19|
|1559339|  20211001|      d19|
| 219885|  20211101|      d19|
|1558160|  20210601|      d19|
| 253024|  20210301|      d19|
|1099536|  20210401|      d19|
|1572285|  20210301|      d15|
|1573212|  20210901|      d19|
|1370945|  20211101|      d19|
|1523991|  20210601|      d19|
|1816312|  20211001|      d19|
|1779469|  20211001|      d19|
+-------+----------+---------+
only showing top 20 rows



In [7]:
# Sanity check: preview rx2021_df; it has the same three columns as the other extracts.
rx2021_df.show()

+-----+----------+---------+
|memid|dimMonthID|featureid|
+-----+----------+---------+
|   46|  20210101|      r79|
|   46|  20210301|      r33|
|   46|  20210301|      r37|
|   46|  20210301|      r39|
|   46|  20210301|      r49|
|   46|  20210401|      r38|
|   46|  20210401|      r39|
|   46|  20210401|      r49|
|   46|  20210401|      r79|
|   46|  20210501|      r22|
|   46|  20210501|      r32|
|   46|  20210501|      r37|
|   46|  20210501|      r49|
|   46|  20210501|      r79|
|   46|  20210601|      r27|
|   46|  20210601|      r37|
|   46|  20210601|      r41|
|   46|  20210601|      r43|
|   46|  20210701|      r37|
|   46|  20210701|      r39|
+-----+----------+---------+
only showing top 20 rows



### Union consumption data into one dataframe

In [8]:
# Every per-source, per-year extract, 2021 files first and then 2022.
all_data_dfs_list = [
    diagbh2021_df, diagmd2021_df, procmd2021_df, rx2021_df, servbh2021_df, servmd2021_df,
    diagbh2022_df, diagmd2022_df, procmd2022_df, rx2022_df, servbh2022_df, servmd2022_df,
]
# Fold the list into one DataFrame. unionAll matches columns by position and
# keeps duplicate rows.
all_data_dfs = reduce(lambda stacked, nxt: stacked.unionAll(nxt), all_data_dfs_list)

### Select unique ('memid', 'dimMonthID') pairs and rank each member's months in ascending order

In [9]:
# Rank months within each member, ordered by dimMonthID.
windowSpec  = Window.partitionBy('memid').orderBy("dimMonthID")
# One row per distinct (member, month) pair, with 'rnk' numbering that member's
# months 1, 2, 3, ... in dimMonthID order.
memid_mths = (
    all_data_dfs
    .select('memid', 'dimMonthID')
    .drop_duplicates()
    .withColumn('rnk', dense_rank().over(windowSpec))
)

In [10]:
# Preview the ranked (member, month) pairs; this triggers the Spark job that computes them.
memid_mths.show()

[Stage 16:>                                                         (0 + 4) / 4]

+-------+----------+---+
|  memid|dimMonthID|rnk|
+-------+----------+---+
|1002449|  20210101|  1|
|1002449|  20210201|  2|
|1002449|  20210301|  3|
|1002449|  20210401|  4|
|1002449|  20210501|  5|
|1002449|  20210601|  6|
|1002449|  20210701|  7|
|1002449|  20210801|  8|
|1002449|  20210901|  9|
|1002449|  20211001| 10|
|1002449|  20211101| 11|
|1002449|  20211201| 12|
|1002449|  20220101| 13|
|1002449|  20220201| 14|
|1002449|  20220301| 15|
|1002449|  20220401| 16|
|1002449|  20220501| 17|
|1002449|  20220701| 18|
|1002449|  20220801| 19|
|1002449|  20220901| 20|
+-------+----------+---+
only showing top 20 rows



                                                                                

### Read featureids in different source files

In [11]:
# Load the four featureid lookup tables (diag, proc, rx, serv) the same way:
# tab-separated, with a header row.
dummy_ids_diag, dummy_ids_proc, dummy_ids_rx, dummy_ids_serv = (
    spark.read.csv(path=lookup_path, sep=r'\t', header=True)
    for lookup_path in (
        constants.DUMMY_IDS_DIAG_FILEPATH,
        constants.DUMMY_IDS_PROC_FILEPATH,
        constants.DUMMY_IDS_RX_FILEPATH,
        constants.DUMMY_IDS_SERV_FILEPATH,
    )
)

In [12]:
# Preview the diagnosis featureid lookup table.
dummy_ids_diag.show()

+-------------+---------+-----------+--------------------+
|level1dummyid|featureid|featuretype|              Level1|
+-------------+---------+-----------+--------------------+
|            1|       d1|       diag|Certain condition...|
|            2|       d2|       diag|Certain infectiou...|
|            3|       d3|       diag|Codes for special...|
|            4|       d4|       diag|Congenital malfor...|
|            5|       d5|       diag|Diseases of the b...|
|            6|       d6|       diag|Diseases of the c...|
|            7|       d7|       diag|Diseases of the d...|
|            8|       d8|       diag|Diseases of the e...|
|            9|       d9|       diag|Diseases of the e...|
|           10|      d10|       diag|Diseases of the g...|
|           11|      d11|       diag|Diseases of the m...|
|           12|      d12|       diag|Diseases of the n...|
|           13|      d13|       diag|Diseases of the r...|
|           14|      d14|       diag|Diseases of the s..

In [13]:
# Preview the rx featureid lookup table; its columns match the diagnosis lookup.
dummy_ids_rx.show()

+-------------+---------+-----------+--------------------+
|level1dummyid|featureid|featuretype|              Level1|
+-------------+---------+-----------+--------------------+
|           11|      r11|         rx|         ANTIFUNGALS|
|           81|      r81|         rx|DIETARY PRODUCTS/...|
|           88|      r88|         rx|MOUTH/THROAT/DENT...|
|           68|      r68|         rx|         GOUT AGENTS|
|           99|      r99|         rx|    ASSORTED CLASSES|
|           14|      r14|         rx|          AMEBICIDES|
|           74|      r74|         rx|NEUROMUSCULAR AGENTS|
|           61|      r61|         rx|ADHD/ANTI-NARCOLE...|
|           40|      r40|         rx|CARDIOVASCULAR AG...|
|            7|      r07|         rx|     AMINOGLYCOSIDES|
|           35|      r35|         rx|     ANTIARRHYTHMICS|
|           50|      r50|         rx|         ANTIEMETICS|
|           85|      r85|         rx|HEMATOLOGICAL AGE...|
|           64|      r64|         rx|ANALGESICS - NONN..

### Union featureids into one dataframe

In [14]:
# Stack the four lookup tables into one featureid catalogue.
all_dummy_ids_list = [
    dummy_ids_diag,
    dummy_ids_proc,
    dummy_ids_rx,
    dummy_ids_serv,
]
# unionAll matches columns by position, so all four tables must share one column order.
dummy_ids_df = reduce(lambda stacked, nxt: stacked.unionAll(nxt), all_dummy_ids_list)

### Order featureids by 'featuretype' and 'level1dummyid' (cast 'level1dummyid' to integer first so it sorts numerically rather than as text)

In [15]:
# 'level1dummyid' is read from the file as a string; cast it to an integer so
# the sort within each featuretype is numeric (1, 2, ..., 10) and not lexicographic.
dummy_ids_df = (
    dummy_ids_df
    .withColumn('level1dummyid', dummy_ids_df['level1dummyid'].cast(IntegerType()))
    .orderBy('featuretype', 'level1dummyid')
)

In [16]:
# Preview the combined featureid catalogue after sorting.
dummy_ids_df.show()

+-------------+---------+-----------+--------------------+
|level1dummyid|featureid|featuretype|              Level1|
+-------------+---------+-----------+--------------------+
|            1|       d1|       diag|Certain condition...|
|            2|       d2|       diag|Certain infectiou...|
|            3|       d3|       diag|Codes for special...|
|            4|       d4|       diag|Congenital malfor...|
|            5|       d5|       diag|Diseases of the b...|
|            6|       d6|       diag|Diseases of the c...|
|            7|       d7|       diag|Diseases of the d...|
|            8|       d8|       diag|Diseases of the e...|
|            9|       d9|       diag|Diseases of the e...|
|           10|      d10|       diag|Diseases of the g...|
|           11|      d11|       diag|Diseases of the m...|
|           12|      d12|       diag|Diseases of the n...|
|           13|      d13|       diag|Diseases of the r...|
|           14|      d14|       diag|Diseases of the s..

### Collect ordered features

In [17]:
# Bring only the 'featureid' column to the driver, in the DataFrame's sorted
# row order, and unpack it into a plain Python list.
feature_rows = dummy_ids_df.select('featureid').collect()
order_of_features = [feature_row.featureid for feature_row in feature_rows]

                                                                                

In [18]:
# Display the collected featureid order (one entry per row of dummy_ids_df).
order_of_features

['d1',
 'd2',
 'd3',
 'd4',
 'd5',
 'd6',
 'd7',
 'd8',
 'd9',
 'd10',
 'd11',
 'd12',
 'd13',
 'd14',
 'd15',
 'd16',
 'd17',
 'd18',
 'd19',
 'd20',
 'd21',
 'd22',
 'p1',
 'p2',
 'p3',
 'p4',
 'p5',
 'p6',
 'p7',
 'p8',
 'p9',
 'p10',
 'p11',
 'p12',
 'p13',
 'p14',
 'p15',
 'p16',
 'p17',
 'r00',
 'r01',
 'r02',
 'r03',
 'r04',
 'r05',
 'r07',
 'r08',
 'r09',
 'r11',
 'r12',
 'r13',
 'r14',
 'r15',
 'r16',
 'r17',
 'r18',
 'r19',
 'r20',
 'r21',
 'r22',
 'r23',
 'r24',
 'r25',
 'r26',
 'r27',
 'r28',
 'r29',
 'r30',
 'r31',
 'r32',
 'r33',
 'r34',
 'r35',
 'r36',
 'r37',
 'r38',
 'r39',
 'r40',
 'r41',
 'r42',
 'r43',
 'r44',
 'r45',
 'r46',
 'r47',
 'r48',
 'r49',
 'r50',
 'r51',
 'r52',
 'r53',
 'r54',
 'r55',
 'r56',
 'r57',
 'r58',
 'r59',
 'r60',
 'r61',
 'r62',
 'r64',
 'r65',
 'r66',
 'r67',
 'r68',
 'r69',
 'r70',
 'r72',
 'r73',
 'r74',
 'r75',
 'r76',
 'r77',
 'r78',
 'r79',
 'r80',
 'r81',
 'r82',
 'r83',
 'r84',
 'r85',
 'r86',
 'r87',
 'r88',
 'r89',
 'r90',
 'r92',
 '

### Cross join the unique ('memid', 'dimMonthID') pairs with the featureids

In [19]:
# Cartesian product: one row per (member, month) pair per featureid, so the
# row count is len(memid_mths) * len(dummy_ids_df).
memidmths_cross_dummy_ids = memid_mths.crossJoin(dummy_ids_df)

In [20]:
# Preview the cross-joined (member, month, feature) grid.
memidmths_cross_dummy_ids.show()



+-------+----------+---+-------------+---------+-----------+--------------------+
|  memid|dimMonthID|rnk|level1dummyid|featureid|featuretype|              Level1|
+-------+----------+---+-------------+---------+-----------+--------------------+
|1002449|  20210101|  1|            1|       d1|       diag|Certain condition...|
|1002449|  20210101|  1|            2|       d2|       diag|Certain infectiou...|
|1002449|  20210101|  1|            3|       d3|       diag|Codes for special...|
|1002449|  20210101|  1|            4|       d4|       diag|Congenital malfor...|
|1002449|  20210101|  1|            5|       d5|       diag|Diseases of the b...|
|1002449|  20210101|  1|            6|       d6|       diag|Diseases of the c...|
|1002449|  20210101|  1|            7|       d7|       diag|Diseases of the d...|
|1002449|  20210101|  1|            8|       d8|       diag|Diseases of the e...|
|1002449|  20210101|  1|            9|       d9|       diag|Diseases of the e...|
|1002449|  20210

                                                                                

### Rename columns

In [21]:
# Prefix the consumption columns with 'a_' so they can be told apart from the
# same-named columns of the cross-joined grid after the join below.
# NOTE(review): this rebinds all_data_dfs, so re-running the earlier cell that
# selects 'memid' / 'dimMonthID' from it will fail until the notebook is run
# again from the top.
renamed_columns = {
    'memid': 'a_memid',
    'dimMonthID': 'a_dimMonthID',
    'featureid': 'a_featureid',
}
for old_name, new_name in renamed_columns.items():
    all_data_dfs = all_data_dfs.withColumnRenamed(old_name, new_name)

In [22]:
# Confirm the rename took effect; all three columns are strings.
all_data_dfs.printSchema()

root
 |-- a_memid: string (nullable = true)
 |-- a_dimMonthID: string (nullable = true)
 |-- a_featureid: string (nullable = true)



### Left join to create sparse consumption data flags for each member, for respective month, for each feature

In [23]:
# all_data_dfs is a plain unionAll of every source file and is never
# deduplicated, so the same (member, month, feature) triple may occur more than
# once (for example if two extracts report the same feature). Joining against
# duplicates would multiply rows in the grid and make the per-month flag list
# built later longer than the feature list, so collapse to distinct triples first.
observed_features = all_data_dfs.dropDuplicates(['a_memid', 'a_dimMonthID', 'a_featureid'])

# Match each (member, month, feature) cell of the grid with its observed record, if any.
join_condition = (
    (memidmths_cross_dummy_ids['memid'] == observed_features['a_memid'])
    & (memidmths_cross_dummy_ids['dimMonthID'] == observed_features['a_dimMonthID'])
    & (memidmths_cross_dummy_ids['featureid'] == observed_features['a_featureid'])
)

# Left join keeps every cell of the grid; cells with no match get nulls in the a_* columns.
fj = memidmths_cross_dummy_ids.join(observed_features, join_condition, 'left')

# Flag is 1 when a matching record exists and 0 otherwise. The helper a_*
# columns are dropped by name, because passing several Column objects to
# drop() is rejected by PySpark releases before 3.4.
fjmod = (
    fj
    .withColumn('is_feature_present', when(fj['a_memid'].isNull(), lit(0)).otherwise(lit(1)))
    .drop('a_memid', 'a_dimMonthID', 'a_featureid')
)

In [24]:
# Preview the grid with its 0/1 'is_feature_present' flag.
fjmod.show()

                                                                                

+-------+----------+---+-------------+---------+-----------+--------------------+------------------+
|  memid|dimMonthID|rnk|level1dummyid|featureid|featuretype|              Level1|is_feature_present|
+-------+----------+---+-------------+---------+-----------+--------------------+------------------+
|1000231|  20210101|  1|           11|      d11|       diag|Diseases of the m...|                 0|
|1000231|  20210101|  1|           13|      d13|       diag|Diseases of the r...|                 1|
|1000231|  20210101|  1|            5|       d5|       diag|Diseases of the b...|                 0|
|1002449|  20210101|  1|           11|      d11|       diag|Diseases of the m...|                 0|
|1002449|  20210101|  1|           14|      d14|       diag|Diseases of the s...|                 0|
|1002449|  20210101|  1|           15|      d15|       diag|Endocrine, nutrit...|                 1|
|1002449|  20210101|  1|           18|      d18|       diag|Injury, poisoning...|          

[Stage 55:>                                                         (0 + 2) / 2]                                                                                

### For each member for respective month, create a list of consumption data flags

In [25]:
# Window over all rows of one (member, month), ordered the same way as the
# feature catalogue. The unbounded frame makes every row see the complete list.
windowSpec2 = (
    Window
    .partitionBy('memid', 'dimMonthID')
    .orderBy('featuretype', 'level1dummyid')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
fjmod_short = (
    fjmod
    # Attach the full ordered list of flags to every row of the partition.
    .withColumn('feature_presence', collect_list('is_feature_present').over(windowSpec2))
    # Each row of a partition now carries the same list, so keep one row per (member, month).
    .select('memid', 'dimMonthID', 'feature_presence')
    .distinct()
)
# Cast the array to its string form; the result is written out as CSV below.
fjmod_short = fjmod_short.withColumn('feature_presence', fjmod_short['feature_presence'].cast(StringType()))

In [26]:
# Preview the per-(member, month) flag vectors; show() truncates long strings in its display.
fjmod_short.show()

[Stage 69:>                                                         (0 + 1) / 1]

+-------+----------+--------------------+
|  memid|dimMonthID|    feature_presence|
+-------+----------+--------------------+
|1000231|  20210501|[0, 0, 0, 0, 0, 1...|
|1000428|  20210701|[0, 0, 0, 0, 0, 0...|
|1000428|  20211201|[0, 1, 0, 0, 0, 0...|
|1000442|  20210701|[0, 0, 0, 0, 0, 1...|
|1002449|  20210101|[0, 0, 0, 0, 0, 1...|
|1002449|  20210501|[0, 0, 0, 0, 0, 0...|
|1002449|  20220901|[0, 0, 0, 0, 0, 1...|
|1002449|  20221201|[0, 0, 0, 0, 0, 1...|
|1002593|  20210101|[0, 0, 0, 0, 0, 1...|
|1002593|  20211101|[0, 0, 0, 0, 0, 1...|
|1002655|  20210201|[0, 0, 0, 0, 0, 0...|
|1002655|  20210601|[0, 0, 0, 0, 0, 0...|
|1002655|  20211001|[0, 0, 0, 0, 0, 0...|
|1002655|  20211101|[0, 0, 0, 0, 0, 0...|
|1002655|  20220101|[0, 0, 0, 0, 0, 0...|
|1002655|  20220901|[0, 0, 0, 0, 0, 0...|
|1002655|  20221101|[0, 0, 0, 0, 0, 0...|
|1002695|  20220301|[0, 0, 0, 0, 0, 0...|
|1003468|  20220201|[0, 0, 0, 0, 1, 0...|
| 100354|  20210501|[0, 0, 0, 0, 1, 1...|
+-------+----------+--------------

                                                                                

### Partition by 'memid' and write data

In [27]:
# Write the flag vectors as CSV with a header row, one sub-directory per
# 'memid' value, replacing any existing "consumption_data" output.
(
    fjmod_short.write
    .option('header', True)
    .partitionBy('memid')
    .mode("overwrite")
    .csv("consumption_data")
)

                                                                                