In [1]:
# Install dependencies into the kernel's environment (%pip targets the running kernel;
# the install log below notes a kernel restart may be needed to pick up new packages).
# NOTE(review): versions are unpinned — the log shows py4j 0.10.9.7 and fhir.resources 7.1.0
# being installed; consider pinning (e.g. `%pip install -q pyspark==<x.y.z>`) for reproducibility.
# NOTE(review): fhir.resources is not imported by the processing code in the next cell — confirm it is needed.
%pip install pyspark
%pip install fhir.resources

# Directory containing Synthea-generated JSON files
# NOTE(review): the next cell defines INPUT_DIRECTORY / OUTPUT_DIRECTORY with the same values and
# reads those; these lowercase copies are not referenced there.
input_directory = "/home/jovyan/work/fhir-data/fhir"
output_directory = "/home/jovyan/work/fhir-data/fhir-parquet"



Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0mm
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.
Collecting fhir.resources
  Downloading fhir.resources-7.1.0-py2.py3-none-any.whl.metadata (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.3/45.3 kB[0m [31m202.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting pydantic<3.0,>=2.0.1 (from pydantic[email]<3.0,>=2.0.1->fhir.resources)
  Downloading pydantic-2.8.2-py3-none-any.whl.metadata (125 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.2/125.2 kB[0m [31m580.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting annot

In [2]:
import os
import json
import traceback
from collections import Counter
from typing import Dict, Any, List, Optional, Set

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

# Initialize (or reuse) the notebook's SparkSession. getOrCreate returns the already-active
# session when there is one, so re-running this cell does not start a second session.
spark = SparkSession.builder.appName("FHIR to Parquet").getOrCreate()

# Configuration.
# - INPUT_DIRECTORY: root that is walked recursively for *.json FHIR bundles.
# - OUTPUT_DIRECTORY: root under which one Parquet dataset per resource table is written.
# Both can be overridden through environment variables so the notebook is not tied to a single
# machine layout; the defaults are the original hard-coded paths.
INPUT_DIRECTORY = os.environ.get("FHIR_INPUT_DIRECTORY", "/home/jovyan/work/fhir-data/fhir")
OUTPUT_DIRECTORY = os.environ.get("FHIR_OUTPUT_DIRECTORY", "/home/jovyan/work/fhir-data/fhir-parquet")

def discover_resource_types(directory: str) -> Set[str]:
    """Discover all FHIR resource types present in the bundles under ``directory``.

    Walks ``directory`` recursively and, for every ``*.json`` file, collects the
    ``resourceType`` of each ``entry[].resource`` in the bundle.

    Args:
        directory: Root directory to search for JSON bundle files. A directory
            that does not exist yields an empty result (``os.walk`` produces nothing).

    Returns:
        The set of distinct ``resourceType`` strings found (e.g. ``"Patient"``).
        JSON files that are not objects or have no ``entry`` list, and entries
        lacking a ``resource`` or ``resourceType``, contribute nothing.
    """
    resource_types: Set[str] = set()
    for root, _, files in os.walk(directory):
        for filename in files:
            if not filename.endswith(".json"):
                continue
            file_path = os.path.join(root, filename)
            # FHIR JSON is UTF-8; don't depend on the platform's default encoding.
            with open(file_path, "r", encoding="utf-8") as file:
                data = json.load(file)
            # Only a JSON object (a Bundle) can carry an 'entry' list.
            entries = data.get("entry", []) if isinstance(data, dict) else []
            for entry in entries:
                # Bundle entries may omit 'resource'; skip them instead of raising KeyError.
                resource = entry.get("resource") or {}
                resource_type = resource.get("resourceType")
                if resource_type:
                    resource_types.add(resource_type)
    return resource_types

def parse_fhir_resource(resource_data: Dict[str, Any]) -> Row:
    """Wrap a single FHIR resource as a one-field Spark ``Row``.

    The resource dict is serialized verbatim with ``json.dumps`` and stored under
    the ``resource`` field, giving every resource type the same single-column shape.
    """
    serialized = json.dumps(resource_data)
    return Row(resource=serialized)

def parse_fhir_bundle(file_path: str, resource_types: Set[str]) -> Dict[str, List[Row]]:
    """Parse one FHIR bundle file and group its resources by table name.

    Table names are the lower-cased ``resourceType`` plus ``"s"``
    (``"Patient"`` -> ``"patients"``).

    Args:
        file_path: Path to a JSON file expected to contain a FHIR Bundle.
        resource_types: Resource types to keep. The returned dict has one
            (possibly empty) list for each of them.

    Returns:
        Mapping of table name to the ``Row`` objects built by
        ``parse_fhir_resource`` for entries of that type. Entries whose type is
        not in ``resource_types``, and entries without a ``resource`` or
        ``resourceType``, are skipped. A file whose top-level JSON is not an
        object yields only empty lists.
    """
    # FHIR JSON is UTF-8; read it explicitly rather than via the locale default.
    # The file is closed as soon as it is parsed.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    resources = {resource_type.lower() + "s": [] for resource_type in resource_types}

    # Only a JSON object (a Bundle) can carry an 'entry' list.
    entries = data.get('entry', []) if isinstance(data, dict) else []
    for entry in entries:
        resource_data = entry.get('resource')
        if not resource_data or 'resourceType' not in resource_data:
            # Entries without a resource payload have nothing to store.
            continue
        table_name = resource_data['resourceType'].lower() + "s"
        # Check membership first so unwanted types are never serialized.
        if table_name in resources:
            resources[table_name].append(parse_fhir_resource(resource_data))

    return resources

def process_fhir_files(resource_types: Set[str], input_directory: Optional[str] = None) -> Dict[str, List[Row]]:
    """Parse every JSON bundle under the input directory and pool the rows by table.

    Args:
        resource_types: Resource types to extract (e.g. the output of
            ``discover_resource_types``).
        input_directory: Root directory to walk recursively. Defaults to the
            notebook-level ``INPUT_DIRECTORY``, looked up at call time.

    Returns:
        Mapping of table name (lower-cased resourceType + ``"s"``) to all ``Row``
        objects found across files. Directories and files are visited in sorted
        order, so row order is reproducible across runs and machines.

    Side effects:
        Prints the per-table row counts.
    """
    directory = INPUT_DIRECTORY if input_directory is None else input_directory
    all_resources = {resource_type.lower() + "s": [] for resource_type in resource_types}

    for root, dirs, files in os.walk(directory):
        # os.walk order is filesystem-dependent; sorting `dirs` in place also fixes
        # the order in which subdirectories are descended into.
        dirs.sort()
        for filename in sorted(files):
            if not filename.endswith(".json"):
                continue
            file_path = os.path.join(root, filename)
            for table_name, rows in parse_fhir_bundle(file_path, resource_types).items():
                all_resources[table_name].extend(rows)

    # The pooled lists already hold the totals; no separate counter is needed.
    print("Resource counts before processing:")
    for table_name, rows in sorted(all_resources.items()):
        print(f"{table_name}: {len(rows)}")

    return all_resources

def save_to_parquet(resources: Dict[str, List[Row]], output_directory: Optional[str] = None):
    """Write each non-empty resource table to Parquet under the output directory.

    Every table uses the same one-column schema (``resource``: JSON string) and is
    written with ``mode('overwrite')`` to ``<output_directory>/<table name>``.
    A failure on one table is printed with its traceback and does not stop the
    remaining tables (deliberate best-effort).

    Tables with no rows are skipped and listed at the end. Nothing is written
    for them, so any Parquet output left at their path by an earlier run is
    not touched.

    Args:
        resources: Mapping of table name to rows, as returned by ``process_fhir_files``.
        output_directory: Root output directory. Defaults to the notebook-level
            ``OUTPUT_DIRECTORY``, looked up at call time.
    """
    directory = OUTPUT_DIRECTORY if output_directory is None else output_directory
    processed_counts = Counter()
    error_counts = Counter()
    skipped = []

    # Same schema for all resource types: the whole resource as one JSON string.
    schema = StructType([
        StructField("resource", StringType(), True)
    ])

    for resource_type, rows in sorted(resources.items()):
        if not rows:
            skipped.append(resource_type)
            continue

        print(f"\nProcessing {resource_type}...")
        print(f"Number of rows: {len(rows)}")

        try:
            df = spark.createDataFrame(rows, schema)
            print(f"DataFrame created successfully for {resource_type}")

            output_path = os.path.join(directory, resource_type)
            df.write.mode('overwrite').parquet(output_path)
            print(f"Parquet file written successfully for {resource_type}")
            # The DataFrame was built from exactly `rows`, so len(rows) equals
            # df.count() without running another Spark job.
            processed_counts[resource_type] = len(rows)
        except Exception as e:
            # Best-effort: report this table and continue with the others.
            print(f"Error processing {resource_type}: {e}")
            error_counts[resource_type] += 1
            print(traceback.format_exc())

    print("\nResource counts after processing:")
    for resource_type, count in sorted(processed_counts.items()):
        print(f"{resource_type}: {count}")

    print("\nResources that encountered errors:")
    if error_counts:
        for resource_type, count in sorted(error_counts.items()):
            print(f"{resource_type}: {count}")
    else:
        print("(none)")

    if skipped:
        print("\nSkipped (no rows): " + ", ".join(skipped))

# Main execution
# 1) Scan every input bundle for the resource types it contains.
# NOTE(review): each bundle is read and json-parsed twice — once here and again in
# process_fhir_files — which doubles I/O on large inputs.
discovered_resource_types = discover_resource_types(INPUT_DIRECTORY)
print("Discovered resource types:", ", ".join(sorted(discovered_resource_types)))

# 2) Parse all bundles into per-table lists of Rows. Everything is accumulated in Python lists in
#    this notebook process before Spark sees any of it.
all_resources = process_fhir_files(discovered_resource_types)
# 3) Write one Parquet dataset per non-empty table under OUTPUT_DIRECTORY (overwrite mode).
save_to_parquet(all_resources)

# Don't stop the SparkSession here, as it might be needed for further operations in the notebook
# spark.stop()

Discovered resource types: Claim, DiagnosticReport, DocumentReference, Encounter, ExplanationOfBenefit, Immunization, Location, Organization, Patient, Practitioner, PractitionerRole, Provenance
Resource counts before processing:
claims: 893
diagnosticreports: 893
documentreferences: 893
encounters: 893
explanationofbenefits: 893
immunizations: 1292
locations: 139
organizations: 138
patients: 100
practitionerroles: 138
practitioners: 138
provenances: 100

Processing claims...
Number of rows: 893
DataFrame created successfully for claims
Parquet file written successfully for claims

Processing diagnosticreports...
Number of rows: 893
DataFrame created successfully for diagnosticreports
Parquet file written successfully for diagnosticreports

Processing documentreferences...
Number of rows: 893
DataFrame created successfully for documentreferences
Parquet file written successfully for documentreferences

Processing encounters...
Number of rows: 893
DataFrame created successfully for encou