In [1]:
# AndroidWorld 轨迹可视化

# 这个 notebook 用于可视化 `runs/xxx` 下的轨迹：

# - 展示 **Goal(指令)** + **整条轨迹**
# - 每个 step：展示 `screenshot_step{n}.png`，并 `print` 对应的 `model_response`（即 `result.json` 里的 `trajectory[n].response`）
# - 若该 step 的 tool call 是 `click` 或 `long_press`，在截图上用 **红色圆圈** 标注点击点


from __future__ import annotations

import json
import os
import re
from pathlib import Path

from IPython.display import display, Markdown
from PIL import Image, ImageDraw

# Folder that holds one sub-directory per recorded trajectory (each with a `result.json`).
# The fallback below is a machine-specific absolute path; set the ANDROIDWORLD_RUNS_DIR
# environment variable to point the notebook at a different runs folder without editing code.
# An unset or empty variable falls back to the original location.
RUNS_DIR = Path(
    os.environ.get("ANDROIDWORLD_RUNS_DIR")
    or "/Users/chengkanzhi/Desktop/ScaleCUA/evaluation/AndroidWorld/runs/diy_0126"
)

_TOOL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)


def _extract_click_actions(response_text: str):
    """从 response 里抽取 click/long_press。

    注意：AndroidWorld 的坐标是归一化到 [0, 1000] 的。
    """

    actions = []
    for m in _TOOL_RE.finditer(response_text or ""):
        try:
            call = json.loads(m.group(1))
        except Exception:
            continue

        args = (call or {}).get("arguments") or {}
        action = args.get("action")
        if action not in {"click", "long_press"}:
            continue

        coord = args.get("coordinate")
        if isinstance(coord, (list, tuple)) and len(coord) == 2:
            # 坐标归一化到 0-1000
            actions.append({"action": action, "x_norm": float(coord[0]), "y_norm": float(coord[1])})

    return actions


def _annotate_clicks(img: Image.Image, actions):
    if not actions:
        return img

    img = img.convert("RGBA")
    draw = ImageDraw.Draw(img)
    w, h = img.size
    r = max(10, int(min(w, h) * 0.03))
    stroke = max(3, r // 4)

    for a in actions:
        x = int(round(a["x_norm"] / 1000.0 * w))
        y = int(round(a["y_norm"] / 1000.0 * h))
        draw.ellipse((x - r, y - r, x + r, y + r), outline=(255, 0, 0, 255), width=stroke)

    return img


def _resize_for_display(img: Image.Image, scale: float = 0.2) -> Image.Image:
    if scale >= 1:
        return img
    w, h = img.size
    nw, nh = max(1, int(w * scale)), max(1, int(h * scale))
    resample = getattr(Image, "Resampling", Image).LANCZOS
    return img.resize((nw, nh), resample)


def show_trajectory(traj_dir: Path, max_steps: int | None = None, display_scale: float = 0.2):
    """Render one trajectory: per-step screenshot plus the model response.

    Reads `result.json` inside `traj_dir`, shows the goal as a Markdown header and
    then, for every entry of its `trajectory` list, displays the screenshot named
    `screenshot_step{step}.*` (with red rings on click / long_press targets) and
    prints the step's `response` text.

    Args:
        traj_dir: Directory containing `result.json` and the step screenshots.
        max_steps: If given, only the first `max_steps` steps are shown.
        display_scale: Shrink factor applied to each screenshot (default 1/5).

    Raises:
        FileNotFoundError: if `result.json` is missing.
        json.JSONDecodeError: if `result.json` is not valid JSON.
    """

    data = json.loads((traj_dir / "result.json").read_text(encoding="utf-8"))
    goal = data.get("goal", "")
    display(Markdown(f"## {traj_dir.name}\n\n**Goal**: {goal}"))

    traj = data.get("trajectory") or []
    if max_steps is not None:
        traj = traj[:max_steps]

    for item in traj:
        step = item.get("step")
        response = item.get("response", "")
        actions = _extract_click_actions(response)

        display(Markdown(f"### Step {step}"))

        # Lexicographically first match, i.e. what sorted(...)[0] would give; None if no file matches.
        img_path = min(traj_dir.glob(f"screenshot_step{step}.*"), default=None)

        if img_path is not None:
            # The context manager closes the underlying file once the image has been
            # rendered, so long trajectories do not accumulate open file handles.
            with Image.open(img_path) as img:
                annotated = _annotate_clicks(img, actions)
                display(_resize_for_display(annotated, display_scale))
        else:
            print(f"[缺少截图] step={step}")

        print(response)
        print("-" * 80)


In [None]:
import random


def list_trajectories(runs_dir: Path = RUNS_DIR):
    """Return, in sorted order, every sub-directory of `runs_dir` that contains a `result.json`."""
    found = []
    for entry in runs_dir.iterdir():
        if not entry.is_dir():
            continue
        if (entry / "result.json").exists():
            found.append(entry)
    found.sort()
    return found


def _is_success_response(response_text: str) -> bool:
    """Decide whether a response counts as a successful ending.

    Only the first parseable `mobile_use` tool call is examined: it is a success
    when its action is `answer`, or `terminate` with status `success`. Responses
    without such a tool call are not successes. (The original note states this
    mirrors process_trajs.py.)
    """
    text = response_text or ""
    for match in _TOOL_RE.finditer(text):
        try:
            payload = json.loads(match.group(1))
        except Exception:
            continue

        payload = payload or {}
        if payload.get("name") == "mobile_use":
            params = payload.get("arguments") or {}
            kind = params.get("action")
            if kind == "answer":
                return True
            return kind == "terminate" and params.get("status") == "success"

    return False


def _is_success_traj(traj_dir: Path) -> bool:
    data = json.loads((traj_dir / "result.json").read_text(encoding="utf-8"))
    traj = data.get("trajectory") or []
    return bool(traj) and _is_success_response(traj[-1].get("response", ""))


def list_success_trajectories(runs_dir: Path = RUNS_DIR):
    """Return the trajectory directories under `runs_dir` whose final step is a success, in sorted order."""
    winners = []
    for candidate in list_trajectories(runs_dir):
        if _is_success_traj(candidate):
            winners.append(candidate)
    return winners


def show_random_trajectory(
    runs_dir: Path = RUNS_DIR,
    max_steps: int | None = None,
    seed: int | None = None,
    display_scale: float = 0.2,
):
    """Pick one successful trajectory at random and display it.

    Args:
        runs_dir: Folder whose sub-directories are trajectories.
        max_steps: If given, only the first `max_steps` steps are shown.
        seed: If given, makes the pick reproducible.
        display_scale: Shrink factor applied to each screenshot.

    Returns:
        The directory of the trajectory that was displayed.

    Raises:
        FileNotFoundError: if `runs_dir` contains no successful trajectory.
    """
    # A private generator gives the same pick for a given seed as seeding the
    # module-level RNG would, without resetting global random state for the
    # rest of the kernel session.
    rng = random.Random(seed) if seed is not None else random

    succ = list_success_trajectories(runs_dir)
    print(f"Success trajs: {len(succ)}")
    if not succ:
        raise FileNotFoundError(f"未找到成功轨迹：{runs_dir}")

    traj_dir = rng.choice(succ)
    show_trajectory(traj_dir, max_steps=max_steps, display_scale=display_scale)
    return traj_dir


# Each run displays one randomly chosen *successful* trajectory (screenshots shrunk to 1/5 by default).
show_random_trajectory()

In [None]:
# 从 data_merge.json 随机抽 1 条轨迹可视化
import random

def show_random_from_merged(
    runs_dir: Path = RUNS_DIR,
    seed: int | None = None,
    max_steps: int | None = None,
    display_scale: float = 0.2,
    merge_name: str = "data_merge_0126.json",
    save_dir_keyword: str = "task",
):
    """Pick one record at random from the merged trajectory file and display it.

    The merged file is read as a JSON list of records. This function reads the
    keys `save_dir`, `goal` and `trajectory` from each record, and `step`,
    `response` and `image` (a path relative to `runs_dir`) from each step.

    Args:
        runs_dir: Folder containing the merged file and the referenced images.
        seed: If given, makes the pick reproducible.
        max_steps: If given, only the first `max_steps` steps are shown.
        display_scale: Shrink factor applied to each screenshot.
        merge_name: File name of the merged JSON inside `runs_dir`.
        save_dir_keyword: Case-insensitive substring a record's `save_dir` must
            contain to be eligible; an empty string keeps every record.

    Returns:
        The record (dict) that was displayed.

    Raises:
        ValueError: if the merged file is empty, or no record passes the filter.
    """
    # A private generator gives the same pick for a given seed as seeding the
    # module-level RNG would, without resetting global random state.
    rng = random.Random(seed) if seed is not None else random

    merge_path = runs_dir / merge_name
    merged = json.loads(merge_path.read_text(encoding="utf-8"))
    if not merged:
        raise ValueError(f"data_merge.json 为空：{merge_path}")

    # filter specific app/task
    print(f"Num of total tasks: {len(merged)}")
    keyword = save_dir_keyword.lower()
    # .get() so that a record without `save_dir` is filtered out instead of raising KeyError.
    merged = [item for item in merged if keyword in str(item.get("save_dir") or "").lower()]
    print(f"Num of filtered tasks: {len(merged)}")
    if not merged:
        # Fail with a clear message instead of the IndexError random.choice raises on an empty list.
        raise ValueError(f"过滤后没有可用轨迹（save_dir 需包含 {save_dir_keyword!r}）：{merge_path}")

    item = rng.choice(merged)
    goal = item.get("goal", "")
    traj = item.get("trajectory") or []
    if max_steps is not None:
        traj = traj[:max_steps]

    display(Markdown(f"## {item.get('save_dir','')}\n\n**Goal**: {goal}"))

    for step_obj in traj:
        step = step_obj.get("step")
        response = step_obj.get("response", "")
        actions = _extract_click_actions(response)

        display(Markdown(f"### Step {step}"))

        img_rel = step_obj.get("image")
        img_path = (runs_dir / img_rel) if img_rel else None
        if img_path is not None and img_path.exists():
            # Close the file handle as soon as the screenshot has been rendered.
            with Image.open(img_path) as img:
                annotated = _annotate_clicks(img, actions)
                display(_resize_for_display(annotated, display_scale))
        else:
            print(f"[缺少截图] {img_rel}")

        print(response)
        print("-" * 80)

    return item


# Run this: display one randomly chosen trajectory from the merged JSON file.
_ = show_random_from_merged()

Num of total tasks: 727


KeyError: 'id'