In [0]:
from pyspark.sql.functions import col, current_timestamp

# 1. Widget Setup
# Environment selector. dropdown() takes, in order: widget name, default value,
# list of choices, and the label shown next to the widget.
catalog_choices = [
    "databricks_clickstream_dev",
    "databricks_clickstream_test",
    "databricks_clickstream_prod",
]
dbutils.widgets.dropdown(
    "catalog_name",
    catalog_choices[0],  # the dev catalog is the default selection
    catalog_choices,
    "Select Catalog (Environment)",
)

# 2. Variable Assignment
# The catalog comes from the "catalog_name" widget; the schema is fixed.
catalog = dbutils.widgets.get("catalog_name")
schema = "clickstream_bronze"
database_path = ".".join((catalog, schema))

# 3. Path Configuration
# Source files, checkpoints and the target table are all derived from the
# selected catalog and the schema above.
volume_root = f"/Volumes/{catalog}/{schema}/raw_landing"
source_path = f"{volume_root}/"
# Checkpoints are kept under "_checkpoints/" at the root of the same volume
# path that is read as the source.
# NOTE(review): this places the checkpoint tree inside source_path — confirm
# the reader skips underscore-prefixed directories when listing recursively.
checkpoint_base = f"{volume_root}/_checkpoints/bronze_ingestion"
target_table = f"{database_path}.bronze_clickstream"

# 4. Stream Setup
# Pin the core event fields to STRING via schema hints; type inference is
# enabled below for the remaining columns.
hinted_fields = ("event_id", "event_time", "user_id", "session_id")
schema_hints = ", ".join(f"{field} STRING" for field in hinted_fields)

# All reader options in one place. Values stay as strings, exactly as they
# would be passed through individual .option() calls.
auto_loader_options = {
    "cloudFiles.format": "json",
    "recursiveFileLookup": "true",
    "multiLine": "true",
    # Schema tracking is stored next to the stream checkpoint.
    "cloudFiles.schemaLocation": f"{checkpoint_base}/schema_tracking",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.schemaHints": schema_hints,
}

# Streaming read of the JSON files under source_path using the cloudFiles source.
raw_stream_df = (
    spark.readStream
        .format("cloudFiles")
        .options(**auto_loader_options)
        .load(source_path)
)

# 5. Transformation & Metadata
# Step 1: add an "ingestion_time" column set to the current timestamp.
stamped_df = raw_stream_df.withColumn("ingestion_time", current_timestamp())

# Step 2: keep every column and append the source file path taken from the
# "_metadata" column.
bronze_df = stamped_df.select("*", "_metadata.file_path")

# 6. Write to Table
# Append-mode Delta writer with its own checkpoint directory; mergeSchema is
# switched on for the write.
stream_writer = (
    bronze_df.writeStream
        .format("delta")
        .outputMode("append")
        .options(
            checkpointLocation=f"{checkpoint_base}/data_checkpoint",
            mergeSchema="true",
        )
)

# The availableNow trigger is used, so the run is a bounded batch-style job:
# start the query against the target table, then block until it terminates.
query = stream_writer.trigger(availableNow=True).toTable(target_table)

query.awaitTermination()