In [None]:
from pathlib import Path
import numpy as np
import torch
import matplotlib.pyplot as plt
from natsort import natsorted

import cv2
import os
from tqdm import tqdm


# reload notebook automatically after changes to source python files
%load_ext autoreload
%autoreload 2

# If the kernel was started inside a `notebooks` folder, make its parent the working directory.
import os
if os.path.split(os.getcwd())[-1] == 'notebooks':
    os.chdir('..')
print(os.getcwd())

In [2]:
# Folder layout for this run: the source frames and the compressed output are siblings
# under the same dataset folder.
main_data_folder = Path("data/rrc-lab-data/wheelchair-runs-20241220/")
src_data_folder = main_data_folder.joinpath("run-1-wheelchair-mapping")
target_data_folder = main_data_folder.joinpath("run-1-wheelchair-mapping-compressed")
# Create the output folder (and any missing parents) up front; a no-op if it already exists.
target_data_folder.mkdir(parents=True, exist_ok=True)

from pathlib import Path
import cv2
import numpy as np
from tqdm import tqdm

def _frame_matches_mode(img, mode):
    """Return True when `img` has the dtype and channel layout that `write_video` writes for `mode`."""
    if mode == "depth":
        return img.dtype == np.uint16 and img.ndim == 2
    return img.dtype == np.uint8 and img.ndim == 3 and img.shape[2] == 3


def write_video(src_folder, target_video_path, mode="rgb", codec=None, fps=30):
    """
    Write the images in a folder to a video file, one frame per image.

    Images are taken in natural-sort order of their file names, so frame ``i`` of the
    video is the ``i``-th image of that ordering as long as no image had to be skipped.

    Args:
        src_folder (Path): Folder containing images (.png, .jpg, .jpeg, .tiff, .tif).
        target_video_path (Path): Path to save the video file. Missing parent folders are created.
        mode (str): "rgb" for 3-channel uint8 color images or "depth" for single-channel uint16 images.
        codec (str, optional): FourCC code of the video codec. Defaults to "FFV1" for both modes.
        fps (int): Frames per second. Default is 30.

    Raises:
        ValueError: If an invalid mode is provided, no images are found, the first image
            is unreadable or does not match the mode, or the video writer cannot be opened.
    """
    # Validate the mode first so a typo fails before anything is created on disk.
    if mode not in ("rgb", "depth"):
        raise ValueError(f"Unsupported mode '{mode}'. Use 'rgb' or 'depth'.")

    src_folder = Path(src_folder)
    target_video_path = Path(target_video_path)
    image_paths = natsorted([p for p in src_folder.iterdir() if p.suffix.lower() in {".png", ".jpg", ".jpeg", ".tiff", ".tif"}])

    if not image_paths:
        raise ValueError(f"No valid images found in {src_folder}")

    # FFV1 is a lossless codec.
    # NOTE(review): whether the full round trip is bit-exact also depends on the pixel
    # format the backend picks — confirm with the frame comparison cell.
    codec = codec or 'FFV1'

    # The first image fixes the frame size and must itself be writable in this mode.
    first_img = cv2.imread(str(image_paths[0]), cv2.IMREAD_UNCHANGED)
    if first_img is None:
        raise ValueError(f"First image is invalid: {image_paths[0]}")
    if not _frame_matches_mode(first_img, mode):
        if mode == "rgb":
            raise ValueError(
                f"Expected 3-channel uint8 image for RGB mode, but got shape {first_img.shape} and dtype {first_img.dtype}"
            )
        raise ValueError(
            f"Expected single-channel uint16 image for depth mode, but got shape {first_img.shape} and dtype {first_img.dtype}"
        )

    height, width = first_img.shape[:2]
    size = (width, height)

    # Initialize video writer
    target_video_path.parent.mkdir(parents=True, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*codec)
    if mode == "depth":
        # The writer's frame depth defaults to 8 bit, so 16-bit grayscale has to be
        # requested explicitly through the writer properties.
        # NOTE(review): needs a backend with 16-bit support (FFmpeg backend in
        # OpenCV >= 4.7 as far as I know) — otherwise the open fails and is reported below.
        out = cv2.VideoWriter(
            str(target_video_path), cv2.CAP_ANY, fourcc, fps, size,
            params=[cv2.VIDEOWRITER_PROP_DEPTH, cv2.CV_16U, cv2.VIDEOWRITER_PROP_IS_COLOR, 0],
        )
    else:
        out = cv2.VideoWriter(str(target_video_path), fourcc, fps, size, isColor=True)

    # A writer that failed to open silently ignores every write() call.
    if not out.isOpened():
        out.release()
        raise ValueError(
            f"Unable to open video writer for {target_video_path} (codec '{codec}', mode '{mode}')"
        )

    # Write images to the video; the writer is released even if a frame raises.
    skipped = 0
    try:
        for img_path in tqdm(image_paths, desc="Writing Video", unit="frame"):
            img = cv2.imread(str(img_path), cv2.IMREAD_UNCHANGED)
            if img is None:
                print(f"Warning: Skipping invalid image {img_path}")
                skipped += 1
                continue

            if not _frame_matches_mode(img, mode):
                kind = "non-uint16" if mode == "depth" else "non-RGB"
                print(f"Warning: Skipping {kind} image {img_path}")
                skipped += 1
                continue

            # Frames that differ from the writer's frame size would not be written correctly.
            if img.shape[:2] != (height, width):
                print(
                    f"Warning: Skipping image with size {img.shape[1]}x{img.shape[0]} "
                    f"(expected {width}x{height}): {img_path}"
                )
                skipped += 1
                continue

            out.write(img)
    finally:
        out.release()

    if skipped:
        print(
            f"Warning: {skipped} of {len(image_paths)} images were skipped, "
            "so video frame indices are shifted relative to the source images"
        )
    print(f"Video written to {target_video_path}")


In [None]:
# write_video((src_data_folder / "rgb"), (target_data_folder / "rgb" / "run-1-wheelchair-mapping.mkv"), mode="rgb")
# write_video((src_data_folder / "aligned_depth"), (target_data_folder / "aligned_depth" /"run-1-wheelchair-mapping.mkv"), mode="depth")

In [4]:
# Check that video frames can be read back and are identical to the original images

def load_video(video_path, mode="rgb"):
    """
    Open a video file and print its total frame count.

    Args:
        video_path (Path): Path to the video file.
        mode (str): "rgb" for standard RGB videos or "depth" for uint16 depth videos.
            Only validated here; it does not change how the capture is opened.

    Returns:
        cap (cv2.VideoCapture): Opened OpenCV video capture object. The caller is
            responsible for releasing it.
        total_frames (int): Total number of frames as reported by the capture.

    Raises:
        ValueError: If the mode is unsupported or the video file cannot be opened.
    """
    # Validate before opening so an invalid mode cannot leave an opened capture behind.
    if mode not in {"rgb", "depth"}:
        raise ValueError(f"Unsupported mode '{mode}'. Use 'rgb' or 'depth'.")

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Unable to open video file: {video_path}")

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")

    return cap, total_frames

def get_frame(cap, frame_idx, mode="rgb"):
    """
    Retrieve a specific frame from the video by its index.

    Args:
        cap (cv2.VideoCapture): OpenCV video capture object.
        frame_idx (int): Index of the frame to retrieve.
        mode (str): "rgb" for standard uint8 frames or "depth" for uint16 frames.

    Returns:
        frame (np.ndarray): The requested frame as an image array.

    Raises:
        ValueError: If the mode is unsupported, the frame cannot be read, or the
            frame does not have the dtype expected for the mode.
    """
    if mode not in ("rgb", "depth"):
        raise ValueError(f"Unsupported mode '{mode}'. Use 'rgb' or 'depth'.")

    if mode == "depth":
        # Ask the capture not to convert frames to BGR, so 16-bit frames can come
        # back as stored instead of as converted 8-bit images.
        # NOTE(review): needs a backend with 16-bit support (FFmpeg backend in
        # OpenCV >= 4.7 as far as I know); otherwise the dtype check below raises.
        cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)

    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
    ret, frame = cap.read()
    if not ret:
        raise ValueError(f"Unable to read frame {frame_idx}")

    # cap.read() already returns a decoded image array, so no further decoding step
    # is applied; only the dtype is verified.
    if mode == "depth":
        if frame is None or frame.dtype != np.uint16:
            raise ValueError(f"Decoded frame {frame_idx} is not uint16 as expected")
    else:
        if frame is None or frame.dtype != np.uint8:
            raise ValueError(f"Decoded frame {frame_idx} is not RGB (uint8) as expected")

    return frame

def close_video(cap):
    """
    Release a video capture and report it on stdout.

    Args:
        cap (cv2.VideoCapture): Capture object whose ``release()`` method is called.
    """
    cap.release()
    print("Video capture released.")

In [None]:
# Spot-check one RGB frame (index 100) and one depth frame (index 200) against the source images.
# np.array_equal is used because the check is for an exact match; it also returns False,
# rather than raising or broadcasting, when the two arrays differ in shape.
# NOTE(review): assumes video frame i corresponds to "<i>.png" in the source folder — confirm.
# The captures are left open for further inspection; call close_video(...) when done.
rgb_cap, rgb_total_frames = load_video(target_data_folder / "rgb" / "run-1-wheelchair-mapping.mkv", mode="rgb")
rgb_frame = get_frame(rgb_cap, 100, mode="rgb")
print(np.array_equal(rgb_frame, cv2.imread(str(src_data_folder / "rgb" / "100.png"), cv2.IMREAD_UNCHANGED)))

depth_cap, depth_total_frames = load_video(target_data_folder / "aligned_depth" / "run-1-wheelchair-mapping.mkv", mode="depth")
depth_frame = get_frame(depth_cap, 200, mode="depth")
print(np.array_equal(depth_frame, cv2.imread(str(src_data_folder / "aligned_depth" / "200.png"), cv2.IMREAD_UNCHANGED)))