In [None]:
import dlt
from pyspark.sql.functions import (
    expr, col, current_timestamp, sum as _sum, max as _max, min as _min,
    when, coalesce, lag, lead, window, date_format, to_date, round as _round,
    count, avg, desc
)
from pyspark.sql.window import Window

In [None]:
@dlt.materialized_view(
    name="gold_daily_advertiser_spend_summary",
    comment="Daily spend summary with budget monitoring and ad serving status."
)
def gold_daily_advertiser_spend_summary():
    """
    Build the daily per-advertiser spend summary with budget status columns.

    Rolls the hourly silver summary up by advertiser and calendar date
    (``to_date(moment)``), left-joins the current budgets table on
    ``advertiser_id`` and derives remaining budget, utilization percentages,
    exceeded flags and an ad-serving status.

    Advertisers without a matching budget row keep a NULL budget: the
    remaining/utilization columns are NULL, the exceeded flags are False and
    the status falls through to "ACTIVE". A budget of 0 yields NULL
    utilization instead of a division by zero.

    Returns:
        DataFrame with columns advertiser_id, spend_date, daily_gross_spend,
        daily_net_spend, total_transactions, current_daily_budget,
        last_transaction_time, remaining_gross_budget, remaining_net_budget,
        gross_budget_utilization_pct, net_budget_utilization_pct,
        gross_budget_exceeded, net_budget_exceeded, ad_serving_status and
        calculated_at.
    """
    # Aggregate from hourly summaries (batch operation)
    daily_spend = (
        spark.read.table("ad_monitor.silver.silver_hourly_spend_summary")
        .groupBy("advertiser_id", to_date(col("moment")).alias("spend_date"))
        .agg(
            _sum("hourly_gross_spend").alias("daily_gross_spend"),
            _sum("hourly_net_spend").alias("daily_net_spend"),
            # NOTE(review): this counts hourly summary rows, not individual
            # transactions -- confirm that is what "total_transactions" means.
            count("*").alias("total_transactions"),
            _max("last_event_time").alias("last_transaction_time")
        )
    )

    # Join with current budgets (batch-to-batch join).
    # NOTE(review): presumably one row per advertiser; more than one would
    # fan out the daily rows -- confirm against the silver table.
    budgets = spark.read.table("ad_monitor.silver.silver_advertiser_budgets_current")

    # Reference the source column directly instead of the
    # "current_daily_budget" alias created inside the same select(), so the
    # query does not depend on lateral column alias resolution.
    budget = col("new_budget_value")
    gross_spend = col("daily_gross_spend")
    net_spend = col("daily_net_spend")

    def utilization_pct(spend):
        # when() without otherwise() yields NULL for a zero budget, which
        # avoids a divide-by-zero error when ANSI SQL mode is enabled.
        return _round(when(budget != 0, (spend / budget) * 100), 2)

    return daily_spend.join(
        budgets,
        "advertiser_id",
        "left"
    ).select(
        col("advertiser_id"),
        col("spend_date"),
        gross_spend,
        net_spend,
        col("total_transactions"),
        budget.alias("current_daily_budget"),
        col("last_transaction_time"),

        # Budget monitoring calculations
        _round(budget - gross_spend, 2).alias("remaining_gross_budget"),
        _round(budget - net_spend, 2).alias("remaining_net_budget"),

        # Budget utilization percentages
        utilization_pct(gross_spend).alias("gross_budget_utilization_pct"),
        utilization_pct(net_spend).alias("net_budget_utilization_pct"),

        # Status flags (a NULL comparison falls through to False)
        when(gross_spend >= budget, True).otherwise(False).alias("gross_budget_exceeded"),
        when(net_spend >= budget, True).otherwise(False).alias("net_budget_exceeded"),

        # Ad serving status, evaluated from the strictest threshold down
        when(gross_spend >= budget, "STOPPED")
        .when(gross_spend >= budget * 0.95, "CRITICAL")
        .when(gross_spend >= budget * 0.8, "WARNING")
        .otherwise("ACTIVE").alias("ad_serving_status"),

        current_timestamp().alias("calculated_at")
    )

In [None]:
@dlt.materialized_view(
    name="gold_realtime_spend_monitor",
    comment="Real-time spend monitoring for immediate budget alerts."
)
def gold_realtime_spend_monitor():
    """
    Build the running intra-day spend monitor used for budget alerts.

    For every row of the hourly silver summary, computes the cumulative
    gross and net spend of that advertiser for the calendar day of
    ``moment`` (ordered by ``moment``), left-joins the current budget on
    ``advertiser_id`` and derives the remaining budget, utilization
    percentage and an alert status.

    Advertisers without a matching budget row keep a NULL budget and fall
    through to "ACTIVE". A budget of 0 yields NULL utilization instead of a
    division by zero.

    Returns:
        DataFrame with columns advertiser_id, moment,
        daily_gross_running_total, daily_net_running_total,
        current_daily_budget, last_event_time, remaining_budget,
        budget_utilization_pct, alert_status and calculated_at.
    """
    # Using hourly aggregations for faster processing
    hourly_spend = spark.read.table("ad_monitor.silver.silver_hourly_spend_summary")

    # Running total restarts per advertiser and calendar day. Partitioning
    # by the raw "moment" would put each hour in its own partition, making
    # the "running total" equal to that hour's spend.
    daily_window = (
        Window.partitionBy("advertiser_id", to_date(col("moment")))
        .orderBy("moment")
    )

    running_totals = hourly_spend.withColumn(
        "daily_gross_running_total",
        _sum("hourly_gross_spend").over(daily_window)
    ).withColumn(
        "daily_net_running_total",
        _sum("hourly_net_spend").over(daily_window)
    )

    # Join with current budgets
    budgets = spark.read.table("ad_monitor.silver.silver_advertiser_budgets_current")

    # Read the budget from its source column rather than the alias created
    # inside the same select(), so the query does not depend on lateral
    # column alias resolution.
    budget = col("new_budget_value")
    gross_total = col("daily_gross_running_total")

    # Alias both sides so same-named columns can be qualified explicitly.
    return running_totals.alias("spend").join(
        budgets.alias("budget"),
        "advertiser_id",
        "left"
    ).select(
        col("advertiser_id"),
        # Report the hour of the spend row, not a column from the budget row.
        col("spend.moment").alias("moment"),
        gross_total,
        col("daily_net_running_total"),
        budget.alias("current_daily_budget"),
        col("spend.last_event_time").alias("last_event_time"),

        # Real-time status
        _round(budget - gross_total, 2).alias("remaining_budget"),
        # when() without otherwise() yields NULL for a zero budget, which
        # avoids a divide-by-zero error when ANSI SQL mode is enabled.
        _round(when(budget != 0, (gross_total / budget) * 100), 2).alias("budget_utilization_pct"),

        # Alert level, evaluated from the strictest threshold down
        when(gross_total >= budget, "STOP_ADS")
        .when(gross_total >= budget * 0.95, "URGENT")
        .when(gross_total >= budget * 0.8, "WARNING")
        .otherwise("ACTIVE").alias("alert_status"),

        # NOTE(review): "calculated_at" is not produced in this function; it
        # must already exist on exactly one of the two silver tables or this
        # select fails -- confirm, or use current_timestamp() as the other
        # gold views do.
        col("calculated_at")
    )


In [None]:
@dlt.materialized_view(
    name="gold_budget_change_impact",
    comment="Analysis of budget changes and their impact on spending patterns."
)
def gold_budget_change_impact():
    """
    Derive per-advertiser budget change records from the budget history.

    Each history row is compared with the advertiser's previous row
    (ordered by ``moment``) to compute the absolute change, the percentage
    change and a change type:

    * "INCREASE" / "DECREASE" -- the budget went up / down,
    * "UNCHANGED" -- a previous budget exists and the value is identical,
    * "INITIAL" -- everything else, i.e. the advertiser's first row.

    ``budget_change_pct`` is 0 for the first row and NULL when the previous
    budget was 0 (the percentage is undefined).

    Returns:
        DataFrame with columns advertiser_id, budget_change_time,
        new_budget, previous_budget, budget_change, budget_change_pct,
        budget_change_type and processed_at.
    """
    budget_history = spark.read.table("ad_monitor.silver.silver_advertiser_budgets_history")  # Batch read

    # Window to track budget changes
    budget_window = Window.partitionBy("advertiser_id").orderBy("moment")

    previous_budget = col("previous_budget")
    budget_change = col("budget_change")

    return budget_history.withColumn(
        "previous_budget",
        lag("new_budget_value").over(budget_window)
    ).withColumn(
        "budget_change",
        # The first row has no predecessor; coalesce makes its change 0.
        col("new_budget_value") - coalesce(previous_budget, col("new_budget_value"))
    ).withColumn(
        "budget_change_type",
        when(budget_change > 0, "INCREASE")
        .when(budget_change < 0, "DECREASE")
        # A zero change against an existing previous budget is not an
        # initial budget; label it separately.
        .when(previous_budget.isNotNull() & (budget_change == 0), "UNCHANGED")
        .otherwise("INITIAL")
    ).withColumn(
        "budget_change_pct",
        # Guard the division: a previous budget of 0 falls through both
        # branches and yields NULL instead of a divide-by-zero error under
        # ANSI SQL mode.
        when(previous_budget.isNotNull() & (previous_budget != 0),
             _round((budget_change / previous_budget) * 100, 2))
        .when(previous_budget.isNull(), 0)
    ).select(
        col("advertiser_id"),
        col("moment").alias("budget_change_time"),
        col("new_budget_value").alias("new_budget"),
        previous_budget,
        budget_change,
        col("budget_change_pct"),
        col("budget_change_type"),
        current_timestamp().alias("processed_at")
    )

In [None]:
@dlt.materialized_view(
    name="gold_advertiser_performance_metrics",
    comment="Key performance metrics per advertiser for dashboard KPIs."
)
def gold_advertiser_performance_metrics():
    """
    Add a trailing average of gross spend to the daily spend summary.

    Reads ``gold_daily_advertiser_spend_summary`` and, per advertiser,
    averages ``daily_gross_spend`` over the current row and the six
    preceding rows ordered by ``spend_date``. The frame is row-based, so it
    spans seven calendar days only when the advertiser has a row for every
    day.

    Returns:
        DataFrame with columns advertiser_id, daily_gross_spend,
        daily_net_spend, current_daily_budget, ad_serving_status,
        avg_7day_spend and calculated_at.
    """
    daily_summary = dlt.read("gold_daily_advertiser_spend_summary")

    # A ROWS frame is only meaningful with an explicit ordering; without
    # orderBy the "previous six rows" are arbitrary and the average is
    # nondeterministic.
    advertiser_window = (
        Window.partitionBy("advertiser_id")
        .orderBy("spend_date")
        .rowsBetween(-6, 0)
    )

    # NOTE(review): spend_date is not part of the output, so the daily rows
    # of one advertiser cannot be told apart downstream -- confirm whether
    # it should be exposed.
    return daily_summary.withColumn(
        "avg_7day_spend",
        _round(avg("daily_gross_spend").over(advertiser_window), 2)
    ).select(
        col("advertiser_id"),
        col("daily_gross_spend"),
        col("daily_net_spend"),
        col("current_daily_budget"),
        col("ad_serving_status"),
        col("avg_7day_spend"),
        current_timestamp().alias("calculated_at")
    )
