###ca_277_bronze_streaming###


In [0]:
# Widgets
# include_existing: whether Auto Loader also ingests files that already exist in read_path.
# Per the Databricks Auto Loader options docs, cloudFiles.includeExistingFiles is evaluated only
# the first time the stream starts (empty checkpoint); changing it afterwards has no effect, so it
# does not need to be switched to "false" for subsequent runs.
dbutils.widgets.text("include_existing", "true")
# trigger: streaming trigger for the bronze write, e.g. "1 minute".
dbutils.widgets.text("trigger", "1 minute")

# Parameters
# Normalise and validate widget input up front so a typo fails here with a clear message
# instead of surfacing later as an opaque stream-start error.
include_existing = dbutils.widgets.get("include_existing").strip().lower()
if include_existing not in ("true", "false"):
    raise ValueError(
        f"include_existing widget must be 'true' or 'false', got {include_existing!r}"
    )

trigger_str = dbutils.widgets.get("trigger").strip()
if not trigger_str:
    raise ValueError("trigger widget must not be empty (e.g. '1 minute')")

In [0]:
# Imports
from claims360.common.io import load_schema, add_ingest_metadata

In [0]:
# Paths
# Every location is keyed by the same catalog and source name; building them from shared
# pieces keeps the table, landing zone, checkpoint and schema-tracking paths in sync.
_catalog = "claims360_dev"
_source = "ca_277"
_volume_root = f"dbfs:/Volumes/{_catalog}/bronze"

# Raw JSON landing zone that the Auto Loader stream reads from.
read_path = f"{_volume_root}/raw/{_source}"
# Checkpoint directory for the streaming write into bronze_table.
checkpoint_location = f"{_volume_root}/_checkpoints/{_source}"
# Target Delta table, as catalog.schema.table.
bronze_table = f"{_catalog}.bronze.{_source}"
# Relative path handed to load_schema; presumably resolved against the working directory.
schema_path = f"../schemas/{_source}.json"
# Directory Auto Loader uses (cloudFiles.schemaLocation) to track the stream schema.
auto_loader_schema_location = f"{_volume_root}/_schemas/{_source}"

In [0]:
# Load schema
# Explicit reader schema for the Auto Loader stream below, loaded from the file at schema_path
# via the project helper load_schema. Return type is not visible here; presumably a Spark
# StructType since it is passed to .schema(...) — TODO confirm.
schema = load_schema(schema_path)

In [0]:
# Reader
# Auto Loader (cloudFiles) stream over the raw JSON files in read_path, read with the explicit
# project schema.
#
# Schema evolution mode: the Databricks Auto Loader docs state that "addNewColumns" is not
# allowed when the stream schema is supplied with .schema(...) (it only works when the schema
# is given as cloudFiles.schemaHints), so the previous setting prevented the stream from
# starting. "rescue" keeps the explicit schema fixed and never fails on schema drift; any
# unexpected columns or type-mismatched values are captured in the _rescued column instead of
# being dropped. If new top-level columns should instead be added automatically, supply the
# schema via cloudFiles.schemaHints and switch back to "addNewColumns".
src_df = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", auto_loader_schema_location)
          .option("cloudFiles.schemaEvolutionMode", "rescue")
          .option("cloudFiles.rescuedDataColumn", "_rescued")
          # Only honoured on the stream's first start (empty checkpoint).
          .option("cloudFiles.includeExistingFiles", include_existing)
          .schema(schema)
          .load(read_path))

In [0]:
# Add metadata
# Enrich the raw stream via the project helper add_ingest_metadata. "277CA" is presumably the
# source/feed identifier recorded in the metadata; the exact columns added are defined in
# claims360.common.io and are not visible here — TODO confirm.
bronze_df = add_ingest_metadata(src_df, "277CA")

In [0]:
# Writer
# Append the enriched stream to the bronze Delta table. The checkpoint records which source
# files have already been processed; mergeSchema enables Delta schema evolution on write.
bronze_writer = (bronze_df.writeStream.format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", checkpoint_location)
                 .option("mergeSchema", "true"))

# trigger_str is either a processing-time interval such as "1 minute" (continuous micro-batches),
# or "availableNow", which processes all currently available files and then stops. The latter is
# useful when this notebook runs as a scheduled job.
if trigger_str.strip().lower() == "availablenow":
    bronze_writer = bronze_writer.trigger(availableNow=True)
else:
    bronze_writer = bronze_writer.trigger(processingTime=trigger_str)

# Keep the StreamingQuery handle so later cells can monitor it, await it or stop it.
bronze_query = bronze_writer.toTable(bronze_table)

In [0]:
%sh pwd
# NOTE(review): debugging leftover — prints the shell's working directory, presumably to check how
# the relative schema_path "../schemas/ca_277.json" resolves. TODO remove before scheduling.