# Class Prediction using LDA

This script will pre-process, extract features from the recorded dataset. The dataset consists of EMG recordings while the user practices silent and subvocalized speech. For each sample, the user is instructed to "say" a word from a vocabulary list, and the sample is labelled with the word. This notebook is used on 3-word datasets, and to evaluate cross-session performance for both the electrode brace, the star array and the control array.

## Data processing script

This step returns a full feature matrix and a class array for training the LDA.

In [None]:
# Data loading
import scipy.signal
import numpy as np
import pandas as pd

from feature_extractors import window_generator, mav, wl, window_rms
from librosa.feature import mfcc, delta, rms

# Define configuration
INITIALIZATION_WINDOW = [0.5, 4.5] # seconds
EMG_SAMPLE_RATE = 250

# Window Configuration
WINDOW_SIZE = 400 # ms
WINDOW_STRIDE = 100 # ms
WINDOW_SAMPLE_SIZE = int(EMG_SAMPLE_RATE * (WINDOW_SIZE / 1000)) # should be an integer result
WINDOW_SAMPLE_STRIDE = int(EMG_SAMPLE_RATE * (WINDOW_STRIDE / 1000)) # should be an integer result

# Define filters
sos_notch_50hz = scipy.signal.butter(4, [48,52], 'bandstop', fs=EMG_SAMPLE_RATE, output='sos')
sos_highpass = scipy.signal.butter(4, 0.5, 'highpass', fs=EMG_SAMPLE_RATE, output='sos')
sos_interest = scipy.signal.butter(4, [2,48], 'bandpass', fs=EMG_SAMPLE_RATE, output='sos')

base_path = "../../datasets/star-50x3-3"
df = pd.read_csv(f"{base_path}/metadata.csv")
X = None
y = None

for index, row in df.iterrows():

    # Load in npy from dataset
    emg = np.load(f"{base_path}/{row['id']}.npy")

    # Do basic analog filtering
    emg = emg - np.mean(emg, axis=0) # Remove DC
    emg = scipy.signal.sosfilt(sos_highpass, emg, axis=0)
    emg = scipy.signal.sosfilt(sos_notch_50hz, emg, axis=0)

    print(emg.shape)
    
    # Remove intialization to avoid initialization noise
    start = int(INITIALIZATION_WINDOW[0]*EMG_SAMPLE_RATE)
    end = int(INITIALIZATION_WINDOW[1]*EMG_SAMPLE_RATE)
    emg = emg[start:end,:]

    print(emg.shape)

    emg_reshaped = np.reshape(emg, (1, emg.shape[0], emg.shape[1])) # is now (1 x TIME x CHANNELS)


    mav_features = mav(emg_reshaped)
    wl_features = wl(emg_reshaped)
    mfcc_features = np.squeeze(mfcc(y=emg.T, sr=EMG_SAMPLE_RATE, n_mfcc=2, n_fft=emg.shape[0], center=False))

    # 64 features in total
    features = np.vstack((mav_features, wl_features, mfcc_features.T))
    # features = np.vstack((mav_features, wl_features, zc_features, ssc_features))
    # features = mfcc_features.T
    # features = np.vstack((mav_features, zc_features, mfcc_features.T[0:2]))
    features = features.ravel()
    
    if X is None:
        X = np.expand_dims(features, axis=0)
    else:
        X = np.concatenate((X, np.expand_dims(features, axis=0)), axis=0)

    if y is None:
        y = np.array([row['cls']])
    else:
        y = np.append(y, row['cls'])

print("Generated Feature Matrix:")
print(X)
print(X.shape)

print("Generated Label Matrix:")
print(y)
print(y.shape)

# Viewing PCA and LDA properties of dataset

Setting up PCA and LDA transformations on the extracted features

In [None]:
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import matplotlib.pyplot as plt

target_names = np.unique(y) # Get list of label names

pca = PCA(n_components=2)
X_pca = pca.fit(X).transform(X)

lda = LinearDiscriminantAnalysis(solver="svd", n_components=2)
X_lda = lda.fit(X, y).transform(X)

plt.figure(figsize=(15, 6))
splot = plt.subplot(1, 2, 1)
for color, target_name in zip(['grey', 'mediumpurple', 'blue', 'green', 'yellow', 'orange', 'red', 'sienna'], target_names):
    plt.scatter(
        X_pca[y == target_name, 0], X_pca[y == target_name, 1], color=color, alpha=0.8, lw=2, label=target_name
    )
plt.legend(loc="best", shadow=False, scatterpoints=1)
plt.title("PCA of dataset")
plt.tick_params(left = False, right = False , labelleft = False ,
                labelbottom = False, bottom = False)

splot = plt.subplot(1, 2, 2)
for color, target_name in zip(['grey', 'mediumpurple', 'blue', 'green', 'yellow', 'orange', 'red', 'sienna'], target_names):
    plt.scatter(
        X_lda[y == target_name, 0], X_lda[y == target_name, 1], color=color, alpha=0.8, lw=2, label=target_name
    )
plt.legend(loc="best", shadow=False, scatterpoints=1)
plt.title("LDA of dataset")
plt.tick_params(left = False, right = False , labelleft = False ,
                labelbottom = False, bottom = False)

# Linear Discriminator Analysis

Setting up K-fold cross validator, across 5 folds for hyperparameter tuning.

In [None]:
from sklearn.model_selection import train_test_split

seed = 39
# Include all in test train
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8, random_state=seed)
#X_train, X_test, y_train, y_test = train_test_split(X[:150,:], y[:150], train_size=0.8) # Session 0 only
# X_train, X_test, y_train, y_test = train_test_split(X[150:,:], y[150:], train_size=0.8, random_state=seed) # Session 1 only

# X_train = X
# y_train = y

# Split by session, session 0 for train, session 1 for test
#X_train = X[:150,:]
#y_train = y[:150]

#X_test = X[150:,:]
#y_test = y[150:]

# Test 0 train 1
#X_test = X[:150,:]
#y_test = y[:150]

#X_train = X[150:,:]
#y_train = y[150:]

In [None]:
from sklearn import preprocessing
from sklearn.pipeline import make_pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import KFold, cross_val_score, permutation_test_score, LearningCurveDisplay

ss = KFold(n_splits=5)
clf = make_pipeline(preprocessing.StandardScaler(), LinearDiscriminantAnalysis(solver="svd", n_components=2))
scores = cross_val_score(clf, X_train, y_train, cv=ss)
score, _, pvalue = permutation_test_score(clf, X_train, y_train, cv=ss)

print("%0.2f accuracy with a standard deviation of %0.2f." % (scores.mean(), scores.std()))
print("True score of %0.2f with a pvalue of %0.2f" % (score, pvalue))

LearningCurveDisplay.from_estimator(clf, X_train, y_train, train_sizes=[0.25, 0.50, 0.75, 1.0], cv=ss)

# Sequential feature selection

In [None]:
# Sequential feature selection
from sklearn.feature_selection import SequentialFeatureSelector

feature_names = ['mav', 'wl', 'zc', 'ssc', 'mfcc0', 'mfcc1', 'mfcc2', 'mfcc3']

feature_names_full = []
for i in range(8):
    for name in feature_names:
        feature_names_full.append(f"{name}_{i}")

ss = KFold(n_splits=5)
clf = make_pipeline(preprocessing.StandardScaler(), LinearDiscriminantAnalysis(solver="lsqr"))


sfs_forward = SequentialFeatureSelector(
    clf, n_features_to_select=32, direction="forward"
).fit(X_train, y_train)

sfs_backward = SequentialFeatureSelector(
    clf, n_features_to_select=32, direction="backward"
).fit(X_train, y_train)

features = []
print("Features selected by forward SFS:")
features.append(sfs_forward.get_feature_names_out(feature_names_full))
print(features[0])

print("Features selected by backward SFS:")
features.append(sfs_backward.get_feature_names_out(feature_names_full))
print(features[1])

In [None]:
import matplotlib.pyplot as plt

# Let's make a nice pretty plot
channel_count = {} # 0 is forward, 1 is backward
type_count = {}

channels = []
types = []
channel_groups = {
    'forward': [],
    'backward': []
}
type_groups = {
    'forward': [],
    'backward': []
}

for i, direction in enumerate(features):
    for feature in direction:
        type, channel = feature.split("_")

        direction = 'forward' if (i == 0) else 'backward'

        try:
            idx = channels.index(channel)
            channel_groups[direction][idx] += 1
        except ValueError: # Doesn't have
            channels.append(channel)
            channel_groups['forward'].append(0)
            channel_groups['backward'].append(0)

            idx = len(channels) - 1
            channel_groups[direction][idx] += 1
        
        try:
            idx = types.index(type)
            type_groups[direction][idx] += 1
        except ValueError: # Doesn't have
            types.append(type)
            type_groups['forward'].append(0)
            type_groups['backward'].append(0)

            idx = len(types) - 1
            type_groups[direction][idx] += 1

plt.figure(figsize=(15,6))

ax = plt.subplot(1, 2, 1)
x = np.arange(len(channels))  # the label locations
width = 0.25  # the width of the bars
multiplier = 0

for direction, channel_count in channel_groups.items():
    offset = width * multiplier
    rects = ax.bar(x + offset, channel_count, width, label=direction)
    ax.bar_label(rects, padding=3)
    multiplier += 1

ax.set_ylabel('Number of times included')
ax.set_xlabel('Channel name')

ax.set_title('Channel importance from SFS')
ax.set_xticks(x + width, channels)
ax.legend(loc='upper right', ncols=3)

ax = plt.subplot(1, 2, 2)
x = np.arange(len(types))  # the label locations
width = 0.25  # the width of the bars
multiplier = 0

for direction, type_count in type_groups.items():
    offset = width * multiplier
    rects = ax.bar(x + offset, type_count, width, label=direction)
    ax.bar_label(rects, padding=3)
    multiplier += 1

ax.set_ylabel('Number of times included')
ax.set_xlabel('Type name')

ax.set_title('Type importance from SFS')
ax.set_xticks(x + width, types)
ax.legend(loc='upper right', ncols=3)

plt.show()

# Test performance

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

clf.fit(X_train, y_train)

score = clf.score(X_test, y_test)
print(f"The test score is: {score}")

# Let's make a confusion matrix
y_pred = clf.predict(X_test)
cm = confusion_matrix(y_test, y_pred, labels=clf.classes_)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=clf.classes_)
disp.plot()
plt.show()