In [7]:
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import subprocess
import mne
import mne_qt_browser
from scipy.stats import skew, kurtosis

# execute ls, sort files by date created
output = subprocess.run(["ls -t ../data_named"], stdout=subprocess.PIPE, shell=True, text=True)
# show only the last 5 files, line by line
files = output.stdout.split("\n")[:-1][:5]
filename = None
i = 0
print("\n>> Select file to analyze:\n")
for file in files:
    print("  ", i, ":", file)
    i += 1
index = input("\n>> ")
if index == "":
    index = 0 # if no input, default to the most recent file
    filename = files[index]
    print("\nOK, most recent file: ", filename)
else:
    index = int(index)
    filename = files[index]
    print("\nOK, opening: ", filename)


>> Select file to analyze:

   0 : 0210_Jerry.npz
   1 : 0210_first.npz

OK, opening:  0210_first.npz


In [8]:
import scipy.signal as signal
import scipy.stats as stats

class DataBuilder:
    def __init__(self, sfreq):
        self.sfreq = sfreq

    def load_data(self, data_path):
        # Load the .npz file
        data = np.load(data_path, allow_pickle=True)
        labels = []
        windows = []
        for key in data.files:
            label = data[key].item()['label']
            window = data[key].item()['data']
            labels.append(label)
            windows.append(window)
        # dropping the first window because it's "rest"
        # drop the last window too
        return windows[1:-1], labels[1:-1]
    
    def load_window(self, data_path, index):
        # Load the .npz file
        windows, labels = self.load_data(data_path)
        windows = self.preprocess_data(windows)
        return windows[index], labels[index]

    def preprocess_data(self, windows):
        # remove the first two columns of each window (millis and hall sensor)
        processed_windows = []
        for window in windows:
            window = np.delete(window, [0,1], axis=1)
            processed_windows.append(window)
        
        print("Printing some stats!\n====================")
        print("Number of samples:", len(processed_windows))
        print("Number of channels:", len(processed_windows[0][0]))
        print("====================\n")

        return processed_windows
    
    def create_feature_vector(self, window):
        # window has shape samples x channels
        features = []
        print("Window shape:", window.shape)
        for channel in window.T:
            # demean and normalize
            channel = channel - np.mean(channel)
            channel = channel / np.std(channel)

            # calculate features
            kurtosis = stats.kurtosis(channel)
            skew = stats.skew(channel)
            # find peaks
            peaks, _ = signal.find_peaks(channel)
            num_peaks = len(peaks)
            # find zero crossings
            zero_crossings = np.where(np.diff(np.sign(channel)))[0]
            num_zero_crossings = len(zero_crossings)
            features.append(kurtosis)
            features.append(skew)
            features.append(num_peaks)
            features.append(num_zero_crossings)

        features = np.array(features)
        return features

    def create_feature_matrix(self, windows):
        # windows has shape windows x samples x channels
        feature_matrix = []
        for window in windows:
            feature_vector = self.create_feature_vector(window)
            feature_matrix.append(feature_vector)

        feature_matrix = np.array(feature_matrix)
        print("Feature matrix shape:", feature_matrix.shape)
        return feature_matrix
    
    def get_labels(self):
        return self.labels

    def build_data(self, data_path):
        # Load data
        windows, labels = self.load_data(data_path)
        print("Number of labeled samples:", len(windows))

        # Preprocess data
        windows = self.preprocess_data(windows)

        # Create feature matrix
        X = self.create_feature_matrix(windows)

        # Get labels
        y = labels

        return X, y

In [31]:
npz_file_path = f"../data_named/{filename}"

data_builder = DataBuilder(sfreq=1000)
X, y = data_builder.build_data(data_path=npz_file_path)

window_4, label_4 = data_builder.load_window(npz_file_path, 4)

print("X shape:", X.shape)
print("y shape:", len(y))

Number of labeled samples: 61
Printing some stats!
Number of samples: 61
Number of channels: 5

Window shape: (667, 5)
Window shape: (798, 5)
Window shape: (908, 5)
Window shape: (759, 5)
Window shape: (575, 5)
Window shape: (826, 5)
Window shape: (748, 5)
Window shape: (634, 5)
Window shape: (540, 5)
Window shape: (781, 5)
Window shape: (712, 5)
Window shape: (954, 5)
Window shape: (717, 5)
Window shape: (887, 5)
Window shape: (723, 5)
Window shape: (788, 5)
Window shape: (518, 5)
Window shape: (707, 5)
Window shape: (1011, 5)
Window shape: (719, 5)
Window shape: (659, 5)
Window shape: (875, 5)
Window shape: (555, 5)
Window shape: (744, 5)
Window shape: (995, 5)
Window shape: (650, 5)
Window shape: (912, 5)
Window shape: (990, 5)
Window shape: (603, 5)
Window shape: (751, 5)
Window shape: (569, 5)
Window shape: (536, 5)
Window shape: (1021, 5)
Window shape: (947, 5)
Window shape: (873, 5)
Window shape: (710, 5)
Window shape: (564, 5)
Window shape: (730, 5)
Window shape: (862, 5)
Windo

  channel = channel / np.std(channel)
  channel = channel / np.std(channel)


In [10]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# linear classifier
from sklearn.linear_model import LogisticRegression

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)
# drop nans in X_test
y_test = np.array(y_test)
y_test = y_test[~np.isnan(X_test).any(axis=1)]
X_test = X_test[~np.isnan(X_test).any(axis=1)]
y_train = np.array(y_train)
y_train = y_train[~np.isnan(X_train).any(axis=1)]
X_train = X_train[~np.isnan(X_train).any(axis=1)]

model = LogisticRegression()
model.fit(X_train, y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [44]:
# # save model
# import pickle
# pickle.dump(model, open("model.pkl", "wb"))

# # load model
# model = pickle.load(open("model.pkl", "rb"))

In [11]:
y_pred = model.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy:.2f}')

# Additional metrics
print(classification_report(y_test, y_pred))

Accuracy: 0.38
                 precision    recall  f1-score   support

     double-tap       1.00      0.50      0.67         4
           rest       0.17      0.20      0.18         5
            tap       0.50      0.17      0.25         6
 thumb-to-index       0.33      1.00      0.50         2
thumb-to-middle       0.33      1.00      0.50         1
 thumb-to-pinky       1.00      0.50      0.67         2
  thumb-to-ring       0.00      0.00      0.00         1

       accuracy                           0.38        21
      macro avg       0.48      0.48      0.40        21
   weighted avg       0.52      0.38      0.38        21



In [12]:
def run_inference(window, model):
    feature_vector = data_builder.create_feature_vector(window)
    feature_vector = np.array([feature_vector])
    print("Feature vector shape:", feature_vector.shape)
    prediction = model.predict(feature_vector)
    return prediction

prediction = run_inference(window_4, model)
print("Prediction:", prediction)
print("Actual:", label_4)

Window shape: (575, 5)
Feature vector shape: (1, 20)
Prediction: ['thumb-to-middle']
Actual: double-tap


### Remap to classify only thumb-related movements and tap-related movements

In [26]:
y_remap = []
for label in y:
    if label == "thumb-to-index" or label == "thumb-to-middle" or label == "thumb-to-ring" or label == "thumb-to-pinky":
        y_remap.append("thumb")
    if label == "tap" or label == "double-tap":
        y_remap.append("tapping")
    if label == "rest":
        y_remap.append("rest")
y_remap = np.array(y_remap)
y_remap

array(['thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping', 'tapping', 'rest',
       'thumb', 'thumb', 'thumb', 'thumb', 'tapping'], dtype='<U7')

In [35]:
idx = y_remap != "rest"
X_remap = X[idx]
y_remap = y_remap[idx]

In [37]:
X_remap.shape

(53, 20)

In [38]:
y_remap.shape

(53,)

In [39]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# linear classifier
from sklearn.linear_model import LogisticRegression

X_train, X_test, y_train, y_test = train_test_split(X_remap, y_remap, test_size=0.2, random_state=42)
# drop nans in X_test
y_test = np.array(y_test)
y_test = y_test[~np.isnan(X_test).any(axis=1)]
X_test = X_test[~np.isnan(X_test).any(axis=1)]
y_train = np.array(y_train)
y_train = y_train[~np.isnan(X_train).any(axis=1)]
X_train = X_train[~np.isnan(X_train).any(axis=1)]

model = LogisticRegression()
model.fit(X_train, y_train)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [40]:
y_pred = model.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy:.2f}')

# Additional metrics
print(classification_report(y_test, y_pred))

Accuracy: 0.90
              precision    recall  f1-score   support

     tapping       0.80      1.00      0.89         4
       thumb       1.00      0.83      0.91         6

    accuracy                           0.90        10
   macro avg       0.90      0.92      0.90        10
weighted avg       0.92      0.90      0.90        10



In [41]:
y_pred

array(['thumb', 'tapping', 'tapping', 'thumb', 'thumb', 'tapping',
       'tapping', 'thumb', 'tapping', 'thumb'], dtype='<U7')

In [42]:
y_test

array(['thumb', 'tapping', 'tapping', 'thumb', 'thumb', 'tapping',
       'tapping', 'thumb', 'thumb', 'thumb'], dtype='<U7')

In [44]:
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler

In [45]:
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

In [46]:
svm = SVC(kernel='linear', random_state=42)
svm.fit(X_train, y_train)
y_pred = svm.predict(X_test)

In [47]:
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))

              precision    recall  f1-score   support

     tapping       0.80      1.00      0.89         4
       thumb       1.00      0.83      0.91         6

    accuracy                           0.90        10
   macro avg       0.90      0.92      0.90        10
weighted avg       0.92      0.90      0.90        10

Accuracy: 0.9
