# Bronze Layer: Container Status with Structured Streaming

**Ingestion Pattern**: Spark Structured Streaming with an `availableNow` trigger (processes all currently available files, then stops)

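**Features**:
- Change Data Feed (CDF) enabled on the target Delta table so downstream layers can consume row-level changes (CDC pattern)
- Event-time watermark (1 hour) on `checkpoint_time` to bound late data and deduplication state
- Deduplication on `container_id` + `checkpoint_time` (event time)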

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import logging

# Import shared project utilities.
# The utils directory is resolved against the current working directory
# (presumably the notebook's folder — confirm for the deployment target).
# It is stored as an absolute path, so a later working-directory change cannot
# break the import. The membership check keeps sys.path from growing each time
# this cell is re-run.
import os
import sys

_utils_dir = os.path.abspath("../utils")
if _utils_dir not in sys.path:
    sys.path.append(_utils_dir)
from logging_utils import get_logger

logger = get_logger("bronze_status_streaming")

In [0]:
# Get job parameters from Databricks widgets.
dbutils.widgets.text("catalog_name", "cargo_fleet_dev", "Catalog")
dbutils.widgets.text("checkpoint_location", "", "Checkpoint Location")

catalog_name = dbutils.widgets.get("catalog_name").strip()
if not catalog_name:
    # An empty catalog would silently produce paths like "/Volumes//bronze/..."
    # and the invalid table name ".bronze.container_status_raw"; fail fast instead.
    raise ValueError("Widget 'catalog_name' must not be empty")

# When no explicit checkpoint location is supplied (or only whitespace),
# fall back to a per-catalog location under the bronze volume.
checkpoint_location = (
    dbutils.widgets.get("checkpoint_location").strip()
    or f"/Volumes/{catalog_name}/bronze/checkpoints/status_streaming"
)

logger.info("Starting Structured Streaming for container status")
logger.info(f"Catalog: {catalog_name}; checkpoint location: {checkpoint_location}")

In [0]:
# Explicit schema for incoming JSON status records, so the stream never infers types.
# checkpoint_time is read as a raw string here.
# NOTE(review): Spark file sources may treat every field as nullable regardless of
# the flag below, so container_id's "not null" is likely not enforced — confirm.
_status_fields = [
    # (column name, Spark type, nullable)
    ("container_id", StringType(), False),
    ("status", StringType(), True),
    ("location", StringType(), True),
    ("checkpoint_type", StringType(), True),
    ("checkpoint_time", StringType(), True),
    ("temperature_celsius", DoubleType(), True),
    ("humidity_percent", DoubleType(), True),
    ("seal_intact", BooleanType(), True),
    ("inspected_by", StringType(), True),
    ("notes", StringType(), True),
]

status_schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _status_fields]
)

In [0]:
source_path = f"/Volumes/{catalog_name}/bronze/status_landing"

try:
    # Streaming JSON reader bound to the explicit status schema.
    # maxFilesPerTrigger caps how many new files each micro-batch picks up.
    status_reader = (
        spark.readStream
        .format("json")
        .schema(status_schema)
        .option("maxFilesPerTrigger", 50)
    )
    # Lazily attach the landing-volume path; no data is read until the query starts.
    df_status_stream = status_reader.load(source_path)

    logger.info(f"Stream configured for source: {source_path}")

except Exception as e:
    logger.error(f"Failed to configure stream: {e}")
    raise

In [0]:
# Transform with Watermarking
#
# Everything up to and including dropDuplicates shapes the stateful part of the
# query stored in the checkpoint. Keep that order stable; changing it may make an
# existing checkpoint unusable.

try:
    # Stage 1: parse the event time, then declare a 1-hour event-time watermark on it.
    # NOTE(review): strings that to_timestamp cannot parse presumably become NULL
    # (or raise under ANSI mode). NULL event times are never evicted from the dedup
    # state — confirm input timestamps are always well-formed.
    _df_parsed = (
        df_status_stream
        .withColumn("checkpoint_time", to_timestamp(col("checkpoint_time")))
        .withWatermark("checkpoint_time", "1 hour")
    )

    # Stage 2: keep one row per (container_id, checkpoint_time).
    # NOTE(review): with a watermark, Spark may drop records arriving later than the
    # threshold, so they would never reach bronze — confirm this is intended.
    _df_deduped = _df_parsed.dropDuplicates(["container_id", "checkpoint_time"])

    # Stage 3: ingestion metadata — processing time, originating file, partition date.
    _df_enriched = (
        _df_deduped
        .withColumn("ingestion_timestamp", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))
        .withColumn("ingestion_date", current_date())
    )

    # Stage 4: range-check flags (inclusive bounds).
    # A NULL reading yields a NULL flag, not False.
    temperature_in_range = col("temperature_celsius").between(-30, 50)
    humidity_in_range = col("humidity_percent").between(0, 100)
    df_status_bronze = (
        _df_enriched
        .withColumn("is_temperature_valid", temperature_in_range)
        .withColumn("is_humidity_valid", humidity_in_range)
    )

    logger.info("Watermarking and transformations applied")

except Exception as e:
    logger.error(f"Failed to apply transformations: {e}")
    raise

In [0]:
## Write Stream with CDF Enabled
bronze_table = f"{catalog_name}.bronze.container_status_raw"

try:
    # Create the target Delta table on first run. CREATE TABLE IF NOT EXISTS is a
    # no-op when the table already exists, so its TBLPROPERTIES are NOT applied to
    # a pre-existing table. CDF is therefore verified separately below.
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {bronze_table} (
            container_id STRING,
            status STRING,
            location STRING,
            checkpoint_type STRING,
            checkpoint_time TIMESTAMP,
            temperature_celsius DOUBLE,
            humidity_percent DOUBLE,
            seal_intact BOOLEAN,
            inspected_by STRING,
            notes STRING,
            ingestion_timestamp TIMESTAMP,
            source_file STRING,
            ingestion_date DATE,
            is_temperature_valid BOOLEAN,
            is_humidity_valid BOOLEAN
        )
        USING DELTA
        PARTITIONED BY (ingestion_date)
        TBLPROPERTIES (
            delta.enableChangeDataFeed = true,
            delta.autoOptimize.optimizeWrite = true,
            delta.autoOptimize.autoCompact = true
        )
        COMMENT 'Container status changes with CDF enabled for CDC pattern'
    """)

    # Ensure Change Data Feed is on even if the table predates this notebook.
    # The ALTER is only issued when needed, to avoid an extra table-history entry
    # on every run. CDF only records changes committed after it is enabled.
    table_props = {
        row["key"]: row["value"]
        for row in spark.sql(f"SHOW TBLPROPERTIES {bronze_table}").collect()
    }
    if table_props.get("delta.enableChangeDataFeed", "false").lower() != "true":
        spark.sql(
            f"ALTER TABLE {bronze_table} "
            "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
        )
        logger.info(f"Enabled Change Data Feed on existing table: {bronze_table}")

    logger.info(f"✓ Target table ready with CDF enabled: {bronze_table}")

    # Write stream. trigger(availableNow=True) processes all input available at
    # start and then stops, so this behaves like an incremental batch job rather
    # than a continuously running stream.
    query = (
        df_status_bronze.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .partitionBy("ingestion_date")
        .trigger(availableNow=True)
        .toTable(bronze_table)
    )

    logger.info("✓ Streaming query started with watermarking")
    logger.info("✓ Late data threshold: 1 hour")
    logger.info(f"✓ Query ID: {query.id}")

except Exception as e:
    logger.error(f"Failed to start streaming query: {str(e)}")
    raise

In [0]:
# Block until the availableNow run has processed all currently available input
# and stopped on its own. awaitTermination() re-raises the query's failure, so a
# failed micro-batch fails this notebook/job instead of passing silently.
query.awaitTermination()
logger.info(f"✓ Streaming query {query.id} finished")