In [1]:
# Allows import from parent directory.
import sys; sys.path.append('..')

import os
import subprocess
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
import plotly.express as px
import src.visualization_utils as viz
from src.approaches.fixed_angle import create_predictions
from src.loaders import DetectionsLoader, FrameLoader
from src.models import Detection, DirectionLine, Vehicle
from src.perspective import Perspective
from src.providentia_utils import parse_perspective
from tqdm import tqdm

In [2]:
class SimplePredictor:
    """Simple interface for creating predictions with default parameters.

    Bundles the thresholds, contour shift, label mapping, per-category
    dimension values and direction line that are forwarded unchanged to
    ``src.approaches.fixed_angle.create_predictions``.
    """

    def __init__(self, thresholds_override: Optional[Dict[str, float]] = None):
        """Set up the default parameters.

        Args:
            thresholds_override: Optional mapping merged over the default
                thresholds (keys ``score``, ``mask_width``, ``edge_margin``).
                ``None`` or an empty mapping keeps the defaults untouched.
        """
        self.thresholds = {'score': 0.5, 'mask_width': 25, 'edge_margin': 10}
        # ``None`` is the default instead of a shared mutable ``{}``.
        if thresholds_override:
            self.thresholds.update(thresholds_override)

        self.vertical_contour_shift = 3

        # Detector label id -> vehicle category.
        # NOTE(review): 2/5/7 look like COCO class ids - confirm against the detector.
        self.label_mapping = {2: 'CAR', 5: 'BUS', 7: 'TRUCK'}

        # Three triples of values per category.
        # NOTE(review): presumably (length, width, height), each as
        # (min, typical, max) in meters - TODO confirm with create_predictions.
        # 'VAN' has dimension values but no label id maps to it above.
        self.dimension_values_mapping = {
            'VAN': [[4.09, 5.80, 7.45], [1.66, 2.06, 2.51], [1.81, 2.18, 2.82]],
            'CAR': [[3.32, 4.59, 4.87], [1.56, 1.94, 2.16], [1.22, 1.45, 1.81]],
            'TRUCK': [[2.57, 9.30, 16.71], [2.00, 2.51, 2.92], [2.82, 3.60, 4.21]],
            'BUS': [[10.29, 13.26, 17.59], [2.49, 2.59, 2.80], [2.82, 3.33, 3.93]],
        }

        # Built from the column vector (0, 1) and the scalar 0.
        self.direction_line = DirectionLine(np.array([[0, 1]]).T, 0)

    def create_predictions(
        self, detections: List[Detection], perspective: Perspective
    ) -> List[Vehicle]:
        """Create vehicle predictions for one set of detections.

        Args:
            detections: Detections to turn into predictions.
            perspective: Perspective passed through to the prediction routine.

        Returns:
            The list returned by the module-level ``create_predictions``.
        """
        # Inside the method body this name resolves to the imported
        # module-level function, not to this method.
        return create_predictions(
            detections,
            perspective,
            self.direction_line,
            self.thresholds,
            self.vertical_contour_shift,
            self.label_mapping,
            self.dimension_values_mapping,
        )


def create_legend_overlay(
    color_mapping: Dict, image_shape: Tuple[int, int], item_width: int = 150
) -> np.ndarray:
    """Create an overlay holding the color mapping legend in the top left corner.

    One filled box per entry of ``color_mapping`` is drawn side by side along
    the top edge, each labelled with its category name in white.

    Args:
        color_mapping: Maps a category name to a color understood by
            ``px.colors.validate_colors``.
        image_shape: Shape of the image the legend is meant for; a trailing
            axis of size 3 is appended for the overlay.
        item_width: Horizontal extent of each legend box in pixels.

    Returns:
        Array of shape ``(*image_shape, 3)`` that is zero everywhere except
        where legend boxes and labels were drawn.
    """
    canvas = np.zeros((*image_shape, 3))

    left_edge = 0
    for label, plotly_color in color_mapping.items():
        right_edge = left_edge + item_width

        # Scale the first validated color by 255 before handing it to OpenCV.
        fill = np.array(px.colors.validate_colors(plotly_color)[0]) * 255

        # Thickness -1 fills the rectangle.
        cv2.rectangle(canvas, (left_edge, 0), (right_edge, 50), fill, -1)
        cv2.putText(
            canvas,
            label,
            (left_edge + 10, 37),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.25,
            (255, 255, 255),
            3,
        )

        left_edge = right_edge

    return canvas


def create_bbox_frames(
    frames_dir: str,
    detections_dir: str,
    perspective_path: str,
    new_frames_dir: str,
    thresholds_override: Optional[Dict[str, float]] = None,
    box_thickness: float = 4,
):
    """Create frames with overlaid predicted bounding boxes.

    Frames and detections are paired in the order their loaders yield them.
    Each output image is written to ``new_frames_dir`` under the base name of
    the corresponding source frame.

    Args:
        frames_dir: Directory handed to ``FrameLoader``.
        detections_dir: Directory handed to ``DetectionsLoader``.
        perspective_path: File parsed by ``parse_perspective``.
        new_frames_dir: Output directory; created if it does not exist.
        thresholds_override: Optional mapping merged over the default
            thresholds of ``SimplePredictor``. ``None`` keeps the defaults.
        box_thickness: Line thickness forwarded to ``viz.overlay_3d_vehicles``.
    """
    # Hand over an empty mapping rather than None so the predictor always
    # receives something it can merge.
    predictor = SimplePredictor(
        {} if thresholds_override is None else thresholds_override
    )

    color_mapping = dict(
        zip(['CAR', 'VAN', 'TRUCK', 'BUS'], px.colors.qualitative.Plotly)
    )

    frame_loader = FrameLoader(frames_dir)
    detections_loader = DetectionsLoader(detections_dir)
    perspective = parse_perspective(perspective_path)

    os.makedirs(new_frames_dir, exist_ok=True)

    legend_overlay = create_legend_overlay(color_mapping, perspective.image_shape)

    # A pixel belongs to the legend when ANY of its channels is non-zero.
    # An element-wise mask would skip the zero channels of a legend color,
    # letting the underlying frame show through in those channels.
    # The mask and pixels are identical for every frame, so compute them once.
    legend_mask = legend_overlay.any(axis=-1)
    legend_pixels = legend_overlay[legend_mask]

    for i, (frame, detections) in enumerate(
        tqdm(
            zip(frame_loader.load_items(), detections_loader.load_items()),
            total=len(frame_loader.paths),
        )
    ):
        predictions = predictor.create_predictions(detections, perspective)

        colors = [color_mapping[v.category] for v in predictions]

        img = viz.overlay_3d_vehicles(
            frame,
            predictions,
            perspective,
            box_thickness=box_thickness,
            box_colors=colors,
        )

        # Overlay color mapping legend.
        img[legend_mask] = legend_pixels

        path = os.path.join(new_frames_dir, os.path.basename(frame_loader.paths[i]))
        # Reverse the channel axis before writing.
        # NOTE(review): presumably RGB -> BGR, which cv2.imwrite expects - confirm.
        cv2.imwrite(path, img[..., ::-1])


In [3]:
def create_mask_frames(
    frames_dir: str,
    detections_dir: str,
    new_frames_dir: str,
    score_threshold: float = 0.5,
    color: Tuple = (0, 0, 1),
):
    """Write copies of the frames with the detection masks drawn on top.

    Frames and detections are paired in the order their loaders yield them.
    Only detections whose score reaches ``score_threshold`` are drawn. Each
    result is saved in ``new_frames_dir`` under the source frame's base name.

    Args:
        frames_dir: Directory handed to ``FrameLoader``.
        detections_dir: Directory handed to ``DetectionsLoader``.
        new_frames_dir: Output directory; created if it does not exist.
        score_threshold: Minimum score (inclusive) for a detection to be kept.
        color: Passed as ``colors`` to ``viz.overlay_colored_masks``.
    """
    frames = FrameLoader(frames_dir)
    detection_sets = DetectionsLoader(detections_dir)

    os.makedirs(new_frames_dir, exist_ok=True)

    paired_items = zip(frames.load_items(), detection_sets.load_items())
    progress = tqdm(paired_items, total=len(frames.paths))

    for index, (frame, frame_detections) in enumerate(progress):
        confident = [
            candidate
            for candidate in frame_detections
            if candidate.score >= score_threshold
        ]

        masked = viz.overlay_colored_masks(frame, confident, colors=color)

        source_name = os.path.basename(frames.paths[index])
        target = os.path.join(new_frames_dir, source_name)
        # The channel axis is reversed before the image is written.
        cv2.imwrite(target, masked[..., ::-1])


In [4]:
def combine_frames_to_video(
    frames_dir: str, video_path: Optional[str] = None, fps: int = 30, frame_type: str = 'jpg'
):
    """Combine the frames of a directory into an ``.mp4`` video using ffmpeg.

    Args:
        frames_dir: Directory containing the frames. Every file matching
            ``*.{frame_type}`` in it is used as input (glob pattern).
        video_path: Output path WITHOUT the file ending; ``.mp4`` is appended.
            Defaults to ``frames_dir``.
        fps: Frame rate passed to ffmpeg via ``-r``.
        frame_type: File extension of the input frames.

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if video_path is None:
        video_path = frames_dir

    # dirname is '' for a bare file name; os.makedirs('') would raise.
    output_dir = os.path.dirname(video_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact. The glob pattern is expanded by ffmpeg itself
    # (-pattern_type glob), so no shell expansion is needed.
    command = [
        'ffmpeg',
        '-loglevel', 'error',
        '-y',
        '-r', str(fps),
        '-pattern_type', 'glob',
        '-i', f'{frames_dir}/*.{frame_type}',
        '-c:v', 'libx264',
        f'{video_path}.mp4',
    ]

    # check=True surfaces ffmpeg failures instead of silently ignoring them.
    subprocess.run(command, check=True)

In [5]:
def create_videos(
    frames_dir: str,
    detections_dir: str,
    perspective_path: str,
    mask_frames_dir: str,
    bbox_frames_dir: str,
    fps: int = 30,
):
    """Render mask and bbox frames, then encode one video from each set.

    The four stages run strictly in order, each announced on stdout first:
    mask frames, bbox frames, mask video, bbox video. Each video is written
    next to its frame directory (``<frames dir>.mp4``).

    Args:
        frames_dir: Directory with the source frames.
        detections_dir: Directory with the detections for those frames.
        perspective_path: Perspective file used for the bbox frames.
        mask_frames_dir: Output directory for the mask frames.
        bbox_frames_dir: Output directory for the bbox frames.
        fps: Frame rate used for both videos.
    """
    stages = (
        (
            'Creating mask frames...',
            lambda: create_mask_frames(frames_dir, detections_dir, mask_frames_dir),
        ),
        (
            'Creating bbox frames...',
            lambda: create_bbox_frames(
                frames_dir, detections_dir, perspective_path, bbox_frames_dir
            ),
        ),
        (
            'Creating mask video...',
            lambda: combine_frames_to_video(mask_frames_dir, mask_frames_dir, fps=fps),
        ),
        (
            'Creating bbox video...',
            lambda: combine_frames_to_video(bbox_frames_dir, bbox_frames_dir, fps=fps),
        ),
    )

    for announcement, run_stage in stages:
        # Flush so the message appears before the stage's progress output.
        print(announcement, flush=True)
        run_stage()

In [6]:
# Render mask and bbox videos for the s40_far_video frames (default fps).
create_videos(
    frames_dir='../data/videos/s40_far_video_frames',
    detections_dir='../data/detections/s40_far_video/coco_yolact_resnet101_edge',
    perspective_path='../data/profusion_r0_dataset/r0_s1/05_calibration/s40_camera_basler_north_50mm.json',
    mask_frames_dir='output/video_creation/s40_far_masks',
    bbox_frames_dir='output/video_creation/s40_far_video_bboxs',
)

Creating mask frames...


100%|██████████| 1980/1980 [03:26<00:00,  9.58it/s]

Creating bbox frames...



100%|██████████| 1980/1980 [02:41<00:00, 12.28it/s]

Creating mask video...





Creating bbox video...


In [7]:
# Render mask and bbox videos for the rainy frames at 10 fps.
create_videos(
    frames_dir='../data/videos/rainy_frames',
    detections_dir='../data/detections/rainy/coco_yolact_resnet101_edge',
    perspective_path='../data/profusion_r0_dataset/r0_s1/05_calibration/s50_camera_basler_south_16mm.json',
    mask_frames_dir='output/video_creation/rainy_masks',
    bbox_frames_dir='output/video_creation/rainy_bboxes',
    fps=10,
)

Creating mask frames...


100%|██████████| 607/607 [00:55<00:00, 10.94it/s]

Creating bbox frames...



100%|██████████| 607/607 [00:52<00:00, 11.63it/s]

Creating mask video...





Creating bbox video...
