In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Root S3 prefix of the silver layer; each input is read from
# f"{SILVER_PATH}<dataset>".
SILVER_PATH = "s3://finance-beop-fm/silver/"

# Recreate the gold database from scratch on every run of this cell.
# NOTE(review): CASCADE drops *every* table in `gold`, including any that are
# not produced by this notebook. The schema also stays empty until the gold
# tables are rewritten (or permanently, if the build fails). Confirm this is
# intended; register_table already writes each table with mode("overwrite").
spark.sql("DROP DATABASE IF EXISTS gold CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS gold")

def register_table(df, table_name, *, database="gold"):
    """Persist ``df`` as a managed Delta table named ``<database>.<table_name>``.

    An existing table with the same name is overwritten.

    Args:
        df: Spark DataFrame to write.
        table_name: Unqualified table name.
        database: Target database/schema. Defaults to ``"gold"``, so existing
            callers keep writing to the gold layer.
    """
    (
        df.write
        .mode("overwrite")
        .format("delta")
        .saveAsTable(f"{database}.{table_name}")
    )

def create_gold_datamart():
    """Build every gold-layer table from the silver parquet datasets.

    The orders, payments, expenses and budgets datasets under ``SILVER_PATH``
    are read once and cached. Each builder below reuses them for several Spark
    actions (a table write plus a row count), so without the cache every
    action would rescan the parquet files. The cache is released when the
    build ends, even if a builder raises.
    """
    print(" Starting GOLD layer processing...")
    orders_df = spark.read.parquet(f"{SILVER_PATH}orders").cache()
    payments_df = spark.read.parquet(f"{SILVER_PATH}payments").cache()
    expenses_df = spark.read.parquet(f"{SILVER_PATH}expenses").cache()
    budgets_df = spark.read.parquet(f"{SILVER_PATH}budgets").cache()
    # Spark evaluates lazily: the inputs are only defined at this point. The
    # data is scanned by the first builder's action and then served from cache.
    print("  Reading data from silver layer completed...")

    try:
        create_sales_performance(orders_df, payments_df)
        create_financial_summary(orders_df, payments_df, expenses_df)
        create_daily_kpi(orders_df, payments_df, expenses_df)
        create_customer_360(orders_df, payments_df)
        create_budget_vs_actual(expenses_df, budgets_df)
        create_monthly_report(orders_df, payments_df, expenses_df)
    finally:
        for cached_df in (orders_df, payments_df, expenses_df, budgets_df):
            cached_df.unpersist()

    print(" GOLD layer processing completed.")

def create_sales_performance(orders_df, payments_df):
    """Write ``gold.sales_performance``: orders enriched with successful payments.

    Every order is kept (left join on ``order_id`` against payments whose
    ``payment_status`` is ``"SUCCESS"``).

    - ``is_paid`` is 1 when a matched ``paid_amount`` is present, otherwise 0.
    - ``pending_amount`` is ``order_amount`` minus that paid amount, with 0
      used when there is no paid amount.
    - Year, month, quarter and week of ``order_date`` are added for slicing,
      plus a ``gold_load_ts`` load timestamp.

    NOTE(review): an order with several successful payments yields one row per
    payment, and each row's ``pending_amount`` subtracts only that payment.
    Confirm whether payments should be aggregated per order first.
    """
    successful = payments_df.filter(col("payment_status") == "SUCCESS").alias("p")
    paid = col("p.paid_amount")
    order_date = col("o.order_date")

    # Columns copied unchanged from each side of the join, in output order.
    passthrough = [
        col(f"o.{name}")
        for name in ("order_id", "customer_id", "order_date", "order_amount", "product_category")
    ] + [
        col(f"p.{name}")
        for name in ("payment_id", "paid_amount", "payment_date", "payment_method")
    ]
    derived = [
        when(paid.isNull(), 0).otherwise(1).alias("is_paid"),
        (col("o.order_amount") - coalesce(paid, lit(0))).alias("pending_amount"),
        year(order_date).alias("order_year"),
        month(order_date).alias("order_month"),
        quarter(order_date).alias("order_quarter"),
        weekofyear(order_date).alias("order_week"),
        current_timestamp().alias("gold_load_ts"),
    ]

    joined = orders_df.alias("o").join(
        successful, col("o.order_id") == col("p.order_id"), "left"
    )
    sales_df = joined.select(*passthrough, *derived)

    register_table(sales_df, "sales_performance")
    print(f" Sales Performance created: {sales_df.count():,} rows")

def create_financial_summary(orders_df, payments_df, expenses_df):
    """Write ``gold.financial_summary``: revenue vs. expenses per calendar month.

    - Revenue (sum of ``paid_amount``) and the count of distinct orders paid
      come from ``"SUCCESS"`` payments, grouped by payment year/month.
    - Expenses are summed by expense year/month.
    - A full outer join keeps months present on only one side; missing
      numeric values are filled with 0.
    - Adds ``net_profit``, ``expense_ratio_pct`` (expenses as a percentage of
      revenue, 0 when revenue is 0) and ``gold_load_ts``.
    - Rows are sorted newest month first.

    ``orders_df`` is currently unused.

    NOTE(review): a month with expenses but zero revenue reports an expense
    ratio of 0%, which looks ideal. Consider null for that case.
    """
    def by_month(df, date_col):
        # Group on calendar year and month of the given date column.
        return df.groupBy(year(date_col).alias("year"), month(date_col).alias("month"))

    revenue_df = by_month(
        payments_df.where(col("payment_status") == "SUCCESS"), "payment_date"
    ).agg(
        sum("paid_amount").alias("total_revenue"),
        countDistinct("order_id").alias("orders_paid"),
    )
    expense_df = by_month(expenses_df, "expense_date").agg(
        sum("expense_amount").alias("total_expenses")
    )

    revenue, expenses = col("total_revenue"), col("total_expenses")
    summary_df = (
        revenue_df.join(expense_df, ["year", "month"], "full")
        .fillna(0)
        .withColumn("net_profit", revenue - expenses)
        .withColumn(
            "expense_ratio_pct",
            when(revenue == 0, 0).otherwise(expenses / revenue * 100),
        )
        .withColumn("gold_load_ts", current_timestamp())
        .orderBy(col("year").desc(), col("month").desc())
    )

    register_table(summary_df, "financial_summary")
    print(f" Financial Summary created: {summary_df.count():,} rows")

def create_daily_kpi(orders_df, payments_df, expenses_df):
    """Write ``gold.daily_kpi``: revenue, expenses and profit per ``date``.

    - Revenue is the sum of ``paid_amount`` over ``"SUCCESS"`` payments,
      keyed by ``payment_date``.
    - Expenses are the sum of ``expense_amount``, keyed by ``expense_date``.
    - Both are exposed as a ``date`` column and full-outer-joined on it;
      missing numeric values become 0.
    - Adds ``daily_profit`` and ``gold_load_ts``; sorted newest date first.

    ``orders_df`` is currently unused.

    NOTE(review): grouping is on the raw column values. If ``payment_date`` or
    ``expense_date`` are timestamps rather than dates, rows are per instant,
    not per day. Confirm the silver column types.
    """
    daily_revenue = (
        payments_df.where(col("payment_status") == "SUCCESS")
        .groupBy(col("payment_date").alias("date"))
        .agg(sum("paid_amount").alias("daily_revenue"))
    )
    daily_expenses = expenses_df.groupBy(col("expense_date").alias("date")).agg(
        sum("expense_amount").alias("daily_expenses")
    )

    daily_kpi = (
        daily_revenue.join(daily_expenses, "date", "full")
        .fillna(0)
        .withColumn("daily_profit", col("daily_revenue") - col("daily_expenses"))
        .withColumn("gold_load_ts", current_timestamp())
        .orderBy(col("date").desc())
    )

    register_table(daily_kpi, "daily_kpi")
    print(f" Daily KPI created: {daily_kpi.count():,} rows")

def create_customer_360(orders_df, payments_df):
    """Write ``gold.customer_360``: order and payment totals per customer.

    - For each ``customer_id`` in ``orders_df``: order count, total
      ``order_amount`` and latest ``order_date``.
    - ``total_paid`` is the sum of ``paid_amount`` over ``"SUCCESS"`` payments,
      attributed to customers through ``order_id``. Customers with no matching
      payment get 0 (via ``fillna(0)``).
    - ``outstanding_amount`` is total order value minus total paid, plus a
      ``gold_load_ts`` load timestamp.
    """
    order_stats = orders_df.groupBy("customer_id").agg(
        count("*").alias("total_orders"),
        sum("order_amount").alias("total_order_value"),
        max("order_date").alias("last_order_date"),
    )

    # Attribute each successful payment to a customer via the order it pays.
    order_owner = orders_df.select("order_id", "customer_id")
    paid_stats = (
        payments_df.where(col("payment_status") == "SUCCESS")
        .join(order_owner, "order_id")
        .groupBy("customer_id")
        .agg(sum("paid_amount").alias("total_paid"))
    )

    customer_360 = (
        order_stats.join(paid_stats, "customer_id", "left")
        .fillna(0)
        .withColumn("outstanding_amount", col("total_order_value") - col("total_paid"))
        .withColumn("gold_load_ts", current_timestamp())
    )

    register_table(customer_360, "customer_360")
    print(f" Customer 360 created: {customer_360.count():,} rows")

def create_budget_vs_actual(expenses_df, budgets_df):
    """Write ``gold.budget_vs_actual``: budgeted vs. actual spend per department/month.

    - Actual spend is ``expense_amount`` summed per department and the
      ``yyyy-MM`` month of ``expense_date``.
    - Actuals are full-outer-joined to ``budgets_df`` on (department, month),
      so budget lines without spend and spend without a budget are both kept.
    - Department and month come from whichever side is present, and missing
      amounts become 0.
    - ``variance`` is actual minus budget (positive means overspent, negative
      means under budget).

    NOTE(review): the join compares ``budgets_df.month`` with a ``yyyy-MM``
    string, so budget months are assumed to use that exact format. Confirm
    against the silver budgets schema.
    """
    actuals = expenses_df.groupBy(
        col("department").alias("exp_dept"),
        date_format("expense_date", "yyyy-MM").alias("exp_month"),
    ).agg(sum("expense_amount").alias("actual_amount"))

    on_dept_and_month = (budgets_df.month == actuals.exp_month) & (
        budgets_df.department == actuals.exp_dept
    )
    budget = coalesce(col("budget_amount"), lit(0))
    actual = coalesce(col("actual_amount"), lit(0))

    budget_vs_actual = budgets_df.join(actuals, on_dept_and_month, "full").select(
        coalesce(budgets_df.department, col("exp_dept")).alias("department"),
        coalesce(budgets_df.month, col("exp_month")).alias("month"),
        budget.alias("budget_amount"),
        actual.alias("actual_amount"),
        # Actual - Budget: positive means overspent, negative means under.
        (actual - budget).alias("variance"),
        current_timestamp().alias("gold_load_ts"),
    )

    register_table(budget_vs_actual, "budget_vs_actual")
    print(f" Budget vs Actual created: {budget_vs_actual.count():,} rows")

def create_monthly_report(orders_df, payments_df, expenses_df):
    """Write ``gold.monthly_financial_report``: revenue, expenses and profit per month.

    - Revenue is the sum of ``paid_amount`` over ``"SUCCESS"`` payments by
      payment year/month.
    - Expenses are the sum of ``expense_amount`` by expense year/month.
    - Both are full-outer-joined on (year, month), with missing numeric
      values set to 0.
    - Adds ``profit`` and ``gold_load_ts``; sorted newest month first.

    ``orders_df`` is currently unused.

    NOTE(review): revenue/expenses/profit duplicate the figures written to
    ``gold.financial_summary``. Consider deriving one table from the other.
    """
    def _month_keys(date_col):
        # Calendar year and month of the given date column, as grouping keys.
        return year(date_col).alias("year"), month(date_col).alias("month")

    monthly_revenue = (
        payments_df.where(col("payment_status") == "SUCCESS")
        .groupBy(*_month_keys("payment_date"))
        .agg(sum("paid_amount").alias("revenue"))
    )
    monthly_expense = expenses_df.groupBy(*_month_keys("expense_date")).agg(
        sum("expense_amount").alias("expenses")
    )

    profit = col("revenue") - col("expenses")
    report = (
        monthly_revenue.join(monthly_expense, ["year", "month"], "full")
        .fillna(0)
        .withColumn("profit", profit)
        .withColumn("gold_load_ts", current_timestamp())
        .orderBy(col("year").desc(), col("month").desc())
    )

    register_table(report, "monthly_financial_report")
    print(f" Monthly Financial Report created: {report.count():,} rows")

# Entry point: run the full gold-layer build when this cell executes.
create_gold_datamart()

 Starting GOLD layer processing...
 Sales Performance created: 8,605 rows
 Financial Summary created: 25 rows
 Daily KPI created: 663 rows
 Customer 360 created: 3,731 rows
 Budget vs Actual created: 120 rows
 Monthly Financial Report created: 25 rows
  Reading data from silver layer completed...
