In [0]:
from pyspark.sql import functions as F

The main differences between Cell 2 and Cell 3 are:

Pipeline Type

Cell 2: Standard PySpark code for batch ETL. It reads, transforms, and writes data directly using Spark DataFrame APIs and Delta Lake.
Cell 3: Lakeflow Declarative Pipelines code (formerly Delta Live Tables, hence the dlt module name). It uses the @dlt.table decorator to declaratively define a materialized view, which the pipeline manages and refreshes whenever a pipeline update runs.

Execution & Management

Cell 2: Runs as a one-off job or notebook cell. You must rerun or schedule the code yourself to refresh the data.
Cell 3: Runs as part of a Lakeflow Declarative Pipeline. The pipeline handles update orchestration, refreshes (incremental where possible), and lineage tracking.

Data Quality & Metadata

Cell 2: No built-in support for data quality constraints or table metadata.
Cell 3: Supports table metadata (name, comment), and data quality expectations can be added with decorators such as @dlt.expect.

Publishing & Catalog Integration

Cell 2: Writes to a Delta path but does not register the table in the metastore/catalog.
Cell 3: Publishes the table to the catalog/schema defined in the pipeline settings, making it discoverable and queryable by name.

Incremental & Declarative Updates

Cell 2: Overwrites the full dataset on every run; nothing is incremental.
Cell 3: Databricks refreshes the materialized view and can do so incrementally where the query allows it, which helps with cost and performance.

Summary:
Cell 2 is imperative, batch ETL code. Cell 3 is declarative, pipeline-managed code that creates a materialized view with automatic refresh, catalog integration, and optional data quality constraints.
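
The expectation behaviour mentioned under Data Quality & Metadata can be modelled in plain Python. This is only an illustrative sketch, not the dlt API: the function apply_expectation, the action names, and the sample rows are all hypothetical. In a pipeline the rule would instead be attached as a decorator such as @dlt.expect("valid_customer_id", "customerID IS NOT NULL") above the table function. The model assumes the documented behaviour that a plain expectation keeps failing rows and only counts them, while the drop and fail variants (@dlt.expect_or_drop, @dlt.expect_or_fail) remove the rows or stop the update.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def apply_expectation(
    rows: List[Row],
    predicate: Callable[[Row], bool],
    action: str = "warn",
) -> Tuple[List[Row], int]:
    """Hypothetical model of one expectation: return (rows kept, violation count)."""
    if action not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown action: {action!r}")

    violations = [row for row in rows if not predicate(row)]

    # "fail" stops processing as soon as any row violates the rule.
    if action == "fail" and violations:
        raise ValueError(f"{len(violations)} row(s) violated the expectation")

    if action == "drop":
        # "drop" removes violating rows from the result.
        kept = [row for row in rows if predicate(row)]
    else:
        # "warn" keeps every row and only reports the violation count.
        kept = list(rows)
    return kept, len(violations)


def has_customer_id(row: Row) -> bool:
    return row["customerID"] is not None


# Made-up sample data: one valid row, one row with a missing key.
sample_rows = [{"customerID": 1}, {"customerID": None}]
```

With these sample rows, "warn" returns both rows and reports one violation, "drop" returns only the first row, and "fail" raises an error.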

In [0]:
# 1. Process sales_customers

df_customers = spark.table("samples.bakehouse.sales_customers")

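df_customers_staging = (
    df_customers
    # Keep a single row per customer
    .dropDuplicates(["customerID"])
    # Strip everything except digits from the phone number
    .withColumn("phone_number", F.regexp_replace("phone_number", "[^0-9]", ""))
    # Normalize email addresses to lower case
    .withColumn("email_address", F.lower(F.col("email_address")))
    # Record when the row was loaded
    .withColumn("load_date", F.current_timestamp())
)
# Overwrite the Delta files at the staging path (the table is not registered in the catalog)
df_customers_staging.write.format("delta").mode("overwrite").save("/mnt/staging/sales_customers")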

In [0]:
import dlt

@dlt.table(
    name="sales_customers_staging",
    comment="Processed sales_customers with cleaned phone and email"
)
def sales_customers_staging():
    df_customers = spark.read.table("samples.bakehouse.sales_customers")
    return (
        df_customers
        .dropDuplicates(["customerID"])
        .withColumn("phone_number", F.regexp_replace("phone_number", "[^0-9]", ""))
        .withColumn("email_address", F.lower(F.col("email_address")))
        .withColumn("load_date", F.current_timestamp())
    )
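
To see what the two column clean-ups used in Cell 2 and Cell 3 do to individual values, the same rules can be reproduced with Python's re module. This is a sketch for illustration only: the helper names and the sample values are made up. It assumes the character class [^0-9] behaves the same in Python's re as in Spark's regexp_replace, which should hold for a pattern this simple.

```python
import re


def clean_phone(raw: str) -> str:
    """Remove every character that is not a digit, as regexp_replace does above."""
    return re.sub(r"[^0-9]", "", raw)


def clean_email(raw: str) -> str:
    """Lower-case the address, mirroring F.lower on the email_address column."""
    return raw.lower()


# Made-up sample values
print(clean_phone("(555) 123-4567"))        # 5551234567
print(clean_email("Jane.Doe@Example.COM"))  # jane.doe@example.com
```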

In [0]:
# 2. Process sales_franchises
df_franchises = spark.table("samples.bakehouse.sales_franchises")

df_franchises_staging = df_franchises.withColumn("load_date", F.current_timestamp())

df_franchises_staging.write.format("delta").mode("overwrite").save("/mnt/staging/sales_franchises")

In [0]:
# 3. Process sales_suppliers
df_suppliers = spark.table("samples.bakehouse.sales_suppliers")

df_suppliers_staging = (
    df_suppliers
    .withColumn(
        "allergy",
        # Flag suppliers whose ingredient mentions one of the listed allergens
        F.when(
            F.lower(F.col("ingredient")).rlike("pistachios|peanuts|poppy seeds"), "Y"
        ).otherwise("N"),
    )
    .withColumn("load_date", F.current_timestamp())
)

df_suppliers_staging.write.format("delta").mode("overwrite").save("/mnt/staging/sales_suppliers")
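
The flag logic above relies on rlike being a partial match: the pattern only has to occur somewhere in the lower-cased ingredient text. A plain-Python sketch of the same rule follows, assuming re.search is an adequate stand-in for rlike with this pattern. The function name and the sample ingredients are hypothetical. A missing ingredient maps to "N" here because in Spark a NULL condition in when(...) falls through to otherwise(...).

```python
import re
from typing import Optional

# Same alternation as the rlike pattern in the suppliers step
ALLERGEN_PATTERN = re.compile(r"pistachios|peanuts|poppy seeds")


def allergy_flag(ingredient: Optional[str]) -> str:
    """Return "Y" if the lower-cased ingredient contains a listed allergen, else "N"."""
    if ingredient is None:
        # Mirrors Spark: a NULL condition takes the otherwise() branch.
        return "N"
    return "Y" if ALLERGEN_PATTERN.search(ingredient.lower()) else "N"


# Made-up sample values
print(allergy_flag("Roasted Peanuts"))  # Y
print(allergy_flag("Peanut butter"))    # N (the pattern lists only the plural form)
print(allergy_flag("Flour"))            # N
```

Because the pattern lists plural forms only, a singular mention such as "peanut" is not flagged. Whether that matters depends on how the ingredient column is populated.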

In [0]:
# 4. Process sales_transactions
df_transactions = spark.table("samples.bakehouse.sales_transactions")

df_transactions_staging = df_transactions.withColumn("load_date", F.current_timestamp())

df_transactions_staging.write.format("delta").mode("overwrite").save("/mnt/staging/sales_transactions")

In [0]:
# 5. Process media_customer_reviews
df_reviews = spark.table("samples.bakehouse.media_customer_reviews")

df_reviews_staging = df_reviews.withColumn("load_date", F.current_timestamp())

df_reviews_staging.write.format("delta").mode("overwrite").save("/mnt/staging/media_customer_reviews")

In [0]:
# 6. Process media_gold_reviews_chunked
df_gold_reviews = spark.table("samples.bakehouse.media_gold_reviews_chunked")

df_gold_reviews_staging = df_gold_reviews.withColumn("load_date", F.current_timestamp())

df_gold_reviews_staging.write.format("delta").mode("overwrite").save("/mnt/staging/media_gold_reviews_chunked")
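
Steps 2, 4, 5 and 6 above repeat the same pattern: read a source table, add load_date, and overwrite a Delta path named after the table. One way to remove the repetition is a small loop, sketched below. The constant and function names are hypothetical, the Spark session is passed in explicitly, and the pyspark import is deferred into the function so the path helper can be used without Spark installed. Treat it as one possible refactoring rather than the notebook's own approach.

```python
from typing import Iterable

SOURCE_SCHEMA = "samples.bakehouse"
STAGING_ROOT = "/mnt/staging"

# Tables that only need a load_date column added
PASS_THROUGH_TABLES = [
    "sales_franchises",
    "sales_transactions",
    "media_customer_reviews",
    "media_gold_reviews_chunked",
]


def staging_path(table_name: str) -> str:
    """Build the Delta output path used by the cells above."""
    return f"{STAGING_ROOT}/{table_name}"


def stage_pass_through_tables(spark, tables: Iterable[str] = PASS_THROUGH_TABLES) -> None:
    """Read each source table, add load_date, and overwrite its staging path."""
    # Deferred import: only needed when a Spark session is actually available.
    from pyspark.sql import functions as F

    for name in tables:
        (
            spark.table(f"{SOURCE_SCHEMA}.{name}")
            .withColumn("load_date", F.current_timestamp())
            .write.format("delta")
            .mode("overwrite")
            .save(staging_path(name))
        )
```

In a notebook with an active session this would be invoked as stage_pass_through_tables(spark). The customers and suppliers steps carry extra transformations and are deliberately left out of the list.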