In [1]:
import os
import sys
import re

import pandas as pd
import numpy as np
from collections import Counter

import keyboard

In [2]:
# `myspark` is a project-local helper module (not shown in this notebook).
# load_spark() presumably creates or attaches to the SparkSession used by the cells below — confirm.
from myspark import load_spark, assert_pyspark
spark = load_spark()
# NOTE(review): F, Window and types are used further down like pyspark.sql.functions,
# pyspark.sql.Window and pyspark.sql.types — confirm against myspark.assert_pyspark.
F, Window, types = assert_pyspark()

22/06/15 17:21:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/15 17:21:28 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
22/06/15 17:21:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/15 17:21:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/06/15 17:21:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
# Echo the session object as this cell's output.
spark

In [4]:
# The path values below are intentionally left blank: the real paths contain sensitive information that must not be published externally.
# Internal researchers who need access to these files should contact Takahiro Kiritoshi.
eicu_path = 
eicu_hsi_path = 
project_path_1 = 
project_path_2 = 

In [5]:
def filter_eicu_patients_baseline(eicu_path, eicu_hsi_path, disease, sepsis):
    """Build the first-pass cohort and attach outcome / censoring offsets.

    Filters are applied in this order, printing a running count after each:
    valid age, sex and admission category -> requested admission category
    (and sepsis flag) -> age >= 18 -> APACHE IVa score present -> BMI strictly
    between 10 and 60 -> DNR == False -> no device usage record -> ICU stay
    of 1-14 days -> no troponin record in the first 24 h -> no ICD-9 584.9
    diagnosis in the first 24 h.

    Besides the two path arguments, project files are read through the
    module-level ``project_path_1`` and ``project_path_2``.

    Parameters
    ----------
    eicu_path : str
        Directory containing patient.parquet, apachePatientResults.parquet
        and diagnosis.parquet.
    eicu_hsi_path : str
        Directory containing icustay_details_clean.parquet (DNR flag).
    disease : str
        ICU admission category to keep: 'Medical', 'Noncardiac_Surg',
        'Cardiac_Surg' or 'Trauma'.
    sepsis : int
        1 keeps only rows with Sepsis == 1 within ``disease``; any other value
        keeps every row of ``disease`` regardless of the sepsis flag.

    Returns
    -------
    pandas.DataFrame
        One row per retained stay, with ``MI_offset``, ``AKI_offset``,
        ``Death_offset``, ``Discharge_offset`` and ``patient_last_timestamp``
        (the row-wise maximum of those four offsets).
    """
    # disease category: 'Medical', 'Noncardiac_Surg', 'Cardiac_Surg', 'Trauma'
    # sepsis: if sepsis, 1, else, 0

    min_icu_stay = 1 * 24 * 60 # minutes
    max_icu_stay = 14 * 24 * 60 # minutes
    min_exposure_duration = 24 * 60 # minutes
    adult_patient_age = 18
    lower_BMI, upper_BMI = 10, 60

    patients_df = pd.read_parquet(os.path.join(eicu_path, 'patient.parquet'))
    patients_df = patients_df[['patientUnitStayID', 'gender', 'age', 'ethnicity', 'admissionHeight', 'admissionWeight',
                               'hospitalAdmitYear', 'hospitalDischargeOffset', 'unitAdmitSource',
                               'unitDischargeOffset', 'hospitalID']]
    # Overall cohort size
    print('Total ICU stays=', patients_df['patientUnitStayID'].nunique())

    # Merge with admission category dataframe
    icu_stay_type_df = pd.read_csv(os.path.join(project_path_1, 'Entire_Cohort_ICU_admission_category.csv'))
    icu_stay_type_df = icu_stay_type_df.rename(columns={'patientunitstayid': 'patientUnitStayID'})
    patients_df = patients_df.merge(icu_stay_type_df.loc[:, ['patientUnitStayID', 'ICU_admission_category']],
                                    on='patientUnitStayID', how='left')

    # Merge with sepsis dataframe
    infection_sepsis_df = pd.read_parquet(os.path.join(project_path_1, 'Entire_Cohort_Sepsis.parquet'))
    patients_df = patients_df.merge(infection_sepsis_df.loc[:, ['patientUnitStayID', 'Sepsis']],
                                    on='patientUnitStayID', how='left')

    # Drop patients with invalid age, sex, admission diagnosis
    patients_df['age'] = pd.to_numeric(patients_df['age'], errors='coerce')
    patients_df = patients_df.dropna(subset=['age', 'ICU_admission_category'])
    # na=False: a missing gender must count as "not valid" instead of putting NaN into the boolean mask
    target_idx = patients_df['gender'].str.contains('Male|Female', na=False)
    patients_df = patients_df.loc[target_idx, :]
    print('ICU stays with valid age, sex, admission diagnosis=', len(patients_df))

    # Select target patients
    if sepsis==1:
        target_idx_disease = np.logical_and(patients_df['ICU_admission_category']==disease, patients_df['Sepsis']==sepsis)
        patients_df = patients_df.loc[target_idx_disease, :]
        print('Total ICU stays with sepsis=', patients_df['patientUnitStayID'].nunique())
    else:
        patients_df = patients_df.loc[patients_df['ICU_admission_category']==disease, :]
        print(f'Total ICU stays with {disease}=', patients_df['patientUnitStayID'].nunique())

    # Limit patients to those aged 18 or more
    target_idx = patients_df['age'] >= adult_patient_age
    patients_df = patients_df.loc[target_idx, :]
    overall_cohort_size = len(patients_df)
    print('ICU stays with age >= 18 years=', overall_cohort_size)
    print('    Note: SOFA score is calculated only on patients >= 18 years')
    assert (patients_df['age'] >= adult_patient_age).all()

    # Merge with APACHE dataframe (inner join: stays without a positive IVa score are dropped)
    apache_df = pd.read_parquet(os.path.join(eicu_path, 'apachePatientResults.parquet'))
    apache_df = apache_df.rename(columns={'patientunitstayid': 'patientUnitStayID'})
    target_idx_IVa = np.logical_and(apache_df['apacheversion'] == 'IVa', apache_df['apachescore'] > 0)
    apache_df = apache_df.loc[target_idx_IVa, ['patientUnitStayID', 'apachescore', 'actualicumortality', 'actualhospitalmortality']]
    patients_df = patients_df.merge(apache_df, on='patientUnitStayID', how='inner')

    # As APACHE table has the reliable mortality, we added the time of death at this point
    patients_df['time_of_death'] = patients_df.loc[:, ['unitDischargeOffset', 'hospitalDischargeOffset']].min(axis=1)
    patients_df.loc[patients_df['actualicumortality'] == 'ALIVE', 'time_of_death'] = np.nan

    w_apache = len(patients_df)
    print('ICU stays w/ APACHE score=', w_apache)
    print('  w/o APACHE score=', overall_cohort_size - w_apache)

    # BMI
    # Setting admission height and weight to be NaN if zero
    patients_df.loc[patients_df['admissionHeight'] == 0, 'admissionHeight'] = np.nan
    patients_df.loc[patients_df['admissionWeight'] == 0, 'admissionWeight'] = np.nan
    # Computing BMI (height is multiplied by 0.01 before squaring)
    patients_df['admission_BMI'] = patients_df['admissionWeight'] / ((patients_df['admissionHeight'] * 0.01) ** 2)
    # Retaining only BMI > 10 and BMI < 60 of course not null
    target_idx = np.logical_and.reduce((patients_df['admission_BMI'].notnull(), patients_df['admission_BMI'] > lower_BMI, patients_df['admission_BMI'] < upper_BMI))
    patients_df = patients_df.loc[target_idx, :]
    w_BMI = len(patients_df)
    print('Patients w/ valid BMI=', w_BMI)
    print('  w/o valid BMI=', w_apache - w_BMI)

    # DNR (stays without a DNR record become NaN after the left join and are dropped by the == False test)
    dnr_df = pd.read_parquet(os.path.join(eicu_hsi_path, 'icustay_details_clean.parquet'))
    dnr_df = dnr_df.rename(columns={'patientunitstayid': 'patientUnitStayID'})
    patients_df = patients_df.merge(dnr_df.loc[:, ['patientUnitStayID', 'DNR']], on='patientUnitStayID', how='left')
    patients_df = patients_df.loc[patients_df['DNR']==False, :]
    no_DNR = len(patients_df)
    print('Patients w/o DNR status=', no_DNR)
    print('  w/ DNR status=', w_BMI-no_DNR)

    # Mechanical support: a stay listed in any of the three device-usage tables is excluded
    resp_charting_device_df = pd.read_parquet(os.path.join(project_path_2, 'device_usage_from_resp_charting.parquet'))
    nurse_charting_device_df = pd.read_parquet(os.path.join(project_path_2, 'device_usage_from_nurse_charting.parquet'))
    treatments_device_df = pd.read_parquet(os.path.join(project_path_2, 'device_usage_from_treatments.parquet'))
    patients_with_devices = pd.concat([treatments_device_df['patientUnitStayID'], nurse_charting_device_df['patientUnitStayID'], resp_charting_device_df['patientUnitStayID']], ignore_index=True)
    target_idx = ~patients_df['patientUnitStayID'].isin(patients_with_devices)
    patients_df = patients_df.loc[target_idx, :]
    no_mechanical_support = len(patients_df)
    print('Patients NO MECH. devices=', no_mechanical_support)
    print('  with IABP+LVAD+RVAD+BVAD+Impella+ECMO=', no_DNR - no_mechanical_support)

    # LOS<1 or >14 days
    target_idx = np.logical_or(patients_df['actualicumortality'] == 'ALIVE', np.logical_and(patients_df['actualicumortality'] == 'EXPIRED', patients_df['time_of_death'] > min_icu_stay))
    patients_df = patients_df.loc[target_idx, :]
    target_idx = np.logical_and(patients_df['unitDischargeOffset'] >= min_icu_stay, patients_df['unitDischargeOffset'] <= max_icu_stay)
    patients_df = patients_df.loc[target_idx, :]
    valid_LOS = len(patients_df)
    print('Patients stayed in ICU 1-14 days=', valid_LOS)
    print('  stayed in ICU <1 day or > 14 days=', no_mechanical_support-valid_LOS)

    # Outcome ascertainment
    # Myocardial injury
    troponin_df = pd.read_parquet(os.path.join(project_path_2, 'troponin_df_max_value_first_offset.parquet.gzip'))
    troponin_df = troponin_df.merge(patients_df.loc[:, ['patientUnitStayID', 'unitDischargeOffset']], how='inner', on='patientUnitStayID')
    target_idx = np.logical_and(troponin_df['labResultOffset'] < min_exposure_duration, troponin_df['labResultOffset'] >= 0)
    exclude_MI_24h = troponin_df.loc[target_idx, 'patientUnitStayID']
    patients_df = patients_df.merge(troponin_df.loc[:, ['patientUnitStayID', 'labResultOffset']], on='patientUnitStayID', how='left')
    # An offset after unit discharge or before the 24 h exposure window does not count as an event
    patients_df.loc[(patients_df['labResultOffset']>patients_df['unitDischargeOffset']) | (patients_df['labResultOffset']<min_exposure_duration), 'labResultOffset'] = np.nan
    patients_df = patients_df.loc[~patients_df['patientUnitStayID'].isin(exclude_MI_24h), :]
    no_MI_24h = len(patients_df)
    print('Patients w/o MI within 24h=', no_MI_24h)
    print('  w/ MI within 24h=', valid_LOS-no_MI_24h)
    print('Total patients with Myocardoial injury=', patients_df['labResultOffset'].notnull().sum())

    # AKI
    diagnosis_df = pd.read_parquet(os.path.join(eicu_path, 'diagnosis.parquet'))
    diagnosis_df = diagnosis_df.merge(patients_df['patientUnitStayID'], on='patientUnitStayID', how='inner')
    # regex=False: '.' must match a literal dot, not any character; na=False: missing codes never match
    is_aki_code = diagnosis_df['ICD9Code'].str.contains('584.9', regex=False, na=False)
    AKI_df = diagnosis_df.loc[is_aki_code, ['patientUnitStayID', 'diagnosisOffset']]
    # Earliest matching diagnosis per stay; reset_index() restores patientUnitStayID as a column
    AKI_df = AKI_df.groupby('patientUnitStayID').agg(AKI_offset=('diagnosisOffset', 'min')).reset_index()
    target_idx = np.logical_and(AKI_df['AKI_offset'] < min_exposure_duration, AKI_df['AKI_offset'] >= 0)
    exclude_AKI_24h = AKI_df.loc[target_idx, 'patientUnitStayID']
    patients_df = patients_df.merge(AKI_df.loc[:, ['patientUnitStayID', 'AKI_offset']],
                                    on='patientUnitStayID', how='left')

    # .copy() so the assignments below write to an independent frame rather than a slice
    patients_df = patients_df.loc[~patients_df['patientUnitStayID'].isin(exclude_AKI_24h), :].copy()
    patients_df.loc[(patients_df['AKI_offset']>patients_df['unitDischargeOffset']) | (patients_df['AKI_offset']<min_exposure_duration), 'AKI_offset'] = np.nan
    no_AKI_24h = len(patients_df)
    print('Patients w/o AKI within 24h=', no_AKI_24h)
    print('  w/ AKI within 24h=', no_MI_24h-no_AKI_24h)
    print('Total patients with AKI=', patients_df['AKI_offset'].notnull().sum())

    patients_df = patients_df.rename(columns={'labResultOffset': 'MI_offset', 'time_of_death': 'Death_offset',
                                              'unitDischargeOffset': 'Discharge_offset'})

    # Patients who died their discharge time is set to NaN
    target_idx = patients_df['Death_offset'].notnull()
    patients_df.loc[target_idx, 'Discharge_offset'] = np.nan
    assert not (patients_df['Death_offset'].notnull() & patients_df['Discharge_offset'].notnull()).any()

    events_of_interest = ['MI_offset', 'AKI_offset', 'Death_offset', 'Discharge_offset']
    patients_df['patient_last_timestamp'] = patients_df[events_of_interest].max(axis=1)

    return patients_df

In [6]:
# Build the first-pass cohort for the 'Medical' admission category restricted to Sepsis == 1,
# then persist it: the second-pass functions re-read this CSV from project_path_2.
patients_df = filter_eicu_patients_baseline(eicu_path, eicu_hsi_path, 'Medical', 1)
patients_df.to_csv(os.path.join(project_path_2, 'sepsis_patients_filters_first_pass.csv'))

Total ICU stays= 3336449
ICU stays with valid age, sex, admission diagnosis= 2412019
Total ICU stays with sepsis= 301447
ICU stays with age >= 18 years= 301447
    Note: SOFA score is calculated only on patients >= 18 years
ICU stays w/ APACHE score= 228415
  w/o APACHE score= 73032
Patients w/ valid BMI= 215653
  w/o valid BMI= 12762
Patients w/o DNR status= 212516
  w/ DNR status= 3137
Patients NO MECH. devices= 208367
  with IABP+LVAD+RVAD+BVAD+Impella+ECMO= 4149
Patients stayed in ICU 1-14 days= 197462
  stayed in ICU <1 day or > 14 days= 10905
Patients w/o MI within 24h= 168396
  w/ MI within 24h= 29066
Total patients with Myocardoial injury= 4005
Patients w/o AKI within 24h= 137197
  w/ AKI within 24h= 31199
Total patients with AKI= 4921


In [7]:
def load_preprocess_merged_df(start_p, end_p, merged_cols, patients_df):
    """Load one batch of merged blood-pressure readings and clean it.

    Reads ``invasive_BP_selectively_merged_batch_<start_p>_<end_p>.parquet``
    from the module-level ``project_path_2``.

    Cleaning steps:
      * systolic is blanked when it is less than diastolic + 5;
      * mean is blanked when it is not strictly between diastolic and systolic;
      * a missing mean is filled with diastolic + (systolic - diastolic) / 3;
      * ``pulse_pressure`` = systolic - diastolic is added;
      * rows with a missing value in any of ``merged_cols`` are dropped;
      * rows are kept only when 0 < observationOffset <= the patient's
        ``patient_last_timestamp`` (patients absent from ``patients_df`` get
        NaN there after the left join and therefore drop out);
      * rows are sorted per patient and ``time_diff`` holds the gap to the
        previous reading of the same patient.

    Parameters
    ----------
    start_p, end_p
        Batch bounds, used only to build the file name.
    merged_cols : list of str
        Columns that must all be non-null for a row to be kept.
    patients_df : pandas.DataFrame
        Must provide ``patientUnitStayID`` and ``patient_last_timestamp``.

    Returns
    -------
    pandas.DataFrame
        The cleaned batch; an empty DataFrame when the file does not exist,
        or the file content unchanged when it has no rows.
    """
    merged_df_filename = os.path.join(project_path_2, 'invasive_BP_selectively_merged_batch_%s_%s.parquet' % (start_p, end_p))

    if not os.path.exists(merged_df_filename):
        return pd.DataFrame()

    merged_df = pd.read_parquet(merged_df_filename)
    if len(merged_df) == 0:
        return merged_df

    # Systolic must exceed diastolic by at least 5, otherwise it is treated as missing
    target_idx = merged_df['mergedSystolic'] < merged_df['mergedDiastolic'] + 5
    merged_df.loc[target_idx, 'mergedSystolic'] = np.nan

    # Mean must lie strictly between diastolic and systolic
    target_idx = np.logical_or(merged_df['mergedMean'] >= merged_df['mergedSystolic'], merged_df['mergedMean'] <= merged_df['mergedDiastolic'])
    merged_df.loc[target_idx, 'mergedMean'] = np.nan

    # Estimate a missing mean from systolic and diastolic
    mbp_nan_idx = merged_df['mergedMean'].isnull()
    merged_df.loc[mbp_nan_idx, 'mergedMean'] = merged_df.loc[mbp_nan_idx, 'mergedDiastolic'] +\
                            (1/3 * (merged_df.loc[mbp_nan_idx, 'mergedSystolic'] - merged_df.loc[mbp_nan_idx, 'mergedDiastolic']))

    merged_df['pulse_pressure'] = merged_df['mergedSystolic'] - merged_df['mergedDiastolic']
    assert (merged_df.loc[merged_df['pulse_pressure'].notnull(), 'pulse_pressure'] > 0).all()

    # Dropping rows that have one or more NaN's in the merged columns
    merged_df = merged_df.dropna(subset=merged_cols)
    merged_df = merged_df.merge(patients_df[['patientUnitStayID', 'patient_last_timestamp']], on='patientUnitStayID', how='left')
    in_stay = np.logical_and(merged_df['observationOffset'] > 0, merged_df['observationOffset'] <= merged_df['patient_last_timestamp'])
    # .copy() so the time_diff column below is added to an independent frame rather than a slice
    merged_df = merged_df.loc[in_stay, :].copy()

    # Sorting by patients and by observation offset
    merged_df = merged_df.sort_values(by=['patientUnitStayID', 'observationOffset'])
    merged_df = merged_df.reset_index(drop=True)
    merged_df['time_diff'] = merged_df.groupby('patientUnitStayID')['observationOffset'].diff()
    # Exactly one NaN gap (the first reading) per patient
    assert merged_df['time_diff'].isnull().sum() == merged_df['patientUnitStayID'].nunique()

    return merged_df

######################################################################################################
def get_min_max_start_end_idx():
    """Summarise each patient's blood-pressure series and write it to CSV.

    Iterates over patient-ID batches of width 100000 covering 0..4000000,
    loads each batch through ``load_preprocess_merged_df`` and aggregates per
    ``patientUnitStayID``: first/last/count of observation offsets, the minimum
    (and row label of the minimum) of every merged pressure column, the
    largest gap between readings, and counts of invasive / non-invasive mean
    readings. The result is written to ``sepsis_min_max_idx_start_end.csv``
    under the module-level ``project_path_2``.

    ``BP_contribution`` encodes which sources contributed readings:
    1 = both, 2 = invasive only, 3 = non-invasive only, -1 = neither.
    """
    patient_id_bins = np.arange(0, 4000001, 100000)
    merged_cols = ['mergedSystolic', 'mergedDiastolic', 'mergedMean', 'pulse_pressure']
    patients_df = pd.read_csv(os.path.join(project_path_2, 'sepsis_patients_filters_first_pass.csv'), index_col=0)

    f = {'observationOffset': ['max', 'min', 'count'],
        'mergedSystolic': ['idxmin', 'min'],
        'mergedDiastolic': ['idxmin', 'min'],
        'mergedMean': ['idxmin', 'min'],
        'pulse_pressure': ['idxmin', 'min'],
        'time_diff': ['max'],
        'systemicMean': ['count', lambda x: x.isnull().sum()],
        'nonInvasiveMean': ['count']
        }
    summary_cols = ['patientUnitStayID', 'observationOffset', 'time_diff', 'nonInvasiveMean', 'systemicMean'] + merged_cols

    # Collect per-batch summaries and concatenate once instead of growing a frame inside the loop
    batch_summaries = []
    for bin_start, bin_end in zip(patient_id_bins[:-1], patient_id_bins[1:]):
        merged_df = load_preprocess_merged_df(bin_start, bin_end, merged_cols, patients_df)
        if len(merged_df) == 0:
            continue

        temp_df = merged_df[summary_cols].groupby('patientUnitStayID').agg(f)
        # Flatten the (column, aggregation) MultiIndex into 'column_aggregation' names
        temp_df.columns = ["_".join(x) for x in temp_df.columns.to_flat_index()]
        temp_df = temp_df.reset_index()
        temp_df = temp_df.rename(columns={'systemicMean_<lambda_0>': 'invasive_MBP_nan_counts'})

        has_invasive = temp_df['systemicMean_count'] > 0
        has_non_invasive = temp_df['nonInvasiveMean_count'] > 0
        temp_df['BP_contribution'] = -1
        temp_df.loc[has_invasive & has_non_invasive, 'BP_contribution'] = 1
        temp_df.loc[has_invasive & ~has_non_invasive, 'BP_contribution'] = 2
        temp_df.loc[~has_invasive & has_non_invasive, 'BP_contribution'] = 3

        # Translate each idxmin row label into the observation offset at which the minimum occurred.
        # The new column is named after the text before the first underscore, so
        # 'pulse_pressure_idxmin' yields 'pulse_min_observationOffset' (kept to preserve the CSV schema).
        idxmin_cols = [c for c in temp_df.columns if '_idxmin' in c]
        for i in idxmin_cols:
            temp_df.loc[:, i.split('_')[0]+'_min_observationOffset'] = merged_df.loc[temp_df[i], 'observationOffset'].values

        assert temp_df['patientUnitStayID'].nunique() == len(temp_df)
        batch_summaries.append(temp_df)

    if batch_summaries:
        gather_df = pd.concat(batch_summaries, ignore_index=True)
        # A patient must not appear in more than one batch
        assert gather_df['patientUnitStayID'].nunique() == len(gather_df)
    else:
        gather_df = pd.DataFrame()

    gather_df.to_csv(os.path.join(project_path_2, 'sepsis_min_max_idx_start_end.csv'))

######################################################################################################
def filter_eicu_patients_second_set():
    """Apply the blood-pressure coverage filter and write the second-pass cohort.

    Reads ``sepsis_patients_filters_first_pass.csv`` and
    ``sepsis_min_max_idx_start_end.csv`` from the module-level
    ``project_path_2``, inner-joins them on ``patientUnitStayID`` and keeps
    patients with no gap longer than 120 minutes before the first reading,
    between consecutive readings, or between the last reading and
    ``patient_last_timestamp``. MI / AKI offsets that fall after death or
    discharge are moved back to that time. The result is written to
    ``sepsis_patients_filters_second_pass.csv``.
    """
    bp_readings_max_gap_allowed = 2 * 60

    patients_df = pd.read_csv(os.path.join(project_path_2, 'sepsis_patients_filters_first_pass.csv'), index_col=0)
    print('Total identified patients=', len(patients_df))

    gather_df = pd.read_csv(os.path.join(project_path_2, 'sepsis_min_max_idx_start_end.csv'), index_col=0)
    print('Patients with invasive+non-invasive (all 3 BPs per data source must be present) between time of admission and time of death|discharge=', len(gather_df))

    patients_df = patients_df.merge(gather_df, on=['patientUnitStayID'], how='inner')

    # Finding patients that have a gap at the beginning, in between BP readings and at the end of the ICU stay
    # Negating this logical or to get valid patients
    gap_at_start = patients_df['observationOffset_min'] > bp_readings_max_gap_allowed
    gap_in_between = patients_df['time_diff_max'] > bp_readings_max_gap_allowed
    gap_at_end = (patients_df['patient_last_timestamp'] - patients_df['observationOffset_max']) > bp_readings_max_gap_allowed
    target_idx = ~(gap_at_start | gap_in_between | gap_at_end)

    print('Max gap <= %s minutes=' % (bp_readings_max_gap_allowed), target_idx.sum())
    # .copy() so the offset corrections below write to an independent frame rather than a slice
    patients_df = patients_df.loc[target_idx, :].copy()

    # Few patients (76 to be precise) that have AKI offset after death|discharge
    # We adjust the time of AKI to be the time of death|discharge since ICD 9 code recorded time is unreliable
    for m in ['MI_offset', 'AKI_offset']:
        for d in ['Death_offset', 'Discharge_offset']:
            # Comparisons against NaN are False, so only the offset that is present for a patient applies
            target_idx = patients_df[m] > patients_df[d]
            if target_idx.any():
                patients_df.loc[target_idx, m] = patients_df.loc[target_idx, d]
            assert (patients_df[m] > patients_df[d]).sum() == 0

    patients_df.to_csv(os.path.join(project_path_2, 'sepsis_patients_filters_second_pass.csv'))

######################################################################################################
def find_minimum_for_x_hours():
    """Compute, per patient / pressure signal / event, the lowest threshold held for 120 minutes.

    For every patient in ``sepsis_patients_filters_second_pass.csv`` and every
    event in MI, AKI, Death, Discharge whose offset is present, the readings up
    to that offset are linearly interpolated onto a 1-unit grid over
    [first offset, last offset). Thresholds are scanned upwards from the
    signal minimum in steps of 1; the first threshold with at least 120 grid
    points at or below it is stored in ``below_120_<signal>_<event>``. The
    column stays NaN when no threshold qualifies, the event is absent, or
    fewer than two readings are available.

    The CSV under the module-level ``project_path_2`` is overwritten in place
    with the added columns.
    """
    below_max_window = 2 * 60 # 2 hours
    patient_id_bins = np.arange(0, 4000001, 100000)
    merged_cols = ['mergedSystolic', 'mergedDiastolic', 'mergedMean', 'pulse_pressure']
    events_of_interest = ['MI', 'AKI', 'Death', 'Discharge']

    second_pass_path = os.path.join(project_path_2, 'sepsis_patients_filters_second_pass.csv')
    patients_df_second_pass = pd.read_csv(second_pass_path, index_col=0)

    below_cols = ['below_%s_%s_%s' % (below_max_window, j, e) for j in merged_cols for e in events_of_interest]
    patients_df_second_pass[below_cols] = np.nan

    for bin_start, bin_end in zip(patient_id_bins[:-1], patient_id_bins[1:]):
        merged_df = load_preprocess_merged_df(bin_start, bin_end, merged_cols, patients_df_second_pass)
        if len(merged_df) == 0:
            continue

        in_cohort = merged_df['patientUnitStayID'].isin(patients_df_second_pass['patientUnitStayID'])
        # One pass over the batch via groupby instead of re-scanning the whole frame per patient and event;
        # sort=False keeps patients in order of appearance and rows keep their offset ordering.
        for u, patient_readings in merged_df.loc[in_cohort, :].groupby('patientUnitStayID', sort=False):
            patient_row = patients_df_second_pass['patientUnitStayID'] == u

            for e in events_of_interest:
                event_end = patients_df_second_pass.loc[patient_row, '%s_offset' % (e)].values[0]
                if np.isnan(event_end):
                    continue

                patient_data = patient_readings.loc[patient_readings['observationOffset'] <= event_end, :]
                if len(patient_data) <= 1:
                    continue

                patient_start = patient_data['observationOffset'].values[0]
                patient_end = patient_data['observationOffset'].values[-1]
                # Grid excludes patient_end itself (np.arange is half-open)
                time_grid = np.arange(patient_start, patient_end, 1)
                if time_grid.size == 0:
                    # All readings share one offset: nothing to interpolate
                    continue

                for m in merged_cols:
                    interpolated_signal = np.interp(time_grid, patient_data['observationOffset'], patient_data[m])

                    # Few patients have constant BP readings i.e. flat line. I am increasing the max by 1 to execute the for loop below
                    min_interpolated_signal = np.min(interpolated_signal)
                    max_interpolated_signal = np.max(interpolated_signal)
                    if min_interpolated_signal == max_interpolated_signal:
                        max_interpolated_signal += 1

                    for i in np.arange(min_interpolated_signal, max_interpolated_signal, 1):
                        if np.sum(interpolated_signal <= i) >= below_max_window:
                            patients_df_second_pass.loc[patient_row,
                                                        'below_%s_%s_%s' % (below_max_window, m, e)] = i
                            break

    patients_df_second_pass.to_csv(second_pass_path)

######################################################################################################
# Second-pass driver. The three calls are order-dependent: each step reads the CSV the previous one wrote.
if __name__ == '__main__':

    # like mimic for eICU we have subject_id=patientHealthSystemStayID and icustay_id=patientUnitStayID
    # NOTE(review): the right-hand sides below are blank in this copy (presumably redacted);
    # the cell is not valid Python until real paths are filled in.
    eicu_path = 
    eicu_hsi_path = 


    # Writes sepsis_min_max_idx_start_end.csv (per-patient BP summary)
    get_min_max_start_end_idx()

    # Reads that summary and writes sepsis_patients_filters_second_pass.csv
    filter_eicu_patients_second_set()

    # Re-reads the second-pass CSV, adds the below_120_* columns and overwrites the same file
    find_minimum_for_x_hours()



Total identified patients= 137197
Patients with invasive+non-invasive (all 3 BPs per data source must be present) between time of admission and time of death|discharge= 128717
Max gap <= 120 minutes= 52261


In [8]:
def perform_third_pass_filters(eicu_path):
    """Enrich the second-pass cohort and apply the history and eGFR exclusions.

    Reads ``sepsis_patients_filters_second_pass.csv`` plus the admission-drug,
    previous-history and initial-lab parquet files from the module-level
    ``project_path_2``, and ``hospital.parquet`` from ``eicu_path``.
    Patients with any of the four ``Exclude_PastHistory_*`` flags set are
    removed, then only patients with eGFR >= 60 are kept. Rows with a missing
    eGFR fail that comparison and are removed too, so they are included in the
    printed 'w/ eGFR < 60' count.

    Parameters
    ----------
    eicu_path : str
        Directory containing hospital.parquet.

    Returns
    -------
    pandas.DataFrame
        The filtered cohort, also written to
        ``sepsis_patients_filters_third_pass.csv``.
    """
    # settings
    history_exclusion_flags = ['Exclude_PastHistory_Chronic_Kidney_Disease', 'Exclude_PastHistory_Myocardial_Infarction',
                               'Exclude_PastHistory_Stroke', 'Exclude_PastHistory_Coronary_Artery_Disease']
    medication_defaults = {'aspirin': 0, 'diuretics': 0, 'ace_inhibitors': 0, 'ARBs': 0, 'beta_blockers': 0, 'ca_channel_blockers': 0}
    lab_columns = ['patientUnitStayID', 'Initial_Hgb', 'Initial_albumin',
                   'Initial_WBCx1000', 'Initial_BUN', 'Initial_lactate', 'eGFR']

    # Starting point: the second-pass cohort
    cohort = pd.read_csv(os.path.join(project_path_2, 'sepsis_patients_filters_second_pass.csv'), index_col=0)
    n_second_pass = len(cohort)
    print('Patients w/o >2 hours BP reading gap=', n_second_pass)

    # Hospital characteristics (~459 hospitals in total)
    hospitals = pd.read_parquet(os.path.join(eicu_path, 'hospital.parquet'))
    hospitals['hospitalID'] = hospitals['hospitalID'].astype(int)
    cohort = cohort.merge(hospitals, on='hospitalID', how='left')

    # Admission medication: a patient without a record is treated as not taking the drug
    admission_drugs = pd.read_parquet(os.path.join(project_path_2, 'all_admission_drug.parquet'))
    admission_drugs['patientUnitStayID'] = admission_drugs['patientUnitStayID'].astype(int)
    cohort = cohort.merge(admission_drugs, on='patientUnitStayID', how='left')
    cohort = cohort.fillna(medication_defaults)

    # Past history: keep rows whose exclusion flags sum to zero (missing flags are skipped by sum)
    past_history = pd.read_parquet(os.path.join(project_path_2, 'all_previous_history.parquet'))
    past_history['patientUnitStayID'] = past_history['patientUnitStayID'].astype(int)
    cohort = cohort.merge(past_history, on='patientUnitStayID', how='left')

    no_excluded_history = cohort[history_exclusion_flags].sum(axis=1) == 0
    cohort = cohort.loc[no_excluded_history, :]
    n_no_history = len(cohort)
    print('Patients who have no past medical history=', n_no_history)
    print('  who have past medical history=', n_second_pass-n_no_history)

    # Initial labs; eGFR is needed only for the filter and is dropped afterwards
    initial_labs = pd.read_parquet(os.path.join(project_path_2, 'initial_lab_values.parquet'))
    cohort = cohort.merge(initial_labs.loc[:, lab_columns], on='patientUnitStayID', how='left')
    cohort = cohort.loc[cohort['eGFR']>=60, :]
    cohort = cohort.drop(columns=['eGFR'])
    n_egfr_ok = len(cohort)
    print('Patients w/ eGFR >= 60=', n_egfr_ok)
    print('  w/ eGFR < 60=', n_no_history-n_egfr_ok)

    cohort.to_csv(os.path.join(project_path_2, 'sepsis_patients_filters_third_pass.csv'))

    return cohort

In [9]:
# Apply the third-pass exclusions; the function also writes sepsis_patients_filters_third_pass.csv.
patients_df = perform_third_pass_filters(eicu_path)

Patients w/o >2 hours BP reading gap= 52261
Patients who have no past medical history= 34012
  who have past medical history= 18249
Patients w/ eGFR >= 60= 22494
  w/ eGFR < 60= 11518


In [10]:
# Re-load the third-pass cohort into Spark from the CSV written by perform_third_pass_filters.
df_sepsis = spark.read.csv(os.path.join(project_path_2, 'sepsis_patients_filters_third_pass.csv'),
                    inferSchema = True, header = True)

# NOTE(review): hard-coded absolute path to another user's scratch area; unlike the other inputs
# it is not derived from the project path variables.
df_event = spark.read.parquet('/home/lx099-scratch/users/asif/multitask_events.parquet',)

# One row per stay with On_vent = 1 when an 'InvasiveVentilation' event overlaps the
# window 0 <= offset < 24*60 (starts before 24*60 and ends after 0).
df_vent = (df_event.filter(F.col('Event')=='InvasiveVentilation')
           .filter((F.col('StartOffset')<24*60)&(F.col('EndOffset')>0))
           .withColumn('On_vent', F.lit(1).cast('int'))
           .groupBy('patientUnitStayID')
           .agg(F.max('On_vent').alias('On_vent')))

# Left join keeps every cohort row; stays without a matching event get On_vent = 0.
df_sepsis = (df_sepsis.join(df_vent, on='patientUnitStayID', how='left')
             .na.fill({'On_vent': 0}))

In [11]:
def get_df_outcome(df, outcome):
    """Project the cohort onto the analysis columns for one outcome.

    Parameters
    ----------
    df : Spark DataFrame
        Cohort carrying the demographic, medication, lab, past-history,
        ``below_120_*`` and ``*_offset`` columns referenced below.
    outcome : str
        Event name used to pick ``<outcome>_offset`` and the
        ``below_120_<signal>_<outcome>`` columns (e.g. 'Death').

    Returns
    -------
    Spark DataFrame
        One row per input row with categorical bins, the blood-pressure
        thresholds, a 0/1 column named ``outcome`` and ``Outcome_offset``.
    """
    missing_lab = 'No_reading_available'

    def or_default(column, default, alias):
        # Value of `column`, or `default` when it is null
        return (F.when(F.col(column).isNull(), default)
                .otherwise(F.col(column)).alias(alias))

    def three_bands(column, low, high, low_label, mid_label, high_label, alias):
        # null -> missing_lab, < low -> low_label, >= high -> high_label, otherwise mid_label
        return (F.when(F.col(column).isNull(), missing_lab)
                .when(F.col(column)<low, low_label)
                .when(F.col(column)>=high, high_label)
                .otherwise(mid_label).alias(alias))

    def first_available(primary, death, discharge, alias):
        # First non-null of primary, death; otherwise the discharge column
        return (F.when(F.col(primary).isNotNull(), F.col(primary))
                .when(F.col(death).isNotNull(), F.col(death))
                .otherwise(F.col(discharge)).alias(alias))

    def pressure_threshold(signal, alias):
        stem = 'below_120_' + signal + '_'
        return first_available(stem+outcome, stem+'Death', stem+'Discharge', alias)

    def alive_flag(column, alias):
        # 0 when the column equals 'ALIVE', 1 for anything else
        return F.when(F.col(column)=='ALIVE', 0).otherwise(1).alias(alias)

    admission_type = (F.when(F.col('unitAdmitSource')=='Emergency Department', 'Emergency_Department')
                      .when(F.col('unitAdmitSource')=='Other Hospital', 'Other_Hospital')
                      .when((F.col('unitAdmitSource').isNull())|(F.col('unitAdmitSource')=='Other'), 'Other/Unknown')
                      .when(F.col('unitAdmitSource')=='Direct Admit', 'Elective')
                      .otherwise('Other_Ward').alias('Admission_Type'))

    admit_year = (F.when(F.col('hospitalAdmitYear')<=2008, '2004-2008')
                  .when(F.col('hospitalAdmitYear')>=2013, '2013-2016')
                  .otherwise('2009-2012').alias('Admit_Year'))

    bun_band = (F.when(F.col('Initial_BUN').isNull(), missing_lab)
                .when(F.col('Initial_BUN')<=30, '<=30')
                .otherwise('>30').alias('BUN'))

    medication_columns = ['aspirin', 'diuretics', 'ace_inhibitors', 'ARBs', 'beta_blockers', 'ca_channel_blockers']
    history_columns = ['PastHistory_Hypertension', 'PastHistory_Diabetes', 'PastHistory_COPD',
                       'PastHistory_Congestive_Heart_Failure', 'PastHistory_Peripheral_Vascular_Disease',
                       'PastHistory_Valve_disease', 'PastHistory_Pulmonary_Embolism',
                       'PastHistory_Neuromuscular_Disease', 'PastHistory_Hypothyroidism',
                       'PastHistory_Liver_Disease', 'PastHistory_AIDS', 'PastHistory_Cancer_Tumor',
                       'PastHistory_Arthritis_Vasculitis', 'PastHistory_Coagulopathy',
                       'PastHistory_Anemia', 'PastHistory_Home_Oxygen', 'PastHistory_Organ_Transplant']

    # Order matters: it is the column order of the returned frame
    selected = [
        'patientUnitStayID',
        F.col('age').cast('int').alias('age'),
        F.when(F.col('gender')=='Female', 0).otherwise(1).alias('male'),
        or_default('ethnicity', 'Other/Unknown', 'ethnicity'),
        F.round(F.col('admission_BMI'), 1).alias('BMI'),
        'apachescore',
        admission_type,
        admit_year,
        or_default('numbedscategory', 'Unknown', 'Hospital_Bed_Size'),
    ]
    selected += medication_columns
    selected += [
        three_bands('Initial_Hgb', 8, 11, '<8', '8<=Hb<11', '>=11', 'Hb'),
        three_bands('Initial_albumin', 2, 3, '<2', '2<=Alb<3', '>=3', 'Alb'),
        three_bands('Initial_WBCx1000', 4, 12, '<4', '4<=WBCx1000<12', '>=12', 'WBCx1000'),
        bun_band,
        three_bands('Initial_lactate', 2, 5, '<2', '2<=Lac<5', '>=5', 'Lac'),
    ]
    selected += history_columns
    selected += [
        'On_vent',
        pressure_threshold('mergedSystolic', 'Systolic_below_120'),
        pressure_threshold('mergedDiastolic', 'Diastolic_below_120'),
        pressure_threshold('mergedMean', 'Mean_below_120'),
        pressure_threshold('pulse_pressure', 'Pulsepressure_below_120'),
        alive_flag('actualicumortality', 'ICU_mortality'),
        alive_flag('actualhospitalmortality', 'Hospital_mortality'),
        F.when(F.col(outcome+'_offset').isNotNull(), 1).otherwise(0).alias(outcome),
        first_available(outcome+'_offset', 'Death_offset', 'Discharge_offset', 'Outcome_offset'),
    ]

    df_outcome = df.select(*selected)

    return df_outcome

In [12]:
# Analysis table with 'Death' as the outcome of interest.
df_Death = get_df_outcome(df_sepsis, 'Death')

In [13]:
# Vasopressor infusion records; passed to get_df_outcome_vsif below.
df_vsif = spark.read.parquet(os.path.join(project_path_2, 'vasopressors.parquet'))

In [14]:
def get_df_outcome_vsif(df_outcome, df_vsif, open_ended_duration=60):
    """Add the cumulative vasopressor exposure up to each stay's outcome time.

    Each row of ``df_vsif`` is treated as a charted infusion rate that stays in
    effect until the next charted row of the same patient and drug. Every such
    interval is clipped to ``[0, Outcome_offset]``, multiplied by
    ``NEEquivalentDrugRate`` and summed per ``patientUnitStayID``.

    Parameters
    ----------
    df_outcome : Spark DataFrame
        Must contain ``patientUnitStayID`` and ``Outcome_offset``.
    df_vsif : Spark DataFrame
        Must contain ``patientUnitStayID``, ``drugConcept``, ``infusionOffset``
        and ``NEEquivalentDrugRate``.
    open_ended_duration : int or None, default 60
        Assumed duration of the last charted row of each patient/drug, which has
        no following row to mark its end. Expressed in the unit of
        ``infusionOffset`` (presumably minutes - TODO confirm against the source
        table). Pass ``None`` to drop those rows instead; that reproduces the
        result of the earlier implementation, whose filter discarded them before
        its own 60-unit fallback could apply.

    Returns
    -------
    Spark DataFrame
        ``df_outcome`` plus ``TotalAmountNEE`` and ``NEE_mcg_perkg_perminx1000``;
        both are 0 where no value could be computed.
    """
    # Descending order makes lag() return the NEXT charted offset of the same
    # patient and drug, i.e. the moment the current rate stops applying.
    w_lag = (Window
             .partitionBy('patientUnitStayID', 'drugConcept')
             .orderBy(F.col('infusionOffset').desc())
            )

    infusions = (df_outcome.select('patientUnitStayID', 'Outcome_offset')
                 .join(df_vsif, 'patientUnitStayID')
                 .withColumn('infusionEndOffset', F.lag(F.col('infusionOffset')).over(w_lag)))

    if open_ended_duration is not None:
        # The latest row per patient/drug has no successor, so lag() yields NULL.
        # Give it an assumed end BEFORE filtering: `NULL > 0` is not true, so the
        # filter below would otherwise silently discard these rows.
        infusions = infusions.withColumn(
            'infusionEndOffset',
            F.when(F.col('infusionEndOffset').isNull(),
                   F.col('infusionOffset') + open_ended_duration)
            .otherwise(F.col('infusionEndOffset')))

    vsif_amount = (infusions
                   # Keep only intervals that overlap [0, Outcome_offset).
                   .filter((F.col('infusionEndOffset')>0)&(F.col('infusionOffset')<F.col('Outcome_offset')))
                   # Clip the interval start to offset 0.
                   .withColumn('Start',
                               F.when(F.col('infusionOffset')>=0, F.col('infusionOffset'))
                               .otherwise(0).cast('int'))
                   # Clip the interval end to the outcome time.
                   .withColumn('End',
                               F.when(F.col('infusionEndOffset')>=F.col('Outcome_offset'), F.col('Outcome_offset'))
                               .otherwise(F.col('infusionEndOffset')).cast('int'))
                   .withColumn('Duration', (F.col('End')-F.col('Start')).cast('int'))
                   .withColumn('Amount', F.col('NEEquivalentDrugRate')*F.col('Duration'))
                   # No sort is needed here: the per-stay sum does not depend on row order.
                   .groupBy('patientUnitStayID')
                   .agg(F.round(F.sum('Amount'), 2).alias('TotalAmountNEE')))

    # Left join keeps stays without any qualifying infusion; their NULLs become 0.
    df_outcome_vsif = (df_outcome.join(vsif_amount, 'patientUnitStayID', 'left')
                       .withColumn('NEE_mcg_perkg_perminx1000', F.round(1000*F.col('TotalAmountNEE')/F.col('Outcome_offset')).cast('int'))
                       .fillna(0, subset=['TotalAmountNEE', 'NEE_mcg_perkg_perminx1000']))

    return df_outcome_vsif

In [15]:
# Add vasopressor exposure to the Death table and collect the result to pandas on the driver.
df_Death_vsif = get_df_outcome_vsif(df_Death, df_vsif).toPandas()
# Write the collected table as CSV, without the pandas index column.
df_Death_vsif.to_csv(os.path.join(project_path_2, 'Sepsis_Death.csv'), index=False)

22/06/15 17:39:37 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
                                                                                

In [16]:
def get_df_outcome_composite(df, outcome):
    """Select the analysis columns for a composite end point of `outcome` or death.

    Parameters
    ----------
    df : Spark DataFrame
        Cohort table holding the covariates plus, for `outcome`, 'Death' and
        'Discharge', an ``<name>_offset`` column and the ``below_120_*_<name>``
        exposure columns.
    outcome : str
        Suffix of the outcome-specific columns (the notebook calls this with
        'AKI' and 'MI').

    Returns
    -------
    Spark DataFrame
        One projection with recoded covariates, four ``*_below_120`` exposure
        columns, the mortality flags, the ``<outcome>`` and
        ``<outcome>_composite`` indicators, and ``Outcome_offset``.

    Notes
    -----
    * 'male' is 0 only for gender == 'Female'; the mortality flags are 0 only
      for the value 'ALIVE'. Every other value, including null, maps to 1.
    * ``Outcome_offset`` prefers ``Death_offset`` over ``<outcome>_offset``,
      whereas the exposure columns prefer the outcome-specific value first.
    """
    def null_as(name, label, alias):
        # Substitute `label` for a missing value of column `name`.
        return F.when(F.col(name).isNull(), label).otherwise(F.col(name)).alias(alias)

    def lab_bands(name, low, low_label, high, high_label, mid_label, alias):
        # Three-level banding of a lab value, with a dedicated label for missing readings.
        value = F.col(name)
        return (F.when(value.isNull(), 'No_reading_available')
                .when(value < low, low_label)
                .when(value >= high, high_label)
                .otherwise(mid_label)
                .alias(alias))

    def exposure(prefix, alias):
        # First non-null of the outcome-, death- and discharge-specific exposure columns.
        own, death, discharge = (F.col(prefix + suffix)
                                 for suffix in (outcome, 'Death', 'Discharge'))
        return (F.when(own.isNotNull(), own)
                .when(death.isNotNull(), death)
                .otherwise(discharge)
                .alias(alias))

    def not_alive(name, alias):
        # 0 when the status column equals 'ALIVE', otherwise 1.
        return F.when(F.col(name) == 'ALIVE', 0).otherwise(1).alias(alias)

    admit_source = F.col('unitAdmitSource')
    admit_year = F.col('hospitalAdmitYear')
    bun = F.col('Initial_BUN')
    event_offset = F.col(outcome + '_offset')
    death_offset = F.col('Death_offset')

    demographics = [
        'patientUnitStayID',
        F.col('age').cast('int').alias('age'),
        F.when(F.col('gender') == 'Female', 0).otherwise(1).alias('male'),
        null_as('ethnicity', 'Other/Unknown', 'ethnicity'),
        F.round(F.col('admission_BMI'), 1).alias('BMI'),
        'apachescore',
    ]

    admission = [
        (F.when(admit_source == 'Emergency Department', 'Emergency_Department')
         .when(admit_source == 'Other Hospital', 'Other_Hospital')
         .when(admit_source.isNull() | (admit_source == 'Other'), 'Other/Unknown')
         .when(admit_source == 'Direct Admit', 'Elective')
         .otherwise('Other_Ward')
         .alias('Admission_Type')),
        (F.when(admit_year <= 2008, '2004-2008')
         .when(admit_year >= 2013, '2013-2016')
         .otherwise('2009-2012')
         .alias('Admit_Year')),
        null_as('numbedscategory', 'Unknown', 'Hospital_Bed_Size'),
    ]

    medications = ['aspirin', 'diuretics', 'ace_inhibitors', 'ARBs',
                   'beta_blockers', 'ca_channel_blockers']

    labs = [
        lab_bands('Initial_Hgb', 8, '<8', 11, '>=11', '8<=Hb<11', 'Hb'),
        lab_bands('Initial_albumin', 2, '<2', 3, '>=3', '2<=Alb<3', 'Alb'),
        lab_bands('Initial_WBCx1000', 4, '<4', 12, '>=12', '4<=WBCx1000<12', 'WBCx1000'),
        # BUN has a single cut point, so it does not fit the three-band helper.
        (F.when(bun.isNull(), 'No_reading_available')
         .when(bun <= 30, '<=30')
         .otherwise('>30')
         .alias('BUN')),
        lab_bands('Initial_lactate', 2, '<2', 5, '>=5', '2<=Lac<5', 'Lac'),
    ]

    history_and_support = [
        'PastHistory_Hypertension', 'PastHistory_Diabetes', 'PastHistory_COPD',
        'PastHistory_Congestive_Heart_Failure', 'PastHistory_Peripheral_Vascular_Disease',
        'PastHistory_Valve_disease', 'PastHistory_Pulmonary_Embolism',
        'PastHistory_Neuromuscular_Disease', 'PastHistory_Hypothyroidism',
        'PastHistory_Liver_Disease', 'PastHistory_AIDS', 'PastHistory_Cancer_Tumor',
        'PastHistory_Arthritis_Vasculitis', 'PastHistory_Coagulopathy',
        'PastHistory_Anemia', 'PastHistory_Home_Oxygen', 'PastHistory_Organ_Transplant',
        'On_vent',
    ]

    exposures = [
        exposure('below_120_mergedSystolic_', 'Systolic_below_120'),
        exposure('below_120_mergedDiastolic_', 'Diastolic_below_120'),
        exposure('below_120_mergedMean_', 'Mean_below_120'),
        exposure('below_120_pulse_pressure_', 'Pulsepressure_below_120'),
    ]

    end_points = [
        not_alive('actualicumortality', 'ICU_mortality'),
        not_alive('actualhospitalmortality', 'Hospital_mortality'),
        # Indicator for the outcome on its own.
        F.when(event_offset.isNotNull(), 1).otherwise(0).alias(outcome),
        # Composite indicator: the outcome or death.
        (F.when(event_offset.isNotNull(), 1)
         .when(death_offset.isNotNull(), 1)
         .otherwise(0)
         .alias(outcome + '_composite')),
        # Death offset first, then the outcome's own offset, then discharge.
        (F.when(death_offset.isNotNull(), death_offset)
         .when(event_offset.isNotNull(), event_offset)
         .otherwise(F.col('Discharge_offset'))
         .alias('Outcome_offset')),
    ]

    selected = (demographics + admission + medications + labs
                + history_and_support + exposures + end_points)
    df_outcome = df.select(*selected)

    return df_outcome

In [17]:
# Composite-outcome tables (outcome or death) for AKI and MI, derived from df_sepsis.
df_AKI_composite = get_df_outcome_composite(df_sepsis, 'AKI')
df_MI_composite = get_df_outcome_composite(df_sepsis, 'MI')
# Add vasopressor exposure and collect each table to pandas on the driver.
df_AKI_vsif_composite = get_df_outcome_vsif(df_AKI_composite, df_vsif).toPandas()
df_MI_vsif_composite = get_df_outcome_vsif(df_MI_composite, df_vsif).toPandas()
# Write both tables as CSV, without the pandas index column.
df_AKI_vsif_composite.to_csv(os.path.join(project_path_2, 'Sepsis_AKI_composite.csv'), index=False)
df_MI_vsif_composite.to_csv(os.path.join(project_path_2, 'Sepsis_MI_composite.csv'), index=False)

                                                                                