In [None]:
import os
import time
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import scipy.io as sio 
from scipy.fftpack import fft, fftfreq
from scipy import signal
from sklearn.metrics import r2_score
from sklearn import decomposition, svm, neighbors
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

### Reading the data (after preprocessing) of the specific subject.
def readfiles(s, fd=50,source='hdEMG', ges_num=12):
* s: the index number of subject. e.g. s=1， then the data of S01 is returned by this function
* fd: the data can be down-sampled. e.g. fd = 50, then the data was down-sampled into 50Hz
* source: Input "hdEMG", "glove", "myo" or "imu" to select the interested data
* ges_num: by default is 12, you can also input 4 to extract the data of G01~G04
* return value contains the label according the index number of gesture(1~12)

def filtfiltEnvelope(raw,s,ges,fs=2048):  
* Step 1: Get the absolute value of the input value "raw"
* Step 2: zero-phase 3rd order low-pass Butterworth filter at 0.5Hz

def outlierDector(raw, s, ges, fs, interval = 180, plot = False,channel_num = 65):
* Step 1: Screen out the input value "raw" that outside μ±3σ
* Step 2: Repeately run Step 1 until Outlier Ratio < 1% and Maxium Peak < 2 mv

In [None]:
def readfiles(s, fd=50,source='hdEMG', ges_num=12):  
    new_mainDir = "../"
    addr = os.path.join(new_mainDir, "S" + str(s).zfill(2))
    fs = 2048  # Sampling Frequency of Data
    start = 0
    interval = 180  # Data Length of one gesture (seconds): 18 trials x 10 seconds
    x = np.linspace(start * fs, (start + interval) * fs, interval * fd, endpoint=False, dtype=int) 
    
    for i, ges in enumerate(range(1,ges_num+1)):
        rawdata = sio.loadmat(os.path.join(addr, "G" + str(ges).zfill(2) + '.mat'))
        if source == 'hdEMG':
            data,_ = filtfiltEnvelope(rawdata['Data'][:,0:65],s,ges) 
        elif source == 'glove':
            data = rawdata['Data'][:,65:80]
        elif source == 'myo':
            data,_ = filtfiltEnvelope(rawdata['Data'][:,80:88] * 0.04,s,ges) 
        elif source == 'imu':
            data = rawdata['Data'][:,88:]
        data = data[x,:]  # down-sampled to fd
        if i == 0:
            data_x = data
            data_y = np.ones((data.shape[0], 1)) * ges
        else:
            data_x = np.vstack((data_x, data))
            data_y = np.vstack((data_y, np.ones((data.shape[0], 1)) * ges))
    return data_x,data_y.reshape((-1,))

def filtfiltEnvelope(raw,s,ges,fs=2048):    
    '''Outlier Detector with 50Hz Norch & 3 delta Principle'''
    raw = outlierDector(raw,s,ges,fs)
    '''Lowpass Filter'''
    b, a = signal.butter(N=4, Wn=0.5, btype='lowpass', fs=fs)
    envelopEMG =  signal.filtfilt(b, a, np.abs(raw), axis=0) 
    # envelopEMG[envelopEMG < 0] = 0  # data may have negative value because of the low-pass filter
    return envelopEMG, np.abs(raw)

def outlierDector(raw, s, ges, fs, interval = 180, plot = False,channel_num = 65):
    '''50Hz Norch'''
    T = 1.0 / 2048.0
    stopBand = [50.5, 52]
    b, a = signal.butter(N=3, Wn=[2*stopBand[0] * T, 2*stopBand[1]* T], btype='bandstop', analog=False)
    raw =  signal.filtfilt(b, a, raw, axis=0)
    
    idxMax = [np.zeros((int(interval * fs * 0.02),)),np.zeros((int(interval * fs * 0.02),))]
    idxMin = [np.zeros((int(interval * fs * 0.02),)),np.zeros((int(interval * fs * 0.02),))]
    iter_num = 0
    while True: # Until Outlier Ratio < 1% and Maxium Peak < 2 mv
        if (len(idxMin[1]) + len(idxMax[1])) < interval * fs * 0.01 and np.max(abs(raw)) < 2:
            break
        elif (len(idxMin[1]) + len(idxMax[1])) == 0:
            break

        iter_num += 1
        if iter_num % 10 ==0:
            print("epoch: %s, Outlier Ratio: %.2f%%, Maxium Peak: %.2f mV" % 
                  (iter_num ,(len(idxMin[1]) + len(idxMax[1])) / (interval * fs * 0.01), np.max(abs(raw))))
        # plt.figure(1)
        # plt.plot(raw[:,20],alpha=1)  # np.abs(rawHDsEMG[x,i])
        # plt.show()

        emgMax = np.mean(raw,axis=1) + 3 * np.std(raw,axis=1)
        emgMin = np.mean(raw,axis=1) - 3 * np.std(raw,axis=1)

        # print(emgMean.shape,emgMax.shape,emgMin.shape)
        idxMax = np.where(raw.T > emgMax) # find outlier above the Maximum
        idxMin = np.where(raw.T < emgMin) # find outlier below the Minimum

        raw.T[idxMax]=np.nan
        raw.T[idxMin]=np.nan
        emgMean = np.nanmean(raw.T,axis=0)  # calculate the mean value after filtering the outliers
        raw.T[idxMax] = emgMean[idxMax[1]]
        raw.T[idxMin] = emgMean[idxMin[1]]
        # plt.figure(3)
        # plt.plot(raw[:,20],alpha=1)  # np.abs(rawHDsEMG[x,i])
        # plt.show()
    if plot:
        %matplotlib
        fig = plt.figure(2)
        plt.plot(emgMax,color='grey',linestyle=":")  # plot the boder
        plt.plot(emgMin,color='grey',linestyle=":")

        # draw the raw data
        for i in range(channel_num):
            plt.plot(raw[:,i],alpha=0.1) 
        # draw the outliers
        plt.scatter(idxMax[1],raw.T[idxMax],color='blue')
        plt.scatter(idxMin[1],raw.T[idxMin],color='red')
        plt.show()
    print("Subject No. %s, Gesture No. %s, epoch: %s, Outlier Ratio: %.2f%%, Maxium Peak: %.2f mV" % 
          (s, ges, iter_num ,(len(idxMin[1]) + len(idxMax[1])) / (interval * fs * 0.01), np.max(abs(raw))))
    return raw



### Data Segment
We randomly select 13 out of 18 trials of data (72.2%) from each gestures as the training set, and set the remaining 5 trials (27.8%) as the test set. This dataset split method will prevent the model from overfitting, because the samples from the same trial has a strong correlation. A totally shuffled dataset across the 18 trials may cause the data leaking from the training set to the test set. 
* x: the input data 
* y: the output(label) data

In [None]:
def dataset_split(x,y,train_size=0.7):
    fd = 50
    Intervals = 10  
    TrialNum = 18
    GesNum = 12

    perTrial = fd * Intervals # 50 * 3
    perGesture = perTrial * TrialNum # 50 * 3 * 18
    allDatNum = perGesture * GesNum

    X_train = np.zeros((int(TrialNum * train_size) * perTrial * GesNum, x.shape[1]))
    X_test  = np.zeros(((TrialNum - int(TrialNum * train_size)) * perTrial * GesNum, x.shape[1])) 
    y_train = np.zeros((int(TrialNum * 0.7) * perTrial * GesNum,))
    y_test  = np.zeros(((TrialNum - int(TrialNum * train_size)) * perTrial * GesNum,))

    for i in range(GesNum):
        X_train_idx, X_test_idx = train_test_split(range(TrialNum), test_size=1-train_size)
        for n, j in enumerate(X_train_idx):
            '''the index position of raw data'''
            rawStart =       j * perTrial + i * perGesture
            rawStop  = (j + 1) * perTrial + i * perGesture
            '''the index position of dataset'''
            setStart =       n * perTrial + i * (int(TrialNum * train_size)) * perTrial
            setStop  = (n + 1) * perTrial + i * (int(TrialNum * train_size)) * perTrial
            X_train[setStart:setStop, :] = x[rawStart:rawStop, :]
            y_train[setStart:setStop, ] = y[rawStart:rawStop, ]
        for n, j in enumerate(X_test_idx):
            '''the index position of raw data'''
            rawStart =       j * perTrial + i * perGesture
            rawStop  = (j + 1) * perTrial + i * perGesture
            '''the index position of dataset'''
            setStart =       n * perTrial + i * (TrialNum - int(TrialNum * train_size)) * perTrial
            setStop  = (n + 1) * perTrial + i * (TrialNum - int(TrialNum * train_size)) * perTrial
            X_test[setStart:setStop,:] = x[rawStart:rawStop,:]
            y_test[setStart:setStop,] = y[rawStart:rawStop,]
            
    return X_train, X_test, y_train, y_test
