# Instructions
- Fill in the API keys that you are going to use, set whether you are running on Google Colab in the next cell, and run it.
- Then run all cells under the "Setup for all models" section and the "Model Interface" section.
- If you want to run CLIP models, execute the cells in the "CLIP models" section; otherwise skip to the "VLMs" section to run all other VLMs.
- In either section, select the models you want to run.
- The "Localisation Experiment" section contains the code for Section 5.1 of the paper, which tests gpt-4o and o4-mini for their localisation capabilities.

In [None]:
# FILL IN THOSE KEYS THAT YOU ARE GOING TO USE
# Keys for providers you do not use may stay empty; the Hugging Face login
# in the setup cell is skipped when HUGGINGFACE_TOKEN is empty.
ANTHROPIC_KEY = ""        # Required for Claude Sonnet
OPENAI_KEY = ""           # Required for gpt-4o and o4-mini
OPENROUTER_KEY = ""       # Required for all other API models
HUGGINGFACE_TOKEN = ""    # Required for Paligemma
# ARE YOU ON GOOGLE COLAB?
# True: results go to a mounted Google Drive folder; False: to a local folder.
COLAB = True

# Setup for all models

In [None]:
#%%capture
!pip install open_clip_torch
!pip install transformers
!pip install --upgrade datasets
!pip install openai
!pip install anthropic
!pip install gdown
!pip install tqdm

In [None]:
import base64
import csv
import json
import os
import re
import time
from abc import ABC, abstractmethod
from collections import Counter
from io import BytesIO
from urllib.request import urlopen

import anthropic
import gdown
import open_clip
import openai
import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import Dataset
from datasets import load_dataset
from huggingface_hub import login
from open_clip import create_model_from_pretrained, get_tokenizer
from openai import OpenAI
from PIL import Image
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoProcessor, PaliGemmaForConditionalGeneration
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig

In [None]:
# Benchmark data used by every evaluation cell below.
dataset = load_dataset("nilshoehing/rocketsciencebench", split="train")

# Only authenticate against the Hugging Face Hub when a token was provided.
if HUGGINGFACE_TOKEN:
    login(token=HUGGINGFACE_TOKEN)

# Prefer the first GPU when one is available.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# PATH is the folder all logs and score tables are written to.
if not COLAB:
    PATH = "RocketScienceResults/"
else:
    from google.colab import drive
    drive.mount('/content/drive')
    PATH = "/content/drive/MyDrive/RocketScienceResults/"
os.makedirs(PATH, exist_ok=True)

# Model Interface

In [None]:
# CSV tables (inside the results folder) that collect one score row per model.
WINOGROUND_SCORES_FILE = f"{PATH}vlm_winoground_scores.csv"
CATEGORY_SCORES_FILE = f"{PATH}vlm_category_scores.csv"

def update_or_add_row(csv_filename, new_row, id_column='model_name'):
    """Insert ``new_row`` into a CSV table, replacing the row with the same id.

    The file is read completely, modified in memory and rewritten. If no row
    has the same ``id_column`` value as ``new_row``, the row is appended. The
    file is created when it does not exist yet.

    Args:
        csv_filename: Path of the CSV file to create or update.
        new_row: Mapping of column name to value; must contain ``id_column``.
        id_column: Name of the column that identifies a row.

    Raises:
        KeyError: If ``new_row`` has no ``id_column`` entry.
    """
    rows = []
    fieldnames = []
    if os.path.isfile(csv_filename):
        with open(csv_filename, mode='r', newline='') as file:
            reader = csv.DictReader(file)
            rows = list(reader)
            # ``fieldnames`` is None for an empty file; fall back to no header.
            fieldnames = list(reader.fieldnames or [])

    # Grow the header when the new row brings columns the file does not have
    # yet; otherwise DictWriter would reject the row with a ValueError.
    for key in new_row:
        if key not in fieldnames:
            fieldnames.append(key)

    # Values read back from CSV are always strings, so compare as strings.
    new_id = str(new_row[id_column])
    for index, row in enumerate(rows):
        if row.get(id_column) == new_id:
            rows[index] = new_row
            break
    else:
        rows.append(new_row)

    with open(csv_filename, mode='w', newline='') as file:
        # restval fills columns that older rows do not have.
        writer = csv.DictWriter(file, fieldnames=fieldnames, restval='')
        writer.writeheader()
        writer.writerows(rows)

def get_logfile_name(model_name):
    """Return the JSON log path inside the results folder for ``model_name``.

    Only the part after the last "/" of the model name is used, so
    provider-prefixed names map to a flat file name.
    """
    *_, model_base_name = model_name.split("/")
    return PATH + model_base_name + "_results.json"

def extract_single_quoted_strings(text):
    """Return the category names contained in a dataset ``Category`` value.

    Args:
        text: Either a stringified list such as ``"['Depth', 'Order']"`` or an
            actual list/tuple of names. Both forms are accepted because the
            notebook reads the column in both ways.

    Returns:
        A list with the single-quoted substrings of ``text`` (first and last
        character, i.e. the brackets, are dropped before matching), or a list
        copy of ``text`` when it already is a sequence of names.
    """
    if isinstance(text, (list, tuple)):
        return list(text)
    return re.findall(r"'(.*?)'", text[1:-1])

class Model(ABC):
    """Abstract base class for models.

    Subclasses implement ``generate``. The base class runs the benchmark,
    logs raw responses to a JSON file and derives score tables from that log.
    """

    def __init__(self, model_name, reasoning=False):
        """Store the model name and derive the JSON log path.

        Args:
            model_name: Identifier of the model; also used for the log name.
            reasoning: When True, the prompts ask for reasoning followed by
                the marker "RESPONSE", and a separate log file is used.
        """
        self.reasoning = reasoning
        self.model_name = model_name
        self.output_file = get_logfile_name(model_name)
        if reasoning:
            self.output_file = self.output_file.replace(".json", "_reasoning.json")

    @abstractmethod
    def generate(self, text, images) -> str:
        """Generate a response from the model given a prompt."""
        pass

    def preprocess_image(self, image):
        """Resize ``image`` to 1024x1024 and return it as base64-encoded PNG text."""
        image = image.resize((1024, 1024))
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        img_bytes = buffered.getvalue()
        return base64.b64encode(img_bytes).decode("utf-8")

    def evaluate(self, dataset):
        """Query the model on every dataset item and log the raw responses.

        Each item yields four responses: caption selection for image 1 and
        image 2, and image selection for caption 1 and caption 2. The log is
        rewritten after every item; an existing log is resumed by skipping as
        many items as it already contains.
        """
        # Start from existing state if exists
        if os.path.exists(self.output_file):
            with open(self.output_file, "r") as f:
                results = json.load(f)
            starting_index = len(results)
        else:
            results = []
            starting_index = 0

        for i, item in enumerate(tqdm(dataset)):
            if i < starting_index:
                continue

            image1 = self.preprocess_image(item["image1"])
            image2 = self.preprocess_image(item["image2"])

            if self.reasoning:
                select_text_prompt = "Which caption fits the image best? Reason about it and at the end write \"RESPONSE\" and reply only with the number 1 or 2. 1.) " + item["text1"] + " 2.) " + item["text2"]
                select_image_prompt1 = "Which image fits the caption best? Reason about it and at the end write \"RESPONSE\" and reply only with the number 1 or 2. Caption: " + item["text1"]
                select_image_prompt2 = "Which image fits the caption best? Reason about it and at the end write \"RESPONSE\" and reply only with the number 1 or 2. Caption: " + item["text2"]
            else:
                select_text_prompt = "Which caption fits the image best? Reply only with the number 1 or 2, nothing else. 1.) " + item["text1"] + " 2.) " + item["text2"]
                select_image_prompt1 = "Which image fits the caption best? Reply only with the number 1 or 2, nothing else. Caption: " + item["text1"]
                select_image_prompt2 = "Which image fits the caption best? Reply only with the number 1 or 2, nothing else. Caption: " + item["text2"]

            result_entry = {
                "id": i,
                "select_text_prompt": select_text_prompt,
                "select_image_prompt1": select_image_prompt1,
                "select_image_prompt2": select_image_prompt2,
                "response_1": self.generate(select_text_prompt, [image1]),
                "response_2": self.generate(select_text_prompt, [image2]),
                "response_3": self.generate(select_image_prompt1, [image1, image2]),
                "response_4": self.generate(select_image_prompt2, [image1, image2]),
            }
            results.append(result_entry)

            # Save after each iteration to avoid losing progress
            with open(self.output_file, "w") as f:
                json.dump(results, f, indent=4)

    def _extract_choice(self, response):
        """Reduce a raw model response to the digits of its final answer.

        In reasoning mode only the text after the last "RESPONSE" marker is
        considered. A missing response (e.g. ``None`` logged after a failed
        API call) yields an empty string and therefore counts as wrong.
        """
        if not isinstance(response, str):
            return ""
        if self.reasoning:
            response = response.split("RESPONSE")[-1]
        return re.sub(r'\D', '', response)

    def _item_correctness(self, item):
        """Return ``(text_ok, image_ok, group_ok)`` for one logged item."""
        res1 = self._extract_choice(item["response_1"])
        res2 = self._extract_choice(item["response_2"])
        res3 = self._extract_choice(item["response_3"])
        res4 = self._extract_choice(item["response_4"])
        text_ok = res1 == "1" and res2 == "2"
        image_ok = res3 == "1" and res4 == "2"
        return text_ok, image_ok, text_ok and image_ok

    def _score_row_name(self):
        """Return the identifier under which this model's scores are stored."""
        return self.model_name if not self.reasoning else self.model_name + "_reasoning"

    def get_text_image_group_scores(self, result_file=WINOGROUND_SCORES_FILE):
        """Compute overall text, image and group scores from the JSON log.

        The scores are fractions of the logged items and are written as one
        row to ``result_file``.

        Raises:
            ValueError: If the log contains no items.
        """
        with open(self.output_file, "r") as f:
            data = json.load(f)

        denominator = len(data)
        if denominator == 0:
            raise ValueError(f"No results in {self.output_file}; run evaluate() first.")

        text_correct_count = 0
        image_correct_count = 0
        group_correct_count = 0
        for item in data:
            text_ok, image_ok, group_ok = self._item_correctness(item)
            text_correct_count += int(text_ok)
            image_correct_count += int(image_ok)
            group_correct_count += int(group_ok)

        new_row = {
            'model_name': self._score_row_name(),
            'text_score': text_correct_count/denominator,
            'image_score': image_correct_count/denominator,
            'group_score': group_correct_count/denominator,
        }
        update_or_add_row(result_file, new_row)

    def get_results_by_category(self, result_file=CATEGORY_SCORES_FILE):
        """Compute text, image and group scores per category from the JSON log.

        Categories are read from the module-level ``dataset`` and matched to
        log entries by position. Each score is divided by the number of
        dataset items carrying that category; a category that does not occur
        in the dataset is reported as NaN. One row is written to
        ``result_file``.
        """
        with open(self.output_file, "r") as f:
            data = json.load(f)

        keys = [
            "Horizontal Position", "Vertical Position", "Depth",
            "Proximity", "Order", "Absolute Position", "Other"
        ]
        results = {key: {"text_correct_count": 0, "image_correct_count": 0, "group_correct_count": 0} for key in keys}

        categories = [extract_single_quoted_strings(d) for d in dataset["Category"]]
        for i, item in enumerate(data):
            text_ok, image_ok, group_ok = self._item_correctness(item)
            # An item may carry several categories and counts for each of them.
            for cat in categories[i]:
                results[cat]["text_correct_count"] += int(text_ok)
                results[cat]["image_correct_count"] += int(image_ok)
                results[cat]["group_correct_count"] += int(group_ok)

        new_row = {'model_name': self._score_row_name()}

        c = Counter(cat for cats in categories for cat in cats)
        for key, value in results.items():
            total = c[key]
            # Avoid ZeroDivisionError for categories absent from the dataset.
            new_row[f"{key}_ts"] = value["text_correct_count"] / total if total else float("nan")
            new_row[f"{key}_is"] = value["image_correct_count"] / total if total else float("nan")
            new_row[f"{key}_gs"] = value["group_correct_count"] / total if total else float("nan")
        print(result_file)
        update_or_add_row(result_file, new_row)

# CLIP models

In [None]:
class OpenCLIPModel(Model):
    """Contrastive (CLIP-style) model loaded through open_clip.

    Responses are derived from image/text similarities rather than from
    generated text, so ``generate`` is unused.
    """

    def __init__(self, model_name, model_path):
        """Load the model, its image preprocessing and its tokenizer.

        Args:
            model_name: open_clip model name.
            model_path: Value passed as ``pretrained`` to open_clip; its last
                path component is appended to the name used for the log.
        """
        checkpoint_name = model_path.split("/")[-1]
        super().__init__(model_name + checkpoint_name)
        # The negCLIP checkpoint is fetched from Google Drive before loading.
        if checkpoint_name == "negCLIP.pt":
            gdown.download("https://drive.google.com/uc?id=1ooVVPxB-tvptgmHlIMMFGV3Cg-IrhbRZ&confirm=t", quiet=False)

        self.model, self.preprocess = create_model_from_pretrained(
            model_name,
            pretrained=model_path,
            device=device,
            load_weights_only=False,
        )
        self.tokenizer = get_tokenizer(model_name)
        print(model_name, model_path)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()

    def generate(self, text, images) -> str:
        """Not used for CLIP models."""
        pass # not used for CLIP models

    def evaluate(self, dataset):
        """Score every item by similarity and log the implied choices.

        The log is rebuilt from scratch and rewritten after every item.
        """
        results = []
        for i, item in enumerate(tqdm(dataset)):
            pixel_batch = torch.stack(
                [self.preprocess(item[key].convert("RGB")) for key in ("image1", "image2")]
            ).to(device)
            token_batch = self.tokenizer(
                [item["text1"], item["text2"]],
                context_length=self.model.context_length,
            ).to(device)

            with torch.no_grad(), torch.cuda.amp.autocast():
                image_features = self.model.encode_image(pixel_batch)
                text_features = self.model.encode_text(token_batch)
                image_features /= image_features.norm(dim=-1, keepdim=True)
                text_features /= text_features.norm(dim=-1, keepdim=True)
                # Rows index the texts, columns index the images.
                similarity = (100.0 * text_features @ image_features.T)

            text1_image1 = similarity[0][0].item()
            text1_image2 = similarity[0][1].item()
            text2_image1 = similarity[1][0].item()
            text2_image2 = similarity[1][1].item()

            # Responses 1/2 choose a caption per image, 3/4 an image per caption.
            results.append({
                "id": i,
                "select_text_prompt": "",
                "select_image_prompt1": item["text1"],
                "select_image_prompt2": item["text2"],
                "response_1": "1" if text1_image1 > text2_image1 else "2",
                "response_2": "2" if text2_image2 > text1_image2 else "1",
                "response_3": "1" if text1_image1 > text1_image2 else "2",
                "response_4": "2" if text2_image2 > text2_image1 else "1",
            })

            # Save after each iteration to avoid losing progress
            with open(self.output_file, "w") as f:
                json.dump(results, f, indent=4)

In [None]:
# This shows a list of available models.
# As the last expression of the cell, the returned list is displayed by the notebook.
open_clip.list_pretrained()

In [None]:
# Models to evaluate: each entry is a (model name, pretrained weights) pair
# that is handed to OpenCLIPModel unchanged.
models = [
    ('EVA02-B-16', 'merged2b_s8b_b131k'),
    ('EVA02-L-14-336', 'merged2b_s6b_b61k'),
    ('ViT-B-16-SigLIP', 'webli'),
    ('ViT-L-16-SigLIP-384', 'webli'),
    ('ViT-L-14-CLIPA', 'datacomp1b'),
    ('ViT-L-16-SigLIP2-512', 'webli'),
    ('coca_ViT-B-32', 'laion2b_s13b_b90k'),
    ('coca_ViT-L-14', 'laion2b_s13b_b90k'),
    ('ViT-B-16-SigLIP2-512', 'webli'),
    ('ViT-B-16', 'openai'),
    ('ViT-B-32', 'openai'),
]

# evaluate() writes the JSON log of responses; the two score methods turn
# that log into CSV tables. Everything lands in the RocketScienceResults folder.
for model_name, model_path in models:
    openclip_model = OpenCLIPModel(model_name, model_path)
    openclip_model.evaluate(dataset)
    openclip_model.get_results_by_category()
    openclip_model.get_text_image_group_scores()

# VLMs

In [None]:
class OpenAIModel(Model):
    """Model served through an OpenAI-compatible chat-completions endpoint."""

    def __init__(self, api_key: str, model_name: str = "gpt-4o", reasoning=False, base_url ="https://api.openai.com/v1"):
        """Create the API client.

        Args:
            api_key: Key for the endpoint at ``base_url``.
            model_name: Model identifier sent with every request.
            reasoning: Forwarded to ``Model``; selects the reasoning prompts.
            base_url: Endpoint URL; defaults to the OpenAI API.
        """
        super().__init__(model_name, reasoning = reasoning)
        self.client = OpenAI(api_key=api_key, base_url=base_url)

    def generate(self, text, images) -> str:
        """Send ``text`` plus base64 PNG ``images`` and return the reply text.

        Rate-limit, connection and server errors are retried with exponential
        backoff (up to 6 attempts). Any other error is printed and ends the
        call immediately.

        Returns:
            The message content of the first choice, or None when no attempt
            succeeded.
        """
        delay = 1
        max_attempts = 6
        for i in range(max_attempts):
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {"role": "user", "content": [
                            {"type": "text", "text": text},
                            *[{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}} for image in images]
                        ]}
                    ],
                    temperature=0.0,
                )
                return response.choices[0].message.content
            except (openai.RateLimitError, openai.APIConnectionError, openai.InternalServerError) as e:
                print(f"Transient API error (attempt {i + 1}/{max_attempts}): {e}")
                # No point in waiting after the final attempt.
                if i < max_attempts - 1:
                    time.sleep(delay * (2 ** i))  # Exponential backoff
            except Exception as e:
                print(f"Unexpected error: {e}")
                break
        return None

class OpenAIReasoningModel(Model):
    """OpenAI reasoning model queried with ``reasoning_effort="medium"``.

    The standard (non-reasoning) prompts of ``Model`` are used.
    """

    def __init__(self, api_key: str, model_name: str = "o4-mini", base_url ="https://api.openai.com/v1"):
        """Create the API client.

        Args:
            api_key: Key for the endpoint at ``base_url``.
            model_name: Model identifier sent with every request.
            base_url: Endpoint URL; defaults to the OpenAI API.
        """
        super().__init__(model_name)
        self.client = OpenAI(api_key=api_key, base_url=base_url)

    def generate(self, text, images) -> str:
        """Send ``text`` plus base64 PNG ``images`` and return the reply text.

        Rate-limit, connection and server errors are retried with exponential
        backoff (up to 6 attempts). Any other error is printed and ends the
        call immediately.

        Returns:
            The message content of the first choice, or None when no attempt
            succeeded.
        """
        delay = 1
        max_attempts = 6
        for i in range(max_attempts):
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {"role": "user", "content": [
                            {"type": "text", "text": text},
                            *[{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}} for image in images]
                        ]}
                    ],
                    reasoning_effort="medium",
                )
                return response.choices[0].message.content
            except (openai.RateLimitError, openai.APIConnectionError, openai.InternalServerError) as e:
                print(f"Transient API error (attempt {i + 1}/{max_attempts}): {e}")
                # No point in waiting after the final attempt.
                if i < max_attempts - 1:
                    time.sleep(delay * (2 ** i))  # Exponential backoff
            except Exception as e:
                print(f"Unexpected error: {e}")
                break
        return None

class AnthropicModel(Model):
    """Model served through the Anthropic messages API."""

    def __init__(self, api_key: str, model_name: str = "claude-3-7-sonnet-20250219"):
        """Create the Anthropic client for ``model_name``."""
        super().__init__(model_name)
        self.client = anthropic.Anthropic(api_key=api_key)

    def generate(self, text, images) -> str:
        """Send ``text`` followed by base64 PNG ``images``; return the reply text."""
        # The prompt text comes first, then one image part per input image.
        content_parts = [{"type": "text", "text": text}]
        for image in images:
            content_parts.append({
                "type": "image",
                "source": {"type": "base64", "media_type": "image/png", "data": image},
            })

        message = self.client.messages.create(
            model=self.model_name,
            max_tokens=50,
            temperature=0.0,
            messages=[{"role": "user", "content": content_parts}],
        )
        return message.content[0].text

class PaliGemma(Model):
    """PaliGemma checkpoint run locally through transformers."""

    def __init__(self, model_name: str):
        """Load model and processor; the log uses the name after the last "/"."""
        short_name = model_name.split("/")[-1]
        super().__init__(short_name)
        loaded_model = PaliGemmaForConditionalGeneration.from_pretrained(model_name)
        self.model = loaded_model.eval().to(device)
        self.processor = AutoProcessor.from_pretrained(model_name)

    def preprocess_image(self, image):
        """Return the image unchanged; no base64 encoding is applied here."""
        return image

    def generate(self, text, images) -> str:
        """Return the last line of the decoded output for ``text`` and ``images``."""
        prompt = "answer en " + text + "\n"
        inputs = self.processor(images=[images], text=prompt, return_tensors="pt").to(device)
        output = self.model.generate(**inputs, max_new_tokens=20, top_k=1)
        decoded = self.processor.decode(output[0], skip_special_tokens=True)
        # The decoded text is split on newlines and only the final line is kept.
        *_, last_line = decoded.split("\n")
        return last_line

In [None]:
# AVAILABLE MODELS
# Uncomment exactly one `model = ...` line (and comment out the PaliGemma line
# below) to evaluate a different model.
# VLMs
# model = AnthropicModel(model_name="claude-3-7-sonnet-20250219", api_key=ANTHROPIC_KEY)
# model = OpenAIModel(model_name = "gpt-4o", api_key=OPENAI_KEY)
# model = OpenAIModel(model_name = "qwen/qwen-2.5-vl-72b-instruct", api_key=OPENROUTER_KEY, base_url="https://openrouter.ai/api/v1")
# model = OpenAIModel(model_name = "qwen/qwen-vl-max", api_key=OPENROUTER_KEY, base_url="https://openrouter.ai/api/v1")
# model = OpenAIModel(model_name = "meta-llama/llama-4-maverick", api_key=OPENROUTER_KEY, base_url="https://openrouter.ai/api/v1")

# # VLMs with explicit chain of thought
# model = OpenAIModel(model_name = "gpt-4o", reasoning=True, api_key=OPENAI_KEY)
# model = OpenAIModel(model_name = "meta-llama/llama-4-maverick", reasoning=True, api_key=OPENROUTER_KEY, base_url="https://openrouter.ai/api/v1")

# # VLMs with implicit chain of thought
# model = OpenAIReasoningModel(model_name="o4-mini",api_key=OPENAI_KEY)
# model = OpenAIModel(model_name = "google/gemini-2.5-pro-preview-03-25", api_key=OPENROUTER_KEY, base_url="https://openrouter.ai/api/v1")

# Paligemma, works on a Colab T4-Gpu
# model.evaluate() produces json logs of the responses
# model.get_results_by_category() and model.get_text_image_group_scores() produce csv tables with scores
# all files will be placed in the RocketScienceResults folder
model = PaliGemma("google/paligemma-3b-mix-448")
# evaluate() resumes from an existing log file, so re-running continues where it stopped.
model.evaluate(dataset)
model.get_text_image_group_scores()
model.get_results_by_category()

# Localisation Experiment

In [None]:
# One line per image used in the localisation experiment: a Python-style dict
# naming the object on the 'left' and the object on the 'right'. The lines are
# consumed in order by the localisation loop, which converts the single quotes
# to double quotes and parses each line as JSON.
objects = """{'left': 'black sock', 'right': 'metal bottle'}
{'left': '3 red berries', 'right': '1 glass'}
{'left': 'five red berries', 'right': 'six brussel sprouts'}
{'left': 'light blue mug', 'right': 'orange mug'}
{'left': 'cien bottle', 'right': 'listerine bottle'}
{'left': 'used paper towel', 'right': 'fresh paper towel'}
{'left': 'white cleaning cloth', 'right': 'blue cleaning cloth'}
{'left': 'white and grey traffic post', 'right': 'white and yellow traffic post'}
{'left': 'parking meter', 'right': 'few rocks'}
{'left': 'tree', 'right': 'metal barrier'}
{'left': 'slightly shorter green mailbox', 'right': 'slightly taller green mailbox'}
{'left': 'grey recycling bin', 'right': 'green recycling bin'}
{'left': 'metallic pole', 'right': 'blue sign'}
{'left': 'grey traffic pole', 'right': 'red traffic cone'}
{'left': 'green fuse box', 'right': 'fence'}
{'left': 'wall', 'right': 'traffic cone'}
{'left': 'intact soy milk carton', 'right': 'crushed soy milk carton'}
{'left': 'big button', 'right': 'small button'}
{'left': 'soap bar', 'right': 'liquid soap dispenser'}
{'left': 'blue bottle with a red cap', 'right': 'blue bottle without a red cap'}
{'left': 'slim fire extinguisher', 'right': 'bulky fire extinguisher'}
{'left': 'white panel', 'right': 'fire alarm'}
{'left': 'plant', 'right': 'can of beans'}
{'left': 'concrete wall', 'right': 'four holes arranged in a rectangle shape'}
{'left': 'manhole cover with the text CATV UPC', 'right': 'two packs of passata'}
{'left': 'sidewalk', 'right': 'traffic sign'}
{'left': 'moss', 'right': 'sidewalk'}
{'left': 'dark blue symbol', 'right': 'light blue symbol'}
{'left': 'pole with light blue stripes', 'right': 'pole with dark blue stripes'}
{'left': 'orange leaves', 'right': 'green ivy leaves'}
{'left': 'bicycle lane', 'right': 'road'}
{'left': 'empty bag of flour', 'right': 'full bag of flour'}
{'left': 'Three fir cones', 'right': 'Two fir cones'}
{'left': 'guard railing', 'right': 'sidewalk'}
{'left': 'silver posts', 'right': 'sidewalk'}
{'left': 'wooden pole', 'right': 'tree'}
{'left': 'most of the leaves', 'right': 'a fuse box'}
{'left': 'metal manhole cover', 'right': 'concrete manhole cover'}
{'left': 'sidewalk', 'right': 'street'}
{'left': 'footwalk', 'right': 'green lamps'}
{'left': 'green bottle with a blue head', 'right': 'green bottle with a white head'}
{'left': 'one vertical tea spoon', 'right': 'two horizontal tea spoons'}
{'left': 'grey chairs', 'right': 'purple chairs'}
{'left': 'short window', 'right': 'tall window'}
{'left': 'higher metal pole', 'right': 'metal pole'}
{'left': 'Ivy leaves', 'right': 'tree'}
{'left': 'a further red and white bar', 'right': 'a closer red and white bar'}
{'left': 'green bush', 'right': 'dried plants'}
{'left': 'gate', 'right': 'tallest tree'}
{'left': 'image', 'right': 'peak'}
{'left': 'sidewalk', 'right': 'road'}
{'left': '3 glasses', 'right': '1 red berry'}
{'left': 'five brussel sprouts', 'right': 'six red berries'}
{'left': 'listerine bottle', 'right': 'noir bottle'}
{'left': 'meadow fresh', 'right': 'classic houmous'}
{'left': 'a scale on one roll of toilet paper', 'right': 'two rolls of toilet paper'}
{'left': 'rectangular manhole cover', 'right': 'round manhole cover'}
{'left': 'purple chair', 'right': 'grey chair'}
{'left': 'green bush', 'right': 'dried plants'}
{'left': 'cien bottle', 'right': 'listerine bottle'}"""

In [None]:
def preprocess_image(image):
    """Resize ``image`` to 1024x1024 and return it as base64-encoded PNG text."""
    resized = image.resize((1024, 1024))
    with BytesIO() as png_buffer:
        resized.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue())
    return encoded.decode("utf-8")

In [None]:
# Localisation experiment: ask the model for rough bounding boxes of the two
# annotated objects in the first image of every "Horizontal Position" item.
client = OpenAI(api_key=OPENAI_KEY, base_url="https://api.openai.com/v1")
# The annotations use Python-style single quotes; convert them so that each
# line can be parsed as JSON.
objects_list = objects.replace("'", "\"").split("\n")

responses = {}
i = 0  # index into objects_list; advances only for processed items
for d in tqdm(dataset):
    # The Category column is read as a stringified list elsewhere in this
    # notebook; accept both that form and a real list so that the filter does
    # not silently skip every item.
    category = d["Category"]
    if isinstance(category, str):
        category = re.findall(r"'(.*?)'", category[1:-1])
    if category != ["Horizontal Position"]:
        continue

    images = [preprocess_image(d["image1"])]
    obj = json.loads(objects_list[i])
    prompt = "Fill in the rough coordinates of the bounding boxes of the two objects in this json. The coordinates range from 0 to 1024 in a row-major coordinate system (also known as image coordinate system). You can do it: {\"" + obj["right"] + "\": [xmin, ymin, xmax, ymax], \"" + obj["left"] + "\": [xmin, ymin, xmax, ymax]} "
    response = client.chat.completions.create(
        model="o4-mini", #gpt-4o
        messages=[
            {"role": "user", "content": [
                {"type": "text", "text": prompt},
                *[{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image}"}} for image in images]
            ]}
        ],
        reasoning_effort="medium"
    )
    print(response.choices[0].message.content)
    # Newlines are removed so the non-greedy pattern can span a multi-line
    # JSON object; the first {...} group is taken as the answer.
    res = response.choices[0].message.content.replace("\n", "")
    match = re.search(r'\{.*?\}', res)
    if match:
        res = match.group(0)
    else:
        print("No match found")
        break

    res = json.loads(res)
    # Append "left_box:right_box" to a text file and keep both boxes in memory.
    with open("output.txt", "a") as file:
        file.write(str(res[obj["left"]]) + ":" + str(res[obj["right"]]) + "\n")
    responses[i] = (res[obj["left"]], res[obj["right"]])
    i +=1