Overview: This script contains all functions used by both the kinematic and EMG scripts, and is used as an import. After editing, download this file as a .py
and upload it to the Our Grasp Project > Data > Our Model folder under the name
'utils_babyscript.ipynb'. Run the utils import in your desired file to update
the import and see your changes.

# Mounts and Imports

In [None]:
# Mounting Google Drive
from google.colab import drive, output
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Main Imports
import scipy.io as sio
import scipy.signal
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns           # Used to create confusion heatmaps
import os                       # Used to convert between dataframe and csv
import pywt                     # Used to perform wavelet transform

import random
import copy

# Classifier imports
from sklearn import model_selection
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix

# Functions

Note: The "labels" dictionary maps the index (int) of every hand grasp tested in Ninapro DB1 to its respective name (str).

In [None]:
labels = {1: 'Large Diameter Grasp', 2: 'Small Diameter Grasp (power grip)',
          3: 'Fixed Hook Grasp', 4: 'Index Finger Extension Grasp', 5:
          'Medium Wrap', 6: 'Ring Grasp', 7: 'Prismatic Four Fingers Grasp',
          8: 'Stick Grasp', 9: 'Writing Tripod Grasp', 10: 'Power Sphere Grasp',
          11: 'Three Finger Sphere Grasp', 12: 'Precision Sphere Grasp', 13:
          'Tripod Grasp', 14: 'Prismatic Pinch Grasp', 15: 'Tip Pinch Grasp',
          16: 'Quadpod Grasp', 17: 'Lateral Grasp', 18: 'Parallel Extension Grasp',
          19: 'Extension Type Grasp', 20: 'Power Disk Grasp', 21: 'Open a Bottle with a Tripod Grasp',
          22: 'Turn a Screw', 23: 'Cut Something'}

In [None]:
'''
process_ninapro_subject(file_path)

Purpose: Convert the Ninapro DB1 file for one subject, which is in MATLAB
format, to a Pandas dataframe. Organize the dataframes by adding headers
and a time column.
Parameters: The path to the MATLAB file (str), sampling rate of the data (int).
Output: Pandas dataframe for kinematic data, and a dataframe for EMG data.
'''
def process_ninapro_subject(file_path, sampling_rate):
  # Extract data
  subject_kin_data = sio.loadmat(file_path)['glove']
  subject_emg_data = sio.loadmat(file_path)['emg']

  # Creates Pandas Dataframe
  kin_data_df = pd.DataFrame(subject_kin_data)
  emg_data_df = pd.DataFrame(subject_emg_data)

  # Gets number of rows and columns
  kin_rows, kin_cols = kin_data_df.shape
  emg_rows, emg_cols = emg_data_df.shape

  # Defines names for headers
  kin_data_df.columns = ['Angle_' + str(i) for i in range(1, kin_cols + 1)]
  emg_data_df.columns = ['EMG_' + str(i) for i in range(1, emg_cols - 1)] + ['EMG_flexor', 'EMG_extensor']

  # Create and add a column defining time
  time_unit = 1/sampling_rate
  time_column = [time_unit * i for i in range(kin_rows)]
  kin_data_df.insert(0, 'Time', time_column)
  emg_data_df.insert(0, 'Time', time_column)

  # Saves Pandas dataframe as csv
  suffix_length = len('.mat')
  kin_csv_file_path = file_path[:-suffix_length] + '_kinematic.csv'
  emg_csv_file_path = file_path[:-suffix_length] + '_emg.csv'
  kin_data_df.to_csv(kin_csv_file_path, index = False, header = True)
  emg_data_df.to_csv(emg_csv_file_path, index = False, header = True)

  return kin_data_df, emg_data_df

In [None]:
'''
filter_data(data, sampling_rate, data_type)

Purpose: Perform a 6 Hz low-pass butterworth filter on a data signal. If the
data is EMG, also rectify.
Parameters: Raw signal data (Pandas dataframe), sampling rate (int), type of
data (str)
Output: Filtered siganl data (Pandas dataframe)

Note: The EMG electrodes already provide an amplified (14000x), rectified, and
bandpass-filtered (0-25 Hz) version of the raw sEMG signal.
'''
def filter_data(data, sampling_rate, data_type):
    data = data.copy()

    if 'Time' in data.columns:
        time_data = data['Time']
        data = data.drop(columns='Time')
    else:
        time_data = None

    # Apply data-type-specific preprocessing
    if data_type == 'emg':
        data = data.abs()  # Rectify EMG
        low_cutoff = 6
    elif data_type == 'kinematics':
        low_cutoff = 6
    else:
        raise ValueError("Unsupported data type")

    # Apply low-pass Butterworth filter
    nyq = 0.5 * sampling_rate
    b, a = scipy.signal.butter(2, low_cutoff / nyq, btype='low')
    data = pd.DataFrame(scipy.signal.filtfilt(b, a, data.values, axis=0), columns=data.columns)

    if time_data is not None:
        data.insert(0, 'Time', time_data)

    return data

In [None]:
# Separate kinematic filtering function
def filter_kinematic_data(data, sampling_rate):
  data_copy = data.copy()

  # Keeps time column from being filtered
  if 'Time' in data.columns:
    time_data = data['Time']
    data_copy = data_copy.drop(columns='Time')
  else:
    time_data = None

  # Apply low-pass Butterworth filter
  nyq = 0.5 * sampling_rate
  low_cutoff = 6
  b,a = scipy.signal.butter(2, low_cuttoff / nyq, btype='low')
  data_copy = pd.DataFrame(scipy.signal.filtfilt(b, a, data_copy.values, axis=0), columns=data.columns)

  # Add back time column
  if time_data is not None:
    data_copy.insert(0, 'Time', time_data)

  return data_copy

In [None]:

# Separate EMG filtering function
def filter_emg_data(data, sampling_rate):
  data_copy = data.copy()

  # Keeps time column from being filtered
  if 'Time' in data.columns:
    time_data = data['Time']
    data_copy = data_copy.drop(columns='Time')
  else:
    time_data = None

  # Low pass, cutoff frequency of 2 Hz
  low_cutoff = 2
  nyq = 0.5 * sampling_rate
  b,a = scipy.signal.butter(2, low_cutoff / nyq, btype='low')
  data_copy = pd.DataFrame(scipy.signal.filtfilt(b, a, data_copy.values, axis=0), columns=data.columns)

  # Apply RMS (Root Mean Square) envelope

  # Define windows
  window_size = 25
  num_samples = data_copy.shape[0]
  num_windows = num_samples // window_size

  # Creates a new dataframe with the RMS values
  rms_dataframe = pd.DataFrame(columns=data.columns)
  for column in data_copy.columns:
    rms_values = []
    for i in range(num_windows):
      start_index = i * window_size
      end_index = start_index + window_size
      window_data = data_copy.iloc[start_index:end_index, data_copy.columns.get_loc(column)]
      rms_value = np.sqrt(np.mean(window_data**2))
      rms_values.append(rms_value)
    rms_dataframe[column] = rms_values

  # Add back time column
  if time_data is not None:
    rms_dataframe.insert(0, 'Time', time_data)

  return rms_dataframe


In [None]:
'''
get_restim_data(grasp_number, restim_data)

Purpose: Determines the starting and stopping point of each trial for ONE GRASP
TYPE depending on the restimulus data. Allows us to segment the kinematic/EMG
data later.
Parameters: The label for the grasp type you want to retrieve from the
data (int), restimulus data (numpy array)
Output: A list of tuples containing the starting and stopping times for each of
the ten trials for that grasp type
'''
def get_restim_data(grasp_number, all_restim_data, time_unit):

  trial_segments = []
  in_trial = False
  start_index = None

  for i, val in enumerate(all_restim_data):
    # If a new trial begins, record the starting time
    if not in_trial and grasp_number == val:
      in_trial = True
      start_index = i
    # If a trial ends, record the ending time and append new timestamps
    elif in_trial and grasp_number != val:
      in_trial = False
      stop_index = i
      # Scales index to account for difference in index and sampling rate
      start_index_scaled = int(start_index // time_unit)
      stop_index_scaled = int(stop_index // time_unit)

      trial_segments.append((start_index_scaled, stop_index_scaled))
    # If all 10 trials have been recorded, return the full list of segments
    if len(trial_segments) == 10:
      return trial_segments

  return trial_segments # Fallback in case 10 segments is never reached

In [None]:
'''
gather_restim_timestamps_and_labels()

Purpose: Finds and records the timestaps for every trial done by the subject,
creates a list of corresponding grasp labels.
Parameters: Restimulus data (numpy array), time unit (float)
Output: List of timestamps (tuples) for each trial done by the subject, list
of grasp labels for each trial done by the subject

'''
def gather_restim_timestamps_and_labels(all_restim_data, time_unit):

  restim_timestamps = []
  restim_labels = []
  for grasp_number in labels:
    grasp_restim_timestamps = get_restim_data(grasp_number, all_restim_data, time_unit)
    restim_timestamps.extend(grasp_restim_timestamps)
    restim_labels.extend([labels[grasp_number] for _ in range(10)])
  return restim_timestamps, restim_labels

In [None]:
'''
segment_data(dataframe, timestamps)

Purpose: Divide the kinematic data into separate dataframes for each trial
done by the subject
Parameters: Unsegmented data (Pandas dataframe), list of timestamps (list)
Output: Segmeted list of dataframes based on timestamps (lsit)
'''
def segment_data(dataframe, timestamps):
  trials = []
  for start_index, end_index in timestamps:
    trial_dataframe = dataframe.iloc[start_index:end_index].reset_index(drop=True)
    trials.append(trial_dataframe)
  return trials


In [None]:
'''
normalize_emg_data(emg_data)

Purpose: Normalizes EMG data based on the 95th percentile signal for each
electrode
Parameters: Segmented, un-normalized EMG data (list of Pandas Dataframes)
Output: Normalized EMG data (list of Pandas Dataframes)
'''
def normalize_emg_data(emg_data):
  result = []
  # Gathers EMG values from all trials in the subject together
  big_dict = {}
  for trial in emg_data:
    for column in trial.columns:
      if column == 'Time':
        continue
      else:
        if column not in big_dict:
          big_dict[column] = []
        big_dict[column].extend(trial[column])
  # Calculates the 95th percentile EMG value for each column, stores in new dict
  percentile_dict = {}
  for key, value in big_dict.items():
    percentile_dict[key] = np.percentile(value, 95)
  # Creates a new dataframe for each trial with the normalized data
  for trial in emg_data:
    trial_dict = {}
    for column in trial.columns:
      if column == 'Time':
        trial_dict[column] = trial[column]
      else:
        trial_dict[column] = trial[column] / percentile_dict[column]
    trial_df = pd.DataFrame(trial_dict).copy()
    result.append(trial_df)
  return result

In [None]:
'''
visualize_sample(trials, trial_number, columns)

Purpose: Creates a graph of the kinematic or EMG data for one specific trial
based on specified columns. Entering columns 'Angle_1' and 'Angle_2' will return
a graph of those specific signals for the trial.
Parameters: Segmented trials (list of Dataframes), Trial number (int), columns
to display (list of strings)
Output: Matplotlib graph
'''
def visualize_sample(trials, trial_number, columns):
  num_participants = 1
  grasp_type = labels[trial_number%10 + 1]

  # Create plot (using subplots in case this function eventually compares graphs)
  fig, ax = plt.subplots(num_participants, 1, figsize = (10, 4 * num_participants), sharex=True)
  fig.suptitle(f'Sample Visualization for Trial {trial_number}, {grasp_type}' , fontsize=16)

  # Plot data
  for column in columns:
    ax.plot(trials[trial_number]['Time'], trials[trial_number][column], label=column)
  ax.set_ylabel('Joint Angle (Degrees)')
  ax.legend()

  # Add common x-label
  plt.xlabel('Time (s)')

  # Adjust layout
  plt.tight_layout(rect=[0,0, 1, 0.95])

  # Show the plot
  plt.show()

In [None]:
'''
visualize_full_grasp(trials, grasp_type, columns)

Purpose: Creates a graph of all the kinematic of EMG data for a specific grasp
type for one participant. Entering columns 'Angle_1' and 'Angle_2' will return
a graph of those specific signals for the trial.
Parameters: Segmented trials (list of Dataframes), grasp type (int), columns to
display (list of strings)
Output: Matplotlib graph
'''
def visualize_full_grasp(trials, grasp_type, columns):
  grasp_start_index = (grasp_type-1) * 10
  grasp_end_index = grasp_start_index + 10
  grasp_trials_copy = copy.deepcopy(trials[grasp_start_index:grasp_end_index])

  # Create plot
  fig, ax = plt.subplots(1, figsize=(10, 4), sharex=True)
  fig.suptitle(f'Sample Visualization for {labels[grasp_type]}' , fontsize=16)

  # Create random color for each column
  colors = {}
  for column in columns:
    colors[column] = '#%06X' % random.randint(0, 0xFFFFFF)

  # Plot trial data
  for trial in grasp_trials_copy:
    for column in columns:
        relative_time = trial['Time'] - trial['Time'][0]
        ax.plot(relative_time, trial[column], label=column, alpha=0.2,
                color=colors[column])

  # Plot average of data across trials
  averaged_lines = []
  line_labels = []
  for column in columns:

    # Concatenate and average data
    trial_list = [grasp_trials_copy[i][column] for i in range(len(grasp_trials_copy))]
    concatenated_data = pd.concat(trial_list)
    averaged_column_data = concatenated_data.groupby(level=0).mean()

    # Find relative time (assumes stimulus data)
    time_sampling_rate = 100
    averaged_relative_time = averaged_column_data.index / time_sampling_rate
    # relative_time = grasp_trials_copy[0]['Time'] - grasp_trials_copy[0]['Time'][0]

    # Plot averaged data
    new_line, = ax.plot(averaged_relative_time, averaged_column_data, alpha=1, color = colors[column])
    averaged_lines.append(new_line)
    line_labels.append(column)

  plt.legend(averaged_lines, line_labels)

  # Add common x-label
  plt.xlabel('Time (s)')

  # Adjust layout
  plt.tight_layout(rect=[0,0, 1, 0.95])

  # Show the plot
  plt.show()

In [None]:
'''
find_outliers(subject_data)

Purpose: Determine any outliers in a dataset.
Parameters: List of a certain numerical quantity for each subject, such as
height or weight (list)
Output: List of the index of any outliers in the data (list)
'''
def find_outliers(subject_data):

  value_list = list(subject_data.values())
  # Find mean and IQR
  mean = np.mean(value_list)
  q1, q3 = np.percentile(value_list, [25, 75])
  iqr = q3 - q1
  # find upper and lower bounds
  lower_bound = q1 - 1.5 * iqr
  upper_bound = q3 + 1.5 * iqr
  # Find outliers
  outliers = []
  for key, value in subject_data.items():
    if value < lower_bound or value > upper_bound:
      outliers.append(key)

  return outliers

In [None]:
'''
features_to_csv(features, filename)

Purpose: Extracting features for every subject takes a long time. This function
saves the results of feature extraction as a csv for easy feature retrieval
after processing.
Parameters: list of all features (list), desired name of file (str)
Output: None
'''
def features_to_csv(features, filename):
  df = pd.DataFrame(features)
  file_path = '/content/drive/MyDrive/Grasp Project /Data /Ninapro DB1/' + filename
  df.to_csv(file_path, index=False)


In [None]:
'''
csv_to_features(filename)

Purpose: Converts saved feature results back into their original format.
Parameters: filename (str)
Output: list of all features (list)
'''
def csv_to_features(filename):
  features_df = pd.read_csv('/content/drive/MyDrive/Grasp Project /Data /Ninapro DB1/' + filename)
  return features_df.values

In [None]:
'''
find_subject_number(subject_folder_path)

Purpose: Find the number of the subject based on their folder path.
Parameters: subject folder path (string)
Output: subject number (int)
'''
def find_subject_number(subject_folder_path):
  max_index = len(subject_folder_path) - 1
  start_index = subject_folder_path.find('Subject_') + len('Subject_')
  if start_index == max_index:
    subject_number = subject_folder_path[-1]
  else:
    subject_number = subject_folder_path[-2:]
  return subject_number