In [None]:
%%capture
# Install additional packages
!pip install accelerate -U
!pip install jiwer
!pip install librosa==0.9.0 audiomentations datasets torchinfo

# librosa is imported here, immediately after the pinned install above.
import librosa

In [2]:
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
import torchaudio
import torch
from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from sklearn.model_selection import train_test_split
import warnings
from audiomentations import TimeMask, PitchShift, TanhDistortion, AddBackgroundNoise, SpecFrequencyMask
import librosa
from scipy.io.wavfile import write
import warnings
import torchaudio.transforms as T

# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
# Select torchaudio's "sox_io" I/O backend.
# NOTE(review): set_audio_backend is deprecated in recent torchaudio releases —
# confirm it is still required for the installed version.
torchaudio.set_audio_backend("sox_io")

In [None]:
%%capture
# Download the three inputs with gdown: the audio data, the CSV with paths and
# transcriptions, and the background noises (see the inline notes on each line).
!gdown 1HhoZcMDpLtXQe2BUzo9rRg8Itl2EuWi5 # dados de áudio
!gdown 10AKwVzDlvvYl_TvMlAD6x0P4efArhJ5y # Caminhos e Transcrições
!gdown 1FERZhKvNKcdpFGdEhSL7Uq3EcIU8k7Qd # Background noises

# Unpack the audio data and the background noises
!unzip /content/Dados_wav.zip
!unzip /content/bg_noises.zip


In [4]:
# Load the metadata table; later cells read its 'path', 'class' and 'transcription' columns.
df = pd.read_csv("/content/audio_data.csv")
# Rebuild each path under /content/ by dropping the first 55 characters of the stored value.
# NOTE(review): the offset 55 presumably equals the length of the directory prefix on the
# machine that produced the CSV — confirm; a different prefix length silently yields wrong paths.
df['path'] = df['path'].apply(lambda x: '/content/'+ x[55:])
# Keep only the rows whose transcription is not the literal 'Transcription failed'.
cleaned_df = df[df['transcription'] != 'Transcription failed']


In [5]:
def find_longest_audio_length(df):
    """Return the largest waveform length among the files listed in ``df['path']``.

    Every file is loaded with ``torchaudio.load`` and the size of dimension 1
    of the returned tensor is compared against the running maximum. The
    length is expressed in samples at each file's own sample rate.

    Args:
        df: DataFrame with a ``'path'`` column of audio file paths.

    Returns:
        The largest length found, or 0 when ``df`` has no rows.
    """
    longest = 0
    for audio_path in tqdm(df['path'], total=df.shape[0]):
        samples, _sample_rate = torchaudio.load(audio_path)
        longest = max(longest, samples.shape[1])
    return longest

# Scan every clip listed in cleaned_df once and report the largest length found.
LONGEST_LENGTH = find_longest_audio_length(cleaned_df)
print(f"The longest audio length is: {LONGEST_LENGTH}")


100%|██████████| 1190/1190 [00:27<00:00, 43.50it/s]

The longest audio length is: 144648





In [6]:
def augment_audio_with_background_noise(audio, sample_rate, noise_folder='/content/bg_noises'):
    """Mix a background noise taken from ``noise_folder`` into ``audio``.

    ``AddBackgroundNoise`` is given the whole folder as ``sounds_path`` and
    chooses the noise file itself, so the transform is built and applied
    exactly once. (The previous version decoded every noise file with librosa,
    discarded the result, and re-applied the same folder-wide transform once
    per file, keeping only the last output.)

    Args:
        audio: Audio samples to augment.
        sample_rate: Sample rate of ``audio``.
        noise_folder: Directory containing the background noise ``.wav`` files.

    Returns:
        Tuple ``(augmented_audio, 'background_noise')``.

    Raises:
        FileNotFoundError: If ``noise_folder`` contains no ``.wav`` file.
    """
    name = 'background_noise'
    # Fail with a clear message instead of an UnboundLocalError when there is no noise to mix in.
    if not any(file.endswith('.wav') for file in os.listdir(noise_folder)):
        raise FileNotFoundError(f"No .wav background noise files found in {noise_folder}")

    background_noise_aug = AddBackgroundNoise(sounds_path=noise_folder, p=1.0)
    augmented_audio = background_noise_aug(audio, sample_rate=sample_rate)
    return augmented_audio, name

def augment_audio_with_tanh_distortion(audio, sample_rate):
    """Apply ``TanhDistortion`` (always, ``p=1.0``) to ``audio``.

    Args:
        audio: Audio samples to augment.
        sample_rate: Sample rate of ``audio``.

    Returns:
        Tuple ``(augmented_audio, 'tanh_distortion')``; the second element is
        the tag callers use to name the augmented file.
    """
    transform = TanhDistortion(p=1.0)
    distorted = transform(audio, sample_rate=sample_rate)
    return distorted, 'tanh_distortion'

def augment_audio_with_pitch_shift(audio, sample_rate):
    """Apply ``PitchShift`` with its default settings (always, ``p=1.0``) to ``audio``.

    Args:
        audio: Audio samples to augment.
        sample_rate: Sample rate of ``audio``.

    Returns:
        Tuple ``(augmented_audio, 'pitch_shift')``; the second element is the
        tag callers use to name the augmented file.
    """
    transform = PitchShift(p=1.0)
    shifted = transform(audio, sample_rate=sample_rate)
    return shifted, 'pitch_shift'

def augment_audio_with_time_mask(audio, sample_rate):
    """Apply ``TimeMask`` (always, ``p=1.0``) to ``audio``.

    The transform is configured with ``min_band_part=0.1``,
    ``max_band_part=0.15`` and ``fade=True``.

    Args:
        audio: Audio samples to augment.
        sample_rate: Sample rate of ``audio``.

    Returns:
        Tuple ``(augmented_audio, 'time_mask')``; the second element is the
        tag callers use to name the augmented file.
    """
    transform = TimeMask(min_band_part=0.1, max_band_part=0.15, fade=True, p=1.0)
    masked = transform(audio, sample_rate=sample_rate)
    return masked, 'time_mask'


In [7]:
def data_aug(dataset, aug_function):
    """Write one augmented copy of every clip in ``dataset`` and describe the copies.

    Each file in ``dataset['path']`` is loaded at its native sample rate,
    passed through ``aug_function`` and saved as 16-bit PCM next to the
    source file as ``<name>_<index>.wav``, where ``name`` is the tag returned
    by ``aug_function`` and ``index`` is the row's index label.

    Args:
        dataset: DataFrame with ``'path'``, ``'class'`` and ``'transcription'`` columns.
        aug_function: Callable ``(audio, sample_rate=...) -> (augmented_audio, name)``.

    Returns:
        DataFrame with one row per input row, in the same order, holding the
        new ``'path'`` plus the source row's ``'class'`` and ``'transcription'``.
        (The transcription used to be collected but never stored, which left
        it missing for augmented rows.)
    """
    records = []

    for index, row in dataset.iterrows():
        file_path = row['path']
        audio, sr = librosa.load(file_path, sr=None)
        augmented_audio, name = aug_function(audio, sample_rate=sr)

        new_file_name = f"{os.path.dirname(file_path)}/{name}_{index}.wav"

        # Augmentations such as added noise can push samples outside [-1, 1];
        # without clipping, the int16 cast wraps around and produces loud artifacts.
        pcm = (np.clip(augmented_audio, -1.0, 1.0) * 32767).astype(np.int16)
        write(new_file_name, sr, pcm)

        records.append({
            'path': new_file_name,
            'class': row['class'],
            'transcription': row['transcription'],
        })

    return pd.DataFrame(records, columns=['path', 'class', 'transcription'])

# Apply data augmentation: each call writes one augmented copy of every clip in cleaned_df
aug1_data = data_aug(cleaned_df, augment_audio_with_background_noise)
aug2_data = data_aug(cleaned_df, augment_audio_with_tanh_distortion)
aug4_data = data_aug(cleaned_df, augment_audio_with_pitch_shift)
aug5_data = data_aug(cleaned_df, augment_audio_with_time_mask)

# Concatenate the original rows with the augmented ones
cleaned_aug_df = pd.concat([cleaned_df, aug1_data, aug2_data, aug4_data, aug5_data])
print("Número total de amostras após o aumento de dados:", len(cleaned_aug_df))


Número total de amostras após o aumento de dados: 5950


In [8]:
# Split on the ORIGINAL clips and keep every augmented copy of a clip on the same side
# as its source. Splitting the concatenated frame row by row would put perturbed copies
# of training clips into the test set and inflate the test metrics (data leakage).
# data_aug() emits one row per input row, in the same order, so position i of every
# aug*_data frame is derived from position i of cleaned_df.
variant_frames = [cleaned_df, aug1_data, aug2_data, aug4_data, aug5_data]
assert all(len(frame) == len(cleaned_df) for frame in variant_frames), \
    "augmented frames must be row-aligned with cleaned_df"

train_pos, test_pos = train_test_split(np.arange(len(cleaned_df)), test_size=0.2, random_state=42)
train_df = pd.concat([frame.iloc[train_pos] for frame in variant_frames])
test_df = pd.concat([frame.iloc[test_pos] for frame in variant_frames])

# Save the processed and augmented data
train_df.to_csv("/content/train_data.csv", index=False)
test_df.to_csv("/content/test_data.csv", index=False)


In [9]:
def pad_waveform(waveform, target_length):
    """Right-pad ``waveform`` with zeros along its last dimension up to ``target_length``.

    The length is read from the last dimension, which is the dimension
    ``torch.nn.functional.pad`` extends for a ``(left, right)`` pad spec, so
    1-D ``(time,)``, 2-D ``(channels, time)`` and batched tensors are all
    handled consistently.

    Args:
        waveform: Tensor whose last dimension is the one to pad.
        target_length: Minimum size of the last dimension after padding.

    Returns:
        The padded tensor, or ``waveform`` itself (unchanged, never truncated)
        when it is already at least ``target_length`` long.
    """
    padding = target_length - waveform.shape[-1]
    if padding > 0:
        waveform = torch.nn.functional.pad(waveform, (0, padding), "constant", 0)
    return waveform

def extract_features(audio_file, target_length=144648):  # Use the LONGEST_LENGTH value computed earlier
    """Load an audio file, zero-pad it and run it through the global ``processor``.

    Args:
        audio_file: Path of the audio file to load with ``torchaudio.load``.
        target_length: Length the waveform is padded to via ``pad_waveform``.

    Returns:
        The ``input_values`` tensor produced by ``processor`` for the padded
        waveform, using the file's own sample rate as ``sampling_rate``.
    """
    samples, sr = torchaudio.load(audio_file)
    padded = pad_waveform(samples, target_length)
    encoded = processor(padded.squeeze().numpy(), sampling_rate=sr, return_tensors="pt", padding=True)
    return encoded.input_values

def collate_fn(batch):
    """Collate ``(input_values, label)`` pairs into padded batch tensors.

    The 1-D audio tensors are right-padded to the longest one in the batch
    with the feature extractor's ``padding_value``. The tokenizer's
    ``pad_token_id`` used previously is a vocabulary index meant for text
    labels, not a value for raw audio samples.

    Args:
        batch: Sequence of ``(input_values, label)`` items as returned by the dataset.

    Returns:
        Tuple ``(input_values, labels)``: a ``(batch, longest_length)`` tensor
        and a tensor holding the labels.
    """
    input_values = [item[0] for item in batch]
    labels = torch.tensor([item[1] for item in batch])
    input_values = torch.nn.utils.rnn.pad_sequence(
        input_values,
        batch_first=True,
        padding_value=processor.feature_extractor.padding_value,
    )
    return input_values, labels


In [10]:
class CommandDataset(Dataset):
    """Dataset yielding ``(input_values, label)`` pairs for command classification.

    Each item is loaded with torchaudio, mixed down to mono, resampled to
    16 kHz from the file's actual sample rate, and normalised by the global
    ``processor``.

    Args:
        dataset: DataFrame with ``'path'`` and ``'class'`` columns.
    """

    TARGET_SAMPLE_RATE = 16000

    def __init__(self, dataset):
        self.dataset = dataset
        # Kept as a public attribute for backward compatibility; it also seeds the
        # per-sample-rate cache used by _resample().
        self.resampler = T.Resample(orig_freq=48000, new_freq=self.TARGET_SAMPLE_RATE)
        self._resamplers = {48000: self.resampler}
        # The label order follows the order of first appearance in the global train_df, so
        # every CommandDataset built in the same session shares one mapping.
        # NOTE(review): the mapping is not persisted anywhere in the visible cells — confirm
        # how it is reconstructed at inference time.
        self.label_map = {cmd: idx for idx, cmd in enumerate(train_df['class'].unique())}

    def _resample(self, waveform, sample_rate):
        """Resample ``waveform`` from ``sample_rate`` to 16 kHz, caching one resampler per rate."""
        resampler = self._resamplers.get(sample_rate)
        if resampler is None:
            resampler = T.Resample(orig_freq=sample_rate, new_freq=self.TARGET_SAMPLE_RATE)
            self._resamplers[sample_rate] = resampler
        return resampler(waveform)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(input_values, label)`` for row ``idx``.

        Raises:
            ValueError: If the row's class is not present in ``label_map``.
        """
        row = self.dataset.iloc[idx]
        audio_path = row['path']
        command_class = row['class']
        label = self.label_map.get(command_class, -1)
        if label == -1:
            raise ValueError(f"Unknown class: {command_class}")

        waveform, sample_rate = torchaudio.load(audio_path)

        # Mix multi-channel audio down to mono so the processor always receives a 1-D signal.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample from the file's real rate; a fixed 48 kHz resampler would distort
        # any clip stored at a different rate.
        if sample_rate != self.TARGET_SAMPLE_RATE:
            waveform = self._resample(waveform, sample_rate)

        input_values = processor(
            waveform.squeeze(0).numpy(),
            sampling_rate=self.TARGET_SAMPLE_RATE,
            return_tensors='pt',
        ).input_values
        return input_values.squeeze(0), label


In [None]:
# Load the pretrained processor and a sequence-classification model whose number of
# labels equals the number of distinct classes in the training split.
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-large-xlsr-53-portuguese")
num_labels = len(train_df['class'].unique().tolist())
model = Wav2Vec2ForSequenceClassification.from_pretrained("facebook/wav2vec2-large-xlsr-53-portuguese", num_labels=num_labels)

# Wrap both splits; CommandDataset derives its label mapping from train_df for both of them.
train_dataset = CommandDataset(train_df)
test_dataset = CommandDataset(test_df)

# Batches of 16 padded by collate_fn; only the training loader is shuffled.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


In [12]:
num_epochs = 2  # Adjust as needed
learning_rate = 1e-4  # Adjust as needed
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# NOTE(review): `criterion` is not used in this cell — the loss is taken from
# `outputs.loss`, which the model computes because `labels` is passed to it.
criterion = nn.CrossEntropyLoss()

model.train()
for epoch in range(num_epochs):
    # Sum of the per-batch losses; divided by the number of batches when reported.
    epoch_loss = 0.0
    for batch in tqdm(train_loader):
        input_values, labels = batch
        input_values = input_values.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(input_values, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(train_loader)}")

# Save the model and the processor
model.save_pretrained("/content/command_model")
processor.save_pretrained("/content/command_processor")


100%|██████████| 298/298 [07:17<00:00,  1.47s/it]


Epoch 1/2, Loss: 1.3998359531764215


100%|██████████| 298/298 [07:13<00:00,  1.46s/it]


Epoch 2/2, Loss: 0.27375514368720405


[]

In [None]:
import shutil

# Directory that is created and then zipped
dir_name = "/content/model_and_data"

# Create the directory
os.makedirs(dir_name, exist_ok=True)

# Move the required artifacts into the directory. The cell is safe to re-run:
# a stale copy left by a previous run is removed first, because shutil.move() would
# otherwise place a directory INSIDE the existing one (command_model/command_model),
# and an artifact that was already moved is not treated as an error.
for artifact in ("test_data.csv", "command_processor", "command_model"):
    source = os.path.join("/content", artifact)
    destination = os.path.join(dir_name, artifact)
    if os.path.exists(source):
        if os.path.isdir(destination):
            shutil.rmtree(destination)
        elif os.path.exists(destination):
            os.remove(destination)
        shutil.move(source, destination)
    elif not os.path.exists(destination):
        raise FileNotFoundError(f"Missing artifact: {source}")

# Name of the final zip file
zip_filename = "/content/model_and_data.zip"

# Zip the directory (make_archive appends the '.zip' extension itself)
shutil.make_archive(os.path.splitext(zip_filename)[0], 'zip', dir_name)

# Check that the zip file was created
if os.path.exists(zip_filename):
    print(f"Arquivos foram zipados com sucesso em {zip_filename}")
else:
    print("Falha ao criar o arquivo zip.")
