# Weather & Hazard Sentinel: Multi-Agent Monitoring for the Texas Gulf Coast

**Track:** Agents for Good (Disaster Response & Preparedness)  
**Region:** American Red Cross · Texas Gulf Coast Region  

This notebook implements a multi-agent **Weather & Hazard Sentinel** that ingests live weather data, interprets hazard risk, evaluates triggers for readiness, and generates a brief for regional leadership. It is designed for roles like:

- Regional Planning & Situational Awareness Manager  
- Regional Preparedness Point of Contact (POC)  

and supports **situational awareness, early hazard detection, and response posture decisions**.

## 1. Problem · Solution · Value

### Problem

The Texas Gulf Coast Region faces frequent weather-driven hazards (heavy rain, flooding, heat, severe storms). Right now, situational awareness and readiness decisions often depend on ad-hoc checks of multiple websites, manual reading of discussions, and non-standard notes. This creates three issues:

- **Fragmented data:** Forecasts, alerts, and observations live in different tools.  
- **Inconsistent triggers:** Readiness decisions vary by person and shift.  
- **Limited history:** It is hard to review “what we saw” versus “what actually happened.”

### Solution

Build a **Weather & Hazard Sentinel** agent system that:

1. Pulls **live weather data** for key regional areas (Coastal Bend, Houston Metro, Golden Triangle).  
2. Converts this into **structured hazard risks** (hazard type, timeframe, likelihood, impact, rationale).  
3. Evaluates these risks against a **rule-based trigger library** to compute a recommended **readiness posture** per area.  
4. Generates a **human-readable brief** for regional leadership and logs all results for after-action review.  

The system runs as a **loop agent** that can be scheduled (e.g., Cloud Run + Cloud Scheduler) and supports **checkpointing** for long-running operations.

### Value

For the American Red Cross | Texas Gulf Coast Region, this agent aims to:

- Provide a **single, repeatable pipeline** for hazard monitoring and posture decisions.  
- Make triggers **explicit and tunable** instead of implicit.  
- Preserve a **history of runs, risks, and triggers** for training, after-action reviews, and system improvement.  
- Lay the groundwork for **AI-assisted interpretation** of official bulletins in future iterations.

In [1]:
# 1. Core imports & basic config

import os
import json
import uuid
import datetime as dt
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional

import pandas as pd

import logging
logging.basicConfig(level=logging.INFO)

# -------------------------------------------------------
# REGION CONFIG
# -------------------------------------------------------

REGION_NAME = "American Red Cross | Texas Gulf Coast Region"

REGION_AREAS = [
    "Coastal Bend",
    "Houston Metro",
    "Golden Triangle",
]

# Area coordinates for real API calls
REGION_AREA_COORDS = {
    "Coastal Bend":      {"lat": 27.8,  "lon": -97.4},
    "Houston Metro":     {"lat": 29.76, "lon": -95.37},
    "Golden Triangle":   {"lat": 30.08, "lon": -94.13},
}

# -------------------------------------------------------
# HAZARD TYPES
# -------------------------------------------------------

HAZARD_TYPES = [
    "Heavy Rain & Flooding",
    "Severe Storms",
    "Excessive Heat",
    "Wildfire",
]

# -------------------------------------------------------
# TRIGGERS
# -------------------------------------------------------

TRIGGERS = [
    {
        "id": "flood_enhanced_monitoring",
        "hazard": "Heavy Rain & Flooding",
        "min_likelihood": "Medium",
        "min_impact": "Disruptive",
        "recommended_posture": "Enhanced Monitoring",
        "name": "Heavy Rain – Flash Flood Watch",
        "note": "Consider readiness actions for flood-prone areas."
    },
    {
        "id": "flood_response_consideration",
        "hazard": "Heavy Rain & Flooding",
        "min_likelihood": "High",
        "min_impact": "Dangerous",
        "recommended_posture": "Response Consideration",
        "name": "Heavy Rain – Possible Flash Flooding",
        "note": "Discuss shelter readiness and resource pre-positioning."
    },
]

LIKELIHOOD_ORDER = ["Low", "Medium", "High"]
IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]

# -------------------------------------------------------
# LOAD ALL SECRETS (Kaggle → env vars)
# -------------------------------------------------------

from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()


def get_secret_or_default(label: str, default: str = "") -> str:
    """Return a Kaggle secret, or `default` if it is missing or unreadable."""
    try:
        return user_secrets.get_secret(label) or default
    except Exception as e:
        logging.warning(f"Secret '{label}' not available: {e}")
        return default


# Weather API (OpenWeather "Current Weather" endpoint, free tier)
WEATHER_API_KEY = get_secret_or_default("WEATHER_API_KEY")

# Fall back to the public endpoint if no WEATHER_API_URL secret is set
WEATHER_API_URL = get_secret_or_default(
    "WEATHER_API_URL", "https://api.openweathermap.org/data/2.5/weather"
)

# os.environ values must be strings, so missing secrets are stored as ""
os.environ["WEATHER_API_KEY"] = WEATHER_API_KEY
os.environ["WEATHER_API_URL"] = WEATHER_API_URL

logging.info(f"Weather API configured: {WEATHER_API_URL}")

# Gemini API key
GEMINI_API_KEY = get_secret_or_default("GEMINI_API_KEY")
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY

# Optional:
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID", "")
GCS_BUCKET = os.getenv("GCS_BUCKET", "")

INFO:root:Weather API configured: https://api.openweathermap.org/data/2.5/weather


## 2. Architecture Overview: “Weather & Hazard Sentinel”

The system is implemented as a **multi-agent pipeline**:

**Data Sources → Tools → Agents → A2A Messages → Memory & Checkpoint → Outputs**

### Core Data Flow (One Monitoring Cycle)

1. **Data Ingestion Agent**  
   - Calls the OpenWeather “Current Weather” API for each area.  
   - Produces a `HazardInputsMessage` with structured forecast slices (e.g., QPF proxy, feels-like temperature) and short bulletins.

2. **Hazard Interpretation Agent**  
   - Reads `HazardInputsMessage`.  
   - Uses **rule-based thresholds** (and optionally Gemini) to assign **likelihood** and **impact** per hazard.  
   - Produces a `HazardRisksMessage` containing a list of `HazardRisk` objects.

3. **Trigger Evaluation Agent**  
   - Compares each `HazardRisk` with a configurable **Trigger Library**.  
   - Computes a recommended **readiness posture** per area (Normal, Enhanced Monitoring, Response Consideration).  
   - Produces a `TriggerResultsMessage`.

4. **Briefing Agent**  
   - Takes `HazardRisksMessage` + `TriggerResultsMessage`.  
   - Generates a concise **Weather & Hazard Brief** for regional leadership (via Gemini when available, with a non-LLM fallback).  
   - Produces a `BriefPacketMessage`.

5. **Memory & Logging Agent**  
   - Logs runs, risks, triggers, and briefs into in-memory tables (convertible to DataFrames).  
   - Maintains a `CheckpointState` (last run time, last posture per area, last run ID, operational period label).

6. **Orchestrator / Scheduler**  
   - Coordinates the entire cycle: **Ingestion → Interpretation → Trigger Evaluation → Briefing → Memory**.  
   - Supports **pause / resume** and is designed to be called by a **Cloud Run + Cloud Scheduler** job in production.

### Agent-to-Agent (A2A) Protocol

Agents exchange **typed messages** instead of raw dicts. This makes the system explicit and testable:

- `HazardInputsMessage` → from Data Ingestion to Hazard Interpretation  
- `HazardRisksMessage` → from Hazard Interpretation to Trigger Evaluation  
- `TriggerResultsMessage` → from Trigger Evaluation to Briefing  
- `BriefPacketMessage` → from Briefing to Memory / outputs  

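Each message carries a `run_id` and an `as_of` timestamp. The payloads hold the domain fields: `area`, `hazard`, `timeframe`, `likelihood`, `impact`, and `rationale` live on each `HazardRisk`, while `posture` and the fired triggers live on each `AreaTriggerSummary`.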
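In the cells shown, these messages are plain in-memory dataclasses, and no wire format is defined. If a message ever needs to cross a process boundary (for example, between scheduled Cloud Run invocations), a minimal sketch could look like the code below. It assumes JSON with ISO-8601 timestamps. The helper names `message_to_json` and `risks_message_from_json` are hypothetical.

```python
import json
import datetime as dt
from dataclasses import dataclass, asdict
from typing import List


@dataclass
class HazardRisk:
    area: str
    hazard: str
    timeframe: str
    likelihood: str
    impact: str
    rationale: str
    supporting_evidence: List[str]


@dataclass
class HazardRisksMessage:
    run_id: str
    as_of: dt.datetime
    risks: List[HazardRisk]


def message_to_json(msg) -> str:
    """Serialize an A2A dataclass message; datetimes become ISO-8601 strings."""
    def _default(obj):
        if isinstance(obj, dt.datetime):
            return obj.isoformat()
        raise TypeError(f"Not JSON serializable: {type(obj)!r}")
    # asdict() recurses into the nested HazardRisk objects inside the list
    return json.dumps(asdict(msg), default=_default)


def risks_message_from_json(payload: str) -> HazardRisksMessage:
    """Rebuild a HazardRisksMessage, restoring the nested HazardRisk objects."""
    d = json.loads(payload)
    return HazardRisksMessage(
        run_id=d["run_id"],
        as_of=dt.datetime.fromisoformat(d["as_of"]),
        risks=[HazardRisk(**r) for r in d["risks"]],
    )
```

Typed messages make a round-trip check cheap: if `risks_message_from_json(message_to_json(msg)) == msg`, the schema survived serialization.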

### System Architecture — Weather & Hazard Sentinel

```mermaid
flowchart LR
    Source["External Weather APIs\nOpenWeather / NOAA"]
    Ingest["Data Ingestion Agent"]
    HazInt["Hazard Interpretation Agent\nRules + Optional Gemini"]
    Trigger["Trigger Evaluation Agent\nThreshold Rules"]
    Brief["Briefing Agent\nLLM / Fallback"]
    Memory["Memory & Logging Agent"]
    Checkpt["GCS Checkpoint\nCloud Run Persistence"]
    Output["Daily Hazard Brief\nReadiness Posture"]

    Source --> Ingest
    Ingest --> HazInt
    HazInt --> Trigger
    Trigger --> Brief
    Brief --> Memory
    Memory --> Checkpt
    Brief --> Output

    subgraph Runtime
        Ingest
        HazInt
        Trigger
        Brief
        Memory
        Checkpt
    end
```

## 3. Capstone Features (How This Maps to the Rubric)

This project demonstrates multiple concepts from the course:

- **Multi-Agent System**
  - Data Ingestion Agent (`DataIngestionAgent`)
  - Hazard Interpretation Agent (`HazardInterpretationAgent`)
  - Trigger Evaluation Agent (`TriggerEvaluationAgent`)
  - Briefing Agent (`BriefingAgent`)
  - Memory & Logging Agent (`MemoryLoggingAgent`)
  - Orchestrator / Scheduler (`OrchestratorScheduler`)

- **Tools**
  - External HTTP tool: **OpenWeather Current Weather API** (real live data).
  - LLM tool: **Gemini API** via `google-generativeai` (used for hazard refinement and briefing, with fallback).

- **Sessions & Memory**
  - `MemoryLoggingAgent` stores:
    - Runs (`runs`)
    - Risks (`risks_log`)
    - Fired triggers (`triggers_log`)
    - Briefs (`briefs_log`)
  - `CheckpointState` tracks:
    - `last_run_time`
    - `last_posture_by_area`
    - `last_run_id`
    - `operational_period_label`

- **Long-Running Operations**
  - `OrchestratorScheduler.run_cycle()` represents a **single monitoring cycle**.  
  - A separate `orchestrator_cloud_run_cycle()` (later cell) wraps this with **GCS checkpoint load/save**, suitable for **Cloud Run + Cloud Scheduler**.

These pieces together show a practical, cloud-ready **Weather & Hazard Sentinel** agent.
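The **Long-Running Operations** bullet above depends on `CheckpointState` surviving between runs. The sketch below shows one way to serialize it. It assumes the checkpoint is stored as a JSON string. The function names and the `"OP-1"` default label are hypothetical, and the actual GCS read/write is left out.

```python
import json
import datetime as dt
from dataclasses import dataclass, asdict
from typing import Dict, Optional


@dataclass
class CheckpointState:
    last_run_time: Optional[dt.datetime]
    last_posture_by_area: Dict[str, str]
    last_run_id: Optional[str]
    operational_period_label: str


def checkpoint_to_json(state: CheckpointState) -> str:
    d = asdict(state)
    # datetime is not JSON-native: store ISO-8601, or None before the first run
    d["last_run_time"] = state.last_run_time.isoformat() if state.last_run_time else None
    return json.dumps(d)


def checkpoint_from_json(payload: Optional[str], default_label: str = "OP-1") -> CheckpointState:
    if not payload:
        # No checkpoint stored yet (e.g. the first scheduled run): start fresh
        return CheckpointState(None, {}, None, default_label)
    d = json.loads(payload)
    ts = d.get("last_run_time")
    return CheckpointState(
        last_run_time=dt.datetime.fromisoformat(ts) if ts else None,
        last_posture_by_area=d.get("last_posture_by_area", {}),
        last_run_id=d.get("last_run_id"),
        operational_period_label=d.get("operational_period_label", default_label),
    )
```

In a Cloud Run job, this string could be written to and read from a GCS object with the `google-cloud-storage` client (`Blob.upload_from_string` / `Blob.download_as_text`). Treating a missing object as "no checkpoint" keeps the first run simple.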

In [2]:
# 2. A2A message dataclasses (schemas for agent-to-agent messages)

@dataclass
class HazardInputsMessage:
    """
    A2A: from Data Ingestion Agent → Hazard Interpretation Agent.
    Represents raw hazard inputs for a given run.
    """
    run_id: str
    as_of: dt.datetime
    areas: List[str]
    forecasts: List[Dict[str, Any]]   # structured forecast slices
    bulletins: List[str]              # free-text bulletins / text products


@dataclass
class HazardRisk:
    """
    Internal representation for a single hazard risk in an area.
    """
    area: str
    hazard: str
    timeframe: str
    likelihood: str      # Low / Medium / High
    impact: str          # Nuisance / Disruptive / Dangerous
    rationale: str
    supporting_evidence: List[str]


@dataclass
class HazardRisksMessage:
    """
    A2A: from Hazard Interpretation → Trigger Evaluation Agent.
    """
    run_id: str
    as_of: dt.datetime
    risks: List[HazardRisk]


@dataclass
class AreaTriggerSummary:
    """
    Structured summary of posture + triggers for a given area.
    """
    name: str
    posture: str
    fired_triggers: List[Dict[str, Any]]  # name, rationale, trigger_id


@dataclass
class TriggerResultsMessage:
    """
    A2A: from Trigger Evaluation → Briefing Agent.
    """
    run_id: str
    as_of: dt.datetime
    areas: List[AreaTriggerSummary]


@dataclass
class BriefPacketMessage:
    """
    A2A: from Briefing Agent → Memory / Outputs.
    """
    run_id: str
    as_of: dt.datetime
    markdown_brief: str
    text_brief: str
    posture_overview: Dict[str, str]  # area → posture


@dataclass
class CheckpointState:
    """
    For long-running operations (pause/resume in Cloud Run / scheduler).
    """
    last_run_time: Optional[dt.datetime]
    last_posture_by_area: Dict[str, str]
    last_run_id: Optional[str]
    operational_period_label: str

### Agent-to-Agent (A2A) Message Flow

```mermaid
sequenceDiagram
    participant Ingest as Data Ingestion Agent
    participant HazInt as Hazard Interpretation Agent
    participant Trigger as Trigger Evaluation Agent
    participant Brief as Briefing Agent
    participant Mem as Memory & Logging Agent

    Ingest->>HazInt: HazardInputsMessage
    HazInt->>Trigger: HazardRisksMessage
    Trigger->>Brief: TriggerResultsMessage
    Brief->>Mem: BriefPacketMessage
    Mem-->>Mem: Update CheckpointState
```

In [3]:
# CELL 3 — Gemini client setup (official Python client, external LLM calls)

!pip install -q google-generativeai

import logging
import json
import os

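from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()

# Load key from Kaggle Secrets and export as env var ("" if the secret is missing)
try:
    GEMINI_API_KEY = user_secrets.get_secret("GEMINI_API_KEY") or ""
except Exception as e:
    logging.warning(f"GEMINI_API_KEY secret not available: {e}")
    GEMINI_API_KEY = ""
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY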

try:
    import google.generativeai as genai

    if GEMINI_API_KEY and len(GEMINI_API_KEY) > 0:
        genai.configure(api_key=GEMINI_API_KEY)
        logging.info("Gemini client configured successfully.")
    else:
        logging.warning("GEMINI_API_KEY not set or empty. Gemini calls will be mocked.")
except Exception as e:
    logging.warning(f"Could not import google-generativeai: {e}")
    genai = None

# Use a current, supported model (you can override via env var GEMINI_MODEL)
DEFAULT_GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-pro")


def call_gemini(
    prompt: str,
    model_name: str = DEFAULT_GEMINI_MODEL,
    temperature: float = 0.2,
    max_output_tokens: int = 768,
) -> str:
    """
    Safely call Gemini.

    - Handles missing API key / client errors
    - Extracts text from candidates instead of using response.text
    - On error, returns a tagged string so callers can decide to fallback
    """

    if not genai or not GEMINI_API_KEY:
        logging.warning("Gemini not configured; returning fallback.")
        return "[Gemini fallback: key missing or not loaded] " + prompt[:200]

    try:
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
            ),
        )

        # Try to extract text from the first candidate
        try:
            if getattr(response, "candidates", None):
                parts = response.candidates[0].content.parts
                text = "".join(getattr(p, "text", "") for p in parts)
                if text and text.strip():
                    return text
        except Exception as inner_e:
            logging.warning(f"Could not extract text from Gemini response: {inner_e}")

        # Fallback: short stringified response for debugging
        return "[Gemini raw response] " + str(response)[:500]

    except Exception as e:
        logging.error(f"Gemini call failed: {e}")
        return f"[Gemini error: {e}]"

INFO:root:Gemini client configured successfully.


In [4]:
# 4. External weather API helper (OpenWeather current weather endpoint)

import requests

def fetch_weather_raw(
    lat: float,
    lon: float,
    units: str = "metric",
    api_url: Optional[str] = None,
    api_key: Optional[str] = None
) -> Dict[str, Any]:
    """
    Fetch current weather from OpenWeather 'Current Weather' endpoint.

    Default:
      https://api.openweathermap.org/data/2.5/weather

    It returns fields like:
      - main.temp
      - main.feels_like
      - rain["1h"] (mm) if raining
      - etc.
    """
    if api_url is None:
        api_url = WEATHER_API_URL
    if api_key is None:
        api_key = WEATHER_API_KEY

    if not api_url or not api_key:
        logging.warning("Weather API not configured; returning empty response.")
        return {}

    params = {
        "lat": lat,
        "lon": lon,
        "units": units,
        "appid": api_key,   # OpenWeather uses 'appid'
    }

    try:
        resp = requests.get(api_url, params=params, timeout=10)
        resp.raise_for_status()
        logging.info("Weather API call OK.")
        return resp.json()
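    except requests.HTTPError as e:
        # Log only the status code: the exception message includes the full
        # request URL, and with it the 'appid' API key.
        logging.error(f"Weather API call failed: HTTP {e.response.status_code}")
        return {}
    except Exception as e:
        # Log the exception type only; connection errors also echo the URL.
        logging.error(f"Weather API call failed: {type(e).__name__}")
        return {}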

In [5]:
# 5. Region locations / mapping (example lat/lon per area)

# Redefines REGION_AREA_COORDS from Cell 1 (same values), annotated with the
# place each point approximates. Refine these coordinates for your region.
REGION_AREA_COORDS = {
    "Coastal Bend": {"lat": 27.8, "lon": -97.4},      # Corpus Christi-ish
    "Houston Metro": {"lat": 29.76, "lon": -95.37},   # Houston
    "Golden Triangle": {"lat": 30.08, "lon": -94.13}, # Beaumont/Port Arthur area
}

## 4. Agents in Detail

### 4.1 Data Ingestion Agent (`DataIngestionAgent`) · Cell 6

- Input: list of region areas (Coastal Bend, Houston Metro, Golden Triangle).  
- Tool: OpenWeather **Current Weather** API (lat, lon → JSON).  
- Logic:
  - Extracts:
    - `temp`, `feels_like` (°C)
    - `rain["1h"]` / `rain["3h"]` (mm) if present
  - Computes:
    - `qpf_inches_24h`: the latest 1h (or 3h) rainfall converted to inches, a crude stand-in for 24-hour QPF
  - Maps to hazard slices:
    - `Heavy Rain & Flooding` if the QPF proxy ≥ 0.1 in  
    - `Excessive Heat` if `feels_like` ≥ 35 °C  
    - `No Significant Hazard` otherwise
  - Optional **demo override** (`demo_force_hazard=True`) forces a heavy rain case for Coastal Bend to clearly demonstrate trigger firing.

- Output: `HazardInputsMessage` with:
  - `forecasts`: list of per-area hazard slices  
  - `bulletins`: short operational summaries (no LLM usage, to conserve quota)
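The extraction logic above can be written as a pure function over an OpenWeather-shaped response dict. This sketch restates the logic for illustration and is not the notebook's cell. The name `extract_hazard_slices` is hypothetical, and `timeframe`/`products` keys are omitted for brevity. One assumption is deliberate: the heat proxy is reported in °F. This puts it on the same scale as the 95 / 103 / 108 heat-index thresholds used by the Hazard Interpretation Agent.

```python
from typing import Any, Dict, List

MM_PER_INCH = 25.4
RAIN_THRESHOLD_IN = 0.1   # same proxy threshold as Cell 6
HEAT_THRESHOLD_C = 35.0   # same feels-like threshold as Cell 6


def c_to_f(celsius: float) -> float:
    return celsius * 9.0 / 5.0 + 32.0


def extract_hazard_slices(area: str, raw: Dict[str, Any]) -> List[Dict[str, Any]]:
    main = raw.get("main", {})
    temp = float(main.get("temp", 0.0))
    feels_like = float(main.get("feels_like", temp))
    # OpenWeather includes "rain" (mm over the last 1h or 3h) only when it rained
    rain = raw.get("rain", {})
    rain_mm = float(rain.get("1h", rain.get("3h", 0.0)))
    qpf_in = rain_mm / MM_PER_INCH

    slices: List[Dict[str, Any]] = []
    if qpf_in >= RAIN_THRESHOLD_IN:
        slices.append({"area": area, "hazard": "Heavy Rain & Flooding",
                       "qpf_inches_24h": qpf_in})
    if feels_like >= HEAT_THRESHOLD_C:
        # Feels-like converted to °F and used as a heat-index proxy
        slices.append({"area": area, "hazard": "Excessive Heat",
                       "heat_index": c_to_f(feels_like)})
    if not slices:
        slices.append({"area": area, "hazard": "No Significant Hazard"})
    return slices
```

For example, 5.08 mm of rain in the last hour becomes 0.2 in, and a 40 °C feels-like becomes 104 °F.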

---

### 4.2 Hazard Interpretation Agent (`HazardInterpretationAgent`) · Cell 7

- Input: `HazardInputsMessage`.  
- Logic:
  - `_rule_based_seed()`:
    - Uses thresholds on `qpf_inches_24h` (rain: 0.5 / 1.5 / 3.0 in) and `heat_index` (heat: 95 / 103 / 108 °F) to assign:
      - `likelihood ∈ {Low, Medium, High}`
      - `impact ∈ {Nuisance, Disruptive, Dangerous}`
    - Generates a short quantitative `rationale`.
  - `_refine_with_gemini()` (optional):
    - Sends forecast + bulletins + seed assessment to Gemini.
    - Asks for a strict JSON response (`likelihood`, `impact`, `rationale`).
    - Tries to parse and override the rule-based values.

- Output: `HazardRisksMessage` (list of `HazardRisk` objects).  
- In this notebook run, `use_gemini=False` is set to avoid quota limits, so the LLM refinement path is implemented but not exercised.
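LLM output is not guaranteed to use the exact scale labels. A value like "Moderate" or "Severe" would make the trigger agent's `LIKELIHOOD_ORDER.index(...)` lookup raise `ValueError`. The sketch below shows one defensive merge: accept the model's answer only if it parses and uses known labels, otherwise keep the rule-based seed. The function name `merge_llm_assessment` is hypothetical.

```python
import json
from typing import Dict

LIKELIHOOD_ORDER = ["Low", "Medium", "High"]
IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]


def merge_llm_assessment(raw: str, seed: Dict[str, str]) -> Dict[str, str]:
    """Return the LLM's assessment only if it parses and uses known scale values."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return seed  # no JSON object at all (e.g. a tagged error string)
    try:
        j = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return seed
    # Normalize casing so "high" or "DANGEROUS" still match the scales
    likelihood = str(j.get("likelihood", seed["likelihood"])).strip().capitalize()
    impact = str(j.get("impact", seed["impact"])).strip().capitalize()
    if likelihood not in LIKELIHOOD_ORDER or impact not in IMPACT_ORDER:
        return seed  # off-scale labels cannot be ranked by the trigger agent
    return {
        "likelihood": likelihood,
        "impact": impact,
        "rationale": str(j.get("rationale", seed["rationale"])),
    }
```

Falling back to the seed keeps the pipeline deterministic whenever the model's reply cannot be trusted.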

---

### 4.3 Trigger Evaluation Agent (`TriggerEvaluationAgent`) · Cell 8

- Input: `HazardRisksMessage`.  
- Logic:
  - Compares each `HazardRisk` to a **Trigger Library** defined in `TRIGGERS`.  
  - Each trigger specifies:
    - `hazard`
    - `min_likelihood`
    - `min_impact`
    - `recommended_posture`
  - Uses ordered scales:
    - `LIKELIHOOD_ORDER = ["Low", "Medium", "High"]`
    - `IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]`
  - For each area:
    - Starts at `Normal`.
    - Upgrades to `Enhanced Monitoring` or `Response Consideration` if any trigger’s conditions are met.
    - Records `fired_triggers` with name and rationale.

- Output: `TriggerResultsMessage` with `areas: List[AreaTriggerSummary]`.
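Trigger evaluation comes down to index lookups on the ordered scales plus a "keep the higher posture" rule. The sketch below restates that logic using the two triggers from `TRIGGERS`. It uses plain dicts instead of `HazardRisk`, and the helper names `meets` and `posture_for_area` are hypothetical. It follows the documented intent and is not necessarily a line-for-line match of Cell 8.

```python
from typing import Any, Dict, List, Tuple

LIKELIHOOD_ORDER = ["Low", "Medium", "High"]
IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]
POSTURE_RANK = ["Normal", "Enhanced Monitoring", "Response Consideration"]

TRIGGERS = [
    {"id": "flood_enhanced_monitoring", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "Medium", "min_impact": "Disruptive",
     "recommended_posture": "Enhanced Monitoring"},
    {"id": "flood_response_consideration", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "High", "min_impact": "Dangerous",
     "recommended_posture": "Response Consideration"},
]


def meets(risk: Dict[str, str], trig: Dict[str, Any]) -> bool:
    # Same hazard, and both likelihood and impact at or above the minimums
    return (
        risk["hazard"] == trig["hazard"]
        and LIKELIHOOD_ORDER.index(risk["likelihood"]) >= LIKELIHOOD_ORDER.index(trig["min_likelihood"])
        and IMPACT_ORDER.index(risk["impact"]) >= IMPACT_ORDER.index(trig["min_impact"])
    )


def posture_for_area(risks: List[Dict[str, str]]) -> Tuple[str, List[str]]:
    posture, fired = "Normal", []
    for r in risks:
        for t in TRIGGERS:
            if meets(r, t):
                fired.append(t["id"])
                # Upgrade only: keep whichever posture ranks higher
                if POSTURE_RANK.index(t["recommended_posture"]) > POSTURE_RANK.index(posture):
                    posture = t["recommended_posture"]
    return posture, fired
```

Because the thresholds are minimums, a High/Dangerous flood risk fires both triggers. The area still ends at a single posture, the highest-ranked one.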

---

### 4.4 Briefing Agent (`BriefingAgent`) · Cell 9

- Input: `HazardRisksMessage` + `TriggerResultsMessage`.  
- Logic:
  - Builds a compact JSON summary of:
    - Region name, timestamp  
    - All hazards per area  
    - Posture and fired triggers per area
  - If `use_gemini=True` and an API key is available:
    - Calls Gemini with a prompt to generate a brief with:
      1. Overview  
      2. Key Hazards by Area  
      3. Recommended Readiness Posture  
    - If Gemini returns an error or raw dump, falls back.
  - Fallback:
    - Generates a deterministic, quota-free plain-text brief.

- Output: `BriefPacketMessage` with:
  - `markdown_brief`
  - `text_brief`
  - `posture_overview` (area → posture)
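The fallback must be deterministic and quota-free, so it can be a plain template over the posture overview. The sketch below uses a hypothetical `fallback_brief` function that follows the same three-part outline as the Gemini prompt. The notebook's actual fallback wording may differ.

```python
from typing import Dict, List


def fallback_brief(region: str, as_of_iso: str,
                   posture_overview: Dict[str, str],
                   hazards_by_area: Dict[str, List[str]]) -> str:
    lines = [f"WEATHER & HAZARD BRIEF: {region}", f"As of {as_of_iso} UTC", ""]

    lines.append("1. Overview")
    elevated = [a for a, p in posture_overview.items() if p != "Normal"]
    lines.append(f"   Elevated posture in: {', '.join(elevated)}" if elevated
                 else "   All areas at Normal posture.")

    lines.append("2. Key Hazards by Area")
    for area, hazards in hazards_by_area.items():
        lines.append(f"   {area}: {', '.join(hazards) or 'None'}")

    lines.append("3. Recommended Readiness Posture")
    for area, posture in posture_overview.items():
        lines.append(f"   {area}: {posture}")

    # Same inputs always give the same text, which makes briefs diffable across runs
    return "\n".join(lines)
```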

---

### 4.5 Memory & Logging Agent (`MemoryLoggingAgent`) · Cell 10

- Input per cycle: `HazardInputsMessage`, `HazardRisksMessage`, `TriggerResultsMessage`, `BriefPacketMessage`.  
- Responsibilities:
  - Append to:
    - `runs` (one row per cycle)
    - `risks_log` (one row per hazard risk)
    - `triggers_log` (one row per fired trigger)
    - `briefs_log` (one row per brief)
  - Update `checkpoint`:
    - `last_run_time`
    - `last_posture_by_area`
    - `last_run_id`
    - `operational_period_label`

- Output: `to_dataframes()` helper to inspect everything as pandas DataFrames.
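The "one row per hazard risk" layout means each logged row repeats its run metadata. That repetition is what lets the logs become tidy tables for after-action review. The sketch below uses a hypothetical `RiskLog` class, which is not the notebook's `MemoryLoggingAgent`. Its rows can be passed directly to `pandas.DataFrame(log.rows)`.

```python
import datetime as dt
from dataclasses import dataclass, asdict
from typing import Any, Dict, List


@dataclass
class HazardRisk:
    area: str
    hazard: str
    timeframe: str
    likelihood: str
    impact: str
    rationale: str
    supporting_evidence: List[str]


class RiskLog:
    """Hypothetical append-only log: one flat dict per risk, tagged with run metadata."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    def log(self, run_id: str, as_of: dt.datetime, risks: List[HazardRisk]) -> None:
        for r in risks:
            row = asdict(r)
            row["run_id"] = run_id
            row["as_of"] = as_of
            # Join list-valued evidence so the column stays flat in a DataFrame
            row["supporting_evidence"] = ", ".join(r.supporting_evidence)
            self.rows.append(row)
```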

---

### 4.6 Orchestrator / Scheduler (`OrchestratorScheduler`) · Cell 11

- Coordinates the multi-agent pipeline.  
- `run_cycle(...)` implements:
  1. Data Ingestion → `HazardInputsMessage`
  2. Hazard Interpretation → `HazardRisksMessage`
  3. Trigger Evaluation → `TriggerResultsMessage`
  4. Briefing → `BriefPacketMessage`
  5. Memory Logging → checkpoint update  

- Supports `pause()` and `resume()` to conceptually illustrate **long-running jobs** where a scheduler can stop and restart runs over time.
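The cycle is a linear chain: each agent's output message is the next agent's input, and the pause flag guards the top of the chain. The stripped-down sketch below uses a hypothetical `MiniOrchestrator` and placeholder stage functions. The real `OrchestratorScheduler` also hands all messages to the memory agent and updates the checkpoint.

```python
from typing import Any, Callable, List, Optional


class MiniOrchestrator:
    """Hypothetical orchestrator: runs stages in order unless paused."""

    def __init__(self, stages: List[Callable[[Any], Any]]) -> None:
        self.stages = stages
        self.paused = False

    def pause(self) -> None:
        self.paused = True

    def resume(self) -> None:
        self.paused = False

    def run_cycle(self, seed: Any = None) -> Optional[Any]:
        if self.paused:
            return None  # a scheduler tick while paused does nothing
        msg = seed
        for stage in self.stages:
            msg = stage(msg)  # each stage consumes the previous stage's message
        return msg
```

With a scheduler invoking `run_cycle()` on a fixed interval, pausing simply turns the ticks into no-ops until `resume()` is called.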

In [6]:
# 6. Data Ingestion Agent (OpenWeather current weather → hazard_inputs)

class DataIngestionAgent:
    """
    Data Ingestion Agent:
      - Calls OpenWeather current weather API for each area.
      - Builds HazardInputsMessage with simple hazard slices + bulletins.
    """

    def __init__(self, region_areas: List[str]):
        self.region_areas = region_areas

    def ingest(
        self,
        time_window: str = "next_24_hours",
        products: Optional[List[str]] = None,
        demo_force_hazard: bool = False,
    ) -> HazardInputsMessage:
        if products is None:
            products = ["openweather_current"]

        run_id = str(uuid.uuid4())
        now = dt.datetime.utcnow()

        forecasts: List[Dict[str, Any]] = []
        bulletins: List[str] = []

        for area in self.region_areas:
            coords = REGION_AREA_COORDS.get(area)
            if not coords:
                continue

            raw = fetch_weather_raw(coords["lat"], coords["lon"])
            if not raw:
                forecasts.append({
                    "area": area,
                    "hazard": "Unknown",
                    "timeframe": time_window,
                    "products": products,
                })
                bulletins.append(
                    f"{area}: Unable to retrieve external weather data at this time."
                )
                continue

            # --- Extract simple hazard features from /data/2.5/weather ---
            main = raw.get("main", {})
            temp = float(main.get("temp", 0.0))
            feels_like = float(main.get("feels_like", temp))
            rain_mm = 0.0
            if "rain" in raw:
                # can be {"1h": mm} or {"3h": mm}
                rain = raw["rain"]
                rain_mm = float(rain.get("1h", rain.get("3h", 0.0)))

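            # Convert mm to inches. OpenWeather reports only the last 1h (or 3h)
            # of rain, so this is a crude stand-in for 24h QPF, not a forecast.
            qpf_inches_24h = rain_mm / 25.4 if rain_mm else 0.0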
            
            hazards_here = []

            # Heavy Rain & Flooding proxy
            if qpf_inches_24h >= 0.1:
                hazards_here.append({
                    "area": area,
                    "hazard": "Heavy Rain & Flooding",
                    "qpf_inches_24h": qpf_inches_24h,
                    "timeframe": "Next 6–24 hours",
                    "products": products,
                })

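            # Excessive Heat proxy using feels_like
            if feels_like >= 35.0:  # Celsius
                # Convert feels-like to °F so it is on the same scale as the
                # °F heat-index thresholds (95 / 103 / 108) in Cell 7.
                hi = feels_like * 9.0 / 5.0 + 32.0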
                hazards_here.append({
                    "area": area,
                    "hazard": "Excessive Heat",
                    "heat_index": hi,
                    "timeframe": "Afternoon",
                    "products": products,
                })

            if not hazards_here:
                hazards_here.append({
                    "area": area,
                    "hazard": "No Significant Hazard",
                    "timeframe": time_window,
                    "products": products,
                })

            # --- DEMO OVERRIDE (for capstone video) ----------------------
            # If demo_force_hazard=True, force a heavy rain scenario
            # for Coastal Bend so triggers clearly fire.
            if demo_force_hazard and area == "Coastal Bend":
                hazards_here = [{
                    "area": area,
                    "hazard": "Heavy Rain & Flooding",
                    "qpf_inches_24h": 3.2,   # > 3.0 → High & Dangerous in rules
                    "timeframe": "Next 24 hours",
                    "products": products + ["demo_override"],
                }]
            # --------------------------------------------------------------
            
            forecasts.extend(hazards_here)

            # Simple rule-based bulletin without LLM (saves quota)
            summary_text = (
                f"Current temp {temp:.1f}°C (feels like {feels_like:.1f}°C), "
                f"rain last hour {rain_mm:.1f} mm. "
            )
            
            if hazards_here and hazards_here[0]["hazard"] != "No Significant Hazard":
                summary_text += "Potential operational impacts due to highlighted hazards."
            else:
                summary_text += "No significant hazards detected at this time."
            
            bulletins.append(f"{area}: {summary_text}")

        return HazardInputsMessage(
            run_id=run_id,
            as_of=now,
            areas=self.region_areas,
            forecasts=forecasts,
            bulletins=bulletins,
        )


ingestion_agent = DataIngestionAgent(REGION_AREAS)

In [7]:
# CELL 7 — Hazard Interpretation Agent using Gemini (LLM risk analysis)

class HazardInterpretationAgent:
    """
    Hazard Interpretation Agent

    Responsibilities:
    - Read HazardInputsMessage (forecasts + bulletins).
    - Use simple rules to seed likelihood/impact.
    - Optionally refine that assessment with Gemini based on text/bulletins.
    - Output a HazardRisksMessage (A2A: hazard_risks).
    """

    def __init__(self, use_gemini: bool = True):
        """
        Parameters
        ----------
        use_gemini : bool
            If True, try to refine rule-based assessments with Gemini.
            If False, use rule-based seeds only (no external LLM calls).
        """
        self.use_gemini = use_gemini

    def _rule_based_seed(self, fc: Dict[str, Any]) -> Dict[str, str]:
        """
        Simple rule-based seeds for likelihood / impact.
        LLM can refine/confirm this.

        Returns a dict with fields: likelihood, impact, rationale.
        """
        hazard = fc.get("hazard", "Unknown")

        if hazard == "Heavy Rain & Flooding":
            qpf = float(fc.get("qpf_inches_24h", 0.0))
            if qpf >= 3.0:
                return {"likelihood": "High", "impact": "Dangerous", "rationale": f"QPF={qpf:.2f} in/24h."}
            elif qpf >= 1.5:
                return {"likelihood": "Medium", "impact": "Disruptive", "rationale": f"QPF={qpf:.2f} in/24h."}
            elif qpf >= 0.5:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": f"QPF={qpf:.2f} in/24h."}
            else:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": "Minimal QPF."}

        if hazard == "Excessive Heat":
            # Thresholds below are in °F, so heat_index must be supplied in °F
            hi = float(fc.get("heat_index", 0.0))
            if hi >= 108:
                return {"likelihood": "High", "impact": "Dangerous", "rationale": f"Heat index={hi:.1f}."}
            elif hi >= 103:
                return {"likelihood": "Medium", "impact": "Disruptive", "rationale": f"Heat index={hi:.1f}."}
            elif hi >= 95:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": f"Heat index={hi:.1f}."}
            else:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": "Heat not critical."}

        if hazard == "No Significant Hazard":
            return {
                "likelihood": "Low",
                "impact": "Nuisance",
                "rationale": "No significant hazard indicated.",
            }

        # Fallback for unknown hazards
        return {
            "likelihood": "Low",
            "impact": "Nuisance",
            "rationale": "Default / unknown hazard.",
        }

    def _refine_with_gemini(
        self,
        fc: Dict[str, Any],
        seed: Dict[str, str],
        bulletins: List[str],
    ) -> Dict[str, str]:
        """
        Optional refinement of likelihood/impact using Gemini.
        Returns the same schema as seed: likelihood, impact, rationale.
        """
        if not self.use_gemini:
            return seed

        joined_bulletins = "\n".join(bulletins)
        prompt = (
            "You are an emergency management hazard analyst. "
            "Given the forecast slice and seed assessment below, "
            "decide if the likelihood and impact need to be adjusted. "
            "Respond in strict JSON with fields: likelihood, impact, rationale.\n\n"
            f"Forecast slice:\n{json.dumps(fc)}\n\n"
            f"Seed assessment:\n{json.dumps(seed)}\n\n"
            f"Relevant bulletins:\n{joined_bulletins[:3000]}"
        )

        raw = call_gemini(prompt, max_output_tokens=384)

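        # Try to parse JSON from the LLM; fall back to the seed on any problem.
        try:
            start = raw.find("{")
            end = raw.rfind("}")
            if start != -1 and end > start:
                j = json.loads(raw[start:end + 1])
                likelihood = str(j.get("likelihood", seed["likelihood"])).strip().capitalize()
                impact = str(j.get("impact", seed["impact"])).strip().capitalize()
                rationale = str(j.get("rationale", seed["rationale"]))
                # Reject values outside the ordered scales: the trigger agent
                # ranks them with LIKELIHOOD_ORDER.index() / IMPACT_ORDER.index().
                if likelihood in LIKELIHOOD_ORDER and impact in IMPACT_ORDER:
                    return {
                        "likelihood": likelihood,
                        "impact": impact,
                        "rationale": rationale,
                    }
                logging.warning(f"Gemini returned off-scale values: {likelihood}/{impact}")
        except Exception as e:
            logging.warning(f"Could not parse Gemini hazard refinement: {e}")

        return seed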

    def assess(self, inputs_msg: HazardInputsMessage) -> HazardRisksMessage:
        """
        Public interface used by the orchestrator.
        """
        risks: List[HazardRisk] = []

        for fc in inputs_msg.forecasts:
            area = fc.get("area", "Unknown")
            hazard = fc.get("hazard", "Unknown")
            timeframe = fc.get("timeframe", "Next 24 hours")

            seed = self._rule_based_seed(fc)
            refined = self._refine_with_gemini(fc, seed, inputs_msg.bulletins)

            risks.append(
                HazardRisk(
                    area=area,
                    hazard=hazard,
                    timeframe=timeframe,
                    likelihood=refined["likelihood"],
                    impact=refined["impact"],
                    rationale=refined["rationale"],
                    supporting_evidence=fc.get("products", []),
                )
            )

        return HazardRisksMessage(
            run_id=inputs_msg.run_id,
            as_of=inputs_msg.as_of,
            risks=risks,
        )


# For now, keep Gemini OFF here to avoid quota issues
hazard_agent = HazardInterpretationAgent(use_gemini=False)

In [8]:
# 8. Trigger Evaluation Agent (trigger_results A2A message)

class TriggerEvaluationAgent:
    """
    Trigger Evaluation Agent

    Responsibilities:
    - Compare each HazardRisk against the configured TRIGGERS.
    - Compute a recommended readiness posture per area.
    - Emit a TriggerResultsMessage (A2A: trigger_results) with:
        * areas: list of AreaTriggerSummary (name, posture, fired_triggers)
    """

    def __init__(self, triggers: List[Dict[str, Any]]):
        self.triggers = triggers
        # Posture hierarchy for "upgrade" logic
        self.posture_rank = ["Normal", "Enhanced Monitoring", "Response Consideration"]

    def _meets_trigger(self, risk: HazardRisk, trig: Dict[str, Any]) -> bool:
        """
        Return True if a given risk satisfies a trigger's minimum
        likelihood and impact thresholds for the matching hazard.
        """
        if risk.hazard != trig["hazard"]:
            return False

        if LIKELIHOOD_ORDER.index(risk.likelihood) < LIKELIHOOD_ORDER.index(trig["min_likelihood"]):
            return False

        if IMPACT_ORDER.index(risk.impact) < IMPACT_ORDER.index(trig["min_impact"]):
            return False

        return True

    def evaluate(self, risk_msg: HazardRisksMessage) -> TriggerResultsMessage:
        """
        Evaluate a HazardRisksMessage and return a TriggerResultsMessage.

        For each area:
        - Start with posture = "Normal".
        - For each risk and trigger that match, upgrade posture if needed.
        - Record a list of fired_triggers with rationale.
        """
        # Initialize each area with Normal posture
        areas_dict: Dict[str, AreaTriggerSummary] = {}

        for r in risk_msg.risks:
            if r.area not in areas_dict:
                areas_dict[r.area] = AreaTriggerSummary(
                    name=r.area,
                    posture="Normal",
                    fired_triggers=[]
                )

        # Evaluate triggers for each risk
        for r in risk_msg.risks:
            for trig in self.triggers:
                if self._meets_trigger(r, trig):
                    summary = areas_dict[r.area]
                    new_posture = trig["recommended_posture"]

                    # Upgrade posture if new_posture is "higher" in the hierarchy
                    if self.posture_rank.index(new_posture) > self.posture_rank.index(summary.posture):
                        summary.posture = new_posture

                    summary.fired_triggers.append(
                        {
                            "trigger_id": trig["id"],
                            "name": trig["name"],
                            "rationale": f"{r.likelihood} likelihood, {r.impact} impact; {r.rationale}",
                        }
                    )

        return TriggerResultsMessage(
            run_id=risk_msg.run_id,
            as_of=risk_msg.as_of,
            areas=list(areas_dict.values()),
        )


trigger_agent = TriggerEvaluationAgent(TRIGGERS)
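
The posture logic in `evaluate()` has two parts: a threshold check on two ordinal scales, then a monotonic "upgrade only" rule. The sketch below reproduces it for a single risk against the two flood triggers from the configuration. It uses renamed copies of the scales so it does not shadow the notebook globals. `posture_for` is a hypothetical helper, not part of the notebook:

```python
from typing import Any, Dict, List, Tuple

# Same ordinal scales as the notebook, under different names
LIKELIHOOD_SCALE = ["Low", "Medium", "High"]
IMPACT_SCALE = ["Nuisance", "Disruptive", "Dangerous"]
POSTURE_SCALE = ["Normal", "Enhanced Monitoring", "Response Consideration"]

# The two flood triggers, trimmed to the fields the check uses
FLOOD_TRIGGERS: List[Dict[str, Any]] = [
    {"id": "flood_enhanced_monitoring", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "Medium", "min_impact": "Disruptive",
     "recommended_posture": "Enhanced Monitoring"},
    {"id": "flood_response_consideration", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "High", "min_impact": "Dangerous",
     "recommended_posture": "Response Consideration"},
]


def posture_for(hazard: str, likelihood: str, impact: str,
                triggers: List[Dict[str, Any]]) -> Tuple[str, List[str]]:
    """Return (posture, fired_trigger_ids) for one risk."""
    posture = "Normal"
    fired: List[str] = []
    for trig in triggers:
        if hazard != trig["hazard"]:
            continue
        if LIKELIHOOD_SCALE.index(likelihood) < LIKELIHOOD_SCALE.index(trig["min_likelihood"]):
            continue
        if IMPACT_SCALE.index(impact) < IMPACT_SCALE.index(trig["min_impact"]):
            continue
        fired.append(trig["id"])
        # Only ever move up the posture ladder, never down
        if POSTURE_SCALE.index(trig["recommended_posture"]) > POSTURE_SCALE.index(posture):
            posture = trig["recommended_posture"]
    return posture, fired


print(posture_for("Heavy Rain & Flooding", "High", "Dangerous", FLOOD_TRIGGERS))
# → ('Response Consideration', ['flood_enhanced_monitoring', 'flood_response_consideration'])
```

With High likelihood and Dangerous impact, both triggers fire and the stronger posture wins. This matches the Coastal Bend result in the single-cycle demo.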

In [9]:
# 9. Briefing Agent (Gemini-generated brief_packet A2A message)

class BriefingAgent:
    """
    Briefing Agent

    Responsibilities:
    - Convert HazardRisksMessage + TriggerResultsMessage into a concise brief
      for regional leadership.
    - Use Gemini to write the narrative when available.
    - Always emit a BriefPacketMessage (A2A: brief_packet) with:
        * markdown_brief
        * text_brief
        * posture_overview (area → posture)
    """

    def __init__(self, region_name: str, use_gemini: bool = True):
        self.region_name = region_name
        self.use_gemini = use_gemini

    def _build_structured_summary(
        self,
        risks_msg: HazardRisksMessage,
        triggers_msg: TriggerResultsMessage,
    ) -> Dict[str, Any]:
        """
        Build a compact JSON-ready summary to feed into Gemini.
        """
        return {
            "region_name": self.region_name,
            "as_of": risks_msg.as_of.isoformat(),
            "risks": [asdict(r) for r in risks_msg.risks],
            "areas": [
                {
                    "name": a.name,
                    "posture": a.posture,
                    "fired_triggers": a.fired_triggers,
                }
                for a in triggers_msg.areas
            ],
        }

    def _fallback_brief_text(
        self,
        risks_msg: HazardRisksMessage,
        triggers_msg: TriggerResultsMessage,
    ) -> str:
        """
        Simple non-LLM brief (used if Gemini not configured or errors).
        """
        run_time = risks_msg.as_of
        lines: List[str] = []

        lines.append(f"Weather & Hazard Brief for {self.region_name}")
        lines.append(f"As of {run_time.isoformat()} UTC\n")

        risks = risks_msg.risks
        if not risks:
            lines.append("Overall: No significant hazards identified for the monitored period.")
        else:
            lines.append("Key Hazards:")
            for r in risks:
                lines.append(
                    f"- {r.area}: {r.hazard} "
                    f"({r.likelihood} likelihood, {r.impact} impact) – {r.timeframe}. "
                    f"{r.rationale}"
                )

        if triggers_msg.areas:
            lines.append("\nRecommended Readiness Posture:")
            for a in triggers_msg.areas:
                if not a.fired_triggers:
                    lines.append(f"- {a.name}: {a.posture} (no triggers fired).")
                else:
                    reasons = "; ".join(
                        f"{t['name']} ({t['rationale']})" for t in a.fired_triggers
                    )
                    lines.append(
                        f"- {a.name}: {a.posture} due to {reasons}."
                    )
        else:
            lines.append(
                "\nRecommended Readiness Posture: Normal operations for all monitored areas."
            )

        return "\n".join(lines)

    def generate(
        self,
        risks_msg: HazardRisksMessage,
        triggers_msg: TriggerResultsMessage,
    ) -> BriefPacketMessage:
        """
        Main entrypoint used by the orchestrator.
        """
        posture_overview = {a.name: a.posture for a in triggers_msg.areas}

        # If no Gemini or use_gemini=False, just use fallback text
        if (not self.use_gemini) or (not GEMINI_API_KEY) or (genai is None):
            brief_text = self._fallback_brief_text(risks_msg, triggers_msg)
        else:
            structured = self._build_structured_summary(risks_msg, triggers_msg)
            prompt = (
                "You are generating an internal weather & hazard brief for the American Red Cross. "
                "Write a concise, plain-text brief for regional leadership, with sections:\n"
                "1) Overview\n2) Key Hazards by Area\n3) Recommended Readiness Posture\n\n"
                "Focus on timing, likelihood, impact, and operational implications. "
                "Avoid overly technical meteorological jargon. "
                "Do not use Markdown formatting.\n\n"
                "Input (JSON):\n\n"
                f"{json.dumps(structured)[:4000]}"
            )

            brief_text = call_gemini(prompt, max_output_tokens=900)

            # If Gemini output is empty or one of call_gemini()'s sentinel strings, fall back
            if (
                not brief_text
                or brief_text.startswith("[Gemini error")
                or brief_text.startswith("[Gemini raw response]")
                or brief_text.startswith("[Gemini disabled]")
                or brief_text.startswith("[Gemini fallback")
            ):
                logging.warning("BriefingAgent: falling back to non-LLM brief.")
                brief_text = self._fallback_brief_text(risks_msg, triggers_msg)

        markdown_brief = brief_text
        text_brief = brief_text

        return BriefPacketMessage(
            run_id=risks_msg.run_id,
            as_of=risks_msg.as_of,
            markdown_brief=markdown_brief,
            text_brief=text_brief,
            posture_overview=posture_overview,
        )


briefing_agent = BriefingAgent(REGION_NAME, use_gemini=True)
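
When `call_gemini()` fails, it returns a bracketed sentinel string instead of raising, and `generate()` then falls back to the template brief. One way to keep that check in one place is a small predicate. This is a sketch: the helper name `is_usable_brief` and the prefix tuple are assumptions. The prefixes mirror the strings used in this notebook, including `[Gemini disabled]`, which `call_gemini()` returns when no client is configured:

```python
from typing import Optional

# Sentinel prefixes produced by call_gemini() or checked in BriefingAgent.generate()
GEMINI_SENTINEL_PREFIXES = (
    "[Gemini error",
    "[Gemini raw response]",
    "[Gemini disabled]",
    "[Gemini fallback",
)


def is_usable_brief(text: Optional[str]) -> bool:
    """Return True only if the LLM output looks like a real brief."""
    if not text or not text.strip():
        return False
    # str.startswith accepts a tuple, so one call covers every sentinel
    return not text.startswith(GEMINI_SENTINEL_PREFIXES)
```

Because `str.startswith` accepts a tuple, adding a new sentinel means editing one constant rather than adding another `or` clause.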

In [10]:
# 10. Memory & Logging Agent (including checkpoint state)

class MemoryLoggingAgent:
    """
    Memory & Logging Agent

    Responsibilities:
    - Store run-level info, risks, triggers, and briefs for EDA/evaluation.
    - Maintain a CheckpointState for long-running operations (pause/resume).
    """

    def __init__(self):
        # Per-cycle logs
        self.runs: List[Dict[str, Any]] = []        # one row per monitoring cycle
        self.risks_log: List[Dict[str, Any]] = []   # one row per HazardRisk
        self.triggers_log: List[Dict[str, Any]] = []  # one row per fired trigger
        self.briefs_log: List[Dict[str, Any]] = []  # one row per brief

        # Checkpoint for long-running ops
        self.checkpoint = CheckpointState(
            last_run_time=None,
            last_posture_by_area={},
            last_run_id=None,
            operational_period_label="Initial",
        )

    def log_cycle(
        self,
        hazard_inputs: HazardInputsMessage,
        risks_msg: HazardRisksMessage,
        trig_msg: TriggerResultsMessage,
        brief_msg: BriefPacketMessage,
    ) -> None:
        """
        Log a full monitoring cycle into in-memory lists and update checkpoint.
        """
        # Run-level log
        self.runs.append({
            "run_id": hazard_inputs.run_id,
            "as_of": hazard_inputs.as_of,
            "areas": ",".join(hazard_inputs.areas),
            "n_forecasts": len(hazard_inputs.forecasts),
            "n_bulletins": len(hazard_inputs.bulletins),
            "n_risks": len(risks_msg.risks),
            "n_trigger_areas": len(trig_msg.areas),
        })

        # Risk log
        for r in risks_msg.risks:
            row = asdict(r)
            row["run_id"] = risks_msg.run_id
            row["as_of"] = risks_msg.as_of
            self.risks_log.append(row)

        # Trigger log (only fired ones)
        for a in trig_msg.areas:
            for t in a.fired_triggers:
                self.triggers_log.append({
                    "run_id": trig_msg.run_id,
                    "as_of": trig_msg.as_of,
                    "area": a.name,
                    "posture": a.posture,
                    "trigger_id": t["trigger_id"],
                    "trigger_name": t["name"],
                    "rationale": t["rationale"],
                })

        # Brief log
        self.briefs_log.append({
            "run_id": brief_msg.run_id,
            "as_of": brief_msg.as_of,
            "brief": brief_msg.text_brief,
        })

        # Checkpoint update
        self.checkpoint = CheckpointState(
            last_run_time=hazard_inputs.as_of,
            last_posture_by_area=brief_msg.posture_overview,
            last_run_id=hazard_inputs.run_id,
            operational_period_label="Ongoing",
        )

    def to_dataframes(self) -> Dict[str, pd.DataFrame]:
        """
        Convert internal logs into pandas DataFrames for analysis and plotting.
        """
        return {
            "runs": pd.DataFrame(self.runs),
            "risks": pd.DataFrame(self.risks_log),
            "triggers": pd.DataFrame(self.triggers_log),
            "briefs": pd.DataFrame(self.briefs_log),
        }


memory_agent = MemoryLoggingAgent()
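
`to_dataframes()` makes the logs available for after-action analysis. As an example of one question this supports, the sketch below counts how often each trigger fired per area, and in how many cycles. The rows are hypothetical and shaped like `triggers_log` entries, keeping only the columns the summary needs. Once at least one trigger has fired, the same two `groupby` lines could run on `memory_agent.to_dataframes()["triggers"]`:

```python
import pandas as pd

# Hypothetical rows shaped like MemoryLoggingAgent.triggers_log entries
triggers_log = [
    {"run_id": "run-1", "area": "Coastal Bend", "posture": "Response Consideration",
     "trigger_id": "flood_enhanced_monitoring"},
    {"run_id": "run-1", "area": "Coastal Bend", "posture": "Response Consideration",
     "trigger_id": "flood_response_consideration"},
    {"run_id": "run-2", "area": "Houston Metro", "posture": "Enhanced Monitoring",
     "trigger_id": "flood_enhanced_monitoring"},
]
df = pd.DataFrame(triggers_log)

# How many times each trigger fired in each area across all logged cycles
fire_counts = df.groupby(["area", "trigger_id"]).size().rename("n_fired").reset_index()

# Number of distinct cycles in which each area had at least one trigger fire
cycles_with_triggers = df.groupby("area")["run_id"].nunique()

print(fire_counts)
print(cycles_with_triggers)
```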

In [11]:
# 11. Orchestrator / Scheduler (loop agent, pause/resume)

class OrchestratorScheduler:
    """
    Orchestrator / Scheduler

    Responsibilities:
    - Loop / sequential agent coordinating the full A2A pipeline.
    - One "monitoring cycle" = Ingestion → Hazard Interpretation →
      Trigger Evaluation → Briefing → Memory.
    - Supports pause/resume conceptually (for long-running jobs / off-season).
    """

    def __init__(
        self,
        ingestion: DataIngestionAgent,
        hazard_int: HazardInterpretationAgent,
        trigger_eval: TriggerEvaluationAgent,
        briefing: BriefingAgent,
        memory: MemoryLoggingAgent,
    ):
        self.ingestion = ingestion
        self.hazard_int = hazard_int
        self.trigger_eval = trigger_eval
        self.briefing = briefing
        self.memory = memory
        self.paused = False

    def pause(self) -> None:
        """Pause long-running operations (no new cycles)."""
        self.paused = True

    def resume(self) -> None:
        """
        Resume from paused state.

        In a Cloud Run deployment, a higher-level wrapper can also reload
        checkpoint state from storage before calling run_cycle().
        """
        self.paused = False

    def run_cycle(
        self,
        time_window: str = "next_72_hours",
        products: Optional[List[str]] = None,
        demo_force_hazard: bool = False,
    ):
        """
        Run one monitoring cycle over all configured areas.

        Flow:
          1) Data Ingestion Agent  → hazard_inputs (HazardInputsMessage)
          2) Hazard Interpretation → hazard_risks (HazardRisksMessage)
          3) Trigger Evaluation    → trigger_results (TriggerResultsMessage)
          4) Briefing Agent        → brief_packet (BriefPacketMessage)
          5) Memory & Logging      → store logs + update checkpoint

        Returns
        -------
        (hazard_inputs_msg, risks_msg, trig_msg, brief_msg) or None if paused.
        """
        if self.paused:
            logging.info("Orchestrator is paused; skipping cycle.")
            return None

        # 1) Data Ingestion Agent → hazard_inputs
        hazard_inputs_msg = self.ingestion.ingest(
            time_window=time_window,
            products=products,
            demo_force_hazard=demo_force_hazard,
        )

        # 2) Hazard Interpretation Agent → hazard_risks
        risks_msg = self.hazard_int.assess(hazard_inputs_msg)

        # 3) Trigger Evaluation Agent → trigger_results
        trig_msg = self.trigger_eval.evaluate(risks_msg)

        # 4) Briefing Agent → brief_packet
        brief_msg = self.briefing.generate(risks_msg, trig_msg)

        # 5) Memory & Logging Agent
        self.memory.log_cycle(hazard_inputs_msg, risks_msg, trig_msg, brief_msg)

        return hazard_inputs_msg, risks_msg, trig_msg, brief_msg


orchestrator = OrchestratorScheduler(
    ingestion=ingestion_agent,
    hazard_int=hazard_agent,
    trigger_eval=trigger_agent,
    briefing=briefing_agent,
    memory=memory_agent,
)

## 5. Demo: Single Monitoring Cycle (Heavy Rain Scenario)

In **Cell 12**, we run a single monitoring cycle with:

```python
result = orchestrator.run_cycle(demo_force_hazard=True)
```

In [12]:
# 12. Single monitoring cycle demo (run in Kaggle to test)

# Set demo_force_hazard=True to show a clear heavy rain scenario
result = orchestrator.run_cycle(demo_force_hazard=True)

if result is None:
    print("No cycle run (orchestrator paused).")
else:
    hazard_inputs_msg, risks_msg, trig_msg, brief_msg = result

    print("=== Generated Brief ===")
    print(brief_msg.text_brief)
    print("\n=== Posture Overview ===")
    for area, posture in brief_msg.posture_overview.items():
        print(f"- {area}: {posture}")

    dfs = memory_agent.to_dataframes()
    print("\n=== Runs log ===")
    display(dfs["runs"])
    print("\n=== Risks log ===")
    display(dfs["risks"])
    print("\n=== Triggers log ===")
    display(dfs["triggers"])

INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Weather API call OK.


=== Generated Brief ===
Weather & Hazard Brief for American Red Cross | Texas Gulf Coast Region
As of 2025-11-27T05:20:05.993705 UTC

Key Hazards:
- Coastal Bend: Heavy Rain & Flooding (High likelihood, Dangerous impact) – Next 24 hours. QPF=3.20 in/24h.
- Houston Metro: No Significant Hazard (Low likelihood, Nuisance impact) – next_72_hours. No significant hazard indicated.
- Golden Triangle: No Significant Hazard (Low likelihood, Nuisance impact) – next_72_hours. No significant hazard indicated.

Recommended Readiness Posture:
- Coastal Bend: Response Consideration due to Heavy Rain – Flash Flood Watch (High likelihood, Dangerous impact; QPF=3.20 in/24h.); Heavy Rain – Possible Flash Flooding (High likelihood, Dangerous impact; QPF=3.20 in/24h.).
- Houston Metro: Normal (no triggers fired).
- Golden Triangle: Normal (no triggers fired).

=== Posture Overview ===
- Coastal Bend: Response Consideration
- Houston Metro: Normal
- Golden Triangle: Normal

=== Runs log ===


| | run_id | as_of | areas | n_forecasts | n_bulletins | n_risks | n_trigger_areas |
|---|---|---|---|---|---|---|---|
| 0 | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 | Coastal Bend,Houston Metro,Golden Triangle | 3 | 3 | 3 | 3 |



=== Risks log ===


| | area | hazard | timeframe | likelihood | impact | rationale | supporting_evidence | run_id | as_of |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Coastal Bend | Heavy Rain & Flooding | Next 24 hours | High | Dangerous | QPF=3.20 in/24h. | [openweather_current, demo_override] | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 |
| 1 | Houston Metro | No Significant Hazard | next_72_hours | Low | Nuisance | No significant hazard indicated. | [openweather_current] | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 |
| 2 | Golden Triangle | No Significant Hazard | next_72_hours | Low | Nuisance | No significant hazard indicated. | [openweather_current] | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 |



=== Triggers log ===


| | run_id | as_of | area | posture | trigger_id | trigger_name | rationale |
|---|---|---|---|---|---|---|---|
| 0 | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 | Coastal Bend | Response Consideration | flood_enhanced_monitoring | Heavy Rain – Flash Flood Watch | High likelihood, Dangerous impact; QPF=3.20 in... |
| 1 | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 | Coastal Bend | Response Consideration | flood_response_consideration | Heavy Rain – Possible Flash Flooding | High likelihood, Dangerous impact; QPF=3.20 in... |


## 6. Multiple Cycles & Long-Running Behavior

In a real regional workflow, the sentinel would not run just once. It would run **every morning, every shift, or ahead of a forecast weather event**, so we simulate that here.

This cell runs **three back-to-back monitoring cycles**:

```python
for i in range(3):
    print(f"\n--- Monitoring cycle #{i+1} ---")
    orchestrator.run_cycle()
```

In [13]:
# 13. Multiple cycles simulation (conceptual long-running behavior)

# Keep Gemini off for repeated cycles to avoid quota issues (template brief only)
briefing_agent.use_gemini = False

for i in range(3):
    print(f"\n--- Monitoring cycle #{i+1} ---")
    orchestrator.run_cycle()

dfs_multi = memory_agent.to_dataframes()
print("\n=== All runs so far ===")
display(dfs_multi["runs"])

print("\n=== Checkpoint state ===")
print(memory_agent.checkpoint)


--- Monitoring cycle #1 ---


INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Weather API call OK.



--- Monitoring cycle #2 ---


INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Weather API call OK.



--- Monitoring cycle #3 ---


INFO:root:Weather API call OK.
INFO:root:Weather API call OK.



=== All runs so far ===


| | run_id | as_of | areas | n_forecasts | n_bulletins | n_risks | n_trigger_areas |
|---|---|---|---|---|---|---|---|
| 0 | d15ba78f-75cc-4605-be68-23eeb06cc840 | 2025-11-27 05:20:05.993705 | Coastal Bend,Houston Metro,Golden Triangle | 3 | 3 | 3 | 3 |
| 1 | 26c49663-a16a-4c1d-9abf-443b4ad30300 | 2025-11-27 05:20:14.400049 | Coastal Bend,Houston Metro,Golden Triangle | 3 | 3 | 3 | 3 |
| 2 | e3eb8c0c-26ff-4859-a614-890168c4e14f | 2025-11-27 05:20:14.758070 | Coastal Bend,Houston Metro,Golden Triangle | 3 | 3 | 3 | 3 |
| 3 | 96e138d9-9743-462e-9201-fa8aea871ee0 | 2025-11-27 05:20:15.109789 | Coastal Bend,Houston Metro,Golden Triangle | 3 | 3 | 3 | 3 |



=== Checkpoint state ===
CheckpointState(last_run_time=datetime.datetime(2025, 11, 27, 5, 20, 15, 109789), last_posture_by_area={'Coastal Bend': 'Normal', 'Houston Metro': 'Normal', 'Golden Triangle': 'Normal'}, last_run_id='96e138d9-9743-462e-9201-fa8aea871ee0', operational_period_label='Ongoing')
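
In a notebook or a long-lived container, a fixed-interval loop is one simple way to get an "every shift" cadence without an external scheduler. Below is a minimal sketch using a hypothetical `run_every` helper, which is not part of the notebook. It subtracts each cycle's runtime from the sleep so cycles stay roughly on schedule. It also records a failed cycle as `None` so later cycles still run. In the deployed version, Cloud Scheduler takes over this role:

```python
import time
from typing import Callable, List, Optional


def run_every(cycle: Callable[[], Optional[object]],
              interval_s: float,
              max_cycles: int) -> List[Optional[object]]:
    """Call `cycle` up to `max_cycles` times, roughly every `interval_s` seconds."""
    results: List[Optional[object]] = []
    for i in range(max_cycles):
        started = time.monotonic()
        try:
            results.append(cycle())
        except Exception as e:  # keep the loop alive across transient failures
            print(f"cycle {i + 1} failed: {e}")
            results.append(None)
        # Sleep only for the rest of the interval, and not after the last cycle
        if i < max_cycles - 1:
            time.sleep(max(0.0, interval_s - (time.monotonic() - started)))
    return results


# Example (not executed here): every 3 hours for one day
# run_every(orchestrator.run_cycle, interval_s=3 * 3600, max_cycles=8)
```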


## 7. Checkpointing to Cloud Storage (GCS)

To run this system as a **cloud service**, we need a way to:

- Remember the **last run** across container restarts.  
- Persist the last known **posture by area** and `run_id`.  

This section adds two helpers that talk to **Google Cloud Storage (GCS)**:

- `save_checkpoint_to_gcs(checkpoint: CheckpointState)`
- `load_checkpoint_from_gcs() -> Optional[CheckpointState]`

### How It Works

- `CHECKPOINT_BLOB_NAME = "weather_hazard_sentinel/checkpoint.json"`  
  → This is the path inside the GCS bucket where the checkpoint JSON is stored.

- `save_checkpoint_to_gcs(...)`:
  - Serializes:
    - `last_run_time` (ISO format)
    - `last_posture_by_area`
    - `last_run_id`
    - `operational_period_label`
  - Uploads the JSON to `gs://<GCS_BUCKET>/weather_hazard_sentinel/checkpoint.json`.

- `load_checkpoint_from_gcs()`:
  - Checks whether the blob exists.  
  - If it does, downloads + parses JSON back into a `CheckpointState`.  
  - If not, returns `None`.
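
The serialization described above can be checked without GCS by round-tripping the payload through a string. The sketch below uses a stand-in dataclass, `CheckpointStateSketch`, with the same four fields, so it does not shadow the notebook's `CheckpointState`. `checkpoint_to_json` and `checkpoint_from_json` are hypothetical helpers that build the same JSON payload as `save_checkpoint_to_gcs()`:

```python
import datetime as dt
import json
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class CheckpointStateSketch:
    # Same fields as the notebook's CheckpointState
    last_run_time: Optional[dt.datetime]
    last_posture_by_area: Dict[str, str]
    last_run_id: Optional[str]
    operational_period_label: str


def checkpoint_to_json(cp: CheckpointStateSketch) -> str:
    """Serialize a checkpoint; the datetime becomes an ISO-8601 string."""
    return json.dumps({
        "last_run_time": cp.last_run_time.isoformat() if cp.last_run_time else None,
        "last_posture_by_area": cp.last_posture_by_area,
        "last_run_id": cp.last_run_id,
        "operational_period_label": cp.operational_period_label,
    })


def checkpoint_from_json(text: str) -> CheckpointStateSketch:
    """Inverse of checkpoint_to_json, tolerant of missing keys."""
    data = json.loads(text)
    raw_time = data.get("last_run_time")
    return CheckpointStateSketch(
        last_run_time=dt.datetime.fromisoformat(raw_time) if raw_time else None,
        last_posture_by_area=data.get("last_posture_by_area", {}),
        last_run_id=data.get("last_run_id"),
        operational_period_label=data.get("operational_period_label", "Ongoing"),
    )


cp = CheckpointStateSketch(dt.datetime(2025, 11, 27, 5, 20, 15, 109789),
                           {"Coastal Bend": "Normal"}, "demo-run", "Ongoing")
assert checkpoint_from_json(checkpoint_to_json(cp)) == cp
```

The round trip preserves the naive `datetime`, including microseconds, because `fromisoformat()` parses the strings that `isoformat()` produces.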

### Requirements

To actually use this in production, you need:

- `google-cloud-storage` installed in the environment.  
- A valid `GCP_PROJECT_ID`.  
- A `GCS_BUCKET` with correct permissions for the service account (Cloud Run / Cloud Functions).  

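In Kaggle, these functions are defined and **safe to call**. Instead of raising, they log one of these warnings and return early:

> “GCS storage not configured; skipping checkpoint load.”  
> “GCS storage not configured; skipping checkpoint save.”

This is expected: the helpers skip GCS whenever `google-cloud-storage` is unavailable or `GCS_BUCKET` is unset, and there is no GCP auth in this environment.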

### Deployment Flow — Cloud Run + Cloud Scheduler

```mermaid
flowchart TD
    A["Developer Notebook / GitHub Repo"]
    B["Container Build & Push"]
    C["Cloud Run Service\nweather-hazard-sentinel"]
    D["Orchestrator Cycle\nrun_cycle()"]
    E["GCS Checkpoint\ncheckpoint.json"]

    A --> B --> C
    C -->|"POST /run"| D
    D --> E

    subgraph Scheduler
        F["Cloud Scheduler\nCron Job (HTTP)"]
    end

    F -->|"Trigger /run"| C
```

In [14]:
# 14. GCS checkpoint save/load helpers (for Cloud Run / cron deployment)

# NOTE:
# - This requires `google-cloud-storage` to be installed and proper service account auth.
#   In Kaggle, you can install the library, but authentication needs a service-account key;
#   on Cloud Run, the service's runtime service account supplies the credentials.
# - You won't usually run this in Kaggle; it's for the Cloud Run service.

try:
    from google.cloud import storage
except Exception as e:
    storage = None
    logging.warning(f"google-cloud-storage not available: {e}")

CHECKPOINT_BLOB_NAME = "weather_hazard_sentinel/checkpoint.json"

def save_checkpoint_to_gcs(checkpoint: CheckpointState):
    if not storage or not GCS_BUCKET:
        logging.warning("GCS storage not configured; skipping checkpoint save.")
        return

    client = storage.Client(project=GCP_PROJECT_ID or None)
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(CHECKPOINT_BLOB_NAME)

    payload = {
        "last_run_time": checkpoint.last_run_time.isoformat() if checkpoint.last_run_time else None,
        "last_posture_by_area": checkpoint.last_posture_by_area,
        "last_run_id": checkpoint.last_run_id,
        "operational_period_label": checkpoint.operational_period_label,
    }

    blob.upload_from_string(json.dumps(payload), content_type="application/json")
    logging.info(f"Checkpoint saved to gs://{GCS_BUCKET}/{CHECKPOINT_BLOB_NAME}")


def load_checkpoint_from_gcs() -> Optional[CheckpointState]:
    if not storage or not GCS_BUCKET:
        logging.warning("GCS storage not configured; skipping checkpoint load.")
        return None

    client = storage.Client(project=GCP_PROJECT_ID or None)
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(CHECKPOINT_BLOB_NAME)

    if not blob.exists():
        logging.info("No checkpoint found in GCS; returning None.")
        return None

    data = json.loads(blob.download_as_text())
    raw_time = data.get("last_run_time")
    last_run_time = dt.datetime.fromisoformat(raw_time) if raw_time else None

    cp = CheckpointState(
        last_run_time=last_run_time,
        last_posture_by_area=data.get("last_posture_by_area", {}),
        last_run_id=data.get("last_run_id"),
        operational_period_label=data.get("operational_period_label", "Ongoing"),
    )

    logging.info(f"Checkpoint loaded from gs://{GCS_BUCKET}/{CHECKPOINT_BLOB_NAME}")
    return cp

## 8. Cloud Run Integration (Cron-Driven Agent)

In production, we want the sentinel to run on a **schedule** without opening the notebook.

A simple pattern is:

- Deploy the core logic to **Cloud Run** (or Cloud Functions).  
- Trigger it with **Cloud Scheduler** (e.g., every day at 07:30, or every 3 hours).  

This cell defines `orchestrator_cloud_run_cycle()` as the **core function** a web handler would call.

### Flow in `orchestrator_cloud_run_cycle()`

1. **Load checkpoint** from GCS (if present):

   ```python
   cp = load_checkpoint_from_gcs()
   if cp:
       memory_agent.checkpoint = cp
   ```

In [15]:
# 15. Example wiring for Cloud Run main handler (conceptual)

"""
This cell provides a function you can reuse in a Cloud Run service.

- Use `orchestrator_cloud_run_cycle` as the core logic.
- Wrap it in a Flask/FastAPI/Functions handler to be triggered by Cloud Scheduler.
"""

def orchestrator_cloud_run_cycle() -> Optional[str]:
    """
    Core monitoring cycle for Cloud Run / cron job.
    - Load checkpoint from GCS
    - Run one cycle
    - Save updated checkpoint

    Returns "OK" after a completed cycle, or None if the orchestrator is paused.
    """
    cp = load_checkpoint_from_gcs()
    if cp:
        # In a more advanced version, you might adjust behavior based on cp.
        memory_agent.checkpoint = cp

    result = orchestrator.run_cycle()

    if result is None:
        logging.info("No cycle run (orchestrator paused in cloud context).")
        return None

    # After a successful cycle, save checkpoint
    save_checkpoint_to_gcs(memory_agent.checkpoint)
    logging.info("Cloud Run cycle completed.")

    return "OK"
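
The comment in `orchestrator_cloud_run_cycle()` leaves open how the loaded checkpoint might change behavior. One plausible use is to diff the previous `last_posture_by_area` against the new `posture_overview` and surface upgrades and downgrades. This is an assumption, not something the notebook does. `posture_changes` is a hypothetical helper:

```python
from typing import Dict, List, Tuple

POSTURE_SCALE = ["Normal", "Enhanced Monitoring", "Response Consideration"]


def posture_changes(previous: Dict[str, str],
                    current: Dict[str, str]) -> List[Tuple[str, str, str, str]]:
    """Compare two area -> posture maps.

    Returns (area, old, new, direction) tuples with direction "upgrade" or
    "downgrade". Areas missing from `previous` count as "Normal"; unchanged
    areas are omitted.
    """
    changes = []
    for area, new in current.items():
        old = previous.get(area, "Normal")
        if old == new:
            continue
        direction = "upgrade" if POSTURE_SCALE.index(new) > POSTURE_SCALE.index(old) else "downgrade"
        changes.append((area, old, new, direction))
    return changes


previous = {"Coastal Bend": "Response Consideration", "Houston Metro": "Normal"}
current = {"Coastal Bend": "Normal", "Houston Metro": "Normal"}
print(posture_changes(previous, current))
# → [('Coastal Bend', 'Response Consideration', 'Normal', 'downgrade')]
```

Inside the cycle function, `result[3].posture_overview` (the `BriefPacketMessage`) could serve as `current`, guarded by `if cp:` so that a first run with no saved checkpoint is handled.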

## 9. Local Sanity Check for Cloud Run Cycle

We also include a quick **local test**:

```python
try:
    resp = orchestrator_cloud_run_cycle()
    print("orchestrator_cloud_run_cycle response:", resp)
except Exception as e:
    print("Error in orchestrator_cloud_run_cycle (expected if no GCS auth):", e)
```

In [16]:
# 16. Placeholder: simple local test for Cloud Run cycle function

# You won't call this cell in Cloud Run, but you can run it once in Kaggle to check that
# the function runs end to end (GCS may still be unconfigured here).
try:
    resp = orchestrator_cloud_run_cycle()
    print("orchestrator_cloud_run_cycle response:", resp)
except Exception as e:
    print("Error in orchestrator_cloud_run_cycle (expected if no GCS auth):", e)

INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Weather API call OK.
INFO:root:Cloud Run cycle completed.


orchestrator_cloud_run_cycle response: OK
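
The notebook's checkpoint helpers build their own `storage.Client` internally. As a result, the check above only confirms that the cycle completes; it does not exercise a real read or write. One way to test that path without credentials is to inject the bucket and substitute an in-memory fake. The fake only needs the `Blob` methods the helpers call: `exists`, `upload_from_string`, and `download_as_text`. A sketch, assuming a hypothetical refactor where the bucket is passed in. `FakeBucket`, `FakeBlob`, `save_payload`, and `load_payload` are illustrative names:

```python
import json
from typing import Any, Dict, Optional


class FakeBlob:
    """In-memory stand-in for the three Blob methods the checkpoint helpers call."""

    def __init__(self, store: Dict[str, str], name: str):
        self._store = store
        self.name = name

    def exists(self) -> bool:
        return self.name in self._store

    def upload_from_string(self, data: str, content_type: str = "text/plain") -> None:
        # content_type is accepted for signature compatibility but not stored
        self._store[self.name] = data

    def download_as_text(self) -> str:
        return self._store[self.name]


class FakeBucket:
    """Dict-backed bucket; every blob() shares the same object store."""

    def __init__(self) -> None:
        self.objects: Dict[str, str] = {}

    def blob(self, name: str) -> FakeBlob:
        return FakeBlob(self.objects, name)


def save_payload(bucket: Any, blob_name: str, payload: Dict[str, Any]) -> None:
    """Same write pattern as save_checkpoint_to_gcs(), with the bucket injected."""
    bucket.blob(blob_name).upload_from_string(json.dumps(payload), content_type="application/json")


def load_payload(bucket: Any, blob_name: str) -> Optional[Dict[str, Any]]:
    """Same read pattern as load_checkpoint_from_gcs(), returning the raw dict."""
    blob = bucket.blob(blob_name)
    if not blob.exists():
        return None
    return json.loads(blob.download_as_text())


bucket = FakeBucket()
name = "weather_hazard_sentinel/checkpoint.json"
assert load_payload(bucket, name) is None  # nothing saved yet
save_payload(bucket, name, {"last_run_id": "demo-run", "operational_period_label": "Ongoing"})
print(load_payload(bucket, name))
```

In the deployed service, the real `storage.Client(...).bucket(GCS_BUCKET)` object would be passed in place of `FakeBucket()`.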


## Deployment Artifacts (Cloud Run / FastAPI)

This section shows how the notebook logic can be deployed as a
Cloud Run service triggered by Cloud Scheduler.

Files:

- `agents.py` — core orchestrator + agents, refactored for deployment.
- `main.py` — FastAPI wrapper exposing a `/run` endpoint.
- `Dockerfile` — container image for Cloud Run.
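
In `agents.py`, every unset variable falls back to an empty string. A misconfigured Cloud Run service therefore degrades to log warnings rather than failing: no weather data, no Gemini, no checkpoint. A small startup check can make that visible in one line. The sketch below uses a hypothetical `missing_env_vars` helper. Treating all four variables as expected is an assumption:

```python
import os
from typing import List, Mapping, Optional

# Variable names read by agents.py; treating all four as expected is an assumption
EXPECTED_ENV_VARS = ["WEATHER_API_KEY", "GEMINI_API_KEY", "GCP_PROJECT_ID", "GCS_BUCKET"]


def missing_env_vars(names: List[str],
                     env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    source = os.environ if env is None else env
    return [name for name in names if not source.get(name)]


# At container startup, e.g. near the top of main.py:
missing = missing_env_vars(EXPECTED_ENV_VARS)
if missing:
    print(f"Sentinel running with integrations disabled; unset: {', '.join(missing)}")
```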

In [17]:
%%writefile agents.py

# Exported from notebook:
# Copy all code from Cells 1–15 here manually for Cloud Run.
# (Notebook users see this as reference; GitHub repo will contain full file.)

# ===== agents.py (deployment module) =====
"""
Core Weather & Hazard Sentinel logic for deployment.

This file is a refactored version of the Kaggle notebook code:
- Uses os.getenv(...) instead of kaggle_secrets.
- Omits visualization / display calls.
- Exposes `orchestrator_cloud_run_cycle()` for Cloud Run / FastAPI.
"""

import os
import json
import uuid
import datetime as dt
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional

import logging
import requests
import pandas as pd

# Optional: GCS storage for checkpoint
try:
    from google.cloud import storage
except Exception:
    storage = None

logging.basicConfig(level=logging.INFO)

# -------------------------------------------------------
# CONFIG (env-driven for deployment)
# -------------------------------------------------------

REGION_NAME = os.getenv("REGION_NAME", "American Red Cross | Texas Gulf Coast Region")

REGION_AREAS = [
    "Coastal Bend",
    "Houston Metro",
    "Golden Triangle",
]

REGION_AREA_COORDS = {
    "Coastal Bend": {"lat": 27.8, "lon": -97.4},
    "Houston Metro": {"lat": 29.76, "lon": -95.37},
    "Golden Triangle": {"lat": 30.08, "lon": -94.13},
}

HAZARD_TYPES = [
    "Heavy Rain & Flooding",
    "Severe Storms",
    "Excessive Heat",
    "Wildfire",
]

TRIGGERS = [
    {
        "id": "flood_enhanced_monitoring",
        "hazard": "Heavy Rain & Flooding",
        "min_likelihood": "Medium",
        "min_impact": "Disruptive",
        "recommended_posture": "Enhanced Monitoring",
        "name": "Heavy Rain – Flash Flood Watch",
        "note": "Consider readiness actions for flood-prone areas.",
    },
    {
        "id": "flood_response_consideration",
        "hazard": "Heavy Rain & Flooding",
        "min_likelihood": "High",
        "min_impact": "Dangerous",
        "recommended_posture": "Response Consideration",
        "name": "Heavy Rain – Possible Flash Flooding",
        "note": "Discuss shelter readiness and resource pre-positioning.",
    },
]

LIKELIHOOD_ORDER = ["Low", "Medium", "High"]
IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]

WEATHER_API_KEY = os.getenv("WEATHER_API_KEY", "")
WEATHER_API_URL = os.getenv("WEATHER_API_URL", "https://api.openweathermap.org/data/2.5/weather")

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-pro")

GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID", "")
GCS_BUCKET = os.getenv("GCS_BUCKET", "")
CHECKPOINT_BLOB_NAME = "weather_hazard_sentinel/checkpoint.json"

# -------------------------------------------------------
# Gemini client (optional)
# -------------------------------------------------------

try:
    import google.generativeai as genai
    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
        logging.info("Gemini client configured in agents.py.")
    else:
        genai = None
        logging.info("GEMINI_API_KEY not set; Gemini disabled in agents.py.")
except Exception as e:
    genai = None
    logging.warning(f"Could not import google.generativeai in agents.py: {e}")


def call_gemini(prompt: str,
                model_name: str = GEMINI_MODEL,
                temperature: float = 0.2,
                max_output_tokens: int = 512) -> str:
    if not genai or not GEMINI_API_KEY:
        return "[Gemini disabled] " + prompt[:200]

    try:
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=temperature,
                max_output_tokens=max_output_tokens,
            ),
        )
        try:
            if getattr(response, "candidates", None):
                parts = response.candidates[0].content.parts
                text = "".join(getattr(p, "text", "") for p in parts)
                if text.strip():
                    return text
        except Exception as inner:
            logging.warning(f"Could not extract Gemini text: {inner}")
        return "[Gemini raw response] " + str(response)[:400]
    except Exception as e:
        logging.error(f"Gemini call failed: {e}")
        return f"[Gemini error: {e}]"

# -------------------------------------------------------
# Dataclasses (A2A schemas)
# -------------------------------------------------------

@dataclass
class HazardInputsMessage:
    run_id: str
    as_of: dt.datetime
    areas: List[str]
    forecasts: List[Dict[str, Any]]
    bulletins: List[str]


@dataclass
class HazardRisk:
    area: str
    hazard: str
    timeframe: str
    likelihood: str
    impact: str
    rationale: str
    supporting_evidence: List[str]


@dataclass
class HazardRisksMessage:
    run_id: str
    as_of: dt.datetime
    risks: List[HazardRisk]


@dataclass
class AreaTriggerSummary:
    name: str
    posture: str
    fired_triggers: List[Dict[str, Any]]


@dataclass
class TriggerResultsMessage:
    run_id: str
    as_of: dt.datetime
    areas: List[AreaTriggerSummary]


@dataclass
class BriefPacketMessage:
    run_id: str
    as_of: dt.datetime
    markdown_brief: str
    text_brief: str
    posture_overview: Dict[str, str]


@dataclass
class CheckpointState:
    last_run_time: Optional[dt.datetime]
    last_posture_by_area: Dict[str, str]
    last_run_id: Optional[str]
    operational_period_label: str

# -------------------------------------------------------
# Weather helper
# -------------------------------------------------------

def fetch_weather_raw(lat: float,
                      lon: float,
                      units: str = "metric") -> Dict[str, Any]:
    if not WEATHER_API_KEY:
        logging.warning("No WEATHER_API_KEY set; returning empty weather.")
        return {}

    params = {"lat": lat, "lon": lon, "units": units, "appid": WEATHER_API_KEY}
    try:
        resp = requests.get(WEATHER_API_URL, params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        logging.error(f"Weather API call failed: {e}")
        return {}

# -------------------------------------------------------
# Agents: Ingestion, Hazard Interpretation, Trigger Evaluation, Briefing, Memory
# (same logic as notebook, but without Kaggle-specific bits)
# -------------------------------------------------------

class DataIngestionAgent:
    def __init__(self, region_areas: List[str]):
        self.region_areas = region_areas

    def ingest(self,
               time_window: str = "next_24_hours",
               products: Optional[List[str]] = None,
               demo_force_hazard: bool = False) -> HazardInputsMessage:
        if products is None:
            products = ["openweather_current"]

        run_id = str(uuid.uuid4())
        now = dt.datetime.utcnow()
        forecasts: List[Dict[str, Any]] = []
        bulletins: List[str] = []

        for area in self.region_areas:
            coords = REGION_AREA_COORDS.get(area)
            if not coords:
                continue
            raw = fetch_weather_raw(coords["lat"], coords["lon"])
            if not raw:
                forecasts.append({
                    "area": area,
                    "hazard": "Unknown",
                    "timeframe": time_window,
                    "products": products,
                })
                bulletins.append(f"{area}: Unable to retrieve external weather data.")
                continue

            main = raw.get("main", {})
            temp = float(main.get("temp", 0.0))
            feels_like = float(main.get("feels_like", temp))
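            # OpenWeather's current-weather "rain" block reports recent rainfall in mm
            # (last 1h, or 3h where provided), not a 24-hour forecast; it is used here
            # only as a rough proxy for 24h QPF.
            rain_mm = 0.0
            if "rain" in raw:
                rain = raw["rain"]
                rain_mm = float(rain.get("1h", rain.get("3h", 0.0)))
            qpf_inches_24h = rain_mm / 25.4  # mm -> inches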

            hazards_here = []
            if qpf_inches_24h >= 0.1:
                hazards_here.append({
                    "area": area,
                    "hazard": "Heavy Rain & Flooding",
                    "qpf_inches_24h": qpf_inches_24h,
                    "timeframe": "Next 6–24 hours",
                    "products": products,
                })

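            if feels_like >= 35.0:
                # feels_like is in °C (units="metric"), while the interpretation thresholds
                # (95/103/108) are °F heat-index values, so convert before comparing.
                hi = feels_like * 9 / 5 + 32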
                hazards_here.append({
                    "area": area,
                    "hazard": "Excessive Heat",
                    "heat_index": hi,
                    "timeframe": "Afternoon",
                    "products": products,
                })

            if not hazards_here:
                hazards_here.append({
                    "area": area,
                    "hazard": "No Significant Hazard",
                    "timeframe": time_window,
                    "products": products,
                })

            if demo_force_hazard and area == "Coastal Bend":
                hazards_here = [{
                    "area": area,
                    "hazard": "Heavy Rain & Flooding",
                    "qpf_inches_24h": 3.2,
                    "timeframe": "Next 24 hours",
                    "products": products + ["demo_override"],
                }]

            forecasts.extend(hazards_here)

            summary_text = (
                f"Current temp {temp:.1f}°C (feels like {feels_like:.1f}°C), "
                f"recent rain {rain_mm:.1f} mm. "
            )
            if hazards_here and hazards_here[0]["hazard"] != "No Significant Hazard":
                summary_text += "Potential operational impacts due to highlighted hazards."
            else:
                summary_text += "No significant hazards detected at this time."
            bulletins.append(f"{area}: {summary_text}")

        return HazardInputsMessage(
            run_id=run_id,
            as_of=now,
            areas=self.region_areas,
            forecasts=forecasts,
            bulletins=bulletins,
        )

class HazardInterpretationAgent:
    def __init__(self, use_gemini: bool = False):
        self.use_gemini = use_gemini

    def _rule_based_seed(self, fc: Dict[str, Any]) -> Dict[str, str]:
        hazard = fc.get("hazard", "Unknown")
        if hazard == "Heavy Rain & Flooding":
            qpf = float(fc.get("qpf_inches_24h", 0.0))
            if qpf >= 3.0:
                return {"likelihood": "High", "impact": "Dangerous", "rationale": f"QPF={qpf:.2f} in/24h."}
            elif qpf >= 1.5:
                return {"likelihood": "Medium", "impact": "Disruptive", "rationale": f"QPF={qpf:.2f} in/24h."}
            elif qpf >= 0.5:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": f"QPF={qpf:.2f} in/24h."}
            else:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": "Minimal QPF."}
        if hazard == "Excessive Heat":
            hi = float(fc.get("heat_index", 0.0))
            if hi >= 108:
                return {"likelihood": "High", "impact": "Dangerous", "rationale": f"Heat index={hi:.1f}."}
            elif hi >= 103:
                return {"likelihood": "Medium", "impact": "Disruptive", "rationale": f"Heat index={hi:.1f}."}
            elif hi >= 95:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": f"Heat index={hi:.1f}."}
            else:
                return {"likelihood": "Low", "impact": "Nuisance", "rationale": "Heat not critical."}
        if hazard == "No Significant Hazard":
            return {"likelihood": "Low", "impact": "Nuisance", "rationale": "No significant hazard indicated."}
        return {"likelihood": "Low", "impact": "Nuisance", "rationale": "Default / unknown hazard."}

    def assess(self, inputs_msg: HazardInputsMessage) -> HazardRisksMessage:
        risks: List[HazardRisk] = []
        for fc in inputs_msg.forecasts:
            area = fc.get("area", "Unknown")
            hazard = fc.get("hazard", "Unknown")
            timeframe = fc.get("timeframe", "Next 24 hours")
            seed = self._rule_based_seed(fc)
            risks.append(
                HazardRisk(
                    area=area,
                    hazard=hazard,
                    timeframe=timeframe,
                    likelihood=seed["likelihood"],
                    impact=seed["impact"],
                    rationale=seed["rationale"],
                    supporting_evidence=fc.get("products", []),
                )
            )
        return HazardRisksMessage(
            run_id=inputs_msg.run_id,
            as_of=inputs_msg.as_of,
            risks=risks,
        )

class TriggerEvaluationAgent:
    def __init__(self, triggers: List[Dict[str, Any]]):
        self.triggers = triggers
        self.posture_rank = ["Normal", "Enhanced Monitoring", "Response Consideration"]

    def _meets_trigger(self, risk: HazardRisk, trig: Dict[str, Any]) -> bool:
        if risk.hazard != trig["hazard"]:
            return False
        if LIKELIHOOD_ORDER.index(risk.likelihood) < LIKELIHOOD_ORDER.index(trig["min_likelihood"]):
            return False
        if IMPACT_ORDER.index(risk.impact) < IMPACT_ORDER.index(trig["min_impact"]):
            return False
        return True

    def evaluate(self, risk_msg: HazardRisksMessage) -> TriggerResultsMessage:
        areas_dict: Dict[str, AreaTriggerSummary] = {}
        for r in risk_msg.risks:
            if r.area not in areas_dict:
                areas_dict[r.area] = AreaTriggerSummary(name=r.area, posture="Normal", fired_triggers=[])
        for r in risk_msg.risks:
            for trig in self.triggers:
                if self._meets_trigger(r, trig):
                    summary = areas_dict[r.area]
                    new_posture = trig["recommended_posture"]
                    if self.posture_rank.index(new_posture) > self.posture_rank.index(summary.posture):
                        summary.posture = new_posture
                    summary.fired_triggers.append(
                        {
                            "trigger_id": trig["id"],
                            "name": trig["name"],
                            "rationale": f"{r.likelihood} likelihood, {r.impact} impact; {r.rationale}",
                        }
                    )
        return TriggerResultsMessage(
            run_id=risk_msg.run_id,
            as_of=risk_msg.as_of,
            areas=list(areas_dict.values()),
        )

class BriefingAgent:
    def __init__(self, region_name: str, use_gemini: bool = True):
        self.region_name = region_name
        self.use_gemini = use_gemini

    def _fallback_brief_text(self, risks_msg: HazardRisksMessage, triggers_msg: TriggerResultsMessage) -> str:
        run_time = risks_msg.as_of
        lines: List[str] = []
        lines.append(f"Weather & Hazard Brief for {self.region_name}")
        lines.append(f"As of {run_time.isoformat()} UTC\n")
        risks = risks_msg.risks
        if not risks:
            lines.append("Overall: No significant hazards identified for the monitored period.")
        else:
            lines.append("Key Hazards:")
            for r in risks:
                lines.append(
                    f"- {r.area}: {r.hazard} "
                    f"({r.likelihood} likelihood, {r.impact} impact) – {r.timeframe}. "
                    f"{r.rationale}"
                )
        if triggers_msg.areas:
            lines.append("\nRecommended Readiness Posture:")
            for a in triggers_msg.areas:
                if not a.fired_triggers:
                    lines.append(f"- {a.name}: {a.posture} (no triggers fired).")
                else:
                    reasons = "; ".join(f"{t['name']} ({t['rationale']})" for t in a.fired_triggers)
                    lines.append(f"- {a.name}: {a.posture} due to {reasons}.")
        else:
            lines.append("\nRecommended Readiness Posture: Normal operations for all monitored areas.")
        return "\n".join(lines)

    def generate(self, risks_msg: HazardRisksMessage, triggers_msg: TriggerResultsMessage) -> BriefPacketMessage:
        posture_overview = {a.name: a.posture for a in triggers_msg.areas}
        if not self.use_gemini or not GEMINI_API_KEY:
            brief_text = self._fallback_brief_text(risks_msg, triggers_msg)
        else:
            structured = {
                "region_name": self.region_name,
                "as_of": risks_msg.as_of.isoformat(),
                "risks": [asdict(r) for r in risks_msg.risks],
                "areas": [
                    {"name": a.name, "posture": a.posture, "fired_triggers": a.fired_triggers}
                    for a in triggers_msg.areas
                ],
            }
            prompt = (
                "You are generating an internal weather & hazard brief for the American Red Cross. "
                "Write a concise brief for regional leadership, with sections:\n"
                "1) Overview\n2) Key Hazards by Area\n3) Recommended Readiness Posture\n\n"
                "Focus on timing, likelihood, impact, and operational implications. "
                "Avoid overly technical meteorological jargon. "
                "Input (JSON):\n\n"
                f"{json.dumps(structured)[:4000]}"
            )
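            brief_text = call_gemini(prompt)
            # call_gemini returns "[Gemini disabled] ...", "[Gemini raw response] ..." or
            # "[Gemini error: ...]" instead of raising; fall back to the rule-based brief.
            if brief_text.startswith("[Gemini"):
                brief_text = self._fallback_brief_text(risks_msg, triggers_msg)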
        return BriefPacketMessage(
            run_id=risks_msg.run_id,
            as_of=risks_msg.as_of,
            markdown_brief=brief_text,
            text_brief=brief_text,
            posture_overview=posture_overview,
        )

class MemoryLoggingAgent:
    def __init__(self):
        self.runs: List[Dict[str, Any]] = []
        self.risks_log: List[Dict[str, Any]] = []
        self.triggers_log: List[Dict[str, Any]] = []
        self.briefs_log: List[Dict[str, Any]] = []
        self.checkpoint = CheckpointState(
            last_run_time=None,
            last_posture_by_area={},
            last_run_id=None,
            operational_period_label="Initial",
        )

    def log_cycle(self,
                  hazard_inputs: HazardInputsMessage,
                  risks_msg: HazardRisksMessage,
                  trig_msg: TriggerResultsMessage,
                  brief_msg: BriefPacketMessage):
        self.runs.append({
            "run_id": hazard_inputs.run_id,
            "as_of": hazard_inputs.as_of,
            "areas": ",".join(hazard_inputs.areas),
            "n_forecasts": len(hazard_inputs.forecasts),
            "n_bulletins": len(hazard_inputs.bulletins),
            "n_risks": len(risks_msg.risks),
            "n_trigger_areas": len(trig_msg.areas),
        })
        for r in risks_msg.risks:
            row = asdict(r)
            row["run_id"] = risks_msg.run_id
            row["as_of"] = risks_msg.as_of
            self.risks_log.append(row)
        for a in trig_msg.areas:
            for t in a.fired_triggers:
                self.triggers_log.append({
                    "run_id": trig_msg.run_id,
                    "as_of": trig_msg.as_of,
                    "area": a.name,
                    "posture": a.posture,
                    "trigger_id": t["trigger_id"],
                    "trigger_name": t["name"],
                    "rationale": t["rationale"],
                })
        self.briefs_log.append({
            "run_id": brief_msg.run_id,
            "as_of": brief_msg.as_of,
            "brief": brief_msg.text_brief,
        })
        self.checkpoint = CheckpointState(
            last_run_time=hazard_inputs.as_of,
            last_posture_by_area=brief_msg.posture_overview,
            last_run_id=hazard_inputs.run_id,
            operational_period_label="Ongoing",
        )

# -------------------------------------------------------
# GCS checkpoint helpers
# -------------------------------------------------------

def save_checkpoint_to_gcs(checkpoint: CheckpointState):
    if not storage or not GCS_BUCKET:
        logging.warning("GCS not configured; skipping checkpoint save.")
        return
    client = storage.Client(project=GCP_PROJECT_ID or None)
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(CHECKPOINT_BLOB_NAME)
    payload = {
        "last_run_time": checkpoint.last_run_time.isoformat() if checkpoint.last_run_time else None,
        "last_posture_by_area": checkpoint.last_posture_by_area,
        "last_run_id": checkpoint.last_run_id,
        "operational_period_label": checkpoint.operational_period_label,
    }
    blob.upload_from_string(json.dumps(payload), content_type="application/json")
    logging.info("Checkpoint saved to GCS.")

def load_checkpoint_from_gcs() -> Optional[CheckpointState]:
    if not storage or not GCS_BUCKET:
        logging.warning("GCS not configured; skipping checkpoint load.")
        return None
    client = storage.Client(project=GCP_PROJECT_ID or None)
    bucket = client.bucket(GCS_BUCKET)
    blob = bucket.blob(CHECKPOINT_BLOB_NAME)
    if not blob.exists():
        logging.info("No checkpoint in GCS.")
        return None
    data = json.loads(blob.download_as_text())
    raw_time = data.get("last_run_time")
    last_run_time = dt.datetime.fromisoformat(raw_time) if raw_time else None
    return CheckpointState(
        last_run_time=last_run_time,
        last_posture_by_area=data.get("last_posture_by_area", {}),
        last_run_id=data.get("last_run_id"),
        operational_period_label=data.get("operational_period_label", "Ongoing"),
    )

# -------------------------------------------------------
# Orchestrator + exported cycle
# -------------------------------------------------------

class OrchestratorScheduler:
    def __init__(self,
                 ingestion: DataIngestionAgent,
                 hazard_int: HazardInterpretationAgent,
                 trigger_eval: TriggerEvaluationAgent,
                 briefing: BriefingAgent,
                 memory: MemoryLoggingAgent):
        self.ingestion = ingestion
        self.hazard_int = hazard_int
        self.trigger_eval = trigger_eval
        self.briefing = briefing
        self.memory = memory
        self.paused = False

    def run_cycle(self,
                  time_window: str = "next_72_hours",
                  products: Optional[List[str]] = None,
                  demo_force_hazard: bool = False):
        if self.paused:
            logging.info("Orchestrator paused; skipping cycle.")
            return None
        hazard_inputs_msg = self.ingestion.ingest(
            time_window=time_window,
            products=products,
            demo_force_hazard=demo_force_hazard,
        )
        risks_msg = self.hazard_int.assess(hazard_inputs_msg)
        trig_msg = self.trigger_eval.evaluate(risks_msg)
        brief_msg = self.briefing.generate(risks_msg, trig_msg)
        self.memory.log_cycle(hazard_inputs_msg, risks_msg, trig_msg, brief_msg)
        return hazard_inputs_msg, risks_msg, trig_msg, brief_msg


# Instantiate global agents + orchestrator for deployment
ingestion_agent = DataIngestionAgent(REGION_AREAS)
hazard_agent = HazardInterpretationAgent(use_gemini=False)
trigger_agent = TriggerEvaluationAgent(TRIGGERS)
briefing_agent = BriefingAgent(REGION_NAME, use_gemini=True)
memory_agent = MemoryLoggingAgent()

orchestrator = OrchestratorScheduler(
    ingestion=ingestion_agent,
    hazard_int=hazard_agent,
    trigger_eval=trigger_agent,
    briefing=briefing_agent,
    memory=memory_agent,
)

def orchestrator_cloud_run_cycle():
    """
    Core monitoring cycle for Cloud Run / Scheduler:
    - Load checkpoint
    - Run one cycle
    - Save checkpoint
    """
    cp = load_checkpoint_from_gcs()
    if cp:
        memory_agent.checkpoint = cp
    result = orchestrator.run_cycle()
    if result is None:
        return "SKIPPED"
    save_checkpoint_to_gcs(memory_agent.checkpoint)
    return "OK"

Writing agents.py
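The trigger logic in `TriggerEvaluationAgent` is easiest to see in isolation. The sketch below is a self-contained restatement of `_meets_trigger` and the posture-escalation loop. The `LIKELIHOOD_ORDER`, `IMPACT_ORDER`, and `EXAMPLE_TRIGGERS` values are illustrative assumptions, not the notebook's actual `TRIGGERS` library, and `Risk` is a trimmed, hypothetical stand-in for `HazardRisk`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

# ASSUMPTION: illustrative orderings and triggers; the notebook defines its own
# LIKELIHOOD_ORDER, IMPACT_ORDER and TRIGGERS in an earlier cell.
LIKELIHOOD_ORDER = ["Low", "Medium", "High"]
IMPACT_ORDER = ["Nuisance", "Disruptive", "Dangerous"]
POSTURE_RANK = ["Normal", "Enhanced Monitoring", "Response Consideration"]

EXAMPLE_TRIGGERS: List[Dict[str, Any]] = [
    {"id": "EX-RAIN-1", "name": "Disruptive rain", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "Medium", "min_impact": "Disruptive",
     "recommended_posture": "Enhanced Monitoring"},
    {"id": "EX-RAIN-2", "name": "Dangerous flooding", "hazard": "Heavy Rain & Flooding",
     "min_likelihood": "High", "min_impact": "Dangerous",
     "recommended_posture": "Response Consideration"},
]


@dataclass
class Risk:
    area: str
    hazard: str
    likelihood: str
    impact: str


def meets(risk: Risk, trig: Dict[str, Any]) -> bool:
    """A trigger fires when the hazard matches and both ordinal levels reach the minimums."""
    return (
        risk.hazard == trig["hazard"]
        and LIKELIHOOD_ORDER.index(risk.likelihood) >= LIKELIHOOD_ORDER.index(trig["min_likelihood"])
        and IMPACT_ORDER.index(risk.impact) >= IMPACT_ORDER.index(trig["min_impact"])
    )


def posture_by_area(risks: List[Risk], triggers: List[Dict[str, Any]]) -> Dict[str, str]:
    """Each area starts at Normal and only escalates to the highest-ranked fired posture."""
    postures = {r.area: "Normal" for r in risks}
    for r in risks:
        for trig in triggers:
            if meets(r, trig):
                new_posture = trig["recommended_posture"]
                if POSTURE_RANK.index(new_posture) > POSTURE_RANK.index(postures[r.area]):
                    postures[r.area] = new_posture
    return postures


risks = [
    Risk("Coastal Bend", "Heavy Rain & Flooding", "High", "Dangerous"),
    Risk("Houston Metro", "Heavy Rain & Flooding", "Medium", "Disruptive"),
    Risk("Golden Triangle", "No Significant Hazard", "Low", "Nuisance"),
]
print(posture_by_area(risks, EXAMPLE_TRIGGERS))
# -> {'Coastal Bend': 'Response Consideration', 'Houston Metro': 'Enhanced Monitoring', 'Golden Triangle': 'Normal'}
```

Because an area's posture only ratchets upward, the result does not depend on the order of risks or triggers. In the real agent, every matching trigger is still recorded in `fired_triggers`, even when it does not raise the posture.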


In [18]:
%%writefile main.py

# ===== main.py (FastAPI wrapper for Cloud Run) =====
"""
HTTP wrapper around orchestrator_cloud_run_cycle().

- GET "/"   → healthcheck
- POST "/run" → run one monitoring cycle (for Cloud Scheduler)
"""

import logging

from fastapi import FastAPI
from fastapi.responses import JSONResponse

from agents import orchestrator_cloud_run_cycle

app = FastAPI(
    title="Weather & Hazard Sentinel",
    description="Red Cross Weather & Hazard Monitoring Orchestrator",
    version="1.0.0",
)

@app.get("/")
def healthcheck():
    return {"status": "ok", "service": "weather-hazard-sentinel"}

@app.post("/run")
def run_cycle():
    try:
        result = orchestrator_cloud_run_cycle()
        return JSONResponse(status_code=200, content={"status": "ok", "result": result})
    except Exception as e:
        # Log the full traceback so failures appear in Cloud Run logs, not only in the response.
        logging.exception("Monitoring cycle failed")
        return JSONResponse(status_code=500, content={"status": "error", "detail": str(e)})

Writing main.py
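Once `main.py` is being served (for example with `uvicorn main:app --port 8080`), a quick smoke test can exercise both routes. This is a stdlib-only sketch: `BASE_URL`, `build_request`, `parse_run_result`, and `smoke_test` are hypothetical helpers, and it assumes the service accepts unauthenticated requests. A Cloud Run service that requires authentication would also need an identity-token `Authorization` header, which this sketch does not add.

```python
import json
import urllib.error
import urllib.request
from typing import Optional

# ASSUMPTION: placeholder URL; replace with the local server or the Cloud Run service URL.
BASE_URL = "http://localhost:8080"


def build_request(base_url: str, path: str, method: str) -> urllib.request.Request:
    """Build (but do not send) a request against the Sentinel API."""
    # POST gets an explicit empty body; GET gets none.
    data = b"" if method == "POST" else None
    return urllib.request.Request(base_url.rstrip("/") + path, data=data, method=method)


def parse_run_result(body: bytes) -> Optional[str]:
    """Return "OK" or "SKIPPED" from a /run response body, or None if it reports an error."""
    payload = json.loads(body)
    if payload.get("status") != "ok":
        return None
    return payload.get("result")


def smoke_test(base_url: str = BASE_URL, timeout: float = 120.0) -> Optional[str]:
    """Check the healthcheck route, then trigger one monitoring cycle via POST /run."""
    with urllib.request.urlopen(build_request(base_url, "/", "GET"), timeout=timeout) as resp:
        health = json.loads(resp.read())
    if health.get("status") != "ok":
        raise RuntimeError(f"Healthcheck failed: {health}")
    try:
        with urllib.request.urlopen(build_request(base_url, "/run", "POST"), timeout=timeout) as resp:
            return parse_run_result(resp.read())
    except urllib.error.HTTPError as err:
        # main.py answers failures with HTTP 500 and a JSON error body.
        return parse_run_result(err.read())
```

Calling `smoke_test()` against a running server should return `"OK"`, `"SKIPPED"` (when the orchestrator is paused), or `None` if `/run` reported an error. The generous timeout reflects that one cycle makes several weather API calls and possibly a Gemini call.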


In [19]:
%%writefile Dockerfile

# ===== Dockerfile (for Cloud Run deployment) =====
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system deps (if needed)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying code so this layer is cached across code-only changes
RUN pip install --no-cache-dir \
    fastapi \
    "uvicorn[standard]" \
    google-generativeai \
    google-cloud-storage \
    requests \
    pandas

# Copy Python files
COPY agents.py main.py ./

# Cloud Run injects PORT at runtime; 8080 is the default
ENV PORT=8080
EXPOSE 8080

# Start the FastAPI app; shell form so ${PORT} is expanded, and exec keeps uvicorn as PID 1
CMD exec uvicorn main:app --host 0.0.0.0 --port ${PORT}

Writing Dockerfile
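When the container runs locally without `GCS_BUCKET`, `save_checkpoint_to_gcs` only logs a warning and `load_checkpoint_from_gcs` returns `None`, so checkpoint state lives only in the process and is lost when the container restarts. A local-file fallback is one option for development. The sketch below mirrors the JSON layout used by the GCS helpers; `save_checkpoint_local`, `load_checkpoint_local`, and the file path are hypothetical and not part of `agents.py`.

```python
import datetime as dt
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional


@dataclass
class CheckpointState:
    # Same fields as the CheckpointState dataclass in agents.py.
    last_run_time: Optional[dt.datetime]
    last_posture_by_area: Dict[str, str]
    last_run_id: Optional[str]
    operational_period_label: str


def save_checkpoint_local(cp: CheckpointState, path: Path) -> None:
    """Write the checkpoint as JSON, using the same keys as save_checkpoint_to_gcs."""
    payload = {
        "last_run_time": cp.last_run_time.isoformat() if cp.last_run_time else None,
        "last_posture_by_area": cp.last_posture_by_area,
        "last_run_id": cp.last_run_id,
        "operational_period_label": cp.operational_period_label,
    }
    path.write_text(json.dumps(payload), encoding="utf-8")


def load_checkpoint_local(path: Path) -> Optional[CheckpointState]:
    """Read a checkpoint written by save_checkpoint_local; return None if the file is absent."""
    if not path.exists():
        return None
    data = json.loads(path.read_text(encoding="utf-8"))
    raw_time = data.get("last_run_time")
    return CheckpointState(
        last_run_time=dt.datetime.fromisoformat(raw_time) if raw_time else None,
        last_posture_by_area=data.get("last_posture_by_area", {}),
        last_run_id=data.get("last_run_id"),
        operational_period_label=data.get("operational_period_label", "Ongoing"),
    )


# Round-trip example in a temporary directory (file name and values are illustrative).
checkpoint_path = Path(tempfile.mkdtemp()) / "sentinel_checkpoint.json"
checkpoint = CheckpointState(
    last_run_time=dt.datetime(2024, 6, 1, 12, 0),
    last_posture_by_area={"Coastal Bend": "Enhanced Monitoring"},
    last_run_id="example-run-id",
    operational_period_label="Ongoing",
)
save_checkpoint_local(checkpoint, checkpoint_path)
restored = load_checkpoint_local(checkpoint_path)
print(restored == checkpoint)  # -> True
```

Keeping the payload identical to the GCS version means a checkpoint file could later be uploaded to the bucket as-is, if that is ever useful during testing.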
