<a href="https://colab.research.google.com/github/phrasenmaeher/custom-audio-classification-tf/blob/main/custaudio_data_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Generate samples


Code for section 2 of the post at
[TDS/Medium](https://towardsdatascience.com/custom-audio-classification-with-tensorflow-af8c16c38689)



First, install and import the necessary packages.

In [None]:
!pip install pydub
from pydub import AudioSegment
from pydub.utils import which
from pathlib import Path
import numpy as np
import csv
import random, string, os, tqdm, pickle, argparse
AudioSegment.converter = which("ffmpeg")


Collecting pydub
  Downloading https://files.pythonhosted.org/packages/a6/53/d78dc063216e62fc55f6b2eebb447f6a4b0a59f55c8406376f76bf959b08/pydub-0.25.1-py2.py3-none-any.whl
Installing collected packages: pydub
Successfully installed pydub-0.25.1


Define some helper functions

In [None]:
def create_output_paths(output_path, mixed=False):
  '''
  Creates the <subset>/<label>/ directory tree below output_path.

  output_path is used as a plain string prefix, so it has to end with a path separator.
  The subsets are train, test and valid; the labels are male and female.
  If mixed is truthy, a "mixed" label directory is additionally created in every subset.
  Directories that already exist are left untouched.
  '''
  subsets = ("train", "test", "valid")

  for subset in subsets:
    for label in ("male", "female"):
      Path(output_path+subset+"/"+label+"/").mkdir(parents=True, exist_ok=True)

  if mixed:
    for subset in subsets:
      Path(output_path+subset+"/mixed/").mkdir(parents=True, exist_ok=True)

In [None]:
def build_speech_lists(dirs):
  '''
  Collects the .flac files found (recursively) below every directory in dirs.

  Only files whose size in bytes divided by 1000 is greater than 200 are kept.
  The result is one flat list of paths, in the order the directories were given.
  Keeping whole directories together is what lets the caller assign each directory
  to exactly one of the train/validation/test subsets.
  '''
  speech_files = []

  for speaker_dir in dirs:
    for flac_path in Path(speaker_dir).glob('**/*.flac'):
      size_kb = os.path.getsize(flac_path)/1000
      if size_kb > 200:
        speech_files.append(flac_path)

  return speech_files


In [None]:
def parse_speakers(speaker_list, seed=1337):
  '''
  Generates the train, validation, and test speech file lists for both genders.

  speaker_list is the path of a pickle file holding the male speaker directories at
  index 0 and the female speaker directories at index 1. Each group is sorted and then
  split by position into 80% train, 10% validation and 10% test, so no speaker
  directory ends up in more than one subset.

  The seed parameter is accepted but not used inside this function.

  Returns a 6-tuple of file lists:
  (female_train, female_valid, female_test, male_train, male_valid, male_test)
  '''

  def split_by_position(directories):
    #first 80% -> train, next 10% -> validation, remaining 10% -> test
    train_end = int(0.8*len(directories))
    valid_end = int(0.9*len(directories))
    return directories[:train_end], directories[train_end:valid_end], directories[valid_end:]

  #NOTE: unpickling can execute arbitrary code, only load speaker files from a trusted source
  with open(speaker_list, "rb") as pickle_file:
    speakers = pickle.load(pickle_file)

  #sorting makes the split independent of the order stored in the pickle file
  male_speakers = speakers[0]
  male_speakers.sort()
  male_train, male_valid, male_test = split_by_position(male_speakers)

  female_speakers = speakers[1]
  female_speakers.sort()
  female_train, female_valid, female_test = split_by_position(female_speakers)

  print("Number of female train, validation, and test speakers: {}, {}, {}".format(len(female_train), len(female_valid), len(female_test)))

  print("Number of male train, validation, and test speakers: {}, {}, {}".format(len(male_train),len(male_valid), len(male_test)))

  speaker_groups = (female_train, female_valid, female_test, male_train, male_valid, male_test)
  return tuple(build_speech_lists(group) for group in speaker_groups)

In [None]:
def generate_simultaneous_speech_overlay(base_sound, speech_list):
  '''
  Simultaneously overlays the speeches in speech_list onto base_sound.

  Every path in speech_list is loaded and passed through permutate_speech. All of the
  resulting clips are then overlaid at one shared, randomly drawn position that lies
  within the first 20% of base_sound. Returns the mixed sound.
  '''

  clips = [permutate_speech(AudioSegment.from_file(clip_path)) for clip_path in speech_list]

  #one entry point (in milliseconds) shared by all clips
  entry_point = random.uniform(0, 0.2*len(base_sound))

  mixed_sound = base_sound
  for clip in clips:
    mixed_sound = mixed_sound.overlay(clip, position=entry_point)

  return mixed_sound

In [None]:
def generate_successive_speech_overlay(base_sound, speech_list):
  '''
  Overlays the speeches in speech_list onto base_sound one after another.

  Every path in speech_list is loaded and passed through permutate_speech. The first
  clip starts at a random position within the first 20% of base_sound; every following
  clip starts after the previous clip has ended plus a random pause of up to 5 seconds.
  Returns the mixed sound.
  '''

  clips = [permutate_speech(AudioSegment.from_file(clip_path)) for clip_path in speech_list]

  #position (in milliseconds) at which the next clip is inserted
  entry_point = random.uniform(0, 0.2*len(base_sound))

  mixed_sound = base_sound
  for clip in clips:
    mixed_sound = mixed_sound.overlay(clip, position=entry_point)
    #advance past this clip and add a random pause of up to 5 seconds
    entry_point += len(clip) + random.uniform(0, 5000)

  return mixed_sound

In [None]:
def permutate_speech(speech):
  '''
  Randomly changes the loudness of the speech and shortens it.

  With a 50% chance a gain drawn uniformly from [-1, 1] is applied.
  Afterwards a cut point within the first half of the speech is drawn and, decided by a
  second coin flip, either the part from the cut point onwards or the part up to the
  cut point is returned.
  '''
  #coin flip: leave the loudness alone or apply a small random gain
  if random.getrandbits(1):
    speech = speech.apply_gain(random.uniform(-1, 1))

  #cut point in milliseconds, located in the first half of the speech [for greater variance]
  cut_point = random.uniform(0, 0.5*len(speech))

  #coin flip: keep the tail (from the cut point) or the head (up to the cut point)
  keep_tail = random.getrandbits(1)
  return speech[cut_point:] if keep_tail else speech[:cut_point]

In [None]:
def generate_speech_overlay(base_sound, speech_list:list, simultaneous:bool):
  '''
  Overlays the speeches in speech_list onto base_sound.

  If simultaneous is truthy all speeches are overlaid at the same position,
  otherwise they are overlaid successively. Returns the mixed sound.
  '''
  overlay_function = generate_simultaneous_speech_overlay if simultaneous else generate_successive_speech_overlay
  return overlay_function(base_sound, speech_list)

In [None]:
def get_speech_samples(gender, subset, number_of_samples):
  '''
  Randomly draws (with replacement) number_of_samples speech files.

  gender: truthy draws from the female lists, falsy from the male lists
  subset: "train" or "test"; any other value selects the validation lists

  The lists themselves (female_train, female_test, female_valid, male_train, male_test,
  male_valid) are read from module level and have to exist before this function is called.
  '''

  #select the pool once, then draw from it
  if gender: #draw from female speakers
    if subset == "train":
      pool = female_train
    elif subset == "test":
      pool = female_test
    else:
      pool = female_valid
  else: #draw from male speakers
    if subset == "train":
      pool = male_train
    elif subset == "test":
      pool = male_test
    else:
      pool = male_valid

  return [random.choice(pool) for _ in range(number_of_samples)]

In [None]:
def generate_sample(subset, filename, args):
  '''
  Generate a single audio sample and write it to disk.

  subset: "train", "test" or "valid"; selects the speaker lists and the output directory
  filename: name of the output file, without extension
  args: dictionary of options; 'output_path' (base directory, used as a string prefix)
        and 'simultan' (if truthy, a coin flip per sample decides whether the speeches
        are overlaid simultaneously instead of successively) are read

  The sample is 60 seconds of silence overlaid with 6 to 10 randomly drawn speeches of
  one randomly chosen gender. It is exported as FLAC with the ffmpeg parameter
  "-ar 22050" to <output_path><subset>/<male|female>/<filename>.flac.
  If a generated sample is not exactly 60 seconds long it is discarded and generated anew.

  Returns a tuple (path of the written file, "male" or "female").
  '''

  #loop instead of recursion, so repeated retries cannot hit the recursion limit
  while True:
    #generate the 60 seconds of silence as the basis to lay other speeches onto
    base_sound = AudioSegment.silent(duration=60000)

    #either choose a male (0) or female (1) speech
    gender_coin = random.getrandbits(1)

    #get a list of speeches to overlay
    speeches = get_speech_samples(gender=gender_coin, subset=subset, number_of_samples=random.randint(6, 10))
    if args['simultan']:
      simultaneous = random.getrandbits(1) #if 1 (True), then overlay the sounds simultaneously
    else:
      simultaneous = 0

    #pass the drawn value on; previously a hard-coded 0 was passed, which silently disabled --simultaneous
    output = generate_speech_overlay(base_sound=base_sound, speech_list=speeches, simultaneous=simultaneous)
    if output.duration_seconds == 60.0:
      subdir = "female" if gender_coin else "male"
      out_name = args['output_path']+subset+"/"+subdir+"/"+filename+".flac"
      output.export(out_name, format='flac', parameters=["-ar", "22050"])
      return out_name, subdir

    print("Running an additional generation, sample not long enough")

In [None]:
def gen_csv(data_dict, outpath, outname):
  '''
  Writes the sample -> label mapping to the csv file outpath+outname.

  data_dict: dictionary mapping a sample path to its label
  outpath: directory of the csv file, used as a string prefix (has to end with a path separator)
  outname: file name of the csv file

  The file starts with the header row "path,label", followed by one row per dictionary
  entry. An existing file of the same name is overwritten.
  '''
  #newline='' is what the csv module asks for; without it rows end in \r\r\n on Windows
  with open(outpath+outname, 'w', newline='') as csvfile:
    filewriter = csv.writer(csvfile, delimiter=',')
    filewriter.writerow(['path', 'label'])
    filewriter.writerows(data_dict.items())

In [None]:
#Options of the data generation; every option has a default, so the cell also runs without any arguments
parser = argparse.ArgumentParser(description='')

#where to read from and write to
parser.add_argument(
  '--output_path', dest='output_path',
  default='/content/drive/MyDrive/custaudio/dataset/',
  help='Base path for the dataset')
parser.add_argument(
  '--speaker_file', dest='speaker_file',
  default="/content/drive/MyDrive/custaudio/speaker.pkl",
  help='Pickle file that stores the speakers')
parser.add_argument(
  '--csv_dir', dest='csv_dir',
  default='/content/drive/MyDrive/custaudio/',
  help='The csv files that contain sample|label mappings are stored there')
parser.add_argument(
  '--csv_flag', dest='csv_flag',
  default='custom_',
  help='A name that will be appended to the csv file to differentiate it from other ones')

#randomness and subset sizes
parser.add_argument(
  '--seed', dest='seed', type=int, default=1337,
  help='Seed for reproducability')
parser.add_argument(
  '--num_train_samples', dest='train_samples', type=int, default=50,
  help='Number of samples in the training subset')
parser.add_argument(
  '--num_test_samples', dest='test_samples', type=int, default=50,
  help='Number of samples in the test subset')
parser.add_argument(
  '--num_valid_samples', dest='validation_samples', type=int, default=50,
  help='Number of samples in the validation subset')

#optional generation modes (integers used as flags)
parser.add_argument(
  '--mixed', dest='mixed', type=int, default=0,
  help="If set then create a third category that mixes male and female speakers")
parser.add_argument(
  '--simultaneous', dest='simultan', type=int, default=0,
  help="If set then overlay two speeches at the same timestamp")

#parse_known_args collects unrecognised arguments in `unknown` instead of aborting
#(presumably needed because the notebook kernel is started with arguments of its own)
args, unknown = parser.parse_known_args()
#work with a plain dictionary from here on
args = vars(args)

In [None]:
if __name__ == '__main__':
  #tqdm.tqdm_notebook is deprecated in favour of tqdm.notebook.tqdm;
  #imported here because `import tqdm` alone does not load the notebook submodule
  from tqdm.notebook import tqdm as notebook_progress

  #create our output directories and lists of speakers
  create_output_paths(output_path=args['output_path'], mixed=args['mixed'])
  #these six lists stay at module level on purpose: get_speech_samples reads them from there
  female_train, female_valid, female_test, male_train, male_valid, male_test = parse_speakers(speaker_list=args['speaker_file'])

  #set a random seed
  random.seed(args['seed'])

  #(subset name, key in args that holds the number of samples for this subset);
  #the order train -> test -> valid determines how the seeded random stream is consumed
  subsets = (("train", "train_samples"), ("test", "test_samples"), ("valid", "validation_samples"))

  for subset, count_key in subsets:
    #maps the path of every generated sample to its label: 0 = male, 1 = female
    samples_dict = {}
    for i in notebook_progress(range(1, args[count_key]+1)):
      out_name, label = generate_sample(subset=subset, filename=str(i), args=args)
      samples_dict[out_name] = 0 if label=="male" else 1
    #one csv file per subset, e.g. <csv_dir><csv_flag>train.csv
    gen_csv(data_dict=samples_dict, outpath=args['csv_dir'], outname=args['csv_flag']+subset+".csv")


Number of female train, validation, and test speakers: 60, 8, 8
Number of male train, validation, and test speakers: 55, 7, 7


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  # This is added back by InteractiveShellApp.init_path()


HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))




Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`


HBox(children=(FloatProgress(value=0.0, max=50.0), HTML(value='')))


