# 0. Initialize environment

initialize spark

In [1]:
# Display the SparkContext so the cell output confirms Spark is available.
# NOTE(review): neither `sc` nor `spark` (used by later cells) is created in the
# visible notebook -- presumably the PySpark kernel injects both; confirm
# before running this notebook in a plain Python kernel.
sc

load necessary packages

In [2]:
from pyspark.sql.functions import udf, col, size, collect_list, concat_ws, flatten
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

import pandas as pd
import numpy as np

# 1. Load Dataset

## load the dataset

In [3]:
# Dataset directory (relative to the working directory) and the three CSV
# tables read by this notebook.
mimic_path = 'mimic-iii-clinical-database-1.4'
note_file_path = mimic_path + '/NOTEEVENTS.csv'
diag_file_path = mimic_path + '/DIAGNOSES_ICD.csv'
proc_file_path = mimic_path + '/PROCEDURES_ICD.csv'

In [4]:
# Read both ICD code tables; the first line of each CSV supplies column names.
dfdiag = spark.read.csv(diag_file_path, header=True)
dfproc = spark.read.csv(proc_file_path, header=True)

# 2. Code Preprocessing

Add a period to the ICD codes to avoid conflicts.

In [5]:
def add_period(x):
    """Insert the decimal point into a raw ICD-9 diagnosis code.

    Codes whose first character is 'E' get the period after the fourth
    character; every other code gets it after the third. A code with nothing
    after that position is returned unchanged.

    Args:
        x: Raw code string without a period, or None.

    Returns:
        The code with the period inserted, or '' when ``x`` is None or empty.
    """
    # Treat the empty string like None: indexing x[0] on '' would raise
    # IndexError and abort the whole Spark job.
    if not x:
        return ''

    prefix_len = 4 if x[0] == 'E' else 3
    if len(x) > prefix_len:
        return x[:prefix_len] + '.' + x[prefix_len:]
    return x
# Expose add_period as a Spark UDF and write the dotted code to a new column,
# leaving the raw ICD9_CODE column in place for now.
udf_diag = udf(lambda code: add_period(code))

dfdiag = dfdiag.withColumn('ICD9_CODE_P', udf_diag(col('ICD9_CODE')))

Drop rows with missing values, then add a period to the procedure codes.

In [6]:
# Diagnoses: drop rows with a NULL in any column, then replace the raw code
# column with the dotted one under the original column name.
dfdiag = dfdiag.na.drop()
dfdiag = dfdiag.drop('ICD9_CODE').withColumnRenamed('ICD9_CODE_P', 'ICD9_CODE')

# Procedures: the period goes after the second character.
# Rows with a NULL code are removed first and the UDF itself passes None
# through, so a missing code cannot raise TypeError inside the Python worker.
dfproc = dfproc.na.drop(subset=['ICD9_CODE'])
udf_proc = udf(lambda x: None if x is None else x[:2] + '.' + x[2:])
dfproc = dfproc.withColumn('ICD9_CODE', udf_proc('ICD9_CODE'))

Combine the diagnosis and procedure codes into a single DataFrame.

In [7]:
# Stack diagnosis and procedure rows into one frame. unionByName aligns the
# columns by name; plain union aligns them by position and would silently mix
# up columns if the two frames ever ended up with different column orders.
dfcode = dfdiag.unionByName(dfproc)

In [8]:
# Number of distinct ICD9_CODE values in the diagnosis frame, the procedure
# frame, and their union (computed in that order).
count_diag_code, count_proc_code, count_total_code = (
    frame.select('ICD9_CODE').distinct().count()
    for frame in (dfdiag, dfproc, dfcode)
)

print("Count of unique ICD-9 codes in diag: ", count_diag_code)
print("Count of unique ICD-9 codes in proc: ", count_proc_code)
print("Count of unique ICD-9 codes in total: ", count_total_code)

Count of unique ICD-9 codes in diag:  6984
Count of unique ICD-9 codes in proc:  2032
Count of unique ICD-9 codes in total:  9016


select only top 50 labels with most occurrences

In [9]:
# Occurrences of each code across diagnoses and procedures.
count_code = dfcode.groupBy('ICD9_CODE').count()

# Most frequent first. Ties are broken on the code itself: sorting on the count
# alone leaves the relative order of equally frequent codes unspecified, so the
# label set around rank 50 could differ between runs.
df = count_code.sort(col('count').desc(), col('ICD9_CODE').asc()).toPandas()
top_50_labels = df['ICD9_CODE'].head(50).to_list()

# 3. Text Preprocessing

load notes text data

In [10]:
# Columns kept from the notes table (the chart date/time columns were selected
# and immediately dropped before, so they are simply not selected now).
cols = ('SUBJECT_ID', 'HADM_ID', 'TEXT')

# TEXT is a quoted field spanning several lines, hence multiLine. Spark's CSV
# reader uses backslash as the default escape character; with standard CSV
# quoting a double quote inside a quoted field is written as "", so escape is
# set to '"' to keep notes containing quotes from being split into bogus
# columns/rows.
# NOTE(review): assumes NOTEEVENTS.csv uses doubled-quote escaping -- confirm
# against your copy of the file.
dfnote = (
    spark.read
    .option("header", True)
    .option("multiLine", True)
    .option("escape", '"')
    .csv(note_file_path)
)

# Keep only rows whose CATEGORY is exactly 'Discharge summary'.
dfdisc = dfnote.where(dfnote.CATEGORY == 'Discharge summary').select(*cols)

clean texts

- lowercase
- tokenize
- remove numeric tokens


In [11]:
# gaps=False makes the pattern describe the tokens themselves (not the
# separators): a run of letters with optional leading/trailing digits. Strings
# made only of digits never match, which is how numeric tokens are removed.
regexTokenizer = RegexTokenizer(
    inputCol="TEXT",
    outputCol="TOKENS",
    gaps=False,
    pattern=r"\d*[a-zA-Z]+\d*",
)
dfdisc = regexTokenizer.transform(dfdisc)
dfdisc = dfdisc.drop('TEXT')

Filter the codes to keep only the top 50 labels.

In [12]:
# Keep only the code rows whose ICD9_CODE is one of the top-50 labels.
is_top_label = col('ICD9_CODE').isin(top_50_labels)
dfcode_filter = dfcode.where(is_top_label)

# 4. Join codes with texts

Aggregate the codes by SUBJECT_ID and HADM_ID, and the note tokens by HADM_ID.

In [13]:
# One row per (SUBJECT_ID, HADM_ID) holding the list of its top-50 codes.
label_list = collect_list('ICD9_CODE').alias('LABEL')
dfcode_agg = dfcode_filter.groupBy('SUBJECT_ID', 'HADM_ID').agg(label_list)

# One row per HADM_ID: gather the token arrays of all its notes and flatten the
# resulting array-of-arrays into a single token array.
dfdisc_agg = (
    dfdisc
    .groupBy('HADM_ID')
    .agg(flatten(collect_list('TOKENS')).alias('TOKENS'))
)

join the two dfs (codes & texts)

In [14]:
# Left join from the notes side: every admission that has discharge-summary
# tokens is kept, whether or not it has any top-50 code.
joined = dfdisc_agg.join(dfcode_agg, dfcode_agg.HADM_ID == dfdisc_agg.HADM_ID, how='left')

# HADM_ID is taken from the notes (left) frame. The copy on the codes (right)
# frame is NULL for admissions without a matching code row, which previously
# left those rows with no admission identifier at all.
# SUBJECT_ID exists only on the codes frame, so it is still NULL (as is LABEL)
# for admissions without a top-50 code.
cols = (dfcode_agg.SUBJECT_ID, dfdisc_agg.HADM_ID, 'TOKENS', 'LABEL')
df = joined.select(*cols)

remove stop words

In [15]:
# Filter stop words out of TOKENS into TOKENS_SW_RMED (no custom word list is
# passed), then discard the unfiltered TOKENS column.
swr = StopWordsRemover(outputCol='TOKENS_SW_RMED', inputCol='TOKENS')
df = swr.transform(df).drop('TOKENS')

save to local

In [16]:
# Export is currently disabled: every line below is commented out, so running
# this cell writes nothing. If re-enabled, it joins the TOKENS_SW_RMED and
# LABEL arrays into comma-separated strings, coalesces to one partition and
# writes CSV with a header row.
# df.withColumn('TOKENS_SW_RMED', concat_ws(',', 'TOKENS_SW_RMED'))\
#     .withColumn('LABEL', concat_ws(',', 'LABEL'))\
#     .coalesce(1)\
#     .write.option('header', True)\
#     .csv('data/tokens_stopwords_removed__with_label_full.csv')