In [None]:
'''imports'''
# setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# stdlib
import os
from collections import OrderedDict
from pprint import pprint

# thirdparty
import numpy as np
import cv2
from matplotlib import pyplot as plt
from tqdm import tqdm

# detectron2 utils
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog

In [None]:
'''directory setup'''
# Relative working directory used by the cells below for the input video and
# for the annotation file that gets written.
tmp_dir = os.path.join('tmp', 'video_annotation')
# exist_ok=True: creating the directory a second time is not an error
os.makedirs(name=tmp_dir, exist_ok=True)
print(tmp_dir)

In [None]:
'''load video'''
# USER INPUT
video_name = 'highway-4k.mp4'  # file name of the video, looked up inside tmp_dir

# load video
video_input = os.path.join(tmp_dir, video_name)
video = cv2.VideoCapture(video_input)
# cv2.VideoCapture does not raise when the file cannot be opened, so check
# explicitly and fail here rather than producing an empty annotation later.
if not video.isOpened():
    raise IOError("could not open video file: %s" % video_input)

# stream properties as reported by OpenCV for the opened capture
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
frames_per_second = video.get(cv2.CAP_PROP_FPS)
num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
print(video_input)
# report the video by its file name (the capture object itself has no useful str())
print("%s, #frames = %d, fps = %.01f, w = %d, h = %d" % (video_name, num_frames, frames_per_second, width, height))

In [None]:
'''frames generator for video processing'''
def _video_frame_generator(cv2_video_capture):
    while video.isOpened():
        success, frame = cv2_video_capture.read()
        if success:
            yield frame
        else:
            break

In [None]:
'''set up trained detectron model'''
# USER INPUT
hw_map = 'cpu'  # map process to 'cpu' or 'gpu' ('cuda' is accepted as well)

# The config expects a torch device string; 'gpu' is not one, so translate the
# user-facing 'gpu' option to 'cuda'. Any other value is passed through as is.
model_device = 'cuda' if hw_map == 'gpu' else hw_map

# model zoo entry used for both the config file and the pretrained weights
model_config = "Cityscapes/mask_rcnn_R_50_FPN.yaml"

cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file(model_config))
# cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5  # overwrite detection threshold for this model
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(model_config)
cfg.MODEL.DEVICE = model_device
predictor = DefaultPredictor(cfg)
# inspect metadata
print(MetadataCatalog.get(cfg.DATASETS.TRAIN[0]))

In [None]:
'''loop over video and save detections'''
# USER INPUT
max_frames = 10000  # upper bound on the number of frames that get annotated

metadata = MetadataCatalog.get(cfg.DATASETS.TRAIN[0])
# list mapping class id -> class name; None when the metadata has no such entry
instance_classes = metadata.get('thing_classes', None)

video_annotation = OrderedDict()

# Number of frames the loop is expected to process, used as progress-bar total.
# A non-positive reported frame count is treated as "unknown" (no total).
frames_expected = min(num_frames, max_frames) if num_frames > 0 else None

with tqdm(total=frames_expected) as progress:
    # zip() asks range() first, so once max_frames is reached the loop ends
    # without decoding (and discarding) one extra frame from the video.
    for frame_id, frame in zip(range(max_frames), _video_frame_generator(video)):
        # get output lists
        outputs = predictor(frame)
        outputs_cpu = outputs["instances"].to("cpu")
        outputs_zipped = zip(outputs_cpu.pred_classes, outputs_cpu.pred_masks, outputs_cpu.pred_boxes, outputs_cpu.scores)
        # register frame annotation
        frame_path = 'frame_%04d' % frame_id
        frame_dict = OrderedDict()
        video_annotation[frame_path] = frame_dict
        for instance_id, (class_id, mask, bbox, score) in enumerate(outputs_zipped):
            # key follows the documented `instance_<id>` schema
            instance_path = 'instance_%02d' % instance_id
            # plain Python int: used both as stored value and as list index
            class_index = class_id.item()
            instance_dict = OrderedDict()
            instance_dict['class_id'] = class_index
            # fall back to None when the metadata provides no class names
            instance_dict['class_name'] = instance_classes[class_index] if instance_classes is not None else None
            # instance_dict['mask'] = mask.numpy()  # mask storage is currently disabled
            instance_dict['bbox'] = bbox.numpy()
            instance_dict['score'] = score.item()
            frame_dict[instance_path] = instance_dict
        progress.update(1)

# The capture has been consumed by the loop; release the underlying video handle.
video.release()

In [None]:
'''
save frame annotation as npz

The annotation file is written next to the video inside tmp_dir and is named
after the video file with an additional ".npz" suffix. Every frame key of
video_annotation is passed to np.savez as a keyword, i.e. it becomes the name
of one entry in the archive.

data structure:
{
    frame_<id> : {
        instance_<id> : {
            class_id : <id>,
            class_name : <name>,
            # mask : <mask>,
            bbox : <coords>,
            score : <score>,
        },
        ...
    },
    ...
}
'''
label_file_name = '%s.npz' % video_name
video_label_file = os.path.join(tmp_dir, label_file_name)
np.savez(video_label_file, **video_annotation)

In [None]:
'''load data again to verify correctness'''
# NOTE(security): allow_pickle=True lets numpy unpickle the stored Python
# objects, and unpickling can execute arbitrary code. Only load annotation
# files that were written by this notebook, never files from untrusted sources.
# The context manager closes the archive's file handle once all entries have
# been read into memory.
with np.load(video_label_file, allow_pickle=True) as npz_file:
    # accessing npz_file[key] reads the entry, so the values stay valid after close
    video_labels = OrderedDict((key, npz_file[key]) for key in npz_file.files)
pprint(video_labels)