In [0]:
from pyspark.sql.functions import sum, count, avg, to_date, round as spark_round, date_trunc
from pyspark.sql import functions as F

# Step 1: load the Silver sales table that every Gold aggregate below reads from, and
# add a calendar-day column (invoice_date) derived from the invoice timestamp so the
# daily summaries can group by date. withColumn replaces invoice_date if Silver
# already carries one.
# NOTE(review): to_date is called without an explicit format; if invoice_ts is a
# string in a non-default layout, invoice_date becomes NULL — confirm its type.
silver_df = (
    spark.table("retail_demo.silver_sales")
    .withColumn("invoice_date", F.to_date("invoice_ts"))
)

# GOLD TABLE 1
# Daily Country-wise Sales Summary -> retail_demo.gold_sales
# One row per (invoice_date, country) with:
#   total_sales     - sum of total_sales over the group, rounded to 2 decimals
#   item_count      - number of rows with a non-null stockcode
#   avg_order_value - mean of total_sales per row, rounded to 2 decimals
# NOTE(review): avg_order_value averages individual Silver rows (lines), not whole
# invoices — GOLD TABLE 2 calls the identical expression avg_line_value. A true
# per-order average would need the invoice id column; the name is kept as-is because
# downstream readers of the table depend on it.
gold_df = (
    silver_df
    .groupBy("invoice_date", "country")
    .agg(
        F.round(F.sum("total_sales"), 2).alias("total_sales"),
        F.count("stockcode").alias("item_count"),
        F.round(F.avg("total_sales"), 2).alias("avg_order_value"),
    )
)

(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail_demo.gold_sales")
)

# GOLD TABLE 2
# Daily Customer-wise Sales Summary -> retail_demo.gold_sales_by_customer
# One row per (invoice_date, customer_id) with:
#   total_spent    - sum of total_sales over the group, rounded to 2 decimals
#   item_count     - number of rows with a non-null stockcode
#   avg_line_value - mean of total_sales per row, rounded to 2 decimals
# NOTE(review): if Silver keeps rows with a NULL customer_id, they are grouped together
# as a single NULL customer per day — confirm whether Silver filters them out.
gold_customer_df = (
    silver_df
    .groupBy("invoice_date", "customer_id")
    .agg(
        F.round(F.sum("total_sales"), 2).alias("total_spent"),
        F.count("stockcode").alias("item_count"),
        F.round(F.avg("total_sales"), 2).alias("avg_line_value"),
    )
)

(
    gold_customer_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail_demo.gold_sales_by_customer")
)

# GOLD TABLE 3
# Product-wise Sales Summary -> retail_demo.gold_sales_by_product
# One row per (stockcode, description_clean) across all dates, with:
#   total_quantity_sold - sum of Quantity (not rounded)
#   total_revenue       - sum of total_sales, rounded to 2 decimals
#   avg_unit_price      - mean of unit_price per row, rounded to 2 decimals
# NOTE(review): grouping by description_clean as well as stockcode means a stockcode
# with several cleaned descriptions yields several rows — confirm that is intended.
gold_product_df = (
    silver_df
    .groupBy("stockcode", "description_clean")
    .agg(
        F.sum("Quantity").alias("total_quantity_sold"),
        F.round(F.sum("total_sales"), 2).alias("total_revenue"),
        F.round(F.avg("unit_price"), 2).alias("avg_unit_price"),
    )
)

(
    gold_product_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail_demo.gold_sales_by_product")
)

# GOLD TABLE 4
# Hourly Sales Trend -> retail_demo.gold_hourly_sales
# One row per (invoice_date, hour) with:
#   total_sales - sum of total_sales over the group, rounded to 2 decimals
#   item_count  - number of rows with a non-null stockcode
# NOTE(review): assumes the Silver table already provides an `hour` column; it is not
# derived in this notebook.
gold_hourly_df = (
    silver_df
    .groupBy("invoice_date", "hour")
    .agg(
        F.round(F.sum("total_sales"), 2).alias("total_sales"),
        F.count("stockcode").alias("item_count"),
    )
)

(
    gold_hourly_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("retail_demo.gold_hourly_sales")
)

# GOLD TABLE 5
# Weekly Sales Summary by Country -> retail_demo.gold_weekly_sales
# One row per (week_start, country) with:
#   week_start   - invoice_ts truncated by date_trunc("week", ...) (a timestamp, not a date)
#   weekly_sales - sum of total_sales over the week, rounded to 2 decimals
#   total_items  - number of rows with a non-null stockcode
#
# The table is replaced atomically with overwrite + overwriteSchema rather than the
# previous DROP TABLE followed by a separate write. With the DROP, a failed write left
# no table at all, and readers could observe the table missing between the two steps.
# overwriteSchema still lets the replacement succeed when this table's schema changes,
# which is presumably why the DROP was added.
# NOTE(review): if an existing gold_weekly_sales is not a Delta table, this overwrite
# will be rejected — convert or drop it once manually in that case.
#
# week_start is added only on this pipeline's DataFrame; silver_df is no longer
# rebound, so the shared Silver DataFrame keeps exactly the columns it was loaded with.
gold_weekly_df = (
    silver_df
    .withColumn("week_start", date_trunc("week", "invoice_ts"))
    .groupBy("week_start", "country")
    .agg(
        spark_round(sum("total_sales"), 2).alias("weekly_sales"),
        count("stockcode").alias("total_items"),
    )
)

(
    gold_weekly_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("retail_demo.gold_weekly_sales")
)

In [0]:
%sql
-- Preview 10 rows of the daily country-level Gold table.
-- No ORDER BY, so which 10 rows come back is not guaranteed.
SELECT gs.*
FROM retail_demo.gold_sales AS gs
LIMIT 10;

In [0]:
%sql
-- Preview 10 rows of the daily per-customer Gold table (arbitrary rows; no ORDER BY).
SELECT c.*
FROM retail_demo.gold_sales_by_customer AS c
LIMIT 10;

In [0]:
%sql
-- Preview 10 rows of the per-product Gold table (arbitrary rows; no ORDER BY).
SELECT p.*
FROM retail_demo.gold_sales_by_product AS p
LIMIT 10;

In [0]:
%sql
-- Preview 10 rows of the hourly Gold table (arbitrary rows; no ORDER BY).
SELECT h.*
FROM retail_demo.gold_hourly_sales AS h
LIMIT 10;

In [0]:
%sql
-- Preview 10 rows of the weekly per-country Gold table (arbitrary rows; no ORDER BY).
SELECT w.*
FROM retail_demo.gold_weekly_sales AS w
LIMIT 10;