# init [RUN THIS]

In [1]:
import numpy as np
import pandas as pd
# import polars as pl
import os, sys
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
# print(os.getcwd())
os.chdir('/gpfs/data/healthcare-allocate/CLIF-MIMIC/code')
# print(os.getcwd())

proj_root = "/gpfs/data/healthcare-allocate/CLIF-MIMIC"
if proj_root not in sys.path:
    sys.path.append(proj_root)

# from code.custom_utils import *
import logging

## load tables

In [2]:
def load_mimic_table(module: {"icu", "hosp"}, table, file_type: {"csv", "parquet", "pq"} = "csv"):
    '''
    Load one MIMIC-IV 2.2 table from `../mimic-iv-2.2/{module}/` into a pandas df.
    - module: the sub-directory that holds the table, i.e. "icu" or "hosp"
    - table: the table file name without extension, e.g. "chartevents"
    - file_type: "csv" reads `{table}.csv.gz`; "pq" or "parquet" reads `{table}.parquet`

    Raises a ValueError for any other `file_type`, instead of silently returning None.
    '''
    if file_type in ["pq", "parquet"]:
        return pd.read_parquet(f'../mimic-iv-2.2/{module}/{table}.parquet')
    if file_type == "csv":
        return pd.read_csv(f'../mimic-iv-2.2/{module}/{table}.csv.gz')
    raise ValueError(
        f"Unsupported file_type {file_type!r}; expected 'csv', 'pq' or 'parquet'."
        )

In [3]:
patients = load_mimic_table("hosp", "patients") # gives gender
admissions = load_mimic_table("hosp", "admissions") # gives race and ethnicity

In [4]:
d_items = load_mimic_table("icu", "d_items", "csv")
chartevents = load_mimic_table("icu", "chartevents", "parquet")

In [5]:
procedureevents = load_mimic_table("icu", "procedureevents", "csv")
datetimeevents = load_mimic_table("icu", "datetimeevents", "csv")

In [6]:
inputevents = load_mimic_table("icu", "inputevents", "csv")
outputevents = load_mimic_table("icu", "outputevents", "csv")

In [7]:
# def resave_mimic_table_to_parquet(table: pd.DataFrame):
    # if not yet in memory, load it:
    # if not table:

In [8]:
# labevents = load_mimic_table("hosp", "labevents", "csv")
# labevents.to_parquet("../mimic-iv-2.2/hosp/labevents.parquet")
d_labitems = load_mimic_table("hosp", "d_labitems", "csv")
labevents = load_mimic_table("hosp", "labevents", "parquet")

poe = load_mimic_table("hosp", "poe", "csv")
poe_detail = load_mimic_table("hosp", "poe_detail", "csv")

transfers = load_mimic_table("hosp", "transfers", "csv")
icustays = load_mimic_table("icu", "icustays", "csv")

In [9]:
ingredient_events = load_mimic_table("icu", "ingredientevents", "csv")

## load mappings

In [10]:
def load_mapping_csv(csv_name: str, dtype = None):
    '''
    Read one of the "mimic-to-clif-mappings - {csv_name}.csv" files under `../mapping/`.
    - csv_name: the part of the file name after "mimic-to-clif-mappings - "
    - dtype: passed through to `pd.read_csv`
    '''
    mapping_path = f"../mapping/mimic-to-clif-mappings - {csv_name}.csv"
    return pd.read_csv(mapping_path, dtype = dtype)
# convert a mapping df to a dict for df col renaming later

def construct_mapper_dict(
    mapping_df: pd.DataFrame, key_col: str, value_col: str, map_none_to_none = False,
    excluded_item_ids: list = None
    ):
    '''
    Build a {key_col value -> value_col value} dict from a mapping df.
    - excluded_item_ids: rows whose "itemid" is in this list are dropped first
    (only applied when the mapping df has an "itemid" column)
    - map_none_to_none: whether to add an explicit None -> None entry
    
    Any value equal to "NO MAPPING" is stored as None.
    '''
    ids_to_drop = excluded_item_ids if excluded_item_ids else []

    if "itemid" in mapping_df.columns:
        keep_rows = ~mapping_df["itemid"].isin(ids_to_drop)
        mapping_df = mapping_df[keep_rows]

    # later rows win when a key repeats, same as dict(zip(...))
    mapper_dict = {
        key: (None if value == "NO MAPPING" else value)
        for key, value in zip(mapping_df[key_col], mapping_df[value_col])
    }

    if map_none_to_none:
        mapper_dict[None] = None

    return mapper_dict

In [11]:
# Load every mimic-to-clif mapping csv and turn each one into the lookup dict(s)
# used by the ETL sections below.

# patient table
race_ethnicity_mapping = load_mapping_csv("race_ethnicity")
# both dicts are keyed on the same `mimic_race` string
race_mapper_dict = construct_mapper_dict(race_ethnicity_mapping, "mimic_race", "race")
ethnicity_mapper_dict = construct_mapper_dict(race_ethnicity_mapping, "mimic_race", "ethnicity")

# hosp table
discharge_mapping = load_mapping_csv("discharge")
discharge_mapper_dict = construct_mapper_dict(
    discharge_mapping, "discharge_location", "disposition_category"
    )

# adt
adt_mapping = load_mapping_csv("adt")
adt_mapper_dict = construct_mapper_dict(adt_mapping, "careunit", "location_category")

# vitals table
# NOTE(review): the same three vitals objects are rebuilt in the `vitals` utils cell further down
vitals_mapping = load_mapping_csv("vitals")
vital_name_mapper_dict = construct_mapper_dict(vitals_mapping, "itemid", "label = vital_name")
vital_category_mapper_dict = construct_mapper_dict(vitals_mapping, "itemid", "vital_category")

# resp support table
resp_mapping = load_mapping_csv("respiratory_support")
resp_device_mapping = load_mapping_csv("device_category")
resp_mode_mapping = load_mapping_csv("mode_category")

resp_mapper_dict = construct_mapper_dict(resp_mapping, "itemid", "variable")
# NOTE(review): the excluded id is passed as a string; it only filters anything if this
# mapping has an `itemid` column that is also read as strings -- confirm
resp_device_mapper_dict = construct_mapper_dict(
    resp_device_mapping, "device_name", "device_category", excluded_item_ids = ["223848"]
    )
resp_mode_mapper_dict = construct_mapper_dict(resp_mode_mapping, "mode_name", "mode_category")

# labs
labs_mapping = load_mapping_csv("labs")
# cast the non-missing item ids to str (via int); the assignment aligns on the index,
# so rows whose itemid was NA stay NA
labs_mapping["itemid"] = labs_mapping["itemid"].dropna().astype(int).astype(str)
labs_mapper_dict = construct_mapper_dict(labs_mapping, "itemid", "lab_category")

In [None]:
labs_mapper_dict

# utils [RUN THIS]

In [14]:
logging.basicConfig(
    level = logging.INFO,
    format = '%(asctime)s:%(levelname)s:%(message)s',
    handlers = [logging.FileHandler("mimic-to-clif.log"),
              logging.StreamHandler()])

## `CacheInfo` class

In [15]:
class CacheInfo:
    """
    CacheInfo object used to represent the current status of `lru_cache`

    Attributes:
        - max_size: the maximum number of entries the cache may hold
        - hits / misses: number of calls answered from / not from the cache
        - cur_size: number of entries currently stored
        - cache_dict: the cache itself, ordered from least to most recently used
    """
    def __init__(self, max_size):
        self.max_size = max_size
        self.misses = 0
        self.hits = 0
        self.cur_size = 0
        # NOTE: you may add to this if you want, but do not modify the lines above
        # create an attribute in the CacheInfo class to store the cache dict
        # (dicts keep insertion order, so the first key is the least recently used)
        self.cache_dict = {}
    
    def __repr__(self):
        return f"CacheInfo(hits={self.hits}, misses={self.misses}, max_size={self.max_size}, cur_size={self.cur_size})"

# first layer: a decorator factory
def lru_cache(max_size = 128):
    '''
    This function is a decorator factory that returns a decorator with a user-specified
    maximum size of the cache 

    Input:
        - max_size: the maximum size of the cache. `None` means unbounded; zero or a
          negative number disables caching (every call is a miss and nothing is stored).
    
    Output: 
        - a decorator

    The decorated function exposes its `CacheInfo` as the attribute `cache_info`.
    All arguments must be hashable, since they are used as the cache key.
    '''
    # imported locally so that this cell does not rely on a file-level import
    from functools import wraps

    # second layer: the decorator
    def decorator(func):
        '''
        This function is a decorator that takes in an original function and
        return a new, decorated function.

        Input: an original function

        Output: a new function
        '''
        # initialize an instance of the CacheInfo class
        cache_info = CacheInfo(max_size = max_size)

        def key_generator(*args, **kwargs):
            '''
            This helper function creates a unique key given every different 
            combination of positional and key-word arguments

            Input: 
                - *args: any position arguments
                - **kwargs: any key word arguments

            Output: 
                - a tuple that stores all the arguments and their data type
            '''
            # the types are part of the key so that e.g. 1 and 1.0 are cached separately
            args_type = tuple(type(arg) for arg in args)
            # frozenset makes the key word part hashable and order-independent
            kwargs_and_type = frozenset(
                (item, type(item[1])) for item in kwargs.items()
            )
            return (args, args_type, kwargs_and_type)

        # wraps() keeps the name and docstring of the original function
        @wraps(func)
        def new_func(*args, **kwargs):
            '''
            This is the new function that replaces the original function.

            Input:
                - *args, **kwargs: any position and key word arguments

            Output:
                - the result of the new function
            '''
            key = key_generator(*args, **kwargs)
            cache = cache_info.cache_dict

            # a hit: the same args have been provided before
            if key in cache:
                cache_info.hits += 1
                # pop and re-insert so the key moves to the tail (most recently used)
                cached_result = cache.pop(key)
                cache[key] = cached_result
                return cached_result

            # a miss: a new arg combination that is not seen before
            cache_info.misses += 1
            result = func(*args, **kwargs)

            # nothing can be stored in a zero-sized cache; return the fresh result
            # directly rather than looking it up after it has been evicted
            if max_size is not None and max_size <= 0:
                return result

            cache[key] = result
            # if the cache exceeds the maximum size, drop the first key in the dict,
            # which is the least recently used one
            if max_size is not None and len(cache) > max_size:
                del cache[next(iter(cache))]
            cache_info.cur_size = len(cache)
            return result

        # update the attribute of the now-decorated new func
        new_func.cache_info = cache_info
        return new_func
    return decorator

## helper funcs

In [76]:
def convert_and_sort_datetime(df: pd.DataFrame, event_type: {"ce", "pe", "other"}, time_col_name = "time"):
    '''
    Parse the timestamp column(s) of an events df to datetime and sort the rows.
    - event_type: 
        - "pe" (procedure events): parses `starttime` and `endtime`, sorts by 
        hadm_id, starttime, endtime, storetime and resets the index
        - "ce" (chart events): parses `charttime`, sorts by hadm_id, charttime, 
        storetime and resets the index
        - "other": parses `time_col_name` and sorts by hadm_id and that column; 
        the index is left as is
    - time_col_name: name of the timestamp column, only used when event_type is "other"

    The timestamp columns are converted on the df that is passed in; the sorted df is returned.
    Any other `event_type` returns the df unchanged.
    '''
    # if procedure event
    if event_type == "pe":
        df["starttime"] = pd.to_datetime(df["starttime"])
        df["endtime"] = pd.to_datetime(df["endtime"])
        df = df.sort_values(["hadm_id", "starttime", "endtime", "storetime"]).reset_index(drop = True)
    elif event_type == "ce":
        df["charttime"] = pd.to_datetime(df['charttime'])
        df = df.sort_values(["hadm_id", "charttime", "storetime"]).reset_index(drop = True)
    elif event_type == "other":
        # honor the caller-supplied column name instead of the hard-coded "time"
        df[time_col_name] = pd.to_datetime(df[time_col_name])
        df = df.sort_values(["hadm_id", time_col_name])
    return df

In [17]:
def save_to_rclif(df: pd.DataFrame, table_name, file_format = "pq"):
    '''
    Write a finished CLIF table to `../rclif/clif_{table_name}.parquet`.
    - file_format: only "pq" / "parquet" is supported

    Raises a ValueError for any other format, instead of silently writing nothing.
    '''
    if file_format not in ["pq", "parquet"]:
        raise ValueError(
            f"Unsupported file_format {file_format!r}; expected 'pq' or 'parquet'."
            )
    df.to_parquet(f'../rclif/clif_{table_name}.parquet')

def read_from_rclif(table_name, file_format = "pq"):
    '''
    Read a saved CLIF table back from `../rclif/clif_{table_name}.parquet`.
    - file_format: only "pq" / "parquet" is supported

    Raises a ValueError for any other format, instead of silently returning None.
    '''
    if file_format not in ["pq", "parquet"]:
        raise ValueError(
            f"Unsupported file_format {file_format!r}; expected 'pq' or 'parquet'."
            )
    return pd.read_parquet(f'../rclif/clif_{table_name}.parquet')

In [18]:
# FIXME: delete "ALREADY MAPPED" at some pt
EXCLUDED_LABELS_DEFAULT = ["NO MAPPING", "UNSURE", "MAPPED ELSEWHERE", "SPECIAL CASE", "ALREADY MAPPED"] 

# find all the relevant item ids for a table
def get_relevant_item_ids(mapping_df: pd.DataFrame, decision_col: str, 
                          excluded_labels: list = EXCLUDED_LABELS_DEFAULT,
                          excluded_item_ids: list = None
                          ):
    '''
    Return the unique item ids of a mapping df that should be processed.
    - decision_col: the col whose value decides whether a row is kept; rows whose
    value is one of the excluded_labels are dropped
    - excluded_item_ids: additional item ids to drop regardless of their label
    '''
    ids_to_skip = excluded_item_ids or []

    label_is_usable = ~mapping_df[decision_col].isin(excluded_labels)
    id_is_allowed = ~mapping_df["itemid"].isin(ids_to_skip)

    relevant_ids = mapping_df.loc[label_is_usable & id_is_allowed, "itemid"]
    return relevant_ids.unique()
    
def rename_and_reorder_cols(df, rename_mapper_dict: dict, new_col_order: list) -> pd.DataFrame:
    '''
    Rename the columns of a df and then select / order them as in new_col_order.
    - rename_mapper_dict: table-specific {old name -> new name}; it is applied on top of
    the baseline renames subject_id -> patient_id and hadm_id -> hospitalization_id,
    and wins over them on conflict
    - new_col_order: the final columns; names not present after renaming come out as
    all-NA columns and columns not listed are dropped
    '''
    full_rename_mapper = {
        "subject_id": "patient_id", "hadm_id": "hospitalization_id",
    } | rename_mapper_dict

    renamed = df.rename(columns = full_rename_mapper)
    return renamed.reindex(columns = new_col_order)

def find_duplicates(df, cols: list[str] = ["hadm_id", "time", "itemid"]):
    '''
    Return every row of df whose combination of `cols` appears more than once.
    
    The combination is meant to be unique: for the same item at the same time during
    the same encounter there should be only one value, so any returned row points to
    a conflict. All the rows of a repeated combination are returned, not only the extras.
    '''
    is_repeated = df.duplicated(subset = cols, keep = False)
    return df[is_repeated]

def check_duplicates(df: pd.DataFrame, additional_cols: list):
    '''
    Check whether there are duplicates -- more than one populated value -- for what is supposed to be 
    unique combination of columns. That is, for the same measured variable (e.g. vital_category) at
    the same time during the same hospitalization, there should be only one corresponding value.
    - additional_cols: the cols that, together with hospitalization_id and recorded_dttm, 
    should identify a row uniquely

    Returns all the rows that share their key combination with at least one other row.
    '''
    # build a new list: list.extend() works in place and returns None, which would make
    # duplicated() compare every column instead of the key columns
    cols_to_check = ["hospitalization_id", "recorded_dttm", *additional_cols]
    return df[df.duplicated(subset = cols_to_check, keep = False)]

In [56]:
@lru_cache()
def item_id_to_feature_value(item_id: int, col: str = "label", df = d_items):
    '''
    Look up one attribute of an item in an items dictionary df, by item id, and log it.
    e.g. find the label, or linksto, of item id 226732.
    - col: the column to return, "label" by default
    - df: the items dictionary to search, `d_items` by default

    The first matching row is used.
    '''
    matched = df.loc[df["itemid"] == item_id, :]
    label = matched["label"].values[0]

    if col != "label":
        feature_value = matched[col].values[0]
        logging.info(f"the {col} for item {item_id} ({label}) is {feature_value}")
        return feature_value

    logging.info(f"the {col} for item {item_id} is {label}")
    return label

@lru_cache()
def item_id_to_label(item_id: int) -> str:
    '''
    Shorthand for `item_id_to_feature_value` with its default column, i.e. return the 
    "label" string of an item given its item_id. 
    '''
    label = item_id_to_feature_value(item_id)
    return label

def item_id_to_events_df(item_id: int, original: bool = False) -> pd.DataFrame:
    '''
    Return in a pandas df all the events associated with an item id.
    - original: whether to return the rows with the original columns of the events 
    table (True), or a simplified df (False) that keeps only subject_id, hadm_id, 
    stay_id, itemid, value, valueuom and one timestamp column renamed to "time", 
    to support integration between different events df.

    The simplified form is only defined for "chartevents" (time = charttime) and
    "procedureevents" (time = endtime); for any other events table a warning is logged
    and None is returned.
    '''
    # find whether it is chartevents, or procedure events, etc.
    linksto_table_name = item_id_to_feature_value(item_id, col = "linksto")
    # turn the table name string into the df object of the same name
    linksto_df: pd.DataFrame = globals()[linksto_table_name]
    events_df = linksto_df.loc[linksto_df["itemid"] == item_id, :]
    # return the original columns
    if original:
        return events_df

    # else, if simplified: which timestamp column becomes "time" for each table
    # FIXME: trach is complex and need additional attention (procedureevents)
    time_col_by_table = {"procedureevents": "endtime", "chartevents": "charttime"}
    time_col = time_col_by_table.get(linksto_table_name)
    if time_col is None:
        # FIXME: likely an issue if data struct of different events table are different 
        logging.warning(
            f"no simplified form defined for events table {linksto_table_name} (item {item_id}); returning None"
            )
        return None

    events_df_simplified = events_df.loc[
        :, ['subject_id', 'hadm_id', 'stay_id', time_col, 'itemid', 'value', 'valueuom']
    ].rename(columns = {time_col: "time"})
    return events_df_simplified

def item_ids_list_to_events_df(item_ids: list):
    '''
    Stack the events of several items into one df.
    
    Each item is fetched with `original = True`, i.e. with the original columns of
    its events table.
    NOTE(review): later cells look for a "time" column, which only the simplified
    form provides -- confirm which form is intended here.
    '''
    per_item_events = []
    for item_id in item_ids:
        per_item_events.append(item_id_to_events_df(item_id, original = True))
    # FIXME: automatically add the label and linksto table source columns -- create cache?
    return pd.concat(per_item_events)

## `ItemFinder` class

In [44]:
class ItemFinder():
    '''
    Search an items dictionary by keyword and summarize the matching items.

    After construction, `candidate_table` holds one row per matching item together
    with how often the item occurs in its events table (`count`) and a summary of the
    values it takes (`value_instances`).
    '''
    def __init__(self, kw = None, items_df = d_items, 
                 col: str = "label", case_sensitive: bool = False, 
                 for_labs: bool = False, report_na = True
                 ) -> None:
        '''
        Look up an item by keyword from the `d_items` table of the `icu` module.
        - kw: the keyword (pattern) to search for
        - items_df: the items dictionary to search, `d_items` by default
        - col: the column to search in; "abbr" is accepted as short for "abbreviation"
        - case_sensitive: whether the search is case sensitive
        - for_labs: whether to lay out the candidate table with the lab-specific column order
        - report_na: whether to raise when there is no match (True); or simply log a 
        warning and leave `candidate_table` as an empty df (False)
        '''
        self.kw = kw 
        self.df = items_df
        self.col = "abbreviation" if col == "abbr" else col
        self.for_labs = for_labs

        # df of items that match the key words -- a raw output
        self.items_select_df: pd.DataFrame = self.df[
            self.df[self.col].str.contains(kw, case = case_sensitive, na = False)
        ]
        
        # first check whether there is any return in the raw output
        if len(self.items_select_df) == 0:
            if report_na:
                # ValueError is a subclass of Exception, so existing handlers still work
                raise ValueError(f"No matching result found in column {col} with case sensitive being {case_sensitive}")
            logging.warning(f"No matching result for {kw} in column {col} with case sensitive being {case_sensitive}")
            # keep the same attributes as in the match case, just empty
            self.items_select_ids = np.array([])
            self.linksto_table_names = np.array([])
            self.item_freq = pd.Series(name = "count", dtype = "int64")
            self.candidate_table = pd.DataFrame()
        
        # ... only proceed when the return is not of zero length
        # and enhance the simple raw output with counts and value instances
        else:
            logging.info(f"{len(self.items_select_df)} matching item(s) found for {self.kw}.")
            # list of ids for items that match the key words
            self.items_select_ids = self.items_select_df["itemid"].values
            # a np array of non-duplicated events table names, e.g. ["chartevents", "procedureevents"]
            self.linksto_table_names = self.items_select_df["linksto"].unique()
            self.item_freq = self.generate_item_freq()
            self.candidate_table = self.make_candidate_table()

    def generate_item_freq(self):
        '''
        Iterative over each events table, find the items freq therein, and combine into one df.
        Returns a series of counts indexed by itemid and named "count".
        # FIXME - should maybe make this map style without loop
        '''
        freq_df_ls = [] # a list of count series, one per events table
        for table_name in self.linksto_table_names:
            # fetch the object by name str, i.e. chartevents, procedureevents, etc.
            events_df: pd.DataFrame = globals()[table_name]
            # filter for all the selected events in that events table
            events_select_df = events_df.loc[
                events_df["itemid"].isin(self.items_select_ids), :
            ]
            # item freq for one event type  
            item_freq = events_select_df.value_counts("itemid")
            item_freq.name = "count"

            # FIXME: there shouldn't be an empty one, but this is not checked
            freq_df_ls.append(item_freq)
        
        return pd.concat(freq_df_ls)

    def make_candidate_table(self):
        '''
        merge item freq and values instances to the raw output to generate the enhanced table of the 
        candidate items, sorted from the most to the least frequent item.
        '''
        
        cand_table = (
            self.items_select_df
            .loc[:, ["itemid", "label", "abbreviation", "linksto", "category", "unitname", "param_type"]]
            # FIXME
            # validate guards against an item id showing up more than once on either side
            .join(self.item_freq, on = "itemid", validate = "1:1")
            .sort_values(by = "count", ascending = False) 
            .assign(
                value_instances = lambda df: df["itemid"].apply(item_id_to_value_instances)
            )
        )
        if self.for_labs:
            return cand_table.reindex(
                columns = ["itemid", "label", "abbreviation", "linksto", "category", "count", "value_instances", "unitname"]
                )
        else:
            return cand_table

@lru_cache()
def item_id_to_value_instances(item_id: int):
    '''
    Summarize, as a string, the values an item takes in its events.
    - "Numeric" items: the max / min / mean summary
    - "Text" items: the dict of category -> count
    - any other param_type: the param_type itself is returned, without a summary
    '''
    label = item_id_to_feature_value(item_id, "label")
    param_type = item_id_to_feature_value(item_id, "param_type")

    # no summary is defined outside of numeric and text items
    if param_type not in ("Numeric", "Text"):
        return param_type

    if param_type == "Numeric":
        val_instances = item_id_to_value_instances_numeric(item_id)
    else:
        val_instances = item_id_to_value_instances_categorical(item_id).to_dict()

    summary = str(val_instances)
    print(f"item label: {label}; value instances: {summary}")
    return summary

def item_id_to_value_instances_categorical(item_id: int, events: pd.DataFrame = chartevents):
    '''
    Count how often each distinct `value` occurs among the events of an item.
    - events: the events df to search, `chartevents` by default
    '''
    is_item = events["itemid"] == item_id
    categories: pd.Series = events.loc[is_item, :].value_counts("value")
    return categories
    
def item_id_to_value_instances_numeric(item_id: int, events: pd.DataFrame = chartevents):
    '''
    Describe a continuous, or numeric, item by the max, min and mean (2 decimals) 
    of its `valuenum`, formatted as one string.
    - events: the events df to search, `chartevents` by default
    '''
    item_rows = events.loc[events["itemid"] == item_id, :]
    valuenum_col = item_rows["valuenum"]
    val_max = valuenum_col.max()
    val_min = valuenum_col.min()
    val_mean = round(valuenum_col.mean(), 2)
    return f"Max: {val_max}, Min: {val_min}, Mean: {val_mean}"

In [None]:
ItemFinder("weight").candidate_table

# `patient` table

## utils

In [128]:
patient_col_names = [
    "patient_id", "race_name", "race_category", "ethnicity_name", "ethnicity_category",
    "sex_name", "sex_category", "birth_date", "death_dttm", "language_name", "language_category"
]

## EDA

In [None]:
n_patient_admitted = admissions["subject_id"].nunique()
n_patient_admitted

In [None]:
# multiple race for one patient
race_counts = admissions.groupby('subject_id')['race'].nunique()
multi_race_indices = race_counts[race_counts > 1].index
multi_race_encounters = admissions[
    admissions['subject_id'].isin(multi_race_indices)
    ][["subject_id", "hadm_id", "race", "admittime", "admission_type", "admission_location"]]
multi_race_encounters

In [None]:
# no. of patients with multiple races over different encounters
multi_race_encounters["subject_id"].nunique()

In [None]:
# but only one race per encounter: 
race_counts = admissions.groupby('hadm_id')['race'].nunique()
race_counts[race_counts > 1].index

In [None]:
# check for South Americans
south_american_subject_ids = admissions.loc[admissions["race"] == "SOUTH AMERICAN", "subject_id"].unique()
sa_race_counts = (
    admissions[admissions["subject_id"].isin(south_american_subject_ids)]
    .groupby('subject_id')['race'].nunique()
)
multi_race_indices_sa = sa_race_counts[sa_race_counts > 1].index
multi_race_encounters_sa = admissions[
    admissions['subject_id'].isin(multi_race_indices_sa)
    ][["subject_id", "hadm_id", "race", "admittime", "admission_type", "admission_location"]]
multi_race_encounters_sa

In [None]:
(sa_race_counts == 1).sum()

In [None]:
len(multi_race_encounters["subject_id"].unique())

In [None]:
# checking language issue
admissions.head(20)

## ETL

### gender / sex

In [None]:
# fetch sex (intended in CLIF) / gender (available in MIMIC)
sex = patients[["subject_id", "gender"]].copy()
sex.columns = ["patient_id", "sex_name"]
# explicit lookup: a missing or unexpected code stays NA instead of silently becoming "Male"
sex["sex_category"] = sex["sex_name"].map({"F": "Female", "M": "Male"})
sex.head()

In [None]:
# check for NA
patients["gender"].isna().sum()

### race and ethnicity

In [None]:
# race and ethnicity
# one row per admission; admittime is kept for the most-recent tie-break used later
race_ethn = admissions[["subject_id", "hadm_id", "race", "admittime"]].copy()
race_ethn.columns = ["patient_id", "hospitalization_id", "race_name", "admittime"]
race_ethn["race_category"] = race_ethn["race_name"].map(race_mapper_dict)
# the ethnicity name is the same MIMIC race string; only the category mapping differs
race_ethn["ethnicity_name"] = race_ethn["race_name"]
race_ethn["ethnicity_category"] = race_ethn["ethnicity_name"].map(ethnicity_mapper_dict)
race_ethn.head()

In [111]:
def check_multi_race_over_encounters(df, col: str = "race_category"):
    '''
    Return all the rows of the patients that have more than one distinct non-NA value
    of `col` across their rows (encounters).
    '''
    n_distinct = df.groupby('patient_id')[col].nunique()
    flagged_patient_ids = n_distinct.loc[n_distinct.gt(1)].index
    return df[df['patient_id'].isin(flagged_patient_ids)]

In [None]:
check_multi_race_over_encounters(race_ethn)

In [None]:
race_ethn_informative = race_ethn.loc[~race_ethn["race_category"].isin(["Other", "Unknown"]), ]
multi_race_ethn_informative = check_multi_race_over_encounters(race_ethn_informative)
multi_race_ethn_informative

In [None]:
# no. of patients with multi informative races over diff encounters:
multi_race_ethn_informative["patient_id"].nunique()

In [None]:
# check for ethnicity - no issue here
check_multi_race_over_encounters(multi_race_ethn_informative, col = "ethnicity_category")

In [None]:
## RESUME ETL:
# apply de-duplication logic to create one-to-one mapping from patient_id to race:
# for each patient with several informative races, keep the race recorded on the most
# admissions, and break ties by the race with the latest admittime
multi_race_deduped = (
    multi_race_ethn_informative.groupby('patient_id')
    .apply(lambda x: (
        # one row per race of this patient: number of admissions and latest admittime
        x.groupby('race_category')
        .agg(count=('race_category', 'size'),
             most_recent=('admittime', 'max'))
        # NOTE(review): admittime is not parsed to datetime in this cell, so `max` is a
        # string comparison if it is still text -- fine for ISO-formatted stamps, confirm
        .sort_values(['count', 'most_recent'], ascending=[False, False])
        .head(1)))
    # bring patient_id and race_category back from the index into columns
    .reset_index()
    )
multi_race_deduped

In [None]:
# patient_id -> the single race chosen above, for the patients with several informative races
unique_race_mapper_dict = dict(zip(multi_race_deduped["patient_id"], multi_race_deduped["race_category"]))

# collapse to one row per distinct (patient, race, ethnicity) combination
race_ethn_deduped = race_ethn.drop_duplicates(["patient_id", "race_category", "ethnicity_category"]).copy()

# overwrite the race of those patients on all of their rows; everyone else keeps theirs
race_ethn_deduped["race_category"] = np.where(
    race_ethn_deduped["patient_id"].isin(multi_race_deduped["patient_id"]),
    race_ethn_deduped["patient_id"].map(unique_race_mapper_dict),
    race_ethn_deduped["race_category"]
)

# the overwrite can make rows identical again, so de-duplicate once more
race_ethn_deduped.drop_duplicates(["patient_id", "race_category", "ethnicity_category"], inplace=True)

race_ethn_deduped

In [None]:
race_ethn_deduped.value_counts("race_category")

In [None]:
# remove the non-informative others unless they are the only race
# NOTE(review): "only race" is tested as "the patient has exactly one row"; a patient with
# several rows that are all Other / Unknown ends up with no row at all -- confirm intended
race_ethn_deduped_informative = race_ethn_deduped.groupby("patient_id").apply(
    lambda gr: gr if len(gr) == 1 else gr[~gr["race_category"].isin(["Other","Unknown"])]
).reset_index(drop = True)

In [None]:
# repeat the same for ethnicity
# NOTE(review): this starts again from `race_ethn_deduped` and assigns to the same name as
# the race step above, so the result of the race filter is overwritten rather than
# refined -- confirm whether the input should be `race_ethn_deduped_informative`
race_ethn_deduped_informative = race_ethn_deduped.groupby("patient_id").apply(
    lambda gr: gr if len(gr) == 1 else gr[~gr["ethnicity_category"].isin(["Other", "Unknown", "Non-Hispanic"])]
).reset_index(drop = True)

In [None]:
# check for duplicates and confirm there is none
race_ethn_deduped_informative[
    race_ethn_deduped_informative.duplicated(["patient_id", "race_category"], keep=False)
    ]

In [None]:
# again for ethn
race_ethn_deduped_informative[
    race_ethn_deduped_informative.duplicated(["patient_id", "ethnicity_category"], keep=False)
    ]

In [None]:
race_ethn_deduped_informative

### merge and save

In [124]:
death = admissions[["subject_id", "deathtime"]].copy().dropna(subset=["deathtime"]).drop_duplicates()
death.columns = ["patient_id", "death_dttm"]

In [None]:
patient_merged = pd.merge(
    race_ethn_deduped_informative, sex, on = "patient_id", how = "outer"
)

patient_merged = pd.merge(
    patient_merged, death, on = "patient_id", how = "outer", indicator = True)

patient_merged

In [None]:
patient_merged.value_counts("_merge")

In [None]:
patient_final = patient_merged.reindex(columns = patient_col_names)
patient_final

In [None]:
patient_final.dtypes

In [None]:
# the patient table is keyed by `patient_id`; `hospitalization_id` is not among
# `patient_col_names`, so it no longer exists in `patient_final` after the reindex
patient_final["patient_id"] = patient_final["patient_id"].astype(str)

In [None]:
patient_final.dtypes

In [132]:
# save
patient_final.to_parquet('../rclif/clif_patient.parquet')

# `hospitalization` table

## utils

In [59]:
hosp_col_names = [
    "patient_id", "hospitalization_id", "admission_dttm", "discharge_dttm",
    "age_at_admission", "admission_type_name", "admission_type_category",
    "discharge_name", "discharge_category", "zipcode_nine_digit", "zipcode_five_digit", 
    "census_block_code", "census_block_group_code", "census_tract", "state_code", "county_code"
]

hosp_col_rename_mapper = {
    "admittime": "admission_dttm", "dischtime": "discharge_dttm", 
    "admission_type": "admission_type_name", "discharge_location": "discharge_name"
}

## EDA

In [None]:
# check the discharge locations
admissions.value_counts("discharge_location").reset_index()

In [None]:
admissions.head()

## ETL

In [None]:
# copy the selection so that the new column is added to an independent df
# and not to a slice of `admissions`
hosp = admissions[
    ["subject_id", "hadm_id", "admittime", "dischtime", "admission_type", "discharge_location"]
    ].copy()

hosp["discharge_category"] = hosp["discharge_location"].map(discharge_mapper_dict)

In [94]:
# bring in each patient's anchor age and anchor year; left join keeps every admission
hosp_merged = pd.merge(
    hosp, patients[["subject_id", "anchor_age", "anchor_year"]],
    on = "subject_id", how = "left"
)

# age at admission = anchor age + the number of calendar years between the admission and the anchor year
hosp_merged["age_at_admission"] = hosp_merged["anchor_age"] + pd.to_datetime(hosp_merged["admittime"]).dt.year - hosp_merged["anchor_year"]

In [95]:
hosp_final = rename_and_reorder_cols(hosp_merged, hosp_col_rename_mapper, hosp_col_names)

In [None]:
hosp_final.head()

In [None]:
hosp_final.dtypes

In [97]:
hosp_final.to_parquet('../rclif/clif_hospitalization.parquet')

# `ADT` table

## utils

In [112]:
adt_col_names = ["hospitalization_id", "hospital_id", "in_dttm", "out_dttm", "location_name", "location_category"]

adt_col_rename_mapper = {
    'intime': 'in_dttm', 'outtime': 'out_dttm', 'careunit': 'location_name'
}

## EDA

In [None]:
transfers.head() 

In [None]:
ItemFinder("Appendectomy").candidate_table

In [None]:
item_id_to_label(225966)

In [None]:
transfers.head(10)

In [None]:
transfers.value_counts("careunit").reset_index()

In [None]:
transfers.value_counts("eventtype").reset_index()

In [None]:
adt_events_units = transfers.value_counts(["eventtype", "careunit"], dropna=False).reset_index()
adt_events_units

## ETL

In [None]:
# drop two NA cases: (1) discharge -- so careunit is NA; 
# (2) ED visit with no hospitalization -- so no hadm_id
# copy so that the new column is added to an independent df (same as the other ETL cells)
adt = transfers.dropna(subset = ["careunit", "hadm_id"]).copy()
adt['location_category'] = adt['careunit'].map(adt_mapper_dict)
# NOTE(review): `hospital_id` in adt_col_names has no source column here, so it comes out all NA
adt_final = rename_and_reorder_cols(adt, adt_col_rename_mapper, adt_col_names)

In [None]:
adt_final

In [None]:
adt_final.dtypes

# `vitals` table

## utils

In [21]:
vitals_mapping = load_mapping_csv("vitals")
vital_name_mapper_dict = construct_mapper_dict(vitals_mapping, "itemid", "label = vital_name")
vital_category_mapper_dict = construct_mapper_dict(vitals_mapping, "itemid", "vital_category")

In [23]:
# current logic is temp_site is preserved into meas_site_name; else is NA
vital_col_names = ["hospitalization_id", "recorded_dttm", "vital_name", "vital_category", "vital_value", "meas_site_name"]

vitals_col_rename_mapper_dict = {
    "hadm_id": "hospitalization_id", 
    "time": "recorded_dttm",
    "value": "vital_value"
    }

@lru_cache()
def convert_f_to_c(temp_f) -> float:
    '''
    Convert a temperature from Fahrenheit to Celsius, rounded to 1 decimal.
    - temp_f: a number, or a string holding a number

    Raises a TypeError when temp_f cannot be treated as a number at all (e.g. None), 
    and a ValueError when it is a string that does not hold a number.
    '''
    try:
        # float() covers str, int, float and the numpy numeric types alike
        temp_f = float(temp_f)
    except TypeError as err:
        # `raise("wrong type")` is not a valid exception; raise a proper one
        raise TypeError(
            f"wrong type: cannot convert {type(temp_f).__name__} to a temperature"
            ) from err

    temp_c = (temp_f - 32) * 5 / 9
    return round(temp_c, 1) # so 39.3333 -> 39.3

## ETL

### regular cases
We first process the regular cases

In [None]:
# find vital_items_ids
# (selection is driven by the "vital_category" column of vitals_mapping; "temp_c" is
# excluded on top of the default exclusions because temperature is processed
# separately in the special-case section of this notebook)
vitals_items_ids = get_relevant_item_ids(
    mapping_df = vitals_mapping, decision_col = "vital_category", 
    excluded_labels = EXCLUDED_LABELS_DEFAULT + ["temp_c"]
    )
# presumably pulls the event rows for these item ids -- verify against the helper
vitals_events = item_ids_list_to_events_df(vitals_items_ids)

# use np.where to convert the unit for one item 
# from lb to kg for the only weight item in undesired unit -- Admission Weight (lbs.)
# NOTE: the .astype(float) branch is evaluated on the whole "value" column before
# np.where selects rows, so every value (not only itemid 226531) must be float-convertible.
vitals_events["value"] = np.where(
    vitals_events["itemid"] == 226531,
    vitals_events["value"].astype(float).apply(lambda x: round(x/2.205, 1)),
    vitals_events["value"]
)

In [29]:
# takes about 35s to run - the same if we are using apply vs map
# Strict lookups through the dicts' __getitem__: an itemid missing from a mapper raises KeyError.
vitals_events["vital_name"] = vitals_events["itemid"].map(vital_name_mapper_dict.__getitem__)
vitals_events["vital_category"] = vitals_events["itemid"].map(vital_category_mapper_dict.__getitem__)

In [None]:
vitals_final = rename_and_reorder_cols(vitals_events, vitals_col_rename_mapper_dict, vital_col_names)
vitals_final

validation

In [None]:
# checked and there is no dup
check_duplicates(vitals_final, ["vital_category", "vital_value"] )

In [None]:
vitals_final[vitals_final["vital_value"].isna()]

### special case: `temp_c` 
We then process the special case where we not only need to convert units but the conversion may also create duplication that needs to be resolved (e.g. if the same measurement was originally recorded in two units).

In [None]:
temp_events = item_ids_list_to_events_df([223761, 223762, 224642])
temp_events

In [None]:
# pivot directly
# long -> wide: one row per (hadm_id, time), one column per itemid, cells taken from "value".
# DataFrame.pivot raises ValueError if any (hadm_id, time, itemid) combination occurs more
# than once; combinations that are absent become NaN in the wide frame.
temp_wider = temp_events.pivot(
    index = ["hadm_id", "time"], 
    columns = "itemid",
    values = "value"
    ).reset_index()
temp_wider

In [32]:
# carry the temperature site column (224642) over as meas_site_name, as-is
temp_wider["meas_site_name"] = temp_wider[224642]

# convert temp from f to c with a coalesce logic
# 223761 = temp in f, 223762 = temp in c
# prefer the Celsius reading; fall back to the converted Fahrenheit reading
temp_wider["vital_value"] = temp_wider[223762].fillna(
    temp_wider[223761].apply(convert_f_to_c)
    )

# label each row by the unit its value came from: Celsius when 223762 is present,
# Fahrenheit otherwise (vectorized instead of a Python-level row-wise apply)
temp_wider["vital_name"] = np.where(
    temp_wider[223762].notna(), "Temperature Celsius", "Temperature Fahrenheit"
    )

temp_wider["vital_category"] = "temp_c"

In [None]:
temp_final = rename_and_reorder_cols(temp_wider, vitals_col_rename_mapper_dict, vital_col_names)
temp_final

validation

In [None]:
# checked there is no dup
check_duplicates(temp_final, ["vital_category",	"vital_value"] )

In [None]:
# check for NAs and notice that a lot of NAs were generated during pivoting
temp_final[temp_final["vital_value"].isna()]

In [39]:
# so drop these NAs
temp_final.dropna(subset=["vital_value"], inplace = True)

### merge, validate, save

In [None]:
# merge
# ignore_index=True gives the stacked frame a fresh 0..n-1 index; otherwise the
# two inputs' index labels are kept (and can repeat), and that leftover index
# would also be written out when the frame is saved with to_parquet.
vitals_merged = pd.concat(
    [vitals_final, temp_final],
    ignore_index = True
)

vitals_merged

In [None]:
# check there is no more NA
vitals_merged[vitals_merged["vital_value"].isna()]

In [None]:
# check dtypes
vitals_merged.dtypes

In [43]:
# convert dtypes to the desired
vitals_merged["vital_value"] = vitals_merged["vital_value"].apply(float)
vitals_merged["recorded_dttm"] = pd.to_datetime(vitals_merged["recorded_dttm"])

In [None]:
vitals_merged.dtypes

In [None]:
vitals_merged

In [46]:
# save
vitals_merged.to_parquet('../rclif/clif_vitals.parquet')

# `respiratory_support` table

## utils [RUN THIS]

In [None]:
resp_item_ids = get_relevant_item_ids(
    mapping_df = resp_mapping, decision_col = "variable" # , excluded_item_ids=[223848] # remove the vent brand name
    ) 

resp_item_ids

In [None]:
resp_events: pd.DataFrame = item_ids_list_to_events_df(resp_item_ids)
resp_events.head()

In [None]:
resp_events["variable"] = resp_events["itemid"].apply(lambda x: resp_mapper_dict[x])
resp_events

In [227]:
resp_columns = [
    "hospitalization_id", "recorded_dttm", "device_name", "device_category", "vent_brand_name", 
    "mode_name", "mode_category", "tracheostomy", "fio2_set", "lpm_set",
    "tidal_volume_set", "resp_rate_set", "pressure_control_set", "pressure_support_set",
    "flow_rate_set", "peak_inspiratory_pressure_set", "inspiratory_time_set",
    "peep_set", "tidal_volume_obs", "resp_rate_obs", "plateau_pressure_obs",
    "peak_inspiratory_pressure_obs", "peep_obs", "minute_vent_obs", "mean_airway_pressure_obs"
    ]

In [None]:
resp_device_rank = ["IMV", "NIPPV", "CPAP", "High Flow NC", "Face Mask", "Trach Collar", "Nasal Cannula", "Room Air", "Other"]
# testing
resp_device_rank.index("IMV")

## EDA

#### check duplicates

In [None]:
# NOTE: resp_duplicates is only assigned further down, in the "ETL > cleaning" section
# (resp_duplicates = find_duplicates(resp_events)); run that cell before this one.
resp_duplicates.query("itemid == 224696") 

In [None]:
# 2166-07-19 23:50:00	2166-07-20 01:20:08
resp_events.query("stay_id == 39214730").sort_values("time")

In [None]:
# 2166-07-20 04:34:07 -> 2166-07-21 18:06:26
resp_events.query("stay_id == 36123037").sort_values("time").head(30)

In [None]:
icustays.query("hadm_id == 26871621") 

In [None]:
# find the time ranges for the two stays, and see which stay should the 1 am measurement fall into
# so should be stay_id == 39214730
transfers.query("hadm_id == 26871621").sort_values("intime")

## ETL [RUN THIS]

### cleaning

In [267]:
# remove duplicates to prepare for pivoting 
# two kinds of duplicates to handle: by devices and other
resp_duplicates: pd.DataFrame = find_duplicates(resp_events)

In [None]:
resp_duplicates.value_counts("variable")

In [None]:
resp_device_mapper_dict

In [None]:
# 1/ deal with devices
# Work on a copy of the duplicated rows for itemid 226732, whose values are
# translated to a device category through resp_device_mapper_dict.
resp_duplicates_devices: pd.DataFrame = resp_duplicates.query("itemid == 226732").copy()
# Strip whitespace before the lookup; missing values map to None.
# An unmapped (non-missing) device string raises KeyError here.
resp_duplicates_devices["device_category"] = resp_duplicates_devices["value"].apply(
    lambda x: resp_device_mapper_dict[x.strip()] if pd.notna(x) else None
    )
# Rows without a device category cannot be ranked, so drop them.
resp_duplicates_devices.dropna(subset="device_category",inplace=True)
# rank = position of the category in resp_device_rank (smaller position = earlier in that list);
# list.index raises ValueError for a category that is not in resp_device_rank.
resp_duplicates_devices["rank"] = resp_duplicates_devices["device_category"].apply(
    lambda x: resp_device_rank.index(x.strip()))
resp_duplicates_devices

In [None]:
resp_duplicates_devices#.value_counts("variable")

In [None]:
# deal with the device case - find indices to drop
# For each (hadm_id, time, itemid) group keep the index label of the row with the
# smallest rank, i.e. the category listed earliest in resp_device_rank.
top_ranked_device_indices = resp_duplicates_devices.groupby(["hadm_id", "time", "itemid"])["rank"].idxmin()
# non top-ranked categories to be dropped
lower_ranked_device_indices = resp_duplicates_devices.index.difference(top_ranked_device_indices)
# drop the designated indices
# (assumes resp_duplicates_devices kept the index labels of resp_events -- TODO confirm
# that find_duplicates preserves the original index)
resp_events_clean = resp_events.drop(lower_ranked_device_indices)
# drop None
# (removes every row whose "value" is missing, not only device rows)
resp_events_clean.dropna(subset = "value", inplace=True) # RESUME
resp_events_clean

In [None]:
resp_events_clean.value_counts("variable")

In [None]:
# 2/ deal with duplicate vent reads:
# Re-run the duplicate search on the device-deduplicated events and drop only the
# duplicated rows recorded under stay_id 36123037. Per the EDA notes in this notebook,
# the overlapping measurement was judged to belong to stay 39214730 instead.
# NOTE(review): the stay_id is hard-coded for this one known case -- re-check
# find_duplicates(resp_events_clean) if the source data changes.
setting_duplicate_indices_to_drop = find_duplicates(resp_events_clean).query("stay_id == 36123037").index
resp_events_clean.drop(setting_duplicate_indices_to_drop, inplace = True)
resp_events_clean

In [None]:
# check all duplicates are dropped
find_duplicates(resp_events_clean)

In [276]:
# create two columns based on item_id: 
resp_events_clean["label"] = resp_events_clean["itemid"].map(item_id_to_label)
resp_events_clean["variable"] = resp_events_clean["itemid"].map(resp_mapper_dict)

In [None]:
resp_events_clean.value_counts('itemid')

### pivoting and coalescing

In [278]:
# this is for EDA
resp_wider_in_lables = resp_events_clean.pivot(
    index = ["hadm_id", "time"], 
    columns = ["variable", "label"],
    values = "value" 
)

In [None]:
resp_wider_in_lables.columns

In [None]:
# this is for actually cleaning based on item ids
resp_wider_in_ids = resp_events_clean.pivot(
    index = ["hadm_id", "time"], 
    columns = ["itemid"],
    values = "value" 
).reset_index()
resp_wider_in_ids = convert_and_sort_datetime(resp_wider_in_ids, event_type = "other")
resp_wider_in_ids.columns

In [None]:
# implement the coalesce logic:
# each target column takes, row by row, the first non-null value among its
# source item-id columns, tried in the order listed
coalesce_sources = {
    "tracheostomy": [225448, 226237],
    "lpm_set": [223834, 227287],
    "tidal_volume_obs": [224685, 224686, 224421],
    "resp_rate_set": [224688, 227581],
    "resp_rate_obs": [224690, 224422],
    "flow_rate_set": [224691, 227582],
    "peep_set": [220339, 227579],
    "mode_name": [223849, 229314, 227577], # FIXME
}

for target_col, (primary_id, *fallback_ids) in coalesce_sources.items():
    coalesced = resp_wider_in_ids[primary_id]
    for fallback_id in fallback_ids:
        coalesced = coalesced.fillna(resp_wider_in_ids[fallback_id])
    resp_wider_in_ids[target_col] = coalesced

In [None]:
# the item-id columns that were coalesced into a single variable column are redundant now;
# drop them, then rename the remaining item-id columns via resp_mapper_dict
coalesced_item_ids = [
    225448, 226237, 223834, 227287, 224685, 224686, 224421, 224688, 227581,
    224690, 224422, 224691, 227582, 220339, 227579, 223849, 229314, 227577,
]
resp_wider_cleaned = (
    resp_wider_in_ids
    .drop(columns = coalesced_item_ids)
    .rename(columns = resp_mapper_dict)
)
resp_wider_cleaned

In [None]:
# check whether there are still duplicates that remain
(resp_wider_cleaned.columns.value_counts() > 1).sum()

In [None]:
resp_wider_cleaned

In [285]:
# map _name to _category
resp_wider_cleaned["device_category"] = resp_wider_cleaned["device_name"].map(
    lambda x: resp_device_mapper_dict[x.strip()] if pd.notna(x) else None
    )
resp_wider_cleaned["mode_category"] = resp_wider_cleaned["mode_name"].map(resp_mode_mapper_dict)

In [None]:
# check for mapping -- looks like it's fine
resp_wider_cleaned.value_counts(["device_name", "device_category"], dropna = False)

In [None]:
# check for mapping
resp_counts = resp_wider_cleaned.value_counts(["mode_name", "mode_category", "device_category"], dropna = False).reset_index()
resp_counts

In [None]:
# check for mapping
mode_device_counts = resp_wider_cleaned.value_counts(["mode_category", "device_category"], dropna = False)
mode_device_counts

### rename and save

In [None]:
resp_final = rename_and_reorder_cols(
    resp_wider_cleaned, 
    rename_mapper_dict = {"hadm_id": "hospitalization_id", "time": "recorded_dttm"}, 
    new_col_order = resp_columns
)
resp_final

In [291]:
# save_to_rclif(resp_final, "respiratory_support")