In [1]:
import base64
import io
import random
from textwrap import dedent
from openai import OpenAI
from pydantic import BaseModel
from tqdm import trange
import json


# Structured-output schema: passed as the `output_format` argument of get_reply,
# which forwards it as `response_format` to the chat-completions parse call.
class Question(BaseModel):
    # Whether a valid negation of the question could be produced.
    is_negatable: bool
    # The negated question text; the prompt asks for an empty string when the
    # question is not negatable.
    negated_question: str

import json
import hashlib
import os
from textwrap import dedent
from filelock import FileLock


CACHE_FILE = "response_cache.json"
LOCK_FILE = "response_cache.lock"

lock = FileLock(LOCK_FILE)

def load_cache():
    """Return the cache stored in CACHE_FILE, or an empty dict if unavailable.

    The read is performed while holding the module-level file lock. Both a
    missing cache file and a file containing invalid JSON yield an empty dict.
    """
    with lock:
        if not os.path.exists(CACHE_FILE):
            return {}
        with open(CACHE_FILE, "r", encoding="utf-8") as cache_fp:
            try:
                cached = json.load(cache_fp)
            except json.JSONDecodeError:
                # Unreadable cache content is treated as an empty cache.
                return {}
        return cached

def save_cache(cache):
    """Persist the cache dict to CACHE_FILE as JSON while holding the file lock.

    The JSON is first written to a sibling temporary file and then moved over
    CACHE_FILE with os.replace. Opening CACHE_FILE directly in "w" mode
    truncates it immediately, so an interruption in the middle of the dump
    would leave invalid JSON behind, which load_cache silently turns into an
    empty cache. With the replace step the previous file stays intact until
    the new content is fully written.

    Args:
        cache: JSON-serializable dict to store (get_reply stores parsed
            replies keyed by input hash).
    """
    # A fixed temp name is sufficient because writers are serialized by `lock`.
    tmp_path = CACHE_FILE + ".tmp"
    with lock:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=4, ensure_ascii=False)
        os.replace(tmp_path, CACHE_FILE)

def hash_inputs(system_prompt, user_prompt, output_format):
    """Return the SHA-256 hex digest identifying one request.

    The three arguments are rendered with string formatting, joined with "|",
    encoded, and hashed.
    """
    joined = "|".join(f"{part}" for part in (system_prompt, user_prompt, output_format))
    digest = hashlib.sha256()
    digest.update(joined.encode())
    return digest.hexdigest()

def get_reply(client, system_prompt, user_prompt, output_format):
    """Return the parsed model reply for a prompt pair, consulting the on-disk cache first.

    Args:
        client: Client object exposing `beta.chat.completions.parse`
            (the notebook passes an OpenAI client).
        system_prompt: System message text; dedented before sending.
        user_prompt: User message text; dedented before sending.
        output_format: Response schema forwarded as `response_format`
            (the notebook passes a pydantic model class).

    Returns:
        dict: The parsed response converted to a plain dict. The same dict is
        stored in the cache under the hash of the three inputs.

    Raises:
        ValueError: If the completion carries no parsed payload (for example
            when the model refused), instead of failing later with an opaque
            AttributeError on None.
        Exception: Errors raised by the API call propagate unchanged.
    """
    input_hash = hash_inputs(system_prompt, user_prompt, output_format)

    # load_cache()/save_cache() acquire the same lock again inside these
    # blocks, so this pattern relies on the lock object being re-entrant.
    with lock:  # Prevent multiple processes from modifying cache at the same time
        cache = load_cache()  # Read latest cache

        if input_hash in cache:
            return cache[input_hash]  # Return cached response immediately

    # If not in cache, query the model (without holding lock to avoid blocking other threads)
    completion = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": dedent(system_prompt)},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": dedent(user_prompt)},
                ],
            },
        ],
        response_format=output_format,
        temperature=0
    )

    message = completion.choices[0].message
    parsed = message.parsed
    if parsed is None:
        # Surface the reason explicitly rather than crashing on `None.dict()`.
        refusal = getattr(message, "refusal", None)
        raise ValueError(f"Model returned no parsed output (refusal: {refusal!r})")

    # Prefer the pydantic v2 spelling when available; fall back to the v1 API.
    if hasattr(parsed, "model_dump"):
        parsed_output = parsed.model_dump()
    else:
        parsed_output = parsed.dict()

    with lock:  # Ensure only one process updates cache at a time
        cache = load_cache()  # Reload latest cache to prevent overwrites
        cache[input_hash] = parsed_output
        save_cache(cache)

    return parsed_output


# def get_reply(client, system_prompt, user_prompt, output_format):
    
#     completion = client.beta.chat.completions.parse(
#         model="gpt-4o",
#         messages=[
#             {"role": "system", "content": dedent(system_prompt)},
#             {
#                 "role": "user",
#                 "content": [
#                     {"type": "text", "text": dedent(user_prompt)},
#                 ],
#             },
#         ],
#         response_format=output_format,
#         temperature=0
#     )
#     parsed_output = completion.choices[0].message.parsed.dict()
#     return parsed_output

# Read the API key from the environment instead of embedding it in the notebook.
# A key that has been committed in source must be treated as leaked and revoked.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Set the OPENAI_API_KEY environment variable before running this notebook.")
client = OpenAI(api_key=api_key)

# System prompt for the negation task; passed as `system_prompt` to get_reply.
# Its text is part of the cache key computed by hash_inputs, so any edit to it
# causes previously cached replies to be bypassed.
improved_prompt = """
**Task:**  
You will be given an question collected from existing visual question answering datasets. Your task is to produce a minimally modified, negated version of the question by inserting a negation (e.g., "not", "do not", "isn't", etc.) in a way that:

1. **Minimal Changes:** Alters the original question as little as possible.
2. **Answer Inversion:** Causes the original correct answer to become incorrect while making one of the originally incorrect answers correct.
3. **Linguistic Accuracy:** Adheres to proper grammar and preserves the semantic intent of the question.

**Special Case:**  
1. Do not negate any background that is provided along with the question (e.g., mathematical conditions, background information, etc). Only negate the question itself (usually the last sentence).
2. If it is not possible to create a valid negation that meets these criteria, return an empty string for the negated question and set the flag `is_negatable` to `false`.

**Output Format:**  
Your response should be an object with the following structure:
{
  "negated_question": "<your negated question (with original background information) here, or an empty string if not negatable>",
  "is_negatable": <true/false>
}
"""

# Load your data.
# The JSON Lines file is read inside a context manager so the handle is closed
# as soon as parsing finishes.
with open("../robust_vqa/VMCBench-9018.jsonl") as f:
    data = [json.loads(line) for line in f]

In [2]:
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def process_item(idx_item):
    """Attach a negated question to one dataset entry.

    Args:
        idx_item: An (index, item) pair, as produced by enumerate.

    Returns:
        The (index, item) pair. Unless the item already had a
        "negated_question" entry, it is updated in place with the reply from
        get_reply, or with a "not negatable" placeholder if that call raised.
    """
    position, record = idx_item

    if 'negated_question' not in record:
        prompt_text = f"Question: {record['question']}"
        try:
            reply = get_reply(client, improved_prompt, prompt_text, Question)
        except Exception as e:
            # On failure, store a placeholder and report which item failed.
            reply = {"negated_question": "", "is_negatable": False}
            print(f"Error processing item {position}: {e}")
        record["negated_question"] = reply

    return position, record

# Fan the items out over a thread pool; process_item updates each entry of
# `data` in place, so the mapped results themselves are discarded.
with ThreadPoolExecutor(max_workers=32) as executor:
    # map() yields results in input order; tqdm wraps the iterator to show
    # progress and list() drains it so every item gets processed.
    results_iter = executor.map(process_item, enumerate(data))
    _ = list(tqdm(results_iter, total=len(data)))

# save data as "VMCBench-9018-negated.jsonl"
with open("VMCBench-9018-negated.jsonl", "w") as f:
    f.writelines(json.dumps(item) + "\n" for item in data)

  4%|▎         | 337/9018 [00:20<01:32, 94.02it/s] 

Error processing item 3059: Could not parse response content as the length limit was reached - CompletionUsage(completion_tokens=16384, prompt_tokens=432, total_tokens=16816, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))
Error processing item 3074: Could not parse response content as the length limit was reached - CompletionUsage(completion_tokens=16384, prompt_tokens=413, total_tokens=16797, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))
Error processing item 2912: Could not parse response content as the length limit was reached - CompletionUsage(completion_tokens=16384, prompt_tokens=424, total_tokens=16808, completion_tokens_details=Com

  5%|▌         | 451/9018 [05:48<2:51:34,  1.20s/it]

Error processing item 450: Could not parse response content as the length limit was reached - CompletionUsage(completion_tokens=16384, prompt_tokens=750, total_tokens=17134, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))


 32%|███▏      | 2871/9018 [05:50<09:32, 10.73it/s] 

Error processing item 2870: Could not parse response content as the length limit was reached - CompletionUsage(completion_tokens=16384, prompt_tokens=417, total_tokens=16801, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))


100%|██████████| 9018/9018 [06:25<00:00, 23.41it/s]


In [39]:
import json
import copy
import random
random.seed(1234)

# Read the negated dataset inside a context manager so the handle is closed.
with open("VMCBench-9018-negated.jsonl") as f:
    data = [json.loads(line) for line in f]
negatable_data = [item for item in data if item['negated_question']['is_negatable']]
print(len(negatable_data), "out of", len(data), "questions are negatable.")


# Turn every negatable 4-option item into a 2-option item: the original correct
# option plus one randomly chosen incorrect option, in random order.
with open("VNBench-7379.jsonl", "w") as f:
    for item in negatable_data:
        correct_choice = item[item['answer']]
        incorrect_choices = [item[key] for key in 'ABCD' if key != item['answer']]
        incorrect_choice = random.sample(incorrect_choices, 1)[0]

        # Shuffle (role, text) pairs instead of the bare texts. Looking the
        # texts up with list.index() afterwards would assign the same letter
        # to both answers whenever the two option texts are equal. Shuffling a
        # two-element list draws the same random numbers either way, so the
        # resulting order matches what shuffling the texts would have given.
        tagged_choices = [("original", correct_choice), ("negated", incorrect_choice)]
        random.shuffle(tagged_choices)
        roles = [role for role, _text in tagged_choices]

        new_item = copy.deepcopy(item)
        new_item.pop('negated_question')

        new_item['original_question'] = item['question']
        new_item['negated_question'] = item['negated_question']['negated_question']
        new_item['A'] = tagged_choices[0][1]
        new_item['B'] = tagged_choices[1][1]

        new_item['original_answer'] = "AB"[roles.index("original")]
        new_item['negated_answer'] = "AB"[roles.index("negated")]

        # Drop the fields that only describe the original 4-option question.
        new_item.pop('question')
        new_item.pop('answer')
        new_item.pop('C')
        new_item.pop('D')
        f.write(json.dumps(new_item) + "\n")

7379 out of 9018 questions are negatable.


In [46]:
import json

# Read the dataset inside a context manager so the handle is closed.
with open("VNBench-7379.jsonl") as f:
    data = [json.loads(line) for line in f]


def _write_variant(path, items, question_key, answer_key):
    """Write items to path as JSON Lines with "question"/"answer" fields added.

    Args:
        path: Output file path.
        items: Iterable of dicts to write.
        question_key: Key whose value is copied into "question".
        answer_key: Key whose value is copied into "answer".
    """
    with open(path, "w") as out:
        for item in items:
            # A shallow copy is enough because only top-level keys are set; it
            # also removes the dependency on `copy` having been imported by an
            # earlier cell.
            new_item = dict(item)
            new_item["question"] = item[question_key]
            new_item["answer"] = item[answer_key]
            out.write(json.dumps(new_item) + "\n")


_write_variant("VNBench-7379-original.jsonl", data, "original_question", "original_answer")
_write_variant("VNBench-7379-negated.jsonl", data, "negated_question", "negated_answer")

In [1]:
import json
# Read the dataset inside a context manager so the handle is closed.
with open("VNBench-7379.jsonl") as f:
    data = [json.loads(line) for line in f]

# how many questions with correct answer A?
num_answer_a = sum(1 for item in data if item['original_answer'] == 'A')
print(num_answer_a, "out of", len(data), "questions have correct answer A.")

3651 out of 7379 questions have correct answer A.


In [5]:
import json
import random

# Read the dataset inside a context manager so the handle is closed.
with open("VNBench-7379.jsonl") as f:
    data = [json.loads(line) for line in f]

random.seed(1234)
# Cap the sample size so a dataset with fewer than 30 rows does not raise ValueError.
for item in random.sample(data, min(30, len(data))):
    print(item['original_question'])
    print(item['negated_question'])
    print()


What is the room adjacent to the kitchen?
What is not the room adjacent to the kitchen?

what is the bottom word?
what is not the bottom word?

If cricket was removed from the food web, there would be
If cricket was not removed from the food web, there would be

Select the correct cross-section from the A-A cross-section in the following figure( )<image 1>
Select the incorrect cross-section from the A-A cross-section in the following figure( )

Which is the top predator in the food web?
Which is not the top predator in the food web?

What language is the text in this picture written in?
What language is the text in this picture not written in?

Who wrote this book?
Who did not write this book?

Which model can achieve the best ImageNet 10-shot Accuracy score?
Which model cannot achieve the best ImageNet 10-shot Accuracy score?

What was the market value of peer-to-peer consumer lending in France in 2018?
What was not the market value of peer-to-peer consumer lending in France in 2018?


In [17]:
from PIL import Image
import base64
import io

def get_image(image_base64):
    """Decode a base64-encoded image payload and open it as a PIL image."""
    return Image.open(io.BytesIO(base64.b64decode(image_base64)))

In [20]:
# import json
# data = [json.loads(line) for line in open("VNBench-7379.jsonl")]

# # count category of data
# from collections import Counter
# counter = Counter([item['category'] for item in data])


# # for each category, sample one question
# import random
# random.seed(1234)

# for category in counter:
#     print(category)
#     print()
#     category_data = [item for item in data if item['category'] == category]
#     for item in random.sample(category_data, 2):
#         print(item['negated_question'])
#         print('A.', item['A'])
#         print('B.', item['B'])
#         print(item['negated_answer'])
#         display(get_image(item['image']))
#         print()

In [None]:
# Report questions that occur more than once in `data`, then print the number
# of distinct questions next to the total number of items.
questions = set()
for item in data:
    question = item['question']
    # The membership test must be active: with it commented out, the indented
    # print below made the cell fail with an IndentationError.
    if question in questions:
        print(f"Duplicate question: {question}")
    questions.add(question)
print(len(questions), len(data))

In [4]:
# Spot-check a single item: show its question, then the reply get_reply returns for it.
idx = 500
question = data[idx]['question']
print(question)
user_prompt = f"""
Question: {question}
"""

negated_question = get_reply(client, improved_prompt, user_prompt, Question)
print(negated_question)

如图，直线l1∥l2，∠1＝50°，∠2＝75°，则∠3＝（）
{'is_negatable': False, 'negated_question': ''}


In [None]:
# system_prompt = "\nI'm going to create a VQA benchmark dataset with negated questions. I'll give you the original question.\nNegate the question by adding 'not' (such as be not, do not, which is not, etc) according to the semantic meaning of the question and linguistic grammar rules.\nBy negating the question, make sure it will make the original correct answer incorrect, and any of the incorrect answer to be correct.\nIf you cannot negate this question, return the empty string with the is_negatable flag set to false.\nMake sure you do the minimal edits to the question to negate it.\n"

In [None]:
# data = [json.loads(line) for line in open("../robust_vqa/VMCBench-9018.jsonl")]

# for idx in trange(len(data)):
#     if 'negated_question' in data[idx]:
#         continue
#     question = data[idx]['question']
#     user_prompt = f"""
#     Question: {question}
#     """

#     negated_question = get_reply(client, improved_prompt, user_prompt, Question)
#     data[idx]["negated_question"] = negated_question
