# Testing pipeline

In [None]:
!pip install  -U -q git+https://github.com/huggingface/transformers.git git+https://github.com/huggingface/trl.git datasets bitsandbytes peft qwen-vl-utils wandb accelerate
# Tested with transformers==4.53.0.dev0, trl==0.20.0.dev0, datasets==3.6.0, bitsandbytes==0.46.0, peft==0.15.2, qwen-vl-utils==0.0.11, wandb==0.20.1, accelerate==1.8.1
# NOTE(review): transformers and trl are installed from their GitHub repositories without a pinned ref,
# and the other packages are unpinned, so a fresh run may resolve to versions newer than the tested ones above.

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [None]:
# Install the pinned CUDA 12.1 (+cu121) builds of torch / torchvision / torchaudio from the PyTorch wheel index.
!pip install -q torch==2.4.1+cu121 torchvision==0.19.1+cu121 torchaudio==2.4.1+cu121 --extra-index-url https://download.pytorch.org/whl/cu121

In [None]:
from pathlib import Path
import os
import torch
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor

# Root folder of the project on the mounted Google Drive.
os.environ["DATA"] = "/content/drive/MyDrive/IMT/Internship"
# NOTE(review): the root has no trailing "/" and the sub-paths are appended with "+", so the
# resulting paths are ".../IMT/InternshipPatImgXAI_data/db.vlm/" and
# ".../IMT/Internshipmodels/db.vlm/qwen2.5-vl". The adapter path hard-coded in the
# model-loading cell uses the same fused "Internshipmodels" spelling, so confirm the real
# folder names on Drive before replacing "+" with os.path.join.
db_dir = os.environ["DATA"] + "PatImgXAI_data/db.vlm/" #(6000  train datasets , 600 test, 600 valid)
# Folder that holds the per-rule *_explanations.json files.
db = os.path.join(db_dir, "datasets")
# Folder that holds the fine-tuned checkpoints.
model_dir = os.environ["DATA"] + "models/db.vlm/qwen2.5-vl"

In [None]:
# System prompt placed in the "system" turn of every conversation built by format_data().
# It describes the two tagged sections of the expected answer: free text after <|human|>
# and a JSON object after <|system|>.
# NOTE(review): the three rule-interpretation bullets sit directly under "in THIS order:" and
# before the <|human|> tag, which reads as if a paragraph was moved. The text is part of the
# model's input, so confirm it matches the prompt used for fine-tuning before editing it.
system_message = """You are a Vision Language Model specialized in analyzing images containing colored shapes on a 6x6 grid labeled A–F (columns) and 1–6 (rows).
Follow the RULE in the user message exactly.

Respond in EXACTLY two tagged sections and in THIS order:
-“only X” → zero non-X within the stated scope; empty cells are ignored (one X + the rest empty still qualifies).
- “at least / exactly / at most N” → ≥ / == / ≤.
- Comparatives (e.g., “twice as many X as Y”) refer to counts of shapes matching those filters.
<|human|>
- Answer: 'This respect the rule' or 'This does not respect the rule'.
- Then explain: counts by shape/color and positions used to decide. Empty cells never count or break rules.

<|system|>
In THIS section return **only** a valid JSON object (no extra text) with exactly these keys:
{
  "relevant_positions":   ["A6"],
  "irrelevant_positions": ["A2","A4",…],
  "relevant_shapes":      ["circle"],
  "irrelevant_shapes":    ["triangle","triangle",…],
  "relevant_colors":      ["blue"],
  "irrelevant_colors":    ["green","blue",…]
}
- Return **only** a valid JSON object in THIS section. **Stop immediately after the closing `}`. Do not output anything else.**
- Coverage: `relevant_*` + `irrelevant_*` must enumerate **all shapes in the entire grid** (empty cells ignored).
- Arrays are **index-aligned**: positions[i] ↔ shapes[i] ↔ colors[i].
- Total shapes ≤ 36. No duplicates.

"""

In [None]:
from PIL import Image
import os
import json

def format_data(sample, rule_prompt):
    """Build the system / user / assistant conversation for one dataset sample.

    :param sample: dict with a "path" entry (image path relative to ``db_dir``) and an
        "explanation" entry holding the "human" text and the "system" object.
    :param rule_prompt: question about the rule, sent as the text of the user turn.
    :return: list of three chat turns. The assistant turn contains the reference answer:
        the human explanation after ``<|human|>`` and the compact JSON dump of the system
        explanation after ``<|system|>``.
    """
    # Absolute location of the image on disk.
    image_path = os.path.join(db_dir, sample["path"])

    explanation = sample["explanation"]
    reference_answer = "".join([
        "<|human|>", explanation["human"], "\n",
        # Compact separators keep the JSON target free of optional whitespace.
        "<|system|>", json.dumps(explanation["system"], separators=(',', ':')),
    ])

    system_turn = {
        "role": "system",
        "content": [{"type": "text", "text": system_message}],
    }
    user_turn = {
        "role": "user",
        "content": [
            {"type": "image", "image": image_path},
            {"type": "text", "text": rule_prompt},
        ],
    }
    assistant_turn = {
        "role": "assistant",
        "content": [{"type": "text", "text": reference_answer}],
    }
    return [system_turn, user_turn, assistant_turn]

### Data Preprocessing

In [None]:
# Maps each *training* explanations file to the rule question used as the user prompt.
# Validation and test file names are translated to their "_train_" form before lookup
# (see get_rule_prompt). The strings are sent to the model verbatim.
file_rule_map = {
    "onlycircle_train_explanations.json": "Does this image has at least one row that contains only circles?",
    "red_in_row_2_train_explanations.json": "If row 2 (index 1) contains a red symbol, does the image contain at least as many triangles as blue symbols? Otherwise, does the image contain at least as many green symbols as squares?",
}

def get_red_in_row_2_rule(sample):
    """Pick the sub-rule of the "red in row 2" task that applies to ``sample``.

    The choice is a case-sensitive text search: when the explanation text mentions both
    "row 2" and "red", the triangles-vs-blue question is returned, otherwise the
    green-vs-squares question.

    :param sample: dict with an "explanation" entry. It may be a plain string or a dict
        whose "human" entry holds the free-text explanation (the layout format_data reads).
    :return: the rule question to use as the user prompt.
    """
    explanation = sample["explanation"]
    if isinstance(explanation, dict):
        # `"row 2" in some_dict` tests the dict's keys, which never matched and forced the
        # else branch for every sample. Search the human-readable text instead.
        explanation_text = explanation.get("human") or ""
    else:
        explanation_text = explanation or ""

    # NOTE(review): this is still a wording heuristic. Confirm that the human explanations
    # really mention "row 2" and "red" when the first branch applies, and that prompts
    # generated here match the ones used when the adapter was fine-tuned.
    if "row 2" in explanation_text and "red" in explanation_text:
        return "Does the image contain at least as many triangles as blue symbols?"
    return "Does the image contain at least as many green symbols as squares?"


def get_rule_prompt(filename, sample=None):
    """Return the rule question associated with a dataset file.

    The file's base name is mapped to its training-split name by turning "_valid_" or
    "_test_" into "_train_", and that name is looked up in ``file_rule_map``.

    :param filename: name or path of the explanations file.
    :param sample: optional data sample. For the "red in row 2" file a truthy sample
        selects the sample-specific sub-rule via ``get_red_in_row_2_rule``.
    :return: the rule prompt string.
    :raises KeyError: if the mapped file name is not in ``file_rule_map``.
    """
    train_name = os.path.basename(filename)
    for split_tag in ("_valid_", "_test_"):
        train_name = train_name.replace(split_tag, "_train_")

    is_conditional_rule = train_name == "red_in_row_2_train_explanations.json"
    if is_conditional_rule and sample:
        return get_red_in_row_2_rule(sample)

    # Every other case, including the conditional file without a sample, uses the
    # combined rule text stored in the map.
    return file_rule_map[train_name]

In [None]:
import json

# Explanation files of each split: one file per rule, named <rule>_<split>_explanations.json.
train_files = [
    "onlycircle_train_explanations.json",
    "red_in_row_2_train_explanations.json"
]

valid_files = [
    "onlycircle_valid_explanations.json",
    "red_in_row_2_valid_explanations.json"
]

test_files = [
    "onlycircle_test_explanations.json",
    "red_in_row_2_test_explanations.json"
]

# Full paths of the files above inside the datasets folder `db`.
train_data_fullpath = [os.path.join(db, fname) for fname in train_files]
valid_data_fullpath = [os.path.join(db, fname) for fname in valid_files]
test_data_fullpath  = [os.path.join(db, fname) for fname in test_files]

#preprocess dataset
def process_files(file_list):
    """Load explanation files and turn every sample into a chat conversation.

    :param file_list: iterable of paths to JSON files, each containing a list of samples.
    :return: list of ``{"messages": [...]}`` dicts, one per sample, in file order.
    """
    all_formatted = []
    for file in file_list:
        # JSON is UTF-8 by specification; do not depend on the platform's locale encoding.
        with open(file, 'r', encoding='utf-8') as f:
            samples = json.load(f)
        # The file is closed before the (potentially long) formatting loop starts.
        for sample in samples:
            rule_prompt = get_rule_prompt(file, sample)
            # One record per sample, holding the conversation under "messages".
            all_formatted.append({"messages": format_data(sample, rule_prompt)})
    return all_formatted

#format the dataset into a conversation structure
train_dataset = process_files(train_data_fullpath)
valid_dataset = process_files(valid_data_fullpath)
test_dataset = process_files(test_data_fullpath)

# Nothing is written to disk in this cell, so report what was built instead of claiming a save.
print(
    f"Datasets processed: train={len(train_dataset)}, "
    f"valid={len(valid_dataset)}, test={len(test_dataset)}"
)


All datasets processed and saved!


In [None]:
# Inspect the first formatted test conversation (system, user and assistant turns).
test_dataset[0]

{'messages': [{'role': 'system',
   'content': [{'type': 'text',
     'text': 'You are a Vision Language Model specialized in analyzing images containing colored shapes on a 6x6 grid labeled A–F (columns) and 1–6 (rows).\nFollow the RULE in the user message exactly.\n\nRespond in EXACTLY two tagged sections and in THIS order:\n-“only X” → zero non-X within the stated scope; empty cells are ignored (one X + the rest empty still qualifies).\n- “at least / exactly / at most N” → ≥ / == / ≤.\n- Comparatives (e.g., “twice as many X as Y”) refer to counts of shapes matching those filters.\n<|human|>\n- Answer: \'This respect the rule\' or \'This does not respect the rule\'.\n- Then explain: counts by shape/color and positions used to decide. Empty cells never count or break rules.\n\n<|system|>\nIn THIS section return **only** a valid JSON object (no extra text) with exactly these keys:\n{\n  "relevant_positions":   ["A6"],\n  "irrelevant_positions": ["A2","A4",…],\n  "relevant_shapes":     

In [None]:
# Show only the first few conversations; displaying the whole list floods the notebook output.
test_dataset[:3]

[{'messages': [{'role': 'system',
    'content': [{'type': 'text',
      'text': 'You are a Vision Language Model specialized in analyzing images containing colored shapes on a 6x6 grid labeled A–F (columns) and 1–6 (rows).\nFollow the RULE in the user message exactly.\n\nRespond in EXACTLY two tagged sections and in THIS order:\n-“only X” → zero non-X within the stated scope; empty cells are ignored (one X + the rest empty still qualifies).\n- “at least / exactly / at most N” → ≥ / == / ≤.\n- Comparatives (e.g., “twice as many X as Y”) refer to counts of shapes matching those filters.\n<|human|>\n- Answer: \'This respect the rule\' or \'This does not respect the rule\'.\n- Then explain: counts by shape/color and positions used to decide. Empty cells never count or break rules.\n\n<|system|>\nIn THIS section return **only** a valid JSON object (no extra text) with exactly these keys:\n{\n  "relevant_positions":   ["A6"],\n  "irrelevant_positions": ["A2","A4",…],\n  "relevant_shapes":  

In [None]:
# Reference answer of the first test sample: the text of the assistant turn (index 2).
test_dataset[0]["messages"][2]["content"][0]["text"]

'<|human|>This image does not respect the \'at least one row with only circles\' rule, as no row was found to contain only circle symbols.\n<|system|>{"relevant_positions":[],"irrelevant_positions":["A1","A4","A6","B4","B5","B6","C1","C3","C5","D3","D5","E3","E5","E6","F1","F2","F4","F5"],"relevant_shapes":[],"irrelevant_shapes":["triangle","circle","circle","triangle","triangle","circle","square","square","square","circle","triangle","square","triangle","triangle","square","triangle","square","circle"],"relevant_colors":[],"irrelevant_colors":["red","blue","green","red","blue","green","red","red","green","blue","red","yellow","green","red","green","red","green","yellow"]}'

In [None]:
from datasets import Dataset
# Wrap the plain Python lists in Hugging Face Dataset objects.
# NOTE(review): the same names are rebound from list to Dataset, so re-running this cell on
# its own would pass Dataset objects to from_list; re-run the preprocessing cell first.
train_dataset = Dataset.from_list(train_dataset)
valid_dataset = Dataset.from_list(valid_dataset)
test_dataset = Dataset.from_list(test_dataset)
from qwen_vl_utils import process_vision_info

### Model Loading and Preparation

In [None]:
# Base model; the fine-tuned adapter is attached on top of it below.
model_id = "Qwen/Qwen2.5-VL-7B-Instruct"
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    device_map="auto",
    attn_implementation="sdpa"
)
# device_map="auto" already places the weights, so the model is not moved again with
# model.to(device): on a single GPU that call is a no-op, and a model dispatched across
# devices or offloaded should not be moved by hand.
# `device` is still defined because later cells may use it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Same location as the previously hard-coded string, now derived from model_dir so the
# checkpoint path has a single source of truth.
adapter_path = os.path.join(model_dir, "checkpoint-320")
model.load_adapter(adapter_path, adapter_name="finetuned_adapter")
processor = AutoProcessor.from_pretrained(model_id)

### Data Cleaning and Formatting for Model Input

In [None]:
def clean_turns(messages):
    """Return a copy of ``messages`` whose content blocks hold only their own payload.

    Image blocks are reduced to ``{"type", "image"}`` and text blocks to ``{"type", "text"}``.
    Blocks whose payload is ``None`` (or whose type is neither image nor text) are dropped,
    and turns left without any block are omitted.
    """
    def _normalise(block):
        # Keep exactly the key that matches the block type; anything else is discarded.
        kind = block.get("type")
        if kind == "image" and block.get("image") is not None:
            return {"type": "image", "image": block["image"]}
        if kind == "text" and block.get("text") is not None:
            return {"type": "text", "text": block["text"]}
        return None

    result = []
    for turn in messages:
        normalised = (_normalise(block) for block in turn.get("content", []))
        kept = [block for block in normalised if block is not None]
        if not kept:
            continue
        result.append({"role": turn["role"], "content": kept})
    return result

def only_image_blocks(messages):
    """Keep only the image blocks of each turn, dropping turns that have none.

    A block counts as an image block when its type is "image" and its "image" value is
    not ``None``. Kept blocks are passed through unchanged.
    """
    def _is_image(block):
        return block.get("type") == "image" and block.get("image") is not None

    result = []
    for turn in messages:
        images = list(filter(_is_image, turn.get("content", [])))
        if images:
            result.append({"role": turn["role"], "content": images})
    return result

def generate_text_from_sample(model, processor, sample, max_new_tokens=1024, device="cuda"):
    """Generate the model's answer for one sample made of text and an image.

    :param model: model exposing ``generate``.
    :param processor: processor providing the chat template, input preparation and decoding.
    :param sample: either a list of chat turns or a dict with a "messages" list.
    :param max_new_tokens: upper bound on the number of generated tokens.
    :param device: device the prepared inputs are moved to.
    :return: decoded text of the newly generated tokens (prompt tokens removed).
    """
    if isinstance(sample, list):
        conversation = sample
    else:
        conversation = sample["messages"]

    # Only the prompt side is fed to the model: the first two system/user turns.
    prompt_turns = []
    for turn in conversation:
        if turn["role"] in ("system", "user"):
            prompt_turns.append(turn)
    prompt_turns = clean_turns(prompt_turns[:2])

    chat_prompt = processor.apply_chat_template(
        prompt_turns, tokenize=False, add_generation_prompt=True
    )

    # The vision helper receives the image blocks only; its second return value is unused.
    images, _ = process_vision_info(only_image_blocks(prompt_turns))

    batch = processor(
        text=[chat_prompt],
        images=images,
        return_tensors="pt",
    ).to(device)

    # Greedy decoding, stopping at the tokenizer's end-of-sequence token.
    output_ids = model.generate(
        **batch,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        eos_token_id=processor.tokenizer.eos_token_id
    )

    # generate() returns prompt + continuation; cut each row after its prompt length.
    continuations = []
    for prompt_ids, full_ids in zip(batch.input_ids, output_ids):
        continuations.append(full_ids[len(prompt_ids):])

    decoded = processor.batch_decode(
        continuations, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    return decoded[0]


`torch_dtype` is deprecated! Use `dtype` instead!
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

The image processor of type `Qwen2VLImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. Note that this behavior will be extended to all models in a future release.
You have video processor config saved in `preprocessor.json` file which is deprecated. Video processor configs should be saved in their own `video_preprocessor.json` file. You can rename the file or load and save the processor back which renames it automatically. Loading from `preprocessor.json` will be removed in v5.0.


## Text Generation and Model Evaluation

In [None]:
# Extra dependencies used by the evaluation section below (installed without version pins).
!pip install bert-score pandas



In [None]:
from bert_score import score as bertscore
import pandas as pd
import re
from tqdm.auto import tqdm

def _extract_between(text: str, start_token: str, end_token: str) -> str:
    """Extract substring between start_token and end_token; if end_token not found, take to end."""
    if text is None:
        return ""
    i = text.find(start_token)
    if i == -1:
        return text.strip()
    i += len(start_token)
    j = text.find(end_token, i)
    if j == -1:
        return text[i:].strip()
    return text[i:j].strip()

def extract_human_section_from_messages(messages: list) -> str:
    """Return the reference ``<|human|>`` text stored in a conversation.

    Assistant turns are scanned from the last to the first. The first string-valued text
    block found is expected to look like ``'<|human|> ... <|system|> {...}'`` and the part
    between the two tags is returned. An empty string is returned when no assistant turn
    carries such a block.
    """
    assistant_turns = (turn for turn in reversed(messages) if turn.get("role") == "assistant")
    for turn in assistant_turns:
        for block in turn.get("content", []):
            if block.get("type") != "text":
                continue
            text = block.get("text")
            if isinstance(text, str):
                return _extract_between(text, "<|human|>", "<|system|>")
    return ""

def extract_human_section_from_model_output(model_output: str) -> str:
    """Return the part of a model answer that is scored as the human explanation.

    With a ``<|human|>`` tag present, the text between it and ``<|system|>`` is returned.
    Without the tag, the output is cut at the first newline immediately followed by ``{``
    (a heuristic for the start of the JSON section) and the stripped remainder is returned;
    if no such position exists the whole stripped output is returned.
    """
    if "<|human|>" not in model_output:
        json_start = re.search(r"\n\{", model_output)
        if json_start is None:
            return model_output.strip()
        return model_output[:json_start.start()].strip()
    return _extract_between(model_output, "<|human|>", "<|system|>")

def batch_generate_human_sections(model, processor, dataset, max_new_tokens=512, device="cuda"):
    """Generate one answer per dataset item and pair it with its reference text.

    :param dataset: indexable collection whose items have a "messages" list.
    :return: ``(predictions, references)``, two lists of equal length holding the
        ``<|human|>`` section of the model output and of the stored assistant answer.
    """
    predictions = []
    references = []
    for idx in tqdm(range(len(dataset)), desc="Generating"):
        conversation = dataset[idx]["messages"]

        # Gold text first, then the model's answer for the same conversation.
        gold = extract_human_section_from_messages(conversation)
        raw_output = generate_text_from_sample(
            model, processor, conversation, max_new_tokens=max_new_tokens, device=device
        )

        predictions.append(extract_human_section_from_model_output(raw_output))
        references.append(gold)
    return predictions, references



In [None]:
def compute_bertscore(preds, refs, device_str="cuda", model_type="microsoft/deberta-xlarge-mnli",
                      use_idf=False, batch_size=32, rescale_with_baseline=True, lang=None):
    """Compute per-sample BERTScore precision, recall and F1.

    :param preds: candidate texts.
    :param refs: reference texts, aligned with ``preds``.
    :param device_str: device for the scorer. A "cuda..." request falls back to "cpu" when
        CUDA is not available; any other value is passed through unchanged.
    :param model_type: scoring model name.
    :param use_idf: whether to use IDF weighting.
    :param batch_size: scorer batch size.
    :param rescale_with_baseline: whether to rescale scores with the language baseline.
    :param lang: language code. Baseline rescaling needs an explicit language, so "en" is
        used when rescaling is requested and ``lang`` is ``None``.
    :return: three lists of Python floats ``(precision, recall, f1)``.
    :raises ValueError: if ``preds`` and ``refs`` differ in length.
    """
    preds, refs = list(preds), list(refs)
    if len(preds) != len(refs):
        raise ValueError(
            f"preds and refs must have the same length (got {len(preds)} and {len(refs)})"
        )
    if not preds:
        # Nothing to score; avoid loading the scoring model for an empty batch.
        return [], [], []

    if rescale_with_baseline and lang is None:
        # bert_score refuses to rescale without a language; the texts scored here are English.
        lang = "en"

    # Honour explicit devices such as "cuda:1"; only downgrade when CUDA is unavailable.
    wants_cuda = str(device_str).startswith("cuda")
    device = "cpu" if (wants_cuda and not torch.cuda.is_available()) else device_str

    P, R, F1 = bertscore(
        cands=preds,
        refs=refs,
        model_type=model_type,
        device=device,
        num_layers=None,            # default for chosen model
        idf=use_idf,
        batch_size=batch_size,
        rescale_with_baseline=rescale_with_baseline,
        lang=lang
    )
    # Convert tensors to Python floats
    return P.tolist(), R.tolist(), F1.tolist()

In [None]:
def evaluate_with_bertscore(model, processor, dataset, out_csv_path=None,
                            gen_max_new_tokens=512, gen_device="cuda",
                            bs_device="cuda", bs_model="microsoft/deberta-xlarge-mnli",
                            bs_use_idf=False, bs_batch_size=32, bs_lang="en"):
    """Generate answers, score them with BERTScore, optionally save a CSV, print a summary.

    :param out_csv_path: when truthy, the per-sample table is written there without the index.
    :param gen_max_new_tokens: generation length limit.
    :param gen_device: device used for generation inputs.
    :param bs_device: device passed to the scorer.
    :param bs_model: scoring model name.
    :param bs_use_idf: IDF weighting flag for the scorer.
    :param bs_batch_size: scorer batch size.
    :param bs_lang: scorer language.
    :return: ``(df, summary)``: the per-sample DataFrame and a dict with the sample count
        and the mean precision, recall and F1. Scores are always baseline-rescaled.
    """
    candidates, golds = batch_generate_human_sections(
        model, processor, dataset,
        max_new_tokens=gen_max_new_tokens, device=gen_device
    )

    precision, recall, f1 = compute_bertscore(
        candidates, golds,
        device_str=bs_device,
        model_type=bs_model,
        use_idf=bs_use_idf,
        batch_size=bs_batch_size,
        rescale_with_baseline=True,
        lang=bs_lang
    )

    # One row per sample: both texts next to their three scores.
    columns = {
        "index": list(range(len(candidates))),
        "prediction_human": candidates,
        "reference_human": golds,
        "bertscore_precision": precision,
        "bertscore_recall": recall,
        "bertscore_f1": f1
    }
    df = pd.DataFrame(columns)

    if out_csv_path:
        df.to_csv(out_csv_path, index=False)

    def _mean(values):
        return float(pd.Series(values).mean())

    summary = {
        "N": len(f1),
        "precision_mean": _mean(precision),
        "recall_mean": _mean(recall),
        "f1_mean": _mean(f1)
    }

    report_lines = [
        f"BERTScore Summary (model: {bs_model}):",
        f"  Samples: {summary['N']}",
        f"  Precision (mean): {summary['precision_mean']:.4f}",
        f"  Recall    (mean): {summary['recall_mean']:.4f}",
        f"  F1        (mean): {summary['f1_mean']:.4f}",
    ]
    for line in report_lines:
        print(line)

    return df, summary

In [None]:
# Run generation + BERTScore on the test split. Per-sample scores are written to
# bertscore_test.csv, and the scorer is overridden here: roberta-large is used instead of
# the function's default "microsoft/deberta-xlarge-mnli".
df_scores, summary = evaluate_with_bertscore(
    model, processor, test_dataset,
    out_csv_path="bertscore_test.csv",
    gen_max_new_tokens=512,
    gen_device="cuda",
    bs_device="cuda",
    bs_model="roberta-large",   # strong general-purpose model
    bs_use_idf=False,                           # set True to weight rarer tokens higher
    bs_batch_size=32,
    bs_lang="en"
)

Generating:   0%|          | 0/600 [00:00<?, ?it/s]

The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BERTScore Summary (model: roberta-large):
  Samples: 600
  Precision (mean): 0.3943
  Recall    (mean): 0.1863
  F1        (mean): 0.2836
