<a href="https://colab.research.google.com/github/mateusrt/tcc/blob/master/preprocessamento.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive
from google.colab import drive # import drive from google colab

ROOT = "/content/drive"     # default location for the drive
drive.mount(ROOT)           # we mount the google drive at /content/drive

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [2]:
from itertools import tee

import time
import numpy as np

from scipy.io import loadmat
from pywt import dwt, wavedec
from sklearn.decomposition import PCA

In [3]:
class Segmento:
    def __init__(self):
        self.segmento = np.empty(0)

    def __repr__(self):
        return (f'\n{self.__class__.__name__}: \n'
                f'{self.segmento}')


class Estimulo:
    def __init__(self, qtd_segmentos):
        self.segmentos = np.array(([Segmento() for _ in range(qtd_segmentos)]))

    def __repr__(self):
        return (f'\n{self.__class__.__name__}: \n'
                f'{self.segmentos}')

In [4]:
def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

In [5]:
def reducao_pca(sinal, qtd_componentes):
    """Função para fazer a redução de conjunto de dados para uma dada quantidade de componentes

    Arguments:
        sinal {array} -- Array com os dados que serão reduzidos.
        qtd_componentes {int} -- Quantidade de elementos que será reduzido o sinal.
    """

    pca = PCA(qtd_componentes)
    pca.fit(sinal)

    componentes = pca.transform(sinal)

    return componentes

In [6]:
def transformada_wavelet(sinal, onda_mae, nivel=1, qtd_componentes=0):
    """Função para fazer a transformada wavelet discreta de um dado sinal

    Arguments:
        sinal {array} -- Array do sinal a ser transformado.
        onda_mae {string} -- Onda base que será utilizada para a transformada.

    Keyword Arguments:
        nivel {int} -- Nível de decomposição do sinal (default: {1})
        qtd_componentes {int} -- Quantidade de elementos para o qual será reduzido o sinal
        (default: {0})
    """

    sinal = sinal.transpose()

    if (nivel > 1):
        coeficientes = wavedec(sinal, onda_mae, level=nivel)
    else:
        coeficientes = dwt(sinal, onda_mae)

    componentes = [np.empty((qtd_canais, qtd_componentes)) for _ in range(nivel + 1)]

    if (qtd_componentes > 0):
        for i in range(nivel + 1):
            componentes[i] = reducao_pca(coeficientes[i], qtd_componentes)

    return np.asarray(componentes)

In [7]:
def segmentar_sinal(sinal, qtd_segmentos, duracao, sobreposicao=0):
    qtd_pontos = len(sinal)

    sobreposicao = int(qtd_pontos * sobreposicao / (duracao * 1000))

    passo, resto = divmod(qtd_pontos, qtd_segmentos)

    segmentos = np.array(([Segmento() for _ in range(qtd_segmentos)]))

    for seg in range(qtd_segmentos):
        inicio = seg * (passo - sobreposicao)
        fim = inicio + passo - 1 + resto

        segmentos[seg].segmento = sinal[inicio:fim, :]

    return segmentos

In [8]:
def carregar_gestos(emg, estimulo, repeticao, pes):
    inicio = pes * qtd_segmentos_gesto
    fim = (pes + 1) * qtd_segmentos_gesto

    for ges in range(qtd_gestos):
        aux = (estimulo == (ges + 1)).reshape(-1)

        aux_sinal = emg[aux, :]
        aux_repeticao = repeticao[aux]

        for rep in range(qtd_repeticoes):
            aux = (aux_repeticao == (rep + 1)).reshape(-1)

            gestos[ges, rep].segmentos[inicio:fim] = segmentar_sinal(
                aux_sinal[aux, :], qtd_segmentos_gesto, DURACAO_GESTO, 20)


def carregar_repouso(emg, estimulo, pes):
    inicio = pes * qtd_segmentos_repouso
    fim = (pes + 1) * qtd_segmentos_repouso

    if (qtd_gestos < 17):
        ultimo_gesto = np.where(estimulo == qtd_gestos + 1)[0][0]

        indices = np.where(estimulo[:ultimo_gesto] == 0)[0]
    else:
        indices = np.where(estimulo == 0)[0]

    lista = np.where(np.r_[True, indices[1:-1] < indices[2:] - 1])[0]

    for i in range(len(lista) - 1):
        repouso[i].segmentos[inicio:fim] = segmentar_sinal(
            emg[lista[i]:lista[i + 1]], qtd_segmentos_repouso, DURACAO_REPOUSO)


def carregar_dados_mat(path):
    for pes in range(qtd_pessoas):
        filename = path + 'S' + str(pes + 1) + '_E1_A1.mat'
        mat = loadmat(filename)

        emg = mat['emg']			 # Array com os sinais emg
        estimulo = mat['restimulus']    # Array de índices dos gestos
        repeticao = mat['rerepetition']  # Array de índices de repetições dos gestos

        del mat

        carregar_gestos(emg, estimulo, repeticao, pes)

        carregar_repouso(emg, estimulo, pes)

In [9]:
def extacao_caracteristicas():
    ti = time.time()

    for ges in range(qtd_gestos):
        for rep in range(qtd_repeticoes):
            for seg in range(pares_gestos):
                coef_gestos[ges, rep, seg] = transformada_wavelet(
                    gestos[ges, rep].segmentos[seg].segmento, onda_mae, NIVEL_FILTRO, 5).reshape(-1)

            for seg in range(pares_repouso):
                coef_repouso[ges, rep, seg] = transformada_wavelet(
                    repouso[ges * qtd_repeticoes + rep].segmentos[seg].segmento, onda_mae, NIVEL_FILTRO, 5).reshape(-1)

    duracao = time.time() - ti
    minutos, segundos = divmod(duracao, 60)
    print('Extração de caracteristicas: %dmin %ds' % (minutos, segundos))

In [10]:
def separar_pares_treino_teste():
    # entrada
    treino_gestos = coef_gestos[:, treino, :].reshape((-1, qtd_entradas))
    teste_gestos = coef_gestos[:, teste, :].reshape((-1, qtd_entradas))

    teste_repouso = coef_repouso[:, teste, :].reshape((-1, qtd_entradas))
    treino_repouso = coef_repouso[:, treino, :].reshape((-1, qtd_entradas))

    # saida desejada
    len_treino = len(treino)
    len_teste = len(teste)

    desejado_gestos = np.multiply(np.arange(qtd_gestos).reshape((-1, 1))
                                  + 1, np.ones((qtd_gestos, len_treino * pares_gestos))).reshape(-1)

    desejado_repouso = np.zeros((qtd_gestos * len_treino * pares_repouso))

    y_treino = np.concatenate((desejado_repouso, desejado_gestos))
    x_treino = np.concatenate((treino_repouso, treino_gestos))


    desejado_gestos = np.multiply(np.arange(qtd_gestos).reshape((-1, 1))
                                  + 1, np.ones((qtd_gestos, len_teste * pares_gestos))).reshape(-1)

    desejado_repouso = np.zeros((qtd_gestos * len_teste * pares_repouso))

    y_teste = np.concatenate((desejado_repouso, desejado_gestos))
    x_teste = np.concatenate((teste_repouso, teste_gestos))

    return x_treino, y_treino, x_teste, y_teste

In [19]:
def embaralhar_dados(x, y):
    size_x = len(x)
    assert size_x == len(y)

    rng = np.random.default_rng()

    embaralhado = rng.permutation(size_x)

    return x[embaralhado], y[embaralhado]

In [None]:
def entrada_saida():
    desejado_gestos = np.multiply(np.arange(qtd_gestos).reshape((-1, 1)) + 1,
                                  np.ones((qtd_gestos,
                                           qtd_repeticoes 
                                           * pares_gestos))).reshape(-1)

    desejado_repouso = np.zeros((qtd_gestos * qtd_repeticoes * pares_repouso))

    saida_desejada = np.concatenate((desejado_repouso, desejado_gestos))
    
    entrada = np.concatenate((coef_repouso.reshape((-1, qtd_entradas)), 
                              coef_gestos.reshape((-1, qtd_entradas))))

    entrada, saida_desejada = embaralhar_dados(entrada, saida_desejada)

    return entrada, saida_desejada


In [17]:
def separar_pares_treino_teste_completo(prop_treino = 0.7):
    entrada, saida_desejada = entrada_saida()

    qtd_pares = len(saida_desejada)
    qtd_pares_treino = round(qtd_pares * prop_treino)

    x_treino = entrada[:qtd_pares_treino]
    y_treino = saida_desejada[:qtd_pares_treino]

    x_teste = entrada[qtd_pares_treino:]
    y_teste = saida_desejada[qtd_pares_treino:]

    return x_treino, y_treino, x_teste, y_teste

In [29]:
if __name__ == "__main__":

    path = 'drive/My Drive/TCC/database/cru/'
    # Separar gestos

    qtd_pessoas = 1
    qtd_gestos = 2
    qtd_repeticoes = 6

    qtd_canais = 12

    NIVEL_FILTRO = 1
    onda_mae = 'coif4'

    DURACAO_GESTO = 5
    DURACAO_REPOUSO = 3
    duracao_segmento = 0.2

    qtd_segmentos_gesto = int(DURACAO_GESTO / duracao_segmento)
    qtd_segmentos_repouso = int(DURACAO_REPOUSO / duracao_segmento)

    qtd_movimentos = qtd_gestos * qtd_repeticoes

    pares_gestos = qtd_pessoas * qtd_segmentos_gesto
    pares_repouso = qtd_pessoas * qtd_segmentos_repouso

    gestos = np.array(
        ([Estimulo(qtd_segmentos_gesto * qtd_pessoas) for _ in range(qtd_movimentos)]))

    repouso = np.array(
        ([Estimulo(qtd_segmentos_repouso * qtd_pessoas) for _ in range(qtd_movimentos)]))

    gestos = gestos.reshape(qtd_gestos, qtd_repeticoes)

    # separar treino e teste
    treino = [0, 1, 4, 2]
    teste = [3, 5]

    carregar_dados_mat(path)

    pares_gestos = qtd_pessoas * qtd_segmentos_gesto
    pares_repouso = qtd_pessoas * qtd_segmentos_repouso

    # extração de caracteristicas
    qtd_entradas = len(transformada_wavelet(
        gestos[0, 0].segmentos[0].segmento, onda_mae, NIVEL_FILTRO, 5).reshape(-1))

    coef_gestos = np.empty((qtd_gestos, qtd_repeticoes, pares_gestos, qtd_entradas))

    coef_repouso = np.empty((qtd_gestos, qtd_repeticoes, pares_repouso, qtd_entradas))

    extacao_caracteristicas()

    x_treino, y_treino, x_teste, y_teste = separar_pares_treino_teste_completo()

    np.savez_compressed('drive/My Drive/TCC/database/G' + str(qtd_gestos) + '/' + onda_mae, x_teste=x_teste, y_teste=y_teste, y_treino=y_treino, x_treino=x_treino)



Extração de caracteristicas: 0min 0s
(480,)
(480, 120)
