In [None]:
from datetime import datetime
from pyspark.sql.functions import col, lit, explode, array, struct, sum as spark_sum, concat
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, StringType

# Record when the refresh started; the `finally` clause at the end of the
# script uses this to report the total duration.
start_time = datetime.now()
# f-string so the actual timestamp is printed, not the literal "{start_time}".
print(f"RLM Revenue Refresh Started at {start_time}")

def _require_non_empty(df, name):
    """Raise if a source DataFrame produced no rows.

    Uses ``take(1)`` rather than ``count()`` so only a single row has to be
    computed, and raises explicitly (instead of ``assert``) so the check is not
    stripped when Python runs with ``-O``.
    """
    if not df.take(1):
        raise ValueError(f"❌ {name} is missing or empty!")


def _melt_excel_sheet(raw_df, category):
    """Unpivot one wide Excel sheet (one column per date) into tall rows.

    Every column other than ``Category`` is treated as a date column. The list
    is taken from *this* sheet's own header, so each region's sheet may carry a
    different set of dates.

    Returns a DataFrame with columns ``Date`` (date), ``Revenue`` (double) and
    ``Category`` (the literal ``category`` string).
    """
    date_columns = [c for c in raw_df.columns if c != "Category"]
    date_value_pairs = [struct(lit(c).alias("Date"), col(c).alias("Revenue")) for c in date_columns]
    return raw_df.select(explode(array(*date_value_pairs)).alias("x")).select(
        col("x.Date").cast("date").alias("Date"),
        col("x.Revenue").cast("double").alias("Revenue"),
        lit(category).alias("Category"),
    )


def _to_fiscal_qtd(melted_df, date_table_df):
    """Attach fiscal year/quarter to melted Excel rows and convert revenue to a running QTD total.

    Returns columns ``Date, FiscalYear, FiscalQuarter, Revenue, Category``.
    ``Revenue`` is the cumulative sum ordered by ``Date`` within each
    (FiscalYear, FiscalQuarter, Category).
    """
    # NOTE(review): the SAP queries join the date table on its `day` column, while this
    # join uses `Date` -- confirm the table exposes both with the same meaning.
    # NOTE(review): SAP rows keep d.FISCALYEAR as-is, but Excel rows get an "FY" prefix here;
    # confirm both sides end up in the same FiscalYear format after the union.
    joined = (melted_df
        .join(date_table_df.select("Date", "FiscalYear", "FiscalQuarter"), "Date", "left")
        .withColumn("FiscalYear", concat(lit("FY"), col("FiscalYear").cast("string"))))

    qtd_window = (Window.partitionBy("FiscalYear", "FiscalQuarter", "Category")
        .orderBy("Date")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

    return (joined
        .withColumn("Revenue_QTD", spark_sum("Revenue").over(qtd_window))
        .select("Date", "FiscalYear", "FiscalQuarter", col("Revenue_QTD").alias("Revenue"), "Category"))


try:
    # -----------------------------------
    # STEP 1: Wake up the cluster
    spark.range(1).collect()
    print("✅ Cluster is awake. Proceeding with SQL queries...")

    # -----------------------------------
    # STEP 2: Extract weekly revenue data for Region1 and Region2 from SAP using Spark SQL

    # --- Region1 ---
    # Keeps the latest load per (geography, fiscal period, week, product_group, load day),
    # then sums revenue per load day; the reported Date is the day before the load.
    Region1_df = spark.sql("""
      WITH ranked_data AS (
      SELECT 
        geography_name,
        fiscalyear,
        fiscalquarter,
        fiscal_week_in_qtr_num,
        product_group,
        revenue,
        LoadDate,
        CAST(LoadDate AS DATE) AS LoadDay,
        ROW_NUMBER() OVER (
          PARTITION BY geography_name, fiscalyear, fiscalquarter, fiscal_week_in_qtr_num, product_group, CAST(LoadDate AS DATE)
          ORDER BY LoadDate DESC
        ) AS rn
      FROM analytics.gold.global_revenue
      WHERE geography_name = 'Total Region1'
    )

    SELECT 
      'Region1' as Category,
      d.FISCALYEAR as FiscalYear,
      d.FISCALQUARTER as FiscalQuarter,
      DATEADD(r.LoadDay, -1) AS Date,
      SUM(r.revenue) AS Revenue
    FROM ranked_data AS r
    INNER JOIN analytics.gold.date AS d
      ON DATEADD(r.LoadDay, -1) = d.day
      AND r.fiscalyear = d.FISCALYEAR
      AND r.fiscalquarter = d.FISCALQUARTER
    WHERE r.rn = 1
    GROUP BY 
      r.geography_name,
      d.FISCALYEAR,
      d.FISCALQUARTER,
      d.FISCALQUARTER_WORK_DAY_NO,
      r.LoadDay
    ORDER BY Date DESC
    """)

    # --- Region2 ---
    # Keeps only rows from the latest load of each load day, then sums revenue per load day.
    Region2_df = spark.sql("""
      WITH latest_loads AS (
      SELECT 
        CAST(LoadDate AS DATE) AS LoadDay,
        MAX(LoadDate) AS LatestLoadDate
      FROM analytics.gold.global_revenue
      WHERE geography_name = 'Region2'
      GROUP BY CAST(LoadDate AS DATE)
    )

    SELECT 
      r.geography_name as Category,
      r.fiscalyear as FiscalYear,
      r.fiscalquarter as FiscalQuarter,
      DATEADD(CAST(r.LoadDate AS DATE), -1) AS Date,
      SUM(r.revenue) AS Revenue
    FROM analytics.gold.global_revenue r
    JOIN latest_loads l
      ON CAST(r.LoadDate AS DATE) = l.LoadDay
      AND r.LoadDate = l.LatestLoadDate
    INNER JOIN analytics.gold.date AS d
      ON DATEADD(CAST(r.LoadDate AS DATE), -1) = d.day
      AND r.fiscalyear = d.FISCALYEAR
      AND r.fiscalquarter = d.FISCALQUARTER
    WHERE r.geography_name = 'Region2'
    GROUP BY 
      r.geography_name,
      r.fiscalyear,
      r.fiscalquarter,
      CAST(r.LoadDate AS DATE)
    ORDER BY Date DESC
    """)

    # Ensure data was returned from SAP for both regions
    _require_non_empty(Region1_df, "Region1_df")
    _require_non_empty(Region2_df, "Region2_df")
    print("✅ SQL queries completed successfully.")

    # -----------------------------------
    # STEP 3: Load historical revenue from Excel for edge cases (FY25 Q1 & Q2)
    # This step loads pre-processed values maintained by the Finance team until our SAP data catches up.
    try:
        import openpyxl  # noqa: F401 - Excel engine used by pandas.read_excel for .xlsx
    except ImportError:
        # Install in-process rather than `%pip install` + `dbutils.library.restartPython()`:
        # restarting Python at this point would discard Region1_df/Region2_df and every
        # other variable built above, and `%pip` is not valid Python outside a notebook.
        import importlib
        import subprocess
        import sys
        subprocess.check_call([sys.executable, "-m", "pip", "install", "openpyxl"])
        importlib.invalidate_caches()
        import openpyxl  # noqa: F401

    import pandas as pd

    excel_path = "/Workspace/Users/financial-analytics-hub/R12_PY.xlsx"

    Region1_pd = pd.read_excel(excel_path, sheet_name="Region1")
    Region2_pd = pd.read_excel(excel_path, sheet_name="Region2")

    Region1_excel_raw = spark.createDataFrame(Region1_pd)
    Region2_excel_raw = spark.createDataFrame(Region2_pd)

    print("✅ Excel files read successfully.")

    # Pivoting Excel data: convert from wide format (dates as columns) to tall format (dates as rows).
    # Each sheet is melted using its own date columns.
    Region1_excel_melted = _melt_excel_sheet(Region1_excel_raw, "Region1")
    Region2_excel_melted = _melt_excel_sheet(Region2_excel_raw, "Region2")

    # Add fiscal calendar context and a running QTD total to match the SAP output format
    date_table_df = spark.table("analytics.gold.date")
    Region1_excel_qtd = _to_fiscal_qtd(Region1_excel_melted, date_table_df)
    Region2_excel_qtd = _to_fiscal_qtd(Region2_excel_melted, date_table_df)

    print("✅ Excel pivoting and QTD calculation completed.")

    # -----------------------------------
    # STEP 4: Merge SAP data with Excel data
    # Combines the most up-to-date SAP data with stopgap Excel-based values.
    # This ensures historical completeness while transitioning fully to automated data.
    final_df = (Region1_df
                .unionByName(Region2_df)
                .unionByName(Region1_excel_qtd)
                .unionByName(Region2_excel_qtd))

    # Rows whose date did not match the date table (null fiscal columns) are dropped here.
    final_df_clean = final_df.dropna(subset=["Date", "FiscalYear", "FiscalQuarter", "Revenue", "Category"])
    final_df_clean = final_df_clean.orderBy(col("Date").desc())

    print("✅ Union completed successfully.")

    # -----------------------------------
    # STEP 5: Write the unified QTD revenue to Databricks
    # Output table is consumed by Power BI dashboards to monitor daily revenue vs. targets for Region1 and Region2.
    final_df_casted = final_df_clean.select(
        col("Date").cast("date").alias("Date"),
        col("FiscalYear").cast("string").alias("FiscalYear"),
        col("FiscalQuarter").cast("string").alias("FiscalQuarter"),
        col("Revenue").cast("double").alias("Revenue"),
        col("Category").cast("string").alias("Category")
    )

    final_df_casted.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("analytics.gold.qtd_rev_r12")

    print("✅ Delta table written successfully.")

# Error handling for unexpected failures
except Exception as e:
    print(f"❌ Job failed with error: {e}")
    raise

# Log total duration regardless of success/failure
finally:
    end_time = datetime.now()
    print(f"🏁 RLM Revenue Refresh Completed at {end_time}")
    # total_seconds() spans the whole timedelta; .seconds would drop the days component.
    print(f"🕐 Total Duration: {int((end_time - start_time).total_seconds())} seconds")
