In [1]:
import datetime
import warnings

import numpy as np
import pandas as pd
import pyspark
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array, col, collect_list, when, explode,
    size, split, udf, substring, substring_index,
)
from pyspark.sql.types import IntegerType
from sklearn import preprocessing as pp
from sklearn.preprocessing import MultiLabelBinarizer

warnings.filterwarnings("ignore")

In [2]:
spark = (
    SparkSession.builder.master("local")
    .appName("p6250")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

21/12/11 20:18:55 WARN Utils: Your hostname, Ex resolves to a loopback address: 127.0.1.1; using 172.27.161.66 instead (on interface eth0)
21/12/11 20:18:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/11 20:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
procedure_cols = [
    "DESYNPUF_ID",
    "ICD9_PRCDR_CD_1",
    "ICD9_PRCDR_CD_2",
    "ICD9_PRCDR_CD_3",
    "ICD9_PRCDR_CD_4",
    "ICD9_PRCDR_CD_5",
    "ICD9_PRCDR_CD_6",
]

diagnosis_cols = [
    "DESYNPUF_ID",
    "ICD9_DGNS_CD_1",
    "ICD9_DGNS_CD_2",
    "ICD9_DGNS_CD_3",
    "ICD9_DGNS_CD_4",
    "ICD9_DGNS_CD_5",
    "ICD9_DGNS_CD_6",
    "ICD9_DGNS_CD_7",
    "ICD9_DGNS_CD_8",
    "ICD9_DGNS_CD_9",
    "ICD9_DGNS_CD_10",
]

In [4]:
with open("resources/hcup_ccs_2015_definitions.yaml", "r") as file:
    xd_yml = yaml.safe_load(file)
map_diag = pd.DataFrame(xd_yml).T.reset_index().explode("codes")[["id", "codes"]]

map_diag["codes"] = map_diag["codes"].apply(lambda x: x[:3] if x[0] != "E" else x[:4])
map_diag = map_diag.drop_duplicates()
map_diag = spark.createDataFrame(map_diag)

df_patient = spark.read.csv(
    "data/sample-01/DE1_0_2008_Beneficiary_Summary_File_Sample_1.csv", header=True
)

In [5]:
df_patient = df_patient.filter(col("BENE_DEATH_DT").isNull())

df_patient = df_patient.withColumn(
    "age", ((20090101 - df_patient.BENE_BIRTH_DT) / 10000).cast(IntegerType())
)

df_patient = df_patient.withColumnRenamed("BENE_SEX_IDENT_CD", "sex").select(
    "DESYNPUF_ID", "sex", "age"
)

In [6]:
# For the drug code, the 4 digits in the middle are for the main medicine code.

df_rx = spark.read.csv(
    "data/sample-01/DE1_0_2008_to_2010_Prescription_Drug_Events_Sample_1.csv",
    header=True,
)

# is the the right code!!!!!!
df_rx = df_rx.withColumn("rx", substring(df_rx.PROD_SRVC_ID, 5, 4))
df_rx = df_rx.select("DESYNPUF_ID", "rx").dropDuplicates()
df_rx = (
    df_rx.groupBy("DESYNPUF_ID")
    .agg(collect_list("rx"))
    .withColumnRenamed("collect_list(rx)", "rx")
)
df_rx = df_rx.withColumn("rx_cnt", size(df_rx.rx))

In [7]:
df_dxsg = spark.read.csv(
    "data/sample-01/DE1_0_2008_to_2010_Inpatient_Claims_Sample_1.csv", header=True
).unionByName(
    spark.read.csv(
        "data/sample-01/DE1_0_2008_to_2010_Outpatient_Claims_Sample_1.csv", header=True
    ),
    allowMissingColumns=True,
)

df_sg = (
    df_dxsg.select(*procedure_cols)
    .withColumn("ICD9_sg", array(*procedure_cols[1:]))
    .select("DESYNPUF_ID", "ICD9_sg")
)
df_dx = (
    df_dxsg.select(*diagnosis_cols)
    .withColumn("ICD9_dx", array(*diagnosis_cols[1:]))
    .select("DESYNPUF_ID", "ICD9_dx")
)

df_sg = df_sg.withColumn("ICD9_sg", explode("ICD9_sg")).drop_duplicates().na.drop()
df_dx = df_dx.withColumn("ICD9_dx", explode("ICD9_dx")).drop_duplicates().na.drop()

df_sg = df_sg.withColumn("ICD9_sg", substring_index(col("ICD9_sg"), ".", 1))
df_sg = df_sg.withColumn(
    "ICD9_sg",
    when(df_sg.ICD9_sg.substr(1, 1) == "E", df_sg.ICD9_sg.substr(1, 4)).otherwise(
        df_sg.ICD9_sg.substr(1, 3)
    ),
).drop_duplicates()
df_dx = df_dx.withColumn("ICD9_dx", substring_index(col("ICD9_dx"), ".", 1))
df_dx = df_dx.withColumn(
    "ICD9_dx",
    when(df_dx.ICD9_dx.substr(1, 1) == "E", df_dx.ICD9_dx.substr(1, 4)).otherwise(
        df_dx.ICD9_dx.substr(1, 3)
    ),
).drop_duplicates()

In [8]:
df_sg = (
    df_sg.join(map_diag, df_sg.ICD9_sg == map_diag.codes)
    .drop_duplicates()
    .groupBy("DESYNPUF_ID")
    .agg(collect_list("id").alias("sg"))
)
df_dx = (
    df_dx.join(map_diag, df_dx.ICD9_dx == map_diag.codes)
    .drop_duplicates()
    .groupBy("DESYNPUF_ID")
    .agg(collect_list("id").alias("dx"))
)
df_sg = df_sg.withColumn("sg_cnt", size(df_sg.sg))
df_dx = df_dx.withColumn("dx_cnt", size(df_dx.dx))

In [9]:
df_tensor = (
    df_patient.join(df_rx, ['DESYNPUF_ID'], how="left")
    .join(df_dx, ['DESYNPUF_ID'], how="left")
    .join(df_sg, ['DESYNPUF_ID'], how="left")
    .fillna(0)
)

In [10]:
df_tensor.na.fill(0).show()

[Stage 28:>                                                         (0 + 1) / 1]

+----------------+---+---+--------------------+------+--------------------+------+--------------------+------+
|     DESYNPUF_ID|sex|age|                  rx|rx_cnt|                  dx|dx_cnt|                  sg|sg_cnt|
+----------------+---+---+--------------------+------+--------------------+------+--------------------+------+
|00013D2EFD8E45D1|  1| 85|[1003, 6456, 6075...|   170|[206, 98, 212, 23...|    37|                null|     0|
|00016F745862898F|  1| 66|[4580, 1025, 7163...|    15|[238, 653, 211, 1...|    61|[2603, 170, 64, 2...|     8|
|0001FDD721E223DC|  2| 72|[0126, 8201, 9005...|    69|[103, 118, 78, 1,...|    25|                null|     0|
|00021CA6FF03E670|  1| 67|[9315, 6099, 6038...|     9|                null|     0|                null|     0|
|00024B3D2352D2D0|  1| 72|[9071, 7169, 3128...|     4|[164, 53, 58, 167...|     5|                null|     0|
|0002DAE1C81CC70D|  1| 65|[4045, 9148, 3093...|     8|                null|     0|                null|     0|
|

                                                                                

In [11]:
df = df_tensor.toPandas().set_index("DESYNPUF_ID").fillna(0)

                                                                                

In [12]:
df["rx"] = df["rx"].apply(
    lambda x: [x] if x == 0 else [int(i) for i in x if str(i).isnumeric()]
)
df["dx"] = df["dx"].apply(
    lambda x: [x] if x == 0 else [int(i) for i in x if str(i).isnumeric()]
)
df["sg"] = df["sg"].apply(
    lambda x: [x] if x == 0 else [int(i) for i in x if str(i).isnumeric()]
)

In [13]:
df.head()

Unnamed: 0_level_0,sex,age,rx,rx_cnt,dx,dx_cnt,sg,sg_cnt
DESYNPUF_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
00013D2EFD8E45D1,1,85,"[1003, 6456, 6075, 7333, 6566, 8080, 2010, 231...",170,"[206, 98, 212, 238, 245, 259, 58, 163, 99, 200...",37,[0],0
00016F745862898F,1,66,"[4580, 1025, 7163, 76, 6040, 8251, 5048, 6095,...",15,"[238, 653, 211, 158, 134, 663, 98, 170, 2614, ...",61,"[2603, 170, 64, 257, 136, 254, 200, 45]",8
0001FDD721E223DC,2,72,"[126, 8201, 9005, 2004, 4016, 7176, 2316, 4078...",69,"[103, 118, 78, 1, 254, 133, 251, 224, 139, 117...",25,[0],0
00021CA6FF03E670,1,67,"[9315, 6099, 6038, 9094, 6072, 9061, 8265, 602...",9,[0],0,[0],0
00024B3D2352D2D0,1,72,"[9071, 7169, 3128, 7079]",4,"[164, 53, 58, 167, 258]",5,[0],0


In [14]:
df.to_pickle("df_pd.pkl")

In [15]:
df = pd.read_pickle("df_pd.pkl")

In [16]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 114538 entries, 00013D2EFD8E45D1 to FFFF7C107A4E385A
Data columns (total 8 columns):
 #   Column  Non-Null Count   Dtype 
---  ------  --------------   ----- 
 0   sex     114538 non-null  object
 1   age     114538 non-null  int32 
 2   rx      114538 non-null  object
 3   rx_cnt  114538 non-null  int32 
 4   dx      114538 non-null  object
 5   dx_cnt  114538 non-null  int32 
 6   sg      114538 non-null  object
 7   sg_cnt  114538 non-null  int32 
dtypes: int32(4), object(4)
memory usage: 6.1+ MB


In [17]:
df.head(10)

Unnamed: 0_level_0,sex,age,rx,rx_cnt,dx,dx_cnt,sg,sg_cnt
DESYNPUF_ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
00013D2EFD8E45D1,1,85,"[1003, 6456, 6075, 7333, 6566, 8080, 2010, 231...",170,"[206, 98, 212, 238, 245, 259, 58, 163, 99, 200...",37,[0],0
00016F745862898F,1,66,"[4580, 1025, 7163, 76, 6040, 8251, 5048, 6095,...",15,"[238, 653, 211, 158, 134, 663, 98, 170, 2614, ...",61,"[2603, 170, 64, 257, 136, 254, 200, 45]",8
0001FDD721E223DC,2,72,"[126, 8201, 9005, 2004, 4016, 7176, 2316, 4078...",69,"[103, 118, 78, 1, 254, 133, 251, 224, 139, 117...",25,[0],0
00021CA6FF03E670,1,67,"[9315, 6099, 6038, 9094, 6072, 9061, 8265, 602...",9,[0],0,[0],0
00024B3D2352D2D0,1,72,"[9071, 7169, 3128, 7079]",4,"[164, 53, 58, 167, 258]",5,[0],0
0002DAE1C81CC70D,1,65,"[4045, 9148, 3093, 7034, 9142, 4028, 1060, 8082]",8,[0],0,[0],0
0002F28CE057345B,1,86,"[9076, 5053, 9180, 6082, 1006, 9008]",6,"[211, 651, 245, 151, 230, 49, 661, 47, 211, 24...",99,[0],0
000308435E3E5B76,1,73,[3024],1,"[53, 158, 106, 95, 127, 105, 205, 45, 254, 653...",53,[0],0
000345A39D4157C9,2,32,"[9576, 9041, 8035]",3,"[95, 91, 155, 117, 99, 244, 95, 86, 238, 49, 9...",38,[0],0
00036A21B65B0206,2,70,"[3070, 8003, 7245, 6073, 5208, 2704, 6099, 852...",78,[0],0,[0],0


In [18]:
mlb = MultiLabelBinarizer(sparse_output=True)
df = df.join(
    pd.DataFrame.sparse.from_spmatrix(
        mlb.fit_transform(df.pop("rx")),
        index=df.index,
        columns=["rx_" + str(i) for i in mlb.classes_],
    )
)

In [19]:
df = df.join(
    pd.DataFrame.sparse.from_spmatrix(
        mlb.fit_transform(df.pop("dx")),
        index=df.index,
        columns=["dx_" + str(i) for i in mlb.classes_],
    )
)

In [20]:
df = df.join(
    pd.DataFrame.sparse.from_spmatrix(
        mlb.fit_transform(df.pop("sg")),
        index=df.index,
        columns=["sg_" + str(i) for i in mlb.classes_],
    )
)

In [21]:
for i in df.columns:
    df[[i]] = df[[i]].astype(np.int16)

In [22]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 114538 entries, 00013D2EFD8E45D1 to FFFF7C107A4E385A
Columns: 8308 entries, sex to sg_2621
dtypes: Sparse[int16, 0](8303), int16(5)
memory usage: 62.3+ MB


In [23]:
# np.save("df_tensor", df.to_numpy())