# Create a set of synthetic audio samples to fine-tune Whisper
- machinelearnear
- https://jonathanbgn.com/2021/08/30/audio-augmentation.html
- https://github.com/pyannote/pyannote-audio/blob/develop/notebook/augmentation.ipynb

## Use Amazon Polly to create an audio file from a text prompt
- https://docs.aws.amazon.com/polly/latest/dg/voicelist.html

In [2]:
from boto3 import Session
from botocore.exceptions import BotoCoreError, ClientError
from contextlib import closing
import os
import sys
import subprocess
from tempfile import gettempdir

In [3]:
# Create a Polly client from the default boto3 session, i.e. without naming a
# credentials profile. To use the [adminuser] section of the AWS credentials
# file (~/.aws/credentials) instead, switch to the commented-out line below.
# session = Session(profile_name="adminuser")
session = Session()
polly = session.client("polly")

In [4]:
def amazon_polly_tts(text_prompt, output_file='speech.mp3', voice_id="Lucia"):
    """Synthesize `text_prompt` with Amazon Polly and save it as an MP3 file.

    Args:
        text_prompt: Text to convert to speech.
        output_file: Path the MP3 audio is written to.
        voice_id: Polly voice identifier (see the Polly voice list).

    Returns:
        The `synthesize_speech` response. Its "AudioStream" has already been
        read and closed by the time this function returns.

    On a service error, a missing audio stream, or a failed file write, the
    error is printed and the interpreter exits via `sys.exit(-1)`.
    """
    try:
        # Request speech synthesis for the text passed by the caller (not a
        # module-level global).
        response = polly.synthesize_speech(
            Text=text_prompt, OutputFormat="mp3", VoiceId=voice_id)
    except (BotoCoreError, ClientError) as error:
        # The service returned an error, exit gracefully
        print(error)
        sys.exit(-1)

    if "AudioStream" not in response:
        # The response didn't contain audio data, exit gracefully
        print("Could not stream audio")
        sys.exit(-1)

    # Note: Closing the stream is important because the service throttles on
    # the number of parallel connections. contextlib.closing ensures the
    # stream's close method is called at the end of the with statement.
    with closing(response["AudioStream"]) as stream:
        try:
            # Open a file for writing the output as a binary stream
            with open(output_file, "wb") as file:
                file.write(stream.read())
        except IOError as error:
            # Could not write to file, exit gracefully
            print(error)
            sys.exit(-1)

    return response

In [5]:
# Rule of thumb: speech runs at about 16 characters per second, so a 7-10 second clip needs a prompt of roughly 110 to 160 characters.

In [6]:
# Spanish text to synthesize, and the path the resulting MP3 is written to.
prompt = 'La amoxicilina es un antibiótico de la familia de las penicilinas. Es bactericida, es decir, destruye a los microbios. Por tanto, se utiliza para tratar un gran número de infecciones producidas por gérmenes sensibles a este antibiótico'
output_file = 'speech.mp3'

In [7]:
# Synthesize the prompt with Polly; the audio is written to `output_file`.
response = amazon_polly_tts(prompt, output_file)

In [8]:
from IPython.display import Audio 
# Show a player for the synthesized file without starting playback.
# NOTE(review): `rate` presumably only matters when raw sample arrays are
# passed, not a file path - confirm against the IPython documentation.
Audio(output_file, rate=16000, autoplay=False)

## Create audio variations through custom augmentations

In [9]:
# !pip install torch torchaudio

In [10]:
import torch
import torchaudio
import torchaudio.functional as F

import math
from IPython.display import Audio
import matplotlib.pyplot as plt

# Print the installed torch and torchaudio versions.
print(torch.__version__)
print(torchaudio.__version__)

1.13.0+cu117
0.13.0+cu117


In [11]:
# Load the file written by the Polly cell; reuse `output_file` so the path
# cannot drift from the one the audio was saved to.
original_speech, sample_rate = torchaudio.load(output_file, format="mp3")

In [22]:
# Play back the loaded waveform at the sample rate reported by the loader.
Audio(data=original_speech,rate=sample_rate)

### `audiomentations`
- https://github.com/iver56/audiomentations
- https://github.com/asteroid-team/torch-audiomentations

In [14]:
# !conda install -c conda-forge libsndfile -y
# !conda install -c conda-forge ffmpeg -y

In [15]:
# !pip install audiomentations[extras]
# !pip install pydub

In [16]:
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift
import numpy as np
import wave

In [17]:
# Augmentation pipeline built from four audiomentations transforms. Each one
# is given p=0.5 (presumably its independent probability of being applied -
# confirm against the audiomentations documentation).
augment = Compose([
    # Gaussian noise with an amplitude between 0.001 and 0.015.
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
    # Time stretch by a rate between 0.8 and 1.25.
    TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),
    # Pitch shift between -4 and +4 semitones.
    PitchShift(min_semitones=-4, max_semitones=4, p=0.5),
    # Shift by a fraction between -0.5 and 0.5.
    Shift(min_fraction=-0.5, max_fraction=0.5, p=0.5),
])

In [18]:
# from os import path
# from pydub import AudioSegment

# # files                                                                         
# src = output
# dst = "test.wav"

# # convert mp3 to wav
# sound = AudioSegment.from_mp3(src)
# sound.export(dst, format="wav")

In [19]:
# original_speech, sample_rate = torchaudio.load(output_file)

In [20]:
# Augment/transform/perturb the audio data.
# audiomentations works on NumPy arrays, while `torchaudio.load` returns a
# torch tensor, so convert before handing the samples to the pipeline.
augmented_samples = augment(samples=original_speech.numpy(), sample_rate=sample_rate)



In [21]:
# Listen to the augmented samples at the original sample rate.
Audio(data=augmented_samples,rate=sample_rate)

### `torch-audiomentations`

In [23]:
# !pip install torch-audiomentations

In [35]:
import torch
from torch_audiomentations import Compose, Gain, PolarityInversion, AddBackgroundNoise

In [69]:
# Initialize the augmentation callable: a single AddBackgroundNoise transform
# that takes its noise from the `sample_noise` path, with an SNR between 10 and
# 30 dB and p=0.5.
# NOTE(review): Gain and PolarityInversion are imported alongside these names
# but are not used in this pipeline.
apply_augmentation = Compose(
    transforms=[
        AddBackgroundNoise(
            background_paths='sample_noise',
            min_snr_in_db=10,
            max_snr_in_db=30,
            p=0.5,
        ),
    ]
)

In [70]:
# Add a leading dimension (presumably the batch axis expected by
# torch-audiomentations - confirm); repeat(1, 1, 1) leaves the shape unchanged.
# NOTE(review): this name shadows the `wave` module imported earlier in the
# notebook.
wave = original_speech[None].repeat(1,1,1)

In [71]:
# Run the background-noise augmentation on the batched waveform.
augmented_audio = apply_augmentation(wave, sample_rate=sample_rate)

In [72]:
# Listen to the first item of the augmented output.
Audio(augmented_audio[0], rate=sample_rate)

### `torchaudio`
- https://pytorch.org/audio/main/tutorials/audio_data_augmentation_tutorial.html
- https://pytorch.org/audio/stable/tutorials/audio_resampling_tutorial.html#comparison-against-librosa
- https://pytorch.org/tutorials/beginner/audio_preprocessing_tutorial.html

In [27]:
def plot_waveform(waveform, sample_rate, title="Waveform", xlim=None):
    """Plot each channel of a waveform against time in seconds.

    Args:
        waveform: 2-D tensor laid out as (channels, frames).
        sample_rate: Samples per second, used to build the time axis.
        title: Title placed above the whole figure.
        xlim: Optional x-axis limits applied to every channel's plot.
    """
    samples = waveform.numpy()
    channel_count, frame_count = samples.shape
    seconds = torch.arange(0, frame_count) / sample_rate

    fig, panels = plt.subplots(channel_count, 1)
    multi_channel = channel_count > 1
    if not multi_channel:
        # A single subplot comes back as a bare Axes; wrap it so the loop
        # below can treat both cases the same way.
        panels = [panels]

    for number, (panel, channel) in enumerate(zip(panels, samples), start=1):
        panel.plot(seconds, channel, linewidth=1)
        panel.grid(True)
        if multi_channel:
            panel.set_ylabel(f"Channel {number}")
        if xlim:
            panel.set_xlim(xlim)

    fig.suptitle(title)
    plt.show(block=False)
    
def plot_specgram(waveform, sample_rate, title="Spectrogram", xlim=None):
    """Draw a spectrogram for each channel of a waveform.

    Args:
        waveform: 2-D tensor laid out as (channels, frames).
        sample_rate: Samples per second, passed to matplotlib as `Fs`.
        title: Title placed above the whole figure.
        xlim: Optional x-axis limits applied to every channel's plot.
    """
    samples = waveform.numpy()
    channel_count, _frame_count = samples.shape

    fig, panels = plt.subplots(channel_count, 1)
    multi_channel = channel_count > 1
    if not multi_channel:
        # A single subplot comes back as a bare Axes; wrap it for the loop.
        panels = [panels]

    for number, (panel, channel) in enumerate(zip(panels, samples), start=1):
        panel.specgram(channel, Fs=sample_rate)
        if multi_channel:
            panel.set_ylabel(f"Channel {number}")
        if xlim:
            panel.set_xlim(xlim)

    fig.suptitle(title)
    plt.show(block=False)

#### Speech perturbation

In [28]:
import random

class RandomSpeedChange:
    """Randomly speed up or slow down a waveform, keeping its sample rate.

    On every call one factor is drawn uniformly from `speed_factors`. A factor
    of 1.0 returns the input untouched; any other factor is applied with the
    sox `speed` effect, followed by `rate` to resample back to `sample_rate`.

    Args:
        sample_rate: Sample rate of the audio passed to `__call__`.
        speed_factors: Candidate speed multipliers. The default offers a
            slower, an unchanged and a faster variant, so the transform
            actually perturbs the audio on some calls.

    Raises:
        ValueError: If `speed_factors` is empty.
    """

    def __init__(self, sample_rate, speed_factors=(0.9, 1.0, 1.1)):
        speed_factors = tuple(speed_factors)
        if not speed_factors:
            raise ValueError('speed_factors must contain at least one value')
        self.sample_rate = sample_rate
        self.speed_factors = speed_factors

    def __call__(self, audio_data):
        """Return `audio_data` at a randomly chosen speed."""
        speed_factor = random.choice(self.speed_factors)
        if speed_factor == 1.0: # no change
            return audio_data

        # change speed and resample to original rate:
        sox_effects = [
            ["speed", str(speed_factor)],
            ["rate", str(self.sample_rate)],
        ]
        transformed_audio, _ = torchaudio.sox_effects.apply_effects_tensor(
            audio_data, self.sample_rate, sox_effects)
        return transformed_audio

# Build the transform for the loaded clip's sample rate and apply it once.
speed_transform = RandomSpeedChange(sample_rate)
transformed_audio = speed_transform(original_speech)

In [30]:
# Listen to the result of the speed transform.
Audio(data=transformed_audio,rate=sample_rate)

#### Background noise

In [32]:
import math
import os
import pathlib
import random
import torch

class RandomBackgroundNoise:
    """Mix a randomly chosen noise recording into a waveform at a random SNR.

    Args:
        sample_rate: Sample rate the noise is resampled to.
        noise_dir: Directory searched recursively for `.wav` noise files.
        min_snr_db: Lowest signal-to-noise ratio, in dB, that may be drawn.
        max_snr_db: Highest signal-to-noise ratio, in dB, that may be drawn.

    Raises:
        IOError: If `noise_dir` does not exist or holds no `.wav` file.
    """

    def __init__(self, sample_rate, noise_dir, min_snr_db=0, max_snr_db=15):
        self.sample_rate = sample_rate
        self.min_snr_db = min_snr_db
        self.max_snr_db = max_snr_db

        if not os.path.exists(noise_dir):
            raise IOError(f'Noise directory `{noise_dir}` does not exist')
        # find all WAV files including in sub-folders:
        self.noise_files_list = list(pathlib.Path(noise_dir).glob('**/*.wav'))
        if not self.noise_files_list:
            raise IOError(f'No .wav file found in the noise directory `{noise_dir}`')

    def __call__(self, audio_data):
        """Return `audio_data` mixed with a random noise clip of equal length."""
        noise = self._load_noise(audio_data.shape[-1])
        snr_db = random.randint(self.min_snr_db, self.max_snr_db)
        return self._mix_at_snr(audio_data, noise, snr_db)

    def _load_noise(self, audio_length):
        """Load a random noise file as mono and fit it to `audio_length` frames."""
        random_noise_file = random.choice(self.noise_files_list)
        effects = [
            ['remix', '1'], # convert to mono
            ['rate', str(self.sample_rate)], # resample
        ]
        noise, _ = torchaudio.sox_effects.apply_effects_file(
            random_noise_file, effects, normalize=True)

        noise_length = noise.shape[-1]
        if noise_length > audio_length:
            # Noise is longer than the speech: take a random window of it.
            offset = random.randint(0, noise_length - audio_length)
            noise = noise[..., offset:offset + audio_length]
        elif noise_length < audio_length:
            # Noise is shorter than the speech: pad the tail with silence.
            padding = torch.zeros((noise.shape[0], audio_length - noise_length))
            noise = torch.cat([noise, padding], dim=-1)
        return noise

    @staticmethod
    def _mix_at_snr(audio_data, noise, snr_db):
        """Scale `audio_data` to `snr_db` dB above `noise` and average the two."""
        # The L2 norms below are amplitude-like quantities, so the dB value
        # converts to a ratio with 10 ** (dB / 20).
        snr = 10 ** (snr_db / 20)
        audio_norm = audio_data.norm(p=2)
        noise_norm = noise.norm(p=2)
        if audio_norm == 0:
            # Silent input: no finite scale reaches the target SNR, and
            # dividing by zero would fill the result with NaN.
            return (audio_data + noise) / 2
        scale = snr * noise_norm / audio_norm

        return (scale * audio_data + noise) / 2

In [33]:
# Mix a random noise file from `sample_noise` into the original speech, using
# the class's default SNR range (0 to 15 dB).
noise_transform = RandomBackgroundNoise(sample_rate, 'sample_noise')
transformed_audio = noise_transform(original_speech)

In [34]:
# Listen to the speech with background noise mixed in.
Audio(transformed_audio, rate=sample_rate)

## Using Transformers & Datasets

In [None]:
# !pip install transformers datasets

In [None]:
from datasets import load_dataset

In [None]:
# Load the `IIC/spanish_biomedical_crawled_corpus` dataset (presumably as a
# source of Spanish text prompts for synthesis - TODO confirm).
dataset = load_dataset("IIC/spanish_biomedical_crawled_corpus")

## Fine-tuning `Whisper`
- https://github.com/huggingface/community-events/tree/main/whisper-fine-tuning-event