In [0]:
from pyspark.sql import functions as F

In [0]:
# Load the raw sales records from the bronze layer
bronze_sales = spark.read.table("vsqproject.bronze.raw_sales")

In [0]:
# Preview the bronze sales DataFrame before any cleaning is applied.
display(bronze_sales)

In [0]:
# Build the silver sales frame from bronze:
#   1. drop rows without a sale_id and cast quantity/price to numeric types,
#   2. parse transactiondatetime by trying several patterns in order,
#   3. remove rows with missing keys, non-positive quantity/price or an
#      unparseable timestamp,
#   4. de-duplicate on sale_id,
#   5. derive sale_amount = quantity * price.
silver_sales = (
    bronze_sales
      .filter(F.col("sale_id").isNotNull())
      # cast numeric fields; values that cannot be cast become NULL and are
      # removed by the validity filter below
      # NOTE(review): with ANSI mode enabled a plain cast raises on malformed
      # strings instead of yielding NULL - confirm whether try_cast is wanted.
      .withColumn("quantity", F.col("quantity").cast("int"))
      .withColumn("price", F.col("price").cast("double"))
      # Parse the timestamp by trying each pattern in turn. try_to_timestamp
      # yields NULL on a mismatch (to_timestamp raises under ANSI mode), so
      # coalesce can always fall through to the next candidate. The pattern is
      # wrapped in F.lit because a bare string would be read as a column name.
      .withColumn(
          "transactiondatetime",
          F.coalesce(
              *[
                  F.try_to_timestamp(F.col("transactiondatetime"), F.lit(fmt))
                  for fmt in (
                      "yyyy-MM-dd HH:mm:ss",
                      "MM/dd/yyyy HH:mm:ss",
                      "MM/dd/yyyy",
                      "yyyy-MM-dd",
                  )
              ],
              F.try_to_timestamp("transactiondatetime"),  # fallback safe parser
          ),
      )
      .withColumn("sale_date", F.to_date(F.col("transactiondatetime")))
      # remove invalid records
      .filter(
          (F.col("product_id").isNotNull()) &
          (F.col("customer_id").isNotNull()) &
          (F.col("store_id").isNotNull()) &
          (F.col("quantity").isNotNull()) & (F.col("quantity") > 0) &
          (F.col("price").isNotNull()) & (F.col("price") > 0) &
          (F.col("transactiondatetime").isNotNull())
      )
      # De-duplicate only after validation: dropDuplicates keeps an arbitrary
      # row per sale_id, so running it earlier could keep an invalid copy and
      # then lose the sale entirely when that copy is filtered out.
      .dropDuplicates(["sale_id"])
      # compute derived metric using available columns only
      .withColumn("sale_amount", F.col("quantity") * F.col("price"))
)

In [0]:
# Preview the cleaned silver_sales rows (after casting, timestamp parsing and filtering).
display(silver_sales)

In [0]:
# Enforce referential integrity: keep only sales whose product, customer and
# store keys all exist in the Silver dimension tables.
# The Silver dims must exist before this step; only their key columns are read.
products_df = spark.table("vsqproject.silver.products_silver").select("product_id")
customers_df = spark.table("vsqproject.silver.customers_silver").select("customer_id")
stores_df   = spark.table("vsqproject.silver.stores").select("store_id")

# left_semi joins act as pure filters: they add no columns, keep silver_sales'
# own column order, and never multiply sales rows if a dimension happens to
# contain a repeated key. The product join was previously missing, so
# product_id was not being checked at all.
silver_sales_integrity = (
    silver_sales
      .join(products_df, "product_id", "left_semi")
      .join(customers_df, "customer_id", "left_semi")
      .join(stores_df, "store_id", "left_semi")
)

In [0]:
# Preview the sales rows that survived the dimension-table joins.
display(silver_sales_integrity)

In [0]:
# Persist the integrity-checked sales as the Silver fact table in Delta format,
# partitioned by sale_date for faster queries.
(
    silver_sales_integrity.write
      .format("delta")
      .mode("overwrite")
      .option("mergeSchema", "true")
      .partitionBy("sale_date")
      .saveAsTable("vsqproject.silver.silver_sales")
)