In [1]:
from pathlib import Path

import duckdb
import numpy as np
import pandas as pd

import pyCLIF as pc

Loaded configuration from config.json
{'site_name': 'RUSH', 'tables_path': 'C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/', 'file_type': 'csv'}


## Base Population

#### ADT

In [2]:
# Load the ADT table and keep only the columns used by the cohort logic.
# .copy() makes the subset an independent frame, so the column assignment
# below does not write into a slice of the loaded table
# (avoids pandas' SettingWithCopyWarning).
adt = pc.load_data('clif_adt')
adt = adt[['hospitalization_id', 'in_dttm', 'location_category', 'hospital_id']].copy()

# pc.getdttm is the project's datetime helper (presumably parses/truncates the
# timestamps -- confirm in pyCLIF). pc.deftime then prints how many values do
# and do not carry hours and minutes.
adt['in_dttm'] = pc.getdttm(adt['in_dttm'])
pc.deftime(adt['in_dttm'])

Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_adt.csv
Count with hours and minutes: 1072486
Count without hours and minutes: 0


#### cohort filters

In [3]:
# --- Cohort definition -------------------------------------------------------
# Keep hospitalizations that (a) were admitted in the study years, (b) have at
# least one IMV respiratory-support record AND at least one ICU location, and
# (c) have a plausible age. The resulting ids (`required_id`) are then used to
# restrict every other table.
admission_year_min, admission_year_max = 2020, 2021
max_age_at_admission = 119

rst_col = ['hospitalization_id', 'recorded_dttm', 'device_category', 'mode_category', 'fio2_set']
rst = pc.load_data('clif_respiratory_support')
rst = rst[rst_col]

hosp = pc.load_data('clif_hospitalization')
pat = pc.load_data('clif_patient')

# dropna() removes missing ids. unique() reports missing values as NaN, which
# an `is not None` test lets through, so NaN could end up in the intersection.
imv_hosp_ids = (
    rst.loc[rst['device_category'].str.lower() == 'imv', 'hospitalization_id']
    .dropna()
    .unique()
    .tolist()
)
icu_hosp_ids = (
    adt.loc[adt['location_category'].str.lower() == 'icu', 'hospitalization_id']
    .dropna()
    .unique()
    .tolist()
)
imv_and_icu_ids = np.intersect1d(imv_hosp_ids, icu_hosp_ids)

hosp = hosp[
    hosp['admission_dttm'].dt.year.between(admission_year_min, admission_year_max)
    & hosp['hospitalization_id'].isin(imv_and_icu_ids)
    & (hosp['age_at_admission'] <= max_age_at_admission)
].reset_index(drop=True)

required_id = hosp['hospitalization_id'].unique()
print(len(required_id),' : potential cohort count')

# One row per cohort hospitalization with patient demographics attached.
# .copy() so the assignment below targets an independent frame rather than a
# column-subset slice of the merge result.
base = pd.merge(hosp, pat, on='patient_id', how='inner')[
    ['patient_id', 'hospitalization_id', 'admission_dttm', 'discharge_dttm',
     'age_at_admission', 'discharge_category', 'sex_category', 'race_category',
     'ethnicity_category']
].copy()

base['admission_dttm'] = pc.getdttm(base['admission_dttm'])

# Restrict the already-loaded tables to the cohort.
adt = adt[adt['hospitalization_id'].isin(required_id)].reset_index(drop=True)
rst = rst[rst['hospitalization_id'].isin(required_id)].reset_index(drop=True)

# The full hospitalization / patient tables are no longer needed.
del hosp, pat

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_respiratory_support.csv
Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_hospitalization.csv
Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_patient.csv
3553  : potential cohort count


#### Resp Support

In [4]:
# Restrict respiratory support to the cohort, lower-case the device label and
# derive two timestamps from the raw recorded_dttm:
#   recorded_dttm_sec - pc.getdttm(..., cutby=None)
#   recorded_dttm     - pc.getdttm(...) with its default settings
# (presumably full-precision vs. truncated timestamps -- confirm in pyCLIF).
# The lambdas are evaluated in order, so both helper calls see the raw column.
rst = (
    rst.loc[rst['hospitalization_id'].isin(required_id)]
    .reset_index(drop=True)
    .assign(
        device_category=lambda frame: frame['device_category'].str.lower(),
        recorded_dttm_sec=lambda frame: pc.getdttm(frame['recorded_dttm'], cutby=None),
        recorded_dttm=lambda frame: pc.getdttm(frame['recorded_dttm']),
    )
)

In [5]:
# Collapse rst to one row per (hospitalization_id, recorded_dttm).
# Rows are ordered latest-first on recorded_dttm_sec, so 'first' picks, for
# each column independently, the most recent non-null value in the group.
first_per_column = dict.fromkeys(['device_category', 'mode_category', 'fio2_set'], 'first')
rst = (
    rst
    .sort_values(by=['hospitalization_id', 'recorded_dttm_sec'], ascending=False)
    .groupby(['hospitalization_id', 'recorded_dttm'], as_index=False)
    .agg(first_per_column)
    .reset_index(drop=True)
)

In [6]:
# Print how many rst timestamps do / do not carry hours and minutes.
pc.deftime(rst['recorded_dttm'])

Count with hours and minutes: 1655141
Count without hours and minutes: 0


### MAC (continuous medication administration)

In [7]:
# Continuous medication administrations, restricted to the cohort and to the
# medication categories listed below.
mac = pc.load_data('clif_medication_admin_continuous')
mac_col = ['hospitalization_id', 'admin_dttm', 'med_dose', 'med_category']
mac_med_categories = [
    "norepinephrine",
    "epinephrine",
    "phenylephrine",
    "angiotensin",
    "vasopressin",
    "dopamine",
    "dobutamine",
    "milrinone",
    "isoproterenol",
]

# Lower-case the category before matching, as is done for device_category,
# assessment_category and vital_category elsewhere in this notebook. Without
# this, differently-cased labels are silently dropped by the filter.
mac['med_category'] = mac['med_category'].str.lower()

mac = mac[
    mac['hospitalization_id'].isin(required_id)
    & mac['med_category'].isin(mac_med_categories)
][mac_col].reset_index(drop=True)
mac['admin_dttm'] = pc.getdttm(mac['admin_dttm'])

Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_medication_admin_continuous.csv


### Patient_assessment

In [8]:
# Every recognised categorical outcome collapses to 1; missing stays NaN.
# NOTE(review): 'fail'/'no'/'negative' map to the same value as
# 'pass'/'yes'/'positive', so assessment_value only says "an assessment was
# documented", not its result -- confirm this is intended.
cat_values_mapping_dict = {
    **dict.fromkeys(['negative', 'fail', 'pass', 'positive', 'yes', 'no'], 1),
    None: np.nan,
    np.nan: np.nan,
}

pat_assess_cats_rquired = ['sbt_delivery_pass_fail', 'sbt_screen_pass_fail']
pat_at_col = ['hospitalization_id', 'recorded_dttm', 'numerical_value',
              'categorical_value', 'assessment_category']

pat_at = pc.load_data('clif_patient_assessments', -1)
pat_at['assessment_category'] = pat_at['assessment_category'].str.lower()

# Cohort rows for the required assessment categories only.
pat_at = pat_at.loc[
    pat_at['hospitalization_id'].isin(required_id)
    & pat_at['assessment_category'].isin(pat_assess_cats_rquired),
    pat_at_col,
].reset_index(drop=True)

pat_at['recorded_dttm'] = pc.getdttm(pat_at['recorded_dttm'])

# Prefer the numeric value; fall back to the mapped categorical flag.
pat_at['assessment_value'] = pat_at['numerical_value'].combine_first(
    pat_at['categorical_value'].str.lower().map(cat_values_mapping_dict)
)
pat_at = pat_at.drop(columns=['numerical_value', 'categorical_value'])

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Data loaded successfully from C:/Users/vchaudha/OneDrive - rush.edu/ATS2024/RUSH_CLIF/clif_patient_assessments.csv


  pat_at['assessment_value'] = pat_at['numerical_value'].combine_first(pat_at['categorical_value'])


In [10]:
# Sanity check: distinct assessment values left after the numeric/categorical merge.
pat_at['assessment_value'].unique()

array([1.])

### vitals

In [None]:
# Vitals, reduced to the needed columns and vital categories.
# NOTE(review): unlike the other tables, vit is not restricted to required_id
# and is not used by the later cells visible in this notebook -- confirm.
vit = pc.load_data('clif_vitals', -1)
vit_col = ['hospitalization_id', 'recorded_dttm', 'vital_category', 'vital_value']
vital_categories = ['map', 'heart_rate', 'sbp', 'dbp', 'spo2', 'respiratory_rate']

# assign() returns a new frame, so the lower-casing is not written into a
# column-subset slice of the loaded table (avoids SettingWithCopyWarning).
vit = vit[vit_col].assign(vital_category=lambda frame: frame['vital_category'].str.lower())
vit = vit[vit['vital_category'].isin(vital_categories)].reset_index(drop=True)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

## Wide Dataset

In [None]:
# Expose the cohort frames to DuckDB under the table names used in the SQL.
for view_name, frame in (
    ("base", base),
    ("pat_at", pat_at),
    ("rst", rst),
    ("mac", mac),
    ('adt', adt),
):
    duckdb.register(view_name, frame)

# Event timeline: the distinct (hospitalization_id, timestamp) pairs found in
# adt.in_dttm, rst.recorded_dttm, pat_at.recorded_dttm and mac.admin_dttm,
# left-joined onto `base`. A hospitalization without any event is kept, with a
# NULL event_time.
q = """
WITH
    uni_event_dttm as (
        select distinct
            hospitalization_id,
            event_time
        from
            (
                SELECT
                    hospitalization_id,
                    in_dttm AS event_time
                FROM
                    adt
                where
                    in_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    recorded_dttm AS event_time
                FROM
                    rst
                where
                    recorded_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    recorded_dttm AS event_time
                FROM
                    pat_at
                where
                    recorded_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    admin_dttm AS event_time
                FROM
                    mac
                where
                    admin_dttm is not null
            ) uni_time
    )
select distinct
    patient_id,
    a.hospitalization_id,
    admission_dttm,
    discharge_dttm,
    age_at_admission,
    discharge_category,
    sex_category,
    race_category,
    ethnicity_category,
    event_time
from
    base a
    left join uni_event_dttm b on a.hospitalization_id = b.hospitalization_id
"""
timeline_relation = duckdb.sql(q)
wide_cohort_df = timeline_relation.df()

# Print the hour/minute coverage of the combined event timestamps.
pc.deftime(wide_cohort_df['event_time'])

#### Pivots for the patient-assessment and continuous-medication (MAC) tables

In [None]:
# Pivot patient assessments to one row per combo_id, where combo_id is
# "<hospitalization_id>_<recorded_dttm formatted as YYYYMMDDHHMM>".
# Each assessment_category becomes a column holding first(assessment_value).
query = """
WITH pas_data AS (
    SELECT  distinct assessment_value ,	assessment_category	,
    hospitalization_id || '_' || strftime(recorded_dttm, '%Y%m%d%H%M') AS combo_id
    FROM pat_at where recorded_dttm is not null 
) 
PIVOT pas_data
ON assessment_category
USING first(assessment_value)
GROUP BY combo_id
"""
p_pas = duckdb.sql(query).df()

# Same pivot for continuous medications: one column per med_category holding
# the minimum med_dose recorded for that combo_id (built from admin_dttm).
query = """
WITH mac_data AS (
    SELECT  distinct med_dose ,	med_category	,
    hospitalization_id || '_' || strftime(admin_dttm, '%Y%m%d%H%M') AS combo_id
    FROM mac where admin_dttm is not null 
) 
PIVOT mac_data
ON med_category
USING min(med_dose)
GROUP BY combo_id
"""
p_mac = duckdb.sql(query).df()

#### Identifying all unique timestamps

In [None]:
# Make the timeline and the two pivots visible to DuckDB.
for view_name, frame in (
    ("expanded_df", wide_cohort_df),
    ("p_pas", p_pas),
    ("p_mac", p_mac),
):
    duckdb.register(view_name, frame)

# Give every source the same "<hospitalization_id>_<YYYYMMDDHHMM>" key and
# left-join ADT, respiratory support and both pivots onto the timeline.
# `select *` returns the repeated column names with numeric suffixes
# (e.g. combo_id_1, hospitalization_id_1); a later cell drops them by name.
q = """
  WITH
    u_rst as (
        select
            *,
            hospitalization_id || '_' || strftime (recorded_dttm, '%Y%m%d%H%M') AS combo_id
        from
            rst
    ),
    u_adt as (
        select
            *,
            hospitalization_id || '_' || strftime (in_dttm, '%Y%m%d%H%M') AS combo_id
        from
            adt
    ),
    u_expanded_df as (
        select
            *,
            hospitalization_id || '_' || strftime (event_time, '%Y%m%d%H%M') AS combo_id
        from
            expanded_df
    )
select
    *
from
    u_expanded_df a
    left join u_adt d on a.combo_id = d.combo_id
    left join u_rst e on a.combo_id = e.combo_id
    left join p_mac g on a.combo_id = g.combo_id
    left join p_pas h on a.combo_id = h.combo_id

                    
"""

all_join_df = (
    duckdb.sql(q)
    .df()
    .drop_duplicates()
)

In [None]:
# Guard: the joins must not add or remove rows relative to the event timeline.
row_counts_match = len(all_join_df) == len(wide_cohort_df)
if row_counts_match:
    # The per-table frames are no longer needed once the join is verified.
    del rst, mac, pat_at
else:
    print('Data has duplicates or same timestamp issue, contact project owner')

#### Removing join helper columns and adding forward fills

In [None]:
# Columns that only existed to support the join (suffixed duplicates, join
# keys and the per-table timestamps).
join_helper_columns = [
    'hospitalization_id_1', 'hospitalization_id_2',
    'combo_id', 'combo_id_1', 'combo_id_2', 'combo_id_3',
    'recorded_dttm', 'combo_id_4', 'in_dttm',
]
all_join_df = all_join_df.drop(columns=join_helper_columns)

all_join_df['event_time'] = pd.to_datetime(all_join_df['event_time'])
all_join_df['date'] = all_join_df['event_time'].dt.date

# Chronological order within each hospitalization; the forward fills rely on it.
all_join_df = (
    all_join_df
    .sort_values(['hospitalization_id', 'event_time'])
    .reset_index(drop=True)
)

# Carry the last known device / location forward within each hospitalization.
forward_fill_targets = {
    'device_category_ffill': 'device_category',
    'location_category_ffill': 'location_category',
}
for filled_col, source_col in forward_fill_targets.items():
    all_join_df[filled_col] = all_join_df.groupby('hospitalization_id')[source_col].ffill()

# Day counter per hospitalization: dense rank of the calendar date, so the
# first date seen is day 1.
# NOTE(review): a row with a missing event_time would yield a NaN rank and make
# astype(int) fail -- confirm no such rows can reach this cell.
day_rank = all_join_df.groupby('hospitalization_id')['date'].rank(method='dense')
all_join_df['day_number'] = day_rank.astype(int)

# Key identifying one hospitalization-day, e.g. "<hospitalization_id>_day_<n>".
all_join_df['hosp_id_day_key'] = (
    all_join_df['hospitalization_id'].astype(str)
    + '_day_'
    + all_join_df['day_number'].astype(str)
)

In [None]:
# Guarantee a stable output schema: add any expected assessment column that is
# absent, filled with NaN.
# The SBT columns only exist when the pivot found at least one such record, so
# they are guarded here as well as the SAT columns.
# NOTE(review): the SAT categories are not among the categories loaded into
# pat_at in this notebook, so the SAT columns appear to be always NaN here --
# confirm whether they should be loaded too.
columns_to_check = [
    'sat_delivery_pass_fail',
    'sat_screen_pass_fail',
    'sbt_delivery_pass_fail',
    'sbt_screen_pass_fail',
]
for col in columns_to_check:
    if col not in all_join_df.columns:
        all_join_df[col] = np.nan

In [None]:
# Write the wide study cohort. The target directory is created first because
# to_csv raises an error when the parent directory does not exist.
study_cohort_path = Path('../output/intermediate/study_cohort.csv')
study_cohort_path.parent.mkdir(parents=True, exist_ok=True)
all_join_df.to_csv(study_cohort_path, index=False)