In [0]:
import dlt
from pyspark.sql.functions import col, to_timestamp


# Location of the raw Amazon review files; handed to the cloudFiles reader's
# load() call in bronze_reviews.
# NOTE(review): "yourstorage" looks like a placeholder storage account name —
# confirm the real account/container before deploying.
raw_path = "abfss://raw@yourstorage.dfs.core.windows.net/amazon_reviews/"

@dlt.table(comment="Raw Amazon reviews loaded incrementally from the data lake")
def bronze_reviews():
    """Define the bronze table as a stream over the raw review files.

    Reads JSON files under ``raw_path`` through the ``cloudFiles`` streaming
    source with column-type inference turned on.

    Returns:
        The streaming DataFrame returned by ``load(raw_path)``.
    """
    stream_reader = spark.readStream.format('cloudFiles')
    # NOTE(review): the documented option key is "cloudFiles.format"; the
    # casing below is kept exactly as written — confirm it is accepted.
    stream_reader = stream_reader.option("cloudFiles.Format", "json")
    stream_reader = stream_reader.option("cloudFiles.inferColumnTypes", "true")
    return stream_reader.load(raw_path)

@dlt.table(
    comment="Cleaned Amazon reviews with proper types and filters"
)
# Expectations are attached with their own decorator; ``dlt.table`` has no
# ``expectations`` keyword. ``expect_all`` records violations in the pipeline's
# data-quality metrics without dropping or failing on the offending rows.
@dlt.expect_all(
    {
        "valid_rating": "star_rating BETWEEN 1 AND 5",
        "non_empty_review": "review_body IS NOT NULL",
    }
)
def silver_reviews():
    """Define the silver table: bronze reviews with a parsed timestamp and null filtering.

    Reads ``bronze_reviews``, adds ``review_timestamp`` by parsing
    ``review_date`` with ``to_timestamp``, and keeps only rows where both
    ``review_body`` and ``star_rating`` are non-null.

    Returns:
        The transformed DataFrame backing the ``silver_reviews`` table.
    """
    bronze_df = dlt.read("bronze_reviews")
    return (
        bronze_df.withColumn("review_timestamp", to_timestamp(col("review_date")))
        .filter(col("review_body").isNotNull())
        .filter(col("star_rating").isNotNull())
    )

@dlt.table(
    comment="Aggregated metrics per product"
)
def gold_reviews():
    """Define the gold table: per-product review count and average rating.

    Reads ``silver_reviews`` and groups by ``product_id``, producing
    ``review_count`` (row count per product) and ``avg_rating`` (mean of
    ``star_rating`` per product).

    Returns:
        The aggregated DataFrame backing the ``gold_reviews`` table.
    """
    # The file's imports only bring in ``col`` and ``to_timestamp``; the ``F``
    # alias used below was never bound, so import it here to avoid a NameError.
    from pyspark.sql import functions as F

    silver_df = dlt.read("silver_reviews")
    return (
        silver_df.groupBy("product_id")
        .agg(
            F.count("*").alias("review_count"),
            F.avg("star_rating").alias("avg_rating"),
        )
    )