In [1]:
import torch
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, AutoTokenizer, Qwen2VLForConditionalGeneration

# default: Load the model on the available device(s)
model = Qwen2VLForConditionalGeneration.from_pretrained(
    "Qwen/Qwen2-VL-7B-Instruct", torch_dtype="auto", device_map="auto"
)

# We recommend enabling flash_attention_2 for better acceleration and memory saving, especially in multi-image and video scenarios.
# model = Qwen2VLForConditionalGeneration.from_pretrained(
#     "Qwen/Qwen2-VL-7B-Instruct",
#     torch_dtype=torch.bfloat16,
#     attn_implementation="flash_attention_2",
#     device_map="auto",
# )

# default processer
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-7B-Instruct")


  from .autonotebook import tqdm as notebook_tqdm
`Qwen2VLRotaryEmbedding` can now be fully parameterized by passing the model config through the `config` argument. All other arguments will be removed in v4.46
Loading checkpoint shards: 100%|██████████| 5/5 [00:03<00:00,  1.42it/s]


In [2]:
prompt = """
### Prompt
"Analyze three consecutive 128x64 resolution in-vehicle camera images (ordered by time) and provide essential driving conditions in JSON format. Based solely on the actual content of these images, describe the driving situation focusing on features that directly influence vehicle trajectory prediction.

Your response should:
1. Reflect the actual scene in the images, not repeat the example
2. Consider the temporal changes across the three frames
3. Strictly use only the specified options for each field
4. Output in JSON format only

### JSON Output Format
{
  "road_type": string,  // Options: [
                        //   "highway",
                        //   "urban_arterial",
                        //   "intersection",
                        //   "merging_zone",
                        //   "curve_section"
                        // ]

  "road_geometry": {
    "curvature": string,  // Options: ["straight", "curve"]
    "slope": string,      // Options: ["uphill", "downhill", "flat"]
    "curve_direction": string,  // Options: ["left", "right", "none"]
    "curve_sharpness": string  // Options: ["gentle", "sharp", "none"]
  },

  "lane_count": number,  // Options: [1, 2, 3, "multiple"]

  "front_vehicle": {
    "present": boolean,  // Options: [true, false]
    "distance": string,  // Options: ["close", "medium", "far"]
    "type": string,      // Options: ["large", "normal", "none"]
    "relative_motion": {
      "speed_difference": string,  // Options: ["approaching", "maintaining", "separating"]
      "lateral_position": string,  // Options: ["center", "left_side", "right_side"]
      "trajectory": string         // Options: ["stable", "changing_left", "changing_right"]
    }
  },

  "visibility": string,  // Options: ["clear", "dark", "poor"]
  "traffic_flow": string  // Options: ["stopped", "moving", "fast"]
}

Return only the JSON output based on the actual scene in the provided sequence of images."
"""


In [3]:
# Messages containing multiple images and a text query

id_name = "ff8ac16fafde4aebe8e9632a0d382ef1_320"

messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "image",
                "image": f"../../data/input/atma18/images/{id_name}/image_t-1.0.png",
            },
            {
                "type": "image",
                "image": f"../../data/input/atma18/images/{id_name}/image_t-0.5.png",
            },
            {
                "type": "image",
                "image": f"../../data/input/atma18/images/{id_name}/image_t.png",
            },
            {"type": "text", "text": prompt},
        ],
    }
]


In [4]:
# Preparation for inference
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
    text=[text],
    images=image_inputs,
    videos=video_inputs,
    padding=True,
    return_tensors="pt",
)
inputs = inputs.to("cuda")


In [5]:
# Inference
generated_ids = model.generate(**inputs, max_new_tokens=256)
generated_ids_trimmed = [
    out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids, strict=False)
]
output_text = processor.batch_decode(
    generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
print(output_text)




['```json\n{\n  "road_type": "highway",\n  "road_geometry": {\n    "curvature": "straight",\n    "slope": "flat",\n    "curve_direction": "none",\n    "curve_sharpness": "none"\n  },\n  "lane_count": 2,\n  "front_vehicle": {\n    "present": true,\n    "distance": "close",\n    "type": "normal",\n    "relative_motion": {\n      "speed_difference": "approaching",\n      "lateral_position": "center",\n      "trajectory": "stable"\n    }\n  },\n  "visibility": "clear",\n  "traffic_flow": "moving"\n}\n```']


In [6]:
from pprint import pprint

pprint(output_text[0])


('```json\n'
 '{\n'
 '  "road_type": "highway",\n'
 '  "road_geometry": {\n'
 '    "curvature": "straight",\n'
 '    "slope": "flat",\n'
 '    "curve_direction": "none",\n'
 '    "curve_sharpness": "none"\n'
 '  },\n'
 '  "lane_count": 2,\n'
 '  "front_vehicle": {\n'
 '    "present": true,\n'
 '    "distance": "close",\n'
 '    "type": "normal",\n'
 '    "relative_motion": {\n'
 '      "speed_difference": "approaching",\n'
 '      "lateral_position": "center",\n'
 '      "trajectory": "stable"\n'
 '    }\n'
 '  },\n'
 '  "visibility": "clear",\n'
 '  "traffic_flow": "moving"\n'
 '}\n'
 '```')
