Purpose: This notebook produces the visualization for the tracklets and keypoints while aggregating all the predictions into a JSON file that organize the data frame by frame.

In [1]:
import json
from sparrow_datums import Boxes
import numpy as np
import imageio
from tqdm import tqdm
import cv2
import os
from pathlib import Path
import matplotlib.pyplot as plt
from typing import Optional
import io
import torch
import torchvision.transforms as T
from sparrow_datums import AugmentedBoxTracking, BoxTracking, FrameBoxes, PType

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from speed_trapv3.keypoints.model import SegmentationModel
from speed_trapv3.keypoints.config import Config as KeyConfig
from speed_trapv3.keypoints.dataset import crop_and_resize,process_keypoints, keypoints_post_inference_processing
from speed_trapv3.tracking.tracking import get_video_properties, transform_image, write_to_json



In [3]:
video_path = "/code/data/datasets/common_hall/source_video/25_resampled_vid.mp4"
video_save_path = '/code/data/datasets/common_hall/tracking_outputs/hard_coded_video.mp4'
gz_path = '/code/data/datasets/tracking/predictions/hard_coded/hard_coded_vehicle.json.gz'
json_save_path = '/code/data/datasets/common_hall/tracking_outputs/framewise_aggregation.json'

In [4]:
!rm /code/data/datasets/common_hall/tracking_outputs/framewise_aggregation.json
!rm /code/data/datasets/common_hall/tracking_outputs/hard_coded_video.mp4

rm: cannot remove '/code/data/datasets/common_hall/tracking_outputs/framewise_aggregation.json': No such file or directory


In [6]:
keypoint_model = SegmentationModel().eval().cuda()
keypoint_model.load(KeyConfig.trained_model_path)

<All keys matched successfully>

In [7]:
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data()["fps"]
frame_border = True
class_label: bool = False
score_label: bool = False
object_label: bool = True
vehicle_chunk = AugmentedBoxTracking.from_box_tracking(
    BoxTracking.from_file(gz_path)
)
vehicle_tracklet_list = BoxTracking.from_file(gz_path).to_dict()['object_ids']
image_transform = T.Compose([T.ToTensor()])
with imageio.get_writer(
    video_save_path, mode="I", fps=fps, macro_block_size=None
) as writer:
    aggregated_predictions = [] #Len is equal to to the number of frames.
    frame_idx = 0
    for img, vehicle_boxes in tqdm(zip(reader, vehicle_chunk)):
        frame_log = {}
        frame_log['frame_idx'] = frame_idx
        frame_log['annotations'] = []
        boxes = vehicle_boxes  # vehicle_boxes is a len = 16 list where unavailable objects are nan.
        height, width = img.shape[:2]
        fig = plt.figure(frameon=False, figsize=(width / 100, height / 100), dpi=100)
        fig.add_axes((0, 0, 1, 1))
        plt.imshow(img)
        if frame_border:
            plt.plot([450, 1280, 1280, 450, 450], [200, 200, 720, 720, 200], lw=2, c='green')
        for i, box in enumerate(boxes.to_absolute()):
            object_log = {}
            if not np.isfinite(box.x):
                continue
            x1 = np.clip(box.x1, 2, width - 2)
            x2 = np.clip(box.x2, 2, width - 2)
            y1 = np.clip(box.y1, 2, height - 2)
            y2 = np.clip(box.y2, 2, height - 2)
            color: Optional[str] = None
            text_strings: list[str] = []
            if class_label:
                text_strings.append(f"class: {int(box.label)}")
                color = f"C{int(box.label)}"
            if score_label:
                text_strings.append(f"score: {box.score:.2f}")
            if object_label:
                text_strings.append(f"object_id: {i}")
                if color is None:
                    color = f"C{i}"
            if color is None:
                color = "C0"
            plt.text(
                x1 + 3,
                y1 - 8,
                ", ".join(text_strings),
                backgroundcolor=(1, 1, 1, 0.5),
                c="black",
                size=8,
            )
            plt.plot([x1, x2, x2, x1, x1], [y1, y1, y2, y2, y1], lw=2, c=color)
            roi_w = x2 - x1
            roi_h = y2 - y1
            roi_resized = crop_and_resize(box.to_relative().array[:4], img, KeyConfig.image_crop_size[0],KeyConfig.image_crop_size[1])
            roi_resized_w, roi_resized_h = roi_resized.size
            x = image_transform(roi_resized)
            x = torch.unsqueeze(x, 0).cuda()
            keypoints = keypoint_model(x)['keypoints'][0].detach().cpu().numpy()
            heatmaps = keypoint_model(x)['heatmaps'].detach().cpu()
            keypoints_scores = list(np.amax(torch.flatten(heatmaps, 2).numpy(), axis=-1)[0])
            keypoints = keypoints_post_inference_processing(
                    keypoints, roi_resized_w, roi_resized_h, roi_w, roi_h, x1, y1
                )
            object_log['keypoints'] = [list(keypoints[0]), list(keypoints[1])]
            object_log['keypoints_scores'] = [keypoints_scores[0].item(), keypoints_scores[1].item()]
            object_log['object_id'] = i
            object_log['object_tracklet_id'] = vehicle_tracklet_list[i]
            object_log['bounding_box'] = list(box.array[:4])
            frame_log["annotations"].append(object_log)
            plt.plot(keypoints[0][0], keypoints[0][1], marker='o', color="red")
            plt.plot(keypoints[1][0], keypoints[1][1], marker='o', color="blue")
        aggregated_predictions.append(frame_log)
        frame_idx += 1
        buffer = io.BytesIO()
        plt.savefig(buffer, format="png")
        plt.close()
        frame = imageio.v2.imread(buffer.getbuffer(), format="png")
        # Uncomment to write the frames into images.
        # im_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # save_path = '/code/data/datasets/temp_imgs'
        # filename = str(frame_idx) + ".jpg"
        # cv2.imwrite(os.path.join(save_path, filename), im_rgb)
    
        writer.append_data(frame)
    out_file = open(json_save_path, "w")
    json.dump(aggregated_predictions, out_file)
    out_file.close()

599it [06:05,  1.64it/s]
