In [0]:
from pyspark.sql.functions import col, trim, lower, when, regexp_replace, to_date, year, month

In [0]:
# Remove all widgets currently attached to this notebook
# (presumably to drop stale parameters left over from earlier runs — TODO confirm).
dbutils.widgets.removeAll()

In [0]:
# First two components of the three-part names used for the report tables:
# later cells write to f"{_env}.{target_schema_name}.{v_table_name}".
# `_env` also prefixes the input table name built in the next cell.
_env = "dev"
target_schema_name = "gold"

In [0]:
# Build the fully qualified source table name: <_env>.<input_schema>.<table_name>.
# NOTE(review): `table_name` is not assigned in any cell visible in this notebook,
# and the widgets are cleared earlier, so on a fresh kernel this line may raise
# NameError — confirm where the value is meant to come from (widget, %run, job parameter).
input_schema = "silver"
input_table_name = f"{_env}.{input_schema}.{table_name}"

In [0]:
# Load the source table as a DataFrame; every report cell below aggregates from `df`.
df = spark.read.table(input_table_name)

### Computing aggregated reports.

In [0]:
# 1. Total Revenue and Units Sold by Product Category
v_table_name = "report_total_revenue_units_by_category"

# Sum Revenue and Units_Sold per Product_Category, then replace Spark's
# auto-generated "sum(...)" column names with readable ones.
report1 = (
    df.groupBy("Product_Category")
    .agg({"Revenue": "sum", "Units_Sold": "sum"})
    .withColumnRenamed("sum(Revenue)", "Total_Revenue")
    .withColumnRenamed("sum(Units_Sold)", "Total_Units_Sold")
)

# Overwrite the Delta table <_env>.<target_schema_name>.<v_table_name> with this report.
(
    report1.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{_env}.{target_schema_name}.{v_table_name}")
)

In [0]:
# 2. Average Discount and Price by Customer Segment
v_table_name = "report_avg_discount_price_by_segment"

# Average Discount and Price per Customer_Segment, then replace Spark's
# auto-generated "avg(...)" column names with readable ones.
report2 = (
    df.groupBy("Customer_Segment")
    .agg({"Discount": "avg", "Price": "avg"})
    .withColumnRenamed("avg(Discount)", "Avg_Discount")
    .withColumnRenamed("avg(Price)", "Avg_Price")
)

# Overwrite the Delta table <_env>.<target_schema_name>.<v_table_name> with this report.
(
    report2.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{_env}.{target_schema_name}.{v_table_name}")
)

In [0]:
# 3. Monthly Revenue Trend
v_table_name = "report_monthly_revenue_trend"

# Sum Revenue per (order_year, order_month) and sort chronologically.
# NOTE(review): the sort is applied before the write; whether readers of the
# saved table see rows in this order is not guaranteed by this code — confirm
# if consumers rely on it.
report3 = (
    df.groupBy("order_year", "order_month")
    .agg({"Revenue": "sum"})
    .withColumnRenamed("sum(Revenue)", "Monthly_Revenue")
    .orderBy("order_year", "order_month")
)

# Overwrite the Delta table <_env>.<target_schema_name>.<v_table_name> with this report.
(
    report3.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{_env}.{target_schema_name}.{v_table_name}")
)

In [0]:
# 4. Marketing Spend Efficiency by Product Category
v_table_name = "report_marketing_spend_efficiency"

# Sum Revenue and Marketing_Spend per Product_Category, rename Spark's
# auto-generated "sum(...)" columns, then derive revenue earned per unit of spend.
report4 = (
    df.groupBy("Product_Category")
    .agg({"Revenue": "sum", "Marketing_Spend": "sum"})
    .withColumnRenamed("sum(Revenue)", "Total_Revenue")
    .withColumnRenamed("sum(Marketing_Spend)", "Total_Marketing_Spend")
    .withColumn(
        "Revenue_per_Marketing_Spend",
        # Divide only when spend is non-zero. A category with zero (or NULL)
        # total spend gets NULL for the ratio instead of failing the whole job
        # with a divide-by-zero error when ANSI SQL mode is enabled. With ANSI
        # mode off this matches the previous output (NULL on division by zero).
        when(
            col("Total_Marketing_Spend") != 0,
            col("Total_Revenue") / col("Total_Marketing_Spend"),
        ),
    )
)

# Overwrite the Delta table <_env>.<target_schema_name>.<v_table_name> with this report.
(
    report4.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{_env}.{target_schema_name}.{v_table_name}")
)