In [None]:
# path
import os
from os.path import join, isdir
from pathlib import Path

# Scientific Math
import numpy as np
from scipy.fftpack import fft
from scipy import signal
from scipy.io import wavfile
from sklearn.model_selection import train_test_split

# Visualization
import matplotlib.pyplot as plt
import tensorflow as tf
import plotly.offline as py
import plotly.graph_objs as go

# Deep learning
import tensorflow.keras as keras
from tensorflow.keras.layers import Dense,Dropout,Flatten
from tensorflow.keras import Input,layers
from tensorflow.keras import backend as K

import random
import copy
import librosa

%matplotlib inline

In [None]:
!pip install pyunpack
!pip install patool
# Extracting the .7z file
import os
from pyunpack import Archive
os.system('apt-get install p7zip')
import shutil
if not os.path.exists('/kaggle/working/train/'):
    os.makedirs('/kaggle/working/train/')
Archive('../input/tensorflow-speech-recognition-challenge/train.7z').extractall('/kaggle/working/train/')

# Checking the number of each file
import os
path = os.listdir('./train/train/audio/')
size = {}
for i in path:
      size[i] = len(os.listdir('./train/train/audio/'+i))
print(size)

In [None]:
# Show the entries of the competition's input directory.
os.listdir('../input/tensorflow-speech-recognition-challenge')

In [None]:
# Root folder of the extracted training audio; its entries are listed below.
train_audio_path = './train/train/audio/'
print(os.listdir(train_audio_path))

## Load the data
The target list is ['yes','no','up','down','left','right','on','off','stop','go']. Every other word folder is treated as 'unknown', and 'silence' samples are made from '_background_noise_'.

The training data's sampling rate is 16000 Hz, but to lower the computation cost we resample it to 8000 Hz.

After training, the test set will also be resampled to 8000 Hz.

In [None]:
# Gather the sub-directories of the training audio folder in sorted order.
dirs = sorted(
    entry
    for entry in os.listdir(train_audio_path)
    if isdir(join(train_audio_path, entry))
)
# One folder is left out of the reported count (presumably '_background_noise_').
print("Number of labels:", len(dirs) - 1)
print(dirs)

In [None]:
# Containers filled by the loading loops below.
all_wave = []        # [samples, folder name] pairs for the target words
unknown_wav = []     # samples of every non-target word
label_all = []       # folder name of each entry in all_wave
label_value = {}     # folder name -> running index
target_list = ['yes','no','up','down','left','right','on','off','stop','go']
unknown_list = [d for d in dirs if d not in target_list and d!='_background_noise_']
print("Target list : ",end = '')
print(target_list)
print("Unknown List : ",end = '')
print(unknown_list)
print("Silence : _background_noise_")
i = 0

# Background noise recordings: kept whole (not cut to one second) so that random
# one-second windows can be drawn from them later.
noise_dir = join(train_audio_path,'_background_noise_')
background = [f for f in os.listdir(noise_dir) if f.endswith('.wav')]
background_noise = []
for wav in background:
    # Load at the same 16 kHz rate used for the word clips below instead of librosa's
    # default rate, so the only resampling step is the one down to 8 kHz.
    samples,sample_rate = librosa.load(join(noise_dir,wav),sr = 16000)
    # The rates are passed by keyword: recent librosa releases reject positional rates.
    samples = librosa.resample(samples,orig_sr = sample_rate,target_sr = 8000)
    background_noise.append(samples)

# Word folders. The noise folder is skipped by name rather than by position in `dirs`.
for direct in dirs:
    if direct == '_background_noise_':
        continue
    waves = [f for f in os.listdir(join(train_audio_path,direct)) if f.endswith('.wav')]
    label_value[direct] = i
    i+=1
    print(str(i)+ ' : '+str(direct)+" ",end = "")
    for wav in waves:
        samples,sample_rate = librosa.load(join(join(train_audio_path,direct),wav),sr = 16000)
        samples = librosa.resample(samples,orig_sr = sample_rate,target_sr = 8000)
        # Keep only clips that are exactly 8000 samples long after resampling.
        if len(samples)!=8000:
            continue
        if direct in unknown_list:
            unknown_wav.append(samples)
        else:
            label_all.append(direct)
            all_wave.append([samples,direct])

## Split WAV, Label

In [None]:
# Separate the [samples, label] pairs without converting the ragged nested list to an
# array first: recent NumPy versions raise on such inhomogeneous input.
# wav_all is a 1-D object array holding one sample array per slot.
wav_all = np.empty(len(all_wave), dtype=object)
for pair_idx, (clip, _label) in enumerate(all_wave):
    wav_all[pair_idx] = clip
# label_all is a list of single-element lists, one [label] per clip.
label_all = [[name] for _clip, name in all_wave]

## Data Augmentation

For data augmentation, I mix each training wav with a noise clip of the same length (1 second) taken from '_background_noise_', scaled to 10%.

In [None]:
# Random pick start point
def get_one_noise(noise_num = 0, length = 8000, noises = None):
    """Return a randomly positioned window cut from one noise recording.

    Parameters
    ----------
    noise_num : int
        Index of the recording to cut from.
    length : int
        Number of samples in the returned window (default 8000).
    noises : sequence of sequences, optional
        Recordings to pick from; defaults to the module-level ``background_noise``.

    Returns
    -------
    A slice of ``length`` consecutive samples from the selected recording.

    Raises
    ------
    ValueError
        If the selected recording is shorter than ``length``.
    """
    if noises is None:
        noises = background_noise
    selected_noise = noises[noise_num]
    # Largest start index that still leaves room for a full window.
    last_start = len(selected_noise) - length
    if last_start < 0:
        raise ValueError(
            "noise recording %d has %d samples, fewer than the requested %d"
            % (noise_num, len(selected_noise), length)
        )
    # randint is inclusive on both ends, so the final window is reachable too.
    start_idx = random.randint(0, last_start)
    return selected_noise[start_idx:(start_idx+length)]

In [None]:
# Noise augmentation: for each augmentation round, one random noise window is scaled by
# max_ratio and added to every training clip.
max_ratio = 0.1
noised_wav = []
augment = 1
delete_index = []
for noise_idx in range(augment):
    noise = get_one_noise(noise_idx)
    # A separate loop variable is used here so the round index is not shadowed.
    for wav_idx, s in enumerate(wav_all):
        if len(s)!=8000:
            delete_index.append(wav_idx)
            continue
        noised_wav.append(s + (max_ratio*noise))

# np.delete returns a new array rather than editing in place, so the result has to be
# assigned back for the skipped clips (and their labels) to actually be removed.
if delete_index:
    dropped = set(delete_index)
    wav_all = np.delete(wav_all, sorted(dropped))
    label_all = [lab for lab_idx, lab in enumerate(label_all) if lab_idx not in dropped]

In [None]:
# Materialise the clips as one array and take a fresh list of the labels.
wav_vals = np.array(list(wav_all))
label_vals = list(label_all)
# Displayed as the cell output.
wav_vals.shape

In [None]:
# The augmented clips reuse the labels of the clips they were made from, so the label
# column is repeated once per augmentation round.
labels = copy.deepcopy(label_vals)
# A single concatenate also covers augment == 0; the earlier loop left label_vals as a
# plain list in that case, and a list has no .reshape.
label_vals = np.concatenate([label_vals] + [labels] * augment, axis = 0).reshape(-1,1)

## Random sampling from unknown wav data

In [None]:
# Unknown audio random sampling
n_unknown = 2000*(augment+1)
# A locally seeded generator makes the sample repeatable after a kernel restart without
# touching NumPy's global random state. The list is shuffled in place.
np.random.RandomState(1993).shuffle(unknown_wav)
# Slice before converting, so only the sampled clips are copied into the array.
unknown = np.array(unknown_wav[:n_unknown])
# One label row per clip actually kept, which also holds when fewer than n_unknown
# unknown clips are available.
unknown_label = np.full((len(unknown),1),'unknown')

## Some of the wav data may have a different length, so we need to delete it

In [None]:
# Drop any sampled unknown clip that is not exactly 8000 samples long.
delete_index = [clip_idx for clip_idx, w in enumerate(unknown) if len(w) != 8000]
unknown = np.delete(unknown,delete_index,axis = 0)
# Every label row holds the same 'unknown' string, so trimming the label column to the
# number of remaining clips keeps clips and labels aligned.
unknown_label = unknown_label[:len(unknown)]

## Random sampling from '_background_noise_'
Randomly pick background noise clips

In [None]:
# silence audio
# The same number of random windows is drawn from every background recording.
num_wav = (2000*(augment+1))//len(background_noise)
silence_wav = np.array([
    get_one_noise(noise_idx)
    for noise_idx in range(len(background_noise))
    for _ in range(num_wav)
])
# One 'silence' label per window, shaped as a single column.
silence_label = np.array(['silence'] * (num_wav*len(background_noise))).reshape(-1,1)
silence_wav.shape

In [None]:
# Bring every group of clips to the common (n_clips, 8000) layout.
clip_matrix_shape = (-1, 8000)
wav_vals = wav_vals.reshape(clip_matrix_shape)
noised_wav = np.asarray(noised_wav).reshape(clip_matrix_shape)   # still a list here
unknown = unknown.reshape(clip_matrix_shape)
silence_wav = silence_wav.reshape(clip_matrix_shape)

## Check the dimension

In [None]:
# Report the shape of each clip group, in the order they are concatenated later.
for clip_group in (wav_vals, noised_wav, unknown, silence_wav):
    print(clip_group.shape)

In [None]:
# Report the shape of each label column.
for label_group in (label_vals, unknown_label, silence_label):
    print(label_group.shape)

## Concatenate Waves, labels

In [None]:
# Stack all clips row-wise: originals, noised copies, unknown words, then silence.
wav_vals = np.concatenate((wav_vals, noised_wav, unknown, silence_wav), axis = 0)

In [None]:
# Stack the label columns in the same order as the clips.
label_vals = np.concatenate((label_vals, unknown_label, silence_label), axis = 0)

In [None]:
# Print the number of clips and the number of labels; the two values should match
# before the data is split.
print(len(wav_vals))
print(len(label_vals))

## Prepare train test dataset

In [None]:
# Shuffle and hold out 20% of the clips; the fixed random_state makes the split repeatable.
train_wav, test_wav, train_label, test_label = train_test_split(
    wav_vals,
    label_vals,
    test_size = 0.2,
    random_state = 1993,
    shuffle = True,
)

In [None]:
# Parameters
# lr is handed to the Adam optimizer when the model is compiled.
lr = 0.001
# generations and num_gens_to_wait are not referenced by the cells visible in this notebook.
generations = 20000
num_gens_to_wait = 250
# batch_size is passed to model.fit.
batch_size = 512
# drop_out_rate is shared by every Dropout layer of the model.
drop_out_rate = 0.5
# input_shape is the per-clip shape given to the model's Input layer (8000 samples, 1 channel).
input_shape = (8000,1)


In [None]:
# For Conv1D add channel
# Target layout is (n_clips, 8000, 1), i.e. a leading batch axis plus input_shape.
batched_shape = (-1, *input_shape)
train_wav = np.reshape(train_wav, batched_shape)
test_wav = np.reshape(test_wav, batched_shape)

In [None]:
# label_value is bound to the very same list object as target_list, so the two extra
# classes are added to target_list in place (a later cell displays target_list).
label_value = target_list
# Each append is guarded so that re-running this cell cannot add 'unknown'/'silence' a
# second time, which would shift the class indices derived from this list.
for extra_label in ('unknown', 'silence'):
    if extra_label not in label_value:
        label_value.append(extra_label)

In [None]:
# Turn the ordered class names into a name -> index lookup.
new_label_value = {name: position for position, name in enumerate(label_value)}
label_value = new_label_value

In [None]:
# Display the class-name -> index mapping.
label_value

In [None]:
# Make Label data 'string' -> class 'num'
# Each label row is a one-element sequence; its first item is looked up in label_value.
train_label = np.array([label_value[row[0]] for row in train_label])
test_label = np.array([label_value[row[0]] for row in test_label])

# Make label data class 'num' -> 'One hot vector'
class_count = len(label_value)
train_label = keras.utils.to_categorical(train_label, class_count)
test_label = keras.utils.to_categorical(test_label, class_count)

In [None]:
# Shape of the training clips after the channel axis was added.
print(f"Train_Wav dimension : {np.shape(train_wav)}")

In [None]:
# Shape of the one-hot training labels.
print(f"Train_Label dimension : {np.shape(train_label)}")

In [None]:
# Shape of the held-out clips after the channel axis was added.
print(f"Test_Wav dimension : {np.shape(test_wav)}")

In [None]:
# Shape of the one-hot held-out labels.
print(f"Test_label dimension : {np.shape(test_label)}")

In [None]:
# Number of entries in the class lookup.
print(f'Number Of Labels : {len(label_value)}')

In [None]:
# Conv1D Model
# Five Conv1D -> MaxPooling1D stages (Dropout after each of the first four), followed
# by two Dense + Dropout layers and a softmax output with one unit per class.
input_tensor = Input(shape = (input_shape))

# (filters, kernel size) of each convolutional stage, in order.
conv_stages = [(8,11),(16,7),(32,5),(64,5),(128,3)]
x = input_tensor
for stage_idx, (filters, kernel_size) in enumerate(conv_stages):
    x = layers.Conv1D(filters,kernel_size,padding='valid',activation='relu',strides = 1)(x)
    x = layers.MaxPooling1D(2)(x)
    # The last convolutional stage feeds Flatten directly, without Dropout.
    if stage_idx < len(conv_stages) - 1:
        x = layers.Dropout(drop_out_rate)(x)
x = layers.Flatten()(x)
for units in (256,128):
    x = layers.Dense(units,activation='relu')(x)
    x = layers.Dropout(drop_out_rate)(x)
output_tensor = layers.Dense(len(label_value),activation='softmax')(x)

model = tf.keras.Model(input_tensor,output_tensor)

# `learning_rate` is the supported argument name; newer Keras releases no longer accept
# the old `lr` alias.
model.compile(loss = keras.losses.categorical_crossentropy,
             optimizer = keras.optimizers.Adam(learning_rate = lr),
             metrics = ['accuracy'])

In [None]:
# Print the layer-by-layer summary of the compiled model.
model.summary()

In [None]:
# Train for 100 epochs; the held-out split is passed as the validation data, and the
# returned History object is kept for the plots that follow.
fit_options = dict(
    validation_data=(test_wav, test_label),
    batch_size=batch_size,
    epochs=100,
    verbose=1,
)
history = model.fit(train_wav, train_label, **fit_options)

In [None]:
# One figure per metric: (training key, validation key, title, y label, x label).
history_curves = [
    ('accuracy', 'val_accuracy', "Model Accuracy", "Accuracy", "Epochs"),
    # summarize history for loss
    ('loss', 'val_loss', "Model Loss", "Loss", "Epoch"),
]
for train_key, val_key, title, y_label, x_label in history_curves:
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel(x_label)
    plt.legend(['train','test'],loc = 'upper left')
    plt.show()

In [None]:
# Save the trained model to 'model.h5' in the working directory.
model.save('model.h5')

In [None]:
# Re-binds the name `keras` and saves the same model a second time, under a different
# file name than the previous cell.
from tensorflow import keras
keras.models.save_model(model,'speech-recognition.h5')

In [None]:
# Display target_list. label_value was bound to this same list object and extended in
# place earlier, so the list shown here also contains 'unknown' and 'silence'.
target_list