# Bedrock unified use-case notebook (config + modular tests)

This notebook is refactored to keep **what belongs in YAML** vs **what belongs in notebook code** clearly separated.

## YAML should store (stable config)

- Auth/env var names
- Endpoint/provider config
- Model catalog (`models`)
- Use-case registry (`use_cases`)
- Quality levels and fallback chains (`presets` under each use case)

## Notebook should store (test-time inputs)

- Prompt text
- Image/video file paths
- Per-run overrides for generation/analysis
- A/B test combinations

So prompts are now fully in notebook test functions, not in YAML.


In [None]:
# =========================
# 0) Load config + auth
# =========================
import os
from pathlib import Path

try:
    import yaml
except ImportError as e:
    raise ImportError("Missing dependency: pyyaml. Run: pip install pyyaml") from e

# Env var that may point at an alternative config file; without it the YAML is
# looked up under its default name relative to the current working directory.
CONFIG_ENV = "BEDROCK_HARNESS_CONFIG"
CONFIG_PATH = os.environ.get(CONFIG_ENV, "bedrock_harness.yaml")
cfg_path = Path(CONFIG_PATH).expanduser().resolve()
CFG_BASE_DIR = cfg_path.parent

if not cfg_path.exists():
    raise FileNotFoundError(
        f"Missing config file: {cfg_path}\n"
        "Put 'bedrock_harness.yaml' next to this notebook, or set:\n"
        f"  export {CONFIG_ENV}=/path/to/bedrock_harness.yaml"
    )

# An empty YAML document parses to None, hence the `or {}`.
cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
if not isinstance(cfg, dict):
    # Everything below calls cfg.get(...); fail with a readable message
    # instead of an AttributeError when the YAML root is a list or scalar.
    raise ValueError(
        f"Config root must be a mapping, got {type(cfg).__name__}: {cfg_path}"
    )

auth_cfg = cfg.get("auth", {}) or {}
TOKEN_ENV = auth_cfg.get("token_env", "AWS_BEARER_TOKEN_BEDROCK")
REGION_ENV = auth_cfg.get("region_env", "AWS_REGION")
DEFAULT_REGION = auth_cfg.get("default_region", "us-east-1")

token = (os.environ.get(TOKEN_ENV) or "").strip()
if len(token) < 20:
    # Distinguish "not set" from "set but implausibly short" so the message
    # does not claim the variable is missing when it is merely truncated.
    # Only the length is reported; the token value itself is never echoed.
    if token:
        problem = (
            f"Env var {TOKEN_ENV} is set but too short "
            f"({len(token)} chars) to be a valid token."
        )
    else:
        problem = f"Missing env var {TOKEN_ENV}."
    raise RuntimeError(
        f"{problem}\n\n"
        "Export it before launching VS Code / Jupyter, e.g.:\n"
        f"  export {TOKEN_ENV}='ABSK...'\n"
        f"  export {REGION_ENV}='us-east-1'"
    )

# Write the effective region back so later reads of the env var see it.
AWS_REGION = (os.environ.get(REGION_ENV) or DEFAULT_REGION).strip()
os.environ[REGION_ENV] = AWS_REGION

use_cases = cfg.get("use_cases", {}) or {}
if not use_cases:
    raise ValueError("YAML missing `use_cases` section")

print("Loaded config:", str(cfg_path))
print("Region:", AWS_REGION)
print("Token env:", TOKEN_ENV, "=", "set" if bool(os.environ.get(TOKEN_ENV)) else "missing")
print("Use cases:")
for uc_name, uc in use_cases.items():
    # A use case declared with an empty body parses to None; treat it as {}.
    uc = uc or {}
    presets = sorted((uc.get("presets", {}) or {}).keys())
    default_preset = uc.get("default_preset")
    adapter = uc.get("adapter")
    print(f"  - {uc_name}: adapter={adapter}, default={default_preset}, presets={presets}")


In [None]:
# =========================
# 1) Unified modular harness
# =========================
from __future__ import annotations

import base64
import json
import mimetypes
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import requests


def _normalize_usage(provider: str, resp_json: Dict[str, Any]) -> Dict[str, Optional[int]]:
    u = (resp_json or {}).get("usage") or {}
    inp = out = total = None

    if provider == "anthropic":
        inp = u.get("input_tokens")
        out = u.get("output_tokens")
        total = u.get("total_tokens")

    elif provider in {"nova", "titan_image"}:
        inp = u.get("inputTokens", u.get("input_tokens"))
        out = u.get("outputTokens", u.get("output_tokens"))
        total = u.get("totalTokens", u.get("total_tokens"))

    elif provider == "openai_compat":
        inp = u.get("prompt_tokens", u.get("input_tokens"))
        out = u.get("completion_tokens", u.get("output_tokens"))
        total = u.get("total_tokens")

    if total is None and (inp is not None or out is not None):
        total = (inp or 0) + (out or 0)

    return {
        "input_tokens": int(inp) if inp is not None else None,
        "output_tokens": int(out) if out is not None else None,
        "total_tokens": int(total) if total is not None else None,
    }


@dataclass(frozen=True)
class CandidateSpec:
    """One resolved model candidate for a (use case, preset) pair.

    Instances are created by ``UnifiedBedrockHarness.build_candidate_specs``
    and consumed by the harness's ``_invoke_*`` adapters.
    """

    # Use-case key this candidate was built for.
    use_case: str
    # Value of the use case's ``adapter`` setting; selects the invoke path in ``run``.
    adapter: str
    # Value of the use case's ``media_kind`` setting, if any.
    media_kind: Optional[str]
    # Name of the preset the candidate came from.
    preset: str
    # Key into the config's ``models`` catalog.
    model_ref: str
    # Provider name taken from the catalog entry.
    provider: str
    # Model identifier taken from the catalog entry.
    model_id: str
    # Remaining keys of the preset item (everything except ``model_ref``).
    params: Dict[str, Any]


class UnifiedBedrockHarness:
    def __init__(self, cfg: Dict[str, Any], config_dir: Path):
        """Cache settings from the parsed config and open an HTTP session.

        Args:
            cfg: Parsed configuration mapping; ``None``/empty is treated as ``{}``.
            config_dir: Base directory used by ``resolve_path`` for relative paths.
        """
        self.cfg = cfg or {}
        self.config_dir = config_dir

        def section(name: str) -> Dict[str, Any]:
            # A section that is absent or explicitly null both read as {}.
            return self.cfg.get(name, {}) or {}

        # --- auth: names of the env vars to read, plus the region fallback ---
        auth_cfg = section("auth")
        self.token_env = auth_cfg.get("token_env", "AWS_BEARER_TOKEN_BEDROCK")
        self.region_env = auth_cfg.get("region_env", "AWS_REGION")
        self.default_region = auth_cfg.get("default_region", "us-east-1")

        # --- http ---
        self.timeout_seconds = int(section("http").get("timeout_seconds", 120))

        # --- defaults ---
        self.debug_fallback_default = bool(section("defaults").get("debug_fallback", True))

        # --- endpoints: template is formatted with ``region`` in runtime_base() ---
        self.runtime_base_tmpl = section("endpoints").get(
            "bedrock_runtime_base",
            "https://bedrock-runtime.{region}.amazonaws.com",
        )

        # --- provider-specific knobs ---
        provider_cfg = section("providers")
        anthropic_cfg = provider_cfg.get("anthropic", {}) or {}
        self.anthropic_version = anthropic_cfg.get("anthropic_version") or "bedrock-2023-05-31"

        openai_compat_cfg = provider_cfg.get("openai_compat", {}) or {}
        self.openai_chat_path = openai_compat_cfg.get("chat_completions_path", "/openai/v1/chat/completions")
        self.openai_system_role = openai_compat_cfg.get("system_role", "developer")
        self.openai_max_tokens_param = openai_compat_cfg.get("max_tokens_param", "max_completion_tokens")

        # --- catalogs ---
        self.models = section("models")
        self.use_cases = section("use_cases")

        # --- where generated artifacts are written ---
        configured_output_dir = section("multimodal").get("output_dir", "./outputs")
        self.output_dir = self.resolve_path(configured_output_dir)

        # One session shared by all requests made through this harness.
        self._session = requests.Session()

    # ---------- env ----------
    def region(self) -> str:
        return (os.environ.get(self.region_env) or self.default_region).strip()

    def bearer_token(self) -> str:
        tok = (os.environ.get(self.token_env) or "").strip()
        if not tok or len(tok) < 20:
            raise RuntimeError(f"Missing env var {self.token_env}.")
        return tok

    def runtime_base(self) -> str:
        return self.runtime_base_tmpl.format(region=self.region()).rstrip("/")

    def headers_json(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.bearer_token()}",
            "Content-Type": "application/json",
        }

    # ---------- config ----------
    def resolve_path(self, p: str) -> Path:
        path = Path(p).expanduser()
        if path.is_absolute():
            return path.resolve()
        return (self.config_dir / path).resolve()

    def list_use_cases(self) -> List[str]:
        return sorted(self.use_cases.keys())

    def list_presets(self, use_case: str) -> List[str]:
        uc = self.use_cases.get(use_case) or {}
        return sorted((uc.get("presets", {}) or {}).keys())

    def _resolve_model(self, model_ref: str) -> Tuple[str, str]:
        if model_ref not in self.models:
            raise ValueError(f"Unknown model_ref '{model_ref}'. Available: {sorted(self.models.keys())}")
        m = self.models[model_ref] or {}
        provider = m.get("provider")
        model_id = m.get("model_id")
        if not provider or not model_id:
            raise ValueError(f"Invalid model config for '{model_ref}': provider/model_id required")
        return provider, model_id

    def build_candidate_specs(self, use_case: str, preset: Optional[str] = None) -> List[CandidateSpec]:
        uc = self.use_cases.get(use_case)
        if not uc:
            raise ValueError(f"Unknown use_case '{use_case}'. Available: {self.list_use_cases()}")

        adapter = uc.get("adapter")
        media_kind = uc.get("media_kind")
        default_preset = uc.get("default_preset")
        chosen_preset = preset or default_preset

        presets = uc.get("presets", {}) or {}
        if chosen_preset not in presets:
            raise ValueError(
                f"Unknown preset '{chosen_preset}' for use_case '{use_case}'. "
                f"Available: {sorted(presets.keys())}"
            )

        out: List[CandidateSpec] = []
        for item in presets[chosen_preset]:
            model_ref = item["model_ref"]
            provider, model_id = self._resolve_model(model_ref)
            params = dict(item)
            params.pop("model_ref", None)
            out.append(
                CandidateSpec(
                    use_case=use_case,
                    adapter=adapter,
                    media_kind=media_kind,
                    preset=chosen_preset,
                    model_ref=model_ref,
                    provider=provider,
                    model_id=model_id,
                    params=params,
                )
            )
        return out

    # ---------- internal helpers ----------
    def _effective_temperature(self, payload: Dict[str, Any], params: Dict[str, Any]) -> Optional[float]:
        if "temperature" in payload and payload.get("temperature") is not None:
            return float(payload.get("temperature"))
        if "temperature" in params and params.get("temperature") is not None:
            return float(params.get("temperature"))
        return None

    def _effective_max_tokens(self, payload: Dict[str, Any], params: Dict[str, Any], fallback: int = 1024) -> int:
        if "max_output_tokens" in payload and payload.get("max_output_tokens") is not None:
            return int(payload.get("max_output_tokens"))
        if "max_output_tokens" in params and params.get("max_output_tokens") is not None:
            return int(params.get("max_output_tokens"))
        return int(fallback)

    def _anthropic_thinking(self, params: Dict[str, Any], max_tokens: int) -> Optional[Dict[str, Any]]:
        anth = params.get("anthropic", {}) or {}
        thinking = (anth.get("thinking", {}) or {})
        enabled = bool(thinking.get("enabled", False))
        if not enabled:
            return None

        out: Dict[str, Any] = {"type": "enabled"}
        if "budget_tokens" in thinking and thinking.get("budget_tokens") is not None:
            budget = int(thinking.get("budget_tokens"))
            out["budget_tokens"] = max(1, min(max_tokens - 1, budget))
        return out

    def _save_base64_file(self, use_case: str, scenario_tag: str, b64_text: str, ext: str) -> str:
        uc_cfg = self.use_cases.get(use_case, {}) or {}
        output_cfg = uc_cfg.get("output", {}) or {}
        subdir = output_cfg.get("save_subdir", use_case)

        out_dir = (self.output_dir / subdir).resolve()
        out_dir.mkdir(parents=True, exist_ok=True)

        ts = int(time.time())
        out_path = out_dir / f"{scenario_tag}_{ts}.{ext.lstrip('.')}"
        out_path.write_bytes(base64.b64decode(b64_text))
        return str(out_path)

    # ---------- adapter: conversation ----------
    def _invoke_conversation(self, spec: CandidateSpec, payload: Dict[str, Any]) -> Dict[str, Any]:
        system_prompt = str(payload.get("system_prompt", "") or "").strip()
        user_prompt = str(payload.get("user_prompt", "") or "").strip()
        if not user_prompt:
            raise ValueError("conversation requires payload.user_prompt")

        if spec.provider == "anthropic":
            return self._invoke_anthropic_text(spec, system_prompt, user_prompt, payload)
        if spec.provider == "nova":
            return self._invoke_nova_text(spec, system_prompt, user_prompt, payload)
        if spec.provider == "openai_compat":
            return self._invoke_openai_text(spec, system_prompt, user_prompt, payload)

        raise ValueError(f"Unsupported provider '{spec.provider}' for conversation")

    def _invoke_anthropic_text(self, spec: CandidateSpec, system_prompt: str, user_prompt: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        max_tokens = self._effective_max_tokens(payload, spec.params, fallback=1024)
        temperature = self._effective_temperature(payload, spec.params)
        thinking = self._anthropic_thinking(spec.params, max_tokens)

        req: Dict[str, Any] = {
            "anthropic_version": self.anthropic_version,
            "messages": [{"role": "user", "content": [{"type": "text", "text": user_prompt}]}],
            "max_tokens": max_tokens,
        }
        if system_prompt:
            req["system"] = system_prompt
        if temperature is not None:
            req["temperature"] = 1.0 if thinking else temperature
        if thinking is not None:
            req["thinking"] = thinking

        url = f"{self.runtime_base()}/model/{spec.model_id}/invoke"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"Anthropic invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()
        blocks = resp.get("content", []) or []
        text = "".join([b.get("text", "") for b in blocks if isinstance(b, dict) and b.get("type") == "text"]).strip()
        usage = _normalize_usage("anthropic", resp)

        return {"response": text, "usage": usage, "raw": resp, "saved_files": [], "job_id": None}

    def _invoke_nova_text(self, spec: CandidateSpec, system_prompt: str, user_prompt: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        max_tokens = self._effective_max_tokens(payload, spec.params, fallback=1024)
        temperature = self._effective_temperature(payload, spec.params)

        req: Dict[str, Any] = {
            "messages": [{"role": "user", "content": [{"text": user_prompt}]}],
            "inferenceConfig": {"maxTokens": max_tokens},
        }
        if system_prompt:
            req["system"] = [{"text": system_prompt}]
        if temperature is not None:
            req["inferenceConfig"]["temperature"] = temperature

        url = f"{self.runtime_base()}/model/{spec.model_id}/invoke"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"Nova invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()
        content = (((resp.get("output") or {}).get("message") or {}).get("content")) or []
        text = "".join([c.get("text", "") for c in content if isinstance(c, dict) and "text" in c]).strip()
        usage = _normalize_usage("nova", resp)

        return {"response": text, "usage": usage, "raw": resp, "saved_files": [], "job_id": None}

    def _invoke_openai_text(self, spec: CandidateSpec, system_prompt: str, user_prompt: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        max_tokens = self._effective_max_tokens(payload, spec.params, fallback=1024)
        temperature = self._effective_temperature(payload, spec.params)

        messages: List[Dict[str, Any]] = []
        if system_prompt:
            messages.append({"role": self.openai_system_role, "content": system_prompt})
        messages.append({"role": "user", "content": user_prompt})

        req: Dict[str, Any] = {
            "model": spec.model_id,
            "messages": messages,
            "stream": False,
            self.openai_max_tokens_param: max_tokens,
        }
        if temperature is not None:
            req["temperature"] = temperature

        url = f"{self.runtime_base()}{self.openai_chat_path}"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"OpenAI-compat invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()
        text = (resp.get("choices", [{}])[0].get("message", {}) or {}).get("content", "") or ""
        usage = _normalize_usage("openai_compat", resp)

        return {"response": text, "usage": usage, "raw": resp, "saved_files": [], "job_id": None}

    # ---------- adapter: understanding ----------
    def _invoke_understanding(self, spec: CandidateSpec, payload: Dict[str, Any]) -> Dict[str, Any]:
        if spec.provider != "anthropic":
            raise ValueError(
                f"Use case '{spec.use_case}' currently supports anthropic provider only for understanding; got {spec.provider}"
            )

        media_path_val = payload.get("media_path")
        if not media_path_val:
            raise ValueError(f"{spec.use_case} requires payload.media_path")

        media_path = self.resolve_path(str(media_path_val))
        if not media_path.exists():
            raise FileNotFoundError(f"Media file not found: {media_path}")

        media_kind = spec.media_kind or payload.get("media_kind") or "image"
        system_prompt = str(payload.get("system_prompt", "") or "").strip()
        user_prompt = str(payload.get("user_prompt", "") or "").strip()

        media_type = payload.get("media_type")
        if not media_type:
            guessed, _ = mimetypes.guess_type(str(media_path))
            if guessed:
                media_type = guessed
            else:
                media_type = "image/jpeg" if media_kind == "image" else "video/mp4"

        b64 = base64.b64encode(media_path.read_bytes()).decode("ascii")

        max_tokens = self._effective_max_tokens(payload, spec.params, fallback=1400)
        temperature = self._effective_temperature(payload, spec.params)
        thinking = self._anthropic_thinking(spec.params, max_tokens)

        content: List[Dict[str, Any]] = [
            {
                "type": media_kind,
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": b64,
                },
            }
        ]
        if user_prompt:
            content.append({"type": "text", "text": user_prompt})

        req: Dict[str, Any] = {
            "anthropic_version": self.anthropic_version,
            "messages": [{"role": "user", "content": content}],
            "max_tokens": max_tokens,
        }
        if system_prompt:
            req["system"] = system_prompt
        if temperature is not None:
            req["temperature"] = 1.0 if thinking else temperature
        if thinking is not None:
            req["thinking"] = thinking

        url = f"{self.runtime_base()}/model/{spec.model_id}/invoke"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"Understanding invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()
        blocks = resp.get("content", []) or []
        text = "".join([b.get("text", "") for b in blocks if isinstance(b, dict) and b.get("type") == "text"]).strip()
        usage = _normalize_usage("anthropic", resp)

        return {"response": text, "usage": usage, "raw": resp, "saved_files": [], "job_id": None}

    # ---------- adapter: image_generation ----------
    def _invoke_image_generation(self, spec: CandidateSpec, payload: Dict[str, Any]) -> Dict[str, Any]:
        if spec.provider not in {"nova", "titan_image"}:
            raise ValueError(f"Unsupported provider '{spec.provider}' for image_generation")

        prompt = str(payload.get("prompt", "") or "").strip()
        if not prompt:
            raise ValueError("image_generation requires payload.prompt")

        negative_prompt = str(payload.get("negative_prompt", "") or "").strip()

        width = int(payload.get("width") or spec.params.get("width") or 1024)
        height = int(payload.get("height") or spec.params.get("height") or 1024)
        number_of_images = int(payload.get("number_of_images") or spec.params.get("number_of_images") or 1)
        cfg_scale = float(payload.get("cfg_scale") or spec.params.get("cfg_scale") or 8.0)
        seed = payload.get("seed", spec.params.get("seed"))

        req: Dict[str, Any] = {
            "taskType": "TEXT_IMAGE",
            "textToImageParams": {"text": prompt},
            "imageGenerationConfig": {
                "numberOfImages": number_of_images,
                "width": width,
                "height": height,
                "cfgScale": cfg_scale,
            },
        }
        if negative_prompt:
            req["textToImageParams"]["negativeText"] = negative_prompt
        if seed is not None:
            req["imageGenerationConfig"]["seed"] = int(seed)

        url = f"{self.runtime_base()}/model/{spec.model_id}/invoke"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"Image generation invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()

        images: List[str] = []
        if isinstance(resp.get("images"), list):
            for it in resp.get("images"):
                if isinstance(it, str):
                    images.append(it)

        if isinstance(resp.get("artifacts"), list):
            for it in resp.get("artifacts"):
                if isinstance(it, dict):
                    b64 = it.get("base64") or it.get("data") or it.get("image")
                    if isinstance(b64, str):
                        images.append(b64)

        saved_files: List[str] = []
        uc_output = (self.use_cases.get(spec.use_case, {}) or {}).get("output", {}) or {}
        ext = uc_output.get("file_ext", "png")
        for idx, b64_img in enumerate(images, start=1):
            saved_files.append(self._save_base64_file(spec.use_case, f"{spec.preset}_{idx:02d}", b64_img, ext))

        response_text = f"Generated {len(saved_files)} image(s)." if saved_files else "No image artifacts found in response."
        usage = _normalize_usage(spec.provider, resp)

        return {"response": response_text, "usage": usage, "raw": resp, "saved_files": saved_files, "job_id": None}

    # ---------- adapter: video_generation ----------
    def _invoke_video_generation(self, spec: CandidateSpec, payload: Dict[str, Any]) -> Dict[str, Any]:
        if spec.provider != "nova":
            raise ValueError(f"Unsupported provider '{spec.provider}' for video_generation")

        prompt = str(payload.get("prompt", "") or "").strip()
        if not prompt:
            raise ValueError("video_generation requires payload.prompt")

        duration_seconds = int(payload.get("duration_seconds") or spec.params.get("duration_seconds") or 6)
        fps = int(payload.get("fps") or spec.params.get("fps") or 24)
        dimension = str(payload.get("dimension") or spec.params.get("dimension") or "1280x720")
        seed = payload.get("seed", spec.params.get("seed"))

        req: Dict[str, Any] = {
            "taskType": "TEXT_VIDEO",
            "textToVideoParams": {"text": prompt},
            "videoGenerationConfig": {
                "durationSeconds": duration_seconds,
                "fps": fps,
                "dimension": dimension,
            },
        }
        if seed is not None:
            req["videoGenerationConfig"]["seed"] = int(seed)

        url = f"{self.runtime_base()}/model/{spec.model_id}/invoke"
        r = self._session.post(url, headers=self.headers_json(), data=json.dumps(req), timeout=self.timeout_seconds)
        if r.status_code != 200:
            raise RuntimeError(f"Video generation invoke failed {r.status_code}: {r.text[:1200]}")

        resp = r.json()

        video_blob = (
            resp.get("video")
            or resp.get("videoBase64")
            or ((resp.get("output") or {}).get("video") if isinstance(resp.get("output"), dict) else None)
            or ((resp.get("result") or {}).get("video") if isinstance(resp.get("result"), dict) else None)
        )

        job_id = (
            resp.get("invocationArn")
            or resp.get("jobArn")
            or resp.get("jobId")
            or resp.get("id")
        )

        saved_files: List[str] = []
        if isinstance(video_blob, str) and len(video_blob) > 100:
            uc_output = (self.use_cases.get(spec.use_case, {}) or {}).get("output", {}) or {}
            ext = uc_output.get("file_ext", "mp4")
            saved_files.append(self._save_base64_file(spec.use_case, spec.preset, video_blob, ext))

        if saved_files:
            response_text = f"Generated {len(saved_files)} video artifact(s)."
        elif job_id:
            response_text = f"Video generation accepted. job_id={job_id}"
        else:
            response_text = "No video artifact/job id found in response."

        usage = _normalize_usage(spec.provider, resp)
        return {"response": response_text, "usage": usage, "raw": resp, "saved_files": saved_files, "job_id": str(job_id) if job_id else None}

    # ---------- public API ----------
    def run(
        self,
        use_case: str,
        payload: Dict[str, Any],
        preset: Optional[str] = None,
        *,
        return_record: bool = False,
        label: str = "",
        debug_fallback: Optional[bool] = None,
    ) -> Union[str, Dict[str, Any]]:
        if debug_fallback is None:
            debug_fallback = self.debug_fallback_default

        candidates = self.build_candidate_specs(use_case, preset)
        t0 = time.time()
        last_err: Optional[Exception] = None

        for idx, spec in enumerate(candidates, start=1):
            try:
                if spec.adapter == "conversation":
                    out = self._invoke_conversation(spec, payload)
                elif spec.adapter == "understanding":
                    out = self._invoke_understanding(spec, payload)
                elif spec.adapter == "image_generation":
                    out = self._invoke_image_generation(spec, payload)
                elif spec.adapter == "video_generation":
                    out = self._invoke_video_generation(spec, payload)
                else:
                    raise ValueError(f"Unsupported adapter '{spec.adapter}' for use_case '{use_case}'")

                latency = round(time.time() - t0, 3)
                if return_record:
                    return {
                        "label": label or spec.preset,
                        "use_case": use_case,
                        "adapter": spec.adapter,
                        "preset": spec.preset,
                        "picked_index": idx,
                        "model_ref": spec.model_ref,
                        "provider": spec.provider,
                        "model_id": spec.model_id,
                        "params": spec.params,
                        "latency_s": latency,
                        "usage": out.get("usage"),
                        "response": out.get("response"),
                        "saved_files": out.get("saved_files") or [],
                        "job_id": out.get("job_id"),
                        "error": None,
                    }
                return str(out.get("response", ""))

            except Exception as e:
                last_err = e
                if debug_fallback:
                    print(
                        f"[fallback] use_case={use_case} preset={spec.preset} "
                        f"model_ref={spec.model_ref} provider={spec.provider} failed: {type(e).__name__}: {e}"
                    )
                continue

        latency = round(time.time() - t0, 3)
        err_text = f"{type(last_err).__name__}: {last_err}" if last_err else "Unknown error"
        if return_record:
            return {
                "label": label or (preset or "default"),
                "use_case": use_case,
                "adapter": None,
                "preset": preset,
                "picked_index": None,
                "model_ref": None,
                "provider": None,
                "model_id": None,
                "params": None,
                "latency_s": latency,
                "usage": None,
                "response": None,
                "saved_files": [],
                "job_id": None,
                "error": err_text,
            }
        return f"An error occurred: {err_text}"


In [None]:
# =========================
# 2) Test modules (prompts and inputs live here)
# =========================
# Shared harness instance used by the test helpers in this cell; relative
# paths are resolved against the directory that holds the config file.
h = UnifiedBedrockHarness(cfg, config_dir=CFG_BASE_DIR)


def test_conversation(
    system_prompt: str,
    user_prompt: str,
    *,
    level: Optional[str] = None,
    label: str = "",
    debug_fallback: Optional[bool] = None,
):
    """Run the "conversation" use case and return the full run record.

    ``level`` selects the preset (the use case's default when omitted) and
    also serves as the record label when no explicit ``label`` is given.
    """
    record_label = label or (level or "conversation")
    request = dict(system_prompt=system_prompt, user_prompt=user_prompt)
    return h.run(
        "conversation",
        request,
        preset=level,
        return_record=True,
        label=record_label,
        debug_fallback=debug_fallback,
    )


def test_image_understanding(
    image_path: str,
    user_prompt: str,
    *,
    system_prompt: str = "You are a visual analyst. Be precise and concise.",
    level: Optional[str] = None,
    media_type: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
    label: str = "",
    debug_fallback: Optional[bool] = None,
):
    """Run the "image_understanding" use case on an image file and return the run record.

    Optional overrides (media type, token limit, temperature) are added to
    the payload only when supplied, so preset values apply otherwise.
    """
    request: Dict[str, Any] = dict(
        media_path=image_path,
        user_prompt=user_prompt,
        system_prompt=system_prompt,
    )
    if media_type:
        request["media_type"] = media_type

    numeric_overrides = (
        ("max_output_tokens", max_output_tokens, int),
        ("temperature", temperature, float),
    )
    for key, value, cast in numeric_overrides:
        if value is not None:
            request[key] = cast(value)

    record_label = label or (level or "image_understanding")
    return h.run(
        "image_understanding",
        request,
        preset=level,
        return_record=True,
        label=record_label,
        debug_fallback=debug_fallback,
    )


def test_video_understanding(
    video_path: str,
    user_prompt: str,
    *,
    system_prompt: str = "You are a video analyst. Summarize actions, timeline, and key events.",
    level: Optional[str] = None,
    media_type: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
    label: str = "",
    debug_fallback: Optional[bool] = None,
):
    """Run the "video_understanding" use case on a video file and return the run record.

    Optional overrides (media type, token limit, temperature) are added to
    the payload only when supplied, so preset values apply otherwise.
    """
    request: Dict[str, Any] = dict(
        media_path=video_path,
        user_prompt=user_prompt,
        system_prompt=system_prompt,
    )
    if media_type:
        request["media_type"] = media_type

    numeric_overrides = (
        ("max_output_tokens", max_output_tokens, int),
        ("temperature", temperature, float),
    )
    for key, value, cast in numeric_overrides:
        if value is not None:
            request[key] = cast(value)

    record_label = label or (level or "video_understanding")
    return h.run(
        "video_understanding",
        request,
        preset=level,
        return_record=True,
        label=record_label,
        debug_fallback=debug_fallback,
    )


def test_image_generation(
    prompt: str,
    *,
    negative_prompt: str = "",
    level: Optional[str] = None,
    width: Optional[int] = None,
    height: Optional[int] = None,
    number_of_images: Optional[int] = None,
    cfg_scale: Optional[float] = None,
    seed: Optional[int] = None,
    label: str = "",
    debug_fallback: Optional[bool] = None,
):
    """Run the "image_generation" use case and return the run record.

    Each generation override is added to the payload only when supplied, so
    preset values apply otherwise.
    """
    request: Dict[str, Any] = dict(prompt=prompt, negative_prompt=negative_prompt)

    overrides = (
        ("width", width, int),
        ("height", height, int),
        ("number_of_images", number_of_images, int),
        ("cfg_scale", cfg_scale, float),
        ("seed", seed, int),
    )
    for key, value, cast in overrides:
        if value is not None:
            request[key] = cast(value)

    record_label = label or (level or "image_generation")
    return h.run(
        "image_generation",
        request,
        preset=level,
        return_record=True,
        label=record_label,
        debug_fallback=debug_fallback,
    )


def test_video_generation(
    prompt: str,
    *,
    level: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    fps: Optional[int] = None,
    dimension: Optional[str] = None,
    seed: Optional[int] = None,
    label: str = "",
    debug_fallback: Optional[bool] = None,
):
    """Run the "video_generation" use case through the notebook harness ``h``.

    Args:
        prompt: Text prompt; the only field that is always in the payload.
        level: Passed to ``h.run`` as ``preset``; also used as the label
            fallback when ``label`` is empty.
        duration_seconds: Optional override; coerced with ``int`` when given.
        fps: Optional override; coerced with ``int`` when given.
        dimension: Optional override; coerced with ``str`` when given.
        seed: Optional override; coerced with ``int`` when given.
        label: Label for the run. Falls back to ``level`` and then to
            ``"video_generation"``.
        debug_fallback: Forwarded to ``h.run`` unchanged.

    Returns:
        Whatever ``h.run`` returns when called with ``return_record=True``.
    """
    # (payload key, caller value, coercion). Overrides left as None are not
    # added to the payload at all.
    overrides = (
        ("duration_seconds", duration_seconds, int),
        ("fps", fps, int),
        ("dimension", dimension, str),
        ("seed", seed, int),
    )

    payload: Dict[str, Any] = {"prompt": prompt}
    for key, value, convert in overrides:
        if value is None:
            continue
        payload[key] = convert(value)

    run_label = label or level or "video_generation"
    return h.run(
        "video_generation",
        payload,
        preset=level,
        return_record=True,
        label=run_label,
        debug_fallback=debug_fallback,
    )


In [None]:
# =========================
# 3) Display helpers
# =========================
from IPython.display import Markdown, display
import html
from pathlib import Path
from typing import Any, Dict, Optional


def display_records(*records: Dict[str, Any], title: Optional[str] = None, max_col_width_px: int = 520):
    """Render run records side by side as an HTML comparison table.

    Each record becomes one column: a header cell holding the record's label
    and metadata, and a body cell holding its ``response`` text. All record
    values are HTML-escaped before being placed in the table.

    Args:
        *records: Record dicts to compare. A single list/tuple of records is
            also accepted and is unpacked into columns.
        title: Optional heading rendered in a row that spans every column.
        max_col_width_px: CSS ``max-width`` in pixels applied to each column.

    Returns:
        None. The table is shown with ``display(Markdown(...))``.
    """
    # Accept both display_records(a, b) and display_records([a, b]).
    cols = list(records[0]) if (len(records) == 1 and isinstance(records[0], (list, tuple))) else list(records)

    def esc(x: Any) -> str:
        """HTML-escape a value (None -> "") and turn newlines into <br/>."""
        s = "" if x is None else str(x)
        return html.escape(s).replace("\n", "<br/>")

    def fmt_tokens(rec: Dict[str, Any]) -> str:
        """Summarise rec["usage"] as "tokens: in:… · out:… · total:…", or ""."""
        u = rec.get("usage") or {}
        if not isinstance(u, dict) or not u:
            return ""
        parts = [
            f"{prefix}:{u[key]}"
            for prefix, key in (("in", "input_tokens"), ("out", "output_tokens"), ("total", "total_tokens"))
            if u.get(key) is not None
        ]
        return "tokens: " + " · ".join(parts) if parts else ""

    def header_block(rec: Dict[str, Any]) -> str:
        """Build the escaped, <br/>-joined metadata shown under the column label."""
        lines = [
            f"use_case: {rec.get('use_case')}",
            f"preset: {rec.get('preset')}",
        ]
        # Leave the "picked" line out when no index was recorded. The previous
        # post-filter compared whole lines against "#None", which can never
        # equal "picked: #None", so the placeholder was always rendered.
        picked = rec.get("picked_index")
        if picked is not None:
            lines.append(f"picked: #{picked}")
        lines.extend(
            [
                f"provider: {rec.get('provider')}",
                f"model_ref: {rec.get('model_ref')}",
                f"model_id: {rec.get('model_id')}",
                f"latency: {rec.get('latency_s')}s",
            ]
        )
        if rec.get("job_id"):
            lines.append(f"job_id: {rec.get('job_id')}")
        tok = fmt_tokens(rec)
        if tok:
            lines.append(tok)
        if rec.get("error"):
            lines.append(f"error: {rec.get('error')}")

        files = rec.get("saved_files") or []
        if files:
            lines.append("saved_files:")
            lines.extend(f"  - {fp}" for fp in files)

        return "<br/>".join(esc(x) for x in lines)

    table = []
    table.append("<div style='overflow-x:auto; padding:6px 2px;'>")
    table.append("<table style='border-collapse:collapse; width:max-content; min-width:100%;'>")

    if title:
        # colspan must be a positive integer, so clamp it for the no-records case.
        table.append(
            f"<tr><th colspan='{max(len(cols), 1)}' "
            "style='text-align:left; font-size:16px; padding:10px; border:1px solid #ddd; background:#fafafa;'>"
            f"{esc(title)}</th></tr>"
        )

    # Header row: label plus metadata for each record.
    table.append("<tr>")
    for i, rec in enumerate(cols):
        label = rec.get("label") or f"Variant {i+1}"
        meta = header_block(rec)
        table.append(
            "<th style='text-align:left; vertical-align:top; border:1px solid #ddd; padding:10px; "
            "background:#f7f7f7; min-width:320px; "
            f"max-width:{max_col_width_px}px;'>"
            f"{esc(label)}<br/><span style='font-weight:normal;color:#666'>{meta}</span></th>"
        )
    table.append("</tr>")

    # Body row: the response text of each record.
    table.append("<tr>")
    for rec in cols:
        resp = rec.get("response", "")
        table.append(
            "<td style='vertical-align:top; border:1px solid #ddd; padding:10px; "
            f"max-width:{max_col_width_px}px;'>"
            "<div><b>Response</b></div>"
            f"<div style='margin-top:6px; line-height:1.35;'>{esc(resp)}</div>"
            "</td>"
        )
    table.append("</tr>")

    table.append("</table></div>")
    display(Markdown("".join(table)))


# Optional: quickly check artifact files exist

def print_artifact_status(rec: Dict[str, Any]):
    """Print one "artifact: <path> exists=<bool>" line per saved file.

    Reads ``rec["saved_files"]``; a missing, ``None`` or empty value prints
    nothing.

    Args:
        rec: Record dict whose ``saved_files`` entry lists file paths.

    Returns:
        None.
    """
    for saved in rec.get("saved_files") or []:
        artifact = Path(saved)
        print("artifact:", artifact, f"exists={artifact.exists()}")


In [None]:
# =========================
# 4) Example tests (prompts are in notebook, not YAML)
# =========================

# A) Conversation A/B
SYSTEM = "You are an AI engineering expert. Explain with practical examples."
USER = "RAG、MCP、Skill 的区别是什么？请用中文给我一个工程实践角度的解释。"

resp_conv_default = test_conversation(SYSTEM, USER, level="analysis_default", label="conv_default")
resp_conv_max = test_conversation(SYSTEM, USER, level="analysis_max", label="conv_max")

display_records(resp_conv_default, resp_conv_max, title="Conversation: analysis_default vs analysis_max")

# Sample media paths come from the optional `multimodal` config section.
# `or {}` / `or <default>` also cover keys that are present but null or empty
# in the YAML, where a plain `.get(key, default)` would hand back None and the
# chained lookup (or the path resolution) would fail.
multimodal_cfg = cfg.get("multimodal") or {}

# B) Image understanding (run only when file exists)
img_default = multimodal_cfg.get("default_image_path") or "./samples/demo_image.jpg"
img_abs = h.resolve_path(img_default)
if img_abs.exists():
    resp_img = test_image_understanding(
        str(img_abs),
        "请提取图中关键元素，并给出场景推断。",
        level="vision_default",
        label="image_understanding_default",
    )
    display_records(resp_img, title="Image Understanding")
else:
    print(f"Skip image_understanding: file not found -> {img_abs}")

# C) Video understanding (run only when file exists)
video_default = multimodal_cfg.get("default_video_path") or "./samples/demo_video.mp4"
video_abs = h.resolve_path(video_default)
if video_abs.exists():
    resp_video = test_video_understanding(
        str(video_abs),
        "请按时间顺序总结视频里的主要事件。",
        level="video_default",
        label="video_understanding_default",
    )
    display_records(resp_video, title="Video Understanding")
else:
    print(f"Skip video_understanding: file not found -> {video_abs}")

# D) Generation examples moved to separate code cells below.


In [None]:
# =========================
# 5) Image Generation (separate chunk)
# =========================
# Opt-in switch: the generation call below only runs when this is True.
RUN_IMAGE_GENERATION = False

if not RUN_IMAGE_GENERATION:
    print("Skip image generation. Set RUN_IMAGE_GENERATION=True to run.")
else:
    resp_img_gen = test_image_generation(
        "A modern electric concept car in a minimalist studio, cinematic lighting.",
        negative_prompt="blurry, text watermark",
        level="image_default",
        label="image_generation_default",
    )
    display_records(resp_img_gen, title="Image Generation")
    # Report whether each file listed in the record is present on disk.
    print_artifact_status(resp_img_gen)


In [None]:
# =========================
# 6) Video Generation (separate chunk)
# =========================
# Opt-in switch: the generation call below only runs when this is True.
RUN_VIDEO_GENERATION = False

if not RUN_VIDEO_GENERATION:
    print("Skip video generation. Set RUN_VIDEO_GENERATION=True to run.")
else:
    resp_video_gen = test_video_generation(
        "A smooth drone shot above ocean cliffs at sunset.",
        level="video_default",
        label="video_generation_default",
    )
    display_records(resp_video_gen, title="Video Generation")
    # Report whether each file listed in the record is present on disk.
    print_artifact_status(resp_video_gen)
