In [1]:
"""Orientation Change Algorithm Evaluation

This standalone notebook cell contains all code to:
1. Generate synthetic IMU data with an abrupt orientation change.
2. Estimate pitch/roll with a complementary filter.
3. Detect abrupt orientation changes using adaptive baseline thresholding.
4. Evaluate detection rate, latency and false-positive rate over multiple trials.

The code mirrors the KISS principle – a single, linear script for quick fail-fast experimentation.
"""

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass

# --------------------------
# Global Parameters
# --------------------------
FS = 100                 # Hz
DT = 1.0 / FS            # s
ACC_MEAN_MG = np.array([-40, -120, -800])   # mg (static bias)
GYRO_MEAN_DPS = np.array([12, -15, 8])      # deg/s (static bias)
ACC_NOISE_MG_BASE = 50   # 1-sigma noise, mg
GYRO_NOISE_DPS_BASE = 20 # 1-sigma noise, deg/s
np.random.seed(42)       # Reproducible trials

# --------------------------
# Simulation Helpers
# --------------------------
@dataclass
class SimParams:
    duration_static: float = 60.0   # s before event
    duration_event: float = 5.0     # s during motion
    duration_hold: float = 120.0    # s after motion
    acc_event_offset_mg: np.ndarray = np.array([400, 400, 500])
    gyro_event_rate_dps: np.ndarray = np.array([100, 120, -80])


def simulate_imu(params: SimParams,
                 acc_noise_mg: float = ACC_NOISE_MG_BASE,
                 gyro_noise_dps: float = GYRO_NOISE_DPS_BASE):
    """Return (t, acc, gyro, event_start_idx, event_end_idx)."""
    n_static = int(params.duration_static * FS)
    n_event = int(params.duration_event * FS)
    n_hold = int(params.duration_hold * FS)

    # Static period
    acc_static = ACC_MEAN_MG + np.random.randn(n_static, 3) * acc_noise_mg
    gyro_static = GYRO_MEAN_DPS + np.random.randn(n_static, 3) * gyro_noise_dps

    # Event period
    acc_event = (ACC_MEAN_MG + params.acc_event_offset_mg +
                 np.random.randn(n_event, 3) * acc_noise_mg)
    gyro_event = (params.gyro_event_rate_dps +
                  np.random.randn(n_event, 3) * gyro_noise_dps)

    # Hold period (new orientation, gyro back to bias)
    acc_new_mean = ACC_MEAN_MG + params.acc_event_offset_mg
    acc_hold = acc_new_mean + np.random.randn(n_hold, 3) * acc_noise_mg
    gyro_hold = GYRO_MEAN_DPS + np.random.randn(n_hold, 3) * gyro_noise_dps

    acc = np.vstack((acc_static, acc_event, acc_hold))
    gyro = np.vstack((gyro_static, gyro_event, gyro_hold))

    t = np.arange(acc.shape[0]) * DT
    event_start = n_static
    event_end = n_static + n_event
    return t, acc, gyro, event_start, event_end

# --------------------------
# Orientation & Detection
# --------------------------

def tilt_angles(acc_mg):
    ax, ay, az = acc_mg.T
    ax_g, ay_g, az_g = ax / 1000.0, ay / 1000.0, az / 1000.0
    pitch = np.degrees(np.arctan2(-ax_g, np.sqrt(ay_g ** 2 + az_g ** 2)))
    roll = np.degrees(np.arctan2(ay_g, az_g))
    return pitch, roll


def complementary_filter(acc_mg, gyro_dps, alpha=0.98, dt=DT):
    acc_pitch, acc_roll = tilt_angles(acc_mg)
    n = len(acc_pitch)
    pitch = np.zeros(n)
    roll = np.zeros(n)
    pitch[0] = acc_pitch[0]
    roll[0] = acc_roll[0]
    for i in range(1, n):
        gx, gy, _ = gyro_dps[i]
        roll_gyro = roll[i - 1] + gx * dt
        pitch_gyro = pitch[i - 1] + gy * dt
        roll[i] = alpha * roll_gyro + (1 - alpha) * acc_roll[i]
        pitch[i] = alpha * pitch_gyro + (1 - alpha) * acc_pitch[i]
    return pitch, roll


def lowpass(data, alpha):
    y = np.empty_like(data)
    y[0] = data[0]
    for i in range(1, len(data)):
        y[i] = alpha * data[i] + (1 - alpha) * y[i - 1]
    return y


def detect_abrupt_changes(pitch, roll, threshold_deg=5.0, window_s=2.0):
    win = int(window_s * FS)
    from collections import deque
    dq_pitch, dq_roll = deque(maxlen=win), deque(maxlen=win)
    evt = np.zeros_like(pitch, dtype=bool)
    for i in range(len(pitch)):
        dq_pitch.append(pitch[i])
        dq_roll.append(roll[i])
        if len(dq_pitch) < win:
            continue
        bp = np.mean(dq_pitch)
        br = np.mean(dq_roll)
        if abs(pitch[i] - bp) > threshold_deg or abs(roll[i] - br) > threshold_deg:
            evt[i] = True
    return evt

# --------------------------
# Evaluation Loop
# --------------------------

def run_trials(n_trials=20, acc_noise_mg=ACC_NOISE_MG_BASE, gyro_noise_dps=GYRO_NOISE_DPS_BASE):
    latencies = []
    false_pos_rates = []
    for _ in range(n_trials):
        t, acc, gyro, es, ee = simulate_imu(SimParams(), acc_noise_mg, gyro_noise_dps)
        pitch_cf, roll_cf = complementary_filter(acc, gyro)
        # smooth
        pitch_lp = lowpass(pitch_cf, 0.1)
        roll_lp = lowpass(roll_cf, 0.1)
        abrupt = detect_abrupt_changes(pitch_lp, roll_lp)
        # Metrics
        idx_after_event = np.where(abrupt[es:ee])[0]
        if len(idx_after_event):
            latencies.append(idx_after_event[0] * DT)
        fp = np.sum(abrupt[:es]) / (es * DT / 3600)  # FP/h before event
        false_pos_rates.append(fp)
    det_rate = len(latencies) / n_trials
    print(f"Trials: {n_trials}")
    print(f"Detection rate: {det_rate:.2f}")
    if latencies:
        print(f"Mean latency: {np.mean(latencies):.2f} s")
    print(f"Median false positives per hour: {np.median(false_pos_rates):.2f}")
    return det_rate, latencies, false_pos_rates

# Quick demo
if __name__ == "__main__":
    run_trials(n_trials=10)



Trials: 10
Detection rate: 1.00
Mean latency: 0.03 s
Median false positives per hour: 24810.00
