In [None]:
import com.databricks.labs.smolder.functions.parse_hl7_message

// ADLS Gen2 locations of the raw HL7 input file and of the bronze Delta table.
val hl7Path = "abfss://<container>@<storageaccount>.dfs.core.windows.net/hl7messages/messages.hl7"
val bronzePath = "abfss://<container>@<storageaccount>.dfs.core.windows.net/bronze"

// Read the HL7 file as text, using a blank line ("\n\n") as the record separator.
// This presumably yields one row per HL7 message in the `value` column; confirm
// that the source file really separates messages with blank lines.
val hl7Raw = spark.read
  .option("lineSep", "\n\n")
  .text(hl7Path)

// Land the raw text as-is in the bronze Delta table, replacing any previous run.
hl7Raw.write
  .format("delta")
  .mode("overwrite")
  .save(bronzePath)

// Read bronze back, parse each record with Smolder, and flatten the parsed
// struct into a `message` column and a `segments` column.
val hl7Bronze = spark.read
  .format("delta")
  .load(bronzePath)
  .select(parse_hl7_message($"value").as("message"))
  .select(
    $"message.message".as("message"),
    $"message.segments".as("segments")
  )

// Register the parsed data as a temp view so the PySpark cells can query it in SQL.
hl7Bronze.createOrReplaceTempView("hl7Bronze_temp")

In [None]:
%%pyspark

# Query the Scala temp view. Nothing is persisted here: each message keeps its
# header plus two array columns holding only its PID and PV1 segments, and the
# next cell processes the resulting DataFrame.
pid_pv1_sql = """ 
SELECT message, 
FILTER(segments, segment -> segment.id == "PID") as PID,
FILTER(segments, segment -> segment.id == "PV1") as PV1
FROM hl7Bronze_temp
"""
hl7Bronze = spark.sql(pid_pv1_sql)
display(hl7Bronze)


In [None]:
%%pyspark

from pyspark.sql.functions import explode, col, split, when, concat_ws, to_date, substring
from delta.tables import *

# Regex patterns handed to split(). '|' and '^' are regex metacharacters, so they
# must be escaped. Raw strings keep the same value as "\|" / "\^" without
# Python's invalid-escape-sequence warning.
_FIELD_SEP = r"\|"
_COMPONENT_SEP = r"\^"

# Flatten each ADT^A01 message into one patient row.
#
# Field indices below assume Smolder's `fields` array omits the segment id,
# so fields[i] is HL7 field i+1 (e.g. PID[4] -> PID-5, patient name).
# NOTE(review): confirm against the Smolder schema in use.
#
# Output columns: message, message_type, patient_id, name, gender,
# sending_facility, date_of_birth, patient_city, patient_country, ethnicity.
hl7BronzeProcessed = (hl7Bronze
    # split index 8 of the "|"-separated header line (index 1 is the encoding chars).
    .withColumn("message_type", split(col("message"), _FIELD_SEP).getItem(8))
    # Keep only ADT^A01 messages. Match the first two components so that values
    # carrying a third message-structure component (e.g. "ADT^A01^ADT_A01")
    # are kept too. Filtering first also avoids parsing messages we discard.
    .filter(col("message_type").rlike(r"^ADT\^A01(\^|$)"))
    # One row per PID segment. PV1 is deliberately not exploded: none of its
    # fields are used, and exploding it would drop messages that have no PV1
    # segment and duplicate rows for messages that have several.
    .withColumn("exploded_PID", explode("PID"))
    .select(col("message"), col("message_type"), col("exploded_PID.fields").alias("PID"))
    .withColumn("patient_id", split(col("PID")[2], _COMPONENT_SEP).getItem(0))
    .withColumn("first_name", split(col("PID")[4], _COMPONENT_SEP).getItem(1))
    .withColumn("last_name", split(col("PID")[4], _COMPONENT_SEP).getItem(0))
    .withColumn("name", concat_ws(' ', col("first_name"), col("last_name")))
    .withColumn("gender", col("PID")[7])
    .withColumn("sending_facility", split(col("message"), _FIELD_SEP).getItem(3))
    .withColumn("date_of_birth", col("PID")[6])
    # Missing or empty DOB falls back to the 19000101 sentinel.
    .withColumn("date_of_birth",
                when(col("date_of_birth").isNull() | (col("date_of_birth") == ""), "19000101")
                .otherwise(col("date_of_birth")))
    # Parse only the leading yyyyMMdd. Date-only values and values with a time
    # part both convert. A full "yyyyMMddHHmmss" pattern would return null for
    # 8-character values, including the sentinel above.
    .withColumn("date_of_birth", to_date(substring(col("date_of_birth"), 1, 8), "yyyyMMdd"))
    .withColumn("patient_address", col("PID")[10])
    .withColumn("patient_city", split(col("patient_address"), _COMPONENT_SEP).getItem(2))
    .withColumn("patient_country", split(col("patient_address"), _COMPONENT_SEP).getItem(5))
    .withColumn("ethnicity", split(col("PID")[21], _COMPONENT_SEP).getItem(1))
    # Rows without a message or patient id are unusable; blank-fill optional text columns.
    .na.drop(subset=["message", "patient_id"])
    .na.fill({"name": "", "gender": "", "sending_facility": "", "patient_city": "", "patient_country": "", "ethnicity": ""})
    .drop("PID", "first_name", "last_name", "patient_address")
)

display(hl7BronzeProcessed)

In [None]:
%%pyspark

# Persist the processed patient rows as the Silver Delta table, overwriting
# whatever a previous run left there.
silverPath = "abfss://<container>@<storageaccount>.dfs.core.windows.net/silver"

silver_writer = hl7BronzeProcessed.write.format("delta").mode("overwrite")
silver_writer.save(silverPath)

In [None]:
%%pyspark

# Build the gold view. Each silver row's gender, city and date of birth is paired
# with `count`, the number of silver rows that share its (patient_city, gender)
# pair. This is an inner join, so rows whose key columns are null would not match.
hl7Silver = spark.read.format("delta").load(silverPath)

hl7SilverAgg = hl7Silver.groupBy("patient_city", "gender").count()

patient_cols = hl7Silver.select("gender", "patient_city", "date_of_birth")
hl7GoldAgg = patient_cols.join(hl7SilverAgg, on=["gender", "patient_city"])

display(hl7GoldAgg)

In [None]:
%%pyspark

# Save the aggregated gold DataFrame as a Delta table, replacing previous contents.
goldPath = "abfss://<container>@<storageaccount>.dfs.core.windows.net/gold"

gold_writer = hl7GoldAgg.write.format("delta").mode("overwrite")
gold_writer.save(goldPath)

In [None]:
%%sql

-- Sample query against the gold Delta table: rows per (patient_city, gender).
-- Every gold row carries a non-null `count` from the inner join, so
-- COUNT(`count`) equals the number of gold rows in each group.
SELECT
    g.patient_city,
    g.gender,
    COUNT(g.`count`) AS `Number of Patients`
FROM delta.`abfss://<container>@<storageaccount>.dfs.core.windows.net/gold` AS g
GROUP BY g.patient_city, g.gender