In [None]:
"""Acceleration threshold + walking overlap analysis.

This notebook loads raw tri-axial accelerometer data (X/Y/Z), constructs a sample-level
timestamp series from a known recording start time and sampling frequency, and computes
the squared acceleration magnitude:

It also loads “Postural Events” exported from PAMWare (Excel) and extracts walking
episodes (“Start of Walking”, “End of Walking”, and optionally “Duration of Walking (s)”),
with light header/column-name normalization to handle formatting differences.

Main output is an interactive Plotly visualization that overlays:

Walking episodes as blue time bars
Triggered “recording segments” as red time bars (built from threshold crossings)
The |a|^2 time series as a line, plus an optional horizontal threshold line
Recording segments are generated by scanning the full signal in fixed windows
(default 2 s). If the maximum |a|^2 within a window exceeds the threshold, the peak
sample becomes a trigger and a segment is created from:

record_prev_seconds before the peak, through
record_after_seconds after the peak
Overlapping/adjacent segments are merged.
For the current zoom/view window, the notebook prints summary metrics:

total walking episodes
total walking duration (s)
total recording segments
total recorded duration (s)
walking episodes overlapped by recording
walking duration overlapped by recording (s)
Key function:
plot_mag2_with_walking_and_recording_bars(...)

Returns:
(fig, rec_df, metrics_df)
- fig: Plotly Figure
- rec_df: DataFrame of recording segment Start/End times
- metrics_df: DataFrame of summary metrics (with percentages vs walking totals)
"""

In [9]:
# importing required libraries
import pandas as pd 
import matplotlib.pyplot as plt
import numpy as np
import re
import plotly.graph_objects as go


In [None]:
# Path to the accelerometer data CSV file
path = "Data/KUBM-002 Acc Data 11-17.csv"

# Path to the postural events data Excel file from PAMWare
xlsx_path = "Data/IBM02_2023_11_17.xlsx"
sheet = "Postural Events"

fs_hz = 50.0  # sampling frequency
recording_start = pd.to_datetime("11/17/23 12:00:00 AM")
recording_end_expected = pd.to_datetime("11/18/23 12:00:00 AM") # You can adjust this if needed differen prefferred end time

In [None]:
# Read the accelerometer data table (skip metadata rows above the real header)
df = pd.read_csv(
    path,
    skiprows=7,# rows 0-6 are metadata; row 7 contains the real header
    sep=",",
    skipinitialspace=True,   # trims spaces after commas in header + values
    engine="python",
 )

# Keep just X/Y/Z if that's all you need (optional):
df_xyz = df[["X", "Y", "Z"]].apply(pd.to_numeric, errors="coerce")

# Build a timestamp for every sample based on start time + fs

n = len(df_xyz)
t = recording_start + pd.to_timedelta(np.arange(n) / fs_hz, unit="s")
df_xyz_time = df_xyz.copy()
df_xyz_time["Time"] = t

df_xyz.head()

In [None]:
# Optional: downsample for faster plotting on large files
plot_df = df_xyz
max_points = 2705951
if len(plot_df) > max_points:
    step = max(1, len(plot_df) // max_points)
    plot_df = plot_df.iloc[::step]

# Squared magnitude (in g^2 if X/Y/Z are in g)
mag2 = (plot_df["X"] ** 2) + (plot_df["Y"] ** 2) + (plot_df["Z"] ** 2)
threshold_g2 = 1.44

plt.figure(figsize=(14, 5))
plt.plot(plot_df.index, plot_df["X"], color="blue", linewidth=1, label="X")
plt.plot(plot_df.index, plot_df["Y"], color="red", linewidth=1, label="Y")
plt.plot(plot_df.index, plot_df["Z"], color="green", linewidth=1, label="Z")

# Plot squared magnitude + threshold line
#plt.plot(plot_df.index, mag2, color="black", linewidth=1, alpha=0.8, label=r"|a|$^2$ (X$^2$+Y$^2$+Z$^2$)")
plt.axhline(threshold_g2, color="purple", linestyle="--", linewidth=1.5, label=r"Threshold = 1.44 g$^2$")

plt.title("Acceleration (X/Y/Z) + Squared Magnitude over Samples")
plt.xlabel("Sample Index")
plt.ylabel("Acceleration (g) / Squared Magnitude (g$^2$)")
plt.grid(True, alpha=0.3)
plt.legend(ncols=2)
plt.tight_layout()
plt.show()

In [None]:

def _norm_col(name: str) -> str:
    s = "" if name is None else str(name)
    s = s.strip().lower()
    return re.sub(r"[^a-z0-9]+", "", s)

targets = {
    "Start of Walking": _norm_col("Start of Walking"),
    "End of Walking": _norm_col("End of Walking"),
    "Duration of Walking (s)": _norm_col("Duration of Walking (s)"),
}

def _read_with(header, skiprows):
    df = pd.read_excel(
        xlsx_path,
        sheet_name=sheet,
        header=header,
        skiprows=skiprows,
        engine="openpyxl",
    )
    df.columns = df.columns.astype(str).str.strip()
    return df

# 1) Try exactly what you requested (pandas uses 0-based row indices).
postural = _read_with(header=2, skiprows=[1, 3])

# 2) If expected columns are not present, auto-detect the header row from the first ~20 rows.
normed_cols = {_norm_col(c) for c in postural.columns}
if not set(targets.values()).issubset(normed_cols):
    preview = pd.read_excel(
        xlsx_path,
        sheet_name=sheet,
        header=None,
        nrows=25,
        engine="openpyxl",
    )
    header_row = None
    for r in range(len(preview)):
        row_vals = [_norm_col(v) for v in preview.iloc[r].tolist()]
        if all(t in row_vals for t in targets.values()):
            header_row = r
            break
    if header_row is None:
        raise KeyError(
            "Could not find a header row containing the expected columns. "
            f"Available columns from first attempt: {list(postural.columns)}"
        )
    postural = pd.read_excel(
        xlsx_path,
        sheet_name=sheet,
        header=header_row,
        engine="openpyxl",
    )
    postural.columns = postural.columns.astype(str).str.strip()

# Build a lookup from normalized column name -> actual column name
col_lookup = {_norm_col(c): c for c in postural.columns}
walk_events = postural.loc[:, [col_lookup[targets[k]] for k in targets]].copy()

walk_events.head()

In [None]:



def plot_mag2_with_walking_and_recording_bars(
    df_xyz: pd.DataFrame,
    walk_events: pd.DataFrame,
    df_xyz_time: pd.DataFrame | None = None,
    fs_hz: float = 50,
    recording_start: str | pd.Timestamp = "01/08/24 08:58:00 AM",
    zoom_start: str | pd.Timestamp | None = None,
    zoom_end: str | pd.Timestamp | None = None,
    max_points: int = 200_000,
    threshold_g2: float | None = 1.44,
    bar_height_g2: float = 3.0,          # walking bar height (g^2 units)
    rec_bar_height_g2: float = 0.6,      # recording bar height (smaller)
    check_window_seconds: float = 2.0,   # peak check window length
    record_prev_seconds: float = 2.0,    # record previous 2s FROM PEAK SAMPLE
    record_after_seconds: float = 15.0,  # record next 15s FROM PEAK SAMPLE
    show_peak_markers: bool = True,      # debug: show detected peak triggers
):
    """
    One Plotly graph:
      - mag2 line (same a2 used for detection)
      - walking bars (blue)
      - recording bars (red)

    Also prints 6 metrics UNDER the graph (for the current zoom window):
      1) total walking episodes
      2) total walking duration (s)
      3) total recording segments (red boxes)
      4) total recorded time (s)
      5) walking episodes overlapped by recording
      6) walking duration overlapped by recording (s)
    """

    # -----------------------------
    # Build/validate timebase
    # -----------------------------
    if df_xyz_time is None:
        if df_xyz is None or len(df_xyz) == 0:
            raise ValueError("df_xyz must be a non-empty DataFrame.")
        if not {"X", "Y", "Z"}.issubset(df_xyz.columns):
            raise ValueError("df_xyz must contain columns: X, Y, Z")

        start_ts = pd.to_datetime(recording_start)
        n = len(df_xyz)
        df_xyz_time = df_xyz.copy()
        df_xyz_time["Time"] = start_ts + pd.to_timedelta(np.arange(n) / fs_hz, unit="s")
        fs_eff = float(fs_hz)
    else:
        if "Time" not in df_xyz_time.columns:
            raise ValueError("df_xyz_time must contain a 'Time' column.")
        if not {"X", "Y", "Z"}.issubset(df_xyz_time.columns):
            raise ValueError("df_xyz_time must contain columns: Time, X, Y, Z")

        df_xyz_time = df_xyz_time.copy()
        df_xyz_time["Time"] = pd.to_datetime(df_xyz_time["Time"], errors="coerce")
        if df_xyz_time["Time"].isna().all():
            raise ValueError("df_xyz_time['Time'] could not be parsed into datetimes.")

        dt = df_xyz_time["Time"].diff().dt.total_seconds().to_numpy()
        dt = dt[np.isfinite(dt) & (dt > 0)]
        fs_eff = float(1.0 / np.median(dt)) if dt.size else float(fs_hz)

    # -----------------------------
    # Numeric X/Y/Z + a2 (same for plot + detection)
    # -----------------------------
    df_mag = df_xyz_time[["Time", "X", "Y", "Z"]].copy()
    for c in ["X", "Y", "Z"]:
        df_mag[c] = pd.to_numeric(df_mag[c], errors="coerce")

    x = np.nan_to_num(df_mag["X"].to_numpy(dtype=np.float32, copy=False), nan=0.0)
    y = np.nan_to_num(df_mag["Y"].to_numpy(dtype=np.float32, copy=False), nan=0.0)
    z = np.nan_to_num(df_mag["Z"].to_numpy(dtype=np.float32, copy=False), nan=0.0)

    a2 = x * x + y * y + z * z
    df_mag["mag2"] = a2
    n = int(a2.shape[0])
    if n == 0:
        raise ValueError("No samples found.")

    # -----------------------------
    # Episodes (walking)
    # -----------------------------
    episodes = walk_events.copy()
    if not {"Start of Walking", "End of Walking"}.issubset(episodes.columns):
        raise ValueError("walk_events must contain columns: 'Start of Walking', 'End of Walking'")

    episodes["Start of Walking"] = pd.to_datetime(episodes["Start of Walking"], errors="coerce")
    episodes["End of Walking"] = pd.to_datetime(episodes["End of Walking"], errors="coerce")
    episodes = (
        episodes.dropna(subset=["Start of Walking", "End of Walking"])
        .sort_values("Start of Walking")
        .reset_index(drop=True)
    )
    if episodes.empty:
        raise ValueError("No valid walking episodes found after parsing Start/End times.")

    # -----------------------------
    # Zoom parsing
    # -----------------------------
    def _parse_zoom(value, default_date: pd.Timestamp):
        if value is None or (isinstance(value, float) and pd.isna(value)):
            return None
        s = str(value).strip()
        if not s:
            return None
        dtv = pd.to_datetime(s, errors="coerce")
        if pd.isna(dtv):
            return None
        if dtv.date() == pd.Timestamp("1970-01-01").date():
            dtv = pd.Timestamp.combine(default_date.date(), dtv.time())
        return dtv

    default_date = episodes["Start of Walking"].iloc[0]
    z0 = _parse_zoom(zoom_start, default_date)
    z1 = _parse_zoom(zoom_end, default_date)

    x0 = z0 if z0 is not None else episodes["Start of Walking"].iloc[0]
    x1 = z1 if z1 is not None else episodes["End of Walking"].iloc[-1]
    if x1 < x0:
        raise ValueError("zoom_end must be after zoom_start")

    # -----------------------------
    # Peak-preserving decimation (min+max per bin)
    # -----------------------------
    mag_view = df_mag[(df_mag["Time"] >= x0) & (df_mag["Time"] <= x1)].copy()
    if len(mag_view) > max_points:
        step = max(1, len(mag_view) // max_points)
        idx = np.arange(len(mag_view))
        grp = idx // step
        g = mag_view.groupby(grp, sort=False)["mag2"]
        keep = np.unique(np.concatenate([g.idxmax().to_numpy(), g.idxmin().to_numpy()]))
        mag_view = mag_view.loc[keep].sort_index()

    # -----------------------------
    # Walking bars (within view)
    # -----------------------------
    ep_view = episodes[(episodes["End of Walking"] >= x0) & (episodes["Start of Walking"] <= x1)].copy()
    ep_dur_s = (ep_view["End of Walking"] - ep_view["Start of Walking"]).dt.total_seconds()
    ep_mid = ep_view["Start of Walking"] + (ep_view["End of Walking"] - ep_view["Start of Walking"]) / 2
    ep_width_ms = ep_dur_s * 1000.0

    # -----------------------------
    # Recording segments (FULL series, then filter to view)
    # -----------------------------
    rec_df = pd.DataFrame(columns=["Start", "End"])
    peak_times_in_view = None

    if threshold_g2 is not None:
        check_n = int(round(fs_eff * check_window_seconds))
        prev_n = int(round(fs_eff * record_prev_seconds))
        after_n = int(round(fs_eff * record_after_seconds))

        total_full_windows = n // check_n

        segments = []      # [start_idx, end_idx) end exclusive
        peak_indices = []
        current_end = None

        for w in range(total_full_windows):
            w_start = w * check_n
            block = a2[w_start : w_start + check_n]
            if block.size == 0:
                continue

            peak_val = float(np.max(block))
            if peak_val < float(threshold_g2):
                continue

            peak_off = int(np.argmax(block))
            peak_idx = w_start + peak_off
            peak_indices.append(peak_idx)

            seg_start = peak_idx - prev_n
            seg_end = peak_idx + after_n  # 2s before peak + 15s after peak = 17s total

            if current_end is None or seg_start >= current_end:
                current_end = seg_end
                segments.append([seg_start, seg_end])
            else:
                if seg_end > current_end:
                    current_end = seg_end
                    segments[-1][1] = current_end

        # cap + merge
        capped = []
        for s0, s1 in segments:
            s0_c = max(0, min(n, int(s0)))
            s1_c = max(0, min(n, int(s1)))
            if s1_c > s0_c:
                capped.append((s0_c, s1_c))

        capped.sort()
        merged = []
        for s0, s1 in capped:
            if not merged or s0 > merged[-1][1]:
                merged.append([s0, s1])
            else:
                merged[-1][1] = max(merged[-1][1], s1)
        merged = [(a, b) for a, b in merged]

        t0 = pd.to_datetime(df_mag["Time"].iloc[0])
        rec_starts = [t0 + pd.to_timedelta(s0 / fs_eff, unit="s") for s0, _ in merged]
        rec_ends = [t0 + pd.to_timedelta(s1 / fs_eff, unit="s") for _, s1 in merged]

        rec_df = pd.DataFrame({"Start": rec_starts, "End": rec_ends})
        rec_df = rec_df[(rec_df["End"] >= x0) & (rec_df["Start"] <= x1)].copy()

        rec_dur_s = (rec_df["End"] - rec_df["Start"]).dt.total_seconds()
        rec_mid = rec_df["Start"] + (rec_df["End"] - rec_df["Start"]) / 2
        rec_width_ms = rec_dur_s * 1000.0

        if show_peak_markers and peak_indices:
            peak_times = [t0 + pd.to_timedelta(i / fs_eff, unit="s") for i in peak_indices]
            peak_vals = [a2[i] for i in peak_indices]
            peak_times_in_view = [
                (pt, pv) for pt, pv in zip(peak_times, peak_vals) if (pt >= x0 and pt <= x1)
            ]
    else:
        rec_mid = np.array([])
        rec_width_ms = np.array([])
        rec_dur_s = np.array([])

    # -----------------------------
    # Plot
    # -----------------------------
    fig = go.Figure()

    # walking bars
    fig.add_trace(
        go.Bar(
            x=ep_mid,
            y=np.full(len(ep_view), float(bar_height_g2)),
            width=ep_width_ms,
            name="Walking episode",
            marker=dict(
                color="rgba(76,120,168,0.20)",
                line=dict(color="rgba(47,75,124,0.7)", width=1),
            ),
            customdata=np.stack(
                [
                    ep_view["Start of Walking"].astype(str),
                    ep_view["End of Walking"].astype(str),
                    ep_dur_s.to_numpy(),
                ],
                axis=1,
            ),
            hovertemplate=(
                "Walking<br>"
                "Start: %{customdata[0]}<br>"
                "End: %{customdata[1]}<br>"
                "Duration: %{customdata[2]:.1f} s"
                "<extra></extra>"
            ),
        )
    )

    # recording bars
    if len(rec_df) > 0:
        fig.add_trace(
            go.Bar(
                x=rec_mid,
                y=np.full(len(rec_df), float(rec_bar_height_g2)),
                width=rec_width_ms,
                name="Recording segment",
                marker=dict(
                    color="rgba(220,53,69,0.35)",
                    line=dict(color="rgba(180,30,45,0.85)", width=1),
                ),
                customdata=np.stack(
                    [
                        rec_df["Start"].astype(str),
                        rec_df["End"].astype(str),
                        rec_dur_s.to_numpy(),
                    ],
                    axis=1,
                ),
                hovertemplate=(
                    "Recording<br>"
                    "Start: %{customdata[0]}<br>"
                    "End: %{customdata[1]}<br>"
                    "Duration: %{customdata[2]:.1f} s"
                    "<extra></extra>"
                ),
            )
        )

    # mag2 line
    fig.add_trace(
        go.Scatter(
            x=mag_view["Time"],
            y=mag_view["mag2"],
            mode="lines",
            name="|a|^2 (g^2)",
            line=dict(color="black", width=1),
            hovertemplate="%{x}<br>|a|^2: %{y:.3f} g^2<extra></extra>",
        )
    )

    # peak markers (debug)
    if peak_times_in_view:
        fig.add_trace(
            go.Scatter(
                x=[t for t, _ in peak_times_in_view],
                y=[v for _, v in peak_times_in_view],
                mode="markers",
                name="Detected peak triggers",
                marker=dict(size=6, color="red"),
                hovertemplate="%{x}<br>peak |a|^2: %{y:.3f} g^2<extra></extra>",
            )
        )

    # threshold line
    if threshold_g2 is not None:
        fig.add_hline(
            y=float(threshold_g2),
            line_dash="dash",
            line_color="purple",
            annotation_text=f"{threshold_g2:.2f} g^2",
            annotation_position="top right",
        )

    fig.update_layout(
        title=f"Walking (Blue) + Recording (Red) + |a|^2 (fs≈{fs_eff:.2f} Hz)",
        barmode="overlay",
        height=540,
        margin=dict(l=50, r=50, t=60, b=45),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
    )
    fig.update_xaxes(range=[x0, x1], showgrid=True, title_text="Time", rangeslider_visible=True)
    fig.update_yaxes(showgrid=True, title_text="|a|^2 (g^2)")

    fig.show()

       # -----------------------------
    # Metrics (for this zoom window)
    # -----------------------------
    # Total walking episodes & duration (within zoom window)
    total_walk_episodes = int(len(ep_view))
    total_walk_duration_s = float(ep_dur_s.sum())

    # Total recording segments & duration (within zoom window)
    total_rec_segments = int(len(rec_df))
    total_rec_duration_s = float(rec_dur_s.sum()) if len(rec_df) else 0.0

    # Overlap metrics
    rec_intervals = [
        (s.to_pydatetime(), e.to_pydatetime())
        for s, e in zip(rec_df["Start"], rec_df["End"])
    ] if len(rec_df) else []

    def overlap_seconds(ep_start: pd.Timestamp, ep_end: pd.Timestamp, intervals) -> float:
        if ep_end <= ep_start or not intervals:
            return 0.0
        total = 0.0
        for rs, re in intervals:
            a = max(ep_start.to_pydatetime(), rs)
            b = min(ep_end.to_pydatetime(), re)
            if b > a:
                total += (b - a).total_seconds()
        return float(total)

    overlap_dur_s = []
    overlapped_flags = []

    for s, e in zip(ep_view["Start of Walking"], ep_view["End of Walking"]):
        ov = overlap_seconds(s, e, rec_intervals)
        overlap_dur_s.append(ov)
        overlapped_flags.append(ov > 0)

    overlap_walk_duration_s = float(np.sum(overlap_dur_s))
    overlap_walk_episodes = int(np.sum(overlapped_flags))

       # -----------------------------
    # Percentages (walking = 100%)
    # -----------------------------
    ep_pct = lambda x: (x / total_walk_episodes * 100.0) if total_walk_episodes else np.nan
    dur_pct = lambda x: (x / total_walk_duration_s * 100.0) if total_walk_duration_s else np.nan

    metrics_df = pd.DataFrame(
        {
            "Metric": [
                "Total walking episodes (blue)",
                "Total walking duration (s) (blue)",
                "Total recording segments (red)",
                "Total recorded duration (s) (red)",
                "Walking episodes overlapped by recording",
                "Walking duration overlapped by recording (s)",
            ],
            "Value": [
                total_walk_episodes,
                total_walk_duration_s,
                total_rec_segments,
                total_rec_duration_s,
                overlap_walk_episodes,
                overlap_walk_duration_s,
            ],
            "Percent (%)": [
                100.0,                                # baseline
                100.0,                                # baseline
                ep_pct(total_rec_segments),           # <-- FIXED
                dur_pct(total_rec_duration_s),
                ep_pct(overlap_walk_episodes),
                dur_pct(overlap_walk_duration_s),
            ],
        }
    )


    print(metrics_df.to_string(index=False))
    return fig, rec_df, metrics_df




In [None]:
# -----------------------
# READY-TO-USE EXAMPLE
# -----------------------
fig, rec_df, metrics_df = plot_mag2_with_walking_and_recording_bars(
    df_xyz=df_xyz,
    walk_events=walk_events,
    df_xyz_time=df_xyz_time,
    fs_hz=fs_hz,                 
    recording_start=recording_start,
    zoom_start=None,
    zoom_end=None,
    max_points=200_000,
    threshold_g2=1.44, # squared g threshold for detection you can adjust this as needed
    bar_height_g2=5.0,
    rec_bar_height_g2=1.0,
    check_window_seconds=2.0,
    record_prev_seconds=2.0,
    record_after_seconds=15.0,
    show_peak_markers=True,
)