In [None]:
# imv , nippv, hf, face mask, nc, room air 

In [1]:
import pandas as pd
import numpy as np
import duckdb
import pyCLIF as pc

Loaded configuration from config.json
{'site_name': 'UCMC', 'tables_path': '/Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/', 'file_type': 'parquet'}


### Base Population

In [2]:
adt = pc.load_data('clif_adt')
adt= adt[['hospitalization_id','in_dttm','location_category']]
adt['in_dttm'] = pc.getdttm(adt['in_dttm'])
pc.deftime(adt['in_dttm'])

Data loaded successfully from /Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/clif_adt.parquet
Count with hours and minutes: 694833
Count without hours and minutes: 0


In [5]:
rst_col = [ 'hospitalization_id', 'recorded_dttm', 'device_category']
rst = pc.load_data('clif_respiratory_support')
rst = rst[rst_col]

# Identify conflicts and take the first non-null device_category per group
# Create a boolean indicator for non-null 'device_category'
rst['device_category_notnull'] = rst['device_category'].notnull()

# Sort the DataFrame so that non-null 'device_category' values come first within each group
rst = rst.sort_values(
    ['hospitalization_id', 'recorded_dttm', 'device_category_notnull'],
    ascending=[True, True, False]
)

# Drop duplicates, keeping the first occurrence (which has the non-null 'device_category')
rst = rst.drop_duplicates(subset=['hospitalization_id', 'recorded_dttm'], keep='first')

# Drop the temporary indicator column
rst = rst.drop(columns=['device_category_notnull'])

# Reset the index if desired
rst = rst.reset_index(drop=True)

In [8]:
hosp = pc.load_data('clif_hospitalization')
pat = pc.load_data('clif_patient')

imv_hosp_ids = rst[rst['device_category'].str.lower()=='imv'].hospitalization_id.unique()
icu_hosp_ids = adt[adt['location_category'].str.lower()=='icu'].hospitalization_id.unique()

hosp = hosp[
    (hosp['admission_dttm'].dt.year >= 2020) &
    (hosp['admission_dttm'].dt.year <= 2021) &
    (hosp['hospitalization_id'].isin(np.intersect1d(imv_hosp_ids, icu_hosp_ids))) &
    (hosp['age_at_admission'] <=119)
].reset_index(drop=True)

required_id= hosp['hospitalization_id'].unique()
print(len(required_id),' : potential cohort count')

base = pd.merge(hosp,pat,on='patient_id',how='inner')\
[['patient_id', 'hospitalization_id','admission_dttm', 'discharge_dttm','age_at_admission', 'discharge_category','sex_category','race_category', 'ethnicity_category']]

base['admission_dttm'] = pc.getdttm(base['admission_dttm'])

base.columns

adt = adt[adt['hospitalization_id'].isin(required_id)].reset_index(drop=True)
rst = rst[rst['hospitalization_id'].isin(required_id)].reset_index(drop=True)

del hosp,pat

Data loaded successfully from /Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/clif_hospitalization.parquet
Data loaded successfully from /Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/clif_patient.parquet
5390  : potential cohort count


### Resp

In [9]:
rst = rst[rst['hospitalization_id'].isin(required_id)].reset_index(drop=True)
rst['recorded_dttm'] = pc.getdttm(rst['recorded_dttm'])
rst['device_category'] = rst['device_category'].str.lower()
# check2 = rst[rst.duplicated(subset=['hospitalization_id','recorded_dttm', 'device_category'], keep=False)]

### MAC

In [10]:
mac = pc.load_data('clif_medication_admin_continuous')
mac_col = ['hospitalization_id', 'admin_dttm','med_dose','med_category']
mac = mac[(mac['hospitalization_id'].isin(required_id)) & (mac['med_category'].isin(['fentanyl', 'propofol', 'lorazepam', 'midazolam','hydromorphone','morphine']))][mac_col].reset_index(drop=True)
mac['admin_dttm'] = pc.getdttm(mac['admin_dttm'])

Data loaded successfully from /Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/clif_medication_admin_continuous.parquet


### Patient_assessment

In [11]:
cat_values_mapping_dict = {
    'negative': 0,
    'fail': 0,
    'pass': 1,
    'positive': 1,
    None: np.nan ,
    np.nan : np.nan,
    'yes':1,
    'no':0
}

pat_assess_cats_rquired = [ 'rass',
                            'sat_delivery_pass_fail',
                            'sat_screen_pass_fail']

pat_at = pc.load_data('clif_patient_assessments',-1)
pat_at_col = ['hospitalization_id', 'recorded_dttm','numerical_value', 'categorical_value','assessment_category']
pat_at = pat_at[(pat_at['hospitalization_id'].isin(required_id)) & (pat_at['assessment_category'].isin(pat_assess_cats_rquired)) ][pat_at_col].reset_index(drop=True)
pat_at['recorded_dttm'] = pc.getdttm(pat_at['recorded_dttm'])
pat_at['categorical_value'] = pat_at['categorical_value'].str.lower().map(cat_values_mapping_dict)
pat_at['assessment_value'] = pat_at['numerical_value'].combine_first(pat_at['categorical_value'])
pat_at.drop(columns=['numerical_value','categorical_value'],inplace=True)

Data loaded successfully from /Users/kavenchhikara/Desktop/CLIF/CLIF-UCMC/rclif/c19/clif_patient_assessments.parquet


In [12]:
pat_at['assessment_value'].unique()

array([], dtype=float64)

### Wide Dataset

In [13]:
duckdb.register("base", base)
duckdb.register("pat_at", pat_at)
duckdb.register("rst", rst)
duckdb.register("mac", mac)
duckdb.register('adt',adt)

q="""
WITH
    uni_event_dttm as (
        select distinct
            hospitalization_id,
            event_time
        from
            (
                SELECT
                    hospitalization_id,
                    in_dttm AS event_time
                FROM
                    adt
                where
                    in_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    recorded_dttm AS event_time
                FROM
                    rst
                where
                    recorded_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    recorded_dttm AS event_time
                FROM
                    pat_at
                where
                    recorded_dttm is not null
                UNION
                SELECT
                    hospitalization_id,
                    admin_dttm AS event_time
                FROM
                    mac
                where
                    admin_dttm is not null
            ) uni_time
    )
select distinct
    patient_id,
    a.hospitalization_id,
    admission_dttm,
    discharge_dttm,
    age_at_admission,
    discharge_category,
    sex_category,
    race_category,
    ethnicity_category,
    event_time
from
    base a
    left join uni_event_dttm b on a.hospitalization_id = b.hospitalization_id
"""
wide_cohort_df = duckdb.sql(q).df()
pc.deftime(wide_cohort_df['event_time'])

Count with hours and minutes: 1772658
Count without hours and minutes: 0


In [14]:
query = """
WITH pas_data AS (
    SELECT  distinct assessment_value ,	assessment_category	,
    hospitalization_id || '_' || strftime(recorded_dttm, '%Y%m%d%H%M') AS combo_id
    FROM pat_at where recorded_dttm is not null 
) 
PIVOT pas_data
ON assessment_category
USING first(assessment_value)
GROUP BY combo_id
"""
p_pas = duckdb.sql(query).df()

query = """
WITH mac_data AS (
    SELECT  distinct med_dose ,	med_category	,
    hospitalization_id || '_' || strftime(admin_dttm, '%Y%m%d%H%M') AS combo_id
    FROM mac where admin_dttm is not null 
) 
PIVOT mac_data
ON med_category
USING min(med_dose)
GROUP BY combo_id
"""
p_mac = duckdb.sql(query).df()

In [15]:
duckdb.register("expanded_df", wide_cohort_df)
duckdb.register("p_pas", p_pas)
duckdb.register("p_mac", p_mac)

q="""
  WITH
    u_rst as (
        select
            *,
            hospitalization_id || '_' || strftime (recorded_dttm, '%Y%m%d%H%M') AS combo_id
        from
            rst
    ),
    u_adt as (
        select
            *,
            hospitalization_id || '_' || strftime (in_dttm, '%Y%m%d%H%M') AS combo_id
        from
            adt
    ),
    u_expanded_df as (
        select
            *,
            hospitalization_id || '_' || strftime (event_time, '%Y%m%d%H%M') AS combo_id
        from
            expanded_df
    )
select
    *
from
    u_expanded_df a
    left join u_adt d on a.combo_id = d.combo_id
    left join u_rst e on a.combo_id = e.combo_id
    left join p_mac g on a.combo_id = g.combo_id
    left join p_pas h on a.combo_id = h.combo_id

                    
"""

all_join_df = duckdb.sql(q).df().drop_duplicates()

In [16]:
if all_join_df.shape[0] != wide_cohort_df.shape[0]:
    print('Data has duplicates or same timestamp, contact project owner')
else:
    del rst,mac,pat_at

Data has duplicates or same timestamp, contact project owner


In [17]:
all_join_df.columns

Index(['patient_id', 'hospitalization_id', 'admission_dttm', 'discharge_dttm',
       'age_at_admission', 'discharge_category', 'sex_category',
       'race_category', 'ethnicity_category', 'event_time',
       'location_category', 'device_category', 'fentanyl', 'hydromorphone',
       'lorazepam', 'midazolam', 'morphine', 'propofol', 'date',
       'device_category_ffill', 'location_category_ffill', 'day_number',
       'hosp_id_day_key'],
      dtype='object')

In [17]:
check = all_join_df[all_join_df.duplicated(subset=['hospitalization_id','event_time'], keep=False)]

In [18]:
filtered_rst = rst[rst['hospitalization_id'] == '11633026']
filtered_rst


Unnamed: 0,hospitalization_id,recorded_dttm,device_category
97312,11633026,2021-07-09 10:39:00,nasal cannula
97313,11633026,2021-07-09 10:39:00,room air
97314,11633026,2021-07-09 13:41:00,
97315,11633026,2021-07-09 14:00:00,imv
97316,11633026,2021-07-09 14:15:00,
...,...,...,...
97465,11633026,2021-07-13 15:51:00,nasal cannula
97466,11633026,2021-07-13 18:00:00,
97467,11633026,2021-07-13 19:00:00,
97468,11633026,2021-07-13 22:00:00,


In [12]:
all_join_df.columns

Index(['patient_id', 'hospitalization_id', 'admission_dttm', 'discharge_dttm',
       'age_at_admission', 'discharge_category', 'sex_category',
       'race_category', 'ethnicity_category', 'event_time', 'combo_id',
       'hospitalization_id_1', 'in_dttm', 'location_category', 'combo_id_1',
       'hospitalization_id_2', 'recorded_dttm', 'device_category',
       'combo_id_2', 'combo_id_3', 'fentanyl', 'hydromorphone', 'lorazepam',
       'midazolam', 'morphine', 'propofol', 'combo_id_4'],
      dtype='object')

In [13]:
all_join_df.drop(columns= ['hospitalization_id_1','hospitalization_id_2','combo_id','combo_id_1', 'combo_id_2' ,'combo_id_3','recorded_dttm','combo_id_4','in_dttm'], axis = 1,inplace=True)

all_join_df['event_time'] = pd.to_datetime(all_join_df['event_time'])
all_join_df['date'] = all_join_df['event_time'].dt.date

all_join_df = all_join_df.sort_values(['hospitalization_id', 'event_time']).reset_index(drop=True)

all_join_df['device_category_ffill'] = all_join_df.groupby('hospitalization_id')['device_category'].ffill()
all_join_df['location_category_ffill'] = all_join_df.groupby('hospitalization_id')['location_category'].ffill()
# Assign day numbers to each 'hospitalization_id'
all_join_df['day_number'] = all_join_df.groupby('hospitalization_id')['date'].rank(method='dense').astype(int)

# Create the combo_key by combining 'hospitalization_id' and 'day_number'
all_join_df['hosp_id_day_key'] = all_join_df['hospitalization_id'].astype(str) + '_day_' + all_join_df['day_number'].astype(str)

In [15]:
all_join_df.to_csv('../output/intermediate/study_cohort.csv', index=False)