In [1]:
import cv2
import numpy as np
import h5py
from tqdm import tqdm


def plot_label_with_video(hdf5_path, video_path, save_path, fs=None):
    # Load respiration signal
    with h5py.File(hdf5_path, 'r') as f:
        signal = f['respiration'][:]

    # Normalize respiration signal for drawing
    signal = (signal - np.min(signal)) / (np.max(signal) - np.min(signal))  # range 0 to 1

    # Load video frames
    cap = cv2.VideoCapture(video_path)
    num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if not fs:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
    else:
        fps = fs

    # Resample signal length to number of frames
    signal = np.interp(
        np.linspace(1, signal.shape[0], num_frames),
        np.linspace(1, signal.shape[0], signal.shape[0]),
        signal
    )

    out = cv2.VideoWriter(save_path, cv2.VideoWriter.fourcc(*'mp4v'), fps, (width, height))

    for i in tqdm(range(num_frames)):
        ret, frame = cap.read()
        if not ret:
            break

        # Draw respiration waveform segment on bottom of frame
        wave_h = 350  # height of waveform area
        overlay = frame.copy()

        # Define waveform area
        wave_x0 = 10
        wave_x1 = width - 10
        wave_y0 = height - wave_h
        wave_y1 = height

        # Add a dark background for the waveform area
        cv2.rectangle(
            overlay,
            (wave_x0, wave_y0),
            (wave_x1, wave_y1),
            color=(0, 0, 0),   # black
            thickness=-1       # filled rectangle
        )

        # Get signal slice around current time
        plot_width = wave_x1 - wave_x0
        plot_window_secs = 5  # a fixed signal window
        plot_window_len = int(fps * plot_window_secs)

        signal_chunk = signal[:i + 1]

        # Limit to the most recent fixed-length window
        if len(signal_chunk) > plot_window_len:
            signal_chunk = signal_chunk[-plot_window_len:]

        # Resample to match plot width
        if len(signal_chunk) < 2:
            signal_chunk = np.ones(plot_width) * signal_chunk[0] if len(signal_chunk) > 0 else np.zeros(plot_width)   
        else:
            signal_chunk = np.interp(
                np.linspace(0, len(signal_chunk) - 1, plot_width),
                np.arange(len(signal_chunk)),
                signal_chunk
            )

        # Map signal to pixel Y coordinates
        y_vals = wave_y1 - (signal_chunk * wave_h).astype(int)

        for x in range(len(y_vals)-1):
            cv2.line(overlay,
                     (wave_x0 + x, y_vals[x]),
                     (wave_x0 + x + 1, y_vals[x+1]),
                     color=(0, 0, 255), thickness=2)

        # Blend overlay
        alpha = 0.6
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

        # Write to output
        out.write(frame)

    cap.release()
    out.release()
    print(f"Saved video with annotations to {save_path}")


In [4]:
# plot_label_with_video("example/S04_6_demo_pred.hdf5", "example/S04_6_demo.mp4", "example/S04_6_demo_waveform.mp4")

100%|██████████| 608/608 [00:07<00:00, 82.62it/s]


Saved video with annotations to example/S04_6_demo_waveform.mp4
