---
title: "Constructing an Analytic Dataset from CLIF"
author: "Rachel Baccile"
date: "2024-2-7"
categories: [longitudinal-dataset, analysis, SIPA]
tbl-colwidths: [75,25]
format:
  html:
    theme: sandstone
    css: styles.css  #
---

## 1. Cohort Identification
**Cohort Definition**: Data from UCMC COVID Datamart. Adult patients (18+) admitted January 1 2020 – March 31, 2022 and who recieved life support. Life support was defined as:

- Received vasoactive medications for shock, **or**
- Received invasive or non-invasive mechanical ventilation, **or**
- Received oxygen therapy with PaO2/FiO2 < 200 (S/F < 179 if no P/F measured)

Patients were excluded in they were discharged from the ED (including to hospice) and if they received < 6 hours of life support.

**CLIF tables used**:

- RCLIF_limited_identifers (admission date, discharge date)
- RCLIF_encounter_demographics_dispo (age at admission, disposition)
- RCLIF_adt (location in hospital)
- RCLIF_resp_support (respiratory support device, FiO2)
- RCLIF_labs (PaO2)
- RCLIF_vitals (SpO2)
- RCLIF_medication_adm_continuous (vasopressors)

### 1.1 Identify Adults during time period that were admitted to the hospital
1. Use RCLIF_limited_identifers to identify admissions in the time period of interest (variable: admission_dttm)
2. Use RCLIF_encounter_demographics_dispo to filter for only those 18+ at time of admission (variable: age_at_admission)
3. Use the ADT table to filter out those only seen in the ER (variable: dept_name)
4. "Explode" the data between admission_dttm and discharge_dttm to get hourly blocks for the duration of admission

```python

######## Load in limited IDs for admission dates in time period
limited_ids = spark.read.option("header",True).csv("/project2/wparker/SIPA_data/RCLIF_limited_identifers.csv")
limited_ids = limited_ids.withColumn('admission_datetime',f.to_timestamp('admission_date','yyyy-MM-dd HH:mm:ss'))
limited_ids = limited_ids.withColumn('discharge_datetime',f.to_timestamp('discharge_date','yyyy-MM-dd HH:mm:ss'))
limited_ids = limited_ids.select('C19_HAR_ID', 'admission_datetime', 'discharge_datetime', 'zip_code').distinct()

# Filter to time period
limited_ids = limited_ids.filter(((f.col('admission_datetime')>='2020-03-01') & 
                   (f.col('admission_datetime')<='2022-03-31')))
limited_ids = limited_ids.filter((f.col('discharge_datetime')>=f.col('admission_datetime')))
limited_ids = limited_ids.filter(f.col('discharge_datetime').isNotNull())
limited_ids = limited_ids.filter(f.col('admission_datetime').isNotNull())


######## Load in encounters for age and discharge disposition
demo_disp = spark.read.option("header",True).csv("/project2/wparker/SIPA_data/RCLIF_encounter_demographics_dispo.csv")
demo_disp = demo_disp.withColumn("age_at_admission",demo_disp.age_at_admission.cast('double'))
demo_disp = demo_disp.select('C19_PATIENT_ID', 'C19_HAR_ID', 'age_at_admission', 'disposition')

# Filter to adults only
demo_disp = demo_disp.filter(f.col('age_at_admission')>=18)

adults_in_time = limited_ids.join(demo_disp, on='C19_HAR_ID', how='inner')

# Exclude people only in ER
adt = spark.read.option("header",True).csv("/project2/wparker/SIPA_data/RCLIF_adt.csv")

adult_ids = adults_in_time.select('C19_HAR_ID')
adt_adults = adult_ids.join(adt, on='C19_HAR_ID', how='inner')
adt_adults = adt_adults.select('C19_HAR_ID', 'dept_name').distinct()
adt_adults = adt_adults.withColumn('count', f.lit(1))
adt_adults_wide = adt_adults.groupBy('C19_HAR_ID').pivot('dept_name').agg(f.sum('count'))
adt_adults_wide = adt_adults_wide.filter(f.col('ICU').isNotNull() |
                                         f.col('NA').isNotNull() |
                                         f.col('OR').isNotNull() |
                                         f.col('Ward').isNotNull())
adt_adults_wide = adt_adults_wide.select('C19_HAR_ID').distinct()

adults_in_time = adt_adults_wide.join(adults_in_time, on='C19_HAR_ID', how='left')
har_ids = adults_in_time.select('C19_HAR_ID', 'admission_datetime', 'discharge_datetime')

## Explode between admission and discharge to get all hourly timestamps
har_ids_hours = har_ids.withColumn('txnDt', f.explode(f.expr('sequence(admission_datetime, discharge_datetime, interval 1 hour)')))
har_ids_hours = har_ids_hours.withColumn('meas_hour', f.hour(f.col('txnDt')))
har_ids_hours = har_ids_hours.withColumn('meas_date', f.to_date(f.col('txnDt')))
har_ids_hours = har_ids_hours.select('C19_HAR_ID', 'txnDt', 'meas_date', 'meas_hour').orderBy('C19_HAR_ID', 'txnDt')

```

### 1.2 Determine if patients meet respiratory criteria
Use RCLIF_resp_support table to identify instances of invasive or non-invasive mechanical ventilation. If no invasive or non-invasive mechanical ventilation, use RCLIF_resp_support, RCLIF_labs, and RCLIF_vitals to calculate PaO2/FiO2 or Sp02.

#### 1.2.1 Prepare RCLIF_resp_support
1. Inner join RCLIF_resp_support to adults hospitalized in time period, identified in previous code block.
2. Filter out invalid FiO2 measurements
    - N removed: 3,440 observations; 376 unique encounters
3. Remove instances of CPAP
    - CPAP defined as device_name == "NIPPV" and without an FiO2 measurement, or when mode_name == "CPAP"
    - N removed: 357,851 observations; 128 unique encounters
4. If device_name was missing, fill in based on the following logic:
    - If fio2 ==.21 and lpm, peep, and set_volume are null, then device_name == Room Air
    - If fio2 ==.21, lpm==0, and peep and set_volume are null, then device_name == Room Air
    - If fio2, peep, and set_volume are null and lpm <=20 then device_name == Nasal Cannula
    - If fio2, peep, and set_volume are null and lpm >20 then device_name == High Flow NC
    - If fio2 and lpm are null and set_volumne is not null, then device_name == Vent
5. If device_name was Nasal Cannula but lpm > 20, then device_name== High Flow NC
6. If fio2 is missing and device_name == Room Air, then fio2== 0.21
7. If fio2 is missing and device_name == Nasal Cannula, then fio2 was calculated as: ( 0.24 + (0.04 * lpm) )

```python
######### Get worst FiO2

## Read in respiratory support table
resp_full = spark.read.option("header",True).csv('/project2/wparker/SIPA_data/RCLIF_resp_support.csv')
resp_full = resp_full.withColumn('recorded_time',f.to_timestamp('recorded_time','yyyy-MM-dd HH:mm:ss'))
resp_full = resp_full.withColumn("fio2",resp_full.fio2.cast('double'))
resp_full = resp_full.withColumn("lpm",resp_full.lpm.cast('double'))
resp_full = resp_full.withColumn("peep",resp_full.peep.cast('double'))
resp_full = resp_full.withColumn("set_volume",resp_full.peep.cast('double'))


## Filter to only adults in time frame
resp_full = har_ids.join(resp_full, on='C19_HAR_ID', how='inner')
##Filter to only variables we need
resp_full = resp_full.select('C19_PATIENT_ID', 'C19_HAR_ID', 'recorded_time', 'device_name', 'mode_name', 'mode_category','lpm', 'fio2', 'peep', 'set_volume')

## Filter for valid values or Null
resp_full = resp_full.filter((((f.col('fio2')>=0.21) &
                              (f.col('fio2')<=1)) |
                              (f.col('fio2').isNull())))
resp_full.select(f.countDistinct("C19_PATIENT_ID")).show()

## Filter out people on NIPPV without a FiO2 measurement--CPAP
nippv_test = resp_full.filter(f.col('device_name')=="NIPPV")
nippv_test.summary().show()

resp_full = resp_full.filter(~((f.col('device_name')=='NIPPV') &
                              (f.col('fio2').isNull())&
                              (f.col('lpm').isNull()) &
                              (f.col('peep').isNull())))

resp_full = resp_full.filter(~f.col('mode_name').rlike(r'CPAP'))
resp_full.select(f.countDistinct("C19_PATIENT_ID")).show()

## Replace "NA" strings with actual Nulls
resp_full.select('device_name').distinct().show()
resp_full = resp_full.withColumn('device_name', f.when(~f.col('device_name').rlike(r'NA'), f.col('device_name')))

## Try to fill in some of the null device names based on other values
resp_full = resp_full.withColumn('device_name_2', f.expr(
        """
        CASE
        WHEN device_name IS NOT NULL THEN device_name
        WHEN device_name IS NULL AND fio2 ==.21 AND lpm IS NULL AND peep IS NULL AND set_volume IS NULL THEN 'Room Air'
        WHEN device_name IS NULL AND fio2 IS NULL AND lpm ==0 AND peep IS NULL AND set_volume IS NULL THEN 'Room Air'
        WHEN device_name IS NULL AND fio2 IS NULL AND lpm <=20 AND peep IS NULL AND set_volume IS NULL THEN 'Nasal Cannula'
        WHEN device_name IS NULL AND fio2 IS NULL AND lpm >20 AND peep IS NULL AND set_volume IS NULL THEN 'High Flow NC'
        WHEN device_name IS NULL AND fio2 IS NULL AND lpm IS NULL AND set_volume IS NOT NULL THEN 'Vent'
        WHEN device_name == "Nasal Cannula" AND fio2 IS NULL AND lpm >20 THEN 'High Flow NC'
        ELSE NULL
        END
        """
))

## Try to fill in FiO2 based on LPM for nasal cannula
resp_full = resp_full.withColumn('fio2_combined', f.expr(
        """
        CASE
        WHEN fio2 IS NOT NULL THEN fio2
        WHEN fio2 IS NULL AND device_name_2 == 'Room Air' THEN .21
        WHEN fio2 IS NULL AND device_name_2 == 'Nasal Cannula' THEN ( 0.24 + (0.04 * lpm) )
        ELSE NULL
        END
        """
))
```

#### 1.2.1 Creating Respiratory Support Hourly Blocking
**Goal**: For each one hour interval, identify highest level of respiratory support and maximum FiO2.

1. Extract the measurement hour and measurement date from the recorded_time
2. Rank respiratory support devices
3. Group by C19_HAR_ID, measurement date, and measurement hour, then take the minimum of device_rank (highest level of support) and maximum of fio2

```python

## Extract hour and date for blocking
resp_full = resp_full.select('C19_HAR_ID', 'device_name_2', 'recorded_time', 'fio2_combined')
resp_full = resp_full.withColumn('meas_hour', f.hour(f.col('recorded_time')))
resp_full = resp_full.withColumn('meas_date', f.to_date(f.col('recorded_time')))

fio2 = resp_full.select('C19_HAR_ID', 'device_name_2', 'meas_date', 'meas_hour', 'fio2_combined').distinct()

## Need to rank devices to get max in hour
fio2 = fio2.withColumn("device_rank", f.expr(
        """
        CASE
        WHEN device_name_2 == 'Vent' THEN 1
        WHEN device_name_2 == 'NIPPV' THEN 2
        WHEN device_name_2 == 'High Flow NC' THEN 3
        WHEN device_name_2 == 'Face Mask' THEN 4 
        WHEN device_name_2 == 'Trach Collar' THEN 5
        WHEN device_name_2 == 'Nasal Cannula' THEN 6 
        WHEN device_name_2 == 'Other' THEN 7
        WHEN device_name_2 == 'Room Air' THEN 8
        WHEN device_name_2 IS NULL THEN NULL
        ELSE NULL
        END
        """
    ))

## Group by person, device, measurement date and measurement hour; get max FiO2 and LPM within each hour
group_cols = ["C19_HAR_ID", "meas_date", "meas_hour"]
fio2 = fio2.groupBy(group_cols) \
            .agg((f.max('fio2_combined').alias("fio2_combined")),
                  (f.min('device_rank').alias("device_rank"))).orderBy(group_cols)
fio2 = fio2.withColumn("device_name", f.expr(
        """
        CASE
        WHEN device_rank == 1 THEN 'Vent'
        WHEN device_rank == 2 THEN 'NIPPV' 
        WHEN device_rank == 3 THEN 'High Flow NC'
        WHEN device_rank == 4 THEN 'Face Mask' 
        WHEN device_rank == 5 THEN 'Trach Collar'
        WHEN device_rank == 6 THEN 'Nasal Cannula'
        WHEN device_rank == 7 THEN 'Other'
        WHEN device_rank == 8 THEN 'Room Air'
        WHEN device_rank IS NULL THEN NULL
        ELSE NULL
        END
        """
    ))
    
```

Now, you have a dataset with one observation per C19_HAR_ID per hour, with the values representing their worst respiratory state in that hour, but only for the hours that have a recorded value. For example, if someone is on a Nasal Cannula with a constant setting for many hours, only the hour that they were started on that device may have an entry. Therefore, we need to carry forward values until another value is recorded, indicating a change in respiratory support, discharge, or death. This will allow us to merge in the minimum PaO2 values per hour using the RCLIF_labs table (see section #) and ensure the PaO2 and FiO2 were measured within the same hour when calculating the PaO2/FiO2 ratio.

To do this, we:
1. Left join back to hourly blocked dataset created in previous code chunk
2. Carry forward values until a change or until discharge

```python

## Merge back to hourly blocked cohort table 
group_cols = ["C19_HAR_ID", "meas_date", "meas_hour"]
fio2_hours = har_ids_hours.join(fio2, on=group_cols, how='left').orderBy('C19_HAR_ID', 'txnDt').distinct()


## Carry forward device name until another device is recorded or the end of the measurement time window
fio2_hours = fio2_hours.withColumn('device_filled', 
                                       f.coalesce(f.col('device_name'), 
                                                  f.last('device_name', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy('txnDt')), f.lit('NULL')))
fio2_hours = fio2_hours.withColumn('device_filled', 
                                       f.when(~f.col('device_filled').rlike(r'NULL'), f.col('device_filled')))

## Carry forward FiO2 measurement name until another device is recorded or the end of the measurement time window
fio2_hours = fio2_hours.withColumn("fio2_combined",fio2_hours.fio2_combined.cast('double'))

fio2_hours = fio2_hours.withColumn('fio2_filled', 
                                       f.when((f.col('fio2_combined').isNotNull()), f.col('fio2_combined')))
fio2_hours = fio2_hours.withColumn('fio2_filled', 
                                       f.coalesce(f.col('fio2_combined'), 
                                                  f.last('fio2_combined', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID', 'device_filled') \
                                                        .orderBy('txnDt')), f.lit('NULL')))

fio2_filled = fio2_hours.select('C19_HAR_ID','txnDt','meas_date', 'meas_hour',
                                  'device_filled','fio2_filled').distinct()
                                  
```

Now, we need to bring in PaO2 values using the RCLIF_labs table to calculate PaO2/FiO2 ratio. We will conduct a similar process as above to create an hourly blocked dataset of the minimum PaO2 per hour.

1. Inner join RCLIF_labs to adults hospitalized in time period
2. Filter to just PaO2 labs
3. Group by C19_HAR_ID, measurement date, and measurement hour, then take the minimum lab_value
4. Left join back to hourly blocked dataset
5. Carry forward values for only 4 hours

```python

# Now need PaO2
labs = spark.read.parquet("/project2/wparker/SIPA_data/RCLIF_labs.parquet")

### Selecting only variables we need
labs = labs.select('C19_HAR_ID', 'lab_result_time','lab_name', 'lab_value')

### Filtering to only adults in time period
labs = labs.join(har_ids, on='C19_HAR_ID', how='inner')

### Filtering to only PaO2, formatting values
labs = labs.filter(f.col("lab_name")=="pao2")
labs = labs.withColumn("lab_value",labs.lab_value.cast('double'))
labs = labs.withColumn('lab_result_time',f.to_timestamp('lab_result_time','yyyy-MM-dd HH:mm:ss'))

# Get min PaO2 per hour
labs = labs.withColumn('meas_hour', f.hour(f.col('lab_result_time')))
labs = labs.withColumn('meas_date', f.to_date(f.col('lab_result_time')))
pao2 = labs.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'lab_name', 'lab_value')

group_cols = ["C19_HAR_ID","meas_date", "meas_hour"]
pao2 = pao2.groupBy(group_cols) \
           .pivot("lab_name") \
           .agg(f.min('lab_value').alias("min")).orderBy(group_cols)


## Merge back to hourly blocked cohort table 
group_cols = ["C19_HAR_ID", "meas_date", "meas_hour"]
pao2_hours = har_ids_hours.join(pao2, on=group_cols, how='left').orderBy('C19_HAR_ID', 'txnDt').distinct()


## Carry forward PaO2 until next measurement or end of window, maximum 4 hours

### Get time of most recent PaO2
pao2_hours = pao2_hours.withColumn('last_measure', f.when(f.col('pao2').isNotNull(), f.col('txnDt')))
pao2_hours = pao2_hours.withColumn('last_measure', f.coalesce(f.col('last_measure'), 
                                                                  f.last('last_measure', True)\
                                                                  .over(Window.partitionBy('C19_HAR_ID')\
                                                                        .orderBy('txnDt')), f.lit('NULL')))

pao2_hours = pao2_hours.withColumn('last_measure',f.to_timestamp('last_measure','yyyy-MM-dd HH:mm:ss'))
pao2_hours = pao2_hours.withColumn('txnDt',f.to_timestamp('txnDt','yyyy-MM-dd HH:mm:ss'))

### Calculate time difference between the hour we're trying to fill and the most recent PaO2, filter to only 3 additional hrs (4 total)
pao2_hours = pao2_hours.withColumn("hour_diff", 
                                       (f.col("txnDt").cast("long")-f.col("last_measure").cast("long"))/(60*60))
pao2_hours_2 = pao2_hours.filter((f.col('hour_diff')>=0)&(f.col('hour_diff')<=3))

### Fill PaO2 forward
pao2_hours_2 = pao2_hours_2.withColumn('pao2_filled', f.when(f.col('pao2').isNotNull(), f.col('pao2')))
pao2_hours_2 = pao2_hours_2.withColumn('pao2_filled', f.coalesce(f.col('pao2'), 
                                                                 f.last('pao2', True)\
                                                                 .over(Window.partitionBy('C19_HAR_ID', 
                                                                                          'last_measure')\
                                                                       .orderBy('txnDt')), f.lit('NULL')))

pao2_filled = pao2_hours_2.select('C19_HAR_ID','meas_date', 'meas_hour', 'pao2_filled')

```

Next, we repeat the process with the RCLIF_vitals table to obtain SpO2 measurements. In the absence of a PaO2/FiO2 ratio, we use SpO2/FiO2 ratio to determine cohort inclusion.

```python

# Now need spO2
vitals = spark.read.parquet("/project2/wparker/SIPA_data/RCLIF_vitals.parquet")

### Filtering to only adults in time period
vitals = vitals.join(har_ids, on='C19_HAR_ID', how='inner')

### Formatting values
vitals = vitals.withColumn('measured_time',f.to_timestamp('recorded_time','yyyy-MM-dd HH:mm:ss'))
vitals = vitals.withColumn("vital_value",vitals.vital_value.cast('double'))

### Selecting variables we need
vitals = vitals.select('C19_HAR_ID', 'measured_time','vital_name', 'vital_value')
vitals = vitals.withColumn('meas_hour', f.hour(f.col('measured_time')))
vitals = vitals.withColumn('meas_date', f.to_date(f.col('measured_time')))

### Filtering to only SpO2, valid values
spo2 = vitals.filter(f.col("vital_name")=="spO2")
spo2 = spo2.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'vital_name', 'vital_value')
spo2 = spo2.filter(f.col('vital_value')>60)
spo2 = spo2.filter(f.col('vital_value')<=100)

# Get min SpO2 per hour

group_cols = ["C19_HAR_ID","meas_date", "meas_hour"]
spo2 = spo2.groupBy(group_cols) \
           .pivot("vital_name") \
           .agg(f.min('vital_value').alias("min"))

## Merge back to hourly blocked cohort table 
group_cols = ["C19_HAR_ID", "meas_date", "meas_hour"]
spo2_hours = har_ids_hours.join(spo2, on=group_cols, how='left').orderBy('C19_HAR_ID', 'txnDt').distinct()


## Cary forward SpO2
spo2_hours = spo2_hours.withColumn('spO2_filled', 
                                           f.when(f.col('spO2').isNotNull(), f.col('spO2')))
spo2_hours = spo2_hours.withColumn('spO2_filled', 
                                           f.coalesce(f.col('spO2'), 
                                                      f.last('spO2', True)\
                                                      .over(Window.partitionBy('C19_HAR_ID', 'last_measure')\
                                                            .orderBy('txnDt')), f.lit('NULL')))

spO2_filled = spo2_hours.select('C19_HAR_ID','meas_date', 'meas_hour', 'spO2_filled')


```

Now that we have hourly blocked respiratory support, FiO2, PaO2, and SpO2, we can full join these tables to determine the first hour a patient met the respiratory inclusion criteria (the minimum recorded time of either FiO2/PaO2 < 200, SpO2/FiO2 <179, non-invasive ventilation, or invasive ventilation). I also write this hourly blocked dataset for later use in the 48-hour analytic dataset.

```python 

# Merge FiO2, PaO2, spO2 to get FiO2/PaO2

group_cols = ["C19_HAR_ID","meas_date", "meas_hour"]
df = fio2_filled.join(pao2_filled, on=group_cols, how='full')
df = df.join(spO2_filled, on=group_cols, how='full').orderBy(group_cols)

df = df.withColumn("fio2_filled",df.fio2_filled.cast('double'))
df = df.withColumn("pao2_filled",df.pao2_filled.cast('double'))
df = df.withColumn("spO2_filled",df.spO2_filled.cast('double'))

# Get first time on oxygen support & P/F <200
df = df.withColumn("p_f", f.expr(
        """
        CASE
        WHEN fio2_filled IS NOT NULL AND pao2_filled IS NOT NULL THEN ( pao2_filled / fio2_filled )
        ELSE NULL
        END
        """
    ))

df = df.withColumn("s_f", f.expr(
        """
        CASE
        WHEN fio2_filled IS NOT NULL AND spO2_filled IS NOT NULL THEN ( spO2_filled / fio2_filled )
        ELSE NULL
        END
        """
    ))

df = df.distinct()
df.write.parquet("/project2/wparker/SIPA_data/p_f_combined_filled.parquet", mode="overwrite")

## Get first time somone on oxygen therapy with PaO2/FiO2 < 200 (S/F < 179 if no P/F measured) and invasive and non-invasive ventilation

df = df.filter((((f.col("p_f")<200))|
                (f.col("s_f")<179) |
                (f.col('device_filled')=='Vent') | 
                (f.col('device_filled')=='NIPPV')))


df = df.select("C19_HAR_ID", "txnDt", "device_filled","pao2_filled","fio2_filled", "spO2_filled", "p_f", "s_f")

w1 = Window.partitionBy("C19_HAR_ID").orderBy('txnDt')

df_first_with_time = df.withColumn("row",f.row_number().over(w1)) \
             .filter(f.col("row") == 1).drop("row")

df_first_with_time = df_first_with_time.select("C19_HAR_ID", "txnDt").withColumnRenamed("txnDt", "recorded_time")

resp_support = df_first_with_time.groupBy("C19_HAR_ID").agg(f.min("recorded_time").alias("resp_life_support_start")).distinct()

```

### 1.3 Determine if patients received vasopressors

Using the RCLIF_medication_admin_continuous table, we identify the first instance a patient is started on a vasopressor. We inner join the adults hospitalized in the time period to the RCLIF_medication_admin_continuous table, then filter to only vasopressor medications, then take the first time per patient per encounter to use as the index time.

``` python
# Now pressors
df_meds = spark.read.option("header",True).csv('/project2/wparker/SIPA_data/RCLIF_meds_admin_conti.csv')
df_meds = df_meds.withColumn('admin_time',f.to_timestamp('admin_time','yyyy-MM-dd HH:mm:ss'))

# Filter to pressor medications

pressors = df_meds.filter(f.col('med_category')=='vasoactives')
pressors = pressors.select("C19_HAR_ID", "admin_time")

# Get first time someone is on a pressor
pressors = pressors.groupBy("C19_HAR_ID").agg(f.min("admin_time").alias("pressor_life_support_start"))
pressors = pressors.join(har_ids, on='C19_HAR_ID', how='inner').distinct()
```

### 1.4 Determine start of life support

The start of life support will be the index time for constructing the 48 hour analytic dataset (41 hours before, the index hour, and 6 hours after). To do that, we take the minimum between the time respiratory criteria was met and the time vasopressors were started. Now we have a cohort identified with the first hour block life support was initiated (index time). 

We also calculate the start of the window time and end of the window time, and "explode" between these times to create the base 48 hour blocked dataset for the cohort that subsequent data will be left joined to. 

```python

# Get first time on life support

df = pressors.join(resp_support, on='C19_HAR_ID', how='full')
df = df.withColumn("life_support_start", f.least(f.col('pressor_life_support_start'),
                                                 f.col('resp_life_support_start')))
df = df.filter(f.col('life_support_start').isNotNull())

df = df.withColumn('window_start', (f.col('life_support_start')-f.expr("INTERVAL 41 HOURS")))
df = df.withColumn('window_end', (f.col('life_support_start')+f.expr("INTERVAL 6 HOURS")))
df = df.select("C19_HAR_ID", "life_support_start", "window_start", "window_end")

## Re-join age at admission and disposition, admission & discharge dttm, and zip
df = df.join(limited_ids, on='C19_HAR_ID', how='left')
df = df.join(demo_disp, on='C19_HAR_ID', how='left')
df = df.select("C19_HAR_ID", "C19_PATIENT_ID", "zip_code", "admission_datetime", "discharge_datetime", "life_support_start", "window_start", "window_end", "age_at_admission",
               "disposition")

df.write.parquet("/project2/wparker/SIPA_data/life_support_cohort.parquet", mode="overwrite")

## Explode between window to get all hourly timestamps
cohort_hours = df.withColumn('txnDt', 
                                   f.explode(f.expr('sequence(window_start, window_end, interval 1 hour)')))
cohort_hours = cohort_hours.withColumn('meas_hour', f.hour(f.col('txnDt')))
cohort_hours = cohort_hours.withColumn('meas_date', f.to_date(f.col('txnDt')))
cohort_hours = cohort_hours.select("C19_HAR_ID", "C19_PATIENT_ID", "zip_code", "admission_datetime", "discharge_datetime", "life_support_start", "window_start", "window_end", 
                                   "age_at_admission", "disposition", "meas_date", "meas_hour").distinct()

cohort_hours.write.parquet("/project2/wparker/SIPA_data/life_support_cohort_48.parquet", mode="overwrite")

```

## 2. Constructing a 48 Hour Analytic Dataset

### 2.1 48 Hour Labs

To construct the 48 hour blocked labs dataset, we:
1. Read in RCLIF_labs and cohort identified in previous section
2. Left join RCLIF_labs to cohort
3. Extract the date and hour of each lab, find minimum and maximum per hour per person per encounter

```python
## Read in Labs
labs = spark.read.parquet("/project2/wparker/SIPA_data/RCLIF_labs.parquet")
labs = labs.withColumn("lab_value",labs.lab_value.cast('double'))
labs = labs.withColumn('lab_result_time',f.to_timestamp('lab_result_time','yyyy-MM-dd HH:mm:ss'))

labs = labs.withColumn('meas_hour', f.hour(f.col('lab_result_time')))
labs = labs.withColumn('meas_date', f.to_date(f.col('lab_result_time')))
labs = labs.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'lab_name', 'lab_value')

## Left join to cohort
cohort = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort.parquet")
cohort = cohort.withColumn('life_support_start_time',f.to_timestamp('life_support_start','yyyy-MM-dd HH:mm:ss'))
cohort = cohort.select('C19_HAR_ID', 'life_support_start_time')

cohort_labs = cohort.join(labs, on="C19_HAR_ID", how="left")

cohort_labs = cohort_labs.select('C19_HAR_ID', 'life_support_start_time','meas_date', 'meas_hour', 
                                       'lab_name', 'lab_value')
cohort_labs = cohort_labs.filter(f.col('lab_name').isNotNull())
cohort_labs = cohort_labs.filter(f.col('lab_value').isNotNull())

## Get min and max for each time lab
group_cols = ["C19_HAR_ID", "life_support_start_time",'meas_date', 'meas_hour']
cohort_labs_wide = cohort_labs.groupBy(group_cols) \
                                     .pivot("lab_name") \
                                     .agg(f.min('lab_value').alias("min"),
                                         f.max('lab_value').alias("max")).orderBy(group_cols)

```

Next, we:

4. Left join the hourly min and max of all the labs from the previous step to the hourly blocked cohort dataset
5. Forward fill lab values. For this example, I have only forward filled the three lab values we wanted to use in a 48 hour analytic dataset for SIPA (creatinine, bilirubin, and platelet count). Future work will include making this a function, and filling all min and max lab values.

```python

## Join to cohort hourly blocked dataset
cohort_hours = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort_48.parquet")

group_cols = ["C19_HAR_ID", 'life_support_start', 'meas_date', 'meas_hour']
cohort_labs_48 = cohort_hours.join(cohort_labs_wide, on=group_cols, how='left')

## Carry forward labs we need for SIPA

cohort_labs_48 = cohort_labs_48.withColumn('billirubin_max_filled', 
                                       f.coalesce(f.col('bilirubin_total_max'), 
                                                  f.last('bilirubin_total_max', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy(group_cols)), f.lit('NULL')))


cohort_labs_48 = cohort_labs_48.withColumn('platelet_count_min_filled', 
                                       f.coalesce(f.col('platelet_count_min'), 
                                                  f.last('platelet_count_min', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy(group_cols)), f.lit('NULL')))


cohort_labs_48 = cohort_labs_48.withColumn('creatinine_max_filled', 
                                       f.coalesce(f.col('creatinine_max'), 
                                                  f.last('creatinine_max', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy(group_cols)), f.lit('NULL')))



cohort_labs_48.write.parquet("/project2/wparker/SIPA_data/cohort_labs_48.parquet", mode="overwrite")
```

Lastly, we summarize the values of the additional labs we want for SIPA (BUN, pH, potassium) and save a dataset of one observation per person/encounter with the worst values of all selected labs.

```python
## Summarize to one row per person
group_cols = ['C19_HAR_ID', 'life_support_start', 'window_start', 'window_end']
cohort_labs_48_summary = cohort_labs_48.groupBy(group_cols) \
                                     .agg(f.min('creatinine_max_filled').alias("creatinine_max_filled"),
                                         f.max('platelet_count_min_filled').alias("platelet_count_min_filled"),
                                         f.max('billirubin_max_filled').alias("billirubin_max_filled"),
                                         f.max('potassium_max').alias("potassium_max"),
                                         f.max('bun_max').alias("bun_max"),
                                         f.min('ph_venous_min').alias("ph_venous_min"))\
                                    .distinct()\
                                    .orderBy("life_support_start")

cohort_labs_48_summary.write.parquet("/project2/wparker/SIPA_data/cohort_labs_48_summary.parquet", 
                                       mode="overwrite")
```

### 2.2 48 Hour Vitals

We repeat the process to create the 48 hour dataset for vitals. For vitals, we only carry forward height and weight. We also calculate MAP using the hourly SBP and DBP when MAP is not available before summarizing to one obervation per person/encounter to ensure the SBP and DBP are measured within the same hour.

```python
## Read in Vitals
vitals = spark.read.parquet("/project2/wparker/SIPA_data/RCLIF_vitals.parquet")
vitals = vitals.withColumn('measured_time',f.to_timestamp('recorded_time','yyyy-MM-dd HH:mm:ss'))

vitals = vitals.withColumn('meas_hour', f.hour(f.col('measured_time')))
vitals = vitals.withColumn('meas_date', f.to_date(f.col('measured_time')))
vitals = vitals.withColumn("vital_value",vitals.vital_value.cast('double'))

## Left join to cohort
cohort = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort.parquet")
cohort = cohort.withColumn('life_support_start',f.to_timestamp('life_support_start','yyyy-MM-dd HH:mm:ss'))
cohort = cohort.withColumn('window_start',f.to_timestamp('window_start','yyyy-MM-dd HH:mm:ss'))
cohort = cohort.withColumn('window_end',f.to_timestamp('window_end','yyyy-MM-dd HH:mm:ss'))

cohort_vitals = cohort.join(vitals, on="C19_HAR_ID", how="left")
cohort_vitals = cohort_vitals.filter(f.col('vital_name').isNotNull())
cohort_vitals = cohort_vitals.filter(f.col('vital_value').isNotNull())
cohort_vitals = cohort_vitals.filter(f.col('vital_value')>=0)


## Filter to only vitals in window
cohort_vitals_window = cohort_vitals.filter((f.col('measured_time') >= f.col('window_start')) &
                              (f.col('measured_time') <= f.col('window_end')))

cohort_vitals_wide = cohort_vitals_window.select('C19_HAR_ID', 'life_support_start','meas_date', 'meas_hour', 
                                       'vital_name', 'vital_value')

## Get min and max for each time vital
group_cols = ["C19_HAR_ID", "life_support_start",'meas_date', 'meas_hour']
cohort_vitals_wide = cohort_vitals_wide.groupBy(group_cols) \
                                     .pivot("vital_name") \
                                     .agg(f.min('vital_value').alias("min"),
                                         f.max('vital_value').alias("max")).orderBy(group_cols)


## Join to cohort hourly blocked dataset
cohort_hours = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort_48.parquet")

group_cols = ["C19_HAR_ID", 'life_support_start', 'meas_date', 'meas_hour']
cohort_vitals_48 = cohort_hours.join(cohort_vitals_wide, on=group_cols, how='left')


## Carry forward height and weight only for SIPA
cohort_vitals_48 = cohort_vitals_48.withColumn('weight_filled', 
                                       f.coalesce(f.col('weight_max'), 
                                                  f.last('weight_max', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy('txnDt')), f.lit('NULL')))

cohort_vitals_48 = cohort_vitals_48.withColumn('height_filled', 
                                       f.coalesce(f.col('height_max'), 
                                                  f.last('height_max', True) \
                                                  .over(Window.partitionBy('C19_HAR_ID') \
                                                        .orderBy('txnDt')), f.lit('NULL')))


## Fill in MAP using SBP and DBP--these need to be measured in same hour, so need to calculate before summarizing 
cohort_vitals_48 = cohort_vitals_48.withColumn("MAP_for_sofa", f.expr(
        """
        CASE
        WHEN MAP_min IS NOT NULL THEN MAP_min
        WHEN MAP_min IS NULL AND sbp_min IS NOT NULL AND dbp_min IS NOT NULL THEN ( sbp_min + 2.0 * dbp_min ) / 3.0
        ELSE NULL
        END
        """
    ))
cohort_vitals_48.write.parquet("/project2/wparker/SIPA_data/cohort_vitals_48.parquet", mode="overwrite")


## Summarize to one observation per person/encounter
group_cols = ['C19_HAR_ID', 'life_support_start', 'window_start', 'window_end']

cohort_vitals_48_summary = cohort_vitals_48.groupBy(group_cols) \
                                     .agg(f.max('weight_filled').alias("weight_filled"),
                                         f.max('height_filled').alias("height_filled"),
                                         f.min('MAP_for_sofa').alias("MAP_for_sofa"))\
                                    .distinct()\
                                    .orderBy("life_support_start")

cohort_vitals_48_summary.write.parquet("/project2/wparker/SIPA_data/cohort_vitals_48_summary.parquet", 
                                         mode="overwrite")
```

### 2.3 Combining respiratory support, vitals, and medication to determine hours of life support received

We start with our base hourly blocked dataset for our cohort. We then left join the hourly blocked respiratory support and hourly blocked vitals created previously. Next, we bring in vasopressor medications and sum the number of vasopressors received per hour and whether it was dobutamine alone.

We capped the number of pressors received at 4, likely because they were being transitioned off some drugs and onto others. Clinically more than 4 would not be used at one time, so we set that as the maximum.

```python

## Read in cohort hourly blocked base dataset
group_cols = ["C19_HAR_ID",'meas_date', 'meas_hour']
cohort_hours = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort_48.parquet").orderBy(group_cols)

## Bring in respiratory support hourly blocked dataset
resp_support = spark.read.parquet("/project2/wparker/SIPA_data/p_f_combined_filled.parquet").orderBy(group_cols)
resp_support = resp_support.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'device_filled', 'fio2_filled', 'pao2_filled', 'spO2_filled', 'p_f', 's_f')

## Left join respiratory support to cohort hourly blocked
cohort_resp_support = cohort_hours.join(resp_support, on=group_cols, how="left").orderBy(group_cols)

# Bring in worst vitals. Only need MAP for SIPA
vitals = spark.read.parquet("/project2/wparker/SIPA_data/cohort_vitals_48.parquet")
vitals = vitals.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'MAP_for_SOFA')

cohort_resp_vitals = cohort_resp_support.join(vitals, on=group_cols, how="left").orderBy(group_cols)

## bring in meds to get flag for life support yes/no
# Now pressors
df_meds = spark.read.parquet('/project2/wparker/SIPA_data/RCLIF_medication_admin_continuous.parquet')
df_meds = df_meds.withColumn('admin_time',f.to_timestamp('admin_dttm','yyyy-MM-dd HH:mm:ss'))
df_meds = df_meds.withColumnRenamed("encounter_id", "C19_HAR_ID")

# Filter to pressor medications

pressors = df_meds.filter(f.col('med_category')=='vasoactives')
pressors = pressors.select("C19_HAR_ID", "admin_time", "med_name")

## Get cohort index times
cohort = spark.read.parquet("/project2/wparker/SIPA_data/life_support_cohort.parquet")
cohort = cohort.select('C19_HAR_ID', 'window_start', 'window_end').distinct()

## Get all pressors in 48 hour window
cohort_pressors = cohort.join(pressors,'C19_HAR_ID','left')

cohort_pressors = cohort_pressors.filter((f.col('admin_time') >= f.col('window_start')) &
                              (f.col('admin_time') <= f.col('window_end')))

cohort_pressors_48 = cohort_pressors.withColumn('meas_hour', f.hour(f.col('admin_time')))
cohort_pressors_48 = cohort_pressors_48.withColumn('meas_date', f.to_date(f.col('admin_time')))

cohort_pressors_48 = cohort_pressors_48.select('C19_HAR_ID','meas_hour','meas_date', 'med_name').distinct()

## Get number of pressors per hour
w2 = Window.partitionBy(group_cols).orderBy("med_name")

group_cols = ['C19_HAR_ID','meas_hour','meas_date', 'med_name']

cohort_pressors_grouped = cohort_pressors_48.withColumn("row",f.row_number().over(w2))
cohort_pressors_grouped = cohort_pressors_48.withColumn("count",f.lit(1))

## Get count of pressors per hour
group_cols = ["C19_HAR_ID",'meas_date', 'meas_hour']
cohort_pressors_grouped = cohort_pressors_grouped.groupBy(group_cols) \
                                     .pivot("med_name") \
                                     .agg(f.count('count').alias("count")).orderBy(group_cols)

cohort_pressors_grouped = cohort_pressors_grouped.withColumn("dobutamine_alone", f.expr(
        """
        CASE
        WHEN dobutamine IS NOT NULL AND dopamine IS NULL AND epinephrine IS NULL AND isoproterenol IS NULL AND milrinone IS NULL AND norepinephrine IS NULL AND phenylephrine IS NULL AND vasopressin IS NULL THEN 1
        ELSE 0
        END
        """
    ))

col_list = ['dobutamine', 'dopamine', 'epinephrine', 'isoproterenol', 'milrinone', 'norepinephrine', 'phenylephrine', 'vasopressin']
cohort_pressors_grouped = cohort_pressors_grouped.na.fill(0, subset=col_list)
cohort_pressors_grouped = cohort_pressors_grouped.withColumn('num_pressors', sum([f.col(c) for c in col_list]))

cohort_pressors_grouped = cohort_pressors_grouped.withColumn("num_pressors", f.expr(
        """
        CASE
        WHEN num_pressors > 4 THEN 4
        ELSE num_pressors
        END
        """
    ))
cohort_pressors_grouped = cohort_pressors_grouped.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'num_pressors', 'dobutamine_alone')

group_cols = ['C19_HAR_ID','meas_date','meas_hour']
cohort_resp_vitals_pressors = cohort_resp_vitals.join(cohort_pressors_grouped, on=group_cols, how="left")

col_list = ['num_pressors', 'dobutamine_alone']
cohort_resp_vitals_pressors = cohort_resp_vitals_pressors.na.fill(0, subset=col_list)

```

Next, we want to exclude people who received fewer than 6 hours of life support. So we create a flag for if someone received life support in that hour, defined as receiving vasopressor medications, non-invasive or invasive mechanical ventilation, a PaO2/FiO2 ratio less than 200, or an SpO2/FiO2 ratio less than 179. We sum and filter out those receiveing less than 6 hours of life support.

```python

##Flag if on life support at each hour

cohort_resp_vitals_pressors = cohort_resp_vitals_pressors.withColumn("on_life_support", f.expr(
        """
        CASE
        WHEN num_pressors >=1 THEN 1
        WHEN device_filled == 'NIPPV' THEN 1
        WHEN device_filled == 'Vent' THEN 1
        WHEN  p_f < 200 THEN 1
        WHEN s_f < 179 THEN 1
        ELSE 0
        END
        """
    ))

cohort_life_support_count = cohort_resp_vitals_pressors.groupBy('C19_HAR_ID') \
                                            .agg(f.sum('on_life_support').alias('life_support_count')) \
                                            .filter(f.col('life_support_count')>=6)

df = cohort_life_support_count.join(cohort_resp_vitals_pressors, on='C19_HAR_ID', how="left")

```

### 2.4 Merging scores, labs, and dialysis flag

Lastly, we repeat a similar procedure to left join hourly blocked minimum scores, labs, and a flag for whether a person was receiving dialysis. 

```python

## add glasgow score
scores = spark.read.option("header",True).csv('/project2/wparker/SIPA_data/RCLIF_scores_10192023.csv')
scores = scores.withColumn('score_time',f.to_timestamp('score_time','yyyy-MM-dd HH:mm:ss'))
scores = scores.withColumn('meas_hour', f.hour(f.col('score_time')))
scores = scores.withColumn('meas_date', f.to_date(f.col('score_time')))
scores = scores.withColumn("score_value",scores.score_value.cast('double'))

## Get min GCS
group_cols = ["C19_HAR_ID",'meas_date', 'meas_hour']
scores_wide = scores.groupBy(group_cols) \
                                     .pivot("score_name") \
                                     .agg(f.min('score_value').alias("min")).orderBy(group_cols)
scores_wide = scores_wide.withColumnRenamed('NUR RA GLASGOW ADULT SCORING', 'min_gcs_score')

cohort_scores_48 = cohort_hours.join(scores_wide, on=group_cols, how="left")
cohort_scores_48 = cohort_scores_48.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'min_gcs_score')

df = df.join(cohort_scores_48, on=group_cols, how="left")

## Add labs
labs = spark.read.parquet("/project2/wparker/SIPA_data/cohort_labs_48.parquet")
labs = labs.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'billirubin_max_filled', 'potassium_max', 'bun_max', 'ph_venous_min',
                         'platelet_count_min_filled', 'creatinine_max_filled')
df = df.join(labs, on=group_cols, how="left")

## add dialysis flag
dialysis = spark.read.option("header",True).csv('/project2/wparker/SIPA_data/RCLIF_dialysis_flag_only.csv')
dialysis = dialysis.withColumn('recorded_time',f.to_timestamp('recorded_time','yyyy-MM-dd HH:mm:ss'))
dialysis = dialysis.withColumn('meas_hour', f.hour(f.col('recorded_time')))
dialysis = dialysis.withColumn('meas_date', f.to_date(f.col('recorded_time')))

dialysis = dialysis.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'on_dialysis') \
                .distinct() \
                .groupBy(group_cols) \
                .agg(f.max('on_dialysis').alias('on_dialysis'))
df = df.join(dialysis, on=group_cols, how="left")

df.repartition(1).write.csv("/project2/wparker/SIPA_data/sipa_df_48.csv", mode='overwrite', header="true")

```