In [7]:
import pandas as pd
import numpy as np
from scipy.fftpack import fft, dct
from scipy.stats import skew, kurtosis, entropy
import scipy.stats

In [None]:
import scipy.stats
# ----------------------------------------------------------------
# Code source: EMGFlow-Python-Package (https://github.com/WiIIson/EMGFlow-Python-Package/tree/main)
# Adapted to my code with minimal modifications
# ----------------------------------------------------------------


def CalcIEMG(Signal, col, sr):
    """
    Integrated EMG (IEMG) of one column of a Signal.

    Sums the absolute sample values, each scaled by the sampling period (1 / sr).

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.
    sr : float
        Sampling rate; must be positive.

    Raises
    ------
    Exception
        If col is not a column of Signal, or sr is 0 or negative.

    Returns
    -------
    IEMG of the column.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")
    if sr <= 0:
        raise Exception("Sampling rate cannot be 0 or negative")

    sample_period = 1 / sr
    rectified = np.abs(Signal[col])
    return np.sum(rectified * sample_period)


def CalcMAV(Signal, col):
    """
    Mean Absolute Value (MAV) of one column of a Signal.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Sum of absolute sample values divided by the number of samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    samples = Signal[col]
    return np.sum(np.abs(samples)) / len(samples)


def CalcMMAV(Signal, col):
    """
    Modified Mean Absolute Value (MMAV) of one column of a Signal.

    Samples whose position n satisfies 0.25*N <= n <= 0.75*N count with
    weight 1; all other samples count with weight 0.5.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Weighted sum of absolute sample values divided by the number of samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    rectified = np.abs(Signal[col]).values
    count = len(Signal[col])
    lower, upper = 0.25 * count, 0.75 * count

    # Accumulate sequentially, in sample order, one value at a time.
    weighted_sum = 0
    for position in range(count):
        sample = rectified[position]
        weighted_sum += sample if lower <= position <= upper else 0.5 * sample
    return weighted_sum / count


def CalcSSI(Signal, col, sr):
    """
    Simple Square Integral (SSI) of one column of a Signal.

    Each absolute sample is scaled by the sampling period (1 / sr), squared,
    and the squares are summed.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.
    sr : float
        Sampling rate; must be positive.

    Raises
    ------
    Exception
        If col is not a column of Signal, or sr is 0 or negative.

    Returns
    -------
    SSI of the column.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")
    if sr <= 0:
        raise Exception("Sampling rate cannot be 0 or negative")

    scaled = np.abs(Signal[col]) * (1 / sr)
    return np.sum(scaled ** 2)


def CalcVAR(Signal, col):
    """
    Variance of one column of a Signal, as computed by np.var on the column.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Variance of the column.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    return np.var(Signal[col])


def CalcVOrder(Signal, col):
    """
    V-Order of one column of a Signal: the square root of CalcVAR(Signal, col).

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Square root of the column variance.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    variance = CalcVAR(Signal, col)
    return np.sqrt(variance)


def CalcRMS(Signal, col):
    """
    Root Mean Square (RMS) of one column of a Signal.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Square root of the mean of the squared samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    mean_square = np.mean(Signal[col] ** 2)
    return np.sqrt(mean_square)


def CalcWL(Signal, col):
    """
    Waveform Length (WL) of one column of a Signal.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Sum of the absolute differences between consecutive samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    steps = np.diff(Signal[col].values)
    return np.sum(np.abs(steps))


def CalcLOG(Signal, col):
    """
    Log-detector (LOG) of one column of a Signal.

    Computes exp of the mean of log(|x| + 1e-10); the small offset keeps the
    logarithm finite for zero-valued samples.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    LOG of the column.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    count = len(Signal[col])
    log_magnitudes = np.log(np.abs(Signal[col]) + 1e-10)
    return np.exp((1 / count) * np.sum(log_magnitudes))


def CalcMFL(Signal, col):
    """
    Maximum Fractal Length (MFL) of one column of a Signal.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Natural log of the square root of the summed squared differences between
    consecutive samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    steps = np.diff(Signal[col].values)
    path_energy = np.sum(steps ** 2)
    return np.log(np.sqrt(path_energy))


def CalcAP(Signal, col):
    """
    Average Power (AP) of one column of a Signal.

    Parameters
    ----------
    Signal : DataFrame
        Frame holding the signal column.
    col : str
        Name of the column to analyse.

    Raises
    ------
    Exception
        If col is not a column of Signal.

    Returns
    -------
    Mean of the squared samples.
    """
    if col not in Signal.columns:
        raise Exception("Column " + col + " not in Signal")

    squared = Signal[col] ** 2
    return np.mean(squared)


def CalcTwitchRatio(psd, freq=60):
    """
    Twitch Ratio of a PSD: total power above the threshold frequency divided
    by total power below it.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.
    freq : float, optional
        Threshold separating the fast-twitch (higher-frequency) band from the
        slow-twitch (lower-frequency) band. The default is 60. Rows whose
        frequency equals the threshold belong to neither band.

    Raises
    ------
    Exception
        If freq is less than or equal to 0.
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    twitch_ratio : float
        Summed fast-band power over summed slow-band power.
    """
    if freq <= 0:
        raise Exception("freq cannot be less or equal to 0")

    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    frequencies = psd['Frequency']
    fast_power = psd.loc[frequencies > freq, 'Power']
    slow_power = psd.loc[frequencies < freq, 'Power']

    return np.sum(fast_power) / np.sum(slow_power)


#
# =============================================================================
#

def CalcTwitchIndex(psd, freq=60):
    """
    Twitch Index of a PSD: peak power above the threshold frequency divided
    by peak power below it.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.
    freq : float, optional
        Threshold separating the fast-twitch (higher-frequency) band from the
        slow-twitch (lower-frequency) band. The default is 60. Rows whose
        frequency equals the threshold belong to neither band.

    Raises
    ------
    Exception
        If freq is less than or equal to 0.
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    twitch_index : float
        Maximum fast-band power over maximum slow-band power.
    """
    if freq <= 0:
        raise Exception("freq cannot be less or equal to 0")

    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    frequencies = psd['Frequency']
    fast_peak = np.max(psd.loc[frequencies > freq, 'Power'])
    slow_peak = np.max(psd.loc[frequencies < freq, 'Power'])

    return fast_peak / slow_peak


#
# =============================================================================
#

def CalcTwitchSlope(psd, freq=60):
    """
    Twitch Slopes of a PSD: least-squares slope of power against frequency,
    fitted separately above and below the threshold frequency.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.
    freq : float, optional
        Threshold separating the fast-twitch (higher-frequency) band from the
        slow-twitch (lower-frequency) band. The default is 60. Rows whose
        frequency equals the threshold belong to neither band.

    Raises
    ------
    Exception
        If freq is less than or equal to 0.
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    fast_slope : float
        Fitted slope of the band above freq.
    slow_slope : float
        Fitted slope of the band below freq.
    """
    if freq <= 0:
        raise Exception("freq cannot be less or equal to 0")

    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    def _design_matrix(band):
        # Columns: [frequency, 1] so the fit is power = slope * frequency + intercept.
        return np.vstack([band['Frequency'], np.ones(len(band['Frequency']))]).T

    fast_band = psd[psd['Frequency'] > freq]
    slow_band = psd[psd['Frequency'] < freq]

    fast_design = _design_matrix(fast_band)
    slow_design = _design_matrix(slow_band)

    fast_coeffs = np.linalg.lstsq(fast_design, fast_band['Power'], rcond=None)[0]
    slow_coeffs = np.linalg.lstsq(slow_design, slow_band['Power'], rcond=None)[0]

    # Index 0 is the slope, index 1 the intercept.
    return fast_coeffs[0], slow_coeffs[0]


#
# =============================================================================
#

def CalcSC(psd):
    """
    Spectral Centroid (SC) of a PSD: the power-weighted mean frequency.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    SC : float
        Sum of power * frequency divided by the sum of power.
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    power = psd['Power']
    weighted_frequency = np.sum(power * psd['Frequency'])
    return weighted_frequency / np.sum(power)


#
# =============================================================================
#

def CalcSF(psd):
    """
    Spectral Flatness (SF) of a PSD: geometric mean of the power values
    divided by their arithmetic mean.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    SF : float
        Spectral flatness of the PSD.
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    n_bins = psd.shape[0]
    power = psd['Power']
    # Taking the N-th root of every value before multiplying gives the geometric mean.
    geometric_mean = np.prod(power ** (1 / n_bins))
    arithmetic_mean = (1 / n_bins) * np.sum(power)
    return geometric_mean / arithmetic_mean


#
# =============================================================================
#

def CalcSS(psd):
    """
    Spectral Spread (SS) of a PSD: the power-weighted mean squared distance
    of each frequency from the spectral centroid (CalcSC).

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    SS : float
        Spectral spread of the PSD.
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    centroid = CalcSC(psd)
    squared_distance = (psd['Frequency'] - centroid) ** 2
    weighted_spread = np.sum(squared_distance * psd['Power'])
    return weighted_spread / np.sum(psd['Power'])


#
# =============================================================================
#

def CalcSDec(psd):
    """
    Spectral Decrease (SDec) of a PSD.

    Sums (power[k] - power[0]) / N over every bin after the first, where N is
    the number of rows, and divides by the summed power of those bins.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    SDec : float
        Spectral decrease of the PSD.
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    n_bins = psd.shape[0]
    power = np.array(psd['Power'])
    first_bin, later_bins = power[0], power[1:]
    return np.sum((later_bins - first_bin) / n_bins) / np.sum(later_bins)


#
# =============================================================================
#

def CalcSEntropy(psd):
    """
    Spectral Entropy of a PSD.

    The power values are divided by their sum, and the entropy
    -sum(p * ln(p)) of the resulting values is returned.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.

    Returns
    -------
    SEntropy : float
        Spectral entropy of the PSD (natural logarithm).
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    power = psd['Power']
    share = power / np.sum(power)
    return -np.sum(share * np.log(share))


#
# =============================================================================
#

def CalcSRoll(psd, percent=0.85):
    """
    Calculate the Spectral Rolloff of a PSD.

    The rolloff is the frequency of the first row (in row order) at which the
    cumulative share of the total power reaches `percent`.

    Parameters
    ----------
    psd : DataFrame
        A Pandas DataFrame containing a 'Frequency' and 'Power' column.
    percent : float, optional
        Fraction of the total power that must be accumulated, strictly between
        0 and 1. The default is 0.85.

    Raises
    ------
    Exception
        An exception is raised if psd does not only have columns 'Frequency' and 'Power'
    Exception
        An exception is raised if percent is not between 0 and 1

    Returns
    -------
    float
        Spectral Rolloff of the PSD. NaN is returned when the total power is
        not positive (empty, all-zero or NaN power), because no cumulative
        share can be formed.

    """

    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    if percent <= 0 or percent >= 1:
        raise Exception("percent must be between 0 and 1")

    # Work on plain arrays so the (possibly non-contiguous) row labels of psd
    # play no role in the lookup.
    power = psd['Power'].to_numpy(dtype=float)
    frequency = psd['Frequency'].to_numpy()

    total_power = power.sum()
    if not total_power > 0:
        return np.nan

    # Compare the accumulated *share* of power against the threshold; the
    # previous implementation compared the total power itself, which made the
    # function return the first frequency for almost any input.
    cumulative_share = np.cumsum(power) / total_power
    reached = np.flatnonzero(cumulative_share >= percent)
    if reached.size == 0:
        # Floating-point rounding can leave the final share marginally below
        # the threshold; the last bin is then the rolloff.
        return frequency[-1]
    return frequency[reached[0]]


#
# =============================================================================
#

def CalcSBW(psd, p=2):
    """
    Spectral Bandwidth (SBW) of a PSD.

    Computes (sum(Power * (Frequency - centroid) ** p)) ** (1 / p), where the
    centroid comes from CalcSC.

    Parameters
    ----------
    psd : DataFrame
        Frame with exactly the columns 'Frequency' and 'Power'.
    p : int, optional
        Order of the SBW. The default is 2.

    Raises
    ------
    Exception
        If psd does not have exactly the columns 'Frequency' and 'Power'.
    Exception
        If p is not greater than 0.

    Returns
    -------
    SBW : float
        The SBW of the PSD.
    """
    if set(psd.columns.values) != {'Frequency', 'Power'}:
        raise Exception("psd must be a Power Spectrum Density dataframe with only a 'Frequency' and 'Power' column")

    if p <= 0:
        raise Exception("p must be greater than 0")

    centroid = CalcSC(psd)
    deviation_power = (psd['Frequency'] - centroid) ** p
    weighted_total = np.sum(psd['Power'] * deviation_power)
    return weighted_total ** (1 / p)


# Estimate the power spectral density (PSD) that the spectral feature functions take as input
def EMG2PSD(Sig_vals, sr=2000, normalize=True):
    """
    Estimate the Power Spectral Density (PSD) of a Signal. Uses the Welch
    method, meaning it can be used as a Long Term Average Spectrum (LTAS).

    Parameters
    ----------
    Sig_vals : float list
        A list of float values. A column of a Signal.
    sr : float
        Sampling rate of the Signal.
    normalize : bool, optional
        If True, will normalize the result. If False, will not. The default is True.

    Raises
    ------
    Exception
        An exception is raised if the sampling rate is less or equal to 0

    Returns
    -------
    psd : DataFrame
        A DataFrame containing a 'Frequency' and 'Power' column. The Power column
        indicates the intensity of each frequency in the Signal provided. Results
        will be normalized (divided by the maximum power) if 'normalize' is set
        to True. Rows below the minimum frequency are dropped and the index is
        not reset, so row labels do not start at 0; use label-based lookups
        (e.g. ``psd.loc[psd['Power'].idxmax()]``) on the result.
    """
    # Imported locally: the notebook's import cells only bring in scipy.stats,
    # which does not guarantee that the scipy.signal submodule is loaded.
    from scipy.signal import welch

    if sr <= 0:
        raise Exception("Sampling rate must be greater than 0")

    # Remove the mean (DC offset) before estimating the spectrum
    Sig_vals = Sig_vals - np.mean(Sig_vals)
    N = len(Sig_vals)

    # Calculate minimum frequency given sampling rate
    min_frequency = (2 * sr) / (N / 2)

    # Calculate window size given sampling rate
    nperseg = int((2 / min_frequency) * sr)
    nfft = nperseg * 2

    # Apply welch method with hanning window
    frequency, power = welch(
        Sig_vals,
        fs=sr,
        scaling='density',
        detrend=False,
        nfft=nfft,
        average='mean',
        nperseg=nperseg,
        window='hann'
    )

    # Normalize if set to true (the maximum is taken before the low bins are dropped)
    if normalize is True:
        power /= np.max(power)

    # Create dataframe of results
    psd = pd.DataFrame({'Frequency': frequency, 'Power': power})
    # Keep only the bins at or above the minimum frequency
    psd = psd.loc[psd['Frequency'] >= min_frequency]

    return psd


# reps

In [3]:
def calculate_emg_metrics(df, sampling_rate=2000, columns=None):
    """
    Compute time-domain and spectral EMG features for each filtered EMG column.

    Parameters
    ----------
    df : DataFrame
        Frame holding the EMG columns. For every analysed column named
        '...filtered...' a matching column with 'filtered' replaced by
        'envelope' must also be present.
    sampling_rate : float, optional
        Sampling rate passed to the rate-dependent features and to EMG2PSD.
        The default is 2000.
    columns : list of str, optional
        Columns to analyse. Defaults to every column whose name contains
        'filtered'.

    Returns
    -------
    DataFrame
        One row per analysed column; the 'Column' field holds the column name
        and the remaining fields hold the features.
    """
    if columns is None:
        columns = [col for col in df.columns if 'filtered' in col]

    results = []

    for col in columns:
        data_col = df[col]

        # Summary statistics come from the envelope column, the Calc* features
        # from the filtered column itself.
        envelope_col = df[col.replace('filtered', 'envelope')]
        metrics = {
            'Column': col,
            'Min': np.min(envelope_col),
            'Max': np.max(envelope_col),
            'Mean': np.mean(envelope_col),
            'SD': np.std(envelope_col),
            'Skew': scipy.stats.skew(envelope_col),
            'Kurtosis': scipy.stats.kurtosis(envelope_col),
            'IEMG': CalcIEMG(df, col, sampling_rate),
            'MAV': CalcMAV(df, col),
            'MMAV': CalcMMAV(df, col),
            'SSI': CalcSSI(df, col, sampling_rate),
            'VAR': CalcVAR(df, col),
            'VOrder': CalcVOrder(df, col),
            'RMS': CalcRMS(df, col),
            'WL': CalcWL(df, col),
            'LOG': CalcLOG(df, col),
            'MFL': CalcMFL(df, col),
            'AP': CalcAP(df, col)
        }

        # Spectral features from the filtered signal
        psd = EMG2PSD(data_col, sr=sampling_rate)

        # idxmax returns an index *label*. EMG2PSD drops the lowest bins
        # without resetting the index, so the label must be resolved with
        # .loc; .iloc would read a row shifted by the dropped bins or raise
        # IndexError.
        peak_label = psd['Power'].idxmax()
        # Fit the two slopes once instead of once per output field.
        fast_slope, slow_slope = CalcTwitchSlope(psd)

        metrics.update({
            'Max_Freq': psd.loc[peak_label, 'Frequency'],
            'Twitch_Ratio': CalcTwitchRatio(psd),
            'Twitch_Index': CalcTwitchIndex(psd),
            'Fast_Twitch_Slope': fast_slope,
            'Slow_Twitch_Slope': slow_slope,
            'Spectral_Centroid': CalcSC(psd),
            'Spectral_Flatness': CalcSF(psd),
            'Spectral_Spread': CalcSS(psd),
            'Spectral_Decrease': CalcSDec(psd),
            'Spectral_Entropy': CalcSEntropy(psd),
            'Spectral_Rolloff': CalcSRoll(psd),
            'Spectral_Bandwidth': CalcSBW(psd, 2)
        })

        results.append(metrics)

    return pd.DataFrame(results)


def extract_emg_features(df):
    """
    Build one row of EMG features per (Participant, Set, Repetition) group.

    For each group, calculate_emg_metrics returns one row per EMG column; those
    rows are pivoted into a single wide row whose feature columns are named
    '<metric>_<column>'.

    Parameters
    ----------
    df : DataFrame
        Frame with 'Participant', 'Set' and 'Repetition' columns plus the EMG
        columns expected by calculate_emg_metrics.

    Returns
    -------
    DataFrame
        One row per group, starting with the three key columns. An empty
        DataFrame is returned when df has no groups.
    """
    keys = ['Participant', 'Set', 'Repetition']
    group_rows = []

    for (participant, set_num, repetition), group in df.groupby(keys):
        metrics_df = calculate_emg_metrics(group)

        # Pivot: move 'Column' into the column index so every (metric, column)
        # pair becomes its own column, then flatten the names.
        metrics_df = metrics_df.set_index('Column', append=True).unstack(level='Column')
        metrics_df.columns = ['{}_{}'.format(metric, col) for metric, col in metrics_df.columns]

        # Add Participant, Set, and Repetition information
        metrics_df['Participant'] = participant
        metrics_df['Set'] = set_num
        metrics_df['Repetition'] = repetition

        # After the pivot each source row only fills its own column's fields;
        # first() takes the first non-null value per field, collapsing the
        # group into a single row.
        group_rows.append(metrics_df.groupby(keys).first().reset_index())

    # Concatenate once at the end instead of growing a frame inside the loop.
    if not group_rows:
        return pd.DataFrame()
    return pd.concat(group_rows, ignore_index=True)

In [4]:
def calculate_features(df, columns=None):
    """
    Compute summary and FFT-based features for each signal column of df.

    Parameters
    ----------
    df : DataFrame
        Frame holding the signal columns.
    columns : iterable of str, optional
        Columns to analyse. Defaults to every column except 'Frame',
        'Participant', 'Correct', 'Set' and 'Repetition'.

    Returns
    -------
    DataFrame
        A single-row frame with one '<column>_<feature>' field per analysed
        column and feature. The max/min/mean FFT coefficient fields are
        complex-valued.
    """
    if columns is None:
        columns = df.columns.difference(['Frame', 'Participant', 'Correct', 'Set', 'Repetition'])

    feature_dict = {}

    for col in columns:
        series = df[col].values
        series_mean = np.mean(series)
        abs_series = np.abs(series)
        fft_coefficients = fft(series)
        abs_fft_coefficients = np.abs(fft_coefficients)
        # scipy.stats.entropy normalises its input to sum to 1 before computing the entropy
        spectral_entropy = entropy(abs_fft_coefficients)

        # Update feature dictionary directly with metrics for this column
        feature_dict.update({
            f'{col}_mean': series_mean,
            f'{col}_std': np.std(series, ddof=1),
            f'{col}_min': np.min(series),
            f'{col}_max': np.max(series),
            f'{col}_range': np.max(series) - np.min(series),
            f'{col}_mean_abs': np.mean(abs_series),
            f'{col}_variance': np.var(series, ddof=1),
            # Mean of |x - mean(x)|. The previous expression, mean(|x| - mean(x)),
            # reduces to mean_abs - mean and is not a deviation measure.
            f'{col}_mean_absolute_deviation': np.mean(np.abs(series - series_mean)),
            f'{col}_rms': np.sqrt(np.mean(np.square(series))),
            f'{col}_skewness': skew(series),
            f'{col}_kurtosis': kurtosis(series),
            # NOTE(review): max/min of complex values follow NumPy's complex
            # ordering (real part first, then imaginary) — confirm this is the
            # intended feature.
            f'{col}_max_fft_coeff': np.max(fft_coefficients),
            f'{col}_min_fft_coeff': np.min(fft_coefficients),
            f'{col}_abs_max_fft_coeff': np.max(abs_fft_coefficients),
            f'{col}_abs_min_fft_coeff': np.min(abs_fft_coefficients),
            f'{col}_mean_fft_coeff': np.mean(fft_coefficients),
            f'{col}_spectral_entropy': spectral_entropy
        })

    return pd.DataFrame([feature_dict])


def extract_features(df):
    """
    Build one row of features per (Participant, Set, Repetition) group.

    Parameters
    ----------
    df : DataFrame
        Frame with 'Participant', 'Set' and 'Repetition' columns plus the
        signal columns analysed by calculate_features.

    Returns
    -------
    DataFrame
        One row per group: the features from calculate_features followed by
        the 'Participant', 'Set' and 'Repetition' values. An empty DataFrame
        is returned when df has no groups.
    """
    group_rows = []

    grouped = df.groupby(['Participant', 'Set', 'Repetition'])
    for (participant, set_num, repetition), group in grouped:
        # Add Participant, Set, and Repetition information
        group_rows.append(
            calculate_features(group).assign(
                Participant=participant,
                Set=set_num,
                Repetition=repetition,
            )
        )

    # Concatenate once at the end instead of growing a frame inside the loop.
    if not group_rows:
        return pd.DataFrame()
    return pd.concat(group_rows, ignore_index=True)

## prepare imu

In [5]:
# Per-repetition IMU features for the correctly performed recordings, labelled Correct = 1
df = pd.read_pickle("data/out/correct_position_scaled_df.pkl")
final_df = extract_features(df)
final_df['Correct'] = 1
print(final_df)

# Same for the incorrectly performed recordings, labelled Correct = 0
df_i = pd.read_pickle("data/out/incorrect_position_scaled_df.pkl")
final_df_i = extract_features(df_i)
final_df_i['Correct'] = 0
print(final_df_i)

# Stack both labelled sets row-wise with a fresh 0..n-1 index
combined_df = pd.concat([final_df, final_df_i], axis=0, ignore_index=True)

# Show the combined DataFrame and save it for the merge step
print(combined_df.shape)
print(combined_df)
combined_df.to_pickle("data/out/imu_for_prediction.pkl")

  final_df['Correct'] = 1


     Left Elbow Flexion/Extension joint_angle_xzy_mean  \
0                                             3.969268   
1                                             3.906561   
2                                             3.796035   
3                                             8.885369   
4                                             8.545811   
..                                                 ...   
145                                          -6.837870   
146                                          -4.836327   
147                                          -5.403425   
148                                          -5.111782   
149                                          -3.990065   

     Left Elbow Flexion/Extension joint_angle_xzy_std  \
0                                            2.638280   
1                                            3.530318   
2                                            2.455373   
3                                            5.106799   
4                 

  final_df_i['Correct'] = 0


## prepare emg

In [10]:
# Per-repetition EMG features for the correctly performed recordings, labelled Correct = 1
df = pd.read_pickle("data/out/emg_correct_with_set_rep.pkl")
final_df = extract_emg_features(df)
final_df['Correct'] = 1
print(final_df)

# Same for the incorrectly performed recordings, labelled Correct = 0 (df is reused for the input)
df = pd.read_pickle("data/out/emg_incorrect_with_set_rep.pkl")
final_df_i= extract_emg_features(df)
final_df_i['Correct'] = 0
print(final_df_i)

# Stack both labelled sets row-wise with a fresh 0..n-1 index
combined_df = pd.concat([final_df, final_df_i], axis=0, ignore_index=True)

# Show the combined shape and save the frame for the merge step
print(combined_df.shape)
combined_df.to_pickle("data/out/emg_for_prediction.pkl")

       Participant  Set  Repetition  Min_front delt filtered  \
0    Participant_A  1.0         1.0                 4.556431   
1    Participant_A  1.0         2.0                 3.114427   
2    Participant_A  1.0         3.0                 3.132001   
3    Participant_A  1.0         4.0                 1.969264   
4    Participant_A  1.0         5.0                 1.874528   
..             ...  ...         ...                      ...   
145  Participant_E  3.0         6.0                 1.546288   
146  Participant_E  3.0         7.0                 1.924931   
147  Participant_E  3.0         8.0                 0.832126   
148  Participant_E  3.0         9.0                 5.020372   
149  Participant_E  3.0        10.0                 4.674751   

     Min_middle delt filtered  Min_trapezius filtered  \
0                    4.086119                9.216446   
1                    3.227944               26.964361   
2                    3.197509               22.976608   
3  

## prepare video

In [11]:
# Load the video-derived data once; both the per-repetition and the windowed
# video cells read these two variables.
video_correct_done = pd.read_pickle("data/out/video_correct_with_set_rep.pkl")
video_incorrect_done = pd.read_pickle("data/out/video_incorrect_with_set_rep.pkl")

In [12]:
# Per-repetition video features for the correct recordings, labelled Correct = 1
# (the same extract_features used for the IMU data)
df = video_correct_done
final_df = extract_features(df)
final_df['Correct'] = 1
print(final_df.shape)

# Same for the incorrect recordings, labelled Correct = 0
df_i = video_incorrect_done
final_df_i = extract_features(df_i)
final_df_i['Correct'] = 0
print(final_df_i.shape)

# Stack both labelled sets row-wise with a fresh 0..n-1 index
combined_df = pd.concat([final_df, final_df_i], axis=0, ignore_index=True)

# Show the combined shape and save the frame for the merge step
print(combined_df.shape)
combined_df.to_pickle("data/out/video_for_prediction.pkl")

  final_df['Correct'] = 1


(142, 497)
(153, 497)
(295, 497)


  final_df_i['Correct'] = 0


## combine all

In [13]:
import pandas as pd

# Load the pickled per-repetition feature sets written by the cells above
imu_path = "data/out/imu_for_prediction.pkl"
emg_path = "data/out/emg_for_prediction.pkl"
video_path = "data/out/video_for_prediction.pkl"

imu_data = pd.read_pickle(imu_path)
emg_data = pd.read_pickle(emg_path)
video_data = pd.read_pickle(video_path)

# Step 1: Combine IMU and EMG datasets.
# The outer join keeps repetitions present in either set; feature names that
# occur in both sets get the '_imu' / '_emg' suffix.
combined_data = pd.merge(
    imu_data,
    emg_data,
    on=['Participant', 'Set', 'Repetition', 'Correct'],
    suffixes=('_imu', '_emg'),
    how='outer'
)

# Step 2: Merge with video data using an inner join to keep only matching rows.
# Video feature names that clash with existing columns get the '_video' suffix.
combined_data = pd.merge(
    combined_data,
    video_data,
    on=['Participant', 'Set', 'Repetition', 'Correct'],
    suffixes=('', '_video'),
    how='inner'
)

#output_path = "data/out/combined_imu_emg_video_for_prediction.pkl"
#combined_data.to_pickle(output_path)

# Row / column counts of the merged frame
combined_summary = {
    "Total Rows": combined_data.shape[0],
    "Total Columns": combined_data.shape[1],
}

print("Combined data summary:", combined_summary)

Combined data summary: {'Total Rows': 295, 'Total Columns': 1689, 'Output Path': 'data/out/combined_imu_emg_video_for_prediction.pkl'}


In [14]:
# Drop every row that contains at least one missing value
combined_data_cleaned = combined_data.dropna()

# Report how many rows the drop removed and what is left
cleaned_summary = {
    "Total Rows Removed": combined_data.shape[0] - combined_data_cleaned.shape[0],
    "Remaining Rows": combined_data_cleaned.shape[0],
    "Remaining Columns": combined_data_cleaned.shape[1],
}

cleaned_summary

{'Total Rows Removed': 0, 'Remaining Rows': 295, 'Remaining Columns': 1689}

In [89]:
# Define the function to split complex columns into real and imaginary parts
def split_complex_columns(df):
    """
    Replace every FFT-coefficient column by a '<name>_real' / '<name>_imag' pair.

    A column is split when its name contains '_fft_coeff' and it is not itself
    the product of an earlier split (name ending in '_real' or '_imag'). The
    second condition makes the function idempotent: the notebook applies it in
    more than one cell, and without it a repeated call would split
    '..._fft_coeff_real' again into '..._real_real' / '..._real_imag'.

    Parameters
    ----------
    df : DataFrame
        Frame whose column names are strings.

    Returns
    -------
    DataFrame
        A new frame: the untouched columns first, followed by the real and
        imaginary parts of each split column. Non-complex values are copied to
        the real part and get an imaginary part of 0.
    """
    complex_types = (complex, np.complexfloating)

    def _needs_split(name):
        return "_fft_coeff" in name and not name.endswith(("_real", "_imag"))

    targets = [col for col in df.columns if _needs_split(col)]

    new_cols = {}
    for col in targets:
        new_cols[col + "_real"] = df[col].apply(lambda x: x.real if isinstance(x, complex_types) else x)
        new_cols[col + "_imag"] = df[col].apply(lambda x: x.imag if isinstance(x, complex_types) else 0)

    remaining = df.drop(columns=targets)
    new_cols_df = pd.DataFrame(new_cols, index=df.index)
    return pd.concat([remaining, new_cols_df], axis=1)


# NOTE(review): the next cell repeats these exact steps, and this cell's
# recorded output (303 rows) does not match the 295 rows reported by the
# surrounding cells — presumably a leftover from an earlier run; confirm
# whether this cell can be removed.
combined_data_cleaned = split_complex_columns(combined_data_cleaned)
# Remove constant columns (at most one distinct value)
constant_columns = [col for col in combined_data_cleaned.columns if combined_data_cleaned[col].nunique() <= 1]
combined_data_cleaned = combined_data_cleaned.drop(columns=constant_columns)

# Report how many columns were dropped and the remaining shape
preprocessing_summary = {
    "Total Constant Columns Removed": len(constant_columns),
    "Remaining Columns": combined_data_cleaned.shape[1],
    "Remaining Rows": combined_data_cleaned.shape[0],
}

preprocessing_summary

{'Total Constant Columns Removed': 136,
 'Remaining Columns': 1385,
 'Remaining Rows': 303}

In [15]:
# Split the FFT-coefficient columns into real and imaginary parts, then drop
# columns that hold at most one distinct value.
# NOTE(review): this reassigns combined_data_cleaned from itself, so the
# result depends on how often the cell (and the one before it) has been run.
combined_data_cleaned = split_complex_columns(combined_data_cleaned)
constant_columns = [col for col in combined_data_cleaned.columns if combined_data_cleaned[col].nunique() <= 1]
combined_data_cleaned = combined_data_cleaned.drop(columns=constant_columns)

# Report how many columns were dropped and the remaining shape
preprocessing_summary = {
    "Total Constant Columns Removed": len(constant_columns),
    "Remaining Columns": combined_data_cleaned.shape[1],
    "Remaining Rows": combined_data_cleaned.shape[0],
}

preprocessing_summary

{'Total Constant Columns Removed': 207,
 'Remaining Columns': 1952,
 'Remaining Rows': 295}

In [16]:
# Save the cleaned per-repetition dataset (path is relative to the working directory, not data/out)
combined_data_cleaned.to_pickle("video_imu_emg_reps.pkl")

# window

In [18]:
def calculate_imu_features_windowed(df, columns=None):
    """
    Compute summary and FFT-based features for each signal column of one window.

    Parameters
    ----------
    df : DataFrame
        The rows of a single window.
    columns : iterable of str, optional
        Columns to analyse. Defaults to every column except 'Frame',
        'Participant', 'Correct', 'Set' and 'Repetition'.

    Returns
    -------
    dict
        Maps '<column>_<feature>' to the feature value. The max/min/mean FFT
        coefficient entries are complex-valued.
    """
    if columns is None:
        columns = df.columns.difference(['Frame', 'Participant', 'Correct', 'Set', 'Repetition'])

    feature_dict = {}

    for col in columns:
        series = df[col].values
        series_mean = np.mean(series)
        abs_series = np.abs(series)
        fft_coefficients = fft(series)
        abs_fft_coefficients = np.abs(fft_coefficients)
        # scipy.stats.entropy normalises its input to sum to 1 before computing the entropy
        spectral_entropy = entropy(abs_fft_coefficients)

        # Populate metrics for this column
        feature_dict.update({
            f'{col}_mean': series_mean,
            f'{col}_std': np.std(series, ddof=1),
            f'{col}_min': np.min(series),
            f'{col}_max': np.max(series),
            f'{col}_range': np.max(series) - np.min(series),
            f'{col}_mean_abs': np.mean(abs_series),
            f'{col}_variance': np.var(series, ddof=1),
            # Mean of |x - mean(x)|. The previous expression, mean(|x| - mean(x)),
            # reduces to mean_abs - mean and is not a deviation measure.
            f'{col}_mean_absolute_deviation': np.mean(np.abs(series - series_mean)),
            f'{col}_rms': np.sqrt(np.mean(np.square(series))),
            f'{col}_skewness': skew(series),
            f'{col}_kurtosis': kurtosis(series),
            # NOTE(review): max/min of complex values follow NumPy's complex
            # ordering (real part first, then imaginary) — confirm this is the
            # intended feature.
            f'{col}_max_fft_coeff': np.max(fft_coefficients),
            f'{col}_min_fft_coeff': np.min(fft_coefficients),
            f'{col}_abs_max_fft_coeff': np.max(abs_fft_coefficients),
            f'{col}_abs_min_fft_coeff': np.min(abs_fft_coefficients),
            f'{col}_mean_fft_coeff': np.mean(fft_coefficients),
            f'{col}_spectral_entropy': spectral_entropy
        })

    return feature_dict


def extract_imu_features_windowed(df, window_size=30):
    """
    Compute windowed features per (Participant, Set) group.

    Each group is cut into consecutive, non-overlapping windows of
    `window_size` rows; leftover rows that do not fill a whole window are
    ignored. Every window is passed to calculate_imu_features_windowed.

    Parameters
    ----------
    df : DataFrame
        Frame with 'Participant' and 'Set' columns plus the signal columns.
    window_size : int, optional
        Number of rows per window. The default is 30.

    Returns
    -------
    DataFrame
        One row per window: its features plus 'Participant', 'Set' and the
        zero-based 'Window' number within the group.
    """
    window_rows = []

    for (participant, set_num), group in df.groupby(['Participant', 'Set']):
        for window_number in range(len(group) // window_size):
            start = window_number * window_size
            features = calculate_imu_features_windowed(group.iloc[start: start + window_size])

            # Tag the window so it can be matched across modalities later
            features['Participant'] = participant
            features['Set'] = set_num
            features['Window'] = window_number

            window_rows.append(features)

    return pd.DataFrame(window_rows)

In [19]:
def calculate_emg_features_windowed(df, columns=None, sampling_rate=2000):
    """
    Compute time-domain and spectral EMG features for one window.

    Parameters
    ----------
    df : DataFrame
        The rows of a single window. For every analysed column named
        '...filtered...' a matching column with 'filtered' replaced by
        'envelope' must also be present.
    columns : list of str, optional
        Columns to analyse. Defaults to every column whose name contains
        'filtered'.
    sampling_rate : float, optional
        Sampling rate passed to the rate-dependent features and to EMG2PSD.
        The default is 2000.

    Returns
    -------
    dict
        Maps '<column>_<feature>' to the feature value.
    """
    if columns is None:
        columns = [col for col in df.columns if 'filtered' in col]

    feature_dict = {}

    for col in columns:
        data_col = df[col]
        envelope_col = df[col.replace('filtered', 'envelope')]

        psd = EMG2PSD(data_col, sr=sampling_rate)

        # idxmax returns an index *label*. EMG2PSD drops the lowest bins
        # without resetting the index, so the label must be resolved with
        # .loc; .iloc would read a row shifted by the dropped bins or raise
        # IndexError.
        peak_label = psd['Power'].idxmax()

        # Add time-domain and spectral metrics
        feature_dict.update({
            f'{col}_Min': np.min(envelope_col),
            f'{col}_Max': np.max(envelope_col),
            f'{col}_Mean': np.mean(envelope_col),
            f'{col}_SD': np.std(envelope_col),
            f'{col}_Skew': scipy.stats.skew(envelope_col),
            f'{col}_Kurtosis': scipy.stats.kurtosis(envelope_col),
            f'{col}_IEMG': CalcIEMG(df, col, sampling_rate),
            f'{col}_MAV': CalcMAV(df, col),
            f'{col}_MMAV': CalcMMAV(df, col),
            f'{col}_SSI': CalcSSI(df, col, sampling_rate),
            f'{col}_VAR': CalcVAR(df, col),
            f'{col}_RMS': CalcRMS(df, col),
            f'{col}_WL': CalcWL(df, col),
            f'{col}_Max_Freq': psd.loc[peak_label, 'Frequency'],
            f'{col}_Spectral_Centroid': CalcSC(psd),
            f'{col}_Spectral_Entropy': CalcSEntropy(psd),
        })

    return feature_dict


def extract_emg_features_windowed(df, window_size=1000):
    """
    Compute windowed EMG features per (Participant, Set) group.

    Each group is cut into consecutive, non-overlapping windows of
    `window_size` rows; leftover rows that do not fill a whole window are
    ignored. Every window is passed to calculate_emg_features_windowed.

    Parameters
    ----------
    df : DataFrame
        Frame with 'Participant' and 'Set' columns plus the EMG columns.
    window_size : int, optional
        Number of rows per window. The default is 1000.

    Returns
    -------
    DataFrame
        One row per window: its features plus 'Participant', 'Set' and the
        zero-based 'Window' number within the group.
    """
    window_rows = []

    for (participant, set_num), group in df.groupby(['Participant', 'Set']):
        for window_number in range(len(group) // window_size):
            start = window_number * window_size
            features = calculate_emg_features_windowed(group.iloc[start: start + window_size])

            # Tag the window so it can be matched across modalities later
            features['Participant'] = participant
            features['Set'] = set_num
            features['Window'] = window_number

            window_rows.append(features)

    return pd.DataFrame(window_rows)

## prepare imu

In [20]:
# Windowed IMU features (30 rows per window) for the correct recordings, labelled Correct = 1
df = pd.read_pickle("data/out/correct_position_scaled_df.pkl")
final_df = extract_imu_features_windowed(df, window_size=30)
final_df['Correct'] = 1
print(final_df.shape)

# Same for the incorrect recordings, labelled Correct = 0
df_i = pd.read_pickle("data/out/incorrect_position_scaled_df.pkl")
final_df_i = extract_imu_features_windowed(df_i, window_size=30)
final_df_i['Correct'] = 0
print(final_df_i.shape)

# Combine both labelled sets, drop exact duplicate rows and save the result
combined_df = pd.concat([final_df, final_df_i], axis=0, ignore_index=True)
combined_df = combined_df.drop_duplicates()
combined_df.to_pickle("data/out/imu_windowed_for_prediction.pkl")
combined_df.shape

(812, 1109)
(540, 1109)


(1352, 1109)

## prepare emg

In [21]:
# Windowed EMG features (1000 rows per window) for the correct recordings, labelled Correct = 1
df = pd.read_pickle("data/out/emg_correct_with_set_rep.pkl")
final_df_emg = extract_emg_features_windowed(df, window_size=1000)
final_df_emg['Correct'] = 1
print(final_df_emg.shape)

# Same for the incorrect recordings, labelled Correct = 0 (df is reused for the input)
df = pd.read_pickle("data/out/emg_incorrect_with_set_rep.pkl")
final_df_i_emg = extract_emg_features_windowed(df, window_size=1000)
final_df_i_emg['Correct'] = 0
print(final_df_i_emg.shape)

# Combine and save the results (unlike the IMU and video cells, no duplicate removal here)
combined_df = pd.concat([final_df_emg, final_df_i_emg], axis=0, ignore_index=True)
combined_df.to_pickle("data/out/emg_windowed_for_prediction.pkl")
combined_df.shape

(815, 52)
(535, 52)


(1350, 52)

## prepare video

In [22]:
# Windowed video features (30 rows per window) for the correct recordings, labelled Correct = 1.
# Relies on video_correct_done / video_incorrect_done loaded in the earlier
# "prepare video" cell, and reuses the IMU windowing function.
df = video_correct_done
final_df = extract_imu_features_windowed(df, window_size=30)
final_df['Correct'] = 1
print(final_df.shape)

# Same for the incorrect recordings, labelled Correct = 0
df_i = video_incorrect_done
final_df_i = extract_imu_features_windowed(df_i, window_size=30)
final_df_i['Correct'] = 0
print(final_df_i.shape)

# Combine both labelled sets, drop exact duplicate rows and save the result
combined_df = pd.concat([final_df, final_df_i], axis=0, ignore_index=True)
combined_df = combined_df.drop_duplicates()
combined_df.to_pickle("data/out/video_windowed_for_prediction.pkl")
combined_df.shape

(763, 497)
(540, 497)


(1303, 497)

## combine all

In [25]:
import pandas as pd

# Load the pickled windowed feature sets written by the cells above
imu_path = "data/out/imu_windowed_for_prediction.pkl"
emg_path = "data/out/emg_windowed_for_prediction.pkl"
video_path = "data/out/video_windowed_for_prediction.pkl"

imu_data = pd.read_pickle(imu_path)
emg_data = pd.read_pickle(emg_path)
video_data = pd.read_pickle(video_path)

# Step 1: Combine IMU and EMG datasets.
# Windows are matched by their number within each (Participant, Set); the outer
# join keeps windows present in either set.
# NOTE(review): matching on 'Window' assumes that 30 IMU rows and 1000 EMG rows
# cover the same time span — confirm against the recording rates.
combined_data = pd.merge(
    imu_data,
    emg_data,
    on=['Participant', 'Set', 'Window', 'Correct'],
    suffixes=('_imu', '_emg'),
    how='outer'
)

# Step 2: Merge with video data using an inner join to keep only matching rows.
# Video feature names that clash with existing columns get the '_video' suffix.
combined_data = pd.merge(
    combined_data,
    video_data,
    on=['Participant', 'Set', 'Window', 'Correct'],
    suffixes=('', '_video'),
    how='inner'
)

#output_path = "data/out/combined_imu_emg_video_for_prediction.pkl"
#combined_data.to_pickle(output_path)

# Display summary of the combined dataframe
combined_summary = {
    "Total Rows": combined_data.shape[0],
    "Total Columns": combined_data.shape[1],
}

print("Combined data summary:", combined_summary)

Combined data summary: {'Total Rows': 1303, 'Total Columns': 1650}


In [26]:
# Drop every row that contains at least one missing value (e.g. windows that
# the outer IMU/EMG join left without a counterpart)
combined_data_cleaned = combined_data.dropna()

# Report how many rows the drop removed and what is left
cleaned_summary = {
    "Total Rows Removed": combined_data.shape[0] - combined_data_cleaned.shape[0],
    "Remaining Rows": combined_data_cleaned.shape[0],
    "Remaining Columns": combined_data_cleaned.shape[1],
}

cleaned_summary

{'Total Rows Removed': 7, 'Remaining Rows': 1296, 'Remaining Columns': 1650}

In [27]:
# Split the FFT-coefficient columns into real and imaginary parts, then drop
# columns that hold at most one distinct value.
# NOTE(review): this reassigns combined_data_cleaned from itself, so the cell
# is meant to be run exactly once after the dropna cell.
combined_data_cleaned = split_complex_columns(combined_data_cleaned)
constant_columns = [col for col in combined_data_cleaned.columns if combined_data_cleaned[col].nunique() <= 1]
combined_data_cleaned = combined_data_cleaned.drop(columns=constant_columns)

# Report how many columns were dropped and the remaining shape
preprocessing_summary = {
    "Total Constant Columns Removed": len(constant_columns),
    "Remaining Columns": combined_data_cleaned.shape[1],
    "Remaining Rows": combined_data_cleaned.shape[0],
}
preprocessing_summary

{'Total Constant Columns Removed': 202,
 'Remaining Columns': 1918,
 'Remaining Rows': 1296}

In [28]:
# Final (rows, columns) of the cleaned windowed dataset
combined_data_cleaned.shape

(1296, 1918)

In [31]:
# Save the cleaned windowed dataset (path is relative to the working directory, not data/out)
combined_data_cleaned.to_pickle("video_imu_emg_windows.pkl")