In [None]:
# ============================================================================
# WEATHER ALERT POLICY IMPACT MONITOR
# Parametric Insurance Demo
# ============================================================================
#
#   Step 0: Configuration & Imports
#   Step 1: Poll NOAA for Active Severe Weather Alerts (all US)
#   Step 2: Load Active Policies from Lakehouse
#   Step 3: Geo-match alerts to policyholders (lat/lon radius around the alert centroid)
#   Step 4: Deduplicate against previously processed alerts
#   Step 5: Publish policy.weather.impact events to Event Grid
#   Step 6: Persist impact records to Delta table + summary dashboard
#
# Prerequisites:
#   - schema_load_with_policies.ipynb must have been run first
#   - parametric_insurance_unified_demo_new.ipynb (optional — enriches claims context)
#
# Event Grid event published:
#   Type:    policy.weather.impact
#   Subject: weather/alert/<alert_id>/policy/<policy_id>
#   Data:    { alert details, policy details, impact assessment }
# ============================================================================

# 🌩️ Weather Alert Policy Impact Monitor

| Step | Description | Source | Event Grid |
|------|-------------|--------|------------|
| 1 | Poll severe weather alerts | **NOAA API** (free) | — |
| 2 | Load active policies | **Fabric Lakehouse** | — |
| 3 | Geo-match (lat/lon radius) | Haversine distance (Python, on collected rows) | — |
| 4 | Deduplicate alerts | Delta table lookup | — |
| 5 | Publish impact events | — | `policy.weather.impact` |
| 6 | Save + dashboard | Delta tables | — |

> ⚡ Run this notebook on a schedule (e.g. every 15 minutes) or trigger it manually for a demo.

---
## 🔧 Step 0 — Configuration & Imports

In [None]:
%pip install azure-core --quiet

In [None]:
import os
import json
import uuid
import math
import requests
import warnings
import notebookutils
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, TimestampType, BooleanType, FloatType
)

# Silence every Python warning for the rest of the kernel session so the cell
# output stays readable during the demo.
# NOTE(review): this blanket filter also hides deprecation warnings raised by
# the libraries used below — consider narrowing it to specific categories.
warnings.filterwarnings('ignore')

# ---------------------------------------------------------------------------
# CONFIGURATION — Edit these values for your environment
# ---------------------------------------------------------------------------

@dataclass
class AlertMonitorConfig:
    """Configuration for the weather alert policy impact monitor.

    Groups the NOAA API settings, the alert severity/urgency filters, the
    geo-match radius, the deduplication window and the Azure Event Grid
    connection details. Every field has a default, so ``AlertMonitorConfig()``
    can be constructed with no arguments and then overridden attribute by
    attribute.
    """

    # -- NOAA API --
    noaa_api_url: str = "https://api.weather.gov"
    # Sent as the User-Agent header on every NOAA request.
    noaa_user_agent: str = "ParametricInsuranceDemo/1.0 (kalateef@microsoft.com)"

    # Alert severity filter — only process alerts at or above this severity.
    # NOAA severity levels (ascending): Unknown, Minor, Moderate, Severe, Extreme
    min_severity: str = "Severe"  # Moderate | Severe | Extreme

    # Alert urgency filter — only process alerts at or above this urgency.
    # Urgency levels as ranked by this notebook (ascending):
    # Unknown, Past, Future, Expected, Immediate
    min_urgency: str = "Expected"

    # Geographic match radius in kilometres around each policy's lat/lon.
    # NOAA alert polygons can cover large areas; 50 km is a reasonable proxy
    # for "meaningfully close" without being overly broad.
    alert_radius_km: float = 50.0

    # Deduplication window: don't re-raise an impact for the same
    # (alert_id, policy_id) pair within this many hours.
    dedup_window_hours: int = 6

    # -- Azure Event Grid (optional — leave the endpoint blank or at its
    #    "<eventgrid-endpoint>" placeholder for local-only mode) --
    eventgrid_topic_endpoint: str = "<eventgrid-endpoint>"
    eventgrid_topic_key: str = "<eventgrid-key>"

    # Event type published to Event Grid
    EVT_POLICY_WEATHER_IMPACT: str = "policy.weather.impact"


config = AlertMonitorConfig()


def _is_configured(value: Optional[str]) -> bool:
    """Return True when ``value`` is a real setting.

    Blank values and the ``<...>`` placeholder defaults shipped in
    ``AlertMonitorConfig`` both count as "not configured".
    """
    if not value:
        return False
    text = str(value).strip()
    return bool(text) and not (text.startswith("<") and text.endswith(">"))


def _widget_override(current: str, widget_name: str) -> str:
    """Return ``current`` if it is configured, else the named notebook widget value.

    Falls back to ``current`` when the widget is missing or empty, so a
    notebook run without pipeline parameters keeps its existing settings.
    """
    if _is_configured(current):
        return current
    try:
        return notebookutils.mssparkutils.widgets.get(widget_name) or current
    except Exception:
        # Best-effort: a missing widget is expected outside a parameterised run.
        return current


# Override from Variable Library or environment
try:
    env_lib = notebookutils.variableLibrary.getLibrary("environmentVariables")
    config.eventgrid_topic_endpoint = getattr(env_lib, "EVENTGRID_TOPIC_ENDPOINT", config.eventgrid_topic_endpoint)
    config.eventgrid_topic_key      = getattr(env_lib, "EVENTGRID_TOPIC_KEY",      config.eventgrid_topic_key)
    config.noaa_user_agent          = getattr(env_lib, "NOAA_USER_AGENT",          config.noaa_user_agent)
except Exception as exc:
    # Still best-effort, but say so: a silent failure here looks exactly like
    # "Event Grid disabled" further down and is hard to diagnose.
    print(f"   Variable Library overrides not applied: {exc}")

# Notebook widget overrides (for pipeline parameterisation).
# The placeholder defaults are truthy, so a plain `value or widget` never
# reached the widget; placeholders are now treated as unset.
config.eventgrid_topic_endpoint = _widget_override(config.eventgrid_topic_endpoint, "eventgrid_endpoint")
config.eventgrid_topic_key      = _widget_override(config.eventgrid_topic_key,      "eventgrid_key")

# Publishing needs BOTH a real endpoint and a real key.
EVENTGRID_ENABLED = (
    _is_configured(config.eventgrid_topic_endpoint)
    and _is_configured(config.eventgrid_topic_key)
)

spark = SparkSession.builder.getOrCreate()

# Ascending rank tables used by severity_rank() / urgency_rank().
SEVERITY_ORDER = ["Unknown", "Minor", "Moderate", "Severe", "Extreme"]
URGENCY_ORDER  = ["Unknown", "Past", "Future", "Expected", "Immediate"]

RUN_ID  = uuid.uuid4().hex[:8].upper()
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)   # naive UTC for Spark

print(f"üå©Ô∏è  Weather Alert Policy Impact Monitor")
print(f"   Run ID:           {RUN_ID}")
print(f"   Timestamp (UTC):  {now_utc.isoformat()}Z")
print(f"   Min severity:     {config.min_severity}")
print(f"   Match radius:     {config.alert_radius_km} km")
print(f"   Dedup window:     {config.dedup_window_hours} h")
print(f"   Event Grid:       {'‚úÖ ENABLED' if EVENTGRID_ENABLED else '‚ö†Ô∏è  DISABLED (local-only mode)'}")

---
## 📡 Event Grid Client + Utility Functions

In [None]:
# ============================================================================
# Lightweight Event Grid publisher — no SDK dependency
# ============================================================================

class NotebookEventGridClient:
    """Publishes Event Grid schema events to a topic via the REST API.

    Each event is posted as a one-element JSON array carrying the
    ``id`` / ``eventType`` / ``subject`` / ``eventTime`` / ``dataVersion`` /
    ``data`` fields and is authenticated with the ``aeg-sas-key`` header.
    Every attempt, successful or not, is appended to ``audit_log``.
    """

    # Keys copied from the event data into the audit log's short summary.
    _SUMMARY_KEYS = (
        "policy_id", "business_name", "alert_id", "alert_event",
        "alert_severity", "city", "state",
    )

    def __init__(self, endpoint: str, key: str):
        """Store the topic endpoint (trailing slashes removed) and access key."""
        self.endpoint = endpoint.rstrip("/")
        self.key = key
        self.audit_log: List[Dict[str, Any]] = []
        self._counter = 0

    def publish_event(
        self,
        event_type: str,
        subject: str,
        data: Dict[str, Any],
        data_version: str = "1.0",
    ) -> bool:
        """Publish a single event and record the attempt in ``audit_log``.

        Args:
            event_type: Value for the event's ``eventType`` field.
            subject: Value for the event's ``subject`` field.
            data: Payload placed under ``data``.
            data_version: Value for ``dataVersion``.

        Returns:
            True when the POST succeeded (no exception, non-error HTTP status);
            False otherwise. Failures are printed and logged, never raised.
        """
        event_id = str(uuid.uuid4())
        # Timezone-aware "now" (datetime.utcnow() is deprecated); rendered as a
        # naive ISO string with a trailing "Z", the same format as before.
        event_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        self._counter += 1

        payload = [{
            "id":          event_id,
            "eventType":   event_type,
            "subject":     subject,
            "eventTime":   event_time,
            "dataVersion": data_version,
            "data":        data,
        }]

        summary = {k: v for k, v in data.items() if k in self._SUMMARY_KEYS}
        log_entry = {
            "seq":        self._counter,
            "event_id":   event_id,
            "event_type": event_type,
            "subject":    subject,
            "event_time": event_time,
            # default=str keeps audit logging from raising on values json
            # cannot encode; this line runs before the try block below.
            "data_summary": json.dumps(summary, default=str),
        }

        try:
            resp = requests.post(
                self.endpoint,
                headers={
                    "aeg-sas-key":  self.key,
                    "Content-Type": "application/json",
                },
                json=payload,
                timeout=10,
            )
            resp.raise_for_status()
            log_entry["status"] = "published"
            log_entry["error"]  = None
            self.audit_log.append(log_entry)
            return True
        except Exception as exc:
            log_entry["status"] = "failed"
            log_entry["error"]  = str(exc)
            self.audit_log.append(log_entry)
            print(f"  ‚ö†Ô∏è  Event Grid publish failed: {exc}")
            return False


# Build a publisher only when Event Grid is configured; otherwise the name is
# bound to None and the publish step records impacts as local-only.
eg_client: Optional[NotebookEventGridClient] = None
if EVENTGRID_ENABLED:
    eg_client = NotebookEventGridClient(
        config.eventgrid_topic_endpoint,
        config.eventgrid_topic_key,
    )


# ============================================================================
# Geo-distance helper (Haversine formula)
# ============================================================================

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in km between two (lat, lon) points.

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.

    Returns:
        Distance in kilometres on a sphere of radius 6371 km.
    """
    R = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Floating-point rounding can push `a` a hair outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def severity_rank(s: str) -> int:
    """Return the position of ``s`` in ``SEVERITY_ORDER``; unlisted values rank 0."""
    try:
        return SEVERITY_ORDER.index(s)
    except ValueError:
        return 0

def urgency_rank(u: str) -> int:
    """Return the position of ``u`` in ``URGENCY_ORDER``; unlisted values rank 0."""
    try:
        return URGENCY_ORDER.index(u)
    except ValueError:
        return 0


# Confirmation that this cell finished defining the client and the helpers.
print("‚úÖ Event Grid client + utility functions ready.")

---
## 🌐 Step 1 — Poll NOAA for Active Severe Weather Alerts

In [None]:
def _parse_noaa_timestamp(value):
    """Parse an ISO-8601 timestamp into a naive UTC ``datetime``; ``None`` if absent or invalid."""
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except Exception:
        return None
    # Convert to UTC BEFORE dropping tzinfo; simply stripping the offset would
    # store local wall-clock time as if it were UTC.
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.replace(tzinfo=None)


def _vertex_centroid(rings):
    """Average the vertices of the given outer rings into ``(lat, lon)``; ``(None, None)`` if empty."""
    points = []
    for ring in rings:
        # A GeoJSON ring repeats its first vertex at the end; drop the
        # duplicate so that vertex is not counted twice in the mean.
        if len(ring) > 1 and ring[0] == ring[-1]:
            ring = ring[:-1]
        points.extend(ring)
    if not points:
        return None, None
    lon = sum(p[0] for p in points) / len(points)
    lat = sum(p[1] for p in points) / len(points)
    return lat, lon


def fetch_noaa_alerts(min_severity: str = "Moderate") -> List[Dict[str, Any]]:
    """
    Fetch all active NWS land alerts and keep those at or above the given
    severity and at or above ``config.min_urgency``.

    Args:
        min_severity: Lowest severity to keep (ranked via ``severity_rank``).

    Returns:
        A list of normalised alert dicts. ``centroid_lat`` / ``centroid_lon``
        hold the vertex average of the alert's Polygon / MultiPolygon outer
        ring(s), or ``None`` when the alert has no such geometry. Returns an
        empty list (after printing the error) if the request fails.
    """
    headers = {"User-Agent": config.noaa_user_agent, "Accept": "application/geo+json"}
    url     = f"{config.noaa_api_url}/alerts/active?status=actual&message_type=alert&region_type=land"

    try:
        resp = requests.get(url, headers=headers, timeout=20)
        resp.raise_for_status()
        features = resp.json().get("features") or []
    except Exception as e:
        print(f"  ‚ö†Ô∏è  NOAA alert fetch error: {e}")
        return []

    print(f"  Raw NOAA features returned: {len(features)}")

    alerts = []
    min_sev_rank = severity_rank(min_severity)
    min_urg_rank = urgency_rank(config.min_urgency)

    for feat in features:
        # `or` rather than a .get() default: a key that is present with a JSON
        # null comes back as None, which the default does not replace.
        props = feat.get("properties") or {}

        sev = props.get("severity") or "Unknown"
        urg = props.get("urgency") or "Unknown"

        if severity_rank(sev) < min_sev_rank:
            continue
        if urgency_rank(urg) < min_urg_rank:
            continue

        # Derive a centroid from geometry (Polygon / MultiPolygon outer rings)
        geometry = feat.get("geometry") or {}
        geom_type = geometry.get("type")
        coords = geometry.get("coordinates") or []
        if geom_type == "Polygon":
            rings = coords[:1]
        elif geom_type == "MultiPolygon":
            rings = [polygon[0] for polygon in coords if polygon]
        else:
            rings = []
        centroid_lat, centroid_lon = _vertex_centroid(rings)

        # Affected zones are only counted here (list of zone URLs)
        affected_zones = props.get("affectedZones") or []

        alert = {
            "alert_id":      props.get("id") or feat.get("id") or str(uuid.uuid4()),
            "alert_event":   props.get("event") or "Unknown",
            "headline":      props.get("headline") or "",
            "description":   (props.get("description") or "")[:1000],  # truncate for storage
            "instruction":   (props.get("instruction") or "")[:500],
            "severity":      sev,
            "urgency":       urg,
            "certainty":     props.get("certainty") or "Unknown",
            "area_desc":     props.get("areaDesc") or "",
            "sender_name":   props.get("senderName") or "",
            "onset":         _parse_noaa_timestamp(props.get("onset")),
            "expires":       _parse_noaa_timestamp(props.get("expires")),
            "sent":          _parse_noaa_timestamp(props.get("sent")),
            "centroid_lat":  centroid_lat,
            "centroid_lon":  centroid_lon,
            "zone_count":    len(affected_zones),
            "fetched_at":    now_utc,
        }
        alerts.append(alert)

    return alerts


print("üåê Polling NOAA for active severe weather alerts...")
raw_alerts = fetch_noaa_alerts(config.min_severity)

# Geo-matching needs a centroid, so keep only the alerts that have one.
geo_alerts = list(filter(lambda alert: alert["centroid_lat"] is not None, raw_alerts))
no_geo = len(raw_alerts) - len(geo_alerts)

for summary_line in (
    f"\n  Total qualifying alerts: {len(raw_alerts)}",
    f"  Alerts with geometry:    {len(geo_alerts)}",
    f"  Alerts without geometry: {no_geo} (skipped ‚Äî no centroid)",
    "",
):
    print(summary_line)

if not geo_alerts:
    print("  ‚ÑπÔ∏è  No geo-located alerts found at the configured severity level.")
    print("      This is normal during calm weather. The notebook will complete gracefully.")
else:
    # Ten most frequent event types among the geo-located alerts.
    from collections import Counter
    event_counts = Counter(a["alert_event"] for a in geo_alerts)
    for evt, cnt in event_counts.most_common(10):
        print(f"  {cnt:3d} √ó {evt}")

---
## 🏦 Step 2 — Load Active Policies from Lakehouse

In [None]:
print("üè¶ Loading active policies from Lakehouse...")

# Pull every active policy that has coordinates onto the driver as plain
# dicts; the geo-match step iterates over them in Python.
try:
    policies_df = spark.sql("""
        SELECT
            policy_id,
            business_name,
            business_type,
            city,
            state,
            zip_code,
            address,
            latitude,
            longitude,
            threshold_minutes,
            hourly_rate,
            max_payout,
            contact_email,
            contact_phone,
            status
        FROM policies
        WHERE status = 'active'
          AND latitude  IS NOT NULL
          AND longitude IS NOT NULL
    """)
    policies = [row.asDict() for row in policies_df.collect()]
    print(f"  ‚úÖ Loaded {len(policies)} active policies.")
except Exception as e:
    print(f"  ‚ùå Could not load policies: {e}")
    print("     Ensure schema_load_with_policies.ipynb has been run first.")
    policies = []

if policies:
    # Skip NULL/blank states: sorting a mix of None and str raises TypeError,
    # and str.join() rejects None.
    states = sorted({p["state"] for p in policies if p["state"]})
    print(f"  States covered: {', '.join(states)}")

---
## 📍 Step 3 — Geo-Match Alerts to Policyholders (Lat/Lon Radius)

In [None]:
print(f"üìç Geo-matching {len(geo_alerts)} alert(s) against {len(policies)} policies...")
print(f"   Match radius: {config.alert_radius_km} km\n")


def _cast_or_none(value, cast):
    """Apply ``cast`` to ``value``, passing ``None`` (a NULL column) through unchanged."""
    return None if value is None else cast(value)


impacts: List[Dict[str, Any]] = []
radius_km = config.alert_radius_km

for alert in geo_alerts:
    a_lat = alert["centroid_lat"]
    a_lon = alert["centroid_lon"]

    # Severity and urgency depend only on the alert, so score them once per
    # alert instead of once per matching policy.
    sev_score = severity_rank(alert["severity"]) / (len(SEVERITY_ORDER) - 1)
    urg_score = urgency_rank(alert["urgency"])   / (len(URGENCY_ORDER)  - 1)

    for policy in policies:
        p_lat = float(policy["latitude"])
        p_lon = float(policy["longitude"])

        dist_km = haversine_km(a_lat, a_lon, p_lat, p_lon)
        if dist_km > radius_km:
            continue

        # Risk score: weighted blend of severity, urgency and proximity.
        # Proximity is 1 at the centroid and 0 at the radius boundary; a zero
        # radius only matches at distance 0, which counts as full proximity.
        prox_score = 1.0 - (dist_km / radius_km) if radius_km > 0 else 1.0
        risk_score = round((sev_score * 0.45 + urg_score * 0.35 + prox_score * 0.20), 3)

        impact_id = f"WI-{RUN_ID}-{uuid.uuid4().hex[:6].upper()}"

        impact = {
            # IDs
            "impact_id":        impact_id,
            "alert_id":         alert["alert_id"],
            "policy_id":        policy["policy_id"],
            "run_id":           RUN_ID,
            # Alert details
            "alert_event":      alert["alert_event"],
            "alert_headline":   alert["headline"],
            "alert_description":alert["description"],
            "alert_instruction":alert["instruction"],
            "alert_severity":   alert["severity"],
            "alert_urgency":    alert["urgency"],
            "alert_certainty":  alert["certainty"],
            "alert_area_desc":  alert["area_desc"],
            "alert_sender":     alert["sender_name"],
            "alert_onset":      alert["onset"],
            "alert_expires":    alert["expires"],
            "alert_centroid_lat": float(a_lat),
            "alert_centroid_lon": float(a_lon),
            # Policy details
            "business_name":    policy["business_name"],
            "business_type":    policy["business_type"],
            "city":             policy["city"],
            "state":            policy["state"],
            "zip_code":         policy["zip_code"],
            "contact_email":    policy["contact_email"],
            "contact_phone":    policy["contact_phone"],
            # These columns are not filtered for NULL when policies are
            # loaded, so a bare int()/float() would raise on a missing value.
            "threshold_minutes":_cast_or_none(policy["threshold_minutes"], int),
            "hourly_rate":      _cast_or_none(policy["hourly_rate"], float),
            "max_payout":       _cast_or_none(policy["max_payout"], float),
            # Geo match
            "policy_lat":       float(p_lat),
            "policy_lon":       float(p_lon),
            "distance_km":      round(dist_km, 2),
            # Risk
            "risk_score":       risk_score,
            "impact_status":    "pending_notification",
            "created_at":       now_utc,
        }
        impacts.append(impact)

print(f"  Raw geo-matches found: {len(impacts)}")

if not impacts:
    print("\n  ‚ÑπÔ∏è  No policyholders fall within the configured radius for the current alerts.")
    print("      Consider widening alert_radius_km or lowering min_severity for testing.")

---
## 🔁 Step 4 — Deduplicate Against Previously Processed Alerts

In [None]:
# Create the weather_impact_events table if it doesn't exist yet
spark.sql("""
    CREATE TABLE IF NOT EXISTS weather_impact_events (
        impact_id          STRING,
        alert_id           STRING,
        policy_id          STRING,
        run_id             STRING,
        alert_event        STRING,
        alert_headline     STRING,
        alert_description  STRING,
        alert_instruction  STRING,
        alert_severity     STRING,
        alert_urgency      STRING,
        alert_certainty    STRING,
        alert_area_desc    STRING,
        alert_sender       STRING,
        alert_onset        TIMESTAMP,
        alert_expires      TIMESTAMP,
        alert_centroid_lat DOUBLE,
        alert_centroid_lon DOUBLE,
        business_name      STRING,
        business_type      STRING,
        city               STRING,
        state              STRING,
        zip_code           STRING,
        contact_email      STRING,
        contact_phone      STRING,
        threshold_minutes  INT,
        hourly_rate        DOUBLE,
        max_payout         DOUBLE,
        policy_lat         DOUBLE,
        policy_lon         DOUBLE,
        distance_km        DOUBLE,
        risk_score         DOUBLE,
        impact_status      STRING,
        eventgrid_status   STRING,
        created_at         TIMESTAMP
    ) USING DELTA
""")

# Load recent impact records for dedup: any (alert_id, policy_id) pair written
# within the dedup window is considered already processed.
# NOTE(review): rows saved with eventgrid_status = 'failed' also count as
# processed here, so a failed publish is not retried until the window lapses.
dedup_cutoff = now_utc - timedelta(hours=config.dedup_window_hours)

try:
    recent_impacts = spark.sql(f"""
        SELECT CONCAT(alert_id, '||', policy_id) AS dedup_key
        FROM   weather_impact_events
        WHERE  created_at >= '{dedup_cutoff.isoformat()}'
    """)
    seen_keys = set(row["dedup_key"] for row in recent_impacts.collect())
except Exception as exc:
    # Keep running, but say so: with no lookup every match is treated as new,
    # so previously published impacts may be published again.
    print(f"  Dedup lookup failed; treating all matches as new: {exc}")
    seen_keys = set()

print(f"  Already-processed (alert, policy) pairs in last {config.dedup_window_hours}h: {len(seen_keys)}")

new_impacts = [
    imp for imp in impacts
    if f"{imp['alert_id']}||{imp['policy_id']}" not in seen_keys
]

print(f"  New impacts after dedup: {len(new_impacts)}  (skipped {len(impacts) - len(new_impacts)})")

# Sort by risk score descending for publishing priority
new_impacts.sort(key=lambda x: x["risk_score"], reverse=True)

---
## 📤 Step 5 — Publish `policy.weather.impact` Events to Event Grid

In [None]:
if new_impacts:
    print(f"üì§ Publishing {len(new_impacts)} policy.weather.impact event(s) to Event Grid...\n")

    # Impact fields copied into the event payload, in the order they appear in
    # the published JSON. The two timestamp fields are rendered as ISO strings.
    payload_fields = (
        "impact_id", "alert_id", "policy_id",
        "alert_event", "alert_headline", "alert_severity", "alert_urgency",
        "alert_area_desc", "alert_onset", "alert_expires",
        "business_name", "business_type", "city", "state", "zip_code",
        "contact_email", "contact_phone",
        "distance_km", "risk_score",
        "threshold_minutes", "hourly_rate", "max_payout",
        "run_id",
    )
    timestamp_fields = ("alert_onset", "alert_expires")

    published, failed, local_only = 0, 0, 0

    for imp in new_impacts:
        subject = f"weather/alert/{imp['alert_id'][:40]}/policy/{imp['policy_id']}"

        # Construct a clean, serialisable event payload
        event_data = {}
        for field_name in payload_fields:
            field_value = imp[field_name]
            if field_name in timestamp_fields:
                field_value = field_value.isoformat() if field_value else None
            event_data[field_name] = field_value

        # No client means Event Grid is disabled: record the impact locally.
        if not eg_client:
            imp["eventgrid_status"] = "local_only"
            local_only += 1
            print(f"  üìã Local:    {imp['business_name']} ({imp['city']}, {imp['state']}) "
                  f"‚Äî {imp['alert_event']} | risk={imp['risk_score']}")
            continue

        ok = eg_client.publish_event(
            event_type = config.EVT_POLICY_WEATHER_IMPACT,
            subject    = subject,
            data       = event_data,
        )
        if not ok:
            imp["eventgrid_status"] = "failed"
            failed += 1
            print(f"  ‚ùå Failed:    {imp['business_name']} ({imp['policy_id']})")
        else:
            imp["eventgrid_status"] = "published"
            published += 1
            print(f"  ‚úÖ Published: {imp['business_name']} ({imp['city']}, {imp['state']}) "
                  f"‚Äî {imp['alert_event']} | risk={imp['risk_score']} | {imp['distance_km']} km")

    print(f"\n  Published: {published}  |  Failed: {failed}  |  Local-only: {local_only}")
else:
    print("‚ÑπÔ∏è  No new impacts to publish.")

---
## 💾 Step 6 — Persist to Delta + Summary Dashboard

In [None]:
if not new_impacts:
    print("‚ÑπÔ∏è  Nothing to save ‚Äî no new impacts this run.")
else:
    # (column name, Spark type class) pairs, in table column order.
    column_types = [
        ("impact_id",          StringType),
        ("alert_id",           StringType),
        ("policy_id",          StringType),
        ("run_id",             StringType),
        ("alert_event",        StringType),
        ("alert_headline",     StringType),
        ("alert_description",  StringType),
        ("alert_instruction",  StringType),
        ("alert_severity",     StringType),
        ("alert_urgency",      StringType),
        ("alert_certainty",    StringType),
        ("alert_area_desc",    StringType),
        ("alert_sender",       StringType),
        ("alert_onset",        TimestampType),
        ("alert_expires",      TimestampType),
        ("alert_centroid_lat", DoubleType),
        ("alert_centroid_lon", DoubleType),
        ("business_name",      StringType),
        ("business_type",      StringType),
        ("city",               StringType),
        ("state",              StringType),
        ("zip_code",           StringType),
        ("contact_email",      StringType),
        ("contact_phone",      StringType),
        ("threshold_minutes",  IntegerType),
        ("hourly_rate",        DoubleType),
        ("max_payout",         DoubleType),
        ("policy_lat",         DoubleType),
        ("policy_lon",         DoubleType),
        ("distance_km",        DoubleType),
        ("risk_score",         DoubleType),
        ("impact_status",      StringType),
        ("eventgrid_status",   StringType),
        ("created_at",         TimestampType),
    ]
    impact_schema = StructType([
        StructField(column_name, type_class())
        for column_name, type_class in column_types
    ])

    # Append this run's new impacts to the Delta table.
    df = spark.createDataFrame(new_impacts, schema=impact_schema)
    df.write.format("delta").mode("append").saveAsTable("weather_impact_events")
    print(f"‚úÖ Saved {len(new_impacts)} impact records to Delta table: weather_impact_events")

In [None]:
print("=" * 70)
print(f"RUN SUMMARY ‚Äî Run ID: {RUN_ID}")
print("=" * 70)

print(f"  NOAA alerts polled (qualifying severity): {len(raw_alerts)}")
print(f"  NOAA alerts with usable geometry:         {len(geo_alerts)}")
print(f"  Policyholder geo-matches (raw):           {len(impacts)}")
print(f"  New impacts after dedup:                  {len(new_impacts)}")

if new_impacts:
    # .get(): the status key is only set by the publish cell, so a skipped
    # publish step must not turn the summary into a KeyError.
    pub_count  = sum(1 for i in new_impacts if i.get("eventgrid_status") == "published")
    fail_count = sum(1 for i in new_impacts if i.get("eventgrid_status") == "failed")
    local_count= sum(1 for i in new_impacts if i.get("eventgrid_status") == "local_only")
    print(f"  Events published to Event Grid:           {pub_count}")
    print(f"  Events failed to publish:                 {fail_count}")
    print(f"  Events stored locally only:               {local_count}")

print()
print("--- Impacted Policyholders (sorted by risk score) ---")

# Latest 50 impact rows across all runs (newest first, then highest risk).
try:
    display(spark.sql("""
        SELECT
            impact_id,
            policy_id,
            business_name,
            city,
            state,
            alert_event,
            alert_severity,
            distance_km,
            risk_score,
            eventgrid_status,
            created_at
        FROM weather_impact_events
        ORDER BY created_at DESC, risk_score DESC
        LIMIT 50
    """))
except Exception as e:
    print(f"  Could not display table: {e}")

print()
print("--- Alert Breakdown ---")
# Per-event-type aggregate restricted to the rows written by this run.
try:
    display(spark.sql("""
        SELECT
            alert_event,
            alert_severity,
            COUNT(DISTINCT alert_id)  AS distinct_alerts,
            COUNT(DISTINCT policy_id) AS impacted_policies,
            ROUND(AVG(distance_km), 1) AS avg_distance_km,
            ROUND(AVG(risk_score), 3)  AS avg_risk_score
        FROM weather_impact_events
        WHERE run_id = '{run_id}'
        GROUP BY alert_event, alert_severity
        ORDER BY avg_risk_score DESC
    """.replace("{run_id}", RUN_ID)))
except Exception as e:
    # Report the failure the same way as the table above instead of hiding it.
    print(f"  Could not display alert breakdown: {e}")

print(f"\n‚úÖ Notebook complete. Next step ‚Üí run weather_impact_email_notifier.ipynb")