In [0]:
# Base paths
# Root of the raw landing files and root of the bronze-layer artifacts
# (checkpoints and Auto Loader schema locations live under bronze_base).
raw_data_base = "/Volumes/meiproject/default/raw_data"
bronze_base = "/Volumes/meiproject/default/bronze"

# Helper to create checkpoint folders
import os

# One checkpoint directory per ingestion stream defined below.
folders = [
    f"{bronze_base}/claims_batch/_checkpoints",
    f"{bronze_base}/claims_stream/_checkpoints",
    f"{bronze_base}/diagnosis_ref/_checkpoints",
    f"{bronze_base}/members_raw/_checkpoints",
    f"{bronze_base}/providers_raw/_checkpoints",
]

# exist_ok=True makes the creation idempotent and removes the
# check-then-create race of os.path.exists() followed by os.makedirs()
# (another run creating the folder in between would raise FileExistsError).
for folder in folders:
    os.makedirs(folder, exist_ok=True)

# -------------------------------------
# Claims Batch Stream
# -------------------------------------
# Incrementally read CSV files (with a header row) from the raw claims_batch
# folder through the "cloudFiles" (Auto Loader) source, applying the
# claims_batch_schema defined elsewhere in the notebook.
claims_batch_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(claims_batch_schema)
    .option("cloudFiles.schemaLocation", f"{bronze_base}/claims_batch/_schema")
    .load(f"{raw_data_base}/claims_batch")
)

# Append the rows to the bronze Delta table. toTable() is the documented
# DataStreamWriter method that starts the query and returns its
# StreamingQuery handle; progress is tracked in the checkpoint location.
claims_batch_query = (
    claims_batch_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_base}/claims_batch/_checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .toTable("default.bronze_claims_batch")
)

# -------------------------------------
# Claims Stream
# -------------------------------------
# Incrementally read JSON files from the raw claims_stream folder through
# the "cloudFiles" (Auto Loader) source, applying the claims_stream_schema
# defined elsewhere in the notebook.
claims_stream_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(claims_stream_schema)
    .option("cloudFiles.schemaLocation", f"{bronze_base}/claims_stream/_schema")
    .load(f"{raw_data_base}/claims_stream")
)

# Append the rows to the bronze Delta table. toTable() is the documented
# DataStreamWriter method that starts the query and returns its
# StreamingQuery handle; progress is tracked in the checkpoint location.
claims_stream_query = (
    claims_stream_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_base}/claims_stream/_checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .toTable("default.bronze_claims_stream")
)

# -------------------------------------
# Diagnosis Reference
# -------------------------------------
# Incrementally read CSV files (with a header row) from the raw diagnosis_ref
# folder through the "cloudFiles" (Auto Loader) source, applying the
# diagnosis_schema defined elsewhere in the notebook.
diagnosis_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(diagnosis_schema)
    .option("cloudFiles.schemaLocation", f"{bronze_base}/diagnosis_ref/_schema")
    .load(f"{raw_data_base}/diagnosis_ref")
)

# Append the rows to the bronze Delta table. toTable() is the documented
# DataStreamWriter method that starts the query and returns its
# StreamingQuery handle; progress is tracked in the checkpoint location.
diagnosis_query = (
    diagnosis_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_base}/diagnosis_ref/_checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .toTable("default.bronze_diagnosis_ref")
)

# -------------------------------------
# Members
# -------------------------------------
# Incrementally read CSV files (with a header row) from the raw members_raw
# folder through the "cloudFiles" (Auto Loader) source, applying the
# members_schema defined elsewhere in the notebook.
members_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(members_schema)
    .option("cloudFiles.schemaLocation", f"{bronze_base}/members_raw/_schema")
    .load(f"{raw_data_base}/members_raw")
)

# Append the rows to the bronze Delta table. toTable() is the documented
# DataStreamWriter method that starts the query and returns its
# StreamingQuery handle; progress is tracked in the checkpoint location.
members_query = (
    members_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_base}/members_raw/_checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .toTable("default.bronze_members")
)

# -------------------------------------
# Providers
# -------------------------------------
# Incrementally read JSON files from the raw providers_raw folder through
# the "cloudFiles" (Auto Loader) source, applying the providers_schema
# defined elsewhere in the notebook.
providers_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(providers_schema)
    .option("cloudFiles.schemaLocation", f"{bronze_base}/providers_raw/_schema")
    .load(f"{raw_data_base}/providers_raw")
)

# Append the rows to the bronze Delta table. toTable() is the documented
# DataStreamWriter method that starts the query and returns its
# StreamingQuery handle; progress is tracked in the checkpoint location.
providers_query = (
    providers_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronze_base}/providers_raw/_checkpoints")
    .option("mergeSchema", "true")
    .outputMode("append")
    .toTable("default.bronze_providers")
)

# -------------------------------------
# Notes:
# 1. All checkpoint folders are pre-created before any stream is defined.
# 2. Each streaming query is assigned to its own variable, so it can be
#    stopped individually, e.g.:
#       claims_batch_query.stop()
#       claims_stream_query.stop()
# 3. Every writer sets the "mergeSchema" option to "true" to handle schema
#    evolution safely.
# 4. Run the blocks one at a time if needed to avoid errors from overlapping
#    streams.
