# Predicting IPV with Qwen2.5-7B-Instruct Using the DetectIPV Paper's Evaluation Pipeline
Key tasks:
* Binary IPV detection (IPV vs NOT_IPV)
* Multi-label subtype detection (Physical, Emotional, Sexual)

**Key Evaluation Components from the Paper**
| Figure/Table | What it Shows |
|---------------|----------------|
| **Fig. 2(a)** | ROC curve for *General Violence Model* |
| **Fig. 2(b–c)** | “Waterfall” plot: sentences sorted by predicted confidence; color = ground-truth subtype |
| **Fig. 3(a)** | AUROC of Type-Specific vs General Models |
| **Fig. 3(b–e)** | More waterfall plots for type-specific models |
| **Fig. 4** | Radial visualization of confidence in 3-D type space |
| **Table 4** | AUROC for models trained on different negative examples |

## 1. Prompt Design

To evaluate how prompt structure influences LLM performance on IPV detection, I used **five prompting strategies** for both binary (IPV vs. NOT_IPV) and multi-label (Physical, Emotional, Sexual) tasks.  
This allows testing how reasoning depth, context, and self-reflection affect accuracy, AUC, and interpretability.

| Prompt Type | Description | Advantage |
|--------------|-------------|------------|
| **Zero-Shot** | Direct question without examples or reasoning. | Tests model’s innate understanding of IPV cues. |
| **Few-Shot** | Includes short examples before prediction. | Leverages in-context learning and improves clarity. |
| **Chain-of-Thought** | Prompts step-by-step reasoning (kept internal) before the final label. | Encourages more deliberate, robust predictions. |
| **Meta-Prompt** | Frames model as an IPV analyst with reflective reasoning. | Encourages careful, context-aware decisions. |
| **Self-Consistency** | Asks the model to make several internal judgments and return the majority label. | Aims to improve stability and reduce random errors. |

These prompt variants help identify which instruction style yields the **most reliable and interpretable results** for both general and type-specific IPV detection.
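
The Self-Consistency prompts in this notebook ask the model to deliberate internally within a single generation. A sampling-based variant would instead draw several independent generations and take the majority label. The sketch below covers only that voting step. The helper name `majority_label` and its tie-break rule are illustrative assumptions, not part of the notebook.

```python
from collections import Counter

def majority_label(votes, tie_break="NOT_IPV"):
    """Return the most common label among sampled votes.

    `tie_break` is a hypothetical default used when the top two labels tie.
    """
    if not votes:
        return "UNKNOWN"
    counts = Counter(votes).most_common()
    # A tie exists when the two most frequent labels share the same count.
    if len(counts) > 1 and counts[0][1] == counts[1][1]:
        return tie_break
    return counts[0][0]

print(majority_label(["IPV", "NOT_IPV", "IPV"]))  # IPV
```

Falling back to `NOT_IPV` on ties mirrors the cautious framing of the Meta prompt. A different default may suit a recall-oriented evaluation better.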

### 1.1 Binary IPV Detection Prompts

In [None]:
# (1) Zero-Shot — final (no inline JSON examples)
prompt_binary_zeroshot = """
You are analyzing whether a sentence describes Intimate Partner Violence (IPV).

Definition:
Intimate Partner Violence includes any form of physical, emotional, or sexual harm
inflicted by a partner. This may involve control, manipulation, threats, humiliation,
or physical aggression.

Decide if the following sentence involves IPV or not.

Sentence: "{text}"

Return ONLY one JSON object, enclosed between <json> and </json>.
Format: a single key "label" with value "IPV" or "NOT_IPV".
Print nothing else.
""".strip()


# (2) Few-Shot: final (inline examples, JSON-only output)
prompt_binary_fewshot = """
You are classifying sentences as describing Intimate Partner Violence (IPV) or NOT_IPV.

Definition:
IPV occurs when one partner harms or controls another physically, emotionally, or sexually.

Here are examples (for guidance only):
1. "He pushed me against the wall and shouted that I’m worthless." → IPV
2. "She often insults me and forbids me from seeing my friends." → IPV
3. "We argued but both apologized and moved on peacefully." → NOT_IPV
4. "He surprised me with flowers after work." → NOT_IPV

Now analyze this sentence:
"{text}"

Return ONLY one JSON object, enclosed between <json> and </json>.
Format: {"label": "IPV"} or {"label": "NOT_IPV"}.
Print nothing else.
""".strip()


# (3) Chain-of-Thought: final (silent reasoning, JSON-only output)
prompt_binary_cot = """
You are reasoning step-by-step to decide if a sentence describes Intimate Partner Violence (IPV).

Guidelines:
- IPV includes threats, coercion, physical harm, humiliation, or emotional manipulation.
- NOT_IPV describes healthy, neutral, or unrelated situations.

Think internally (do NOT show your reasoning) about:
1. Does the sentence show any behavior that causes harm, fear, or control?
2. Is there a partner/relationship context?
3. Does it express affection or support instead of harm?

After thinking silently, return ONLY one JSON object, enclosed between <json> and </json>.
Format: {"label": "IPV"} or {"label": "NOT_IPV"}.
Print nothing else.

Sentence: "{text}"
""".strip()


# (4) Meta: final (analyst framing, JSON-only output)
prompt_binary_meta = """
You are a social-behavioral analyst evaluating sentences for signs of Intimate Partner Violence (IPV).

Your objective is to be accurate but cautious.
- If the sentence clearly involves harm, coercion, or control → label as IPV.
- If the sentence shows affection, neutrality, or uncertainty → label as NOT_IPV.

Reflect internally before answering; do NOT print your reasoning.

Sentence: "{text}"

Return ONLY one JSON object, enclosed between <json> and </json>.
Format: {"label": "IPV"} or {"label": "NOT_IPV"}.
Print nothing else.
""".strip()


# (5) Self-Consistency: final (internal voting, JSON-only output)
prompt_binary_selfconsistency = """
You will internally evaluate the sentence for Intimate Partner Violence (IPV) multiple times
and choose the majority label as your final answer.

Guidelines:
- IPV → signs of physical, emotional, or sexual harm, threats, or coercion.
- NOT_IPV → supportive, neutral, or unrelated content.

Do NOT reveal thoughts or votes.

Sentence: "{text}"

Return ONLY one JSON object, enclosed between <json> and </json>.
Format: {"label": "IPV"} or {"label": "NOT_IPV"}.
Print nothing else.
""".strip()

### 1.2 Multi-label IPV Type Detection Prompts

In [None]:
# (1) Zero-Shot — strict JSON output (no example line)
prompt_multilabel_zeroshot = """
You are classifying a sentence for Intimate Partner Violence (IPV) subtypes.

Valid labels (choose any subset): Physical, Emotional, Sexual, NOT_IPV.
If the sentence does not describe IPV, include only NOT_IPV.
If it shows multiple IPV types, include all that apply.

Return EXACTLY one JSON object wrapped in <json> and </json>.
Use a single key "labels" whose value is a list drawn ONLY from:
["Physical", "Emotional", "Sexual", "NOT_IPV"].
Do not include any other keys or text.

Sentence: "{text}"
""".strip()


# (2) Few-Shot — guided, strict JSON output (no example line)
prompt_multilabel_fewshot = """
You are classifying a sentence for Intimate Partner Violence (IPV) subtypes.

Valid labels (choose any subset): Physical, Emotional, Sexual, NOT_IPV.

Guidance (for understanding only — do not copy these into your output):
- Physical: hitting, pushing, choking, restraining, use or threat of physical force.
- Emotional: humiliation, manipulation, isolation, threats, control, verbal abuse.
- Sexual: coercion, unwanted sexual acts, pressure, harassment.
- NOT_IPV: ordinary disagreement or neutral statement without violence or coercion.

If the sentence shows multiple IPV types, include all.
If it shows none, include only NOT_IPV.

Return EXACTLY one JSON object wrapped in <json> and </json>.
Use a single key "labels" whose value is a list drawn ONLY from:
["Physical", "Emotional", "Sexual", "NOT_IPV"].
Print nothing else.

Sentence: "{text}"
""".strip()


# (3) Chain-of-Thought — silent reasoning, strict JSON output (no example line)
prompt_multilabel_cot = """
Decide which IPV subtype(s) apply to the sentence.
Think silently and do not reveal your reasoning.

Valid labels: Physical, Emotional, Sexual, NOT_IPV.
If none apply, include only NOT_IPV.

Return EXACTLY one JSON object wrapped in <json> and </json>.
Use a single key "labels" whose value is a list drawn ONLY from:
["Physical", "Emotional", "Sexual", "NOT_IPV"].
Print nothing else.

Sentence: "{text}"
""".strip()


# (4) Meta — expert framing, strict JSON output (no example line)
prompt_multilabel_meta = """
You are an expert on Intimate Partner Violence (IPV) classification.
Reflect internally; do not show your reasoning.

Valid labels: Physical, Emotional, Sexual, NOT_IPV.
If none apply, include only NOT_IPV.

Return EXACTLY one JSON object wrapped in <json> and </json>.
Use a single key "labels" whose value is a list drawn ONLY from:
["Physical", "Emotional", "Sexual", "NOT_IPV"].
Print nothing else.

Sentence: "{text}"
""".strip()


# (5) Self-Consistency — internal deliberation, strict JSON output (no example line)
prompt_multilabel_selfconsistency = """
Evaluate the sentence multiple times INTERNALLY and output a stable, consistent final set of labels.

Valid labels: Physical, Emotional, Sexual, NOT_IPV.
If none apply, include only NOT_IPV.

Return EXACTLY one JSON object wrapped in <json> and </json>.
Use a single key "labels" whose value is a list drawn ONLY from:
["Physical", "Emotional", "Sexual", "NOT_IPV"].
Print nothing else.

Sentence: "{text}"
""".strip()

## 2. System & Model Setup

In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import json
import os
import time
from datetime import datetime

In [None]:
#FILENAMES
model_name = "Qwen/Qwen2.5-7B-Instruct"

#Load Model & Tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    dtype=torch.bfloat16,  # `torch_dtype` is deprecated in recent transformers releases
)


## 3. Generate Predictions

In [None]:
#Clone from git
!git clone https://github.com/zelaneroz/ipvresearch25
%cd ipvresearch25/1_LLM_Eval

Cloning into 'ipvresearch25'...
remote: Enumerating objects: 39, done.
remote: Counting objects: 100% (39/39), done.
remote: Compressing objects: 100% (34/34), done.
remote: Total 39 (delta 16), reused 22 (delta 5), pack-reused 0 (from 0)
Receiving objects: 100% (39/39), 5.54 MiB | 21.66 MiB/s, done.
Resolving deltas: 100% (16/16), done.
/content/ipvresearch25/1_LLM_Eval


In [None]:
#Load dataset
filename = "../Dataset/617points.csv"
df = pd.read_csv(filename)
print("Loaded dataset with", len(df), "rows")
df.head()

Loaded dataset with 618 rows


| Unnamed: 0.1 | Unnamed: 0 | items | Physical Abuse | Emotional Abuse | Sexual Abuse | Tag | type |
|---|---|---|---|---|---|---|---|
| 0 | 0 | I'm sitting here with a goofy smile and feelin... | False | False | False | False | soft |
| 1 | 1 | It's really boosting my confidence when he say... | False | False | False | False | soft |
| 2 | 2 | I never imagined that someone could make me s... | False | False | False | False | soft |
| 3 | 3 | He motivates me to become the best version of ... | False | False | False | False | soft |
| 4 | 4 | He’s like a best friend that I can also live w... | False | False | False | False | soft |


In [None]:
#Prompt dictionaries
binary_prompts = {
    "zeroshot": prompt_binary_zeroshot,
    "fewshot": prompt_binary_fewshot,
    "cot": prompt_binary_cot,
    "meta": prompt_binary_meta,
    "selfconsistency": prompt_binary_selfconsistency
}

multilabel_prompts = {
    "zeroshot": prompt_multilabel_zeroshot,
    "fewshot": prompt_multilabel_fewshot,
    "cot": prompt_multilabel_cot,
    "meta": prompt_multilabel_meta,
    "selfconsistency": prompt_multilabel_selfconsistency
}
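
The templates above mix a `{text}` placeholder with literal JSON braces such as `{"label": "IPV"}`, which is why the test functions later in the notebook fill them with `str.replace` rather than `str.format`. A minimal sketch of that choice follows. `fill_prompt` and `demo_template` are illustrative names, and the template is a shortened stand-in rather than one of the real prompts.

```python
def fill_prompt(template: str, sentence: str) -> str:
    """Insert the sentence with a literal replace so JSON braces stay untouched."""
    return template.replace("{text}", str(sentence))

# Shortened stand-in for the notebook's templates (illustrative only).
demo_template = 'Sentence: "{text}"\nFormat: {"label": "IPV"} or {"label": "NOT_IPV"}.'

filled = fill_prompt(demo_template, "We argued but moved on.")
print(filled)

# str.format would fail here because it parses {"label": ...} as a replacement field.
try:
    demo_template.format(text="We argued but moved on.")
except (KeyError, ValueError, IndexError) as err:
    print("str.format failed:", type(err).__name__)
```

Casting the sentence with `str(...)` also guards against non-string cells, such as NaN values, in the `items` column.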

In [None]:
def run_llm(prompt_text):
    """Feed a prompt into the LLM and return only the generated portion (not echoed prompt)."""
    inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)
    output = model.generate(
        **inputs,
        max_new_tokens=64,
        do_sample=False,   # greedy decoding is deterministic; temperature is ignored here
    )
    gen_tokens = output[0][inputs["input_ids"].shape[-1]:]
    text = tokenizer.decode(gen_tokens, skip_special_tokens=True)
    return text.strip()
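
`run_llm` passes the raw prompt string directly to the tokenizer. Because Qwen2.5-7B-Instruct is a chat-tuned model, wrapping the prompt in the tokenizer's chat template may bring the input closer to the format the model was trained on. The sketch below shows one way to do that with `apply_chat_template`. The helper name `build_chat_prompt` is hypothetical, and whether this changes results on this dataset is untested here.

```python
def build_chat_prompt(tokenizer, prompt_text: str) -> str:
    """Wrap a prompt as a single user turn using the tokenizer's chat template."""
    messages = [{"role": "user", "content": prompt_text}]
    return tokenizer.apply_chat_template(
        messages,
        tokenize=False,              # return a string, not token ids
        add_generation_prompt=True,  # append the assistant-turn header
    )
```

The returned string could then be passed to `run_llm` in place of the raw prompt, for example `run_llm(build_chat_prompt(tokenizer, prompt_text))`.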

In [None]:
def test_binary_prompts(df, n_samples=3):
    """
    Run and test all binary prompt types.
    Extracts IPV / NOT_IPV from within <json>...</json> blocks
    and saves each prompt type's outputs to a separate JSON file.
    """
    from datetime import datetime
    from pathlib import Path
    import json
    import re

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    results_dir = Path("../1_LLM_Eval/test_results")
    results_dir.mkdir(parents=True, exist_ok=True)

    df_subset = df.head(n_samples)
    print("Running binary classification tests...")
    print(f"Number of samples: {len(df_subset)}")
    print(f"Results will be saved in: {results_dir}\n")

    for prompt_type, template in binary_prompts.items():
        print(f"Testing prompt type: {prompt_type}")

        records = []

        for i, row in df_subset.iterrows():
            text = row["items"] if "items" in df.columns else str(row.iloc[0])
            prompt_text = template.replace("{text}", text)

            try:
                result_text = run_llm(prompt_text)
            except Exception as e:
                result_text = f"ERROR: {e}"

            # Extract IPV/NOT_IPV from <json>...</json>
            match = re.search(r"<json[^>]*>\s*(.*?)\s*</json>", result_text, re.DOTALL | re.IGNORECASE)
            label = None
            if match:
                block = match.group(1).strip()
                # Try to load as JSON
                try:
                    parsed = json.loads(block)
                    # Only dicts carry a label key; lists/strings would break .get()
                    if isinstance(parsed, dict):
                        label = parsed.get("label") or parsed.get("labels")
                except json.JSONDecodeError:
                    # Fallback: detect plain string like NOT_IPV or IPV
                    if "NOT_IPV" in block.upper():
                        label = "NOT_IPV"
                    elif "IPV" in block.upper():
                        label = "IPV"
            else:
                # Fallback for any text outside JSON
                if "NOT_IPV" in result_text.upper():
                    label = "NOT_IPV"
                elif "IPV" in result_text.upper():
                    label = "IPV"

            record = {
                "id": int(i),
                "prompt_type": prompt_type,
                "extracted_label": label or "UNKNOWN"
            }
            records.append(record)

        # Save per prompt type
        output_path = results_dir / f"binary_{prompt_type}.json"
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=4, ensure_ascii=False)

        print(f"Saved results for '{prompt_type}' to {output_path}")

    print("\nAll binary prompt tests completed.")
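
The `<json>...</json>` extraction step appears in both the binary and the multilabel test functions. One option is to factor it into a shared helper, sketched below with the same regular expression. `extract_json_payload` is a hypothetical name, and returning `None` on failure is an assumption that leaves the keyword fallbacks to the caller.

```python
import json
import re

JSON_BLOCK = re.compile(r"<json[^>]*>\s*(.*?)\s*</json>", re.DOTALL | re.IGNORECASE)

def extract_json_payload(raw: str):
    """Return the parsed object inside <json>...</json>, or None if absent or invalid."""
    match = JSON_BLOCK.search(raw)
    if not match:
        return None
    try:
        return json.loads(match.group(1))
    except json.JSONDecodeError:
        return None

print(extract_json_payload('<json>{"label": "IPV"}</json>'))
```

A caller would then check `isinstance(payload, dict)` before reading `label` or `labels`, and fall back to keyword scanning when the helper returns `None`.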

In [None]:
def test_multilabel_prompts(df, n_samples=3):
    """
    Run and test all multilabel prompt types.
    Extracts Physical / Emotional / Sexual / NOT_IPV labels from <json>...</json> blocks
    and saves each prompt type's outputs to a separate JSON file.
    """
    import re
    import json
    from datetime import datetime
    from pathlib import Path

    IPV_TYPES = ["Physical", "Emotional", "Sexual", "NOT_IPV"]

    # Prepare subset and output directory
    df_subset = df.head(n_samples)
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    results_dir = Path("../1_LLM_Eval/test_results")
    results_dir.mkdir(parents=True, exist_ok=True)

    print("Running multilabel classification tests...")
    print(f"Number of samples: {len(df_subset)}")
    print(f"Results will be saved in: {results_dir}\n")

    # Iterate through multilabel prompt types
    for prompt_type, template in multilabel_prompts.items():
        print(f"Testing prompt type: {prompt_type}")

        records = []

        for i, row in df_subset.iterrows():
            # Get text input
            text = row["items"] if "items" in df.columns else str(row.iloc[0])
            prompt_text = template.replace("{text}", text)

            # Run model
            try:
                inputs = tokenizer(prompt_text, return_tensors="pt").to(model.device)
                output = model.generate(
                    **inputs,
                    max_new_tokens=256,
                    do_sample=False,  # greedy decoding; temperature is unused without sampling
                )
                gen_tokens = output[0][inputs["input_ids"].shape[-1]:]
                result_text = tokenizer.decode(gen_tokens, skip_special_tokens=True)
            except Exception as e:
                result_text = f"ERROR: {e}"

            # Extract content within <json>...</json>
            labels = []
            match = re.search(r"<json[^>]*>\s*(.*?)\s*</json>", result_text, re.DOTALL | re.IGNORECASE)

            if match:
                block = match.group(1).strip()
                try:
                    parsed = json.loads(block)

                    # Handle dict format
                    if isinstance(parsed, dict):
                        val = parsed.get("labels") or parsed.get("label")
                        if val:
                            if isinstance(val, list):
                                labels = [v for v in val if v in IPV_TYPES]
                            elif isinstance(val, str) and val in IPV_TYPES:
                                labels = [val]

                    # Handle direct list format
                    elif isinstance(parsed, list):
                        labels = [v for v in parsed if v in IPV_TYPES]

                except json.JSONDecodeError:
                    # Fallback: scan text manually if JSON fails
                    for t in IPV_TYPES:
                        if t.lower() in block.lower():
                            labels.append(t)
            else:
                # Fallback: scan the entire text if no <json> tags
                for t in IPV_TYPES:
                    if t.lower() in result_text.lower():
                        labels.append(t)

            labels = sorted(set(labels)) or ["UNKNOWN"]

            record = {
                "id": int(i),
                "prompt_type": prompt_type,
                "extracted_labels": labels
            }
            records.append(record)

        # Save per prompt type
        output_path = results_dir / f"multilabel_{prompt_type}.json"
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=4, ensure_ascii=False)

        print(f"Saved results for '{prompt_type}' to {output_path}")

    print("\nAll multilabel prompt tests completed.")
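
The multilabel prompts instruct the model to return only `NOT_IPV` when no subtype applies, but nothing in the parsing above enforces that rule, so a response could contain both `NOT_IPV` and a subtype. The sketch below shows one possible post-processing step. `normalize_labels` is a hypothetical helper, and letting a subtype override `NOT_IPV` is an assumption rather than the notebook's rule.

```python
VALID_LABELS = ("Physical", "Emotional", "Sexual", "NOT_IPV")

def normalize_labels(labels):
    """Keep valid labels only and resolve NOT_IPV mixed with subtypes."""
    seen = set(labels)
    kept = [lbl for lbl in VALID_LABELS if lbl in seen]
    subtypes = [lbl for lbl in kept if lbl != "NOT_IPV"]
    if subtypes:
        # Assumption: a detected subtype outweighs a co-occurring NOT_IPV.
        return subtypes
    return ["NOT_IPV"] if "NOT_IPV" in kept else ["UNKNOWN"]

print(normalize_labels(["Sexual", "NOT_IPV", "Other"]))  # ['Sexual']
```

The output follows the fixed order of `VALID_LABELS`, which keeps saved records comparable across prompt types.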

## 4. Evaluation Pipeline

1. **Prompt Comparison Phase**
   * ~~Design multiple prompt variants for each task:~~
     - ~~**Binary Prompts:** Different phrasings for IPV vs NOT_IPV classification.~~
     - ~~**Multi-label Prompts:** Different phrasings for type-specific labeling (Physical, Emotional, Sexual, None).~~
   * ~~Run all prompts on the same dataset.~~
   * Compute for each prompt:
     - Accuracy, F1-score, ROC-AUC (per task or per subtype)
     - Average confidence calibration (mean predicted probability for positives/negatives)
   * Visualize:
     - **Prompt Comparison Table or Bar Plot** showing AUC/F1 across prompts
     - Identify the **best-performing prompt** for each task (Binary, Multi-label)

2. **Final Evaluation with Best Prompt**
   * **Binary (General IPV Model):**
     - Use the best binary prompt to compute final **ROC curve**, **AUC**, **Precision-Recall**, **Accuracy**, and **F1-score**.
     - Plot **ROC Curve** (replicating Fig. 2a).
     - Generate **Waterfall Plot** — sentences sorted by prediction confidence, color-coded by true label (Physical, Emotional, Sexual, Negative).

   * **Multi-label (Type-Specific Models):**
     - Use the best multi-label prompt to compute **per-type ROC/AUC** for Physical, Emotional, and Sexual abuse.
     - Compare type-specific vs. general model performance (bar chart similar to Fig. 3a).
     - Produce **Waterfall Plots** for each subtype showing confidence distribution and overlap between true labels and predictions.

3. **(Optional) Extended Visualization**
   * Create a **3D or Radial Plot** showing confidence magnitudes of all three type-specific predictions for interpretability.
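
The waterfall plots in the plan sort sentences by predicted confidence and color each one by its true subtype. A minimal sketch of that ordering step is shown below, with an optional matplotlib drawing helper. It assumes a per-sentence confidence score is available. The prompts above request labels only, so such scores would have to come from another source. The names `waterfall_order` and `plot_waterfall` and the demo values are illustrative.

```python
def waterfall_order(scores, subtypes):
    """Sort sentences by predicted confidence (highest first), keeping each true subtype."""
    pairs = sorted(zip(scores, subtypes), key=lambda p: p[0], reverse=True)
    return [{"rank": i, "score": s, "subtype": t} for i, (s, t) in enumerate(pairs)]

def plot_waterfall(ordered, colors):
    """Draw one bar per sentence; requires matplotlib (imported lazily)."""
    import matplotlib.pyplot as plt
    fig, ax = plt.subplots()
    ax.bar(
        [row["rank"] for row in ordered],
        [row["score"] for row in ordered],
        color=[colors[row["subtype"]] for row in ordered],
        width=1.0,
    )
    ax.set_xlabel("Sentences sorted by predicted confidence")
    ax.set_ylabel("Predicted confidence")
    return fig

# Made-up scores, for illustration only.
demo = waterfall_order([0.2, 0.9, 0.6], ["Negative", "Physical", "Emotional"])
print([row["subtype"] for row in demo])  # ['Physical', 'Emotional', 'Negative']
```

`plot_waterfall` expects `colors` to map each subtype name to a matplotlib color, for example `{"Physical": "red", "Emotional": "orange", "Negative": "gray"}`.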

In [None]:
# ============================================================
# EVALUATION PIPELINE: Prompt Comparison + Final Evaluation
# ============================================================
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import json
import re
import os
import math
import numpy as np
import pandas as pd
from datetime import datetime

# ---------- Utilities ---------------------------------------------------------

def _ensure_list(x):
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return []
    if isinstance(x, list):
        return x
    if isinstance(x, str):
        # split comma-joined strings like "Emotional, Physical"
        parts = [p.strip() for p in re.split(r"[;,]", x) if p.strip()]
        return parts if parts else [x]
    return [x]

def _try_extract_scores(record: dict) -> Optional[Dict[str, float]]:
    """
    Try to extract per-class probabilities/scores if your model emitted them
    inside raw_response JSON (e.g., {"probs": {"IPV":0.81,"NOT_IPV":0.19}} or
    {"scores":{"Physical":0.4,...}}). If not found, return None.
    """
    raw = record.get("raw_response", "")
    # Try to locate JSON block between <json>...</json> first
    m = re.search(r"<json[^>]*>\s*(\{.*?\})\s*</json>", raw, re.DOTALL | re.IGNORECASE)
    text_blocks = []
    if m:
        text_blocks.append(m.group(1))
    else:
        # fallback: any JSON-like object
        m2 = re.search(r"(\{.*\})", raw, re.DOTALL)
        if m2:
            text_blocks.append(m2.group(1))

    for block in text_blocks:
        try:
            parsed = json.loads(block)
            # Common keys people use
            for key in ("probs", "probabilities", "scores", "confidences"):
                if key in parsed and isinstance(parsed[key], dict):
                    # normalize keys to str, vals to float
                    return {str(k): float(v) for k, v in parsed[key].items()}
        except Exception:
            continue
    return None

def _binarize_labels(labels: List[str], positive="IPV") -> int:
    """Return 1 if positive present, else 0."""
    s = {lbl.strip() for lbl in _ensure_list(labels)}
    return 1 if positive in s else 0

def _one_vs_rest_columns() -> List[str]:
    return ["Physical", "Emotional", "Sexual"]

# ---------- Helper: process a results file ------------------------------------

def process_results_file(
    file_path: str | Path,
    df_truth: pd.DataFrame,
    task: str,  # "binary" or "multilabel"
    cache_dir: str | Path = "../1_LLM_Eval/eval_cache"
) -> Dict:
    """
    Load one result JSON file, merge with ground truth, and produce a tidy frame:
    Returns dict with:
      - 'file': file_path
      - 'prompt_type': inferred from file name
      - 'task': task
      - 'df': DataFrame with columns:
          id, y_true, y_pred, y_score (binary)
          ...or for multilabel: id, and for each class C in {Physical, Emotional, Sexual}:
              y_true_C, y_pred_C, y_score_C (if available)
      - 'has_scores': whether per-sample probabilities were found
    Caching: saves a compact .parquet keyed by mtime so future calls reuse.
    """
    file_path = Path(file_path)
    cache_dir = Path(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    stamp = f"{file_path.stat().st_mtime_ns}"
    cache_key = cache_dir / f"{file_path.stem}_{stamp}.parquet"

    if cache_key.exists():
        out = pd.read_parquet(cache_key)
        # Rehydrate metadata saved in the sidecar .meta.json file
        meta_path = cache_dir / f"{file_path.stem}_{stamp}.meta.json"
        meta = json.loads(Path(meta_path).read_text()) if meta_path.exists() else {}
        return {
            "file": str(file_path),
            "prompt_type": meta.get("prompt_type", file_path.stem),
            "task": task,
            "df": out.copy(),
            "has_scores": bool(meta.get("has_scores", False)),
        }

    # Read results JSON
    with open(file_path, "r", encoding="utf-8") as f:
        records = json.load(f)

    df_res = pd.DataFrame(records)
    # Infer prompt_type if not carried
    prompt_type = df_res["prompt_type"].iloc[0] if "prompt_type" in df_res.columns else file_path.stem

    # Merge with ground truth on id (inner join keeps only rows present in both frames)
    key = "id"
    df_truth = df_truth.copy()
    if key not in df_truth.columns:
        raise ValueError("df_truth must include an 'id' column.")

    # Build tidy frame
    if task == "binary":
        if "label" not in df_truth.columns:
            raise ValueError("df_truth for binary must include a 'label' column with values in {'IPV','NOT_IPV'}")
        # y_true
        merged = df_res.merge(df_truth[["id", "label"]], on="id", how="inner")
        merged["y_true"] = merged["label"].map({"IPV": 1, "NOT_IPV": 0})
        # y_pred from extracted_label or response
        if "extracted_label" in merged.columns:
            pred_str = merged["extracted_label"]
        elif "response" in merged.columns:
            pred_str = merged["response"]
        else:
            raise ValueError("Binary results must contain 'extracted_label' or 'response'.")

        merged["y_pred"] = (pred_str.astype(str).str.upper() == "IPV").astype(int)

        # Optional scores
        scores = []
        has_scores = False
        for _, r in merged.iterrows():
            s = _try_extract_scores(r.to_dict())
            # Normalize keys so variants like "ipv" match the lookups below
            s = {k.strip().upper(): v for k, v in s.items()} if s else {}
            if "IPV" in s:
                has_scores = True
                scores.append(float(s["IPV"]))
            elif "NOT_IPV" in s:
                # No IPV score present: invert the NOT_IPV score instead
                has_scores = True
                scores.append(1.0 - float(s["NOT_IPV"]))
            else:
                scores.append(np.nan)
        merged["y_score"] = scores

        tidy = merged[["id", "y_true", "y_pred", "y_score"]].copy()

    elif task == "multilabel":
        # Ground truth expects boolean columns for each subtype
        need_cols = _one_vs_rest_columns()
        for c in need_cols:
            if c not in df_truth.columns:
                raise ValueError(f"df_truth for multilabel must include boolean column '{c}'")

        merged = df_res.merge(df_truth[["id"] + need_cols], on="id", how="inner")

        # Predictions: list of strings in 'extracted_labels' or 'response'
        if "extracted_labels" in merged.columns:
            preds = merged["extracted_labels"].apply(_ensure_list)
        elif "response" in merged.columns:
            preds = merged["response"].apply(_ensure_list)
        else:
            raise ValueError("Multilabel results must contain 'extracted_labels' or 'response' (list or comma string).")

        for c in need_cols:
            merged[f"y_true_{c}"] = merged[c].astype(int)
            merged[f"y_pred_{c}"] = preds.apply(lambda L: int(c in set(map(str, L))))

        # Optional per-class scores if present
        has_scores = False
        for c in need_cols:
            merged[f"y_score_{c}"] = np.nan

        for idx, r in merged.iterrows():
            s = _try_extract_scores(r.to_dict())
            if s:
                has_scores = True
                # normalize keys to capitalized class names if possible
                norm = {k.strip().capitalize(): v for k, v in s.items()}
                for c in need_cols:
                    if c in norm:
                        merged.at[idx, f"y_score_{c}"] = float(norm[c])

        tidy_cols = ["id"] + \
                    [f"y_true_{c}" for c in need_cols] + \
                    [f"y_pred_{c}" for c in need_cols] + \
                    [f"y_score_{c}" for c in need_cols]
        tidy = merged[tidy_cols].copy()

    else:
        raise ValueError("task must be 'binary' or 'multilabel'.")

    # Cache
    tidy.to_parquet(cache_key, index=False)
    meta = {"prompt_type": prompt_type, "has_scores": has_scores}
    (cache_dir / f"{file_path.stem}_{stamp}.meta.json").write_text(json.dumps(meta, indent=2))

    return {
        "file": str(file_path),
        "prompt_type": prompt_type,
        "task": task,
        "df": tidy.copy(),
        "has_scores": has_scores,
    }
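
`process_results_file` expects ground-truth frames with an `id` column, a `label` column for the binary task, and boolean `Physical`, `Emotional`, and `Sexual` columns for the multilabel task. The dataset preview shows `Physical Abuse`, `Emotional Abuse`, and `Sexual Abuse` instead. The sketch below shows one way to bridge that gap. It assumes that `id` is the DataFrame index (matching `"id": int(i)` in the saved records) and that a sentence counts as IPV when any subtype flag is true. The `Tag` column may be the intended binary label instead. `build_truth_records` is a hypothetical helper.

```python
SUBTYPE_COLUMNS = {
    "Physical": "Physical Abuse",
    "Emotional": "Emotional Abuse",
    "Sexual": "Sexual Abuse",
}

def build_truth_records(rows_by_id):
    """Build binary and multilabel truth rows from {row_id: {column: value}}."""
    binary, multilabel = [], []
    for row_id, row in rows_by_id.items():
        # Assumes the subtype columns hold real booleans, as in the preview.
        flags = {name: bool(row[col]) for name, col in SUBTYPE_COLUMNS.items()}
        # Assumption: any true subtype flag makes the sentence IPV.
        label = "IPV" if any(flags.values()) else "NOT_IPV"
        binary.append({"id": int(row_id), "label": label})
        multilabel.append({"id": int(row_id), **flags})
    return binary, multilabel

# With the notebook's DataFrame this could be used as:
#   binary_rows, multilabel_rows = build_truth_records(df.to_dict("index"))
#   df_truth_binary = pd.DataFrame(binary_rows)
#   df_truth_multilabel = pd.DataFrame(multilabel_rows)
```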

In [None]:
# ---------- Metrics -----------------------------------------------------------

def _binary_metrics(df_bin: pd.DataFrame) -> Dict[str, float]:
    from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
    y_true = df_bin["y_true"].values
    y_pred = df_bin["y_pred"].values.astype(int)
    acc = float(accuracy_score(y_true, y_pred))
    f1  = float(f1_score(y_true, y_pred, zero_division=0))
    # AUC if scores available
    if df_bin["y_score"].notna().any():
        y_score = df_bin["y_score"].fillna(0.5).values
        try:
            auc = float(roc_auc_score(y_true, y_score))
        except Exception:
            auc = float("nan")
    else:
        auc = float("nan")
    return {"Accuracy": acc, "F1": f1, "ROC_AUC": auc}

def _multilabel_metrics(df_ml: pd.DataFrame) -> Dict[str, float]:
    from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
    classes = _one_vs_rest_columns()

    # Macro F1 across the three abuse types (ignoring NOT_IPV)
    y_true = np.vstack([df_ml[f"y_true_{c}"].values for c in classes]).T
    y_pred = np.vstack([df_ml[f"y_pred_{c}"].values for c in classes]).T
    f1_macro = float(f1_score(y_true, y_pred, average="macro", zero_division=0))
    f1_micro = float(f1_score(y_true, y_pred, average="micro", zero_division=0))

    # "Exact match" accuracy: all three match
    exact_acc = float((y_true == y_pred).all(axis=1).mean())

    # Per-class AUC if scores present
    aucs = {}
    have_any_scores = False
    for c in classes:
        s = df_ml[f"y_score_{c}"]
        if s.notna().any():
            have_any_scores = True
            try:
                aucs[c] = float(roc_auc_score(df_ml[f"y_true_{c}"].values, s.fillna(0.5).values))
            except Exception:
                aucs[c] = float("nan")
        else:
            aucs[c] = float("nan")

    # Aggregate AUC (macro) across available classes
    if have_any_scores:
        auc_macro = float(np.nanmean([aucs[c] for c in classes]))
    else:
        auc_macro = float("nan")

    out = {
        "F1_macro": f1_macro,
        "F1_micro": f1_micro,
        "ExactMatchAcc": exact_acc,
        "ROC_AUC_macro": auc_macro,
    }
    # include per-class AUC columns
    for c in classes:
        out[f"AUC_{c}"] = aucs[c]
    return out

# ---------- Prompt comparison over a directory --------------------------------

def compare_prompts(
    results_dir: str | Path,
    df_truth_binary: pd.DataFrame,
    df_truth_multilabel: pd.DataFrame,
) -> Dict[str, object]:
    """
    Loop test_results directory, process each file once (using cache),
    compute metrics, and return two DataFrames:
      - df_cmp_binary: metrics per binary prompt
      - df_cmp_multilabel: metrics per multilabel prompt
    Also returns best prompt names for each task.
    """
    results_dir = Path(results_dir)

    rows_bin, rows_ml = [], []

    for fp in sorted(results_dir.glob("*.json")):
        name = fp.stem
        if name.startswith("binary_"):
            info = process_results_file(fp, df_truth_binary, task="binary")
            metrics = _binary_metrics(info["df"])
            row = {"prompt": info["prompt_type"], **metrics, "file": str(fp)}
            rows_bin.append(row)

        elif name.startswith("multilabel_"):
            info = process_results_file(fp, df_truth_multilabel, task="multilabel")
            metrics = _multilabel_metrics(info["df"])
            row = {"prompt": info["prompt_type"], **metrics, "file": str(fp)}
            rows_ml.append(row)

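    # Guard the sort: an empty DataFrame has no columns, so sort_values would
    # raise a KeyError when no matching result files were found.
    df_cmp_binary = pd.DataFrame(rows_bin)
    if not df_cmp_binary.empty:
        df_cmp_binary = df_cmp_binary.sort_values(["F1", "ROC_AUC", "Accuracy"], ascending=False)
    df_cmp_multilabel = pd.DataFrame(rows_ml)
    if not df_cmp_multilabel.empty:
        df_cmp_multilabel = df_cmp_multilabel.sort_values(
            ["F1_macro", "ROC_AUC_macro", "ExactMatchAcc"], ascending=False
        )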

    best_binary = df_cmp_binary.iloc[0]["prompt"] if not df_cmp_binary.empty else None
    best_multilabel = df_cmp_multilabel.iloc[0]["prompt"] if not df_cmp_multilabel.empty else None

    return {
        "binary": df_cmp_binary.reset_index(drop=True),
        "multilabel": df_cmp_multilabel.reset_index(drop=True),
        "best_binary": best_binary,
        "best_multilabel": best_multilabel,
    }
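
# --- Illustrative sketch (not part of the original pipeline) ------------------
# How the multi-key sort in `compare_prompts` breaks ties: F1 first, then ROC-AUC,
# then Accuracy. The helper name `_demo_rank_prompts` and all numbers below are
# made up for illustration; they are not results from any run.
def _demo_rank_prompts():
    import pandas as pd

    rows = [
        {"prompt": "zero_shot", "F1": 0.80, "ROC_AUC": 0.85, "Accuracy": 0.82},
        {"prompt": "few_shot",  "F1": 0.80, "ROC_AUC": 0.90, "Accuracy": 0.81},
        {"prompt": "cot",       "F1": 0.75, "ROC_AUC": 0.95, "Accuracy": 0.85},
    ]
    ranked = pd.DataFrame(rows).sort_values(["F1", "ROC_AUC", "Accuracy"], ascending=False)
    # few_shot and zero_shot tie on F1, so ROC_AUC decides between them;
    # cot ranks last despite the highest AUC because F1 is the primary key.
    return ranked["prompt"].tolist()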

# ---------- Visualization (comparison phase) ----------------------------------

def plot_prompt_comparison(
    df_cmp_binary: pd.DataFrame,
    df_cmp_multilabel: pd.DataFrame,
    out_dir: str | Path = "../1_LLM_Eval/figs"
):
    """
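    Create grouped bar plots comparing prompts (F1 and ROC-AUC) for the binary
    and multilabel tasks, and save them as PNG files in out_dir.
    Missing AUC values are drawn as 0. Uses matplotlib only.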
    """
    import matplotlib.pyplot as plt
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Binary: bar plot F1 and AUC
    if not df_cmp_binary.empty:
        plt.figure()
        x = np.arange(len(df_cmp_binary))
        plt.bar(x - 0.2, df_cmp_binary["F1"], width=0.4, label="F1")
        # Safe AUC display
        auc_vals = df_cmp_binary["ROC_AUC"].fillna(0)
        plt.bar(x + 0.2, auc_vals, width=0.4, label="ROC-AUC")
        plt.xticks(x, df_cmp_binary["prompt"], rotation=45, ha="right")
        plt.ylabel("Score")
        plt.title("Binary Prompt Comparison")
        plt.legend()
        plt.tight_layout()
        plt.savefig(out_dir / "binary_prompt_comparison.png", dpi=150)
        plt.close()

    # Multilabel: bar plot Macro F1 and Macro AUC
    if not df_cmp_multilabel.empty:
        plt.figure()
        x = np.arange(len(df_cmp_multilabel))
        plt.bar(x - 0.2, df_cmp_multilabel["F1_macro"], width=0.4, label="F1_macro")
        auc_vals = df_cmp_multilabel["ROC_AUC_macro"].fillna(0)
        plt.bar(x + 0.2, auc_vals, width=0.4, label="ROC_AUC_macro")
        plt.xticks(x, df_cmp_multilabel["prompt"], rotation=45, ha="right")
        plt.ylabel("Score")
        plt.title("Multilabel Prompt Comparison")
        plt.legend()
        plt.tight_layout()
        plt.savefig(out_dir / "multilabel_prompt_comparison.png", dpi=150)
        plt.close()

# ---------- Final Evaluation with selected prompt (no reprocessing) -----------

def final_evaluation_binary(
    best_file_path: str | Path,
    df_truth_binary: pd.DataFrame
):
    """
    Use the cached, processed outputs for the best binary prompt to:
      - Compute ROC curve, PR curve, Accuracy, F1
      - Produce a waterfall plot (sorted by score if available; else by prediction)
    Saves figures to ../1_LLM_Eval/figs.
    """
    import matplotlib.pyplot as plt
    from sklearn.metrics import roc_curve, precision_recall_curve, auc, accuracy_score, f1_score

    info = process_results_file(best_file_path, df_truth_binary, task="binary")
    dfb = info["df"]
    out_dir = Path("../1_LLM_Eval/figs"); out_dir.mkdir(parents=True, exist_ok=True)

    y_true = dfb["y_true"].values
    y_pred = dfb["y_pred"].values
    acc = float(accuracy_score(y_true, y_pred))
    f1  = float(f1_score(y_true, y_pred, zero_division=0))

    # ROC + PR if scores available; otherwise use 0/1 "scores"
    if dfb["y_score"].notna().any():
        scores = dfb["y_score"].fillna(0.5).values
    else:
        scores = y_pred.astype(float)

    fpr, tpr, _ = roc_curve(y_true, scores)
    roc_auc = float(auc(fpr, tpr))

    prec, rec, _ = precision_recall_curve(y_true, scores)
    pr_auc = float(auc(rec, prec))

    # Save ROC
    plt.figure()
    plt.plot(fpr, tpr, label=f"AUC={roc_auc:.3f}")
    plt.plot([0,1], [0,1], linestyle="--")
    plt.xlabel("FPR"); plt.ylabel("TPR"); plt.title("Binary ROC")
    plt.legend(); plt.tight_layout()
    plt.savefig(out_dir / "binary_ROC.png", dpi=150); plt.close()

    # Save PR
    plt.figure()
    plt.plot(rec, prec, label=f"PR AUC={pr_auc:.3f}")
    plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title("Binary PR Curve")
    plt.legend(); plt.tight_layout()
    plt.savefig(out_dir / "binary_PR.png", dpi=150); plt.close()

    # Waterfall: sort by score desc
    order = np.argsort(-scores)
    sorted_scores = scores[order]
    is_ipv = y_true[order] == 1
    plt.figure()
    plt.bar(np.arange(len(scores)), sorted_scores, edgecolor="none")
    # Mark true IPV samples with orange dots (bars keep the default blue).
    # One vectorized call keeps a single marker colour; calling plt.plot per
    # sample would advance the colour cycle and give each dot a different colour.
    plt.plot(np.flatnonzero(is_ipv), sorted_scores[is_ipv],
             linestyle="none", marker="o", color="tab:orange")
    plt.xlabel("Samples (sorted by score)"); plt.ylabel("Score")
    plt.title("Binary Waterfall (dots mark true IPV=1)")
    plt.tight_layout()
    plt.savefig(out_dir / "binary_waterfall.png", dpi=150); plt.close()

    return {"Accuracy": acc, "F1": f1, "ROC_AUC": roc_auc, "PR_AUC": pr_auc}
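
# --- Illustrative sketch (not part of the original pipeline) ------------------
# What the 0/1 fallback "scores" in `final_evaluation_binary` imply: with hard
# predictions there is only one usable threshold, so the ROC curve collapses to
# three points and its AUC summarizes a single operating point rather than a
# ranking. The helper name `_demo_hard_label_roc` and the toy labels are hypothetical.
def _demo_hard_label_roc():
    import numpy as np
    from sklearn.metrics import roc_curve, auc

    y_true = np.array([0, 0, 1, 1])
    y_pred = np.array([0, 1, 1, 1])  # hard 0/1 predictions, no confidence
    fpr, tpr, _ = roc_curve(y_true, y_pred.astype(float))
    # curve passes through (0, 0), (FPR, TPR) of the predictions, and (1, 1)
    return fpr, tpr, float(auc(fpr, tpr))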

def final_evaluation_multilabel(
    best_file_path: str | Path,
    df_truth_multilabel: pd.DataFrame
):
    """
    Use the cached, processed outputs for the best multi-label prompt to:
      - Compute per-type ROC/AUC (Physical, Emotional, Sexual)
      - Bar chart comparing type-specific AUCs and Macro F1
      - Waterfall plot for each subtype (sorted by score if available)
    Saves figures to ../1_LLM_Eval/figs.
    """
    import matplotlib.pyplot as plt
    from sklearn.metrics import roc_curve, auc, f1_score

    classes = _one_vs_rest_columns()
    info = process_results_file(best_file_path, df_truth_multilabel, task="multilabel")
    dfm = info["df"]
    out_dir = Path("../1_LLM_Eval/figs"); out_dir.mkdir(parents=True, exist_ok=True)

    # Metrics
    f1_macro = float(f1_score(
        np.vstack([dfm[f"y_true_{c}"] for c in classes]).T,
        np.vstack([dfm[f"y_pred_{c}"] for c in classes]).T,
        average="macro",
        zero_division=0
    ))

    aucs = {}
    for c in classes:
        y_true = dfm[f"y_true_{c}"].values
        s = dfm[f"y_score_{c}"]
        if s.notna().any():
            scores = s.fillna(0.5).values
        else:
            scores = dfm[f"y_pred_{c}"].values.astype(float)  # degenerate but allows plotting

        fpr, tpr, _ = roc_curve(y_true, scores)
        aucs[c] = float(auc(fpr, tpr))

        # Save subtype ROC
        plt.figure()
        plt.plot(fpr, tpr, label=f"AUC={aucs[c]:.3f}")
        plt.plot([0,1], [0,1], linestyle="--")
        plt.xlabel("FPR"); plt.ylabel("TPR"); plt.title(f"ROC — {c}")
        plt.legend(); plt.tight_layout()
        plt.savefig(out_dir / f"multilabel_ROC_{c}.png", dpi=150); plt.close()

        # Waterfall per subtype
        order = np.argsort(-scores)
        sorted_scores = scores[order]
        is_pos = y_true[order] == 1
        plt.figure()
        plt.bar(np.arange(len(scores)), sorted_scores, edgecolor="none")
        # Mark true positives for this subtype with dots of one colour
        # (a per-sample plt.plot call would cycle through colours).
        plt.plot(np.flatnonzero(is_pos), sorted_scores[is_pos],
                 linestyle="none", marker="o", color="tab:orange")
        plt.xlabel("Samples (sorted by score)"); plt.ylabel("Score")
        plt.title(f"Waterfall — {c} (dots mark true {c}=1)")
        plt.tight_layout()
        plt.savefig(out_dir / f"multilabel_waterfall_{c}.png", dpi=150); plt.close()

    # Summary bar: AUCs
    plt.figure()
    xs = np.arange(len(classes))
    plt.bar(xs, [aucs[c] for c in classes])
    plt.xticks(xs, classes)
    plt.ylabel("AUC")
    plt.title(f"Type-specific AUCs (Macro F1={f1_macro:.3f})")
    plt.tight_layout()
    plt.savefig(out_dir / "multilabel_type_AUCs.png", dpi=150); plt.close()

    return {"F1_macro": f1_macro, **{f"AUC_{c}": aucs[c] for c in classes}}

# ---------- Example driver (glue) ---------------------------------------------

def run_prompt_comparison_and_final(
    results_dir="../1_LLM_Eval/test_results",
    df_truth_binary: pd.DataFrame = None,
    df_truth_multilabel: pd.DataFrame = None,
):
    """
    1) Prompt Comparison Phase: compute metrics per prompt, plot comparisons,
       identify best prompts for binary and multilabel.
    2) Final Evaluation with the best prompt (uses cached processing).
    Returns a dict with DataFrames, best names, and final metrics.
    """
    cmp_out = compare_prompts(results_dir, df_truth_binary, df_truth_multilabel)
    plot_prompt_comparison(cmp_out["binary"], cmp_out["multilabel"])

    # Reuse the file paths that compare_prompts recorded for the top-ranked rows,
    # so the final evaluation runs on exactly the file that won the comparison
    # (a fresh glob could pick a different run of the same prompt).
    best_bin_name = cmp_out["best_binary"]
    best_ml_name  = cmp_out["best_multilabel"]

    best_bin_file = None
    best_ml_file  = None
    if best_bin_name is not None:
        best_bin_file = Path(cmp_out["binary"].iloc[0]["file"])
    if best_ml_name is not None:
        best_ml_file = Path(cmp_out["multilabel"].iloc[0]["file"])

    final_bin = final_ml = None
    if best_bin_file:
        final_bin = final_evaluation_binary(best_bin_file, df_truth_binary)
    if best_ml_file:
        final_ml  = final_evaluation_multilabel(best_ml_file, df_truth_multilabel)

    return {
        "comparison_binary": cmp_out["binary"],
        "comparison_multilabel": cmp_out["multilabel"],
        "best_binary": best_bin_name,
        "best_multilabel": best_ml_name,
        "final_binary_metrics": final_bin,
        "final_multilabel_metrics": final_ml,
    }
