AkHQ: http://localhost:8082  
Spark-Master: http://localhost:8083  
Spark-Worker-1: http://localhost:8084

Pathling Python API documentation: https://github.com/aehrc/pathling/tree/issue/452/lib/python#python-api-for-pathling

# Try Connecting to the Test FHIR Kafka Topic

In [None]:
!python --version
!yes | pip uninstall pathling
!yes | pip install pathling==5.0.3.dev1

In [None]:
# Display the environment variables of the kernel process, e.g. to check the
# settings in effect before the Spark session is created further down.
# NOTE(review): the displayed output is saved into the notebook file, so any
# secrets held in environment variables end up in the .ipynb — consider
# showing only the variables you need before sharing the notebook.
import os
os.environ

In [None]:
# for debugging
# Uncomment to make PySpark resolve the Spark Kafka connector
# (spark-sql-kafka-0-10, Scala 2.12 build, version 3.2.1) via --packages when
# it launches. It only takes effect if set before the SparkSession below is
# first created; the connector version should match the cluster's Spark
# version (TODO confirm against the cluster).
#os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell"

In [None]:
# Notebook settings used by the cells below.
appName = "Kafka, Spark and FHIR Data"     # Spark application name
master = "spark://spark-master:7077"       # switch to "local[*]" to run without the cluster
kafka_topic = "fhir.post-gateway-kdb"      # topic the stream subscribes to

In [None]:
from pyspark.sql import SparkSession

# Get the existing Spark session or create one against `master`.
# NOTE(review): spark.ui.port=0 presumably lets the driver UI bind any free
# port (avoiding clashes with other UIs) — confirm that is the intent.
spark = (
    SparkSession.builder
    .appName(appName)
    .master(master)
    .config("spark.ui.port", "0")
    .getOrCreate()
)

In [None]:
# Streaming source: subscribe to `kafka_topic` on the Kafka broker.
# Reference: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
# startingOffsets defaults to "latest" for streams; "earliest" replays the
# topic from the beginning, so messages published before this stream started
# are not missed.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:19092")
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Print the schema of the raw Kafka source (binary key/value plus Kafka metadata columns).
df.printSchema()

In [None]:
# Copy the topic's key/value pairs, decoded to strings, into an in-memory table
# named "gettable". Later cells read it with spark.sql.
query_name = "gettable"
stream_timeout_seconds = 30

# Re-running this cell while an earlier query with the same name is still
# active would fail to start, so stop any such leftover query first.
for active_query in spark.streams.active:
    if active_query.name == query_name:
        active_query.stop()

query = (
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .queryName(query_name)
    .format("memory")
    .start()
)

try:
    # awaitTermination(timeout) only waits: it returns False on timeout and
    # leaves the query running. The rows already written stay queryable in the
    # memory table after the query is stopped.
    query.awaitTermination(stream_timeout_seconds)
finally:
    # Actually close the Kafka connection after the timeout (or on interrupt)
    # so the "gettable" snapshot stops changing under the cells below.
    query.stop()

In [None]:
# Batch DataFrame over the rows the streaming query wrote to the in-memory
# "gettable" table. If that query is still running, its contents may grow
# between this cell and later ones.
kafka_data = spark.sql("select * from gettable")
kafka_data.show()
# Display the class of kafka_data.
type(kafka_data)

In [None]:
# Collect the rows to the driver as a pandas DataFrame and display it.
pd_df = kafka_data.toPandas()
pd_df

# Bring Pathling into Play: Encode the FHIR Bundles

In [None]:
# Create a Pathling context on top of the existing Spark session. Its
# encodeBundle method is used below to turn the Kafka payloads into
# per-resource DataFrames.
from pathling import PathlingContext

ptl = PathlingContext.create(spark)

In [None]:
# readme-example
# Alternative input taken from the Pathling README: read each bundle file in
# the examples directory as a single text row (wholetext=True). Kept for
# reference; uncomment to compare against the Kafka input.
#n_bundles = spark.read.text('/home/jovyan/pathling/lib/python/examples/data/bundles/', wholetext=True)
#n_bundles
#type(n_bundles)

In [None]:
# The single "value" column (message payload cast to STRING), which is the input passed to Pathling below.
kafka_data.select("value")

In [None]:
# Encode the Kafka payloads into one DataFrame per FHIR resource type.
# NOTE(review): assumes every message value is a FHIR Bundle in the format
# encodeBundle expects by default (presumably JSON) — confirm with producers.
bundles = kafka_data.select("value")
patients = ptl.encodeBundle(bundles, "Patient")
encounter = ptl.encodeBundle(bundles, "Encounter")
condition = ptl.encodeBundle(bundles, "Condition")

In [None]:
# Inspect the schema Pathling produced for the encoded Patient resources.
patients.printSchema()

In [None]:
# Preview selected Patient fields (id, birth date, gender, address postal code) as pandas.
patients.select("id", "birthDate", "gender", "address.postalCode").toPandas()

In [None]:
# Inspect the schema Pathling produced for the encoded Encounter resources.
encounter.printSchema()

In [None]:
# Preview Encounter id, patient reference, service-type codes and period bounds as pandas.
encounter.select("id", "subject.reference", "serviceType.coding.code", "period.start", "period.end").toPandas()

In [None]:
# Inspect the schema Pathling produced for the encoded Condition resources.
condition.printSchema()

In [None]:
# Preview Condition id, encounter reference and condition codes as pandas.
condition.select("id", "encounter.reference", "code.coding.code").toPandas()

In [None]:
from pyspark.sql.functions import col, regexp_replace

# Flatten each encoded resource into just the columns needed for the joins below.
pt1 = patients.select("id", "birthDate", "gender", "address.postalCode")

# Selecting "subject.reference" yields a column named "reference". Stripping
# "Patient/" turns "Patient/<id>" into "<id>", which is matched against pt1.id.
# NOTE(review): the regex is unanchored, so absolute or versioned references
# would not reduce to a bare id — confirm producers only send "Patient/<id>".
enc1 = (
    encounter.select(
        col("id").alias("encounter_id"),
        "subject.reference",
        col("serviceType.coding.code").alias("servicetype_code"),
        col("period.start").alias("period_start"),
        col("period.end").alias("period_end"),
    )
    .withColumn("patient_id", regexp_replace("reference", "Patient/", ""))
)

# Same idea for conditions: "Encounter/<id>" -> "<id>", matched against
# enc1.encounter_id.
cd1 = (
    condition.select(
        "id",
        "encounter.reference",
        col("code.coding.code").alias("condition_code"),
    )
    .withColumn("cond_encounter_id", regexp_replace("reference", "Encounter/", ""))
)

In [None]:
# Columns carried from the encounter+patient join into the final table.
encounter_patient_columns = [
    "encounter_id", "servicetype_code", "period_start", "period_end",
    "patient_id", "birthDate", "gender", "postalCode",
]

# Both joins use the default (inner) join type. Encounters without a matching
# patient, or without any matching condition, are therefore dropped.
join1 = enc1.join(pt1, enc1.patient_id == pt1.id)
join2 = (
    join1.select(*encounter_patient_columns)
    .join(cd1, join1.encounter_id == cd1.cond_encounter_id)
    .select(*encounter_patient_columns, "condition_code")
)

In [None]:
# Final flattened table as pandas: one row per matching (encounter, condition) pair, with patient fields.
join2.toPandas()