# DMD

Various preprocessing steps for the Driver Monitoring Dataset (DMD).

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from concurrent.futures import ProcessPoolExecutor  # noqa F401
from pathlib import Path
from typing import Literal

import cv2  # noqa F401
import matplotlib.pyplot as plt
from tqdm import tqdm

from model.common import (
    create_video_from_images,  # noqa F401
    crop_aspect_ratio,  # noqa F401
)
from model.depth import convert_video_to_depth  # noqa F401
from model.dmd import (
    CATEGORIES,
    DRIVER_SESSION_MAPPING,  # noqa F401
    ROOT,
    convert_depth_images_to_video,  # noqa F401
    convert_frames_to_video,  # noqa F401
    extract_frames,
    get_all_sessions,
    get_clips,  # noqa F401
)

# Use a larger default font size for every figure created after this cell.
plt.rc('font', size=16)

## Extract Frames from Video

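For every session and source type (RGB, depth, IR), extract the video frames and distribute them between distractions and "normal" activities using the session's annotation file.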

In [None]:
# Extract frames for every session and every source type. Each session
# directory under ROOT is expected to contain one annotation file
# (`<session>.json`) and one "body" video per source type.
sessions = get_all_sessions()
for session in tqdm(sessions):
    # The annotation file is shared by all source types of a session, so its
    # path is built once per session instead of once per source type.
    annotations_file_path = ROOT / session / f'{session}.json'

    for source_type in ('rgb', 'depth', 'ir'):
        # The depth video is looked up with an `.avi` extension; the RGB and
        # IR videos with `.mp4`.
        source_video_extension: Literal['mp4', 'avi'] = (
            'avi' if source_type == 'depth' else 'mp4'
        )
        input_video_path = (
            ROOT / session / f'{session}_{source_type}_body.{source_video_extension}'
        )

        # Raise explicitly rather than `assert`: assertions are stripped when
        # Python runs with `-O`, which would let a missing file slip through
        # to `extract_frames` and fail there with a less helpful error.
        if not input_video_path.exists():
            raise FileNotFoundError(f'File not found: {input_video_path}')
        if not annotations_file_path.exists():
            raise FileNotFoundError(f'File not found: {annotations_file_path}')

        print(f'Input video path: {input_video_path}')
        print(f'Annotations file path: {annotations_file_path}')

        extract_frames(
            input_video_path=input_video_path,
            annotations_file_path=annotations_file_path,
            force_overwrite=True,
            source_type=source_type,
            skip_output_directory_setup=True,
        )

Video Depth Anything:

In [None]:
# convert_frames_to_video(session, source_type=source_type, preset='slow', crf=10)
# convert_video_to_depth(ROOT / session, source_type='crop_rgb')

Convert depth images to video:

In [None]:
# convert_depth_images_to_video(
#     session, 'normal', 1, extension='png', fps=30, directory='source_depth'
# )

## Convert Clip Masks to Video

In [None]:
# session = ROOT / 'gA_4_s1_2019-03-13T10;36;15+01;00'
# assert session.exists(), f'Path not found: {session}'

# sequences = get_clips(session, 'anomal') + get_clips(session, 'normal')
# frame_paths = sorted(
#     [img for x in sequences for img in (x / 'masks').glob('*.png')],
#     key=lambda x: int(x.stem),
# )

# create_video_from_images(
#     frame_paths,
#     session / f'{session.name}_masks.mp4',
#     fps=30,
#     size=(256, 256),
# )

## Edit Existing Clips

In [None]:
# Collect every directory that already holds extracted RGB frames.
# Layout walked here: ROOT / <session> / <category> / <entry> / rgb / *.jpg

# Every directory directly under ROOT is treated as a session.
session_dirs: list[Path] = [x for x in ROOT.glob('*') if x.is_dir()]

# One candidate path per (session, category) pair; it may not exist on disk.
session_subdirs: list[Path] = [
    ses_dir / class_dir for ses_dir in session_dirs for class_dir in CATEGORIES
]

# Contents of each category directory that does exist.
all_subdirs: list[list[Path]] = [
    list(subdir.glob('*')) for subdir in session_subdirs if subdir.is_dir()
]

# Keep only entries with an `rgb` sub-directory containing at least one JPEG.
# `any` stops at the first match instead of listing every frame, and the loop
# variable is named `clip_dir` so the builtin `dir` is not shadowed.
all_dirs = sorted(
    clip_dir
    for subdirs in all_subdirs
    for clip_dir in subdirs
    if (clip_dir / 'rgb').is_dir() and any((clip_dir / 'rgb').glob('*.jpg'))
)

Rename:

In [None]:
# for dir in tqdm(all_dirs):
#     old = dir / 'source_depth'
#     new = dir / 'source_depth_original'

#     if old.exists() and old.is_dir():
#         old.rename(new)

Remove:

In [None]:
# # Remove first N images (e.g., depth camera warmup)
# for dir in tqdm(all_dirs):
#     images = sorted((dir / 'source_depth').glob('*.png'), key=lambda x: int(x.stem))
#     for img_path in images:
#         if int(img_path.stem) < 30:
#             img_path.unlink()

Edit:

In [None]:
# def process_image(img_path: Path, output_dir: Path) -> None:
#     # Read and process the image
#     img = cv2.imread(str(img_path))
#     img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
#     img = crop_aspect_ratio(img, 3 / 4)  # from square to 3:4
#     img = crop_aspect_ratio(img, 4 / 3)  # from 3:4 to 4:3
#     img = cv2.resize(img, (480, 360), interpolation=cv2.INTER_AREA)
#     cv2.imwrite(str(output_dir / img_path.name), img)


# def process_directory(dir: Path) -> None:
#     sensor_images = sorted(
#         (dir / 'source_depth_original').glob('*.png'),
#         key=lambda x: int(x.stem),
#     )
#     output_dir = dir / 'source_depth'
#     output_dir.mkdir(exist_ok=True, parents=True)

#     with ProcessPoolExecutor() as executor:
#         for img_path in sensor_images:
#             executor.submit(process_image, img_path, output_dir)


# for dir in tqdm(all_dirs):
#     process_directory(dir)