# EMG Hand Gesture Classification

This notebook is used to explore the approaches from "A Robust, Real-Time Control Scheme for
Multifunction Myoelectric Control" (Englehart & Hudgins, 2003), while using the data provided by the paper "Latent Factors Limiting the Performance of sEMG-Interfaces" (Lobov et al., 2018), recorded using a MYO Thalmic bracelet.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import scipy
import os
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from tqdm import tqdm

### Data Preparation
This section serves to take the raw data, and reshape it/preprocess it before any real analysis being carried out.
- Data structure: dictionary of lists (keys are subject number), with lists each containing two dataframes corresponding to individual series

In [None]:
DIR = '../' # where data directories are found relative to code
exclude = [3, 7, 8, 9, 10, 11, 12, 13, 15, 16, 18, 19, 20, 23, 27, 35] # exclude these subjects due to ADC saturation 
DIR_SJs = []
for idx in range(1, 37):
    if idx not in exclude:
        current_dir = str(idx)
        if idx < 10:
            current_dir = '0' + current_dir
        DIR_SJs.append(current_dir)

In [None]:
def parse_file(DIR, DIR_SJ, series_n):
    ''' Given a file path, extracts numerical information based on text file provided. Returns dataframe'''
    filename = os.listdir(DIR + DIR_SJ)[series_n] # either the first or second series is extracted
    file = open(DIR + DIR_SJ + filename) # open the file to be analysed
    lines = file.readlines()
    columns = lines[0].split() # take the first row as the column labels for dataframe
    lines = lines[1:] # exclude first line which represents column labels
    data = np.zeros((len(lines), 10)) # prepare dataframe to be appropriate size
    for idx, line in enumerate(lines):
        line = line.split() # get entries into different elements        
        try:
            data[idx, :] = np.array(line).astype(np.float64) # convert to numerical and store in array
        except:
            print(DIR_SJ, series_n, idx)
            print(np.array(line).astype(np.float64), line)
    file.close()
    for ch in range(1, 9): # interpolate channel values as to make data regularly sampled
        f = scipy.interpolate.interp1d(data[:, 0], data[:, ch], fill_value='extrapolate') # create interpolator based on regular data
        data[:, ch] = f(list(range(data.shape[0])))
    data[:, 0] = list(range(data.shape[0]))
    df = pd.DataFrame(data=data, columns=columns)
    df[['time', 'class']] = df[['time', 'class']].astype(np.int32) # convert timepoint and class columns to integer type
    return df

def extract_all_data(DIR, DIR_SJs):
    ''' Extracts dataframe from every single subject and series of actions. '''
    series_ns = [0,1] # possible series number
    all_data = {DIR_SJ:[] for DIR_SJ in DIR_SJs} # initialise dictionary to contain data to be analysed
    for DIR_SJ in DIR_SJs:
        for series_n in series_ns:
            df = parse_file(DIR, DIR_SJ, series_n) # get individual data file
            all_data[DIR_SJ].append(df) # add df to the list of that particular subject
    return all_data

def plot_channels(df):
    ''' Plots all channels in a given dataframe '''
    plt.figure()
    leg = []
    for idx in range(1, 9):
        plt.plot(df['time'], df['channel{}'.format(idx)])
        leg.append('channel{}'.format(idx))
    plt.xlabel('Time (ms)')
    plt.ylabel('Voltage (V)')
    plt.ylabel('Single Subject Data')
    plt.legend(leg)
    plt.show()

In [None]:
df = parse_file(DIR, '01/', 0)
df

In [None]:
# # Plot signals
# for DIR_SJ in DIR_SJs:
#     for series_n in range(2):
#         print(DIR_SJ, series_n) # determine which signals are definitely not okay to use
#         df = parse_file(DIR, DIR_SJ + '/', 0)
#         plot_channels(df)

# # # Plot class labels
# # plt.figure()
# # plt.plot(df['time'], df['class'])
# # plt.xlabel('Time (ms)')
# # plt.ylabel('Voltage (V)')
# # plt.ylabel('Single Subject Data')
# # plt.show()
# df.iloc[:,:-1]ime'])

### Extracting windows from data
- Here, I will create a training and testing set from the first series, then evaluate the majority vote scheme on the second series

In [None]:
class GestureClassifier: 

    def __init__(self, chosen_model, w=0.250, tau=0.016, pca=False, n_components=10): # assume window length and processing time from original paper
        ''' Initialize GestureClassifier object.'''
        self.scaler = StandardScaler() # to standardize data
        self.feature_scaler = StandardScaler() # to standardize extracted features
        self.sos = scipy.signal.butter(N=3, Wn=(10, 499), btype='bandpass', output='sos', fs=1000) # to bandpass filter datas
        self.classifier = chosen_model # this way class can be used with multiple different classifiers
        self.w = w # window size in m
        self.tau = tau # step size in ms
        self.fs = 1000 # 1kHz sampling rate
        self.do_pca = pca # whether to apply PCA preprocessing or not
        self.n_components = n_components # if applying PCA, use this number of components
        self.pca = PCA(n_components=n_components)

    def preprocess(self, df):
        ''' Z-score normalizes channels and bandpasses signals between 10-500Hz (As in Englehart & Hudgins, 2003)'''
        self.scaler = self.scaler.fit(df.iloc[:, 1:-1]) # fit scaler
        df.iloc[:, 1:-1] = self.scaler.transform(df.iloc[:, 1:-1]) # standardize channels
        for idx in range(1, 9): # for each channel, apply bandpass filter
            df[['channel{}'.format(idx)]] = scipy.signal.sosfilt(self.sos, df[['channel{}'.format(idx)]])
        return df

    def zero_crossings(self, win):
        ''' Compute zero-crossing count for a given 8-channel window.'''
        zero_crossing = np.sum(np.diff(np.sign(win), axis=0).astype(bool), axis=0) # end up wit a row of zero-crossing values
        return zero_crossing

    def slope_sign_changes(self, win):
        ''' Compute slope-sign change count for a given 8-channel window.'''
        slope = np.diff(win, axis=0) # get slope estimate
        slope_sign_change = np.sum(np.diff(np.sign(slope), axis=0).astype(bool), axis=0) # end up wit a row of zero-crossing values
        return slope_sign_change

    def waveform_length(self, win):
        ''' Computes the waveform length feature for a given 8-channel window.'''
        abs_diffs = np.abs(np.diff(win, axis=0)) # compute absolute differences between each set of points for each channels
        return abs_diffs.sum(axis=0)

    def feature_extraction(self, df):
        ''' Given a specific channel of data, applies preprocessing, extracts features, and returns feature matrix '''
        df = self.preprocess(df) # apply preprocessing
        wsamp = np.around(self.w*self.fs).astype(int) # get window size in number of samples
        tausamp = np.around(self.tau*self.fs).astype(int) # get step size in number of samples
        wstart = 0 # sample where current window starts
        X = np.zeros((0, 32)) # dynamically create feature matrix
        y = np.zeros(0) # dynamically create labels vector
        while (wstart + wsamp) < df.shape[0]: # while there is still enough data to sample from
            x = np.zeros((1, 32)) # where to store features of this specific window (32) features
            win, labels = df.iloc[wstart:wstart+wsamp,1:-1], df.loc[wstart:wstart+wsamp,'class'].squeeze()
            # print(np.abs(win).mean(axis=0).shape)
            # Extract features
            x[0, 0:8] = np.abs(win).mean(axis=0) # MAV feature for each channel
            x[0, 8:16] = self.zero_crossings(win) # Zero-crossings feature for each channel
            x[0, 16:24] = self.slope_sign_changes(win) # Slope-sign changes feature for each channel
            x[0, 24:32] = self.waveform_length(win) # Waveform length feature for each channel
            X = np.vstack((X, x)) # add new row to matrix
            # Choose label for given window
            mode,_ = scipy.stats.mode(labels, axis=0, keepdims=False) # find which labels occured the most in the given window
            y = np.r_[y, mode] # append the label of the given window to the window of labels
            wstart += tausamp # take a step and extract features from next window
        # Save data in a new dataframe for processingfrom sklearn.model_selection import train_test_split
        fnames = ['MAV', 'ZC', 'SSC', 'WL'] # feature names
        columns = [fname+str(idx) for fname in fnames for idx in range(1, 9)] # feature columns
        columns.append('class') # add the class label for appropriate column
        # self.feature_scaler = self.feature_scaler.fit(X)
        # X = self.feature_scaler.transform(X) # standardize extracted features
        data = np.hstack((X, y.reshape(-1, 1))) # add the labels column to the rest of the data, with 33 columns in total
        df_new = pd.DataFrame(data=data, columns=columns)
        return df_new

    def fit(self, X, y):
        ''' Given extracted features, fit the inputted model.'''
        self.feature_scaler = self.feature_scaler.fit(X)
        X = self.feature_scaler.transform(X) # standardize extracted features
        if self.do_pca: # apply PCA processing if asked for
            self.pca = self.pca.fit(X)
            X = self.pca.transform(X)
        self.classifier = self.classifier.fit(X, y) # fit the chosen model

    def predict(self, X):
        ''' Given inputted features, predict.'''
        X = self.feature_scaler.transform(X) # standardize extracted features
        if self.do_pca:
            X = self.pca.transform(X) # if PCA was chosen, apply PCA processing
        y_pred = self.classifier.predict(X) # predictions for given data
        return y_pred

In [None]:
# Test that class works (without classifier for now)
DIR_SJ, series_n = '01/', 0
df = parse_file(DIR, DIR_SJ, series_n)
model = GestureClassifier(None) # no chosen model for now, default parameters chosen for the rest
df_new = model.feature_extraction(df)
df_new

### Data Visualization (PCA, LDA, Scatter Plots, etc)
This section will serve to visualize the extracted features relative to the classification tasks.

In [None]:
# # Visualizing class distribution
# fig = px.histogram(df_new, x='class', title='Gesture Distribution')
# fig.show()

In [None]:
# PCA of exctracted features (Variance doesn't drop too slowly here, not so correlated)
pca = PCA(n_components=30) # So we can get scree plot of first 30 dims 
scaler = StandardScaler().fit(df_new.iloc[:,:-1])
df_new.iloc[:,:-1] = scaler.transform(df_new.iloc[:,:-1]) # standardize data before PCA
pca = pca.fit(df_new.iloc[:,:-1]) # PCA of all columns except class column
fig = plt.figure()
plt.bar(list(range(1,31)), pca.explained_variance_) # scree plot
plt.xlabel('PCs')
plt.ylabel('Explained Variance')
plt.title('Scree Plot of Features')
plt.show()
fig = plt.figure()
plt.bar(list(range(1,31)), np.cumsum(pca.explained_variance_ratio_)) # scree plot
plt.xlabel('PCs')
plt.ylabel('Explained Variance')
plt.title('Cumulative Sum of Variances')
plt.show()
# 3D scatter plot of first 3 PCs
pca2 = PCA(n_components=3) # So we can get scree plot of first 30 dims 
data3d = pca2.fit_transform(df_new.iloc[:,:-1])
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
colors = ['r', 'g', 'b', 'y', 'm', 'c', 'k']
for idx in range(1,7): # for each label except no action
    plot_data = data3d[df_new['class']==idx, :]
    ax.scatter(plot_data[:,0],plot_data[:,1],plot_data[:,2], c=colors[idx])
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.set_zlabel('PC3')
ax.set_title('PCA Reduced Features in 3D')
plt.show()

In [None]:
# LDA analysis, followed by LDA classification predictions
classifier = LinearDiscriminantAnalysis(n_components=3)
classifier = classifier.fit(df_new.iloc[:,:-1], df_new['class'])
data3d = classifier.transform(df_new.iloc[:,:-1]) # LDA projection into most separable 3 dimensions
fig = plt.figure()
ax = fig.add_subplot(111,projection='3d')
colors = ['r', 'g', 'b', 'y', 'm', 'c', 'k']
actions = ['unmarked', 'rest', 'fist', 'wrist-flexion', 'wrist-extension', 'radial-deviations', 'ulnar-deviations', 'extended-palm']
for idx in range(1,7): # for each label except no action
    plot_data = data3d[df_new['class']==idx, :]
    ax.scatter(plot_data[:,0],plot_data[:,1],plot_data[:,2], c=colors[idx])
ax.set_xlabel('LD1')
ax.set_ylabel('LD2')
ax.set_zlabel('LDA3')
ax.set_title('LDA Reduced Features in 3D')
ax.set_label(actions[1:-1])

In [None]:
# LDA Classification Predictions
df_new = df_new.drop(df_new[np.isclose(df_new['class'], 0)].index)
classifier = LinearDiscriminantAnalysis()
model = GestureClassifier(classifier) # saves LDA model for classification
X_train, X_test, y_train, y_test = train_test_split(df_new.iloc[:,:-1], df_new['class'], test_size=0.33, random_state=42)
model.fit(X_train, y_train) # calling fit method from gesture classifier object
y_train_pred, y_test_pred = model.predict(X_train), model.predict(X_test)
print('Training Accuracy:')
print(classification_report(y_train, y_train_pred))
print('Testing Accuracy:')
print(classification_report(y_test, y_test_pred))

In [None]:
# Misclassifications seem to be primarily for when the subject is either at rest of performing ulnar deviations
pd.DataFrame(data=confusion_matrix(y_test, y_test_pred), columns=actions[1:-1], index=actions[1:-1])

In [None]:
# Visualizing performance of trained classifier on second time-series
DIR_SJ, series_n = '01/', 1
df = parse_file(DIR, DIR_SJ, series_n)
df_new = model.feature_extraction(df) # same model as before
df_new = df_new.drop(df_new[np.isclose(df_new['class'], 0)].index) # remove any unmarked regions of the data
# Dynamic prediction plots
pred = model.predict(df_new.iloc[:,:-1]) # predict labels at every row/window
df_new['class_pred'] = pred
# Plotting results
plt.figure()
plt.scatter(np.arange(df_new.shape[0]), df_new['class'])
plt.scatter(np.arange(df_new.shape[0]), df_new['class_pred'])
plt.xlabel('Time(ms)')
plt.ylabel('Class Label')
plt.title('Classification Dynamics')
plt.legend(['True', 'Predicted'])
plt.show()


In [None]:
# Performance metrics on new dataset --> translates pretty poorly.
print('Testing Accuracy:')
print(classification_report(df_new['class'], df_new['class_pred']))
pd.DataFrame(data=confusion_matrix(df_new['class'], df_new['class_pred']), columns=actions[1:-1], index=actions[1:-1])

In [None]:
# Majority voting strategy: For the sake of argument use m decisions (t<=t(now)) from continuous stream to make decision
# Implemented as per Englehart and Hudgins, 2003
m = 20
df_new[['class', 'class_pred']] = df_new[['class', 'class_pred']].astype(np.int32)
processed_decisions = np.zeros(df_new.shape[0])
processed_decisions[:m] = 1 # assume at rest until enough data in the stream
for idx in range(m, df_new.shape[0]):
    mode,_ = scipy.stats.mode(df_new.iloc[idx-m:idx+m,-1], axis=0, keepdims=False) # find which labels occured the most in the given window
    processed_decisions[idx] = mode # set the decision of that interval to the majority class vote
    if idx + 2*m > df_new.shape[0]: # if there are less than m windows left in the last batch
        mode,_ = scipy.stats.mode(df_new.iloc[idx+m:,-1], axis=0, keepdims=False) # find which labels occured the most in the given window
        processed_decisions[idx:] = 1 # assume at rest at the very end of the stream
        break
df_new['majority_class'] = processed_decisions # majority decision processed classes
plt.figure()
plt.scatter(np.arange(df_new.shape[0]), df_new['class'])
plt.scatter(np.arange(df_new.shape[0]), df_new['majority_class'])
plt.xlabel('Time(ms)')
plt.ylabel('Class Label')
plt.title('Classification Dynamics')
plt.legend(['True', 'Predicted (Processed)'])
plt.show()

In [None]:
# Performance metrics on new dataset --> accuracy barely improves with majority vote
print('Testing Accuracy:')
print(classification_report(df_new['class'], df_new['majority_class']))
pd.DataFrame(data=confusion_matrix(df_new['class'], df_new['majority_class']), columns=actions[1:-1], index=actions[1:-1])

### Cross-subject performance!
This section will train the classifier on a subset of subjects and evaluate performane across the remaining subjects.

In [None]:
model = GestureClassifier(LinearDiscriminantAnalysis())
for k, DIR_SJ in tqdm(enumerate(DIR_SJs)): # for each subject
    for series_n in range(2): # for each series of each subject
        df = parse_file(DIR, DIR_SJ+'/', series_n)
        df_new = model.feature_extraction(df) # same model as before
        df_new = df_new.drop(df_new[np.isclose(df_new['class'], 0)].index) # remove any unmarked regions of the data
        df_new['sub'] = [DIR_SJ for idx in range(df_new.shape[0])] # so we keep track of which windows are marked per subject
        if k == 0 and series_n == 0: # if the first batch of data extracted, save it to a new dataframe
            df_all = df_new
        else:
            df_all = pd.concat((df_all, df_new), ignore_index=True, axis=0) # add new batch of data to main data collection
df_all

In [None]:
# Mixed-subjects and trials (but training on a bit of all subjects), seems to yield okay performance across subjects
classifier = LinearDiscriminantAnalysis()
model = GestureClassifier(classifier) # saves LDA model for classification
X_train, X_test, y_train, y_test = train_test_split(df_all.iloc[:,:-2], df_all['class'], test_size=0.33)
model.fit(X_train, y_train) # calling fit method from gesture classifier object
y_train_pred, y_test_pred = model.predict(X_train), model.predict(X_test)
print('Training Accuracy:')
print(classification_report(y_train, y_train_pred))
print('Testing Accuracy:')
print(classification_report(y_test, y_test_pred))


In [None]:
# Training on subset of subjects and evaluating on the rest of subjects (held-out subjects)
classifier = LinearDiscriminantAnalysis()
model = GestureClassifier(classifier) # saves LDA model for classification
subs = df_all['sub'].unique()
test_subjects = np.random.choice(subs, size=np.around(len(subs)*0.33).astype(np.int32), replace=False) # pick 1/3 of subjects and randomly add them to test set
df_test = df_all[df_all['sub'].isin(test_subjects)] # also shuffles test set
df_train = df_all[~df_all['sub'].isin(test_subjects)].sample(frac=1) # if not in test_subjects, is used for training and shuffled
model.fit(df_train.iloc[:,:-2], df_train['class']) # calling fit method from gesture classifier object
y_train_pred, y_test_pred = model.predict(df_train.iloc[:,:-2],), model.predict(df_test.iloc[:,:-2],)
print('Training Accuracy:')
print(classification_report(df_train['class'], y_train_pred))
print('Testing Accuracy:')
print(classification_report(df_test['class'], y_test_pred))
pd.DataFrame(data=confusion_matrix(df_test['class'], y_test_pred), columns=actions[1:], index=actions[1:])

In [None]:
# Adding new column to test
df_test['class_pred'] = y_test_pred

In [None]:
# Majority voting strategy: For the sake of argument use m decisions (t<=t(now)) from continuous stream to make decision
# Implemented as per Englehart and Hudgins, 2003
m = 10
df_test[['class', 'class_pred']] = df_test[['class', 'class_pred']].astype(np.int32)
processed_decisions = np.zeros(df_test.shape[0])
processed_decisions[:m] = 1 # assume at rest until enough data in the stream
for idx in range(m, df_test.shape[0]):
    mode,_ = scipy.stats.mode(df_test.iloc[idx-m:idx+m,-1], axis=0, keepdims=False) # find which labels occured the most in the given window
    processed_decisions[idx] = mode # set the decision of that interval to the majority class vote
    if idx + 2*m > df_test.shape[0]: # if there are less than m windows left in the last batch
        mode,_ = scipy.stats.mode(df_test.iloc[idx+m:,-1], axis=0, keepdims=False) # find which labels occured the most in the given window
        processed_decisions[idx:] = 1 # assume at rest at the very end of the stream
        break
df_test['majority_class'] = processed_decisions # majority decision processed classes
# df_test.reset_index(inplace=True)
plt.figure()
# plt.plot(np.arange(df_test.shape[0]), df_test['class'])
# plt.plot(np.arange(df_test.shape[0]), df_test['majority_class'])
plt.plot(np.arange(10000), df_test.loc[:9999,'class'])
plt.plot(np.arange(10000), df_test.loc[:9999,'majority_class'])
plt.xlabel('Time(ms)')
plt.ylabel('Class Label')
plt.title('Classification Dynamics')
plt.legend(['True', 'Predicted (Processed)'])
plt.show()

In [None]:
print('Testing Accuracy:')
print(classification_report(df_test['class'], df_test['majority_class']))
pd.DataFrame(data=confusion_matrix(df_test['class'], df_test['majority_class']), columns=actions[1:], index=actions[1:])
df_test.drop(columns='majority_class', inplace=True)