In [1]:
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)

import io
import sys
import librosa
import os.path
import numpy as np
import soundfile as sf
from pathlib import Path
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.utils import *
from tensorflow.keras import backend as K
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

In [2]:
def saveModel(model, file):
    model.save(file)
    print('Saved model to disk.')

In [3]:
def loadModel(file):
    print('Loading model from disk.')
    return load_model(file)

In [4]:
def modelExist(model):
    file = Path(model)
    if file.is_file():
        print('Model file exist.')
    else:
        print('Model file not found.')
    return file.is_file()

In [5]:
def two_dim_convolution_model_for_multi_class(width, height, channel, nClasses):
    inputShape = (width, height, channel)
    print('inputshape', inputShape)

    model = Sequential()
    model.add(Conv2D(32, (2, 2), input_shape=inputShape,
                     data_format='channels_last'))
    model.add(Activation('relu'))

    model.add(Conv2D(64, (2, 2)))
    model.add(Activation('relu'))

    model.add(Conv2D(128, (2, 2)))
    model.add(Activation('relu'))

    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.3))

    model.add(Flatten())

    model.add(Dense(128))
    #model.add(Activation('relu'))
    model.add(Dropout(0.25))

    model.add(Dense(64))
    #model.add(Activation('relu'))
    model.add(Dropout(0.25))

    model.add(Dense(nClasses))
    model.add(Activation('softmax'))

    print(model.summary())

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [6]:
def audio2mfcc(audioData, maxPadLen):
    # code: if you pass the filepath
    # audio, sr = librosa.load(filePath, mono=True, sr=None)

    data = io.BytesIO(audioData)
    audio, sr = sf.read(data)

    # output generated from target has some noise for first 3ms
    # skip read first 3ms, if the file' sample rate is > 16000
    if (sr > 16000):
        adata = io.BytesIO(audioData)
        audio, sr = sf.read(adata, start=14400)

    # Reading audio files using PySoundFile is similmar to the method in librosa.
    # One important difference is that the read data is of shape (nb_samples, nb_channels) compared to (nb_channels, nb_samples) in <librosa.core.load>.
    audio = audio.T

    # Force an audio signal down to mono
    audio = librosa.to_mono(audio)

    # Resample a time series from orig_sr to target_sr
    audio = librosa.resample(audio, sr, 16000)

    #sf.write('sample_output.wav', audio, 16000, 'PCM_16')

    #cutshort the training time
    #audio = audio[::3]

    mfcc = librosa.feature.mfcc(audio, sr=16000)
    padWidth = maxPadLen - mfcc.shape[1]
    mfcc = np.pad(mfcc, pad_width=((0, 0), (0, padWidth)), mode='constant')
    return mfcc

In [7]:
featureWidth = 20
featureHeight = 300
featureChannel = 1
nClasses = 11
epochs = 10
batchSize = 100
verbose = 1

datasetPath = 'dataset2'
modelFile = 'waves.hdf5'

def generateTrainTestData(path=datasetPath, testSize=0.30):
    #get labels
    labels = os.listdir(path)
    labels = [x.split('.')[0] for x in labels]
    print('tLabels', labels)

    binaryLabelPath = path + '/' + labels[0] + '.npy'
    tData = np.load(binaryLabelPath)
    tLabel = np.zeros(tData.shape[0])
    
    trainData, testData, trainLabel, testLabel = train_test_split(tData, tLabel, test_size=testSize, shuffle=True)

    print('trainData shape', trainData.shape)
    print('trainLabel shape', trainLabel.shape)
    print('testData shape', testData.shape)
    print('testLabel shape', testLabel.shape)

    for i, label in enumerate(labels[1:]):

        if (label == '_background_noise_'):
            print('skipping bg noise label')
            continue

        binaryLabelPath = path + '/' + label + '.npy'
        x = np.load(binaryLabelPath)
        y = np.full(x.shape[0], fill_value=(i + 1))
        temp_trainData, temp_testData, temp_trainLabel, temp_testLabel = train_test_split(x, y, test_size=testSize, shuffle=True)
        print('temp_trainData shape', temp_trainData.shape)
        print('temp_trainLabel shape', temp_trainLabel.shape)
        print('temp_testData shape', temp_testData.shape)
        print('temp_testLabel shape', temp_testLabel.shape)
        trainData = np.vstack((trainData, temp_trainData))
        testData = np.vstack((testData, temp_testData))
        trainLabel = np.append(trainLabel,temp_trainLabel)
        testLabel = np.append(testLabel,temp_testLabel)
        
        
        #tData = np.vstack((tData, x))
        #tLabel = np.append(tLabel, np.full(x.shape[0], fill_value=(i + 1)))

    print('trainData shape', trainData.shape)
    print('trainLabel shape', trainLabel.shape)
    print('testData shape', testData.shape)
    print('testLabel shape\n', testLabel.shape)
    #print('tData shape', tData.shape)
    #print('tLabel shape', tLabel.shape)
    return trainData, testData, trainLabel, testLabel


checkpoint = [
    ModelCheckpoint(filepath=modelFile, verbose=1,
                    monitor='val_acc', save_best_only=True),
    EarlyStopping(monitor='val_acc', min_delta=0,
                  patience=100, verbose=1, mode='auto')
]

def trainModel(model, trainData, trainLabelHot, testData, testLabelHot, epochs=epochs, verbose=verbose, batchSize=batchSize):
    return model.fit(
        trainData, trainLabelHot,
        batch_size=batchSize,
        epochs=epochs,
        verbose=verbose,
        validation_data=(testData, testLabelHot),
        callbacks=checkpoint)

def waves_training():
    if (modelExist(modelFile)):
        print('Waves model file exist in disk.')
        return loadModel(modelFile)
    else:
        # generate train test data
        trainData, testData, trainLabel, testLabel = generateTrainTestData()

        # One hot encoding
        trainLabelHot = to_categorical(trainLabel)
        testLabelHot = to_categorical(testLabel)

        # Reshaping to perform 2D convolution
        trainData = trainData.reshape(
            trainData.shape[0], featureWidth, featureHeight, featureChannel)
        testData = testData.reshape(
            testData.shape[0], featureWidth, featureHeight, featureChannel)

        # build model
        model = two_dim_convolution_model_for_multi_class(featureWidth, featureHeight, featureChannel, nClasses)

        # train
        trainModel(model, trainData, trainLabelHot, testData, testLabelHot)

        #save model
        saveModel(model, modelFile)

        return loadModel(modelFile)

In [8]:
waves_training()

Model file not found.
tLabels ['down', 'go', 'left', 'no', 'noise', 'off', 'on', 'right', 'stop', 'up', 'yes']
trainData shape (1652, 20, 300)
trainLabel shape (1652,)
testData shape (708, 20, 300)
testLabel shape (708,)
temp_trainData shape (1661, 20, 300)
temp_trainLabel shape (1661,)
temp_testData shape (712, 20, 300)
temp_testLabel shape (712,)
temp_trainData shape (1647, 20, 300)
temp_trainLabel shape (1647,)
temp_testData shape (707, 20, 300)
temp_testLabel shape (707,)
temp_trainData shape (1663, 20, 300)
temp_trainLabel shape (1663,)
temp_testData shape (713, 20, 300)
temp_testLabel shape (713,)
temp_trainData shape (21, 20, 300)
temp_trainLabel shape (21,)
temp_testData shape (9, 20, 300)
temp_testLabel shape (9,)
temp_trainData shape (1650, 20, 300)
temp_trainLabel shape (1650,)
temp_testData shape (708, 20, 300)
temp_testLabel shape (708,)
temp_trainData shape (1657, 20, 300)
temp_trainLabel shape (1657,)
temp_testData shape (711, 20, 300)
temp_testLabel shape (711,)
temp_tr

Saved model to disk.
Loading model from disk.


<tensorflow.python.keras.engine.sequential.Sequential at 0x20e03939ee0>