In [1]:
import os
import glob

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

import librosa

from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [2]:
AUDIO_DIR = r'C:\Users\joaov_zm1q2wh\python\icassp_challenge\joao\data\task1\training\phonationA'
LABELS_PATH = r'C:\Users\joaov_zm1q2wh\python\icassp_challenge\joao\data\task1\labels.csv'
SAMPLE_RATE = 8000
WAVEFORM_INPUT_DIM = 1 
LATENT_DIM = 20
BATCH_SIZE = 64
N_EPOCHS = 10

### Sinal de áudio puro

Quando você decide processar o sinal de áudio puro, o sinal é apenas uma sequência de amostras de amplitude no tempo.

Em cada passo de tempo, o dado de entrada é um único valor numérico (a amplitude).

$$\text{Sinal de áudio: } [a_1, a_2, a_3, a_4, \dots, a_N]$$

Como o LSTM espera a dimensão: [Batch Size, Length, Feature Dimension]

Precisamos moldar o sinal de áudio, que é 1D, para ter uma terceira dimensão (a dimensão da feature).

O array de amostras $N$ é transformado em um array de vetores de dimensão 1, ou seja, $N \times 1$.$$\text{Novo Input: } [\text{Batch Size}, \text{N (amostras)}, \mathbf{1}]$$Portanto, essa variável WAVEFORM_INPUT_DIM = 1 é como dizer para o modelo: "Em cada passo de tempo da sequência, a entrada é um vetor de dimensão 1."

In [4]:
class AudioWaveformDataset(Dataset):
    def __init__(self, audio_files, csv_path, sample_rate=8000, scaler=None):
        self.audio_files = audio_files
        self.sample_rate = sample_rate
        self.scaler = scaler

        self.df_labels = pd.read_csv(csv_path)
        self.id_to_class = {row['ID']: int(row['Class']) for _, row in self.df_labels.iterrows()}

        self.waveforms = []
        self.classes = []

        all_samples = []

        print("Iniciando carregamento e processamento dos áudios...")
        for file in self.audio_files:
            waveform = self.load_waveform(file)
            if waveform is not None:
                waveform = waveform.reshape(-1, WAVEFORM_INPUT_DIM) 
                
                self.waveforms.append(waveform)
                all_samples.append(waveform)

                file_id = os.path.basename(file).split('_')[0]
                if file_id not in self.id_to_class:
                    raise ValueError(f"ID {file_id} não encontrado no CSV")
                self.classes.append(self.id_to_class[file_id])

        if self.scaler is None:
            all_frames = np.concatenate(all_samples, axis=0) 
            self.scaler = StandardScaler()
            self.scaler.fit(all_frames)
            print("Scaler ajustado com sucesso no sinal puro (waveform).")

        self.normalized_waveforms = [
            torch.tensor(self.scaler.transform(w), dtype=torch.float32)
            for w in self.waveforms
        ]

        print(f"Total de {len(self.normalized_waveforms)} áudios carregados e normalizados.")

    def load_waveform(self, file_path):
        try:
            audio, sr = librosa.load(file_path, sr=self.sample_rate)
            return audio
        except Exception as e:
            print(f"Erro ao processar {file_path}: {e}")
            return None

    def __len__(self):
        return len(self.normalized_waveforms)

    def __getitem__(self, idx):
        waveform = self.normalized_waveforms[idx]
        length = waveform.shape[0]
        classe = self.classes[idx]
        return waveform, length, classe

def collate_fn(batch):
    sequences, lengths, classes = zip(*batch)
    lengths = torch.tensor(lengths, dtype=torch.long)
    padded_batch = pad_sequence(sequences, batch_first=True)
    
    lengths, sorted_idx = lengths.sort(descending=True)
    padded_batch = padded_batch[sorted_idx]
    classes = torch.tensor([classes[i] for i in sorted_idx], dtype=torch.long)

    return padded_batch, lengths, classes

In [5]:
class LSTMAutoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim, num_layers=1):
        super(LSTMAutoencoder, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers

        self.encoder_lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )
        self.encoder_lstm.flatten_parameters()
        self.encoder_linear = nn.Linear(hidden_dim, latent_dim)
        
        self.decoder_linear = nn.Linear(latent_dim, hidden_dim)
        self.decoder_lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True
        )
        self.decoder_lstm.flatten_parameters()

        self.output_linear = nn.Linear(hidden_dim, input_dim)

    def forward(self, x, lengths):
        batch_size, max_len, _ = x.size()

        packed_input = pack_padded_sequence(x, lengths.cpu(), batch_first=True)
        _, (h_n, c_n) = self.encoder_lstm(packed_input)
        z = self.encoder_linear(h_n[-1])

        h_0_decoder = self.decoder_linear(z).unsqueeze(0).repeat(self.num_layers, 1, 1)
        c_0_decoder = torch.zeros_like(h_0_decoder)

        packed_input_decoder = pack_padded_sequence(x, lengths.cpu(), batch_first=True)
        packed_output, _ = self.decoder_lstm(packed_input_decoder, (h_0_decoder, c_0_decoder))

        output, _ = pad_packed_sequence(packed_output, batch_first=True, total_length=max_len)
        x_recon = self.output_linear(output)

        return x_recon, z

In [None]:
all_audio_files = glob.glob(os.path.join(AUDIO_DIR, "*.wav"))

if not all_audio_files:
    print(f"Nenhum arquivo de áudio encontrado em {AUDIO_DIR}. Verifique o caminho e a extensão.")
else:
    print(f"Encontrados {len(all_audio_files)} arquivos de áudio.")
    
    dataset = AudioWaveformDataset(all_audio_files, csv_path=LABELS_PATH, sample_rate=SAMPLE_RATE) 
    dataloader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=collate_fn,
        num_workers=0
    )
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # device = torch.device("cpu")
    
    if device.type == 'cuda':
        print("Desabilitando cuDNN para evitar RuntimeError...")
        torch.backends.cudnn.enabled = False
    
    model = LSTMAutoencoder(input_dim=WAVEFORM_INPUT_DIM, hidden_dim=64, latent_dim=LATENT_DIM).to(device)
    criterion = nn.MSELoss(reduction='none')
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    loss_history = []
    print("\nIniciando treinamento...")
    for epoch in range(N_EPOCHS):
        total_loss = 0
        print('For 1')
        for batch, lengths, classes in dataloader:
            print('For 2')
            batch = batch.to(device)
            
            optimizer.zero_grad()
            recon, z = model(batch, lengths)
            
            mask = torch.zeros_like(batch, dtype=torch.bool).to(device)
            for i, l in enumerate(lengths):
                print('For 3')
                mask[i, :l, :] = True
            
            loss_all = criterion(recon, batch)
            loss_masked = loss_all * mask.float() 
            loss = loss_masked.sum() / mask.sum() 
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item() * lengths.size(0)
        
        avg_loss = total_loss / len(dataset)
        loss_history.append(avg_loss)
        print(f"Epoch {epoch+1}/{N_EPOCHS} | Loss: {avg_loss:.6f}")
            
    print("Treinamento concluído.")

Encontrados 272 arquivos de áudio.
Iniciando carregamento e processamento dos áudios...
Scaler ajustado com sucesso no sinal puro (waveform).
Total de 272 áudios carregados e normalizados.
Desabilitando cuDNN para evitar RuntimeError...

Iniciando treinamento...
For 1
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 2
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3
For 3


In [None]:
plt.figure(figsize=(8,5))
plt.plot(range(1, N_EPOCHS+1), loss_history, marker=',')
plt.title("Loss vs Épocas (Escala Logarítmica)")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.yscale('log')
plt.grid(True, which='both', linestyle='--', linewidth=0.5)
plt.show()

In [None]:
if 'model' in locals() and LATENT_DIM in [2, 3]:
    print("\nExtraindo vetores latentes...")
    
    model.eval()
    latent_vectors = []
    all_classes = []
    
    with torch.no_grad():
        for batch, lengths, classes in dataloader:
            batch = batch.to(device)
            _, z = model(batch, lengths)
            latent_vectors.append(z.cpu().numpy())
            all_classes.append(classes.cpu().numpy())
            
    latent_vectors = np.concatenate(latent_vectors, axis=0)
    all_classes = np.concatenate(all_classes, axis=0)
    
    
    unique_classes = np.unique(all_classes)
    cmap = plt.cm.get_cmap('tab10', len(unique_classes))
    class_to_idx = {cls: i for i, cls in enumerate(unique_classes)}
    colors = [cmap(class_to_idx[cls]) for cls in all_classes]
        
    fig = plt.figure(figsize=(10, 8))
    
    if LATENT_DIM == 3:
        ax = fig.add_subplot(111, projection='3d')
        scatter = ax.scatter(latent_vectors[:, 0], latent_vectors[:, 1], latent_vectors[:, 2], c=colors, alpha=0.6)
        
        # Ajuste de limites para visualização
        ax.set_xlim(latent_vectors[:, 0].min(), latent_vectors[:, 0].max())
        ax.set_ylim(latent_vectors[:, 1].min(), latent_vectors[:, 1].max())
        ax.set_zlim(latent_vectors[:, 2].min(), latent_vectors[:, 2].max())

        ax.set_title('Espaço Latente 3D (Waveform)')
        ax.set_xlabel('z1')
        ax.set_ylabel('z2')
        ax.set_zlabel('z3')
    elif LATENT_DIM == 2:
        ax = fig.add_subplot(111)
        scatter = ax.scatter(latent_vectors[:, 0], latent_vectors[:, 1], c=colors, alpha=0.6)
        ax.set_title('Espaço Latente 2D (Waveform)')
        ax.set_xlabel('z1')
        ax.set_ylabel('z2')
    
    handles = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cmap(class_to_idx[cls]), markersize=10, label=f'Classe {cls}') for cls in unique_classes]
    ax.legend(handles=handles, title="Classes")
    plt.show()
    
        
    print(f'Plotagem concluída. Total de {len(latent_vectors)} vetores latentes extraídos.')
else:
    print('Plotagem do espaço latente não realizada (LATENT_DIM deve ser 2 ou 3).')