In [0]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

# TODO: take last_processing_ts into account
# Load the raw bundles from the "bundle" Delta table.
df_bundle = spark.read.format("delta").table("bundle")

# Minimal envelope schemas: only the single field needed at each level is
# declared, so everything else in the JSON is ignored by from_json.
# - an entry exposes its "resource" payload as a raw JSON string
# - a resource exposes its "resourceType" discriminator
entry_json_schema = StructType().add("resource", StringType(), False)
resource_json_schema_common = StructType().add("resourceType", StringType(), False)

# Step 1: explode the "entry" column so every entry gets its own row.
df_bundle_parsed = df_bundle.withColumn("entry_element", F.explode("entry"))

# Step 2: parse each entry and keep its "resource" payload as a raw string
# (entry_json_schema declares "resource" as StringType).
df_bundle_parsed = df_bundle_parsed.withColumn(
    "entry_element_parsed", F.from_json(F.col("entry_element"), entry_json_schema)
).withColumn("resource", F.col("entry_element_parsed.resource"))

# Step 3: parse the resource just far enough to read its resourceType, which
# is later used to route each row to a type-specific schema.
df_bundle_parsed = df_bundle_parsed.withColumn(
    "resource_parsed", F.from_json(F.col("resource"), resource_json_schema_common)
).withColumn("resourceType", F.col("resource_parsed.resourceType"))

# Step 4: discard the source columns and the parsing intermediates; the raw
# "resource" string and "resourceType" are kept.
df_bundle_parsed = df_bundle_parsed.drop(
    "type", "entry", "entry_element", "entry_element_parsed", "resource_parsed"
)

# Preview of the flattened result.
df_bundle_parsed.show()

In [0]:
# Fields extracted from a Patient resource's JSON: the resource id plus
# gender and birthDate (all kept as strings).
fhir_patient_schema = (
    StructType()
    .add("id", StringType(), False)
    .add("gender", StringType(), True)
    .add("birthDate", StringType(), True)
)

# Fields extracted from an Observation resource's JSON: id, the coded concept
# (code.coding[] as system/code pairs), the subject reference, the measured
# quantity (value + unit) and the two "effective" time fields.
fhir_observation_schema = (
    StructType()
    .add("id", StringType(), False)
    .add(
        "code",
        StructType().add(
            "coding",
            ArrayType(
                StructType()
                .add("system", StringType(), False)
                .add("code", StringType(), False)
            ),
            True,
        ),
        True,
    )
    .add("subject", StructType().add("reference", StringType(), False), True)
    .add(
        "valueQuantity",
        # NOTE(review): value is declared as FloatType — confirm single
        # precision is sufficient for the measurements stored here.
        StructType().add("value", FloatType(), True).add("unit", StringType(), True),
        True,
    )
    # NOTE(review): effectivePeriod is declared as a plain string, not a
    # nested struct — confirm this is the intended representation.
    .add("effectivePeriod", StringType(), True)
    .add("effectiveDateTime", StringType(), True)
)

# Fields extracted from a MedicationRequest resource's JSON: id, the subject
# reference, the medication concept (medicationCodeableConcept.coding[] as
# system/code pairs) and the authoredOn timestamp kept as a string.
fhir_medicationrequest_schema = (
    StructType()
    .add("id", StringType(), False)
    .add("subject", StructType().add("reference", StringType(), False), True)
    .add(
        "medicationCodeableConcept",
        StructType().add(
            "coding",
            ArrayType(
                StructType()
                .add("system", StringType(), False)
                .add("code", StringType(), False)
            ),
            True,
        ),
        True,
    )
    .add("authoredOn", StringType(), True)
)

# Fields extracted from a Condition resource's JSON: the subject reference,
# the coded concept (code.coding[] as system/code pairs) and onsetDateTime
# kept as a string.
# NOTE(review): no "id" field is declared in this schema — confirm whether
# omitting the resource id for conditions is intentional.
fhir_condition_schema = (
    StructType()
    .add("subject", StructType().add("reference", StringType(), False), True)
    .add(
        "code",
        StructType().add(
            "coding",
            ArrayType(
                StructType()
                .add("system", StringType(), False)
                .add("code", StringType(), False)
            ),
            True,
        ),
        True,
    )
    .add("onsetDateTime", StringType(), True)
)

# Routing table: resourceType value -> schema used to parse that resource.
fhir_resource_type_to_schema = {
    "Patient": fhir_patient_schema,
    "Observation": fhir_observation_schema,
    "MedicationRequest": fhir_medicationrequest_schema,
    "Condition": fhir_condition_schema,
}

fhir_resource_types = fhir_resource_type_to_schema.keys()


# One pass per resource type: select its rows, parse the raw resource JSON
# with the matching schema, and append the flattened fields to a Delta table
# named "fhir_<resource type in lower case>".
for fhir_resource_type in fhir_resource_types:
    resource_schema = fhir_resource_type_to_schema[fhir_resource_type]
    delta_table_name = f"fhir_{fhir_resource_type.lower()}"

    # Keep only this resource type's rows and parse them with its schema.
    df_bundle_filtered = df_bundle_parsed.where(
        F.col("resourceType") == fhir_resource_type
    ).withColumn("resource_parsed", F.from_json(F.col("resource"), resource_schema))

    # Promote the parsed struct's fields to top-level columns.
    df_fhir_resource = df_bundle_filtered.select("resource_parsed.*")

    # Append mode: rows are added to whatever the table already contains.
    (
        df_fhir_resource.write.format("delta")
        .mode("append")
        .saveAsTable(delta_table_name)
    )


# Preview each of the per-resource Delta tables.
for fhir_table_name in (
    "fhir_patient",
    "fhir_observation",
    "fhir_medicationrequest",
    "fhir_condition",
):
    spark.read.format("delta").table(fhir_table_name).show()