In [1]:
from pylsl import StreamInlet, resolve_stream
import numpy as np
import pandas as pd

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import train_test_split, cross_val_score, cross_validate, KFold
import matplotlib.pyplot as plt

from scipy import stats

import time

from volume_control import volume_up,  volume_down, volume_on, volume_off

#### functions for feature calculations

In [2]:
def rms(values):
    return np.sqrt(np.sum(np.square(values))/len(values))

def mav(values):
    return np.sum(np.absolute(values))/len(values)    

def var(values):
    return np.sum(np.square(values))/(len(values)-1)

def wl_exp(values):
    wavelen = np.sum(np.abs(values[:-1]-values[1:]))
    return wavelen

def wl(values):
    val_array = values.to_numpy()
    wavelen = np.sum(np.abs(val_array[:-1]-val_array[1:]))
    return wavelen

def zc(values):
    val_array = values.to_numpy()
    zero_crossings = np.where(np.diff(np.signbit(val_array)))[0]
    return len(zero_crossings)

def zc_exp(values):
    val_array = values.to_numpy()
    zero_crossings = np.where(np.diff(np.signbit(val_array)))[0]
    return len(zero_crossings)

### TRAINING

- set current device and subject_ID

In [6]:
def read_from_file(file: str):
    data = pd.read_csv(file)
    return data.loc[data["direction"] > 0]

path_lg = '/Users/lukasgehrke/Documents/publications/proxEMG/data/4_chan/'
path_nw = 'C:/Users/Nils/Documents/02_Uni/Master Human Factors/\
00_Masterarbeit/55_github/proxEMG/data/4_chan/'
path_mac_old = '/Users/work/Desktop/Nils/study/4_chan/'
path_mac = '/Users/work/Desktop/Nils/proxEMG/data/user_study/'

device = path_mac
subject_id = '06'

path_rot = device + subject_id + '/' + subject_id + '_training_rotation.csv'
path_ges = device + subject_id + '/' + subject_id + '_training_gesture.csv'

In [7]:
def feature_calculation(data:pd.DataFrame, win_size=None, baseline=False, baseline_size=1):
    df = pd.DataFrame()
    n_epoch = data['epoch_ixs'].max()
    
    for i in range(1,n_epoch+1):
        
        data_curr = data[data['epoch_ixs']==i]

        if win_size is not None: # get middle window
            middle_t = (len(data_curr)-1) / 2
            start_t = int(np.floor(middle_t - win_size/2))
            end_t = int(np.floor(middle_t + win_size/2))
            data_curr = data_curr[start_t:end_t]

        if baseline:
            base = data_curr.iloc[0:baseline_size,2:].mean()
            data_curr.iloc[:,2:] = data_curr.iloc[:,2:] - base
        
        # df_curr = data_curr[:win_size].filter(muscles_list[:-1]).agg(features_selected).melt().transpose()[-1:]
        df_curr = data_curr.filter(muscles_list).agg(features_selected).melt().transpose()[-1:]
        df_curr['direction'] = data_curr.direction.values[0]
        
        df = pd.concat([df,df_curr], axis=0, ignore_index=True)
        #print(i)
    
    return df

#### feature calculation and data preparation

In [8]:
%%time

windowsize = 250
baselinesize = windowsize

#features_selected = [rms,mav,var, wl,zc]
#features_selected = [mav, var, wl]
features_selected = [wl, mav, var]
muscles_list = ["muscle1","muscle2", "muscle3","muscle4"]
# muscles_list = ["muscle1","muscle2", "muscle3"] 

# overlap = 0.2
# stepsize = windowsize - int(overlap*windowsize)
# muscles_list = ["muscle2", "muscle3"]

import warnings
warnings.filterwarnings('ignore')

all_data_rot = feature_calculation(read_from_file(file=path_rot), win_size=windowsize, baseline=True, baseline_size=baselinesize)
all_data_ges = feature_calculation(read_from_file(file=path_ges), win_size=windowsize, baseline=True, baseline_size=baselinesize)

CPU times: user 3.78 s, sys: 106 ms, total: 3.88 s
Wall time: 3.93 s


In [9]:
# zusammenfassen von no_rot_left & no_rot_right
all_data_rot['direction'] = all_data_rot['direction'].replace([4],[3])

# check class sizes
freq_rot = all_data_rot['direction'].value_counts() 
freq_ges = all_data_ges['direction'].value_counts()

# sample all classes to smallest class size
sample_data_ges = all_data_ges.groupby('direction').sample(n=min(freq_ges), random_state=42)
sample_data_rot = all_data_rot.groupby('direction').sample(n=min(freq_rot), random_state=42)

### TESTING

In [10]:
def LDA_with_CV(data: pd.DataFrame, folds = 10):
    #clf=LDA('lsqr',shrinkage='auto')
    clf=LDA()
    kfolds = KFold(n_splits=folds, random_state=1, shuffle=True)
    
    X_train = data.iloc[:,1:len(data.columns)].values
    y_train = data.direction
    
    cv_results = cross_val_score(clf, X_train, y_train, cv=kfolds)

    print("%0.3f accuracy with a standard deviation of %0.3f using %0.f-folds-CV" % (cv_results.mean(), cv_results.std(),folds))

In [11]:
def testing(data: pd.DataFrame, testsize = 0.25):
    clf=LDA()
    X = data.iloc[:,1:len(data.columns)].values
    y = data.direction
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testsize, random_state=42)
    
    clf.fit(X_train, y_train)
    test_result = clf.score(X_test,y_test)
    
    print("%0.3f accuracy with a test_size of %0.2f" % (test_result, testsize))  

testing:
<br>
- LDA_with_CV: LDA with cross validation; optional parameter folds (default 10)
- testing: LDA with train-test-split; optional parameter testsize (default 0.25)

In [12]:
LDA_with_CV(sample_data_ges)
LDA_with_CV(sample_data_rot,folds=5)

testing(sample_data_ges)
testing(sample_data_rot)

0.989 accuracy with a standard deviation of 0.033 using 10-folds-CV
0.867 accuracy with a standard deviation of 0.087 using 5-folds-CV
1.000 accuracy with a test_size of 0.25
0.868 accuracy with a test_size of 0.25


#### fit model

In [13]:
#clf_rot = LDA(solver="lsqr", shrinkage='auto')
clf_rot = LDA()

X_train_rot = sample_data_rot.iloc[:,0:len(sample_data_rot.columns)-1].values
y_train_rot = sample_data_rot.direction.values

clf_rot.fit(X_train_rot, y_train_rot)

# clf_ges = LDA(solver="lsqr", shrinkage='auto')
clf_ges = LDA()

X_train_ges = sample_data_ges.iloc[:,0:len(sample_data_ges.columns)-1].values
y_train_ges = sample_data_ges.direction.values

clf_ges.fit(X_train_ges, y_train_ges)

LinearDiscriminantAnalysis()

## MAIN

###### gestures
direction 1 = no gesture 
<br>
direction 2 = gesture

###### rotation
direction 1 = clockwise 
<br>
direction 2 = anti-clockwise 
<br>
direction 3 = no rotation

In [14]:
overlap = 0.2
dropped_samples = windowsize - int(overlap*windowsize)

baseline = True
baseline_size = windowsize

threshold_ges = 0.9
duration_rot = 5
threshold_rot = 0.9

duration_rot = 20
n_predictions = 3 # number of same predictions before output
n_loops = 20 # number of rotation windows classified before jumping back to gesture detection

## live experiment

##### testing part I

In [15]:
### gesture testing

def main():
    # first resolve an EMG stream on the lab network
    print("looking for an EMG stream...")

    streams = resolve_stream('name', 'BrainVision RDA') # BrainVision

    inlet = StreamInlet(streams[0])
    
    data = np.array([[],[],[],[]]) # 4 emg channels
    
    switch = False
    
    while True:
        # get a new sample
        sample, timestamp = inlet.pull_sample()
        
        data = np.append(data, [[sample[0]],[sample[1]],[sample[2]],[sample[3]]], axis = 1)

        if data.shape[1] == windowsize:

            data_feat = data
            if baseline:
                data_feat -= np.mean(data_feat[:,0:baseline_size][:,None],axis=2)
            
            features = pd.DataFrame(data_feat).agg(features_selected, axis = 1).to_numpy().reshape(1,-1)

            direction_ges = clf_ges.predict(features)[0] #predicted class
            prob_ges = clf_ges.predict_proba(features) #probability for class prediction
            
            direction_rot = clf_rot.predict(features)[0] #predicted class
            prob_rot = clf_rot.predict_proba(features) #probability for class prediction
            
            if direction_ges == 2 and float(prob_ges[0][1]) > threshold_ges:
                if switch==True:
                    volume_on()
                    switch = False
                    time.sleep(3)
                if switch==False:
                    volume_off()
                    switch = True
                    time.sleep(3)

            # drop samples
            data = np.delete(data, np.s_[:dropped_samples], axis=1)

if __name__ == '__main__':
    main()

looking for an EMG stream...


2022-07-29 11:57:42.299 ( 253.854s) [            217B]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 32768, multicast: 1, broadcast: 0)
2022-07-29 11:57:42.299 ( 253.854s) [            217B]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 32768, multicast: 1, broadcast: 0)
2022-07-29 11:57:42.299 ( 253.854s) [            217B]      netinterfaces.cpp:102   INFO| 	IPv4 addr: 7f000001
2022-07-29 11:57:42.300 ( 253.854s) [            217B]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 32768, multicast: 1, broadcast: 0)
2022-07-29 11:57:42.300 ( 253.854s) [            217B]      netinterfaces.cpp:105   INFO| 	IPv6 addr: ::1
2022-07-29 11:57:42.300 ( 253.854s) [            217B]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 32768, multicast: 1, broadcast: 0)
2022-07-29 11:57:42.300 ( 253.854s) [            217B]      netinterfaces.cpp:105   INFO| 	IPv6 addr: fe80::1%lo0
2022-07-29 11:57:42.300 ( 253.854s) [            217B]      netinterfaces.cpp:91    I

KeyboardInterrupt: 

In [16]:
def main():
    # first resolve an EMG stream on the lab network
    print("looking for an EMG stream...")

    streams = resolve_stream('name', 'BrainVision RDA') # BrainVision

    inlet = StreamInlet(streams[0])
    
    data = np.array([[],[],[],[]]) # 4 emg channels  
 
    while True:
        # get a new sample
        sample, timestamp = inlet.pull_sample()
        
        data = np.append(data, [[sample[0]],[sample[1]],[sample[2]],[sample[3]]], axis = 1)

        if data.shape[1] == windowsize:

            data_feat = data
            if baseline:
                data_feat -= np.mean(data_feat[:,0:baseline_size][:,None],axis=2)
            
            features = pd.DataFrame(data_feat).agg(features_selected, axis = 1).to_numpy().reshape(1,-1)

            direction_ges = clf_ges.predict(features)[0] #predicted class
            prob_ges = clf_ges.predict_proba(features) #probability for class prediction
            
            direction_rot = clf_rot.predict(features)[0] #predicted class
            prob_rot = clf_rot.predict_proba(features) #probability for class prediction
            
            if prob_rot[0][int(direction_rot-1)] > threshold_rot:
                if direction_rot == 1:
                    volume_up(5)
                if direction_rot == 2:
                    volume_down(5)

            # drop samples
            data = np.delete(data, np.s_[:dropped_samples], axis=1)

if __name__ == '__main__':
    main()

looking for an EMG stream...
55
60
65
70
75
80
86
91
96
100
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
36
41
46
51
56
61
66
71
76
81
86
91
96
100
105
105
105
105
105
105
105
95
90
94
99
103
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
105
95
100
105
105
105
105
105
105
105
105
105
105
105
105


KeyboardInterrupt: 

##### testing part II

In [17]:
def main():
    # first resolve an EMG stream on the lab network
    print("looking for an EMG stream...")
    
    # streams = resolve_stream('type', 'EEG') # LSL_DATA_SENDER
    # streams = resolve_stream('name', 'EMG') # BITALINO
    # streams = resolve_stream('name', 'OpenSignals') # BITALINO
    streams = resolve_stream('name', 'BrainVision RDA') # BrainVision

    inlet = StreamInlet(streams[0])
    
    data = np.array([[],[],[],[]]) # length depends on emg channels
    
    overlap = 0.4
    dropped_samples = windowsize - int(overlap*windowsize)
    
    switch = False # True/False for rotation recognition
    counter = 0
    
    while True:
        # get a new sample
        sample, timestamp = inlet.pull_sample()
        
        data = np.append(data, [[sample[0]],[sample[1]],[sample[2]],[sample[3]]], axis = 1)

        if data.shape[1] == windowsize:

            data_feat = data
            if baseline:
                data_feat -= np.mean(data_feat[:,0:baseline_size][:,None],axis=2)
            
            features = pd.DataFrame(data_feat).agg(features_selected, axis = 1).to_numpy().reshape(1,-1)

            direction_ges = clf_ges.predict(features)[0] #predicted class
            prob_ges = clf_ges.predict_proba(features) #probability for class prediction
            
            direction_rot = clf_rot.predict(features)[0] #predicted class
            prob_rot = clf_rot.predict_proba(features) #probability for class prediction
            
            # trial logic
            if switch == True:
                             
                dir_array = np.append(dir_array,direction_rot) # append predicted direction
                dir_array = np.delete(dir_array,0) # delete first entry
                
                if len(np.unique(dir_array)) == 1: # check if all entries are the same
                    if np.unique(dir_array) == 1: 
                        volume_up(10)
                        switch = False
                        
                    if np.unique(dir_array) == 2: 
                        volume_down(10)
                        switch = False           
            

            # switch == False: outer circle
            if switch == False and counter == 0 and direction_ges == 2 and float(prob_ges[0][1]) > threshold_ges:
                switch = True
                counter = n_loops
                data = np.delete(data, np.s_[:windowsize], axis=1) # empty data array
                dir_array = np.zeros(n_predictions) # (re-)create empty array to avoid taking output from last round
                
                time.sleep(1) # pause before starting inner circle
                               
            
            # drop samples
            data = np.delete(data, np.s_[:dropped_samples], axis=1)
            
            if counter == 0: switch == False
            else: counter = counter-1

if __name__ == '__main__':
    main()

looking for an EMG stream...
60
70
60
70
80
91
101
110
110
110
110
110
110
90
99
108
90
99
108
90
79
69
58
48
38
28
18
28
18
8
18
8
-2
-10
10
0
-10
-10
-10
-10
10
20
10
0
10
20
10
0
10
0
-10
-10
-10
-10
-10
-10
10
20
30
20
10
0
-10
-10
10
20
30
40
50
60
50
60
50
40
50
60
70
80
91
101
110
110
110
90
79
69
58
48
38
28
18
28
38
48
58
48
38
28
18
28
18
28
38
48
58
48
58


KeyboardInterrupt: 

2022-07-29 12:19:35.827 (1567.521s) [R_BrainVision   ]      data_receiver.cpp:342    ERR| Stream transmission broke off (Input stream error.); re-connecting...
2022-07-29 12:19:35.827 (1567.521s) [R_BrainVision   ]      data_receiver.cpp:342    ERR| Stream transmission broke off (Input stream error.); re-connecting...
2022-07-29 12:19:35.827 (1567.521s) [R_BrainVision   ]      data_receiver.cpp:342    ERR| Stream transmission broke off (Input stream error.); re-connecting...
