In [1]:
import os
import wave

import librosa
import numpy as np
import pyaudio
from hmmlearn import hmm
from python_speech_features import mfcc
from sklearn.externals import joblib

In [14]:
# 录制wav，时长s
def recordWav(name):
    # 参数定义
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1  # 声道数
    RATE = 16000  # 采样率
    RECORD_SECONDS = 1.5  # 录制时长
    WAVE_OUTPUT_FILENAME = name

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

    print('* recording')
    frames = []
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)

    print('* done recording')
    stream.stop_stream()
    stream.close()
    p.terminate()

    print('* save recording')
    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    
def getList(path):
    wavdict = {}
    labeldict = {}
    for (dirpath, dirnames, filenames) in os.walk(path):
        print(filenames)
        for filename in filenames:
            fileid = filename.strip('.wav')
            wavdict[fileid] = os.sep.join([dirpath, filename])
            labeldict[fileid] = fileid.split('_')[1]
    return wavdict, labeldict

In [3]:
class Model():
    # 初始化
    def __init__(self, CATEGORY, n_components=3, n_mix = 3, covariance_type='diag', n_iter=1000):
        super(Model, self).__init__()
        self.CATEGORY = CATEGORY
        self.category = len(CATEGORY)
        
        self.n_components = n_components
        self.n_mix = n_mix
        self.covariance_type = covariance_type
        self.n_iter = n_iter

        self.models = []
        for k in range(self.category):
            model = hmm.GMMHMM(n_components=self.n_components, n_mix = self.n_mix, covariance_type=self.covariance_type, n_iter=self.n_iter)
            self.models.append(model)

    # 训练
    def train(self, wavdict, labeldict):
        for k in range(self.category):
            model = self.models[k]
            for x in wavdict:
                if labeldict[x] == self.CATEGORY[k]:
                    y, sr = librosa.load(wavdict[x], sr=16000)
                    X = mfcc(y, samplerate=16000, numcep=26)
                    model.fit(X)

    # 测试
    def test(self, wav):
        X = mfcc(wav, samplerate=16000, numcep=26)
        result = []
        for k in range(self.category):
            model = self.models[k]
            re = model.score(X)
            result.append(re)
        return np.argmax(result)
    
    # 保存
    def save(self, path="models.pkl"):
        joblib.dump(self.models, path)

    # 加载
    def load(self, path="models.pkl"):
        self.models = joblib.load(path)

In [8]:
wavdict, labeldict = getList('records')

CATEGORY = ['back', 'go', 'left', 'right', 'stop']
models = Model(CATEGORY=CATEGORY)

models.train(wavdict=wavdict, labeldict=labeldict)
models.save()

[]
['1_back.wav', '2_back.wav', '3_back.wav', '4_back.wav', '5_back.wav']
['1_go.wav', '2_go.wav', '3_go.wav', '4_go.wav', '5_go.wav']
['1_left.wav', '2_left.wav', '3_left.wav', '4_left.wav', '5_left.wav']
['1_right.wav', '2_right.wav', '3_right.wav', '4_right.wav', '5_right.wav']
['1_stop.wav', '2_stop.wav', '3_stop.wav', '4_stop.wav', '5_stop.wav']


In [None]:
import serial
ser = serial.Serial('COM6', 9600)

In [None]:
command = ['b', 'f', 'l', 'r', 's']

In [None]:
models.load()

In [13]:
name = 'records\\stop\\5_stop.wav'

recordWav(name)
y, sr = librosa.load(name, sr=16000)

rst = models.test(wav=y)
print(command[rst])
ser.write(command[rst].encode())

s
