In [3]:
print('starting')

starting


In [4]:
# Load datasets

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("MIMIC dataprep") \
    .getOrCreate()

In [5]:
initial_file_path = '../data/'
file_path = initial_file_path + 'LABEVENTS.csv'

labDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()

# print(labDF.count())


In [6]:
file_path = initial_file_path + 'D_LABITEMS.csv'

lab_indexDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()


In [7]:
file_path = initial_file_path + 'ADMISSIONS.csv'

admitDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()

print(admitDF.count())

58976


In [8]:
file_path = initial_file_path + 'PATIENTS.csv'

demoDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()

In [9]:
file_path = initial_file_path + 'D_ICD_DIAGNOSES.csv'

diag_indexDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()

In [10]:
file_path = initial_file_path + 'DIAGNOSES_ICD.csv'

diagDF = spark.read.csv(file_path, sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()

In [11]:
##### Processing admit features #####
DF_admit= admitDF.join(demoDF,on='SUBJECT_ID',how='left') \
    .withColumn('DOB',col('DOB').cast(TimestampType())) \
    .withColumn('ADMITTIME',col('ADMITTIME').cast(TimestampType())) \
    .withColumn('DISCHTIME',col('DISCHTIME').cast(TimestampType())) \
    .withColumn('HADM_ID',col('HADM_ID').cast(StringType())) \

    
udf_gender=udf(lambda e: 0 if (e == 'M') else 1,IntegerType())
udf_urgent=udf(lambda e: 1 if (e == 'URGENT') else 0,IntegerType())
udf_emer=udf(lambda e: 1 if (e == 'EMERGENCY') else 0,IntegerType())


DF_admit=DF_admit \
.withColumn('GENDER', udf_gender(col('GENDER'))) \
.withColumn('age',year(col('ADMITTIME')) - year(col('DOB'))) \
.withColumn('age',col('age').cast(IntegerType())) \
.withColumn('admit_LOS',datediff(col('DISCHTIME') , col('ADMITTIME'))) \
.withColumn('is_urgent',udf_urgent(col('ADMISSION_TYPE'))) \
.withColumn('is_emergency',udf_emer(col('ADMISSION_TYPE'))) \
.drop(('ROW_ID'))


# Filter
DF_admit=DF_admit.filter(col('age') > 16).filter(col('age') < 100)\
# .filter(col('HADM_ID'),isNotNull())

In [12]:
# DF_admit.columns

In [13]:
# DF_admit.take(1)
# DF_admit.printSchema()
print(DF_admit.count())

48231


In [14]:
##### Processing lab features #####

# df_labs=labs.merge(labs_index,on='ITEMID',how='left')
labsJoinedDF= labDF.join(lab_indexDF,on='ITEMID',how='left')

In [15]:
labsJoinedDF.count()

27854055

In [16]:
a_labels=['Creatinine',
          'Bicarbonate',
          'Hematocrit',
          'Hemoglobin',
          'Potassium',
          'Sodium',
            #     'Bilirubin',
          'Ammonia',
          'Albumin',
          'Magnesium',
          'Urea Nitrogen',
            #     'Calcium',
          'Chloride',
          'Amylase',
          'Asparate Aminotransferase (AST)',
          'WBC', 
          'Temperature',
          'pH',
          'pO2',
          'pCO2',
          'Glucose',
          'Protein',
          'Triglycerides',
          'Globulin'
    ]

labrexp='|'.join(a_labels)
print(labrexp)

Creatinine|Bicarbonate|Hematocrit|Hemoglobin|Potassium|Sodium|Ammonia|Albumin|Magnesium|Urea Nitrogen|Chloride|Amylase|Asparate Aminotransferase (AST)|WBC|Temperature|pH|pO2|pCO2|Glucose|Protein|Triglycerides|Globulin


In [17]:
# labDF.
# a_labels=labsJoinedDF.select('LABEL').collect()
# res=a_labels.LABEL.str.contains(labrexp)

In [18]:
print(len(a_labels))

22


In [19]:
udf_filter = udf(lambda e: e in a_labels)
# labFeatsDF = labJoinedDF[res]

labFeatsDF = labsJoinedDF.filter(col('LABEL').isin(a_labels)) \
    .withColumn('labelname',col('LABEL')) \


In [20]:
labFeatsDF=labFeatsDF.join(admitDF.select('ADMITTIME','DISCHTIME','HADM_ID'),on='HADM_ID',how='left')

In [21]:
labFeatsDF.count()

10672272

In [22]:
labFeatsDF = labFeatsDF \
    .withColumn('CHARTTIME',col('CHARTTIME').cast(TimestampType())) \
    .withColumn('LOS',datediff(col('CHARTTIME') ,col('ADMITTIME'))) \
    .withColumn('day',col('LOS').cast(StringType())) \
    .withColumn('HADM_ID',col('HADM_ID').cast(StringType())) \
    .withColumn('hadm_los',concat(col('HADM_ID'),lit('_'),col('LOS'))) \

In [23]:
labFeatsDF.count()

10672272

In [24]:
labFeatsDF = labFeatsDF.filter(col('HADM_ID').isNotNull())

In [25]:
# labFeatsDF.take(1)
# labFeatsDF.count()

In [26]:
##### Labs processing #####
# import pyspark.sql.functions as F
# # from pyspark.sql.functions import rowNumber
# from pyspark.sql.window import Window

# # w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

# # labFeatsDF.partitionBy(['hadm_los','labelname']).orderby(col('CHARTTIME').desc())

# maxs=labFeatsDF.groupBy("hadm_los","labelname").agg(F.max("CHARTTIME").alias("mx"))
# max_labs_DF=labFeatsDF.join(maxs, 
#   (col("CHARTTIME") == col("mx"))
# )

# idx=df_nan_labs.groupBy(['hadm_los','labelname'])['CHARTTIME'].transform(max) == df_nan_labs['CHARTTIME']
# df_max_labs=df_nan_labs[idx]

# idx2=df_max_labs.groupby(['hadm_los','labelname'])['ROW_ID_x'].transform(max) == df_max_labs['ROW_ID_x']
# df_max2_labs=df_max_labs[idx2]
 

In [27]:
# max_labs_DF.count()

In [28]:
from pyspark.sql.window import Window
w3 = Window.partitionBy("hadm_los", "labelname").orderBy(desc("CHARTTIME"))
featsDF = labFeatsDF.select('*', rank().over(w3).alias('rank')) \
  .filter(col('rank') < 2) 
#   .show() 


In [29]:
# featsDF.count()
featsDF.show(1)

+-------+------+--------+----------+-------------------+-----+--------+--------+----+------+-----+-----+----------+----------+---------+-------------------+-------------------+---+---+--------+----+
|HADM_ID|ITEMID|  ROW_ID|SUBJECT_ID|          CHARTTIME|VALUE|VALUENUM|VALUEUOM|FLAG|ROW_ID|LABEL|FLUID|  CATEGORY|LOINC_CODE|labelname|          ADMITTIME|          DISCHTIME|LOS|day|hadm_los|rank|
+-------+------+--------+----------+-------------------+-----+--------+--------+----+------+-----+-----+----------+----------+---------+-------------------+-------------------+---+---+--------+----+
| 100003| 51491|21885288|     54610|2150-04-17 22:01:00|  5.5|     5.5|   units|null|   691|   pH|Urine|Hematology|    5803-2|       pH|2150-04-17 15:34:00|2150-04-21 17:30:00|  0|  0|100003_0|   1|
+-------+------+--------+----------+-------------------+-----+--------+--------+----+------+-----+-----+----------+----------+---------+-------------------+-------------------+---+---+--------+----+
only 

In [30]:
# pivot so we have a column per lab
daily_labs_DF=featsDF.groupBy('hadm_los').pivot('labelname').agg(last('VALUENUM'))


In [31]:
print(daily_labs_DF.count())

524765


In [32]:
udf_ham=udf(lambda e: e.split('_')[0])
udf_los=udf(lambda e: e.split('_')[1])

daily_labs_DF=daily_labs_DF \
    .withColumn('HADM_ID',udf_ham(col('hadm_los'))) \
    .withColumn('LOS',udf_los(col('hadm_los'))) \

In [33]:
daily_labs_DF.printSchema()

root
 |-- hadm_los: string (nullable = true)
 |-- Albumin: string (nullable = true)
 |-- Ammonia: string (nullable = true)
 |-- Amylase: string (nullable = true)
 |-- Asparate Aminotransferase (AST): string (nullable = true)
 |-- Bicarbonate: string (nullable = true)
 |-- Chloride: string (nullable = true)
 |-- Creatinine: string (nullable = true)
 |-- Globulin: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- Hematocrit: string (nullable = true)
 |-- Hemoglobin: string (nullable = true)
 |-- Magnesium: string (nullable = true)
 |-- Potassium: string (nullable = true)
 |-- Protein: string (nullable = true)
 |-- Sodium: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- Triglycerides: string (nullable = true)
 |-- Urea Nitrogen: string (nullable = true)
 |-- WBC: string (nullable = true)
 |-- pCO2: string (nullable = true)
 |-- pH: string (nullable = true)
 |-- pO2: string (nullable = true)
 |-- HADM_ID: string (nullable = true)
 |-- LOS: string

In [34]:
daily_labs_DF.where(col('HADM_ID') == '100001').orderBy('LOS').show(5)

+--------+-------+-------+-------+-------------------------------+-----------+--------+----------+--------+-------+----------+----------+---------+---------+-------+------+-----------+-------------+-------------+----+----+----+----+-------+---+
|hadm_los|Albumin|Ammonia|Amylase|Asparate Aminotransferase (AST)|Bicarbonate|Chloride|Creatinine|Globulin|Glucose|Hematocrit|Hemoglobin|Magnesium|Potassium|Protein|Sodium|Temperature|Triglycerides|Urea Nitrogen| WBC|pCO2|  pH| pO2|HADM_ID|LOS|
+--------+-------+-------+-------+-------------------------------+-----------+--------+----------+--------+-------+----------+----------+---------+---------+-------+------+-----------+-------------+-------------+----+----+----+----+-------+---+
|100001_0|   null|   null|   null|                             16|         18|     114|       2.3|    null|    141|      32.2|        11|      1.9|      4.1|    300|   144|       null|         null|           36|   2|null| 7.5|null| 100001|  0|
|100001_1|   null|  

In [35]:
# Imputing values
# impute mean
# exclude list
l_ex=['hadm_los','LOS','HADM_ID']

a_stats=[]
def fill_with_mean(df, exclude=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in a_labels
    ))
    a_stats=stats.first().asDict()
    print(a_stats)
#     return df.na.fill(ct())
    return df.na.fill(stats.first().asDict())

df=daily_labs_DF
# print([e for e in df.columns not in l_ex])
# labs_=[e for e in df.columns not in l_ex]
df=df.select(*(col(c).cast(FloatType()) for c in a_labels))
df_labs_imputed=fill_with_mean(daily_labs_DF, l_ex)

{'Hematocrit': 31.26933838192296, 'Amylase': 106.19542577994893, 'Asparate Aminotransferase (AST)': 138.8242838208272, 'Magnesium': 2.0318854587049846, 'Chloride': 103.6867390270662, 'Temperature': 37.18157011042203, 'WBC': 19.51009535655058, 'Triglycerides': 163.66817564508827, 'Hemoglobin': 10.461022227971277, 'Ammonia': 59.942885440739516, 'pCO2': 42.42785544204624, 'Globulin': 2.748986301369863, 'pO2': 113.8872822935269, 'Bicarbonate': 25.641860099792897, 'Sodium': 138.60713253151383, 'Protein': 82.53410283315844, 'Urea Nitrogen': 28.501929200926018, 'Creatinine': 1.4545759030635566, 'Albumin': 2.954614999564725, 'pH': 7.016327225961998, 'Potassium': 4.112344059107788, 'Glucose': 130.01655345091677}


In [36]:
# get one hot encoding representation
DF_labs_imputed=df_labs_imputed
# def set_one(e):
#     if e != 0:
#         return 1
    
# udf_set_one=udf(lambda e: set_one(e))
# DF_diag_group=DF_diag_pivot.na.fill(0)
DF_labsnew1=daily_labs_DF.select(*(min(col(c)).alias('min_'+c) for c in DF_labs_imputed.columns))
DF_labsnew2=daily_labs_DF.select(*(max(col(c)).alias('max_'+c) for c in DF_labs_imputed.columns))
DF_labsnew3=daily_labs_DF.select(*(mean(col(c)).alias('mean_'+c) for c in DF_labs_imputed.columns))

In [37]:
# df_const=pd.concat(DF_labsnew1.toPandas(),DF_labsnew2.toPandas())
# df_const.head(2)
print(DF_labsnew1.toPandas())
print(DF_labsnew2.toPandas())
print(DF_labsnew3.toPandas())

  min_hadm_los min_Albumin min_Ammonia min_Amylase  \
0     100001_0          .9           1           0   

  min_Asparate Aminotransferase (AST) min_Bicarbonate min_Chloride  \
0                                   0              10          100   

  min_Creatinine min_Globulin min_Glucose   ...   min_Sodium min_Temperature  \
0            .05           .3        -251   ...          100               0   

  min_Triglycerides min_Urea Nitrogen min_WBC min_pCO2 min_pH min_pO2  \
0                 1                 0      .1        0      0       0   

  min_HADM_ID min_LOS  
0      100001      -1  

[1 rows x 25 columns]
  max_hadm_los max_Albumin max_Ammonia max_Amylase  \
0     199999_6         6.9          99         999   

  max_Asparate Aminotransferase (AST) max_Bicarbonate max_Chloride  \
0                                 999             9.2           99   

  max_Creatinine max_Globulin max_Glucose   ...   max_Sodium max_Temperature  \
0            9.9          9.1         998

In [38]:
# import pandas as pd

# # Save constants
# # udf(lambda )

# df_consts=daily_labs_DF.select(*daily_labs_DF.columns)
# df_consts.to_csv('')

In [39]:
# ## filter numeric cols
# num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, daily_labs_DF.dtypes)]
# ### Compute a dict with <col_name, median_value>
# median_dict = dict()
# for c in num_cols:
#     median_dict[c] = daily_labs_DF.stat.approxQuantile(c, [0.5], 0.001)[0]

# df_imputed = daily_labs_DF.na.fill(median_dict)



In [40]:
# labs_=['Albumin', 'Ammonia', 'Bicarbonate',  'Chloride', 'Creatinine', 'Globulin', \
#         'Glucose', 'Hematocrit', 'Magnesium', 'Potassium', 'Protein', 'Sodium', 'Temperature', 'Urea Nitrogen', 
#        'WBC', 'pH', 'pO2']

In [41]:
labs_=a_labels

In [42]:
# no need to normalize

# # inormalize
# a_stats=[]
# def fill_with_mean(df, exclude=set()): 
#     stats = df.agg(*(
#         udf_norm(c,df_minmax(c),df_minmax(c)).alias(c) for c in df.columns if c in exclude
#     ))
#     a_stats=stats
#     return df.na.fill(stats.first().asDict())

# df_imputed=fill_with_mean(df, labs_)

In [43]:
# # impute with median
# df=daily_labs_DF.select(*(col(c).cast(FloatType()) for c in labs_))

# num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int","float"}, df.dtypes)]
# num_cols=['Albumin', 'Ammonia', 'Bicarbonate']
# median_dict = dict()
# mean_dict = dict()
# # print(df.dtypes)
# for c in num_cols:
#     ""
#     median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
#     mean_dict[c] = avg(col('cb'))[0]
    

# # df_imputed = df.na.fill(median_dict)
# print(median_dict)
# print(mean_dict)

In [44]:
# # Imputing with mean
# # import pandas as pd
# # df_means=pd.DataFrame()

# def impute_mean(df):
#     res = df.copy()
# #     df_means=pd.DataFrame()
    
#     for feature_name in df.columns:
#         if feature_name in a_labels:
#             mean_value = df[feature_name].mean()
            
#             # fill with mean, alternative with median
#             res[feature_name]=df[feature_name].fillna(mean_value)
#             df_means[feature_name]=[mean_value]
# #             print(mean_value)
# #     print(df_means)
#     return res

# # df_daily_labs_imputed=impute_mean(df_daily_labs)
# # DF_labs_imputed = impute_mean(DF_daily_labs)

# udf_impute_mean=udf(impute_mean,DoubleType())

# DF_labs_imputed = daily_labs_DF.select(*udf_daily_labs_DF.columns)
# DF_labs_imputed = daily_labs_DF
DF_labs_imputed.count()

524765

In [45]:
##### CCSD9s processing #####
ccs9_dict = spark.read.csv(initial_file_path + 'ccs9_dict.csv', sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()
    
ccs9_icd9_mapping = spark.read.csv(initial_file_path + 'ccs9_icd9_mapping.csv', sep=',', header=True, nullValue='NULL', 
                          ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True) \
    .cache()


In [46]:
ccs9_data = ccs9_dict.join(ccs9_icd9_mapping, on='CCS9', how='inner')


In [47]:
DF_diag = diagDF.join(diag_indexDF,on='ICD9_CODE',how='left')
# do an inner join with ccs9_data so that we only keep the diagnoses we are interested in
DF_diag = DF_diag.join(ccs9_data, DF_diag.LONG_TITLE==ccs9_data.ICD9DSC, how='inner')

In [48]:
# DF_diag.take(1)
DF_diag.count()

516711

In [49]:
from pyspark.sql.window import Window
w4 = Window.partitionBy("HADM_ID", "CCS9").orderBy(desc('SUBJECT_ID'))

diagfeatsDF = DF_diag.select('*', rank().over(w4).alias('rank')) \
  .filter(col('rank') < 2) 

DF_diag_pivot=diagfeatsDF.groupBy('HADM_ID').pivot( 'CCS9').agg(first('CCS9'))

In [50]:
# mean_values = {column: DF_diag_pivot.agg({column:"mean"}).flatMap(list).collect()[0] for column in DF_diag_pivot.columns if column not in ['HADM_ID']}
# df_data = df_data.na.fill(fill_values)

# fill_values = {column: 0 for column in DF_diag_pivot.columns if column not in ['HADM_ID']}

In [51]:
# DF_diag_group.show(2)

In [52]:
# get one hot encoding representation

def set_one(e):
    if (e == 0):
        return 0
    else:
        return 1
    
udf_set_one=udf(lambda e: set_one(e),IntegerType())

fill_values = {column: 0 for column in DF_diag_pivot.columns if column not in ['HADM_ID']}
DF_diag_group=DF_diag_pivot.na.fill(fill_values)
# DF_diag_group=DF_diag_group.select(*(udf_set_one(col(c)).alias(c) for c in DF_diag_group.columns if c not in ['HADM_ID']))


In [53]:
# DF_diag_group.show(2)

In [54]:
# admitDF.groupBy('SUBJECT_ID').orderBy('ADMITTIME').
from pyspark.sql.window import Window
w = Window().partitionBy("SUBJECT_ID").orderBy("ADMITTIME")

# udf_con=udf(lambda e: 1 if e >= 0 else 0)
udf_con=udf(lambda e: 1 if e > 0 else 0,IntegerType())

labelDF = DF_admit \
    .withColumn("next_admit", lead(col("ADMITTIME"), 1).over(w)) \
    .withColumn('readmission_duration', datediff(col('next_admit'),col('DISCHTIME')))

labelDF = labelDF.fillna({'readmission_duration':0})
    
labelDF = labelDF \
    .withColumn('readmission',udf_con(col('readmission_duration'))) \
    .drop(('next_admit')) \
    .drop(('readmission_duration')) \

labelDF.show(1)

+----------+-------+-------------------+-------------------+---------+--------------+--------------------+------------------+---------+--------+-------------+--------------+---------+-------------------+-------------------+-----------+--------------------+--------------------+------+-------------------+----+--------+-------+-----------+---+---------+---------+------------+-----------+
|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|DISCHARGE_LOCATION|INSURANCE|LANGUAGE|     RELIGION|MARITAL_STATUS|ETHNICITY|          EDREGTIME|          EDOUTTIME|  DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|GENDER|                DOB| DOD|DOD_HOSP|DOD_SSN|EXPIRE_FLAG|age|admit_LOS|is_urgent|is_emergency|readmission|
+----------+-------+-------------------+-------------------+---------+--------------+--------------------+------------------+---------+--------+-------------+--------------+---------+-------------------+-------------------+-

In [55]:
# labelDF=labelDF.select('HADM_ID','readmission')
df = labelDF.select('HADM_ID','readmission').toPandas()


In [56]:
# print(len(df))

In [57]:
# len(set(labelDF.groupBy('HADM_ID').agg(first('HADM_ID')).select('HADM_ID').collect()))
# df[df.SUBJECT_ID == '10111']
# labelDF.drop(('next_admit')).drop(('readmission_duration'))

In [58]:
# labelDF.where(col('readmission') == 1).groupBy('HADM_ID').agg(first('HADM_ID')).select('*').toPandas()

In [59]:
# labelDF.where(col('readmission') == 0).groupBy('HADM_ID').agg(first('HADM_ID')).select('*').toPandas()

In [60]:
DF_admit2 = DF_admit.select('HADM_ID','SUBJECT_ID','GENDER','age','admit_LOS','is_urgent','is_emergency')

In [61]:
# DF_admit2.where(col('SUBJECT_ID')=='9150').collect()
# DF_admit2.where(col('HADM_ID') == 120283).collect()

In [62]:
# df_out=outDF.select('*').toPandas()

In [63]:
# Genreate labels

DF_y=labelDF.select('HADM_ID','readmission')


In [64]:
# Joining different sets
outDF=DF_labs_imputed.join(DF_admit2,on='HADM_ID',how='inner') \
    .join(DF_diag_group,on='HADM_ID',how='inner')\
    .join(DF_y,on='HADM_ID',how='left')\
    .drop('hadm_los')\
    .drop('SUBJECT_ID')

print(outDF.count())

478262


In [65]:
outDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in outDF.columns]).show()

+-------+-------+-------+-------+-------------------------------+-----------+--------+----------+--------+-------+----------+----------+---------+---------+-------+------+-----------+-------------+-------------+---+----+---+---+---+------+---+---------+---------+------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-

In [66]:
print(len(outDF.columns))
outDF.columns

258


['HADM_ID',
 'Albumin',
 'Ammonia',
 'Amylase',
 'Asparate Aminotransferase (AST)',
 'Bicarbonate',
 'Chloride',
 'Creatinine',
 'Globulin',
 'Glucose',
 'Hematocrit',
 'Hemoglobin',
 'Magnesium',
 'Potassium',
 'Protein',
 'Sodium',
 'Temperature',
 'Triglycerides',
 'Urea Nitrogen',
 'WBC',
 'pCO2',
 'pH',
 'pO2',
 'LOS',
 'GENDER',
 'age',
 'admit_LOS',
 'is_urgent',
 'is_emergency',
 '10',
 '100',
 '101',
 '102',
 '103',
 '104',
 '105',
 '106',
 '107',
 '108',
 '109',
 '11',
 '110',
 '111',
 '112',
 '113',
 '114',
 '115',
 '116',
 '117',
 '118',
 '119',
 '12',
 '120',
 '121',
 '122',
 '123',
 '124',
 '125',
 '126',
 '127',
 '128',
 '129',
 '13',
 '130',
 '131',
 '132',
 '133',
 '134',
 '135',
 '136',
 '137',
 '138',
 '139',
 '14',
 '140',
 '141',
 '142',
 '143',
 '144',
 '145',
 '146',
 '147',
 '148',
 '149',
 '15',
 '151',
 '152',
 '153',
 '154',
 '155',
 '156',
 '157',
 '158',
 '159',
 '16',
 '160',
 '161',
 '162',
 '163',
 '164',
 '165',
 '166',
 '168',
 '17',
 '170',
 '171',
 '

In [67]:
# Create data split
demo=demoDF.toPandas()


In [68]:
# # splitting and saving data
# import numpy as np
# np.random.seed(5)

# # demo = demoDF.toPandas()


# # split up the patients with 2/3 in training set and 1/6 in validation and test
# train, validation, test = np.split(demo.SUBJECT_ID.sample(frac=1), [int(0.667*demo.shape[0]), int(0.833*demo.shape[0])])
# train_patients = train.to_frame()
# validation_patients = validation.to_frame()
# test_patients = test.to_frame()

In [71]:
# sc._conf.set('spark.executor.memory','32g').set('spark.driver.memory','32g').set('spark.driver.maxResultsSize','0')

In [72]:
data_processed_path = '../data_processed/'
tempdir = data_processed_path + 'tempdatadir.csv'
outDF.coalesce(1).write.format("com.databricks.spark.csv") \
   .mode('overwrite') \
   .option("header", "true") \
   .save(tempdir)

In [73]:
# Temporal dataset
import pandas as pd
from os import listdir
tempcsv_file = [f for f in listdir(tempdir) if f[-4:] == '.csv'][0]
# df_out=pd.read_csv(data_processed_path + 'mydata.csv/*.csv',header='infer')
df_out=pd.read_csv(tempdir + '/' + tempcsv_file ,header='infer')

In [74]:
#############3 Overlap with prediction_dataprep ############

In [75]:
# Look back to only last 15 days
n_lookback=10

df_lookback=df_out.sort_values(['HADM_ID','LOS']).groupby('HADM_ID').tail(n_lookback)

In [76]:
# len(df_out)

In [77]:
df_new=pd.DataFrame()

def pad_frame(g):
#     print(len(g))
#     print(g)
    if len(g) < n_lookback:
        ""
        offset=n_lookback - len(g)
        df=g.iloc[0:1]
#         print('offset',offset)
#         print(df)
        df_pad = df.loc[df.index.repeat(offset)]
#         df_pad=pd.concat([df]*offset, ignore_index=True)
#         print(len(df_pad))
#         df_padded[0:offset]=g.iloc[0]
        df2=pd.concat([df_pad,g]).reset_index().drop('index',axis=1)

#         df_new=pd.concat([df_new,df2])
#         print(len(df2))
#         print(df2)
#         print(len(df_new))
        dfo=df2.sort_values('LOS',ascending=False)
#         print(dfo)
        return dfo
    else:
#         df_new=pd.concat([df_new,g])
        dfo=g.sort_values('LOS', ascending=False)
        return dfo

In [78]:
# !conda install  joblib --yes

In [79]:
# import pandas as pd
# from joblib import Parallel, delayed
# import multiprocessing

# def tmpFunc(df):
#     df['c'] = df.a + df.b
#     return df

# def applyParallel(dfGrouped, func):
#     print('numjobs',multiprocessing.cpu_count())
#     retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
#     return pd.concat(retLst)

# # if __name__ == '__main__':
# # df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
# print('parallel version: ')
# # print(applyParallel(df.groupby(df.index), tmpFunc))


# df_group=applyParallel(df.groupby('HADM_ID'), pad_frame)

In [80]:
# np.random.seed(5)
# # split up the patients with 2/3 in training set and 1/6 in validation and test
# train, validation, test = np.split(df_admit.HADM_ID.sample(frac=1), [int(0.667*df_admit.shape[0]), int(0.833*df_admit.shape[0])])
# train_patients = train.to_frame()
# validation_patients = validation.to_frame()
# test_patients = test.to_frame()

# print(train_patients.shape)
# print(validation_patients.shape)
# print(test_patients.shape)

In [81]:
# # creating the data split

# train_patients['HADM_ID']=train_patients.HADM_ID.map(str)
# validation_patients['HADM_ID']=validation_patients.HADM_ID.map(str)
# test_patients['HADM_ID']=test_patients.HADM_ID.map(str)

# # create the datasets based on hamid
# train_simple = df_outy.merge(train_patients, on='HADM_ID', how='inner')
# validation_simple = df_outy.merge(validation_patients, on='HADM_ID', how='inner')
# test_simple  = df_outy.merge(test_patients, on='HADM_ID', how='inner')

In [82]:
# import numpy as np
# labels = df_train.groupby(['HADM_ID']).first()[['readmission']].as_matrix()
# labels = np.squeeze(labels)
# y_train = labels
# print(y_train.shape)

# labels = df_test.groupby(['HADM_ID']).first()[['readmission']].as_matrix()
# labels = np.squeeze(labels)
# y_test = labels
# print(y_test.shape)

In [83]:
# df_minmax=pd.read_csv('df_minmax.csv',header='infer')

In [84]:
# labs_=['ALT', 'AST', 'Albumin', 'Ammonia', 'Bicarbonate', 'Bilirubin', 'Calcium', 'Chloride', 'Creatinine', 'Globulin', \
#         'Glucose', 'Hematocrit', 'Magnesium', 'Potassium', 'Protein', 'Sodium', 'Temperature', 'Triglyceride', 'Urea Nitrogen', 
#        'WBC', 'pH', 'pO2', 'LOS','admit_LOS','age']

In [85]:
# # labs_=df_minmax.columns.values[2:].filter(lambda e: e =='Unnamed: 24')
# print(labs_)
# def normalize(df):
#     result = df.copy()
#     for feature_name in df.columns:
#         if feature_name in features_to_use:
# #             print(feature_name)
#             # min
#             # mean
#             # max
#             min_value = df_minmax[feature_name][0]
#             max_value = df_minmax[feature_name][1]
#             #mean_value = df_mean[feature_name][2]
#             result[feature_name] = (df[feature_name] - min_value) / (max_value - min_value)
#     return result

# df_1=df_train.copy()
# df_2=df_test.copy()
# df_1[labs_]=normalize(df_train[labs_])
# df_2[labs_]=normalize(df_test[labs_])

In [86]:
# b=True
# if (b):
#     np.save('X_testd',X_test)
#     np.save('y_testd',y_test)
#     np.save('X_traind',X_train)
#     np.save('y_traind',y_train)

In [87]:
# All the output files

# Batching with labs only

In [88]:
# Batching with labs and ccds9

In [89]:
import numpy as np
np.random.seed(5)
# split up the patients with 2/3 in training set and 1/6 in validation and test
train, validation, test = np.split(demo.SUBJECT_ID.sample(frac=1), [int(0.667*demo.shape[0]), int(0.833*demo.shape[0])])
train_patients = train.to_frame()
validation_patients = validation.to_frame()
test_patients = test.to_frame()

print(train_patients.shape)
print(validation_patients.shape)
print(test_patients.shape)


# for each patient, select a random admission from which to make a prediction
dataf_admit=pd.read_csv(initial_file_path + 'ADMISSIONS.csv')
admissions_to_predict_from = dataf_admit[['SUBJECT_ID', 'HADM_ID', 'ADMITTIME']].groupby('SUBJECT_ID') \
            .apply(lambda x: x.iloc[np.random.choice(range(0,len(x)))]) \
            [['ADMITTIME','HADM_ID']].reset_index()

# print(admissions_to_predict_from)
print(admissions_to_predict_from.shape)
print(dataf_admit.SUBJECT_ID.unique().shape)
print(dataf_admit.HADM_ID.unique().shape)
print(dataf_admit.shape)

(31028, 1)
(7723, 1)
(7769, 1)
(46520, 3)
(46520,)
(58976,)
(58976, 19)


In [90]:
model_data = df_out.merge(dataf_admit[['ADMITTIME', 'HADM_ID', 'SUBJECT_ID']], on='HADM_ID', how='inner')
model_data = model_data.merge(admissions_to_predict_from[['SUBJECT_ID','ADMITTIME']], on='SUBJECT_ID', how='inner')
print(model_data.shape)
# since we are trying to predict what happens after ADMITTIME, only keep rows <= ADMITTIME
model_data = model_data[model_data.ADMITTIME_x <= model_data.ADMITTIME_y]
model_data.SUBJECT_ID = model_data.SUBJECT_ID.astype(str)
del model_data['ADMITTIME_x']
del model_data['ADMITTIME_y']
print(model_data.shape)

def update_colname(col):
    if col.isdigit():
        return 'dx' + col
    col = col.replace(" ", "")
    col = col.replace("(", "")
    col = col.replace(")", "")
    return col
        
model_data.columns = [update_colname(col) for col in model_data.columns]

readmission_true_vals = model_data.merge(admissions_to_predict_from, on='HADM_ID', how='inner') \
    .groupby(['HADM_ID']) \
    .agg({'readmission':np.max, 'SUBJECT_ID_y':np.max}).reset_index()
readmission_true_vals.rename(columns={'readmission':'readmission_true', 'SUBJECT_ID_y':'SUBJECT_ID'}, inplace=True)
readmission_true_vals.SUBJECT_ID = readmission_true_vals.SUBJECT_ID.astype(str)
del readmission_true_vals['HADM_ID']

model_data = model_data.merge(readmission_true_vals, on='SUBJECT_ID', how='inner')
del model_data['readmission']
model_data.rename(columns={'readmission_true':'readmission'}, inplace=True)

print(model_data.HADM_ID.count())
print(model_data.readmission.sum())

(478262, 261)
(418628, 259)
417834
65577


In [91]:
# create the lab datasets
train_final_seq = model_data.merge(train_patients, on='SUBJECT_ID', how='inner')
validation_final_seq = model_data.merge(validation_patients, on='SUBJECT_ID', how='inner')
test_final_seq = model_data.merge(test_patients, on='SUBJECT_ID', how='inner')

print(train_final_seq.shape)
print(validation_final_seq.shape)
print(test_final_seq.shape)
train_final_seq.to_csv(data_processed_path + 'train_final_seq.csv', index=False)
validation_final_seq.to_csv(data_processed_path + 'validation_final_seq.csv', index=False)
test_final_seq.to_csv(data_processed_path + 'test_final_seq.csv', index=False)


(276420, 259)
(69480, 259)
(71934, 259)


In [92]:
agg_dict = {}
agg_dict['readmission'] = np.max
agg_dict['LOS'] = np.max
agg_dict['GENDER'] = np.max
agg_dict['age'] = np.max
agg_dict['is_urgent'] = np.max
agg_dict['is_emergency'] = np.max
lab_tests_to_include = ['Albumin','Ammonia','Amylase','AsparateAminotransferaseAST','Bicarbonate','Chloride','Creatinine','Globulin','Glucose','Hematocrit','Hemoglobin','Magnesium','Potassium','Protein','Sodium','Temperature','Triglycerides','UreaNitrogen','WBC','pCO2','pH','pO2']
# lab_dx_to_include = lab_tests_to_include + ['dx2','dx3','dx4','dx5','dx6','dx7','dx8','dx10','dx11','dx12','dx13','dx14','dx15','dx16','dx17','dx18','dx19','dx22','dx23','dx24','dx25','dx26','dx27','dx29','dx32','dx33','dx35','dx36','dx37','dx38','dx39','dx40','dx42','dx43','dx44','dx45','dx46','dx47','dx48','dx49','dx50','dx51','dx52','dx53','dx54','dx55','dx57','dx58','dx59','dx60','dx62','dx63','dx64','dx76','dx77','dx78','dx79','dx80','dx81','dx82','dx83','dx84','dx85','dx86','dx87','dx88','dx89','dx90','dx91','dx92','dx93','dx94','dx95','dx96','dx97','dx98','dx99','dx100','dx101','dx102','dx103','dx104','dx105','dx106','dx107','dx108','dx109','dx110','dx111','dx112','dx113','dx114','dx115','dx116','dx117','dx118','dx119','dx120','dx121','dx122','dx123','dx124','dx125','dx126','dx127','dx128','dx129','dx130','dx131','dx132','dx133','dx134','dx135','dx136','dx137','dx138','dx139','dx140','dx141','dx142','dx143','dx144','dx145','dx146','dx147','dx148','dx149','dx151','dx152','dx153','dx154','dx155','dx156','dx157','dx158','dx159','dx160','dx161','dx162','dx163','dx164','dx165','dx166','dx168','dx170','dx171','dx173','dx175','dx178','dx181','dx195','dx197','dx198','dx199','dx200','dx201','dx202','dx203','dx204','dx205','dx206','dx207','dx208','dx209','dx210','dx211','dx212','dx213','dx215','dx217','dx224','dx225','dx226','dx228','dx229','dx230','dx231','dx232','dx233','dx234','dx235','dx236','dx237','dx238','dx239','dx240','dx241','dx242','dx243','dx244','dx245','dx246','dx247','dx248','dx249','dx250','dx251','dx252','dx253','dx255','dx256','dx257','dx259','dx650','dx651','dx652','dx653','dx654','dx655','dx657','dx658','dx659','dx660','dx661','dx662','dx663','dx670','dx2603','dx2607','dx2608','dx2613','dx2615','dx2616','dx2617','dx2618','dx2619','dx2620','dx2621']
# lab_dx_to_include = lab_tests_to_include + ['dx10','dx100','dx101','dx102','dx103','dx104','dx105','dx106','dx107','dx108','dx109','dx11','dx110','dx111','dx112','dx113','dx114','dx115','dx116','dx117','dx118','dx119','dx12','dx120','dx121','dx122','dx123','dx124','dx125','dx126','dx127','dx128','dx129','dx13','dx130','dx131','dx132','dx133','dx134','dx135','dx136','dx137','dx138','dx139','dx14','dx140','dx141','dx142','dx143','dx144','dx145','dx146','dx147','dx148','dx149','dx15','dx151','dx152','dx153','dx154','dx155','dx156','dx157','dx158','dx159','dx16','dx160','dx161','dx162','dx163','dx164','dx165','dx166','dx168','dx17','dx170','dx171','dx173','dx175','dx178','dx18','dx181','dx19','dx195','dx197','dx198','dx199','dx2','dx200','dx201','dx202','dx203','dx204','dx205','dx206','dx207','dx208','dx209','dx210','dx211','dx212','dx213','dx215','dx217','dx22','dx224','dx225','dx226','dx228','dx229','dx23','dx230','dx231','dx232','dx233','dx234','dx235','dx236','dx237','dx238','dx239','dx24','dx240','dx241','dx242','dx243','dx244','dx245','dx246','dx247','dx248','dx249','dx25','dx250','dx251','dx252','dx253','dx255','dx256','dx257','dx259','dx26','dx2603','dx2607','dx2608','dx2613','dx2615','dx2616','dx2617','dx2618','dx2619','dx2620','dx2621','dx27','dx29','dx3','dx32','dx33','dx35','dx36','dx37','dx38','dx39','dx4','dx40','dx42','dx43','dx44','dx45','dx46','dx47','dx48','dx49','dx5','dx50','dx51','dx52','dx53','dx54','dx55','dx57','dx58','dx59','dx6','dx60','dx62','dx63','dx64','dx650','dx651','dx652','dx653','dx654','dx655','dx657','dx658','dx659','dx660','dx661','dx662','dx663','dx670','dx7','dx76','dx77','dx78','dx79','dx8','dx80','dx81','dx82','dx83','dx84','dx85','dx86','dx87','dx88','dx89','dx90','dx91','dx92','dx93','dx94','dx95','dx96','dx97','dx98','dx99']
lab_dx_to_include = lab_tests_to_include + ['dx10','dx100','dx101','dx102','dx103','dx104','dx105','dx106','dx107','dx108','dx109','dx11','dx110','dx111','dx112','dx113','dx114','dx115','dx116','dx117','dx118','dx119','dx12','dx120','dx121','dx122','dx123','dx124','dx125','dx126','dx127','dx128','dx129','dx13','dx130','dx131','dx132','dx133','dx134','dx135','dx136','dx137','dx138','dx139','dx14','dx140','dx141','dx142','dx143','dx144','dx145','dx146','dx147','dx148','dx149','dx15','dx151','dx152','dx153','dx154','dx155','dx156','dx157','dx158','dx159','dx16','dx160','dx161','dx162','dx163','dx164','dx165','dx166','dx168','dx17','dx170','dx171','dx173','dx175','dx178','dx18','dx181','dx19','dx197','dx198','dx199','dx2','dx200','dx201','dx202','dx203','dx204','dx205','dx206','dx207','dx208','dx209','dx210','dx211','dx212','dx213','dx215','dx217','dx22','dx225','dx226','dx228','dx229','dx23','dx230','dx231','dx232','dx233','dx234','dx235','dx236','dx237','dx238','dx239','dx24','dx241','dx242','dx243','dx244','dx245','dx246','dx247','dx248','dx249','dx25','dx250','dx251','dx252','dx253','dx255','dx256','dx257','dx259','dx26','dx2603','dx2607','dx2608','dx2613','dx2615','dx2616','dx2617','dx2618','dx2619','dx2620','dx2621','dx27','dx29','dx3','dx32','dx33','dx35','dx36','dx37','dx38','dx39','dx4','dx40','dx42','dx43','dx44','dx45','dx46','dx47','dx48','dx49','dx5','dx50','dx51','dx52','dx53','dx54','dx55','dx57','dx58','dx59','dx6','dx60','dx62','dx63','dx64','dx650','dx651','dx652','dx653','dx654','dx655','dx657','dx658','dx659','dx660','dx661','dx662','dx663','dx670','dx7','dx76','dx77','dx78','dx79','dx8','dx80','dx81','dx82','dx83','dx84','dx85','dx86','dx87','dx88','dx89','dx90','dx91','dx92','dx93','dx94','dx95','dx96','dx97','dx98','dx99']


for lab_dx in lab_dx_to_include:
    agg_dict[lab_dx] = [np.mean]

# aggregate so we have 1 row per patient
model_data_lab_dx = model_data.groupby(['SUBJECT_ID']).agg(agg_dict).reset_index()

# rename columns
model_data_lab_dx.columns = ['_'.join(col).strip() for col in model_data_lab_dx.columns.values]
model_data_lab_dx = model_data_lab_dx.rename(columns = {'SUBJECT_ID_': 'SUBJECT_ID',
                                                    'LOS_amax': 'LOS',
                                                    'GENDER_amax': 'GENDER',
                                                    'age_amax': 'age',
                                                    'is_urgent_amax': 'is_urgent',
                                                    'is_emergency_amax': 'is_emergency'})

model_data_lab_dx['readmission'] = model_data_lab_dx['readmission_amax']
del model_data_lab_dx['readmission_amax']

model_data_lab_dx.head()

Unnamed: 0,SUBJECT_ID,dx231_mean,dx146_mean,dx119_mean,dx85_mean,dx87_mean,Chloride_mean,dx105_mean,dx2613_mean,dx2618_mean,...,dx109_mean,is_urgent,dx123_mean,dx2603_mean,dx160_mean,dx153_mean,dx7_mean,pH_mean,dx40_mean,readmission
0,100,0.0,0.0,0.0,0.0,0.0,104.25,105.0,0.0,0.0,...,0.0,0,0.0,0.0,0.0,0.0,0.0,7.283164,0.0,0
1,1000,0.0,0.0,0.0,0.0,0.0,101.540541,0.0,0.0,0.0,...,0.0,1,0.0,0.0,0.0,153.0,0.0,7.284611,40.0,0
2,10000,0.0,0.0,0.0,0.0,0.0,101.428571,0.0,0.0,0.0,...,0.0,0,0.0,0.0,0.0,0.0,0.0,6.85244,0.0,0
3,10003,231.0,0.0,0.0,0.0,0.0,98.468674,0.0,0.0,0.0,...,0.0,0,0.0,0.0,0.0,0.0,0.0,7.049164,0.0,0
4,10004,0.0,0.0,0.0,0.0,0.0,105.710842,0.0,0.0,0.0,...,0.0,0,0.0,2603.0,0.0,0.0,0.0,6.985957,0.0,1


In [93]:
# create the lab datasets
train_with_lab_dx = model_data_lab_dx.merge(train_patients, on='SUBJECT_ID', how='inner')
validation_with_lab_dx = model_data_lab_dx.merge(validation_patients, on='SUBJECT_ID', how='inner')
test_with_lab_dx = model_data_lab_dx.merge(test_patients, on='SUBJECT_ID', how='inner')

print(train_with_lab_dx.shape)
print(validation_with_lab_dx.shape)
print(test_with_lab_dx.shape)
train_with_lab_dx.to_csv(data_processed_path + 'train_final.csv', index=False)
validation_with_lab_dx.to_csv(data_processed_path + 'validation_final.csv', index=False)
test_with_lab_dx.to_csv(data_processed_path + 'test_final.csv', index=False)


(24089, 254)
(6037, 254)
(6120, 254)
