To load the entire dataset, we recommend requesting the following resources on SLURM:

`srun -p himem -c 8 --mem 64GB -t 0-08:00:00 --pty bash`

In [None]:
# Enable IPython's autoreload extension in mode 2, so that imported modules
# are re-imported automatically before each cell runs.
%load_ext autoreload
%autoreload 2

In [None]:
import os

from pyspark.sql import SparkSession

from make_clinical_dataset.shared.constants import ROOT_DIR

In [None]:
# Start (or reuse) the notebook's Spark session: app name "EPIC-data",
# with 50G of memory for the driver.
builder = SparkSession.builder.appName("EPIC-data")
builder = builder.config("spark.driver.memory", "50G")
spark = builder.getOrCreate()

# Observation

In [None]:
# Date of the data pull to inspect, and the folder holding its observation Parquet files
date = '2025-01-08'
data_dir = f'{ROOT_DIR}/data/raw/data_pull_{date}/observation_parquet'

In [None]:
# Compare the on-disk size of the CSV and Parquet copies of the observation data
!du -hs {ROOT_DIR}/data/raw/data_pull_{date}/observation_csv
!du -hs {ROOT_DIR}/data/raw/data_pull_{date}/observation_parquet

In [None]:
# Read all Parquet files in the folder.
# First make sure the folder holds nothing but parquet.gzip files (i.e. nobody
# dropped unrelated files in there). An explicit check is used instead of a
# bare `assert` so the failure names the offending files and is not skipped
# when Python runs with -O.
unexpected_files = sorted(
    fname for fname in os.listdir(data_dir) if not fname.endswith('parquet.gzip')
)
if unexpected_files:
    raise ValueError(
        f'Expected only parquet.gzip files in {data_dir}, but found: {unexpected_files}'
    )
df = spark.read.parquet(data_dir)

In [None]:
# Print the schema of the observation DataFrame loaded above
df.printSchema()

In [None]:
# Overall size of the data as a (row count, column count) pair
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

In [None]:
# Number of distinct values in the PATIENT_RESEARCH_ID column
patient_ids = df.select("PATIENT_RESEARCH_ID").distinct()
patient_ids.count()

In [None]:
# Number of distinct procedure names. The column name contains a dot, so it is
# wrapped in backticks to be read as a single column.
proc_names = df.select('`Observations.ProcName`').distinct()
proc_names.count()

In [None]:
# Row count per procedure name, most frequent first (up to 500 rows, untruncated)
proc_counts = df.groupBy('`Observations.ProcName`').count()
proc_counts = proc_counts.orderBy("count", ascending=False)
proc_counts.show(n=500, truncate=False)

In [None]:
# BONUS: number of rows Spark holds in each data partition, largest first
from pyspark.sql.functions import spark_partition_id

rows_per_partition = df.groupBy(spark_partition_id()).count()
rows_per_partition = rows_per_partition.orderBy("count", ascending=False)
# Show one output row per partition so nothing is cut off
rows_per_partition.show(n=df.rdd.getNumPartitions())

# Clinic Note

In [None]:
# Date of the data pull to inspect, and the folder holding its clinic-note Parquet files
date = '2025-01-08'
data_dir = f'{ROOT_DIR}/data/raw/data_pull_{date}/clinic_notes_parquet'

In [None]:
# Compare the on-disk size of the CSV and Parquet copies of the clinic notes data
!du -hs {ROOT_DIR}/data/raw/data_pull_{date}/clinic_notes_csv
!du -hs {ROOT_DIR}/data/raw/data_pull_{date}/clinic_notes_parquet

In [None]:
# Read all Parquet files in the folder.
# First make sure the folder holds nothing but parquet.gzip files (i.e. nobody
# dropped unrelated files in there). An explicit check is used instead of a
# bare `assert` so the failure names the offending files and is not skipped
# when Python runs with -O.
unexpected_files = sorted(
    fname for fname in os.listdir(data_dir) if not fname.endswith('parquet.gzip')
)
if unexpected_files:
    raise ValueError(
        f'Expected only parquet.gzip files in {data_dir}, but found: {unexpected_files}'
    )
df = spark.read.parquet(data_dir)

In [None]:
# Print the schema of the clinic notes DataFrame loaded above
df.printSchema()

In [None]:
# Overall size of the data as a (row count, column count) pair
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

In [None]:
# Number of distinct values in the PATIENT_RESEARCH_ID column
patient_ids = df.select("PATIENT_RESEARCH_ID").distinct()
patient_ids.count()

In [None]:
# Number of distinct values of the clinic note `code.text` column. The column
# name contains dots, so it is wrapped in backticks to be read as a single column.
note_codes = df.select('`ClinicNotes.ClinicNote.code.text`').distinct()
note_codes.count()

In [None]:
# Row count per clinic note `code.text` value, most frequent first
# (up to 100 rows, untruncated)
code_counts = df.groupBy('`ClinicNotes.ClinicNote.code.text`').count()
code_counts = code_counts.orderBy("count", ascending=False)
code_counts.show(n=100, truncate=False)