In [0]:
#Reading Bronze table
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Make retail_lakehouse the current database for this Spark session.
spark.sql("USE retail_lakehouse")

# Load the Bronze transactions table; every later cell derives from this DataFrame.
bronze = spark.table("retail_lakehouse.bronze_transactions")

# Report the input volume so later Silver counts can be reconciled against it.
bronze_row_count = bronze.count()
print("Bronze rows:", bronze_row_count)

# Show a small sample only -- never render the whole table.
bronze_preview = bronze.limit(5)
display(bronze_preview)


Bronze rows: 500000


transaction_id,transaction_ts,store_id,customer_id,product_id,qty,unit_price,currency,channel,payment_method,ingest_time,source_file
TXN_000000000001,2026-01-06T16:06:32.481383+00:00,S_0002,C_0097197,P_000564,2,57.36,EUR,online,wallet,2026-01-12T04:27:34.594566Z,dbfs:/tmp/retail/landing/transactions_20260112_014908_part001.csv
TXN_000000000002,2025-12-14T09:50:43.481383+00:00,S_0028,C_0004166,P_000062,1,56.22,EUR,online,wallet,2026-01-12T04:27:34.594566Z,dbfs:/tmp/retail/landing/transactions_20260112_014908_part001.csv
TXN_000000000003,2025-12-16T14:54:58.481383+00:00,S_0027,C_0028894,P_000920,5,-78.3,EUR,online,card,2026-01-12T04:27:34.594566Z,dbfs:/tmp/retail/landing/transactions_20260112_014908_part001.csv
TXN_000000000004,2025-12-26T13:23:54.481383+00:00,S_0018,C_0020380,P_000441,3,27.35,EUR,store,card,2026-01-12T04:27:34.594566Z,dbfs:/tmp/retail/landing/transactions_20260112_014908_part001.csv
TXN_000000000005,2025-12-26T09:04:54.481383+00:00,S_0039,C_0034672,P_001653,1,182.97,EUR,online,cash,2026-01-12T04:27:34.594566Z,dbfs:/tmp/retail/landing/transactions_20260112_014908_part001.csv


In [0]:
#Parsing Timestamp
# The raw transaction_ts column is kept as-is and a parsed copy is added next to it,
# so downstream checks can tell "missing" (raw is NULL) apart from
# "present but not parseable" (raw is set, parsed is NULL).
# NOTE(review): this relies on to_timestamp yielding NULL for bad input; with ANSI mode
# enabled it may raise instead -- confirm the cluster setting.
parsed_transaction_ts = F.to_timestamp(F.col("transaction_ts"))

silver_stage = bronze.withColumn("transaction_ts_parsed", parsed_transaction_ts)


In [0]:
#Creating valid vs rejected conditions
# A row is valid only when:
#   - transaction_id, store_id, customer_id and product_id are present,
#   - transaction_ts is present and was parsed successfully,
#   - qty and unit_price are strictly positive,
#   - currency is EUR and channel is one of online/store.
#
# Spark uses three-valued logic: a comparison against NULL (e.g. NULL > 0) yields NULL,
# and NULL AND TRUE is NULL. A NULL predicate is dropped by filter(cond) AND by
# filter(~cond), so such a row would land in neither the clean nor the rejected set.
# Coalescing to False makes the condition strictly boolean: anything that cannot be
# proven valid counts as invalid, and clean + rejected always partition the input.
valid_condition = F.coalesce(
    (
        F.col("transaction_id").isNotNull() &
        F.col("transaction_ts").isNotNull() &
        F.col("transaction_ts_parsed").isNotNull() &
        F.col("store_id").isNotNull() &
        F.col("customer_id").isNotNull() &
        F.col("product_id").isNotNull() &
        (F.col("qty") > 0) &
        (F.col("unit_price") > 0) &
        (F.col("currency") == "EUR") &
        (F.col("channel").isin("online", "store"))
    ),
    F.lit(False),
)


In [0]:
#Splitting into clean & rejected + adding reject_reason
# Reasons are evaluated top to bottom and the first matching rule wins, so a row with
# several problems is labelled with the highest-priority one only.
#
# A comparison against NULL yields NULL, which `when` treats as "not matched". NULLs in
# qty / unit_price / currency / channel are therefore tested explicitly; otherwise those
# rows would fall through to "other".
reject_reason_expr = (
    F.when(F.col("transaction_id").isNull(), F.lit("missing_transaction_id"))
     .when(F.col("transaction_ts").isNull(), F.lit("missing_transaction_ts"))
     .when(F.col("transaction_ts_parsed").isNull(), F.lit("invalid_transaction_ts_format"))
     .when(F.col("store_id").isNull(), F.lit("missing_store_id"))
     .when(F.col("customer_id").isNull(), F.lit("missing_customer_id"))
     .when(F.col("product_id").isNull(), F.lit("missing_product_id"))
     .when(F.col("qty").isNull() | (F.col("qty") <= 0), F.lit("invalid_qty"))
     .when(F.col("unit_price").isNull() | (F.col("unit_price") <= 0), F.lit("invalid_unit_price"))
     .when(F.col("currency").isNull() | (F.col("currency") != "EUR"), F.lit("invalid_currency"))
     .when(F.col("channel").isNull() | (~F.col("channel").isin("online", "store")), F.lit("invalid_channel"))
     .otherwise(F.lit("other"))
)

# NULL-safe validity flag: a predicate that evaluates to NULL counts as "not valid".
# Without this, filter(~valid_condition) silently drops rows whose predicate is NULL,
# and they would appear in neither output.
is_valid_row = F.coalesce(valid_condition, F.lit(False))

rejected = (
    silver_stage
    .withColumn("reject_reason", reject_reason_expr)
    .filter(~is_valid_row)
)

clean_pre_dedup = silver_stage.filter(is_valid_row)


In [0]:
#Deduplicate (keeping latest by ingest_time)
# Keep exactly one row per transaction_id: the one with the greatest ingest_time.
#
# Ordering by ingest_time alone is not a total order -- rows loaded in the same batch can
# share an ingest_time -- and row_number() then picks an arbitrary winner that may change
# between evaluations of this lazy DataFrame. Every remaining column is added as a
# tie-breaker, so two rows can only tie when they are identical and the survivor is
# deterministic.
tie_breakers = [
    F.col(column_name).desc()
    for column_name in clean_pre_dedup.columns
    if column_name not in ("transaction_id", "ingest_time")
]

w = (
    Window
    .partitionBy("transaction_id")
    .orderBy(F.col("ingest_time").desc(), *tie_breakers)
)

clean_dedup = (
    clean_pre_dedup
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)  # rn == 1 is the latest row for each transaction_id
    .drop("rn")                # helper column must not reach the Silver table
)


In [0]:
#Writing Silver tables (overwrite for first run)
# Clean output: the raw string timestamp is dropped and the parsed column takes over the
# transaction_ts name, so consumers only ever see the typed value.
silver_clean_out = (
    clean_dedup
    .drop("transaction_ts")
    .withColumnRenamed("transaction_ts_parsed", "transaction_ts")
)

silver_clean_writer = silver_clean_out.write.format("delta").mode("overwrite")
silver_clean_writer.saveAsTable("retail_lakehouse.silver_transactions_clean")

# Rejected output is written unchanged (raw and parsed timestamps plus reject_reason).
silver_rejected_writer = rejected.write.format("delta").mode("overwrite")
silver_rejected_writer.saveAsTable("retail_lakehouse.silver_transactions_rejected")

print("✅ Silver tables written")


✅ Silver tables written


In [0]:
#Validating counts
# Counts are taken from the persisted tables rather than the in-memory DataFrames, so
# they reflect what was actually written.
silver_clean_tbl = spark.table("retail_lakehouse.silver_transactions_clean")
silver_rejected_tbl = spark.table("retail_lakehouse.silver_transactions_rejected")

clean_cnt = silver_clean_tbl.count()
rej_cnt = silver_rejected_tbl.count()

# The clean table is post-dedup, so this total can be lower than the Bronze row count
# by the number of duplicate valid rows that were removed.
silver_total_cnt = clean_cnt + rej_cnt

print("✅ Silver clean rows:", clean_cnt)
print("❌ Silver rejected rows:", rej_cnt)
print("Total:", silver_total_cnt)


✅ Silver clean rows: 485252
❌ Silver rejected rows: 10000
Total: 495252


In [0]:
# Confirm that duplicates existed in the valid set: compare row counts before and
# after deduplication; the difference is the number of rows dedup discarded.
clean_pre_cnt = clean_pre_dedup.count()
clean_post_cnt = clean_dedup.count()
removed_cnt = clean_pre_cnt - clean_post_cnt

print(
    "Pre-dedup:", clean_pre_cnt,
    "Post-dedup:", clean_post_cnt,
    "Removed:", removed_cnt,
)


Pre-dedup: 490000 Post-dedup: 485252 Removed: 4748


In [0]:
#Top reject reasons
# Row count per reject_reason in the persisted rejected table, most frequent first.
rejected_tbl = spark.table("retail_lakehouse.silver_transactions_rejected")

reject_reason_counts = rejected_tbl.groupBy("reject_reason").count()
reject_reason_ranked = reject_reason_counts.orderBy(F.col("count").desc())

display(reject_reason_ranked)

reject_reason,count
missing_store_id,2038
invalid_qty,2016
missing_product_id,1996
invalid_unit_price,1983
missing_customer_id,1967
