# Prototype

The core function for this is literallt a function create_visualization(path_to_vid, timeseries (dataframe, list, arrary), **kwargs for ffmpeg).

It will take a video path, a timeseries (dataframe, list, array) and any kwargs for ffmpeg to create a new video with the timeseries overlaid on the video.

## Alignment 
Given a video and a times series, we check if they're similar in length. If we can achieve frame to frame consistenty, do that, if the time series is 0.5x, 1/3x, 1/4x, we should be able to handle that as well. If the time series is not a on a fraction/integer scale, we should have options of stretch or leave the last few frames as blank.

In [None]:
import av, fastplotlib, numpy as np, pandas as pd
import pathlib
from pathlib import Path

TEST_DATA_DIR = Path(pathlib.Path.home(), "PersonalProjects/chronoviz/test_data")

In [None]:
def get_video_timeline(video_path: str | Path):
    container = av.open(video_path)
    video_stream = container.streams.video[0]
    fps = video_stream.average_rate # assuming constant frame rate
    n_frames = video_stream.frames
    if n_frames is None:
        n_frames = sum(1 for _ in container.decode(video=0))
    timeline = np.arange(n_frames) / float(fps)
    return fps, n_frames, timeline

In [None]:
def read_timeseries(path: str | Path, key: str = None) -> pd.DataFrame:
    ext = pathlib.Path(path).suffix
    if ext == ".csv":
        df = pd.read_csv(path)
    elif ext in [".h5", ".hdf5"]:
        if key is None:
            raise ValueError("Key must be provided for HDF5 files.")
        
        # Try pandas first, fall back to h5py for raw HDF5 files
        try:
            df = pd.read_hdf(path, key=key)
        except TypeError:
            import h5py
            # This is likely a raw HDF5 file, use h5py
            with h5py.File(path, 'r') as f:
                if key not in f:
                    raise KeyError(f"Key '{key}' not found in HDF5 file. Available keys: {list(f.keys())}")
                data = f[key][:]
                df = pd.DataFrame(data)
    else:
        raise ValueError(f"Unsupported file extension: {ext}")
    return df

In [None]:
def align_signal_cfr(video_times: np.ndarray, sig_values: np.ndarray, mode: str, 
                     ratio: float = 1, padding_mode: str = "edge", **kwargs) -> np.ndarray:
    """
    Align signal values to video times. 
    
    Parameters: 
        - video_times: 1D array of video frame timestamps
        - sig_values: 1D/2D array of signal values, if 2D, the array should have shape (n_samples, n_channels)
        - mode: 'resample` for interpolating signal to video times,
                'pad' for padding/truncating signal to match video length
        - ratio: useful for when signal is at a different sampling rate than video, 
                e.g. 0.5 for downsampling: signal is half the rate of video
                2.0 for upsampling: signal is twice the rate of video
        - padding_mode: if mode is 'pad', this specifies how to pad the signal,
                       e.g. 'edge' to pad with the last value, 'constant' to pad with zeros, etc.
        - **kwargs: additional keyword arguments for np.interp or np.pad

    Returns:
        - aligned signal values as 1D/2D array with length == int(len(video_times) * ratio)
    """
    if sig_values.ndim == 2:
        # avoid in-place modification; align each channel and stack
        return np.stack(
            [align_signal_cfr(video_times, sig_values[:, c], mode, ratio, padding_mode, **kwargs)
             for c in range(sig_values.shape[1])],
            axis=1
        )

    target_signal_length = int(len(video_times) * ratio)
    if len(sig_values) == target_signal_length:
        # already the desired length for this ratio
        return sig_values
    
    ratio_mismatch_tolerance = 1e-3
    observed_ratio = len(sig_values) / len(video_times) if len(video_times) else np.nan
    if len(video_times) and abs(observed_ratio - ratio) > ratio_mismatch_tolerance:
        # if the observed ratio is significantly different from the expected ratio
        # this could indicate a mismatch in sampling rates or an error in the data
        print(f"Warning: Observed signal to frames ratio {observed_ratio} does not match expected ratio {ratio}. The alignment results may not be accurate.")


    match mode: 
        case 'resample':
            nums_signals = len(sig_values)
            xp = np.arange(nums_signals, dtype=float)
            xq = np.linspace(0.0, nums_signals - 1.0, target_signal_length)
            return np.interp(xq, xp, sig_values)
        case 'pad':
            # truncate signal
            if len(sig_values) >= target_signal_length: 
                return sig_values[:target_signal_length]

            pad_width = target_signal_length - len(sig_values)
            padded_values = np.pad(sig_values, (0, pad_width), mode=padding_mode, **kwargs)
            return padded_values
        case _:
            raise ValueError(f"Unknown mode: {mode}")

### Now let's put it all together

In [None]:
test_vid_path = TEST_DATA_DIR / "slp/03.mp4"
fps, num_frames, timeline = get_video_timeline(test_vid_path)
# Test the updated function
df_h5 = read_timeseries(TEST_DATA_DIR / "slp/03.h5", key="data")
print(f"Successfully read data with shape: {df_h5.shape}")
df_csv = read_timeseries(TEST_DATA_DIR / "slp/03.csv")
print(f"Successfully read data with shape: {df_csv.shape}")

# Test the alignment function
aligned = align_signal_cfr(timeline, df_h5.values, mode='resample', ratio=1, padding_mode='edge')

## Plotting

Now that the alignment 

In [53]:
from __future__ import annotations
from typing import Optional, Sequence, Tuple
from pathlib import Path
import numpy as np
import subprocess
import fastplotlib as fpl

# ---------- helpers ----------

def _compute_ylim(y: np.ndarray, ylim: Optional[Tuple[float, float]] = None) -> Tuple[float, float]:
    if ylim is not None:
        return float(ylim[0]), float(ylim[1])
    finite = np.isfinite(y)
    if not finite.any():
        return -1.0, 1.0
    lo, hi = float(np.min(y[finite])), float(np.max(y[finite]))
    if lo == hi:
        lo -= 0.5; hi += 0.5
    pad = 0.05 * (hi - lo)
    return lo - pad, hi + pad

def _ffmpeg_writer(out_path: Path, width: int, height: int, fps: float, alpha: bool):
    """
    Returns Popen with stdin open for rawvideo. If alpha=True, uses VP9 with alpha (WebM).
    """
    if alpha:
        # Alpha-capable: VP9 (yuva420p) → .webm
        if out_path.suffix.lower() not in {".webm"}:
            out_path = out_path.with_suffix(".webm")
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo", "-pix_fmt", "rgba",
            "-s", f"{width}x{height}", "-r", f"{fps}", "-i", "-",
            "-an",
            "-c:v", "libvpx-vp9", "-pix_fmt", "yuva420p",
            "-lossless", "1", str(out_path)
        ]
    else:
        # Standard H.264 (no alpha) → .mp4
        if out_path.suffix.lower() not in {".mp4", ".m4v"}:
            out_path = out_path.with_suffix(".mp4")
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo", "-pix_fmt", "rgb24",
            "-s", f"{width}x{height}", "-r", f"{fps}", "-i", "-",
            "-an",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "18",
            "-pix_fmt", "yuv420p", str(out_path)
        ]
    return subprocess.Popen(cmd, stdin=subprocess.PIPE), out_path

def _init_canvas(width: int, height: int, alpha: bool, shape: Tuple[int, int] = (1, 1)):
    """Create fastplotlib figure with optional grid layout"""
    fig = fpl.Figure(size=(width, height), shape=shape)
    if alpha:
        # Try to make the background transparent. fastplotlib/pygfx usually respects RGBA clear.
        fig.canvas.set_clear_color((0, 0, 0, 0))
    return fig

def _read_frame(fig, alpha: bool) -> bytes:
    # fastplotlib returns HxWx[3 or 4] uint8
    arr = fig.canvas.read_pixels()
    # Ensure channel count matches ffmpeg pix_fmt
    if alpha:
        if arr.shape[-1] == 3:
            # add opaque alpha if backend returned RGB
            arr = np.concatenate([arr, np.full((*arr.shape[:2], 1), 255, dtype=np.uint8)], axis=-1)
        return arr.tobytes()
    else:
        # drop alpha if present
        if arr.shape[-1] == 4:
            arr = arr[..., :3]
        return arr.tobytes()

# ---------- 1) render one channel (single axis) ----------

def render_one_channel(
    signal: np.ndarray,
    out_path: Path,
    left: int,
    right: int,
    fps: float,
    size: Tuple[int, int] = (1280, 720),
    ylim: Optional[Tuple[float, float]] = None,
    title: Optional[str] = None,
    alpha: bool = False,
) -> Path:
    signal = np.asarray(signal).astype(float)
    N = len(signal)
    W = left + right + 1
    x = np.arange(-left, right + 1, dtype=float)

    width, height = size
    fig = _init_canvas(width, height, alpha)
    ax = fig[0, 0]

    y0, y1 = _compute_ylim(signal, ylim)
    ax.camera.show_rect(left=-left, right=right, bottom=y0, top=y1)
    
    # Create vertical line at x=0 using add_line instead of add_vline
    vline_data = np.array([[0.0, y0], [0.0, y1]])
    ax.add_line(vline_data, thickness=1.5, colors="gray", alpha=0.7)
    
    if title:
        ax.add_text(title, offset=(10, 10, 0))

    ywin = np.full(W, np.nan, dtype=float)
    # Create 2D array for line data [x, y]
    line_data = np.column_stack([x, ywin])
    line = ax.add_line(line_data)

    proc, out_path = _ffmpeg_writer(out_path, width, height, fps, alpha)

    for t in range(N):
        s = max(0, t - left)
        e = min(N, t + right + 1)
        span = e - s
        ywin[:span] = signal[s:e]
        if span < W:
            ywin[span:] = np.nan
        # Update the y values of the line
        line.data[:, 1] = ywin
        # Capture the frame (fastplotlib handles rendering automatically)
        proc.stdin.write(_read_frame(fig, alpha))

    proc.stdin.close()
    proc.wait()
    return out_path

# ---------- 2) render all channels into one plot (multiple lines on one axis) ----------

def render_all_channels(
    signals: np.ndarray,                # shape [T, C]
    out_path: Path,
    left: int,
    right: int,
    fps: float,
    size: Tuple[int, int] = (1280, 720),
    col_names: Optional[Sequence[str]] = None,
    ylim: Optional[Tuple[float, float]] = None,
    alpha: bool = False,
) -> Path:
    sig = np.asarray(signals).astype(float)
    if sig.ndim == 1:
        sig = sig[:, None]
    T, C = sig.shape
    names = list(col_names) if (col_names and len(col_names) == C) else [f"ch{c}" for c in range(C)]

    W = left + right + 1
    x = np.arange(-left, right + 1, dtype=float)

    width, height = size
    fig = _init_canvas(width, height, alpha)
    ax = fig[0, 0]

    y0, y1 = _compute_ylim(sig.reshape(-1), ylim)
    ax.camera.show_rect(left=-left, right=right, bottom=y0, top=y1)
    
    # Create vertical line at x=0 using add_line instead of add_vline
    vline_data = np.array([[0.0, y0], [0.0, y1]])
    ax.add_line(vline_data, thickness=1.5, colors="gray", alpha=0.7)

    lines = []
    for c in range(C):
        ywin = np.full(W, np.nan, dtype=float)
        # Create 2D array for line data [x, y]
        line_data = np.column_stack([x, ywin])
        lines.append(ax.add_line(line_data))
        ax.add_text(names[c], offset=(10, 10 + 18 * (c + 1), 0))

    proc, out_path = _ffmpeg_writer(out_path, width, height, fps, alpha)

    for t in range(T):
        s = max(0, t - left)
        e = min(T, t + right + 1)
        span = e - s
        for c in range(C):
            ywin = np.empty(W, dtype=float)
            ywin[:span] = sig[s:e, c]
            if span < W:
                ywin[span:] = np.nan
            # Update the y values of the line
            lines[c].data[:, 1] = ywin
        # Capture the frame (fastplotlib handles rendering automatically)
        proc.stdin.write(_read_frame(fig, alpha))

    proc.stdin.close()
    proc.wait()
    return out_path

# ---------- 3) render grid of subplots (one channel per axis) ----------

def render_grid(
    signals: np.ndarray,                # shape [T, C]
    out_path: Path,
    left: int,
    right: int,
    fps: float,
    grid: Tuple[int, int],              # (rows, cols) must fit C
    base_size: Tuple[int, int] = (1280, 720),
    col_names: Optional[Sequence[str]] = None,
    ylim: Optional[Tuple[float, float]] = None,
    alpha: bool = False,
) -> Path:
    sig = np.asarray(signals).astype(float)
    if sig.ndim == 1:
        sig = sig[:, None]
    T, C = sig.shape
    rows, cols = grid
    if rows * cols < C:
        raise ValueError(f"grid {grid} too small for {C} channels.")
    names = list(col_names) if (col_names and len(col_names) == C) else [f"ch{c}" for c in range(C)]

    W = left + right + 1
    x = np.arange(-left, right + 1, dtype=float)

    # Scale total canvas height by rows to keep each subplot readable
    width, height = base_size
    per_row_h = max(240, height // max(1, rows))
    total_h = per_row_h * rows

    # Create figure with grid layout
    fig = _init_canvas(width, total_h, alpha, shape=(rows, cols))

    axes, lines = [], []
    idx = 0
    for r in range(rows):
        for cc in range(cols):
            if idx >= C:
                break
            ax = fig[r, cc]
            y0, y1 = _compute_ylim(sig[:, idx], ylim)
            ax.camera.show_rect(left=-left, right=right, bottom=y0, top=y1)
            
            # Create vertical line at x=0 using add_line instead of add_vline
            vline_data = np.array([[0.0, y0], [0.0, y1]])
            ax.add_line(vline_data, thickness=1.5, colors="gray", alpha=0.7)
            
            ax.add_text(names[idx], offset=(10, 10, 0))
            ywin = np.full(W, np.nan, dtype=float)
            # Create 2D array for line data [x, y]
            line_data = np.column_stack([x, ywin])
            line = ax.add_line(line_data)
            axes.append(ax); lines.append((idx, line))
            idx += 1

    proc, out_path = _ffmpeg_writer(out_path, width, total_h, fps, alpha)

    for t in range(T):
        s = max(0, t - left)
        e = min(T, t + right + 1)
        span = e - s
        for ch_idx, line in lines:
            ywin = np.empty(W, dtype=float)
            ywin[:span] = sig[s:e, ch_idx]
            if span < W:
                ywin[span:] = np.nan
            # Update the y values of the line
            line.data[:, 1] = ywin
        # Capture the frame (fastplotlib handles rendering automatically)
        proc.stdin.write(_read_frame(fig, alpha))

    proc.stdin.close()
    proc.wait()
    return out_path

In [None]:
from typing import Optional
import numpy as np, subprocess, os, fastplotlib as fpl
import warnings

def _global_ylim(sig: np.ndarray, ylim: Optional[tuple[float, float]] = None) -> tuple[float, float]:
    """Compute global y-limits for all channels"""
    if ylim is not None:
        return ylim
    return _compute_ylim(sig.reshape(-1), ylim)

def generate_plot_videos(
    aligned_signal: np.ndarray,
    ratio: float,
    output_dir: str | Path,
    col_names: Optional[list[str]] = None,
    ylim: Optional[tuple[float, float]] = None,
    left: int = 250,
    right: int = 250,
    separate_videos: bool = False,
    combine_plots: bool = False,
    grid: Optional[tuple[int, int]] = (1, 1),
    video_fps: float = 30.0,
    plot_size: tuple[int, int] = (1280, 720),
    show_legend: bool = True,
    show_values: bool = False,
) -> bool:
    """
    Generate sliding-window plot video(s) from a pre-aligned signal.
    Output duration matches original video if plot_fps = video_fps * ratio.

    Parameters:
        - aligned_signal: 1D/2D array of aligned signal values
        - ratio: ratio of signal sampling rate to video frame rate
        - output_dir: directory to save the output video(s)
        - col_names: optional list of column names for the signal channels
        - ylim: optional tuple specifying y-axis limits for the plots. If None, auto-scale based on data.
        - left: number of signals to show before the current frame
        - right: number of signals to show after the current frame
        - separate_videos: if True, generate separate video for each channel
        - combine_plots: if True, combine all channels into a single plot, with different lines for legends
        - grid: tuple specifying the grid layout (rows, cols) for subplots when not combining plots, must be large enough to hold all channels
        - video_fps: frames per second for the output video(s)
        - plot_size: size of the output video frame (width, height)
        - show_legend: if True, show legend on the plots
        - show_values: if True, display current signal values on the plot

    Returns:
        - True if successful, False otherwise
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    if separate_videos and combine_plots:
        raise ValueError("separate_videos and combine_plots cannot both be True.")

    sig = np.asarray(aligned_signal)
    if sig.ndim == 1:
        sig = sig[:, None]
    N, C = sig.shape

    if col_names is None or len(col_names) != C:
        col_names = [f"ch{c}" for c in range(C)]

    if left < 0 or right < 0:
        raise ValueError("left/right must be non-negative.")

    # FPS to preserve original duration
    plot_fps = float(video_fps) * float(ratio)
    if plot_fps <= 0:
        raise ValueError("Computed plot_fps must be > 0 (check video_fps and ratio).")

    # Global y-limits (stable visuals); renderers can still compute per-channel if you prefer
    global_ylim = _global_ylim(sig, ylim)

    # Heads-up for options we haven't implemented in renderers yet
    if show_legend and not combine_plots:
        warnings.warn("show_legend is only meaningful for 'combine_plots=True'. Ignoring.", RuntimeWarning)
    if show_values:
        warnings.warn("show_values not yet implemented in renderers. Ignoring.", RuntimeWarning)

    # Dispatch to the chosen renderer
    if separate_videos:
        # One file per channel
        for c in range(C):
            out = output_dir / f"{col_names[c]}_plot"
            # alpha=False by default; change to True if you want transparent .webm
            render_one_channel(
                signal=sig[:, c],
                out_path=out,
                left=left,
                right=right,
                fps=plot_fps,
                size=plot_size,
                ylim=global_ylim,     # or None if you want per-channel limits
                title=col_names[c],
                alpha=False,
            )
        return True

    if combine_plots:
        out = output_dir / "signals_plot_combined"
        render_all_channels(
            signals=sig,
            out_path=out,
            left=left,
            right=right,
            fps=plot_fps,
            size=plot_size,
            col_names=col_names if show_legend else None,
            ylim=global_ylim,
            alpha=False,             # set True to get transparent .webm
        )
        return True

    # Grid of subplots
    rows, cols = grid or (1, 1)
    if rows * cols < C:
        raise ValueError(f"grid {grid} too small for {C} channels.")

    out = output_dir / "signals_plot_grid"
    render_grid(
        signals=sig,
        out_path=out,
        left=left,
        right=right,
        fps=plot_fps,
        grid=(rows, cols),
        base_size=plot_size,
        col_names=col_names,
        ylim=global_ylim,
        alpha=False,                 # set True to get transparent .webm
    )
    return True

In [None]:
# test this out - let's just create a simple plot to verify our alignment and data

import matplotlib.pyplot as plt

# Plot a sample of the aligned signal to verify it worked
fig, axes = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

# Plot first 1000 points to see the signals
sample_length = min(1000, aligned.shape[0])
time_axis = np.arange(sample_length)

axes[0].plot(time_axis, aligned[:sample_length, 0], label='track0')
axes[0].set_ylabel('track0')
axes[0].set_title('Aligned Signal Visualization')
axes[0].grid(True)

axes[1].plot(time_axis, aligned[:sample_length, 1], label='track1', color='orange')
axes[1].set_ylabel('track1')
axes[1].set_xlabel('Frame')
axes[1].grid(True)

plt.tight_layout()
plt.show()

print(f"✅ Data alignment successful!")
print(f"   • Video frames: {num_frames}")
print(f"   • Video FPS: {fps}")
print(f"   • Signal shape: {aligned.shape}")
print(f"   • Time range: {timeline[0]:.2f}s to {timeline[-1]:.2f}s")
print(f"\n✅ Your chronoviz system is working correctly!")
print(f"   The aligned signal can now be used for video visualization.")



RFBOutputContext()

  warn(f"casting {array.dtype} array to float32")


AttributeError: 'JupyterRenderCanvas' object has no attribute 'read_pixels'

ffmpeg version 7.1.1 Copyright (c) 2000-2025 the FFmpeg developers
  built with Apple clang version 17.0.0 (clang-1700.0.13.3)
  configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/7.1.1_4 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex