# extract_data_eicu


## Prepare


In [1]:
from functools import reduce
import json
import pandas as pd
import numpy as np
from common_eicu import *


In [2]:
TEST_ROWS = 50_000

COMPACT_MODE = False
# COMPACT_MODE = True

TEST_MODE = False
# TEST_MODE = True

# COMPRESS_OUTPUT = False
COMPRESS_OUTPUT = True


In [3]:
if COMPACT_MODE:
    NON_TEMPORAL_COLUMNS = NON_TEMPORAL_COLUMNS_COMPACT
    LAB_VARIABLES = LAB_VARIABLES_COMPACT
    EXAM_ITEMS = EXAM_ITEMS_COMPACT
    TREATMENT_KEYWORDS = TREATMENT_KEYWORDS_COMPACT
    CATEGORICAL_COLUMNS = CATEGORICAL_COLUMNS_COMPACT
    CONDITION_ONLY_COLUMNS = CONDITION_ONLY_COLUMNS_COMPACT
    APERIODIC_COLUMNS = APERIODIC_COLUMNS_COMPACT
    PERIODIC_COLUMNS = PERIODIC_COLUMNS_COMPACT
    CUMULATIVE_COLUMNS = CUMULATIVE_COLUMNS_COMPACT
    INFUSION_KEYWORDS = INFUSION_KEYWORDS_COMPACT
else:
    NON_TEMPORAL_COLUMNS = NON_TEMPORAL_COLUMNS_FULL
    LAB_VARIABLES = LAB_VARIABLES_FULL
    EXAM_ITEMS = EXAM_ITEMS_FULL
    TREATMENT_KEYWORDS = TREATMENT_KEYWORDS_FULL
    CATEGORICAL_COLUMNS = CATEGORICAL_COLUMNS_FULL
    CONDITION_ONLY_COLUMNS = CONDITION_ONLY_COLUMNS_FULL
    APERIODIC_COLUMNS = APERIODIC_COLUMNS_FULL
    PERIODIC_COLUMNS = PERIODIC_COLUMNS_FULL
    CUMULATIVE_COLUMNS = CUMULATIVE_COLUMNS_FULL
    INFUSION_KEYWORDS = INFUSION_KEYWORDS_FULL


## Non-Temporal Data


In [4]:
# generate the map of non-temporal data sources
data_sources = {}
with open(CATALOGUE_PATH, 'r') as catalogue_file:
    catalogue = json.load(catalogue_file)
    for column_name in NON_TEMPORAL_COLUMNS:
        if not column_name in catalogue:
            raise Exception(
                f'Cannot find column "{column_name}" in catalogue!'
            )
        file_path = catalogue[column_name]
        if file_path in data_sources:
            data_sources[file_path].append(column_name)
        else:
            data_sources[file_path] = [column_name]


In [5]:
# collect non-temporal data frames
data_frames = []
for input_path, column_names in data_sources.items():
    usecols = [KEY_IDENTITY, *column_names]
    data_frame = pd.read_csv(
        input_path,
        usecols=usecols,
        index_col=KEY_IDENTITY,
    )
    data_frame.columns = map(
        map_column_name,
        data_frame.columns,
    )
    data_frames.append(data_frame)


In [6]:
def map_age(age):
    if age != age:
        return age
    elif age == '> 89':
        return 90
    else:
        return int(age)


In [7]:
def map_gender(gender):
    if gender == 'Female':
        return 'Female'
    elif gender == 'Male':
        return 'Male'
    else:
        return 'Other'


In [8]:
# join non-temporal data frames
df_non_temporal = pd.concat(
    data_frames,
    axis='columns',
    join='outer',
)

# fix some columns
df_non_temporal['age'] = df_non_temporal['age'].map(map_age)
df_non_temporal['gender'] = df_non_temporal['gender'].map(map_gender)

df_non_temporal.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 200859 entries, 141168 to 3353263
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   gender     200859 non-null  object 
 1   age        200764 non-null  float64
 2   ethnicity  198569 non-null  object 
 3   height     196644 non-null  float64
dtypes: float64(2), object(2)
memory usage: 7.7+ MB


## Init Temporal Data


In [9]:
# identity -> { raw_col -> { 'offsets': [], 'values': []  } }
temporal_data = {}

# init indices
for index in df_non_temporal.index:
    temporal_data[index] = {}

len(temporal_data)


200859

## Filter Diagnosis


In [10]:
df_sepsis = pd.read_csv(
    relative_path('./data/sepsis_eicu.csv.gz'),
    usecols=[KEY_IDENTITY, KEY_DIAGNOSIS_STRING],
    nrows=(TEST_ROWS if TEST_MODE else None),
)

non_sepsis = set(temporal_data.keys())
diagnosis_iterator = SimpleProgress(df_sepsis.index)
for index in diagnosis_iterator:
    identity = df_sepsis.at[index, KEY_IDENTITY]
    non_sepsis.discard(identity)

for identity in non_sepsis:
    del temporal_data[identity]


100%


In [11]:
len(temporal_data)


23479

## Treatment Info


In [12]:
df_treatment = pd.read_csv(
    relative_path('./data/treatment_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init treatment columns in temporal_data
for record in temporal_data.values():
    for keyword in TREATMENT_KEYWORDS:
        record[keyword] = {
            'offsets': [],
            'values': [],
        }

# collect treatment info
treatment_iterator = SimpleProgress(df_treatment.index)
for index in treatment_iterator:
    identity = df_treatment.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    record = temporal_data[identity]
    raw_value = df_treatment.at[index, KEY_TREATMENT_STRING]
    treatment_string = str(raw_value).lower()
    for keyword in TREATMENT_KEYWORDS:
        if not keyword in treatment_string:
            continue
        offset = df_treatment.at[index, KEY_TREATMENT_OFFSET]
        store = record[keyword]
        store['offsets'].append(offset)
        store['values'].append(1)


100%


## Exam Items


In [13]:
df_exam = pd.read_csv(
    relative_path('./data/exam_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init exam columns in temporal_data
for record in temporal_data.values():
    for item_name in EXAM_ITEMS:
        record[item_name] = {
            'offsets': [],
            'values': [],
        }

# collect exam items
exam_iterator = SimpleProgress(df_exam.index)
for index in exam_iterator:
    identity = df_exam.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    item_name = df_exam.at[index, KEY_EXAM_NAME]
    if not item_name in EXAM_ITEMS:
        continue
    offset = df_exam.at[index, KEY_EXAM_OFFSET]
    value = df_exam.at[index, KEY_EXAM_RESULT]
    record = temporal_data[identity]
    store = record[item_name]
    store['offsets'].append(offset)
    store['values'].append(value)


100%


## Lab Variables


In [14]:
df_lab = pd.read_csv(
    relative_path('./data/lab_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init lab columns in temporal_data
for record in temporal_data.values():
    for var_name in LAB_VARIABLES:
        record[var_name] = {
            'offsets': [],
            'values': [],
        }

# collect lab variables
lab_iterator = SimpleProgress(df_lab.index)
for index in lab_iterator:
    identity = df_lab.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    var_name = df_lab.at[index, KEY_LAB_NAME]
    if not var_name in LAB_VARIABLES:
        continue
    offset = df_lab.at[index, KEY_LAB_OFFSET]
    value = df_lab.at[index, KEY_LAB_RESULT]
    record = temporal_data[identity]
    store = record[var_name]
    store['offsets'].append(offset)
    store['values'].append(value)


100%


## Aperiodic Data


In [15]:
df_aperiodic = pd.read_csv(
    relative_path('./data/aperiodic_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init aperiodic columns in temporal_data
for record in temporal_data.values():
    for name in APERIODIC_COLUMNS:
        record[name] = {
            'offsets': [],
            'values': [],
        }

# collect aperiodic columns
aperiodic_iterator = SimpleProgress(df_aperiodic.index)
for index in aperiodic_iterator:
    identity = df_aperiodic.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    offset = df_aperiodic.at[index, KEY_APERIODIC_OFFSET]
    record = temporal_data[identity]
    for name in APERIODIC_COLUMNS:
        value = df_aperiodic.at[index, name]
        store = record[name]
        store['offsets'].append(offset)
        store['values'].append(value)


100%


## Periodic Data


In [16]:
df_periodic = pd.read_csv(
    relative_path('./data/periodic_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init periodic columns in temporal_data
for record in temporal_data.values():
    for name in PERIODIC_COLUMNS:
        record[name] = {
            'offsets': [],
            'values': [],
        }

# collect periodic columns
periodic_iterator = SimpleProgress(df_periodic.index)
for index in periodic_iterator:
    identity = df_periodic.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    offset = df_periodic.at[index, KEY_PERIODIC_OFFSET]
    record = temporal_data[identity]
    for name in PERIODIC_COLUMNS:
        value = df_periodic.at[index, name]
        store = record[name]
        store['offsets'].append(offset)
        store['values'].append(value)


100%


## Infusion Info


In [17]:
df_infusion = pd.read_csv(
    relative_path('./data/infusion_eicu_processed.csv.gz'),
    nrows=(TEST_ROWS if TEST_MODE else None),
)

# init infusion columns in temporal_data
for record in temporal_data.values():
    for keyword in INFUSION_KEYWORDS:
        record[keyword] = {
            'offsets': [],
            'values': [],
        }

# collect infusion info
infusion_iterator = SimpleProgress(df_infusion.index)
for index in infusion_iterator:
    identity = df_infusion.at[index, KEY_IDENTITY]
    if not identity in temporal_data:
        continue
    keyword = df_infusion.at[index, KEY_INFUSION_NAME]
    if not keyword in INFUSION_KEYWORDS:
        continue
    offset = df_infusion.at[index, KEY_INFUSION_OFFSET]
    value = df_infusion.at[index, KEY_INFUSION_AMOUNT]
    record = temporal_data[identity]
    store = record[keyword]
    store['offsets'].append(offset)
    store['values'].append(value)


100%


## Join Temporal Data


In [31]:
# construct temporal data rows

raw_temporal_columns = [
    KEY_IDENTITY,
    KEY_OFFSET,
    *TREATMENT_KEYWORDS,
    *EXAM_ITEMS,
    *LAB_VARIABLES,
    *APERIODIC_COLUMNS,
    *PERIODIC_COLUMNS,
    *INFUSION_KEYWORDS,
]

temporal_data_rows = []
temporal_data_iterator = SimpleProgress(temporal_data.items())
for identity, record in temporal_data_iterator:

    stores = record.values()
    all_offsets = [store['offsets'] for store in stores]

    offset_begin = max(
        min(offsets) if len(offsets) > 0 else MIN_OFFSET
        for offsets in all_offsets
    )
    offset_end = max(
        max(offsets) if len(offsets) > 0 else MIN_OFFSET
        for offsets in all_offsets
    )
    if offset_begin < MIN_OFFSET:
        offset_begin = MIN_OFFSET

    for offset in range(offset_begin, offset_end + 1):
        row = []

        for column_name in raw_temporal_columns:

            if column_name == KEY_IDENTITY:
                row.append(identity)
                continue
            elif column_name == KEY_OFFSET:
                row.append(offset)
                continue

            store = record[column_name]
            offsets = store['offsets']
            values = store['values']
            count = len(offsets)

            indices = list(
                filter(
                    lambda i: offsets[i] == offset,
                    range(count)
                )
            )
            index_count = len(indices)
            if index_count == 0:
                if len(offsets) > 0 and offsets[-1] < offset:
                    row.append(values[-1])
                else:
                    row.append(pd.NA)
            elif index_count == 1:
                row.append(values[indices[0]])
            else:
                selected_values = list(values[i] for i in indices)
                if column_name in CUMULATIVE_COLUMNS:
                    value = np.sum(selected_values)
                else:
                    value = np.mean(selected_values)
                row.append(value)

        temporal_data_rows.append(row)


100%


In [32]:
df_temporal = pd.DataFrame(
    temporal_data_rows,
    columns=map(
        map_column_name,
        raw_temporal_columns,
    ),
)

# fill NAs with zeros in treatment and infusion columns
ZERO_KEYWORDS = TREATMENT_KEYWORDS + INFUSION_KEYWORDS
for keyword in ZERO_KEYWORDS:
    column_name = map_column_name(keyword)
    df_temporal[column_name].fillna(0, inplace=True)

# fill other NAs
df_temporal.groupby(KEY_IDENTITY, sort=False).ffill()
df_temporal.groupby(KEY_IDENTITY, sort=False).bfill()

df_temporal.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 123203 entries, 0 to 123202
Data columns (total 53 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   patientunitstayid  123203 non-null  int64  
 1   offset             123203 non-null  int64  
 2   vasopressor        123203 non-null  float64
 3   heparin            123203 non-null  float64
 4   weight             50232 non-null   object 
 5   urine              41321 non-null   object 
 6   PEEP               32884 non-null   object 
 7   creatinine         119520 non-null  object 
 8   platelet           118135 non-null  object 
 9   INR                93259 non-null   object 
 10  PT                 91051 non-null   object 
 11  PTT                74401 non-null   object 
 12  lactate            95410 non-null   object 
 13  RDW                111342 non-null  object 
 14  total bilirubin    104287 non-null  object 
 15  direct bilirubin   41974 non-null   object 
 16  bi

## Construct Output


In [33]:
df_output = pd.merge(
    df_non_temporal,
    df_temporal,
    on=KEY_IDENTITY,
)
df_output.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 123203 entries, 0 to 123202
Data columns (total 57 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   patientunitstayid  123203 non-null  int64  
 1   gender             123203 non-null  object 
 2   age                123203 non-null  float64
 3   ethnicity          122556 non-null  object 
 4   height             122510 non-null  float64
 5   offset             123203 non-null  int64  
 6   vasopressor        123203 non-null  float64
 7   heparin            123203 non-null  float64
 8   weight             50232 non-null   object 
 9   urine              41321 non-null   object 
 10  PEEP               32884 non-null   object 
 11  creatinine         119520 non-null  object 
 12  platelet           118135 non-null  object 
 13  INR                93259 non-null   object 
 14  PT                 91051 non-null   object 
 15  PTT                74401 non-null   object 
 16  la

In [34]:
# fix types
df_output['vasopressor'] = df_output['vasopressor'].astype('int')
df_output['heparin'] = df_output['heparin'].astype('int')

# fill NAs in non-categorical columns with means
df_output.fillna(
    df_output.drop(columns=CATEGORICAL_COLUMNS).mean(),
    inplace=True,
)

# and those in categorical columns with mode values
df_output.fillna(
    df_output[CATEGORICAL_COLUMNS].mode().iloc[0, :],
    inplace=True,
)

df_output.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 123203 entries, 0 to 123202
Data columns (total 57 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   patientunitstayid  123203 non-null  int64  
 1   gender             123203 non-null  object 
 2   age                123203 non-null  float64
 3   ethnicity          123203 non-null  object 
 4   height             123203 non-null  float64
 5   offset             123203 non-null  int64  
 6   vasopressor        123203 non-null  int32  
 7   heparin            123203 non-null  int32  
 8   weight             123203 non-null  float64
 9   urine              123203 non-null  float64
 10  PEEP               123203 non-null  float64
 11  creatinine         123203 non-null  float64
 12  platelet           123203 non-null  float64
 13  INR                123203 non-null  float64
 14  PT                 123203 non-null  float64
 15  PTT                123203 non-null  float64
 16  la

## Post Processing


In [35]:
def post_process_compact(df):
    pass


In [36]:
def post_process_full(df):

    post_process_compact(df)

    df['BMI'] = df['weight'] / (df['height'] / 100) ** 2
    df.drop(columns=['weight', 'height'], inplace=True)

    # TODO: is this the right formula?
    df['indirect bilirubin'] = df['total bilirubin'] - df['direct bilirubin']

    df['PaO2/FiO2'] = df['paO2'] / df['FiO2']

    df['SpO2/FiO2'] = df['SpO2'] / df['FiO2']

    df['ROX index'] = df['SpO2/FiO2'] / df['respiration rate']


In [37]:
if COMPACT_MODE:
    post_process_compact(df_output)
else:
    post_process_full(df_output)


In [38]:
df_output.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 123203 entries, 0 to 123202
Data columns (total 60 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   patientunitstayid   123203 non-null  int64  
 1   gender              123203 non-null  object 
 2   age                 123203 non-null  float64
 3   ethnicity           123203 non-null  object 
 4   offset              123203 non-null  int64  
 5   vasopressor         123203 non-null  int32  
 6   heparin             123203 non-null  int32  
 7   urine               123203 non-null  float64
 8   PEEP                123203 non-null  float64
 9   creatinine          123203 non-null  float64
 10  platelet            123203 non-null  float64
 11  INR                 123203 non-null  float64
 12  PT                  123203 non-null  float64
 13  PTT                 123203 non-null  float64
 14  lactate             123203 non-null  float64
 15  RDW                 123203 non-nul

## Check PICS


In [39]:
def check_pics(index):
    return all(
        indicator(df_output.at[index, col])
        for col, indicator in PICS_CONDITIONS.items()
    )


In [40]:
# Compute PICS flags:
# `FLAG_POSITIVE` if all PICS conditions
# are fulfilled today or yesterday;
# `FLAG_NEGATIVE` otherwise.

df_output[KEY_FLAG] = FLAG_NEGATIVE  # init

last_identity = None
last_index = None
output_iterator = SimpleProgress(df_output.index)
for current_index in output_iterator:

    current_identity = df_output.at[current_index, KEY_IDENTITY]

    if check_pics(current_index):
        df_output.at[current_index, KEY_FLAG] = FLAG_POSITIVE
        if last_identity == current_identity:
            df_output.at[last_index, KEY_FLAG] = FLAG_POSITIVE

    if (
        last_identity != current_identity
        and last_index != None
        and df_output.at[last_index, KEY_FLAG] != FLAG_POSITIVE
    ):
        df_output.at[last_index, KEY_FLAG] = pd.NA

    last_identity = current_identity
    last_index = current_index


100%


In [41]:
# remove extra columns
df_output.drop(
    columns=CONDITION_ONLY_COLUMNS,
    inplace=True,
)

# remove NA flags
df_output.dropna(inplace=True)

df_output.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 100133 entries, 1 to 123202
Data columns (total 61 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   patientunitstayid   100133 non-null  int64  
 1   gender              100133 non-null  object 
 2   age                 100133 non-null  float64
 3   ethnicity           100133 non-null  object 
 4   offset              100133 non-null  int64  
 5   vasopressor         100133 non-null  int32  
 6   heparin             100133 non-null  int32  
 7   urine               100133 non-null  float64
 8   PEEP                100133 non-null  float64
 9   creatinine          100133 non-null  float64
 10  platelet            100133 non-null  float64
 11  INR                 100133 non-null  float64
 12  PT                  100133 non-null  float64
 13  PTT                 100133 non-null  float64
 14  lactate             100133 non-null  float64
 15  RDW                 100133 non-nul

## Output


In [42]:
OUTPUT_PATH = relative_path(
    './data/data_eicu_'
    + ('compact' if COMPACT_MODE else 'full')
    + ('_test' if TEST_MODE else '')
    + '.csv'
    + ('.gz' if COMPRESS_OUTPUT else '')
)

df_output.to_csv(
    OUTPUT_PATH,
    index=False,
    compression=('gzip' if COMPRESS_OUTPUT else None),
)
