In [None]:
import librosa
import numpy as np
import os
import math
from sklearn.cluster import KMeans
import hmmlearn.hmm

def get_mfcc(file_path):
    y, sr = librosa.load(file_path) # read .wav file
    hop_length = math.floor(sr*0.010) # 10ms hop
    win_length = math.floor(sr*0.025) # 25ms frame
    # mfcc is 12 x T matrix
    mfcc = librosa.feature.mfcc(
        y, sr, n_mfcc=12, n_fft=1024,
        hop_length=hop_length, win_length=win_length)
    # substract mean from mfcc --> normalize mfcc
    mfcc = mfcc - np.mean(mfcc, axis=1).reshape((-1,1)) 
    # delta feature 1st order and 2nd order
    delta1 = librosa.feature.delta(mfcc, order=1)
    delta2 = librosa.feature.delta(mfcc, order=2)
    # X is 36 x T
    X = np.concatenate([mfcc, delta1, delta2], axis=0) # O^r
    # return T x 36 (transpose of X)
    return X.T # hmmlearn use T x N matrix

def get_class_data(data_dir):
    files = os.listdir(data_dir)
    mfcc = [get_mfcc(os.path.join(data_dir,f)) for f in files if f.endswith(".wav")]
    print('data_dir: ', data_dir)
    print(f'mfcc.shape: {np.array(mfcc).shape}')
    return mfcc

def clustering(X, n_clusters=10):
    kmeans = KMeans(n_clusters=n_clusters, n_init=50, random_state=0, verbose=0)
    kmeans.fit(X)
    print("centers", kmeans.cluster_centers_.shape)
    return kmeans  


In [None]:
class_names = ["cothe", "khong", "nguoi", "toi", "va" , "test_cothe" , "test_khong" , "test_nguoi", "test_toi", "test_va" ]
dataset = {}
for cname in class_names:
    print(f"Load {cname} dataset")
    dataset[cname] = get_class_data(os.path.join("data", cname))
 
# Get all vectors in the datasets
all_vectors = np.concatenate([np.concatenate(v, axis=0) for k, v in dataset.items()], axis=0)
print("vectors", all_vectors.shape)
# Run K-Means algorithm to get clusters
kmeans = clustering(all_vectors)
print("centers", kmeans.cluster_centers_.shape)    

models = {}

In [None]:
models = {}
original_dataset = {}

In [None]:
#từ
original_dataset['cothe'] = dataset['cothe'].copy()
cname = 'cothe'
class_vectors = dataset[cname]
# convert all vectors to the cluster index
# dataset['one'] = [O^1, ... O^R] , O^r: the r-th recorded wav file 
# O^r = (c1, c2, ... ct, ... cT) , c_i: the i-th frame in the r-th observation ( or the r-th wav file )
# O^r size T x 1
dataset[cname] = list([kmeans.predict(v).reshape(-1,1) for v in original_dataset[cname]])


hmm = hmmlearn.hmm.MultinomialHMM(
    n_components=13, random_state=0, n_iter=1000, verbose=True,
    params='te',
    init_params='e'
)
hmm.startprob_=np.array([0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
hmm.transmat_=np.array([
    [0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.7,0.3],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0],
])
if cname[:4] != 'test':
    X = np.concatenate(dataset[cname])
    lengths = list([len(x) for x in dataset[cname]])
    print("training class", cname)
    print(X.shape, lengths, len(lengths))
    hmm.fit(X, lengths=lengths)
    models[cname] = hmm
#dataset['test_cothe'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_cothe']])

In [None]:
#từ
original_dataset['toi'] = dataset['toi'].copy()
cname = 'toi'
class_vectors = dataset[cname]
# convert all vectors to the cluster index
# dataset['one'] = [O^1, ... O^R] , O^r: the r-th recorded wav file 
# O^r = (c1, c2, ... ct, ... cT) , c_i: the i-th frame in the r-th observation ( or the r-th wav file )
# O^r size T x 1

dataset[cname] = list([kmeans.predict(v).reshape(-1,1) for v in original_dataset[cname]])


hmm = hmmlearn.hmm.MultinomialHMM(
    n_components= 8, random_state=0, n_iter=1100, verbose=True,
    params='te',
    init_params='e'
)
hmm.startprob_ = np.array([0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,])
hmm.transmat_ = np.array([
        [0.5,0.3,0.2,0.0,0.0,0.0,0.0,0.0],
        [0.0,0.5,0.3,0.2,0.0,0.0,0.0,0.0],
        [0.0,0.0,0.5,0.3,0.2,0.0,0.0,0.0],
        [0.0,0.0,0.0,0.5,0.3,0.2,0.0,0.0],
        [0.0,0.0,0.0,0.0,0.5,0.3,0.2,0.0],
        [0.0,0.0,0.0,0.0,0.0,0.5,0.3,0.2],
        [0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.4],
        [0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0],       
    ])

if cname[:4] != 'test':
    X = np.concatenate(dataset[cname])
    lengths = list([len(x) for x in dataset[cname]])
    print("training class", cname)
    print(X.shape, lengths, len(lengths))
    hmm.fit(X, lengths=lengths)
    models[cname] = hmm
#dataset['test_toi'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_toi']])

In [None]:
#từ
original_dataset['khong'] = dataset['khong'].copy()
cname = 'khong'
class_vectors = dataset[cname]
# convert all vectors to the cluster index
# dataset['one'] = [O^1, ... O^R] , O^r: the r-th recorded wav file 
# O^r = (c1, c2, ... ct, ... cT) , c_i: the i-th frame in the r-th observation ( or the r-th wav file )
# O^r size T x 1
dataset[cname] = list([kmeans.predict(v).reshape(-1,1) for v in original_dataset[cname]])


hmm = hmmlearn.hmm.MultinomialHMM(
    n_components=13, random_state=0, n_iter=1000, verbose=True,
    params='te',
    init_params='e'
)
hmm.startprob_=np.array([0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
hmm.transmat_=np.array([
    [0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.7,0.3],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0],
])
if cname[:4] != 'test':
    X = np.concatenate(dataset[cname])
    lengths = list([len(x) for x in dataset[cname]])
    print("training class", cname)
    print(X.shape, lengths, len(lengths))
    hmm.fit(X, lengths=lengths)
    models[cname] = hmm
#dataset['test_khong'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_khong']])

In [None]:
#từ
original_dataset['nguoi'] = dataset['nguoi'].copy()
cname = 'nguoi'
class_vectors = dataset[cname]
# convert all vectors to the cluster index
# dataset['one'] = [O^1, ... O^R] , O^r: the r-th recorded wav file 
# O^r = (c1, c2, ... ct, ... cT) , c_i: the i-th frame in the r-th observation ( or the r-th wav file )
# O^r size T x 1
dataset[cname] = list([kmeans.predict(v).reshape(-1,1) for v in original_dataset[cname]])


hmm = hmmlearn.hmm.MultinomialHMM(
    n_components=13, random_state=0, n_iter=1000, verbose=True,
    params='te',
    init_params='e'
)
hmm.startprob_=np.array([0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
hmm.transmat_=np.array([
    [0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1,0.0],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.6,0.3,0.1],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.7,0.3],
    [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0],
])
if cname[:4] != 'test':
    X = np.concatenate(dataset[cname])
    lengths = list([len(x) for x in dataset[cname]])
    print("training class", cname)
    print(X.shape, lengths, len(lengths))
    hmm.fit(X, lengths=lengths)
    models[cname] = hmm
#dataset['test_nguoi'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_nguoi']])

In [None]:
#từ
original_dataset['va'] = dataset['va'].copy()
cname = 'va'
class_vectors = dataset[cname]
# convert all vectors to the cluster index
# dataset['one'] = [O^1, ... O^R] , O^r: the r-th recorded wav file 
# O^r = (c1, c2, ... ct, ... cT) , c_i: the i-th frame in the r-th observation ( or the r-th wav file )
# O^r size T x 1
dataset[cname] = list([kmeans.predict(v).reshape(-1,1) for v in original_dataset[cname]])


hmm = hmmlearn.hmm.MultinomialHMM(
    n_components=5, random_state=0, n_iter=1000, verbose=True,
    params='te',
    init_params='e'
)
hmm.startprob_=np.array([0.7,0.2,0.1,0.0,0.0])
hmm.transmat_=np.array([
    [0.7,0.2,0.1,0.0,0.0],
    [0.0,0.7,0.2,0.1,0.0],
    [0.0,0.0,0.7,0.2,0.1],
    [0.0,0.0,0.0,0.6,0.4],
    [0.0,0.0,0.0,0.0,1.0],
])
if cname[:4] != 'test':
    X = np.concatenate(dataset[cname])
    lengths = list([len(x) for x in dataset[cname]])
    print("training class", cname)
    print(X.shape, lengths, len(lengths))
    hmm.fit(X, lengths=lengths)
    models[cname] = hmm
#dataset['test_va'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_va']])

In [None]:
dataset['test_va'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_va']])

dataset['test_cothe'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_cothe']])
dataset['test_toi'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_toi']])
dataset['test_khong'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_khong']])
dataset['test_nguoi'] = list([kmeans.predict(v).reshape(-1,1) for v in dataset['test_nguoi']])


In [None]:
print("Testing")
for true_cname in class_names:
    for O in dataset[true_cname]:
        score = {cname : model.score(O, [len(O)]) for cname, model in models.items() if cname[:4] != 'test' }
        print(true_cname, score)

In [None]:
import operator

print("Testing")

class_names = ["test_cothe","test_toi", "test_khong", "test_nguoi", "test_va"]
for true_cname in class_names:
    index = 0
    count = 0;
    total = 0;
    print("-----------------------")
    for O in dataset[true_cname]:
        index+=1
        total+=1
        pred = max(score.items(), key=operator.itemgetter(1))[0]
        if(true_cname[5:] == pred): count+=1
        score = {cname : model.score(O, [len(O)]) for cname, model in models.items() if cname[:4] != 'test' }
        print(true_cname, pred, index)
    print(count / total * 100)

In [12]:
import sys
import os
import queue
import math
import threading
from pynput import keyboard

from python_speech_features import mfcc
from sklearn.cluster import KMeans
import hmmlearn.hmm
import numpy as np
import pickle as pk

import librosa
import sounddevice as sd
import soundfile as sf

CLASS_LABELS = {"cothe", "toi", "khong", "nguoi", "va"}

# Controls
STOP_RECORD_CMD = "/s"
RECORD_CMD = "/r"

inputQueue = queue.Queue()

def read_kb_input(inputQueue):
    while True:
        input_str = input()
        inputQueue.put(input_str)

# Indexing functions
def writeIndex(file, sentence, file_name):
    file.write(file_name + "\n")
    file.write(sentence  + "\n")

# Recording functions
SAMPLE_RATE = 22050
CHANNELS = 2

q = queue.Queue()

def callback( indata, frames, time, status):
    #This is called (from a separate thread) for each audio block.
    if status:
        print(status, file=sys.stderr)
    q.put(indata.copy())

def record(file_name):
    try:
        #Open a new soundfile and attempt recording
        with sf.SoundFile(file_name, mode='x', samplerate=SAMPLE_RATE, channels=CHANNELS, subtype="PCM_24") as file:
            with sd.InputStream(samplerate=SAMPLE_RATE, device=sd.default.device, channels=CHANNELS, callback=callback):
                print("Recording ... ('{}' to stop recording)".format(STOP_RECORD_CMD))
            
                while True:
                    file.write(q.get())

                    if (inputQueue.qsize() > 0):
                        input_str = inputQueue.get()
                        if (input_str == STOP_RECORD_CMD):
                            break

                print("Saved to: {}\n".format(file_name))

    except Exception as e:
        print(e)

# Kmeans
def clustering(X, n_clusters=10):
    kmeans = KMeans(n_clusters=n_clusters, n_init=50, random_state=0, verbose=0)
    kmeans.fit(X)
    return kmeans  



In [15]:

    models = {}
    for label in CLASS_LABELS:
        with open(os.path.join("Models", label + ".pkl"), "rb") as file: models[label] = pk.load(file)

    input("Press any key to start recording")

    inputThread = threading.Thread(target=read_kb_input, args=(inputQueue,), daemon=True)
    inputThread.start()

    record("live_recording.wav")

    sound_mfcc = MFCC.get_mfcc("live_recording.wav")

    os.remove("live_recording.wav")

    kmeans = clustering(sound_mfcc)
    sound_mfcc = kmeans.predict(sound_mfcc).reshape(-1,1)

    evals = {cname : model.score(sound_mfcc, [len(sound_mfcc)]) for cname, model in models.items()}
    cmax = max(evals.keys(), key=(lambda k: evals[k]))
    print(evals)
    print("Conclusion: " + cmax)

FileNotFoundError: [Errno 2] No such file or directory: 'Models\\va.pkl'