In [None]:
!pip install sox wget
!sudo apt-get install sox libsox-fmt-all 
!sudo apt-get install sox libsox-dev
!pip install tqdm

In [None]:
import os
import wget
import tarfile
import zipfile
import sox
import numpy as np
import random
import torch
import soundfile as sf
import librosa
from tqdm import tqdm
from pathlib import Path

In [None]:
# Directory that receives the extracted archive.
workspace = "datasets"
# exist_ok replaces the "check, then create" sequence, so the cell can be re-run safely.
os.makedirs(workspace, exist_ok=True)

# Archive to extract. NOTE(review): '/path/to/...' looks like a placeholder — set it before running.
data_dir = '/path/to/vivos.zip'

with zipfile.ZipFile(data_dir, 'r') as zip_ref:
  zip_ref.extractall(workspace)

In [None]:
def volume(n_segments, min_relative_vol, max_relative_vol):
  """ Build a transform that rescales a signal piecewise with random gains.

  The signal is cut into n_segments consecutive pieces of equal length
  (the last piece also takes any remainder). Each piece is multiplied by
  its own gain drawn uniformly between min_relative_vol and max_relative_vol.

  Args:
    n_segments: number of pieces, must be > 0.
    min_relative_vol: lower gain bound, must be > 0.
    max_relative_vol: upper gain bound, must be > 0.

  Returns:
    A function mapping a signal to the rescaled signal of the same length.

  Raises:
    AssertionError: if any argument is not strictly positive.
  """
  assert n_segments > 0
  assert min_relative_vol > 0
  assert max_relative_vol > 0

  def apply_gain(signal):
    total = len(signal)
    hop = int(total / n_segments)
    # Cut points: every piece starts at a multiple of hop, the last one ends at total.
    edges = [k * hop for k in range(n_segments)] + [total]

    pieces = []
    for begin, end in zip(edges[:-1], edges[1:]):
      gain = np.random.uniform(min_relative_vol, max_relative_vol)
      pieces.append(gain * signal[begin:end])

    # A single piece is returned as is; several pieces are glued back together.
    return pieces[0] if len(pieces) == 1 else np.concatenate(pieces)

  return apply_gain

In [None]:
def perturb(n_effects, sr=16000):
  """ Build a transform that applies n_effects distinct random effects.

  Effects are drawn without replacement from echo, speed, pitch and volume
  and applied in the drawn order:
    - speed:  sox speed with a factor of 0.5 or 1.1 plus a uniform offset in [0, 0.4)
    - pitch:  sox pitch with a value of -10 or 2 plus a random integer in 0..7
    - echo:   sox echo with 1 or 2 echoes, each with delay 60 and decay 0.4
    - volume: volume() over 2 or 3 segments with gains between 0.4 and 10

  Args:
    n_effects: how many effects to apply (at most the four listed above).
    sr: sample rate handed to sox as the rate of the input array.

  Returns:
    A function mapping a signal to the perturbed signal.
  """

  def apply_effects(signal):
    effects = ['echo', 'speed', 'pitch', 'volume']
    chosen = np.random.choice(effects, size=n_effects, replace=False)

    for effect in chosen:
      if effect == 'echo':
        count = np.random.randint(1, 3)
        tfm = sox.Transformer()
        tfm.echo(gain_in=1.0, gain_out=1.0, n_echos=count,
                 delays=[60] * count, decays=[0.4] * count)
      elif effect == 'pitch':
        shift = np.random.choice([-10, 2]) + np.random.randint(0, 8)
        tfm = sox.Transformer()
        tfm.pitch(shift)
      elif effect == 'speed':
        factor = np.random.choice([0.5, 1.1]) + np.random.uniform(0.0, 0.4)
        tfm = sox.Transformer()
        tfm.speed(factor)
      else:
        # volume is the only effect that does not go through sox
        rescale = volume(n_segments=np.random.randint(2, 4),
                         min_relative_vol=0.4,
                         max_relative_vol=10)
        signal = rescale(signal)
        continue

      # shared by every sox-backed effect
      signal = tfm.build_array(input_array=signal, sample_rate_in=sr)

    return signal

  return apply_effects

In [None]:
def add_noise(noise_list, signal):
  """ Mix a random noise excerpt into signal at a random SNR of 5..15 dB.

  A noise clip is picked at random from noise_list and a window as long as
  the signal is cut from it at a random offset. The noise is scaled to the
  drawn signal-to-noise ratio, then both parts are weighted so that their
  squared weights sum to one.

  Args:
    noise_list: non-empty sequence of 1-D noise arrays.
    signal: 1-D array holding the clean audio.

  Returns:
    A new array with the same length as signal. If the signal is empty or
    the chosen noise window is silent, an unmixed copy of signal is returned.

  Raises:
    ValueError: if the chosen noise clip is empty.
  """
  signal_length = len(signal)
  if signal_length == 0:
    # Nothing to mix; also avoids taking the mean of an empty array.
    return signal.copy()

  # extract noise from noise list
  noise = random.choice(noise_list)
  if len(noise) == 0:
    raise ValueError('noise clip is empty')
  if len(noise) < signal_length:
    # Repeat a too-short clip until it covers the whole signal.
    repeats = -(-signal_length // len(noise))  # ceiling division
    noise = np.tile(noise, repeats)
  # random.randint is inclusive on both ends, so the last valid offset is
  # len(noise) - signal_length (0 when both lengths are equal).
  start = random.randint(0, len(noise) - signal_length)
  noise = noise[start:start + signal_length]

  # calculate power of audio and noise
  snr = random.randint(5, 15)
  signal_energy = np.mean(signal**2)
  noise_energy = np.mean(noise**2)
  if noise_energy == 0:
    # A silent window cannot be scaled to any SNR; dividing by its energy
    # would turn the whole result into NaN.
    return signal.copy()

  # coef scales the noise so that signal_energy / (coef**2 * noise_energy) matches snr dB
  coef = np.sqrt(10.0 ** (-snr/10) * signal_energy / noise_energy)
  signal_coef = np.sqrt(1 / (1 + coef**2))
  noise_coef = np.sqrt(coef**2 / (1 + coef**2))

  return signal_coef * signal + noise_coef * noise

In [None]:
# Load the noise clips that build_dataset() passes to add_noise().
# NOTE(review): allow_pickle=True lets np.load unpickle objects stored in the file,
# which can execute arbitrary code — only load a noise.npy from a trusted source.
# NOTE(review): '/path/to/...' looks like a placeholder — set it before running.
noise_path = '/path/to/noise.npy'
noise_list = np.load(noise_path, allow_pickle=True)

In [None]:
def build_dataset(wav_path, out_path, max_effect=3, sr=16000):
  """ Write four versions of every wav file found under wav_path into out_path.

  For an input file <name>.wav the following files are written:
    <name>.wav            the resampled clean signal
    <name>_perturb_1.wav  the signal after a random set of perturb() effects
    <name>_perturb_2.wav  a second, independently perturbed copy
    <name>_noise.wav      the signal mixed with noise by add_noise()

  The noise clips are read from the notebook-level variable noise_list.
  The working directory is not changed; every output path is built from out_path.

  Args:
    wav_path: directory searched for wav files.
    out_path: directory receiving the output files; created if missing.
    max_effect: largest number of effects a perturbed copy may get (inclusive).
      perturb() draws from four effects without replacement, so larger values
      can make it fail.
    sr: sample rate the audio is loaded at; outputs use the rate librosa returns.
  """
  wav_list = librosa.util.find_files(wav_path, ext='wav')
  os.makedirs(out_path, exist_ok=True)

  for wav in tqdm(wav_list):
    signal, rate = librosa.load(wav, sr=sr)
    # File name without directory and extension.
    name = os.path.splitext(os.path.basename(wav))[0]

    # clean
    sf.write(os.path.join(out_path, name + '.wav'), signal, samplerate=rate)

    # two independently perturbed copies
    for copy_idx in (1, 2):
      # np.random.randint excludes its upper bound, hence the + 1.
      n_effects = np.random.randint(1, max_effect + 1)
      transform = perturb(n_effects=n_effects, sr=rate)
      perturb_signal = transform(signal)
      perturb_name = '{}_perturb_{}.wav'.format(name, copy_idx)
      sf.write(os.path.join(out_path, perturb_name), perturb_signal, samplerate=rate)

    # add noise
    noise_signal = add_noise(noise_list=noise_list, signal=signal)
    sf.write(os.path.join(out_path, name + '_noise.wav'), noise_signal, samplerate=rate)

In [None]:
def build_prompts(in_prompts_path, out_prompts_path):
  """ Write a prompts file that covers the clean and the augmented files.

  Every non-blank input line is read as "<name> <label words...>". For each
  of them four lines are written, in this order: <name>, <name>_noise,
  <name>_perturb_1 and <name>_perturb_2, each followed by the label with its
  words joined by single spaces.

  Args:
    in_prompts_path: existing prompts file.
    out_prompts_path: file to write; its directory is created if missing.
  """
  suffixes = ('', '_noise', '_perturb_1', '_perturb_2')

  out_dir = os.path.dirname(out_prompts_path)
  if out_dir:
    os.makedirs(out_dir, exist_ok=True)

  # An explicit encoding keeps non-ASCII transcripts intact regardless of the
  # platform's default encoding.
  with open(in_prompts_path, 'r', encoding='utf-8') as fin, \
       open(out_prompts_path, 'w', encoding='utf-8') as fout:
    for line in fin:
      splt = line.split()
      if not splt:
        # blank line: there is no name to index
        continue
      name = splt[0]
      label = ' '.join(splt[1:])
      for suffix in suffixes:
        fout.write(name + suffix + ' ' + label + '\n')

In [None]:
# Build the augmented audio files from the training wav directory.
# NOTE(review): the '/path/to/...' values look like placeholders — set them before running.
wav_path = '/path/to/datasets/vivos/train/waves'
out_path = '/path/to/datasets/vivos-augment/train/waves'
build_dataset(wav_path, out_path)

In [None]:
# Write the prompts file that matches the augmented audio file names.
# NOTE(review): the '/path/to/...' values look like placeholders — set them before running.
train_prompts = '/path/to/datasets/vivos/train/prompts.txt'
out_prompts = '/path/to/datasets/vivos-augment/train/prompts.txt'
build_prompts(train_prompts, out_prompts)