In [0]:
# MAGIC %load_ext autoreload
# MAGIC %autoreload 2

In [0]:
import pyspark.sql.functions as f
from delta.tables import DeltaTable

In [0]:
# Catalog name that prefixes every table referenced by this notebook.
CATALOG = "pei"

# Fully qualified name of the Silver table holding enriched orders.
SOURCE_SILVER = CATALOG + ".silver.orders_enriched"

# Fully qualified name of the Gold sales-performance aggregate table.
TARGET_GOLD = CATALOG + ".gold.agg_sales_performance"

# `table_name` value that identifies this job's row in the batch watermark table.
WATERMARK_KEY = "gold_agg_sales"

In [0]:
# Incrementally fold new Silver order changes into the Gold profit aggregate.
#
# Flow:
#   1. Read the last Silver version already folded into Gold (the watermark).
#   2. Pin the current latest Silver version so that the change-feed read and
#      the watermark written at the end cover exactly the same version range.
#   3. Convert the change feed into signed profit deltas, aggregate them per
#      (order_year, category, sub_category, customer_name) and MERGE into Gold.
#   4. Advance the watermark to the pinned version.
#
# NOTE: steps 3 and 4 are separate commits. If step 4 fails after step 3 has
# succeeded, a rerun will apply the same deltas a second time.
try:
    watermark_table = f"{CATALOG}.default.batch_watermark"
    no_changes_message = "No new changes found in Silver table."

    watermark_row = (
        spark.read.table(watermark_table)
        .filter(f.col("table_name") == WATERMARK_KEY)
        .select("last_processed_version")
        .first()
    )

    # -1 means "nothing processed yet", so the first run starts at version 0.
    if watermark_row is not None and watermark_row.last_processed_version is not None:
        last_version = watermark_row.last_processed_version
    else:
        last_version = -1

    # Pin the upper bound BEFORE reading the change feed. Reading the latest
    # version after the merge would mark commits that landed in between as
    # processed even though they were never aggregated.
    latest_version = (
        spark.sql(f"DESCRIBE HISTORY {SOURCE_SILVER} LIMIT 1")
        .select("version")
        .first()[0]
    )

    if last_version >= latest_version:
        # Asking the change feed for a start version beyond the latest commit
        # raises, so short-circuit when there is nothing new to read.
        print(no_changes_message)
    else:
        # Signed deltas: inserts and post-images add profit, deletes and
        # pre-images remove the previously counted profit. Counting only
        # post-images would double count every updated order.
        is_additive = f.col("_change_type").isin("insert", "update_postimage")
        changes_df = (
            spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", last_version + 1)
            .option("endingVersion", latest_version)
            .table(SOURCE_SILVER)
            .withColumn("order_year", f.year("order_date"))
            .withColumn(
                "profit_delta",
                f.when(is_additive, f.col("profit")).otherwise(-f.col("profit")),
            )
        )

        if changes_df.isEmpty():
            # The version range held no row-level changes (e.g. maintenance commits).
            print(no_changes_message)
        else:
            df_batch_agg = (
                changes_df
                .groupBy("order_year", "category", "sub_category", "customer_name")
                .agg(f.sum("profit_delta").alias("batch_profit"))
            )

            # Years touched by this batch; used only to narrow the target rows
            # scanned by the merge. Null years (null order_date) are skipped
            # because they cannot be rendered as a valid literal.
            batch_years = [
                row[0]
                for row in df_batch_agg.select("order_year").distinct().collect()
                if row[0] is not None
            ]

            key_match = (
                "t.order_year = s.order_year AND "
                "t.category = s.category AND t.sub_category = s.sub_category AND "
                "t.customer_name = s.customer_name"
            )
            if batch_years:
                year_filter = ", ".join(f"'{y}'" for y in batch_years)
                merge_condition = f"t.order_year IN ({year_filter}) AND {key_match}"
            else:
                # An empty IN () list is a syntax error; fall back to the key match alone.
                merge_condition = key_match

            target_gold = DeltaTable.forName(spark, TARGET_GOLD)

            (
                target_gold.alias("t")
                .merge(df_batch_agg.alias("s"), merge_condition)
                .whenMatchedUpdate(
                    set={"total_profit": f.col("t.total_profit") + f.col("s.batch_profit")}
                )
                .whenNotMatchedInsert(
                    values={
                        "order_year": "s.order_year",
                        "category": "s.category",
                        "sub_category": "s.sub_category",
                        "customer_name": "s.customer_name",
                        "total_profit": "s.batch_profit",
                    }
                )
                .execute()
            )

        # Every version up to latest_version has now been scanned, so record it
        # even when the range was empty; this avoids rescanning it next run.
        spark.sql(f"""
            MERGE INTO {watermark_table} t
            USING (SELECT '{WATERMARK_KEY}' as tn, {int(latest_version)} as ver) s
            ON t.table_name = s.tn
            WHEN MATCHED THEN UPDATE SET last_processed_version = s.ver
            WHEN NOT MATCHED THEN INSERT (table_name, last_processed_version) VALUES (s.tn, s.ver)
        """)

except Exception as e:
    print(f"FAILED: Gold Layer Aggregation. Error: {str(e)}")
    # Re-raise so the job run is marked as failed instead of silently succeeding.
    raise