In [None]:
!pip install praat-textgrids
!pip install jiwer

In [None]:
import sys
from absl import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('normalizers_file', '/kaggle/input/normalizers/normalizers.pkl', 'file with pickled feature normalizers')

flags.DEFINE_list('remove_channels', [], 'channels to remove')
flags.DEFINE_list('silent_data_directories', ['/kaggle/input/emgdata/emg_data/silent_parallel_data'], 'silent data locations')
flags.DEFINE_list('voiced_data_directories', ['/kaggle/input/emgdata/emg_data/voiced_parallel_data','/kaggle/input/emgdata/emg_data/nonparallel_data'], 'voiced data locations')
flags.DEFINE_string('testset_file', '/kaggle/input/testesetlarge/testset_largedev.json', 'file with testset indices')
flags.DEFINE_string('text_align_directory', '/kaggle/input/textaligns/text_alignments', 'directory with alignment files')

flags.DEFINE_boolean('debug', False, 'debug')
flags.DEFINE_string('output_directory', '/kaggle/working/outputs', 'where to save models and outputs')
flags.DEFINE_integer('batch_size', 32, 'training batch size')
flags.DEFINE_float('learning_rate', 3e-4, 'learning rate')
flags.DEFINE_integer('learning_rate_warmup', 1000, 'steps of linear warmup')
flags.DEFINE_integer('learning_rate_patience', 5, 'learning rate decay patience')
flags.DEFINE_string('start_training_from', None, 'start training from this model')
flags.DEFINE_float('l2', 0, 'weight decay')
flags.DEFINE_string('evaluate_saved', None, 'run evaluation on given model file')

FLAGS(sys.argv[1:])



In [None]:
%%writefile data_utils.py

import string

import numpy as np
import librosa
import soundfile as sf
from textgrids import TextGrid
import jiwer
from unidecode import unidecode

import torch
import matplotlib.pyplot as plt

from absl import flags
FLAGS = flags.FLAGS

phoneme_inventory = ['aa','ae','ah','ao','aw','ax','axr','ay','b','ch','d','dh','dx','eh','el','em','en','er','ey','f','g','hh','hv','ih','iy','jh','k','l','m','n','nx','ng','ow','oy','p','r','s','sh','t','th','uh','uw','v','w','y','z','zh','sil']

def normalize_volume(audio): #recebe um sinal de áudio e realiza uma normalização de volume nele.
    rms = librosa.feature.rms(audio)  #calcula o valor RMS (root mean square) do sinal de áudio 
    max_rms = rms.max() + 0.01
    target_rms = 0.2
    audio = audio * (target_rms/max_rms) #normaliza o sinal de áudio multiplicando-o por um fator que ajusta o RMS para um valor de destino (0.2)
    max_val = np.abs(audio).max()
    if max_val > 1.0: # this shouldn't happen too often with the target_rms of 0.2
        audio = audio / max_val  #Se o valor máximo absoluto do sinal de áudio for maior que 1.0, o sinal é dividido pelo valor máximo para evitar a saturação.
    return audio

def dynamic_range_compression_torch(x, C=1, clip_val=1e-5):  #realiza uma compressão de faixa dinâmica em um tensor
    return torch.log(torch.clamp(x, min=clip_val) * C)

def spectral_normalize_torch(magnitudes):  #realiza a normalização espectral
    output = dynamic_range_compression_torch(magnitudes)
    return output

mel_basis = {}
hann_window = {}

def mel_spectrogram(y, n_fft, num_mels, sampling_rate, hop_size, win_size, fmin, fmax, center=False):  # calcula o espectrograma mel de um sinal de áudio
    if torch.min(y) < -1.:
        print('min value is ', torch.min(y))
    if torch.max(y) > 1.:
        print('max value is ', torch.max(y))

    global mel_basis, hann_window
    if fmax not in mel_basis: #verificam se já foi calculada a base de mel correspondente à frequência máxima fmax. Se não tiver sido calculada, a função librosa.filters.mel 
                                #é usada para calcular a base de mel com os parâmetros fornecidos
        mel = librosa.filters.mel(sr=sampling_rate, n_fft=n_fft, n_mels=num_mels, fmin=fmin, fmax=fmax)

        #print('Mel ', mel)
        mel_basis[str(fmax)+'_'+str(y.device)] = torch.from_numpy(mel).float().to(y.device)
        hann_window[str(y.device)] = torch.hann_window(win_size).to(y.device)
        #print('Mel_basis ', mel_basis)

    y = torch.nn.functional.pad(y.unsqueeze(1), (int((n_fft-hop_size)/2), int((n_fft-hop_size)/2)), mode='reflect') #preenche o sinal de áudio y com reflexão antes do cálculo do espectrograma. 
                                                                                #O sinal é expandido com uma dimensão extra e é aplicado um preenchimento refletivo em ambas as extremidades.
    y = y.squeeze(1) #a dimensão extra é removida.
    #print('y ', y)
    #spec = torch.stft(y, n_fft, hop_length=hop_size, win_length=win_size, window=hann_window[str(y.device)], #calcula a transformada de Fourier de curto tempo (STFT) do sinal de áudio y
                      #center=center, pad_mode='reflect', normalized=False, onesided=True)  #O espectrograma é calculado apenas para a metade positiva das frequências (onesided=True).
    #print ('Resutado do stft ', spec)
    spec = torch.stft(y, n_fft, hop_length=hop_size, win_length=win_size, window=hann_window[str(y.device)],
                  center=center, pad_mode='reflect', normalized=False, onesided=True, return_complex=False)
    #print ('Resutado do stft ', spec)
    spec = torch.sqrt(spec.pow(2).sum(-1)+(1e-9)) #Essa linha calcula o módulo (magnitude) do espectrograma. O espectrograma é elevado ao quadrado, somado ao valor de 1e-9 para evitar divisão por zero
    # Imprimir as dimensões de mel_basis
    #print ('Resutado do sqrt ', spec)
    #print(len(mel_basis))
    #for key, value in mel_basis.items():
        #print(f"Dimensões de {key}: {value.shape}")
    #print('Dimensões de spec ', spec.shape)
    spec = torch.matmul(mel_basis[str(fmax)+'_'+str(y.device)], spec)

    
    #spec = mel_basis[str(fmax)+'_'+str(y.device)] @ spec

    #print ('Resutado da multiplicação ', spec)
    #print('Dimensões de spec ', spec.shape)
    spec = spectral_normalize_torch(spec) #O resultado é normalizado espectralmente

    return spec

def load_audio(filename, start=None, end=None, max_frames=None, renormalize_volume=False): #carrega um arquivo de áudio, realiza pré-processamento nele e retorna o espectrograma mel correspondente
    audio, r = sf.read(filename)

    if len(audio.shape) > 1:
        audio = audio[:,0] # verifica se o sinal de áudio possui mais de uma dimensão, o que indicaria que é um áudio estéreo. Nesse caso, seleciona apenas o primeiro canal de áudio.
    if start is not None or end is not None:
        audio = audio[start:end] #permitem selecionar uma parte específica do sinal de áudio

    if renormalize_volume:
        audio = normalize_volume(audio)
    if r == 16000:
        audio = librosa.resample(audio, orig_sr=16000, target_sr=22050)  #verificam a taxa de amostragem r do áudio. Se for igual a 16000, o áudio é ressampleado para a taxa de amostragem de 22050 Hz usando a função librosa.resample. 
    else:
        assert r == 22050
    audio = np.clip(audio, -1, 1) # realiza um ajuste no intervalo de valores do sinal de áudio. Por vezes, o ressampleamento pode fazer com que alguns valores ultrapassem o intervalo [-1, 1], 
                                    #então essa linha garante que todos os valores estejam dentro desse intervalo.
    pytorch_mspec = mel_spectrogram(torch.tensor(audio, dtype=torch.float32).unsqueeze(0), 1024, 80, 22050, 256, 1024, 0, 8000, center=False)  #calculam o espectrograma mel do sinal de áudio
    mspec = pytorch_mspec.squeeze(0).T.numpy() #O resultado é convertido em uma matriz numpy e atribuído a mspec.
    if max_frames is not None and mspec.shape[0] > max_frames:  #verifica se o parâmetro max_frames foi fornecido e se o número de quadros (linhas) do espectrograma excede max_frames
        mspec = mspec[:max_frames,:] #o espectrograma é recortado para ter no máximo max_frames quadros.
    return mspec

def double_average(x):  #suaviza um sinal x aplicando uma média móvel dupla, ou seja, realizando duas convoluções consecutivas com um filtro médio. Isso ajuda a reduzir o ruído e suavizar o sinal.
    assert len(x.shape) == 1  # verificam se o sinal x tem apenas uma dimensão. Caso contrário, é lançado um erro.
    f = np.ones(9)/9.0  #cria uma matriz de tamanho 9 preenchida com o valor 1.0 e, em seguida, divide todos os elementos por 9.0. Essa matriz f será usada como um filtro médio.
    v = np.convolve(x, f, mode='same')  #aplica a convolução entre o sinal x e o filtro f. A opção mode='same' garante que o tamanho da saída seja o mesmo que o tamanho de x
    w = np.convolve(v, f, mode='same')  #aplica a convolução entre o sinal v e o filtro f. A opção mode='same' garante que o tamanho da saída seja o mesmo que o tamanho de x
    return w

def get_emg_features(emg_data, debug=False): #calcula recursos relacionados ao sinal de eletromiografia (EMG) a partir dos dados de EMG fornecido. Esses recursos incluem médias de janelas, valores RMS, 
                                    #taxa de cruzamento por zero e espectrograma de curto prazo. Esses recursos podem ser usados posteriormente para análise e processamento adicional dos sinais de EMG.
    xs = emg_data - emg_data.mean(axis=0, keepdims=True)  #calculam xs, que é o sinal de EMG centrado em torno da média. 
                                                            #É subtraída a média de cada coluna dos dados de EMG (emg_data) usando a função mean ao longo do eixo 0.
    frame_features = []
    for i in range(emg_data.shape[1]):
        x = xs[:,i]  # x: coluna atual de xs
        w = double_average(x) # w: sinal resultante da aplicação da função double_average em x, ou seja, o sinal suavizado
        p = x - w # p: sinal resultante da subtração de w de x, representando as partes pulsativas do sinal
        r = np.abs(p) #r: sinal resultante do valor absoluto de p, representando a magnitude das partes pulsativas do sinal

        w_h = librosa.util.frame(w, frame_length=16, hop_length=6).mean(axis=0) # w_h: média das janelas de 16 amostras de w com um deslocamento de 6 amostras
        p_w = librosa.feature.rms(y=w, frame_length=16, hop_length=6, center=False)  #p_w: valor RMS (Root Mean Square) das janelas de 16 amostras de w com um deslocamento de 6 amostras
        p_w = np.squeeze(p_w, 0)
        p_r = librosa.feature.rms(y=r, frame_length=16, hop_length=6, center=False)  #p_r: valor RMS das janelas de 16 amostras de r com um deslocamento de 6 amostras
        p_r = np.squeeze(p_r, 0)
        z_p = librosa.feature.zero_crossing_rate(p, frame_length=16, hop_length=6, center=False) #z_p: taxa de cruzamento por zero das janelas de 16 amostras de p com um deslocamento de 6 amostras
        z_p = np.squeeze(z_p, 0)
        r_h = librosa.util.frame(r, frame_length=16, hop_length=6).mean(axis=0) #r_h: média das janelas de 16 amostras de r com um deslocamento de 6 amostras

        s = abs(librosa.stft(np.ascontiguousarray(x), n_fft=16, hop_length=6, center=False))  #calcula o espectrograma de curto prazo do sinal x usando a Transformada de Fourier de Curto Prazo (STFT)
        # s has feature dimension first and time second

        if debug:
            plt.subplot(7,1,1)
            plt.plot(x)
            plt.subplot(7,1,2)
            plt.plot(w_h)
            plt.subplot(7,1,3)
            plt.plot(p_w)
            plt.subplot(7,1,4)
            plt.plot(p_r)
            plt.subplot(7,1,5)
            plt.plot(z_p)
            plt.subplot(7,1,6)
            plt.plot(r_h)

            plt.subplot(7,1,7)
            plt.imshow(s, origin='lower', aspect='auto', interpolation='nearest')

            plt.show()

        frame_features.append(np.stack([w_h, p_w, p_r, z_p, r_h], axis=1))  #empilham os recursos calculados para cada coluna em uma lista. Os recursossão empilhados verticalmente, resultando em uma matriz 2D.
        frame_features.append(s.T) #o espectrograma s é transposto e adicionado à lista frame_features.

    frame_features = np.concatenate(frame_features, axis=1) #concatena todos os elementos da lista frame_features ao longo do eixo 1, resultando em uma matriz unidimensional final
    return frame_features.astype(np.float32)

class FeatureNormalizer(object):  #implementa um normalizador de recursos.Ela fornece métodos para normalizar uma amostra de recurso e desfazer a normalização, 
                #aplicando as médias e os desvios padrão calculados. Esse normalizador pode ser útil para preparar os dados antes de usá-los em um modelo de aprendizado de máquina.
    def __init__(self, feature_samples, share_scale=False):#é o construtor da classe. Ele recebe uma lista de amostras de recursos (feature_samples), que são matrizes 2D com dimensões (tempo, recurso). 
        """ features_samples should be list of 2d matrices with dimension (time, feature) """
        feature_samples = np.concatenate(feature_samples, axis=0)  #as amostras de recursos são concatenadas ao longo do eixo 0 para formar uma única matriz
        self.feature_means = feature_samples.mean(axis=0, keepdims=True)  #as médias dos recursos são calculadas ao longo do eixo 0 e armazenadas em self.feature_means
        if share_scale:
            self.feature_stddevs = feature_samples.std()  #Se share_scale for True, o desvio padrão de todos os recursos é calculado e armazenado em self.feature_stddevs
        else:
            self.feature_stddevs = feature_samples.std(axis=0, keepdims=True) #Caso contrário, os desvios padrão de cada recurso são calculados separadamente e armazenados em self.feature_stddevs.

    def normalize(self, sample): #recebe uma amostra de recurso (sample) e normaliza essa amostra subtraindo as médias dos recursos (self.feature_means) e dividindo pelo desvio padrão dos recursos
        sample -= self.feature_means
        sample /= self.feature_stddevs
        return sample

    def inverse(self, sample): #ecebe uma amostra de recurso normalizada e realiza a operação inversa da normalização. Primeiro, a amostra é multiplicada pelo desvio padrão dos recursos. 
                                #Em seguida, a média dos recursos (self.feature_means) é adicionada de volta à amostra. 
        sample = sample * self.feature_stddevs
        sample = sample + self.feature_means
        return sample

def combine_fixed_length(tensor_list, length): #combina uma lista de tensores em um único tensor de comprimento fixo. 
                                #Ele garante que os dados sejam combinados em um tensor de tamanho fixo, preenchendo com zeros, se necessário.
    total_length = sum(t.size(0) for t in tensor_list) #calcula o comprimento total somando o tamanho (dimensão 0) de cada tensor na lista.
    if total_length % length != 0:#verifica se o comprimento total não é divisível pelo comprimento desejado
        pad_length = length - (total_length % length)  #Se não for, calcula o comprimento de preenchimento necessário para tornar o total divisível pelo comprimento desejado
        tensor_list = list(tensor_list) # copy
        tensor_list.append(torch.zeros(pad_length,*tensor_list[0].size()[1:], dtype=tensor_list[0].dtype, device=tensor_list[0].device)) #um tensor preenchido com zeros é criado com o comprimento de 
                                                                #preenchimento necessário e as mesmas dimensões do primeiro tensor na lista. Esse tensor de preenchimento é anexado à lista de tensores.
        total_length += pad_length 
    tensor = torch.cat(tensor_list, 0)  #os tensores na lista (incluindo o tensor de preenchimento, se adicionado) são concatenados ao longo da dimensão 0 para formar um único tensor.
    n = total_length // length #o número de segmentos de comprimento fixo que podem ser extraídos do tensor é calculado dividindo o comprimento total pelo comprimento desejado.
    return tensor.view(n, length, *tensor.size()[1:])  #o tensor é redimensionado para ter as dimensões (n, length, ...), onde n é o número de segmentos e ... representa as dimensões restantes dos tensores originais.

def decollate_tensor(tensor, lengths):  #desagrega um tensor em uma lista de tensores com comprimentos diferentes. Essa função é útil quando você deseja separar um tensor em segmentos de comprimentos diferentes, 
                                    #conforme especificado por uma lista de comprimentos. Pode ser usado, por exemplo, para processar lotes de dados em tamanhos diferentes após a etapa de inferência em um modelo.
    b, s, d = tensor.size()  # obtem as dimensões do tensor original. b representa o tamanho do lote, s representa o comprimento dos segmentos e d representa a dimensão dos recursos.
    tensor = tensor.view(b*s, d) # o tensor é redimensionado usando .view() para ter uma forma de (b * s, d). Isso combina o tamanho do lote com o comprimento dos segmentos.
    results = []
    idx = 0
    for length in lengths: #Para cada comprimento, é verificado se a posição atual mais o comprimento está dentro dos limites do tensor. Se não estiver, um erro de assert é acionado.
        assert idx + length <= b * s
        results.append(tensor[idx:idx+length]) #o segmento correspondente é extraído do tensor, começando na posição idx e com o comprimento especificado. O segmento é adicionado à lista results.
        idx += length
    return results

def splice_audio(chunks, overlap): #combina várias partes de áudio sobrepostas em um único áudio.Essa função é útil para combinar partes de áudio sobrepostas, como segmentos de áudio em uma gravação 
                                        #contínua, onde a sobreposição ajuda a suavizar a transição entre as partes.
    chunks = [c.copy() for c in chunks] # copy so we can modify in place

    assert np.all([c.shape[0]>=overlap for c in chunks]) #é verificado se todas as partes de áudio têm um tamanho maior ou igual à sobreposição especificada. Isso é importante para garantir 
                                                            #que haja dados suficientes para aplicar a sobreposição corretamente.

    result_len = sum(c.shape[0] for c in chunks) - overlap*(len(chunks)-1) #soma os tamanhos de todas as partes de áudio e subtraindo o tamanho da sobreposição entre as partes para o comprimento total
    result = np.zeros(result_len, dtype=chunks[0].dtype)  #Um array de zeros chamado result é inicializado com o comprimento total calculado e o tipo de dados da primeira parte de áudio.

    ramp_up = np.linspace(0,1,overlap) #Duas rampas, ramp_up e ramp_down, são criadas usando np.linspace() para representar as funções de aumento e diminuição gradual da amplitude durante a sobreposição.
    ramp_down = np.linspace(1,0,overlap)

    i = 0
    for chunk in chunks:
        l = chunk.shape[0]  #Em um loop for, cada parte de áudio é processada individualmente. Para cada parte de áudio, seu comprimento l é obtido.

        # note: this will also fade the beginning and end of the result
        chunk[:overlap] *= ramp_up  #As partes de áudio são multiplicadas pelos valores correspondentes nas rampas ramp_up e ramp_down. Isso aplica a sobreposição gradual no início e no final de cada parte de áudio.
        chunk[-overlap:] *= ramp_down

        result[i:i+l] += chunk #A parte de áudio processada é adicionada ao resultado final a partir da posição i. A variável i é atualizada para a próxima posição correta no resultado, levando em 
                                    #consideração o tamanho da parte de áudio e a sobreposição.
        i += l-overlap

    return result

def print_confusion(confusion_mat, n=10): #imprime informações sobre as confusões mais comuns em uma matriz de confusão. Essa função é útil para analisar e visualizar as confusões mais comuns em uma matriz de 
                            #confusão, fornecendo insights sobre o desempenho do modelo de classificação em relação a classes específicas.
    # axes are (pred, target)
    target_counts = confusion_mat.sum(0) + 1e-4  #calcula o número de ocorrências de cada classe alvo na matriz de confusão. Isso é feito somando os valores de cada coluna da matriz e adicionando um 
                                    #pequeno valor (1e-4) para evitar divisão por zero.
    aslist = []
    for p1 in range(len(phoneme_inventory)): #a função itera sobre todas as combinações únicas de classes alvo (p1 e p2). As confusões são calculadas somando as ocorrências nas células 
                                    #correspondentes na matriz de confusão e dividindo pelo número total de ocorrências das duas classes alvo.
        for p2 in range(p1):
            if p1 != p2:
                aslist.append(((confusion_mat[p1,p2]+confusion_mat[p2,p1])/(target_counts[p1]+target_counts[p2]), p1, p2)) #As confusões são armazenadas em uma lista aslist como uma tupla contendo a taxa de confusão,
                                                                                                                #o índice p1 da classe alvo, e o índice p2 da classe alvo.
    aslist.sort() #classifica em ordem crescente com base na taxa de confusão
    aslist = aslist[-n:]  #é selecionado o top n das confusões mais comuns.
    max_val = aslist[-1][0]  #O valor máximo e o valor mínimo de confusão são obtidos a partir da lista aslist.
    min_val = aslist[0][0]
    val_range = max_val - min_val
    print('Common confusions (confusion, accuracy)') 
    for v, p1, p2 in aslist:
        p1s = phoneme_inventory[p1]
        p2s = phoneme_inventory[p2]
        print(f'{p1s} {p2s} {v*100:.1f} {(confusion_mat[p1,p1]+confusion_mat[p2,p2])/(target_counts[p1]+target_counts[p2])*100:.1f}')
        #^ imprime as informações sobre as confusões mais comuns. Ela exibe a classe alvo p1, a classe alvo p2, a taxa de confusão (multiplicada por 100 para exibição em porcentagem) e a precisão (também 
        # multiplicada por 100).

def read_phonemes(textgrid_fname, max_len=None): #lê os fonemas de um arquivo TextGrid e os converte em uma sequência de índices de fonemas, usada para treinar e avaliar modelos de processamento de linguagem.
    tg = TextGrid(textgrid_fname)
    phone_ids = np.zeros(int(tg['phones'][-1].xmax*86.133)+1, dtype=np.int64)  #cria um array phone_ids preenchido com valores -1. O tamanho do array é calculado com base no tempo máximo encontrado 
            #no arquivo TextGrid multiplicado por 86.133. O valor 86.133 é usado para converter o tempo do TextGrid para a taxa de amostragem de 22050 Hz, que é a taxa de amostragem mencionada anteriormente.
    phone_ids[:] = -1  
    phone_ids[-1] = phoneme_inventory.index('sil') #o último valor do array é definido como o índice do fonema 'sil' no inventário de fonemas. Isso garante que a lista seja longa o suficiente para cobrir 
                                                    #todo o comprimento da sequência original.
    for interval in tg['phones']: #percorre cada intervalo de fonema no arquivo TextGrid e mapeia o fonema correspondente para o seu índice no inventário de fonemas.
        phone = interval.text.lower() 
        if phone in ['', 'sp', 'spn']: # Se o fonema for uma string vazia, 'sp' ou 'spn', ele é substituído por 'sil'
            phone = 'sil'
        if phone[-1] in string.digits: #Se o fonema tiver um dígito no final, o dígito é removido.
            phone = phone[:-1]
        ph_id = phoneme_inventory.index(phone) #Os índices de fonemas são atribuídos aos intervalos de tempo correspondentes no array phone_ids.
        phone_ids[int(interval.xmin*86.133):int(interval.xmax*86.133)] = ph_id
    assert (phone_ids >= 0).all(), 'missing aligned phones' #verifica se todos os valores de phone_ids são não negativos (ou seja, todos os fonemas foram encontrados e mapeados corretamente). 
                                #Caso contrário, uma exceção é lançada indicando que há fonemas ausentes na sequência alinhada.

    if max_len is not None:  #Se max_len for especificado, a sequência de fonemas é truncada para o comprimento máximo especificado
        phone_ids = phone_ids[:max_len]
        assert phone_ids.shape[0] == max_len #A função verifica se o comprimento da sequência truncada é igual a max_len.
    return phone_ids

class TextTransform(object): #implementa transformações de texto úteis, como limpeza de texto, mapeamento de texto para sequências de números inteiros e mapeamento inverso de sequências de números inteiros para texto.
    def __init__(self):
        self.transformation = jiwer.Compose([jiwer.RemovePunctuation(), jiwer.ToLowerCase()])  #transformation é um objeto jiwer.Compose que encapsula uma série de transformações de texto, como remoção de pontuação 
                                #e conversão para minúsculas. Essas transformações são realizadas pelo pacote jiwer.
        self.chars = string.ascii_lowercase+string.digits+' ' #chars é uma string que contém todos os caracteres permitidos no texto, incluindo letras minúsculas, dígitos e espaço em branco.

    def clean_text(self, text): #Limpa o texto aplicando as transformações definidas.
        text = unidecode(text)  #remove quaisquer caracteres acentuados ou diacríticos
        text = self.transformation(text) #as transformações definidas em self.transformation são aplicadas
        return text

    def text_to_int(self, text): #Converte o texto em uma sequência de números inteiros
        text = self.clean_text(text) #O texto é limpo usando o método clean_text
        return [self.chars.index(c) for c in text] #cada caractere do texto limpo é mapeado para o seu índice correspondente em self.chars

    def int_to_text(self, ints):  #Converte uma sequência de números inteiros em texto
        return ''.join(self.chars[i] for i in ints)  #Cada número inteiro em ints é mapeado para o caractere correspondente em self.chars. 

In [None]:
%%writefile read_emg.py

import re
import os
import numpy as np
import matplotlib.pyplot as plt
import random
from collections import defaultdict
import scipy
import json
import copy
import sys
import pickle
import string
import logging
from functools import lru_cache
from copy import copy



import librosa
import soundfile as sf

import torch

from data_utils import load_audio, get_emg_features, FeatureNormalizer, phoneme_inventory, read_phonemes, TextTransform

from scipy import signal

from absl import flags
FLAGS = flags.FLAGS

def remove_drift(signal, fs):
    b, a = scipy.signal.butter(3, 2, 'highpass', fs=fs)
    return scipy.signal.filtfilt(b, a, signal)

def notch(signal, freq, sample_frequency):
    b, a = scipy.signal.iirnotch(freq, 30, sample_frequency)
    return scipy.signal.filtfilt(b, a, signal)

def notch_harmonics(signal, freq, sample_frequency):
    for harmonic in range(1,8):
        signal = notch(signal, freq*harmonic, sample_frequency)
    return signal

def subsample(signal, new_freq, old_freq):
    times = np.arange(len(signal))/old_freq
    sample_times = np.arange(0, times[-1], 1/new_freq)
    result = np.interp(sample_times, times, signal)
    return result

def apply_to_all(function, signal_array, *args, **kwargs):
    results = []
    for i in range(signal_array.shape[1]):
        results.append(function(signal_array[:,i], *args, **kwargs))
    return np.stack(results, 1)

def load_utterance(base_dir, index, limit_length=False, debug=False, text_align_directory=None):
    index = int(index)
    raw_emg = np.load(os.path.join(base_dir, f'{index}_emg.npy'))
    before = os.path.join(base_dir, f'{index-1}_emg.npy')
    after = os.path.join(base_dir, f'{index+1}_emg.npy')
    if os.path.exists(before):
        raw_emg_before = np.load(before)
    else:
        raw_emg_before = np.zeros([0,raw_emg.shape[1]])
    if os.path.exists(after):
        raw_emg_after = np.load(after)
    else:
        raw_emg_after = np.zeros([0,raw_emg.shape[1]])

    x = np.concatenate([raw_emg_before, raw_emg, raw_emg_after], 0)
    x = apply_to_all(notch_harmonics, x, 60, 1000)
    x = apply_to_all(remove_drift, x, 1000)
    x = x[raw_emg_before.shape[0]:x.shape[0]-raw_emg_after.shape[0],:]
    emg_orig = apply_to_all(subsample, x, 689.06, 1000)
    x = apply_to_all(subsample, x, 516.79, 1000)
    emg = x

    for c in FLAGS.remove_channels:
        emg[:,int(c)] = 0
        emg_orig[:,int(c)] = 0

    emg_features = get_emg_features(emg)

    mfccs = load_audio(os.path.join(base_dir, f'{index}_audio_clean.flac'),
            max_frames=min(emg_features.shape[0], 800 if limit_length else float('inf')))

    if emg_features.shape[0] > mfccs.shape[0]:
        emg_features = emg_features[:mfccs.shape[0],:]
    assert emg_features.shape[0] == mfccs.shape[0]
    emg = emg[6:6+6*emg_features.shape[0],:]
    emg_orig = emg_orig[8:8+8*emg_features.shape[0],:]
    assert emg.shape[0] == emg_features.shape[0]*6

    with open(os.path.join(base_dir, f'{index}_info.json')) as f:
        info = json.load(f)

    sess = os.path.basename(base_dir)
    tg_fname = f'{text_align_directory}/{sess}/{sess}_{index}_audio.TextGrid'
    if os.path.exists(tg_fname):
        phonemes = read_phonemes(tg_fname, mfccs.shape[0])
    else:
        phonemes = np.zeros(mfccs.shape[0], dtype=np.int64)+phoneme_inventory.index('sil')

    return mfccs, emg_features, info['text'], (info['book'],info['sentence_index']), phonemes, emg_orig.astype(np.float32)

class EMGDirectory(object):
    def __init__(self, session_index, directory, silent, exclude_from_testset=False):
        self.session_index = session_index
        self.directory = directory
        self.silent = silent
        self.exclude_from_testset = exclude_from_testset

    def __lt__(self, other):
        return self.session_index < other.session_index

    def __repr__(self):
        return self.directory

class SizeAwareSampler(torch.utils.data.Sampler):
    def __init__(self, emg_dataset, max_len):
        self.dataset = emg_dataset
        self.max_len = max_len

    def __iter__(self):
        indices = list(range(len(self.dataset)))
        random.shuffle(indices)
        batch = []
        batch_length = 0
        for idx in indices:
            directory_info, file_idx = self.dataset.example_indices[idx]
            with open(os.path.join(directory_info.directory, f'{file_idx}_info.json')) as f:
                info = json.load(f)
            if not np.any([l in string.ascii_letters for l in info['text']]):
                continue
            length = sum([emg_len for emg_len, _, _ in info['chunks']])
            if length > self.max_len:
                logging.warning(f'Warning: example {idx} cannot fit within desired batch length')
            if length + batch_length > self.max_len:
                yield batch
                batch = []
                batch_length = 0
            batch.append(idx)
            batch_length += length
        # dropping last incomplete batch

class EMGDataset(torch.utils.data.Dataset):
    def __init__(self, base_dir=None, limit_length=False, dev=False, test=False, no_testset=False, no_normalizers=False):

        self.text_align_directory = FLAGS.text_align_directory

        if no_testset:
            devset = []
            testset = []
        else:
            with open(FLAGS.testset_file) as f:
                testset_json = json.load(f)
                devset = testset_json['dev']
                testset = testset_json['test']

        directories = []
        if base_dir is not None:
            directories.append(EMGDirectory(0, base_dir, False))
        else:
            for sd in FLAGS.silent_data_directories:
                for session_dir in sorted(os.listdir(sd)):
                    directories.append(EMGDirectory(len(directories), os.path.join(sd, session_dir), True))

            has_silent = len(FLAGS.silent_data_directories) > 0
            for vd in FLAGS.voiced_data_directories:
                for session_dir in sorted(os.listdir(vd)):
                    directories.append(EMGDirectory(len(directories), os.path.join(vd, session_dir), False, exclude_from_testset=has_silent))

        self.example_indices = []
        self.voiced_data_locations = {} # map from book/sentence_index to directory_info/index
        for directory_info in directories:
            for fname in os.listdir(directory_info.directory):
                m = re.match(r'(\d+)_info.json', fname)
                if m is not None:
                    idx_str = m.group(1)
                    with open(os.path.join(directory_info.directory, fname)) as f:
                        info = json.load(f)
                        if info['sentence_index'] >= 0: # boundary clips of silence are marked -1
                            location_in_testset = [info['book'], info['sentence_index']] in testset
                            location_in_devset = [info['book'], info['sentence_index']] in devset
                            if (test and location_in_testset and not directory_info.exclude_from_testset) \
                                    or (dev and location_in_devset and not directory_info.exclude_from_testset) \
                                    or (not test and not dev and not location_in_testset and not location_in_devset):
                                self.example_indices.append((directory_info,int(idx_str)))

                            if not directory_info.silent:
                                location = (info['book'], info['sentence_index'])
                                self.voiced_data_locations[location] = (directory_info,int(idx_str))

        self.example_indices.sort()
        random.seed(0)
        random.shuffle(self.example_indices)

        self.no_normalizers = no_normalizers
        if not self.no_normalizers:
            self.mfcc_norm, self.emg_norm = pickle.load(open(FLAGS.normalizers_file,'rb'))

        sample_mfccs, sample_emg, _, _, _, _ = load_utterance(self.example_indices[0][0].directory, self.example_indices[0][1])
        self.num_speech_features = sample_mfccs.shape[1]
        self.num_features = sample_emg.shape[1]
        self.limit_length = limit_length
        self.num_sessions = len(directories)

        self.text_transform = TextTransform()

    def silent_subset(self):
        result = copy(self)
        silent_indices = []
        for example in self.example_indices:
            if example[0].silent:
                silent_indices.append(example)
        result.example_indices = silent_indices
        return result

    def subset(self, fraction):
        result = copy(self)
        result.example_indices = self.example_indices[:int(fraction*len(self.example_indices))]
        return result

    def __len__(self):
        return len(self.example_indices)

    @lru_cache(maxsize=None)
    def __getitem__(self, i):
        directory_info, idx = self.example_indices[i]
        mfccs, emg, text, book_location, phonemes, raw_emg = load_utterance(directory_info.directory, idx, self.limit_length, text_align_directory=self.text_align_directory)
        raw_emg = raw_emg / 20
        raw_emg = 50*np.tanh(raw_emg/50.)

        if not self.no_normalizers:
            mfccs = self.mfcc_norm.normalize(mfccs)
            emg = self.emg_norm.normalize(emg)
            emg = 8*np.tanh(emg/8.)

        session_ids = np.full(emg.shape[0], directory_info.session_index, dtype=np.int64)
        audio_file = f'{directory_info.directory}/{idx}_audio_clean.flac'

        text_int = np.array(self.text_transform.text_to_int(text), dtype=np.int64)

        result = {'audio_features':torch.from_numpy(mfccs).pin_memory(), 'emg':torch.from_numpy(emg).pin_memory(), 'text':text, 'text_int': torch.from_numpy(text_int).pin_memory(), 'file_label':idx, 'session_ids':torch.from_numpy(session_ids).pin_memory(), 'book_location':book_location, 'silent':directory_info.silent, 'raw_emg':torch.from_numpy(raw_emg).pin_memory()}

        if directory_info.silent:
            voiced_directory, voiced_idx = self.voiced_data_locations[book_location]
            voiced_mfccs, voiced_emg, _, _, phonemes, _ = load_utterance(voiced_directory.directory, voiced_idx, False, text_align_directory=self.text_align_directory)

            if not self.no_normalizers:
                voiced_mfccs = self.mfcc_norm.normalize(voiced_mfccs)
                voiced_emg = self.emg_norm.normalize(voiced_emg)
                voiced_emg = 8*np.tanh(voiced_emg/8.)

            result['parallel_voiced_audio_features'] = torch.from_numpy(voiced_mfccs).pin_memory()
            result['parallel_voiced_emg'] = torch.from_numpy(voiced_emg).pin_memory()

            audio_file = f'{voiced_directory.directory}/{voiced_idx}_audio_clean.flac'

        result['phonemes'] = torch.from_numpy(phonemes).pin_memory() # either from this example if vocalized or aligned example if silent
        result['audio_file'] = audio_file

        return result

    @staticmethod
    def collate_raw(batch):
        batch_size = len(batch)
        audio_features = []
        audio_feature_lengths = []
        parallel_emg = []
        for ex in batch:
            if ex['silent']:
                audio_features.append(ex['parallel_voiced_audio_features'])
                audio_feature_lengths.append(ex['parallel_voiced_audio_features'].shape[0])
                parallel_emg.append(ex['parallel_voiced_emg'])
            else:
                audio_features.append(ex['audio_features'])
                audio_feature_lengths.append(ex['audio_features'].shape[0])
                parallel_emg.append(np.zeros(1))
        phonemes = [ex['phonemes'] for ex in batch]
        emg = [ex['emg'] for ex in batch]
        raw_emg = [ex['raw_emg'] for ex in batch]
        session_ids = [ex['session_ids'] for ex in batch]
        lengths = [ex['emg'].shape[0] for ex in batch]
        silent = [ex['silent'] for ex in batch]
        text_ints = [ex['text_int'] for ex in batch]
        text_lengths = [ex['text_int'].shape[0] for ex in batch]

        result = {'audio_features':audio_features,
                  'audio_feature_lengths':audio_feature_lengths,
                  'emg':emg,
                  'raw_emg':raw_emg,
                  'parallel_voiced_emg':parallel_emg,
                  'phonemes':phonemes,
                  'session_ids':session_ids,
                  'lengths':lengths,
                  'silent':silent,
                  'text_int':text_ints,
                  'text_int_lengths':text_lengths}
        return result

def make_normalizers():
    dataset = EMGDataset(no_normalizers=True)
    mfcc_samples = []
    emg_samples = []
    for d in dataset:
        mfcc_samples.append(d['audio_features'])
        emg_samples.append(d['emg'])
        if len(emg_samples) > 50:
            break
    mfcc_norm = FeatureNormalizer(mfcc_samples, share_scale=True)
    emg_norm = FeatureNormalizer(emg_samples, share_scale=False)
    pickle.dump((mfcc_norm, emg_norm), open(FLAGS.normalizers_file, 'wb'))

if __name__ == '__main__':
    d = EMGDataset()
    for i in range(1000):
        d[i]


In [None]:
!git clone https://github.com/sooftware/conformer.git
!cd conformer && pip install .

!git clone --recursive https://github.com/parlance/ctcdecode.git
!cd ctcdecode && pip install .

In [None]:
import torch
from conformer import Conformer
from torch import nn
from typing import Tuple

class ConformerCTC(nn.Module):
    def __init__(self,
#                  freq_mask: int = 27,
#                  time_mask_ratio: float = 0.05,
                 **kwargs):
        super(ConformerCTC, self).__init__()
#         self.spec_aug = SpecAug(freq_mask, time_mask_ratio)
        self.encoder = Conformer(**kwargs)
    
    def forward(self,
                inputs: torch.Tensor,
                input_length: torch.Tensor
               ) -> Tuple[torch.Tensor, torch.Tensor]:
#         inputs = self.spec_aug(inputs)
        outputs, output_lengths = self.encoder(inputs, input_length)
        return outputs, output_lengths


In [None]:
from pathlib import Path

Path('/kaggle/working/outputs').mkdir(parents=True, exist_ok=True)

In [None]:
def make_input(specs, device):
    '''
        specs: (batch, time step, feature)
    '''
    batch, time_step, _ = specs.size()
    input_length = torch.full(size=(batch,),
                             fill_value=time_step, dtype=torch.long)
    return specs.to(device), input_length.to(device)

In [None]:
def make_target(transcript, device):
    target_length = torch.LongTensor([i.size(0) for i in transcript])
    #print(target_length)
    target = torch.nn.utils.rnn.pad_sequence(transcript, batch_first=True)
    return target.to(device), target_length.to(device)


In [None]:
import os
import sys
import numpy as np
import logging
import subprocess
from ctcdecode import CTCBeamDecoder
import jiwer
import random
import librosa
import gc

import torch
from torch import nn
import torch.nn.functional as F

from read_emg import EMGDataset, SizeAwareSampler
from data_utils import combine_fixed_length, decollate_tensor

from absl import flags
FLAGS = flags.FLAGS

import scipy.signal as signal

gc.get_threshold()
gc.get_count()
gc.collect()
gc.get_count()

def test(model, testset, device):
    model.eval()

    blank_id = len(testset.text_transform.chars)
    decoder = CTCBeamDecoder(testset.text_transform.chars+'_', blank_id=blank_id, log_probs_input=True,
            model_path='/kaggle/input/librispeech-4gram-language-model/4-gram-librispeech.bin', alpha=1.5, beta=1.85)

    dataloader = torch.utils.data.DataLoader(testset, batch_size=1)
    references = []
    predictions = []
    
    
    n_chars = len(testset.text_transform.chars)
    spec = testset.num_features
    
    hyp = dict(
    num_classes=n_chars+1,
    input_dim=spec,
    encoder_dim=144,
    num_encoder_layers=16,
    num_attention_heads=4,
    conv_kernel_size=31)
    
    model = ConformerCTC(**hyp).to(device)
    
    with torch.no_grad():
        for example in dataloader:
            X = example['emg'].to(device)
            X_raw = example['raw_emg'].to(device)
            sess = example['session_ids'].to(device)

            inputs, input_lengths = make_input(X, device)
            pred, preds_length  = model(inputs, input_lengths)
            #pred = F.log_softmax(pred, -1)
            

            beam_results, beam_scores, timesteps, out_lens = decoder.decode(pred)
            pred_int = beam_results[0,0,:out_lens[0,0]].tolist()

            pred_text = testset.text_transform.int_to_text(pred_int)
            #print('pred_text: ', pred_text)
            target_text = testset.text_transform.clean_text(example['text'][0])
            #print('target text: ',target_text)
            print(len(target_text))
            
            if len(target_text) != 0:
                references.append(target_text)
                #print('references: ', references)
                predictions.append(pred_text)
                #print('predictions: ', predictions)
                #print(len(references))
            else:
                continue
    model.train()
    print('references: ', references)
    print('predictions: ', predictions)
    return jiwer.wer(references, predictions)

def pad_target(target, expected_length, blank_symbol):
    # Verifica o tamanho atual do alvo (target)
    current_length = target.size(0)

    # Se o tamanho atual já for igual ao esperado, não é necessário fazer o preenchimento
    if current_length == expected_length:
        return target

    # Calcula quantos símbolos em branco precisam ser adicionados ao alvo
    num_blanks = expected_length - current_length

    # Transpõe o tensor para que o preenchimento seja aplicado na dimensão correta
    transposed_target = target.transpose(0, 1)

    # Faz o preenchimento do tensor de destino
    padded_target = F.pad(transposed_target, (0, num_blanks), value=blank_symbol)

    # Transpõe o tensor de volta para sua forma original
    padded_target = padded_target.transpose(0, 1)

    return padded_target


def train_model(trainset, devset, device, n_epochs=200):
    dataloader = torch.utils.data.DataLoader(trainset, pin_memory=(device=='cuda'), num_workers=0, collate_fn=EMGDataset.collate_raw, batch_sampler=SizeAwareSampler(trainset, 128000))

    n_chars = len(devset.text_transform.chars)
    spec = devset.num_features
    
    hyp = dict(
    num_classes=n_chars+1,
    input_dim=spec,
    encoder_dim=144,
    num_encoder_layers=16,
    num_attention_heads=4,
    conv_kernel_size=31)
    
    model = ConformerCTC(**hyp).to(device)

    if FLAGS.start_training_from is not None:
        state_dict = torch.load(FLAGS.start_training_from)
        model.load_state_dict(state_dict, strict=False)

    optim = torch.optim.AdamW(model.parameters(), lr=FLAGS.learning_rate, weight_decay=FLAGS.l2)
    lr_sched = torch.optim.lr_scheduler.MultiStepLR(optim, milestones=[125,150,175], gamma=.5)

    def set_lr(new_lr):
        for param_group in optim.param_groups:
            param_group['lr'] = new_lr

    target_lr = FLAGS.learning_rate
    def schedule_lr(iteration):
        iteration = iteration + 1
        if iteration <= FLAGS.learning_rate_warmup:
            set_lr(iteration*target_lr/FLAGS.learning_rate_warmup)

    batch_idx = 0
    optim.zero_grad()
    for epoch_idx in range(n_epochs):
        gc.collect()
        gc.get_count()
        losses = []
        max_target_length = 0
        all_y = []
        for example in dataloader:
            schedule_lr(batch_idx)
            
            X = combine_fixed_length(example['emg'], 200).to(device)
            sess = combine_fixed_length(example['session_ids'], 200).to(device)
            X_raw = combine_fixed_length(example['raw_emg'], 200*8).to(device)
            
            inputs, input_lengths = make_input(X, device)
            pred, pred_length = model(inputs, input_lengths)

            pred = nn.utils.rnn.pad_sequence(decollate_tensor(pred, pred_length), batch_first=False)
            y = example['text_int']
            target, target_lengths = make_target(y, device)
            
            gc.get_count()
            gc.collect()
            gc.get_count()

            expected_batch_size = pred.size(1)   # Obtém o tamanho do lote esperado a partir do tensor pred

            #print("Tamanho de target_lengths:", len(target_lengths))
            #print("Tamanho esperado do lote:", expected_batch_size)

            # Verifica se o tamanho do lote (batch size) do tensor target_lengths é igual ao esperado
            if len(target_lengths) != expected_batch_size:
                # Ajusta o tamanho do tensor target_lengths para corresponder ao tamanho do lote esperado
                target_lengths_tensor = torch.tensor(target_lengths, dtype=torch.long, device=device)
                #print("Tamanho de target_lengths_tensor antes:", target_lengths_tensor.size())

                # Calcula o número de elementos a serem preenchidos com zeros
                num_elements_to_pad = expected_batch_size - len(target_lengths)

                # Preenche o tensor com zeros para igualar ao tamanho do lote esperado
                target_lengths_tensor = F.pad(target_lengths_tensor, (0, num_elements_to_pad), value=0)
                #print("Tamanho de target_lengths_tensor depois:", target_lengths_tensor.size())
              
    
            device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
            target = target.to(device)
            
            blank_symbol = 0  # Replace 0 with the appropriate blank symbol index
            padded_target = pad_target(target, expected_batch_size, blank_symbol)
            
            #print("Tamanho de pred:", pred.size())
            #print("Tamanho de target:", target.size())
            #print("Tamanho de padded_target:", padded_target.size())
            #print("Tamanho de pred_length:", pred_length.size())
            #print("Tamanho de target_lengths_tensor:", target_lengths_tensor.size())

    
            cal_loss = nn.CTCLoss(zero_infinity=True).to(device)
            loss = cal_loss(pred, padded_target, pred_length, target_lengths_tensor)
            gc.collect()
            print(loss)


            losses.append(loss.item())

            loss.backward()
            if (batch_idx+1) % 2 == 0:
                optim.step()
                optim.zero_grad()

            batch_idx += 1
        train_loss = np.mean(losses)
        val = test(model, devset, device)
        lr_sched.step()
        logging.info(f'finished epoch {epoch_idx+1} - training loss: {train_loss:.4f} validation WER: {val*100:.2f}')
        print(f'finished epoch {epoch_idx+1} - training loss: {train_loss:.4f} validation WER: {val*100:.2f}')
        torch.save(model.state_dict(), os.path.join(FLAGS.output_directory,'model.pt'))

    model.load_state_dict(torch.load(os.path.join(FLAGS.output_directory,'model.pt'))) # re-load best parameters
    return model

def evaluate_saved():
    device = 'cuda' if torch.cuda.is_available() and not FLAGS.debug else 'cpu'
    testset = EMGDataset(test=True)
    
    n_chars = len(testset.text_transform.chars)
    spec = testset.num_features
    
    hyp = dict(
    num_classes=n_chars+1,
    input_dim=spec,
    encoder_dim=144,
    num_encoder_layers=16,
    num_attention_heads=4,
    conv_kernel_size=31)
    
    model = ConformerCTC(**hyp).to(device)
    model.load_state_dict(torch.load(FLAGS.evaluate_saved))
    print('WER:', test(model, testset, device))

def main():
    os.makedirs(FLAGS.output_directory, exist_ok=True)
    logging.basicConfig(handlers=[
            logging.FileHandler(os.path.join(FLAGS.output_directory, 'log.txt'), 'w'),
            logging.StreamHandler()
            ], level=logging.INFO, format="%(message)s")

    logging.info(subprocess.run(['git','rev-parse','HEAD'], stdout=subprocess.PIPE, universal_newlines=True).stdout)
    logging.info(subprocess.run(['git','diff'], stdout=subprocess.PIPE, universal_newlines=True).stdout)

    logging.info(sys.argv)

    trainset = EMGDataset(dev=False,test=False).subset(0.8)
    devset = EMGDataset(dev=True)
    logging.info('output example: %s', devset.example_indices[0])
    logging.info('train / dev split: %d %d',len(trainset),len(devset))

    device = 'cuda' if torch.cuda.is_available() and not FLAGS.debug else 'cpu'
    
    
    model = train_model(trainset, devset, device)

if __name__ == '__main__':
    if FLAGS.evaluate_saved is not None:
        evaluate_saved()
    else:
        main()