In [1]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "3"
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
os.environ["HF_HUB_OFFLINE"] = "1"
# os.environ["MAX_PIXELS"]=


In [2]:
import torch
import torch.nn.functional as F
from transformers import LlavaForConditionalGeneration, AutoTokenizer, AutoProcessor

model_id= "llava-hf/llava-1.5-7b-hf"
model = LlavaForConditionalGeneration.from_pretrained(
 model_id,
 torch_dtype=torch.bfloat16,
 attn_implementation="eager",
#  device_map="auto",
).to("cuda")
processor = AutoProcessor.from_pretrained(model_id, use_fast=True)


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

In [3]:
from PIL import Image
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "image",
                "image": "examples/image.png",
            },
            {
                "type": "text",
                "text": "Describe this image."
            },
        ],
    }
]

text = processor.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
#print(image_inputs)
image = Image.open("examples/image.png")


In [4]:
import numpy as np
from PIL import Image
from IPython.display import display

def generate_images(image_list, mode="noise", color=(255, 255, 255)):
    """
    根据 `mode` 生成与 `image_list` 中图像等大的随机噪声或纯色图像。

    参数：
    - image_list: list[PIL.Image]，输入的图像列表。
    - mode: str，"noise" 生成随机噪声图像，"blank" 生成纯色图像。
    - color: tuple，生成纯色图像时的颜色，默认为白色 (255, 255, 255)。

    返回：
    - list[PIL.Image]，生成的图像列表。
    """
    generated_images = []
    
    for img in image_list:
        width, height = img.size
        
        if mode == "noise":
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            generated_image = Image.fromarray(noise_array)
        elif mode == "blank":
            generated_image = Image.new("RGB", (width, height), color)
        else:
            raise ValueError("mode could only be 'noise' or 'blank'")
        
        generated_images.append(generated_image)
    
    return generated_images

# neg_image_inputs = generate_images(image_inputs, mode="noise")


In [5]:
inputs = processor(
    images=[image],
    text=[text],
    padding=True,
    return_tensors="pt",
)
inputs = inputs.to("cuda")
output_ids = model.generate(
            **inputs,
            return_dict_in_generate=True,
            output_attentions=True,
            max_new_tokens=128,
        )


In [6]:
neg_inputs = processor(
    text=[text],
    images=neg_image_inputs,
    videos=video_inputs,
    padding=True,
    return_tensors="pt",
)
neg_inputs = neg_inputs.to("cuda")
print(neg_inputs.pixel_values)
negative_output_ids = model.generate(
            **neg_inputs,
            return_dict_in_generate=True,
            output_attentions=True,
            max_new_tokens=128,
        )


NameError: name 'neg_image_inputs' is not defined

In [7]:
# If you wanna check what's the output of negative generation, run me.

# generated_ids = negative_output_ids.sequences
generated_ids = output_ids.sequences

generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, generated_ids)
]
out = processor.tokenizer.batch_decode(
    generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
response = out[0]
print(response)


The image displays two different amounts of money, with one being $1,145,000 and the other being $4,999,999. The two amounts are placed side by side, with the larger amount on the left and the smaller amount on the right. The contrast between the two figures emphasizes the significant difference in their values.


In [8]:
from torch import Tensor


def get_mean_attn_score(output_ids) -> Tensor:
    r"""
    get the mean attention weights of the prefilling and full attention
    Args:
        output_ids: the output ids of the model
    Returns:
        mean_attn: the mean attention weights of the prefilling and full attention, shape: (L, L)
    """
    output_attn = output_ids.attentions
    pref_len = output_attn[0][0].shape[3]
    full_len = output_attn[-1][0].shape[3]
    prefill_attn = output_attn[0]
    assert prefill_attn[0].shape[0] == 1, "batch size should be 1"
    full_attn = []

    for l, layer in enumerate(prefill_attn):
        layer = layer.cpu().squeeze(0).float()
        layer = torch.nn.functional.pad(layer, (0, full_len - pref_len, 0, full_len - pref_len))
        for i in range(full_len - pref_len):
            cur_attn = output_attn[i + 1][l].cpu().squeeze(0)[:, 0, :].float()
            layer[:, pref_len + i, :pref_len + i + 1] = cur_attn
        full_attn.append(layer)
    mean_attn = torch.stack(full_attn).mean(dim=(0, 1))
    return mean_attn

aw = get_mean_attn_score(output_ids)

# neg_aw = get_mean_attn_score(negative_output_ids)


In [9]:
from torch._tensor import Tensor
from typing import Tuple


def get_visual_token_mean_attn_score(
    mean_attn, inputs, vision_token_id
) -> Tuple[Tensor, ...]:
    r"""
    Get the attention weights of the visual tokens
    Args:
        mean_attn: the mean attention weights of the prefilling and full attention, shape: (L, L)
        inputs: the inputs of the model
    Returns:
        visual_token_attn_weights: the tuple of the attention weights of the visual tokens, each element shape: (V, V)
    """
    NUM_IMG_TOKENS = 576
    assert inputs["input_ids"].shape[0] == 1, "batch size should be 1"
    pref_len = len(inputs["input_ids"][0])
    vision_start_token_indices = inputs["input_ids"][0].tolist().index(vision_token_id)
    vision_end_token_indices = vision_start_token_indices + NUM_IMG_TOKENS
    vision_start_token_indices = [vision_start_token_indices]
    vision_end_token_indices = [vision_end_token_indices]
    # assert len(vision_start_token_indices) == len(vision_end_token_indices), "vision start and end token idx should be the same"
    print(vision_start_token_indices)
    print(vision_end_token_indices)
    # iterate over multiple images
    visual_token_attn_weights = tuple(
        torch.mean(mean_attn[pref_len:, s : e], dim=0)
        for s, e in zip(
            vision_start_token_indices, vision_end_token_indices, strict=True
        )
    )
    return visual_token_attn_weights


vw = get_visual_token_mean_attn_score(
    aw, inputs, processor.tokenizer.added_tokens_encoder["<image>"]
)
# neg_vw = get_visual_token_mean_attn_score(neg_aw, neg_inputs)


[5]
[581]


In [10]:
vw[0].shape


torch.Size([576])

In [11]:
from typing import Literal


# Apply weighted attention to vision tokens, fix multiple images
def get_visual_token_weight(
    vision_attn_weight,
    keep_percentage,
    weighting_type: Literal["linear", "uniform"] = "linear",
    lowest_weight=0.0,
):
    sorted_indices = torch.argsort(vision_attn_weight, descending=True)
    num_tokens_to_keep = int(len(vision_attn_weight) * keep_percentage)
    weight_vision_token = torch.zeros_like(vision_attn_weight, dtype=torch.float)
    weight_vision_token[sorted_indices[:num_tokens_to_keep]] = 1.0
    if weighting_type == "linear":
        weight_vision_token[sorted_indices[num_tokens_to_keep:]] = torch.linspace(
            lowest_weight, 1.0, len(vision_attn_weight) - num_tokens_to_keep
        )
    else:
        weight_vision_token[sorted_indices[num_tokens_to_keep:]] = lowest_weight
    return weight_vision_token

vm_linear = [get_visual_token_weight(v, 0.6, "linear", 0.0) for v in vw]
vm_uniform = [get_visual_token_weight(v, 0.6, "uniform", 0.0) for v in vw]

# model.embed_weight = torch.concat(vm_linear, dim=0).to(model.device)


In [None]:
# Manually process input_ids and pixel_values into input_embeds
def manual_embed_inputs(model, input_ids, pixel_values, vision_token_weights=None):
    print(input_ids.shape)
    print(pixel_values.shape)
    print(vision_token_weights.shape)
    
    # Get input embeddings from the model
    inputs_embeds = model.get_input_embeddings()(input_ids)
    
    # Process image features
    image_features = model.get_image_features(
        pixel_values=pixel_values,
        vision_feature_layer=model.config.vision_feature_layer,
        vision_feature_select_strategy=model.config.vision_feature_select_strategy
    )
    
    # Apply vision token weights if provided
    if vision_token_weights is not None:
        # Ensure weights are on the same device as image features
        vision_token_weights = vision_token_weights.to(image_features.device)
        # Apply weights to image features
        image_features = image_features * vision_token_weights.unsqueeze(0).unsqueeze(-1)
    
    # Find image token positions
    special_image_mask = (input_ids == model.config.image_token_index).unsqueeze(-1)
    special_image_mask = special_image_mask.expand_as(inputs_embeds).to(inputs_embeds.device)
    print(special_image_mask.shape)
    
    # Replace image tokens with image features
    image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
    inputs_embeds = inputs_embeds.masked_scatter(special_image_mask, image_features)
    
    return inputs_embeds

# Create input embeddings with weighted vision tokens
input_embeds = manual_embed_inputs(
    model, 
    inputs.input_ids, 
    inputs.pixel_values, 
    vision_token_weights=vm_linear[0] if len(vm_linear) > 0 else None
)

# Create modified inputs dictionary
modified_inputs = {
    "inputs_embeds": input_embeds,
    "attention_mask": inputs.attention_mask
}


torch.Size([1, 593])
torch.Size([1, 3, 336, 336])
torch.Size([576])
torch.Size([1, 593, 4096])


: 

In [41]:
modified_inputs['inputs_embeds'].shape


torch.Size([1, 593, 4096])

In [42]:
generated_ids = model.generate(**modified_inputs, max_new_tokens=128)
# generated_ids = [
#     output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, generated_ids)
# ]
out = processor.tokenizer.batch_decode(
    generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
response = out[0]


In [43]:
print(response)


The image displays two columns of numbers, likely representing financial data. The first column contains the numbers 1 through 10, while the second column has the numbers 11 through 15. The numbers in the second column are larger than those in the first column.

In addition to the numbers, there is a question mark located in the middle of the second column, possibly indicating a point of interest or a question related to the financial data.


## OLD CODE

In [None]:
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(inputs.input_ids, generated_ids)
]
out = processor.tokenizer.batch_decode(
    generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
response = out[0]


In [None]:
print(response)


In [None]:
with torch.no_grad():
    output_ids = model.generate(
        **inputs,
        max_new_tokens=512,
        return_dict_in_generate=True,
        output_hidden_states=True,
        output_attentions=True,
        use_cache=True,
    )

print(output_ids)


In [None]:
with torch.no_grad():
    output_ids = model.generate(
        **inputs,
        max_new_tokens=512,
    )
print(output_ids)


In [None]:
# Decode and echo output
generated_ids = [
    output_ids[len(input_ids):]
    for input_ids, output_ids in zip(inputs.input_ids, output_ids.sequences)
]
output_text = processor.batch_decode(
    generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
)
_ = [print(output) for output in output_text]


In [None]:
from typing import Tuple, List
from torch import Tensor
import copy

vision_start_token_idx = inputs['input_ids'][0].tolist().index(model.config.vision_start_token_id)
vision_end_token_idx = inputs['input_ids'][0].tolist().index(model.config.vision_end_token_id)

output_attn: Tuple[Tuple[Tensor, ...], ...] = copy.deepcopy(output_ids.attentions)
# get the length of the prefilling and full attention
pref_len: int = output_attn[0][0].shape[3]
full_len: int = output_attn[-1][0].shape[3]
prefill_attn: Tuple[Tensor, ...] = output_attn[0]

# batchsize should be 1
assert prefill_attn[0].shape[0] == 1
full_attn = []
for l, layer in enumerate(prefill_attn):
    layer = layer.cpu().squeeze(0).float()
    layer = F.pad(layer, (0, full_len - pref_len, 0, full_len - pref_len))
    for i in range(full_len - pref_len):
        # print(i, )
        # cur_attn = output_attn[i][l].cpu().squeeze(0).float()
        cur_attn = output_attn[i + 1][l].cpu().squeeze(0)[:, 0, :].float()
        # print(cur_attn.shape)
        layer[:, pref_len + i, :pref_len + i + 1] = cur_attn
    full_attn.append(layer)
mean_attn = torch.stack(full_attn).mean(dim=(0, 1))

image_output_attn = torch.mean(mean_attn[pref_len:, vision_start_token_idx + 1:vision_end_token_idx], dim=0)

def calculate_dynamic_threshold(attn, percentile=98):
    hist = torch.histc(attn, bins=100)
    cdf = torch.cumsum(hist, dim=0)/torch.sum(hist)
    threshold = torch.argmax((cdf > percentile/100).float()).item()/100
    return threshold

threshold = calculate_dynamic_threshold(image_output_attn)
print(threshold)


In [None]:
def reweighted_vision_tokens(attn_map, keep_percentage=threshold):
    # Get the attention values sorted in descending order
    sorted_attention, sorted_indices = torch.sort(attn_map, descending=True)
    
    # Determine the number of tokens to keep
    num_tokens_to_keep = int(len(sorted_attention) * keep_percentage)
    
    # Create a weight mask where the top tokens have higher weight
    weight_vision_token = torch.zeros_like(attn_map, dtype=torch.float)
    
    # Assign weights for tokens (top tokens get higher weights, others get smaller weights)
    weight_vision_token[sorted_indices[:num_tokens_to_keep]] = 1.0
    weight_vision_token[sorted_indices[num_tokens_to_keep:]] = torch.linspace(0.6, 1.0, len(sorted_attention) - num_tokens_to_keep)

    return weight_vision_token
    
weight_vision_token = reweighted_vision_tokens(image_output_attn).to('cuda')


In [None]:
weight_vision_token


In [None]:
# weight_vision_token.size()
input_ids = inputs["input_ids"]
n_image_tokens = (input_ids == model.config.image_token_id).sum().item()
print(n_image_tokens)


In [None]:
print(inputs["input_ids"])


In [None]:
input_ids = inputs["input_ids"]
attention_mask = inputs["attention_mask"]
pixel_values = inputs["pixel_values"]
image_grid_thw = inputs["image_grid_thw"]

inputs_embeds = model.model.embed_tokens(input_ids)
if pixel_values is not None:
    pixel_values = pixel_values.type(model.visual.get_dtype())
    image_embeds = model.visual(pixel_values, grid_thw=image_grid_thw)
    n_image_tokens = (input_ids == model.config.image_token_id).sum().item()
    n_image_features = image_embeds.shape[0]
    if n_image_tokens != n_image_features:
        raise ValueError(
            f"Image features and image tokens do not match: tokens: {n_image_tokens}, features {n_image_features}"
        )
    image_mask = (
        (input_ids == model.config.image_token_id)
        .unsqueeze(-1)
        .expand_as(inputs_embeds)
        .to(inputs_embeds.device)
    )
    image_embeds = image_embeds.to(inputs_embeds.device, inputs_embeds.dtype)
    print(image_embeds)
    image_embeds *= weight_vision_token[:, None]
    print(image_embeds)
    inputs_embeds = inputs_embeds.masked_scatter(image_mask, image_embeds)

if attention_mask is not None:
    attention_mask = attention_mask.to(inputs_embeds.device)

print(image_embeds.shape)



In [None]:
generated_ids = model.generate(inputs_embeds=inputs_embeds, max_new_tokens=2048)


In [None]:
output_text = processor.batch_decode(
    generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
)
_ = [print(output) for output in output_text]
