In [None]:
# Cell 1 — Install deps, check GPU, define config
%pip -q install -U "pillow==11.3.0" "qwen-vl-utils" "accelerate" "bitsandbytes" "safetensors" "pypdf>=5.0.0" "sentencepiece"
%pip -q install -U "git+https://github.com/huggingface/transformers"

import os, torch
from PIL import Image
import transformers

# Define model/config
CFG = {
    "model_id": "Qwen/Qwen2-VL-2B-Instruct",
    "max_side": 1024,
    "max_new_tokens": 256,
}
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE  = torch.float16 if DEVICE == "cuda" else torch.float32

# Check environment
print("OK | device:", DEVICE, "| torch:", torch.__version__)
print("OK | transformers:", transformers.__version__, "| pillow:", Image.__version__)
os.makedirs("/content/data", exist_ok=True)
print("OK | data dir:", "/content/data")


  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
OK | device: cuda | torch: 2.9.0+cu126
OK | transformers: 5.0.0.dev0 | pillow: 11.3.0
OK | data dir: /content/data


In [None]:
# Cell 2 — To load model, define analyzer (image -> JSON)
import re, json
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
from qwen_vl_utils import process_vision_info

# To load processor and model
processor = AutoProcessor.from_pretrained(CFG["model_id"])
model = Qwen2VLForConditionalGeneration.from_pretrained(
    CFG["model_id"], torch_dtype=DTYPE, device_map="auto" if DEVICE=="cuda" else None
)
if DEVICE != "cuda": model = model.to(DEVICE)
model.eval()

# To extract JSON safely
def _extract_json(s: str) -> dict:
    m = re.search(r"\{.*\}", s, flags=re.S)
    return json.loads(m.group(0) if m else s)

# To analyze an image with a single prompt
def analyze_image(image_path: str) -> dict:
    messages = [{"role":"user","content":[
        {"type":"image","image": image_path},
        {"type":"text","text":(
            "Return ONLY valid JSON with keys: caption (string), facts (list of strings), "
            "entities (list of strings), tasks (list of strings)."
        )},
    ]}]
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    images, videos = process_vision_info(messages)
    inputs = processor(text=[text], images=images, videos=videos, return_tensors="pt", padding=True).to(DEVICE)
    out_ids = model.generate(**inputs, max_new_tokens=CFG["max_new_tokens"], do_sample=False)
    out_txt = processor.batch_decode(out_ids, skip_special_tokens=True)[0]
    return _extract_json(out_txt)

print("OK | model loaded:", CFG["model_id"])
# To demo after uploading an image:
# img_path = "/content/data/test.jpg"
# print(json.dumps(analyze_image(img_path), indent=2, ensure_ascii=False))


Downloading (incomplete total...): 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Loading weights:   0%|          | 0/729 [00:00<?, ?it/s]

OK | model loaded: Qwen/Qwen2-VL-2B-Instruct


In [None]:
# Cell X — Fix JSON extraction (handle extra text before/after JSON)
import json, re

def _extract_json(text: str):
    s = text.strip()
    s = re.sub(r"^```(?:json)?\s*|\s*```$", "", s, flags=re.I | re.S).strip()
    i1, i2 = s.find("{"), s.find("[")
    starts = [i for i in (i1, i2) if i != -1]
    if not starts:
        raise ValueError("No JSON found in model output.")
    s = s[min(starts):]
    obj, _ = json.JSONDecoder().raw_decode(s)  # ignore trailing text
    return obj

print("OK | JSON extractor patched")


OK | JSON extractor patched


In [None]:
# Patch JSON extraction (pick the JSON object that contains required keys)
import json, re

def extract_best_json(text: str, required_keys: list[str] | None = None):
    s = text.strip()
    s = re.sub(r"^```(?:json)?\s*|\s*```$", "", s, flags=re.I | re.S).strip()

    dec = json.JSONDecoder()
    candidates = []
    for start_char in ("{", "["):
        idx = 0
        while True:
            i = s.find(start_char, idx)
            if i == -1: break
            try:
                obj, end = dec.raw_decode(s[i:])
                candidates.append(obj)
                idx = i + max(end, 1)
            except Exception:
                idx = i + 1

    if not candidates:
        raise ValueError("No JSON found in model output.")

    if required_keys:
        for obj in reversed(candidates):
            if isinstance(obj, dict) and all(k in obj for k in required_keys):
                return obj

    return candidates[-1]


In [None]:
# Cell 3 — To analyze a PDF (extract text + render page -> reuse vision JSON)
%pip -q install -U "pypdfium2>=4.30.0"

import pypdfium2 as pdfium
from pypdf import PdfReader

# To render one PDF page to an image file
def _render_pdf_page(pdf_path: str, pageno: int, out_path: str, scale: float = 2.0) -> str:
    pdf = pdfium.PdfDocument(pdf_path)
    page = pdf.get_page(pageno)
    pil = page.render(scale=scale).to_pil()
    pil.save(out_path)
    page.close(); pdf.close()
    return out_path

# To analyze PDF using (page image + extracted text)
def analyze_pdf(pdf_path: str, pageno: int = 0) -> dict:
    reader = PdfReader(pdf_path)
    txt = " ".join([(p.extract_text() or "") for p in reader.pages[:2]])
    txt = txt[:4000]
    img_path = _render_pdf_page(pdf_path, pageno, "/content/data/_pdf_page0.jpg")

    messages = [{"role":"user","content":[
        {"type":"image","image": img_path},
        {"type":"text","text":(
            "You receive a PDF page screenshot + extracted text excerpt.\n"
            f"TEXT_EXCERPT:\n{txt}\n\n"
            "Return ONLY valid JSON with keys: summary, key_points, entities, tasks."
        )},
    ]}]
    chat = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    images, videos = process_vision_info(messages)
    inputs = processor(text=[chat], images=images, videos=videos, return_tensors="pt", padding=True).to(DEVICE)
    out_ids = model.generate(**inputs, max_new_tokens=CFG["max_new_tokens"], do_sample=False)
    out_txt = processor.batch_decode(out_ids, skip_special_tokens=True)[0]
    return _extract_json(out_txt)

# To demo after uploading a PDF:
# pdf_path = "/content/data/test.pdf"
# print(json.dumps(analyze_pdf(pdf_path), indent=2, ensure_ascii=False))


In [None]:
# Cell 4 — To route input (image/PDF) and return unified JSON
from pathlib import Path
from datetime import datetime

# To normalize outputs into one schema
def _normalize(res: dict, kind: str) -> dict:
    if kind == "pdf":
        summary = res.get("summary", "")
        points  = res.get("key_points", [])
    else:
        summary = res.get("caption", "")
        points  = res.get("facts", [])
    return {
        "input_type": kind,
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "summary": summary,
        "key_points": points,
        "entities": res.get("entities", []),
        "tasks": res.get("tasks", []),
        "raw": res,
    }

# To analyze any supported file path
def analyze(input_path: str) -> dict:
    p = Path(input_path)
    ext = p.suffix.lower()
    if ext == ".pdf":
        return _normalize(analyze_pdf(str(p)), "pdf")
    if ext in [".png", ".jpg", ".jpeg", ".webp", ".bmp"]:
        return _normalize(analyze_image(str(p)), "image")
    raise ValueError(f"Unsupported file type: {ext}")

# To demo:
# print(json.dumps(analyze("/content/data/test.jpg"), indent=2, ensure_ascii=False))
# print(json.dumps(analyze("/content/data/test.pdf"), indent=2, ensure_ascii=False))


In [None]:
# Cell 5 — To run a simple agent loop (plan -> answer -> self-check) with cache + logs
import time
CACHE, LOGS = {}, []

# To call LLM and force JSON output
def llm_json(system_text: str, user_text: str) -> dict:
    msgs = [{"role":"system","content":[{"type":"text","text":system_text}]},
            {"role":"user","content":[{"type":"text","text":user_text}]}]
    chat = processor.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=[chat], return_tensors="pt", padding=True).to(DEVICE)
    out = model.generate(**inputs, max_new_tokens=CFG["max_new_tokens"], do_sample=False)
    txt = processor.batch_decode(out, skip_special_tokens=True)[0]
    return _extract_json(txt)

# To run the multimodal agent on any file + question
def run_agent(question: str, input_path: str) -> dict:
    k = (question.strip(), input_path)
    if k in CACHE: return CACHE[k]
    t0 = time.time()
    ctx = analyze(input_path)
    sys = "You are a careful analyst. Output ONLY valid JSON."
    usr = json.dumps({
        "question": question,
        "context": {k: ctx[k] for k in ["input_type","summary","key_points","entities","tasks"]},
        "output_schema": {
            "plan": ["..."], "answer": "...", "self_check": "...",
            "confidence": 0.0, "next_actions": ["..."]
        }
    }, ensure_ascii=False)
    res = llm_json(sys, usr)
    LOGS.append({"seconds": round(time.time()-t0, 2), "input": input_path, "q": question[:80]})
    CACHE[k] = {"result": res, "agent_context": ctx, "log": LOGS[-1]}
    return CACHE[k]

# To demo:
# print(json.dumps(run_agent("What is this document/image about?", "/content/data/test.jpg"), indent=2, ensure_ascii=False))


In [None]:
# Cell 6 — Download sample image (avoid 403), run agent, save report
import os, json, urllib.request
from pathlib import Path

# Create data folder
os.makedirs("/content/data", exist_ok=True)

# Download file with browser-like headers
def download_file(url: str, out_path: str) -> str:
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(req) as r, open(out_path, "wb") as f:
        f.write(r.read())
    return out_path

# Download sample image (fallback if blocked)
urls = [
    "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3f/Fronalpstock_big.jpg/512px-Fronalpstock_big.jpg",
    "https://picsum.photos/512",
]
input_path = "/content/data/sample_image.jpg"

for u in urls:
    try:
        download_file(u, input_path)
        print("Downloaded:", input_path, "| source:", u)
        break
    except Exception as e:
        print("Download failed:", u, "|", repr(e))

# Run agent
question = "Summarize the image and list actionable tasks."
out = run_agent(question, input_path)
print(json.dumps(out["result"], indent=2, ensure_ascii=False))

# Save report
report_path = "/content/data/report.json"
with open(report_path, "w", encoding="utf-8") as f:
    json.dump(out, f, ensure_ascii=False, indent=2)
print("Saved report:", report_path)


Downloaded: /content/data/sample_image.jpg | source: https://upload.wikimedia.org/wikipedia/commons/thumb/3/3f/Fronalpstock_big.jpg/512px-Fronalpstock_big.jpg
{
  "question": "Summarize the image and list actionable tasks.",
  "context": {
    "input_type": "image",
    "summary": "A panoramic view of the Swiss Alps with green mountains and snow-capped peaks in the distance.",
    "key_points": [
      "The Swiss Alps are a range of mountains in central Europe.",
      "The Alps are known for their rugged terrain and diverse ecosystems.",
      "The region around the Alps is a popular destination for hikers and outdoor enthusiasts."
    ],
    "entities": [
      "Swiss Alps",
      "central Europe",
      "mountains",
      "Switzerland",
      "hikers",
      "outdoor enthusiasts"
    ],
    "tasks": []
  },
  "output_schema": {
    "plan": [
      "..."
    ],
    "answer": "...",
    "self_check": "...",
    "confidence": 0.0,
    "next_actions": [
      "..."
    ]
  }
}
Saved rep

In [None]:
# Cell 7 — Patch agent JSON parsing, validate output, retry on failure, write Markdown report
import time, json, re

# Extract best JSON block from model output
def extract_best_json(text: str, required_keys: list[str] | None = None):
    s = text.strip()
    s = re.sub(r"^```(?:json)?\s*|\s*```$", "", s, flags=re.I | re.S).strip()

    dec = json.JSONDecoder()
    candidates = []

    idx = 0
    while idx < len(s):
        brace_pos = min([p for p in [s.find("{", idx), s.find("[", idx)] if p != -1], default=-1)
        if brace_pos == -1:
            break
        try:
            obj, end = dec.raw_decode(s[brace_pos:])
            candidates.append(obj)
            idx = brace_pos + max(end, 1)
        except Exception:
            idx = brace_pos + 1

    if not candidates:
        raise ValueError("No JSON found in model output.")

    if required_keys:
        for obj in reversed(candidates):
            if isinstance(obj, dict) and all(k in obj for k in required_keys):
                return obj

    return candidates[-1]

REQUIRED_KEYS = ["plan", "answer", "self_check", "confidence", "next_actions"]
AGENT_CACHE, AGENT_LOGS = {}, []

# Validate agent JSON schema
def validate_agent_output(obj: dict) -> None:
    if not isinstance(obj, dict):
        raise ValueError("Output is not a JSON object.")
    missing = [k for k in REQUIRED_KEYS if k not in obj]
    if missing:
        raise ValueError(f"Missing keys: {missing}")
    if not isinstance(obj["plan"], list) or not all(isinstance(x, str) for x in obj["plan"]):
        raise ValueError("plan must be a list of strings.")
    if not isinstance(obj["answer"], str):
        raise ValueError("answer must be a string.")
    if not isinstance(obj["self_check"], str):
        raise ValueError("self_check must be a string.")
    if not isinstance(obj["next_actions"], list) or not all(isinstance(x, str) for x in obj["next_actions"]):
        raise ValueError("next_actions must be a list of strings.")
    if not isinstance(obj["confidence"], (int, float)) or not (0.0 <= float(obj["confidence"]) <= 1.0):
        raise ValueError("confidence must be a number between 0 and 1.")

# Call text-only LLM and return validated JSON object
def llm_agent(system_text: str, user_payload: dict, required_keys: list[str]):
    msgs = [
        {"role": "system", "content": [{"type": "text", "text": system_text}]},
        {"role": "user", "content": [{"type": "text", "text": json.dumps(user_payload, ensure_ascii=False)}]},
    ]
    chat = processor.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=[chat], return_tensors="pt", padding=True).to(DEVICE)
    out = model.generate(**inputs, max_new_tokens=CFG["max_new_tokens"], do_sample=False)
    txt = processor.batch_decode(out, skip_special_tokens=True)[0]
    return extract_best_json(txt, required_keys)

# Run the multimodal agent on any file + question (return only final schema)
def run_agent(question: str, input_path: str, retries: int = 2) -> dict:
    key = (question.strip(), input_path)
    if key in AGENT_CACHE:
        return AGENT_CACHE[key]

    t0 = time.time()
    ctx = analyze(input_path)

    system_text = "Return ONLY valid JSON. No prose. No markdown."
    base_payload = {
        "question": question,
        "context": {k: ctx[k] for k in ["input_type", "summary", "key_points", "entities", "tasks"]},
        "required_schema": {
            "plan": ["string", "string"],
            "answer": "string",
            "self_check": "string",
            "confidence": 0.0,
            "next_actions": ["string", "string"],
        },
        "rules": [
            "Output must be ONLY a JSON object.",
            "Include ALL required keys exactly as specified.",
            "Keep answer concise and actionable.",
        ],
    }

    last_err = None
    for attempt in range(retries + 1):
        payload = dict(base_payload)
        if last_err:
            payload["fix_request"] = f"Previous output invalid. Fix it. Error: {last_err}"

        obj = llm_agent(system_text, payload, REQUIRED_KEYS)
        try:
            validate_agent_output(obj)
            AGENT_LOGS.append({"seconds": round(time.time() - t0, 2), "input": input_path, "attempts": attempt + 1})
            AGENT_CACHE[key] = obj
            return obj
        except Exception as e:
            last_err = str(e)

    raise ValueError(f"Agent failed after retries. Last error: {last_err}")

# Build clean Markdown report
def build_markdown_report(question: str, input_path: str, ctx: dict, agent_out: dict) -> str:
    kp = "\n".join([f"- {x}" for x in (ctx.get("key_points", []) or [])][:12]) or "- (none)"
    plan = "\n".join([f"- {x}" for x in (agent_out.get("plan", []) or [])]) or "- (none)"
    na = "\n".join([f"- {x}" for x in (agent_out.get("next_actions", []) or [])]) or "- (none)"
    ents = ", ".join(ctx.get("entities", []) or [])

    return f"""# Multimodal Agent Report

**Input:** `{input_path}`
**Question:** {question}

## Extracted Context
**Summary:** {ctx.get("summary","")}

**Key points:**
{kp}

**Entities:** {ents}

## Agent Output
**Plan:**
{plan}

**Answer:**
{agent_out.get("answer","")}

**Self-check:**
{agent_out.get("self_check","")}

**Confidence:** {agent_out.get("confidence", 0.0)}

**Next actions:**
{na}
"""

# Run demo using existing sample path
try:
    input_path
except NameError:
    input_path = "/content/data/sample_image.jpg"

question = "Summarize the image and list actionable tasks."
ctx = analyze(input_path)
agent_out = run_agent(question, input_path)

md_path = "/content/data/report.md"
with open(md_path, "w", encoding="utf-8") as f:
    f.write(build_markdown_report(question, input_path, ctx, agent_out))

print("OK | saved:", md_path)
print(json.dumps(agent_out, indent=2, ensure_ascii=False))


OK | saved: /content/data/report.md
{
  "plan": [
    "The Swiss Alps are a range of mountains in central Europe.",
    "The Alps are known for their rugged terrain and diverse ecosystems.",
    "The region around the Alps is a popular destination for hikers and outdoor enthusiasts."
  ],
  "answer": "The Swiss Alps are a range of mountains in central Europe, known for their rugged terrain and diverse ecosystems. The region around the Alps is a popular destination for hikers and outdoor enthusiasts.",
  "self_check": "The plan is a list of strings.",
  "confidence": 0.0,
  "next_actions": [
    "No actions needed."
  ]
}


In [None]:
# Cell 8 — Mental Health Coach skill (fixed, self-contained)
import json, re

MH_KEYS = ["situation", "goals", "cbt_exercises", "anger_tools", "conflict_script", "daily_5min_plan", "warnings", "confidence"]

# Validate MH output schema
def validate_mh_output(obj: dict) -> None:
    if not isinstance(obj, dict): raise ValueError("Output is not a JSON object.")
    missing = [k for k in MH_KEYS if k not in obj]
    if missing: raise ValueError(f"Missing keys: {missing}")
    if not isinstance(obj["situation"], str): raise ValueError("situation must be a string.")
    for k in ["goals", "cbt_exercises", "anger_tools", "daily_5min_plan", "warnings"]:
        if not isinstance(obj[k], list) or not all(isinstance(x, str) for x in obj[k]):
            raise ValueError(f"{k} must be a list of strings.")
    if not isinstance(obj["conflict_script"], str): raise ValueError("conflict_script must be a string.")
    if not isinstance(obj["confidence"], (int, float)) or not (0.0 <= float(obj["confidence"]) <= 1.0):
        raise ValueError("confidence must be a number between 0 and 1.")

# Coerce value into list[str]
def coerce_list_str(x):
    if isinstance(x, list):
        return [str(i).strip() for i in x if str(i).strip()]
    if isinstance(x, str):
        s = x.strip()
        parts = re.split(r"\n+|;+", s)
        parts = [re.sub(r"^\s*[-•\d\.\)\:]+\s*", "", p).strip() for p in parts]
        return [p for p in parts if p]
    return [str(x).strip()] if x is not None else []

# Coerce MH output to schema-friendly types
def coerce_mh_output(obj: dict) -> dict:
    if not isinstance(obj, dict):
        return obj
    for k in ["goals", "cbt_exercises", "anger_tools", "daily_5min_plan", "warnings"]:
        obj[k] = coerce_list_str(obj.get(k, []))
    if "confidence" in obj and not isinstance(obj["confidence"], (int, float)):
        try: obj["confidence"] = float(obj["confidence"])
        except Exception: obj["confidence"] = 0.3
    if "situation" in obj and not isinstance(obj["situation"], str):
        obj["situation"] = str(obj["situation"])
    if "conflict_script" in obj and not isinstance(obj["conflict_script"], str):
        obj["conflict_script"] = str(obj["conflict_script"])
    return obj

# Run mental health coach skill (optional multimodal context)
def run_mental_health_coach(user_message: str, input_path: str | None = None, retries: int = 2) -> dict:
    ctx = analyze(input_path) if input_path else {"input_type":"none","summary":"","key_points":[],"entities":[],"tasks":[]}

    system_text = (
        "You are a CBT-informed coach for a teen. Return ONLY valid JSON. No prose, no markdown. "
        "Be practical, gentle, and non-judgmental. Provide tools that take 1–10 minutes. "
        "Avoid medical claims. Suggest professional help if symptoms are severe or persistent. "
        "Never provide self-harm instructions or graphic content."
    )

    base_payload = {
        "user_message": user_message,
        "optional_context": {k: ctx.get(k) for k in ["input_type", "summary", "key_points", "entities"]},
        "required_schema": {
            "situation": "string",
            "goals": ["string", "string"],
            "cbt_exercises": ["string", "string", "string"],
            "anger_tools": ["string", "string", "string"],
            "conflict_script": "string",
            "daily_5min_plan": ["string", "string", "string"],
            "warnings": ["string"],
            "confidence": 0.0
        },
        "rules": [
            "Output must be ONLY a JSON object.",
            "Exercises must be concrete steps, not theory.",
            "daily_5min_plan must be a JSON array of short steps.",
            "Conflict_script must be something the user can say in one breath."
        ],
    }

    last_err = None
    for _ in range(retries + 1):
        payload = dict(base_payload)
        if last_err:
            payload["fix_request"] = f"Previous output invalid. Fix it. Error: {last_err}"

        obj = llm_agent(system_text, payload, MH_KEYS)
        obj = coerce_mh_output(obj)
        try:
            validate_mh_output(obj)
            return obj
        except Exception as e:
            last_err = str(e)

    raise ValueError(f"Mental health skill failed after retries. Last error: {last_err}")

# Demo
demo_msg = "I feel anxious before meetings and sometimes get angry when someone provokes me. Give CBT techniques and a calm conflict script."
mh = run_mental_health_coach(demo_msg)
print(json.dumps(mh, indent=2, ensure_ascii=False))


{
  "situation": "anxiety before meetings and anger provocation",
  "goals": [
    "manage anxiety",
    "learn calm conflict resolution"
  ],
  "cbt_exercises": [
    "Deep breathing exercises",
    "Mindfulness meditation",
    "Positive self-talk",
    "Mindful listening",
    "Time management",
    "Avoiding distractions"
  ],
  "anger_tools": [
    "Physical exercise",
    "Journaling",
    "Mindfulness meditation",
    "Mindful listening",
    "Time management",
    "Avoiding distractions"
  ],
  "conflict_script": "I am calm and collected. I will not let my emotions control me. I will approach the situation with a calm and collected mindset.",
  "daily_5min_plan": [
    "Take a 5-minute walk",
    "Practice deep breathing exercises",
    "Practice mindfulness meditation",
    "Practice positive self-talk",
    "Practice mindful listening",
    "Practice time management",
    "Avoid distractions"
  ],
    "It's important to seek professional help if anxiety or anger becomes sever

In [None]:
# Cell 9 — Skill router + single agent_chat() entrypoint
import json

ROUTER_KEYS = ["intent", "skill", "reason", "confidence"]

# Validate router output
def validate_router(obj: dict) -> None:
    if not isinstance(obj, dict): raise ValueError("Router output is not an object.")
    miss = [k for k in ROUTER_KEYS if k not in obj]
    if miss: raise ValueError(f"Missing keys: {miss}")
    if obj["intent"] not in ["general", "doc_qa", "mental_health", "summarize"]:
        raise ValueError("Invalid intent.")
    if obj["skill"] not in ["run_agent", "run_doc_qa", "run_mental_health_coach", "run_summarize"]:
        raise ValueError("Invalid skill.")
    if not isinstance(obj["reason"], str): raise ValueError("reason must be string.")
    if not isinstance(obj["confidence"], (int,float)) or not (0 <= float(obj["confidence"]) <= 1):
        raise ValueError("confidence must be 0..1.")

# Answer question grounded in analyzed context
def run_doc_qa(question: str, input_path: str) -> dict:
    ctx = analyze(input_path)
    keys = ["answer", "evidence", "limits", "confidence", "next_actions"]
    system_text = "Answer using ONLY the provided context. Return ONLY valid JSON."
    payload = {
        "question": question,
        "context": {k: ctx[k] for k in ["input_type","summary","key_points","entities"]},
        "required_schema": {
            "answer":"string",
            "evidence":["string","string"],
            "limits":"string",
            "confidence":0.0,
            "next_actions":["string","string"]
        },
        "rules": [
            "If context is insufficient, say so in limits and lower confidence.",
            "Evidence must quote short phrases from summary/key_points (no long text)."
        ]
    }
    return llm_agent(system_text, payload, keys)

# Summarize any file
def run_summarize(input_path: str) -> dict:
    return run_agent("Summarize and list key actions.", input_path)

# Classify intent and pick skill (LLM-first, safe fallback)
def route(user_message: str, has_file: bool) -> dict:
    system_text = "Classify user intent. Return ONLY valid JSON."
    payload = {
        "user_message": user_message,
        "has_file": has_file,
        "intents": ["general","doc_qa","mental_health","summarize"],
        "skills": ["run_agent","run_doc_qa","run_mental_health_coach","run_summarize"],
        "rules": [
            "mental_health if CBT/anxiety/anger/conflict/stress coping.",
            "doc_qa if asking what the provided file says/contains.",
            "summarize if explicitly asking to summarize/outline.",
            "general otherwise."
        ],
        "required_schema": {"intent":"string","skill":"string","reason":"string","confidence":0.0}
    }
    try:
        obj = llm_agent(system_text, payload, ROUTER_KEYS)
        validate_router(obj)
        return obj
    except Exception:
        m = user_message.lower()
        if any(w in m for w in ["cbt","anxiety","panic","anger","conflict","stress","rumination"]):
            return {"intent":"mental_health","skill":"run_mental_health_coach","reason":"keyword fallback","confidence":0.6}
        if has_file and any(w in m for w in ["pdf","image","document","page","according to","what does it say"]):
            return {"intent":"doc_qa","skill":"run_doc_qa","reason":"keyword fallback + file","confidence":0.6}
        if any(w in m for w in ["summarize","summary","outline","tl;dr"]):
            return {"intent":"summarize","skill":"run_summarize","reason":"keyword fallback","confidence":0.6}
        return {"intent":"general","skill":"run_agent","reason":"default fallback","confidence":0.4}

# Run one unified agent call (text-only or multimodal)
def agent_chat(user_message: str, input_path: str | None = None) -> dict:
    has_file = input_path is not None
    r = route(user_message, has_file)

    if r["skill"] == "run_mental_health_coach":
        out = run_mental_health_coach(user_message, input_path=input_path)
    elif r["skill"] == "run_doc_qa":
        if not input_path: raise ValueError("doc_qa requires input_path.")
        out = run_doc_qa(user_message, input_path)
    elif r["skill"] == "run_summarize":
        if not input_path: raise ValueError("summarize requires input_path.")
        out = run_summarize(input_path)
    else:
        if input_path:
            out = run_agent(user_message, input_path)
        else:
            out = llm_agent(
                "Return ONLY valid JSON.",
                {"question": user_message, "required_schema": {"answer":"string","confidence":0.0}},
                ["answer","confidence"]
            )

    return {"router": r, "output": out}

# Demo (no user interaction)
try:
    sample_path = input_path
except NameError:
    sample_path = "/content/data/sample_image.jpg"

print(json.dumps(agent_chat("I feel anxious before meetings. Give CBT tools.", None), indent=2, ensure_ascii=False)[:1200])
print(json.dumps(agent_chat("Summarize this file.", sample_path), indent=2, ensure_ascii=False)[:1200])


{
  "router": {
    "intent": "mental_health",
    "skill": "run_mental_health_coach",
    "reason": "keyword fallback",
    "confidence": 0.6
  },
  "output": {
    "situation": "I feel anxious before meetings.",
    "goals": [
      "I want to reduce my anxiety before meetings.",
      "I want to feel more confident in my ability to handle meetings without anxiety."
    ],
    "cbt_exercises": [
      "Deep breathing exercises",
      "Mindfulness meditation",
      "Progressive muscle relaxation"
    ],
    "anger_tools": [
      "Positive self-talk",
      "Physical activity",
      "Time management"
    ],
    "conflict_script": "I feel anxious before meetings.",
    "daily_5min_plan": [
      "Take a 5-minute break before the meeting.",
      "Practice deep breathing exercises before the meeting.",
      "Take a 5-minute walk before the meeting."
    ],
      "Do not use self-harm instructions or graphic content."
    ],
    "confidence": 0.0
  }
}
{
  "router": {
    "intent": "

In [None]:
# Cell 10 — Memory + conversation state for multi-turn chat
import time, json, hashlib

STATE = {
    "history": [],          # list[{"role": "...", "text": "...", "ts": "..."}]
    "last_file": None,      # last used file path
    "file_context": {},     # cache: file_hash -> analyzed context
    "preferences": {        # customizable settings
        "max_history": 8,
        "default_skill": "auto",
    },
}

# Hash file bytes to key cached context
def file_hash(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()[:16]

# Add message to history
def remember(role: str, text: str) -> None:
    STATE["history"].append({"role": role, "text": text, "ts": time.strftime("%Y-%m-%d %H:%M:%S")})
    STATE["history"] = STATE["history"][-STATE["preferences"]["max_history"]:]

# Get analyzed context with caching
def get_file_context(path: str) -> dict:
    fh = file_hash(path)
    if fh not in STATE["file_context"]:
        STATE["file_context"][fh] = analyze(path)
    STATE["last_file"] = path
    return STATE["file_context"][fh]

# Chat with memory (auto reuse last file)
def agent_chat_mem(user_message: str, input_path: str | None = None) -> dict:
    remember("user", user_message)

    if input_path is None:
        input_path = STATE["last_file"]

    ctx = None
    if input_path is not None:
        ctx = get_file_context(input_path)

    r = route(user_message, has_file=input_path is not None)

    if r["skill"] == "run_mental_health_coach":
        out = run_mental_health_coach(user_message, input_path=input_path)
    elif r["skill"] == "run_doc_qa":
        out = run_doc_qa(user_message, input_path)
    elif r["skill"] == "run_summarize":
        out = run_summarize(input_path)
    else:
        if input_path:
            out = run_agent(user_message, input_path)
        else:
            out = llm_agent(
                "Return ONLY valid JSON.",
                {"question": user_message, "history": STATE["history"], "required_schema": {"answer":"string","confidence":0.0}},
                ["answer","confidence"]
            )

    remember("assistant", json.dumps(out, ensure_ascii=False)[:800])
    return {"router": r, "output": out, "used_file": input_path, "history_size": len(STATE["history"])}

# Demo
try:
    sample_path = input_path
except NameError:
    sample_path = "/content/data/sample_image.jpg"

_ = agent_chat_mem("Summarize this file.", sample_path)
print(json.dumps(agent_chat_mem("Now ask me 2 follow-up questions about it.", None), indent=2, ensure_ascii=False)[:1200])


{
  "router": {
    "intent": "general",
    "skill": "run_agent",
    "reason": "default fallback",
    "confidence": 0.4
  },
  "output": {
    "plan": [
      "string",
      "string"
    ],
    "answer": "string",
    "self_check": "string",
    "confidence": 0.0,
    "next_actions": [
      "string",
      "string"
    ]
  },
  "used_file": "/content/data/sample_image.jpg",
  "history_size": 4
}


In [None]:
# Cell 12 — Fix run_doc_qa() to always return required schema (no eval failures)
import json

DOCQA_KEYS = ["answer", "evidence", "limits", "confidence", "next_actions"]

# Run doc Q&A grounded in analyzed context with guaranteed schema
def run_doc_qa(question: str, input_path: str) -> dict:
    ctx = analyze(input_path)

    system_text = (
        "Return ONLY valid JSON. No prose, no markdown.\n"
        "Answer using ONLY the provided context.\n"
        "You MUST output ALL keys in the template exactly.\n"
        "If context is insufficient, say so in limits and lower confidence."
    )

    payload = {
        "question": question,
        "context": {k: ctx.get(k) for k in ["input_type","summary","key_points","entities"]},
        "template": {
            "answer": "",
            "evidence": ["", ""],
            "limits": "",
            "confidence": 0.0,
            "next_actions": ["", ""]
        },
        "rules": [
            "Fill evidence with short phrases copied from summary/key_points.",
            "If you cannot support an answer from context, set answer to 'Not enough context.'",
            "confidence must be between 0 and 1."
        ],
    }

    try:
        obj = llm_agent(system_text, payload, DOCQA_KEYS)
    except Exception:
        obj = {}

    if not isinstance(obj, dict):
        obj = {}

    # Fill defaults to guarantee schema
    obj.setdefault("answer", "Not enough context.")
    obj.setdefault("evidence", ["", ""])
    obj.setdefault("limits", "Context may be insufficient or not specific enough to answer reliably.")
    obj.setdefault("confidence", 0.3)
    obj.setdefault("next_actions", ["Ask a more specific question.", "Provide a clearer/closer page or higher-resolution image."])

    # Coerce types
    if not isinstance(obj["answer"], str): obj["answer"] = str(obj["answer"])
    if not isinstance(obj["limits"], str): obj["limits"] = str(obj["limits"])
    if not isinstance(obj["evidence"], list): obj["evidence"] = [str(obj["evidence"])]
    obj["evidence"] = [str(x) for x in obj["evidence"]][:2] + ["",""]
    obj["evidence"] = obj["evidence"][:2]
    if not isinstance(obj["next_actions"], list): obj["next_actions"] = [str(obj["next_actions"])]
    obj["next_actions"] = [str(x) for x in obj["next_actions"]][:2] + ["",""]
    obj["next_actions"] = obj["next_actions"][:2]
    if not isinstance(obj["confidence"], (int, float)):
        try: obj["confidence"] = float(obj["confidence"])
        except Exception: obj["confidence"] = 0.3
    obj["confidence"] = float(max(0.0, min(1.0, obj["confidence"])))

    return obj

print("OK | run_doc_qa patched")


OK | run_doc_qa patched


In [None]:
# Cell 11 — Evaluation harness (robust, never crashes)
import os, json, time, urllib.request

os.makedirs("/content/data", exist_ok=True)

# Download helper (no user interaction)
def download_file(url: str, out_path: str) -> bool:
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=30) as r, open(out_path, "wb") as f:
            f.write(r.read())
        return True
    except Exception:
        return False

# Ensure sample image exists
sample_image = "/content/data/sample_image.jpg"
if not os.path.exists(sample_image):
    ok = download_file("https://picsum.photos/512", sample_image)
    print("Sample image ready?", ok, "|", sample_image)

# Ensure sample PDF exists (optional)
sample_pdf = "/content/data/sample.pdf"
if not os.path.exists(sample_pdf):
    ok = download_file("https://raw.githubusercontent.com/mozilla/pdf.js/master/examples/learning/helloworld.pdf", sample_pdf)
    print("Sample PDF ready?", ok, "|", sample_pdf)

# Safe call wrapper
def safe_call(name: str, fn):
    t0 = time.time()
    try:
        out = fn()
        return {"test": name, "ok": True, "seconds": round(time.time()-t0, 2), "error": None, "output_preview": str(out)[:400]}
    except Exception as e:
        return {"test": name, "ok": False, "seconds": round(time.time()-t0, 2), "error": repr(e), "output_preview": None}

# Basic schema checks
def has_keys(d, keys):
    return isinstance(d, dict) and all(k in d for k in keys)

results = []

# Test 1: Multimodal analyze(image)
def test_analyze_image():
    if "analyze" not in globals(): raise NameError("analyze() not defined")
    out = analyze(sample_image)
    assert has_keys(out, ["input_type","summary","key_points","entities","tasks","raw"])
    return {"input_type": out["input_type"], "summary": out["summary"], "n_key_points": len(out["key_points"])}

results.append(safe_call("analyze_image", test_analyze_image))

# Test 2: Multimodal analyze(pdf) (skip if PDF missing)
def test_analyze_pdf():
    if not os.path.exists(sample_pdf):
        return {"skipped": True, "reason": "sample_pdf not available"}
    if "analyze" not in globals(): raise NameError("analyze() not defined")
    out = analyze(sample_pdf)
    assert has_keys(out, ["input_type","summary","key_points","entities","tasks","raw"])
    return {"input_type": out["input_type"], "summary": out["summary"], "n_key_points": len(out["key_points"])}

results.append(safe_call("analyze_pdf", test_analyze_pdf))

# Test 3: Mental Health skill (CBT)
def test_mental_health():
    if "run_mental_health_coach" not in globals(): raise NameError("run_mental_health_coach() not defined")
    out = run_mental_health_coach("I feel anxious before meetings and sometimes get angry. Give CBT tools and a calm conflict script.")
    need = ["situation","goals","cbt_exercises","anger_tools","conflict_script","daily_5min_plan","warnings","confidence"]
    assert has_keys(out, need)
    return {"confidence": out["confidence"], "script": out["conflict_script"][:120]}

results.append(safe_call("mental_health_cbt", test_mental_health))

# Test 4: Router (text-only)
def test_router():
    if "route" not in globals(): raise NameError("route() not defined")
    out = route("I feel anxious. Give CBT.", has_file=False)
    assert has_keys(out, ["intent","skill","reason","confidence"])
    return out

results.append(safe_call("router_text", test_router))

# Test 5: Doc QA grounded on image context
def test_doc_qa():
    if "run_doc_qa" not in globals(): raise NameError("run_doc_qa() not defined")
    out = run_doc_qa("What objects or scene elements are visible?", sample_image)
    need = ["answer","evidence","limits","confidence","next_actions"]
    assert has_keys(out, need)
    return {"answer": out["answer"][:160], "confidence": out["confidence"]}

results.append(safe_call("doc_qa_image", test_doc_qa))

# Save results
out_path = "/content/data/eval_results.json"
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)

passed = sum(1 for r in results if r["ok"])
print(f"Done | passed {passed}/{len(results)} | saved: {out_path}")
print(json.dumps(results, indent=2, ensure_ascii=False))


Done | passed 5/5 | saved: /content/data/eval_results.json
[
  {
    "test": "analyze_image",
    "ok": true,
    "seconds": 5.62,
    "error": null,
    "output_preview": "{'input_type': 'image', 'summary': 'A panoramic view of the Swiss Alps with green mountains and snow-capped peaks in the distance.', 'n_key_points': 3}"
  },
  {
    "test": "analyze_pdf",
    "ok": true,
    "seconds": 2.24,
    "error": null,
    "output_preview": "{'input_type': 'pdf', 'summary': 'The text is a simple greeting to the world.', 'n_key_points': 1}"
  },
  {
    "test": "mental_health_cbt",
    "ok": true,
    "seconds": 21.94,
    "error": null,
    "output_preview": "{'confidence': 0.0, 'script': 'I am calm and assertive in my communication.'}"
  },
  {
    "test": "router_text",
    "ok": true,
    "seconds": 6.52,
    "error": null,
    "output_preview": "{'intent': 'mental_health', 'skill': 'run_mental_health_coach', 'reason': 'keyword fallback', 'confidence': 0.6}"
  },
  {
    "test": "doc_qa_

In [None]:
# Patch run_doc_qa (handle both signatures + never return empty answer if evidence exists)

import inspect

def _coerce_docqa_output(obj):
    if not isinstance(obj, dict):
        obj = {}

    ans = str(obj.get("answer", "") or "").strip()
    ev = obj.get("evidence", [])
    lim = str(obj.get("limits", "") or "").strip()
    na  = obj.get("next_actions", [])

    if isinstance(ev, str):
        ev = [ev]
    if not isinstance(ev, list):
        ev = []
    ev = [str(x).strip() for x in ev if str(x).strip()][:6]

    if not ans and ev:
        ans = "Main text: " + " ".join(ev[:2])

    try:
        conf = float(obj.get("confidence", 0.0))
    except Exception:
        conf = 0.0
    conf = max(0.0, min(1.0, conf))
    if ev and conf < 0.3:
        conf = 0.6

    if isinstance(na, str):
        na = [na]
    if not isinstance(na, list):
        na = []
    na = [str(x) for x in na][:6]

    return {"answer": ans, "evidence": ev, "limits": lim, "confidence": conf, "next_actions": na}

# Keep original once
if "_run_doc_qa_original" not in globals():
    _run_doc_qa_original = run_doc_qa

def run_doc_qa(question: str, input_path: str, retries: int = 2) -> dict:
    try:
        sig = inspect.signature(_run_doc_qa_original)
        if "retries" in sig.parameters:
            out = _run_doc_qa_original(question, input_path, retries=retries)
        else:
            out = _run_doc_qa_original(question, input_path)
    except TypeError:
        out = _run_doc_qa_original(question, input_path)

    return _coerce_docqa_output(out)

print("Patched run_doc_qa OK (signature-safe)")


Patched run_doc_qa OK (signature-safe)


In [None]:
# Cell 13 — One-click demo (download sample inputs, run agent, show report + eval summary)
import os, json, time, urllib.request

os.makedirs("/content/data", exist_ok=True)

# Download file with headers
def download_file(url: str, out_path: str) -> bool:
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=30) as r, open(out_path, "wb") as f:
            f.write(r.read())
        return True
    except Exception as e:
        print("Download failed:", url, "|", repr(e))
        return False

# Ensure demo assets exist
def prepare_demo_assets():
    sample_image = "/content/data/sample_image.jpg"
    sample_pdf = "/content/data/sample.pdf"
    if not os.path.exists(sample_image):
        download_file("https://picsum.photos/512", sample_image)
    if not os.path.exists(sample_pdf):
        download_file("https://raw.githubusercontent.com/mozilla/pdf.js/master/examples/learning/helloworld.pdf", sample_pdf)
    return sample_image, sample_pdf

# Run evaluation harness and return results
def run_eval_suite(sample_image: str, sample_pdf: str):
    tests = [
        ("analyze_image", lambda: {"ok": True, "out": analyze(sample_image)}),
        ("analyze_pdf",   lambda: {"ok": True, "out": analyze(sample_pdf)}),
        ("mental_health", lambda: {"ok": True, "out": run_mental_health_coach("I feel anxious before meetings. Give CBT tools.")}),
        ("router_text",   lambda: {"ok": True, "out": route("I feel anxious. Give CBT.", has_file=False)}),
        ("doc_qa_image",  lambda: {"ok": True, "out": run_doc_qa("What objects or elements are visible?", sample_image)}),
    ]
    results = []
    for name, fn in tests:
        t0 = time.time()
        try:
            r = fn()
            results.append({"test": name, "ok": True, "seconds": round(time.time()-t0, 2), "preview": str(r["out"])[:220]})
        except Exception as e:
            results.append({"test": name, "ok": False, "seconds": round(time.time()-t0, 2), "error": repr(e), "preview": None})
    return results

# One-click demo
def demo():
    img_path, pdf_path = prepare_demo_assets()

    print("=== DEMO 1: Mental Health (text-only) ===")
    mh = agent_chat_mem("I feel anxious before meetings and sometimes get angry when provoked. Give CBT tools + a conflict script.")
    print(json.dumps(mh, indent=2, ensure_ascii=False)[:1400], "\n")

    print("=== DEMO 2: Multimodal Summary (image) ===")
    img_sum = agent_chat_mem("Summarize this file and list actions.", img_path)
    print(json.dumps(img_sum, indent=2, ensure_ascii=False)[:1400], "\n")

    print("=== DEMO 3: Doc Q&A (image) ===")
    qa = agent_chat_mem("What elements are visible in this file?", img_path)
    print(json.dumps(qa, indent=2, ensure_ascii=False)[:1400], "\n")

    print("=== DEMO 4: Multimodal Summary (pdf) ===")
    pdf_sum = agent_chat_mem("Summarize this PDF and list actions.", pdf_path)
    print(json.dumps(pdf_sum, indent=2, ensure_ascii=False)[:1400], "\n")

    print("=== EVAL SUMMARY ===")
    eval_results = run_eval_suite(img_path, pdf_path)
    passed = sum(1 for r in eval_results if r["ok"])
    out_path = "/content/data/eval_results.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(eval_results, f, ensure_ascii=False, indent=2)
    print(f"Passed {passed}/{len(eval_results)} | saved: {out_path}")
    print(json.dumps(eval_results, indent=2, ensure_ascii=False))

# Run demo
demo()


=== DEMO 1: Mental Health (text-only) ===
{
  "router": {
    "intent": "mental_health",
    "skill": "run_mental_health_coach",
    "reason": "keyword fallback",
    "confidence": 0.6
  },
  "output": {
    "situation": "Anxious before meetings and angry when provoked.",
    "goals": [
      "Manage anxiety before meetings and reduce anger when provoked.",
      "Develop conflict resolution skills."
    ],
    "cbt_exercises": [
      "Mindfulness meditation",
      "Deep breathing exercises",
      "Positive self-talk",
      "Mindful listening",
      "Emotional awareness"
    ],
    "anger_tools": [
      "Positive self-talk",
      "Emotional awareness",
      "Mindful listening",
      "Positive reinforcement"
    ],
    "conflict_script": "I will not let this person provoke me. I will respond calmly and assertively.",
    "daily_5min_plan": [
      "5 minutes of mindfulness meditation",
      "5 minutes of deep breathing exercises",
      "5 minutes of positive self-talk",
     

In [None]:
# Cell 14 — Robust URL demo (fix JSON-missing-keys + fix .bin downloads)

import os, json, time, re, urllib.request
from urllib.parse import urlparse
from pathlib import Path
import io
from PIL import Image

os.makedirs("/content/data", exist_ok=True)

# ----------------------------
# 1) Robust downloader (infer extension from headers/content)
# ----------------------------
_MIME_TO_EXT = {
    "image/jpeg": ".jpg",
    "image/jpg":  ".jpg",
    "image/png":  ".png",
    "image/webp": ".webp",
    "application/pdf": ".pdf",
}

def _infer_ext_from_bytes(data: bytes, content_type: str | None) -> str:
    ct = (content_type or "").split(";")[0].strip().lower()

    # header-based
    if ct in _MIME_TO_EXT:
        return _MIME_TO_EXT[ct]

    # signature-based
    if data[:4] == b"%PDF":
        return ".pdf"

    # image sniff (PIL)
    try:
        img = Image.open(io.BytesIO(data))
        fmt = (img.format or "").lower()
        if fmt == "jpeg":
            return ".jpg"
        if fmt in ("png", "webp", "bmp"):
            return f".{fmt}"
    except Exception:
        pass

    return ".bin"

def download_url(url: str, out_dir="/content/data", base_name="downloaded") -> str:
    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(req, timeout=30) as r:
        data = r.read()
        ct = r.headers.get("Content-Type", "")

    # extension from URL path
    path = urlparse(url).path
    ext = Path(path).suffix.lower()
    if ext not in (".jpg", ".jpeg", ".png", ".webp", ".bmp", ".pdf"):
        ext = _infer_ext_from_bytes(data, ct)

    # normalize jpeg ext
    if ext == ".jpeg":
        ext = ".jpg"

    out_path = os.path.join(out_dir, f"{base_name}{ext}")
    with open(out_path, "wb") as f:
        f.write(data)
    return out_path

# ----------------------------
# 2) Patch JSON extraction to NEVER return a "wrong JSON" when required keys are missing
# ----------------------------
REQUIRED_KEYS = ["plan", "answer", "self_check", "confidence", "next_actions"]

def extract_best_json(text: str, required_keys: list[str] | None = None):
    s = text.strip()
    s = re.sub(r"^```(?:json)?\s*|\s*```$", "", s, flags=re.I | re.S).strip()

    dec = json.JSONDecoder()
    candidates = []
    idx = 0

    while idx < len(s):
        brace_pos = min([p for p in [s.find("{", idx), s.find("[", idx)] if p != -1], default=-1)
        if brace_pos == -1:
            break
        try:
            obj, end = dec.raw_decode(s[brace_pos:])
            candidates.append(obj)
            idx = brace_pos + max(end, 1)
        except Exception:
            idx = brace_pos + 1

    if not candidates:
        raise ValueError("No JSON found in model output.")

    if required_keys:
        for obj in reversed(candidates):
            if isinstance(obj, dict) and all(k in obj for k in required_keys):
                return obj
        raise ValueError(f"No JSON object contained required keys: {required_keys}")

    return candidates[-1]

# ----------------------------
# 3) Make run_agent robust (retry + fallback instead of crashing)
# ----------------------------
def validate_agent_output(obj: dict) -> None:
    if not isinstance(obj, dict):
        raise ValueError("Output is not a JSON object.")
    missing = [k for k in REQUIRED_KEYS if k not in obj]
    if missing:
        raise ValueError(f"Missing keys: {missing}")
    if not isinstance(obj["plan"], list) or not all(isinstance(x, str) for x in obj["plan"]):
        raise ValueError("plan must be a list of strings.")
    if not isinstance(obj["answer"], str):
        raise ValueError("answer must be a string.")
    if not isinstance(obj["self_check"], str):
        raise ValueError("self_check must be a string.")
    try:
        float(obj["confidence"])
    except Exception:
        raise ValueError("confidence must be a number.")
    if not isinstance(obj["next_actions"], list) or not all(isinstance(x, str) for x in obj["next_actions"]):
        raise ValueError("next_actions must be a list of strings.")

def _coerce_agent_output(obj) -> dict:
    if not isinstance(obj, dict):
        obj = {}
    plan = obj.get("plan", [])
    if isinstance(plan, str):
        plan = [plan]
    if not isinstance(plan, list):
        plan = []
    plan = [str(x) for x in plan][:8]

    answer = obj.get("answer", "")
    answer = str(answer) if answer is not None else ""

    self_check = obj.get("self_check", "")
    self_check = str(self_check) if self_check is not None else ""

    conf = obj.get("confidence", 0.0)
    try:
        conf = float(conf)
    except Exception:
        conf = 0.0
    conf = max(0.0, min(1.0, conf))

    na = obj.get("next_actions", [])
    if isinstance(na, str):
        na = [na]
    if not isinstance(na, list):
        na = []
    na = [str(x) for x in na][:8]

    return {"plan": plan, "answer": answer, "self_check": self_check, "confidence": conf, "next_actions": na}

# Keep caches if you had them; otherwise create
if "AGENT_CACHE" not in globals():
    AGENT_CACHE = {}
if "AGENT_LOGS" not in globals():
    AGENT_LOGS = []

def run_agent(question: str, input_path: str, retries: int = 3) -> dict:
    key = (question.strip(), input_path)
    if key in AGENT_CACHE:
        return AGENT_CACHE[key]

    t0 = time.time()
    ctx = analyze(input_path)  # uses your existing analyze()

    system_text = (
        "Return ONLY valid JSON. No prose. No markdown.\n"
        "You MUST output a JSON object with keys: plan, answer, self_check, confidence, next_actions.\n"
        "If unsure, still include all keys with empty strings/lists and confidence 0.0."
    )
    base_payload = {
        "question": question,
        "context": {k: ctx[k] for k in ["input_type", "summary", "key_points", "entities", "tasks"]},
        "required_schema": {
            "plan": ["string", "string"],
            "answer": "string",
            "self_check": "string",
            "confidence": 0.0,
            "next_actions": ["string", "string"],
        },
        "rules": [
            "Output must be ONLY a JSON object.",
            "Include ALL required keys exactly as specified.",
            "Keep answer concise and actionable.",
        ],
    }

    last_err = None
    for attempt in range(retries + 1):
        payload = dict(base_payload)
        if last_err:
            payload["fix_request"] = f"Previous output invalid. Return ONLY the corrected JSON. Error: {last_err}"

        try:
            # llm_agent must exist from your previous cells
            obj = llm_agent(system_text, payload, REQUIRED_KEYS)
            obj = _coerce_agent_output(obj)
            validate_agent_output(obj)

            AGENT_LOGS.append({"seconds": round(time.time() - t0, 2), "input": input_path, "attempts": attempt + 1})
            AGENT_CACHE[key] = obj
            return obj
        except Exception as e:
            last_err = str(e)

    # Fallback (never crash)
    fallback = {
        "plan": ["Use extracted summary/key points", "Answer the question briefly", "Propose next actions"],
        "answer": ctx.get("summary", ""),
        "self_check": f"Fallback used because the model output was invalid after retries. Last error: {last_err}",
        "confidence": 0.0,
        "next_actions": [
            "Retry with a simpler question",
            "Increase CFG['max_new_tokens'] if outputs look truncated",
        ],
    }
    AGENT_LOGS.append({"seconds": round(time.time() - t0, 2), "input": input_path, "attempts": retries + 1, "fallback": True})
    AGENT_CACHE[key] = fallback
    return fallback

# ----------------------------
# 4) ask_agent helper (URL or local path)
# ----------------------------
def ask_agent(question: str, file_or_url: str | None = None):
    if file_or_url is None:
        # text-only path (use your router if available)
        if "agent_chat_mem" in globals():
            return agent_chat_mem(question)
        return {"error": "No agent_chat_mem() found for text-only. Provide a file or use run_mental_health_coach()."}

    if file_or_url.startswith("http://") or file_or_url.startswith("https://"):
        # decide a clean base name
        bn = "sample"
        local_path = download_url(file_or_url, base_name=bn)
    else:
        local_path = file_or_url

    # if you have the router wrapper, use it; otherwise call run_agent directly
    if "agent_chat_mem" in globals():
        return agent_chat_mem(question, local_path)
    return run_agent(question, local_path)

# ----------------------------
# 5) Demo URLs (same as your cell)
# ----------------------------
demo_img_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3f/Fronalpstock_big.jpg/512px-Fronalpstock_big.jpg"
demo_pdf_url = "https://raw.githubusercontent.com/mozilla/pdf.js/master/examples/learning/helloworld.pdf"

print(ask_agent("Summarize this file.", demo_img_url))
print(ask_agent("What objects or scene elements are visible?", demo_img_url))
print(ask_agent("What does the PDF say? Provide 2 evidence snippets.", demo_pdf_url))


{'router': {'intent': 'summarize', 'skill': 'run_summarize', 'reason': 'keyword fallback', 'confidence': 0.6}, 'output': {'plan': ['Summarize and list key actions.'], 'answer': 'The Swiss Alps are a range of mountains in central Europe. They are known for their rugged terrain and diverse ecosystems. The region around the Alps is a popular destination for hikers and outdoor enthusiasts.', 'self_check': 'The summary is clear and concise, and the key points are listed in a logical order.', 'confidence': 0.9, 'next_actions': []}, 'used_file': '/content/data/sample.jpg', 'history_size': 8}
{'router': {'intent': 'general', 'skill': 'run_agent', 'reason': 'default fallback', 'confidence': 0.4}, 'output': {'plan': ['Identify and describe the objects and scene elements visible in the image.'], 'answer': 'The image shows a panoramic view of the Swiss Alps with green mountains and snow-capped peaks in the distance. The Swiss Alps are a range of mountains in central Europe, known for their rugged 

In [None]:
# Install Gradio (optional if already installed)
!pip -q install gradio

import json, gradio as gr

# Run the agent from UI
def ui_run(question, file_obj):
    try:
        file_path = None
        if file_obj is not None:
            file_path = file_obj.name  # Gradio temp file path
        res = ask_agent(question, file_path)  # use your existing ask_agent + router
        return json.dumps(res, indent=2, ensure_ascii=False)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2, ensure_ascii=False)

# Launch simple web UI
with gr.Blocks(title="Multimodal Agent Demo") as demo:
    gr.Markdown("# Multimodal Agent (Image / PDF / Text)\nUpload an image or PDF (optional), ask a question, get structured JSON.")
    with gr.Row():
        with gr.Column(scale=1):
            file_in = gr.File(label="File (optional) — image or PDF", file_types=[".png",".jpg",".jpeg",".webp",".pdf"])
            question_in = gr.Textbox(
                label="Question",
                value="Summarize this file and list key actions.",
                lines=2
            )
            with gr.Row():
                run_btn = gr.Button("Run")
                clear_btn = gr.Button("Clear")
        with gr.Column(scale=1):
            out = gr.Code(label="Agent Output (JSON)", language="json")

    examples = [
        ["Summarize this file.", None],
        ["What objects or scene elements are visible?", None],
        ["I feel anxious before meetings. Give CBT tools + a calm conflict script.", None],
    ]
    gr.Examples(examples=examples, inputs=[question_in, file_in], label="Examples (upload a file first if needed)")

    run_btn.click(fn=ui_run, inputs=[question_in, file_in], outputs=out)
    clear_btn.click(fn=lambda: ("", None, ""), inputs=None, outputs=[question_in, file_in, out])

demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://39b8283075ac28cf64.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


