In [1]:
import requests
from PIL import Image
import torch
from transformers import AutoProcessor, LlavaForConditionalGeneration
import datasets
import json
import os
import torch
import numpy as np
import gc

# Upper bound on how many validation records are processed when generating responses.
# NOTE(review): `spilt` looks like a typo for `split`; the name is read elsewhere in
# this notebook, so any rename has to be applied at every use at the same time.
spilt = 400

In [2]:
def load_model(model_id, device="cuda:0"):
    """Load a LLaVA checkpoint in float16 together with its processor.

    Args:
        model_id: Identifier handed to both ``from_pretrained`` calls.
        device: Device the model is moved to after loading (default ``"cuda:0"``).

    Returns:
        Tuple ``(model, processor)``.
    """
    # Drop unreachable Python objects first so their memory can be released.
    gc.collect()

    # The CUDA allocator cleanup only applies when a CUDA runtime is present.
    # Guarding it keeps `device="cpu"` usable on hosts without CUDA instead of
    # failing before the model is even loaded.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

    # NOTE(review): the notebook's recorded output warns that `torch_dtype` is
    # deprecated in favour of `dtype`. The keyword is left unchanged here; switch
    # once the minimum supported transformers version is confirmed.
    model = LlavaForConditionalGeneration.from_pretrained(
        model_id,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True,
    ).to(device)

    processor = AutoProcessor.from_pretrained(model_id)
    return model, processor

In [3]:
def format_choices_for_prompt(question_text, choices_list):
    """Build a multiple-choice prompt: question, lettered options, answer instruction.

    Options are labelled consecutively starting at ``A`` in the order given.
    The three parts are separated by blank lines.
    """
    instruction = "Please provide the best choice among the options (e.g., \"A. Choice Text\") and explain your detailed rationale for selecting it."

    # One "X. option" line per choice; the label is the idx-th character after 'A'.
    lettered_options = "\n".join(
        f"{chr(ord('A') + idx)}. {option}"
        for idx, option in enumerate(choices_list)
    )

    return f"{question_text}\n\n" + f"{lettered_options}\n\n" + instruction

In [4]:
def run_inference(image, text_prompt, model, processor):
    """Generate a greedy answer for one image/text pair.

    Args:
        image: Image handed to ``processor(images=...)``.
        text_prompt: User text placed before the image slot in the chat turn.
        model: Model exposing ``generate`` plus ``device`` and ``dtype``.
        processor: Processor providing ``apply_chat_template``, ``__call__`` and ``decode``.

    Returns:
        The decoded text after the last ``"ASSISTANT:"`` marker, stripped.
    """
    # Single user turn: text first, then the image placeholder.
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": text_prompt},
                {"type": "image"},
            ],
        },
    ]
    formatted_prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)

    # Place the inputs wherever the model actually lives instead of assuming
    # CUDA device 0 / float16, so a model loaded on another device still works.
    inputs = processor(images=image, text=formatted_prompt, return_tensors='pt').to(model.device, model.dtype)

    # Greedy decoding, capped at 200 new tokens.
    output = model.generate(**inputs, max_new_tokens=200, do_sample=False)

    # The first two ids of the returned sequence are skipped before decoding
    # (kept from the original; presumably leading special tokens — TODO confirm).
    response = processor.decode(output[0][2:], skip_special_tokens=True)
    # The decoded sequence still contains the prompt; keep only the assistant part.
    return response.split("ASSISTANT:")[-1].strip()

print("Inference function 'run_inference' defined.")

Inference function 'run_inference' defined.


In [5]:
# Generate one model answer per validation record and save them all to JSON.
generated_responses = []
model, processor = load_model("llava-hf/llava-1.5-7b-hf")
# An earlier, now-disabled version of this cell iterated the first 200 rows of
# datasets.load_dataset("LightChen2333/M3CoT", split="validation"), saved each
# image as generated_images/image_XXXX.png, ran run_inference on the bare
# question, and wrote id / image_path / question / choices / generated_answer
# to 'generated_responses.json'. The live code below reads a local JSONL instead.

# NOTE(review): os, json and Image are already imported in the first cell; only
# tqdm is new here. Consider moving it to the top import cell.
import os
import json
from PIL import Image
from tqdm import tqdm

input_jsonl = "m3cot_outputs/annotations/validation.jsonl"
output_json = "generated_responses.json"

generated_responses = []

# The whole annotation file is read up front; only the first `spilt` lines are used.
with open(input_jsonl, "r") as f:
    lines = f.readlines()

# NOTE(review): the recorded output for this cell stops at 200/400 and reports
# 200 samples, which does not match this loop (no early break is active). It
# appears to come from an earlier run — re-run to refresh the output.
for i, line in enumerate(tqdm(lines[:spilt])):

    # Each line is one JSON record.
    entry = json.loads(line)

    # The position in the file is used as the sample id.
    sample_id = i
    image_path = entry["image_path"]
    question = entry["question"]
    choices = entry["choices"]

    # Decode the image from disk as RGB.
    image = Image.open(image_path).convert("RGB")

    # NOTE(review): `text_prompt` (question + lettered choices) is built but never
    # used; the call below passes the bare `question`, so the model never sees
    # the answer options. Confirm whether that is intended.
    text_prompt = format_choices_for_prompt(question, choices)

    response = run_inference(image, question, model, processor)

    # "rationale" is copied from the record's 'raw_response' field.
    generated_responses.append({
        "id": sample_id,
        "image_path": image_path,
        "question": question,
        "choices": choices,
        "rationale": entry['raw_response'],
        "generated_answer": response
    })

with open(output_json, "w") as f:
    json.dump(generated_responses, f, indent=4)

print(
    f"Finished processing {len(generated_responses)} samples. "
    f"Saved to '{output_json}'."
)



`torch_dtype` is deprecated! Use `dtype` instead!


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
 50%|█████████████████████████████████████████████████████████████████████████                                                                         | 200/400 [07:00<07:00,  2.10s/it]

Finished processing 200 samples. Saved to 'generated_responses.json'.





In [6]:
def compute_input_difference_vectors(image, prompt1, prompt2, model, processor):
    """Per-layer difference of last-token hidden states between two prompts.

    Both prompts are paired with the same image and run through the model with
    ``output_hidden_states=True``. The first entry of ``hidden_states`` is
    skipped; for each remaining entry the last sequence position is taken.

    Args:
        image: Image for the processor, or a filesystem path to one. Paths are
            opened and converted to RGB once and reused for both prompts.
        prompt1: Text whose hidden states are subtracted.
        prompt2: Text whose hidden states are subtracted from.
        model: Model exposing ``device`` and ``dtype`` and callable with
            ``output_hidden_states=True``.
        processor: Processor providing ``apply_chat_template`` and ``__call__``.

    Returns:
        Dict mapping ``"layer_<i>"`` to ``prompt2 - prompt1`` as a list of floats.
    """
    # The notebook's caller passes path strings, so decode them here rather
    # than relying on the processor to accept a path.
    if isinstance(image, (str, os.PathLike)):
        with Image.open(image) as img:
            image = img.convert("RGB")

    def get_last_token_hidden_states(current_prompt):
        """Return one last-position hidden-state vector per entry after the first."""
        conversation = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": current_prompt},
                    {"type": "image"},
                ],
            },
        ]
        # No generation prompt here: the prompt text itself is what gets encoded.
        formatted_prompt = processor.apply_chat_template(conversation, add_generation_prompt=False, tokenize=False)
        # Follow the model's own device/dtype instead of assuming CUDA device 0 / float16.
        inputs = processor(images=image, text=formatted_prompt, return_tensors='pt').to(model.device, model.dtype)
        with torch.no_grad():
            outputs = model(**inputs, output_hidden_states=True)

        # [0, -1, :] selects the first batch item and the final sequence position.
        return [layer_hidden_states[0, -1, :] for layer_hidden_states in outputs.hidden_states[1:]]

    hidden_states_prompt1 = get_last_token_hidden_states(prompt1)
    hidden_states_prompt2 = get_last_token_hidden_states(prompt2)

    # Sign convention: second prompt minus first prompt.
    return {
        f"layer_{i}": (hs2 - hs1).cpu().numpy().tolist()
        for i, (hs1, hs2) in enumerate(zip(hidden_states_prompt1, hidden_states_prompt2))
    }

In [7]:
#len(compute_input_difference_vectors(image, "test1", "test2", model, processor).keys())

In [8]:
import torch

def compute_layerwise_mean_diff(
    images, prompts1, prompts2, model, processor
):
    """Average the per-layer difference vectors over a set of examples.

    For every ``(image, prompt1, prompt2)`` triple,
    ``compute_input_difference_vectors`` is called and its per-layer result is
    accumulated in float32.

    Args:
        images, prompts1, prompts2: Parallel iterables. Iteration stops at the
            shortest one.
        model, processor: Passed through to ``compute_input_difference_vectors``.

    Returns:
        Dict mapping each layer key to the mean difference as a NumPy array.

    Raises:
        ValueError: If the inputs yield no examples, so no mean can be formed.
    """
    layer_sums = None
    n = 0

    for image, p1, p2 in zip(images, prompts1, prompts2):
        diff_dict = compute_input_difference_vectors(
            image, p1, p2, model, processor
        )

        if layer_sums is None:
            # The first example defines the set of layer keys.
            layer_sums = {
                k: torch.tensor(v, dtype=torch.float32)
                for k, v in diff_dict.items()
            }
        else:
            for k in layer_sums:
                layer_sums[k] += torch.tensor(diff_dict[k], dtype=torch.float32)

        n += 1

    # Without this guard an empty input fails later with an opaque
    # "'NoneType' object has no attribute 'items'".
    if n == 0:
        raise ValueError(
            "compute_layerwise_mean_diff needs at least one (image, prompt1, prompt2) example"
        )

    return {
        k: (v / n).numpy()
        for k, v in layer_sums.items()
    }


In [9]:
def format_for_prompt_vector(question_text, choices_list, rationale):
    """Build the text used for hidden-state extraction.

    Layout: the question, a blank line, the options labelled from ``A`` (one per
    line), a blank line, then ``Rationale: <rationale>``.
    """
    option_lines = []
    for position, option in enumerate(choices_list):
        label = chr(ord('A') + position)
        option_lines.append(f"{label}. {option}")

    options_block = "\n".join(option_lines)
    rationale_line = f"Rationale: {rationale}"
    return f"{question_text}\n\n" + f"{options_block}\n\n" + rationale_line

In [10]:
# Read the saved responses back from disk so the following cells work from
# the JSON file rather than from in-memory results.
output_filename = 'generated_responses.json'
with open(output_filename, 'r') as f:
    generated_responses_data = json.loads(f.read())

print(f"Successfully loaded {len(generated_responses_data)} entries from '{output_filename}'.")

Successfully loaded 200 entries from 'generated_responses.json'.


In [11]:
# Build one (image, positive prompt, negative prompt) triple per saved response.
# Both prompts share the question and choices; the positive one ends with the stored
# 'rationale', the negative one with the model's own 'generated_answer'.
images, pos_prompt, neg_prompt = [], [], []
for i, response in enumerate(generated_responses_data):
    positive_prompt = format_for_prompt_vector(response['question'], response['choices'], response['rationale'])
    negative_prompt = format_for_prompt_vector(response['question'], response['choices'], response['generated_answer'])
    
    # NOTE(review): this stores the image *path* string, not a decoded image.
    images.append(response['image_path'])
    pos_prompt.append(positive_prompt)
    neg_prompt.append(negative_prompt)

# NOTE(review): compute_input_difference_vectors returns prompt2 - prompt1, so with
# this argument order the resulting vector is (negative - positive). If the goal is to
# steer towards the rationale-style prompt, the two prompt lists should be swapped —
# confirm the intended sign before relying on the results.
steer_vector = compute_layerwise_mean_diff(images, pos_prompt, neg_prompt, model, processor)

In [12]:
# Sanity check: entries of `images` are file-path strings, not decoded images.
images[0]

'm3cot_outputs/images/validation/000000.png'

In [13]:
# Show the first positive and negative prompt, each under a 100-character rule.
# A single call with sep="\n" emits the same text and blank-line spacing as
# printing the five pieces one after another.
print("="*100, pos_prompt[0], "\n\n", "="*100, neg_prompt[0], sep="\n")

What is the reason for someone slicing and grabbing a pizza with their hands?

A. They are trying to demonstrate their skill in handling food
B. They cannot find a pizza cutter and have to improvise
C. They think using a knife is quicker than a pizza cutter
D. They want to savor the smell of the pizza

Rationale: <SUMMARY>
I will analyze the image to determine the most plausible reason for the person slicing and grabbing pizza with their hands, based on visual cues and context.
</SUMMARY>

<CAPTION>
A person is using a knife to slice a pizza on a plate and simultaneously grabbing a slice with their other hand. No pizza cutter is visible, and the action appears casual and functional rather than performative or ritualistic.
</CAPTION>

<REASONING>
The absence of a pizza cutter and the use of a knife suggest improvisation. The person’s actions are practical and focused on serving, not demonstrating skill or savoring aroma. This aligns with the most logical explanation: lack of proper tool

In [14]:
# entry = test_dataset[0]
# entry['image']

In [15]:
# entry['question'], entry['choices'], entry['rationale']

In [16]:
def do_multimodal_steering(model, processor, test_images, test_prompts, steering_vec, scale, normalize, layer=None, proj=None):
    """Generate text while adding steering vectors to decoder-layer outputs.

    Forward hooks add ``scale * vec`` to the output of the selected module(s) in
    ``model.model.language_model.layers`` for the duration of the call. Hooks
    are always removed again, even when registration or generation fails.

    Args:
        model: Model exposing ``generate``, ``device``, ``dtype`` and
            ``model.language_model.layers``.
        processor: Processor providing ``apply_chat_template``, ``__call__`` and ``decode``.
        test_images, test_prompts: Parallel iterables of images and user texts.
        steering_vec: ``None`` (no steering), a dict keyed ``"layer_<i>"``, or a
            single vector (tensor or array-like).
        scale: Multiplier applied to the (optionally normalised) vector.
        normalize: If true, L2-normalise the vector along its last dim first.
        layer: Decoder layer index. ``None`` with a dict steers every layer in
            the dict; it is required for a single vector.
        proj: Optional attribute name of a sub-module of the layer to hook
            instead of the layer itself.

    Returns:
        Text after the last ``"ASSISTANT:"`` marker of the LAST example only
        (earlier examples are generated but not returned).

    Raises:
        ValueError: Layer index out of bounds, ``layer`` missing for a single
            vector, or no examples supplied.
        KeyError: ``layer_<layer>`` missing from a dict ``steering_vec``.
    """
    handles = []
    response = None

    def make_modify_activation(resolved_vec):
        """Build a forward hook that adds `resolved_vec` to the module output."""
        def modify_activation(module, input, output):
            # Some modules return a tuple whose first element is the activations.
            is_tuple = isinstance(output, tuple)
            activations = output[0] if is_tuple else output

            vec = resolved_vec.to(device=activations.device, dtype=activations.dtype)
            if normalize:
                vec = torch.nn.functional.normalize(vec, dim=-1)

            # Two leading singleton dims let the vector broadcast over the
            # leading two dims of the activations.
            steered_activations = activations + scale * vec.unsqueeze(0).unsqueeze(0)

            if is_tuple:
                return (steered_activations,) + output[1:]
            return steered_activations

        return modify_activation

    def attach_hook(layer_idx, resolved_vec, warn_if_proj_missing):
        """Hook one decoder layer (or its `proj` sub-module) and remember the handle."""
        target = model.model.language_model.layers[layer_idx]
        if proj:
            if hasattr(target, proj):
                target = getattr(target, proj)
            elif warn_if_proj_missing:
                print(
                    f"Warning: sub-module '{proj}' not found in layer {layer_idx}. "
                    "Applying to the layer's main output."
                )
        handles.append(target.register_forward_hook(make_modify_activation(resolved_vec)))

    try:
        if steering_vec is not None:
            # -------- Case 1: dict + layer=None -> steer every layer in the dict
            if isinstance(steering_vec, dict) and layer is None:
                num_layers = len(model.model.language_model.layers)
                for k, vec_np in steering_vec.items():
                    layer_idx = int(k.replace("layer_", ""))
                    if not (0 <= layer_idx < num_layers):
                        raise ValueError(f"Layer index {layer_idx} out of bounds")
                    attach_hook(layer_idx, torch.as_tensor(vec_np, dtype=torch.float32), True)

                print(f"Registered steering hooks on ALL {len(handles)} layers")

            # -------- Case 2: dict + specific layer
            elif isinstance(steering_vec, dict):
                layer_key = f"layer_{layer}"
                if layer_key not in steering_vec:
                    raise KeyError(f"{layer_key} not found in steering_vec dict")
                attach_hook(layer, torch.as_tensor(steering_vec[layer_key], dtype=torch.float32), False)

                print(f"Registered steering hook on layer {layer}")

            # -------- Case 3: single vector + specific layer
            else:
                if layer is None:
                    raise ValueError("layer must be specified when steering_vec is a Tensor")
                # as_tensor leaves tensors untouched and converts arrays/lists,
                # so the hook can always call `.to(...)` on the vector.
                attach_hook(layer, torch.as_tensor(steering_vec), False)

                print(f"Registered steering hook on layer {layer}")

        # -----------------------------
        # Generation loop
        # -----------------------------
        for image, text_prompt in zip(test_images, test_prompts):
            conversation = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": text_prompt},
                        {"type": "image"},
                    ],
                },
            ]
            formatted_prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
            # Follow the model's device/dtype instead of assuming CUDA device 0 / float16.
            inputs = processor(images=image, text=formatted_prompt, return_tensors='pt').to(model.device, model.dtype)

            with torch.no_grad():
                output = model.generate(**inputs, max_new_tokens=200, do_sample=False)

            response = processor.decode(output[0], skip_special_tokens=True)
    finally:
        # -----------------------------
        # Cleanup: must run on failure too, otherwise leaked hooks keep steering
        # every later call on the same model.
        # -----------------------------
        for h in handles:
            h.remove()
        print("Removed steering hooks.")

    if response is None:
        raise ValueError("test_images/test_prompts yielded no examples to generate from")
    return response.split("ASSISTANT:")[-1].strip()


In [17]:
# Display the decoder layer stack that the steering hooks are registered on.
model.model.language_model.layers

ModuleList(
  (0-31): 32 x LlamaDecoderLayer(
    (self_attn): LlamaAttention(
      (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
      (k_proj): Linear(in_features=4096, out_features=4096, bias=False)
      (v_proj): Linear(in_features=4096, out_features=4096, bias=False)
      (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
    )
    (mlp): LlamaMLP(
      (gate_proj): Linear(in_features=4096, out_features=11008, bias=False)
      (up_proj): Linear(in_features=4096, out_features=11008, bias=False)
      (down_proj): Linear(in_features=11008, out_features=4096, bias=False)
      (act_fn): SiLUActivation()
    )
    (input_layernorm): LlamaRMSNorm((4096,), eps=1e-05)
    (post_attention_layernorm): LlamaRMSNorm((4096,), eps=1e-05)
  )
)

In [18]:
# Inspect the per-layer steering vectors (dict keyed "layer_<i>").
# NOTE(review): this dumps every array; a summary of keys and shapes would be easier to read.
steer_vector

{'layer_0': array([-1.10086445e-02,  7.66448956e-03, -3.96728501e-06, ...,
         1.31739238e-02,  2.22823629e-03,  6.19052909e-03],
       shape=(4096,), dtype=float32),
 'layer_1': array([-0.02357277,  0.00605005,  0.01902706, ...,  0.01249914,
         0.00409505,  0.0221711 ], shape=(4096,), dtype=float32),
 'layer_2': array([-0.04878235, -0.00478968,  0.09132004, ...,  0.07545014,
        -0.00602078,  0.0511116 ], shape=(4096,), dtype=float32),
 'layer_3': array([-0.06224915,  0.01147778,  0.04718355, ...,  0.04471565,
        -0.03207649,  0.1057962 ], shape=(4096,), dtype=float32),
 'layer_4': array([-0.0228981 , -0.01057629,  0.00313034, ...,  0.07850914,
        -0.09159164,  0.06155411], shape=(4096,), dtype=float32),
 'layer_5': array([-0.08127579, -0.02275696,  0.0269854 , ...,  0.07481983,
        -0.14976516, -0.00604019], shape=(4096,), dtype=float32),
 'layer_6': array([-0.17420319, -0.21418945, -0.00550179, ..., -0.02523876,
        -0.17964783, -0.05157318], shape=

In [19]:
import datasets
import csv
import os
from datetime import datetime

# Sweep steering strength and location over a few test examples and log every
# generation (or the error it raised) as one CSV row.
test_dataset = datasets.load_dataset("LightChen2333/M3CoT", split="test")

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_path = f"steering_results_{timestamp}.csv"

# directory to store images
image_dir = f"steering_images_{timestamp}"
os.makedirs(image_dir, exist_ok=True)

scales = [0.3, 0.6, 1.0, 1.5, 2.0, 3.0, 4.0, 5.0, 6.0]

fieldnames = [
    "example_idx",
    "image_html",
    "question",
    "rationale",
    "mode",        # baseline | layer | all_layers
    "layer",       # int or None
    "scale",
    "output",
    "error"
]


def _log_steering_run(writer, base_row, image, prompt, mode, layer, scale, steering_vec):
    """Run one generation and write its outcome as a single CSV row.

    Errors are deliberately caught and recorded in the "error" column so one
    failing configuration does not abort the whole sweep.

    Returns:
        The generated text, or None when the run raised.
    """
    row = dict(base_row, mode=mode, layer=layer, scale=scale, output=None, error=None)
    output = None
    try:
        output = do_multimodal_steering(
            model, processor,
            [image], [prompt],
            steering_vec=steering_vec,
            scale=scale,
            normalize=True,
            layer=layer
        )
        # Empty generations are stored as None (an empty CSV cell).
        row["output"] = output if output.strip() else None
    except Exception as e:
        row["error"] = repr(e)
    writer.writerow(row)
    return output


# Sweep every decoder layer the model actually has instead of assuming 32.
num_layers = len(model.model.language_model.layers)

with open(csv_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()

    for ex_idx, entry in enumerate(test_dataset.select(range(5))):
        image = entry["image"]
        question = entry["question"]
        rationale = entry.get("rationale", None)
        prompt = format_choices_for_prompt(question, entry["choices"])

        # ---- save image
        image_path = os.path.join(image_dir, f"example_{ex_idx}.png")
        image.save(image_path)

        image_html = f'<img src="{image_path}" width="256">'

        # Columns shared by every row of this example.
        base_row = {
            "example_idx": ex_idx,
            "image_html": image_html,
            "question": question,
            "rationale": rationale,
        }

        # -------- Baseline (no steering; scale is recorded as 1.0)
        output = _log_steering_run(
            writer, base_row, image, prompt,
            mode="baseline", layer=None, scale=1.0, steering_vec=None
        )

        # -------- Per-layer + per-scale
        for scale in scales:
            for layer in range(num_layers):
                output = _log_steering_run(
                    writer, base_row, image, prompt,
                    mode="layer", layer=layer, scale=scale, steering_vec=steer_vector
                )

        # -------- All-layer steering
        for scale in scales:
            output = _log_steering_run(
                writer, base_row, image, prompt,
                mode="all_layers", layer=None, scale=scale, steering_vec=steer_vector
            )

print(f"Results written to {csv_path}")
print(f"Images saved to {image_dir}")


Removed steering hooks.
Registered steering hook on layer 0
Removed steering hooks.
Registered steering hook on layer 1
Removed steering hooks.
Registered steering hook on layer 2
Removed steering hooks.
Registered steering hook on layer 3
Removed steering hooks.
Registered steering hook on layer 4
Removed steering hooks.
Registered steering hook on layer 5
Removed steering hooks.
Registered steering hook on layer 6
Removed steering hooks.
Registered steering hook on layer 7
Removed steering hooks.
Registered steering hook on layer 8
Removed steering hooks.
Registered steering hook on layer 9
Removed steering hooks.
Registered steering hook on layer 10
Removed steering hooks.
Registered steering hook on layer 11
Removed steering hooks.
Registered steering hook on layer 12
Removed steering hooks.
Registered steering hook on layer 13
Removed steering hooks.
Registered steering hook on layer 14
Removed steering hooks.
Registered steering hook on layer 15
Removed steering hooks.
Registered

In [20]:
# conversation = [
#     {
#         "role": "user",
#         "content": [
#             {"type": "text", "text": prompt},
#             {"type": "image"},
#         ],
#     },
# ]
# formatted_prompt = processor.apply_chat_template(conversation, add_generation_prompt=False, tokenize=False)
# inputs = processor(images=image, text=formatted_prompt, return_tensors='pt').to(0, torch.float16)
# with torch.no_grad():
#     outputs = model(**inputs, output_hidden_states=True)

#     # last_token_hidden_states_list = []
#     #     for layer_hidden_states in outputs.hidden_states:
#     #         last_token_hidden_states_list.append(layer_hidden_states[0, -1, :])
#     #     return last_token_hidden_states_list
    

In [21]:
# outputs.hidden_states[1:][0][0, -1, :]