In [0]:
from pyspark.sql.functions import col, avg, sum as sum_, count, expr
# Read the silver-layer retail records from their Delta location.
silver_df = spark.read.format("delta").load("/mnt/silver/retail_silver")

# Derive integer holiday flags so they can be averaged in the aggregation step:
#   * SchoolHolidayInt - SchoolHoliday cast to int.
#   * StateHolidayInt  - 1 when StateHoliday equals the string '1', otherwise 0
#                        (a NULL StateHoliday also falls through to 0).
# NOTE(review): confirm that '1' is the only StateHoliday value that should
# count as a holiday in the silver data.
school_flag = col("SchoolHoliday").cast("int")
state_flag = expr("CASE WHEN StateHoliday = '1' THEN 1 ELSE 0 END")
df = silver_df.withColumn("SchoolHolidayInt", school_flag).withColumn(
    "StateHolidayInt", state_flag
)

# Aggregate to one KPI row per (product, code, warehouse, category) group.
kpi_keys = ["Product_ID", "Product_Code", "Warehouse", "Product_Category"]

kpi_measures = [
    count("*").alias("total_days"),  # number of rows in the group
    sum_("Order_Demand").alias("total_demand"),
    avg("Order_Demand").alias("avg_daily_demand"),
    sum_("Open").alias("open_days_raw"),  # only feeds is_open below
    avg("Petrol_price").alias("avg_petrol_price"),
    avg("Promo").alias("avg_promo"),  # only feeds promo_demand_ratio below
    # StateHolidayInt is 0/1, so its group average is > 0 exactly when at least
    # one row in the group is a state holiday (presumably the same holds for
    # SchoolHolidayInt if SchoolHoliday is a 0/1 flag). An all-NULL group
    # yields AVG = NULL, which the IF treats as 'No'.
    expr("IF(AVG(SchoolHolidayInt) > 0, 'Yes', 'No')").alias("had_school_holiday"),
    expr("IF(AVG(StateHolidayInt) > 0, 'Yes', 'No')").alias("had_state_holiday"),
]

# Average demand relative to average promo intensity; the 1e-6 term keeps the
# division defined when avg_promo is 0. A NULL operand gives a NULL ratio, which
# the CASE below maps to 'Low'.
# NOTE(review): with avg_promo == 0 the ratio is avg_daily_demand * 1e6, so any
# group with avg_daily_demand >= 0.001 is classed 'High' even though it never
# had a promo -- confirm this is the intended classification.
promo_ratio = col("avg_daily_demand") / (col("avg_promo") + expr("1e-6"))

# Bucket the ratio: >= 1000 -> 'High', >= 100 -> 'Medium', otherwise 'Low'.
promo_level_sql = """
          CASE
              WHEN promo_demand_ratio >= 1000 THEN 'High'
              WHEN promo_demand_ratio >= 100 THEN 'Medium'
              ELSE 'Low'
          END
      """

df_gold = (
    df.groupBy(*kpi_keys)
    .agg(*kpi_measures)
    # 'Yes' when the summed Open values in the group are positive.
    .withColumn("is_open", expr("IF(open_days_raw > 0, 'Yes', 'No')"))
    .withColumn("promo_demand_ratio", promo_ratio)
    .withColumn("promo_demand_level", expr(promo_level_sql))
    # Helper columns (open_days_raw, avg_promo, promo_demand_ratio) are dropped here.
    .select(
        *kpi_keys,
        "total_days", "total_demand", "avg_daily_demand",
        "is_open", "avg_petrol_price",
        "had_school_holiday", "had_state_holiday",
        "promo_demand_level",
    )
)

# Save results: fully rebuild the gold Delta table at /mnt/gold/retail_kpis.
# Because this is a complete overwrite, replace the stored schema too.
# `overwriteSchema` drops columns that this job no longer produces and accepts
# column type changes. `mergeSchema` on an overwrite would instead keep stale
# columns from earlier runs (filled with NULL) and fail on incompatible type
# changes.
(
    df_gold.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/gold/retail_kpis")
)

# Register (or re-register) the SQL table `retail_kpis` over the gold Delta
# files. The table is declared with an explicit LOCATION, so dropping a previous
# definition created by this script should only remove the catalog entry, not
# the data written above. NOTE(review): confirm this for your metastore.
drop_table_sql = "DROP TABLE IF EXISTS retail_kpis"
create_table_sql = """
CREATE TABLE retail_kpis
USING DELTA
LOCATION '/mnt/gold/retail_kpis'
"""
for sql_text in (drop_table_sql, create_table_sql):
    spark.sql(sql_text)

print("Gold table created with KPIs.")
# Count the rows that were actually persisted. df_gold is not cached, so
# df_gold.count() would re-run the whole silver read + groupBy pipeline.
# Reading the written Delta table avoids that recomputation and reports what is
# on disk, even if the silver source changed after the write.
gold_row_count = spark.read.format("delta").load("/mnt/gold/retail_kpis").count()
print("total records:", gold_row_count)

Gold table created with KPIs.
total records: 169211
