In [13]:
import pandas as pd
import numpy as np
import os

train_data_path = "./data/train/audio/"
test_data_path = "./data/test/audio/"

In [14]:
train_labels = 'yes no up down left right on off stop go silence unknown'.split()

# 1. Helper functions

In [15]:
def custom_fft(y, fs):
    T = 1.0 / fs
    N = y.shape[0]
    yf = fft(y)
    xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
    # FFT is simmetrical, so we take just the first half
    # FFT is also complex, to we take just the real part (abs)
    vals = 2.0/N * np.abs(yf[0:N//2])
    return xf, vals

def log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=nperseg,
                                    noverlap=noverlap,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

In [16]:
def list_wavs_fname_train(dirpath, ext='wav'):
    print(dirpath)
    fpaths = glob(os.path.join(dirpath, r'*/*' + ext))
    pat = r'.+/(\w+)/\w+\.' + ext + '$'
    labels = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            labels.append(r.group(1))
    pat = r'.+/(\w+\.' + ext + ')$'
    fnames = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            fnames.append(r.group(1))
    return labels, fnames

def list_wavs_fname_test(dirpath, ext='wav'):
    fnames = glob(os.path.join(dirpath, r'*' + ext))
    return fnames

In [17]:
def pad_audio(samples):
    if len(samples) >= L: return samples
    else: return np.pad(samples, pad_width=(L - len(samples), 0), mode='constant', constant_values=(0, 0))

def chop_audio(samples, L=16000, num=20):
    for i in range(num):
        beg = np.random.randint(0, len(samples) - L)
        yield samples[beg: beg + L]

def label_transform(labels):
    nlabels = []
    for label in labels:
        if label == '_background_noise_':
            nlabels.append('silence')
        elif label not in train_labels:
            nlabels.append('unknown')
        else:
            nlabels.append(label)
    return nlabels

# 2. Importing train dataset

In [25]:
from glob import glob
import gc
import re
from scipy.io import wavfile
from scipy import signal
from sklearn.preprocessing import OneHotEncoder

In [26]:
labels, fnames = list_wavs_fname_train(train_data_path)

./data/train/audio/


In [27]:
L = 16000
new_sample_rate = 8000
y_train = []
x_train = []

for label, fname in zip(labels, fnames):
    sample_rate, samples = wavfile.read(os.path.join(train_data_path, label, fname))
    samples = pad_audio(samples)
    if len(samples) > 16000:
        n_samples = chop_audio(samples)
        print(label)
    else: n_samples = [samples]
    for samples in n_samples:
        resampled = signal.resample(samples, int(new_sample_rate / sample_rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        y_train.append(label)
        x_train.append(specgram)
x_train = np.array(x_train)
x_train = x_train.reshape(tuple(list(x_train.shape)))

ohc = OneHotEncoder()
y_train = label_transform(y_train)

y_train = np.array(y_train).reshape(-1, 1)

ohc.fit(y_train)

y_train = ohc.transform(y_train)

y_train = y_train.toarray()

del labels, fnames
gc.collect()

  import sys


_background_noise_
_background_noise_
_background_noise_
_background_noise_
_background_noise_
_background_noise_


293

# 3. Training model

In [2]:
from tensorflow.keras import optimizers, losses, activations, models, layers
from sklearn.model_selection import train_test_split
#import keras
import tensorflow as tf

In [None]:
input_shape = (99, 81)
nclass = 12
inp = layers.Input(shape=input_shape)
norm_inp = layers.BatchNormalization()(inp)
lstm = layers.Bidirectional(layers.CuDNNGRU(128, return_sequences=True))(norm_inp)
lstm = layers.GlobalMaxPooling1D()(lstm)
dense_1 = layers.Dense(128, activation=activations.relu)(lstm)
dense_out = layers.Dense(nclass, activation=activations.softmax)(dense_1)

model = models.Model(inputs=inp, outputs=dense_out)
opt = optimizers.Adam()

model.compile(optimizer=opt, loss=losses.binary_crossentropy, metrics=['accuracy'])
model.summary()

model.fit(x_train, y_train, batch_size=32, epochs=4, shuffle=True)

model.save(os.path.join('./models', 'LSTM_model_1'))

# 4. Cleaning memory

In [32]:
del x_train, y_train
gc.collect()

122776

# 5. Importing test dataset

In [33]:
fnames = list_wavs_fname_test(test_data_path)

In [34]:
L = 16000
new_sample_rate = 8000
x_test = []

for fname in fnames:
    sample_rate, samples = wavfile.read(fname)
    samples = pad_audio(samples)
    if len(samples) > 16000:
        n_samples = chop_audio(samples)
    else: n_samples = [samples]
    for samples in n_samples:
        resampled = signal.resample(samples, int(new_sample_rate / sample_rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        x_test.append(specgram)
x_test = np.array(x_test)
x_test = x_test.reshape(tuple(list(x_test.shape)))
del fnames
gc.collect()

0

In [35]:
y_test = model.predict(x_test)

In [36]:
del x_test
gc.collect()

703

In [37]:
fnames = list_wavs_fname_test(test_data_path)
filenames = [x.split('/')[-1] for x in fnames]

In [38]:
y_pred = ohc.inverse_transform(y_test)

In [39]:
kaggle_df = pd.DataFrame()

In [40]:
kaggle_df['fname'] = filenames

In [41]:
kaggle_df['label'] = y_pred

In [42]:
kaggle_df.to_csv('kaggle_lstm_1.csv', index=False, header=True, quoting=0)

In [23]:
np.unique(y_pred, return_counts=True)

(array(['down', 'go', 'left', 'no', 'off', 'on', 'right', 'silence',
        'stop', 'unknown', 'up', 'yes'], dtype='<U7'),
 array([  3470,   6608,   4083,   6474,   7022,   6025,   6100,   2171,
          4090, 104078,   3667,   4750]))