# Inertial Measurement Unit (IMU) Data Analysis

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path
from kielmat.datasets.keepcontrol import load_recording

Let us load a recording from the Keep Control validation dataset ([Warmerdam et al., 2022](https://doi.org/10.3390/data7100136)). The dataset contains simulatenous recording from a 12-camera optical motion capture (OMC) system and at least 15 inertial measurement units (IMUs) attached to the body of the participants.

Participants performed a series of mobility tasks, such as walking, turning, and sit-to-stand, while wearing the IMUs.

Here, we will take a look at the foot IMU data from the slow walking trial, which is walking a 5 meter level-ground distance at half your normal speed.

In [3]:
DATASET_PATH = Path("../../datasets/keepcontrol")
subject_id = "pp002"
task_name = "walkSlow"
recording = load_recording(dataset_path=DATASET_PATH, id=subject_id, task=task_name)



To load the data we use the `KielMAT` package, which returns a `KielMATRecording` object. This object has several attribues, namely:

In [4]:
recording.__dict__.keys()

dict_keys(['data', 'channels', 'info', 'events', 'events_info'])

Both the `data` and `channels` attributes are dictionaries, where the keys are the names of the tracking system, and the values `DataFrame`. The keys of the `data` and `channels` dictionaries should always be the same. A tracking system is defined as a group of motion channels that share hardware properties (the recording device) and software properties (the recording duration and number of samples) ([Motion-BIDS](https://bids-specification.readthedocs.io/en/stable/modality-specific-files/motion.html#motion-recording-data); [Jeung et al., 2024](https://doi.org/10.1038/s41597-024-03559-8)).

In [5]:
print(recording.data.keys(), recording.channels.keys())

dict_keys(['imu', 'omc']) dict_keys(['imu', 'omc'])


Now, let us extract some IMU data, and some relevant channels information to visually explore the data.

In [6]:
imu_data = recording.data["imu"]
imu_channels = recording.channels["imu"]
marker_data = recording.data["omc"]
marker_channels = recording.channels["omc"]

In [7]:
imu_channels.head()

Unnamed: 0,name,component,type,tracked_point,units,sampling_frequency,tracking_system
0,head_ACCEL_x,x,ACCEL,head,g,200,imu
1,head_ACCEL_y,y,ACCEL,head,g,200,imu
2,head_ACCEL_z,z,ACCEL,head,g,200,imu
3,head_GYRO_x,x,GYRO,head,deg/s,200,imu
4,head_GYRO_y,y,GYRO,head,deg/s,200,imu


In [8]:
fs = imu_channels["sampling_frequency"].iloc[0].astype(float)
mapping_type_units = {
    ch_type: imu_channels[imu_channels["type"] == ch_type]["units"].iloc[0]
    for ch_type in imu_channels["type"].unique()
}
print(f"Recording was sampled at {fs} Hz")
print(f"The mapping of channel types to units is: {mapping_type_units}")

Recording was sampled at 200.0 Hz
The mapping of channel types to units is: {'ACCEL': 'g', 'GYRO': 'deg/s', 'MAGN': 'Gauss'}


Let us check to which body segments the IMUs are attached to.

In [9]:
imu_channels["tracked_point"].unique()

array(['head', 'sternum', 'left_upper_arm', 'left_fore_arm',
       'right_upper_arm', 'right_fore_arm', 'pelvis', 'left_thigh',
       'left_shank', 'left_foot', 'right_thigh', 'right_shank',
       'right_foot', 'left_ankle', 'right_ankle'], dtype=object)

We will plot the acceleration norm of different IMUs to see how they behave during the slow walking trial.

In [10]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

body_segments = ["head", "sternum", "pelvis", "left_foot"]
fig = go.Figure()
for tracked_point in body_segments:
    # Calculate the acceleration magnitude
    acc_mag = np.linalg.norm(imu_data[[f"{tracked_point}_ACCEL_{x}" for x in "xyz"]], axis=1)

    fig.add_trace(
        go.Scatter(
            x=np.arange(len(acc_mag)) / fs,
            y=acc_mag,
            mode="lines",
            name=tracked_point,
        )
    )
fig.update_yaxes(title_text=f"acceleration ({mapping_type_units['ACCEL']})")
fig.update_xaxes(title_text="time (s)")
fig.show()


Now, we will switch focus to the foot-worn IMU. We assign the variable `tracked_point` and visualize the acceleration and angular velocity of the foot IMU.

In [11]:
tracked_point = "left_foot"
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
for row_idx, ch_type in enumerate(["ACCEL", "GYRO"]):
    for ch_comp in ["x", "y", "z"]:
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(imu_data)) / fs,
                y=imu_data[f"{tracked_point}_{ch_type}_{ch_comp}"],
                mode="lines",
                name=f"{ch_type}_{ch_comp}",
            ), row=row_idx + 1, col=1
        )
    fig.update_yaxes(title_text=f"{ch_type} ({mapping_type_units[ch_type]})", row=row_idx + 1)
fig.update_xaxes(title_text="time (s)", row=2)
fig.show()

To extract any clinically relevant parameters from the IMU data, there are several steps that need to be taken, such as filtering, segmentation, and feature extraction. To begin with, it often makes sense to align the IMU with a reference coordinate system, such that the acceleration can potentially be used to estimate any spatial parameters of interest.



To estimate the orienation of the IMU in a global coordinate system, we can fuse data from the accelerometer and gyroscope using a sensor fusion algorithm. Here, we will use the versatile quaternion filter (VQF, [Laidig and Seel, 2023](https://doi.org/10.1016/j.inffus.2022.10.014)). Let us take a quick look at the documentation: [https://vqf.readthedocs.io/en/latest/getting_started.html](https://vqf.readthedocs.io/en/latest/getting_started.html).

In [12]:
gyr = imu_data[[f"{tracked_point}_GYRO_{x}" for x in "xyz"]].to_numpy()
acc = imu_data[[f"{tracked_point}_ACCEL_{x}" for x in "xyz"]].to_numpy()

gyr = np.ascontiguousarray(gyr, dtype=np.float64)
acc = np.ascontiguousarray(acc, dtype=np.float64)

gyr = gyr * np.pi / 180  if mapping_type_units["GYRO"] == "deg/s" else gyr
acc = acc * 9.81 if mapping_type_units["ACCEL"] == "g" else acc

In [13]:
from vqf import VQF

# run orientation estimation
Ts = 1 / fs
vqf = VQF(Ts)
# alternative: vqf = PyVQF(Ts)
out = vqf.updateBatch(gyr, acc)

Now we can use the orientation estimate to align the acceleration data with the global coordinate system. For that purpose, we need to perform quaternion rotation for each time point.

In [14]:
acc_rot = np.zeros_like(acc)
gyr_rot = np.zeros_like(gyr)
for t in range(len(acc)):
    acc_rot[t] = vqf.quatRotate(out["quat6D"][t], acc[t])
    gyr_rot[t] = vqf.quatRotate(out["quat6D"][t], gyr[t])

In [15]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
for ch_idx in range(3):
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(acc_rot)) / fs,
            y=acc_rot[:, ch_idx],
            mode="lines",
            name=f"acc_{ch_idx}",
        ), row=1, col=1
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(gyr_rot)) / fs,
            y=gyr_rot[:, ch_idx],
            mode="lines",
            name=f"gyr_{ch_idx}",
        ), row=2, col=1
    )
fig.update_yaxes(title_text=f"acceleration (m/s^2)", row=1)
fig.update_yaxes(title_text=f"angular velocity (rad/s)", row=2)
fig.update_xaxes(title_text="time (s)", row=2)
fig.show()

Let us now implement a simple approach to extract some temporal gait events. Clearly, the strides are characterized by negative peaks in the second angular velocity signal (`gyr_1`). This negavite peak roughly corresponds to the timing of midswing. We can use a simple peak detection algorithm to extract the timing of these peaks.

In [16]:
from scipy.signal import find_peaks

idxs_midswing, _ = find_peaks(-gyr_rot[:, 1], height=(50 * np.pi / 180), distance=int(fs//2))

In [17]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
for ch_idx in range(3):
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(acc_rot)) / fs,
            y=acc_rot[:, ch_idx],
            mode="lines",
            name=f"acc_{ch_idx}",
        ), row=1, col=1
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(gyr_rot)) / fs,
            y=gyr_rot[:, ch_idx],
            mode="lines",
            name=f"gyr_{ch_idx}",
        ), row=2, col=1
    )
fig.add_trace(
    go.Scatter(
        x=idxs_midswing / fs,
        y=gyr_rot[idxs_midswing, 1],
        mode="markers",
        marker=dict(size=10),
        name="midswing",
    ), row=2, col=1
)
fig.update_yaxes(title_text=f"acceleration (m/s^2)", row=1)
fig.update_yaxes(title_text=f"angular velocity (rad/s)", row=2)
fig.update_xaxes(title_text="time (s)", row=2)
fig.show()

Similarly, the positive peaks before the midswing peak correspond to the approximate timing of toe off, and the peaks after midswing correspond roughly to the timing of heel strike. We can use the same peak detection algorithm to extract the timing of these peaks.

In [18]:
idxs_peaks, _ = find_peaks(gyr_rot[:, 1], height=(50 * np.pi / 180), distance=int(fs//4))

idxs_initial_contact = np.zeros_like(idxs_midswing)
idxs_final_contact = np.zeros_like(idxs_midswing)
for i in range(len(idxs_midswing)):
    f = np.argwhere(idxs_peaks > idxs_midswing[i])[:,0]
    if len(f) > 0:
        idxs_initial_contact[i] = idxs_peaks[f[0]]
    g = np.argwhere(idxs_peaks < idxs_midswing[i])[:,0]
    if len(g) > 0:
        idxs_final_contact[i] = idxs_peaks[g[-1]]

In [19]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
for ch_idx in range(3):
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(acc_rot)) / fs,
            y=acc_rot[:, ch_idx],
            mode="lines",
            name=f"acc_{ch_idx}",
        ), row=1, col=1
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(gyr_rot)) / fs,
            y=gyr_rot[:, ch_idx],
            mode="lines",
            name=f"gyr_{ch_idx}",
        ), row=2, col=1
    )
fig.add_trace(
    go.Scatter(
        x=idxs_midswing / fs,
        y=gyr_rot[idxs_midswing, 1],
        mode="markers",
        marker=dict(size=10),
        name="midswing",
    ), row=2, col=1
)
fig.add_trace(
    go.Scatter(
        x=idxs_peaks / fs,
        y=gyr_rot[idxs_peaks, 1],
        mode="markers",
        marker=dict(size=10),
        name="peaks",
    ), row=2, col=1
)
fig.add_trace(
    go.Scatter(
        x=idxs_initial_contact / fs,
        y=gyr_rot[idxs_initial_contact.astype(int), 1],
        mode="markers",
        marker=dict(size=10),
        name="initial_contact",
    ), row=2, col=1
)
fig.add_trace(
    go.Scatter(
        x=idxs_final_contact / fs,
        y=gyr_rot[idxs_final_contact.astype(int), 1],
        mode="markers",
        marker=dict(size=10),
        name="final_contact",
    ), row=2, col=1
)
fig.update_yaxes(title_text=f"acceleration (m/s^2)", row=1)
fig.update_yaxes(title_text=f"angular velocity (rad/s)", row=2)
fig.update_xaxes(title_text="time (s)", row=2)
fig.show()

Finally, we can compute the stride time, swing time and stance time for each of the left strides.

In [20]:
stride_times = (idxs_initial_contact[1:] - idxs_initial_contact[:-1]) / fs
swing_times = (idxs_initial_contact - idxs_final_contact) / fs
stance_times = (idxs_final_contact[1:] - idxs_initial_contact[:-1]) / fs
print(f"Stride times: {stride_times}")
print(f"Stance times: {stance_times}")
print(f"Swing times: {swing_times}")

Stride times: [1.295 1.41  1.445 1.445 1.455 1.475]
Stance times: [0.705 0.785 0.89  0.815 0.87  0.875]
Swing times: [0.605 0.59  0.625 0.555 0.63  0.585 0.6  ]


Luckily, for this dataset, the gait events are already annotated in the `events` attribute of the `KielMATRecording` object. Let us compare the results of our simple peak detection algorithm with the annotated gait events.

In [21]:
def load_events(fpath: str | Path) -> pd.DataFrame:
    # Parse file path
    fpath = Path(fpath) if isinstance(fpath, str) else fpath

    # Load marker data and channels info
    events = pd.read_csv(fpath, sep="\t", header=0)
    return events

In [22]:
events_df = load_events(DATASET_PATH / f"sub-{subject_id}" / "motion" / f"sub-{subject_id}_task-{task_name}_events.tsv")

In [26]:
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02)
for ch_idx in range(3):
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(acc_rot)) / fs,
            y=acc_rot[:, ch_idx],
            mode="lines",
            name=f"acc_{ch_idx}",
        ), row=1, col=1
    )
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(gyr_rot)) / fs,
            y=gyr_rot[:, ch_idx],
            mode="lines",
            name=f"gyr_{ch_idx}",
        ), row=2, col=1
    )
fig.add_trace(
    go.Scatter(
        x=idxs_midswing / fs,
        y=gyr_rot[idxs_midswing, 1],
        mode="markers",
        marker=dict(size=10),
        name="midswing",
    ), row=2, col=1
)
fig.add_trace(
    go.Scatter(
        x=idxs_initial_contact / fs,
        y=gyr_rot[idxs_initial_contact.astype(int), 1],
        mode="markers",
        marker=dict(size=10),
        name="initial_contact",
    ), row=2, col=1
)
fig.add_trace(
    go.Scatter(
        x=idxs_final_contact / fs,
        y=gyr_rot[idxs_final_contact.astype(int), 1],
        mode="markers",
        marker=dict(size=10),
        name="final_contact",
    ), row=2, col=1
)
for idx in events_df[events_df["event_type"]=="initial_contact_left"]["onset"].values:
    fig.add_vline(x=idx / fs, line=dict(color="black", dash="dashdot"), opacity=0.25, row=1, col=1)
    fig.add_vline(x=idx / fs, line=dict(color="black", dash="dashdot"), opacity=0.25, row=2, col=1)
for idx in events_df[events_df["event_type"]=="final_contact_left"]["onset"].values:
    fig.add_vline(x=idx / fs, line=dict(color="black", dash="dash"), opacity=0.25, row=1, col=1)
    fig.add_vline(x=idx / fs, line=dict(color="black", dash="dash"), opacity=0.25, row=2, col=1)
fig.update_yaxes(title_text=f"acceleration (m/s^2)", row=1)
fig.update_yaxes(title_text=f"angular velocity (rad/s)", row=2)
fig.update_xaxes(title_text="time (s)", row=2)
fig.show()