In [2]:
import pickle
import wave
import librosa
import math
import numpy as np
import pyaudio
from pydub import AudioSegment
import sounddevice as sd
from scipy.io.wavfile import write, read
import noisereduce as nr

In [3]:
model = {}

#class_names = ['bat', 'tat', 'thiet', 'bi', 'mot', 'hai', 'ba', 'bon']
class_names = ['bat', 'tat', 'mot', 'hai', 'ba', 'sheila', 'den', 'quat']
audio_format = 'wav'

record_path = 'temp/record.wav'
trimmed_path = 'temp/trimmed.wav'
model_path = 'models_train_main'

In [4]:
def get_mfcc(file_path):
    y, sr = librosa.load(file_path)  # read .wav file
       
    hop_length = math.floor(sr * 0.010)  # 10ms hop
    win_length = math.floor(sr * 0.025)  # 25ms frame
    # mfcc is 12 x T matrix
    mfcc = librosa.feature.mfcc(
        y=y, sr=sr, n_mfcc=12, n_fft=1024,
        hop_length=hop_length, win_length=win_length)
    # subtract mean from mfcc --> normalize mfcc
    mfcc = mfcc - np.mean(mfcc, axis=1).reshape((-1, 1))
    # delta feature 1st order and 2nd order
    delta1 = librosa.feature.delta(mfcc, order=1)
    delta2 = librosa.feature.delta(mfcc, order=2)
    # X is 36 x T
    X = np.concatenate([mfcc, delta1, delta2], axis=0)  # O^r
    # return T x 36 (transpose of X)
    return X.T  # hmmlearn use T x N matrix

In [4]:
def detect_leading_silence(sound, silence_threshold=-42.0, chunk_size=10):
        trim_ms = 0  # ms
        assert chunk_size > 0  # to avoid infinite loop
        while sound[trim_ms:trim_ms + chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
            trim_ms += chunk_size
        return trim_ms

In [5]:
for key in class_names:
    name = f"{model_path}/model_{key}.pkl"
    with open(name, 'rb') as file:
        model[key] = pickle.load(file)

In [6]:
def predict(file_name=None):
    if not file_name:
        file_name = record_path
        
# #     # load data
#     rate, data = read(file_name)
# #     # perform noise reduction
#     reduced_noise = nr.reduce_noise(y=data, sr=rate)
#     write("temp/reduced.wav", rate, reduced_noise)

    # Trim silence
    sound = AudioSegment.from_file(file_name, format=audio_format)

    start_trim = detect_leading_silence(sound)
    end_trim = detect_leading_silence(sound.reverse())
    if start_trim > 250:
        start_trim -= 250
    else:
        start_trim = 0
    
    duration = len(sound)
    if duration - end_trim <= start_trim + 200:
        end_trim = 0
    if start_trim >= 750:
        print("XXX")
        return
    
    print(str(start_trim) + " - " + str(duration - end_trim))
    #trimmed_sound = sound[start_trim:duration - end_trim]
    trimmed_sound = sound[start_trim:]
    trimmed_sound.export(trimmed_path, format=audio_format)

    # Predict
    record_mfcc = get_mfcc(trimmed_path)
    #record_mfcc = get_mfcc2(trimmed_sound)
    scores = [model[cname].score(record_mfcc) for cname in class_names]
    #print('scores', np.array(scores))
    predict_word = np.argmax(scores)
    
    print(class_names[predict_word])

In [7]:
# word = "bat"
# predict(file_name='datasets_main/' + word + '/c1_dat_3.wav')
# predict(file_name='datasets_main/' + word + '/c1_dat_8.wav')
# predict(file_name='datasets_main/' + word + '/c1_hieu_3.wav')
# predict(file_name='datasets_main/' + word + '/c1_hieu_8.wav')

In [8]:
RATE = 16000
RECORD_SECONDS = 1
myrecording = sd.rec(int(RECORD_SECONDS * RATE), channels=1, samplerate=RATE)
sd.wait()
write(record_path, RATE, myrecording)
predict(record_path)

0 - 970
ba


In [9]:
window = None
command = ""
phase = 0
def sd_callback(rec, frames, time, status):
    global window
    global command
    global phase
    rec = np.squeeze(rec)
    if window is None:
        window = np.copy(rec)

    window2 = np.copy(rec)
    rec = np.concatenate((window, rec))
    window = np.copy(window2)
    
    # Notify if errors
    if status:
        print('Error:', status)
    
    write("temp/record.wav", 16000, rec)

    sound = AudioSegment.from_wav("temp/record.wav")
    start_trim = detect_leading_silence(sound)
    end_trim = detect_leading_silence(sound.reverse())
    if start_trim > 200:
        start_trim -= 200
    else:
        start_trim = 0
    duration = len(sound)
    if duration - end_trim <= start_trim + 200:
        end_trim = 0
    
    trimmed_sound = sound[start_trim:]
    trimmed_sound.export(trimmed_path, format=audio_format)
    if start_trim >= 800:
        return
    if duration - end_trim - start_trim > 600:
        return
    if start_trim <= 30:
        return
    
    #print(str(start_trim) + " - " + str(duration - end_trim))
    # Compute features
    mfccs = get_mfcc(trimmed_path)
    #mfccs = mfccs.transpose()

    scores = [model[cname].score(mfccs) for cname in class_names]
    
    #print('scores', np.array(scores) / 1000)
    predict_word = np.argmax(scores)
    print(class_names[predict_word])
#     if phase == 0 and class_names[predict_word] == 'sheila':
#         phase = 1 
#         print(class_names[predict_word])
#     elif phase == 1 and class_names[predict_word] in ['bat', 'tat']:
#         phase = 2
#         print(class_names[predict_word])
#     elif phase == 2 and class_names[predict_word] in ['den', 'quat']:
#         phase = 3
#         print(class_names[predict_word])
#     elif phase == 2 and class_names[predict_word] in ['mot', 'hai', 'ba']:
#         phase = 0
#         print(class_names[predict_word])

#    print()

In [10]:
with sd.InputStream(channels=1,
                    samplerate=16000,
                    blocksize=int(16000 * 0.5),
                    callback=sd_callback):
    while True:
        pass

Exception ignored from cffi callback <function _StreamBase.__init__.<locals>.callback_ptr at 0x7f9a765dfd90>:
Traceback (most recent call last):
  File "/home/pp311/.local/lib/python3.10/site-packages/sounddevice.py", line 846, in callback_ptr
    return _wrap_callback(callback, data, frames, time, status)
  File "/home/pp311/.local/lib/python3.10/site-packages/sounddevice.py", line 2687, in _wrap_callback
    callback(*args)
  File "/tmp/ipykernel_179565/1554775209.py", line 22, in sd_callback
  File "/home/pp311/.local/lib/python3.10/site-packages/pydub/audio_segment.py", line 808, in from_wav
    return cls.from_file(file, 'wav', parameters=parameters)
  File "/home/pp311/.local/lib/python3.10/site-packages/pydub/audio_segment.py", line 728, in from_file
    info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
  File "/home/pp311/.local/lib/python3.10/site-packages/pydub/utils.py", line 279, in mediainfo_json
    info = json.loads(output)
  File "/usr/lib/python3.10/j

KeyboardInterrupt: 