## SILVER LAYER (Clean + Valid Records)
- Data Cleaning
    - Duplicates removed: dropDuplicates(["transaction_id"])
    - Null handling: expectations + defaults
    - Timestamp standardization: explicit to_timestamp + UTC
    - Schema enforced: explicit casts

- Incremental Processing
    - read_stream from Bronze
    - Watermark on UTC timestamp
    - Idempotent via checkpoints + deduplication

- Data Calibration
    - total_amount recalculated every time
    - Incorrect source totals overridden
    - Currency standardized

- Data Quality Rules
    - quantity > 0
    - unit_price > 0
    - Invalid rows removed

- Quarantine Handling
    - Separate Delta table
    - Rejection reason logged
    - Original data preserved

In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
@dlt.table(
    name="silver_sales_transactions",
    comment="Cleaned, calibrated, and validated sales transactions",
    table_properties={
        "quality": "silver",
    },
)
# Data quality rules.
# expect_or_drop is used instead of expect: expect only records violations in
# the pipeline metrics and still writes the offending row to the target table,
# whereas this table is meant to hold valid records only.
@dlt.expect_or_drop("not_null_transaction_id", "transaction_id IS NOT NULL")
@dlt.expect_or_drop("valid_quantity", "quantity > 0")
@dlt.expect_or_drop("valid_unit_price", "unit_price > 0")
def silver_sales_transactions():
    """Build the silver sales table from the bronze sales stream.

    Steps, in order:
      1. Parse ``transaction_timestamp`` with the ``yyyy-MM-dd HH:mm:ss``
         pattern and derive ``transaction_timestamp_utc`` from it.
      2. Cast ``quantity``, ``unit_price`` and ``discount`` to explicit types
         (a missing ``discount`` becomes 0).
      3. Recompute ``total_amount`` as ``quantity * unit_price - discount``,
         overriding whatever total the source carried.
      4. Upper-case ``currency``, defaulting missing values to ``USD``.
      5. Keep only rows whose ``product_id`` and ``store_id`` exist in the
         bronze reference tables, then drop duplicate ``transaction_id`` rows.

    Returns:
        A streaming DataFrame; the expectations declared on this function are
        applied to it by the pipeline.
    """
    sales = dlt.read_stream("bronze_sales_transactions")
    # Reference tables are read as static snapshots for the semi-joins below.
    products = dlt.read("bronze_product_master")
    stores = dlt.read("bronze_store_region")

    cleaned = (
        sales
        # Timestamp standardization
        .withColumn(
            "transaction_timestamp_parsed",
            F.to_timestamp("transaction_timestamp", "yyyy-MM-dd HH:mm:ss")
        )
        # The source zone passed here is "UTC", i.e. the parsed value is
        # treated as already being UTC wall-clock time.
        .withColumn(
            "transaction_timestamp_utc",
            F.to_utc_timestamp("transaction_timestamp_parsed", "UTC")
        )

        # Type enforcement
        .withColumn("quantity", F.col("quantity").cast(IntegerType()))
        .withColumn("unit_price", F.col("unit_price").cast(DecimalType(10, 2)))
        .withColumn("discount", F.coalesce(F.col("discount"), F.lit(0)).cast(DecimalType(10, 2)))

        # Data calibration: the total is always recomputed from its parts.
        .withColumn(
            "calculated_total_amount",
            (F.col("quantity") * F.col("unit_price") - F.col("discount"))
            .cast(DecimalType(12, 2))
        )
        .withColumn("total_amount", F.col("calculated_total_amount"))

        # Currency normalization
        .withColumn("currency", F.upper(F.coalesce("currency", F.lit("USD"))))

        # Incremental processing
        .withWatermark("transaction_timestamp_utc", "1 day")
    )

    # Foreign key validation.
    # Rows without a matching product_id / store_id are filtered out here by
    # the semi-joins; this function attaches no rejection reason to them.
    # NOTE(review): dropDuplicates keys on transaction_id only, not on the
    # watermarked column -- confirm the deduplication state is bounded as
    # intended for this stream.
    validated = (
        cleaned
        .join(products.select("product_id"), "product_id", "left_semi")
        .join(stores.select("store_id"), "store_id", "left_semi")
        .dropDuplicates(["transaction_id"])
    )

    return validated


In [0]:
# SILVER SALES QUARANTINE TABLE (INVALID RECORDS)
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="silver_sales_quarantine",
    comment="Invalid sales records quarantined during validation",
    table_properties={"quality": "quarantine"}
)
def silver_sales_quarantine():
    """Collect bronze sales rows that fail validation, with a rejection reason.

    A row is quarantined when its ``transaction_id`` is NULL, or when its
    ``quantity`` or ``unit_price`` is NULL or not strictly positive. All
    original bronze columns are kept and a ``rejection_reason`` column is
    added holding the first rule that failed, checked in this order:
    ``NULL_TRANSACTION_ID``, ``INVALID_QUANTITY``, ``INVALID_UNIT_PRICE``.

    Returns:
        A streaming DataFrame containing only the rejected rows.
    """
    sales = dlt.read_stream("bronze_sales_transactions")

    quantity = F.col("quantity")
    unit_price = F.col("unit_price")

    # A comparison against NULL yields NULL rather than true, so `<= 0` on its
    # own would let rows with a missing quantity or price slip past the
    # filter. The explicit isNull() checks route them to quarantine too.
    rejection_reason = (
        F.when(F.col("transaction_id").isNull(), "NULL_TRANSACTION_ID")
        .when(quantity.isNull() | (quantity <= 0), "INVALID_QUANTITY")
        .when(unit_price.isNull() | (unit_price <= 0), "INVALID_UNIT_PRICE")
    )

    # With no otherwise() branch the reason is NULL for rows that pass every
    # rule, so filtering on it keeps the rules defined in exactly one place.
    quarantined = (
        sales
        .withColumn("rejection_reason", rejection_reason)
        .filter(F.col("rejection_reason").isNotNull())
    )

    return quarantined



In [0]:
# SILVER PRODUCT MASTER
@dlt.table(
    name="silver_product_master",
    comment="Cleaned and standardized product master data",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_product_id", "product_id IS NOT NULL")
def silver_product_master():
    """Standardize the bronze product master stream.

    Upper-cases and trims ``product_name``, ``category`` and ``brand``,
    casts ``standard_price`` to ``DecimalType(10, 2)``, and keeps one row
    per ``product_id``.
    """
    products = dlt.read_stream("bronze_product_master")

    # Same normalization for every descriptive text column, applied in the
    # order the columns are listed.
    for text_col in ("product_name", "category", "brand"):
        products = products.withColumn(text_col, F.trim(F.upper(text_col)))

    price = F.col("standard_price").cast(DecimalType(10, 2))
    products = products.withColumn("standard_price", price)

    return products.dropDuplicates(["product_id"])


In [0]:
# SILVER STORE REGION 
@dlt.table(
    name="silver_store_region",
    comment="Cleaned store and region reference data",
    table_properties={"quality": "silver"},
)
@dlt.expect_or_drop("valid_store_id", "store_id IS NOT NULL")
def silver_store_region():
    """Standardize the bronze store/region reference stream.

    Title-cases and trims ``store_name``, upper-cases ``region`` and
    ``country``, and keeps one row per ``store_id``.
    """
    stores = dlt.read_stream("bronze_store_region")

    store_name = F.trim(F.initcap("store_name"))
    stores = stores.withColumn("store_name", store_name)

    # The geographic columns are upper-cased only; no trimming is applied.
    for geo_col in ("region", "country"):
        stores = stores.withColumn(geo_col, F.upper(geo_col))

    return stores.dropDuplicates(["store_id"])