In [None]:
from pathling import PathlingContext, Expression as exp
from pyspark.sql import SparkSession
import os
import dotenv

dotenv.load_dotenv(".demo.env")

spark = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "au.csiro.pathling:library-runtime:7.2.0,io.delta:delta-spark_2.12:3.3.2,org.apache.hadoop:hadoop-aws:3.3.4",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)

pc = PathlingContext.create(
    spark=spark,
    enable_extensions=True,
    enable_delta=True,
    enable_terminology=False,
)

In [None]:
data = pc.read.delta("s3a://fhir/default/")

In [None]:
# s. <https://pathling.csiro.au/docs/libraries/fhirpath-query#extract>
patients = data.extract(
    "Patient",  # Ressourcentyp aus dem Daten extrahiert werden sollen
    columns=[
        exp(
            "Patient.id", "patient_resource_id"
        ),  # Auswahl des Pfads "id" der Ressource
        exp(
            "Patient.identifier.where(type.coding.where(system='http://terminology.hl7.org/CodeSystem/v2-0203' and code='MR').exists()).first().value",
            "medical_record_number",
        ),
        exp("gender", "gender"),  #  (Patient als Präfix ist optional)
        exp("birthDate", "birth_date"),
        exp("deceasedDateTime", "deceased_datetime"),
    ],
)

patients.show(truncate=False)

In [None]:
# "patients" ist ein regulärer Spark DataFrame, alle Spark-Funktionen können darauf angewendet werden
patients_by_gender = patients.groupBy("gender").count()
patients_by_gender.show()

patients_by_birth_date = (
    patients.groupBy("birth_date").count().orderBy("count", ascending=False)
)
patients_by_birth_date.show()

In [None]:
import seaborn as sns
from pyspark.sql.functions import current_date, datediff

import matplotlib.pyplot as plt

# Altersberechnung in Spark
patients_age = patients_by_birth_date.withColumn(
    "age",
    (datediff(current_date(), "birth_date") / 365.25).cast("int"),
)
ages_pd = patients_age.select("age").toPandas()

# Age distribution histogram
plt.figure(figsize=(8, 6))
sns.histplot(ages_pd["age"].dropna(), bins=20, kde=True)
plt.title("Altersverteilung der Patienten")
plt.xlabel("Alter (in Jahren)")
plt.ylabel("Anzahl")
plt.show()

## Übung 1: Verteilung der Diagnosen

Es soll ein Histogram entstehen, dass die Anzahl der vorhandenen Diagnosen darstellt.

Tipp: Unter <https://hl7.github.io/fhirpath.js/> kann man FHIRPath Ausdrücke ausprobieren.
Kopiere eine Condition-Ressource aus [synthea/fhir/Condition.ndjson](synthea/fhir/Condition.ndjson) in das Fenster und versuche einen Ausdruck zu erstellen der zum Diagnose-Code (oder Text) führt.


In [None]:
conditions = data.extract(
    "Condition",
    columns=[
        exp("Condition.id", "condition_resource_id"),
        exp(
            "Condition.subject.reference",
            "subject_reference",
        ),
        exp(
            "Condition.clinicalStatus.coding.where(system='http://terminology.hl7.org/CodeSystem/condition-clinical').first().code",
            "clinical_status",
        ),
        exp(
            "Condition.code...",  # TODO: hier soll der SNOMED-code herauskommen
            "code_coding_code",
        ),
        exp(
            "Condition.code...",  # TODO: hier soll das dazugehöroge display-Label herauskommen
            "code_coding_display",
        ),
    ],
    filters=[
        # TODO: es sollen nur Conditions mit einem clinicalStatus von "active" inkludiert werden
    ],
)

conditions.show(truncate=False, n=100)

# Übung 2: Joinen von Patient and Condition

An die Condition Ressourcen sollen nun die Patient Ressourcen gejoined werden um zB die Diagnoseverteilung nach Geschlecht zu stratifizieren.

S. <https://pathling.csiro.au/docs/fhirpath/functions#resolve> und <https://documenter.getpostman.com/view/634774/UVsQs48s#96cb85ba-f28b-4592-b3ee-13b153343f27>


In [None]:
conditions = data.extract(
    "Condition",
    columns=[
        exp("Condition.id", "condition_resource_id"),
        exp(
            "Condition.subject.reference",
            "subject_reference",
        ),
        # ... TODO
    ],
    filters=[],
)

conditions.show(truncate=False, n=100)