In [95]:
import pandas as pd


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark import SparkConf
from pyspark.sql.window import Window



# Confirm the imports above succeeded, then start (or reuse) the Spark session
# that every later cell reads through.
print("loaded libraries")
spark = (
    SparkSession.builder
    .appName("cohort identification")
    .getOrCreate()
)

loaded libraries


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/24 15:00:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/24 15:00:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [80]:
# Get worst (highest) FiO2 per patient, device, calendar date and clock hour.
#
# The CSV is read without schema inference, so every column arrives as a string.
resp_full = (
    spark.read
    .option("header", True)
    .csv('/project2/wparker/SIPA_data/RCLIF_respiratory_support_09282023.csv')
)
resp_full = resp_full.withColumn(
    'recorded_time', f.to_timestamp('recorded_time', 'yyyy-MM-dd HH:mm:ss')
)

# Study window: 2020-03-01 up to, but not including, 2022-04-01.
# The previous upper bound ('2022-03-31 11:59:59') is 11:59 AM under the 24-hour
# 'HH' pattern and silently dropped the afternoon of the last study day.
resp_full = resp_full.filter(
    (f.col('recorded_time') >= '2020-03-01 00:00:00')
    & (f.col('recorded_time') < '2022-04-01 00:00:00')
)

resp_full = resp_full.select('C19_HAR_ID', 'device_name', 'recorded_time', 'fio2')

# Make FiO2 numeric. On the raw string column, max() compares lexicographically
# (e.g. "40" > "100"), which would pick the wrong "worst" value.
# Non-numeric entries such as "NA" become NULL under Spark's default (non-ANSI) cast.
resp_full = resp_full.withColumn('fio2', f.col('fio2').cast('double'))

resp_full = resp_full.withColumn('meas_hour', f.hour(f.col('recorded_time')))
resp_full = resp_full.withColumn('meas_date', f.to_date(f.col('recorded_time')))

# Keep rows with a usable FiO2 on a named device other than room air.
fio2 = resp_full.filter(f.col("fio2").isNotNull())
fio2 = fio2.filter(f.col("device_name") != "NA")
fio2 = fio2.filter(f.col("device_name") != "Room Air")

fio2 = fio2.select('C19_HAR_ID', 'device_name', 'meas_date', 'meas_hour', 'fio2')

# Highest FiO2 within each patient / device / date / hour bucket.
group_cols = ["C19_HAR_ID", "device_name", "meas_date", "meas_hour"]
fio2 = fio2.groupBy(group_cols) \
            .agg(f.max('fio2').alias("fio2"))


In [53]:
# Now need PaO2: lowest value per patient, calendar date and clock hour.
labs = spark.read.parquet("/project2/wparker/SIPA_data/RCLIF_labs.parquet")


### Cleaning up values/columns
# Strip any byte-order-mark characters from the lab name before matching on it.
labs = labs.select(
    'C19_HAR_ID',
    'lab_result_time',
    'lab_value',
    f.regexp_replace(f.col('lab_name'), "[\ufeff]", "").alias('lab_name'),
)

labs = labs.filter(f.col("lab_name") == "pao2")

labs = labs.withColumn('lab_result_time', f.to_timestamp('lab_result_time', 'yyyy-MM-dd HH:mm:ss'))

# Study window: 2020-03-01 up to, but not including, 2022-04-01.
# The previous upper bound ('2022-03-31 11:59:59') is 11:59 AM under the 24-hour
# 'HH' pattern and silently dropped the afternoon of the last study day.
labs = labs.filter(
    (f.col('lab_result_time') >= '2020-03-01 00:00:00')
    & (f.col('lab_result_time') < '2022-04-01 00:00:00')
)

# Remove byte-order marks and the '<' / '>' qualifiers from the value in a single
# pass so the remaining text can be cast to a number.
labs = labs.select(
    'C19_HAR_ID',
    'lab_result_time',
    'lab_name',
    f.regexp_replace(f.col('lab_value'), "[\ufeff<>]", "").alias('lab_value'),
)

labs = labs.withColumn('meas_hour', f.hour(f.col('lab_result_time')))
labs = labs.withColumn('meas_date', f.to_date(f.col('lab_result_time')))
labs = labs.select('C19_HAR_ID', 'meas_date', 'meas_hour', 'lab_name', 'lab_value')
# Values that are still non-numeric after cleaning become NULL under the default cast.
labs = labs.withColumn("lab_value_num", labs.lab_value.cast('double'))

# Pivot to one `pao2` column holding the hourly minimum. Listing the pivot value
# explicitly avoids the extra job Spark runs to discover distinct values and
# guarantees the `pao2` column exists even when no rows survive the filters.
group_cols = ["C19_HAR_ID", "meas_date", "meas_hour"]
labs = labs.groupBy(group_cols) \
           .pivot("lab_name", ["pao2"]) \
           .agg(f.min('lab_value_num').alias("min"))


                                                                                

In [81]:
# Combine the hourly FiO2 and PaO2 tables so the P/F ratio (PaO2 / FiO2)
# can be computed in the next cell.

# Repartition both inputs by patient id ahead of the join.
fio2, labs = (frame.repartition('C19_HAR_ID') for frame in (fio2, labs))

# Full outer join: hours that have only one of the two measurements are kept,
# with the missing side left NULL.
group_cols = ["C19_HAR_ID","meas_date", "meas_hour"]
df = fio2.join(other=labs, on=group_cols, how='full')

In [86]:
# Get first time on oxygen support & P/F <200

# P/F ratio for each joined hour; NULL unless both PaO2 and FiO2 are present.
pf_ratio = f.expr(
        """
        CASE
        WHEN fio2 IS NOT NULL AND pao2 IS NOT NULL THEN ( pao2 / fio2 )
        ELSE NULL
        END
        """
    )
pf_df = df.withColumn("p_f", pf_ratio)
pf_df = pf_df.filter(f.col("p_f").isNotNull())

# Keep hours with P/F below 200 on devices other than Vent / NIPPV
# (those two devices are handled in a separate cell).
df = (
    pf_df
    .filter(f.col("p_f") < 200)
    .filter(f.col("device_name") != "Vent")
    .filter(f.col("device_name") != "NIPPV")
    .select("C19_HAR_ID", "meas_date", "meas_hour", "device_name", "p_f", "pao2", "fio2")
)

# Earliest qualifying date/hour per patient.
# NOTE(review): if several devices qualify in the same hour, row_number() picks one
# of them arbitrarily because the ordering has no tie-breaker - confirm this is acceptable.
order_cols = ["meas_date", "meas_hour"]
w1 = Window.partitionBy("C19_HAR_ID").orderBy(order_cols)
ranked_hours = df.withColumn("row", f.row_number().over(w1))
df_first = ranked_hours.filter(f.col("row") == 1).drop("row")

# Recover the exact timestamp(s) of the matching respiratory-support record(s).
group_cols = ["C19_HAR_ID","device_name","meas_date", "meas_hour","fio2"]
df_first_with_time = (
    df_first
    .join(resp_full, on=group_cols, how="left")
    .select("C19_HAR_ID", "recorded_time", "meas_date",
            "meas_hour", "device_name", "p_f", "pao2", "fio2")
)

# When several records match, keep the earliest one per patient.
w2 = Window.partitionBy("C19_HAR_ID").orderBy("recorded_time")
ranked_times = df_first_with_time.withColumn("row", f.row_number().over(w2))
df_first_with_time = ranked_times.filter(f.col("row") == 1).drop("row")

In [90]:
# Restrict respiratory-support rows to the 'Vent' and 'NIPPV' devices
# (invasive / non-invasive mechanical ventilation).
vent = resp_full.filter(f.col('device_name').isin('Vent', 'NIPPV'))

# Earliest such row per patient.
w3 = Window.partitionBy("C19_HAR_ID").orderBy('recorded_time')

vent_ranked = vent.withColumn("row", f.row_number().over(w3))
vent_first = vent_ranked.filter(f.col("row") == 1).drop("row")

In [112]:
# Combine the first Vent/NIPPV record with the first "oxygen support and P/F < 200"
# record, then take whichever timestamp is earlier for each patient.
vent_first, df_first_with_time = (
    frame.repartition('C19_HAR_ID') for frame in (vent_first, df_first_with_time)
)

# Full outer join on the shared keys keeps patients that appear on only one side.
group_cols = ["C19_HAR_ID","recorded_time", "meas_date", "meas_hour","device_name"]
df = vent_first.join(other=df_first_with_time, on=group_cols, how='full')

earliest_resp = f.min("recorded_time").alias("resp_life_support_start")
resp_support = df.groupBy("C19_HAR_ID").agg(earliest_resp)


In [103]:
# Now pressors
# Load the medication-administration CSV (header row, all columns read as strings)
# and parse the administration timestamp.
df_meds = spark.read.option("header", True).csv('/project2/wparker/SIPA_data/RCLIF_meds_admin_conti.csv')
df_meds = df_meds.withColumn('admin_time', f.to_timestamp('admin_time', 'yyyy-MM-dd HH:mm:ss'))

# Study window: 2020-03-01 up to, but not including, 2022-04-01.
# The previous upper bound ('2022-03-31 11:59:59') is 11:59 AM under the 24-hour
# 'HH' pattern and silently dropped the afternoon of the last study day.
df_meds = df_meds.filter(
    (f.col('admin_time') >= '2020-03-01 00:00:00')
    & (f.col('admin_time') < '2022-04-01 00:00:00')
)

In [114]:
# Medication names treated as pressor support in this cohort definition.
pressor_names = [
    'phenylephrine',
    'epinephrine',
    'vasopressin',
    'dopamine',
    'dobutamine',
    'norepinephrine',
    'angiotensin',
    'isoproterenol',
]

# Earliest administration time of any listed medication, per patient.
pressors = (
    df_meds
    .filter(f.col('med_name').isin(pressor_names))
    .select("C19_HAR_ID", "admin_time")
    .groupBy("C19_HAR_ID")
    .agg(f.min("admin_time").alias("pressor_life_support_start"))
)



In [117]:
# Combine pressor and respiratory start times. The full outer join keeps
# patients who have only one of the two.
df = pressors.join(other=resp_support, on='C19_HAR_ID', how='full')

# Earlier of the two start times; per the Spark docs, least() skips NULLs,
# so a patient with a single kind of support still gets that timestamp.
earliest_start = f.least(
    f.col('pressor_life_support_start'),
    f.col('resp_life_support_start'),
)
df = (
    df
    .withColumn("life_support_start", earliest_start)
    .select('C19_HAR_ID', 'life_support_start')
)

# Persist the cohort, replacing any previous output at this path.
df.write.parquet("/project2/wparker/SIPA_data/life_support_cohort.parquet", mode="overwrite")

                                                                                