### sparkJSONPath demo

This notebook demonstrates a powerful approach to processing semi-structured data at scale by integrating Apache Spark with the `jsonpath-ng` library. When dealing with terabyte-level JSON datasets, traditional parsing methods can be inefficient. By leveraging Spark's distributed computing capabilities and the flexible querying power of JSONPath, we can perform complex data extraction and manipulation tasks with high performance. 

This solution showcases how to define and apply User-Defined Functions (UDFs) in Spark SQL to query nested JSON objects, enabling efficient analysis of massive complex datasets, such as the FHIR EMR records used in this example.

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SingleTaskApp") \
    .master("local[1]") \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
    .getOrCreate()

sc = spark.sparkContext

from sparkJSONPath import *

In [None]:
import os,requests,zipfile

def download_and_extract(url, dest_path):
    if not os.path.exists(dest_path):
        os.makedirs(dest_path)
    
    zip_file_name = os.path.join(dest_path, url.split('/')[-1])
    
    if not os.path.exists(zip_file_name):
        print(f"Downloading {url}...")
        response = requests.get(url)
        with open(zip_file_name, 'wb') as f:
            f.write(response.content)
        print("Download complete.")
    
    print(f"Extracting {zip_file_name}...")
    with zipfile.ZipFile(zip_file_name, 'r') as zip_ref:
        zip_ref.extractall(dest_path)
    print("Extraction complete.")

download_and_extract("https://github.com/smart-on-fhir/sample-bulk-fhir-datasets/archive/refs/heads/10-patients.zip", "data")
download_and_extract("https://github.com/smart-on-fhir/sample-bulk-fhir-datasets/archive/refs/heads/1000-patients.zip", "data")

# Register UDFs for Spark SQL

In [None]:
spark.udf.register("get_value", get_value)
spark.udf.register("get_all_values", get_all_values)

# Functionality Demonstration (Spark SQL)

## Load small dataset

In [7]:
from pyspark.sql.functions import col, from_json

# Load the patient data as a DataFrame of strings
patient_df = spark.read.json("data/sample-bulk-fhir-datasets-10-patients/Patient.000.ndjson")

patient_df.createOrReplaceTempView("patients")

spark.sql("DESCRIBE patients").show()
spark.sql("SELECT * FROM patients LIMIT 1").show(truncate=False, vertical=True)

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|             address|array<struct<city...|   NULL|
|           birthDate|              string|   NULL|
|       communication|array<struct<lang...|   NULL|
|    deceasedDateTime|              string|   NULL|
|           extension|array<struct<exte...|   NULL|
|              gender|              string|   NULL|
|                  id|              string|   NULL|
|          identifier|array<struct<syst...|   NULL|
|       maritalStatus|struct<coding:arr...|   NULL|
|                meta|struct<profile:ar...|   NULL|
|multipleBirthBoolean|             boolean|   NULL|
|                name|array<struct<fami...|   NULL|
|        resourceType|              string|   NULL|
|             telecom|array<struct<syst...|   NULL|
|                text|struct<div:string...|   NULL|
+--------------------+--------------------+-------+

-RECORD 0--

## Demonstrate `get_value`

In [25]:
spark.sql("SELECT get_value(meta, '$.profile') AS meta_profile FROM patients").show()

+--------------------+
|        meta_profile|
+--------------------+
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
|['http://hl7.org/...|
+--------------------+



## Demonstrate `get_all_values`

In [26]:
spark.sql("SELECT get_all_values(name, '$.[*].given') AS given_names FROM patients").show(truncate=False)

+------------------------------------------------------+
|given_names                                           |
+------------------------------------------------------+
|[['Sumiko254', 'Larue605'], ['Sumiko254', 'Larue605']]|
|[['Devin82', 'Anibal473']]                            |
|[['Denis399', 'Lincoln623']]                          |
|[['Yvone889', 'Janina163'], ['Yvone889', 'Janina163']]|
|[['Marine542', 'Ai120'], ['Marine542', 'Ai120']]      |
|[['An125', 'Suanne858'], ['An125', 'Suanne858']]      |
|[['Rocky100']]                                        |
|[['Gladys682'], ['Gladys682']]                        |
|[['Elisa944', 'Donetta1'], ['Elisa944', 'Donetta1']]  |
|[['Kasandra729']]                                     |
|[['Corrin41', 'Sau887'], ['Corrin41', 'Sau887']]      |
|[['Augustus49', 'Neville893']]                        |
|[['Karena692']]                                       |
+------------------------------------------------------+



## Demonstrate `makeStructUpdateUDF`

In [8]:
# For makeStructUpdateUDF, we need to create a specific UDF and register it
def update_marital_status(value, parent, field):
    return "Married"

update_marital_status_udf = makeStructUpdateUDF("$.text", update_marital_status, patient_df.schema["maritalStatus"].dataType)
spark.udf.register("update_marital_status_sql", update_marital_status_udf)

spark.sql("""
SELECT 
    maritalStatus.text,
    update_marital_status_sql(maritalStatus).text as updated_marital_status
FROM patients
""").show()

+-------------+----------------------+
|         text|updated_marital_status|
+-------------+----------------------+
|      Married|               Married|
|Never Married|               Married|
|Never Married|               Married|
|      Married|               Married|
|      Married|               Married|
|      Married|               Married|
|      Married|               Married|
|     Divorced|               Married|
|      Married|               Married|
|Never Married|               Married|
|      Married|               Married|
|Never Married|               Married|
|Never Married|               Married|
+-------------+----------------------+



                                                                                

## Demonstrate Joining with `get_value`

In [None]:
# Load the condition data
condition_df = spark.read.json("data/sample-bulk-fhir-datasets-10-patients/Condition.000.ndjson")

condition_df.createOrReplaceTempView("conditions")

spark.sql("""
SELECT 
    p.patient_id, 
    c.patient_ref
FROM 
    (SELECT id as patient_id FROM patients) p
JOIN 
    (SELECT get_value(subject, '$.reference') as patient_ref, * FROM conditions) c
ON p.patient_id = split(c.patient_ref, '/')[1]
""").show()

[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|          patient_id|         patient_ref|
+--------------------+--------------------+
|129c6ac7-8d06-89d...|Patient/129c6ac7-...|
|cbc86e51-9eca-385...|Patient/cbc86e51-...|
|6a4160eb-a793-2f8...|Patient/6a4160eb-...|
|7bc002fa-dc52-17d...|Patient/7bc002fa-...|
|a5cb8ce9-cec6-6b2...|Patient/a5cb8ce9-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|a4a401d1-a46a-eb4...|Patient/a4a401d1-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|7bc002fa-dc52-17d...|Patient/7bc002fa-...|
|6a4160eb-a793-2f8...|Patient/6a4160eb-...|
|129c6ac7-8d06-89d...|Patient/129c6ac7-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|a4a401d1-a46a-eb4...|Patient/a4a401d1-...|
|cbc86e51-9eca-385...|Patient/cbc86e51-...|
|ca15b832-01e4-41d...|Patient/ca15b832-...|
|79a66c97-6131-321...|Patient/79a66c97-...|
|6a4160eb-a793-2f8...|Patient/6a4160eb-...|
|6a4160eb-a793-2f8...|Patient/6a

                                                                                

# Performance Tests (Spark SQL)

In [29]:
# Load the large patient data
large_patient_df = spark.read.json("data/sample-bulk-fhir-datasets-1000-patients/Patient.000.ndjson")

large_patient_df.createOrReplaceTempView("large_patients")

print(f"Large dataset has {large_patient_df.count()} rows.")

Large dataset has 1144 rows.


## Performance test for `get_value`

In [31]:
import time

start_time = time.time()

_ = spark.sql("SELECT get_value(meta, '$.profile') AS meta_profile FROM large_patients").toPandas()

end_time = time.time()
print(f"Time taken for get_value on large dataset: {end_time - start_time:.2f} seconds")

[Stage 33:>                                                         (0 + 1) / 1]

Time taken for get_value on large dataset: 4.48 seconds


                                                                                

## Performance test for `get_all_values`

In [32]:
import time

start_time = time.time()

_ = spark.sql("SELECT get_all_values(name, '$.[*].given') AS given_names FROM large_patients").toPandas()

end_time = time.time()
print(f"Time taken for get_all_values on large dataset: {end_time - start_time:.2f} seconds")

[Stage 34:>                                                         (0 + 1) / 1]

Time taken for get_all_values on large dataset: 4.53 seconds


                                                                                

## Performance test for `makeStructUpdateUDF`

In [34]:
import time

start_time = time.time()

update_marital_status_udf_large = makeStructUpdateUDF("$.text", update_marital_status, large_patient_df.schema["maritalStatus"].dataType)
spark.udf.register("update_marital_status_sql_large", update_marital_status_udf_large)

_ = spark.sql("""
SELECT 
    maritalStatus.text as original_marital_status,
    update_marital_status_sql_large(maritalStatus).text as updated_marital_status
FROM large_patients
""").toPandas()

end_time = time.time()
print(f"Time taken for makeStructUpdateUDF on large dataset: {end_time - start_time:.2f} seconds")

Time taken for makeStructUpdateUDF on large dataset: 0.15 seconds


25/08/10 22:58:41 WARN SimpleFunctionRegistry: The function update_marital_status_sql_large replaced a previously registered function.
