/
audio_dataloader.py
386 lines (321 loc) · 14.7 KB
/
audio_dataloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Audio dataloader."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import pandas as pd
import tensorflow as tf
from tensorflow_examples.lite.model_maker.core.api.api_util import mm_export
from tensorflow_examples.lite.model_maker.core.data_util import dataloader
from tensorflow_examples.lite.model_maker.core.task.model_spec import audio_spec
# librosa is an optional dependency used only for resampling wav files whose
# sample rate differs from the spec's target rate. Importing it can fail
# either because the package is missing (ImportError) or because its native
# sndfile backend is missing (OSError); in both cases resampling is disabled
# and the saved exception is re-raised lazily with a helpful message when a
# resample is actually needed (see `_resample_numpy` in `gen_dataset`).
error_import_librosa = None
try:
  import librosa  # pylint: disable=g-import-not-at-top
  ENABLE_RESAMPLE = True
except (OSError, ImportError) as _error_import_librosa:  # pylint: disable=invalid-name
  ENABLE_RESAMPLE = False
  # Kept at module scope so the original cause can be chained onto the
  # RuntimeError raised at resample time.
  error_import_librosa = _error_import_librosa
class ExamplesHelper(object):
  """Helper class for matching examples and labels."""

  @classmethod
  def from_examples_folder(cls, path, examples_filter_fn):
    """Helper function for loading examples and parsing labels from example path.

    This path contain a number of folders, each named by the category name. Each
    folder contain a number of files. This helper class loads and parse the
    tree structure.

    Example folder:
    /category1
      /file1.wav
      /file2.wav
    /category2
      /file2.wav
    /README

    Usage:
    >>> helper = ExamplesHelper.from_examples_folder(path, is_wav)
    >>> # helper.shuffle() if shuffle is needed
    >>> helper.examples_and_labels()
    ('/category1/file1.wav', '/category1/file2.wav', '/category2/file2.wav'),
    ('category1', 'category1', 'category2')
    >>> helper.index_to_label
    ('category1', 'category2')
    >>> helper.examples_and_label_indices()
    ('/category1/file1.wav', '/category1/file2.wav', '/category2/file2.wav'),
    (0, 0, 1)

    Args:
      path: String, relative path to the data folder. This folder should contain
        a list of sub-folders, named after its categories.
      examples_filter_fn: A lambda function to filter out unrelated files. It
        takes in a full path to the example file and returns a boolean,
        representing if this example file can be preserved.

    Returns:
      An instance of ExamplesHelper.
    """

    def _list_files(path):
      # Matches every file exactly one level below a category folder.
      return tf.io.gfile.glob(os.path.join(path, '*', '*'))

    def _get_label(example):
      """Parses the example path and return the label string."""
      # The label is the name of the file's immediate parent folder.
      return example.rsplit(os.path.sep, 2)[1]

    examples = list(filter(examples_filter_fn, _list_files(path)))
    labels = list(map(_get_label, examples))
    return cls(examples, labels)

  def __init__(self, examples_in_absolute_path, labels):
    # Sorted list of unique label strings, e.g. ['category1', 'category2'].
    self.index_to_label = self._get_index_to_label(labels)
    # Reverse mapping: label string -> integer index.
    self.label_to_index = self._get_label_to_index(self.index_to_label)
    # [(example, label)] in stable order so results are deterministic until
    # `shuffle` is explicitly called.
    self._data = sorted(list(zip(examples_in_absolute_path, labels)))

  def _get_index_to_label(self, used_labels):
    return sorted(list(set(used_labels)))

  def _get_label_to_index(self, index_to_label):
    return {label: i for i, label in enumerate(index_to_label)}

  def shuffle(self):
    """Shuffles the (example, label) pairs in place."""
    random.shuffle(self._data)

  def filter(self, labels):
    """Returns a new ExamplesHelper keeping only examples with the given labels.

    This does not modify the current instance.

    Args:
      labels: A collection of label strings to keep.

    Returns:
      A new ExamplesHelper containing only the matching examples.
    """
    # NOTE: previously this rebound `self._data` to a lazy `filter` iterator
    # that was then exhausted by `zip`, corrupting this instance. Use a local
    # list instead and never mutate self.
    kept = [pair for pair in self._data if pair[1] in labels]
    if not kept:
      # `zip(*[])` would raise ValueError; return an empty helper instead.
      return ExamplesHelper((), ())
    examples, kept_labels = zip(*kept)
    return ExamplesHelper(examples, kept_labels)

  def examples_and_label_indices(self):
    """Returns a tuple of example and a tuple of their corresponding label idx."""
    examples, labels = self.examples_and_labels()
    label_indicies = tuple(map(self.label_to_index.get, labels))
    return examples, label_indicies

  def examples_and_labels(self):
    """Returns a tuple of example and a tuple of their corresponding labels."""
    if not self._data:
      return (), ()
    examples, labels = zip(*self._data)
    return examples, labels

  def examples_and_label_indices_ds(self):
    """Returns a tf.data.Dataset of (example path, label index) pairs."""
    examples, labels = self.examples_and_label_indices()
    wav_ds = tf.data.Dataset.from_tensor_slices(list(examples))
    label_ds = tf.data.Dataset.from_tensor_slices(list(labels))
    ds = tf.data.Dataset.zip((wav_ds, label_ds))
    return ds
@mm_export('audio_classifier.DataLoader')
class DataLoader(dataloader.ClassificationDataLoader):
  """DataLoader for audio tasks."""

  def __init__(self, dataset, size, index_to_label, spec, cache=False):
    super(DataLoader, self).__init__(dataset, size, index_to_label)
    self._spec = spec
    # `cache` is either a bool (True -> in-RAM cache) or a str (file path for
    # a file-based cache). Consumed by `_cache_fn` inside `gen_dataset`.
    self._cache = cache

  def __len__(self):
    """Returns the number of audio files in the DataLoader.

    Note that one audio file could be framed (mostly via a sliding window of
    fixed size) into None or multiple audio clips during training and
    evaluation.
    """
    return self._size

  @classmethod
  def from_folder(cls,
                  spec,
                  data_path,
                  categories=None,
                  shuffle=True,
                  cache=False):
    """Load audio files from a data_path.

    - The root `data_path` folder contains a number of folders. The name for
    each folder is the name of the audio class.

    - Within each folder, there are a number of .wav files. Each .wav file
    corresponds to an example. Each .wav file is mono (single-channel) and has
    the typical 16 bit pulse-code modulation (PCM) encoding.

    - .wav files will be resampled to `spec.target_sample_rate` then fed into
    `spec.preprocess_ds` for split and other operations. Normally long wav files
    will be framed into multiple clips. And wav files shorter than a certain
    threshold will be ignored.

    Args:
      spec: instance of `audio_spec.BaseSpec`.
      data_path: string, location to the audio files.
      categories: A string list of selected categories. If empty, all categories
        will be selected.
      shuffle: boolean, if True, random shuffle data.
      cache: str or boolean. When set to True, intermediate results will be
        cached in ram. When set to a file path in string, intermediate results
        will be cached in this file. Please note that, once file based cache is
        created, changes to the input data will have no effects until the cache
        file is removed or the filename is changed. More details can be found at
        https://www.tensorflow.org/api_docs/python/tf/data/Dataset#cache

    Returns:
      `AudioDataLoader` containing audio spectrogram (or any data type generated
      by `spec.preprocess_ds`) and labels.

    Raises:
      ValueError: if no .wav files are found under `data_path`.
    """
    assert isinstance(spec, audio_spec.BaseSpec)
    root_dir = os.path.abspath(data_path)
    helper = ExamplesHelper.from_examples_folder(root_dir,
                                                 lambda s: s.endswith('.wav'))
    if categories:
      helper = helper.filter(categories)
    if shuffle:
      helper.shuffle()
    ds = helper.examples_and_label_indices_ds()
    if len(ds) == 0:  # pylint: disable=g-explicit-length-test
      raise ValueError('No audio files found.')
    return DataLoader(ds, len(ds), helper.index_to_label, spec, cache)

  @classmethod
  def from_esc50(cls,
                 spec,
                 data_path,
                 folds=None,
                 categories=None,
                 shuffle=True,
                 cache=False):
    """Load ESC50 style audio samples.

    ESC50 file structure is expalined in https://github.com/karolpiczak/ESC-50
    Audio files should be put in `${data_path}/audio`
    Metadata file should be put in `${data_path}/meta/esc50.csv`

    Note that instead of relying on the `target` field in the CSV, a new
    `index_to_label` mapping is created based on the alphabet order of the
    available categories.

    Args:
      spec: An instance of audio_spec.YAMNet
      data_path: A string, location of the ESC50 dataset. It should contain an
        `audio` folder and a `meta/esc50.csv` metadata file.
      folds: A integer list of selected folds. If empty, all folds will be
        selected.
      categories: A string list of selected categories. If empty, all categories
        will be selected.
      shuffle: boolean, if True, random shuffle data.
      cache: str or boolean. When set to True, intermediate results will be
        cached in ram. When set to a file path in string, intermediate results
        will be cached in this file. Please note that, once file based cache is
        created, changes to the input data will have no effects until the cache
        file is removed or the filename is changed. More details can be found at
        https://www.tensorflow.org/api_docs/python/tf/data/Dataset#cache

    Returns:
      An instance of AudioDataLoader containing audio samples and labels.

    Raises:
      ValueError: if no audio files match the given folds/categories.
    """

    def _fullpath(filename):
      return os.path.join(data_path, 'audio', filename)

    csv_path = os.path.join(data_path, 'meta/esc50.csv')
    pd_data = pd.read_csv(csv_path)
    if categories:
      pd_data = pd_data[pd_data.category.isin(categories)]
    if folds:
      pd_data = pd_data[pd_data.fold.isin(folds)]
    helper = ExamplesHelper(map(_fullpath, pd_data.filename), pd_data.category)
    if shuffle:
      helper.shuffle()
    ds = helper.examples_and_label_indices_ds()
    if len(ds) == 0:  # pylint: disable=g-explicit-length-test
      raise ValueError('No audio files found.')
    return DataLoader(ds, len(ds), helper.index_to_label, spec, cache)

  def split(self, fraction):
    return self._split(fraction, self.index_to_label, self._spec, self._cache)

  def gen_dataset(self,
                  batch_size=1,
                  is_training=False,
                  shuffle=False,
                  input_pipeline_context=None,
                  preprocess=None,
                  drop_remainder=False):
    """Generate a shared and batched tf.data.Dataset for training/evaluation.

    Args:
      batch_size: A integer, the returned dataset will be batched by this size.
      is_training: A boolean, when True, the returned dataset will be optionally
        shuffled. Data augmentation, if exists, will also be applied to the
        returned dataset.
      shuffle: A boolean, when True, the returned dataset will be shuffled to
        create randomness during model training. Only applies when `is_training`
        is set to True.
      input_pipeline_context: A InputContext instance, used to shared dataset
        among multiple workers when distribution strategy is used.
      preprocess: Not in use.
      drop_remainder: boolean, whether the finaly batch drops remainder.

    Returns:
      A TF dataset ready to be consumed by Keras model.
    """
    # This argument is only used for image dataset for now. Audio preprocessing
    # is defined in the spec.
    del preprocess

    ds = self._dataset
    spec = self._spec
    autotune = tf.data.AUTOTUNE

    if is_training and shuffle:
      # Allow non-deterministic element order for better throughput; the data
      # is shuffled below anyway.
      options = tf.data.Options()
      options.experimental_deterministic = False
      ds = ds.with_options(options)

    ds = dataloader.shard(ds, input_pipeline_context)

    @tf.function
    def _load_wav(filepath, label):
      file_contents = tf.io.read_file(filepath)
      # shape: (audio_samples, 1), dtype: float32
      wav, sample_rate = tf.audio.decode_wav(file_contents, desired_channels=1)
      # shape: (audio_samples,)
      wav = tf.squeeze(wav, axis=-1)
      return wav, sample_rate, label

    # This is a eager mode numpy_function. It can be converted to a tf.function
    # using https://www.tensorflow.org/io/api_docs/python/tfio/audio/resample
    def _resample_numpy(waveform, sample_rate, label):
      if ENABLE_RESAMPLE:
        waveform = librosa.resample(
            waveform, orig_sr=sample_rate, target_sr=spec.target_sample_rate)
      else:
        error_message = (
            'Failed to import librosa. You might be missing sndfile, which '
            'can be installed via `sudo apt-get install libsndfile1` on '
            'Ubuntu/Debian.')
        raise RuntimeError(error_message) from error_import_librosa
      return waveform, label

    @tf.function
    def _resample(waveform, sample_rate, label):
      # Short circuit resampling if possible.
      if sample_rate == spec.target_sample_rate:
        return [waveform, label]
      return tf.numpy_function(
          _resample_numpy,
          inp=(waveform, sample_rate, label),
          Tout=[tf.float32, tf.int32])

    @tf.function
    def _elements_finite(preprocess_data, unused_label):
      # Make sure that the data sent to the model does not contain nan or inf
      # values. This should be the last filter applied to the dataset.
      # Arguably we could possibly apply this filter to all tasks.
      # Use an explicit tensor op rather than Python `and`, which would depend
      # on AutoGraph conversion to work in graph mode.
      return tf.math.logical_and(
          tf.size(preprocess_data) > 0,
          tf.math.reduce_all(tf.math.is_finite(preprocess_data)))

    ds = ds.map(_load_wav, num_parallel_calls=autotune)
    ds = ds.map(_resample, num_parallel_calls=autotune)

    def _cache_fn(dataset):
      if self._cache:
        if isinstance(self._cache, str):
          # Cache to a file
          dataset = dataset.cache(self._cache)
        else:
          # In ram cache.
          dataset = dataset.cache()
      return dataset

    # `preprocess_ds` contains data augmentation, so it knows when it's the best
    # time to do caching.
    ds = spec.preprocess_ds(ds, is_training=is_training, cache_fn=_cache_fn)
    ds = ds.filter(_elements_finite)

    # Apply one-hot encoding after caching to reduce the cache size.
    @tf.function
    def _one_hot_encoding_label(wav, label):
      return wav, tf.one_hot(label, len(self.index_to_label))

    ds = ds.map(_one_hot_encoding_label, num_parallel_calls=autotune)

    # Shuffle needs to be done after caching to create randomness across epochs.
    if is_training:
      if shuffle:
        # Shuffle size should be bigger than the batch_size. Otherwise it's only
        # shuffling within the batch, which equals to not having shuffle.
        buffer_size = 3 * batch_size
        # But since we are doing shuffle before repeat, it doesn't make sense to
        # shuffle more than total available entries.
        # TODO(wangtz): Do we want to do shuffle before / after repeat?
        # Shuffle after repeat will give a more randomized dataset and mix the
        # epoch boundary: https://www.tensorflow.org/guide/data
        ds = ds.shuffle(buffer_size=min(self._size, buffer_size))

    ds = ds.batch(batch_size, drop_remainder=drop_remainder)
    # Use the same AUTOTUNE alias as above (tf.data.AUTOTUNE) instead of the
    # older tf.data.experimental.AUTOTUNE spelling for consistency.
    ds = ds.prefetch(autotune)
    # TODO(b/171449557): Consider converting ds to distributed ds here.
    return ds