In [1]:
import argparse
import torch

from q_align.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from q_align.conversation import conv_templates, SeparatorStyle
from q_align.model.builder import load_pretrained_model
from q_align.mm_utils import process_images, tokenizer_image_token, get_model_name_from_path, KeywordsStoppingCriteria

from PIL import Image

import requests
from PIL import Image
from io import BytesIO
from transformers import TextStreamer

import json
from tqdm import tqdm
from collections import defaultdict

import os




def disable_torch_init():
    """
    Disable the redundant torch default initialization to accelerate model creation.
    """
    import torch
    setattr(torch.nn.Linear, "reset_parameters", lambda self: None)
    setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None)


def load_image(image_file):
    if image_file.startswith('http://') or image_file.startswith('https://'):
        response = requests.get(image_file)
        image = Image.open(BytesIO(response.content)).convert('RGB')
    else:
        image = Image.open(image_file).convert('RGB')
    return image


In [2]:
disable_torch_init()

model_name = get_model_name_from_path("q-future/one-align")
tokenizer, model, image_processor, context_len = load_pretrained_model("q-future/one-align", None, model_name, True, True, device="cuda:0")

Instantiating LlamaAttention without passing `layer_idx` is not recommended and will to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` when creating this class.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]



In [3]:
import cv2
from PIL import Image
import numpy as np

def expand2square(pil_img, background_color):
    width, height = pil_img.size
    if width == height:
        return pil_img
    elif width > height:
        result = Image.new(pil_img.mode, (width, width), background_color)
        result.paste(pil_img, (0, (width - height) // 2))
        return result
    else:
        result = Image.new(pil_img.mode, (height, height), background_color)
        result.paste(pil_img, ((height - width) // 2, 0))
        return result

def get_image_tensor(image):
    image = expand2square(image, tuple(int(x*255) for x in image_processor.image_mean))
    image_tensor = image_processor.preprocess(image, return_tensors='pt')['pixel_values'].half().to("cuda:0")
    return image_tensor

def extract_frames(video_path):
    # 打开视频文件
    cap = cv2.VideoCapture(video_path)
    
    # 获取视频的FPS（每秒帧数）
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # 初始化一个计数器用于计算间隔
    count = 0
    up_tensors = []
    low_tensors = []
    # 读取视频直到结束
    while cap.isOpened():
        ret, frame = cap.read()
        
        # 如果正确读取帧，则ret为True
        if not ret:
            break
        
        # 每秒抽取一帧
        if count % fps == 0:
            # 将帧转换为PIL图像格式
            pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            
            # 计算图像的上半部分和下半部分
            width, height = pil_image.size
            upper_half = pil_image.crop((0, 0, width, height // 2))
            lower_half = pil_image.crop((0, height // 2, width, height))
            upper_half = get_image_tensor(upper_half)
            lower_half = get_image_tensor(lower_half)
            up_tensors.append(upper_half)
            low_tensors.append(lower_half)
            # 这里可以根据需要处理或显示图像的上半部分和下半部分
            # 例如，显示上半部分和下半部分
            # upper_half.show()
            # lower_half.show()
        
        count += 1
    
    # 释放和关闭视频文件
    cap.release()
    return up_tensors,low_tensors

# 调用函数



In [13]:
conv_mode = "mplug_owl2"   
inp = "How would you rate the quality of this image?"   
conv = conv_templates[conv_mode].copy()
inp =  inp + "\n" + DEFAULT_IMAGE_TOKEN
conv.append_message(conv.roles[0], inp)
image = None
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt() + " The quality of the image is"
toks = ["good", "poor", "high", "fair", "low", "excellent", "bad", "fine", "moderate",  "decent", "average", "medium", "acceptable"]
print(toks)
ids_ = [id_[1] for id_ in tokenizer(toks)["input_ids"]]
print(ids_)
input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).to("cuda:0")

def get_hidden_states(video_path = '/workspace/val/val/0009.mp4', save_path = '/workspace/test.npy'):
    video_path = video_path  # 请替换为你的视频路径
    up_tensors,low_tensors = extract_frames(video_path)
    
    #请进入Q-Align/q_align/model/modeling_mplug_owl2.py将forward函数里的(347行)return CausalLMOutputWithPast里的logits修改为hidden_states传出
                
    with torch.inference_mode():
        up_hidden_states = model(input_ids.repeat(len(up_tensors), 1),
            images=torch.cat(up_tensors, 0))['logits']
    with torch.inference_mode():
        low_hidden_states = model(input_ids.repeat(len(low_tensors), 1),
            images=torch.cat(low_tensors, 0))['logits']
        
    stacked_features = torch.stack((torch.mean(up_hidden_states, dim=1, keepdim=False), torch.mean(low_hidden_states, dim=1, keepdim=False)), dim=1)
    print(stacked_features.shape)
    # 沿着堆叠的维度（即第0维）计算平均值，得到最终的平均特征张量，形状为[2, 4096]
    average_features = torch.mean(stacked_features, dim=1)
    
    average_features_cpu = average_features.cpu()
    
    # 将PyTorch张量转换为NumPy数组
    average_features_numpy = average_features_cpu.numpy()
    print(average_features_numpy.shape)
    # 存储NumPy数组到文件，这里使用.npy格式
    np.save(save_path, average_features_numpy)

#get_hidden_states()

['good', 'poor', 'high', 'fair', 'low', 'excellent', 'bad', 'fine', 'moderate', 'decent', 'average', 'medium', 'acceptable']
[1781, 6460, 1880, 6534, 4482, 15129, 4319, 2691, 17768, 27189, 6588, 18350, 22691]
torch.Size([2, 2, 4096])
(2, 4096)


In [None]:
import os 
path = '/workspace/val/val/'
videos = os.listdir(path)
for video in videos:
    video_path = os.path.join(path,video)
    save_path = '/workspace/qalign_features/val/' + video.split('.')[0] + '.npy'
    print(video_path)
    print(save_path)
    get_hidden_states(video_path,save_path)

path = '/workspace/train/train/'
videos = os.listdir(path)
for video in videos:
    video_path = os.path.join(path,video)
    save_path = '/workspace/qalign_features/train/' + video.split('.')[0] + '.npy'
    print(video_path)
    print(save_path)
    get_hidden_states(video_path,save_path)

/workspace/val/val/0420.mp4
/workspace/qalign_features/val/0420.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0419.mp4
/workspace/qalign_features/val/0419.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0418.mp4
/workspace/qalign_features/val/0418.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0417.mp4
/workspace/qalign_features/val/0417.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0416.mp4
/workspace/qalign_features/val/0416.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0415.mp4
/workspace/qalign_features/val/0415.npy
torch.Size([9, 2, 4096])
(9, 4096)
/workspace/val/val/0414.mp4
/workspace/qalign_features/val/0414.npy
torch.Size([1, 2, 4096])
(1, 4096)
/workspace/val/val/0413.mp4
/workspace/qalign_features/val/0413.npy
torch.Size([1, 2, 4096])
(1, 4096)
/workspace/val/val/0412.mp4
/workspace/qalign_features/val/0412.npy
torch.Size([1, 2, 4096])
(1, 4096)
/workspace/val/val/0411.mp4
/workspace/qalign_features/val/0411.npy
torch