In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers

In [5]:
from helper_fns import *

In [2]:
test_set      = 'norfolk'  # can be one of: bulgaria, uk, norfolk
data_set      = 'bat_train/data/train_test_split/test_set_' + test_set + '.npz'
raw_audio_dir = 'bat_train/data/wav/'
base_line_dir = 'bat_train/data/baselines/'
result_dir    = 'bat_train/results/'
model_dir     = 'bat_train/data/models/'
if not os.path.isdir(result_dir):
    os.mkdir(result_dir)
if not os.path.isdir(model_dir):
    os.mkdir(model_dir)
print('test set:', test_set)
plt.close('all')

test set: norfolk


In [3]:
# train and test_pos are in units of seconds
loaded_data_tr  = np.load(data_set, allow_pickle = True, encoding = 'latin1')
train_pos       = loaded_data_tr['train_pos']
train_files     = loaded_data_tr['train_files']
train_durations = loaded_data_tr['train_durations']
test_pos        = loaded_data_tr['test_pos']
test_files      = loaded_data_tr['test_files']
test_durations  = loaded_data_tr['test_durations']

In [4]:
train_files_decode = [s.decode() for s in train_files]
test_files_decode  = [s.decode() for s in test_files]

In [6]:
positions, class_labels = generate_training_positions(train_files_decode, train_pos, train_durations)

In [7]:
train_paths_decode = [raw_audio_dir + fn for fn in train_files_decode]
test_paths_decode  = [raw_audio_dir + fn for fn in test_files_decode]

In [19]:
train_paths_decode_full = [s+'.wav' for s in train_paths_decode]
test_paths_decode_full  = [s+'.wav' for s in test_paths_decode]

In [8]:
def get_waveform_and_label(file_path):
    label        = get_label(file_path)
    audio_binary = tf.io.read_file(file_path)
    waveform     = decode_audio(audio_binary)
    return waveform, label

In [9]:
def get_label(file_path):
    parts = tf.strings.split(file_path, os.path.sep)
    #Second last part is the folder the file is contained in, i.e. the label
    return parts[-2]

In [10]:
def decode_audio(audio_binary):
    audio, _ = tf.audio.decode_wav(audio_binary) # returns the WAV-encoded audio as a tensor and the sample rate
    #if there is both left and right audio, only keep the left source to avoid dimension problems
    return audio[:,0]

In [11]:
def spec_post(spec):
    spec = spec[1:, :]
    spec = np.flipud(spec)
    # only keep the relevant bands - could do this outside
    if crop_spec:
        spec = spec[-max_freq:-min_freq, :]
    
        # add some zeros if too small
        req_height = max_freq-min_freq
        if spec.shape[0] < req_height:
            zero_pad = np.zeros((req_height-spec.shape[0], spec.shape[1]))
            spec     = np.vstack((zero_pad, spec))

    # perform log scaling - here the same as matplotlib
    log_scaling = 2.0 * (1.0 / sampling_rate) * (1.0/(np.abs(np.hanning(int(fft_win_length*sampling_rate)))**2).sum())
    spec        = np.log(1.0 + log_scaling*spec)
    return(spec)

In [12]:
def get_spectrogram(waveform):
    #cutoff spectrogram size to either splice or pad
    nfft     = int(fft_win_length*sampling_rate)
    noverlap = int(fft_overlap*nfft)

    # window data
    step    = nfft - noverlap
    #print(step)
    shape   = (nfft, (waveform.shape[-1]-noverlap)//step)
    strides = (waveform.strides[0], step*waveform.strides[0])
    x_wins  = np.lib.stride_tricks.as_strided(waveform, shape=shape, strides=strides)
    x_wins_han = np.hanning(x_wins.shape[0])[..., np.newaxis] * x_wins
    
    complex_spec = tf.signal.rfft(x_wins_han.T).numpy().T
    spec = tf.math.real(tf.math.conj(complex_spec) * complex_spec)
    spec = spec_post(spec)
    spec = process_spectrogram(spec, denoise_spec=denoise, mean_log_mag=mean_log_mag, smooth_spec=smooth_spec)
    
    return spec

In [20]:
#defines appropriate number of processes that are free for working.
AUTOTUNE    = tf.data.experimental.AUTOTUNE
#Creates tensorflow dataset with (waveform, label) pairs
files_ds    = tf.data.Dataset.from_tensor_slices(train_paths_decode_full)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)

In [21]:
for wf,lab in waveform_ds.take(1):
    print(lab)

InvalidArgumentError: slice index -1 of dimension 0 out of bounds.
	 [[{{node strided_slice}}]] [Op:IteratorGetNext]