In [1]:
import pandas as pd
import numpy as np
import os
import math
import sys
from datetime import datetime
import pdb
import argparse
import random

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import functions as sf

sys.path.append('/cluster/home/faltysm/source/2020_VolumeChallenge/common')
from spark_common import get_spark_session
from spark_common import get_included_subset

In [49]:
# Input locations. All inputs live under the same cluster work directory, and the
# gap-closed pipeline outputs share one run date; deriving every path from these
# roots makes relocating the data or bumping a run a one-line change.
HIRID2_ROOT = "/cluster/work/grlab/clinical/hirid2"
VC_ROOT = f"{HIRID2_ROOT}/research/faltysm/volume_challenge"
GAP_CLOSED_ROOT = f"{VC_ROOT}/gap_closed"
GAP_CLOSED_RUN_DATE = "2020-12-05"

VC_PATH = f"{VC_ROOT}/patient_stay/2020-11-12"
BOLUS_PATH = f"{GAP_CLOSED_ROOT}/bolus/{GAP_CLOSED_RUN_DATE}"
ISOBOLUS_PATH = f"{GAP_CLOSED_ROOT}/bolus_isolated/{GAP_CLOSED_RUN_DATE}/augmented/{GAP_CLOSED_RUN_DATE}"
HIRID2_PATH = f"{HIRID2_ROOT}/pg_db_export/p_generaldata"
MEASUREMENTS_PATH = f"{GAP_CLOSED_ROOT}/measurements_around_bolus/{GAP_CLOSED_RUN_DATE}"
CONTROL_MEASUREMENTS_PATH = f"{GAP_CLOSED_ROOT}/measurements_around_non_bolus/{GAP_CLOSED_RUN_DATE}"
NONBOLUS_PATH = f"{GAP_CLOSED_ROOT}/non_bolus/{GAP_CLOSED_RUN_DATE}/augmented/{GAP_CLOSED_RUN_DATE}"
STATIC_PATH = f"{HIRID2_ROOT}/research/1a_hdf5_clean/v8/static.h5"

# The positional arguments are interpreted by spark_common.get_spark_session
# (defined outside this notebook) — see that helper for their meaning.
spark = get_spark_session(8, 1024, 64)

<h2>Statistics from data preprocessing (external)</h2>
Consenting patients admitted in or after 2008: 60559<br>
Consenting patients admitted in or after 2008, aged within [16, 100]: 60504 (only these are included in the static file)

In [50]:
# Study group to analyse; selects the admission-code subset and the PSM dataset below.
ENDPOINT = "cardiac"

# Admission diagnosis codes (adm_codeid) defining each supported study group.
ENDPOINT_ADM_CODEIDS = {
    "cardiac": {8699, 8700, 8701, 9085, 9086, 9087, 9088, 9089},
}
# Propensity-score-matched dataset produced for each study group.
ENDPOINT_PSM_PATHS = {
    "cardiac": "/cluster/work/grlab/clinical/hirid2/research/faltysm/volume_challenge/gap_closed/psm_cardiac/2020-12-30",
}

# Fail fast: without this, an unknown ENDPOINT leaves vc_data / PSM_FULL_PATH undefined
# and later cells die with a NameError far from the cause.
if ENDPOINT not in ENDPOINT_ADM_CODEIDS:
    raise ValueError(
        f"Unsupported ENDPOINT {ENDPOINT!r}; expected one of {sorted(ENDPOINT_ADM_CODEIDS)}"
    )

count_adm_after2008 = spark.read.parquet(HIRID2_PATH).where('YEAR(admissiontime) >= 2008').count()
# Hard-coded result of the external (xinrui) exclusion step: consenting stays admitted
# in or after 2008 — matches the statistics in the markdown cell above.
count_gc_adm_after2008 = 60559
vc_data_all = spark.read.parquet(VC_PATH)

# Subset patients to the study group.
vc_data = vc_data_all.where(sf.col("adm_codeid").isin(ENDPOINT_ADM_CODEIDS[ENDPOINT]))
PSM_FULL_PATH = ENDPOINT_PSM_PATHS[ENDPOINT]


In [51]:
# Cohort flow (CONSORT-style): each line reports how many stays a criterion removes.
# Criteria are applied one after another, so every stay is counted under the FIRST
# criterion it fails. The per-criterion counts are therefore mutually exclusive and sum
# to (cohort size - count_included): a stay failing several criteria is not
# double-counted, and a stay with a NULL criterion is counted as excluded rather than
# silently falling into no bucket.
print(f"Total stays: {count_adm_after2008}")
print(f"Excluded GC: {count_adm_after2008-count_gc_adm_after2008} ({count_gc_adm_after2008})")

# Every .count() launches a Spark job, so compute each size once.
count_all_stays = vc_data_all.count()
count_cohort = vc_data.count()
print(f"Excluded Cohort: {count_all_stays-count_cohort} ({count_cohort})")

# Cohort stays satisfying the age criterion (kept as a named frame for reuse).
vc_data_age = vc_data.where('age_criterion=True')

# (criterion column, label) in the order the exclusions are applied.
exclusion_steps = [
    ("age_criterion", "Excluded Age"),
    ("invasive_hr_measurement_criterion", "Excluded missing HR"),
    ("invasive_bp_measurement_criterion", "Excluded missing inv. BP"),
    ("study_drug_criterion", "Excluded study drug"),
    ("ecmo_criterion", "Excluded ecmo/impella"),
]
vc_data_included = vc_data
count_included = count_cohort
for criterion, label in exclusion_steps:
    vc_data_included = vc_data_included.where(f'{criterion}=True')
    count_remaining = vc_data_included.count()
    print(f"{label}: {count_included - count_remaining}")
    count_included = count_remaining

print(f"Total included stays: {count_included}")
print(f"Total excluded gc valid stays: {count_gc_adm_after2008-count_included}")

Total stays: 64491
Excluded GC: 3932 (60559)
Excluded Cohort: 52001 (8503)
Excluded Age: 5
Excluded missing HR: 0
Excluded missing inv. BP: 2
Excluded study drug: 2
Excluded ecmo/impella: 54
Total included stays: 8440
Total excluded gc valid stays: 52119


In [5]:
# Load the bolus, isolated-bolus, non-bolus and surrounding-measurement tables
# written by the upstream gap-closed pipeline (one parquet dataset per path).
ds_bolus, ds_isobolus, ds_nonbolus, ds_measurements, ds_control_measurements = (
    spark.read.parquet(dataset_path)
    for dataset_path in (
        BOLUS_PATH,
        ISOBOLUS_PATH,
        NONBOLUS_PATH,
        MEASUREMENTS_PATH,
        CONTROL_MEASUREMENTS_PATH,
    )
)

In [6]:
# Subset every bolus / non-bolus table to the study group selected by ENDPOINT.
# Inner joins: the previous left-join-then-filter on a right-hand column already
# dropped unmatched rows, so "inner" states the actual semantics. Joining on a list of
# column names keeps a single copy of each key column, replacing the explicit .drop()s.
# NOTE: this cell rebinds its own inputs. Re-run the load cell before re-running it,
# otherwise the already-joined columns are joined a second time (duplicate names).
CARDIAC_ADM_CODEIDS = {8699, 8700, 8701, 9085, 9086, 9087, 9088, 9089}

if ENDPOINT != "cardiac":
    # Fail loudly instead of reporting statistics on unfiltered tables.
    raise ValueError(f"No bolus subsetting defined for ENDPOINT={ENDPOINT!r}")

ds_bolus = (
    ds_bolus
    .join(vc_data, on=["patientid"], how="inner")
    .where(sf.col("adm_codeid").isin(CARDIAC_ADM_CODEIDS))
)
ds_isobolus = (
    ds_isobolus
    .join(ds_measurements, on=["patientid", "bolusid"], how="inner")
    .where(sf.col("adm_codeid_baseline").isin(CARDIAC_ADM_CODEIDS))
)
ds_nonbolus = (
    ds_nonbolus
    .join(ds_control_measurements, on=["patientid", "bolusid"], how="inner")
    .where(sf.col("adm_codeid_baseline").isin(CARDIAC_ADM_CODEIDS))
)

In [7]:
# Bolus flow: exclusion cascade for isolated fluid boluses, followed by the matching
# counts for the non-bolus control windows.
# "lenght" is the column name as spelled in the upstream data. The messages equate
# lenght > 6 with a duration > 30 min (presumably 5-minute steps — confirm upstream).
MAX_LENGTH_STEPS = 6
# vm1/vm5 criteria flag HR / MAP availability (per the "Missing HR or MAP" messages).
has_hr_and_map = 'vm1_criterion = 1 and vm5_criterion = 1'
missing_hr_or_map = 'vm1_criterion = 0 or vm5_criterion = 0'

bolus_total = ds_bolus.count()
iso_valid = ds_isobolus.where(has_hr_and_map)
iso_included = iso_valid.where(f'lenght<={MAX_LENGTH_STEPS}')

print(f"Patients count with iso/bolus: {ds_isobolus.select('patientid').distinct().count()}/{ds_bolus.select('patientid').distinct().count()}")
print(f"Bolus count: {bolus_total}")
print(f"Not isolated excluded: {bolus_total - ds_isobolus.count()}")
print(f"Missing HR or MAP during bolus observation time: {ds_isobolus.where(missing_hr_or_map).count()}")
print(f"Bolus duration > 30min: {iso_valid.where(f'lenght>{MAX_LENGTH_STEPS}').count()}")
print(f"Total boluses included: {iso_included.count()}")
print(f"From total patients: {iso_included.select('patientid').distinct().count()}")
print(f"Second or later Bolus excluded: {iso_included.where('bolusid>1').count()}")
print(f"Total first boluses included: {iso_included.where('bolusid=1').count()}")

print("===================non bolus============")
# Same HR-and-MAP requirement as the bolus branch, so "> 30min" and "included" are
# mutually exclusive and together account for every window with both signals.
nonbolus_valid = ds_nonbolus.where(has_hr_and_map)
nonbolus_included = nonbolus_valid.where(f'lenght<={MAX_LENGTH_STEPS}')

print(f"Patients count non bolus: {ds_nonbolus.select('patientid').distinct().count()}")
print(f"Total non boluses: {ds_nonbolus.count()}")
print(f"Missing HR or MAP during non-bolus observation time: {ds_nonbolus.where(missing_hr_or_map).count()}")
print(f"Total non boluses > 30min: {nonbolus_valid.where(f'lenght>{MAX_LENGTH_STEPS}').count()}")
print(f"Total non boluses included: {nonbolus_included.count()}")
print(f"From total patients: {nonbolus_included.select('patientid').distinct().count()}")

Patients count with iso/bolus: 5121/7301
Bolus count: 26377
Not isolated excluded: 17832
Missing HR or MAP during bolus observation time: 1
Bolus duration > 30min 3342
Total boluses included: 5202
From total patients: 3598
Second or later Bolus excluded: 2326
Total first boluses included: 2876
Patients count non bolus: 5019
Total non boluses: 15369
Missing HR or MAP during bolus observation time: 0
Total non boluses > 30min: 5949
Total non boluses included: 9420
From total patients: 3520


In [8]:
# Propensity-score-matched bolus / non-bolus pairs for the selected study group.
ds_psm = spark.read.format("parquet").load(PSM_FULL_PATH)

In [9]:
# Row and distinct-patient counts for each arm of the matched dataset
# (bolus=1 first, then bolus=0).
print("===================Full============")
for bolus_flag, arm_label in ((1, "Bolus"), (0, "non-Bolus")):
    arm = ds_psm.where(f'bolus={bolus_flag}')
    print(f"{arm_label} count: {arm.count()}")
    print(f"{arm_label} patient count: {arm.select('patientid').distinct().count()}")

Bolus count: 3572
Bolus patient count: 2736
non-Bolus count: 3572
non-Bolus patient count: 2239
