# Extract Kinematic Parameters From Movement Data

## 1) Imports

In [None]:
# libraries
import os
import toml
import numpy as np
import pandas as pd
from scipy import signal
from scipy.signal import butter, sosfiltfilt

# modules
import motion_processing
from src import utils
import visualizations as visu

## 2) Config

In [None]:
# read config data
with open('config.toml', 'r') as f:
    config = toml.load(f)

# load relevant paths
preprocessed_data_path: str = config['batch_preprocessing']['prepro_out_path']
param_extract_out_path: str = config['parameter_extraction']['param_extract_out_path']

# global variables
FRAMERATE = config['camera_params']['framerate']


## 3) Load Movement Data to Dictionary

In [None]:
# read preprocessed files form path
hands_data_file_lst: list = [os.path.join(preprocessed_data_path, x) for x in sorted(os.listdir(preprocessed_data_path))
                             if x.endswith('hands_processed.csv') and not x.startswith('.')]
# sort
hands_data_file_lst = sorted(hands_data_file_lst)

# organize by exercise
hand_exercise_dict: dict = utils.group_motion_files_by_exercise(hands_data_file_lst)

print('Number of participants represented per exercise: ')
for key in hand_exercise_dict.keys():
    print(f'Exercise {key}: {len(hand_exercise_dict[key])} participants')

## 4) Exercise-Specific Parameter Extraction

In [None]:


def extract_landmarks_of_focus(movement_fpath: str, landmark_lst: list) -> list:
    """
    Renames basic landmark names with appropriate index (1: left, 2: right)

    Args:
        movement_fpath (str): movement file path.
        landmark_lst (list): list of landmark names.

    Returns:
        new_landmark_lst (list): modified list of landmark names. Extends landmark names with index of focused side.
    """
    single_digit_landmark_lst: list = ['wrist', 'elbow', 'shoulder']

    task_df = pd.read_csv(movement_fpath)

    # infer the side focused in the given trial
    focus_side: str = utils.infer_focus_side(task_df, model_type='Hand')

    landmark_model_id: str = ''
    if focus_side == 'Left':
        landmark_model_id = '1'
    elif focus_side == 'Right':
        landmark_model_id = '2'
    else:
        print(f'Error: The focused hand side {focus_side} is invalid.')

    # create the list of landmarks for the focused side
    new_landmark_lst: list = [(x[:-1] + landmark_model_id + x[-1]) if x not in single_digit_landmark_lst
                              else (x[:] + landmark_model_id) for x in landmark_lst]

    return new_landmark_lst


def extract_irregular_movements_parameters(raw_signal: np.ndarray,
                                           framerate: float = FRAMERATE,
                                           plot_results: bool = False,
                                           detection_config: dict = None) -> dict:
    """
    Extracts movement parameters using a 2-Pass adaptive strategy:
    1) Scout: gets Median amplitude and median Period of the time series as estimators.
    2) Harvester: uses adaptive peak detection initialized with scout estimators to find valid peaks/valleys

    Args:
        raw_signal (np.ndarray): The raw signal (time series).
        framerate (float): Sampling rate in Hz.
        plot_results (bool, optional): Whether to plot the results. Defaults to False.
        detection_config (dict, optional): A dictionary containing detection parameters. Defaults to None.

    Returns:
        features (dict): Dictionary with extracted movement features
    """

    # configuration setup
    config_params = {'min_segment_length': 0.05,
                     'min_peak_amp_diff': 0.15,
                     'min_valley_amp_diff': 0.15,
                     'max_peak_amp_diff': None,
                     'max_valley_amp_diff': None,
                     'prominence_factor': 0.35,
                     'distance_factor': 0.35}

    # update dictionary with passed config (if available)
    if detection_config:
        config_params.update(detection_config)

    features = {
        'repetition_freq': 0.0, 'num_repetitions': 0.0,
        'period_mean': 0.0, 'period_std': 0.0, 'period_min': 0.0, 'period_max': 0.0,
        'amplitude_mean': 0.0, 'amplitude_std': 0.0, 'amplitude_min': 0.0, 'amplitude_max': 0.0,
        'velocity_pos_mean': 0.0, 'velocity_pos_std': 0.0,
        'velocity_neg_mean': 0.0, 'velocity_neg_std': 0.0,
        'extraction_status': 'failed',
        'valid_peaks_idx': [],
        'valid_valleys_idx': []
    }

    n_samples = len(raw_signal)
    if n_samples < framerate:
        print(f'Error: Trial skipped. Samples ({n_samples}) < Framerate ({framerate})')
        features['extraction_status'] = 'failed'
        return features

    # 1) preprocessing
    def robust_detrend(signal_data: np.ndarray, framerate: float = framerate, cutoff: float = 0.3) -> np.ndarray:
        """
        Removes complex, non-linear trends using a Zero-Phase High-Pass Filter.
        Better than linear detrending for signals with wandering baselines.

        Args:
            signal_data: The raw signal.
            framerate: Sampling rate in Hz.
            cutoff: Frequency below which to remove data (Hz). Defaults to 0.3 Hz (removes trends slower than 3.3s).

        Returns:
            np.ndarray: Detrended signal of same shape as the input signal.
        """
        # safety fallback: signal too short for the filter (< 3x filter length) -> fall back to linear detrend
        if len(signal_data) < 3 * (framerate / cutoff):
            return signal.detrend(signal_data, type='linear')

        # 2nd order Butterworth High-Pass Filter with numerically more stable 'sos' output
        sos = butter(2, cutoff, btype='highpass', fs=framerate, output='sos')

        # apply filter forward and backward (zero phase distortion)
        # centers the signal around 0
        return sosfiltfilt(sos, signal_data)

    # detrend the signal
    try:
        detrended_signal = robust_detrend(raw_signal, framerate, cutoff=0.3)
    except NameError:
        detrended_signal = signal.detrend(raw_signal, type='linear')

    # =========================================================================
    # PASS 1: THE SCOUT (Zero-Crossing)
    # Goal: Get robust Median Amplitude and Median Period from high-confidence anchors.
    # =========================================================================

    signs = np.sign(detrended_signal)
    signs[signs == 0] = 1
    crossings = np.where(np.diff(signs))[0]

    scout_peaks = []
    scout_valleys = []

    if len(crossings) >= 2:
        for i in range(len(crossings) - 1):
            seg_start, seg_end = crossings[i], crossings[i+1]
            if (seg_end - seg_start) < (framerate * config_params['min_segment_length']):
                continue

            segment = detrended_signal[seg_start:seg_end]
            if np.mean(segment) > 0:
                scout_peaks.append({'idx': seg_start + np.argmax(segment), 'amp': np.max(segment)})
            else:
                scout_valleys.append({'idx': seg_start + np.argmin(segment), 'amp': abs(np.min(segment))})

    # failsafe: if scout misses everything, tuning fails.
    if len(scout_peaks) < 2 or len(scout_valleys) < 1:
        features['extraction_status'] = 'failed_scout_pass'
        return features

    # extract statistics from scout
    median_peak_amp = np.median([p['amp'] for p in scout_peaks])
    median_valley_amp = np.median([v['amp'] for v in scout_valleys])

    scout_peak_indices = sorted([p['idx'] for p in scout_peaks])
    median_period_samples = np.median(np.diff(scout_peak_indices))

    # =========================================================================
    # PASS 2: THE HARVESTER (Prominence-Based)
    # Goal: Find ALL valid peaks/valleys, including those below zero.
    # =========================================================================

    # dynamic tuning
    prominence_threshold_peak = config_params['prominence_factor'] * median_peak_amp
    prominence_threshold_valley = config_params['prominence_factor'] * median_valley_amp
    distance_threshold = int(config_params['distance_factor'] * median_period_samples)
    distance_threshold = max(distance_threshold, 1)

    # 1) Find peaks using prominence and distance threshold
    peak_idxs, _ = signal.find_peaks(detrended_signal,
                                     prominence=prominence_threshold_peak,
                                     distance=distance_threshold)

    # 2) Find valleys
    valley_idxs, _ = signal.find_peaks(-detrended_signal,
                                       prominence=prominence_threshold_valley,
                                       distance=distance_threshold)

    potential_peaks = [{'idx': i, 'amp': detrended_signal[i], 'type': 'peak'} for i in peak_idxs]
    potential_valleys = [{'idx': i, 'amp': abs(detrended_signal[i]), 'type': 'valley'} for i in valley_idxs]

    # Filter pass 2 candidates
    filtered_peaks = []
    for p in potential_peaks:

        # apply ceiling if the peak is positive and large (reject large artifacts)
        if config_params['max_peak_amp_diff'] and p['amp'] > 0:
             if p['amp'] > (config_params['max_peak_amp_diff'] * median_peak_amp):
                 continue

        filtered_peaks.append(p)

    filtered_valleys = []
    for v in potential_valleys:

        # ceiling check
        if config_params['max_valley_amp_diff'] and v['amp'] > (config_params['max_valley_amp_diff'] * median_valley_amp):
            continue

        filtered_valleys.append(v)

    # Enforce proper alternation of peaks and valleys
    def _clean_alternation(peaks, valleys):
        all_events = sorted(peaks + valleys, key=lambda x: x['idx'])
        if not all_events: return [], []

        clean_events = [all_events[0]]
        for current in all_events[1:]:
            last = clean_events[-1]

            if current['type'] == last['type']:
                # Keep the maximum: peak -> highest; valleys -> lowest
                if current['amp'] > last['amp']:
                    clean_events.pop()
                    clean_events.append(current)
            else:
                clean_events.append(current)

        return [e for e in clean_events if e['type'] == 'peak'], [e for e in clean_events if e['type'] == 'valley']

    valid_peaks, valid_valleys = _clean_alternation(filtered_peaks, filtered_valleys)

    features['valid_peaks_idx'] = [p['idx'] for p in valid_peaks]
    features['valid_valleys_idx'] = [v['idx'] for v in valid_valleys]

    if len(valid_peaks) < 2:
         features['extraction_status'] = 'failed_insufficient_reps'
         return features

    # =========================================================================
    # METRICS CALCULATION
    # =========================================================================

    # Period
    peak_indices = [p['idx'] for p in valid_peaks]
    period_diffs = np.diff(peak_indices) / framerate
    features.update(utils.get_descriptive_stats(np.array(period_diffs), 'period'))

    # Reps & Freq
    avg_period = features['period_mean']
    if avg_period > 0:
        features['repetition_freq'] = round(1.0 / avg_period, 2)
        valid_duration = (peak_indices[-1] - peak_indices[0]) / framerate
        features['num_repetitions'] = round(valid_duration * features['repetition_freq'] + 1, 1)

    # Amplitude (P-P)
    amplitudes = []
    for p in valid_peaks:
        p_idx = p['idx']
        next_valleys = [v for v in valid_valleys if v['idx'] > p_idx]
        if next_valleys:
            v = next_valleys[0]
            if (v['idx'] - p_idx) / framerate < (1.5 * avg_period):

                # raw signal values
                raw_p = detrended_signal[p['idx']]
                raw_v = detrended_signal[v['idx']]
                amplitudes.append(raw_p - raw_v)

    if amplitudes:
        features.update(utils.get_descriptive_stats(np.array(amplitudes), 'amplitude'))

    # Velocity
    velocity_curve = np.gradient(detrended_signal, 1/framerate)
    rise_vels, fall_vels = [], []
    all_events = sorted(valid_peaks + valid_valleys, key=lambda x: x['idx'])

    for i in range(len(all_events) - 1):
        curr, next_evt = all_events[i], all_events[i+1]
        start, end = curr['idx'], next_evt['idx']

        start = max(0, start)
        end = min(len(velocity_curve), end)

        if start < end:
            vel_seg = velocity_curve[start:end]
            if len(vel_seg) > 0:
                if next_evt['type'] == 'peak':
                    rise_vels.append(np.max(vel_seg))
                else:
                    fall_vels.append(np.min(vel_seg))

    features.update(utils.get_descriptive_stats_short(np.array(rise_vels), 'velocity_pos'))
    features.update(utils.get_descriptive_stats_short(np.array(fall_vels), 'velocity_neg'))

    features['extraction_status'] = 'success'

    if plot_results:
        time_axis = np.linspace(0, len(detrended_signal) / FRAMERATE, len(detrended_signal))
        visu.viz_irregular_events(time_axis=time_axis,
                                  signal=detrended_signal,
                                  features=features,
                                  title='Adaptive 2-Pass Parameter Extraction')

    return features


def run_irregular_param_extraction(movement_paths_lst: list, landmark_lst: list, detection_dict: dict,
                                   movement_func, framerate: float = FRAMERATE, plot_results: bool = False) -> list:
    """
    Updates a movement feature file (.csv) with movement parameters extracted from a list of movement data files.

    Args:
        movement_paths_lst (list): list of movement file paths.
        landmark_lst (list): list of landmark names.
        detection_dict (dict): dictionary with parameters to tune the feature detection algorithm.
        movement_func (function): function that extracts the basic movement from an exercise.
        framerate (float): The framerate of the original video file.
        plot_results (bool, optional): Whether to plot the extraction results. Defaults to False.

    Returns:
        feature_metrics_lst (list): list with dictionaries holding the extracted parameters of each exercise.
    """
    # store a list with all feature for each movement file
    feature_metrics_lst: list = []

    # iterate over each participant
    for movement_file in movement_paths_lst:

        # load current movement file as DataFrame
        movement_df: pd.DataFrame = pd.read_csv(movement_file)

        # extract the landmarks of focus (varies by affected side and focused side)
        new_landmark_lst = extract_landmarks_of_focus(movement_fpath=movement_file, landmark_lst=landmark_lst)

        # get movement array (finger tapping amplitude, rotational angle, or similar)
        movement_arr: np.ndarray = movement_func(movement_df, new_landmark_lst)

        final_metrics = extract_irregular_movements_parameters(raw_signal=movement_arr, detection_config=detection_dict,
                                                               framerate=framerate, plot_results=plot_results)

        # add the current file path
        final_metrics.update({'f_path': movement_file})

        if final_metrics['extraction_status'] == 'success':
            print(f'{os.path.basename(movement_file)}:')
            for key in final_metrics.keys():
                print(f'{key}: {final_metrics[key]}')
        else:
            print(f'Extraction of the current trial was unsuccessful: {os.path.basename(movement_file)}.')

        feature_metrics_lst.append(final_metrics)

    return feature_metrics_lst


### 4.1) Pronation-Supination - Exercise 07/08

In [None]:
# 07: affected side; 08: healthy side
exercise_num: str = '08'
# wrist, index knuckle, and pinky knuckle
landmark_lst = ['wrist', 'mcp2', 'mcp5']

# feature detection dict
sup_pro_dict: dict = {'min_segment_length': 0.05,
                      'min_peak_amp_diff': 0.02,
                      'min_peak_dur_diff': 0.10,
                      'min_valley_amp_diff': 0.02,
                      'min_valley_dur_diff': 0.06,
                      'max_peak_amp_diff': 20.0,
                      'max_peak_dur_diff': 5.0,
                      'max_valley_amp_diff': 20.0}

if exercise_num == '07':

    current_trial_path_lst: list = hand_exercise_dict.get(exercise_num)

    affected_suppro_feature_lst: list = run_irregular_param_extraction(movement_paths_lst=current_trial_path_lst,
                                                                       landmark_lst=landmark_lst,
                                                                       detection_dict=sup_pro_dict,
                                                                       movement_func=motion_processing.calculate_3d_hand_rotation,
                                                                       framerate=FRAMERATE,
                                                                       plot_results=False)

    utils.save_extracted_data_to_csv(affected_suppro_feature_lst, conf_dict=config, out_dir=param_extract_out_path, overwrite=False)

elif exercise_num == '08':

    current_trial_path_lst: list = hand_exercise_dict.get(exercise_num)

    healthy_suppro_feature_lst: list = run_irregular_param_extraction(movement_paths_lst=current_trial_path_lst,
                                                                      landmark_lst=landmark_lst,
                                                                       detection_dict=sup_pro_dict,
                                                                      movement_func=motion_processing.calculate_3d_hand_rotation,
                                                                      framerate=FRAMERATE,
                                                                      plot_results=False)

    utils.save_extracted_data_to_csv(healthy_suppro_feature_lst, conf_dict=config, out_dir=param_extract_out_path, overwrite=False)
