In [0]:
from pyspark.sql.functions import col, sum, count, avg, desc, month, year

# Load data
df = spark.read.format("delta").load("/mnt/silver/retail_cleaned")

# Save data
df.write.format("delta").mode("overwrite").saveAsTable("retail_orders_gold")

# Sales by Region
sales_by_region = df.groupBy("region").agg(
    sum("sales").alias("total_sales")
)
sales_by_region.write.format("delta").mode("overwrite").saveAsTable("sales_by_region")

# Top Selling Products
top_products = df.groupBy("product_name").agg(
    count("order_id").alias("total_orders"),
    sum("sales").alias("total_revenue")
).orderBy(desc("total_revenue")).limit(10)
top_products.write.format("delta").mode("overwrite").saveAsTable("top_products")

# Monthly Sales
monthly_sales = df.withColumn("month", month("order_date")).groupBy("month").agg(
    sum("sales").alias("monthly_sales")
)
monthly_sales.write.format("delta").mode("overwrite").saveAsTable("monthly_sales")

# Average Shipping Cost
avg_shipping_cost = df.agg(
    avg("shipping_cost").alias("avg_shipping_cost")
)
avg_shipping_cost.write.format("delta").mode("overwrite").saveAsTable("avg_shipping_cost")

# Total Orders Summary
order_count = df.select("order_id").distinct().count()
summary_df = spark.createDataFrame([(order_count,)], ["total_orders"])
summary_df.write.format("delta").mode("overwrite").saveAsTable("total_orders_summary")

print("data saved in tables.")


data saved in tables.
