In [None]:
# -----------------------------
# Step 0: Install packages
# -----------------------------
!pip install torch torchvision transformers datasets diffusers accelerate --quiet
# Install or upgrade bitsandbytes
!pip install -U bitsandbytes



In [2]:
# -----------------------------
# Step 1: Imports
# -----------------------------
import os
import json
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch
from transformers import pipeline, Blip2Processor, Blip2ForConditionalGeneration
from transformers import BitsAndBytesConfig


In [3]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from transformers import BitsAndBytesConfig

TIFA_MODEL_NAME = "tifa-benchmark/llama2_tifa_question_generation"

# 8-bit weights via bitsandbytes; the second flag permits fp32 CPU offload for
# modules that device_map="auto" cannot place on the GPU.
bnb_config = BitsAndBytesConfig(load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True)

# The tokenizer and the quantized model are created explicitly so that the
# already-built model object can be handed to the pipeline below.
tokenizer = AutoTokenizer.from_pretrained(TIFA_MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    TIFA_MODEL_NAME,
    quantization_config=bnb_config,
    device_map="auto",
)

# quantization_config is deliberately NOT passed here: the model instance given
# to pipeline() has already been quantized by from_pretrained() above.
tifa_pipeline = pipeline(
    task="text-generation",
    tokenizer=tokenizer,
    model=model,
    device_map="auto",
)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/725 [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/414 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/626 [00:00<?, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.98G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/3.50G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

Device set to use cuda:0


In [4]:
# BLIP-2 checkpoint used to answer the generated questions about each image.
BLIP_MODEL_NAME = "Salesforce/blip2-opt-2.7b"

# 8-bit config for BLIP-2 (same flags as the question-generation model)
bnb_blip = BitsAndBytesConfig(load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True)

# Processor (image preprocessing + tokenizer) and the quantized model itself.
# NOTE(review): the recorded cell output warns that `torch_dtype` is deprecated
# in favour of `dtype`; switch once the transformers version is pinned.
blip_processor = Blip2Processor.from_pretrained(BLIP_MODEL_NAME)
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    BLIP_MODEL_NAME,
    quantization_config=bnb_blip,
    torch_dtype=torch.float16,
    device_map="auto",
)


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

preprocessor_config.json:   0%|          | 0.00/432 [00:00<?, ?B/s]

processor_config.json:   0%|          | 0.00/68.0 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/882 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/23.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/548 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/10.0G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/141 [00:00<?, ?B/s]

In [8]:
# -----------------------------
# Step 2: Paths & settings
# -----------------------------
CSV_PATH = "/content/drive/MyDrive/DrawBenchPrompts.csv"
IMAGE_BASE_PATH = "/content/drive/MyDrive/eval_images"

# Sub-folders of IMAGE_BASE_PATH to evaluate; each one is scored separately.
IMAGE_FOLDERS = [
    "SDXL_drawbench_images",  # replace with the SD2 or Flux-Dev image folder to evaluate those models
]

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# -----------------------------
# Step 3: Load CSV
# -----------------------------
df = pd.read_csv(CSV_PATH)

# The evaluation loop reads exactly these two columns from every row; fail early
# with a message that names whatever is missing.
missing_columns = {"image_name", "Prompts"} - set(df.columns)
assert not missing_columns, f"{CSV_PATH} is missing required column(s): {sorted(missing_columns)}"


In [9]:
# -----------------------------
# Step 5: TIFA prompt function
# -----------------------------
def create_qg_prompt(caption):
    """Build the LLaMA-2 chat prompt that asks the TIFA model for questions.

    The system message and the `[INST]` / `<<SYS>>` framing follow the prompt
    used in the TIFA GitHub repository. The prompt deliberately ends with
    "Entities:" so the model continues from that point.

    Args:
        caption: The image description (text-to-image prompt) to verify.

    Returns:
        The complete prompt string.
    """
    system_message = "".join([
        "Given an image description, generate one or two multiple-choice questions that verifies if ",
        "the image description is correct.\nClassify each concept into a type ",
        "(object, human, animal, food, activity, attribute, counting, color, material, spatial, location, shape, other), ",
        "and then generate a question for each type.\n",
    ])
    header = f"<s>[INST] <<SYS>>\n{system_message}\n<</SYS>>\n\n"
    return header + f"Description: {caption} [/INST] Entities:"

# -----------------------------
# Step 6: Evaluation loop
# -----------------------------
# For every CSV row and every image folder: generate questions from the caption
# with the TIFA model, answer them on the image with BLIP-2, and record the
# fraction of answers that match exactly.

# One results file is shared by all folders; it is rewritten after every image.
RESULTS_JSON_PATH = "/content/drive/MyDrive/SDXL_generated.json"
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


def _find_image_path(folder_path, img_name):
    """Return the first existing `<img_name><ext>` inside folder_path, or None."""
    for ext in IMAGE_EXTENSIONS:
        candidate = os.path.join(folder_path, f"{img_name}{ext}")
        if os.path.exists(candidate):
            return candidate
    return None


def _load_rgb_image(img_path):
    """Load an image as RGB and close the underlying file handle.

    `Image.open` keeps the file open until the image is closed; `convert`
    returns an independent in-memory image, so the source can be closed here.
    """
    with Image.open(img_path) as img_file:
        return img_file.convert("RGB")


def _parse_qa_pairs(output_text):
    """Extract (question, answer) tuples from "Q:" / "A:" lines.

    An "A:" line is paired with the most recent unanswered "Q:" line; every
    other line (for example "Choices: ...") is ignored.
    """
    qa_pairs = []
    current_q = None
    for line in output_text.split("\n"):
        line = line.strip()
        if line.startswith("Q:"):
            current_q = line[2:].strip()
        elif line.startswith("A:") and current_q:
            qa_pairs.append((current_q, line[2:].strip()))
            current_q = None
    return qa_pairs


def _save_json_atomically(payload, path):
    """Write payload to `path` via a temporary file and an atomic rename.

    An interrupted run can then never leave a half-written results file behind.
    """
    tmp_path = path + ".part"
    with open(tmp_path, "w") as f:
        json.dump(payload, f, indent=4)
    os.replace(tmp_path, path)


all_results = {}

for folder in IMAGE_FOLDERS:
    folder_path = os.path.join(IMAGE_BASE_PATH, folder)
    folder_results = []

    for _, row in tqdm(df.iterrows(), total=len(df), desc=f"Processing {folder}"):
        img_name = str(row["image_name"])
        caption = row["Prompts"]

        # Find image file (rows without a matching image are skipped, not scored)
        img_path = _find_image_path(folder_path, img_name)
        if img_path is None:
            print(f"Image {img_name} not found in {folder}")
            continue

        image = _load_rgb_image(img_path)

        # Generate QA prompt
        prompt = create_qg_prompt(caption)

        # -----------------------------
        # Generate QA pairs (deterministic beam search)
        # -----------------------------
        sequences = tifa_pipeline(
            prompt,
            do_sample=False,
            num_beams=5,
            num_return_sequences=1,
            max_new_tokens=150  # 150 new tokens keeps inference fast; longer outputs are truncated
        )

        # The pipeline echoes the prompt; keep only the continuation.
        output_text = sequences[0]["generated_text"]
        if output_text.startswith(prompt):
            output_text = output_text[len(prompt):]

        # Only the first blank-line-delimited paragraph of the continuation is parsed.
        output_text = output_text.split("\n\n")[0]

        qa_pairs = _parse_qa_pairs(output_text)

        # -----------------------------
        # Compute faithfulness using BLIP-2
        # -----------------------------
        # NOTE(review): the bare question is passed as the text prompt and the
        # decoded output must equal the expected answer exactly; confirm that
        # this prompt format and strict match suit the BLIP-2 OPT checkpoint.
        correct, total = 0, 0
        for q, a in qa_pairs:
            inputs = blip_processor(images=image, text=q, return_tensors="pt").to(DEVICE)
            output_ids = blip_model.generate(**inputs)
            pred_answer = blip_processor.decode(output_ids[0], skip_special_tokens=True).lower()
            if pred_answer.strip() == a.lower():
                correct += 1
            total += 1
        faithfulness = correct / total if total > 0 else 0.0

        folder_results.append({
            "image_name": img_name,
            "caption": caption,
            "faithfulness_score": faithfulness,
            "qa_pairs": qa_pairs
        })

        # -----------------------------
        # Incrementally save JSON to prevent data loss
        # -----------------------------
        all_results[folder] = {
            "per_image": folder_results,
            "average_score": sum(r["faithfulness_score"] for r in folder_results) / len(folder_results) if folder_results else 0.0
        }
        _save_json_atomically(all_results, RESULTS_JSON_PATH)

print("Evaluation complete! Results saved to SDXL_generated.json")


Processing SDXL_drawbench_images:   0%|          | 0/200 [00:00<?, ?it/s]The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
Processing SDXL_drawbench_images:   5%|▌         | 10/200 [05:28<1:42:23, 32.33s/it]You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset
Processing SDXL_drawbench_images: 100%|██████████| 200/200 [1:47:59<00:00, 32.40s/it]

Evaluation complete! Results saved to SDXL_generated.json





In [None]:

# Computing TIFA score using the NLI model as mentioned in TIFA Repository

import os
import json
import re
import time
from tqdm import tqdm

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# WordNet is used for the synonym fallback. `nltk.corpus.wordnet` is a lazy
# loader: importing it succeeds even when the corpus has not been downloaded and
# the LookupError only surfaces on first use. Force the load here and download
# the corpus on demand so the problem is resolved before any scoring starts.
import nltk
from nltk.corpus import wordnet as wn

try:
    wn.ensure_loaded()
except LookupError:
    nltk.download("wordnet")
    wn.ensure_loaded()

# -----------------------------
# Settings / paths
# -----------------------------
# INPUT_JSON is the file written by the evaluation loop; OUTPUT_JSON receives
# the same structure with rescored entries.
INPUT_JSON = "/content/drive/MyDrive/SDXL_generated.json"
OUTPUT_JSON = "/content/drive/MyDrive/sdxl_tifa.json"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# -----------------------------
# Load existing JSON
# -----------------------------
with open(INPUT_JSON) as f:
    data = json.load(f)

# -----------------------------
# Load NLI model (Roberta large MNLI)
# -----------------------------
# NOTE: this rebinds the notebook-level names `tokenizer` and `model`, which an
# earlier cell bound to the TIFA question-generation tokenizer and model.
nli_name = "roberta-large-mnli"
print("Loading NLI model (roberta-large-mnli)...")
tokenizer = AutoTokenizer.from_pretrained(nli_name)
model = AutoModelForSequenceClassification.from_pretrained(nli_name)
model = model.to(DEVICE)
model.eval()  # inference mode: disables dropout
print("Loaded.")

# -----------------------------
# Helpers
# -----------------------------
def normalize(text):
    """Trim `text` and collapse every whitespace run into one space.

    Args:
        text: A string, or None.

    Returns:
        The cleaned string; "" when `text` is None.
    """
    if text is None:
        return ""
    trimmed = text.strip()
    return re.sub(r"\s+", " ", trimmed).strip()

def get_synonyms(word, max_syn=6):
    """Return up to `max_syn` WordNet lemma names related to `word`.

    Synonyms are returned in WordNet's own order (synset order, then lemma
    order), so the result is the same on every run. The original collected
    them in a `set`, which made the truncated selection depend on hash order.

    Args:
        word: Word or short phrase to look up. Empty values and phrases of
            more than three whitespace-separated tokens yield [].
        max_syn: Maximum number of synonyms to return. Collection stops after
            the first synset that brings the total to at least this many.

    Returns:
        A list of distinct lemma names, underscores replaced by spaces,
        excluding names equal to `word` ignoring case. If the WordNet lookup
        raises, a warning is issued and whatever was gathered so far is
        returned.
    """
    if not word or len(word.split()) > 3:
        return []

    target = word.lower()
    ordered = {}  # dict keys: de-duplicated AND insertion-ordered
    try:
        for synset in wn.synsets(word):
            for lemma in synset.lemmas():
                name = lemma.name().replace("_", " ")
                if name.lower() != target:
                    ordered.setdefault(name, None)
            if len(ordered) >= max_syn:
                break
    except Exception as exc:
        # Best-effort by design, but do not hide the failure (for example a
        # missing WordNet corpus) completely. The message omits the word so the
        # default warning filter reports it once rather than once per lookup.
        import warnings
        warnings.warn(
            f"WordNet lookup failed ({type(exc).__name__}); returning the synonyms gathered so far.",
            RuntimeWarning,
        )
    return list(ordered)[:max_syn]

def nli_entailment_prob(premise, hypothesis):
    """Return the NLI model's probability that `premise` entails `hypothesis`.

    Both texts are whitespace-normalized first. An empty hypothesis scores 0.0
    without running the model. The pair is truncated to 512 tokens.

    Args:
        premise: Text taken as given.
        hypothesis: Statement to test against the premise.

    Returns:
        The softmax probability at class index 2 as a Python float.
    """
    premise_text = normalize(premise)
    hypothesis_text = normalize(hypothesis)
    if not hypothesis_text:
        return 0.0

    encoded = tokenizer(
        premise_text,
        hypothesis_text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
    ).to(DEVICE)

    with torch.no_grad():
        class_probs = torch.softmax(model(**encoded).logits, dim=-1)

    # MNLI output order: [contradiction, neutral, entailment]
    return float(class_probs[0, 2])

# Heuristic templates to turn Q+A into natural hypotheses
def generate_hypotheses_from_qa(q, a):
    """Turn a question/answer pair into declarative hypothesis sentences.

    The lower-cased question is matched against a fixed sequence of templates;
    the first template that applies decides the result. The trailing "?" is
    removed before matching so that captured phrases never carry it into a
    hypothesis (previously "is this a dog?" produced "There is a dog? ...").

    Args:
        q: Question text.
        a: Answer text. Common yes/no spellings are canonicalized.

    Returns:
        A non-empty list of hypothesis strings, in template order.
    """
    q0 = normalize(q).lower().rstrip("? ")
    a0 = normalize(a).lower()
    hypos = []

    # canonicalize yes/no
    if a0 in {"y", "yes", "true", "yeah", "yep", "yup"}:
        a_yesno = "yes"
    elif a0 in {"n", "no", "false", "nope"}:
        a_yesno = "no"
    else:
        a_yesno = None

    # 1) is this a/an X?
    m = re.match(r"^is (?:this|it|there) (?:a|an) (.+)$", q0)
    if m:
        target = m.group(1)
        if a_yesno == "yes":
            hypos.append(f"There is a {target} in the image.")
            hypos.append(f"The image contains a {target}.")
        elif a_yesno == "no":
            hypos.append(f"There is no {target} in the image.")
            hypos.append(f"The image does not contain a {target}.")
        else:
            # sometimes answer itself is the class (e.g., answer "car")
            hypos.append(f"There is a {a0} in the image.")
            hypos.append(f"The image contains a {a0}.")
        return hypos

    # 2) is the <obj> <attr>?  -> "The <obj> is <attr>." / "The <obj> is <answer>."
    # The subject is the first word after the optional article; the attribute is
    # everything that follows.
    m = re.match(r"^is (?:the |a |an )?(.+?) (.+)$", q0)
    if m and ("color" not in q0):
        subj, attr = m.group(1), m.group(2)
        if a_yesno == "yes":
            # A "yes" affirms the attribute whatever its length. Multi-word
            # attributes used to fall into the free-form fallback below and
            # produce "The <subj> is yes.".
            hypos.append(f"The {subj} is {attr}.")
            if len(attr.split()) == 1:
                hypos.append(f"The {subj} appears {attr}.")
        elif a_yesno == "no":
            hypos.append(f"The {subj} is not {attr}.")
        else:
            # fallback for free-form answers: The subj is <answer>
            hypos.append(f"The {subj} is {a0}.")
        return hypos

    # 3) what color is the X?
    m = re.match(r"^what color is (?:the |a |an )?(.+)$", q0)
    if m:
        subj = m.group(1)
        hypos.append(f"The {subj} is {a0}.")
        hypos.append(f"The {subj} appears {a0}.")
        hypos.append(f"There is a {a0} {subj} in the image.")
        return hypos

    # 4) what type / what kind ("what type of ..." is covered by "what type")
    if "what type" in q0 or "what kind" in q0:
        # generic "The object is a {answer}."
        hypos.append(f"The object is a {a0}.")
        hypos.append(f"The image contains a {a0}.")
        hypos.append(f"There is a {a0} in the image.")
        return hypos

    # 5) what animal / any "what ..." question that mentions an animal
    if "what animal" in q0 or ("animal" in q0 and q0.startswith("what")):
        hypos.append(f"There is a {a0} in the image.")
        hypos.append(f"The image contains a {a0}.")
        return hypos

    # 6) counting: how many
    m = re.match(r"^how many (.+)$", q0)
    if m:
        noun = m.group(1)
        # the answer is used verbatim, whether it is a number word or a numeral
        hypos.append(f"There are {a0} {noun} in the image.")
        hypos.append(f"The image contains {a0} {noun}.")
        return hypos

    # 7) yes/no generic starting with is/are/do/does/was/were/did
    if q0.startswith(("is ", "are ", "was ", "were ", "does ", "do ", "did ")):
        # use the rest of the sentence after the leading verb as the noun-like chunk
        rest = re.sub(r"^(is|are|was|were|does|do|did)\s+", "", q0)
        if a_yesno == "yes":
            hypos.append(f"There is {rest} in the image.")
            hypos.append(f"The {rest} appears in the image.")
        elif a_yesno == "no":
            hypos.append(f"There is no {rest} in the image.")
            hypos.append(f"The {rest} does not appear in the image.")
        else:
            hypos.append(f"The answer to the question '{q}' is '{a}'.")
        return hypos

    # 8) fallback: quote the question and answer as given
    hypos.append(f"The answer to the question '{q}' is '{a}'.")
    # for short noun answers also try "There is a {a} in the image."
    if len(a0.split()) <= 3:
        hypos.append(f"There is a {a0} in the image.")
        hypos.append(f"The image contains a {a0}.")
    return hypos

# -----------------------------
# Main rescoring routine
# -----------------------------
# For every stored QA pair, build hypothesis sentences and score them with the
# NLI model using the caption as premise. The best entailment probability is the
# question's score; an image's score is the mean over its questions.
#
# NOTE(review): this overwrites each entry's existing "faithfulness_score" from
# the input JSON, and the new value is computed from the caption text and the QA
# pairs only; no image or VQA answer is consulted in this cell.


def _dump_json_atomically(payload, path):
    """Write payload to `path` through a ".part" file and an atomic rename."""
    tmp_out = path + ".part"
    with open(tmp_out, "w") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_out, path)


print("Starting NLI rescoring...")

# iterate over folders and images
for folder_name, folder_data in data.items():
    per_image = folder_data.get("per_image", [])
    for img in tqdm(per_image, desc=f"Folder {folder_name}"):
        caption_norm = normalize(img.get("caption", ""))
        qa_pairs = img.get("qa_pairs", [])

        per_q_scores = []
        for q, a in qa_pairs:
            q = normalize(q)
            a = normalize(a)
            if q == "" or a == "":
                per_q_scores.append(0.0)
                continue

            hypos = generate_hypotheses_from_qa(q, a)

            # best entailment probability over all template hypotheses
            best_prob = 0.0
            for hypo in hypos:
                best_prob = max(best_prob, nli_entailment_prob(caption_norm, hypo))

            # if best_prob low (<0.55), try synonyms of the answer (only single-token answers)
            if best_prob < 0.55 and len(a.split()) == 1:
                # compiled once per question instead of once per (synonym, hypothesis)
                answer_pattern = re.compile(r"\b" + re.escape(a) + r"\b", flags=re.I)

                # Collect distinct variants first: the generic fallback sentence
                # is identical for every hypothesis that lacks the answer token,
                # and scoring it repeatedly cannot change the maximum.
                variants = []
                seen = set()
                for s in get_synonyms(a):
                    for hypo in hypos:
                        if answer_pattern.search(hypo):
                            # function replacement inserts the synonym literally
                            variant = answer_pattern.sub(lambda _m, s=s: s, hypo)
                        else:
                            # if answer wasn't present, insert simple templates
                            variant = f"There is a {s} in the image."
                        if variant not in seen:
                            seen.add(variant)
                            variants.append(variant)

                for variant in variants:
                    best_prob = max(best_prob, nli_entailment_prob(caption_norm, variant))

            per_q_scores.append(best_prob)

        # average for image
        img_score = (sum(per_q_scores) / len(per_q_scores)) if per_q_scores else 0.0
        img["faithfulness_score"] = img_score
        img["_nli_q_scores"] = per_q_scores  # optional per-question probabilities

        # autosave after each image to avoid losing progress; a failed autosave
        # is reported but does not stop the run
        try:
            _dump_json_atomically(data, OUTPUT_JSON)
        except Exception as e:
            print("Autosave failed:", e)

# recompute per-folder average
for folder_name, folder_data in data.items():
    per_image = folder_data.get("per_image", [])
    folder_data["average_score"] = (sum(x.get("faithfulness_score", 0.0) for x in per_image) / len(per_image)) if per_image else 0.0

# final save (errors propagate here so a failed final write is not missed)
_dump_json_atomically(data, OUTPUT_JSON)

print("Done. Results saved to:", OUTPUT_JSON)
