In [5]:
import tensorflow as tf
import numpy as np
import pickle
import keras

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from keras.callbacks import ModelCheckpoint
from keras.callbacks import TensorBoard
from keras.utils import print_summary
from keras.models import Sequential
from keras.layers import Dense,Flatten, Conv2D,Conv1D , Convolution1D , Activation
from keras.layers import MaxPooling2D, Dropout,MaxPooling1D,GlobalAveragePooling1D

from data_utils import loadFromPickle

from keras.utils import np_utils

def prepress_labels(labels):
    labels = np_utils.to_categorical(labels) # one-hot编码 把类别id转换为表示当前类别的向量，比如0 1 2 =》 [[1 0 0] [0 1 0] [0 0 1]]
    return labels

def keras_model_1dconv(input_shape,num_classes):
    model = Sequential()
    model.add(Conv1D(64, 5, activation='relu', input_shape=input_shape))
    # model.add(Conv1D(256, 10, activation='relu'))
    model.add(MaxPooling1D(2))
    model.add(Conv1D(128, 5, activation='relu'))
    model.add(Conv1D(64, 5, activation='relu'))
    model.add(GlobalAveragePooling1D())
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    print(model.summary())
    model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['accuracy'])
    filepath = "1dconv.h5"
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
    callbacks_list = [checkpoint]
    return model, callbacks_list

    # print(model.summary())   

def keras_model2(input_shape,num_classes):
    # 构建模型
    model = Sequential()
    model.add(Dense(512, activation='relu',input_shape=input_shape))
    model.add(Dense(128, activation='relu'))
    model.add(Dense(num_classes, activation='softmax'))
    # [编译模型] 配置模型，损失函数采用交叉熵，优化采用Adadelta，将识别准确率作为模型评估
    model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['accuracy'])
    filepath = "model1.h5"
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
    callbacks_list = [checkpoint]
    return model, callbacks_list

def keras_model1(input_shape,num_of_classes):
    # 构建模型
    model = Sequential()
    model.add(Dense(128, activation='relu',input_shape=input_shape))
    model.add(keras.layers.core.Reshape((128,1)))
    model.add(Conv1D(64, 20, activation='relu'))
    # model.add(MaxPooling1D(2))
    model.add(Dense(128, activation='relu'))
    model.add(GlobalAveragePooling1D())
    model.add(Dense(num_of_classes, activation='softmax'))
    # [编译模型] 配置模型，损失函数采用交叉熵，优化采用Adadelta，将识别准确率作为模型评估
    model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adadelta(), metrics=['accuracy'])
    filepath = "asr_mfcc_dense_model.h5"
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
    callbacks_list = [checkpoint]
    return model, callbacks_list

def keras_model(input_shape,num_of_classes):
    num_of_classes = num_of_classes
    model = Sequential()
    model.add(Conv2D(32, (5, 5), input_shape=input_shape, activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'))
    model.add(Conv2D(64, (5, 5), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'))

    model.add(Flatten())
    model.add(Dense(140, activation='relu'))
    model.add(Dropout(0.6))
    model.add(Dense(70, activation='relu'))
    model.add(Dropout(0.6))
    model.add(Dense(num_of_classes, activation='softmax'))

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    filepath = "asr_mfcc_conv2d_model.h5"
    checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
    callbacks_list = [checkpoint]

    return model, callbacks_list    

def test_model():
    features, labels = loadFromPickle()
    features, labels = shuffle(features, labels)
    features=features.reshape(features.shape[0],64,40,1)
    labels=prepress_labels(labels)
    train_x, test_x, train_y, test_y = train_test_split(features, labels, random_state=0,
                                                        test_size=0.1)
    model, callbacks_list = keras_model((64,40,1,),len(labels[0]))
    print_summary(model)
    model.fit(train_x, train_y, batch_size=128, epochs=5, verbose=1, validation_data=(test_x, test_y))

    # 开始评估模型效果 # verbose=0为不输出日志信息
    score = model.evaluate(test_x, test_y, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1]) # 准确度

    model.save('asr_mfcc_conv2d_model.h5') # 保存训练模型

def test_model1(input_length):
    features, labels = loadFromPickle()
    features, labels = shuffle(features, labels)
    features=features.reshape(features.shape[0],input_length)
    labels=prepress_labels(labels)
    train_x, test_x, train_y, test_y = train_test_split(features, labels, random_state=0,
                                                        test_size=0.2)
    model, callbacks_list = keras_model1((input_length,),len(labels[0]))
    print_summary(model)
    model.fit(train_x, train_y, batch_size=128, epochs=20, verbose=1, validation_data=(test_x, test_y),
    	callbacks=[TensorBoard(log_dir="TensorBoard")])

    score = model.evaluate(test_x, test_y, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1]) # 准确度
    model.save('test_model1.h5') # 保存训练模型

def test_model2(input_length):
    features, labels = loadFromPickle()
    features, labels = shuffle(features, labels)
    features=features.reshape(features.shape[0],input_length)
    labels=prepress_labels(labels)
    train_x, test_x, train_y, test_y = train_test_split(features, labels, random_state=0,
                                                        test_size=0.2)
    model, callbacks_list = keras_model2((input_length,),len(labels[0]))
    print_summary(model)
    model.fit(train_x, train_y, batch_size=128, epochs=20, verbose=1, validation_data=(test_x, test_y))

    score = model.evaluate(test_x, test_y, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1]) # 准确度
    model.save('test_model2.h5') # 保存训练模型

def test_1dconv(d_x,d_y):
    features, labels = loadFromPickle()
    features, labels = shuffle(features, labels)
    # features=features.reshape(features.shape[0],64,40)
    features=features.reshape(features.shape[0],d_x,d_y)
    labels=prepress_labels(labels)
    train_x, test_x, train_y, test_y = train_test_split(features, labels, random_state=0,
                                                        test_size=0.2)
    model, callbacks_list = keras_model_1dconv((d_x,d_y),len(labels[0]))
    #print_summary(model)
    model.fit(train_x, train_y, batch_size=128, epochs=5, verbose=1, validation_data=(test_x, test_y))

    score = model.evaluate(test_x, test_y, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1]) # 准确度
    model.save('test_1dconv.h5') # 保存训练模型

if __name__ == '__main__':
    test_1dconv(32,20)
    # test_model2()
    # test_model1()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv1d_10 (Conv1D)           (None, 28, 64)            6464      
_________________________________________________________________
max_pooling1d_4 (MaxPooling1 (None, 14, 64)            0         
_________________________________________________________________
conv1d_11 (Conv1D)           (None, 10, 128)           41088     
_________________________________________________________________
conv1d_12 (Conv1D)           (None, 6, 64)             41024     
_________________________________________________________________
global_average_pooling1d_4 ( (None, 64)                0         
_________________________________________________________________
dropout_4 (Dropout)          (None, 64)                0         
_________________________________________________________________
dense_4 (Dense)              (None, 2)                 130       
Total para

In [6]:
#!coding:utf-8
import sys
import os
from keras.models import load_model
from data_utils import load_label_name,get_mfcc,get_pcm
import numpy as np

root_dir = "/xiaobai/"
sys.path.append(root_dir)
from xiaobai import XiaoBai,BaseSkill

# model = load_model('asr_model.h5') # 加载训练模型
# model = load_model('asr_mfcc_conv1d_model.h5') # 加载训练模型
model = load_model('test_1dconv.h5') # 加载训练模型
# model = load_model('asr_mfcc_dense_model.h5') # 加载训练模型
label_names = load_label_name()
speak = None
def predict(model):
    global speak
    X=get_mfcc('record.wav',samples=16000)
    # X=get_pcm('record.wav',samples=32000)
    #print(X.shape)
    #test dense model
    # X = np.reshape(np.array(X),(-1,32000))

    # test conv1d model
    X = np.reshape(np.array(X),(-1,32,20))
    # test conv2d model
    # X = np.reshape(np.array(X),(-1,64,40))    
    pred_probab = model.predict(X)
    pred_class = list(pred_probab[0]).index(max(pred_probab[0]))
    print("may be " , label_names[pred_class] , ", probab: " ,  pred_probab[0][pred_class])
    if speak is not None:
        speak("你说的是"+label_names[pred_class])
    formated = list( map(lambda x,i : (x.item(),label_names[i]) , pred_probab[0],[i for i in range(len(label_names))]) )
    lists = sorted(formated,reverse=True)
    top5 = lists[:5]
    #print("top5 :",top5)
    return lists
def callback():
    print("Listening...")
    os.system("arecord -d %d -r 16000 -c 1 -t wav -f S16_LE record.wav" % (1,) )   
    predict(model)
def main():
    global speak
    keyword_model = root_dir+'resources/小白.pmdl'
    xiaobai = XiaoBai(keyword_model=keyword_model,callback=callback)
    speak = xiaobai.speak
    xiaobai.listen_for_keyword()

if __name__ == '__main__':
    main()

Listening...


INFO:snowboy:Keyword 1 detected at time: 2019-05-13 14:19:49


Listening...
may be  yes , probab:  0.6989424
小白：你说的是yes


INFO:snowboy:Keyword 1 detected at time: 2019-05-13 14:20:00


Listening...
may be  no , probab:  1.0
小白：你说的是no
stop
