In [3]:
# Importing Libraries 

import numpy as np
import statistics as st
import struct
import math as mt
import os
from mat4py import loadmat
import scipy.io
import scipy.interpolate
import matplotlib.pyplot as plt 
import scipy.signal
import sys


from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn import svm
from sklearn.neural_network import MLPClassifier

from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import StratifiedKFold
from sklearn.utils import shuffle

from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import Normalizer
from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.utils import to_categorical

import datetime
import time
import select
import socket
import binascii
from numpy import linspace
import winsound

In [None]:
# Class Definition

class ClassifyFingers():
    def __init__(self):
        self.y_hat     = 0
        self.y_subject = []
        self.X_subject = []
        self.model     = None
        self.pca       = None
        self.scaling   = None         
    def My_Model(self, X,y,type_model):
        try:
            # Selection of the model
            
            if  (type_model == 'MLP'):
                ''' Multi Layer Perceptron '''
                model = MLPClassifier(hidden_layer_sizes=(100,100,100,6),
                                max_iter=300,activation = 'tanh',
                                solver='adam', 
                                warm_start=True,
                                shuffle=True,
                                early_stopping=True,)
            elif(type_model == 'SVM'):
                ''' Support Vector Machine '''
                model = svm.SVC(kernel='rbf', C=1, decision_function_shape='ovo')
            elif(type_model == 'LDA'):
                ''' Linear Discriminant Analysis '''
                model = LinearDiscriminantAnalysis()
            elif(type_model == 'NN'):
                ''' Neural Network '''
                input_shape = (X.shape[1],)
                kfold = StratifiedKFold(n_splits=5, shuffle=True) # 5-fold
                loss_per_fold = []
                accuracy_per_fold = []
                n_classes = len(np.unique(y))
                
                y_categorical = to_categorical(y-1, n_classes)
                
                for train_index, test_index in kfold.split(X, y):
                    X_train, X_test = X[train_index], X[test_index]
                    y_train, y_test = y_categorical[train_index], y_categorical[test_index] 
                    
                    # NN Arquitecture
                    model = Sequential()
                    model.add(keras.layers.InputLayer(input_shape=input_shape))
                    model.add(Dense(100, activation='relu'))
                    model.add(Dense(70, activation='relu'))
                    model.add(Dense(50, activation='relu'))
                    model.add(Dense(6, activation='softmax'))
                    
                    # Compile model and train
                    model.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])
                    history = model.fit(X_train, y_train, epochs=120, batch_size=30)
                    
                    # Metrics
                    scores = model.evaluate(X_test, y_test)
                    loss_per_fold.append(scores[0])
                    accuracy_per_fold.append(scores[1])
                
                print('NN Average Accuracy: %.3f (%.3f)' % (np.average(accuracy_per_fold), np.average(loss_per_fold)))
                return model
            
            ######################################################################
            if(type_model != 'NN'):
                print('Starting K-Fold Cross Validation...')
                # Create StratifiedKFold object.
                k = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
                lst_accu_stratified = []
                model_kfold_cv = model
                for train_index, test_index in k.split(X, y):
                    x_train_fold, x_test_fold = X[train_index], X[test_index]
                    y_train_fold, y_test_fold = y[train_index], y[test_index]
                    model_kfold_cv.fit(x_train_fold, y_train_fold)
                    lst_accu_stratified.append(model_kfold_cv.score(x_test_fold, y_test_fold))
            #######################################################################

                print(type_model, 'Mean Accuracy: %.3f (%.3f)' % (np.mean(lst_accu_stratified), np.std(lst_accu_stratified)))
            
            # Train Model
            model.fit(X,y)
            return model
        except:
            print(type_model, 'Not Found')
            exit()
    def ApplyBaseLine(self, raw_data):
        return raw_data - np.mean(raw_data, axis=0)
    def FeatureExtraction(self, data):
        x = np.zeros(40)
        N = len(data)
        
        for i in range(0,len(x)):
            ch1_features = []
            ch2_features = []
            ch3_features = []
            ch4_features = []
            for j in range(0,4):
                # MEMV
                #if(i == 0):
                #    analytical_signal = hilbert(data[:,j])
                #    amplitude_envelope = np.abs(analytical_signal)
                #    
                #    memv = max(amplitude_envelope)
                #    x[i+j*10] = memv
                
                # RMS
                if(i == 0):
                    rms = np.sqrt(np.mean(data[:,j]**2))
                    #print(rms)
                    x[i+j*10]  = rms
                    #print("RMS DONE")
                
                # STD
                if(i == 1):
                    std = np.std(data[:,j])
                    x[i+j*10]  = std
                    #print("STD DONE")
                
                # VAR
                if(i == 2):
                    var = np.var(data[:,j])
                    x[i+j*10]  = var
                    #print("VAR DONE")
                
                # MAV
                if(i == 3):
                    mav = np.max(np.abs(data[:,j]))
                    #print("MAV DONE")
                    x[i+j*10]  = mav
                
                # AUC
                #if(i == 5):
                #    #auc = np.trapz(data[:,j],dx=0.0001)
                #    auc = np.sum( amplitude_envelope[0:N-1][j] + amplitude_envelope[1:N][j])/2
                #    x[i+j*10]  = auc
                #    #print("AUC DONE")
                
                # IEMG
                if(i == 4):
                    iemg = np.sum(np.abs(data[:,j]))
                    x[i+j*10]  = iemg
                    #print("IEMG DONE")
                
                # LogD
                if(i == 5):
                  #  N = len(data[:,j])
                    y = 0
                    
                    for k in range(0,N):
                        y += np.log(np.abs(data[k,j]))
                        
                    logd = np.exp(y/N)
                    x[i+j*10]  = logd
                    #print("LOGD DONE")
                
                # ZCR
                if(i == 6):
                    #N = len(data[:,j])
                    zcr = 0
                    fx = data
                    thres = 0.01
                    for k in range(0,N-1):
                        if ((fx[k,j] > 0  and  fx[k+1,j] < 0))  or  ((fx[k,j]) < 0  and  fx[k+1,j] > 0) and (abs(fx[k,j] - fx[k+1,j]) >= thres):
                            zcr += 1
                    x[i+j*10]  = zcr
                    #print("ZCR DONE")
                
                # WL
                if(i == 7):
                   # N = len(data[:,j])
                    wl = 0
                    for k in range(1,N):
                        wl = wl + np.abs(data[k,j] - data[k-1,j])
                    x[i+j*10]  = wl
                    #print("WL DONE")
                
                # SSI
                if(i == 8):
                    ssi = np.sum(data[:,j]**2)
                    x[i+j*10]  = ssi
                    #print("SSI DONE")
                
                # WEN
                if(i == 9):
                    eps = np.finfo(float).eps
                    fx = data[:,j]**2
                    wen = -np.sum(fx*np.log(fx+eps))
                    x[i+j*10]  = wen
                    #print("WEN DONE")
                
        #return np.array(x,dtype=np.ndarray)
        return x
    def Connect_HOH(self):
        try:
            import Online_Test_HoH as HoH
            self.hoh1 = HoH()
            self.hoh1.connect()    
        except:
            print("No HoH Controller")
    def main(self, x_file, y_file):
        self.X_subject = x_file
        self.y_subject = y_file
        accuracies = []
        
        X = self.X_subject
        Y = self.y_subject

        data_matrix = np.zeros((X.shape[2],40))                 
        for i_2 in range(0,X.shape[2]):                         
            x_data = X[:,:,i_2]
            x_data = x_data - np.mean(x_data, axis=0)     
            data = x_data
            x = np.zeros(40)
            N = len(data)
            
            for i in range(0,len(x)):
                for j in range(0,4):
                    # RMS
                    if(i == 0):
                        rms = np.sqrt(np.mean(data[:,j]**2))
                        #print(rms)
                        x[i+j*10]  = rms
                        #print("RMS DONE")
                    
                    # STD
                    if(i == 1):
                        std = np.std(data[:,j])
                        x[i+j*10]  = std
                        #print("STD DONE")
                    
                    # VAR
                    if(i == 2):
                        var = np.var(data[:,j])
                        x[i+j*10]  = var
                        #print("VAR DONE")
                    
                    # MAV
                    if(i == 3):
                        mav = np.max(np.abs(data[:,j]))
                        #print("MAV DONE")
                        x[i+j*10]  = mav
                    
                    # AUC
                    #if(i == 5):
                    #    #auc = np.trapz(data[:,j],dx=0.0001)
                    #    auc = np.sum( amplitude_envelope[0:N-1][j] + amplitude_envelope[1:N][j])/2
                    #    x[i+j*10]  = auc
                    #    #print("AUC DONE")
                    
                    # IEMG
                    if(i == 4):
                        iemg = np.sum(np.abs(data[:,j]))
                        x[i+j*10]  = iemg
                        #print("IEMG DONE")
                    
                    # LogD
                    if(i == 5):
                    #  N = len(data[:,j])
                        y = 0
                        
                        for k in range(0,N):
                            y += np.log(np.abs(data[k,j]))
                            
                        logd = np.exp(y/N)
                        x[i+j*10]  = logd
                        #print("LOGD DONE")
                    
                    # ZCR
                    if(i == 6):
                        #N = len(data[:,j])
                        zcr = 0
                        fx = data
                        thres = 0.01
                        for k in range(0,N-1):
                            if ((fx[k,j] > 0  and  fx[k+1,j] < 0))  or  ((fx[k,j]) < 0  and  fx[k+1,j] > 0) and (abs(fx[k,j] - fx[k+1,j]) >= thres):
                                zcr += 1
                        x[i+j*10]  = zcr
                        #print("ZCR DONE")
                    
                    # WL
                    if(i == 7):
                    # N = len(data[:,j])
                        wl = 0
                        for k in range(1,N):
                            wl = wl + np.abs(data[k,j] - data[k-1,j])
                        x[i+j*10]  = wl
                    #print("WL DONE")
                
                     # SSI
                    if(i == 8):
                        ssi = np.sum(data[:,j]**2)
                        x[i+j*10]  = ssi
                        #print("SSI DONE")
                
                    # WEN
                    if(i == 9):
                        eps = np.finfo(float).eps
                        fx = data[:,j]**2
                        wen = -np.sum(fx*np.log(fx+eps))
                        x[i+j*10]  = wen
                        #print("WEN DONE")

            for j_2 in range(0,40):
                data_matrix[i_2][j_2] = x[j_2]

        # Feature Selection with PCA
        self.pca = PCA(n_components=36)
        self.pca.fit(data_matrix)
        X = self.pca.transform(data_matrix)
        
        # Normalization
        self.scaling = StandardScaler(with_mean=True, with_std=True)
        self.scaling.fit(X)
        X = self.scaling.transform(X)
        LDA_acc   = 0
        SVM_acc   = 0
        NN_acc    = 0
        MLP_acc   = 0 
        self.model = self.My_Model(X,Y, "SVM")
    def My_model_predict(self,model, X_test,y_test,type_model):
        predictions = []
        # Make Predictions
        print("Starting Predictions...")
        test_acc = 0
        for i in range(0,len(y_test)):
            X = X_test[i,:]
            X= np.reshape(X, (1, -1))
            X = self.pca.transform(X)
            X = self.scaling.transform(X)
            
            if type_model != 'NN':
                y_hat = model.predict(X)
                predictions.append(y_hat)
                if y_hat == y_test[i]:
                    test_acc += 1
            else:
                y_hat = model.predict(X)
                y_hat = np.argmax(y_hat, axis=1) + 1
                predictions.append(y_hat[0])
                if y_hat == y_test[i]:
                    test_acc += 1
                    
        score = test_acc/len(y_test)*100
        print("Test Accuracy: ", score)
        return score, predictions
    def plot_confusion_matrix(self, y_true, y_pred, classes,
                                normalize=False,
                                title=None,
                                cmap=plt.cm.Blues):

            if not title:
                if normalize:
                    title = 'Normalized confusion matrix'

            cm = confusion_matrix(y_true, y_pred)

            if normalize:
                cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
                #print("Normalized confusion matrix")

            #print(cm)

            figure, axis = plt.subplots()
            im = axis.imshow(cm, interpolation='nearest', cmap=cmap)
            axis.figure.colorbar(im, ax=axis)

            axis.set(xticks=np.arange(cm.shape[1]),
                yticks=np.arange(cm.shape[0]),

                xticklabels=classes, yticklabels=classes,
                title=title,
                ylabel='True label',
                xlabel='Predicted label')


            plt.setp(axis.get_xticklabels(), rotation=45, ha="right",
                    rotation_mode="anchor")

            fmt = '.2f' if normalize else 'd'
            thresh = cm.max() / 2.
            for i in range(cm.shape[0]):
                for j in range(cm.shape[1]):
                    axis.text(j, i, format(cm[i, j], fmt),
                            ha="center", va="center",
                            color="cyan" if cm[i, j] > thresh else "red")
            figure.tight_layout()
            return axis

In [None]:
### Offline (50% Training 50% Testing) ####

In [None]:
# Creating the 50% Testing and Testing arrays

X_file = r'C:\Users\victo\Downloads\figa3.mat'
y_file = r'C:\Users\victo\Downloads\figa3_y.mat'

X_file = scipy.io.loadmat(X_file)
y_file = scipy.io.loadmat(y_file)

X = np.array(X_file['info'])
y = np.array(y_file['y'])
# take 20 samples every 40 samples
X_train = []
X_test = []
y_train = []
y_test = []
for i in range(0, len(y), 50):
    X_train.append(X[:,:,i:i+25])
    X_test.append(X[:,:,i+25:i+50])
    y_train.append(y[i:i+25])
    y_test.append(y[i+25:i+50])
    
X_train = np.array(X_train)
X_test = np.array(X_test)
y_test = np.array(y_test)
y_train = np.array(y_train)

# reshape to 2600 x 4 x 150
X_train = np.reshape(X_train, (2624, 4, 150))
X_test  = np.reshape(X_train, (2624, 4, 150))
y_train = np.reshape(y_train, (150, 1))
y_train = y_train.ravel()
y_test  = np.reshape(y_test, (150, 1))

In [None]:
# Training the Model

DA = ClassifyFingers()
DA.main(X_train, y_train)

In [None]:
# Offline Testing
predictions = []
start_time = time.time()
accuracy = 0
accuracy_nomoda = 0
pseudo_data = X_test

y_true = np.zeros(y_test.shape[0])              
y_pred = np.zeros(y_test.shape[0])  


for i in range(0,X_test.shape[2]):              
    samples = pseudo_data[:,:,i]
    count = 0

    samples = DA.ApplyBaseLine(samples)
    samples = DA.FeatureExtraction(samples)
    samples = samples.reshape(1,-1)

    samples = DA.pca.transform(samples)
    samples = DA.scaling.transform(samples)
    y_hat = DA.model.predict(samples)
    y_pred[i] = y_hat
    y_true[i] = DA.y_subject[i]

    if y_pred[i] == y_true[i]:
        accuracy_nomoda += 1
    print("Accuracy No Moda: ", accuracy_nomoda)
    print("Predicted:        ", y_pred[i])
    print("True Value:       ", y_true[i])

    if len(predictions) < 10:                     # Take 40 samples
        predictions.append(int(y_hat[0]))
        print("Predictions list: ", predictions)
        print(len(predictions))                   # Check the length of the buffer

    if len(predictions) == 10:                    # If the buffer is full

        moda = st.mode(predictions)
        count_moda = predictions.count(moda)      # Count the number of times the mode appears in the buffer

        if count_moda >= 5:
            y_hat = moda
            print("Predicted Class: ", y_hat)
            print("Predictions list: ", predictions)
            print("Actual: ", DA.y_subject[i])
            print("Time: ", time.time() - start_time)

            if(y_hat == 1 and DA.y_subject[i] == 1):
                #print("Move Thumb")
                accuracy += 1
            elif(y_hat == 2 and DA.y_subject[i] == 2):
                #print("Move Index")
                accuracy += 1
            elif(y_hat == 3 and DA.y_subject[i] == 3):
                #print("Move Middle")
                accuracy += 1
            elif(y_hat == 4 and DA.y_subject[i] == 4):
                #print("Move Ring")
                accuracy += 1
            elif(y_hat == 5 and DA.y_subject[i] == 5):
                #print("Move Little")
                accuracy += 1
            elif(y_hat == 6 and DA.y_subject[i] == 6):
                #print("Move Hand")
                accuracy += 1
            
            print("Accuracy: ",accuracy)
        
        predictions.pop(0)


In [2]:
# Confusion Matrix

title = 'Offline Testing'
DA.plot_confusion_matrix(y_true,y_pred,classes=np.array(['1','2','3','4','5','6']), normalize = False, title = title )

NameError: name 'DA' is not defined

In [None]:
### Psuedo Online ###

In [None]:
# Pseudo Online Testing
predictions = []
start_time = time.time()
accuracy = 0
idx = 0
idx2 = 2624
counter = 0
trial = 0
thumb = 0
index = 0
middle = 0
ring = 0
pinky = 0
hand = 0


# Read Raw Data
RawData = r'C:\Users\victo\Downloads\figa4trainraw.mat'
RawData = scipy.io.loadmat(RawData)
RawData = np.array(RawData['raw_data'])

# Read Mark Data
marks = r'C:\Users\victo\Downloads\figa4trainmarks.mat'
marks = scipy.io.loadmat(marks)
marks = marks['marks']

# Finding Peaks
peaks = scipy.signal.find_peaks(
    marks[:,0],
    height=(101,106)
    )

# Get Movement and Peak Locations
location = peaks[0]
location = np.array(location)
mov = marks[location,0]
mov = np.array(mov)

# Declaration for Confusion Matrix
y_true = []    # True value     
y_pred = []    # Predicted value 


while (idx < len(RawData)-1230):             # Changed from: 3678 -> 1230      
    if(idx > location[0]):
        for i in range(0,4):
            data = RawData[idx:idx2]
            data = DA.ApplyBaseLine(data)
            data = DA.FeatureExtraction(data)
            data = data.reshape(1,-1)

            data = DA.pca.transform(data)
            data = DA.scaling.transform(data)
            y_hat = DA.model.predict(data)
            y_pred.append(y_hat)
            y_true.append(mov[0]-100)

            if(y_hat == (mov[0]-100)):
                #print("Move Thumb")
                thumb +=1
                accuracy += 1
            elif(y_hat == (mov[0]-100)):
                #print("Move Index")
                index += 1
                accuracy += 1
            elif(y_hat == (mov[0]-100)):
                #print("Move Middle")
                middle += 1
                accuracy += 1
            elif(y_hat == (mov[0]-100)):
                #print("Move Ring")
                ring += 1 
                accuracy += 1
            elif(y_hat == (mov[0]-100)):
                #print("Move Little")
                pinky += 1
                accuracy += 1
            elif(y_hat == (mov[0]-100)):
                #print("Move Hand")
                hand += 1
                accuracy += 1
            
            idx  += 655
            idx2 += 655
        print("########################")
        print("Prediction", y_pred[0])
        print("Actual:   ", mov[0]-100)
        print("Accuracy: ", accuracy)
        mov = np.delete(mov,0)
        location = np.delete(location,0)
        trial +=1
    else:
        idx += 655
        idx2 += 655
    counter += 1


In [None]:
### Online Reading ###

In [None]:
# Online Reading (Using gTech)

if __name__ == "__main__":
    # Read Subjects Data
    X_file = r'C:\Users\victo\Downloads\Sxx_2\Sxx\S01X_1.mat'
    y_file = r'C:\Users\victo\Downloads\Sxx_2\Sxx\S01y.mat'

    
    # Create Object Classification
    DA = ClassifyFingers()
    DA.main(X_file, y_file)     
    
    UDP_IP = "127.0.0.1"
    UDP_PORT = 8003 # Change UDP Port to selected one
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))
    sock.settimeout(0.5)
        
    print("Connected UDP")
    model = DA.model

    start_time = time.time()
    buff = np.zeros((0,4))
    predictions = []
    get_data = 1
    n = 3
    samples = []    
    x_axis = []
    y_axis = []
    count = 0
    Multiplier = 0
    while True:
        try:
            
            # print("Getting Data...")
            data, addr = sock.recvfrom(65507)
            pack = str(len(data) // 8) + 'd' 
            data = struct.unpack(pack, data)
            for i in range(0, len(data), 9):
                samples.append(data[i:i+9])                
                count += 1            
        except socket.timeout:
            print("Timeout")

        # Check if buffer is full
        if len(samples) >= 2624:
            count = 0
            samples = np.array(samples)
            samples = np.transpose(samples)
            samples = samples[1::2,:]

            #print("Buffer Full")
            samples = DA.ApplyBaseLine(samples)
            samples = DA.FeatureExtraction(samples)
            samples = samples.reshape(1,-1)

            samples = DA.pca.transform(samples)
            samples = DA.scaling.transform(samples)
            winsound.Beep(1500,500)  
            y_hat = model.predict(samples)
            
            
            if len(predictions) < 10: # Take 40 samples
                samples = []
                predictions.append(int(y_hat[0]))
                print("Predictions list: ", predictions)
            print(len(predictions)) # Check the length of the buffer
            
            if len(predictions) == 10: # If the buffer is full
                    
                    moda = st.mode(predictions)
                    count_moda = predictions.count(moda) # Count the number of times the mode appears in the buffer
                    
                    if count_moda >= 5:
                        y_hat = moda
                        print("Predicted Class: ", y_hat)
                        print("Predictions list: ", predictions)
                        print("Time: ", time.time() - start_time)
                        
                        if(y_hat == 1):
                            print("Move Thumb")
                        elif(y_hat == 2):
                            print("Move Index")
                        elif(y_hat == 3):
                            print("Move Middle")
                        elif(y_hat == 4):
                            print("Move Ring")
                        elif(y_hat == 5):
                            print("Move L   ittle")
                        elif(y_hat == 6):
                            print("Move Hand")
                    
                    predictions.pop(0)
                    