In [None]:
# Rank each image's questions by CLIP score and keep only the top-ranked ones for the characteristic score calculation.

In [None]:
import json
from sklearn.metrics import roc_auc_score, roc_curve, auc
import numpy as np
import random
from pathlib import Path

from infer_utili.data_utili import get_data, read_json, save_json

In [None]:
# Experiment configuration.
# Every setting is picked by position from its tuple of supported values;
# change the trailing index to switch to another option.
target_model = ('llava_v1_5_7b', 'llama_adapter_v2', 'MiniGPT4')[0]
used_dataset = ('img_Flickr', 'img_dalle')[0]
filter_apply = ('noFilter', 'withFilter')[0]
ask_type = ('ordered_choice', 'random_choice')[0]

# Echo the active configuration so it is recorded in the cell output.
print(f'{used_dataset}\n{target_model}\n\n{filter_apply}\n{ask_type}')

In [None]:
# Load the confuser results generated by `get_clipScore.py`.
# The path is assembled from the `filter_apply`, `used_dataset` and
# `target_model` settings; later cells read "original_img_id" and the
# per-object "clip_score" values under "sam_result" from each entry.
# NOTE(review): the "gpt-4o-mini" path segment is hard-coded — confirm it matches your run.
confuser_res_add = f'ObjColor_exp/confuser_res/{filter_apply}/{used_dataset}_by_gpt-4o-mini/{target_model}/res_w_clip.json'
confuser_res = read_json(confuser_res_add)

In [None]:
# Load the traverse results for the selected configuration.
# Adjust this path to match your actual directory layout.
# NOTE(review): the path ends in `temp_0.3` with no file extension — confirm
# that `read_json` accepts it, or append the actual result file name.
traverse_res_add = f'ObjColor_exp/traverse_res/{filter_apply}/{used_dataset}/{target_model}/{ask_type}/temp_0.3'
traverse_res = read_json(traverse_res_add)

In [None]:
def build_clip_score_dict(confuser_entries):
    """
    Build a dict for fast lookup: (img_id, object_index) -> clip_score.

    Args:
        confuser_entries: iterable of dicts, each with an "original_img_id"
            key and, optionally, a "sam_result" list of per-object dicts.

    Returns:
        dict mapping (original_img_id, position within "sam_result") to that
        object's "clip_score", or 0.0 when the object has no "clip_score" key.
        Entries whose "sam_result" is missing, None or empty contribute nothing.
    """
    score_dict = {}
    for entry in confuser_entries:
        img_id = entry["original_img_id"]
        # `or []` covers both a missing key and an explicit null "sam_result";
        # a plain .get(key, []) would still hand None to enumerate().
        for i, sam in enumerate(entry.get("sam_result") or []):
            score_dict[(img_id, i)] = sam.get("clip_score", 0.0)
    return score_dict

def compute_auc(grouped_data, question_type, ignore_invalid, topk=None, clip_score_lookup=None):
    """
    Compute the image-level ROC AUC of mean question accuracy for one question type.

    For every image entry, the questions whose "question_type" equals
    `question_type` are selected (optionally dropping invalid ones), optionally
    ranked by CLIP score and truncated to the first `topk`, and the mean of
    their "acc" values becomes the image's score. Scores are then compared
    against the images' "ground_truth_label" with `roc_auc_score`.

    Args:
        grouped_data: iterable of per-image dicts with keys
            "ground_truth_label", "img_id" and "questions". Each question is a
            dict with "question_type", "invalid_info", "acc" and, when a
            lookup is supplied, "object_index".
        question_type: value of q["question_type"] to keep.
        ignore_invalid: if truthy, keep only questions with invalid_info == 0.
        topk: if not None, keep at most this many questions per image
            (the highest-CLIP ones when a lookup is supplied).
        clip_score_lookup: optional dict (img_id, object_index) -> clip score.
            Keys that are absent score 0.0. None or an empty dict disables
            ranking.

    Returns:
        (auc, n_label0, n_label1): `auc` is None when fewer than two distinct
        labels were scored; `n_label0` counts scored images with label == 0
        and `n_label1` counts all other scored images.

    Side effects:
        When `clip_score_lookup` is non-empty, a "clip_score" key is written
        into each selected question dict of `grouped_data`.
    """
    scores = []
    labels = []
    n_label0 = 0
    n_label1 = 0

    for entry in grouped_data:
        label = entry["ground_truth_label"]
        img_id = entry["img_id"]

        # select only questions of target type and (optionally) valid
        target_questions = [
            q for q in entry["questions"]
            if q["question_type"] == question_type and (not ignore_invalid or q["invalid_info"] == 0)
        ]

        if not target_questions:
            continue

        # inject clip_score if possible, then rank highest first
        if clip_score_lookup:
            for q in target_questions:
                q["clip_score"] = clip_score_lookup.get((img_id, q["object_index"]), 0.0)

            # list.sort is stable, so tied questions keep their original order
            target_questions.sort(key=lambda q: q["clip_score"], reverse=True)

        if topk is not None:
            target_questions = target_questions[:topk]
            # e.g. topk=0 leaves nothing to average: skip the image instead of
            # dividing by zero below
            if not target_questions:
                continue

        image_score = sum(q["acc"] for q in target_questions) / len(target_questions)
        scores.append(image_score)
        labels.append(label)

        if label == 0:
            n_label0 += 1
        else:
            n_label1 += 1

    # roc_auc_score needs both classes present
    if len(set(labels)) < 2:
        print("[WARN] Only one label class found, AUC cannot be computed.")
        return None, n_label0, n_label1

    # named auc_value so the imported sklearn.metrics.auc is not shadowed
    auc_value = roc_auc_score(labels, scores)
    return auc_value, n_label0, n_label1

In [None]:
def _merged_clip_auc(data, ignore_invalid, topk, clip_score_lookup):
    """
    Image-level AUC with all question types pooled together.

    Same procedure as the per-type evaluation, except that questions are not
    filtered by "question_type". Returns (auc, n_label0, n_label1), where
    `auc` is None when fewer than two distinct labels were scored. When
    `clip_score_lookup` is non-empty, a "clip_score" key is written into the
    selected question dicts of `data`.
    """
    scores, labels = [], []
    n_label0 = 0
    n_label1 = 0
    for entry in data:
        label = entry["ground_truth_label"]
        img_id = entry["img_id"]
        valid_qs = [
            q for q in entry["questions"]
            if not ignore_invalid or q["invalid_info"] == 0
        ]
        if not valid_qs:
            continue

        # add clip_score and rank highest first (stable for ties)
        if clip_score_lookup:
            for q in valid_qs:
                q["clip_score"] = clip_score_lookup.get((img_id, q["object_index"]), 0.0)
            valid_qs.sort(key=lambda q: q["clip_score"], reverse=True)

        if topk is not None:
            valid_qs = valid_qs[:topk]
            # e.g. topk=0 leaves nothing to average: skip rather than divide by zero
            if not valid_qs:
                continue

        image_score = sum(q["acc"] for q in valid_qs) / len(valid_qs)
        scores.append(image_score)
        labels.append(label)
        if label == 0:
            n_label0 += 1
        else:
            n_label1 += 1

    if len(set(labels)) < 2:
        print("[WARN] Only one label class found. Skipping.")
        return None, n_label0, n_label1
    auc_value = roc_auc_score(labels, scores)
    return auc_value, n_label0, n_label1


def num_auc_analysis(confuser_res, traverse_res, topk_for_auc):
    """
    Print CLIP-ranked top-k AUC results for every question type and validity setting.

    Builds a (original_img_id, object position) -> clip_score lookup from
    `confuser_res`, then evaluates `traverse_res` for the question types
    "ask_masked_obj" and "ask_obj_color" (via `compute_auc`) and for "both"
    (all questions pooled), each with and without invalid questions.

    Args:
        confuser_res: iterable of dicts with "original_img_id" and an optional
            "sam_result" list whose items may carry "clip_score".
        traverse_res: iterable of per-image dicts as consumed by `compute_auc`.
        topk_for_auc: number of top-ranked questions to keep per image, or
            None to keep all.

    Returns:
        None; results are printed.
    """
    # === Run CLIP-based AUC evaluation ===
    print(f"[Num={topk_for_auc}] Running CLIP-sorted AUC evaluation...")

    # A missing or null "sam_result" contributes no lookup entries.
    clip_score_lookup = {
        (entry["original_img_id"], i): sam.get("clip_score", 0.0)
        for entry in confuser_res
        for i, sam in enumerate(entry.get("sam_result") or [])
    }

    for qtype in ["ask_masked_obj", "ask_obj_color", "both"]:
        for ignore_flag in [False, True]:
            print(f"\n>>> AUC Eval | qtype={qtype} | ignore_invalid={ignore_flag} | topk={topk_for_auc}")

            if qtype == "both":
                auc_value, n0, n1 = _merged_clip_auc(traverse_res, ignore_flag, topk_for_auc, clip_score_lookup)
            else:
                auc_value, n0, n1 = compute_auc(
                    grouped_data=traverse_res,
                    question_type=qtype,
                    ignore_invalid=ignore_flag,
                    topk=topk_for_auc,
                    clip_score_lookup=clip_score_lookup
                )

            if auc_value is not None:
                print(f"[AUC] {auc_value:.4f} | label0: {n0}, label1: {n1}")
            else:
                print("[AUC] N/A")

In [None]:
# Number of top CLIP-ranked questions to use per image.
topk_for_auc = 5
num_auc_analysis(
    confuser_res=confuser_res,
    traverse_res=traverse_res,
    topk_for_auc=topk_for_auc,
)