# Data Augmentation for Processing in ASR

## Team: AI-NLP 

Author: Aline Rodrigues

In [None]:
import os
import random

import librosa
import numpy as np
import pandas as pd
import soundfile as sf
import torch
import torchaudio

In [None]:
class DataAugmentation():
    """
    Waveform- and spectrogram-level data augmentation helpers for ASR data.

    The instance holds a dataset location prefix (``dataset``) that ``run``
    concatenates directly with ``'train.csv'``, ``'train/'`` and ``'aug/'``;
    a non-empty prefix must therefore end with a path separator.

        Author:  Aline Rodrigues
        Created: 01/11/2024
    """

    def __init__(self, dataset: str = ''):
        self.dataset = dataset

    def load_audio(self, file_path: str, sr=16000):
        """Load ``file_path`` with librosa, resampled to ``sr``.

        Returns:
            The ``(audio, sample_rate)`` pair returned by ``librosa.load``.
        """
        audio, sample_rate = librosa.load(file_path, sr=sr)
        return audio, sample_rate

    def save_audio(self, file_path, audio, sample_rate):
        """Write ``audio`` to ``file_path`` using ``soundfile.write``."""
        sf.write(file_path, audio, sample_rate)

    def time_stretch(self, audio, sr, rate=1.2):
        """Time-stretch ``audio`` by ``rate`` via librosa.

        ``sr`` is unused; it is accepted so that every augmentation shares
        the ``(audio, sr, **kwargs)`` call shape used by
        ``apply_augmentations``.
        """
        return librosa.effects.time_stretch(audio, rate=rate)

    def pitch_shift(self, audio, sr, n_steps=4):
        """Shift the pitch of ``audio`` by ``n_steps`` via librosa."""
        return librosa.effects.pitch_shift(y=audio, sr=sr, n_steps=n_steps)

    def add_noise(self, audio, sr, noise_factor=0.005):
        """Return ``audio`` plus standard-normal noise scaled by ``noise_factor``.

        ``sr`` is unused (kept for the shared augmentation call shape).
        """
        noise = np.random.randn(len(audio))
        return audio + noise_factor * noise

    def apply_rir(self, audio, sr, rir):
        """Convolve ``audio`` with the impulse response ``rir``.

        The full convolution is truncated to ``len(audio)`` samples so the
        output has the same length as the input. ``sr`` is unused (kept for
        the shared augmentation call shape).
        """
        return np.convolve(audio, rir, mode='full')[:len(audio)]

    def generate_synthetic_rir(self, length=200):
        """Return a random impulse response of ``length`` samples.

        The response is Gaussian noise scaled to unit L2 norm.
        """
        rir = np.random.normal(0, 1, length)
        # Normalise to unit L2 norm so the convolution keeps the original energy.
        return rir / np.linalg.norm(rir)

    def spec_augment(self, spec, time_warping=80, freq_masking=27, time_masking=100):
        """Apply a random time shift plus frequency and time masking.

        The input tensor is never modified; a new tensor is returned.

        Args:
            spec: Spectrogram tensor. Dimension 0 is treated as frequency
                bins and dimension 1 as time frames (assumes a 2-D tensor --
                TODO confirm against callers).
            time_warping: Maximum absolute circular shift along the last
                dimension, applied with 50% probability.
            freq_masking: Maximum number of consecutive frequency bins to
                zero; skipped unless it is smaller than the number of bins.
            time_masking: Maximum number of consecutive time frames to zero;
                skipped unless it is smaller than the number of frames.

        Returns:
            The augmented spectrogram.
        """
        # "Time warping" is implemented as a circular shift (torch.roll),
        # applied with 50% probability.
        if random.random() > 0.5:
            time_shift = random.randint(-time_warping, time_warping)
            spec = torch.roll(spec, shifts=time_shift, dims=-1)
        else:
            # torch.roll already returns a new tensor; clone here so the
            # masking below never mutates the caller's tensor in either path.
            spec = spec.clone()

        # Frequency masking: zero a random band of at most `freq_masking` bins.
        num_freqs = spec.size(0)
        if freq_masking < num_freqs:
            freq_size = random.randint(0, freq_masking)
            freq_start = random.randint(0, num_freqs - freq_size)
            spec[freq_start:freq_start + freq_size, :] = 0

        # Time masking: zero a random span of at most `time_masking` frames.
        num_times = spec.size(1)
        if time_masking < num_times:
            time_size = random.randint(0, time_masking)
            time_start = random.randint(0, num_times - time_size)
            spec[:, time_start:time_start + time_size] = 0

        return spec

    def spectrogram_to_waveform(self, spectrogram, n_iter=32):
        """Invert a spectrogram to a waveform with Griffin-Lim.

        Args:
            spectrogram: Spectrogram tensor compatible with ``n_fft=1024``.
            n_iter: Number of Griffin-Lim iterations.
        """
        griffin_lim = torchaudio.transforms.GriffinLim(n_fft=1024, n_iter=n_iter)
        return griffin_lim(spectrogram)

    @staticmethod
    def _augmented_path(path: str, alias: str, new_stem: str) -> str:
        """Return ``path`` with the file stem ``alias`` replaced by ``new_stem``.

        Only the final path component is rewritten, so an alias that also
        occurs in a directory name or in the extension (e.g. alias ``'a'`` in
        ``'a.wav'``) is left untouched there. If the final component's stem
        is not ``alias``, fall back to a plain substring replacement.
        """
        head, sep, tail = path.rpartition('/')
        stem, ext = os.path.splitext(tail)
        if stem == alias:
            return f'{head}{sep}{new_stem}{ext}'
        return path.replace(alias, new_stem)

    def augment_audio(self, audio, sr: int, output_path: str, alias: str, row: dict) -> list:
        """Write all four augmented variants of ``audio`` and return their rows.

        For each of time-stretch, pitch-shift, additive noise and synthetic
        RIR, the augmented audio is saved as
        ``{output_path}/{alias}_{suffix}.wav`` and a copy of ``row`` is
        produced whose ``'path'`` points at the renamed file.

        Returns:
            A list with one row copy per augmentation, in the order above.
        """
        # Each step is evaluated lazily so only one augmented signal is held
        # in memory at a time and random draws happen in a fixed order.
        steps = (
            ('time_stretched', lambda: self.time_stretch(audio, sr, rate=0.95)),
            ('pitch_shifted', lambda: self.pitch_shift(audio, sr, n_steps=1)),
            ('noisy', lambda: self.add_noise(audio, sr, noise_factor=0.005)),
            ('rir_applied', lambda: self.apply_rir(
                audio, sr, self.generate_synthetic_rir(length=300))),
        )

        rows = []
        for suffix, make_audio in steps:
            new_stem = f'{alias}_{suffix}'
            self.save_audio(f'{output_path}/{new_stem}.wav', make_audio(), sr)
            new_row = row.copy()
            new_row['path'] = self._augmented_path(row['path'], alias, new_stem)
            rows.append(new_row)
        return rows

    def apply_augmentations(self, row, audio, sr, output_path, alias):
        """
        Apply two distinct, randomly chosen augmentations to one audio clip.

        Each augmented signal is saved as ``{output_path}/{alias}_{name}.wav``
        and a copy of ``row`` with an updated ``'path'`` is collected. An
        augmentation that raises is reported on stdout and skipped, so the
        result may contain fewer than two rows.

        Returns:
            A list of row copies, one per augmentation that succeeded.
        """
        techniques = [
            ("time_stretch", self.time_stretch, {"rate": random.uniform(0.95, 1.05)}),
            ("pitch_shift", self.pitch_shift, {"n_steps": random.choice([-1, 1])}),
            ("add_noise", self.add_noise, {"noise_factor": 0.005}),
            ("rir", self.apply_rir, {"rir": self.generate_synthetic_rir(length=300)}),
        ]

        # Pick two different techniques for this clip.
        selected = random.sample(techniques, k=2)

        rows = []
        for name, func, kwargs in selected:
            try:
                augmented_audio = func(audio, sr=sr, **kwargs)
                new_stem = f'{alias}_{name}'
                self.save_audio(f'{output_path}/{new_stem}.wav', augmented_audio, sr)

                new_row = row.copy()
                new_row['path'] = self._augmented_path(row['path'], alias, new_stem)
                rows.append(new_row)
            except Exception as e:
                # Best effort: one failing augmentation must not abort the run.
                print(f"[!] Falha ao aplicar {name}: {e}")
                continue

        return rows

    def run(self):
        """Augment every clip listed in ``train.csv`` and write a merged CSV.

        Reads ``{dataset}train.csv``, loads each ``{dataset}train/{path}``,
        writes augmented audio into ``{dataset}aug/`` and finally saves the
        original rows followed by the augmented rows to
        ``data_augmentation.csv`` in the current working directory.
        """
        df = pd.read_csv(self.dataset + 'train.csv', encoding='utf-8')

        path_audios = self.dataset + 'train/'
        output_dir = self.dataset + 'aug/'
        # Without this every save would fail and be silently skipped by the
        # best-effort handler in apply_augmentations.
        os.makedirs(output_dir, exist_ok=True)

        augmented_rows = []
        for _, row in df.iterrows():
            path_audio = path_audios + row['path']
            print(path_audio)
            alias = path_audio.split('/')[-1].replace('.wav', '')

            audio, sr = self.load_audio(path_audio)
            augmented_rows.extend(
                self.apply_augmentations(row.copy(), audio, sr, output_dir, alias)
            )

        df_aug = pd.DataFrame(augmented_rows)
        concatenated_df = pd.concat([df, df_aug], axis=0)
        concatenated_df.to_csv('data_augmentation.csv', index=False)

In [None]:
# Dataset location prefix. DataAugmentation.run() concatenates it directly
# with 'train.csv', 'train/' and 'aug/', so a non-empty value must end with a
# path separator; the empty string resolves relative to the working directory.
dataset = ''

In [None]:
# Run the full pipeline: augment every clip listed in train.csv and write the
# combined original + augmented rows to 'data_augmentation.csv'.
DataAugmentation(dataset).run()

[]
