In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.json as paj
import pyarrow.parquet as pq
import ijson
import json
import gzip
import os
import io
from typing import List, Dict, Any, Iterator
import pandas as pd
from pathlib import Path

In [0]:
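# NOTE: Spark configs such as 'spark.sql.execution.arrow.pyspark.enabled' cannot be set at runtime on Databricks serverless / Spark Connect.
# If you need them, set them in the cluster's Spark configuration. The splitter class below does not build its own SparkSession
# (it uses the notebook-global `spark` object), so there is no SparkSession builder in this notebook where they could be set.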

In [0]:
class DatabricksArrowJSONSplitter:
    """
    Arrow-optimized JSON.GZ splitter designed for Databricks serverless compute.
    Uses Arrow's columnar processing and vectorized operations for maximum performance.

    The class never creates or owns a SparkSession: every Spark call goes through
    the notebook-global ``spark`` object, which must exist before any ``split_*``
    method is called.

    NOTE(review): the ``split_*`` method bodies are elided in this notebook
    ("rest of method unchanged"); as shown, each one only prepares its inputs.
    """

    def __init__(self, app_name: str = "ArrowJSONSplitter"):
        """Store a display name. No SparkSession is created here; the global ``spark`` is used."""
        self.app_name = app_name

    def split_nested_gz_with_arrow(self, input_files: List[str], output_path: str,
                                   nested_configs: List[Dict], batch_size: int = 50000) -> None:
        """Build a DataFrame of ``input_files``, tagging each path with its list position as ``file_id``."""
        files_df = self._file_paths_dataframe(input_files, with_ids=True)
        # ... rest of method unchanged ...

    def split_with_arrow_datasets(self, input_pattern: str, output_path: str,
                                  json_path: str = 'item', chunk_size: int = 100000) -> None:
        """Expand ``input_pattern`` into file paths and build a ``file_path`` DataFrame from them."""
        input_files = self._get_databricks_files(input_pattern)
        files_df = self._file_paths_dataframe(input_files)
        # ... rest of method unchanged ...

    def split_columnar_optimized(self, input_files: List[str], output_path: str,
                                 schema_config: Dict, chunk_size: int = 200000) -> "pyspark.sql.DataFrame":
        """Build the Arrow schema for ``schema_config`` and a ``file_path`` DataFrame for ``input_files``.

        Returns:
            The ``file_path`` DataFrame (the notebook displays it).

        NOTE(review): the method returns before any splitting happens;
        ``arrow_schema``, ``output_path`` and ``chunk_size`` are currently unused.
        """
        arrow_schema = self._build_arrow_schema(schema_config)
        files_df = self._file_paths_dataframe(input_files)
        return files_df
        # ... rest of method unchanged ...

    def split_with_delta_optimization(self, input_path: str, output_path: str,
                                      json_path: str, partition_cols: List[str] = None) -> None:
        """Expand ``input_path`` into file paths and build a ``file_path`` DataFrame from them."""
        input_files = self._get_databricks_files(input_path)
        files_df = self._file_paths_dataframe(input_files)
        # ... rest of method unchanged ...

    def split_with_databricks_autoloader(self, input_path: str, output_path: str,
                                         schema_config: Dict, json_path: str = 'item') -> None:
        """Define an Auto Loader (``cloudFiles``) JSON streaming DataFrame over ``input_path``.

        The read uses an explicit Spark schema built from ``schema_config``.
        Auto Loader schema tracking is stored under ``{output_path}/_schemas``.
        """
        schema = self._build_spark_schema_from_config(schema_config)
        # NOTE(review): confirm "cloudFiles.compression" is a recognized Auto Loader
        # option; gzip input is usually detected from the ``.gz`` file extension.
        streaming_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.compression", "gzip")
            .option("cloudFiles.schemaLocation", f"{output_path}/_schemas")
            .option("multiline", "true")
            .option("mode", "PERMISSIVE")
            .schema(schema)
            .load(input_path)
        )
        # ... rest of method unchanged ...

    def split_with_unity_catalog(self, catalog_table: str, output_table: str,
                                 json_column: str, nested_configs: List[Dict]) -> None:
        """Load ``catalog_table`` through the global ``spark`` session."""
        source_df = spark.table(catalog_table)
        # ... rest of method unchanged ...

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _file_paths_dataframe(paths: List[str], with_ids: bool = False):
        """Return a Spark DataFrame with one ``file_path`` row per entry in ``paths``.

        With ``with_ids=True``, a ``file_id`` column holds each path's position in
        ``paths``. An explicit DDL schema is passed so that an empty ``paths``
        yields an empty DataFrame instead of a schema-inference error.
        """
        if with_ids:
            return spark.createDataFrame(
                [(path, position) for position, path in enumerate(paths)],
                "file_path string, file_id bigint",
            )
        return spark.createDataFrame([(path,) for path in paths], "file_path string")

    def _get_databricks_files(self, input_pattern: str) -> List[str]:
        """Expand a path or glob pattern into a sorted list of file paths.

        Patterns without a URI scheme (e.g. ``/Volumes/...``) are expanded with
        ``glob`` on the driver's filesystem, keeping only regular files; this
        requires the path to be visible there. Anything with a URI scheme
        (``s3://``, ``abfss://``, ``dbfs:/``) cannot be listed locally. Such
        patterns are returned unchanged as a one-element list, since Spark
        readers expand glob patterns themselves.

        Raises:
            FileNotFoundError: if a local pattern matches no files.
        """
        import glob

        if "://" in input_pattern or input_pattern.startswith("dbfs:"):
            return [input_pattern]
        matches = sorted(path for path in glob.glob(input_pattern) if os.path.isfile(path))
        if not matches:
            raise FileNotFoundError(f"No files match pattern: {input_pattern}")
        return matches

    def _write_arrow_results_to_delta(self, results: List, output_path: str) -> None:
        """Parse each row's ``processing_result`` JSON string and write the records as Delta.

        The records overwrite ``{output_path}/processing_summary``. Nothing is
        written when ``results`` is empty, because Spark cannot infer a schema
        from an empty list of dicts.
        """
        results_data = [json.loads(row.processing_result) for row in results]
        if not results_data:
            return
        results_df = spark.createDataFrame(results_data)
        (
            results_df.write
            .format("delta")
            .mode("overwrite")
            .save(f"{output_path}/processing_summary")
        )

    @staticmethod
    def _schema_fields_from_config(schema_config: Dict[str, Any], scalar_types: Dict[str, Any],
                                   make_list, make_struct, make_field) -> List[Any]:
        """Translate a ``schema_config`` dict into a list of backend-specific fields.

        Each entry maps a field name to a config dict with these keys:
          - ``type``: a key of ``scalar_types``, ``"list"``, or ``"struct"``.
          - ``nullable``: optional; defaults to True.
          - ``item_type``: required for lists; a scalar type name or ``"struct"``.
          - ``fields``: required for structs and for lists of structs; a nested
            config dict with the same shape.

        Args:
            scalar_types: maps scalar type names to backend type objects.
            make_list: builds a list/array type from an element type.
            make_struct: builds a struct type from a list of fields.
            make_field: builds a field from ``(name, type, nullable)``.

        Raises:
            ValueError: on an unsupported type or a missing ``item_type``/``fields``.
        """
        def struct_type(name, config):
            if "fields" not in config:
                raise ValueError(f"Field '{name}' needs 'fields' to describe its struct members")
            return make_struct([
                to_field(sub_name, sub_config)
                for sub_name, sub_config in config["fields"].items()
            ])

        def to_field(name, config):
            field_type = config["type"]
            if field_type in scalar_types:
                dtype = scalar_types[field_type]
            elif field_type == "struct":
                dtype = struct_type(name, config)
            elif field_type == "list":
                if "item_type" not in config:
                    raise ValueError(f"List field '{name}' needs an 'item_type'")
                item_type = config["item_type"]
                if item_type in scalar_types:
                    element = scalar_types[item_type]
                elif item_type == "struct":
                    element = struct_type(name, config)
                else:
                    raise ValueError(f"Unsupported list item_type: {item_type}")
                dtype = make_list(element)
            else:
                raise ValueError(f"Unsupported field type: {field_type}")
            return make_field(name, dtype, config.get("nullable", True))

        return [to_field(name, config) for name, config in schema_config.items()]

    def _build_arrow_schema(self, schema_config: Dict[str, Any]) -> "pa.Schema":
        """
        Build a PyArrow schema from the schema_config dictionary.
        Supports nested structs and lists (see ``_schema_fields_from_config``).

        Scalar names map as: string -> string, float64 -> float64, int64 -> int64,
        timestamp -> timestamp('ns'), bool -> bool_.
        """
        arrow_scalars = {
            "string": pa.string(),
            "float64": pa.float64(),
            "int64": pa.int64(),
            "timestamp": pa.timestamp('ns'),
            "bool": pa.bool_(),
        }
        return pa.schema(self._schema_fields_from_config(
            schema_config,
            arrow_scalars,
            make_list=pa.list_,
            make_struct=pa.struct,
            make_field=lambda name, dtype, nullable: pa.field(name, dtype, nullable=nullable),
        ))

    def _build_spark_schema_from_config(self, schema_config: Dict[str, Any]) -> "StructType":
        """Build a Spark ``StructType`` from the same ``schema_config`` format as ``_build_arrow_schema``.

        Scalar names map as: string -> StringType, float64 -> DoubleType,
        int64 -> LongType, timestamp -> TimestampType, bool -> BooleanType.
        Lists become ``ArrayType`` with nullable elements, and structs become
        ``StructType``.
        """
        from pyspark.sql import types as T

        spark_scalars = {
            "string": T.StringType(),
            "float64": T.DoubleType(),
            "int64": T.LongType(),
            "timestamp": T.TimestampType(),
            "bool": T.BooleanType(),
        }
        return T.StructType(self._schema_fields_from_config(
            schema_config,
            spark_scalars,
            make_list=lambda element: T.ArrayType(element, containsNull=True),
            make_struct=T.StructType,
            make_field=lambda name, dtype, nullable: T.StructField(name, dtype, nullable),
        ))

    def close(self):
        """No Spark session to stop; nothing to do."""
        pass

In [0]:
# # Databricks-specific optimization functions
# def setup_databricks_cluster_libraries():
#     """
#     Setup script for Databricks cluster initialization.
#     Add this to cluster init scripts.
#     """
#     install_commands = [
#         "pip install ijson pyarrow",
#         "pip install --upgrade pandas"
#     ]
    
#     return "\n".join([
#         "#!/bin/bash",
#         *install_commands
#     ])

# def optimize_databricks_cluster_config():
#     """
#     Recommended Databricks cluster configuration for JSON.GZ processing.
#     """
#     return {
#         "cluster_name": "json-gz-processor",
#         "spark_version": "13.3.x-scala2.12",  # Latest LTS
#         "node_type_id": "i3.xlarge",  # Memory optimized for decompression
#         "driver_node_type_id": "i3.xlarge",
#         "autoscale": {
#             "min_workers": 2,
#             "max_workers": 20
#         },
#         "spark_conf": {
#             "spark.sql.execution.arrow.pyspark.enabled": "true",
#             "spark.sql.execution.arrow.maxRecordsPerBatch": "50000",
#             "spark.sql.adaptive.enabled": "true",
#             "spark.sql.adaptive.coalescePartitions.enabled": "true",
#             "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
#         },
#         "custom_tags": {
#             "purpose": "json-gz-processing",
#             "cost-center": "data-engineering"
#         }
#     }

# # Example usage for Databricks
# def databricks_example():
#     """Example usage optimized for Databricks serverless compute."""
    
#     # Initialize with Databricks optimizations
#     splitter = DatabricksArrowJSONSplitter("DataBricksJSONProcessor")
    
#     try:
#         # Example 1: Process files from DBFS/Unity Catalog
#         input_files = [
#             "/Volumes/mgiglia/dev_matthew_giglia_price_transparency/landing/in-network/2025-08_040_05C0_in-network-rates_1_of_5.json.gz"
#         ]
        
#         # Schema configuration for Arrow optimization
#         schema_config = {
#             "id": {"type": "string", "nullable": False},
#             "timestamp": {"type": "timestamp", "nullable": True},
#             "user_id": {"type": "string", "nullable": True},
#             "event_data": {"type": "string", "nullable": True},
#             "metadata": {"type": "list", "nullable": True}
#         }
        
#         # Split with columnar optimization
#         splitter.split_columnar_optimized(
#             input_files=input_files,
#             output_path="/tmp/arrow_optimized_output",
#             schema_config=schema_config,
#             chunk_size=100000
#         )
        
#         # Example 2: Use Auto Loader for streaming
#         streaming_query = splitter.split_with_databricks_autoloader(
#             input_path="s3://incoming-data/*.json.gz",
#             output_path="/tmp/streaming_output",
#             json_path="events.item"
#         )
        
#         # Example 3: Unity Catalog integration
#         splitter.split_with_unity_catalog(
#             catalog_table="main.raw_data.json_files",
#             output_table="main.processed_data.split_objects",
#             json_column="file_content",
#             nested_configs=[
#                 {"json_path": "users.item", "output_subdir": "users"},
#                 {"json_path": "events.item", "output_subdir": "events"}
#             ]
#         )
        
#         # Example 4: Direct Delta Lake optimization
#         splitter.split_with_delta_optimization(
#             input_path="/databricks-datasets/nested-json/*.json.gz",
#             output_path="/tmp/delta_optimized",
#             json_path="data.records.item",
#             partition_cols=["date", "region"]
#         )
        
#     finally:
#         splitter.close()

# if __name__ == "__main__":
#     # Print cluster configuration
#     cluster_config = optimize_databricks_cluster_config()
#     print("Recommended Databricks cluster configuration:")
#     print(json.dumps(cluster_config, indent=2))
    
#     # Run Databricks example
#     databricks_example()

In [0]:
# Arrow schema config for the in-network JSON structure.
# The repeated sub-schemas (provider groups, TINs, billing-code lists) are
# written once as small constructor helpers. Each helper call returns fresh
# dicts, so `schema_config` is a plain nested dict with the same keys, values
# and key order as a hand-written literal. Key order matters because it becomes
# the field order of any schema built from this config.
def _in_network_schema_config():
    def scalar(type_name, nullable):
        return {"type": type_name, "nullable": nullable}

    def list_of(item_type, nullable, fields=None):
        config = {"type": "list", "nullable": nullable, "item_type": item_type}
        if fields is not None:
            config["fields"] = fields
        return config

    def struct_of(nullable, fields):
        return {"type": "struct", "nullable": nullable, "fields": fields}

    def required_strings(*names):
        return {name: scalar("string", False) for name in names}

    def provider_groups():
        return list_of("struct", True, {
            "npi": list_of("float64", True),
            "tin": struct_of(False, required_strings("type", "value")),
        })

    def billing_codes():
        return list_of("struct", True, required_strings(
            "billing_code_type", "billing_code_type_version", "billing_code", "description",
        ))

    negotiated_prices = list_of("struct", False, {
        "service_code": list_of("string", True),
        "billing_class": scalar("string", False),
        "negotiated_type": scalar("string", False),
        "billing_code_modifier": list_of("string", True),
        "negotiated_rate": scalar("float64", False),
        "expiration_date": scalar("string", False),
        "additional_information": scalar("string", True),
    })

    negotiated_rates = list_of("struct", False, {
        "negotiated_prices": negotiated_prices,
        "provider_groups": provider_groups(),
        "provider_references": list_of("float64", True),  # provider_group_id values
    })

    in_network_item = {
        **required_strings(
            "negotiation_arrangement", "name", "billing_code_type",
            "billing_code_type_version", "billing_code", "description",
        ),
        "negotiated_rates": negotiated_rates,
        "covered_services": billing_codes(),
        "bundled_codes": billing_codes(),
    }

    return {
        "reporting_entity_name": scalar("string", False),
        "reporting_entity_type": scalar("string", False),
        "plan_name": scalar("string", True),
        "plan_id_type": scalar("string", True),
        "plan_id": scalar("string", True),
        "plan_market_type": scalar("string", True),
        "last_updated_on": scalar("string", False),
        "version": scalar("string", False),
        "provider_references": list_of("struct", True, {
            "provider_group_id": scalar("float64", False),
            "provider_groups": provider_groups(),
            "location": scalar("string", True),
        }),
        "in_network": list_of("struct", False, in_network_item),
    }


schema_config = _in_network_schema_config()

# Passed to DatabricksArrowJSONSplitter.split_columnar_optimized, which feeds it to _build_arrow_schema.

In [0]:
# One splitter instance shared by the cells below; it relies on the notebook-global `spark`.
splitter = DatabricksArrowJSONSplitter(app_name="DataBricksJSONProcessor")

In [0]:
# Root directory for split outputs. No trailing slash: later cells build paths as
# f"{output_dir}/<subdir>", and a trailing "/" here would produce a "//" in the path.
output_dir = "/Volumes/mgiglia/dev_matthew_giglia_price_transparency/landing/in-network/split"

In [0]:
# A single in-network rates file from the landing Volume (adjacent string
# literals are concatenated into one path).
input_files = [
    "/Volumes/mgiglia/dev_matthew_giglia_price_transparency/landing/in-network/"
    "2025-08_040_05C0_in-network-rates_1_of_5.json.gz",
]

In [0]:
 # Split with columnar optimization
# `split_columnar_optimized` returns the DataFrame of input file paths; display it
# to confirm which files were picked up.
display(
    splitter.split_columnar_optimized(
        input_files=input_files,
        output_path=f"{output_dir}/arrow_optimized_output",
        schema_config=schema_config,
        chunk_size=100000,
    )
)

In [0]:
# NOTE(review): Examples 2-4 use inputs that look like placeholders (an s3 bucket,
# `main.*` catalog tables, a /databricks-datasets glob); point them at real data
# before running this cell.
try:
    # Example 1: process a file from a Unity Catalog Volume, using the in-network
    # `schema_config` defined earlier in this notebook.
    input_files = [
        "/Volumes/mgiglia/dev_matthew_giglia_price_transparency/landing/in-network/2025-08_040_05C0_in-network-rates_1_of_5.json.gz"
    ]

    # Split with columnar optimization
    splitter.split_columnar_optimized(
        input_files=input_files,
        output_path="/tmp/arrow_optimized_output",
        schema_config=schema_config,
        chunk_size=100000,
    )

    # Example 2: Use Auto Loader for streaming. `schema_config` is a required
    # parameter of split_with_databricks_autoloader (it defines the read schema).
    streaming_query = splitter.split_with_databricks_autoloader(
        input_path="s3://incoming-data/*.json.gz",
        output_path="/tmp/streaming_output",
        schema_config=schema_config,
        json_path="events.item",
    )

    # Example 3: Unity Catalog integration
    splitter.split_with_unity_catalog(
        catalog_table="main.raw_data.json_files",
        output_table="main.processed_data.split_objects",
        json_column="file_content",
        nested_configs=[
            {"json_path": "users.item", "output_subdir": "users"},
            {"json_path": "events.item", "output_subdir": "events"},
        ],
    )

    # Example 4: Direct Delta Lake optimization
    splitter.split_with_delta_optimization(
        input_path="/databricks-datasets/nested-json/*.json.gz",
        output_path="/tmp/delta_optimized",
        json_path="data.records.item",
        partition_cols=["date", "region"],
    )

finally:
    splitter.close()