In [17]:
import os

import numpy as np
import pandas as pd
import dask.dataframe as ddf

In [18]:
def read_data_tables(data_dir):
    '''
    read_data_tables: returns relevant data tables from downloaded MIMIC-IV core and hosp modules.

    Parameters
    ----------
    data_dir : str or os.PathLike
        Root of the MIMIC-IV download containing the ``core`` and ``hosp`` module directories.
        A trailing path separator is optional.

    Returns
    -------
    tuple
        (patients, admissions, transfers, icd, diagnoses_icd, labitems, labs). All are pandas
        DataFrames except ``labs``, which is a lazy Dask DataFrame (call ``.compute()`` to materialize).
    '''
    # Define module paths (os.path.join works with or without a trailing separator on data_dir)
    core = os.path.join(data_dir, 'core')
    hosp = os.path.join(data_dir, 'hosp')

    # Read core data tables
    patients = pd.read_csv(os.path.join(core, 'patients.csv'), parse_dates=['dod'])
    admissions = pd.read_csv(os.path.join(core, 'admissions.csv'), parse_dates=['admittime','dischtime','deathtime','edregtime','edouttime'])
    transfers = pd.read_csv(os.path.join(core, 'transfers.csv'), parse_dates=['intime','outtime'])

    # Read hosp data tables (note that labevents.csv is read in parallel using Dask due to its sheer size).
    # ICD codes are alphanumeric identifiers (e.g. 'P0721', '76529'); force them to strings so pandas'
    # chunked type inference cannot turn all-numeric chunks into integers, which would drop leading
    # zeros and break the string-based merges and prefix matching done downstream.
    icd = pd.read_csv(os.path.join(hosp, 'd_icd_diagnoses.csv'), dtype={'icd_code': str})
    diagnoses_icd = pd.read_csv(os.path.join(hosp, 'diagnoses_icd.csv'), dtype={'icd_code': str})
    labitems = pd.read_csv(os.path.join(hosp, 'd_labitems.csv'))
    labs = ddf.read_csv(os.path.join(hosp, 'labevents.csv'), dtype={
        "labevent_id": "Int64",
        "subject_id": "int64",
        "hadm_id": "Int64",
        "specimen_id": "Int64",
        "itemid": "int64",
        "order_provider_id": "string",
        "value": "string",
        "valuenum": "float64",
        "valueuom": "string",
        "ref_range_lower": "float64",
        "ref_range_upper": "float64",
        "flag": "string",
        "priority": "string",
        "comments": "string"
    }, parse_dates=["charttime", "storetime"])
    return patients, admissions, transfers, icd, diagnoses_icd, labitems, labs

# Root of the local MIMIC-IV v1.0 download; the core and hosp modules sit directly beneath it
data_dir = 'data/mimic-iv-1.0/physionet.org/files/mimiciv/1.0/'
(
    patients, admissions, transfers,
    icd, diagnoses_icd, labitems, labs,
) = read_data_tables(data_dir)

In [19]:
# Define ICD-9 and ICD-10 roll-up mappings for birthweight and gestational age.
# Structure: {icd_version: {icd_code (string, no decimal point): category label}}.
# Codes that are not listed contribute no value for that feature.
bw_map = {
    # ICD-10 birthweight codes (bins presumably in grams - matches the ICD code titles, confirm if reused)
    10: {
        'P0700': 'UNK',
        'P0701': '<500',
        'P0702': '500-749',
        'P0703': '750-999',
        'P0710': 'UNK',
        'P0714': '1000-1249',
        'P0715': '1250-1499',
        'P0716': '1500-1749',
        'P0717': '1750-1999',
        'P0718': '2000-2499',
    },
    # ICD-9 birthweight codes: the 7650x and 7651x series roll up to the same bins.
    # (A duplicate '76500' entry was removed; it had the same value, so the mapping is unchanged.)
    9: {
        '76500': 'UNK',
        '76501': '<500',
        '76502': '500-749',
        '76503': '750-999',
        '76504': '1000-1249',
        '76505': '1250-1499',
        '76506': '1500-1749',
        '76507': '1750-1999',
        '76508': '2000-2499',
        '76509': '>2500',
        '76510': 'UNK',
        '76511': '<500',
        '76512': '500-749',
        '76513': '750-999',
        '76514': '1000-1249',
        '76515': '1250-1499',
        '76516': '1500-1749',
        '76517': '1750-1999',
        '76518': '2000-2499',
        '76519': '>2500',
    }
}
ga_map = {
    # ICD-10 gestational age codes (bins presumably in completed weeks)
    10: {
        'P0720': 'UNK',
        'P0721': '<=24',
        'P0722': '<=24',
        'P0723': '<=24',
        'P0724': '25-26',
        'P0725': '25-26',
        'P0726': '27-28',
        'P0730': 'UNK',
        'P0731': '27-28',
        'P0732': '29-30',
        'P0733': '29-30',
        'P0734': '31-32',
        'P0735': '31-32',
        'P0736': '33-34',
        'P0737': '33-34',
        'P0738': '35-36',
        'P0739': '35-36',
    },
    # ICD-9 gestational age codes; '76529' is the only non-premature ('>36') bin
    9: {
        '76520': 'UNK',
        '76521': '<=24',
        '76522': '<=24',
        '76523': '25-26',
        '76524': '27-28',
        '76525': '29-30',
        '76526': '31-32',
        '76527': '33-34',
        '76528': '35-36',
        '76529': '>36',
    }
}

In [20]:
# Time-0 offsets, both in hours after NICU admission:
#   t0_out    - patients whose discharge/death happens earlier than this are excluded
#   t0_covars - only labs charted before this point are used as covariates
t0_out, t0_covars = 6, 1

In [21]:
# Lab item labels (matched against the 'label' column of d_labitems) to keep as covariates
select_labs = ['pO2', 'pCO2', 'pH', 'Base Excess',
               'Hematocrit', 'White Blood Cells', 'Platelet Count',
               'Intubated']
# Labs every cohort patient must have at least one measurement of (a subset of select_labs)
select_required_labs = ['Base Excess']

In [22]:
def build_cohort(
    patients, # Raw data frame
    admissions, # Raw data frame
    transfers, # Raw data frame
    icd, # Raw data frame
    diagnoses_icd, # Raw data frame
    labitems, # Raw data frame
    labs, # Raw data frame
    t0_out, # Number of hours after NICU admission for time 0 for outcome (discharge/death) exclusion criteria
    t0_covars, # Number of hours after NICU admission for time 0 for covariate (lab) inclusion criteria
    ga_map, # Mapping of ICD codes to gestational age bins
    bw_map, # Mapping of ICD codes to birthweight bins
    select_labs, # Set of lab item labels to include in feature space
    select_required_labs=(), # Subset of lab item labels that must not be missing
    ret=False # Whether or not to return the data object (automatically saved in CSV file)
):
    '''
    build_cohort: constructs cohort of premature neonatal patients.

    Selection:
      1. anchor_age == 0 patients with a prematurity diagnosis
         (ICD-10 P07*, or ICD-9 765* except 76529, which ga_map bins as '>36').
      2. Their first hospital admission, kept only if the outcome is unambiguous
         (in-hospital death recorded as DIED, or alive discharge to HOME / HOME HEALTH CARE).
      3. The first NICU transfer of that admission, starting 0-24 hours after hospital admission
         and before discharge/death.
      4. Discharge/death at least t0_out hours after NICU admission.
      5. At least one of select_labs charted between hospital admission and NICU admission + t0_covars
         hours, plus at least one measurement of every lab in select_required_labs.

    Features: gestational age ('ga') and birthweight ('bw') bins from ga_map / bw_map, and per selected lab
    the latest '<label>_charttime', '<label>_valuenum', '<label>_flag' in the window and '<label>_count'.

    The cohort is written to cohorts/cohort_t{t0_out}_0.csv (the directory is created if needed).
    Returns the cohort DataFrame when ret is True, otherwise None.
    Raises ValueError if select_required_labs contains a label that is not in select_labs.
    '''
    # Fail fast, before the expensive lab extraction: a required lab that is never selected can never be present
    unselected_required = [lab for lab in select_required_labs if lab not in select_labs]
    if unselected_required:
        raise ValueError(f'select_required_labs not found in select_labs: {unselected_required}')

    one_hour = pd.Timedelta(hours=1)

    # Filter for neonatal patients (age = 0)
    neonatal_patients = patients[patients['anchor_age'] == 0]

    # Collect ICD-10 P07* / ICD-9 765* diagnoses (the gestational age and birthweight codes).
    # ICD codes are alphanumeric, so handle them as text throughout.
    diagnoses = diagnoses_icd.merge(icd, on=['icd_code', 'icd_version'], how='inner')
    diagnoses = diagnoses.assign(icd_code=diagnoses['icd_code'].astype(str))
    is_icd10_p07 = diagnoses['icd_code'].str.startswith('P07') & (diagnoses['icd_version'] == 10)
    is_icd9_765 = diagnoses['icd_code'].str.startswith('765') & (diagnoses['icd_version'] == 9)
    ga_bw_coded_diagnoses = diagnoses[is_icd10_p07 | is_icd9_765]

    # Filter for premature neonatal patients. ICD-9 76529 is the '>36' weeks bin, so it is not evidence of
    # prematurity. It must be compared as the string '76529': comparing the string column with the integer
    # 76529 never matches, which silently kept those patients.
    premature_diagnoses = diagnoses[is_icd10_p07 | (is_icd9_765 & (diagnoses['icd_code'] != '76529'))]
    premature_patients = neonatal_patients[neonatal_patients['subject_id'].isin(premature_diagnoses['subject_id'])]

    # Filter for first hospital admission per patient (earliest admittime)
    premature_patients_admissions = (
        premature_patients.merge(admissions, on=['subject_id'], how='inner')
        .sort_values(['subject_id', 'admittime'])
        .drop_duplicates(subset=['subject_id'])
    )

    # Filter for non-ambiguous discharge: died in hospital (DIED), or discharged alive to home / home health care.
    # Everything else (against medical advice, acute hospital, other facilities, missing) is excluded.
    expired = premature_patients_admissions['hospital_expire_flag']
    location = premature_patients_admissions['discharge_location']
    premature_patients_admissions = premature_patients_admissions[
        ((expired == 1) & (location == 'DIED')) |
        ((expired == 0) & location.isin(['HOME', 'HOME HEALTH CARE']))
    ]

    # Filter for NICU transfers (transfers with careunit as Neonatal Intensive Care Unit); keep the first per patient
    nicu_transfers = transfers[transfers['careunit'] == 'Neonatal Intensive Care Unit (NICU)']
    premature_nicu = (
        premature_patients_admissions.merge(nicu_transfers, on=['subject_id', 'hadm_id'], how='inner')
        .sort_values(['subject_id', 'intime'])
        .drop_duplicates(subset=['subject_id'])
    )

    # Calculate temporal anchor differences in hours (hospital admission time, NICU transfer time, discharge/death time).
    # Dividing Timedeltas gives float hours without platform-dependent int casts; a missing time (NaT) becomes NaN,
    # which fails every filter below, so such patients are dropped.
    admittime = pd.to_datetime(premature_nicu['admittime'])
    intime = pd.to_datetime(premature_nicu['intime'])
    # End of stay: discharge time for survivors, death time for in-hospital deaths
    endtime = pd.to_datetime(premature_nicu['dischtime']).where(
        premature_nicu['hospital_expire_flag'] == 0,
        pd.to_datetime(premature_nicu['deathtime']),
    )
    premature_nicu['nicu_admit_diff'] = (intime - admittime) / one_hour
    premature_nicu['disch_nicu_diff'] = (endtime - intime) / one_hour
    premature_nicu['disch_admit_diff'] = (endtime - admittime) / one_hour

    # Filter for NICU transfer within 24 hours of birth (admission is proxy for birth)
    premature_nicu_filt = premature_nicu[premature_nicu['nicu_admit_diff'] <= 24]

    # Filter for logical time sequence (admit <= nicu < disch/death)
    premature_nicu_filt = premature_nicu_filt[
        (premature_nicu_filt['nicu_admit_diff'] >= 0) &
        (premature_nicu_filt['disch_admit_diff'] > 0) &
        (premature_nicu_filt['disch_nicu_diff'] > 0)
    ]

    # Filter for death/discharge after given time 0
    premature_nicu_filt = premature_nicu_filt[premature_nicu_filt['disch_nicu_diff'] >= t0_out]

    # Roll up gestational age and birthweight to mapped categories (unified across ICD-9 and ICD-10)
    def map_to_df(mapping, feature):
        # Flatten {icd_version: {icd_code: category}} into one row per (version, code)
        rows = [(version, code, category) for version, code_map in mapping.items() for code, category in code_map.items()]
        return pd.DataFrame(rows, columns=['icd_version', 'icd_code', feature])
    ga_df = map_to_df(ga_map, 'ga')
    bw_df = map_to_df(bw_map, 'bw')

    # Add gestational age and birthweight features to each patient in cohort. All of a patient's P07*/765* codes
    # are used (including 76529, so '>36' stays available); 'first' takes the first non-null bin per patient.
    ga_diagnoses = ga_bw_coded_diagnoses.merge(ga_df, on=['icd_version', 'icd_code'], how='left')
    bw_diagnoses = ga_bw_coded_diagnoses.merge(bw_df, on=['icd_version', 'icd_code'], how='left')
    ga_bw_diagnoses = pd.concat([
        ga_diagnoses[['subject_id', 'ga']],
        bw_diagnoses[['subject_id', 'bw']]
    ])
    ga_bw_patients = ga_bw_diagnoses.groupby('subject_id', as_index=False).agg({'ga': 'first', 'bw': 'first'})
    final_cohort = premature_nicu_filt.merge(ga_bw_patients, on='subject_id', how='inner')

    # Retrieve all lab events charted at/after hospital admission and before covariate time 0
    # (NICU admission + t0_covars hours); timestamps are compared directly.
    final_cohort_ddf = ddf.from_pandas(final_cohort, npartitions=1)
    final_cohort_labs_ddf = labs.merge(final_cohort_ddf[['subject_id','hospital_expire_flag','admittime','intime']], on='subject_id', how='inner')
    final_cohort_labs_ddf = final_cohort_labs_ddf.merge(labitems, on='itemid')
    charttime = final_cohort_labs_ddf['charttime']
    in_window = (
        (charttime >= final_cohort_labs_ddf['admittime']) &
        (charttime < final_cohort_labs_ddf['intime'] + pd.Timedelta(hours=t0_covars))
    )
    final_cohort_labs_ddf = final_cohort_labs_ddf[in_window]
    final_cohort_labs_ddf = final_cohort_labs_ddf[['subject_id', 'charttime', 'valuenum', 'label', 'flag']]

    # Filter for select lab events
    final_cohort_labs_ddf = final_cohort_labs_ddf[final_cohort_labs_ddf['label'].isin(select_labs)]
    final_cohort_labs = final_cohort_labs_ddf.compute()

    # Calculate the number of recorded measurements per lab per subject (as a collapsed proxy for temporality)
    final_cohort_labs_counts = final_cohort_labs.groupby(['subject_id', 'label']).size().unstack(fill_value=0).reset_index()
    final_cohort_labs_counts = final_cohort_labs_counts.rename(columns={col: f"{col}_count" for col in final_cohort_labs_counts.columns if col != 'subject_id'})

    # Aggregate labs based on latest measurement
    final_cohort_labs_latest = final_cohort_labs.sort_values(['subject_id', 'label', 'charttime']).groupby(['subject_id', 'label'], as_index=False).last()

    # Reshape labs to wide data table with '<label>_<field>' columns
    # ('label' is the pivot column key, so it is not also listed as a value field)
    final_cohort_labs_wide = final_cohort_labs_latest.pivot_table(
        index='subject_id',
        columns='label',
        values=['charttime', 'valuenum', 'flag'],
        aggfunc='first'
    )
    final_cohort_labs_wide.columns = [f"{lbl}_{col}" for col, lbl in final_cohort_labs_wide.columns]
    final_cohort_labs_wide = final_cohort_labs_wide.reset_index()

    # Combine latest record with temporal proxy
    final_cohort_labs_wide = final_cohort_labs_wide.merge(final_cohort_labs_counts, on='subject_id')

    # Merge labs with cohort, and remove patients with none of the select labs recorded
    final_cohort = final_cohort[['subject_id', 'hospital_expire_flag', 'gender', 'ga', 'bw', 'anchor_year_group']].merge(final_cohort_labs_wide, on='subject_id', how='inner')

    # Optionally remove patients without any of the required labs recorded.
    # A missing count column means no patient had that lab in the window, so nobody qualifies.
    for required_lab in select_required_labs:
        count_col = f'{required_lab}_count'
        if count_col in final_cohort.columns:
            final_cohort = final_cohort[final_cohort[count_col] > 0]
        else:
            final_cohort = final_cohort.iloc[0:0]

    # Save final cohort as CSV file (create the output directory on a fresh checkout)
    os.makedirs('cohorts', exist_ok=True)
    final_cohort.to_csv(f'cohorts/cohort_t{t0_out}_0.csv', index=False)
    return final_cohort if ret else None

In [None]:
# Build the premature-neonate cohort; build_cohort writes it as a CSV file under cohorts/
build_cohort(
    patients, admissions, transfers,
    icd, diagnoses_icd, labitems, labs,
    t0_out=t0_out,
    t0_covars=t0_covars,
    ga_map=ga_map,
    bw_map=bw_map,
    select_labs=select_labs,
    select_required_labs=select_required_labs,
)