In [None]:
# !pip install pyspark
# !pip install python-dotenv
# !pip install pycryptodome
# !pip install pandas

# 1. Initialize Spark Session

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

import json
import os
import base64

import pandas as pd

from Crypto.Cipher import AES
from dotenv import load_dotenv
load_dotenv()

# PostgreSQL connection settings, taken from environment variables
# (load_dotenv() above may supply them from a .env file).
# A variable that is not set comes back as None.
connection_url = os.environ.get('db_connection')

# Properties passed to Spark's JDBC reader/writer calls below.
connection_config = dict(
    user=os.environ.get('db_user'),
    password=os.environ.get('db_password'),
    driver="org.postgresql.Driver",
)

In [None]:
# Location of the PostgreSQL JDBC driver jar.
# It can be overridden with the `db_driver_jar` environment variable (or .env entry);
# when that variable is unset the original local path is used, so existing setups
# keep working unchanged.
db_driver_jar = os.getenv(
    "db_driver_jar",
    "/home/snowblade/Downloads/postgresql-42.7.4.jar",
)

# Build (or reuse) the Spark session with the JDBC driver jar registered.
# NOTE(review): getOrCreate() hands back an already-running session when one exists;
# in that case the `spark.jars` setting may not be applied — restart the kernel
# after changing the jar location.
spark = (
    SparkSession.builder
    .appName("FhirDataApplication")
    .config("spark.jars", db_driver_jar)
    .getOrCreate()
)

# Render the session summary in the notebook output.
display(spark)

# 2. Bronze Load | FHIR Integration

## 2.1. Patient Data

In [None]:
# Read the Patient NDJSON export as plain text (the raw line lands in column `value`)
# and tag every row with:
#   source  - last "/"-separated segment of the input file path
#   root_id - a generated UUID
bronze_patient_df = (
    spark.read.text("/home/snowblade/private/MimicPatient.ndjson.gz")
    .withColumn("source", F.split_part(F.input_file_name(), F.lit("/"), F.lit(-1)))
    .withColumn('root_id', F.expr('uuid()'))
)

In [None]:
# Persist the raw patient rows to PostgreSQL, overwriting bronze.patient_data.
(
    bronze_patient_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="bronze.patient_data",
        properties=connection_config,
    )
)

## 2.2. Encounter Data

In [None]:
# Load both Encounter exports (the main file and the "ED" file) through the same
# pipeline: raw text in `value`, plus `source` (input file name) and `root_id`
# (generated UUID). The generator yields one DataFrame per path, in order.
bronze_encounter_df, bronze_encounter_ed_df = (
    spark.read.text(ndjson_path)
    .withColumn("source", F.split_part(F.input_file_name(), F.lit("/"), F.lit(-1)))
    .withColumn('root_id', F.expr('uuid()'))
    for ndjson_path in (
        "/home/snowblade/private/MimicEncounter.ndjson.gz",
        "/home/snowblade/private/MimicEncounterED.ndjson.gz",
    )
)

# Stack the two sources into a single bronze DataFrame.
bronze_encounter_df = bronze_encounter_df.union(bronze_encounter_ed_df)

In [None]:
# Persist the combined raw encounter rows to PostgreSQL, overwriting bronze.encounter_data.
(
    bronze_encounter_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="bronze.encounter_data",
        properties=connection_config,
    )
)

## 2.3. Condition Data

In [None]:
# Load both Condition exports (the main file and the "ED" file) through the same
# pipeline: raw text in `value`, plus `source` (input file name) and `root_id`
# (generated UUID). The generator yields one DataFrame per path, in order.
bronze_condition_df, bronze_condition_ed_df = (
    spark.read.text(ndjson_path)
    .withColumn("source", F.split_part(F.input_file_name(), F.lit("/"), F.lit(-1)))
    .withColumn('root_id', F.expr('uuid()'))
    for ndjson_path in (
        "/home/snowblade/private/MimicCondition.ndjson.gz",
        "/home/snowblade/private/MimicConditionED.ndjson.gz",
    )
)

# Stack the two sources into a single bronze DataFrame.
bronze_condition_df = bronze_condition_df.union(bronze_condition_ed_df)

In [None]:
# Persist the combined raw condition rows to PostgreSQL, overwriting bronze.condition_data.
(
    bronze_condition_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="bronze.condition_data",
        properties=connection_config,
    )
)

# 3. Silver Transformation | Feature Engineering

## 3.1. Patient Resource

In [None]:
# Build the Spark schema for the Patient resource from its JSON definition file.
with open('schema/Patient.json') as schema_file:
    schema = T.StructType.fromJson(json.load(schema_file))

# Read the raw rows back from PostgreSQL, parse each JSON line in `value` with the
# schema, and expand the parsed struct into top-level columns between root_id and source.
bronze_patient_df = (
    spark.read.jdbc(
        url=connection_url,
        table="bronze.patient_data",
        properties=connection_config,
    )
    .withColumn('parsed_value', F.from_json(F.col('value'), schema))
    .select('root_id', 'parsed_value.*', 'source')
)

# Register the flattened frame so the SQL in the next cell can query it by name.
bronze_patient_df.createOrReplaceTempView('bronze_patient_df')

In [None]:
# Silver projection of the Patient resource:
#   patientId     <- id
#   birthDate     <- birthDate converted with to_date
#   maritalStatus <- code of the first maritalStatus coding
silver_patient_query = """
select 
    root_id,
    id AS patientId,
    gender AS gender,
    to_date(birthDate) birthDate,
    maritalStatus.coding[0].code AS maritalStatus,
    source
from bronze_patient_df
"""
silver_patient_df = spark.sql(silver_patient_query)

# Store the projection in PostgreSQL, overwriting silver.patient.
(
    silver_patient_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="silver.patient",
        properties=connection_config,
    )
)

## 3.2. Encounter Resource

In [None]:
# Build the Spark schema for the Encounter resource from its JSON definition file.
with open('schema/Encounter.json') as schema_file:
    schema = T.StructType.fromJson(json.load(schema_file))

# Read the raw rows back from PostgreSQL, parse each JSON line in `value` with the
# schema, and expand the parsed struct into top-level columns between root_id and source.
bronze_encounter_df = (
    spark.read.jdbc(
        url=connection_url,
        table="bronze.encounter_data",
        properties=connection_config,
    )
    .withColumn('parsed_value', F.from_json(F.col('value'), schema))
    .select('root_id', 'parsed_value.*', 'source')
)

# Register the flattened frame so the SQL in the next cell can query it by name.
bronze_encounter_df.createOrReplaceTempView('bronze_encounter_df')

In [None]:
# Silver projection of the Encounter resource:
#   patientId / ref_encounterId - reference strings with the "Patient/" / "Encounter/" text removed
#   periodStart / periodEnd     - period bounds cast to timestamp
#   duration                    - day difference between the two aliases above
#                                 (they are referenced within the same SELECT list)
#   priority                    - first priority coding display, 'emergency' when it is null
#   nextEncounterId             - id of the following encounter of the same subject,
#                                 ordered by period.start
#   readmissionStatus           - 'Readmission' when the day difference between this
#                                 encounter's end and the next one's start is <= 30
# NOTE(review): a negative gap (next encounter starting before this one ends) also
# satisfies "<= 30" and is therefore labelled 'Readmission' — confirm this is intended.
silver_encounter_query = """
SELECT
    root_id,
    id AS encounterId,
    replace(subject.reference, "Patient/", "") AS patientId,
    replace(partOf.reference, "Encounter/", "") AS ref_encounterId,
    CAST(period.start AS timestamp) periodStart,
    CAST(period.end AS timestamp) periodEnd,
    date_diff(day, periodStart, periodEnd) duration,
    status AS status,
    class.code AS encounterClass,
    type[0].coding[0].display AS displayType,
    nvl(priority.coding[0].display, 'emergency') AS priority,
    -- Next EncounterID & Readmission Status
    LEAD(id) OVER (PARTITION BY subject.reference ORDER BY period.start) AS nextEncounterId,
    CASE 
        WHEN DATEDIFF(day, period.end, LEAD(period.start) OVER (PARTITION BY subject.reference ORDER BY period.start)) <= 30 THEN 'Readmission'
        ELSE 'No Readmission'
    END AS readmissionStatus,
    source
FROM bronze_encounter_df
"""
silver_encounter_df = spark.sql(silver_encounter_query)

# Store the projection in PostgreSQL, overwriting silver.encounter.
(
    silver_encounter_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="silver.encounter",
        properties=connection_config,
    )
)

## 3.3. Condition Resource

In [None]:
# Build the Spark schema for the Condition resource from its JSON definition file.
with open('schema/Condition.json') as schema_file:
    schema = T.StructType.fromJson(json.load(schema_file))

# Read the raw rows back from PostgreSQL, parse each JSON line in `value` with the
# schema, and expand the parsed struct into top-level columns between root_id and source.
bronze_condition_df = (
    spark.read.jdbc(
        url=connection_url,
        table="bronze.condition_data",
        properties=connection_config,
    )
    .withColumn('parsed_value', F.from_json(F.col('value'), schema))
    .select('root_id', 'parsed_value.*', 'source')
)

# Register the flattened frame so the SQL in the next cell can query it by name.
bronze_condition_df.createOrReplaceTempView('bronze_condition_df')

In [None]:
# Silver projection of the Condition resource:
#   patientId / encounterId - reference strings with the "Patient/" / "Encounter/" text removed
#   categoryCode            - code of the first coding of the first category
#   conditionCode / conditionDisplay / conditionSystem - fields of the first code coding
silver_condition_query = """
SELECT
    root_id,
    id AS conditionId,
    replace(subject.reference, "Patient/", "") AS patientId,
    replace(encounter.reference, "Encounter/", "") AS encounterId,
    category[0].coding[0].code categoryCode,
    code.coding[0].code AS conditionCode,
    code.coding[0].display AS conditionDisplay,
    code.coding[0].system AS conditionSystem,
    source
FROM bronze_condition_df
"""
silver_condition_df = spark.sql(silver_condition_query)

# Store the projection in PostgreSQL, overwriting silver.condition.
(
    silver_condition_df.write
    .mode("overwrite")
    .jdbc(
        url=connection_url,
        table="silver.condition",
        properties=connection_config,
    )
)