In [None]:
"""Chart QA via BLIP-2 (standard pipeline) ‚Äî chart image + question (TF or MC) ‚Üí answer."""

!pip install -q transformers torch accelerate pillow openpyxl tqdm

import os
import re
import json
import random
import time
from datetime import datetime
import pandas as pd
import torch
from tqdm import tqdm
from PIL import Image
from torchvision import transforms as T
from transformers import Blip2Processor, Blip2ForConditionalGeneration
from huggingface_hub import notebook_login
from google.colab import drive


In [None]:
# -------------- Mount Drive --------------
# Attach Google Drive at /content/drive; the dataset and result paths used
# later in this notebook are all located under that mount point.
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
# -------------- Configuration --------------
# Seed Python's and torch's RNGs so the dataset shuffle is repeatable.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

MODEL_ID = "Salesforce/blip2-opt-2.7b"
DTYPE = torch.float16

# Credentials must not be committed with the notebook, so the token is taken
# from the environment instead of a literal. `HF_TOKEN` is the variable
# huggingface_hub reads (`HUGGING_FACE_HUB_TOKEN` is its legacy spelling); the
# previously exported `HUGGINGFACE_HUB_TOKEN` is not a recognised name, so the
# token set through it was never picked up.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN") or ""

if HF_TOKEN:
    # Normalise onto the canonical variable so every Hub call sees the token.
    os.environ["HF_TOKEN"] = HF_TOKEN
else:
    # No token configured: fall back to the interactive login widget.
    notebook_login()


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv‚Ä¶

In [None]:
# Use the GPU whenever torch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"‚úÖ Running on {device.upper()}")

‚úÖ Running on CUDA


In [None]:
# -------------- Paths --------------
# Every input and output file lives under this single Drive folder.
DATA_ROOT = "/content/drive/MyDrive/Project/test_folder"

# Question files: one for True/False items, one for multiple-choice items.
TF_JSON, MC_JSON = (
    os.path.join(DATA_ROOT, filename)
    for filename in ("TF_data.json", "MC_data.json")
)

# Output workbooks: per-example details and the per-run summary.
DETAILED_RESULTS_XLSX, SUMMARY_RESULTS_XLSX = (
    os.path.join(DATA_ROOT, filename)
    for filename in ("blip2_detailed_results.xlsx", "blip2_summary_result.xlsx")
)

# Fail fast, before the model is loaded, if either question file is missing.
assert os.path.exists(TF_JSON), f"TF JSON not found: {TF_JSON}"
assert os.path.exists(MC_JSON), f"MC JSON not found: {MC_JSON}"


In [None]:
# -------------- Helpers --------------
def normalize_tf(pred):
    """Map a free-form True/False prediction onto the label "True" or "False".

    Resolution order:
      1. An exact "true"/"t" or "false"/"f" (case-insensitive, surrounding
         whitespace ignored) maps directly.
      2. Text containing "true" but not "false" is "True".
      3. Text containing "false" (alone, or together with "true", which is
         ambiguous) is "False".
      4. Otherwise a standalone word "yes" is "True".
      5. Anything else, including empty input, is "False".

    Args:
        pred: The prediction; any object, converted with ``str()``.

    Returns:
        The string "True" or "False".
    """
    text = str(pred).strip().lower()

    if text in ("true", "t"):
        return "True"
    if text in ("false", "f"):
        return "False"

    has_true = "true" in text
    has_false = "false" in text
    if has_true and not has_false:
        return "True"
    if has_false:
        # Decide here so that a later "yes" cannot override an explicit "false".
        return "False"

    # Match "yes" as a whole word; a substring test would also fire on words
    # such as "yesterday" or "eyes".
    if "yes" in re.findall(r"[a-z]+", text):
        return "True"
    return "False"

def normalize_mc(pred, choices=None):
    """Reduce a multiple-choice prediction to a single option letter.

    Resolution order:
      1. A standalone letter A-F anywhere in the upper-cased prediction.
      2. The first choice whose text occurs (case-insensitively) in the
         prediction, returned as its positional letter (A for index 0, ...).
      3. The first character of the upper-cased prediction.

    Args:
        pred: The prediction; falsy values yield "".
        choices: Optional sequence of answer options. Entries may be any type
            (they are compared via ``str()``); ``None`` and blank entries are
            ignored.

    Returns:
        A one-character string, or "" when nothing usable is present.
    """
    if not pred:
        return ""

    text = str(pred).strip().upper()
    letter = re.search(r"\b([A-F])\b", text)
    if letter:
        return letter.group(1)

    if choices:
        haystack = text.lower()
        for idx, choice in enumerate(choices):
            # str() keeps numeric options (e.g. years parsed from JSON) from
            # crashing on .strip(); blank options are skipped because an empty
            # needle is "contained" in every string and would always win.
            needle = "" if choice is None else str(choice).strip().lower()
            if needle and needle in haystack:
                return chr(ord("A") + idx)

    return text[:1] if text else ""

def build_prompt(qtype, question, choices=None):
    """Assemble the few-shot text prompt that accompanies the chart image.

    Args:
        qtype: "TF" for a True/False question or "MC" for multiple choice.
        question: The question text, inserted verbatim.
        choices: Options for an "MC" question; each is listed on its own line
            as "A. ...", "B. ...", and so on. Ignored for "TF".

    Returns:
        The prompt string, ending with an open ``<answer>`` tag.

    Raises:
        ValueError: If ``qtype`` is neither "TF" nor "MC".
    """
    if qtype == "TF":
        segments = [
            "Example 1: Net profit rose in 2020 vs 2019? <answer>False</answer>\n",
            "Example 2: Expenses decreased in 2022 vs 2021? <answer>True</answer>\n",
            "<start_of_image>\n",
            "You are given a chart image and a True/False question.\n",
            "Respond ONLY with 'True' or 'False' inside <answer> tags.\n\n",
            f"Question: {question}\n<answer>",
        ]
        return "".join(segments)

    if qtype == "MC":
        # Label the options A, B, C, ... in the order they were given.
        labelled = []
        for position, choice in enumerate(choices or []):
            labelled.append(f"{chr(ord('A') + position)}. {choice}")
        choices_str = "\n".join(labelled)

        segments = [
            "Example 1: Which year had highest revenue? Choices: A.2018 B.2019 C.2020 D.2021 <answer>A</answer>\n",
            "Example 2: Which product had lowest cost? Choices: A.P1 B.P2 C.P3 D.P4 <answer>D</answer>\n",
            "Example 3: Which region grew fastest? Choices: A.Asia B.Europe C.US D.Africa <answer>B</answer>\n",
            "<start_of_image>\n",
            "You are given a chart image and a multiple‚Äêchoice question.\n",
            "Respond ONLY with the letter inside <answer> tags.\n\n",
            f"Question: {question}\nChoices:\n{choices_str}\n<answer>",
        ]
        return "".join(segments)

    raise ValueError(f"Unsupported question type: {qtype}")

def extract_final_answer(raw_output: str, choices=None):
    """Pull the answer token out of raw model output.

    Resolution order:
      1. The content of the last ``<answer>...</answer>`` pair whose body is
         purely alphanumeric.
      2. The first choice whose text occurs (case-insensitively) in the
         output, returned as its positional letter (A for index 0, ...).
      3. The last standalone "True", "False" or letter A-F in the output.

    Args:
        raw_output: Decoded model text. Non-string or blank input yields "".
        choices: Optional sequence of answer options. Entries may be any type
            (they are compared via ``str()``); ``None`` and blank entries are
            ignored.

    Returns:
        The extracted answer with only its first character upper-cased
        (e.g. "True", "B"), or "" when nothing matches.
    """
    if not isinstance(raw_output, str) or not raw_output.strip():
        return ""
    text = raw_output.strip()

    # With IGNORECASE, [A-Z0-9] already covers every letter and digit.
    tagged = re.findall(
        r"<\s*answer\s*>\s*([A-Z0-9]+?)\s*<\s*/\s*answer\s*>",
        text,
        flags=re.IGNORECASE,
    )
    if tagged:
        return tagged[-1].strip().capitalize()

    if choices:
        haystack = text.lower()
        for idx, choice in enumerate(choices):
            # str() keeps numeric options (e.g. years parsed from JSON) from
            # crashing on .strip(); blank options are skipped because an empty
            # needle is "contained" in every string and would always win.
            needle = "" if choice is None else str(choice).strip().lower()
            if needle and needle in haystack:
                return chr(ord("A") + idx)

    fallback = re.findall(r"\b(True|False|[A-F])\b", text, flags=re.IGNORECASE)
    if fallback:
        return fallback[-1].strip().capitalize()
    return ""


# ---------------- Dataset Loader ----------------
def load_items(path, qtype, subfolder):
    """Read a question JSON file and convert each entry into an example dict.

    Args:
        path: Path to a JSON file holding a list of objects with at least the
            keys "question", "answer" and "image".
        qtype: Question-type tag stored on every example ("MC" keeps the
            entry's "choices"; any other value stores ``None``).
        subfolder: Directory under ``DATA_ROOT`` that contains the images.

    Returns:
        A list of dicts with keys "type", "question", "answer", "choices"
        and "image_path".
    """
    with open(path, "r", encoding="utf-8") as handle:
        records = json.load(handle)

    keep_choices = qtype == "MC"
    examples = []
    for record in records:
        example = {
            "type": qtype,
            "question": record["question"],
            "answer": record["answer"],
            "choices": record.get("choices") if keep_choices else None,
            "image_path": os.path.join(DATA_ROOT, subfolder, record["image"]),
        }
        examples.append(example)
    return examples

def load_local_dataset(num_mc=10, num_tf=10):
    """Load the leading MC and TF examples and return them in shuffled order.

    Args:
        num_mc: How many multiple-choice examples to take from the start of
            the MC file.
        num_tf: How many True/False examples to take from the start of the
            TF file.

    Returns:
        One list holding the selected examples, shuffled in place with the
        module-level ``random`` generator.
    """
    mc_subset = load_items(MC_JSON, "MC", "MC_images")[:num_mc]
    tf_subset = load_items(TF_JSON, "TF", "TF_images")[:num_tf]

    combined = [*mc_subset, *tf_subset]
    random.shuffle(combined)
    return combined


In [None]:
# -------------- Model Setup --------------
print("üîÑ Loading BLIP-2 model ‚Ä¶")
processor = Blip2Processor.from_pretrained(MODEL_ID)

# device_map="auto" lets Accelerate place the weights, so no .to(device) is
# chained afterwards: on a single device it is redundant, and when the model
# is sharded or offloaded, moving a dispatched model warns or raises.
# NOTE(review): newer transformers releases deprecate `torch_dtype` in favour
# of `dtype`; switch once the minimum supported version is pinned.
model = Blip2ForConditionalGeneration.from_pretrained(
    MODEL_ID,
    torch_dtype=DTYPE,
    device_map="auto",
)
model.eval()
print("‚úÖ Model loaded on:", device)

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


üîÑ Loading BLIP-2 model ‚Ä¶


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/432 [00:00<?, ?B/s]

processor_config.json:   0%|          | 0.00/68.0 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/882 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/23.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/548 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/10.0G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/141 [00:00<?, ?B/s]

‚úÖ Model loaded on: cuda


In [None]:
# -------------- Inference Function --------------
@torch.no_grad()
def infer_blip2(image_path, question, qtype, choices=None, max_new_tokens=128):
    """Ask BLIP-2 one chart question and return the decoded model text.

    Args:
        image_path: Path of the chart image to load.
        question: Question text.
        qtype: "TF" or "MC"; forwarded to ``build_prompt``.
        choices: Answer options for "MC" questions, else ``None``.
        max_new_tokens: Upper bound on generated tokens (default 128).

    Returns:
        The decoded output with special tokens removed and surrounding
        whitespace stripped.
    """
    # Close the file handle deterministically; convert() yields an in-memory
    # RGB copy that stays valid after the file is closed.
    with Image.open(image_path) as img:
        image = img.convert("RGB")

    prompt = build_prompt(qtype, question, choices)
    inputs = processor(images=image, text=prompt, return_tensors="pt").to(device, DTYPE)
    out_ids = model.generate(**inputs, max_new_tokens=max_new_tokens)

    generated = out_ids[0]
    # NOTE(review): depending on the installed transformers version, generate()
    # may return the prompt tokens followed by the continuation. The prompt
    # contains few-shot <answer> tags and the choice texts, which would
    # contaminate answer extraction, so an exact prompt prefix is dropped.
    # When the output does not start with the prompt, nothing is removed.
    prompt_ids = inputs.get("input_ids")
    if prompt_ids is not None:
        prefix = prompt_ids[0].to(generated.device)
        prefix_len = prefix.shape[0]
        if generated.shape[0] >= prefix_len and torch.equal(generated[:prefix_len], prefix):
            generated = generated[prefix_len:]

    answer = processor.decode(generated, skip_special_tokens=True)
    return answer.strip()

def accuracy(preds, gts):
    """Return the fraction of predictions that equal their ground truth.

    Both sides are compared as whitespace-stripped strings, pairwise in order.
    The denominator is ``len(preds)``; an empty ``preds`` yields 0.
    """
    if not preds:
        return 0

    hits = 0
    for predicted, expected in zip(preds, gts):
        if str(predicted).strip() == str(expected).strip():
            hits += 1
    return hits / len(preds)


In [None]:
# ---------------- Evaluation ----------------
def evaluate_and_summarize(examples):
    """Run inference over all examples and collect details plus a summary.

    For each example the raw model text is obtained from ``infer_blip2``,
    reduced with ``extract_final_answer`` and normalised with
    ``normalize_mc`` (type "MC") or ``normalize_tf`` (any other type).

    Args:
        examples: List of example dicts with keys "type", "question",
            "answer", "image_path" and optionally "choices".

    Returns:
        A ``(details, summary)`` tuple: ``details`` is a DataFrame with one
        row per example; ``summary`` is a dict of accuracy and timing figures.
    """
    rows, preds, gts, types = [], [], [], []
    print("üöÄ Running inference ‚Ä¶")

    run_started = time.time()

    for ex in tqdm(examples, desc="Evaluating", unit="example"):
        item_started = time.time()
        raw = infer_blip2(ex["image_path"], ex["question"], ex["type"], ex.get("choices"))
        item_finished = time.time()
        time_taken = item_finished - item_started

        # Choices only take part in extraction and normalisation for MC items.
        if ex["type"] == "MC":
            extracted = extract_final_answer(raw, ex.get("choices"))
            norm = normalize_mc(extracted, ex.get("choices"))
        else:
            extracted = extract_final_answer(raw, None)
            norm = normalize_tf(extracted)

        # Truncated preview for quick reading; the full text is kept alongside.
        preview = raw[:200] + "..." if len(raw) > 200 else raw

        row = {
            "image_path": ex["image_path"],
            "type": ex["type"],
            "question": ex["question"],
            "ground_truth": ex["answer"],
            "prediction_by_model": norm,
            "raw_output": preview,
            "raw_output_2": raw,
            "raw_output_length": len(raw),
            "inference_time_secs": time_taken,
            "tokens_per_question": None,  # text token count could be added
        }
        rows.append(row)
        preds.append(norm)
        gts.append(ex["answer"])
        types.append(ex["type"])

    total_time = time.time() - run_started
    avg_time = total_time / len(examples) if examples else 0
    throughput = len(examples) / total_time if total_time > 0 else 0

    def _score_for(wanted_type):
        """Accuracy, rounded to 4 places, over examples of one question type."""
        subset_preds = [p for p, t in zip(preds, types) if t == wanted_type]
        subset_gts = [g for g, t in zip(gts, types) if t == wanted_type]
        return round(accuracy(subset_preds, subset_gts), 4)

    summary = {
        "model_name": MODEL_ID,
        "MC_Score": _score_for("MC"),
        "TF_Score": _score_for("TF"),
        "Weighted_Avg": round(accuracy(preds, gts), 4),
        "Total_Questions": len(preds),
        "Total_Time_secs": round(total_time, 2),
        "Avg_Time_per_Q": round(avg_time, 4),
        "Examples_per_sec": round(throughput, 2),
        "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    print("‚úÖ Inference completed!")
    print(f"üìä MC Accuracy: {summary['MC_Score']}")
    print(f"üìä TF Accuracy: {summary['TF_Score']}")
    print(f"üìä Weighted Avg: {summary['Weighted_Avg']}")
    print(f"‚è± Total Time (s): {summary['Total_Time_secs']}")
    print(f"üìà Throughput (examples/sec): {summary['Examples_per_sec']}")

    details = pd.DataFrame(rows)
    return details, summary


In [None]:
# ---------------- Main Execution ----------------
if __name__ == "__main__":
    # Evaluate the first 10 MC and first 10 TF questions.
    examples = load_local_dataset(num_mc=10, num_tf=10)
    print(f"Total examples: {len(examples)}")
    df_details, summary = evaluate_and_summarize(examples)

    # Per-example details go to a new, timestamp-suffixed workbook each run.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    detailed_path = DETAILED_RESULTS_XLSX.replace(".xlsx", f"_{timestamp}.xlsx")
    df_details.to_excel(detailed_path, index=False)
    print("‚úÖ Saved detailed results:", detailed_path)

    # The summary workbook accumulates one row per run: rows already on disk
    # are read back and the new row is appended before the file is rewritten.
    df_sum = pd.DataFrame([summary])
    summary_exists = os.path.exists(SUMMARY_RESULTS_XLSX)
    if summary_exists:
        old = pd.read_excel(SUMMARY_RESULTS_XLSX)
        df_sum = pd.concat([old, df_sum], ignore_index=True)
    df_sum.to_excel(SUMMARY_RESULTS_XLSX, index=False)
    print("‚úÖ Saved summary results:", SUMMARY_RESULTS_XLSX)


Total examples: 20
üöÄ Running inference ‚Ä¶


Evaluating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 20/20 [00:13<00:00,  1.51example/s]


‚úÖ Inference completed!
üìä MC Accuracy: 0.4
üìä TF Accuracy: 0.6
üìä Weighted Avg: 0.5
‚è± Total Time (s): 13.25
üìà Throughput (examples/sec): 1.51
‚úÖ Saved detailed results: /content/drive/MyDrive/Project/test_folder/blip2_detailed_results_20251216_194245.xlsx
‚úÖ Saved summary results: /content/drive/MyDrive/Project/test_folder/blip2_summary_result.xlsx
