In [1]:
json_path = "data/annotations/caption/train/20230707_8_SN46_T1/overhead_view/20230707_8_SN46_T1_caption.json"
video_root = "data/videos"

In [2]:
import cv2
from PIL import Image
import torch
from transformers import AutoProcessor, LlavaForConditionalGeneration,AutoModelForCausalLM 
import os
import json


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
#Videollama3
device = "cuda:0"
model_path = "DAMO-NLP-SG/VideoLLaMA3-2B"
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    trust_remote_code=True,
    device_map={"": device},
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
)
processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)
tokenizer = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)

In [5]:
# print(torch.cuda.is_available())
# print(torch.version.cuda)
# print(torch.cuda.device_count())
# if torch.cuda.is_available():
#     print(torch.cuda.get_device_name(0))

In [6]:
# Llava
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model_id = "llava-hf/llava-1.5-13b-hf"
# processor = AutoProcessor.from_pretrained(model_id, use_fast=True)
# tokenizer = AutoProcessor.from_pretrained(model_id, use_fast=True)
# model = LlavaForConditionalGeneration.from_pretrained(
#     model_id, torch_dtype=torch.float16, low_cpu_mem_usage=True
# ).to("cuda" if torch.cuda.is_available() else "cpu")

In [44]:
# def generate_caption(frames, prompt):
#     captions = []
#     for frame in frames:
#         inputs = processor(images=frame, text=prompt, return_tensors="pt").to(model.device)
#         inputs = {k: v.to(torch.bfloat16) if v.dtype == torch.float32 else v for k, v in inputs.items()}
#         with torch.no_grad():
#             outputs = model.generate(**inputs, max_new_tokens=150)
#         caption = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
#         captions.append(caption)
        
#      return " ".join(captions)

def generate_caption_batch(frames, prompt):
    # Process all frames together with the prompt
    inputs = processor(images=frames, text=prompt, return_tensors="pt").to(model.device)
    
    # Convert float tensors to bfloat16 if needed to match model dtype
    inputs = {
        k: (v.to(torch.bfloat16) if isinstance(v, torch.Tensor) and v.dtype == torch.float32 else v.to(model.device) if isinstance(v, torch.Tensor) else v)
        for k, v in inputs.items()
    }
    
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=150)
        
    caption = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return caption
   

In [7]:
def extract_middle_frame(video_path, start_time, end_time):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"[ERROR] Cannot open video: {video_path}")
        return None
    
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps == 0:
        cap.release()
        return None
    
    middle_time = (start_time + end_time) / 2.0
    frame_number = int(middle_time * fps)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    ret, frame = cap.read()
    cap.release()

    if not ret:
        print(f"[ERROR] Failed to read frame at {frame_number} in video: {video_path}")
        return None

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)
    return pil_image

In [4]:
# def extract_frames(video_paths, start_time = 0.0, end_time=None, interval=1.0):
#     all_frames = []
#     for video_path in video_paths:
#         cap = cv2.VideoCapture(str(video_path))
#         if not cap.isOpened():
#             print(f"Cannot open video: {video_path}")
#             continue
        
#         fps = cap.get(cv2.CAP_PROP_FPS)
#         if fps == 0:
#             cap.release()
#             continue
        
#         total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
#         video_duration = total_frames/fps
        
#         if end_time is None or end_time > video_duration:
#             end_time = video_duration
        
#         frame_interval = int(fps*interval)
#         start_frame = int(start_time * fps)
#         end_frame = int(end_time * fps)

#         for frame_num in range(start_frame, end_frame, frame_interval):
#             cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
#             success, frame = cap.read()
#             if not success:
#                 continue
#             frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#             pil_image = Image.fromarray(frame_rgb)
#             all_frames.append(pil_image)
#         cap.release()
#     return all_frames

def extract_frames(video_paths, start_time=0.0, end_time=None, interval=1.0):
    all_frames = []
    for video_path in video_paths:
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            print(f"Cannot open video: {video_path}")
            continue
        
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            cap.release()
            continue
        
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_duration = total_frames / fps
        
        if end_time is None or end_time > video_duration:
            end_time = video_duration
        
        frame_interval = int(fps * interval)
        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps)

        for frame_num in range(start_frame, end_frame, frame_interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            success, frame = cap.read()
            if not success:
                continue
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)

            # Append a dict with metadata + image
            all_frames.append({
                "video_path": video_path,
                "fps": fps,
                "frame_num": frame_num,
                "image": pil_image
            })
        cap.release()
    return all_frames

In [5]:
def find_video_path(video_root, video_file):
    for subfolder in ["train", "val"]:
        folder_path = os.path.join(video_root, subfolder)
        for root, _, files in os.walk(folder_path):
            if video_file in files:
                return os.path.join(root, video_file)
    return None

In [6]:

def find_video(video_root: str, video_file: str):
    for dirpath, _, files in os.walk(video_root):
        if video_file in files:
            return os.path.join(dirpath, video_file)
    return None

In [49]:
# def run_on_json(json_path: str, video_root: str):
#     with open(json_path, 'r', encoding='utf-8') as f:
#         data = json.load(f)

#     video_files = data.get("overhead_videos", [])
#     events = data.get("event_phase", [])

#     if not video_files or not events:
#         print(f"[SKIP] Invalid video/event alignment in {json_path}")
#         return

#     print(f"[INFO] Processing: {os.path.basename(json_path)}")

#     for event in events:
#         start = float(event["start_time"])
#         end = float(event["end_time"])
#         labels = event.get("labels", [])

#         all_frames = []
#         for video_file in video_files:
#             video_paths = [find_video(video_root, vf) for vf in video_files]
#             frames = extract_frames(video_paths, start, end)
#             if frames:
#                 all_frames.extend(frames)

#         if not all_frames:
#             continue

#         caption_ped = generate_caption(
#             all_frames,
#             "<image>  Describe the crash victim in detail: age, gender, clothing, posture, and behavior. Were they distracted or alert? Were they in a legal crossing area? Mention their awareness of the vehicle."

#         )
#         caption_veh = generate_caption(
#             all_frames,
#             "<image>  Describe the vehicle involved in the accident with the crash victim. Was it accelerating, braking, or turning? Describe its position relative to the pedestrian and crosswalk. Did it yield? Was it following traffic rules?"
#         )
#         print(f"\nEvent Labels: {labels}")
#         print(f"Pedestrian: {caption_ped}")
#         print(f"Vehicle: {caption_veh}")

def run_on_json(json_path: str, video_root: str):
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    video_files = data.get("overhead_videos", [])
    events = data.get("event_phase", [])

    if not video_files or not events:
        print(f"[SKIP] Invalid video/event alignment in {json_path}")
        return

    print(f"[INFO] Processing: {os.path.basename(json_path)}")

    video_paths = [find_video(video_root, vf) for vf in video_files]


    for event in events:
        start = float(event["start_time"])
        end = float(event["end_time"])
        labels = event.get("labels", [])

        all_frames = []
        for video_path in video_paths:
            frames = extract_frames([video_path], start, end)
            if frames:
                all_frames.extend(frames)

        if not all_frames:
            continue
        
        pedestrian_prompt = ( "<image>"
        " Describe the crash victim: age, gender, clothing, posture, "
        "behavior, alertness, and crossing legality."
        )
        vehicle_prompt = ( "<image>"
        " Describe the vehicle involved: movement, position relative to pedestrian, "
        "and compliance with traffic rules."
        )

        caption_ped = generate_caption_batch(all_frames, pedestrian_prompt)
        caption_veh = generate_caption_batch(all_frames, vehicle_prompt)

        print(f"\nEvent Labels: {labels}")
        print(f"Pedestrian: {caption_ped}")
        print(f"Vehicle: {caption_veh}")

In [7]:
def generate_captions(json_path, video_root):
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    video_files = data.get("overhead_videos", [])
    events = data.get("event_phase", [])

    if not video_files or not events:
        print(f"[SKIP] Invalid video/event alignment in {json_path}")
        return

    print(f"[INFO] Processing: {os.path.basename(json_path)}")

    video_paths = [find_video(video_root, vf) for vf in video_files]

    for event in events:
        start = float(event["start_time"])
        end = float(event["end_time"])
        labels = event.get("labels", [])

        all_frames = []
        for video_path in video_paths:
            frames = extract_frames([video_path], start, end)
            if frames:
                all_frames.extend(frames)

        if not all_frames:
            continue

        all_responses = []

        for i, segment in enumerate(all_frames):
            video_path = segment["video_path"]
            fps = segment.get("fps", 30)  # default fps
            max_frames = segment.get("max_frames", int(fps * (end - start)))  # better default max_frames

            conversation = [
                {
                    "role": "system",
                    "content": """
                    You are an AI assistant analyzing traffic video segments. 
                    For each video segment, generate two captions:
                    1. Describe pedestrians' behavior, positions, and interactions.
                    2. Describe vehicles' movements, positions, and interactions.
                    Provide clear, concise captions focusing on relevant traffic events, including accidents or normal traffic flow.
                    Use relative positions and timing if applicable.
                    """
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "video", "video": {"video_path": video_path, "fps": fps, "max_frames": max_frames}},
                        {"type": "text", "text": """
                        For this video segment, provide two captions:
                        Pedestrian Caption: [Describe all pedestrian activity]
                        Vehicle Caption: [Describe all vehicle activity]
                        Include mentions of any accidents or noteworthy behavior.
                        """}
                    ]
                },
            ]

            inputs = processor(
                conversation=conversation,
                add_system_prompt=True,
                add_generation_prompt=True,
                return_tensors="pt"
            )
            inputs = {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}
            if "pixel_values" in inputs:
                inputs["pixel_values"] = inputs["pixel_values"].to(torch.bfloat16)

            output_ids = model.generate(**inputs, max_new_tokens=512)
            response = processor.batch_decode(output_ids, skip_special_tokens=True)[0].strip()

            pedestrian_caption = ""
            vehicle_caption = ""

            # Parse the response
            for line in response.splitlines():
                if line.lower().startswith("pedestrian caption:"):
                    pedestrian_caption = line[len("pedestrian caption:"):].strip()
                elif line.lower().startswith("vehicle caption:"):
                    vehicle_caption = line[len("vehicle caption:"):].strip()

            if not pedestrian_caption or not vehicle_caption:
                parts = response.split('\n\n')
                if len(parts) >= 2:
                    pedestrian_caption = parts[0].strip()
                    vehicle_caption = parts[1].strip()

            all_responses.append({
                "segment_index": i,
                "video_path": video_path,
                "pedestrian_caption": pedestrian_caption,
                "vehicle_caption": vehicle_caption,
                "raw_response": response,
            })

        # You can choose to return all_responses here or accumulate them elsewhere
        return all_responses



In [8]:
generate_captions(json_path, video_root)

[INFO] Processing: 20230707_8_SN46_T1_caption.json


[{'segment_index': 0,
  'video_path': 'data/videos/train/20230707_8_SN46_T1/overhead_view/20230707_8_SN46_T1_Camera1_0.mp4',
  'pedestrian_caption': 'A pedestrian is crossing the street in front of a car. The pedestrian and driver are both moving towards each other.',
  'vehicle_caption': 'A black vehicle approaches from behind the traffic light, followed by a silver convertible driving on the road.',
  'raw_response': 'Pedestrian Caption: A pedestrian is crossing the street in front of a car. The pedestrian and driver are both moving towards each other.\nVehicle Caption: A black vehicle approaches from behind the traffic light, followed by a silver convertible driving on the road.'},
 {'segment_index': 1,
  'video_path': 'data/videos/train/20230707_8_SN46_T1/overhead_view/20230707_8_SN46_T1_Camera1_0.mp4',
  'pedestrian_caption': 'A man in a yellow shirt and black pants walks across the crosswalk. Another person, wearing a dark outfit, is seen walking towards the traffic light pole.',

In [115]:
#Llava
run_on_json(json_path, video_root)

[INFO] Processing: 20230707_8_SN46_T1_caption.json

Event Labels: ['4']
Pedestrian: Describe the crash victim in detail: age, gender, clothing, posture, and behavior. Were they distracted or alert? Were they in a legal crossing area? Mention their awareness of the vehicle.

The crash victim is a person wearing a white shirt. They are standing in the middle of the street, possibly waiting to cross the road. They are not in a legal crossing area, and their posture suggests that they are alert and aware of their surroundings. It is not clear if they were distracted or not, but their presence in the middle of the street indicates that they were not following traffic rules and could be at risk of being hit by a vehicle.
Vehicle: Describe the vehicle involved in the accident with the crash victim. Was it accelerating, braking, or turning? Describe its position relative to the pedestrian and crosswalk. Did it yield? Was it following traffic rules?

The vehicle involved in the accident was a b

In [50]:
#VideoLlama3-2b
run_on_json(json_path, video_root)

[INFO] Processing: 20230707_8_SN46_T1_caption.json


AssertionError: Number of images does not match the number of image tokens in the text.

In [39]:
#Beginning the vqa
json_path = "data/annotations/vqa/train/20230707_8_SN46_T1"
video_root = "data/videos"

In [11]:
def find_vqa_type(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)
    
    if isinstance(data, list) and all('environment' in item for item in data if isinstance(item, dict)):
        return "environment"
    
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
        item = data[0]
        if "event_phase" in item:
            if "overhead_videos" in item:
                return "overhead_view"
            elif "vehicle_view" in item:
                return "vehicle_view"

In [124]:
json_path2 = 'data/annotations/vqa/train/20230707_8_SN46_T1/overhead_view/20230707_8_SN46_T1.json'
test = find_vqa_type(json_path2)
print("Detected the question type: ", test)

Detected the question type:  overhead_view


In [21]:
def generate_answer(model, image_pil, question, choices, tokenizer, processor):
    prompt = "<image>\n" + question + "\nChoices:n"
    for key, val in choices.items():
        prompt += f"{key}: {val}\n"
    prompt += "Answer with the letter of the correct choice."
    
    inputs = processor(text=prompt, images=image_pil, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=10)
        answer_text = tokenizer.decode(outputs[0], skip_special_tokens=True).lower().strip()
    
    for letter in ['a', 'b', 'c', 'd']:
        if letter in answer_text:
            return letter
    return None    

In [131]:
def final_answer(model, frames, question, choices, processor, tokenizer):
    votes = {}
    
    for frame in frames:
        answer = generate_answer(model,frame, question, choices, tokenizer, processor)
        if answer:
            votes[answer] = votes.get(answer, 0) + 1
            
    if not votes:
        return None

    return max(votes.items(), key=lambda x: x[1])[0]

In [142]:
def run_vqa_on_scenario(base_dir, video_root, model):
    all_results = []
    
    for root, dirs, files in os.walk(base_dir):
        if any(p in root for p in ['environment', 'overhead_view', 'vehicle_view']):
            for file in files:
                if file.endswith(".json"):
                    perspective = os.path.basename(root)
                    scenario_id = file.replace(".json", "")
                    full_path = os.path.join(root, file)
                    
                    print(f"Processing {perspective}")
                    
                    with open(full_path, 'r') as f:
                        json_data = json.load(f)
                    
                    if perspective == "environment":
                        result = evaluate_environment(json_data, video_root, model)
                    elif perspective == "overhead_view":
                        result = evaluate_overhead(json_data, video_root, model)
                    elif perspective == "vehicle_view":
                        result = evaluate_vehicle(json_data, video_root, model)
                    
                    all_results.append({
                        "scenario": scenario_id,
                        "perspective": perspective,
                        "results": result
                        })   
    return all_results 

In [130]:
def evaluate_vehicle(json_data, video_root, model):
    results = []
    video_filename = json_data[0].get("vehicle_view")
    video_path = find_video(video_root, video_filename)
    
    for event in json_data[0].get("event_phase", []):
        start = float(event["start_time"])
        end = float(event["end_time"])
        for q in event.get("conversations", []):
            question_text = q.get("question")
            choices = {k: q[k] for k in ['a', 'b', 'c', 'd'] if k in q}
            correct = q.get("correct")
            
            frames = extract_frames([video_path], start, end)
            answer = final_answer(model, frames, question_text, choices, processor, tokenizer)
            is_correct = (answer == correct)
            
            results.append({
                "perspective": "vehicle_view",
                "question": question_text,
                "choices": choices,
                "correct": correct,
                "model_answer": answer,
                "is_correct": is_correct,
            })
    return results
    

In [129]:
def evaluate_overhead(json_data, video_root, model):
    results = []
    overhead_videos = json_data[0].get("overhead_videos", [])
    
    video_dirname = os.path.basename(json_path).replace(".json", "")
    video_prefix = video_dirname.split("_")[0:4]
    video_folder = "_".join(video_prefix)
    video_dir = os.path.join(video_root, "train", video_folder, "overhead_view")
    videos_path = [os.path.join(video_dir, vid) for vid in overhead_videos]
    
    event_phases = json_data[0].get("event_phase", [])
    for phase in event_phases:
        start = float(phase.get("start_time", 0))
        end = float(phase.get("end_time", 0))
        for conv in phase.get("conversations", []):
            question_text = conv.get("question")
            correct = conv.get("correct")
            choices = {k: conv[k] for k in ['a', 'b', 'c', 'd'] if k in conv}
            
            frames = extract_frames(videos_path, start, end)
            answer = final_answer(model, frames, question_text, choices, processor, tokenizer)
            is_correct = (answer == correct)
            
            results.append({
                "perspective": "overhead",
                "question": question_text,
                "choices": choices,
                "correct": correct,
                "model_answer": answer,
                "is_correct": is_correct,
            })
    return results

In [128]:
def evaluate_environment(json_data, video_root, model):
    results = []
    overhead_videos = json_data[0].get("overhead_videos", [])
    
    video_dirname = os.path.basename(json_path).replace(".json", "")
    video_prefix = video_dirname.split("_")[0:4]
    video_folder = "_".join(video_prefix)
    video_dir = os.path.join(video_root, "train", video_folder, "overhead_view")
    videos_path = [os.path.join(video_dir, vid) for vid in overhead_videos]
    
    questions = json_data[0].get("environment", [])
    for q in questions:
        question_text = q.get("question")
        correct = q.get("correct")
        choices = {k: q[k] for k in ['a', 'b', 'c', 'd'] if k in q}
    
        frames = extract_frames(videos_path)
        answer = final_answer(model, frames, question_text, choices, processor, tokenizer)
        is_correct = (answer == correct)
        
        results.append({
            "perspective": "environment",
            "question": question_text,
            "choices": choices,
            "correct": correct,
            "model_answer": answer,
            "is_correct": is_correct,
        })
    return results
    

In [143]:
base_dir = "data/annotations/vqa/train/20230707_8_SN46_T1"
video_root = "data/videos"

results = run_vqa_on_scenario(base_dir, video_root, model)

Processing overhead_view
Processing vehicle_view
Processing environment


In [144]:
def save_results_to_json(results, output_path):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(results, f, indent=4)
    print(f"[INFO] Results saved to: {output_path}")

In [145]:
output_json = "outputs/full_vqa_results_test.json"
save_results_to_json(results, output_json)

[INFO] Results saved to: outputs/full_vqa_results_test.json
