In [1]:
import os, json, csv, time, random, base64, re
import requests
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional

# --- Model / server -----------------------------------------------------
# Sent as the "model" field of every Ollama request payload.
MODEL_NAME = "llava:13b"
# Base URL; the request helpers append "/api/chat" or "/api/generate".
OLLAMA_HOST = "http://localhost:11434"
# True -> call_llava uses _ollama_chat, False -> _ollama_generate.
USE_CHAT_API = True
# NOTE(review): not read by any code visible in this notebook; both request
# helpers hardcode "stream": False in their payloads.
STREAM = False

# --- Input locations ----------------------------------------------------
# NOTE(review): absolute, machine-specific Windows paths; adjust before
# running on another machine.
# IMAGE_ROOT is the base directory that question "image_path" values are
# resolved against.
IMAGE_ROOT = Path(r"C:\Users\Melita\CSE 4001\VLM-Eval-Research\data")
VLAT_JSON  = Path(r"C:\Users\Melita\CSE 4001\VLM-Eval-Research\data\VLAT\vlat_skip_orig.json")
CALVI_JSON = Path(r"C:\Users\Melita\CSE 4001\VLM-Eval-Research\data\CALVI\calvi_orig.json")

# --- Output locations (siblings of the data directory) --------------------
OUT_VLAT = IMAGE_ROOT.parent / "Output" / "VLAT" / "Llava13b_Eval" / "Random"
OUT_CALVI = IMAGE_ROOT.parent / "Output" / "CALVI" / "Llava13b_Eval" / "Random"

# --- Run control ----------------------------------------------------------
# Default number of shuffled runs for run_experiment / run_all.
NUM_RUNS = 10
# Inclusive bounds, in whole seconds, for the random cool-down pause.
SLEEP_MIN_SEC = 5
SLEEP_MAX_SEC = 10
# Per-run shuffle seed is BASE_SEED + run index (CALVI adds a further 10_000).
BASE_SEED = 12345
# Passed as `timeout=` to requests.post for every Ollama call.
REQUEST_TIMEOUT = 600

# Instruction text placed in front of every VLAT question (see _format_prompt).
# It asks for an "Answer:" / "Explanation:" layout, which is the layout
# _parse_model_answer looks for first, and tells the model to pick "Omit"
# rather than guess.
VLAT_PROMPT = (
"""I am about to show you an image and ask you a multiple choice question about that image. 
Please structure your response in the following format:
Answer: [Enter the exact text of your chosen option]
Explanation: [Provide your reasoning]
Select the BEST answer, based only on the chart and not external knowledge. DO NOT GUESS.
If you are not sure about your answer or your answer is based on a guess, select "Omit".
Choose your answer ONLY from the provided options."""
)

# Instruction text placed in front of every CALVI question. Same layout as the
# VLAT prompt, but it allows "option(s)" and has no "Omit" / "DO NOT GUESS"
# instruction.
CALVI_PROMPT = (
"""I am about to show you an image and ask you a multiple choice question about that image. 
Please structure your response in the following format:
Answer: [Enter the exact text of your chosen option(s)]
Explanation: [Provide your reasoning]
Select the BEST answer, based only on the chart and not external knowledge.
Choose your answer ONLY from the provided options."""
)


In [2]:
def _chunk_slices(total: int, plan: List[Any]) -> List[Tuple[int, int]]:
    out, start, remain = [], 0, total
    for p in (plan or []):
        n = remain if (p == "rest") else min(int(p), remain)
        if n <= 0:
            break
        out.append((start, start + n))
        start += n
        remain -= n
        if remain <= 0:
            break
    if not out and total > 0:
        out = [(0, total)]
    return out

def _sleep_between(min_s=SLEEP_MIN_SEC, max_s=SLEEP_MAX_SEC):
    """Announce and then sleep for a random whole number of seconds.

    Both bounds are truncated with ``int()`` and are inclusive
    (``random.randint``). The draw comes from the module-level ``random``
    generator.
    """
    lower, upper = int(min_s), int(max_s)
    pause = random.randint(lower, upper)
    print(f"Cooling down for {pause} seconds to avoid timeouts...")
    time.sleep(pause)

def _fieldnames_for_dataset(name: str) -> List[str]:
    base = [
        "id","testname","question","Chart_type","Task",
        "options","correct_answer","model_answer","is_correct",
        "image_path","elapsed_seconds","response_raw"
    ]
    if name.upper() == "CALVI":
        i = base.index("correct_answer") + 1
        base[i:i] = ["Misleader","wrong_due_to_misleader"]
    return base

def _ensure_dir(p: Path) -> None:
    """Create directory ``p`` along with any missing parents.

    An already existing directory is accepted silently (``exist_ok=True``).
    """
    p.mkdir(parents=True, exist_ok=True)

def _load_json(path: Path) -> Dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def _read_image_b64(path: Path) -> str:
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


In [3]:
def _base_prompt(q: Dict[str, Any]) -> str:
    opts = q.get("options", [])
    letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    lines = [
        f"Question: {q.get('question','').strip()}",
        "",
        "Options:"
    ]
    for i, opt in enumerate(opts):
        lines.append(f"{letters[i]}. {opt}")
    return "\n".join(lines)

def _format_prompt(q: Dict[str, Any], prepend: Optional[str] = None) -> str:
    """Build the full prompt text for one question.

    The question/options text comes from ``_base_prompt``. A non-empty
    ``prepend`` is trimmed and placed in front, separated by one blank line.
    """
    question_block = _base_prompt(q)
    if not prepend:
        return question_block
    return f"{prepend.strip()}\n\n{question_block}"

def _normalize(s: str) -> str:
    return re.sub(r"\s+", " ", s.strip().lower())

def _parse_model_answer(raw: str, options: List[str]) -> str:
    if not raw:
        return ""
    text = raw.strip()

    m = re.search(r"^answer\s*:\s*(.+)$", text, re.IGNORECASE | re.MULTILINE)
    if m:
        ans_txt = m.group(1).strip().strip('"').strip("'")
        ans_txt = re.sub(r"^\[(.*)\]$", r"\1", ans_txt).strip()

        norm_ans = _normalize(ans_txt)
        for opt in options:
            if _normalize(opt) == norm_ans:
                return opt
        for opt in options:
            if _normalize(opt) in norm_ans or norm_ans in _normalize(opt):
                return opt
        if re.fullmatch(r"[A-Za-z]", ans_txt):
            idx = ord(ans_txt.upper()) - ord('A')
            if 0 <= idx < len(options):
                return options[idx]
        return ans_txt

    m2 = re.search(r"\b([A-H])\b", text, re.IGNORECASE)
    if m2:
        idx = ord(m2.group(1).upper()) - ord('A')
        if 0 <= idx < len(options):
            return options[idx]

    lowered = _normalize(text)
    for opt in options:
        if _normalize(opt) in lowered:
            return opt

    return text


In [4]:
def _ollama_chat(prompt: str, images_b64: List[str]) -> str:
    """Send one user message to the Ollama ``/api/chat`` endpoint and return the reply text.

    The ``images`` field is attached to the message only when ``images_b64``
    is non-empty. Streaming is always disabled, so the response is decoded as
    a single JSON document. An HTTP error status raises via
    ``raise_for_status``. A response with no message content yields ``""``.
    """
    message: Dict[str, Any] = {"role": "user", "content": prompt}
    if images_b64:
        message["images"] = images_b64

    body = {"model": MODEL_NAME, "stream": False, "messages": [message]}
    response = requests.post(f"{OLLAMA_HOST}/api/chat", json=body, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json().get("message", {}).get("content", "")

def _ollama_generate(prompt: str, images_b64: List[str]) -> str:
    """Send a prompt to the Ollama ``/api/generate`` endpoint and return the ``response`` text.

    The ``images`` field is included only when ``images_b64`` is non-empty.
    Streaming is always disabled. An HTTP error status raises via
    ``raise_for_status``. A reply without a ``response`` field yields ``""``.
    """
    body: Dict[str, Any] = {"model": MODEL_NAME, "prompt": prompt}
    if images_b64:
        body["images"] = images_b64
    body["stream"] = False

    response = requests.post(f"{OLLAMA_HOST}/api/generate", json=body, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json().get("response", "")

def _resolve_image(img_rel: str) -> Optional[Path]:
    """Locate the image file a question refers to.

    Backslashes in ``img_rel`` are converted to forward slashes first. The
    following candidates are then tried in order, and the first one that is
    an existing *file* wins:

    1. ``img_rel`` itself, if it is absolute.
    2. ``IMAGE_ROOT / img_rel``.
    3. ``IMAGE_ROOT / <dataset> / "images" / <file name>`` when the first path
       component is ``VLAT`` or ``CALVI``.
    4. The same two layouts again under ``IMAGE_ROOT / "data"``.

    Returns:
        The first matching path, or ``None`` when no candidate is a file.
        Directories are never returned: an empty or ``"."`` path would
        otherwise resolve to ``IMAGE_ROOT`` itself and fail later when read
        as an image.
    """
    rel = Path(str(img_rel).replace("\\", "/"))
    candidates: List[Path] = []

    if rel.is_absolute():
        candidates.append(rel)

    candidates.append(IMAGE_ROOT / rel)

    dataset = rel.parts[0] if rel.parts else None
    known_dataset = dataset in {"VLAT", "CALVI"}
    if known_dataset:
        candidates.append(IMAGE_ROOT / dataset / "images" / rel.name)

    candidates.append(IMAGE_ROOT / "data" / rel)
    if known_dataset:
        candidates.append(IMAGE_ROOT / "data" / dataset / "images" / rel.name)

    # dict.fromkeys de-duplicates while keeping the priority order above.
    for candidate in dict.fromkeys(candidates):
        if candidate.is_file():
            return candidate
    return None

def call_llava(question: Dict[str, Any], prepend_prompt: Optional[str] = None) -> str:
    """Ask the model one question and return its raw reply text.

    The prompt is built by ``_format_prompt``. If the question has an
    ``image_path`` that resolves and can be read, the image is attached as
    base64. An image that is missing or unreadable only produces a warning,
    and the request is still sent without it.

    Best-effort: any exception from the request is printed and ``""`` is
    returned instead of propagating.
    """
    full_prompt = _format_prompt(question, prepend=prepend_prompt)
    encoded_images = []

    image_path = question.get("image_path")
    if image_path:
        resolved = _resolve_image(image_path)
        if not resolved:
            print(f"[WARN] Image file not found (tried common locations) -> '{image_path}'")
        else:
            try:
                encoded_images.append(_read_image_b64(resolved))
            except Exception as e:
                print(f"[WARN] Could not read image {resolved}: {e}")

    try:
        send = _ollama_chat if USE_CHAT_API else _ollama_generate
        return send(full_prompt, encoded_images)
    except Exception as e:
        print(f"[ERROR] Ollama request failed: {e}")
        return ""


In [5]:
def _write_row(writer: csv.DictWriter, row: Dict[str, Any], f) -> None:
    """Write ``row`` through ``writer`` and immediately flush ``f``.

    ``f`` is flushed after every row; presumably it is the file object that
    ``writer`` wraps (verify against the caller).
    """
    writer.writerow(row)
    f.flush()

def _eval_one_run(dataset_name: str, qlist: List[Dict[str, Any]], out_csv_path: Path, run_index: int, prepend_prompt: Optional[str] = None):
    """Ask the model every question in ``qlist`` and write one CSV row per answer.

    The output file is (re)created with a header, and each row is flushed as
    soon as it is written. Questions are processed in two consecutive batches,
    the first 20 (or fewer) and then the rest, with one cool-down pause
    between them. Progress messages are printed after the 5th and 10th
    question.

    ``is_correct`` is the string form of an exact comparison between the
    parsed model answer and the trimmed ``correct_answer``. CALVI rows also
    carry ``Misleader`` and ``wrong_due_to_misleader``.

    ``run_index`` is accepted but not used in the body.
    """
    _ensure_dir(out_csv_path.parent)
    columns = _fieldnames_for_dataset(dataset_name)
    dataset_key = dataset_name.upper()

    with open(out_csv_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        handle.flush()

        total = len(qlist)
        spans = _chunk_slices(total, [min(20, total), "rest"])

        for span_no, (lo, hi) in enumerate(spans):
            # `position` is the 1-based index of the question within the whole run.
            for position, item in enumerate(qlist[lo:hi], start=lo + 1):
                started = time.time()
                reply = call_llava(item, prepend_prompt)
                took = time.time() - started

                choices = item.get("options", [])
                picked = _parse_model_answer(reply, choices)
                expected = item.get("correct_answer", "").strip()

                record = {
                    "id": item.get("id"),
                    "testname": dataset_key,
                    "question": item.get("question"),
                    "Chart_type": item.get("Chart_type"),
                    "Task": item.get("Task"),
                    "options": "; ".join(choices),
                    "correct_answer": expected,
                    "model_answer": picked,
                    "is_correct": str(picked.strip() == expected),
                    "image_path": item.get("image_path", ""),
                    "elapsed_seconds": f"{took:.3f}",
                    "response_raw": reply.strip(),
                }
                if dataset_key == "CALVI":
                    record["Misleader"] = item.get("Misleader", "")
                    record["wrong_due_to_misleader"] = item.get("wrong_due_to_misleader", "")

                _write_row(writer, record, handle)

                # Progress notes are only emitted while inside the first batch.
                if lo == 0 and position == 5:
                    if dataset_key == "VLAT":
                        print("first 5 questions done for vlat")
                    else:
                        print("first 5 questions done for calvi")
                elif lo == 0 and position == 10:
                    if dataset_key == "VLAT":
                        print("first next 5 done")
                    else:
                        print("first next 5 done (calvi)")

            # Pause once, after the first batch, and only if another follows.
            if span_no == 0 and len(spans) > 1:
                _sleep_between()


In [6]:
def _outdir_for(dataset_name: str) -> Path:
    """Return the output directory for ``dataset_name`` (case-insensitive).

    Raises:
        ValueError: if the name is neither ``VLAT`` nor ``CALVI``.
    """
    key = dataset_name.upper()
    if key == "VLAT":
        return OUT_VLAT
    if key == "CALVI":
        return OUT_CALVI
    raise ValueError("dataset_name must be 'VLAT' or 'CALVI'")

def run_experiment(dataset_name: str, json_path: Path, prompt_overrides: Optional[str] = None, runs: int = NUM_RUNS):
    """Evaluate one dataset ``runs`` times, each with a differently shuffled question order.

    Questions come from the ``"questions"`` list of the JSON file at
    ``json_path``. For run ``k`` (1-based) the module-level ``random``
    generator is seeded with ``BASE_SEED + k`` for VLAT, or
    ``BASE_SEED + 10_000 + k`` otherwise, before a copy of the list is
    shuffled. Each run writes its own CSV into the dataset's output directory
    via ``_eval_one_run``.

    ``prompt_overrides`` is forwarded as the text prepended to every question.
    """
    tag = dataset_name.upper()
    is_vlat = tag == "VLAT"

    questions = list(_load_json(json_path).get("questions", []))
    target_dir = _outdir_for(tag)
    _ensure_dir(target_dir)

    seed_offset = 0 if is_vlat else 10_000

    for run_no in range(1, runs + 1):
        random.seed(BASE_SEED + seed_offset + run_no)
        shuffled = list(questions)
        random.shuffle(shuffled)

        if is_vlat:
            csv_path = target_dir / f"vlat_llava13b_run_{run_no:02d}.csv"
        else:
            csv_path = target_dir / f"calvi_llava13b_run_{run_no:02d}.csv"

        print(f"Running {tag}, run {run_no:02d} -> {csv_path}")
        _eval_one_run(tag, shuffled, csv_path, run_no, prepend_prompt=prompt_overrides)

    print(f"All {tag} runs completed.")

def run_all(do_vlat: bool = True, do_calvi: bool = True, runs: int = NUM_RUNS):
    """Run the enabled datasets in order, VLAT first and then CALVI.

    Each dataset uses its own JSON file and prompt.
    """
    jobs = [
        (do_vlat, "VLAT", VLAT_JSON, VLAT_PROMPT),
        (do_calvi, "CALVI", CALVI_JSON, CALVI_PROMPT),
    ]
    for enabled, name, json_path, prompt in jobs:
        if enabled:
            run_experiment(name, json_path, prompt_overrides=prompt, runs=runs)


In [7]:
# Evaluate each benchmark with 5 shuffled runs. `runs=5` is passed explicitly,
# so the NUM_RUNS default does not apply here. Existing CSVs for these run
# numbers are overwritten (the output files are opened in "w" mode).
run_experiment("VLAT",  VLAT_JSON,  prompt_overrides=VLAT_PROMPT,  runs=5)
run_experiment("CALVI", CALVI_JSON, prompt_overrides=CALVI_PROMPT, runs=5)

# Alternative: run both datasets through run_all instead of the two calls above.
# run_all(do_vlat=True, do_calvi=True, runs=10)


Running VLAT, run 01 -> C:\Users\Melita\CSE 4001\VLM-Eval-Research\Output\VLAT\Llava13b_Eval\Random\vlat_llava13b_run_01.csv
first 5 questions done for vlat
first next 5 done
Cooling down for 8 seconds to avoid timeouts...
Running VLAT, run 02 -> C:\Users\Melita\CSE 4001\VLM-Eval-Research\Output\VLAT\Llava13b_Eval\Random\vlat_llava13b_run_02.csv
first 5 questions done for vlat
first next 5 done
Cooling down for 5 seconds to avoid timeouts...
Running VLAT, run 03 -> C:\Users\Melita\CSE 4001\VLM-Eval-Research\Output\VLAT\Llava13b_Eval\Random\vlat_llava13b_run_03.csv
first 5 questions done for vlat
first next 5 done
Cooling down for 10 seconds to avoid timeouts...
Running VLAT, run 04 -> C:\Users\Melita\CSE 4001\VLM-Eval-Research\Output\VLAT\Llava13b_Eval\Random\vlat_llava13b_run_04.csv
first 5 questions done for vlat
first next 5 done
Cooling down for 9 seconds to avoid timeouts...
Running VLAT, run 05 -> C:\Users\Melita\CSE 4001\VLM-Eval-Research\Output\VLAT\Llava13b_Eval\Random\vlat_ll