# Zyra Notebook: Drought Animation Walkthrough (draft)

This walkthrough recreates the `samples/swarm/drought_animation.yaml` flow:

- Sync the last year of weekly drought frames from NOAA FTP.
- Scan frame metadata to identify the cadence and any missing timestamps.
- Fill gaps with a basemap-backed placeholder set.
- Compose an MP4 animation and write it locally.
- Optional narration via `narrate swarm` with mock-safe provider selection.
- Export pipeline/CLI equivalents for reproducibility.

Prereqs: Pillow for `process pad-missing`, FFmpeg on `PATH` for `visualize compose-video`, and access to the packaged basemap `pkg:zyra.assets/images/earth_vegetation.jpg` (avoids external downloads).
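
A quick preflight can confirm two of these prerequisites before any cell runs. This is a standard-library sketch: it checks that Pillow is importable and that an `ffmpeg` executable resolves on `PATH`. It does not check the packaged basemap, and `check_prereqs` is a hypothetical helper, not part of Zyra.

```python
import importlib.util
import shutil


def check_prereqs() -> dict[str, bool]:
    """Report whether the optional prerequisites are available (sketch)."""
    return {
        # Pillow installs the import package "PIL"
        "pillow": importlib.util.find_spec("PIL") is not None,
        # visualize compose-video needs an ffmpeg binary on PATH
        "ffmpeg": shutil.which("ffmpeg") is not None,
    }


status = check_prereqs()
for name, ok in status.items():
    print(f"{name}: {'ok' if ok else 'MISSING'}")
```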


In [1]:
# LLM provider (optional narration; mock-safe fallback)
import os

from zyra.wizard import _test_llm_connectivity

# Prefer Google Gemini 2.5 Flash by default
os.environ.setdefault("ZYRA_LLM_PROVIDER", "gemini")
os.environ.setdefault("LLM_MODEL", "gemini-2.5-flash")

PROVIDER = os.environ.get("ZYRA_LLM_PROVIDER", "gemini")
AVAILABLE = {"ollama", "openai", "gemini", "vertex", "mock"}
if PROVIDER not in AVAILABLE:
    PROVIDER = "gemini"

# Swarm debug logging to show per-agent outputs; set to "" to silence
os.environ.setdefault("ZYRA_VERBOSITY", "debug")


def choose_provider() -> str:
    has_openai = bool(os.environ.get("OPENAI_API_KEY"))
    has_google = bool(os.environ.get("GOOGLE_API_KEY"))
    has_ollama = bool(os.environ.get("OLLAMA_BASE_URL"))
    if PROVIDER in {"gemini", "vertex", "google"} and has_google:
        return "gemini"
    if PROVIDER == "openai" and has_openai:
        return "openai"
    if PROVIDER == "ollama" and has_ollama:
        return "ollama"
    return "mock"


LLM_PROVIDER = choose_provider()
LLM_MODEL = os.environ.get("LLM_MODEL")
status_ok, status_msg = _test_llm_connectivity(LLM_PROVIDER, LLM_MODEL)
print(status_msg)
print("LLM model:", LLM_MODEL)
print("LLM provider:", LLM_PROVIDER)

✅ Connected to Gemini (gemini-2.5-flash)
LLM model: gemini-2.5-flash
LLM provider: gemini


## LLM provider setup
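- Default: Gemini Flash when `GOOGLE_API_KEY` is set (`ZYRA_LLM_PROVIDER=vertex` is routed through the same Gemini path and also needs `GOOGLE_API_KEY`). Setting `ZYRA_LLM_PROVIDER=openai` or `ollama` selects that provider when `OPENAI_API_KEY` or `OLLAMA_BASE_URL` is present. If the requested provider's credential is missing, the cell falls back to `mock` so the notebook still runs offline; it does not try the other providers in turn.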
- Set `ZYRA_LLM_PROVIDER` and `LLM_MODEL` in the top provider cell to override.
- Env hints: Google (`GOOGLE_API_KEY`), OpenAI (`OPENAI_API_KEY`), Ollama (`OLLAMA_BASE_URL`), verbosity (`ZYRA_VERBOSITY=debug`).
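
The selection rule in the provider cell can be restated as a pure function over an environment mapping, which makes the fallback easy to test without touching `os.environ`. This sketch mirrors `choose_provider()` from the first cell; the `select_provider` name and its `env` parameter are hypothetical.

```python
from collections.abc import Mapping

KNOWN = {"ollama", "openai", "gemini", "vertex", "mock"}


def select_provider(env: Mapping[str, str]) -> str:
    """Mirror of choose_provider(): honor the requested provider only if its
    credential is set; otherwise fall back to the offline-safe mock."""
    requested = env.get("ZYRA_LLM_PROVIDER", "gemini")
    if requested not in KNOWN:
        requested = "gemini"  # unknown names reset to the default
    if requested in {"gemini", "vertex"} and env.get("GOOGLE_API_KEY"):
        return "gemini"  # Vertex requests go through the Gemini path
    if requested == "openai" and env.get("OPENAI_API_KEY"):
        return "openai"
    if requested == "ollama" and env.get("OLLAMA_BASE_URL"):
        return "ollama"
    return "mock"


print(select_provider({"GOOGLE_API_KEY": "x"}))  # gemini
print(select_provider({"ZYRA_LLM_PROVIDER": "openai"}))  # mock (no key)
```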


In [2]:
# Notebook session + workspace
from pathlib import Path

from zyra.notebook import create_session

# Workspace resolution order: ZYRA_NOTEBOOK_DIR -> /kaggle/working -> cwd.
# This run pins ZYRA_NOTEBOOK_DIR to /app/data, overriding any existing value.
os.environ["ZYRA_NOTEBOOK_DIR"] = "/app/data"
os.environ.setdefault(
    "ZYRA_NOTEBOOK_PROVENANCE",
    str(
        Path(os.environ["ZYRA_NOTEBOOK_DIR"]) / "drought_notebook" / "provenance.sqlite"
    ),
)
sess = create_session()
WORKSPACE = sess.workspace()
DROUGHT_DIR = WORKSPACE / "drought_notebook"
DROUGHT_DIR.mkdir(parents=True, exist_ok=True)
print("Workspace root:", WORKSPACE)
print("Drought run dir:", DROUGHT_DIR)

Workspace root: /app/data
Drought run dir: /app/data/drought_notebook


In [3]:
# Paths and cadence defaults
FRAMES_RAW = DROUGHT_DIR / "frames_raw"
FRAMES_PADDED = DROUGHT_DIR / "frames_padded"
FRAMES_META = DROUGHT_DIR / "frames_meta.json"
VIDEO_OUT = DROUGHT_DIR / "drought_animation.mp4"
BASEMAP_REF = "pkg:zyra.assets/images/earth_vegetation.jpg"

FTP_PATH = "ftp://ftp.nnvl.noaa.gov/SOS/DroughtRisk_Weekly"
PATTERN = r"^DroughtRisk_Weekly_[0-9]{8}\.png$"  # raw string: \. matches a literal dot
CADENCE_SECONDS = 7 * 24 * 3600

for folder in (FRAMES_RAW, FRAMES_PADDED):
    folder.mkdir(parents=True, exist_ok=True)

print("Raw frames dir:", FRAMES_RAW)
print("Padded frames dir:", FRAMES_PADDED)

Raw frames dir: /app/data/drought_notebook/frames_raw
Padded frames dir: /app/data/drought_notebook/frames_padded


In [4]:
# Acquire drought frames (FTP sync for the past year)
import contextlib
import re
from datetime import datetime

from zyra.connectors.backends import ftp as ftp_backend
from zyra.utils.date_manager import DateManager


def frame_count(path: Path) -> int:
    return sum(1 for f in path.iterdir() if f.is_file())


date_start, _ = DateManager().get_date_range_iso("P1Y")
remote_filtered = (
    ftp_backend.list_files(
        FTP_PATH,
        pattern=PATTERN,
        since=date_start.isoformat(),
        date_format="%Y%m%d",
    )
    or []
)
dates = []
for name in remote_filtered:
    m = re.search(r"(\d{8})", name)
    if m:
        with contextlib.suppress(Exception):
            dates.append(datetime.strptime(m.group(1), "%Y%m%d"))
print(f"Remote frames past year: {len(remote_filtered)}")
if dates:
    print("Remote date span:", min(dates).date(), "to", max(dates).date())
if remote_filtered:
    print("Newest remote samples:", remote_filtered[-3:])

try:
    sess.acquire.ftp(
        path=FTP_PATH,
        sync_dir=str(FRAMES_RAW),
        pattern=PATTERN,
        since_period="P1Y",
        date_format="%Y%m%d",
    )
    print("Synced frames from FTP ->", FRAMES_RAW)
except Exception as exc:
    raise RuntimeError(f"FTP sync failed: {exc}") from exc

ready = frame_count(FRAMES_RAW)
print(f"Local frames downloaded: {ready}")
if ready:
    sample_local = sorted(FRAMES_RAW.iterdir())[-3:]
    print("Newest local files:", [p.name for p in sample_local])

Remote frames past year: 51
Remote date span: 2024-12-05 to 2025-11-20
Newest remote samples: ['DroughtRisk_Weekly_20251106.png', 'DroughtRisk_Weekly_20251113.png', 'DroughtRisk_Weekly_20251120.png']
Synced frames from FTP -> /app/data/drought_notebook/frames_raw
Local frames downloaded: 51
Newest local files: ['DroughtRisk_Weekly_20251106.png', 'DroughtRisk_Weekly_20251113.png', 'DroughtRisk_Weekly_20251120.png']
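
The acquisition cell parses the `YYYYMMDD` stamp out of each remote filename to report the date span. The same idea can express a past-year window locally. This sketch is not the FTP connector's `since`/`date_format` implementation; `filter_since` is a hypothetical helper, and it reuses the `PATTERN` regex from the paths cell.

```python
import re
from datetime import datetime, timedelta

PATTERN = r"^DroughtRisk_Weekly_[0-9]{8}\.png$"
STAMP = re.compile(r"(\d{8})")


def filter_since(names, since: datetime, date_format: str = "%Y%m%d"):
    """Keep filenames matching PATTERN whose embedded date is on/after `since`."""
    kept = []
    for name in names:
        if not re.match(PATTERN, name):
            continue
        m = STAMP.search(name)
        if not m:
            continue
        try:
            stamp = datetime.strptime(m.group(1), date_format)
        except ValueError:
            continue  # an 8-digit run that is not a valid calendar date
        if stamp >= since:
            kept.append((stamp, name))
    # Sort by parsed date rather than relying on lexical order
    return [name for _, name in sorted(kept)]


names = [
    "DroughtRisk_Weekly_20241128.png",
    "DroughtRisk_Weekly_20241205.png",
    "DroughtRisk_Weekly_20251120.png",
    "README.txt",
]
cutoff = datetime(2025, 11, 30) - timedelta(days=365)
print(filter_since(names, cutoff))
# ['DroughtRisk_Weekly_20241205.png', 'DroughtRisk_Weekly_20251120.png']
```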


## Create a gap in the frames
We'll delete two randomly chosen frames to see how `process pad-missing` backfills missing timestamps. The selection is unseeded, so the deleted dates differ between runs.

In [5]:
# Remove two frames to simulate missing data
import random

frames = sorted(p for p in FRAMES_RAW.iterdir() if p.is_file())
if len(frames) < 2:
    print("Not enough frames to delete; skipping gap simulation")
else:
    to_delete = random.sample(frames, 2)
    for fp in to_delete:
        fp.unlink(missing_ok=True)
    print("Deleted frames:", [fp.name for fp in sorted(to_delete)])
    print("Remaining frame count:", frame_count(FRAMES_RAW))

Deleted frames: ['DroughtRisk_Weekly_20250710.png', 'DroughtRisk_Weekly_20251009.png']
Remaining frame count: 49


In [6]:
# Scan frames metadata (cadence, missing timestamps)
import json

meta_result = sess.transform.scan_frames(
    frames_dir=str(FRAMES_RAW),
    pattern=PATTERN,
    datetime_format="%Y%m%d",
    period_seconds=CADENCE_SECONDS,
    output=str(FRAMES_META),
)
summary = json.loads(FRAMES_META.read_text()) if FRAMES_META.exists() else {}
print("Frames meta:", FRAMES_META)
print(
    "Actual frames:",
    summary.get("frame_count_actual"),
    "Missing:",
    summary.get("missing_count"),
)

Frames meta: /app/data/drought_notebook/frames_meta.json
Actual frames: 49 Missing: 2
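
`scan-frames` reports the actual frame count and the missing timestamps for the expected cadence. The core arithmetic can be sketched as stepping from the first to the last timestamp in `CADENCE_SECONDS` increments and recording the slots with no frame. This is an illustration under that assumption, not Zyra's implementation; `find_missing` is hypothetical, and its exact-match test assumes frames fall precisely on the weekly grid.

```python
from datetime import datetime, timedelta

CADENCE_SECONDS = 7 * 24 * 3600  # weekly, as in the paths cell


def find_missing(stamps, period_seconds: int = CADENCE_SECONDS):
    """Return expected timestamps between min and max that have no frame."""
    if not stamps:
        return []
    present = set(stamps)
    step = timedelta(seconds=period_seconds)
    missing = []
    current, end = min(stamps), max(stamps)
    while current <= end:
        if current not in present:
            missing.append(current)
        current += step
    return missing


# Six weekly frames with the 3rd and 5th removed
start = datetime(2025, 6, 26)
weekly = [start + timedelta(weeks=i) for i in range(6)]
observed = [d for i, d in enumerate(weekly) if i not in (2, 4)]
gaps = find_missing(observed)
print(len(observed), len(gaps), [d.date().isoformat() for d in gaps])
# 4 2 ['2025-07-10', '2025-07-24']
```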


In [7]:
# Inline analysis of drought frames against the VTHI colorbar (expects DROUGHT_DIR/VTHI.colorbar.png)
import json
import math
from collections import defaultdict
from pathlib import Path
from typing import Any, Iterable, Tuple

from PIL import Image

try:
    import reverse_geocoder as rg
except Exception:
    rg = None

SAMPLE_STRATEGY = "monthly"  # monthly|every_n
SAMPLE_EVERY_N = 4
DOWNSAMPLE_MAX_SIDE = 512  # max width/height for analysis
LON_MIN, LON_MAX = -180.0, 180.0
LAT_MIN, LAT_MAX = -90.0, 90.0
MAX_COORD_SAMPLES = 500  # limit stored lon/lat samples per frame per class


def _register_drought_analysis(session) -> None:
    def _select_frames(frames_dir: Path) -> list[Path]:
        frames = sorted(frames_dir.glob("*.png"))
        if SAMPLE_STRATEGY == "monthly":
            by_month: dict[str, Path] = {}
            for fp in frames:
                key = fp.stem.split("_")[-1][:6] if "_" in fp.stem else fp.stem[:6]
                by_month.setdefault(key, fp)
            return [by_month[k] for k in sorted(by_month.keys())]
        if SAMPLE_STRATEGY == "every_n":
            return frames[:: max(SAMPLE_EVERY_N, 1)]
        return frames

    def _downsample(img: Image.Image) -> Tuple[Image.Image, float]:
        w, h = img.size
        scale = min(DOWNSAMPLE_MAX_SIDE / max(w, h), 1.0)
        if scale < 1.0:
            new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
            return img.resize(new_size, Image.BOX), scale
        return img, 1.0

    def _lonlat_from_xy(x: int, y: int, w: int, h: int) -> tuple[float, float]:
        lon = LON_MIN + (x / max(w - 1, 1)) * (LON_MAX - LON_MIN)
        lat = LAT_MAX - (y / max(h - 1, 1)) * (LAT_MAX - LAT_MIN)
        return lon, lat

    def _nearest_place(
        lat: float, lon: float, cache: dict[tuple[float, float], str]
    ) -> str:
        if rg is None:
            return ""
        key = (round(lat, 3), round(lon, 3))
        if key in cache:
            return cache[key]
        try:
            res = rg.search((lat, lon), mode=1, verbose=False)
            label = ""
            if res:
                entry = res[0]
                parts = [entry.get("name"), entry.get("admin1"), entry.get("cc")]
                label = ", ".join(p for p in parts if p)
        except Exception:
            label = ""
        cache[key] = label
        return label

    def analyze(ns: Any) -> dict[str, Any]:
        frames_dir = Path(ns.frames_dir)
        colorbar = Path(ns.colorbar)
        tol = float(vars(ns).get("tolerance", 30))
        bar = Image.open(colorbar).convert("RGB")
        row = bar.height // 2
        colors = [bar.getpixel((x, row)) for x in range(bar.width)]
        palette: list[tuple[int, int, int]] = []
        for c in colors:
            if not palette or c != palette[-1]:
                palette.append(c)

        def classify_pixel(rgb: tuple[int, int, int]) -> str:
            best_idx, best_d = None, 1e9
            for idx, pc in enumerate(palette):
                d = math.dist(rgb, pc)
                if d < best_d:
                    best_d, best_idx = d, idx
            if best_d > tol:
                return "nodata"
            frac = best_idx / max(len(palette) - 1, 1)
            if frac < 0.3:
                return "moderate"
            if frac < 0.65:
                return "high"
            return "extreme"

        summary: dict[str, Any] = {
            "frames": {},
            "aggregate": {
                "total_pixels": 0,
                "nodata": 0,
                "moderate": 0,
                "high": 0,
                "extreme": 0,
            },
        }
        sampled = _select_frames(frames_dir)
        summary["sampled_frames"] = [p.name for p in sampled]
        place_cache: dict[tuple[float, float], str] = {}
        for img_path in sampled:
            img, scale = _downsample(Image.open(img_path).convert("RGB"))
            w, h = img.size
            counts = defaultdict(int)
            samples: dict[str, list[tuple[float, float]]] = {
                "moderate": [],
                "high": [],
                "extreme": [],
                "nodata": [],
            }
            sample_places: dict[str, list[dict[str, float | str]]] = {
                "moderate": [],
                "high": [],
                "extreme": [],
                "nodata": [],
            }
            for y, row_px in enumerate(_iter_rows(img)):
                for x, rgb in enumerate(row_px):
                    bucket = classify_pixel(rgb)
                    counts[bucket] += 1
                    if len(samples[bucket]) < MAX_COORD_SAMPLES:
                        lon, lat = _lonlat_from_xy(x, y, w, h)
                        samples[bucket].append((lon, lat))
                        place_label = _nearest_place(lat, lon, place_cache)
                        if place_label:
                            sample_places[bucket].append(
                                {"lon": lon, "lat": lat, "place": place_label}
                            )
            total = sum(counts.values())
            counts["total"] = total
            summary["frames"][img_path.name] = {
                "counts": dict(counts),
                "sampled_coords": samples,
                "sampled_places": sample_places,
            }
            agg = summary["aggregate"]
            agg["total_pixels"] += total
            for k in ("moderate", "high", "extreme", "nodata"):
                agg[k] += counts.get(k, 0)
        return summary

    session.process.register(
        "analyze_drought_frames",
        analyze,
        returns="object",
        extras=["pillow", "reverse_geocoder"],
    )

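    # Row iterator used by analyze(). A plain nested function is enough: analyze()
    # looks it up as a closure variable at call time, after this def has run.
    def _iter_rows(img: Image.Image) -> Iterable[Tuple[tuple[int, int, int], ...]]: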
        w, h = img.size
        data = list(img.getdata())
        for i in range(h):
            yield tuple(data[i * w : (i + 1) * w])


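_register_drought_analysis(sess)
# NOTE: FRAMES_PADDED is populated by the pad-missing cell (In [8]). On a fresh
# kernel, run that cell first (or point frames_dir at FRAMES_RAW); otherwise the
# analyzer sees an empty directory. VTHI.colorbar.png must already exist in DROUGHT_DIR.
analysis = sess.process.analyze_drought_frames(
    frames_dir=str(FRAMES_PADDED),
    colorbar=str(DROUGHT_DIR / "VTHI.colorbar.png"),
    tolerance=30,
)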
print("Analysis aggregate:", analysis.get("aggregate", {}))
print("Sampled frames count:", len(analysis.get("sampled_frames", [])))
ANALYSIS_JSON = DROUGHT_DIR / "analysis.json"
ANALYSIS_JSON.write_text(json.dumps(analysis, indent=2, default=str))
print("Analysis saved to", ANALYSIS_JSON)

# Example per-frame peek (first frame)
first_frame = next(iter(analysis.get("frames", {}).items()), None)
if first_frame:
    name, stats = first_frame
    counts = stats.get("counts", {})
    print(
        f"Sample frame {name}:",
        {k: counts.get(k, 0) for k in ("moderate", "high", "extreme", "nodata")},
    )
    coords = stats.get("sampled_coords", {})
    nodata_samples = coords.get("nodata", [])[:3]
    print("Sample nodata lon/lat:", nodata_samples)

Analysis aggregate: {'total_pixels': 1572864, 'nodata': 1570507, 'moderate': 1321, 'high': 995, 'extreme': 41}
Sampled frames count: 12
Analysis saved to /app/data/drought_notebook/analysis.json
Sample frame DroughtRisk_Weekly_20241205.png: {'moderate': 169, 'high': 221, 'extreme': 4, 'nodata': 130678}
Sample nodata lon/lat: [(-180.0, 90.0), (-179.2954990215264, 90.0), (-178.59099804305285, 90.0)]
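
Two pieces of `analyze` are easy to check in isolation: the pixel-to-lon/lat mapping, which assumes an equirectangular global frame, and the nearest-palette bucketing in `classify_pixel`. The sketch below copies that logic into standalone functions; the three-color palette is made up for the example and stands in for the VTHI colorbar. For pixel `x=1, y=0` in a 512x256 downsampled frame it matches the second nodata sample printed above.

```python
import math

LON_MIN, LON_MAX = -180.0, 180.0
LAT_MIN, LAT_MAX = -90.0, 90.0


def lonlat_from_xy(x: int, y: int, w: int, h: int) -> tuple[float, float]:
    """Map a pixel to lon/lat; column 0 is -180, column w-1 is +180."""
    lon = LON_MIN + (x / max(w - 1, 1)) * (LON_MAX - LON_MIN)
    lat = LAT_MAX - (y / max(h - 1, 1)) * (LAT_MAX - LAT_MIN)
    return lon, lat


def classify(rgb, palette, tol: float = 30.0) -> str:
    """Nearest palette entry by Euclidean RGB distance, then bucket by position."""
    best_idx, best_d = min(
        ((i, math.dist(rgb, pc)) for i, pc in enumerate(palette)),
        key=lambda item: item[1],
    )
    if best_d > tol:
        return "nodata"  # too far from every colorbar color
    frac = best_idx / max(len(palette) - 1, 1)
    if frac < 0.3:
        return "moderate"
    if frac < 0.65:
        return "high"
    return "extreme"


# Hypothetical 3-color ramp standing in for the VTHI colorbar
palette = [(255, 255, 0), (255, 128, 0), (200, 0, 0)]
print(lonlat_from_xy(1, 0, 512, 256))
pixels = [(250, 250, 5), (255, 120, 10), (190, 5, 5), (0, 0, 255)]
print([classify(c, palette) for c in pixels])
# ['moderate', 'high', 'extreme', 'nodata']
```

Because column 0 is pinned to -180 and row 0 to +90, and samples are the first pixels in scan order rather than a spatially random draw, the stored nodata samples start along the top edge of the frame.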


> Note: the inline drought analyzer registered above is exported to `notebook_capabilities_overlay.json` and can appear as an inline suggestion in the planner prompts below; accepted and remaining suggestions are printed and persisted, along with provenance, to `plan_session_inline.json`.


In [8]:
# Fill missing frames using basemap placeholders
import shutil

# Start padded dir fresh and seed it with existing frames
if FRAMES_PADDED.exists():
    shutil.rmtree(FRAMES_PADDED)
FRAMES_PADDED.mkdir(parents=True, exist_ok=True)
for src in FRAMES_RAW.iterdir():
    if src.is_file():
        shutil.copy2(src, FRAMES_PADDED / src.name)

pad_result = sess.process.pad_missing(
    frames_meta=str(FRAMES_META),
    output_dir=str(FRAMES_PADDED),
    fill_mode="basemap",
    basemap=BASEMAP_REF,
    overwrite=True,
)
filled = sum(1 for f in FRAMES_PADDED.iterdir() if f.is_file())
print("Padded frames dir:", FRAMES_PADDED)
print("Total frames after padding:", filled)

Padded frames dir: /app/data/drought_notebook/frames_padded
Total frames after padding: 51


In [9]:
# Compose MP4 animation from padded frames (requires ffmpeg on PATH)
video_path = sess.visualize.compose_video(
    frames=str(FRAMES_PADDED),
    output=str(VIDEO_OUT),
    fps=4,
    basemap=BASEMAP_REF,
)
print("Video path:", video_path)

Video path: /app/data/drought_notebook/drought_animation.mp4
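
To reproduce the video outside the session, a plain FFmpeg call over the padded frames is a reasonable point of comparison. This is a sketch, not the command `compose-video` actually runs (its use of `basemap` is not modeled here); `ffmpeg_compose_cmd` is hypothetical and relies on FFmpeg's glob input pattern. At `fps=4`, the 51 padded frames give a clip of about 12.75 seconds.

```python
import shlex
from pathlib import Path


def ffmpeg_compose_cmd(frames_dir: Path, output: Path, fps: int = 4) -> list[str]:
    """Build an FFmpeg argv that encodes PNG frames into an H.264 MP4."""
    return [
        "ffmpeg",
        "-y",  # overwrite the output file if it exists
        "-framerate", str(fps),  # input frame rate = playback rate
        "-pattern_type", "glob",  # glob input; not supported by every build (e.g. Windows)
        "-i", str(frames_dir / "*.png"),
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",  # broad player compatibility
        str(output),
    ]


cmd = ffmpeg_compose_cmd(Path("frames_padded"), Path("drought_animation.mp4"))
print(shlex.join(cmd))
print("duration (s):", 51 / 4)
```

Pass `cmd` to `subprocess.run(cmd, check=True)` to execute it; building the argv as a list avoids shell quoting issues with the `*.png` glob.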


In [10]:
# Save final animation locally (mirrors the disseminate/local stage); input and
# path are the same file here, so this mainly records the stage for the export
final_path = sess.disseminate.local(
    input=str(VIDEO_OUT),
    path=str(VIDEO_OUT),
)
print("Local copy:", final_path)

Local copy: /app/data/drought_notebook/drought_animation.mp4


In [None]:
# Reload cached scan/analysis results to skip heavy reruns
import json
from pathlib import Path

FRAMES_META_JSON = DROUGHT_DIR / "frames_meta.json"
ANALYSIS_JSON = DROUGHT_DIR / "analysis.json"

if FRAMES_META_JSON.exists():
    summary = json.loads(FRAMES_META_JSON.read_text())
    print("Loaded summary from cache:", FRAMES_META_JSON)
if ANALYSIS_JSON.exists():
    analysis = json.loads(ANALYSIS_JSON.read_text())
    print("Loaded analysis from cache:", ANALYSIS_JSON)

In [11]:
# Build frame/location bullets from cached analysis
import json

frames_locations = []
frames_locations_text = []
frames_dict = (analysis or {}).get("frames", {}) if "analysis" in globals() else {}


def _fmt_coords(coords):
    out = []
    for c in coords[:3]:
        try:
            lon, lat = float(c[0]), float(c[1])
            out.append(f"({lon:.2f}, {lat:.2f})")
        except Exception:
            continue
    return out


def _fmt_places(entries):
    out = []
    for entry in entries[:3]:
        place = entry.get("place")
        lon = entry.get("lon")
        lat = entry.get("lat")
        if place and isinstance(lon, (int, float)) and isinstance(lat, (int, float)):
            out.append(f"{place} ({lon:.2f}, {lat:.2f})")
        elif place:
            out.append(str(place))
    return out


for name, stats in sorted(frames_dict.items()):
    coords = stats.get("sampled_coords") or {}
    places = stats.get("sampled_places") or {}
    h_places = _fmt_places(places.get("high") or [])
    x_places = _fmt_places(places.get("extreme") or [])
    h_fmt = h_places or _fmt_coords(coords.get("high") or [])
    x_fmt = x_places or _fmt_coords(coords.get("extreme") or [])
    if not h_fmt and not x_fmt:
        continue
    frame_date = name.split("_")[-1].split(".")[0]
    frames_locations.append(
        {
            "frame": name,
            "date": frame_date,
            "high": h_fmt,
            "extreme": x_fmt,
        }
    )
    high_txt = ", ".join(h_fmt) or "none"
    ext_txt = ", ".join(x_fmt) or "none"
    frames_locations_text.append(
        f"- {frame_date or name}: High: {high_txt}; Extreme: {ext_txt}"
    )

print("Frames with locations:", len(frames_locations))
print("\n".join(frames_locations_text[:5]))

Frames with locations: 12
- 20241205: High: Pavlovskaya, Krasnodarskiy, RU (39.80, 46.24), Yoloeten, Mary, TM (62.35, 36.35), Bala Murghab, Badghis, AF (63.05, 36.35); Extreme: Kumlinge, Alands skaergard, AX (20.78, 60.35), Port-aux-Francais, Kerguelen, TF (51.08, -66.00), Port-aux-Francais, Kerguelen, TF (49.67, -66.71)
- 20250102: High: Pavlovskaya, Krasnodarskiy, RU (39.80, 46.24), Kalnibolotskaya, Krasnodarskiy, RU (40.51, 46.24), Zaxu, Dahuk, IQ (42.62, 37.06); Extreme: Kumlinge, Alands skaergard, AX (20.78, 60.35), Port-aux-Francais, Kerguelen, TF (51.08, -66.00), Port-aux-Francais, Kerguelen, TF (49.67, -66.71)
- 20250206: High: Guisser, Chaouia-Ouardigha, MA (-7.40, 32.82), Sidi Bou Othmane, Marrakech-Tensift-Al Haouz, MA (-8.10, 32.12), Castroville, Texas, US (-98.98, 29.29); Extreme: Port-aux-Francais, Kerguelen, TF (51.08, -66.00), Port-aux-Francais, Kerguelen, TF (49.67, -66.71), Port-aux-Francais, Kerguelen, TF (48.26, -67.41)
- 20250306: High: Bolivar, Missouri, US (-93.3

## Narration intent
- Scenario: Weekly drought risk animation; narration must stay in drought/dryness terms (no storms/weather/solar/etc.).
- First sentence must begin with `Drought risk:` and the bullets should mirror the frame/location list built from analysis.
- Uses the selected LLM provider/model; if no credentials, the mock provider will return deterministic text.
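
The two hard constraints in this intent (the `Drought risk:` prefix and the ban on non-drought wording) can be checked mechanically on whatever text comes back, independent of the provider. This sketch is hypothetical; its banned-term list is taken from the critic rubric written in the next cell and matches whole words only, so it will miss paraphrases.

```python
import re

# Terms the critic rubric rejects; "precip" also covers "precipitation"
BANNED = re.compile(
    r"\b(storms?|weather|winds?|precip\w*|solar|geomagnetic|events?)\b",
    re.IGNORECASE,
)


def check_narration(text: str) -> list[str]:
    """Return a list of rule violations (empty means the text passes)."""
    problems = []
    if not text.lstrip().startswith("Drought risk:"):
        problems.append("missing 'Drought risk:' prefix")
    hits = sorted({m.group(0).lower() for m in BANNED.finditer(text)})
    if hits:
        problems.append("banned terms: " + ", ".join(hits))
    return problems


print(check_narration("Drought risk: high threat levels in Pavlovskaya, Russia."))
# []
print(check_narration("On December 5, 2024, high-level events were recorded."))
# ["missing 'Drought risk:' prefix", 'banned terms: events']
```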


In [12]:
# Lightweight narrate rerun (drought-only wording, no rewrite)

import yaml

narration_meta = globals().get("summary")
narration_meta = narration_meta if isinstance(narration_meta, dict) else {}
missing_dates_raw = narration_meta.get("missing_timestamps") or []
missing_dates = [str(ts).split("T")[0] for ts in missing_dates_raw if ts is not None]
analysis_meta = globals().get("analysis")
analysis_meta = analysis_meta if isinstance(analysis_meta, dict) else {}
analysis_frames = analysis_meta.get("frames") or {}
frames_locations = globals().get("frames_locations", [])
frames_locations_text = globals().get("frames_locations_text", [])

print("frames_locations_text count:", len(frames_locations_text))
if not frames_locations_text:
    print("No frames/places found; rerun analysis/build-bullets cells before narrate.")
    narration = None
else:
    narrative_text = "\n".join(frames_locations_text)
    narration_input = {
        "title": "Weekly drought risk animation",
        "description": "Start first sentence with 'Drought risk:' and summarize drought risk changes over time. Use only drought/dryness terminology; do not mention storms, weather, precipitation, solar, or geomagnetic events. Emit per-frame drought risk bullets.",
        "narrative": narrative_text,
        "data": {
            "frames_locations_text": frames_locations_text,
            "frames_metadata": narration_meta,
        },
    }

    PACK_PATH = DROUGHT_DIR / "narrate_pack.yaml"
    RUBRIC_PATH = DROUGHT_DIR / "critic_rubric.yaml"
    RUBRIC_PATH.write_text(
        "- Reject any storms/weather/wind/precip/solar/geomagnetic/event wording; require explicit drought/dryness phrasing and fidelity to provided locations.\n"
        '- Summary must start with "Drought risk:".\n'
        "- Editor must preserve drought/dryness wording and all locations from the bullets.\n",
        encoding="utf-8",
    )

    try:
        narration = sess.narrate.swarm(
            provider=LLM_PROVIDER,
            model=LLM_MODEL,
            base_url=os.environ.get("OLLAMA_BASE_URL"),
            preset="scientific_rigorous",
            max_workers=1,
            max_rounds=3,
            agents="context,summary,critic,editor",
            strict_grounding=False,
            critic_structured=True,
            rubric=str(RUBRIC_PATH),
            style="detailed",
            input_data=narration_input,
            pack=str(PACK_PATH),
            memory="-",
        )
        print("Narration:", narration)
    except Exception as exc:
        print("Narrate/swarm not executed:", exc)
        narration = None

    try:
        pack_doc = yaml.safe_load(PACK_PATH.read_text()) or {}
        pack = pack_doc.get("narrative_pack") or {}
        outputs = pack.get("outputs") or {}
        errors = pack.get("errors") or []
        provenance = pack.get("provenance") or []
        print("Agent outputs:", outputs)
        if errors:
            print("Errors:", errors)
        if provenance:
            print("Provenance entries:")
            for entry in provenance:
                print(entry)
        preview = pack.get("input_preview") or {}
        print("Input preview:", preview)
        final_narration = outputs.get("edited") or outputs.get("summary") or narration
        print("Narration:", final_narration)
    except Exception as exc:
        print("Pack parse failed:", exc)

Overriding config 'style' from 'technical' to 'detailed' via CLI
Overriding config 'max_rounds' from '2' to '3' via CLI
Overriding config 'agents' from '['summary', 'context', 'critic', 'editor']' to '['context', 'summary', 'critic', 'editor']' via CLI
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): generativelanguage.googleapis.com:443


frames_locations_text count: 12


DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash

Narration: Drought risk: On December 5, 2024, high drought threat levels were observed in Pavlovskaya, Krasnodar region, Russia; Yoloeten, Mary region, Turkmenistan; and Bala Murghab, Badghis province, Afghanistan. Extreme threat levels also affected Kumlinge, Åland Islands, and Port-aux-Francais, Kerguelen Islands.
Agent outputs: {'context': 'On December 5, 2024, high-level events were recorded in Pavlovskaya (Krasnodar Krai, Russia), Ýolöten (Mary Province, Turkmenistan), and Bala Murghab (Badghis Province, Afghanistan), while extreme-level events occurred in Kumlinge (Åland archipelago, Finland) and Port-aux-Français (Kerguelen, French Southern and Antarctic Lands), with the provided coordinates for the latter (51.08°N, 66.00°W) conflicting with its established geographical location.', 'summary': 'On December 5, 2024, high threat levels were observed in Pavlovskaya, Krasnodar region, Russia; Yoloeten, Mary region, Turkmenistan; and Bala Murghab, Badghis province, Afghanistan, while 

In [13]:
# Pipeline + CLI summary
from pprint import pprint

pipeline = sess.to_pipeline()
cli_cmds = sess.to_cli()

print("Pipeline stages:")
pprint(pipeline)

print("CLI commands:")
for cmd in cli_cmds:
    print(cmd)

Pipeline stages:
[{'args': {'date_format': '%Y%m%d',
           'output': '/app/data/acquire_ftp.tmp',
           'path': 'ftp://ftp.nnvl.noaa.gov/SOS/DroughtRisk_Weekly',
           'pattern': '^DroughtRisk_Weekly_[0-9]{8}\\.png$',
           'since_period': 'P1Y',
           'sync_dir': '/app/data/drought_notebook/frames_raw'},
  'command': 'ftp',
  'stage': 'acquire'},
 {'args': {'datetime_format': '%Y%m%d',
           'frames_dir': '/app/data/drought_notebook/frames_raw',
           'output': '/app/data/drought_notebook/frames_meta.json',
           'pattern': '^DroughtRisk_Weekly_[0-9]{8}\\.png$',
           'period_seconds': 604800},
  'command': 'scan-frames',
  'stage': 'transform'},
 {'args': {'colorbar': '/app/data/drought_notebook/VTHI.colorbar.png',
           'frames_dir': '/app/data/drought_notebook/frames_padded',
           'tolerance': 30},
  'command': 'analyze_drought_frames',
  'stage': 'process'},
 {'args': {'basemap': 'pkg:zyra.assets/images/earth_vegetation.jpg',

In [14]:
# Inspect provenance SQLite logs
import os
import sqlite3
from contextlib import suppress
from pathlib import Path

prov_path = Path(
    os.environ.get("ZYRA_NOTEBOOK_PROVENANCE", WORKSPACE / "provenance.sqlite")
)
print("Provenance DB:", prov_path)
if not prov_path.exists():
    print("Provenance database not found; run some commands first.")
else:
    try:
        conn = sqlite3.connect(prov_path)
        cur = conn.cursor()
        tables = cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        ).fetchall()
        print("Tables:", [t[0] for t in tables])
        if any(t[0] == "runs" for t in tables):
            runs = cur.execute(
                "SELECT run_id, started, completed, status FROM runs ORDER BY started DESC LIMIT 5"
            ).fetchall()
            print("Runs (latest 5):")
            for row in runs:
                print(row)
        if any(t[0] == "events" for t in tables):
            cols = cur.execute("PRAGMA table_info(events)").fetchall()
            print("events columns:", [c[1] for c in cols])
            rows = cur.execute(
                "SELECT id, run_id, event, agent, created, payload FROM events ORDER BY id DESC LIMIT 10"
            ).fetchall()
            print("Last 10 events:")
            for row in rows:
                print(row)
    except Exception as exc:
        print("Failed to read provenance DB:", exc)
    finally:
        with suppress(Exception):
            conn.close()

Provenance DB: /app/data/drought_notebook/provenance.sqlite
Tables: ['events', 'runs', 'sqlite_sequence']
Runs (latest 5):
('726d5c56eb584e0f83ad16a437026482', '2025-11-30T04:31:09.299444+00:00', None, None)
('f8783341af19428ab4601e4000d1f573', '2025-11-30T04:12:11.595231+00:00', None, None)
('d86b8bbddc12492680e119d43c62ea91', '2025-11-30T00:27:33.801139+00:00', None, None)
('df3d4a8fec934def87bca2c34851701a', '2025-11-30T00:26:01.179693+00:00', None, None)
('6a7823d698a44f11ab8b5fdf168210cb', '2025-11-30T00:11:56.330998+00:00', None, None)
events columns: ['id', 'run_id', 'event', 'agent', 'created', 'payload']
Last 10 events:
(24, '726d5c56eb584e0f83ad16a437026482', 'notebook_tool_completed', None, '2025-11-30T04:34:39.775790+00:00', '{"tool":"narrate swarm","returns":"text","workdir":"/app/data","args":{"provider":"gemini","model":"gemini-2.5-flash","base_url":"http://host.docker.internal:11434","preset":"scientific_rigorous","max_workers":1,"max_rounds":3,"agents":"context,summary
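
The `events` table printed above can be filtered per run with a parameterized query, and its `payload` column decoded as JSON. The sketch below builds a throwaway in-memory table with the same column names as the inspected schema so it runs standalone; the column types, the `events_for_run` helper, and the sample row are assumptions. Against the real database, connect to `prov_path` instead.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, run_id TEXT, event TEXT,"
    " agent TEXT, created TEXT, payload TEXT)"
)
# Hypothetical sample row shaped like the notebook_tool_completed entry above
conn.execute(
    "INSERT INTO events (run_id, event, agent, created, payload)"
    " VALUES (?, ?, ?, ?, ?)",
    ("run-a", "notebook_tool_completed", None, "2025-11-30T04:34:39+00:00",
     json.dumps({"tool": "narrate swarm", "returns": "text"})),
)


def events_for_run(conn: sqlite3.Connection, run_id: str) -> list[dict]:
    """Fetch a run's events, oldest first, with payload parsed from JSON."""
    rows = conn.execute(
        "SELECT id, event, created, payload FROM events WHERE run_id = ? ORDER BY id",
        (run_id,),  # bound parameter instead of string formatting
    ).fetchall()
    out = []
    for event_id, event, created, payload in rows:
        try:
            data = json.loads(payload) if payload else {}
        except json.JSONDecodeError:
            data = {"raw": payload}  # keep truncated or non-JSON payloads visible
        out.append({"id": event_id, "event": event, "created": created, "payload": data})
    return out


results = events_for_run(conn, "run-a")
conn.close()
for ev in results:
    print(ev["event"], ev["payload"].get("tool"))
```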

## Planner suggestions
- The inline planner prompts for missing args, then runs the value engine.
- Accepted suggestions are stored under `accepted_suggestions`; unaccepted remain under `suggestions` in `plan_session_inline.json`.
- The cell prints counts and lists to help inspect what was applied vs. left for later.
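
The accepted/remaining split amounts to partitioning the value-engine output by the user's answers and writing both lists into the saved plan. A minimal sketch, assuming suggestions are plain dicts as in the planner cell; `partition_suggestions`, the `decisions` list, and the two example suggestions are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def partition_suggestions(suggestions, decisions):
    """Split suggestions into (accepted, remaining) from parallel y/N answers."""
    accepted, remaining = [], []
    for i, sug in enumerate(suggestions):
        answer = decisions[i] if i < len(decisions) else ""  # unanswered counts as N
        (accepted if answer.strip().lower() == "y" else remaining).append(sug)
    return accepted, remaining


# Hypothetical value-engine output
suggestions = [
    {"stage": "process", "description": "pad missing frames"},
    {"stage": "visualize", "description": "add a colorbar"},
]
accepted, remaining = partition_suggestions(suggestions, ["y"])
plan = {"accepted_suggestions": accepted, "suggestions": remaining}

out = Path(tempfile.mkdtemp()) / "plan_session_inline.json"
out.write_text(json.dumps(plan, indent=2))
print(len(accepted), "accepted;", len(remaining), "remaining ->", out.name)
```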


## Inline tools in planner
- Custom tools registered via `process.register` (e.g., the drought frame analyzer) are exported to `notebook_capabilities_overlay.json`.
- Planner cells load this overlay so suggestions may include your inline tool; prompts will label them as inline/notebook.
- The overlay is merged when `ZYRA_NOTEBOOK_OVERLAY` is set (done in the inline planner cell).


In [15]:
# Planner inline prompts (notebook-friendly)
import contextlib
import json
import os
from copy import deepcopy
from pathlib import Path

from zyra.swarm import planner as planner_mod
from zyra.swarm import suggest_augmentations

intent = (
    "Download the last year of Weekly Drought Risk PNG frames from FTP, "
    "analyze frames with the registered drought analyzer, "
    "fill missing frames, compose an MP4 animation, and save to disk."
)
plan_path = DROUGHT_DIR / "plan_session_inline.json"
overlay_path = DROUGHT_DIR / "notebook_capabilities_overlay.json"
os.environ["ZYRA_NOTEBOOK_OVERLAY"] = str(overlay_path)
with contextlib.suppress(Exception):
    planner_mod.planner._caps = None  # reset so overlays load

# Generate manifest without prompting
manifest = planner_mod.planner.plan(intent)
manifest = deepcopy(manifest)

# Collect gaps and prompt inline
gaps = planner_mod._collect_arg_gaps(manifest)  # noqa: SLF001
GAP_REASONS = {"missing_arg", "placeholder", "resolver_hint", "confirm_choice"}
for gap in gaps:
    if gap.get("reason") not in GAP_REASONS:
        continue
    field = gap.get("field")
    agent_ref = gap.get("agent_ref")
    if not field or not isinstance(agent_ref, dict):
        continue
    label = gap.get("agent_id") or gap.get("stage") or "agent"
    stage = gap.get("stage") or ""
    command = gap.get("command") or ""
    current = gap.get("current")
    suffix = f" (current: {current})" if current else ""
    help_text = planner_mod._field_help_text(stage, command, field)  # noqa: SLF001
    print(f"Clarify [{label} — {stage} {command}] for '{field}'{suffix}")
    if help_text:
        print(f"  hint: {help_text}")
    resp = input("Enter value (leave blank to skip): ").strip()
    if not resp:
        continue
    args = agent_ref.setdefault("args", {})
    args[field] = resp

# Apply value-engine suggestions
auto_suggestions = suggest_augmentations(manifest, intent=intent)
accepted = []
if auto_suggestions:
    print("\nSuggestions (y/N to accept):")
    for sug in auto_suggestions:
        stage = sug.get("stage")
        desc = sug.get("description") or sug.get("text") or ""
        origin = sug.get("origin") or sug.get("origin_detail")
        origin_label = f" [inline: {origin}]" if origin else ""
        choice = (
            input(f"  Accept {stage}{origin_label}: {desc}? [y/N] ").strip().lower()
        )
        if choice == "y":
            accepted.append(sug)
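if accepted:
    manifest = planner_mod._apply_suggestion_templates(manifest, accepted)  # noqa: SLF001
    manifest["accepted_suggestions"] = accepted
# Keep declined suggestions so the "Remaining suggestions" summary stays accurate
manifest["suggestions"] = [
    s for s in (auto_suggestions or []) if s not in accepted
]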

# Ensure scan/pad steps exist and wire dependencies
agents = manifest.setdefault("agents", [])
ids = {a.get("id") for a in agents if isinstance(a, dict)}

ftp_agent = next((a for a in agents if a.get("command") == "ftp"), None)
if ftp_agent:
    ftp_args = ftp_agent.setdefault("args", {})
    ftp_args.setdefault("sync_dir", str(FRAMES_RAW))
    ftp_args.setdefault("pattern", PATTERN)
    ftp_args.setdefault("date_format", "%Y%m%d")

scan_agent = next(
    (
        a
        for a in agents
        if a.get("stage") == "process" and a.get("command") == "scan-frames"
    ),
    None,
)
if not scan_agent:
    scan_agent = {
        "id": "scan_frames" if "scan_frames" not in ids else "scan_frames_1",
        "stage": "process",
        "command": "scan-frames",
        "depends_on": [ftp_agent.get("id")] if ftp_agent else [],
        "args": {
            "frames_dir": str(FRAMES_RAW),
            "pattern": PATTERN,
            "datetime_format": "%Y%m%d",
            "period_seconds": CADENCE_SECONDS,
            "output": str(FRAMES_META),
        },
    }
    agents.append(scan_agent)

pad_agent = next(
    (
        a
        for a in agents
        if a.get("stage") == "process" and a.get("command") == "pad-missing"
    ),
    None,
)
if not pad_agent:
    pad_agent = {
        "id": "pad_missing" if "pad_missing" not in ids else "pad_missing_1",
        "stage": "process",
        "command": "pad-missing",
        "depends_on": [scan_agent.get("id")],
        "args": {
            "frames_meta": str(FRAMES_META),
            "output_dir": str(FRAMES_PADDED),
            "fill_mode": "basemap",
            "basemap": BASEMAP_REF,
            "overwrite": True,
        },
    }
    agents.append(pad_agent)

# Normalize the custom analyzer only after pad-missing is guaranteed to exist:
# it reads the padded frames, so it must depend on the pad step.
pad_id = pad_agent.get("id")
for agent in agents:
    if (
        agent.get("command") == "analyze_drought_frames"
        or agent.get("id") == "analyze_drought_frames"
    ):
        args = agent.setdefault("args", {})
        args["frames_dir"] = str(FRAMES_PADDED)
        args["colorbar"] = str(DROUGHT_DIR / "VTHI.colorbar.png")
        args.setdefault("tolerance", 30)
        deps = agent.setdefault("depends_on", [])
        if pad_id and pad_id not in deps:
            deps.append(pad_id)

compose_agent = next(
    (
        a
        for a in agents
        if a.get("stage") == "visualize" and a.get("command") == "compose-video"
    ),
    None,
)
if compose_agent:
    args = compose_agent.setdefault("args", {})
    args.setdefault("frames", str(FRAMES_PADDED))
    args.setdefault("output", str(VIDEO_OUT))
    deps = compose_agent.setdefault("depends_on", [])
    if pad_agent and pad_agent.get("id") not in deps:
        deps.append(pad_agent.get("id"))

local_agent = next(
    (
        a
        for a in agents
        if a.get("stage") in {"decimate", "disseminate", "export"}
        and a.get("command") == "local"
    ),
    None,
)
if compose_agent and local_agent:
    la_args = local_agent.setdefault("args", {})
    la_args.setdefault(
        "input", compose_agent.get("args", {}).get("output", str(VIDEO_OUT))
    )
    la_args.setdefault("path", str(VIDEO_OUT))
    deps = local_agent.setdefault("depends_on", [])
    if compose_agent.get("id") not in deps:
        deps.append(compose_agent.get("id"))

# Validate and finalize
manifest = planner_mod._propagate_inferred_args(manifest)  # noqa: SLF001
errors = planner_mod._validate_manifest(manifest)  # noqa: SLF001
if errors:
    print("Validation warnings:")
    for err in errors:
        print("  -", err)

planner_mod._ensure_auto_verify_agent(manifest)  # noqa: SLF001
planner_mod._ensure_verify_agents_materialized(manifest)  # noqa: SLF001
planner_mod._trace_agent_reasoning(manifest, intent)  # noqa: SLF001
manifest.setdefault("metadata", {})["intent"] = intent
manifest["intent"] = intent
plan_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
print("Plan written to:", plan_path)
print(plan_path.read_text(encoding="utf-8")[:2000])  # preview: first 2000 chars only

# Suggestions summary
print("Accepted suggestions count:", len(manifest.get("accepted_suggestions") or []))
print("Remaining suggestions count:", len(manifest.get("suggestions") or []))
if manifest.get("accepted_suggestions"):
    print("Accepted suggestions:")
    for s in manifest.get("accepted_suggestions"):
        print(s)
if manifest.get("suggestions"):
    print("Unaccepted suggestions:")
    for s in manifest.get("suggestions"):
        print(s)

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): generativelanguage.googleapis.com:443
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None


Clarify [fetch_drought_frames — acquire ftp] for 'path' (current: ftp://drought.gov/weekly_risk_frames)
  hint: ftp://host/path or host/path
Clarify [pad_missing_frames — process pad-missing] for 'fill_mode' (current: basemap)
  hint: Strategy for filling gaps (default: blank)


DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): generativelanguage.googleapis.com:443
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): generativelanguage.googleapis.com:443
DEBUG:urllib3.connectionpool:https://generativelanguage.googleapis.com:443 "POST /v1beta/models/gemini-2.5-flash:generateContent HTTP/1.1" 200 None



Suggestions (y/N to accept):
Plan written to: /app/data/drought_notebook/plan_session_inline.json
{
  "intent": "Download the last year of Weekly Drought Risk PNG frames from FTP, analyze frames with the registered drought analyzer, fill missing frames, compose an MP4 animation, and save to disk.",
  "agents": [
    {
      "id": "fetch_drought_frames",
      "stage": "acquire",
      "command": "ftp",
      "args": {
        "path": "ftp://ftp.nnvl.noaa.gov/SOS/DroughtRisk_Weekly",
        "pattern": "^WeeklyDroughtRisk_[0-9]{8}\\.png$",
        "since_period": "P1Y",
        "sync_dir": "data/drought_risk_raw",
        "date_format": "%Y%m%d"
      },
      "role": null,
      "outputs": [],
      "stdin_from": null,
      "stdout_key": "fetch_drought_frames",
      "depends_on": [],
      "params": {},
      "parallel_ok": true,
      "behavior": "cli",
      "metadata": {},
      "prompt_ref": null
    },
    {
      "id": "scan_drought_frames",
      "stage": "process",
      "co

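The cell above wires `depends_on` edges by hand: scan after fetch, pad after scan, the analyzer and compose steps after pad, and the local export after compose. One way to sanity-check that wiring before a replay is to topologically sort the agents. The sketch below uses Kahn's algorithm over only the `id` and `depends_on` keys visible in the plan JSON. `execution_order` is a hypothetical helper, not a Zyra API; `fetch_drought_frames`, `scan_drought_frames`, and `pad_missing_frames` appear in the output above, while `compose_animation` and `save_local` are illustrative ids.

```python
from collections import deque


def execution_order(manifest: dict) -> list[str]:
    """Return agent ids in an order that satisfies every depends_on edge.

    Hypothetical helper using Kahn's algorithm. Raises ValueError on duplicate
    ids, on a dependency naming an unknown agent id, or on a cycle.
    """
    agents = [a for a in manifest.get("agents", []) if isinstance(a, dict)]
    ids = [a["id"] for a in agents]
    known = set(ids)
    if len(known) != len(ids):
        raise ValueError("duplicate agent ids in manifest")
    indegree = {agent_id: 0 for agent_id in ids}
    dependents: dict[str, list[str]] = {agent_id: [] for agent_id in ids}
    for agent in agents:
        for dep in agent.get("depends_on") or []:
            if dep not in known:
                raise ValueError(f"{agent['id']!r} depends on unknown agent {dep!r}")
            indegree[agent["id"]] += 1
            dependents[dep].append(agent["id"])
    # Seed with dependency-free agents (in manifest order), then process FIFO
    ready = deque(agent_id for agent_id in ids if indegree[agent_id] == 0)
    order: list[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    # Any agent never reaching indegree 0 sits on a cycle
    if len(order) != len(ids):
        raise ValueError("depends_on edges form a cycle")
    return order


# Illustrative plan; compose_animation and save_local are hypothetical ids
example = {
    "agents": [
        {"id": "fetch_drought_frames", "depends_on": []},
        {"id": "compose_animation", "depends_on": ["pad_missing_frames"]},
        {"id": "scan_drought_frames", "depends_on": ["fetch_drought_frames"]},
        {"id": "pad_missing_frames", "depends_on": ["scan_drought_frames"]},
        {"id": "save_local", "depends_on": ["compose_animation"]},
    ]
}
print(execution_order(example))
# ['fetch_drought_frames', 'scan_drought_frames', 'pad_missing_frames', 'compose_animation', 'save_local']
```

To check the real plan, pass `json.loads(plan_path.read_text(encoding="utf-8"))`. A `ValueError` points at a dangling or circular `depends_on` entry; agents added by the planner (for example verify agents) are ordered like any other.
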
## Export and replay
- Plans: the inline planner writes `plan_session_inline.json` under `DROUGHT_DIR`; the session planner writes `plan_session.json`.
- Provenance: tool runs are recorded in the SQLite file named by `ZYRA_NOTEBOOK_PROVENANCE` (default `/app/data/drought_notebook/provenance.sqlite`). Inspect it via the provenance cell.
- Replay: from the repo root, run `poetry run zyra swarm --plan /app/data/drought_notebook/plan_session_inline.json --stage acquire,process,visualize,decimate`.
- Pipeline/CLI exports: use the `Pipeline + CLI summary` cell (`sess.to_pipeline()` / `sess.to_cli()`) for a quick view.
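The provenance cell is the intended way to inspect recorded runs. For a quick look without it, here is a minimal stdlib sketch. `summarize_provenance` is a hypothetical helper; it assumes only that `ZYRA_NOTEBOOK_PROVENANCE` points at a SQLite file, and it discovers tables from `sqlite_master` rather than assuming whatever schema Zyra uses.

```python
import contextlib
import os
import sqlite3
from pathlib import Path


def summarize_provenance(db_path: "str | os.PathLike[str]") -> dict[str, int]:
    """Return {table_name: row_count} for each user table in a SQLite file.

    Hypothetical helper: tables are discovered from sqlite_master, so no
    provenance schema is assumed.
    """
    path = Path(db_path)
    if not path.exists():
        raise FileNotFoundError(path)
    # mode=ro opens the database read-only, so inspection never modifies it
    uri = path.resolve().as_uri() + "?mode=ro"
    counts: dict[str, int] = {}
    with contextlib.closing(sqlite3.connect(uri, uri=True)) as conn:
        names = [
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
            )
        ]
        for name in names:
            # Quote the identifier in case a table name needs escaping
            quoted = '"' + name.replace('"', '""') + '"'
            counts[name] = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
    return counts


# Default path taken from the notebook's documented default
db = os.environ.get(
    "ZYRA_NOTEBOOK_PROVENANCE", "/app/data/drought_notebook/provenance.sqlite"
)
if Path(db).exists():
    for table, rows in summarize_provenance(db).items():
        print(f"{table}: {rows} rows")
else:
    print("No provenance database at", db)
```

Opening with `mode=ro` means a typo in the path raises instead of silently creating an empty database, which is why the existence check comes first.
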
