In this notebook, I first calculate the jerk the end effector and each joints on the WheelArm. If the value is high, I will go deeper into the movement period and do analysis.

In [None]:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
import h5py
from scipy.signal import butter, filtfilt

JOINT_H5 = "kinova_gen3_joint_states.h5"
EE_H5    = "kinova_gen3_cartesian_states.h5"

OUT_EE   = "filtered_ee.csv"
OUT_JOINTS = "filtered_joints.csv"

# ---------- Filter settings ----------
# Cutoff frequency in Hz (tune this!)
CUTOFF_HZ = 5.0
FILTER_ORDER = 4

# Optional: cap resample rate if your timestamps are extremely dense/noisy
MAX_RESAMPLE_HZ = 200.0  # set None to disable


def dedup_by_time(t: np.ndarray, X: np.ndarray):
    """Average samples that share identical timestamps (avoids dt=0 issues)."""
    t = np.asarray(t, dtype=np.float64)
    X = np.asarray(X, dtype=np.float64)

    order = np.argsort(t)
    t = t[order]
    X = X[order]

    uniq, idx_start, counts = np.unique(t, return_index=True, return_counts=True)
    out = np.zeros((len(uniq),) + X.shape[1:], dtype=np.float64)
    for i, (s, c) in enumerate(zip(idx_start, counts)):
        out[i] = X[s:s + c].mean(axis=0)
    return uniq, out


def make_uniform_grid(t: np.ndarray):
    """Create a uniform time grid using median dt."""
    dt = np.diff(t)
    dt = dt[dt > 0]
    if len(dt) < 5:
        raise ValueError("Not enough valid dt samples to build a uniform grid.")

    dt_med = float(np.median(dt))
    fs = 1.0 / dt_med

    if MAX_RESAMPLE_HZ is not None and fs > MAX_RESAMPLE_HZ:
        fs = MAX_RESAMPLE_HZ
        dt_med = 1.0 / fs

    t0, t1 = float(t[0]), float(t[-1])
    N = int(np.floor((t1 - t0) / dt_med)) + 1
    t_u = t0 + dt_med * np.arange(N, dtype=np.float64)
    return t_u, fs


def interp_to_grid(t_src: np.ndarray, X_src: np.ndarray, t_u: np.ndarray):
    """Linear interpolation of each column onto uniform grid."""
    X_src = np.asarray(X_src, dtype=np.float64)
    if X_src.ndim == 1:
        return np.interp(t_u, t_src, X_src)

    X_u = np.zeros((len(t_u), X_src.shape[1]), dtype=np.float64)
    for d in range(X_src.shape[1]):
        X_u[:, d] = np.interp(t_u, t_src, X_src[:, d])
    return X_u


def lowpass_zero_phase(X: np.ndarray, fs: float, cutoff_hz: float, order: int):
    """
    Zero-phase Butterworth low-pass via filtfilt.
    Assumes uniform sampling at rate fs.
    """
    if cutoff_hz <= 0 or cutoff_hz >= fs / 2:
        raise ValueError(f"cutoff_hz must be in (0, fs/2). Got cutoff={cutoff_hz}, fs={fs}")

    b, a = butter(order, cutoff_hz / (fs / 2), btype="low")
    X = np.asarray(X, dtype=np.float64)

    if X.ndim == 1:
        return filtfilt(b, a, X, axis=0)

    return filtfilt(b, a, X, axis=0)


def deriv_from_pos(t_u: np.ndarray, X_u: np.ndarray):
    """Compute velocity via numerical derivative on uniform grid."""
    return np.gradient(X_u, t_u, axis=0, edge_order=2)


def main():
    # --------- Load EE ---------
    with h5py.File(EE_H5, "r") as f:
        t_ee = f["timestamps"][...]
        p_ee = f["positions"][...]   # (N,3)

    t_ee, p_ee = dedup_by_time(t_ee, p_ee)
    t_u_ee, fs_ee = make_uniform_grid(t_ee)
    p_u = interp_to_grid(t_ee, p_ee, t_u_ee)

    p_f = lowpass_zero_phase(p_u, fs=fs_ee, cutoff_hz=CUTOFF_HZ, order=FILTER_ORDER)
    v_f = deriv_from_pos(t_u_ee, p_f)  # filtered velocity derived from filtered pos

    df_ee = pd.DataFrame({
        "t": t_u_ee,
        "x": p_f[:, 0], "y": p_f[:, 1], "z": p_f[:, 2],
        "vx": v_f[:, 0], "vy": v_f[:, 1], "vz": v_f[:, 2],
    })
    df_ee.to_csv(OUT_EE, index=False)
    print(f"Saved {OUT_EE}  (fs≈{fs_ee:.2f} Hz, cutoff={CUTOFF_HZ} Hz, order={FILTER_ORDER})")

    # --------- Load joints ---------
    with h5py.File(JOINT_H5, "r") as f:
        t_q = f["timestamps"][...]
        q   = f["positions"][...]    # (N,7)
        dq  = f["velocitys"][...]    # (N,7)  (note: dataset name is 'velocitys' in your file)

    t_q, q = dedup_by_time(t_q, q)
    _, dq = dedup_by_time(t_q, dq)   # same t_q keys in your file, but keep consistent

    t_u_q, fs_q = make_uniform_grid(t_q)
    q_u  = interp_to_grid(t_q, q,  t_u_q)
    dq_u = interp_to_grid(t_q, dq, t_u_q)

    # Filter joint positions + (optionally) joint velocities
    q_f  = lowpass_zero_phase(q_u,  fs=fs_q, cutoff_hz=CUTOFF_HZ, order=FILTER_ORDER)

    # Recommended: compute dq from filtered q (more consistent)
    dq_from_q_f = deriv_from_pos(t_u_q, q_f)

    # If you want to also filter the logged dq, uncomment:
    # dq_f = lowpass_zero_phase(dq_u, fs=fs_q, cutoff_hz=CUTOFF_HZ, order=FILTER_ORDER)
    # and then choose dq_f vs dq_from_q_f below.

    data = {"t": t_u_q}
    for j in range(q_f.shape[1]):
        data[f"q{j+1}"]  = q_f[:, j]
    for j in range(dq_from_q_f.shape[1]):
        data[f"dq{j+1}"] = dq_from_q_f[:, j]

    df_j = pd.DataFrame(data)
    df_j.to_csv(OUT_JOINTS, index=False)
    print(f"Saved {OUT_JOINTS} (fs≈{fs_q:.2f} Hz, cutoff={CUTOFF_HZ} Hz, order={FILTER_ORDER})")


if __name__ == "__main__":
    main()


Saved filtered_ee.csv  (fs≈10.47 Hz, cutoff=5.0 Hz, order=4)
Saved filtered_joints.csv (fs≈20.00 Hz, cutoff=5.0 Hz, order=4)


This step is to calculate jerk based on filtered data and draw the jerk plot in time series.

In [None]:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

EE_CSV = "filtered_ee.csv"

OUT_EE_TS = "ee_jerk_timeseries.csv"
OUT_EE_PLOT = "ee_jerk_plot.png"
OUT_EE_STATS = "ee_jerk_stats.csv"


def _ensure_strictly_increasing(t, *arrays):
    """Drop non-increasing timestamps (rare after filtering, but safe)."""
    t = np.asarray(t, dtype=np.float64)
    keep = np.ones_like(t, dtype=bool)
    keep[1:] = np.diff(t) > 0
    t2 = t[keep]
    out = [np.asarray(a)[keep] for a in arrays]
    return (t2, *out)


def jerk_from_velocity(t, v):
    """
    Given velocity v(t), compute:
      a = dv/dt
      j = da/dt
    Works for vector v shape (N,3).
    """
    a = np.gradient(v, t, axis=0, edge_order=2)
    j = np.gradient(a, t, axis=0, edge_order=2)
    return a, j


def normalized_jerk_from_jerk(t, pos, jerk):
    """
    Jnorm = (T^5 / L^2) * ∫ ||jerk||^2 dt
    L is path length from pos.
    """
    t = np.asarray(t, dtype=np.float64)
    T = float(t[-1] - t[0])

    # pos is (N,3)
    L = float(np.sum(np.linalg.norm(np.diff(pos, axis=0), axis=1)))
    jerk_sq = np.sum(jerk * jerk, axis=1)

    E = float(np.trapz(jerk_sq, t))
    Jnorm = (T**5 / (L**2)) * E if (T > 0 and L > 0) else np.nan
    return Jnorm, T, L, E


def main():
    # =======================
    # End-effector (translation)
    # =======================
    ee = pd.read_csv(EE_CSV)
    t = ee["t"].to_numpy()
    p = ee[["x", "y", "z"]].to_numpy()
    v = ee[["vx", "vy", "vz"]].to_numpy()

    t, p, v = _ensure_strictly_increasing(t, p, v)

    _, j = jerk_from_velocity(t, v)
    jerk_mag = np.linalg.norm(j, axis=1)
    jerk_sq = np.sum(j * j, axis=1)

    # Normalized jerk
    Jnorm_ee, T_ee, L_ee, E_ee = normalized_jerk_from_jerk(t, p, j)

    # Mean / max jerk magnitude
    mean_jerk = float(np.mean(jerk_mag))
    max_jerk = float(np.max(jerk_mag))

    # Save time series
    ee_out = pd.DataFrame({
        "t": t,
        "jx": j[:, 0], "jy": j[:, 1], "jz": j[:, 2],
        "jerk_mag": jerk_mag,
        "jerk_sq": jerk_sq,
    })
    ee_out.to_csv(OUT_EE_TS, index=False)

    # Plot jerk magnitude vs time
    plt.figure()
    plt.plot(t, jerk_mag)
    plt.xlabel("t (s)")
    plt.ylabel("EE jerk magnitude (m/s^3)")
    plt.title("End-effector jerk vs time")
    plt.tight_layout()
    plt.savefig(OUT_EE_PLOT, dpi=200)
    plt.close()

    # Save summary stats
    stats = pd.DataFrame([{
        "Jnorm_end_effector_translation": Jnorm_ee,
        "T_s": T_ee,
        "L_m": L_ee,
        "jerk_energy_int": E_ee,
        "mean_jerk_mag_mps3": mean_jerk,
        "max_jerk_mag_mps3": max_jerk,
        "mean_jerk_sq": float(np.mean(jerk_sq)),
        "max_jerk_sq": float(np.max(jerk_sq)),
    }])
    stats.to_csv(OUT_EE_STATS, index=False)

    print("Done.")
    print(f"Jnorm={Jnorm_ee:.6e}, T={T_ee:.3f}s, L={L_ee:.6f}m")
    print(f"mean jerk magnitude={mean_jerk:.6e} m/s^3")
    print(f"max  jerk magnitude={max_jerk:.6e} m/s^3")
    print(f"Saved: {OUT_EE_TS}, {OUT_EE_PLOT}, {OUT_EE_STATS}")


if __name__ == "__main__":
    main()


  E = float(np.trapz(jerk_sq, t))


Done.
Jnorm=4.311933e+10, T=155.904s, L=2.186122m
mean jerk magnitude=9.079415e-02 m/s^3
max  jerk magnitude=4.194219e-01 m/s^3
Saved: ee_jerk_timeseries.csv, ee_jerk_plot.png, ee_jerk_stats.csv
