In [1]:

# %pip -q install -U "torch>=2.2" "torchvision>=0.17" "accelerate>=0.33"     "transformers>=4.46" "qwen-vl-utils>=0.0.8"     "pillow>=10" "pandas>=2.2" "numpy>=1.24" "tqdm>=4.66"


In [1]:

from __future__ import annotations
import os, re, json, glob
from typing import List, Dict, Any, Tuple

import torch
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw
from tqdm import tqdm

from transformers import AutoProcessor, AutoModelForCausalLM
try:
    from transformers import Qwen2VLForConditionalGeneration as Qwen2VLClass
except Exception:
    Qwen2VLClass = None

from qwen_vl_utils import process_vision_info

def natural_key(s: str):
    """Sort key giving human-friendly ("natural") ordering of file paths.

    Only the final path component is used. It is split into alternating text
    and digit runs: digit runs become ints (so "img2" sorts before "img10"),
    text runs are lower-cased for case-insensitive comparison.
    """
    file_name = os.path.basename(s)
    key = []
    for token in re.split(r"(\d+)", file_name):
        key.append(int(token) if token.isdigit() else token.lower())
    return key

def ensure_dir(path: str) -> None:
    """Create directory `path` (with any missing parents); no-op if it already exists."""
    os.makedirs(path, exist_ok=True)

def write_jsonl(path: str, rows):
    """Write `rows` to `path` as JSON Lines (UTF-8, one JSON document per line).

    The file is overwritten. Non-ASCII characters are written as-is rather
    than as \\uXXXX escapes.
    """
    with open(path, "w", encoding="utf-8") as handle:
        handle.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)


  from .autonotebook import tqdm as notebook_tqdm


In [20]:
import os
# Restrict this kernel to a single physical GPU (index 3 on the host).
# NOTE(review): this normally only takes effect if it is set before CUDA is
# first initialised in the kernel — keep it above any CUDA query. Confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

# --- DOMINO paths ---
# Root folder scanned for task sub-directories (each holding an annotations.json).
# NOTE(review): absolute, user-specific path — adjust per machine.
DOMINO_ROOT  = "/share_2/users/umair_nawaz/DOMINO-Evaluation/Medical/OHIF/SS"
# Predictions, overlays and evaluation JSON files are written under this folder.
PRED_DIR  = os.path.join(DOMINO_ROOT, "Predictions-Qwen2-7B-VL")

# --- Model ---
MODEL_ID   = "Qwen/Qwen2-VL-7B-Instruct"
DEVICE_MAP = "cuda"                          # forwarded as `device_map` to from_pretrained
# float16 when CUDA is available, bfloat16 otherwise.
DTYPE      = torch.float16 if torch.cuda.is_available() else torch.bfloat16
USE_FA2    = False                           # True requests attn_implementation="flash_attention_2"
MAX_NEW_TOKENS = 256                         # generation budget per image

# --- Evaluation ---
PRIMARY_TAU   = 0.30                         # IoU threshold for step success / task completion
THRESHOLDS    = [0.30, 0.50, 0.75, 0.90]     # IoU thresholds reported as Acc@t
REQUIRE_LABEL = True                         # keep only predicted boxes whose label equals the instruction

# --- Misc ---
FILE_EXTS       = (".png", ".jpg", ".jpeg", ".PNG", ".JPG", ".JPEG")   # image suffixes globbed per task
SAVE_OVERLAYS   = True                       # write an annotated copy of every image
OUT_IMG_SUFFIX  = "_pred.png"                # appended to the image stem for overlay files
# Passed to draw_and_save as `outline_width`.
# NOTE(review): with Pillow a width of 0 appears to draw no rectangle outline
# (only the label chip) — confirm this is intended.
OUTLINE_WIDTH   = 0
LIMIT_PER_TASK  = 0                          # 0 = process every image; >0 = only the first N per task


In [7]:

def extract_json(text: str) -> Dict[str, Any]:
    """Extract the first top-level JSON object from model output.

    Lookup order:
      1. The payload of a ``<tool_call>{...}</tool_call>`` wrapper, if present.
      2. Otherwise the first brace-balanced ``{...}`` span in ``text``.

    The brace scan is string-aware: braces that occur inside a double-quoted
    JSON string (e.g. ``{"label": "press }"}``) do not affect nesting depth,
    and backslash escapes inside strings are honoured.

    Before parsing, ``<|...|>`` special-token markers are stripped and raw
    newlines are replaced with spaces.

    Raises:
        ValueError: if no object span is found. ``json.JSONDecodeError`` (a
            ``ValueError`` subclass) is raised when the span is not valid JSON.
    """
    tagged = re.search(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", text, flags=re.S)
    candidate = tagged.group(1) if tagged else None
    if candidate is None:
        depth, start = 0, None
        in_string = False   # currently inside a "..." literal of the object
        escaped = False     # previous character was a backslash inside a string
        for i, ch in enumerate(text):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
                continue
            if ch == '"':
                # Quotes in free text before the object are ignored; only
                # quotes inside an open object start a string literal.
                if depth > 0:
                    in_string = True
            elif ch == "{":
                if depth == 0:
                    start = i
                depth += 1
            elif ch == "}":
                if depth > 0:
                    depth -= 1
                    if depth == 0 and start is not None:
                        candidate = text[start:i + 1]
                        break
        if candidate is None:
            raise ValueError("No JSON object found in model output.")
    cleaned = re.sub(r"<\|.*?\|>", "", candidate).replace("\n", " ")
    return json.loads(cleaned)


In [8]:

GRID_MAX = 999
_EPS = 1e-6

def _grid999_to_px(v: float, size: int) -> int:
    if size <= 0: return 0
    px = int((float(v) / GRID_MAX) * size)
    return max(0, min(size - 1, px))

def _looks_norm01(vals) -> bool:
    return max(vals) <= 1.5 + _EPS

def _looks_grid999(vals) -> bool:
    mx = max(vals)
    return (mx <= GRID_MAX + _EPS) and (mx > 1.5 + _EPS)

def clamp_box_to_img(x1, y1, x2, y2, W, H):
    """Round a box to integer pixels, clamp it into the image, and keep it non-degenerate.

    Each coordinate is rounded and clamped to [0, W-1] / [0, H-1]. If the far
    edge does not lie strictly beyond the near edge, the far edge is pushed
    out by 2 px. When that is impossible because the box sits on the last
    column/row, the near edge is pulled back instead, so the result has
    positive extent whenever the image is large enough to allow it.

    Returns:
        (X1, Y1, X2, Y2) as ints.
    """
    def _clamp(value, size):
        return max(0, min(int(round(value)), size - 1))

    def _ensure_extent(lo, hi, size):
        if hi <= lo:
            hi = min(size - 1, lo + 2)
            if hi <= lo:
                # Box collapsed against the far image border: widen inward.
                lo = max(0, hi - 2)
        return lo, hi

    X1, X2 = _ensure_extent(_clamp(x1, W), _clamp(x2, W), W)
    Y1, Y2 = _ensure_extent(_clamp(y1, H), _clamp(y2, H), H)
    return (X1, Y1, X2, Y2)

def iou_xyxy(a: Tuple[float,float,float,float], b: Tuple[float,float,float,float]) -> float:
    """Intersection-over-union of two (x1, y1, x2, y2) boxes.

    Negative widths/heights are treated as zero. Returns 0.0 when the union
    area is not positive.
    """
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b

    overlap_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    intersection = overlap_w * overlap_h

    first_area = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
    second_area = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
    union = first_area + second_area - intersection

    if union > 0:
        return intersection / union
    return 0.0

def draw_and_save(image_path: str, boxes_xyxy_px: List[Tuple[int,int,int,int]], labels: List[str], out_path: str, outline_width: int = 5):
    """Draw red boxes with label chips on an image and save the result.

    Args:
        image_path: source image; it is opened, converted to RGB and closed.
        boxes_xyxy_px: pixel boxes as (x1, y1, x2, y2).
        labels: one label per box. Missing labels (empty/None list, or a list
            shorter than `boxes_xyxy_px`) are filled with "box<N>" so every
            box is drawn.
        out_path: destination file; its parent directory is created if the
            path has one.
        outline_width: rectangle outline width handed to Pillow.
    """
    # Use a context manager so the source file handle is released promptly.
    with Image.open(image_path) as src:
        img = src.convert("RGB")
    d = ImageDraw.Draw(img)

    # Pad rather than let zip() silently drop boxes that have no label.
    labels = list(labels or [])
    labels += [f"box{i+1}" for i in range(len(labels), len(boxes_xyxy_px))]

    pad = 4
    for (x1, y1, x2, y2), lab in zip(boxes_xyxy_px, labels):
        lab = str(lab)  # labels may come from model JSON and need not be str
        d.rectangle([x1, y1, x2, y2], outline=(255,0,0), width=outline_width)
        try:
            l, t, r, b = d.textbbox((0,0), lab)
            tw, th = r - l, b - t
        except Exception:
            # Fallback size estimate when textbbox is unavailable or fails.
            tw, th = max(8*len(lab), 16), 16
        # Label chip sits just above the box, clipped at the top image edge.
        chip = [x1, max(0, y1 - th - 2*pad), x1 + tw + 2*pad, y1]
        d.rectangle(chip, fill=(255,0,0))
        d.text((chip[0]+pad, chip[1]+pad), lab, fill=(255,255,255))

    out_dir = os.path.dirname(out_path)
    if out_dir:  # a bare file name has no directory; os.makedirs("") would raise
        os.makedirs(out_dir, exist_ok=True)
    img.save(out_path, quality=95)


In [9]:

def _disambiguate_to_ltrb(x1, y1, a, b, mode: str):
    """Return (X1,Y1,X2,Y2) interpreting (x1,y1,a,b) either as XYXY or XYWH.
    mode in {"norm","grid","pixel"} controls scaling for XYWH->XYXY path.
    We first try XYXY; if degenerate (x2<=x1 or y2<=y1), we treat as XYWH.
    """
    if mode == "norm":
        X1, Y1, X2, Y2 = x1, y1, a, b
        if (X2 <= X1 + 1e-9) or (Y2 <= Y1 + 1e-9):
            X2, Y2 = x1 + a, y1 + b
        return X1, Y1, X2, Y2
    elif mode == "grid":
        X1, Y1, X2, Y2 = x1, y1, a, b
        if (X2 <= X1 + 1e-6) or (Y2 <= Y1 + 1e-6):
            X2, Y2 = x1 + a, y1 + b
        return X1, Y1, X2, Y2
    else:  # pixel
        X1, Y1, X2, Y2 = x1, y1, a, b
        if (X2 <= X1) or (Y2 <= Y1):
            X2, Y2 = x1 + a, y1 + b
        return X1, Y1, X2, Y2

def boxes_px_from_res_json(res_json: dict, image_path: str):
    """Convert predicted boxes to absolute pixel XYXY boxes.

    Coordinate scale ("mode") is taken from ``res_json["box_format"]``:
      - ``*_norm``     -> values in [0, 1], scaled by image width/height
      - ``*_grid999``  -> values on a 0..999 grid
      - ``*_pixel`` / bare ``xyxy`` / ``xywh`` -> already pixels
    If the format is missing or unrecognised, the mode is guessed from the
    value range (norm vs grid vs pixel).

    Box layout:
      - An explicit ``xywh*`` format is honoured: (x, y, w, h) -> (x, y, x+w, y+h).
      - Otherwise values are read as XYXY, and re-read as XYWH only when the
        XYXY reading is degenerate (see ``_disambiguate_to_ltrb``).

    Malformed entries (non-dict items, boxes that are not 4 numbers, NaN/inf
    coordinates) are skipped instead of raising.

    Args:
        res_json: parsed model output with optional "box_format" and "click_boxes".
        image_path: image the boxes refer to; only its size is read.

    Returns:
        (boxes, labels): list of clamped (x1, y1, x2, y2) int tuples and the
        matching list of labels.
    """
    # Only the size is needed; close the file handle right away.
    with Image.open(image_path) as im:
        W, H = im.size
    fmt = str(res_json.get("box_format") or "").lower().strip()
    out_boxes, out_labels = [], []

    norm_formats = ("xyxy_norm", "x1y1x2y2_norm", "xywh_norm")
    grid_formats = ("xyxy_grid999", "x1y1x2y2_grid999", "xywh_grid999")
    pixel_formats = ("xyxy_pixel", "x1y1x2y2_pixel", "xywh_pixel", "xyxy", "x1y1x2y2", "xywh")
    explicit_xywh = fmt in ("xywh_norm", "xywh_grid999", "xywh_pixel", "xywh")

    def clamp_and_scale(X1, Y1, X2, Y2, mode):
        if mode == "norm":
            X1, Y1, X2, Y2 = X1 * W, Y1 * H, X2 * W, Y2 * H
        elif mode == "grid":
            X1, Y1, X2, Y2 = _grid999_to_px(X1, W), _grid999_to_px(Y1, H), _grid999_to_px(X2, W), _grid999_to_px(Y2, H)
        return clamp_box_to_img(X1, Y1, X2, Y2, W, H)

    inf = float("inf")
    # `or []` also covers an explicit `"click_boxes": null` in the model output.
    for item in res_json.get("click_boxes") or []:
        if not isinstance(item, dict):
            continue
        lab = item.get("label", "box")
        box = item.get("box", [])
        if not (isinstance(box, (list, tuple)) and len(box) == 4):
            continue
        try:
            x1, y1, a, b = [float(v) for v in box]
        except (TypeError, ValueError):
            continue  # non-numeric coordinate
        if any(v != v or abs(v) == inf for v in (x1, y1, a, b)):
            continue  # NaN/inf cannot be rounded to a pixel
        vals = [abs(x1), abs(y1), abs(a), abs(b)]

        if fmt in norm_formats:
            mode = "norm"
        elif fmt in grid_formats:
            mode = "grid"
        elif fmt in pixel_formats:
            mode = "pixel"
        else:
            mode = "norm" if _looks_norm01(vals) else ("grid" if _looks_grid999(vals) else "pixel")

        if explicit_xywh:
            # The model declared width/height; a valid XYWH box with w > x
            # would otherwise be indistinguishable from XYXY.
            X1, Y1, X2, Y2 = x1, y1, x1 + a, y1 + b
        else:
            X1, Y1, X2, Y2 = _disambiguate_to_ltrb(x1, y1, a, b, mode)
        px_box = clamp_and_scale(X1, Y1, X2, Y2, mode)

        out_boxes.append(px_box)
        out_labels.append(lab)

    return out_boxes, out_labels


In [10]:

def build_messages_for_image(image_path: str, task_query: str, image_instruction: str):
    """Build the [system, user] chat messages for grounding one screenshot.

    The system message fixes the output contract (strict JSON, normalised
    coordinates). The user message carries the global task, the per-image
    instruction, the file name, the RGB image itself and the JSON schema.
    """
    def _text(value):
        return {"type": "text", "text": value}

    system_prompt = "".join((
        "You are a precise UI grounding assistant. ",
        "Given a GUI screenshot, a global task, and the step instruction for THIS image, ",
        "output STRICT JSON only with bounding boxes needed on THIS screen. ",
        "Use normalized coordinates in [0,1] relative to THIS image. ",
        "Prefer the format 'xyxy_norm' with [x1,y1,x2,y2]. ",
        "If you decide to use widths/heights, set 'box_format' to 'xywh_norm'. ",
        "Return a single JSON object and nothing else.",
    ))
    schema_text = "\n".join((
        "{",
        '  "image": "<filename>",',
        '  "box_format": "xyxy_norm",',
        '  "click_boxes": [',
        '     {"label": "<instruction>", "box": [x1,y1,x2,y2]}',
        "  ]",
        "}",
        "Only valid JSON. Coordinates 0..1. Empty list if no action is needed.",
    ))

    screenshot = Image.open(image_path).convert("RGB")
    user_parts = [
        _text(f"Global task: {task_query}"),
        _text(f"Instruction for this image: {image_instruction}"),
        _text(f"Image filename: {os.path.basename(image_path)}"),
        {"type": "image", "image": screenshot},
        _text("Return JSON with this schema:"),
        _text(schema_text),
    ]
    return [
        {"role": "system", "content": [_text(system_prompt)]},
        {"role": "user", "content": user_parts},
    ]

@torch.inference_mode()
def infer_one(model, processor, image_path: str, task_query: str, image_instruction: str,
              temperature: float = 0.0, max_new_tokens: int = MAX_NEW_TOKENS):
    """Run the model on one image and return its parsed grounding prediction.

    Args:
        model, processor: as returned by the loader.
        image_path: screenshot to ground.
        task_query: global task description.
        image_instruction: step instruction for this image; also used as the
            label of every returned box.
        temperature: > 0 enables sampling (with top_p=0.95); otherwise
            decoding is greedy and no sampling parameters are passed.
        max_new_tokens: generation budget.

    Returns:
        dict with keys "raw_text", "prediction" (parsed JSON with labels
        overwritten and malformed click_boxes entries removed), "boxes_px",
        "labels" and "image_size" (W, H).
    """
    messages = build_messages_for_image(image_path, task_query, image_instruction)
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    images, videos = process_vision_info(messages)
    inputs = processor(text=[text], images=images, videos=videos, return_tensors="pt", padding=True)

    # Send inputs to wherever the model actually lives rather than assuming
    # "cuda"; fall back to the old heuristic if the model exposes no device.
    device = getattr(model, "device", None)
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    for k, v in list(inputs.items()):
        if hasattr(v, "to"):
            inputs[k] = v.to(device)

    sampling = temperature is not None and temperature > 0
    gen_ids = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=sampling,
        temperature=temperature if sampling else None,
        top_p=0.95 if sampling else None,
    )
    # Drop the prompt tokens; keep only what the model generated.
    trimmed = gen_ids[:, inputs["input_ids"].shape[1]:]
    txt = processor.batch_decode(trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=True)[0]

    try:
        parsed = extract_json(txt)
    except Exception as e:
        print(f"[WARN] JSON parse failed for {os.path.basename(image_path)}: {e}\nRAW[:400]: {txt[:400]}")
        parsed = {"image": os.path.basename(image_path), "box_format": "xyxy_norm", "click_boxes": []}

    pj = json.loads(json.dumps(parsed))  # deep copy via JSON round-trip
    # The model may emit `"click_boxes": null` or non-object entries; keep only
    # dict entries so relabelling below cannot raise.
    raw_boxes = pj.get("click_boxes")
    if not isinstance(raw_boxes, list):
        raw_boxes = []
    pj["click_boxes"] = [cb for cb in raw_boxes if isinstance(cb, dict)]
    for cb in pj["click_boxes"]:
        cb["label"] = image_instruction

    boxes_px, labels = boxes_px_from_res_json(pj, image_path)
    with Image.open(image_path) as im:
        W, H = im.size
    return {
        "raw_text": txt,
        "prediction": pj,
        "boxes_px": boxes_px,
        "labels": labels,
        "image_size": (W, H),
    }


In [11]:

def load_qwen2vl(model_id: str = MODEL_ID, dtype=DTYPE, device_map=DEVICE_MAP, use_fa2: bool = USE_FA2):
    """Load the model and its processor, ready for inference.

    Args:
        model_id: Hugging Face model identifier.
        dtype: torch dtype, or "auto" to let the library decide (forced to
            bfloat16 when `use_fa2` is set).
        device_map: forwarded to `from_pretrained` unless None.
        use_fa2: request the "flash_attention_2" attention implementation.

    Returns:
        (model, processor) with the model in eval mode. If the generation
        config has no pad token id, the tokenizer's EOS id is used.
    """
    if use_fa2 and dtype == "auto":
        dtype = torch.bfloat16

    load_kwargs = {"trust_remote_code": True}
    if use_fa2:
        load_kwargs["attn_implementation"] = "flash_attention_2"
    if dtype != "auto":
        load_kwargs["torch_dtype"] = dtype
    if device_map is not None:
        load_kwargs["device_map"] = device_map

    # Prefer the dedicated Qwen2-VL class when this transformers build has it.
    model_cls = Qwen2VLClass if Qwen2VLClass is not None else AutoModelForCausalLM
    model = model_cls.from_pretrained(model_id, **load_kwargs)
    processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)

    tokenizer = getattr(processor, "tokenizer", None)
    eos_id = getattr(tokenizer, "eos_token_id", None)
    generation_cfg = getattr(model, "generation_config", None)
    if getattr(generation_cfg, "pad_token_id", None) is None and eos_id is not None:
        model.generation_config.pad_token_id = eos_id

    model.eval()
    return model, processor


In [12]:

def xywh_to_xyxy(b):
    """Convert an (x, y, width, height) box to (x1, y1, x2, y2)."""
    left, top, width, height = b
    right = left + width
    bottom = top + height
    return (left, top, right, bottom)

def load_annotations(task_dir: str) -> Dict[str, Any]:
    """Read `<task_dir>/annotations.json` and index its images by file name.

    Returns:
        {"task_query": str, "by_image": {file_name: {"boxes", "instruction",
        "size", "step_index"}}}. Boxes are converted from XYWH to XYXY. The
        task query falls back to "task_name" and then to the directory name.
    """
    with open(os.path.join(task_dir, "annotations.json"), "r", encoding="utf-8") as fh:
        annotations = json.load(fh)

    images_by_name = {}
    for image_meta in annotations.get("images", []):
        size = (image_meta["width"], image_meta["height"])
        gt_boxes = []
        for bbox_entry in image_meta.get("bboxes", []):
            gt_boxes.append(xywh_to_xyxy(bbox_entry["bbox"]))
        images_by_name[image_meta["file_name"]] = {
            "boxes": gt_boxes,
            "instruction": image_meta.get("instruction", ""),
            "size": size,
            "step_index": image_meta.get("step_index"),
        }

    query = annotations.get("task_query") or annotations.get("task_name") or os.path.basename(task_dir)
    return {"task_query": query, "by_image": images_by_name}

def is_task_dir(path: str) -> bool:
    """True when `path` is a directory that contains an annotations.json entry."""
    if not os.path.isdir(path):
        return False
    return os.path.exists(os.path.join(path, "annotations.json"))

def list_task_dirs(root_dir: str) -> List[str]:
    """Return the task directories directly under `root_dir`, in natural name order.

    A child counts as a task directory when `is_task_dir` accepts it.
    """
    task_dirs = []
    for entry in sorted(os.listdir(root_dir), key=natural_key):
        candidate = os.path.join(root_dir, entry)
        if is_task_dir(candidate):
            task_dirs.append(candidate)
    return task_dirs


In [13]:

def run_all_tasks_domino(root_dir: str, pred_root: str, model, processor,
                         temperature: float = 0.0, max_new_tokens: int = MAX_NEW_TOKENS, limit_per_task: int = 0):
    """Run inference on every image of every task under `root_dir` and save the results.

    For each task directory (as found by `list_task_dirs`) this writes, under
    `<pred_root>/<task>/`:
      - predictions.jsonl: one JSON record per image (overwritten each run);
      - `<image stem><OUT_IMG_SUFFIX>` overlays when SAVE_OVERLAYS is set.
    Every record is also appended to `<pred_root>/all_predictions.jsonl`,
    which is truncated at the start of the run.

    Args:
        root_dir: folder containing the task directories.
        pred_root: output folder (created if missing).
        model, processor: passed through to `infer_one`.
        temperature, max_new_tokens: generation settings for `infer_one`.
        limit_per_task: if > 0, only the first N images (natural order) of
            each task are processed; 0 processes all.

    Returns:
        None. Progress is printed per image and per task.
    """
    ensure_dir(pred_root)
    tasks = list_task_dirs(root_dir)
    print(f"Found {len(tasks)} task(s). Saving to: {pred_root}")

    # Truncate the aggregate file once; tasks append to it below.
    agg_path = os.path.join(pred_root, "all_predictions.jsonl")
    open(agg_path, "w", encoding="utf-8").close()

    for task_dir in tasks:
        # Load the task metadata; the task query falls back to the task name
        # and then to the directory name.
        with open(os.path.join(task_dir, "annotations.json"), "r", encoding="utf-8") as f:
            data = json.load(f)
        task_label = os.path.basename(task_dir)
        task_query = data.get("task_query") or data.get("task_name") or task_label

        out_task_dir = os.path.join(pred_root, task_label)
        ensure_dir(out_task_dir)
        pred_jsonl_path = os.path.join(out_task_dir, "predictions.jsonl")

        # Collect images by extension; set() removes duplicates that can occur
        # when several patterns match the same file.
        image_paths = []
        for ext in FILE_EXTS:
            image_paths += glob.glob(os.path.join(task_dir, f"*{ext}"))
        image_paths = sorted(set(image_paths), key=natural_key)
        if limit_per_task and limit_per_task > 0:
            image_paths = image_paths[:limit_per_task]

        # Map file name -> step instruction. Images that are on disk but not
        # listed in annotations.json get an empty instruction below.
        instr_by_file = {im["file_name"]: im.get("instruction","") for im in data.get("images", [])}

        with open(pred_jsonl_path, "w", encoding="utf-8") as f_out, open(agg_path, "a", encoding="utf-8") as agg_out:
            for img_path in image_paths:
                fname = os.path.basename(img_path)
                instruction = instr_by_file.get(fname, "")

                res = infer_one(model, processor, img_path, task_query, instruction,
                                temperature=temperature, max_new_tokens=max_new_tokens)

                boxes_px, labels = res["boxes_px"], res["labels"]
                if SAVE_OVERLAYS:
                    overlay_path = os.path.join(out_task_dir, f"{os.path.splitext(fname)[0]}{OUT_IMG_SUFFIX}")
                    draw_and_save(img_path, boxes_px, labels, overlay_path, outline_width=OUTLINE_WIDTH)

                # image_relpath is relative to root_dir so that the evaluation
                # step can locate the image again from the same root.
                record = {
                    "task": task_label,
                    "task_query": task_query,
                    "image": fname,
                    "instruction": instruction,
                    "image_relpath": os.path.relpath(img_path, root_dir),
                    "image_size": res["image_size"],
                    "prediction": res["prediction"],
                    "boxes_px": boxes_px,
                    "labels": labels,
                }
                line = json.dumps(record, ensure_ascii=False)
                f_out.write(line + "\n"); agg_out.write(line + "\n")
                print(f"  {task_label} :: {fname} -> boxes {len(boxes_px)}")

        print(f"Saved: {pred_jsonl_path}")


In [14]:

def greedy_match_iou(gt_boxes: List[Tuple[float,float,float,float]],
                     pred_boxes: List[Tuple[int,int,int,int]]):
    """Greedy one-to-one matching of ground-truth and predicted boxes by IoU.

    All (gt, pred) pairs are scored, then taken in descending IoU order; a
    pair is accepted only if neither its gt nor its pred index is already
    matched.

    Returns:
        list of (gt_index, pred_index, iou) tuples in acceptance order.
    """
    scored = [
        (gi, pi, iou_xyxy(gt_box, pred_box))
        for gi, gt_box in enumerate(gt_boxes)
        for pi, pred_box in enumerate(pred_boxes)
    ]
    ranked = sorted(scored, key=lambda triple: triple[2], reverse=True)

    taken_gt, taken_pred = set(), set()
    matches = []
    for gi, pi, score in ranked:
        if gi not in taken_gt and pi not in taken_pred:
            taken_gt.add(gi)
            taken_pred.add(pi)
            matches.append((gi, pi, score))
    return matches

def boxes_px_from_prediction_record(rec: Dict[str, Any], root_dir: str) -> Tuple[List[Tuple[int,int,int,int]], List[str], Tuple[int,int]]:
    """Recover pixel boxes, labels and image size from a saved prediction record.

    Resolution order:
      1. A non-empty stored "boxes_px" list is used as-is (cast to int).
      2. Otherwise, if the image at `root_dir/image_relpath` exists, the raw
         "prediction" is converted with `boxes_px_from_res_json`.
      3. Otherwise the raw boxes are rounded without scaling (XYXY, or XYWH
         when the XYXY reading is degenerate) and the size is (0, 0).
    """
    stored = rec.get("boxes_px")
    if isinstance(stored, list) and stored:
        int_boxes = [tuple(map(int, box)) for box in stored]
        return int_boxes, rec.get("labels", []), tuple(rec.get("image_size", (0,0)))

    prediction = rec.get("prediction") or {}
    relpath = rec.get("image_relpath")
    image_file = os.path.join(root_dir, relpath) if relpath else None
    if image_file and os.path.exists(image_file):
        converted_boxes, converted_labels = boxes_px_from_res_json(prediction, image_file)
        width, height = Image.open(image_file).size
        return converted_boxes, converted_labels, (width, height)

    # Image unavailable: best-effort conversion with no scaling or clamping.
    raw_boxes, raw_labels = [], []
    for entry in prediction.get("click_boxes", []):
        label = entry.get("label", "box")
        coords = entry.get("box", [])
        if not isinstance(coords, (list, tuple)) or len(coords) != 4:
            continue
        x1, y1, a, b = [float(v) for v in coords]
        if a <= x1 or b <= y1:
            x2, y2 = x1 + a, y1 + b
        else:
            x2, y2 = a, b
        raw_boxes.append((int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2))))
        raw_labels.append(label)
    return raw_boxes, raw_labels, (0,0)

def score_task(task_dir: str, pred_task_dir: str,
               primary_tau: float = 0.5,
               thresholds: List[float] = [0.25, 0.5, 0.75, 0.9],
               require_label_match: bool = True) -> Dict[str, Any]:
    """Score one task's predictions against its ground-truth annotations.

    Args:
        task_dir: task folder containing annotations.json.
        pred_task_dir: folder containing that task's predictions.jsonl.
        primary_tau: IoU threshold for per-image step success and task completion.
        thresholds: IoU thresholds reported as ``Acc@t`` (read-only).
        require_label_match: keep only predicted boxes whose label equals the
            image's instruction (after stripping whitespace).

    Returns:
        ``{"task", "exists": False, "reason"}`` when annotations or
        predictions are missing; otherwise a dict with task completion,
        counts, mIoU, ``Acc@t`` values and a ``per_image`` list. Every
        per-image row carries ``ious_per_gt`` (one IoU per ground-truth box),
        including rows for images with no prediction, so aggregators can
        count every ground-truth box.

    Notes:
        - Metrics are per ground-truth box; images without ground-truth boxes
          add nothing to mIoU/Acc.
        - ``Acc@`` keys are formatted with one decimal, so a threshold of
          0.75 is reported under the key "Acc@0.8".
    """
    task_name = os.path.basename(task_dir)
    gt_path = os.path.join(task_dir, "annotations.json")
    if not os.path.exists(gt_path):
        return {"task": task_name, "exists": False, "reason": "missing annotations.json"}
    with open(gt_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    gt_by_img = {}
    for im in data.get("images", []):
        W, H = im["width"], im["height"]
        # Ground-truth boxes are stored as XYWH; convert to XYXY.
        boxes = [(x, y, x+w, y+h) for (x, y, w, h) in [bb["bbox"] for bb in im.get("bboxes", [])]]
        gt_by_img[im["file_name"]] = {"boxes": boxes, "instruction": im.get("instruction", ""), "size": (W,H)}

    pred_file = os.path.join(pred_task_dir, "predictions.jsonl")
    if not os.path.exists(pred_file):
        return {"task": task_name, "exists": False, "reason": "missing predictions.jsonl"}

    with open(pred_file, "r", encoding="utf-8") as f:
        preds = [json.loads(line) for line in f if line.strip()]
    preds_by_img = {p["image"]: p for p in preds}

    # Saved image_relpath values are relative to the folder that contains the
    # task directories, i.e. the parent of task_dir.
    root_dir = os.path.dirname(os.path.normpath(task_dir))
    success_key = f"step_success@{primary_tau:.2f}"

    per_image_rows, overall_ious, step_pass = [], [], {}

    for fname, meta in gt_by_img.items():
        gt_boxes = meta["boxes"]
        instruction = meta["instruction"]
        prec = preds_by_img.get(fname)
        if not prec:
            # No prediction: every ground-truth box scores 0. The row keeps
            # the same schema as scored rows so aggregation sees these boxes.
            missed = [0.0] * len(gt_boxes)
            per_image_rows.append({
                "image": fname,
                "instruction": instruction,
                "best_iou": 0.0,
                "gt_count": len(gt_boxes),
                "pred_count": 0,
                success_key: False,
                "ious_per_gt": missed,
                "note": "no_pred",
            })
            overall_ious.extend(missed)
            step_pass[fname] = False
            continue

        pred_boxes, pred_labels, _sz = boxes_px_from_prediction_record(prec, root_dir)
        if require_label_match:
            wanted = str(instruction).strip()
            pred_boxes = [box for box, lab in zip(pred_boxes, pred_labels) if str(lab).strip() == wanted]

        matches = greedy_match_iou(gt_boxes, pred_boxes)
        matched_iou = [0.0] * len(gt_boxes)
        for gi, pi, i in matches:
            matched_iou[gi] = max(matched_iou[gi], i)

        step_success = all(i >= primary_tau for i in matched_iou) if gt_boxes else True
        step_pass[fname] = step_success

        per_image_rows.append({
            "image": fname,
            "instruction": instruction,
            "gt_count": len(gt_boxes),
            "pred_count": len(pred_boxes),
            "best_iou": max(matched_iou) if matched_iou else 0.0,
            success_key: bool(step_success),
            "ious_per_gt": matched_iou,
        })
        # One entry per ground-truth box; nothing is added for images with no
        # ground truth, keeping mIoU consistent with num_gt_boxes.
        overall_ious.extend(matched_iou)

    task_completed = all(step_pass.values()) if step_pass else False
    mIoU = (sum(overall_ious) / max(1, len(overall_ious)))
    acc_at = {f"Acc@{t:.1f}": (sum(1 for v in overall_ious if v >= t) / max(1, len(overall_ious))) for t in thresholds}

    return {
        "task": task_name,
        "exists": True,
        f"task_completed@{primary_tau:.2f}": bool(task_completed),
        "num_images": len(gt_by_img),
        "num_gt_boxes": int(sum(len(v["boxes"]) for v in gt_by_img.values())),
        "mIoU": mIoU,
        **acc_at,
        "per_image": per_image_rows,
    }

def evaluate_all_domino(root_dir: str, pred_dir: str,
                        primary_tau: float, thresholds: List[float], require_label_match: bool = True):
    """Score every task under `root_dir` and write per-task and overall summaries.

    For each task this writes `<pred_dir>/<task>/eval.json` (creating the
    folder if needed, so tasks without predictions still get a report), and
    finally `<pred_dir>/_eval_summary.json` with the overall metrics and all
    per-task results. The overall summary is also printed.

    Overall mIoU/Acc are computed over all ground-truth boxes of scored
    tasks. Rows without an "ious_per_gt" list are counted as `gt_count`
    zeros so images lacking predictions are not dropped.
    NOTE(review): tasks for which scoring reports exists=False contribute no
    boxes to the overall IoU metrics, but still count in the task completion
    rate denominator.

    Returns:
        None.
    """
    tasks = list_task_dirs(root_dir)
    print(f"Found {len(tasks)} task(s) with annotations.")

    per_task, overall_ious, completed = [], [], 0
    for task_dir in tasks:
        task_name = os.path.basename(task_dir)
        pred_task_dir = os.path.join(pred_dir, task_name)
        res = score_task(task_dir, pred_task_dir, primary_tau=primary_tau, thresholds=thresholds, require_label_match=require_label_match)
        per_task.append(res)

        if res.get("exists"):
            for row in res.get("per_image", []):
                ious = row.get("ious_per_gt")
                if ious is None:
                    # Row without per-box IoUs (e.g. no prediction for the
                    # image): count each of its ground-truth boxes as a miss.
                    ious = [0.0] * int(row.get("gt_count", 0))
                overall_ious.extend(ious)
            if res.get(f"task_completed@{primary_tau:.2f}"): completed += 1

        # The prediction folder may not exist (e.g. predictions missing for
        # this task); create it so the report can always be written.
        os.makedirs(pred_task_dir, exist_ok=True)
        out_task_json = os.path.join(pred_task_dir, "eval.json")
        with open(out_task_json, "w", encoding="utf-8") as f:
            json.dump(res, f, indent=2)
        print(f"  Saved per-task eval: {out_task_json}")

    n_tasks = len(tasks)
    overall_gt = len(overall_ious)
    overall_mIoU = (sum(overall_ious) / overall_gt) if overall_gt else 0.0
    overall_acc = {f"Acc@{t:.1f}": (sum(1 for v in overall_ious if v >= t) / overall_gt) if overall_gt else 0.0 for t in thresholds}
    task_completion_rate = (completed / n_tasks) if n_tasks else 0.0

    overall = {
        "num_tasks": n_tasks,
        "overall_num_gt_boxes": int(overall_gt),
        f"task_completion_rate@{primary_tau:.2f}": task_completion_rate,
        "overall_mIoU": overall_mIoU,
        **overall_acc,
    }

    os.makedirs(pred_dir, exist_ok=True)
    out_summary = os.path.join(pred_dir, "_eval_summary.json")
    with open(out_summary, "w", encoding="utf-8") as f:
        json.dump({"overall": overall, "per_task": per_task}, f, indent=2)
    print("\n=== Overall Summary ===")
    for k, v in overall.items():
        print(f"{k}: {v:.4f}" if isinstance(v, float) else f"{k}: {v}")
    print(f"\nSaved: {out_summary}")


In [None]:

# Load model
def load_model_and_processor():
    """Load the model and processor using the notebook's configuration constants.

    Thin wrapper around `load_qwen2vl` so the loading logic lives in one
    place. The configuration values (MODEL_ID, DTYPE, DEVICE_MAP, USE_FA2)
    are read when this function is called, so edits to the config cell take
    effect without re-running the cell that defines `load_qwen2vl`.

    Returns:
        (model, processor) with the model in eval mode.
    """
    return load_qwen2vl(
        model_id=MODEL_ID,
        dtype=DTYPE,
        device_map=DEVICE_MAP,
        use_fa2=USE_FA2,
    )

# Load the model only if this kernel does not already hold one. This keeps
# Restart Kernel -> Run All working (the names are defined before use) while
# avoiding a second multi-GB load when the cell is simply re-run.
if "model" not in globals() or "processor" not in globals():
    model, processor = load_model_and_processor()
    print("Model ready.")

# 1) Predict across tasks
run_all_tasks_domino(
    DOMINO_ROOT, PRED_DIR, model, processor,
    temperature=0.0, max_new_tokens=MAX_NEW_TOKENS, limit_per_task=LIMIT_PER_TASK
)

# 2) Evaluate
evaluate_all_domino(
    DOMINO_ROOT, PRED_DIR,
    primary_tau=PRIMARY_TAU,
    thresholds=THRESHOLDS,
    require_label_match=REQUIRE_LABEL
)
