# 03 - Gold (Analytics) Layer Aggregation

Create business-ready, pre-aggregated tables for the Streamlit dashboard.

**Key Concepts:**
- Pre-aggregated tables for fast queries
- Multiple grain levels (event, daily, regional)
- Optimized for visualization

**Source:** `{catalog}.{schema}.silver_events`  
**Targets:**
- `gold_events_map` - Individual events for map display
- `gold_daily_summary` - Daily aggregations
- `gold_regional_summary` - Regional aggregations

## Setup

In [None]:
# Notebook parameters, declared as text widgets: (name, default value, label)
for widget_name, widget_default, widget_label in (
    ("catalog", "earthquakes_dev", "Catalog"),
    ("schema", "usgs", "Schema"),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

In [None]:
# Read both widget values; the later cells pass them to get_table_path().
catalog, schema = (dbutils.widgets.get(key) for key in ("catalog", "schema"))

print(f"Catalog: {catalog}")
print(f"Schema: {schema}")

In [None]:
from pyspark.sql.functions import (
    col, count, avg, max as spark_max, min as spark_min, sum as spark_sum,
    round as spark_round, current_timestamp, date_trunc, when, to_date
)

from utils.helpers import (
    get_table_path, 
    write_delta_table, 
    write_delta_table_with_cdf,
    print_table_stats,
    read_incremental_or_full,
    save_checkpoint,
    table_exists
)

In [None]:
# Resolve the silver source table and the checkpoint table for this catalog/schema
source_table, checkpoint_table = (
    get_table_path(catalog, schema, name)
    for name in ("silver_events", "_checkpoints")
)

print(f"Source: {source_table}")
print(f"Checkpoint: {checkpoint_table}")

## Read Silver Data

In [None]:
# Load the silver change set. The helper returns the changed records, a source
# version (stored in the checkpoint at the end of this notebook), and a flag
# saying whether the read was incremental.
df_changes, source_version, is_incremental = read_incremental_or_full(
    spark, source_table, checkpoint_table
)

changes_count = df_changes.count()
print(f"Changed records to process: {changes_count:,}")

# Nothing to process: end the notebook run here.
if not changes_count:
    print("No new records to process, exiting")
    dbutils.notebook.exit("success")

# Strip CDF metadata columns. DataFrame.drop is a no-op for names that are not
# in the schema, so no per-column presence check is needed.
cdf_cols = ["_change_type", "_commit_version", "_commit_timestamp"]
df_changes = df_changes.drop(*cdf_cols)

# On incremental runs, collect the dates and (non-null) regions touched by the
# change set so that only those aggregations are recomputed. None is kept for
# non-incremental runs.
affected_dates = None
affected_regions = None
if is_incremental:
    date_rows = (
        df_changes.select(to_date(col("event_time")).alias("date"))
        .distinct()
        .collect()
    )
    affected_dates = [row["date"] for row in date_rows]

    region_rows = (
        df_changes.select(col("region"))
        .where(col("region").isNotNull())
        .distinct()
        .collect()
    )
    affected_regions = [row["region"] for row in region_rows]

    print(f"Affected dates: {len(affected_dates)}")
    print(f"Affected regions: {len(affected_regions)}")

# The gold aggregations read from the silver table itself, not from the change set
df = spark.table(source_table)
print(f"Total silver records: {df.count():,}")

## Gold Table 1: Events for Map Display

Denormalized view with only columns needed for the map visualization.

In [None]:
# Build the map dataset: the silver columns the map needs, plus the derived
# size_factor, severity and _processed_at columns. Severity is derived from the
# SIGNIFICANCE score, not from magnitude.
# NOTE(review): `df` is the whole silver table, so every event is selected here
# even on incremental runs — confirm that is intended before the merge below.
sig_col = col("significance")

# Severity bands by significance score:
# severe >= 600, major >= 400, moderate >= 200, minor >= 100, anything else low
severity_col = when(sig_col >= 600, "severe")
for band_floor, band_name in ((400, "major"), (200, "moderate"), (100, "minor")):
    severity_col = severity_col.when(sig_col >= band_floor, band_name)
severity_col = severity_col.otherwise("low").alias("severity")

map_columns = [
    # Identifier
    col("event_id"),
    # Location (required for the map)
    col("latitude"),
    col("longitude"),
    col("place"),
    col("region"),
    # Magnitude (kept for reference)
    col("magnitude"),
    col("magnitude_category"),
    # Significance (primary metric for sizing/coloring)
    sig_col,
    (sig_col / 100.0).alias("size_factor"),
    # Depth
    col("depth_km"),
    col("depth_category"),
    # Timing
    col("event_time"),
    # Alert / impact
    col("alert_level"),
    col("has_tsunami_warning"),
    col("felt_reports"),
    # Derived severity label
    severity_col,
    # Link to event details
    col("detail_url"),
    # Processing timestamp
    current_timestamp().alias("_processed_at"),
]

# Keep only events that have both coordinates
df_map = (
    df.select(*map_columns)
    .where(col("latitude").isNotNull())
    .where(col("longitude").isNotNull())
)

print(f"Map events: {df_map.count():,}")

In [None]:
# Write gold_events_map in merge mode, keyed on event_id
table_path = get_table_path(catalog, schema, "gold_events_map")
map_write_options = {
    "mode": "merge",
    "merge_keys": ["event_id"],
    "enable_cdf": False,  # Gold is the end of the pipeline, so CDF is not needed
}
write_delta_table_with_cdf(df_map, table_path, **map_write_options)

## Gold Table 2: Daily Summary

Aggregated statistics by day for time-series charts.

In [None]:
# Choose the rows to aggregate: the whole silver table on a full run, or only
# the rows whose event date is among the affected dates on an incremental run.
if not (is_incremental and affected_dates):
    df_to_agg = df
    print(f"Full aggregation of {df_to_agg.count():,} records")
else:
    df_to_agg = df.filter(to_date(col("event_time")).isin(affected_dates))
    print(f"Aggregating {df_to_agg.count():,} records for {len(affected_dates)} affected dates")

sig = col("significance")

# One output row per day; the order below is the column order of the result.
daily_metrics = [
    # Counts
    count("*").alias("total_events"),
    count(when(sig >= 500, 1)).alias("high_significance_events"),
    count(when(col("has_tsunami_warning"), 1)).alias("tsunami_warnings"),
    # Significance stats (primary metric)
    spark_round(avg("significance"), 0).alias("avg_significance"),
    spark_max("significance").alias("max_significance"),
    spark_min("significance").alias("min_significance"),
    # Magnitude stats (secondary, for reference)
    spark_round(avg("magnitude"), 2).alias("avg_magnitude"),
    spark_max("magnitude").alias("max_magnitude"),
    # Depth stats
    spark_round(avg("depth_km"), 2).alias("avg_depth_km"),
    # Impact
    spark_sum("felt_reports").alias("total_felt_reports"),
    # Counts per significance band (same thresholds as the map severity labels)
    count(when(sig >= 600, 1)).alias("count_severe"),
    count(when((sig >= 400) & (sig < 600), 1)).alias("count_major"),
    count(when((sig >= 200) & (sig < 400), 1)).alias("count_moderate"),
    count(when((sig >= 100) & (sig < 200), 1)).alias("count_minor"),
    count(when(sig < 100, 1)).alias("count_low"),
]

# NOTE(review): the group key uses date_trunc("day", ...) while the filter above
# uses to_date(...); confirm consumers expect the `date` column type this yields.
df_daily = (
    df_to_agg.groupBy(date_trunc("day", col("event_time")).alias("date"))
    .agg(*daily_metrics)
    .withColumn("_processed_at", current_timestamp())
)

print(f"Daily records: {df_daily.count():,}")

In [None]:
# Write gold_daily_summary in merge mode, keyed on date, so that rows for the
# recomputed dates replace the existing aggregations
table_path = get_table_path(catalog, schema, "gold_daily_summary")
daily_write_options = {"mode": "merge", "merge_keys": ["date"]}
write_delta_table(df_daily, table_path, **daily_write_options)

## Gold Table 3: Regional Summary

Aggregated statistics by region.

In [None]:
# Choose the silver rows to aggregate for the regional summary:
#   * full run (or no region list available): every row;
#   * incremental run with affected regions: only rows in those regions;
#   * incremental run with an EMPTY region list (every changed row has a null
#     region): no rows at all. Null regions are excluded from this summary, so
#     there is nothing to recompute. An empty list is falsy, so the previous
#     `is_incremental and affected_regions` test fell through to a full
#     re-aggregation of the whole silver table in this case.
if not is_incremental or affected_regions is None:
    df_to_agg = df
    print(f"Full aggregation of {df_to_agg.count():,} records")
elif affected_regions:
    df_to_agg = df.filter(col("region").isin(affected_regions))
    print(f"Aggregating {df_to_agg.count():,} records for {len(affected_regions)} affected regions")
else:
    # Empty input keeps df_regional defined (with zero rows) for the write below.
    df_to_agg = df.limit(0)
    print("No affected regions in this incremental run; skipping regional re-aggregation")

# One output row per non-null region.
df_regional = df_to_agg.filter(col("region").isNotNull()).groupBy(
    col("region")
).agg(
    # Counts
    count("*").alias("total_events"),
    count(when(col("significance") >= 500, 1)).alias("high_significance_events"),
    
    # Significance stats (primary metric)
    spark_round(avg("significance"), 0).alias("avg_significance"),
    spark_max("significance").alias("max_significance"),
    
    # Magnitude stats (secondary, for reference)
    spark_round(avg("magnitude"), 2).alias("avg_magnitude"),
    spark_max("magnitude").alias("max_magnitude"),
    
    # Depth stats
    spark_round(avg("depth_km"), 2).alias("avg_depth_km"),
    
    # Time range covered by the region's events
    spark_min("event_time").alias("first_event"),
    spark_max("event_time").alias("last_event"),
    
    # Centroid for map: plain average of the event coordinates
    spark_round(avg("latitude"), 4).alias("centroid_lat"),
    spark_round(avg("longitude"), 4).alias("centroid_lon"),
).withColumn("_processed_at", current_timestamp())

print(f"Regional records: {df_regional.count():,}")

In [None]:
# Write gold_regional_summary in merge mode, keyed on region, so that rows for
# the recomputed regions replace the existing aggregations
table_path = get_table_path(catalog, schema, "gold_regional_summary")
regional_write_options = {"mode": "merge", "merge_keys": ["region"]}
write_delta_table(df_regional, table_path, **regional_write_options)

# Record the silver -> gold checkpoint last, once the gold writes have run
save_checkpoint(spark, checkpoint_table, source_table, source_version, changes_count)

## Summary

In [None]:
# Print stats for each gold table written by this notebook
gold_tables = ["gold_events_map", "gold_daily_summary", "gold_regional_summary"]

for table_path in (get_table_path(catalog, schema, table) for table in gold_tables):
    print_table_stats(spark, table_path)

In [None]:
# Show five rows from each gold table as a quick visual check

print("\n=== gold_events_map sample ===")
events_sample = (
    spark.table(get_table_path(catalog, schema, "gold_events_map"))
    .select("event_id", "significance", "severity", "region", "size_factor")
)
events_sample.show(5)

# Most recent days first
print("\n=== gold_daily_summary sample ===")
daily_sample = (
    spark.table(get_table_path(catalog, schema, "gold_daily_summary"))
    .orderBy(col("date").desc())
    .select("date", "total_events", "avg_significance", "max_significance")
)
daily_sample.show(5)

# Regions with the most events first
print("\n=== gold_regional_summary sample ===")
regional_sample = (
    spark.table(get_table_path(catalog, schema, "gold_regional_summary"))
    .orderBy(col("total_events").desc())
    .select("region", "total_events", "avg_significance", "max_significance")
)
regional_sample.show(5)

In [None]:
# End the notebook run with "success" as its exit value
exit_value = "success"
dbutils.notebook.exit(exit_value)