In [0]:
from pyspark.sql import functions as F

bronze_df = spark.table("retail_lakehouse.bronze_sales")


In [0]:
silver_df = (
    bronze_df
    .withColumn("InvoiceNo", F.col("InvoiceNo").cast("string"))
    .withColumn("StockCode", F.col("StockCode").cast("string"))
    .withColumn("Description", F.col("Description").cast("string"))
    .withColumn("CustomerID", F.col("CustomerID").cast("string"))
    .withColumn("Country", F.col("Country").cast("string"))
    .withColumn("Quantity", F.col("Quantity").cast("int"))
    .withColumn("UnitPrice", F.col("UnitPrice").cast("double"))
    .withColumn("InvoiceDate", F.col("InvoiceDate").cast("timestamp"))
)


In [0]:
silver_df = (
    silver_df
    .filter(F.col("CustomerID").isNotNull())          # remove anonymous records
    .filter(~F.col("InvoiceNo").startswith("C"))      # remove cancelled orders
    .filter(F.col("Quantity") > 0)                     # remove returns / bad qty
    .filter(F.col("UnitPrice") > 0)                    # remove invalid prices
)


In [0]:
silver_df = (
    silver_df
    .withColumn("invoice_date", F.to_date("InvoiceDate"))
    .withColumn("total_amount", F.round(F.col("Quantity") * F.col("UnitPrice"), 2))
)


In [0]:
silver_df = silver_df.dropDuplicates(
    ["InvoiceNo", "StockCode", "InvoiceDate"]
)


In [0]:
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail_lakehouse.silver_sales")
)


In [0]:
%sql
SELECT
  COUNT(*) AS silver_rows,
  COUNT(DISTINCT InvoiceNo) AS unique_invoices,
  SUM(total_amount) AS total_revenue
FROM retail_lakehouse.silver_sales;


silver_rows,unique_invoices,total_revenue
387846,18532,8844799.350000007


In [0]:
%sql
SELECT
  COUNT(*) AS silver_rows,
  COUNT(DISTINCT InvoiceNo) AS unique_invoices,
  SUM(total_amount) AS total_revenue
FROM retail_lakehouse.silver_sales;


silver_rows,unique_invoices,total_revenue
387846,18532,8844799.350000007
