# importação de bibliotecas

In [1]:
import numpy as np
from scipy.io import wavfile
import matplotlib.pyplot as plt
from scipy.signal import find_peaks, butter, filtfilt
from scipy.fft import fft, ifft
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Declaração de funções

In [2]:
def lowpass_filter(data, cutoff_freq, sample_rate, filter_order=5):
    nyquist_rate = 0.5 * sample_rate
    normalized_cutoff = cutoff_freq/ nyquist_rate
    b_coeficiente,a_coeficiente = butter(filter_order, normalized_cutoff, btype='low',analog=False)
    filtered_signal = filtfilt(b_coeficiente,a_coeficiente,data)
    return filtered_signal

def normalize_audio_int32(data):
    max_val = np.max(np.abs(data))
    normalized_data = data / max_val * 2147483647
    return normalized_data.astype(np.int32)

# Input do audio

In [3]:
sample_rate, audio_signal = wavfile.read('teste.wav')
time_axis = np.linspace(0, len(audio_signal) / sample_rate, len(audio_signal))

# Se o sinal tiver mais de um canal, selecionar apenas o primeiro canal
if len(audio_signal.shape) > 1:
    audio_signal = audio_signal[:, 0]

print(f"Sample Rate: {sample_rate}")
print(f"Audio Signal Shape: {audio_signal.shape}")
print(f"Max Value (Original Signal): {np.max(audio_signal)}")
print(f"Min Value (Original Signal): {np.min(audio_signal)}")

# Aplicar FFT ao sinal de áudio
fft_signal = fft(audio_signal)
frequencies = np.fft.fftfreq(len(audio_signal), 1 / sample_rate)

# Filtrar frequências acima de 1000 Hz
cutoff_frequency = 1000
fft_signal[np.abs(frequencies) > cutoff_frequency] = 0

# Transformar de volta ao domínio do tempo
filtered_signal = np.real(ifft(fft_signal))

# variáveis para treinamento da IA
threshold = np.mean(audio_signal) + 2 * np.std(audio_signal)
labels = (audio_signal > threshold).astype(int)
max_val = np.max(np.abs(audio_signal))
normalized_signal = audio_signal / max_val

windows_size = 100
overlap = 50
x =[]
y = []

for i in range(0,len(normalized_signal) - windows_size +1, windows_size - overlap):
    x.append(normalized_signal[i:i + windows_size])
    y.append(labels[i + windows_size//2])

X = np.array(x)
Y = np.array(y)

X_tensor = torch.tensor(X, dtype=torch.float32)
Y_tensor = torch.tensor(Y, dtype=torch.float32)

dataset = TensorDataset(X_tensor,Y_tensor)
dataLoader = DataLoader(dataset,batch_size=32, shuffle=True)

class PeakDetector(nn.Module):
    def __init__(self):
        super(PeakDetector, self).__init__()
        self.fc1 = nn.Linear(windows_size, 100) 
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50,10)
        self.fc4 = nn.Linear(10,1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.sigmoid(self.fc4(x))
        return x
    
model = PeakDetector()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # teste com 0,0001 deu errado

num_epochs = 400
for epoch in range(num_epochs):
    for inputs, targets in dataLoader:
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Eposh {epoch+1}/{num_epochs}, loss: {loss.item()}')

with torch.no_grad():
    test_signal = normalized_signal[:windows_size]
    teste_tensor = torch.tensor(test_signal, dtype=torch.float32).unsqueeze(0)
    predicted_peak = model(teste_tensor).item()

    print(f'Probabilidade de pico na janela de teste: {predicted_peak}')

  sample_rate, audio_signal = wavfile.read('teste.wav')


Sample Rate: 96000
Audio Signal Shape: (1681979,)
Max Value (Original Signal): 0.9989989995956421
Min Value (Original Signal): -0.9989989995956421
Eposh 1/400, loss: 0.0038107887376099825
Eposh 2/400, loss: 1.3410296560323332e-05
Eposh 3/400, loss: 0.18477876484394073
Eposh 4/400, loss: 1.4968496770961792e-06
Eposh 5/400, loss: 8.033609628910199e-05
Eposh 6/400, loss: 2.1298410501913168e-07
Eposh 7/400, loss: 1.5227548146867775e-06
Eposh 8/400, loss: 1.3039530131209176e-05
Eposh 9/400, loss: 0.001694322912953794
Eposh 10/400, loss: 3.036392968169821e-07
Eposh 11/400, loss: 2.5366412614857836e-07
Eposh 12/400, loss: 7.167331617097261e-09
Eposh 13/400, loss: 4.249192317451467e-11
Eposh 14/400, loss: 0.00020335096633061767
Eposh 15/400, loss: 1.5470317521248944e-05
Eposh 16/400, loss: 6.263629614977617e-10
Eposh 17/400, loss: 0.0012819808907806873
Eposh 18/400, loss: 0.009733414277434349
Eposh 19/400, loss: 0.05510278418660164
Eposh 20/400, loss: 8.30844559818189e-11
Eposh 21/400, loss: 2

In [4]:
# passar os picos detectados para um array
detected_peaks = []
with torch.no_grad():
    for i in range(len(normalized_signal) - windows_size):
        window = normalized_signal[i:i + windows_size]
        window_tensor = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        peak_prob = model(window_tensor).item()
        detected_peaks.append(peak_prob)

In [49]:
len_to_one_second = 0
for i in range(len(time_axis)):
    if time_axis[i] == 1:
        len_to_one_second = i
        break

controle_tempo = 0
tempos = []
tempos_baixo = []
for i in range(len(detected_peaks)):
    if detected_peaks[i] >= 0.8 and detected_peaks[i+1] >= 0.8:
        controle_tempo =+ 1
        if controle_tempo >= len_to_one_second:
            tempos.append(int(time_axis[i]))
    elif detected_peaks[i] >= 0.8 and detected_peaks[i+1] < 0.8:
        controle_tempo = 0
for i in range(len(detected_peaks)):
    if detected_peaks[i] == 0 and detected_peaks[i+1] == 0:
        controle_tempo =+ 1
        if controle_tempo >= len_to_one_second:
            tempos_baixo.append(int(time_axis[i]))
    elif detected_peaks[i] > 0.1 :
        controle_tempo = 0


In [28]:
def remove_duplicatas(lista):
    lista_sem_duplicatas = []
    for i in range(len(lista)):
        if lista[i] not in lista_sem_duplicatas:
            lista_sem_duplicatas.append(lista[i])
    return lista_sem_duplicatas

def min_max_tuplas(lista):
    sequencias = [lista[0]]
    juncao_tempos = []
    segments_to_attenuate = []
    for i in range(1,len(lista)):
        if lista[i] -1 == lista[i-1]:
            sequencias.append(lista[i])
        else:
            juncao_tempos.append(sequencias)
            sequencias = [lista[i]]
    juncao_tempos.append(sequencias)
    for i in range(len(juncao_tempos)):
        segments_to_attenuate.append((min(juncao_tempos[i]),max(juncao_tempos[i])))
    return segments_to_attenuate

In [45]:
tempos_unicos = []
tempos_unicos_baixo = []

for i in range(len(tempos)):
    if tempos[i] not in tempos_unicos:
        tempos_unicos.append(tempos[i])
for i in range(len(tempos_baixo)):
    if tempos_baixo[i] not in tempos_unicos_baixo:
        tempos_unicos_baixo.append(tempos_baixo[i])

sequencias = [tempos_unicos[0]]
juncao_tempos = []
segments_to_attenuate = []
for i in range(1,len(tempos_unicos)):
    if tempos_unicos[i] -1 == tempos_unicos[i-1]:
        sequencias.append(tempos_unicos[i])
    else:
        juncao_tempos.append(sequencias)
        sequencias = [tempos_unicos[i]]
juncao_tempos.append(sequencias)
for i in range(len(juncao_tempos)):
    segments_to_attenuate.append((min(juncao_tempos[i]),max(juncao_tempos[i])))


In [50]:
#print(segments_to_attenuate_hight)
print(len(tempos_baixo))

682929


In [23]:

attenuation_factor = 0.5

attenuated_signal = filtered_signal.copy()
for start_time, end_time in segments_to_attenuate:
    start_sample = int(start_time * sample_rate)
    end_sample = int(end_time * sample_rate)
    attenuated_signal[start_sample:end_sample] *= attenuation_factor

# Normalizar o sinal atenuado
normalized_filtered_signal = normalize_audio_int32(attenuated_signal)
output_filename = 'saida3.wav'
wavfile.write(output_filename, sample_rate, normalized_filtered_signal)

print(f"Sinal filtrado, atenuado e normalizado salvo como {output_filename}")

Sinal filtrado, atenuado e normalizado salvo como saida3.wav
