In [None]:
import keras
from keras.layers import Activation, Dense, Dropout, Conv2D, Flatten, MaxPooling2D
from keras.models import Sequential
from keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint,TensorBoard,ProgbarLogger
from sklearn.model_selection import train_test_split
import librosa
import librosa.display
import numpy as np
import pandas as pd
import random
import warnings
from tqdm import tqdm
warnings.filterwarnings('ignore')

import _pickle as cPickle
import os  

%matplotlib inline
import tensorflow as tf
from keras import backend as k
# Create a TensorFlow session whose GPU options have allow_growth enabled,
# then register that session as the one the Keras backend uses.
config = tf.ConfigProto()                                   
config.gpu_options.allow_growth = True                      
k.tensorflow_backend.set_session(tf.Session(config=config)) 

In [None]:
# True  -> reuse the pickled spectrogram caches instead of re-reading every audio file.
# The loading cells further down reset this to False when a cache file is missing.
SKIP_AUDIO_RELOAD = True

In [None]:
# Relative directories that hold the training and test audio clips
# (clip paths are later built as "<dir>/<ID>.wav").
TRAIN_AUDIO_DIR='Train'
TEST_AUDIO_DIR='Test'
def load_input_data(pd, filepath):
    """Read a CSV file and return whatever the reader produces.

    Args:
        pd: object exposing a ``read_csv`` callable (the pandas module in this notebook).
        filepath: location of the CSV file, forwarded unchanged to ``read_csv``.

    Returns:
        The value returned by ``pd.read_csv(filepath)``.
    """
    return pd.read_csv(filepath)
# Read the train/test index files; the columns used below are 'ID' (both files)
# and 'Class' (train only).
TRAIN_FILE = 'train.csv'
train_input = load_input_data(pd, TRAIN_FILE)
TEST_FILE = 'test.csv'
test_input = load_input_data(pd, TEST_FILE)

valid_train_label = train_input[['Class']]
valid_train_label.count()

# Number of training rows per class.
x = train_input.groupby('Class')['Class'].count()
print(x)

# Take explicit copies: later cells add a 'path' column to these frames, and
# assigning into a column slice of another frame triggers pandas'
# SettingWithCopyWarning (currently hidden by warnings.filterwarnings).
valid_train_data = train_input[['ID', 'Class']].copy()
valid_train_data.count()
valid_test_data = test_input[['ID']].copy()
valid_test_data.count()

In [None]:
# sample-1 load
# Load at most `duration` seconds of one training clip, requesting sampling rate `sr`,
# and compute its mel spectrogram.
sample1=TRAIN_AUDIO_DIR+'/943.wav'
duration=2.97 
sr=22050

y, sr = librosa.load(sample1, duration=duration,  sr=sr)
ps = librosa.feature.melspectrogram(y=y, sr=sr)

# input_length: sample count of a full-length clip (rate * seconds);
# offset: loaded sample count minus that expected count.
input_length=sr*duration
offset = len(y) - round(input_length)
# Plot the waveform of the loaded signal.
librosa.display.waveplot(y,sr)

In [None]:
import IPython.display as ipd
# Render an inline audio player for the first sample clip.
ipd.Audio(sample1) 

In [None]:
# Draw sample 1's mel spectrogram with a mel-scaled frequency axis against time.
librosa.display.specshow(ps, y_axis='mel', x_axis='time')

In [None]:
# Load a second training clip the same way as sample 1 and report its size.
sample2 = TRAIN_AUDIO_DIR + '/1.wav'
duration = 2.97
sr = 22050

y2, sr2 = librosa.load(sample2, duration=duration, sr=sr)
ps2 = librosa.feature.melspectrogram(y=y2, sr=sr2)

# Report on sample 2's own signal (y2 / sr2). The previous version read `y`,
# i.e. sample 1, so the printed length, offset and shape described the wrong clip.
input_length = sr2 * duration
offset = len(y2) - round(input_length)
print ("input:", round(input_length), " load:", len(y2) , " offset:", offset)
print ("y shape:", y2.shape, " melspec shape:", ps2.shape)

In [None]:
# Render an inline audio player for the second sample clip.
ipd.Audio(sample2) 

In [None]:
# Draw sample 2's mel spectrogram and show the shape of that same array
# (the cell previously displayed the shape of sample 1's spectrogram, `ps`).
librosa.display.specshow(ps2, y_axis='mel', x_axis='time')
ps2.shape

**Prepare data file loading**

In [None]:
# Build each training clip's file path as "<TRAIN_AUDIO_DIR>/<ID>.wav" and store it in a new 'path' column.
valid_train_data['path'] = TRAIN_AUDIO_DIR + '/' + train_input['ID'].astype('str')+".wav"
# Spot-check one generated path, then preview the first rows.
print ("sample",valid_train_data.path[1])
valid_train_data.head(5)

In [None]:
# Build each test clip's file path as "<TEST_AUDIO_DIR>/<ID>.wav" and store it in a new 'path' column.
valid_test_data['path'] = TEST_AUDIO_DIR + '/' + test_input['ID'].astype('str') +".wav"
# Spot-check one generated path, then preview the first rows.
print ("sample",valid_test_data.path[1])
valid_test_data.head(5)

**Loading audio file and features**

In [None]:
def audio_norm(data):
    """Min-max scale ``data`` and shift it so the result is centred on zero.

    The values are mapped to roughly [0, 1) by subtracting the minimum and
    dividing by the value range plus 0.0001 (the small constant keeps the
    division defined for constant input), then 0.5 is subtracted.

    Args:
        data: numeric array accepted by ``np.min`` / ``np.max``.

    Returns:
        The rescaled array, with values in approximately [-0.5, 0.5).
    """
    lowest, highest = np.min(data), np.max(data)
    scaled = (data - lowest) / (highest - lowest + 0.0001)
    return scaled - 0.5

# Clip length in seconds; same value as the default `duration` of load_audio_file.
# NOTE(review): not referenced anywhere else in the visible notebook — confirm it is still needed.
audio_play_duration=2.97

def load_audio_file(file_path, duration=2.97, sr=22050):
    """Load an audio clip and return its mel spectrogram, padding short clips.

    At most ``duration`` seconds are read. If the loaded signal holds fewer
    samples than a full-length clip, it is zero-padded to the full length
    before the spectrogram is computed, so short clips yield the same
    spectrogram width as full-length ones.

    Args:
        file_path: path of the audio file to read.
        duration: maximum number of seconds to load.
        sr: sampling rate requested from ``librosa.load``.

    Returns:
        The array produced by ``librosa.feature.melspectrogram`` for the
        (possibly padded) signal. Callers in this notebook expect shape
        (128, 128) for a 2.97 s clip at 22050 Hz.
    """
    y, sr = librosa.load(file_path, sr=sr, duration=duration)
    # Use the rate returned by the loader so the target length is right for
    # whatever rate the signal actually has.
    target_length = round(sr * duration)
    # Compare sample counts directly. The previous check, round(seconds) < duration,
    # rounded e.g. 2.6 s up to 3 and therefore skipped padding for clips between
    # roughly 2.5 s and `duration`; it also measured seconds at librosa's default
    # rate rather than `sr`.
    if len(y) < target_length:
        offset = len(y) - target_length
        print ("fixing audio length :", file_path)
        print ("input:", target_length, " load:", len(y) , " offset:", offset)
        # `size` is keyword-only in newer librosa releases; the keyword form
        # is accepted by older ones as well.
        y = librosa.util.fix_length(y, size=target_length)
    ps = librosa.feature.melspectrogram(y=y, sr=sr)
    return ps

In [None]:
train_object_file = 'saved_train_audio_data.p'

# Without a cache file there is nothing to reuse, so force a reload from audio.
if not os.path.isfile(train_object_file):
    SKIP_AUDIO_RELOAD = False

if SKIP_AUDIO_RELOAD is True:
    print ("skip re-loading TRAINING data from audio files")
else:
    print ("loading train audio data, may take more than 15 minutes. please wait!")
    # Start from an empty list: the name was previously never initialised, so
    # the first append raised NameError on a fresh kernel (and a re-run would
    # have duplicated every sample).
    train_audio_data = []
    for row in tqdm(valid_train_data.itertuples(), total=len(valid_train_data)):
        ps = load_audio_file(file_path=row.path, duration=2.97)
        # The CNN input is fixed at 128x128; training clips of any other size are dropped.
        if ps.shape != (128, 128):
            continue
        train_audio_data.append((ps, row.Class))
    print("Number of train samples: ", len(train_audio_data))

In [None]:
# Either restore the training spectrograms from the pickle cache or write the
# freshly loaded ones to it. Files are opened in `with` blocks so the handles
# are closed (and written data flushed) deterministically.
if SKIP_AUDIO_RELOAD is True:
    # NOTE: unpickling can execute arbitrary code; only load cache files that
    # this notebook produced itself.
    with open(train_object_file, 'rb') as cache_file:
        train_audio_data = cPickle.load(cache_file)
    print ("loaded train data [%s] records from object file" % len(train_audio_data))
else:
    with open(train_object_file, 'wb') as cache_file:
        cPickle.dump(train_audio_data, cache_file)
    print ("saved loaded train data :",len(train_audio_data))

In [None]:
test_object_file = 'saved_test_audio_data.p'

# Without a cache file there is nothing to reuse, so force a reload from audio.
if not os.path.isfile(test_object_file):
    SKIP_AUDIO_RELOAD = False

if SKIP_AUDIO_RELOAD is True:
    print ("skip re-loading TEST data from audio files")
else:
    print ("loading test audio data, may take more than 15 minutes. please wait!")
    # Start from an empty list: the name was previously never initialised, so
    # the first append raised NameError on a fresh kernel.
    test_audio_data = []
    for row in tqdm(valid_test_data.itertuples(), total=len(valid_test_data)):
        ps = load_audio_file(file_path=row.path, duration=2.97)
        if ps.shape != (128, 128):
            # Every test ID must keep an entry, so a wrongly sized clip is
            # replaced by an all-zero spectrogram instead of being skipped.
            print ("***data shape is wrong, replace it with zeros ", ps.shape, row.path)
            ps = np.zeros([128, 128])
        test_audio_data.append((ps, row.ID))
    # Report the test count (this line used to print the train list under a "train" label).
    print("Number of test samples: ", len(test_audio_data))

In [None]:
# Either restore the test spectrograms from the pickle cache or write the
# freshly loaded ones to it. Files are opened in `with` blocks so the handles
# are closed (and written data flushed) deterministically.
if SKIP_AUDIO_RELOAD is True:
    # NOTE: unpickling can execute arbitrary code; only load cache files that
    # this notebook produced itself.
    with open(test_object_file, 'rb') as cache_file:
        test_audio_data = cPickle.load(cache_file)
    print ("loaded test data [%s] records from object file" % len(test_audio_data))
else:
    with open(test_object_file, 'wb') as cache_file:
        cPickle.dump(test_audio_data, cache_file)
    print ("save loaded test data :", len(test_audio_data))

**Prepare data for training**

**Encode labels**

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from keras.utils import to_categorical
from numpy import argmax

# Sorted list of the distinct text labels present in the training index.
list_labels = sorted(list(set(valid_train_data.Class.values)))
print ("unique text labels count: ",len(list_labels))
print ("labels: ",list_labels)

# Integer-encode the labels.
label_encoder = LabelEncoder()
label_integer_encoded = label_encoder.fit_transform(list_labels)
print("encoded labelint values", label_integer_encoded)

# One-hot encode the integer ids and decode the first row again as a round-trip check.
encoded_test = to_categorical(label_integer_encoded)
inverted_test = argmax(encoded_test[0])

# Map each clip path to its text label. This used to pair paths with the ID
# column, which produced a path -> ID mapping rather than path -> label.
file_to_label = dict(zip(valid_train_data.path.values, valid_train_data.Class.values))

# Map text label -> integer id (position in the sorted label list).
label_to_int = {label: index for index, label in enumerate(list_labels)}

# Map integer id -> text label (inverse of label_to_int).
int_to_label = {index: label for label, index in label_to_int.items()}


#### Split the data into train and test sets (the validation set is held out from train during fitting)

In [None]:
# Fixed seed so the train/test split is the same on every run of the notebook.
SPLIT_SEED = 42
RATIO = 0.9

# Shuffle a copy with a private generator: the cached train_audio_data list is
# left untouched, and re-running this cell reproduces the identical split.
dataset = list(train_audio_data)
random.Random(SPLIT_SEED).shuffle(dataset)

train_cutoff = round(len(dataset) * RATIO)
train = dataset[:train_cutoff]
test = dataset[train_cutoff:]

# Each entry is a (spectrogram, label) pair; separate features from labels.
X_train, y_train = zip(*train)
X_test, y_test = zip(*test)

# Reshape for CNN input: add a trailing channel axis -> (128, 128, 1) per sample.
X_train = np.array([x.reshape( (128, 128, 1) ) for x in X_train])
X_test = np.array([x.reshape( (128, 128, 1) ) for x in X_test])

print ("train ",X_train.shape, len(y_train))
print ("test ", X_test.shape, len(y_test))

In [None]:
# Fit the encoder once, on the full sorted label list, and only *transform* the
# two splits. Re-fitting on y_test (as before) derives the integer ids from the
# classes that happen to be in the test split, so a class missing there shifts
# every later id and the test targets no longer match the training targets or
# int_to_label.
label_encoder = LabelEncoder()
label_encoder.fit(list_labels)
y_train_integer_encoded = label_encoder.transform(y_train)
y_test_integer_encoded = label_encoder.transform(y_test)

In [None]:
# One-hot encode the integer class ids, one column per entry in list_labels.
# From here on y_train / y_test hold the one-hot matrices, not the text labels.
y_train = np.array(
    keras.utils.to_categorical(y_train_integer_encoded, num_classes=len(list_labels))
)
y_test = np.array(
    keras.utils.to_categorical(y_test_integer_encoded, num_classes=len(list_labels))
)

In [None]:
# Re-check the held-out split: feature array shape and number of label rows.
print ("test ",X_test.shape, len(y_test))

In [None]:
# Reset the Keras backend state before building the network.
k.clear_session()

# Per-sample input shape, taken from the training array (drops the batch axis).
input_shape = X_train.shape[1:]

# Three convolution stages (the first two followed by 4x2 max pooling), then a
# dropout-regularised dense head ending in a softmax over the label set.
model = Sequential([
    Conv2D(32, (5, 5), strides=(1, 1), input_shape=input_shape),
    MaxPooling2D((4, 2), strides=(4, 2)),
    Activation('relu'),

    Conv2D(64, (5, 5), padding="valid"),
    MaxPooling2D((4, 2), strides=(4, 2)),
    Activation('relu'),

    Conv2D(64, (5, 5), padding="valid"),
    Activation('relu'),

    Flatten(),
    Dropout(rate=0.5),

    Dense(64),
    Activation('relu'),
    Dropout(rate=0.5),

    Dense(len(list_labels)),
    Activation('softmax'),
])
model.summary()

In [None]:
MAX_EPOCHS = 100
MAX_BATCH_SIZE = 32
MAX_PATIENT = 2   # epochs without improvement before the learning rate is reduced

# File that receives the best model seen during training.
best_model_file = "./best_model_trained4.hdf5"

callback = [
    ReduceLROnPlateau(patience=MAX_PATIENT, verbose=1),
    # Keep only the model with the best validation accuracy.
    ModelCheckpoint(filepath=best_model_file, monitor='val_acc', verbose=1, save_best_only=True),
]

# The metric is requested as 'acc' so that it is logged as 'acc' / 'val_acc'
# on both older and newer Keras releases. With 'accuracy', newer releases log
# 'val_accuracy', the checkpoint's 'val_acc' monitor never matches, and no
# model file is written.
model.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=['acc'])

print('training ....')
# 10% of the training data is held out by Keras as the validation set.
history = model.fit(x=X_train, y=y_train, epochs=MAX_EPOCHS, batch_size=MAX_BATCH_SIZE, verbose=1, validation_split=0.1, callbacks=callback)
print('training finished')

print('Evaluate model with test data')
score = model.evaluate(x=X_test, y=y_test)

print('test loss:', score[0])
print('test accuracy:', score[1])

In [None]:
from keras.models import load_model
# `history` is deliberately left bound to the object returned by model.fit.
# This cell used to rebind it to a model loaded from disk, which has no
# per-epoch `history` dict, so plot_history(history) could not work.

import matplotlib.pyplot as plt
def plot_history(history):
    """Plot training/validation accuracy and loss curves side by side.

    Metric series are selected by substring match on the keys of
    ``history.history``: keys containing 'loss' or 'acc', where keys that
    also contain 'val' are treated as validation series.

    Args:
        history: object exposing a ``history`` dict that maps a metric name
            to a list with one value per epoch (e.g. the return value of
            ``model.fit``).

    Returns:
        None. Prints a message and returns without plotting when no
        training-loss series is present.
    """
    metrics = history.history
    loss_list = [s for s in metrics if 'loss' in s and 'val' not in s]
    val_loss_list = [s for s in metrics if 'loss' in s and 'val' in s]
    acc_list = [s for s in metrics if 'acc' in s and 'val' not in s]
    val_acc_list = [s for s in metrics if 'acc' in s and 'val' in s]
    if len(loss_list) == 0:
        print('Loss is missing in history')
        return

    epochs = range(1, len(metrics[loss_list[0]]) + 1)

    def _legend(prefix, values):
        # Legend text: series name plus its final-epoch value.
        return prefix + ' (' + format(values[-1], '.5f') + ')'

    # One figure with two panels. The previous version opened an extra empty
    # figure and passed the subplot code 221 as a figure number.
    fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(20, 10))

    for key in acc_list:
        ax_acc.plot(epochs, metrics[key], 'b', label=_legend('Training accuracy', metrics[key]))
    for key in val_acc_list:
        ax_acc.plot(epochs, metrics[key], 'g', label=_legend('Validation accuracy', metrics[key]))
    ax_acc.set_title('Accuracy')
    ax_acc.set_xlabel('Epochs')
    ax_acc.set_ylabel('Accuracy')
    ax_acc.legend()

    for key in loss_list:
        ax_loss.plot(epochs, metrics[key], 'b', label=_legend('Training loss', metrics[key]))
    for key in val_loss_list:
        ax_loss.plot(epochs, metrics[key], 'g', label=_legend('Validation loss', metrics[key]))
    ax_loss.set_title('Loss')
    ax_loss.set_xlabel('Epochs')
    ax_loss.set_ylabel('Loss')
    ax_loss.legend()

    plt.show()

# Draw the accuracy and loss curves for the object currently bound to `history`.
plot_history(history)

**Model Evaluation**

In [None]:
from keras.models import load_model
# Evaluate the checkpoint that the training cell writes (best_model_file).
# The path was previously hard-coded to 'best_model_trained3.hdf5', a different
# file from the one ModelCheckpoint saves to, so the reported scores could
# belong to an unrelated, older model.
model = load_model(best_model_file)
score = model.evaluate(X_train, y_train, verbose=1)
print ("model train data score       : ",round(score[1]*100) , "%")

score = model.evaluate(X_test, y_test, verbose=1)
print ("model test data score        : ",round(score[1]*100) , "%")

#### Prediction test 

In [None]:
print ("Prediction with [train] data")
# Class id = index of the largest softmax output. This is what predict_classes
# returned for a multi-class model; that method no longer exists in newer Keras.
y_pred = np.argmax(model.predict(X_train), axis=-1)
missed=[]
matched=[]
for i in range(len(y_pred)):
    # y_train rows are one-hot, so argmax recovers the true class id.
    y_val_label_int = argmax(y_train[i])
    if (y_pred[i]!=y_val_label_int):
        missed.append( (y_pred[i], "-", int_to_label[y_pred[i]], " - ", int_to_label[y_val_label_int] ))
    else:
        matched.append((y_pred[i], "-", int_to_label[y_pred[i]], " - ", int_to_label[y_val_label_int]))

# Accuracy = correct / total. The earlier (matched - missed) / matched
# under-reported the score and divided by zero when nothing matched.
total = len(y_pred)
accuracy = round(len(matched) / total * 100, 2) if total else 0.0

print ("  |__match    :", len(matched))
print ("  |__miss     :", len(missed))
print ("  |__accuracy :", accuracy, "%")
print ("")

print ("---samples---")
# Show up to five predictions next to their true labels.
for i in range(min(5, total)):
    print (i,"predict =", int_to_label[y_pred[i]])
    print (i,"original=", int_to_label[argmax(y_train[i])])
    print ("")

In [None]:
print ("Prediction with [test] data")
# Class id = index of the largest softmax output. This is what predict_classes
# returned for a multi-class model; that method no longer exists in newer Keras.
y_pred = np.argmax(model.predict(X_test), axis=-1)
missed=[]
matched=[]
for i in range(len(y_pred)):
    # y_test rows are one-hot, so argmax recovers the true class id.
    y_val_label_int = argmax(y_test[i])
    if (y_pred[i]!=y_val_label_int):
        missed.append( (y_pred[i], "-", int_to_label[y_pred[i]], " - ", int_to_label[y_val_label_int] ))
    else:
        matched.append((y_pred[i], "-", int_to_label[y_pred[i]], " - ", int_to_label[y_val_label_int]))

# Accuracy = correct / total. The earlier (matched - missed) / matched
# under-reported the score and divided by zero when nothing matched.
total = len(y_pred)
accuracy = round(len(matched) / total * 100, 2) if total else 0.0

print ("  |__match    :", len(matched))
print ("  |__miss     :", len(missed))
print ("  |__accuracy :", accuracy, "%")
print ("")

# show sample results
print ("---samples---")
# Show up to eight predictions next to their true labels.
for i in range(min(8, total)):
    print (i,"predict =", int_to_label[y_pred[i]])
    print (i,"original=", int_to_label[argmax(y_test[i])])
    print ("")