# Beyond Words Inference

This notebook is designed for **local execution**.

**Requirements:**
- Python 3.8+
- PyTorch
- Transformers
- OpenAI Python SDK

**Setup:**
- Ensure your OpenAI API key is set as the `OPENAI_API_KEY` environment variable.
- Clone the Beyond Words GitHub repo so that data and checkpoints sit in the folders the code expects (e.g. `./data/test_set.json` and `./checkpoints/qwen-finetuned-lora`).


# Main Model + Helper Functions

In [None]:
from PIL import Image
from openai import OpenAI
from io import BytesIO
import base64
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
import os

# Load fine-tuned Qwen Model from the local checkpoint folder and move it to the GPU.
# NOTE(review): the folder name suggests a LoRA run — confirm it holds weights that
# `from_pretrained` can load directly (i.e. merged, not adapter-only).
ft_model = Qwen2_5_VLForConditionalGeneration.from_pretrained("./checkpoints/qwen-finetuned-lora", trust_remote_code=True).to("cuda")

# Load base Qwen Model (the un-finetuned baseline) from the Hugging Face hub id.
base_model = Qwen2_5_VLForConditionalGeneration.from_pretrained("Qwen/Qwen2.5-VL-3B-Instruct", trust_remote_code=True).to("cuda")

# A single processor (chat template + tokenizer + image preprocessing) is shared by
# both models in qwen_generate.
processor = AutoProcessor.from_pretrained("Qwen/Qwen2.5-VL-3B-Instruct")
# os.getenv returns None when OPENAI_API_KEY is not set.
api_key = os.getenv("OPENAI_API_KEY")

def model_generate(model_choice, image_path, prompt, sys_prompt, temperature, max_new_tokens=128):
    """
    Route a generation request to the backend selected by ``model_choice``.

    ``"gpt"`` goes to ``gpt_generate``; every other value goes to
    ``qwen_generate``. All arguments are forwarded unchanged and the backend's
    return value is returned as-is.
    """
    backend = gpt_generate if model_choice == "gpt" else qwen_generate
    return backend(model_choice, image_path, prompt, sys_prompt, temperature, max_new_tokens)

def gpt_generate(model_choice, image_path, prompt, sys_prompt, temperature, max_new_tokens=128):
    """
    Send an image and a text prompt to OpenAI's ``gpt-4-turbo`` chat model.

    Args:
        model_choice: Unused here; accepted so the signature matches ``qwen_generate``.
        image_path: Path of the image file to attach to the user message.
        prompt: User text placed alongside the image.
        sys_prompt: Content of the system message.
        temperature: Sampling temperature forwarded to the API.
        max_new_tokens: Forwarded to the API as ``max_tokens``.

    Returns:
        The reply text with surrounding whitespace removed, or ``""`` when the
        API returns a message without text content.
    """
    # Re-encode the image as an RGB PNG so the data URL below is always valid,
    # and close the source file as soon as the pixels have been read.
    with Image.open(image_path) as src:
        img = src.convert("RGB")
    buf = BytesIO()
    img.save(buf, format="PNG")
    img_b64 = base64.b64encode(buf.getvalue()).decode()

    client = OpenAI(api_key=api_key)
    messages = [
        {"role": "system", "content": sys_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}},
        ]},
    ]
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=messages,
        temperature=temperature,
        max_tokens=max_new_tokens,
    )
    # `content` can be None (e.g. a refusal); treat that as an empty reply
    # instead of failing on None.strip().
    content = response.choices[0].message.content
    return (content or "").strip()

def qwen_generate(model_choice, image_path, prompt, sys_prompt, temperature, max_new_tokens=128):
    """
    Generate text with the fine-tuned or base Qwen model, conditioned on an image and prompt.

    Args:
        model_choice: ``"ft"`` selects ``ft_model``; any other value selects ``base_model``.
        image_path: Path of the image file given to the processor.
        prompt: User text placed alongside the image.
        sys_prompt: Content of the system message.
        temperature: Sampling temperature. A value of 0 (or below, or None)
            switches to greedy decoding.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded continuation (prompt tokens removed) for the single input.
    """
    model = ft_model if model_choice == "ft" else base_model

    # Normalise to RGB (as gpt_generate does) and release the file handle.
    with Image.open(image_path) as src:
        image = src.convert("RGB")

    messages = [
        {"role": "system", "content": sys_prompt},
        {"role": "user", "content": [
            {"type": "text", "text": prompt},
            {"image": "file://" + image_path},
        ]},
    ]

    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # Put the inputs on whatever device the selected model lives on rather
    # than assuming "cuda".
    inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt").to(model.device)

    # Only pass `temperature` when sampling: it has no effect on greedy
    # decoding, and a non-positive value is not a valid sampling temperature.
    gen_kwargs = {"max_new_tokens": max_new_tokens}
    if temperature is not None and temperature > 0:
        gen_kwargs.update(do_sample=True, temperature=temperature)
    else:
        gen_kwargs["do_sample"] = False

    output_ids = model.generate(**inputs, **gen_kwargs)
    # generate() returns prompt + continuation; keep only the continuation.
    continuation_ids = [full[len(prompt_ids):] for prompt_ids, full in zip(inputs.input_ids, output_ids)]
    output_text = processor.batch_decode(continuation_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)
    return output_text[0]

# Verifier

In [None]:
import base64
import re
from io import BytesIO
from PIL import Image
from openai import OpenAI

# Module-level OpenAI client used by verifier_generate; built from the api_key read from the environment above.
client = OpenAI(api_key=api_key)

# Matches a standalone non-negative number ("0", "1", "0.75", ".5"). The
# lookarounds keep it from matching part of a longer token, e.g. the "0" of
# "0.75", the "1" of "v1.2" or the "1" of "1st".
_SCORE_PATTERN = re.compile(r"(?<![\w.])(?:\d+(?:\.\d+)?|\.\d+)(?!\w|\.\d)")


def _parse_score(text):
    """Return the first number in ``text`` that lies in [0, 1], or None if there is none."""
    for match in _SCORE_PATTERN.finditer(text or ""):
        value = float(match.group())
        if 0.0 <= value <= 1.0:
            return value
    return None


def verifier_generate(image_path, goal, steps, candidate):
    """
    Uses GPT-4-turbo to score the plausibility of a candidate action
    given an image, the task goal, and prior steps.

    Args:
        image_path: Path of the context image.
        goal: Goal text inserted after ``[GOAL]:`` in the prompt.
        steps: List of previously accepted step strings (joined with spaces).
        candidate: The proposed next step to score.

    Returns:
        A float in [0, 1]. Any candidate containing ``[EOP]`` gets a fixed 0.5,
        a blank candidate gets 0.0, and any failure while loading the image,
        calling the API or parsing the reply is reported and scored 0.0.
    """
    # Handle quick rule-based cases (no API call needed)
    if "[EOP]" in candidate:
        return 0.5
    if not candidate.strip():
        return 0.0

    try:
        # Load and preprocess image: RGB, downscaled to 224x224, PNG-encoded
        img = Image.open(image_path).convert("RGB")
        img = img.resize((224,224), Image.BILINEAR)
        buf = BytesIO()
        img.save(buf, format="PNG")
        img_b64 = base64.b64encode(buf.getvalue()).decode()

        # GPT-4-turbo prompt
        prompt = (
            f"[GOAL]: {goal}\n"
            f"[HISTORY]: {' '.join(steps)}\n"
            f"[CANDIDATE]: {candidate}\n\n"
            "On a scale from 0 (impossible) to 1 (certain), how likely is the candidate action? Respond with the numeric continuous value and then an explanation."
        )

        # Send to GPT-4 with image context
        resp = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}}
                ]
            }],
            temperature=0.7
        )

        # Parse score from response. The previous pattern
        # r"\b(?:0|1|0?\.\d+)\b" matched only the leading "0" of "0.75"
        # (there is a word boundary between "0" and "."), so every fractional
        # score was read as 0.0.
        text = resp.choices[0].message.content
        score = _parse_score(text)
        if score is None:
            raise ValueError(f"No score found in response: {text!r}")
        return score

    except Exception as e:
        # Best-effort boundary: a verifier failure must not abort the search,
        # so report it and give the candidate the lowest score.
        print(f"Verifier failed: {e}")
        return 0.0

# Beam Search

In [None]:
import torch

# Verifier-guided Step-wise Beam Search Algorithm
def beam_search(model_choice, image_path, sys_prompt, goal, temperature, beam_width=1, num_candidates=2, max_steps=5, debug=False):
    """
    Performs step-wise beam search for procedural planning, guided by a verifier model.

    At each step every unfinished beam is extended with up to ``num_candidates``
    sampled next steps. Each extension is scored by ``verifier_generate`` and the
    ``beam_width`` beams with the highest accumulated score are kept.

    Args:
        model_choice: Backend selector forwarded to ``model_generate``.
        image_path: Path of the context image.
        sys_prompt: System prompt forwarded to ``model_generate``.
        goal: Goal text; the generation prompt is the goal followed by the steps so far.
        temperature: Sampling temperature forwarded to ``model_generate``.
        beam_width: Number of beams kept after each step.
        num_candidates: Number of samples drawn per beam per step.
        max_steps: Maximum number of expansion rounds.
        debug: Print a trace of candidates, scores and surviving beams.

    Returns:
        A list of beams sorted by descending score. Each beam is a dict with
        ``"steps"`` (list of str), ``"score"`` (float, sum of verifier scores)
        and ``"done"`` (True once the beam ended with ``[EOP]``).
    """

    # Initialize beams so that each beam is a partial plan, starting empty
    beams = [{"steps": [], "score": 0.0, "done": False}]

    for step_idx in range(max_steps):
        if debug:
            print(f"\n=== Step {step_idx + 1} ===")
        new_beams = []

        for beam_idx, beam in enumerate(beams):
            if debug:
                print(f"\nBeam {beam_idx + 1}: {' '.join(beam['steps']) if beam['steps'] else '[START]'}")

            if beam["done"]:
                if debug:
                    print("   Beam is already completed (EOP). Skipping.")
                new_beams.append(beam)
                continue

            # First, build the new prompt by combining the goal with all previous steps
            prompt = goal + " " + " ".join(beam["steps"])

            # Then, sample multiple candidate next steps
            candidates = [model_generate(model_choice, image_path, prompt, sys_prompt, temperature) for _ in range(num_candidates)]

            if debug:
                print("   Generated Candidates:")

            # Steps already in this beam plus candidates already scored in this
            # round: identical samples would only spend extra verifier calls
            # and fill the beam with copies of the same plan.
            seen = set(beam["steps"])

            for cand_idx, raw_candidate in enumerate(candidates):
                candidate = raw_candidate.strip()

                # An empty generation is not a step; don't turn it into a bare "[STEP] ".
                if not candidate:
                    if debug:
                        print("   Empty candidate skipped.")
                    continue

                if candidate != "[EOP]" and not candidate.startswith("[STEP]"):
                    candidate = "[STEP] " + candidate

                # Avoid step duplication
                if candidate in seen:
                    if debug:
                        print(f"   Duplicate skipped: {candidate}")
                    continue
                seen.add(candidate)

                # Score the new candidates using the VLM verifier
                full_steps = beam["steps"] + [candidate]
                score = verifier_generate(image_path, goal, beam["steps"], candidate)
                done = "[EOP]" in candidate

                # Make sure we accumulate verifier scores across steps
                new_beams.append({
                    "steps": full_steps,
                    "score": beam["score"] + score,
                    "done": done
                })

                if debug:
                    print(f"     {cand_idx+1}. {candidate} | Score: {score:.4f} {' [EOP]' if done else ''}")

        # Every expansion was rejected (all candidates empty or duplicates):
        # keep the plans found so far instead of returning nothing.
        if not new_beams:
            if debug:
                print("\n No new candidates survived. Stopping early.")
            break

        # Keep top-k beams based on accumulated verifier scores
        beams = sorted(new_beams, key=lambda b: b["score"], reverse=True)[:beam_width]

        if debug:
            print("\n Top Beams After This Step:")
            for idx, b in enumerate(beams):
                steps_text = "\n         ".join(b["steps"])
                print(f"   Beam {idx+1} (Score: {b['score']:.4f}):\n         {steps_text}")

        if all(b["done"] for b in beams):
            if debug:
                print("\n All beams have ended with [EOP]. Stopping early.")
            break

    return beams


def beyond_words(model_choice, sys_prompt, goal, temperature, img, beam_width=1, num_candidates=2, debug=False, max_steps=5):
    """
    High-level function that runs beam search and assembles the best resulting plan.

    Args:
        model_choice: Backend selector forwarded to ``beam_search``.
        sys_prompt: System prompt used for every next-step generation.
        goal: Goal text of the task.
        temperature: Sampling temperature for candidate generation.
        img: Path of the context image.
        beam_width: Number of beams kept per step.
        num_candidates: Number of candidates sampled per beam per step.
        debug: Print the search trace and the final plan.
        max_steps: Maximum number of search steps (same default as ``beam_search``).

    Returns:
        The steps of the top-ranked beam joined with newlines, or
        ``"No valid plan generated"`` when the search returns no beams.
    """
    # Run beam search to generate plans
    final_beams = beam_search(model_choice, img, sys_prompt, goal, temperature, beam_width, num_candidates, max_steps=max_steps, debug=debug)

    if not final_beams:
        return "No valid plan generated"

    # Beams come back sorted by descending cumulative score, so the first is the best.
    best_plan = final_beams[0]

    # Remove repeated goals if the model mistakenly includes them. beam_search
    # prefixes every non-[EOP] candidate with "[STEP] ", so the prefix has to be
    # stripped before comparing; otherwise an echoed goal never matches.
    goal_text = goal.strip()
    clean_steps = []
    for step in best_plan["steps"]:
        step_text = step.strip()
        if step_text.startswith("[STEP]"):
            step_text = step_text[len("[STEP]"):].strip()
        if step_text != goal_text:
            clean_steps.append(step)

    if debug:
        print("\n Final Plan:")
        print(goal)
        for step in clean_steps:
            print(step)

    # Release cached GPU memory between examples.
    torch.cuda.empty_cache()
    return "\n".join(clean_steps)



# GPT-Judge

In [None]:
import openai

# OpenAI client used by gpt_judge below. This rebinds the module-level `client`
# name that the Verifier cell also assigns; both are built from the same api_key.
client = openai.OpenAI(api_key=api_key)

def full_plan_generate(model_choice, image_path, goal, temperature=0.5):
    """
    Baseline planner: ask the model for the whole plan in one generation call.

    The goal is sent as the user prompt together with a few-shot system prompt
    that asks for at most 5 ``[STEP]`` lines followed by ``[EOP]``. This is the
    counterpart to the step-wise beam search approach.

    Returns:
        str: The model's reply with surrounding whitespace stripped.
    """
    # Few-shot instructions; the two worked examples show the expected layout.
    few_shot_instructions = """
    You are an expert AI planner. Given a context image and a [GOAL], generate the full plan from scratch, consisting of at most 5 clear [STEP]s followed by [EOP] if the plan is complete.
    Example 1:
    [GOAL] make a sandwich
    Output 1:
    [GOAL] make a sandwich
    [STEP] pick up bread
    [STEP] pick up turkey slices
    [STEP] put turkey slices on bread
    [STEP] place the sandwich on the plate
    [EOP]
    Example 2:
    [GOAL] clean the clothes in the washing machine
    Output 2:
    [GOAL] clean the clothes in the washing machine
    [STEP] pick up detergent
    [STEP] open the detergent bottle
    [STEP] pour detergent into the washing machine
    [STEP] close the detergent bottle
    [STEP] start the washing machine
    [EOP]
    """
    user_prompt = f"{goal}"
    full_plan = model_generate(model_choice, image_path, user_prompt, few_shot_instructions, temperature)
    return full_plan.strip()

# GPT-4o Judge: Plan Quality Comparison
def gpt_judge(plan_a, plan_b, goal, debug=False):
    """
    Uses GPT-4o to judge which of two plans (Plan A or Plan B) is superior based on conciseness, minimality, and actionability.

    Args:
        plan_a: Text of the first plan.
        plan_b: Text of the second plan.
        goal: Goal text placed at the top of the prompt.
        debug: Print the prompt sent to the judge.

    Returns:
        ``"0"`` (Plan A), ``"1"`` (Plan B) or ``"-1"`` (tie) when the reply
        contains one of those as a standalone token; otherwise the stripped
        raw reply.
    """
    import re  # local import so this cell does not rely on an earlier cell's import

    # NOTE(review): the prompt below explicitly asks the judge to favour
    # Plan B, which biases every comparison towards whichever strategy is
    # passed as plan_b. Left unchanged so recorded results stay comparable;
    # consider a neutral prompt and/or randomising the A/B order.
    user_prompt = f"""{goal}

Plan A:
{plan_a}

Plan B:
{plan_b}

Which response is better? Prefer plans that are concise, minimal, essential action steps. Try to see the value of Plan B more, where it can be used in a robot easier and connect to policy networks easier. Respond only with: 0 (for Plan A), 1 (for Plan B), or -1 (for Tie)."""

    if debug:
        print(user_prompt)

    # Only a user message is sent; the unused system prompt that was built
    # and then blanked out has been removed.
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": user_prompt}
        ],
        temperature=0
    )
    output = response.choices[0].message.content.strip()

    # Callers tally votes by exact string, so reduce replies such as "1." or
    # "1 (for Plan B)" to the bare verdict token when one is present.
    verdict = re.search(r"(?<![\w.-])(?:-1|[01])(?!\w|\.\w)", output)
    return verdict.group() if verdict else output

# === Run Comparison ===
def compare_all_strategies(
    img_path,
    goal,
    plan_a_config,
    plan_b_config,
    temperature=0.5,
    debug=False,
):
    """
    Build Plan A and Plan B for one image/goal pair and ask the judge to pick one.

    Each config is a dict whose ``"type"`` is ``"full"`` (single-pass plan) or
    ``"beam"`` (verifier-guided beam search, which also needs ``"beam_width"``
    and ``"num_candidates"``); both kinds need ``"model_choice"``.

    Returns:
        dict with keys ``"plan_a"``, ``"plan_b"`` and ``"vote"`` (the judge's reply).

    Raises:
        ValueError: if a config's ``"type"`` is neither ``"full"`` nor ``"beam"``.
    """
    def generate_plan(config):
        # Turn one strategy config into a plan string.
        plan_type = config["type"]

        if plan_type == "full":
            return full_plan_generate(
                model_choice=config["model_choice"],
                image_path=img_path,
                goal=goal,
                temperature=temperature,
            )

        if plan_type == "beam":
            next_step_prompt = "Predict the next step in this task or [EOP] (End Of Plan). There should be 5 steps total so pace yourself accordingly. Example: [GOAL] make a sandwich [STEP] pick up bread. Output: [STEP] pick up turkey slices."
            return beyond_words(
                model_choice=config["model_choice"],
                sys_prompt=next_step_prompt,
                goal=goal,
                temperature=temperature,
                img=img_path,
                beam_width=config["beam_width"],
                num_candidates=config["num_candidates"],
                debug=debug,
            )

        raise ValueError(f"Unknown plan type: {config['type']}")

    # Plan A is always generated before Plan B.
    plan_a = generate_plan(plan_a_config)
    plan_b = generate_plan(plan_b_config)

    if debug:
        print("\n[Judge] Which is better?")
    verdict = gpt_judge(plan_a, plan_b, goal, debug)
    print("Result:", verdict)

    return {"plan_a": plan_a, "plan_b": plan_b, "vote": verdict}

In [None]:
import json
from collections import Counter

def evaluate_dataset(json_path, plan_a_config, plan_b_config, temperature=0.5, limit=None, debug=False):
    """
    Evaluates two competing planning strategies across an entire dataset of image-goal pairs.

    Args:
        json_path: Path of a JSON file holding a list of objects with
            ``"goal"`` and ``"image"`` keys.
        plan_a_config: Strategy config for Plan A (see ``compare_all_strategies``).
        plan_b_config: Strategy config for Plan B.
        temperature: Sampling temperature forwarded to plan generation.
        limit: If not None, only the first ``limit`` entries are evaluated.
        debug: Forwarded to ``compare_all_strategies``.

    Returns:
        tuple: (results, vote_counter)
            - results: list of individual evaluation outputs per example.
            - vote_counter: Counter keyed by the judge's vote string
              (``'0'`` Plan A, ``'1'`` Plan B, ``'-1'`` tie).
        Examples whose comparison raises are reported and left out of both.
    """
    # JSON is UTF-8 by specification; don't depend on the platform's default
    # encoding (goal text may contain non-ASCII characters).
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if limit is not None:
        data = data[:limit]

    results = []
    vote_counter = Counter()

    for idx, entry in enumerate(data):
        print(f"\n=== Evaluation {idx+1}/{len(data)} ===")
        goal = entry["goal"]
        image_path = entry["image"]

        try:
            result = compare_all_strategies(
                img_path=image_path,
                goal=goal,
                plan_a_config=plan_a_config,
                plan_b_config=plan_b_config,
                temperature=temperature,
                debug=debug,
            )
            results.append(result)
            vote_counter[result["vote"]] += 1

        except Exception as e:
            # Best-effort: one failing example (API error, bad image, ...)
            # must not abort the whole run.
            print(f"Skipping due to error: {e}")

    return results, vote_counter

# **Research Question #1: Does step-wise verifier-guided beam search improve procedural planning quality compared to full-plan generation?**

In [None]:
"""RQ1: Beam search vs full-plan"""

# In every experiment below the judge's vote '0' counts for Plan A
# (plan_a_config), '1' for Plan B (plan_b_config) and '-1' for a tie.

import torch
# Seeds PyTorch's RNG for the local Qwen sampling; the OpenAI API calls are
# remote and are not controlled by this seed.
torch.manual_seed(42)
data_json = "./data/test_set.json"

# Experiment 1: Finetuned Qwen (1 beam, 1 candidate) vs Full-Plan Base Qwen (few-shot prompted)
# Plan A = full-plan base Qwen, Plan B = beam search with the fine-tuned Qwen.
plan_a_config = {
    "type": "full",
    "model_choice": "base",
}

plan_b_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 1,
    "num_candidates": 1,
}

results_1, vote_counter_1 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=50,
    debug=False
)
print("\n=== 📊 Final Vote Summary ===")
print(f"Full-Plan Base Qwen (few-shot prompted) (0): {vote_counter_1['0']}")
print(f"Finetuned Qwen (1 beam, 1 candidate) (1): {vote_counter_1['1']}")
print(f"Ties (-1): {vote_counter_1['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
"""
Results (on 50 test examples):
Full-Plan Base Qwen (few-shot prompted): 27 votes
Finetuned Qwen (1 beam, 1 candidate): 8 votes
Ties: 15 votes
"""


# Experiment 2: Finetuned Qwen (2 beam, 3 candidate) vs Full-Plan Base Qwen (few-shot prompted)
# Same Plan A as Experiment 1; Plan B widens the search to 2 beams and 3 candidates.
plan_a_config = {
    "type": "full",
    "model_choice": "base",
}

plan_b_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 2,
    "num_candidates": 3,
}
results_2, vote_counter_2 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=50,
    debug=False
)

print("\n=== 📊 Final Vote Summary ===")
print(f"Full-Plan Base Qwen (few-shot prompted) (0): {vote_counter_2['0']}")
print(f"Finetuned Qwen (2 beam, 3 candidate) (1): {vote_counter_2['1']}")
print(f"Ties (-1): {vote_counter_2['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
"""
Results (on 50 test examples):
Full-Plan Base Qwen (few-shot prompted): 22 votes
Finetuned Qwen (2 beam, 3 candidate): 16 votes
Ties: 12 votes
"""

# Experiment 3: Beam Search GPT (2 beam, 3 candidate) (few-shot prompted) vs Full-Plan GPT (few-shot prompted)
# model_choice "gpt" routes to gpt_generate, which calls the "gpt-4-turbo" model.
plan_a_config = {
    "type": "full",
    "model_choice": "gpt",
}

plan_b_config = {
    "type": "beam",
    "model_choice": "gpt",
    "beam_width": 2,
    "num_candidates": 3,
}

# NOTE(review): limit=5 here, while the recorded results below say 50 test
# examples — presumably the limit was lowered after the full run; confirm
# before re-running to reproduce the numbers.
results_3, vote_counter_3 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=5,
    debug=False
)

print("\n=== 📊 Final Vote Summary ===")
print(f"Full-Plan GPT-4 (few-shot prompted) (0): {vote_counter_3['0']}")
print(f"Beam Search GPT-4 (few-shot prompted) (1): {vote_counter_3['1']}")
print(f"Ties (-1): {vote_counter_3['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
"""
Results (on 50 test examples):
Full-Plan GPT-4 (few-shot prompted): 9 votes
Beam Search GPT-4 (few-shot prompted): 15 votes
Ties: 26 votes
"""
# NOTE: From qualitative analysis, we see that Beam Search allows GPT-4 to come up with more creative plans than regular full-plan generation.
# This helps the model navigate more complex embodied scenarios.

# **Research Question #2: How does the number of beams and candidates impact the quality of step-wise planning?**

In [None]:
"""RQ2: Finding optimal number of beams/candidates"""
import torch
# Seeds PyTorch's RNG for the local Qwen sampling.
torch.manual_seed(42)
data_json = "./data/test_set.json"

# The judge's vote '0' counts for Plan A (plan_a_config), '1' for Plan B
# (plan_b_config) and '-1' for a tie.
# results_1 / vote_counter_1 / results_2 / vote_counter_2 are rebound here,
# replacing any values left by the RQ1 cell.


# Experiment 1: Finetuned Qwen (1 beam, 1 candidate) vs Finetuned Qwen (1 beam, 3 candidate)
# Isolates the effect of the number of candidates at a fixed beam width of 1.
plan_a_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 1,
    "num_candidates": 1,
}

plan_b_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 1,
    "num_candidates": 3,
}

# NOTE(review): limit=5 here, while the recorded results below say 50 test
# examples — confirm which setting produced the numbers.
results_1, vote_counter_1 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=5,
    debug=False
)
print("\n=== 📊 Final Vote Summary ===")
print(f"Finetuned Qwen (1 beam, 1 candidate) (0): {vote_counter_1['0']}")
print(f"Finetuned Qwen (1 beam, 3 candidate) (1): {vote_counter_1['1']}")
print(f"Ties (-1): {vote_counter_1['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
"""
Results (on 50 test examples):
Finetuned Qwen (1 beam, 1 candidate): 9 votes
Finetuned Qwen (1 beam, 3 candidate): 23 votes
Ties: 18 votes
"""

# Experiment 2: Finetuned Qwen (1 beam, 3 candidate) vs Finetuned Qwen (2 beam, 3 candidate)
# Isolates the effect of the beam width at a fixed 3 candidates per beam.
plan_a_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 1,
    "num_candidates": 3,
}

plan_b_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 2,
    "num_candidates": 3,
}

# NOTE(review): limit=5 here as well, versus 50 examples in the recorded results.
results_2, vote_counter_2 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=5,
    debug=False
)
print("\n=== 📊 Final Vote Summary ===")
print(f"Finetuned Qwen (1 beam, 3 candidate) (0): {vote_counter_2['0']}")
print(f"Finetuned Qwen (2 beam, 3 candidate) (1): {vote_counter_2['1']}")
print(f"Ties (-1): {vote_counter_2['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
"""
Results (on 50 test examples):
Finetuned Qwen (1 beam, 3 candidate): 11 votes
Finetuned Qwen (2 beam, 3 candidate): 14 votes
Ties: 25 votes
"""

# **Research Question #3: Does finetuning a VLM on egocentric, chain-of-thought data improve planning compared to few-shot prompting?**

In [None]:
"""RQ3: EgoCOT-finetuned Qwen vs Base Qwen"""
import torch
# Seeds PyTorch's RNG for the local Qwen sampling.
torch.manual_seed(42)
data_json = "./data/test_set.json"

# Experiment 1: Base Qwen (2 beam, 3 candidate) (few shot prompted) as Plan A
# vs Finetuned Qwen (2 beam, 3 candidate) as Plan B.
plan_a_config = {
    "type": "beam",
    "model_choice": "base",
    "beam_width": 2,
    "num_candidates": 3,
}

plan_b_config = {
    "type": "beam",
    "model_choice": "ft",
    "beam_width": 2,
    "num_candidates": 3,
}


# NOTE(review): limit=5 here, while the recorded results below say 50 test
# examples — confirm which setting produced the numbers.
results_1, vote_counter_1 = evaluate_dataset(
    data_json,
    plan_a_config=plan_a_config,
    plan_b_config=plan_b_config,
    temperature=0.5,
    limit=5,
    debug=False
)
print("\n=== Final Vote Summary ===")
# The judge answers '0' for Plan A and '1' for Plan B. Plan A is the base
# model and Plan B the fine-tuned one, so the labels must follow that order
# (they were previously swapped).
print(f"Base Qwen (2 beam, 3 candidate) (few shot prompted) (0): {vote_counter_1['0']}")
print(f"Finetuned Qwen (2 beam, 3 candidate) (1): {vote_counter_1['1']}")
print(f"Ties (-1): {vote_counter_1['-1']}")

# Recorded output of a previous run (bare string literal, has no effect when executed).
# NOTE(review): if these figures were copied from this cell while its labels
# were swapped, the 14 votes belong to Base Qwen (Plan A, '0') and the 25 votes
# to Finetuned Qwen (Plan B, '1') — verify against the original run before citing.
"""
Results (on 50 test examples):
Finetuned Qwen (2 beam, 3 candidate): 14 votes
Base Qwen (2 beam, 3 candidate) (few shot prompted): 25 votes
Ties: 11 votes
"""