In [1]:
!pip install wandb pathling pyspark nibabel minio requests boto3

Collecting wandb
  Downloading wandb-0.15.4-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m32.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting pathling
  Downloading pathling-6.2.2-py2.py3-none-any.whl (57.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.1/57.1 MB[0m [31m27.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting nibabel
  Downloading nibabel-5.1.0-py3-none-any.whl (3.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting minio
  Downloading minio-7.1.15-py3-none-any.whl (77 kB)
[2K     [90m

In [2]:
import wandb
import os
from pathling.coding import Coding
from pyspark.sql import functions
from pathling.udfs import subsumes
from utils import get_spark_session, get_pathling_context, \
load_resources, extract_patient_id, extract_subject_id, save_artifact

# Setup wandb
os.environ['WANDB_PROJECT'] = 'diabetes-vaccines-notebook'
os.environ['WANDB_NOTEBOOK_NAME'] = '/home/jovyan/work/test-jupyter/prepare-data.ipynb'
os.environ["WANDB_BASE_URL"] = "http://wandb:8080"
os.environ['WANDB_API_KEY'] = 'local-f68b4b71af977015844cb5987382d102a493b0eb'
os.environ['AWS_S3_ENDPOINT_URL'] = 'http://minio:9000'
os.environ['AWS_ACCESS_KEY_ID'] = 'minio'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio123'

wandb.login()
run = wandb.init(job_type="data_convert")

[34m[1mwandb[0m: Currently logged in as: [33mlouism[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [3]:
# Configure some constants for the demo
PROJECT_NAME = os.environ['WANDB_PROJECT']
ARTIFACT_NAME = 'training-data'
RESOURCES = ['Patient', 'Immunization', 'Condition']
START_BIRTH_DATE = '1920-01-01'
END_BIRTH_DATE = '2020-01-01'
IMMUNIZATION_CODE = '08' # Hepatitis B vaccine code
CONDITION_CODE = '73211009' # Diabetes SNOMED code

# Configure Spark, Pathling, and load resources
spark = get_spark_session()
pc = get_pathling_context(spark)
resource_data = load_resources(pc, RESOURCES)

In [4]:
###### 1. FILTERING INITIAL PATIENTS ######

# Filtering patients based on birthdate
patients = resource_data['Patient']
filtered_patients = patients.filter(
    (patients.birthDate < END_BIRTH_DATE) & 
    (patients.birthDate > START_BIRTH_DATE))

filtered_patients.select('gender', 'birthDate').show()

+------+----------+
|gender| birthDate|
+------+----------+
|female|2019-07-16|
|female|1974-01-29|
|  male|1945-07-09|
|  male|2016-06-02|
|female|1996-05-28|
|female|2017-11-04|
|  male|1955-08-27|
|female|1942-10-19|
|  male|1997-04-10|
|  male|1973-03-30|
|female|1966-03-08|
|  male|1945-07-09|
|  male|1955-08-27|
|  male|1951-07-26|
|  male|1953-03-09|
|  male|1987-06-06|
|  male|1971-01-09|
|female|2005-04-23|
|female|1936-10-27|
|female|1992-10-01|
+------+----------+
only showing top 20 rows



In [5]:
###### 2. SELECTING HEP B IMMUNIZATIONS ######

# Joining filtered patients with immunizations
immunizations = resource_data['Immunization']
patients_immunizations = filtered_patients.join(
    immunizations.select('patient', 'vaccineCode'),
    filtered_patients.id == extract_patient_id(immunizations),
    'left_outer'
)

# Selecting patient id and their vaccine coding
patients_immunizations = patients_immunizations.select(
    filtered_patients.id,
    functions.explode(
        patients_immunizations.vaccineCode.coding.getField('code')
    ).alias('code')
)

# Checking patients who received the hepatitis B vaccine
patients_immunizations = patients_immunizations.withColumn(
    'is_vaccinated',
    patients_immunizations.code == IMMUNIZATION_CODE
)

patients_immunizations.show()

+--------------------+----+-------------+
|                  id|code|is_vaccinated|
+--------------------+----+-------------+
|d9b90399-0504-4f1...|  08|         true|
|e0f57407-86ed-405...| 140|        false|
|e9adac47-eb98-4fc...|  08|         true|
|55c11701-518c-4ef...|  08|         true|
|d9b90399-0504-4f1...|  08|         true|
|55c11701-518c-4ef...|  08|         true|
|e0f57407-86ed-405...| 140|        false|
|d9b90399-0504-4f1...|  49|        false|
|d9b90399-0504-4f1...| 119|        false|
|d9b90399-0504-4f1...|  10|        false|
|d9b90399-0504-4f1...|  20|        false|
|55c11701-518c-4ef...|  49|        false|
|d9b90399-0504-4f1...| 133|        false|
|55c11701-518c-4ef...| 119|        false|
|55c11701-518c-4ef...|  10|        false|
|55c11701-518c-4ef...|  20|        false|
|55c11701-518c-4ef...| 133|        false|
|d9b90399-0504-4f1...|  49|        false|
|d9b90399-0504-4f1...| 119|        false|
|d9b90399-0504-4f1...|  10|        false|
+--------------------+----+-------

In [6]:
###### 3. SELECTING DIABETIC PATIENTS ######

# Joining filtered patients with conditions
conditions = resource_data['Condition']
patients_conditions = filtered_patients.join(
    conditions.select('subject', 'code'),
    filtered_patients.id == extract_subject_id(conditions),
    'left_outer'
)

# Selecting patient id and their condition coding
patients_conditions = patients_conditions.select(
    filtered_patients.id,
    functions.explode_outer(conditions.code.getField('coding')) \
    .alias('codings')
)

# Checking patients who have diabetes
condition_coding = Coding(system='http://snomed.info/sct', code=CONDITION_CODE)
patients_conditions = patients_conditions.withColumn(
    'has_diabetes',
    subsumes(condition_coding, patients_conditions.codings)                                 
)

patients_conditions.show()

+--------------------+--------------------+------------+
|                  id|             codings|has_diabetes|
+--------------------+--------------------+------------+
|e9adac47-eb98-4fc...|                null|        null|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|c60ac337-2ebe-48f...|{null, http://sno...|       false|
|cc31e8b9-9d75-4f7...|{null, http://sno...|       false|
|cc31e8b9-9d75-4f7...|{null, http://sno...|       false|
|cc31e8b9-9d75-4f7...|{null, http://sno...|       false|
|cc31e8b9-9d75-4f7...|{null, http://sno...|       false|
|cc31e8b9-9d75-4f7...|{null, ht

In [None]:
###### 4. JOINING THE DIFFERENT DATA SOURCES ######

# Joining vaccine and diabetes dataframes to identify unvaccinated high risk patients
df_a = patients_immunizations.withColumn('id_a', patients_immunizations.id)
df_b = patients_conditions.withColumn('id_b', patients_conditions.id)
two_by_two = df_a.join(df_b, functions.col('id_a') == functions.col('id_b'), 'left_outer')
 
# Aggregating data to get a 2x2 table of diabetes (Y/N) and vaccination (Y/N)
aggregate = two_by_two.groupBy(
    patients_immunizations.is_vaccinated, 
    patients_conditions.has_diabetes
).agg(
    functions.countDistinct(patients_immunizations.id)
)

# Display the results
aggregate.show()

In [None]:
# Optionally, log it as a wandb Table for viewing
table = wandb.Table(dataframe=aggregate.toPandas())

run.log({"hep_b_vaccination_in_diabetics": table})

In [None]:
# Write the results to Minio in Parquet format using Spark
save_artifact(aggregate, PROJECT_NAME, ARTIFACT_NAME, run)

In [None]:
run.finish()