# Feature Extractor

- Notebook que faz a extracao das features da mesma forma que esta implementado no microncontrolador

In [None]:
import numpy as np
from scipy.signal.windows import hann
from scipy.io import wavfile
from soundfile import SoundFile
import tg_utils as utils
import pandas as pd
from pathlib import Path
from scipy.signal import sosfilt

In [None]:
## Definicao de constantes
BUFFER_LEN = 2048       # Tamanho do buffer utilizado para o calculo da FFT
INPUT_AUDIO_CHANNEL = 0 # Seleciona o canal dos arquivos de audio do microfone (0, 1) -- faz sentido apenas para o audio .flac
OVERLAP_FACTOR = 0.75
ADVANCE_BLOCK_SIZE = int(BUFFER_LEN * (1 - OVERLAP_FACTOR))
# filter = utils.GeradorFiltroPassaFaixa().projetar_filtro()
FAULT_ID_LIST = [
    1, # FAULT_ID_OFF
    2, # FAULT_ID_HEALTH
    3, # FAULT_ID_BEARING
    4, # FAULT_ID_FAN
    5  # FAULT_ID_GEAR
]

SOUND_FILES = [
    '../data/audio_files/off.wav',      # Off Condition with noise ((Fault_ID1))
    '../data/audio_files/health.wav',   # Healthy condition (Fault_ID2)
    '../data/audio_files/f1.wav',       # Bearing fault (Fault_ID3)
    '../data/audio_files/f2.wav',       # Fan fault (Fault_ID4)
    '../data/audio_files/f3.wav'        # Gear fault (Fault_ID5)
]

# Coeficientes do Filtro Passa-Faixa Butter de 4 estagios (Ordem 8)
# Fs=48000Hz, Fc=[20.0Hz, 20000.0Hz]
filterCoefs = [ [0.49803642, 0.99607284, 0.49803642, 1., 1.18440148, 0.36786567], 
                [ 1., 2., 1., 1., 1.45394167, 0.67891696],
                [ 1., -2., 1., 1., -1.99516502, 0.99517187],
                [ 1., -2., 1., 1., -1.99799243, 0.99799927] ]

# CSV_COLUMNS=['RMS','Mean','Median','Variance','Skewness','Kurtosis', 'CrestFactor','ShapeFactor','ImpulseFactor','MarginFactor', 'Peak1','Peak2','Peak3','PeakLocs1','PeakLocs2','PeakLocs3','FaultID']
# CSV_COLUMNS = ['RMS','Variance','Skewness','Kurtosis','CrestFactor','ShapeFactor','ImpulseFactor','MarginFactor','Peak1','Peak2','Peak3','PeakLocs1','PeakLocs2','PeakLocs3']
CSV_COLUMNS = ['RMS','Variance','Skewness','Kurtosis','CrestFactor','ShapeFactor','ImpulseFactor','MarginFactor','Peak1','Peak2','Peak3','PeakLocs1','PeakLocs2','PeakLocs3','FaultID']

In [None]:

# Processa cada um dos arquivos de audio
data_list = []

for sfile, fault_id in zip(SOUND_FILES, FAULT_ID_LIST):
    # Pre processamento
    # Cria um filtro com Janela Hanning e buffer
    window = hann(BUFFER_LEN)
    buffer = np.zeros(BUFFER_LEN, dtype='int16')
    input_signal = np.zeros(2 * BUFFER_LEN, dtype='float32')
    output_data = []
    
    # Abre o arquivo de audio como um objeto SoundFile
    with SoundFile(file=sfile, mode='r') as sf:
        # Imprime informacoes a respeito do audio
        print(sf.format_info)
        print(sf.subtype_info)
        print(sf.extra_info)
        print(f"sf.frames= {sf.frames}")
        # Percorre todos os frames do arquivo de audio
        while sf.tell() < sf.frames:
            # Define o tamanho do frame (pega o restante dos frames caso menor que BUFFER_LEN)
            sf_pos = sf.tell()
            frame_size = BUFFER_LEN if ((sf.frames - sf.tell()) > BUFFER_LEN) else (sf.frames - sf.tell())

            # Encerra o programa caso a quantidade de amostras restantes seja menor que BUFFER_LEN
            if frame_size < BUFFER_LEN:
                print(f"Frame size menor que BUFFER_LEN! frame size: {frame_size}")
                break

            # Copia os frames para o buffer
            sf.read(frames=frame_size, dtype='int16', out=buffer, always_2d=False)

            # Normaliza os dados de entrada
            nor_buffer = buffer / 2**16

            # Aplica o filtro no sinal de entrada
            # filtered_signal = filter.filtrar_sinal(nor_buffer)
            filtered_signal = sosfilt(filterCoefs, nor_buffer)

            # multiplica os valores lido pela janela de han
            input_signal[BUFFER_LEN:] = filtered_signal * window

            # o +1 no range final e porque a funcao de range eh intervalo fechado para o valor final/stop
            for index in range(ADVANCE_BLOCK_SIZE, BUFFER_LEN+1, ADVANCE_BLOCK_SIZE):
                
                # Cria bloco de dados contendo 75% de dados antigos (overlapping 75%)
                data = input_signal[index:(BUFFER_LEN+index)]

                # Faz a extracao das features no dominio do tempo e frequencia
                time_features = utils.ExtractTimeDomainFeatures(DataBuff=data, BufferLen=frame_size, Fs=sf.samplerate)
                frequency_features = utils.ExtractFrequencyDomainFeatures(DataBuff=data, BufferLen=frame_size, Fs=sf.samplerate)

                # Junta as features do tempo, frequencia e o id
                output_data.append(dict(zip(CSV_COLUMNS, [*time_features,*frequency_features, fault_id])))
            
            # atualiza primera metade do buffer com os dados da segunda metade
            # para o overlapping de 75%
            input_signal[:BUFFER_LEN] = input_signal[BUFFER_LEN:]


    print(output_data[0])
    print(f'Output data shape: {len(output_data)}')

    # Cria um dataframe
    df = pd.DataFrame(output_data, columns=CSV_COLUMNS)
    data_list.append(df)

    # sava o dataframe como arquivo csv
    sfile_name = Path(sfile).stem
    df.to_csv(f'../data/extracted_features/desktop/features_{sfile_name}_desktop.csv', index=False)
    print(f'Arquivo CSV salvo para {sfile_name}')

In [None]:
# Concatenate all processed data Dataframe
concat_data = pd.concat(data_list, ignore_index=True)
concat_data.to_csv('../data/extracted_features/features_like_uc.csv', index=False)

In [None]:
def calcula_primeiro_frame():
    # Processa cada um dos arquivos de audio
    # DEBUG: Processa apenas o primeiro batch de dados do arquivo de audio
    for sfile, fault_id in zip(SOUND_FILES, FAULT_ID_LIST):
        # Pre processamento
        # Cria um filtro com Janela Hanning e buffer
        window = hann(BUFFER_LEN)
        buffer = np.zeros(BUFFER_LEN, dtype='int16')
        input_signal = np.zeros(2 * BUFFER_LEN, dtype='float32')
        output_data = []
        
        # Abre o arquivo de audio como um objeto SoundFile
        with SoundFile(file=sfile, mode='r') as sf:
            # Imprime informacoes a respeito do audio
            print(sf.format_info)
            print(sf.subtype_info)
            print(sf.extra_info)
            print(f"sf.frames= {sf.frames}")
            
            # Define o tamanho do frame (pega o restante dos frames caso menor que BUFFER_LEN)
            sf_pos = sf.tell()
            frame_size = BUFFER_LEN if ((sf.frames - sf.tell()) > BUFFER_LEN) else (sf.frames - sf.tell())


            # Encerra o programa caso a quantidade de amostras restantes seja menor que BUFFER_LEN
            if frame_size < BUFFER_LEN:
                print(f"Frame size menor que BUFFER_LEN! frame size: {frame_size}")
                break

            # Copia os frames para o buffer
            sf.read(frames=frame_size, dtype='int16', out=buffer, always_2d=False)

            # multiplica os valores lido pela janela de han
            np.savetxt(f'../data/extracted_features/desktop/teste_{Path(sfile).stem}_1st_frame_before_han.txt', X=buffer)
            teste = buffer * window
            np.savetxt(f'../data/extracted_features/desktop/teste_{Path(sfile).stem}_1st_frame_after_han.txt', X=teste)

            # input_signal[BUFFER_LEN:] = buffer * window
            input_signal[BUFFER_LEN:] = buffer

            # o +1 no range final e porque a funcao de range eh intervalo fechado para o valor final/stop
            for index in range(ADVANCE_BLOCK_SIZE, BUFFER_LEN+1, ADVANCE_BLOCK_SIZE):
                
                # Cria bloco de dados contendo 75% de dados antigos (overlapping 75%)
                data = input_signal[index:(BUFFER_LEN+index)]

                # Faz a extracao das features no dominio do tempo e frequencia
                time_features = utils.ExtractTimeDomainFeatures(DataBuff=data, BufferLen=frame_size, Fs=sf.samplerate)
                frequency_features = utils.ExtractFrequencyDomainFeatures(DataBuff=data, BufferLen=frame_size, Fs=sf.samplerate)

                # Junta as features do tempo, frequencia e o id
                output_data.append(dict(zip(CSV_COLUMNS, [*time_features,*frequency_features, fault_id])))
            
            # atualiza primera metade do buffer com os dados da segunda metade
            # para o overlapping de 75%
            input_signal[:BUFFER_LEN] = input_signal[BUFFER_LEN:]


        print(output_data[0])
        print(f'Output data shape: {len(output_data)}')
        
        # Cria um dataframe
        df = pd.DataFrame(output_data, columns=CSV_COLUMNS)
        
        # sava o dataframe como arquivo csv
        sfile_name = Path(sfile).stem
        df.to_csv(f'../data/extracted_features/desktop/teste_1frame_features_{sfile_name}_desktop_.csv', index=False)
        print(f'Arquivo CSV salvo para {sfile_name}')    