In [1]:
import os
import cv2
import json
import random
from collections import defaultdict
import numpy as np

# Fix the seed of Python's stdlib RNG so any `random`-based step is repeatable.
# NOTE(review): none of the cells visible here draw from `random` — confirm the seed is still needed.
random.seed(0)

In [2]:
def get_video_duration(video_path):
    """Probe a video with OpenCV and report its frame count, duration and fps.

    Args:
        video_path: Path passed straight to ``cv2.VideoCapture``.

    Returns:
        A ``(frame_count, duration, fps)`` tuple, where ``duration`` is
        ``frame_count / fps``. ``(None, None, None)`` is returned when the
        capture cannot be created or opened, or when the container reports a
        frame rate that is not strictly positive (so no duration can be
        derived).
    """
    try:
        cap = cv2.VideoCapture(video_path)
    except TypeError:
        return None, None, None

    # Always give the handle back, including on the "could not open" path.
    try:
        if not cap.isOpened():
            print("Error: Could not open video.")
            return None, None, None
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()

    # `not fps > 0` also rejects NaN, which `fps <= 0` would let through.
    if not fps > 0:
        print("Error: Video reports a non-positive frame rate.")
        return None, None, None

    duration = frame_count / fps
    return frame_count, duration, fps

In [3]:
# Run configuration: which split to process and where its inputs live.
dataset = "test"
data_dir = f"/root/project/videoqa/inputs/{dataset}_set"
gt_file = f"/root/project/videoqa/inputs/MedVidQA/{dataset}.json"
# Upper bound applied to per-video frame counts in the metadata cell.
max_frame_num = 16384

# Names of the .mp4 files present in the split directory.
data = [name for name in os.listdir(data_dir) if name.endswith(".mp4")]

# Ground-truth annotation records loaded from the split's JSON file.
with open(gt_file, "r") as f:
    ann = json.load(f)

In [4]:
#---------------------------------------------------------------------------------------
# metadata
# --------------------------------------------------------------------------------------

# Ids of videos that could not be located on disk or probed.
miss_files = []

# One metadata record per video id; a later record for the same id replaces
# the earlier one.
video_dict = {}
for record in ann:
    video_dict[record["video_id"]] = {
        "video_name": record["video_id"],
        "video_url": record["video_url"],
    }

for video_id, video_info in video_dict.items():
    file_path = os.path.join(data_dir, video_id + ".mp4")
    if os.path.isfile(file_path):
        frame_count, duration, fps = get_video_duration(file_path)
    else:
        frame_count = None
    if frame_count is None:
        # Either the file is absent or the probe failed.
        miss_files.append(video_id)
        continue
    # Cap both the frame count and the duration at the max_frame_num budget.
    video_info.update(
        total_frames=min(frame_count, max_frame_num),
        duration=min(duration, max_frame_num // fps),
        fps=fps,
    )

# Removal happens after the loop so the dict is not mutated while iterating.
for missing_id in miss_files:
    del video_dict[missing_id]

In [5]:
# ----------------------------------------------------------------------------------
# annotation
# ----------------------------------------------------------------------------------
def _timestamp_to_seconds(stamp):
    """Convert a colon-separated timestamp ("SS", "MM:SS", "HH:MM:SS") to seconds."""
    seconds = 0
    for part in stamp.split(":"):
        seconds = 60 * seconds + int(part)
    return seconds


# Build {video_id: {sample_id: {"z": [start, end], "cap": [question]}}} for
# every annotation whose video survived the metadata pass.
labels = defaultdict(dict)
for elem in ann:
    video_id = elem["video_id"]
    if video_id not in video_dict:
        continue
    max_duration = video_dict[video_id]["duration"]

    sample_id = elem["sample_id"]

    # Collapse every run of spaces to a single space, whatever its length.
    question = elem["question"]
    while "  " in question:
        question = question.replace("  ", " ")

    span = [
        _timestamp_to_seconds(elem["answer_start"]),
        _timestamp_to_seconds(elem["answer_end"]),
    ]

    # NOTE(review): appending to miss_files makes the later cleanup cell drop
    # the whole video, including samples already accepted for it — confirm
    # that this is intended rather than skipping only the offending sample.
    if (span[0] >= max_duration) and (span[1] >= max_duration):
        # The answer lies entirely beyond the (capped) video duration.
        miss_files.append(video_id)
        continue
    span = np.clip(np.array(span), 0, max_duration).tolist()
    if abs(span[1] - span[0]) < 10:
        # After clipping, the answer span covers less than 10 time units.
        miss_files.append(video_id)
        continue

    labels[video_id][sample_id] = {
        "z": span,
        "cap": [question],
    }

labels = dict(labels)

In [6]:
# Purge every flagged video id from both the metadata and the annotation
# tables; ids that are already absent are ignored.
for dropped_id in miss_files:
    for table in (video_dict, labels):
        table.pop(dropped_id, None)

In [7]:
# Bundle metadata and annotations and write them to the working directory as
# "<dataset>_annotations.json".
output_json = dict(metadata=video_dict, annotations=labels)

output_path = f"{dataset}_annotations.json"
with open(output_path, "w") as f:
    json.dump(output_json, f, indent=4)

In [8]:
# Number of annotated samples per video, then their mean and standard deviation.
num_event = [len(samples) for samples in output_json["annotations"].values()]
print(np.mean(num_event), np.std(num_event))

2.475 1.6581239398790428
