# Audio Recognition

In [56]:
#import pyaudio
import matplotlib.pyplot as plt
from scipy import signal
from scipy.io import wavfile
import os
import numpy as np
import pandas as pd
import random
from skimage.measure import block_reduce
from tqdm.notebook import tqdm
from sklearn.model_selection import StratifiedKFold, ShuffleSplit
from sklearn.preprocessing import OneHotEncoder

#To find the duration of wave file in seconds
import wave
import contextlib
import librosa
import soundfile as sf

#Keras imports
import keras
from keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D
from keras.models import Sequential, model_from_json
import tensorflow as tf

import time
import datetime

In [4]:
def save_model_to_disk(model):
    """
    Converts the model to a json and saves as an h5 file
    """
    model_json = model.to_json()
    with open("model.json", "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    model.save_weights("model.h5")
    print("Saved model to disk")


def rgb2gray(rgb):
    """
    Convert color image to grayscale
    """
    return np.dot(rgb[...,:3], [0.299, 0.587, 0.114])

def normalize_gray(array):
    """
    Normalize Gray colored image
    """
    return (array - array.min())/(array.max() - array.min())

def findDuration(fname):
    with contextlib.closing(wave.open(fname,'r')) as f:
        frames = f.getnframes()
        rate = f.getframerate()
        sw   = f.getsampwidth()
        chan = f.getnchannels()
        duration = frames / float(rate)
        #print("File:", fname, "--->",frames, rate, sw, chan)
        return duration

# def findDuration(fname):
#     """
#     Function to find the duration of the wave file in seconds
#     """

#     sample_rate,samples = wavfile.read(fname)
#     frequencies, times, spectogram = signal.spectrogram(samples, sample_rate)
#     sf.write('tmp.wav', t, 16000)
#     wave.open('tmp.wav','r')
#     frames = f.getnframes()
#     rate = f.getframerate()
#     sw   = f.getsampwidth()
#     chan = f.getnchannels()
#     duration = frames / float(rate)
#     return duration
        
def graph_spectrogram(wav_file, nfft=512, noverlap=256):
    """
    Converts the wav file to a spectrogram for the NN to interpret
    """
    findDuration(wav_file)
    rate, data = wavfile.read(wav_file)
    #print("")
    fig,ax = plt.subplots(1)
    fig.subplots_adjust(left=0,right=1,bottom=0,top=1)
    ax.axis('off')
    pxx, freqs, bins, im = ax.specgram(x=data, Fs=rate, noverlap=noverlap, NFFT=nfft)
    ax.axis('off')
    plt.rcParams['figure.figsize'] = [0.75,0.5]
    #fig.savefig('sp_xyz.png', dpi=300, frameon='false')
    fig.canvas.draw()
    size_inches  = fig.get_size_inches()
    dpi          = fig.get_dpi()
    width, height = fig.get_size_inches() * fig.get_dpi()

    #print(size_inches, dpi, width, height)
    mplimage = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    #print("MPLImage Shape: ", np.shape(mplimage))
    imarray = np.reshape(mplimage, (int(height), int(width), 3))
    plt.close(fig)
    return imarray

## Creating a Directory for the Files

In [5]:
# os.environ['KMP_DUPLICATE_LIB_OK']  = 'True'

folder_names = [f for f in os.listdir('train/audio') if not 'background' in f]
folder_names.sort()

imheight = int(17)
imwidth = int(25)
k = int(0)

In [6]:
file = open("train/testing_list.txt", "r")
testing_list = file.read().split('\n')
file.close()
file = open("train/validation_list.txt", "r")
validation_list = file.read().split('\n')

In [7]:
master_file_list = []
master_target_list = []
train_val_test_list = []
for phrase in folder_names:
    file_list = [phrase + '/' + f for f in os.listdir('train/audio/' + phrase) if '.wav' in f]
    for file in file_list:
        if file in testing_list:
            train_val_test_list.append('test')
        elif file in validation_list:
            train_val_test_list.append('val')
        else:
            train_val_test_list.append('train')
        master_target_list.append(phrase)
        master_file_list.append(file)
data = np.transpose(np.array([master_file_list, master_target_list, train_val_test_list]))
master_df = pd.DataFrame(data = data, columns=['filename','target','train_val_test'])
master_df.head()

Unnamed: 0,filename,target,train_val_test
0,bed/00176480_nohash_0.wav,bed,train
1,bed/004ae714_nohash_0.wav,bed,train
2,bed/004ae714_nohash_1.wav,bed,train
3,bed/00f0204f_nohash_0.wav,bed,train
4,bed/00f0204f_nohash_1.wav,bed,train


## Creating the Keras Model

In [82]:
def new_keras():
    # input_shape = (imheight, imwidth, 1)
    # input_shape = (4, 36, 54, 1)
    input_shape = (4, 36, 54, 1, 1)
    
    batch_size = 4
    epochs = 1

    model = Sequential()
    model.add(Conv2D(32, kernel_size=(5, 5), activation='relu', input_shape=input_shape))
    # model.add(Conv2D(32, kernel_size=(3, 3), activation='relu'))
    #model.add(MaxPooling2D(pool_size=(2, 2), input_shape=(None, 4, 32, 50, 12)))
    model.add(Flatten())
    model.add(Dropout(0.2))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(num_classes, activation='softmax'))

    opt = keras.optimizers.Adam(learning_rate=0.01)
    model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=['accuracy'])
    print(model.summary())
    return model

In [70]:
def load_waves(fpath):
    fpath = 'train/audio/' + fpath
    spectrogram = graph_spectrogram(fpath)
    graygram = rgb2gray(spectrogram)
#     normgram = normalize_gray(graygram)
#     norm_shape = normgram.shape
#     redgram = block_reduce(normgram, block_size = (3,3), func = np.mean)
#     return redgram
    return(graygram)

In [60]:
def plot_results(history):
    # summarize history for accuracy
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()
    # summarize history for loss
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [83]:
num_classes = len(master_df['target'].unique().tolist())
#SKF is just being used to batch the data into memory, not do any sort of k-fold training process
total_batch_count = 50
skf = StratifiedKFold(n_splits=total_batch_count, shuffle=True, random_state=42)
# X = master_df['filename'][(master_df['train_val_test'] == 'train') or (master_df['train_val_test'] =='val')]
# y = master_df['target'][(master_df['train_val_test'] == 'train') or (master_df['train_val_test'] == 'val')]

X = master_df['filename'][[(val in ['train','val']) for val in master_df['train_val_test']]]
y = master_df['target'][[(val in ['train','val']) for val in master_df['train_val_test']]]

enc = OneHotEncoder(handle_unknown='ignore')
enc.fit(np.array(y).reshape(-1, 1))

batch_count = 0
for train_index, test_index in skf.split(X, y):
    batch_count += 1
    print(f'working on batch: {batch_count}/{total_batch_count}')
    rs = ShuffleSplit(n_splits=2, test_size=.25, random_state=42)
    X, y = np.array(X), np.array(y)
    X, y = X[test_index], y[test_index]
    for train_index2, test_index2 in rs.split(X): #should only run once
        X_train, X_val = X[train_index2], X[test_index2]
        y_train, y_val = y[train_index2], y[test_index2]
        # Split is done, X=filename, y=target

        train_sound_clips = []
        val_sound_clips = []
        
        print(f'loading training images')
        for fpath in tqdm(X_train):
            redgram = load_waves(fpath)
            #print(np.shape(redgram))
            train_sound_clips.append(redgram)
        train_sound_clips = np.array(train_sound_clips)
        # print(f'train sound clips shape:{train_sound_clips.shape}')
        print(f'loading validation images')
        for fpath in tqdm(X_val):
            redgram = np.array(load_waves(fpath))
            val_sound_clips.append(redgram)
        val_sound_clips = np.array(val_sound_clips)

        # x_train = x_train.reshape(x_train.shape[0], imheight, imwidth, 1)
        x_train = train_sound_clips.reshape(train_sound_clips.shape[0],
                                            train_sound_clips.shape[1], 
                                            train_sound_clips.shape[2], 1)
        # y_train = keras.utils.to_categorical(y_train, num_classes)
        y_train = enc.transform(y_train.reshape(-1, 1)).toarray()
        # x_val = x_train.reshape(x_val.shape[0], imheight, imwidth, 1)
        
        x_val = val_sound_clips.reshape(val_sound_clips.shape[0],
                                        val_sound_clips.shape[1],
                                        val_sound_clips.shape[2], 1)
        # y_val = keras.utils.to_categorical(y_val, num_classes)
        y_val = enc.transform(y_val.reshape(-1, 1)).toarray()

        try:
            loaded_model = tf.keras.models.load_model('./MyModel_tf')
            history = loaded_model.fit(x_train, y_train, batch_size=4, epochs=10, verbose=1, validation_data=(x_val, y_val))
            plot_results(history)
            loaded_model.save('./MyModel_tf',save_format='tf')
        except:
            print("Starting with a fresh model")
            model = new_keras()
            model.fit(x_train, y_train, batch_size=4, epochs=10, verbose=1, validation_data=(x_val, y_val))
            plot_results(history)
            model.save('./MyModel_tf',save_format='tf')
            model=[]

        print(f'done with batch:{batch_count}/{total_batch_count}')

working on batch: 1/50
loading training images


HBox(children=(FloatProgress(value=0.0, max=868.0), HTML(value='')))

  Z = 10. * np.log10(spec)



loading validation images


HBox(children=(FloatProgress(value=0.0, max=290.0), HTML(value='')))


Starting with a fresh model


ValueError: Negative dimension size caused by subtracting 5 from 1 for '{{node conv2d_20/Conv2D/Conv2D}} = Conv2D[T=DT_FLOAT, data_format="NHWC", dilations=[1, 1, 1, 1], explicit_paddings=[], padding="VALID", strides=[1, 1, 1, 1], use_cudnn_on_gpu=true](conv2d_20/Conv2D/Reshape, conv2d_20/Conv2D/Conv2D/ReadVariableOp)' with input shapes: [?,54,1,1], [5,5,1,32].

In [None]:


y_train = keras.utils.to_categorical(y_train, num_classes)
#converts the data set so each integer(?) input is scaled from 0 to num_classes ???
#could also be a dictionary
x_train = x_train.reshape(x_train.shape[0], imheight, imwidth, 1)

y_test = keras.utils.to_categorical(y_test, num_classes)
#converts the data set so each integer(?) input is scaled from 0 to num_classes ???
#could also be a dictionary
x_test = x_train.reshape(x_test.shape[0], imheight, imwidth, 1)

print("x and y training/testing data done and formatted, starting keras sequential model function now")

model.fit(x_train, y_train, batch_size=4, epochs=10, verbose=1, validation_data=(x_test, y_test))

save_model_to_disk(model)