In [None]:
import pandas as pd
import numpy as np
import duckdb
import pyCLIF as pc

#### base population

ADT

In [None]:
# Load the ADT table and keep only the columns used by later cells.
# `in_dttm` is passed through pc.getdttm before the column subset is taken.
adt = pc.load_data('clif_adt')
adt = adt.assign(in_dttm=pc.getdttm(adt['in_dttm']))[
    ['hospitalization_id', 'in_dttm', 'location_category']
]
# NOTE(review): pc.deftime is a pyCLIF helper; presumably it reports on the
# timestamp column's format — confirm against pyCLIF.
pc.deftime(adt['in_dttm'])

Respiratory Support

In [None]:
# Load respiratory support and normalise the columns later cells rely on:
# the timestamp goes through pc.getdttm, the two category columns are lower-cased.
rst = pc.load_data('clif_respiratory_support')
rst = rst.assign(
    recorded_dttm=pc.getdttm(rst['recorded_dttm']),
    device_category=rst['device_category'].str.lower(),
    mode_category=rst['mode_category'].str.lower(),
)
pc.deftime(rst['recorded_dttm'])

Hosp & Pats

In [None]:
# Build the base cohort: hospitalizations admitted in 2019-2024 that have at
# least one ICU location in ADT, joined to patient demographics.
#
# `rst` is deliberately NOT reloaded in this cell. Reloading it would discard
# the parsed `recorded_dttm` and the lower-cased category columns prepared in
# the respiratory-support cell, so every later use of `rst` would see raw data.
hosp = pc.load_data('clif_hospitalization')
pat = pc.load_data('clif_patient')

# NOTE(review): `.dt.year` assumes load_data already returns `admission_dttm`
# as a datetime column (pc.getdttm is only applied further down) — confirm.
hosp = hosp[
    (hosp['admission_dttm'].dt.year >= 2019) &
    (hosp['admission_dttm'].dt.year <= 2024) &
    (hosp['hospitalization_id'].isin(adt[adt['location_category'].str.lower()=='icu'].hospitalization_id.unique()))
].reset_index(drop=True)

# Hospitalization ids that define the cohort; every other table is filtered to these.
required_id= hosp['hospitalization_id'].unique()
print(len(required_id),' : potential cohort count')

# One row per hospitalization with the demographic columns kept for the final output.
base = pd.merge(hosp,pat,on='patient_id',how='inner')\
[['patient_id', 'hospitalization_id','admission_dttm', 'discharge_dttm','age_at_admission', 'discharge_category','sex_category','race_category', 'ethnicity_category']]

base['admission_dttm'] = pc.getdttm(base['admission_dttm'])

print(base.columns)

# Restrict the already-prepared ADT and respiratory-support frames to the cohort.
adt = adt[adt['hospitalization_id'].isin(required_id)].reset_index(drop=True)
rst = rst[rst['hospitalization_id'].isin(required_id)].reset_index(drop=True)

# The raw tables are no longer needed once `base` exists.
del hosp,pat

Medication Admin Continuous

In [None]:
# Continuous medication administrations: lower-case the grouping columns,
# build a combined "<category>_<group>" label, then keep cohort rows for the
# four medication groups of interest.
mac = pc.load_data('clif_medication_admin_continuous')
mac = mac.assign(
    med_group=mac['med_group'].str.lower(),
    med_category=mac['med_category'].str.lower(),
)
mac['med_category_group'] = mac['med_category'].astype(str) + '_' + mac['med_group'].astype(str)

mac_col = ['hospitalization_id', 'admin_dttm', 'med_dose', 'med_category_group']
mac = mac.loc[
    mac['hospitalization_id'].isin(required_id)
    & mac['med_group'].isin(['vasoactives', 'sedation', 'cardiac', 'paralytics']),
    mac_col,
].reset_index(drop=True)

mac['admin_dttm'] = pc.getdttm(mac['admin_dttm'])
pc.deftime(mac['admin_dttm'])

Patient Assessments

In [None]:
# Mapping from lower-cased categorical assessment results to numeric codes.
pas_cat_values_mapping_dict = {
    'negative': 0,
    'fail': 0,
    'pass': 1,
    'positive': 1,
    None: np.nan,
    np.nan: np.nan,
    'yes': 1,
    'no': 0,
}
# Assessment categories to keep, and the columns needed from the table.
pas_cat_col = ['gcs_total', 'rass', 'cpot_total']
pas_col = ['hospitalization_id', 'recorded_dttm', 'numerical_value', 'categorical_value', 'assessment_category']

pas = pc.load_data('clif_patient_assessments', -1)

# Cohort rows for the selected assessment categories only.
pas = pas.loc[
    pas['hospitalization_id'].isin(required_id)
    & pas['assessment_category'].isin(pas_cat_col),
    pas_col,
].reset_index(drop=True)

pas['recorded_dttm'] = pc.getdttm(pas['recorded_dttm'])
pas['categorical_value'] = pas['categorical_value'].str.lower().map(pas_cat_values_mapping_dict)
# Prefer the numeric value; fall back to the mapped categorical value where it is missing.
pas['assessment_value'] = pas['numerical_value'].combine_first(pas['categorical_value'])
pas = pas.drop(columns=['numerical_value', 'categorical_value'])

pc.deftime(pas['recorded_dttm'])

Labs

In [None]:
# Labs: keep cohort rows and the four columns used by the pivot.
labs = pc.load_data('clif_labs', -1)

labs = labs.loc[
    labs['hospitalization_id'].isin(required_id),
    ['lab_value_numeric', 'hospitalization_id', 'lab_result_dttm', 'lab_category'],
].reset_index(drop=True)

labs['lab_result_dttm'] = pc.getdttm(labs['lab_result_dttm'])
pc.deftime(labs['lab_result_dttm'])

Vitals

In [None]:
# Vitals: keep cohort rows for the selected vital categories, round the
# timestamp up to the minute and coerce values to numeric.
vit = pc.load_data('clif_vitals', -1)[['hospitalization_id', 'recorded_dttm', 'vital_category', 'vital_value']]
vit = vit.loc[
    vit['hospitalization_id'].isin(required_id)
    & vit['vital_category'].isin(['temp_c', 'heart_rate', 'sbp', 'dbp', 'spo2', 'respiratory_rate', 'height_cm', 'weight_kg'])
].reset_index(drop=True)
vit = vit.assign(
    recorded_dttm=pd.to_datetime(vit['recorded_dttm']).dt.ceil('min'),
    vital_value=pd.to_numeric(vit['vital_value'], errors='coerce'),
)

# Drop rows whose value could not be parsed or whose timestamp is missing.
numeric_vitals = vit.dropna(subset=['vital_value', 'recorded_dttm']).reset_index(drop=True)
del vit
pc.deftime(numeric_vitals['recorded_dttm'])

In [None]:
# Sanity check: list the vital categories present in numeric_vitals.
numeric_vitals.vital_category.unique()

Microbiology Culture

In [None]:
# Microbiology: keep cohort blood-culture rows and tag each one as an
# "ordered" event under the single label 'culture_blood'.
micro = pc.load_data('clif_microbiology_culture', -1)
micro = micro.loc[
    micro['hospitalization_id'].isin(required_id)
    & (micro['component_category'].str.lower() == 'culture')
    & (micro['fluid_category'].str.lower() == 'blood/buffy coat'),
    ['hospitalization_id', 'order_dttm'],
].reset_index(drop=True)
micro = micro.assign(
    micro_component_fluid_category='culture_blood',
    value='ordered',
)
micro['order_dttm'] = pc.getdttm(micro['order_dttm'])
pc.deftime(micro['order_dttm'])

Medication Admin Intermittent

In [None]:
# Intermittent medication administrations: keep cohort rows for IV antibiotics
# and build the "<category>_<group>" label used as the pivot column name.
mai = pc.load_data("clif_medication_admin_intermittent")
mai = mai[(mai['hospitalization_id'].isin(required_id))].reset_index(drop=True)

# Lower-case the grouping columns once, as is done for continuous medications,
# so the generated column names (e.g. 'vancomycin_antibiotics') do not depend
# on the casing used in the source data.
mai['med_group'] = mai['med_group'].str.lower()
mai['med_category'] = mai['med_category'].str.lower()

# na=False: rows with a missing route are treated as "not IV" instead of
# putting NaN into the boolean mask.
# NOTE(review): the pattern 'iv' matches any route name containing those two
# letters — confirm that is acceptable for this site's route names.
mai = mai[
    (mai['med_group'] == 'antibiotics') &
    (mai['med_route_name'].str.contains('iv|Intravenous', case=False, regex=True, na=False))
].reset_index(drop=True)

mai['med_category_group'] = mai['med_category'].astype(str) +'_'+ mai['med_group'].astype(str)
mai = mai[['hospitalization_id', 'admin_dttm','med_dose','med_category_group' ]]
mai['admin_dttm'] = pc.getdttm(mai['admin_dttm'])
pc.deftime(mai['admin_dttm'])

Making Wide

In [None]:
# Expose the prepared pandas frames to DuckDB under short SQL names.
# Two names are aliases: "hosp" points at `base` and "vit" at `numeric_vitals`.
duckdb.register("hosp", base)
duckdb.register("labs", labs)
duckdb.register("adt", adt)
duckdb.register("vit", numeric_vitals)
duckdb.register("pas", pas)
duckdb.register("mac", mac)
duckdb.register("micro", micro)
duckdb.register("mai", mai)

# Collect every distinct (hospitalization_id, timestamp) pair found in any of
# the source tables, then left-join those timestamps onto the base cohort so
# each hospitalization gets one row per event time.
# NOTE(review): `rst` is used in the SQL but is not registered above; presumably
# DuckDB resolves it from the Python variable of the same name — confirm.
q="""
    WITH uni_event_dttm as (
        
    select distinct hospitalization_id,event_time from (
    SELECT hospitalization_id, in_dttm AS event_time
    FROM adt where in_dttm is not null
    
    UNION
    
    SELECT hospitalization_id, lab_result_dttm AS event_time
    FROM labs where lab_result_dttm is not null
    
    UNION
    
    SELECT hospitalization_id, recorded_dttm AS event_time
    FROM vit where recorded_dttm is not null 
    
    UNION
    
    SELECT hospitalization_id, recorded_dttm AS event_time
    FROM rst where recorded_dttm is not null

    UNION
    
    SELECT hospitalization_id, recorded_dttm AS event_time
    FROM pas where recorded_dttm is not null
    
    UNION
    
    SELECT hospitalization_id, admin_dttm AS event_time
    FROM mac where admin_dttm is not null
        
    UNION
    
    SELECT hospitalization_id, admin_dttm AS event_time
    FROM mai where admin_dttm is not null
        
    UNION
    
    SELECT hospitalization_id, order_dttm AS event_time
    FROM micro where order_dttm is not null

    ) uni_time
    )
    select distinct * from hosp a left join uni_event_dttm b on a.hospitalization_id=b.hospitalization_id

"""
# Materialise the event spine as a pandas frame.
expanded_df = duckdb.sql(q).df()
pc.deftime(expanded_df['event_time'])

Pivots

In [None]:
# Preview the intermittent-medication frame before pivoting.
mai.head()

In [14]:
# Pivot intermittent medications to wide form: one row per `combo_id`
# (hospitalization_id + timestamp formatted to the minute), one column per
# med_category_group, cell value = first(med_dose).
# NOTE(review): first() is applied without an explicit ordering, so when several
# distinct doses share the same minute the chosen one may be arbitrary — confirm.
query = """
WITH mai_data AS (
    SELECT  distinct med_category_group,	med_dose	,
    hospitalization_id || '_' || strftime(admin_dttm, '%Y%m%d%H%M') AS combo_id
    FROM mai where admin_dttm is not null 
)
PIVOT mai_data
ON med_category_group
USING first(med_dose)
GROUP BY combo_id
"""
p_mai = duckdb.sql(query).df()

In [None]:
# Preview the pivoted intermittent-medication frame.
p_mai.head()

In [None]:
# Preview the blood-culture order frame before pivoting.
micro.head()

In [17]:
# Pivot blood-culture orders to wide form: one row per `combo_id`
# (hospitalization_id + order time formatted to the minute), one column per
# micro_component_fluid_category, cell value = first(value).
query = """
WITH micro_data AS (
    SELECT  distinct micro_component_fluid_category,	value	,
    hospitalization_id || '_' || strftime(order_dttm, '%Y%m%d%H%M') AS combo_id
    FROM micro where order_dttm is not null 
)
PIVOT micro_data
ON micro_component_fluid_category
USING first(value)
GROUP BY combo_id
"""
p_micro = duckdb.sql(query).df()

In [None]:
# Preview the pivoted blood-culture frame.
p_micro.head()

In [None]:
# Preview the cleaned vitals frame before pivoting.
numeric_vitals.head()

In [None]:
# Pivot vitals to wide form: one row per `combo_id` (hospitalization_id +
# timestamp formatted to the minute), one column per vital_category, cell
# value = mean of the distinct values recorded in that minute.
query = """
WITH vital_data AS (
    SELECT  distinct vital_category,	vital_value	,
    hospitalization_id || '_' || strftime(recorded_dttm, '%Y%m%d%H%M') AS combo_id
    FROM numeric_vitals where recorded_dttm is not null 
)
PIVOT vital_data
ON vital_category
USING mean(vital_value)
GROUP BY combo_id
"""
p_numeric_vitals = duckdb.sql(query).df()

# Derive mean arterial pressure as (sbp + 2*dbp) / 3, rounded to a whole number,
# when the pivot did not already produce a `map` column. Missing sbp or dbp
# propagates through the arithmetic, leaving `map` as NaN for that row.
if 'map' not in p_numeric_vitals.columns:
    p_numeric_vitals['map'] = (
        (p_numeric_vitals['sbp'] + 2 * p_numeric_vitals['dbp']) / 3
    ).round(0)

In [None]:
# Preview the pivoted vitals frame (including the derived `map` column).
p_numeric_vitals.head()

In [None]:
# Preview the labs frame before pivoting.
labs.head()

In [None]:
# Pivot labs to wide form: one row per `combo_id` (hospitalization_id + result
# time formatted to the minute), one column per lab_category, cell value =
# mean of the distinct numeric results in that minute.
query = """
WITH labs_data AS (
    SELECT  distinct lab_value_numeric,	lab_category	,
    hospitalization_id || '_' || strftime(lab_result_dttm, '%Y%m%d%H%M') AS combo_id
    FROM labs where lab_result_dttm is not null 
) 
PIVOT labs_data
ON lab_category
USING mean(lab_value_numeric)
GROUP BY combo_id
"""
p_labs = duckdb.sql(query).df()

In [None]:
# Preview the pivoted labs frame.
p_labs.head()

In [None]:
# Preview the patient-assessment frame before pivoting.
pas.head()

In [None]:
# Pivot patient assessments to wide form: one row per `combo_id`
# (hospitalization_id + timestamp formatted to the minute), one column per
# assessment_category, cell value = first(assessment_value).
# NOTE(review): first() is applied without an explicit ordering — confirm this
# is acceptable when several distinct values share the same minute.
query = """
WITH pas_data AS (
    SELECT  distinct assessment_value ,	assessment_category	,
    hospitalization_id || '_' || strftime(recorded_dttm, '%Y%m%d%H%M') AS combo_id
    FROM pas where recorded_dttm is not null 
) 
PIVOT pas_data
ON assessment_category
USING first(assessment_value)
GROUP BY combo_id
"""
p_pas = duckdb.sql(query).df()
# Replace every null-like cell with np.nan.
p_pas = p_pas.where(pd.notnull(p_pas), np.nan)

In [None]:
# Preview the pivoted patient-assessment frame.
p_pas.head()

In [None]:
# Preview the continuous-medication frame before pivoting.
mac.head()

In [29]:
# Pivot continuous medications to wide form: one row per `combo_id`
# (hospitalization_id + timestamp formatted to the minute), one column per
# med_category_group, cell value = min(med_dose) within that minute.
query = """
WITH mac_data AS (
    SELECT  distinct med_dose ,	med_category_group,
    hospitalization_id || '_' || strftime(admin_dttm, '%Y%m%d%H%M') AS combo_id
    FROM mac where admin_dttm is not null 
) 
PIVOT mac_data
ON med_category_group
USING min(med_dose)
GROUP BY combo_id
"""
p_mac = duckdb.sql(query).df()

In [None]:
# Preview the pivoted continuous-medication frame.
p_mac.head()

In [None]:
# Expose the event spine and the per-table pivots to DuckDB.
duckdb.register("expanded_df", expanded_df)
duckdb.register("p_labs", p_labs)
duckdb.register("p_numeric_vitals", p_numeric_vitals)
duckdb.register("p_pas", p_pas)
duckdb.register("p_mac", p_mac)
duckdb.register("p_micro", p_micro)
duckdb.register("p_mai", p_mai)

# Build the hospitalization_id + minute `combo_id` key on ADT, respiratory
# support and the event spine, then left-join every table onto the spine by
# that key to produce one wide frame.
# NOTE(review): `rst` is not registered in this cell; presumably DuckDB resolves
# it from the Python variable of the same name — confirm.
q="""
    WITH u_adt as (select *, hospitalization_id || '_' || strftime(in_dttm, '%Y%m%d%H%M') AS combo_id from adt),

    u_rst as (select *, hospitalization_id || '_' || strftime(recorded_dttm, '%Y%m%d%H%M') AS combo_id from rst),

    u_expanded_df as (select *, hospitalization_id || '_' || strftime(event_time, '%Y%m%d%H%M') AS combo_id from expanded_df)

    select * from  u_expanded_df a 

    left join u_adt b on a.combo_id=b.combo_id   

        left join p_labs c on a.combo_id=c.combo_id

            left join p_numeric_vitals d on a.combo_id=d.combo_id 

                left join u_rst e on a.combo_id=e.combo_id 

                        left join p_mac g on a.combo_id=g.combo_id 
                        
                            left join p_pas h on a.combo_id=h.combo_id 

                                left join p_mai i on a.combo_id=i.combo_id 

                                    left join p_micro j on a.combo_id=j.combo_id 
"""

# Exact duplicate rows produced by the joins are dropped.
all_join_df = duckdb.sql(q).df().drop_duplicates()

In [None]:
# The wide frame should have exactly one row per row of the event spine;
# a different count means the joins fanned out or collapsed rows.
if len(all_join_df) == len(expanded_df):
    print('thank you !!!')
else:
    print('Data has duplicates or same timestamp, contact project owner')

In [None]:
# List every column of the wide frame.
list(all_join_df.columns)

In [36]:
# Columns written to the study-cohort CSV, in output order.
# NOTE(review): the grouping comments below describe where each column is
# expected to come from; the actual names depend on the category values present
# in the source tables — confirm per site.
cohort_col = \
[
# Hospitalization / patient columns carried over from the base cohort.
'patient_id',
 'hospitalization_id',
 'admission_dttm',
 'discharge_dttm',
 'age_at_admission',
 'discharge_category',
 'sex_category',
 'race_category',
 'ethnicity_category',

 # Event timestamp from the event spine.
 'event_time',


 # ADT columns.
 'in_dttm',
 'location_category',

 # Lab columns (expected from the labs pivot).
 'albumin',
 'alkaline_phosphatase',
 'alt',
 'anion_gap',
 'ast',
 'basophil_absolute',
 'basophil_percent',
 'bicarbonate',
 'bicarbonate_arterial',
 'bicarbonate_venous',
 'bilirubin_conjugated',
 'bilirubin_total',
 'bun',
 'calcium_ionized',
 'calcium_total',
 'chloride',
 'creatinine',
 'crp',
 'eosinophils_absolute',
 'eosinophils_percent',
 'esr',
 'ferritin',
 'glucose_fingerstick',
 'glucose_serum',
 'hematocrit',
 'hemoglobin',
 'inr',
 'lactate',
 'ldh',
 'lymphocytes_absolute',
 'lymphocytes_percent',
 'magnesium',
 'monocytes_absolute',
 'monocytes_percent',
 'neutrophils_absolute',
 'neutrophils_percent',
 'paco2',
 'pco2_arterial',
 'pco2_venous',
 'ph_arterial',
 'ph_venous',
 'phosphate',
 'platelet_count',
 'po2_arterial',
 'potassium',
 'procalcitonin',
 'pt',
 'ptt',
 'rdw',
 'so2_arterial',
 'so2_central_venous',
 'so2_mixed_venous',
 'sodium',
 'total_protein',
 'wbc',

 # Vital columns (from the vitals pivot, plus the derived 'map').
 'dbp',
 'heart_rate',
 'height_cm',
 'respiratory_rate',
 'sbp',
 'spo2',
 'temp_c',
 'weight_kg',
 'map',

 # Respiratory-support columns (expected from the respiratory support table).
 'P/F',
 'device_name',
 'fio2_set',
 'flow_rate_set',
 'inspiratory_time_set',
 'lpm_set',
 'mean_airway_pressure_obs',
 'minute_vent_obs',
 'peak_inspiratory_pressure_obs',
 'peak_inspiratory_pressure_set',
 'peep_obs',
 'peep_set',
 'plateau_pressure_obs',
 'pressure_support_set',
 'resp_rate_obs',
 'resp_rate_set',
 'tidal_volume_obs',
 'tidal_volume_set',
 'device_category',
 'mode_category',
 'tracheostomy',
 'pressure_control_set',

 # Continuous-medication columns, named "<med_category>_<med_group>".
 'adenosine_cardiac',
 'amiodarone_cardiac',
 'cisatracurium_paralytics',
 'dexmedetomidine_sedation',
 'diltiazem_cardiac',
 'dobutamine_vasoactives',
 'dopamine_vasoactives',
 'epinephrine_vasoactives',
 'esmolol_cardiac',
 'fentanyl_sedation',
 'hydromorphone_sedation',
 'isoproterenol_vasoactives',
 'ketamine_sedation',
 'labetalol_cardiac',
 'lidocaine_cardiac',
 'lorazepam_sedation',
 'midazolam_sedation',
 'milrinone_vasoactives',
 'morphine_sedation',
 'nicardipine_cardiac',
 'nitroprusside_cardiac',
 'norepinephrine_vasoactives',
 'papaverine_cardiac',
 'pentobarbital_sedation',
 'phenylephrine_vasoactives',
 'procainamide_cardiac',
 'propofol_sedation',
 'remifentanil_sedation',
 'rocuronium_paralytics',
 'vasopressin_vasoactives',

 # Patient-assessment columns.
 'cpot_total',
 'gcs_total',
 'rass',

 # Intermittent antibiotic columns, named "<med_category>_<med_group>".
 # NOTE(review): 'cfepime_antibiotics' and the three piperacillin/tazobactam
 # spellings look like source-data variants of the same drugs — confirm they
 # are intended as separate columns.
 'ampicillin-sulbact_antibiotics',
 'azithromycin_antibiotics',
 'cefazolin_antibiotics',
 'cefepime_antibiotics',
 'cefoxitin_antibiotics',
 'ceftolozane-tazobactam_antibiotics',
 'cfepime_antibiotics',
 'ciprofloxacin_antibiotics',
 'clindamycin_antibiotics',
 'colistimethate_antibiotics',
 'daptomycin_antibiotics',
 'doxycycline_antibiotics',
 'gentamicin_antibiotics',
 'levofloxacin_antibiotics',
 'linezolid_antibiotics',
 'meropenem-vaborbactam_antibiotics',
 'meropenem_antibiotics',
 'metronidazole_antibiotics',
 'moxifloxacin_antibiotics',
 'penicillin_antibiotics',
 'piperacillin-tazob_antibiotics',
 'piperacillin-tazobactam_antibiotics',
 'piperacillin/tazobactam_antibiotics',
 'tobramycin_antibiotics',
 'vancomycin_antibiotics',

 # Blood-culture order flag.
 'culture_blood'
]

In [38]:
# Write the selected cohort columns to the intermediate CSV (no index column).
# TODO(review): the destination is an absolute, machine-specific path —
# consider making it configurable.
(
    all_join_df.loc[:, cohort_col]
    .to_csv("C:/Users/vchaudha/Downloads/ATS2024/CLIF-Ltach-prediction/output/intermediate/study_cohort_ltach.csv", index=False)
)