# Readmission Risk for Heart Failure Patients

In [1]:
import mimicfouretl.bigquery_utils as bq
from mimicfouretl.data_insights import display_datasets
from mimicfouretl.query_builder import QueryBuilder

## Build BigQuery Spark session

In [2]:
bq.set_credentials_file('../bq_credentials/client_secret.json')
bq.set_project_id('mimic-iv-418015')
# bq.set_project_id('micro-vine-412020')

In [3]:
client = bq.get_client(use_service_account_auth=False)

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=236933606679-n0530hpv6li2upvr6ibubbd7f3hik03j.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A60095%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&state=E2LLv2hzz7Ls7KaMClE3Tezg7Btd9q&access_type=offline


In [4]:
# tables = bq.list_tables('mimiciv_icu', client)
tables = bq.list_tables('physionet-data.mimiciv_hosp', client)
tables

['physionet-data.mimiciv_hosp.admissions',
 'physionet-data.mimiciv_hosp.d_hcpcs',
 'physionet-data.mimiciv_hosp.d_icd_diagnoses',
 'physionet-data.mimiciv_hosp.d_icd_procedures',
 'physionet-data.mimiciv_hosp.d_labitems',
 'physionet-data.mimiciv_hosp.diagnoses_icd',
 'physionet-data.mimiciv_hosp.drgcodes',
 'physionet-data.mimiciv_hosp.emar',
 'physionet-data.mimiciv_hosp.emar_detail',
 'physionet-data.mimiciv_hosp.hcpcsevents',
 'physionet-data.mimiciv_hosp.labevents',
 'physionet-data.mimiciv_hosp.microbiologyevents',
 'physionet-data.mimiciv_hosp.omr',
 'physionet-data.mimiciv_hosp.patients',
 'physionet-data.mimiciv_hosp.pharmacy',
 'physionet-data.mimiciv_hosp.poe',
 'physionet-data.mimiciv_hosp.poe_detail',
 'physionet-data.mimiciv_hosp.prescriptions',
 'physionet-data.mimiciv_hosp.procedures_icd',
 'physionet-data.mimiciv_hosp.provider',
 'physionet-data.mimiciv_hosp.services',
 'physionet-data.mimiciv_hosp.transfers']

In [5]:
display_datasets()

Dropdown(description='Dataset:', options=('hosp.provider', 'hosp.services', 'hosp.d_icd_procedures', 'hosp.pre…

Output()

In [6]:
spark = bq.get_spark_session()

24/03/29 15:16:24 WARN Utils: Your hostname, KGMSurface resolves to a loopback address: 127.0.1.1; using 10.0.0.136 instead (on interface wlp0s20f3)
24/03/29 15:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/kevin/anaconda3/envs/mimic-iv-etl/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/kevin/.ivy2/cache
The jars for the packages stored in: /home/kevin/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-95fa33c4-94da-481d-809f-cbbd718fd26e;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.37.0 in central
	[0.37.0] com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;latest.version
:: resolution report :: resolve 1856ms :: artifacts dl 2ms
	:: modules in use:
	com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.37.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   0   |   0   

## Get relevant ICD codes and Lab Item IDs

### Get ICD codes for heart failure diagnoses

In [7]:
qb_heart_failure_codes = QueryBuilder(dataset='hosp.d_icd_diagnoses', 
                                      columns=['icd_code', 'icd_version', 'long_title'])
qb_heart_failure_codes.apply_filters("LOWER(long_title) LIKE '%heart failure%'")
heart_failure_codes_query = qb_heart_failure_codes.generate_query()
print(heart_failure_codes_query)

SELECT icd_code, long_title, icd_version
FROM `physionet-data.mimiciv_hosp.d_icd_diagnoses`
WHERE LOWER(long_title) LIKE '%heart failure%'


In [8]:
heart_failure_icd_items = bq.run_query(spark, heart_failure_codes_query)

In [9]:
heart_failure_icd_items.show()

                                                                                

+--------+--------------------+-----------+
|icd_code|          long_title|icd_version|
+--------+--------------------+-----------+
|   40290|Unspecified hyper...|          9|
|   40412|Hypertensive hear...|          9|
|   40200|Malignant hyperte...|          9|
|   42842|Chronic combined ...|          9|
|   42821|Acute systolic he...|          9|
|   42843|Acute on chronic ...|          9|
|   42830|Diastolic heart f...|          9|
|   42841|Acute combined sy...|          9|
|   40400|Hypertensive hear...|          9|
|   39891|Rheumatic heart f...|          9|
|   40492|Hypertensive hear...|          9|
|   40210|Benign hypertensi...|          9|
|   42833|Acute on chronic ...|          9|
|   40401|Hypertensive hear...|          9|
|   42820|Systolic heart fa...|          9|
|   40490|Hypertensive hear...|          9|
|   42832|Chronic diastolic...|          9|
|   40211|Benign hypertensi...|          9|
|   40413|Hypertensive hear...|          9|
|   40411|Hypertensive hear...| 

In [10]:
heart_failure_icd_codes_list = [row['icd_code'] for row in heart_failure_icd_items.select('icd_code').distinct().collect()]

In [11]:
heart_failure_icd_codes_str = "'" + "', '".join(heart_failure_icd_codes_list) + "'"

### Get itemids for BNP labs

In [12]:
qb_bnp_labs = QueryBuilder(dataset='hosp.d_labitems', 
                                      columns=['itemid', 'label', 'fluid', 'category'])
qb_bnp_labs.apply_filters("LOWER(label) LIKE '%bnp%'")
bnp_labs_query = qb_bnp_labs.generate_query()
print(bnp_labs_query)

SELECT fluid, category, itemid, label
FROM `physionet-data.mimiciv_hosp.d_labitems`
WHERE LOWER(label) LIKE '%bnp%'


In [13]:
bnp_lab_items = bq.run_query(spark, bnp_labs_query)

In [14]:
bnp_lab_items.show()

+-------+---------+------+---------------+
|  fluid| category|itemid|          label|
+-------+---------+------+---------------+
|Pleural|Chemistry| 51921|proBNP, Pleural|
|  Blood|Chemistry| 50963|       NTproBNP|
+-------+---------+------+---------------+



In [15]:
bnp_lab_itemids_list = [row['itemid'] for row in bnp_lab_items.select('itemid').distinct().collect()]

In [16]:
bnp_lab_itemids_str = ', '.join(str(itemid) for itemid in bnp_lab_itemids_list)

## Query MIMIC IV database for Heart Failure diagnoses and BNP labs

In [17]:
# Initialize QueryBuilders
qb_diagnoses = QueryBuilder(dataset='hosp.diagnoses_icd', 
                            columns=['subject_id', 'hadm_id', 'seq_num', 'icd_code', 'icd_version'])
qb_diagnoses.apply_filters(f'icd_code IN ({heart_failure_icd_codes_str})')

qb_labevents = QueryBuilder(dataset='hosp.labevents', 
                            columns=['subject_id', 'hadm_id', 'itemid', 'valuenum', 'ref_range_lower', 'ref_range_upper'])
qb_labevents.apply_filters(f'itemid IN ({bnp_lab_itemids_str})')

# Join datasets
qb_diagnoses.join_with(qb_labevents, join_type='inner', columns=['subject_id', 'hadm_id'])

# Generate query for joined data
qualifying_hosp_admissions_query = qb_diagnoses.generate_query(limit=100)
# This query can now be used to extract the relevant joined data
print(qualifying_hosp_admissions_query)

SELECT icd_version, `physionet-data.mimiciv_hosp.diagnoses_icd`.hadm_id, ref_range_upper, ref_range_lower, `physionet-data.mimiciv_hosp.diagnoses_icd`.subject_id, valuenum, seq_num, icd_code, itemid
FROM `physionet-data.mimiciv_hosp.diagnoses_icd`
INNER JOIN `physionet-data.mimiciv_hosp.labevents` ON `physionet-data.mimiciv_hosp.diagnoses_icd`.subject_id = `physionet-data.mimiciv_hosp.labevents`.subject_id AND `physionet-data.mimiciv_hosp.diagnoses_icd`.hadm_id = `physionet-data.mimiciv_hosp.labevents`.hadm_id
WHERE icd_code IN ('I119', 'I5021', 'I5033', 'I509', 'I50814', '42842', '40413', '42830', 'I130', '40403', 'I50', 'I9713', '4289', '40412', 'I5032', '4280', '40291', 'I5042', 'I50811', '42833', 'I50812', 'I1311', 'I5041', 'I5043', '40211', '40411', 'I503', '40492', 'I5083', 'I110', '42821', '40401', '40410', 'I131', 'I5089', '42840', '40493', 'I0981', 'I5022', '40402', 'I97130', 'I50810', 'I97131', 'I508', 'I5081', 'I5084', '40400', 'I5020', '42832', 'I5040', 'I5023', '42841', '4

In [18]:
qualifying_hosp_admissions_df = bq.run_query(spark, qualifying_hosp_admissions_query)

In [19]:
qualifying_hosp_admissions_df.show()

+-----------+--------+---------------+---------------+----------+--------+-------+--------+------+
|icd_version| hadm_id|ref_range_upper|ref_range_lower|subject_id|valuenum|seq_num|icd_code|itemid|
+-----------+--------+---------------+---------------+----------+--------+-------+--------+------+
|         10|23237871|           NULL|           NULL|  11277578|   563.0|     19|  I50811| 51921|
|         10|23237871|           NULL|           NULL|  11277578|   563.0|     10|    I132| 51921|
|         10|21037414|           NULL|           NULL|  11018073|  4379.0|      7|   I5020| 51921|
|         10|25290806|           NULL|           NULL|  15746911| 44473.0|      1|    I132| 51921|
|         10|25290806|           NULL|           NULL|  15746911| 44473.0|      3|   I5043| 51921|
|         10|27678862|           NULL|           NULL|  13542543| 12107.0|      2|   I5033| 51921|
|         10|27678862|           NULL|           NULL|  13542543| 12107.0|      4|    I130| 51921|
|         