# MIMIC III Preprocessing

## Initialization and Data Loading

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *

In [2]:
conf = SparkConf().setAppName("preprocess").setMaster("local")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.master("local").appName("preprocess").getOrCreate()

ne_struct = StructType([StructField("row_id", IntegerType(), True),
                      StructField("subject_id", IntegerType(), True),
                      StructField("hadm_id", IntegerType(), True),
                      StructField("chartdate", DateType(), True),
                      StructField("category", StringType(), True),
                      StructField("description", StringType(), True),
                      StructField("cgid", IntegerType(), True),
                      StructField("iserror", IntegerType(), True),
                      StructField("text", StringType(), True)])
#df_ne = spark.read.csv("./data/NOTEEVENTS-2.csv",
df_ne = spark.read.csv("./data/NOTEEVENTS-2sample.csv",
                       header=True,
                       schema=ne_struct)
df_ne.registerTempTable("noteevents")
df_ne.filter(df_ne.category=="Discharge summary") \
    .registerTempTable("noteevents2")
    
# i want to cache noteevents, but it's too big

# many icd to one hadm_id
diag_struct = StructType([StructField("ROW_ID", IntegerType(), True),
                          StructField("SUBJECT_ID", IntegerType(), True),
                          StructField("HADM_ID", IntegerType(), True),
                          StructField("SEQ_NUM", IntegerType(), True),
                          StructField("ICD9_CODE", StringType(), True)])
df_diag_m = spark.read.csv("./data/DIAGNOSES_ICD.csv",
                           header=True,
                           schema=diag_struct) \
            .selectExpr("ROW_ID as row_id", 
                        "SUBJECT_ID as subject_id",
                        "HADM_ID as hadm_id",
                        "SEQ_NUM as seq_num",
                        "ICD9_CODE as icd9_code")
df_diag_m.registerTempTable("diagnoses_icd_m")
df_diag_m.cache()

# one icd to one hadm_id (take the smallest seq number as primary)
diag_o_rdd = df_diag_m.rdd.sortBy(lambda x: (x.hadm_id, x.subject_id, x.seq_num)) \
    .groupBy(lambda x: x.hadm_id) \
    .mapValues(list) \
    .reduceByKey(lambda x, y: x if x.seq_num < y.seq_num else y) \
    .map(lambda (hid, d): d[0])
df_diag_o = spark.createDataFrame(diag_o_rdd)
df_diag_o.registerTempTable("diagnoses_icd_o")
df_diag_o.cache()

# get hadm_id list in noteevents
df_hadm_id_list = spark.sql("""
SELECT DISTINCT hadm_id FROM noteevents2
""")
df_hadm_id_list.registerTempTable("hadm_id_list")
df_hadm_id_list.cache()

# get subject_id list in noteevents
df_subject_id_list = spark.sql("""
SELECT DISTINCT subject_id FROM noteevents2
""")
df_subject_id_list.registerTempTable("subject_id_list")
df_subject_id_list.cache()

df_icd9desc = spark.read.csv("./data/D_ICD_DIAGNOSES.csv",
                       header=True, inferSchema=True)
df_icd9desc.registerTempTable("diagnoses_icd_desc")

df_diag_o2 = spark.sql("""
SELECT row_id, subject_id, diagnoses_icd_o.hadm_id AS hadm_id,
seq_num, icd9_code
FROM diagnoses_icd_o JOIN hadm_id_list
ON diagnoses_icd_o.hadm_id = hadm_id_list.hadm_id
""")
df_diag_o2.registerTempTable("diagnoses_icd_o2")
df_diag_o2.cache()

df_diag_m2 = spark.sql("""
SELECT row_id, subject_id, diagnoses_icd_m.hadm_id AS hadm_id,
seq_num, icd9_code
FROM diagnoses_icd_m JOIN hadm_id_list
ON diagnoses_icd_m.hadm_id = hadm_id_list.hadm_id
""")
df_diag_m2.registerTempTable("diagnoses_icd_m2")
df_diag_m2.cache()

print df_ne.dtypes
print df_diag_m.dtypes
print df_diag_o.dtypes
print df_hadm_id_list.dtypes
print df_subject_id_list.dtypes
print df_icd9desc.dtypes

[('row_id', 'int'), ('subject_id', 'int'), ('hadm_id', 'int'), ('chartdate', 'date'), ('category', 'string'), ('description', 'string'), ('cgid', 'int'), ('iserror', 'int'), ('text', 'string')]
[('row_id', 'int'), ('subject_id', 'int'), ('hadm_id', 'int'), ('seq_num', 'int'), ('icd9_code', 'string')]
[('row_id', 'bigint'), ('subject_id', 'bigint'), ('hadm_id', 'bigint'), ('seq_num', 'bigint'), ('icd9_code', 'string')]
[('hadm_id', 'int')]
[('subject_id', 'int')]
[('ROW_ID', 'int'), ('ICD9_CODE', 'string'), ('SHORT_TITLE', 'string'), ('LONG_TITLE', 'string')]


## Descriptive Statistics

### noteevents
Basic Counts:

In [5]:
spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), COUNT(DISTINCT hadm_id)
FROM noteevents
""").show()
spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), COUNT(DISTINCT hadm_id)
FROM noteevents2
""").show()

+--------+--------------------------+-----------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|
+--------+--------------------------+-----------------------+
| 2083180|                     46146|                  58361|
+--------+--------------------------+-----------------------+

+--------+--------------------------+-----------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|
+--------+--------------------------+-----------------------+
|   59652|                     41127|                  52726|
+--------+--------------------------+-----------------------+



In [6]:
spark.sql("""
SELECT COUNT(DISTINCT hadm_id) AS hadm_count
FROM diagnoses_icd_m2
WHERE icd9_code IN
    (SELECT icd9_code
    FROM diagnoses_icd_m2
    GROUP BY icd9_code
    ORDER BY COUNT(DISTINCT hadm_id) DESC
    LIMIT 10)
""").show()

spark.sql("""
SELECT COUNT(DISTINCT hadm_id) AS hadm_count
FROM diagnoses_icd_m2
WHERE icd9_code IN
    (SELECT icd9_code
    FROM diagnoses_icd_m2
    GROUP BY icd9_code
    ORDER BY COUNT(DISTINCT hadm_id) DESC
    LIMIT 50)
""").show()

spark.sql("""
SELECT COUNT(DISTINCT hadm_id) AS hadm_count
FROM diagnoses_icd_m2
WHERE icd9_code IN
    (SELECT icd9_code
    FROM diagnoses_icd_m2
    GROUP BY icd9_code
    ORDER BY COUNT(DISTINCT hadm_id) DESC
    LIMIT 100)
""").show()

+----------+
|hadm_count|
+----------+
|     40562|
+----------+

+----------+
|hadm_count|
+----------+
|     49354|
+----------+

+----------+
|hadm_count|
+----------+
|     50625|
+----------+



Categories:

In [25]:
spark.sql("""
SELECT DISTINCT(category)
FROM noteevents
""").show(truncate=False)

+-----------------+
|category         |
+-----------------+
|ECG              |
|Respiratory      |
|Nursing          |
|General          |
|Consult          |
|Echo             |
|Nutrition        |
|Physician        |
|Pharmacy         |
|Rehab Services   |
|Case Management  |
|Radiology        |
|Nursing/other    |
|Discharge summary|
|Social Work      |
+-----------------+



### diagnoses_icd: many (icd_code) to one (hadm_id)
Basic Counts:

In [8]:
spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT ICD9_CODE)
FROM diagnoses_icd_m
""").show()

spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT LOWER(ICD9_CODE))
FROM diagnoses_icd_m
""").show()

+--------+--------------------------+-----------------------+-------------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT ICD9_CODE)|
+--------+--------------------------+-----------------------+-------------------------+
|  651047|                     46520|                  58976|                     6984|
+--------+--------------------------+-----------------------+-------------------------+

+--------+--------------------------+-----------------------+--------------------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT lower(ICD9_CODE))|
+--------+--------------------------+-----------------------+--------------------------------+
|  651047|                     46520|                  58976|                            6984|
+--------+--------------------------+-----------------------+--------------------------------+



### diagnoses_icd: one (icd_code) to one (hadm_id)
Basic Counts:

In [9]:
spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT ICD9_CODE)
FROM diagnoses_icd_o
""").show()

spark.sql("""
SELECT COUNT(*), COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT LOWER(ICD9_CODE))
FROM diagnoses_icd_o
""").show()

+--------+--------------------------+-----------------------+-------------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT ICD9_CODE)|
+--------+--------------------------+-----------------------+-------------------------+
|   58976|                     46520|                  58976|                     2789|
+--------+--------------------------+-----------------------+-------------------------+

+--------+--------------------------+-----------------------+--------------------------------+
|count(1)|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT lower(ICD9_CODE))|
+--------+--------------------------+-----------------------+--------------------------------+
|   58976|                     46520|                  58976|                            2789|
+--------+--------------------------+-----------------------+--------------------------------+



Just to check if I really did get "seq_num = 1" for all diagnosis, the code below should return empty. 

In [10]:
# check code
spark.sql("""
SELECT *
FROM diagnoses_icd_o
WHERE seq_num <> 1
""").show()

+------+----------+-------+-------+---------+
|row_id|subject_id|hadm_id|seq_num|icd9_code|
+------+----------+-------+-------+---------+
+------+----------+-------+-------+---------+



### noteevents and diagnoses_icd (one to one)
Basic Counts:

In [11]:
spark.sql("""
SELECT COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT icd9_code)
FROM diagnoses_icd_o2
""").show()

+--------------------------+-----------------------+-------------------------+
|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT icd9_code)|
+--------------------------+-----------------------+-------------------------+
|                     41127|                  52726|                     2706|
+--------------------------+-----------------------+-------------------------+



Top 50 ICD 9 codes based on "subject_id" count

In [12]:
spark.sql("""
SELECT a.icd9_code, sid_count, SHORT_TITLE, LONG_TITLE FROM
    (SELECT icd9_code, COUNT(DISTINCT subject_id) AS sid_count
    FROM diagnoses_icd_o2
    GROUP BY icd9_code
    ORDER BY sid_count DESC
    LIMIT 50) AS a
JOIN diagnoses_icd_desc
ON diagnoses_icd_desc.ICD9_CODE = a.icd9_code
""").show(n=50, truncate=50)

+---------+---------+------------------------+--------------------------------------------------+
|icd9_code|sid_count|             SHORT_TITLE|                                        LONG_TITLE|
+---------+---------+------------------------+--------------------------------------------------+
|    41401|     3435|Crnry athrscl natve vssl|Coronary atherosclerosis of native coronary artery|
|     0389|     1837|          Septicemia NOS|                            Unspecified septicemia|
|    41071|     1672|Subendo infarct, initial|Subendocardial infarction, initial episode of care|
|    V3001|     1390|  Single lb in-hosp w cs|Single liveborn, born in hospital, delivered by...|
|     4241|     1128|   Aortic valve disorder|                            Aortic valve disorders|
|    51881|      986|Acute respiratry failure|                         Acute respiratory failure|
|    V3000|      949|Single lb in-hosp w/o cs|Single liveborn, born in hospital, delivered wi...|
|      431|      948

Top 50 ICD 9 codes based on "hadm_id" count

In [13]:
spark.sql("""
SELECT a.icd9_code, hadm_count, SHORT_TITLE, LONG_TITLE FROM
    (SELECT icd9_code, COUNT(DISTINCT hadm_id) AS hadm_count
    FROM diagnoses_icd_o2
    GROUP BY icd9_code
    ORDER BY hadm_count DESC
    LIMIT 50) AS a
JOIN diagnoses_icd_desc
ON diagnoses_icd_desc.ICD9_CODE = a.icd9_code
""").show(n=50, truncate=50)

+---------+----------+------------------------+--------------------------------------------------+
|icd9_code|hadm_count|             SHORT_TITLE|                                        LONG_TITLE|
+---------+----------+------------------------+--------------------------------------------------+
|    41401|      3464|Crnry athrscl natve vssl|Coronary atherosclerosis of native coronary artery|
|     0389|      1976|          Septicemia NOS|                            Unspecified septicemia|
|    41071|      1719|Subendo infarct, initial|Subendocardial infarction, initial episode of care|
|    V3001|      1390|  Single lb in-hosp w cs|Single liveborn, born in hospital, delivered by...|
|     4241|      1136|   Aortic valve disorder|                            Aortic valve disorders|
|    51881|      1089|Acute respiratry failure|                         Acute respiratory failure|
|      431|       966|Intracerebral hemorrhage|                          Intracerebral hemorrhage|
|    V3000

### noteevents and diagnoses_icd (many to one)
Basic Counts:

In [7]:
spark.sql("""
SELECT COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT icd9_code)
FROM diagnoses_icd_m2
""").show()

spark.sql("""
SELECT COUNT(DISTINCT subject_id), 
COUNT(DISTINCT hadm_id), COUNT(DISTINCT icd9_code)
FROM (
    SELECT row_id, subject_id, diagnoses_icd_m.hadm_id AS hadm_id,
    seq_num, icd9_code
    FROM diagnoses_icd_m JOIN (SELECT DISTINCT hadm_id FROM noteevents) AS a
    ON diagnoses_icd_m.hadm_id = a.hadm_id
)
""").show()

+--------------------------+-----------------------+-------------------------+
|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT icd9_code)|
+--------------------------+-----------------------+-------------------------+
|                     41127|                  52726|                     6918|
+--------------------------+-----------------------+-------------------------+

+--------------------------+-----------------------+-------------------------+
|count(DISTINCT subject_id)|count(DISTINCT hadm_id)|count(DISTINCT icd9_code)|
+--------------------------+-----------------------+-------------------------+
|                     46139|                  58361|                     6967|
+--------------------------+-----------------------+-------------------------+



Top ICD 9 codes based on "subject_id" count

In [15]:
spark.sql("""
SELECT a.icd9_code, sid_count, SHORT_TITLE, LONG_TITLE FROM
    (SELECT icd9_code, COUNT(DISTINCT subject_id) AS sid_count
    FROM diagnoses_icd_m2
    GROUP BY icd9_code
    ORDER BY sid_count DESC
    LIMIT 50) AS a
JOIN diagnoses_icd_desc
ON diagnoses_icd_desc.ICD9_CODE = a.icd9_code
""").show(n=50, truncate=50)

+---------+---------+------------------------+--------------------------------------------------+
|icd9_code|sid_count|             SHORT_TITLE|                                        LONG_TITLE|
+---------+---------+------------------------+--------------------------------------------------+
|     4019|    17138|        Hypertension NOS|                Unspecified essential hypertension|
|    41401|    10579|Crnry athrscl natve vssl|Coronary atherosclerosis of native coronary artery|
|    42731|    10053|     Atrial fibrillation|                               Atrial fibrillation|
|     4280|     9669|                 CHF NOS|             Congestive heart failure, unspecified|
|     5849|     7505|Acute kidney failure NOS|                 Acute kidney failure, unspecified|
|     2724|     7324|  Hyperlipidemia NEC/NOS|              Other and unspecified hyperlipidemia|
|    25000|     7181|DMII wo cmp nt st uncntr|Diabetes mellitus without mention of complicati...|
|    51881|     6493

Top ICD 9 codes based on "hadm_id" count

In [16]:
spark.sql("""
SELECT a.icd9_code, hadm_count, SHORT_TITLE, LONG_TITLE FROM
    (SELECT icd9_code, COUNT(DISTINCT hadm_id) AS hadm_count
    FROM diagnoses_icd_m2
    GROUP BY icd9_code
    ORDER BY hadm_count DESC
    LIMIT 50) AS a
JOIN diagnoses_icd_desc
ON diagnoses_icd_desc.ICD9_CODE = a.icd9_code
""").show(n=50, truncate=50)

+---------+----------+------------------------+--------------------------------------------------+
|icd9_code|hadm_count|             SHORT_TITLE|                                        LONG_TITLE|
+---------+----------+------------------------+--------------------------------------------------+
|     4019|     20046|        Hypertension NOS|                Unspecified essential hypertension|
|     4280|     12842|                 CHF NOS|             Congestive heart failure, unspecified|
|    42731|     12589|     Atrial fibrillation|                               Atrial fibrillation|
|    41401|     12178|Crnry athrscl natve vssl|Coronary atherosclerosis of native coronary artery|
|     5849|      8906|Acute kidney failure NOS|                 Acute kidney failure, unspecified|
|    25000|      8783|DMII wo cmp nt st uncntr|Diabetes mellitus without mention of complicati...|
|     2724|      8503|  Hyperlipidemia NEC/NOS|              Other and unspecified hyperlipidemia|
|    51881

In [24]:
#sc.stop()
print "Done!"

Name: org.apache.toree.interpreter.broker.BrokerException
Message: null was reset!
StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$reset$1.apply(BrokerState.scala:191)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$reset$1.apply(BrokerState.scala:189)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
org.apache.toree.interpreter.broker.BrokerState.reset(BrokerState.scala:189)
org.apache.toree.kernel.interpreter.pyspark.PySparkService$$anonfun$pySparkProcess$2.apply(PySparkService.scala:63)
org.apache.toree.kernel.interpreter.pyspark.PySparkService$$anonfun$pySparkProcess$2.apply(PySparkService.scala:61)
org.apache.toree.interpreter.broker.BrokerProcessHandler.onProcessComplete(BrokerProcessHandler.scala:67)
org.apache.commons.exec.DefaultExecutor$1.run(Defau