# Preprocessing IMU Data

In [2]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
%matplotlib inline
sns.set_style("whitegrid")

In [None]:
"""Preprocessing IMU data before it goes into maneuver detection algorithms"""
from typing import Dict, Union
import numpy as np
import simdkalman  # lightweight package, not currently a part of the docker
from scipy.ndimage import uniform_filter1d
import yaml


# In KF, it is the ratio between the process noise and the observation noise
# that matters
KF_CONFIG = {
    "moving_window_size": None,
    "kalman_process_noise": 1.,
    "kalman_observation_noise": 1.,
    "kalman_initial_covariance": 10,
}
# sampling rate ~30Hz, and we want to see features of ~0.1 s long
MOVING_AVERAGE_CONFIG = {
    "moving_window_size": 4,
    "kalman_process_noise": None,
    "kalman_observation_noise": None,
    "kalman_initial_covariance": None,
}


class IMUPreprocessor():
    """Preprocess IMU data before it goes into maneuver detection algorithms.

    Roughly follows methods from M. Tabatabaei et al. "Driver Maneuver
    Identification Using Inertial Sensors and a Machine Learning Algorithm"
    """
    def __init__(
        self,
        moving_window_size: int = None,
        kalman_process_noise: float = None,
        kalman_observation_noise: float = None,
        kalman_initial_covariance: float = None,
    ):
        # moving average filter
        self.moving_window_size = moving_window_size
        # kalman filter - see https://simdkalman.readthedocs.io/en/latest/
        self.kalman_process_noise = kalman_process_noise
        self.kalman_observation_noise = kalman_observation_noise
        self.kalman_initial_covariance = kalman_initial_covariance

        self._init_tools()

    def _init_tools(self):
        """Initialize tools for preprocessing based on configuration"""

        # Kalman parameters should both be either set or unset
        if (
            (self.kalman_observation_noise is None)
            ^ (self.kalman_process_noise is None)
        ):
            raise ValueError("Kalman filter parameters incorrectly set")

        self.kalman_filter = (
            simdkalman.KalmanFilter(
                state_transition=[[1]],  # A
                process_noise=self.kalman_process_noise,  # Q
                observation_model=np.array([[1]]),  # H
                observation_noise=self.kalman_observation_noise  # R
            )
            if self.kalman_process_noise is not None else None
        )

    def update_cfg(self, config: Union[str, dict]):
        """Update configuration from a file or dictionary"""
        if isinstance(config, str):
            with open(config, 'r') as f:
                cfg = yaml.safe_load(f)
        else:
            cfg = config

        for attr_name in [
            "moving_window_size",
            "kalman_process_noise",
            "kalman_observation_noise",
            "kalman_initial_covariance",
        ]:
            setattr(
                self,
                attr_name,
                cfg[attr_name] if attr_name in cfg else None
            )
        self._init_tools()

    def _is_imu_values_name(self, key: str) -> bool:
        """name of a column corresponds to either acceleration or gyro"""
        return "time" not in key

    def _moving_average(self, data: np.ndarray) -> np.ndarray:
        """Moving average filter for every key in data except timestamps

        If the moving window size is not set, returns data unchanged.

        Parameters
        ----------
            data : array of the dimension (n_sensors, n_time_points)
            Returns
        -------
            array smooth values of the dimension (n_sensors, n_time_points)
        """
        # This solution relies on scipy.ndimage. If that library becomes
        # unavailable on k3y, look at
        # https://stackoverflow.com/questions/13728392/moving-average-or-running-mean,
        # mind edge efects
        return (
            uniform_filter1d(data, size=self.moving_window_size, axis=1)
            if self.moving_window_size is not None else data
        )

    def _kalman_smoother(self, data):
        """Kalman filter with noise-only term

        relies on https://simdkalman.readthedocs.io/en/latest/
        """
        if self.kalman_filter is None:
            return data

        smoothed = self.kalman_filter.smooth(
            data,
            initial_value=[0],
            initial_covariance=[[self.kalman_initial_covariance]]
        ).states.mean

        return np.squeeze(smoothed)

    def _normalize(self, data: np.ndarray) -> np.ndarray:
        # TODO: normalize to -1..1 min-max based on the training data
        print("WARNING: normalization is not implemented yet")
        return data

    def preprocess(self, data: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        """Run all preprocessing steps on the data

        Parameters:
        -----------
            data : dictionary of arrays, obtained by merging the acceleration
                and gyro dataframes. Keys are the names of the columns
        Returns:
        --------
            dictionary of arrays, with the keys same as the input except an
            extra suffix
        """
        # transform the input dictionary into an array for numerical efficiency
        key_lookup = {}
        data_stack = []
        non_imu_keys = []

        i = 0
        for key, vals in data.items():
            if self._is_imu_values_name(key):
                key_lookup[i] = key
                data_stack.append(vals)
                i += 1
            else:
                non_imu_keys.append(key)
        data_np = np.vstack(data_stack)

        # apply preprocessing steps one by one
        data_np = self._moving_average(data_np)
        data_np = self._kalman_smoother(data_np)
        data_np = self._normalize(data_np)

        # split the result back into a dictionary
        result = {key: data[key] for key in non_imu_keys}
        for i, key in key_lookup.items():
            result[key] = data_np[i, :]

        return result

Fetch some example data

In [8]:
df_a = pd.read_parquet("data/accel_2023-07-10_22-00-00.parquet")
df_g = pd.read_parquet("data/gyro_2023-07-10_22-00-00.parquet")