In [1]:
import matplotlib.pyplot as plt
from matplotlib import rcParams
import numpy as np
from scipy import signal
import os
import pandas as pd
from scipy.signal import stft
from math import prod
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from math import e
from sklearn.preprocessing import StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler

In [2]:
def butter_bandpass(data, lowcut, highcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='bandpass')
    return signal.filtfilt(b, a, data)


def butter_lowpass(data, lowcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    b, a = signal.butter(order, low, btype='lowpass')
    return signal.filtfilt(b, a, data)


def butter_highpass(data, highcut, fs=200, order=4):
    nyq = fs * 0.5
    high = highcut / nyq
    b, a = signal.butter(order, high, btype='highpass')
    return signal.filtfilt(b, a, data)


def butter_notch(data, cutoff, var=1, fs=200, order=4):
    nyq = fs * 0.5
    low = (cutoff - var) / nyq
    high = (cutoff + var) / nyq
    b, a = signal.iirfilter(order, [low, high], btype='bandstop', ftype="butter")
    return signal.filtfilt(b, a, data)

In [3]:
def filtros(data):
    data_filtered = butter_notch(data, 60)
    data_filtered = butter_highpass(data_filtered, 5)
    data_filtered = butter_lowpass(data_filtered, 50)
    return data_filtered

## Leitura

In [4]:
dir = './datasets/topicos_cc'
arquivos = os.listdir(dir)
arq_numpy = [f for f in arquivos if f.endswith(".npy") and f.startswith('p2')]
participantes = {}
for i in arq_numpy:
    nome = i.split('_')
    trial = np.load(dir+'/'+i)
    for m in range(0,8):
        if participantes.get(f'participante_{nome[0]}',0) == 0:
            participantes[f'participante_{nome[0]}'] = {}
        if participantes[f'participante_{nome[0]}'].get(f'trial_{nome[1]}',0) == 0:
            participantes[f'participante_{nome[0]}'][f'trial_{nome[1]}'] = {}
        dados = trial[m, :, :].swapaxes(0,1)
        participantes[f'participante_{nome[0]}'][f'trial_{nome[1]}'][f'movimento_{m+1}'] = filtros(dados)

v = list()
for i in arq_numpy:
    trial = np.load(dir+'/'+i)
    v.append(trial[:, :, :].swapaxes(1,2))    

arr = np.vstack((v[0], v[1], v[2]))

print(arr.shape)

(24, 4, 1600)


## Separação de dominios

In [5]:
def janela(overl=64):


    melhores_step = []
    melhor_win = []
    for i in range(1, 128):
        step = i
        segment = 128
        data = arr
        
        _, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=overl)
        chunks_freq = np.swapaxes(chunks_freq, 2, 3)
        window = chunks_freq.shape[2] 

        n_win = int((data.shape[-1] - segment) / step) + 1
        ids = np.arange(n_win) * int(step)

        # Janelas do dado no dominio do tempo
        chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)
        time_window = chunks_time.shape[2]
            
        if( time_window >= window -3 and time_window <= window +3):
            melhores_step.append(step)
            melhor_win.append(time_window)
        
    return melhores_step, melhor_win, window
# step = janela(overl=64)
# print(step)

In [6]:
all_steps = []
for i in [0.5, 0.7, 0.8, 0.9 ]:
    n_step = int(128*i)
    print(n_step)
    step, win, orig = janela(overl=n_step)
    step = np.array(step)
    # print(step)
    # print(win)
    # print(orig)
    # print(np.abs(orig-np.array(win)))
    # print(step[np.abs(orig-np.array(win)).argmin()])
    all_steps.append(step[np.abs(orig-np.array(win)).argmin()])
    # print()
    
    
all_steps

64
89
102
115


[57, 35, 24, 12]

In [7]:
def janela(overl=64):


    
    for i in np.arange(1, 128, 0.1):
        step = i
        segment = 128
        data = arr
        
        _, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=overl)
        chunks_freq = np.swapaxes(chunks_freq, 2, 3)
        window = chunks_freq.shape[2] 

        n_win = int((data.shape[-1] - segment) / step) + 1
        ids = np.arange(n_win) * int(step)

        # Janelas do dado no dominio do tempo
        chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)
        time_window = chunks_time.shape[2]
            
        if( time_window == window ):
            return step

In [8]:
all_steps = []
for i in [0.5, 0.7, 0.8, 0.9 ]:
    n_step = int(128*i)
    print(n_step)
    step = janela(overl=n_step)
    all_steps.append(step.round(2))
    
all_steps

64
89
102
115


[56.7, 34.3, 23.4, 11.8]

In [9]:
# [57, 35, 24, 12]

In [10]:
step = 11.8
segment = 128
data = arr
# .get_data()
print('', data.shape)

n_win = int((data.shape[-1] - segment) / step) + 1
ids = np.arange(n_win) * int(step)

# Janelas do dado no dominio do tempo
chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)

# Janelas do dado no domínio da frequência
_, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=115)
chunks_freq = np.swapaxes(chunks_freq, 2, 3)

print('Formato (shape) dos dados depois da divisão de janelas')
print(f'Dominio do tempo: {chunks_time.shape} - (classes, ensaios, canais, janelas, linhas)')
print(f'Dominio da frequência:  {chunks_freq.shape} - (classes, ensaios, canais, janelas, linhas)')

 (24, 4, 1600)
Formato (shape) dos dados depois da divisão de janelas
Dominio do tempo: (24, 4, 125, 128) - (classes, ensaios, canais, janelas, linhas)
Dominio da frequência:  (24, 4, 125, 65) - (classes, ensaios, canais, janelas, linhas)


## Criação de features

In [11]:
# funções auxiliares
def PSD(w):
    ''' definição da função PSD para o sinal no domínio da frequência '''
    return np.abs(w) ** 2


# funções de extração de características

def var(x):
    return np.sum(x ** 2, axis=-1) / (np.prod(x.shape[:-1]) - 1)

def rms(x):
    return np.sqrt(np.sum(np.abs(x) ** 2, axis=-1) / (np.prod(x.shape[:-1])))

def wamp(x):
    limiar = np.abs(np.diff(x))
    return np.sum(limiar > 0.0001, axis=-1)

def wl(x):
    return np.sum(np.abs(np.diff(x)), axis=-1)

# def zc(x):
#     trs = 0.0001
 
#     f = [1 if i*j <= 0 else 0 for i,j in zip(x[:,:,:,:-1], x[:,:,:,1:])]
    
#     return np.sum(f)


def getzc(data, th):
    t = len(data)
    soma = 0
    for i in range(t-1):
        res = (data[i]*data[i+1])
        res2 = np.abs(data[i]-data[i+1])
        if (res<0 and res2 > th):
            soma +=1
    return soma

def zc(data):
    f=[]
    x,y,z = data.shape[:3]
    for i in range(x):
        l = []
        for j in range(y):
            li = []
            for k in range(z):
                li.append(getzc(data[i][j][k], 0.0001))
            l.append(li.copy())
        f.append(l.copy())

    return np.array(f)

def fmd(w):
    return np.sum(PSD(w), axis=-1) / 2

def mmdf(w):
    return np.sum(np.abs(w), axis=-1) / 2


def fmn(w):
    sample_rate = 200
    f = (w * sample_rate)/(2*len(w))
    return np.sum(np.abs(f*PSD(w)), axis=-1)/np.sum(PSD(w), axis=-1)

def mmnf(w):
    sample_rate = 200
    f = (w * sample_rate)/(2*len(w))
    return np.sum(np.abs(f*np.abs(w)), axis=-1)/np.sum(np.abs(w), axis=-1)


def logDec(data):
    N = np.prod(data.shape)
    return e ** (np.sum(np.log10(np.abs(data)), axis=-1))/N


def iemg(x):
    return np.sum(np.abs(x), axis=-1)

def dasdv(x):
    return np.sqrt(np.sum(np.diff(x)**2, axis=-1)/(np.prod(x.shape[:-1]) - 1))

def tm(x,n):
    return np.abs(np.sum(x**n , axis=-1)/np.prod(x.shape[:-1]))

In [12]:
final_data = list()
final_data.append(var(chunks_time))
final_data.append(rms(chunks_time))
final_data.append(fmd(chunks_freq))
final_data.append(mmdf(chunks_freq))
final_data.append(logDec(chunks_time))
final_data.append(wamp(chunks_time))
final_data.append(wl(chunks_time))
final_data.append(zc(chunks_time))
final_data.append(fmn(chunks_freq))
final_data.append(mmnf(chunks_freq))
final_data.append(iemg(chunks_time))
final_data.append(dasdv(chunks_time))
print(iemg(chunks_time).shape)
for n in range(3,6):
    final_data.append(tm(chunks_time, n))



f, Pxx_den = signal.welch(data, fs=200, nperseg=248, noverlap=223)
# print(f.shape)
# print(Pxx_den.shape)
final_data.append(Pxx_den)

final = np.array(final_data)
final.shape



# (24,                  4,      125,    65) - 
# (classes + ensaios, canais, janelas, linhas)

(24, 4, 125)


(16, 24, 4, 125)

In [13]:
248*0.9

223.20000000000002

In [14]:
X = final.transpose(1,3,2,0)
X = X.reshape(X.shape[0]*X.shape[1], X.shape[2]*X.shape[3])
# X = X.reshape(s[0] * s[1], s[2] * s[3])
# sh = data.shape
# 
# X = data.reshape(sh[0], int(sh[1]/3), 3 * sh[2], sh[3])

In [15]:
# X = X.transpose(1, 2, 3, 0)
# print('classes', 'amostras', 'canais', 'características')
# print(X.shape)
# X = X.reshape(X.shape[0]*X.shape[1], X.shape[2]*X.shape[3])
# X.shape

In [16]:
# criação dos rótulos

# 1,1,1,1,1,1,1,1,1,1,...,2,2,2,2,2,2,2,2,2,2,...,3,...
y = [[str(i)] * int(X.shape[0] / 8) for i in range(8)]
y = np.array(y).flatten()
print('Shape dos rótulos:', y.shape)

Shape dos rótulos: (3000,)


### Normalização

In [17]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30,shuffle=True)

In [18]:
scaler = StandardScaler()
# scaler = MinMaxScaler()
# scaler = MaxAbsScaler()
# scaler = RobustScaler()

In [19]:
scaler.fit(X_train)
# scaler.fit(X)


# print(type(X_train))
# print(type(scaler.transform(X_train)))

X_train = scaler.transform(X_train)
# X = scaler.transform(X)
X_test = scaler.transform(X_test)

  temp **= 2
  new_unnormalized_variance -= correction ** 2 / new_sample_count
  new_unnormalized_variance -= correction ** 2 / new_sample_count
  upper_bound = n_samples * eps * var + (n_samples * mean * eps) ** 2


In [20]:
X_train = np.nan_to_num(X_train)
X_test = np.nan_to_num(X_test)

In [21]:
X_train.shape

(2100, 64)

In [22]:
def executar_svm(X_train, X_test, y_train, y_test):
    # modelo de classificador com os parâmetros padrões
    clf = SVC(gamma='scale')

    # criando o modelo de classificação com os dados de treino
    clf.fit(X_train, y_train)

    # aplicando o classificador nos dados de teste
    res = clf.predict(X_test)

    # obtendo e ajustando os resultados 
    tot_hit = sum([1 for i in range(len(res)) if res[i] == y_test[i]])
    print('Acurácia: {:.2f}%'.format(tot_hit / X_test.shape[0] * 100))

In [23]:
executar_svm(X_train, X_test, y_train, y_test)

Acurácia: 72.22%


In [24]:
# acuracias - standardscaler
# step
# 57: 48.40
# 35: 57.42
# 24: 60.40
# 12: 67.04

In [25]:
# acuracias - MinMaxScaler
# 57 43.09
# 35 47.10
# 24 59.96
# 12 66.59

In [26]:
# acuracias - MaxAbsScaler
# 57 47.34
# 35 57.10
# 24 59.73
# 12 64.22

In [27]:
# acuracias - RobustScaler
# 57 9.57
# 35 10.32
# 24 10.07
# 12 11.06

In [28]:
# logdetector, wl, iemg
# https://biomedical-engineering-online.biomedcentral.com/track/pdf/10.1186/1475-925X-11-55.pdf
# https://www.biopac.com/application/emg-electromyography/advanced-feature/automated-emg-analysis/#tabs

## Combinations

In [29]:
def do_svm(X,y):

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25)
    
    clf = SVC()
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)
    
    acc = metrics.accuracy_score(y_test, y_pred)
    
    return acc

In [30]:
from itertools import combinations

best_acc = 0
best_comb = 0
best_k = 0
# for comb in range(1,12):
#     for res in combinations(range(9),comb):
#         for ks in range(1,41):
#             X_new = SelectKBest(k=ks).fit_transform(X, y)
#             acc = do_svm(X_new.take(res, axis=1), y)

#             if acc > best_acc:
#                 best_acc = acc
#                 best_comb = res
#                 best_k = ks

# print(f"Melhor Acurácia: {best_acc}, Melhor Combinação: {best_comb}, Melhor K: {best_k}")

In [31]:
# modificar janelamento, para ver qual janelamento eh melhor (50%, 70%, 80%, 90%)
# welch scipy
# minmax, maxabs, robust  -- scaler

### welch

In [32]:
# f, Pxx_den = signal.welch(x, fs, nperseg=1024)