## Data Preprocessing extractor
This code is meant to extract MFCC features from audio data.

In [1]:
import tensorflow as tf
import sys
import input_data
import models
import numpy as np
import pickle
import shutil
import os
import random

In [2]:
#from google.colab import drive
#drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!wget 'https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz'
dataset_dir = "speech_commands"
shutil.unpack_archive("speech_commands_v0.02.tar.gz", dataset_dir)

shutil.unpack_archive("silence.zip", dataset_dir)

--2022-11-14 08:35:52--  https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz
Resolving storage.googleapis.com (storage.googleapis.com)... 64.233.191.128, 173.194.192.128, 209.85.146.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|64.233.191.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2428923189 (2.3G) [application/gzip]
Saving to: ‘speech_commands_v0.02.tar.gz’


2022-11-14 08:36:03 (202 MB/s) - ‘speech_commands_v0.02.tar.gz’ saved [2428923189/2428923189]



In [4]:
## Print some dataset information
total_samples = 0;
n_classes = 0 #How many classes the dataset has (it will be automatically found)
iteration = 0

for word in os.fwalk(dataset_dir):
  if(iteration==0):
    print("Number of classes: " + str(len(word[1])))
    n_classes = str(len(word[1]))
    print("--")
    iteration = iteration+1
  else:
    print("Class '" + os.path.basename(word[0]) + "'" + ": " + str(len(word[2])) + " samples")
    total_samples = total_samples + len(word[2])
print('--')
print("The dataset has " +  str(total_samples) + " samples.")

Number of classes: 37
--
Class 'no': 3941 samples
Class 'nine': 3934 samples
Class 'five': 4052 samples
Class 'sheila': 2022 samples
Class 'bed': 2014 samples
Class 'forward': 1557 samples
Class 'three': 3727 samples
Class 'eight': 3787 samples
Class 'tree': 1759 samples
Class 'dog': 2128 samples
Class 'marvin': 2100 samples
Class 'left': 3801 samples
Class 'on': 3845 samples
Class 'six': 3860 samples
Class 'visual': 1592 samples
Class 'one': 3890 samples
Class 'down': 3917 samples
Class '_background_noise_': 7 samples
Class 'backward': 1664 samples
Class 'wow': 2123 samples
Class 'follow': 1579 samples
Class 'silence': 6115 samples
Class 'seven': 3998 samples
Class 'house': 2113 samples
Class 'two': 3880 samples
Class 'four': 3728 samples
Class 'off': 3745 samples
Class 'go': 3880 samples
Class 'up': 3723 samples
Class 'zero': 4052 samples
Class 'bird': 2064 samples
Class 'right': 3778 samples
Class 'yes': 4044 samples
Class 'stop': 3872 samples
Class 'cat': 2031 samples
Class 'learn'

In [5]:
# Random seed for reproducibility

seed = 22 #Choose a fixed seed to have reproducible results (22=Gonzales o Chiesa)

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

!pip install split_folders
import splitfolders

shutil.rmtree("speech_commands/_background_noise_")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting split_folders
  Downloading split_folders-0.5.1-py3-none-any.whl (8.4 kB)
Installing collected packages: split-folders
Successfully installed split-folders-0.5.1


In [6]:
wanted_words = ['sheila']

In [7]:
## WARNING: running this cell will merge the dataset folders according to the wanted words.

folders = []
wanted_words.insert(0, "silence")

for x in os.fwalk(dataset_dir):
  folders.append(x[1])

unknown_folders = folders[0]

for word in wanted_words:
  unknown_folders.remove(word)

print("Selected words are: " + str(unknown_folders))
 
## Now we create a new dataset where there is a folder for each word in wanted_words and a folder that contains all other words, called "Unknown".
import shutil
  
  
# Function to create new folder if not exists
def make_new_folder(folder_name, parent_folder):
      
    # Path
    path = os.path.join(parent_folder, folder_name)
      
    # Create the folder
    # 'new_folder' in
    # parent_folder
    try: 
        # mode of the folder
        mode = 0o777
  
        # Create folder
        os.mkdir(path, mode) 
    except OSError as error: 
        print(error)
  
# folder in which all the content will
# be merged
dest_folder_name = "unknown"
  
# merge_folder path - current_folder 
# + merge_folder
merge_folder_path = os.path.join(dataset_dir, dest_folder_name) 
  
# create merge_folder if not exists
make_new_folder(dest_folder_name, dataset_dir)
  
print("Selected words will be merged in the folder: " + merge_folder_path)
fileindex = 0;

## Building the unknown folder
for folder in unknown_folders:
  file_names = os.listdir(os.path.join(dataset_dir, folder))
  for file_name in file_names:
    srcpath = os.path.join(os.path.join(dataset_dir, folder), file_name)
    shutil.move(srcpath, os.path.join(merge_folder_path, str(fileindex) + "_" + file_name))
    fileindex = fileindex+1
  shutil.rmtree(os.path.join(dataset_dir, folder))

## remove all duplicate in names
for folder in wanted_words:
  file_names = os.listdir(os.path.join(dataset_dir, folder))
  for file_name in file_names:
    srcname = os.path.join(os.path.join(dataset_dir, folder), file_name)
    destname = os.path.join(os.path.join(dataset_dir, folder), str(fileindex) + "_" + file_name)
    os.rename(srcname, destname)
    fileindex = fileindex+1

Selected words are: ['no', 'nine', 'five', 'bed', 'forward', 'three', 'eight', 'tree', 'dog', 'marvin', 'left', 'on', 'six', 'visual', 'one', 'down', 'backward', 'wow', 'follow', 'seven', 'house', 'two', 'four', 'off', 'go', 'up', 'zero', 'bird', 'right', 'yes', 'stop', 'cat', 'learn', 'happy']
Selected words will be merged in the folder: /content/speech_commands/unknown


In [8]:
# This function will print the number of samples for each class in your wanted_words dataset.

wanted_words.insert(1, "unknown")
print('The model will learn to distinguish the following words: ' +  str(wanted_words))
print("--")

def printNumSamples(dataset_path):
  total_meaningful_samples = 0

  for word in wanted_words:
    samples = len(os.listdir(os.path.join(dataset_path, word)))
    print('Number of samples in class ' + '"' + word + '"' + ': ' + str(samples))
    total_meaningful_samples = total_meaningful_samples + samples

printNumSamples(dataset_dir)

The model will learn to distinguish the following words: ['silence', 'unknown', 'sheila']
--
Number of samples in class "silence": 6115
Number of samples in class "unknown": 103807
Number of samples in class "sheila": 2022


## Class Rebalancing

In [11]:
## Class to rebalance:
rebalance_class = 'unknown'

## Number of samples to have at the end:
to_keep = 2000

In [12]:
## Execute this cell if you want to rebalance.

to_remove = len(os.listdir(os.path.join(dataset_dir, rebalance_class))) - to_keep

if(to_remove<=0):
  to_remove = 0

print("Removing " + str((to_remove)) + " samples from class " + '"' + rebalance_class + '"' + "...")

samples = os.listdir(os.path.join(dataset_dir, rebalance_class))

samples_to_remove = np.random.choice(range(len(samples)), to_remove, replace=False)

for sample_index in samples_to_remove:
  pathname = os.path.join(os.path.join(dataset_dir, rebalance_class), samples[sample_index])
  os.remove(pathname)

print('New number of samples in class ' + '"' + rebalance_class + '"' + ': ' + 
      str(len(os.listdir(os.path.join(dataset_dir, rebalance_class)))))

print('--')
printNumSamples(dataset_dir)

Removing 101807 samples from class "unknown"...
New number of samples in class "unknown": 2000
--
Number of samples in class "silence": 2000
Number of samples in class "unknown": 2000
Number of samples in class "sheila": 2022


## Training-Testing-Validation splitting

In [13]:
training_percentage = 0.80
testing_percentage = 0.10
validation_percentage = 0.10

In [14]:
splitfolders.ratio(dataset_dir, output="input_dataset", seed=seed, ratio=(training_percentage, testing_percentage, validation_percentage)) 

train_dir = "input_dataset/train"
val_dir = "input_dataset/val"
test_dir = "input_dataset/test"

Copying files: 6022 files [00:01, 4284.35 files/s]


In [15]:
def getSampleList(directory):
  samplelist = []
  for r, d, file in os.walk(directory):
    for f in file:
      if '.wav' in f:
        samplelist.append(os.path.join(r, f))
  return samplelist
print("Training set:")
printNumSamples(train_dir)
print("---")
print("Validation set:")
printNumSamples(val_dir)
print("---")
print("Testing set:")
printNumSamples(test_dir)

Training set:
Number of samples in class "silence": 1600
Number of samples in class "unknown": 1600
Number of samples in class "sheila": 1617
---
Validation set:
Number of samples in class "silence": 200
Number of samples in class "unknown": 200
Number of samples in class "sheila": 202
---
Testing set:
Number of samples in class "silence": 200
Number of samples in class "unknown": 200
Number of samples in class "sheila": 203


In [16]:
import shutil
representative_dataset_ratio = 0.01
representative_dataset_temp = "representative_dataset_temp"
splitfolders.ratio(dataset_dir, output=representative_dataset_temp, seed=seed, ratio=(1-representative_dataset_ratio, representative_dataset_ratio)) 

shutil.rmtree("representative_dataset_temp/train")
shutil.move("representative_dataset_temp/val", "representative_dataset")

representative_dataset_dir = "representative_dataset"
shutil.rmtree(representative_dataset_temp)

representative_list = getSampleList(representative_dataset_dir)

print("The representative dataset is composed by " + str(len(representative_list)) + " samples.")

Copying files: 6022 files [00:01, 4331.53 files/s]


The representative dataset is composed by 61 samples.


## Data Augmentation

This section will add background noise and time-shifting to the training samples. You can choose how many samples to augment.

In [17]:
#########################################################################
## LIBRARIES IMPORT ##
#########################################################################
!pip install pydub

import os
import random
import numpy as np
from pydub import AudioSegment

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [18]:
## These functions performs data augmentation on audio samples, in order to build a more robust dataset that can help train the
## network models. It is divided into three parts: the noise augmentation part, the gain staging part and the time-shifting part.

## -----------
## Pipeline of audio processing: (sample) > |noise fetching| > |time shifting |> |gain staging| > |noise augmentation| > (2 samples)
## ----------

## Noise fetching part: a random noise sample is picked from the "silence" folder.

## Time shifting part: each audio sample fetched is shifted by +/- 300ms at most, in order to have the same word represented in 
## different positions inside the 1-second window. This will help the recognizer to discriminate better even on early or later windows.
## The sample in its original position is also preserved and passed further into the pipeline. The sample shifted is padded with 0s.

## Gain staging part: this part performs an amplitude difference measurement between the word sample and the noise sample:
## if the signal to noise ratio is too low, the word audio sample is amplified in order to have a clearer utterance. 

## Noise augmentation part: its goal is to add the noise sample to the word samples (both the shifted and the original one).

#########################################################################
## USEFUL FUNCTIONS AND VARIABLES ##
#########################################################################

def fetchSample(samples):
    
    sample_index = random.randint(0, len(samples) -1 )
    sample = AudioSegment.from_file(samples[sample_index])
    path = samples[sample_index]

    samples.pop(sample_index)

    return sample, path

def timeShift(sample):

    shifted_sample = AudioSegment.silent(1000, 16000)
    shift_entity = random.randint(-200, 200)

    if(shift_entity == 0):
        shift_entity = 100

    if(shift_entity > 0):
        shifted_sample = AudioSegment.silent(shift_entity, 16000)
        shifted_sample = shifted_sample.append(sample[0:(1000-shift_entity)], crossfade=0)

    if(shift_entity < 0):
        shift_entity = 0-shift_entity
        shifted_sample = AudioSegment.silent(shift_entity, 16000)
        shifted_sample = sample[shift_entity:1000].append(shifted_sample, crossfade = 0)

    return shifted_sample

def gainAdjust(sample, noise_sample):

    gain_difference = 9 #dBFS

    sample_loudness = sample.dBFS
    noise_loudness = noise_sample.dBFS

    diff = noise_loudness - sample_loudness

    if(diff<0):
        diff = 0-diff

    if(diff < gain_difference):
        to_increase = gain_difference - diff
        sample = sample.apply_gain(+to_increase)

    return sample

def augmentSample(sample, noise_sample, targetfile):
    augmented_sample = noise_sample.overlay(sample)
    augmented_sample.export(targetfile, format='wav')


def runAugmentation(sample, sample_path, noise_sample, destfolder, overwrite):

  print("Processing " + sample_path)

  augmented_sample_file = os.path.basename(sample_path)

  if(not overwrite):
    augmented_sample_file = "aug_" + os.path.basename(sample_path) #Name of new sample

  input_sample = sample
  augmented_sample = timeShift(input_sample) #Shift in time
  augmented_sample = gainAdjust(augmented_sample, noise_sample) #Gain correction
  augmentSample(augmented_sample, noise_sample, os.path.join(destfolder, augmented_sample_file)) #Overlapping of noise and audio + saving


In [22]:
noise_path = "speech_commands/silence"

class_to_augment = "unknown"
samples_to_overwrite = 1000 #How many samples replace with their augmented version
samples_to_add = 0 #How many augmented samples save as new samples

In [23]:
#########################################################################
## RUN DATA AUGMENTATION ##
#########################################################################

folder_to_augment = os.path.join(train_dir, class_to_augment)

noise_samples = getSampleList(noise_path)
input_samples = getSampleList(folder_to_augment)

for i in range(samples_to_overwrite):
  input_sample, sample_path = fetchSample(input_samples) #Randomly pick an input sample
  noise_sample, noise_path = fetchSample(noise_samples) #Randomly pick a noise sample
  runAugmentation(input_sample, sample_path, noise_sample, folder_to_augment, True) #Run augmentation

for i in range(samples_to_add):
  input_sample, sample_path = fetchSample(input_samples) #Randomly pick an input sample
  noise_sample, noise_path = fetchSample(noise_samples) #Randomly pick a noise sample
  runAugmentation(input_sample, sample_path, noise_sample, folder_to_augment, False) #Run augmentation

#########################################################################

Processing /content/input_dataset/train/unknown/62192_be7a5b2d_nohash_1.wav
Processing /content/input_dataset/train/unknown/82182_90e72357_nohash_3.wav
Processing /content/input_dataset/train/unknown/41840_24a3e589_nohash_3.wav
Processing /content/input_dataset/train/unknown/54402_8fe67225_nohash_0.wav
Processing /content/input_dataset/train/unknown/21560_2aca1e72_nohash_7.wav
Processing /content/input_dataset/train/unknown/92101_b72e58c9_nohash_0.wav
Processing /content/input_dataset/train/unknown/58359_cce7416f_nohash_9.wav
Processing /content/input_dataset/train/unknown/57363_3d86b69a_nohash_3.wav
Processing /content/input_dataset/train/unknown/8836_d5b963aa_nohash_0.wav
Processing /content/input_dataset/train/unknown/61937_172dc2b0_nohash_0.wav
Processing /content/input_dataset/train/unknown/48420_37e8db82_nohash_0.wav
Processing /content/input_dataset/train/unknown/83268_5b32733e_nohash_0.wav
Processing /content/input_dataset/train/unknown/40609_e71a9381_nohash_2.wav
Processing /c

In [24]:
training_list = getSampleList(train_dir)
validation_list = getSampleList(val_dir)
test_list = getSampleList(test_dir)

print("Number of samples after data augmentation:")
print("Training samples: " + str(len(training_list)))
print("Validation samples: " + str(len(validation_list)))
print("Testing samples: " + str(len(test_list)))
print("---")
print("Training set:")
printNumSamples(train_dir)

Number of samples after data augmentation:
Training samples: 4817
Validation samples: 602
Testing samples: 603
---
Training set:
Number of samples in class "silence": 1600
Number of samples in class "unknown": 1600
Number of samples in class "sheila": 1617


If you want to listen to the augmented training samples, run the following cell:

In [None]:
from IPython.display import Audio

print("Without augmentation:")
wn = Audio('input_dataset/train/three/109182_57b38f48_nohash_0.wav', autoplay=False)
display(wn)
print("With augmentation:")
wn = Audio('input_dataset/train/three/aug_109182_57b38f48_nohash_0.wav', autoplay=False)
display(wn)

Without augmentation:


With augmentation:


## NPZ Archives Generator

In [25]:
SAMPLE_RATE = 16000
CLIP_DURATION_MS = 1000
WINDOW_SIZE_MS = 30
WINDOW_STRIDE = 20
FEATURE_BIN_COUNT = 40
PREPROCESS = 'micro'

TF_SESS = tf.compat.v1.InteractiveSession()

custom_model_settings = models.prepare_model_settings(
      0, SAMPLE_RATE, CLIP_DURATION_MS, WINDOW_SIZE_MS,
      WINDOW_STRIDE, FEATURE_BIN_COUNT, PREPROCESS)

custom_audio_processor = input_data.AudioProcessor(None, None, 0, 0, '', 0, 0,
                                                    custom_model_settings, None)

In [26]:
spectrogram_shape = (49,40,1)

def generate_data(samples_list, destpath):
  sample_number = len(samples_list)
  X = np.empty((sample_number, *spectrogram_shape))
  y = np.empty((sample_number, 1))

  # Generate data
  for i, sample in enumerate(samples_list):
      # Store sample
      sample_preprocessed = custom_audio_processor.get_features_for_wav(
          sample, custom_model_settings, TF_SESS)
      mfcc = np.asarray(sample_preprocessed).reshape(*spectrogram_shape)
      X[i,] = (mfcc / 13.0) - 1.0
      # Store class
      filepath = os.path.dirname(sample)
      classname = os.path.basename(filepath)
      y[i] = wanted_words.index(classname)

  np.savez_compressed(destpath, X, y)

In [27]:
training_dataset_npz = "train.npz"
testing_dataset_npz = "testing.npz"
validation_dataset_npz = "validation.npz"
representative_dataset_npz = "representative.npz"

generate_data(training_list, training_dataset_npz)
generate_data(test_list, testing_dataset_npz)
generate_data(validation_list, validation_dataset_npz)
generate_data(representative_list, representative_dataset_npz)

In [28]:
import json
 
# Data to be written
dictionary = {
    "classes": wanted_words,
    "train_samples_num": len(training_list),
    "testing_samples_num": len(test_list),
    "validation_samples_num": len(validation_list),
    "representative_samples_num": len(representative_list),
    "data_shape": spectrogram_shape
}
 
with open("dataset_info.json", "w") as outfile:
    json.dump(dictionary, outfile)

In [30]:
# Zipping into a dataset to be used

import zipfile

list_files = [training_dataset_npz, testing_dataset_npz, validation_dataset_npz, representative_dataset_npz, "dataset_info.json"]

with zipfile.ZipFile('sheila_normalized_dataset.zip', 'w') as zipF:
    for file in list_files:
        zipF.write(file, compress_type=zipfile.ZIP_DEFLATED)

from google.colab import files
files.download("sheila_normalized_dataset.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## NPZ Class format file editing
Conversion of npz with encoding of class as index to a categorical representation.

In [None]:
import tensorflow as tf
import tensorflow.keras as tfk

npzfile = "small_test_data.npz"

data = np.load(npzfile)

X = data['arr_0']
y = data['arr_1']

Y = np.empty((len(X), 3))

for i, index in enumerate(y):
  Y[i,] = tfk.utils.to_categorical(index, 3)
  print(Y[i])

np.savez_compressed("small_test_categorical_data.npz", X, Y)

In [None]:
import tensorflow as tf
import tensorflow.keras as tfk
import numpy as np

npzfile = "small_test_categorical_data.npz"

data = np.load(npzfile)

X = data['arr_0']
y = data['arr_1']

print(X.shape)
print(y.shape)

(81, 49, 40, 1)
(81, 3)


## Feature data visualization

This section prints the feature data in order to check the shape and format of the values.

In [33]:
import tensorflow as tf
import tensorflow.keras as tfk
import numpy as np

npzfile = "testing.npz"

data = np.load(npzfile)

X = data['arr_0']
y = data['arr_1']

print(X.shape)
print(y.shape)

print(str(y))

(603, 49, 40, 1)
(603, 1)
[[2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [2.]
 [