<a href="https://colab.research.google.com/github/vlozg/speech_hmm/blob/main/Test_DiagHMM_017.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LƯU Ý: NOTEBOOK NÀY CHỈ DÙNG ĐỂ SHOW KẾT QUẢ CHẠY, KHÔNG NÊN CHẠY LẠI NOTEBOOK NÀY 
(VÌ TRONG NÀY CÓ CODE LƯU LẠI PRETRAINED MODEL LÊN DRIVE SẼ BỊ XÓA)

# Speech to text with HMM

- **Bài toán**: Chuyển giọng nói thành văn bản
    - **Input**: Đoạn ghi âm chứa nội dung là các số từ 0 đến 9
    - **Output**: Phân lớp của đoạn ghi âm

# Các biến thiết lập cho thử nghiệm

In [None]:
# Number of MFCC coefficients kept per frame.
n_mfcc_ceptrum = 12
# Number of delta orders appended to the base MFCCs (orders 1 and 2 are handled).
n_delta_features = 1
# Number of mixture components per HMM state.
n_mixtures = 3
# Fraction of each dataset's files that goes to its train_audio folder.
fsdd_split = 0.6
wolfram_split = 0.6
# Tag embedded in the titles of the model files uploaded to Drive.
experiment_id = '017'

In [None]:
# Per-frame feature dimension: the base MFCCs plus one same-sized block per delta order.
n_mfcc_features = n_mfcc_ceptrum * (1+n_delta_features)
n_mfcc_features

24

# Import và cài đặt thư viện

In [None]:
# cài lib. note: cài xong phải restart runtime
!pip install pydub
!pip install pomegranate



In [None]:
# Xác thực google để upload/download qua google drive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Quản lý file, folder
import os
from shutil import copyfile, rmtree
import random

# Xử lý audio
import librosa
import librosa.display
from scipy.io import wavfile

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import pomegranate # Thư viện cho mô hình xác suất
from pomegranate import *

# Tải dữ liệu và lấy xác thực Google

Dữ liệu dùng để huấn luyện và đánh giá, còn xác thực google thì dùng để upload/download mô hình trên drive.

In [None]:
%%capture
# Download and unzip the Wolfram dataset unless its archive is already present.
if not os.path.isfile('./dataset_1_wolfram.zip'):
  !gdown --id 115tIAitBNeJC0DwrP-ZyJ6RS3TyWN0qD
  !unzip -o dataset_1_wolfram.zip

# Download and unzip the FSDD dataset unless its archive is already present.
if not os.path.isfile('./dataset_2_FSDD.zip'):
  !gdown --id 1Ua9zlPBc0Fv4xGHSQTb7eIvUh_dqFI6P
  !unzip -o dataset_2_FSDD.zip

# Download the self-recorded audio (fetched on every run, no existence check).
!gdown --id 1lH_k1AYMVlJvodtZdD7OK2zkdPXxlW9i

In [None]:
# Authenticate with Google so files can be uploaded to / downloaded from Drive.
auth.authenticate_user()
gauth =  GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# `drive` is the PyDrive handle used later to upload the trained models.
drive = GoogleDrive(gauth)

# Hàm xử lý âm thanh

In [None]:
def minmax_scale(wave):
  """Linearly rescale `wave` so its minimum maps to -0.5 and its maximum to 0.5.

  No guard is applied for a constant input (max == min), which divides by zero.
  """
  lowest = wave.min()
  span = wave.max() - lowest
  return (wave - lowest) / span - 0.5

def standard_scale(wave):
  """Standardize `wave` to zero mean and unit (population) standard deviation.

  No guard is applied for a constant input (std == 0), which divides by zero.
  """
  centered = wave - wave.mean()
  return centered / wave.std()

def scaleAddNoise(wave):
  """Standardize `wave`, then add Gaussian white noise scaled by 0.2.

  The noise comes from NumPy's global random generator, so the result differs
  between calls unless that generator is seeded by the caller.
  """
  noise = np.random.normal(size=wave.shape)
  return standard_scale(wave) + 0.2*noise

def read_audio(full_audio_path):
  '''
  Read an audio file as a mono floating-point waveform sampled at 16 kHz.

  Files already at 16 kHz are read with scipy and converted to floats according
  to their actual sample dtype (integer PCM is rescaled to [-1, 1), float data
  is kept as is). Any other sample rate is handed to librosa, which resamples
  to 16 kHz and returns mono floats.

  Args:
    full_audio_path(str): path of the audio file to read.

  Return:
    wave(np.array): waveform
    sample_rate(int): always 16000
  '''
  target_rate = 16000
  sample_rate, wave = wavfile.read(full_audio_path)
  if sample_rate != target_rate:
    # librosa is only used when resampling is required: per the original
    # author's note its loader is 5-10x slower than scipy's.
    wave, sample_rate = librosa.load(full_audio_path, sr=target_rate)
    return wave, sample_rate

  if wave.dtype == np.uint8:
    # 8-bit PCM is unsigned and centred on 128.
    wave = (wave.astype(np.float64) - 128) / 128
  elif np.issubdtype(wave.dtype, np.integer):
    # Signed PCM: divide by the dtype's full-scale magnitude (32768 for int16).
    wave = wave / -float(np.iinfo(wave.dtype).min)
  # Floating-point WAV data is already normalized and is left untouched.

  if wave.ndim > 1:
    # Downmix multi-channel audio so both code paths return a mono signal.
    wave = wave.mean(axis=1)
  return wave, sample_rate

def read_process_audio(full_audio_path):
  '''
  Read an audio file, pad 2000 zero samples at both ends, then standardize
  the waveform and add white noise.

  Return:
    wave(np.array): processed waveform
    sample_rate(int): sample rate reported by read_audio
  '''
  raw_wave, sample_rate = read_audio(full_audio_path)
  pad_samples = 2000
  padded = np.pad(raw_wave, (pad_samples, pad_samples), 'constant', constant_values=(0.0,0.0))
  return scaleAddNoise(padded), sample_rate

def unvoiced_frame(wave, sample_rate, min_len = 10):
  """Return a boolean per-frame mask selecting the higher-energy frames.

  Despite its name, the mask is True for frames to KEEP: frame RMS energy is
  standardized, squashed with a logistic function and compared to a threshold.

  Args:
    wave: waveform passed to librosa.feature.rms.
    sample_rate: unused; kept for interface compatibility.
    min_len: minimum number of frames the mask should select, so that the
      sequence is long enough to train an HMM.

  Returns:
    Boolean array with one entry per RMS frame. If at least `min_len` frames
    cannot be selected (clip too short, or degenerate energies), every frame
    is kept instead of looping forever.
  """
  rms = librosa.feature.rms(y=wave)[0]
  r_normalized = standard_scale(rms)
  p = np.exp(r_normalized) / (1 + np.exp(r_normalized))
  # Lower the threshold step by step while too few frames are selected.
  thresh = 0.4
  slice_ = p > thresh
  # `thresh > 0` bounds the loop: p lies in (0, 1), so once the threshold is
  # non-positive lowering it further cannot select any additional frame.
  while slice_.sum() < min_len and thresh > 0:
    thresh-=0.05
    slice_ = p > thresh
  if slice_.sum() < min_len:
    # Fewer than min_len frames exist (or p is NaN): fall back to all frames.
    slice_ = np.ones(p.shape, dtype=bool)
  return slice_

def extract_mfcc(wave, sample_rate, trim=True):
  """Compute scaled MFCC features for a waveform.

  A 40-band mel spectrogram is computed; when `trim` is true only the frames
  selected by unvoiced_frame are kept. MFCCs are then derived from the dB
  spectrogram with n_mfcc_ceptrum+2 coefficients and lifter=40, the first two
  coefficients are dropped, and the result is transposed and divided by 800.

  Returns:
    Array whose rows are frames and whose columns are the n_mfcc_ceptrum
    retained coefficients.
  """
  mel_power = librosa.feature.melspectrogram(y=wave, sr=sample_rate, n_mels=40)
  if trim:
    keep_mask = unvoiced_frame(wave, sample_rate)
    mel_power = mel_power[:,keep_mask]
  mel_db = librosa.power_to_db(mel_power)
  cepstra = librosa.feature.mfcc(S=mel_db, n_mfcc=n_mfcc_ceptrum+2, lifter=40)
  return cepstra[2:,:].T/800

def mfcc_delta_features(mfcc, order):
  """Compute delta features of the given order along the time axis.

  Args:
    mfcc: array laid out as (frames, coefficients), i.e. the layout returned
      by extract_mfcc.
    order: derivative order; 0 returns `mfcc` itself unchanged.

  Returns:
    Array with the same shape as `mfcc`.
  """
  if order==0:
    return mfcc
  width = 9  # librosa's default smoothing window, stated explicitly
  # In 'interp' mode librosa rejects sequences shorter than the window, so
  # short clips use 'nearest' edge padding instead.
  mode = 'interp' if mfcc.shape[0] >= width else 'nearest'
  # axis=0 is the frame (time) axis here. librosa's default axis=-1 would
  # differentiate across cepstral coefficients for this (frames, coeffs) layout.
  dmfcc = librosa.feature.delta(mfcc, width=width, order=order, axis=0, mode=mode)
  return dmfcc

def full_mfcc_from_file(full_audio_path, trim=True):
  """Load an audio file and return its MFCCs with the configured delta blocks.

  The base MFCCs are followed column-wise by the first-order deltas when
  n_delta_features >= 1 and by the second-order deltas when
  n_delta_features >= 2.
  """
  wave, sr = read_process_audio(full_audio_path)
  base_mfccs = extract_mfcc(wave, sr, trim)
  feature_blocks = [base_mfccs]
  for delta_order in (1, 2):
    if n_delta_features >= delta_order:
      feature_blocks.append(mfcc_delta_features(base_mfccs, delta_order))
  if len(feature_blocks) == 1:
    # No deltas requested: hand back the base features untouched.
    return base_mfccs
  return np.hstack(feature_blocks)

# Hàm bổ trợ

In [None]:
def buildDataSet(dir, trim=True):
    """Extract features for every .wav file in a directory, grouped by label.

    The label is the second underscore-separated field of the file name taken
    up to its first dot (e.g. "12_7.wav" -> "7").

    Args:
        dir: directory containing the .wav files; a trailing slash is optional.
        trim: forwarded to full_mfcc_from_file.

    Returns:
        dict mapping label (str) to a list of per-file feature arrays.
    """
    dataset = {}
    # os.listdir returns entries in arbitrary order; sorting keeps the order of
    # the per-label feature lists reproducible across runs and machines.
    for fileName in sorted(os.listdir(dir)):
        # Only .wav audio files are processed.
        if os.path.splitext(fileName)[1] != '.wav':
            continue
        stem = fileName.split('.')[0]
        label = stem.split('_')[1]
        # os.path.join works whether or not `dir` ends with a separator.
        feature = full_mfcc_from_file(os.path.join(dir, fileName), trim)
        dataset.setdefault(label, []).append(feature)
    return dataset

def makeTrainTestDir(mainFolder, filenames, rate):
    """Recreate `mainFolder` with train_audio/test_audio splits of the given files.

    Any existing `mainFolder` directory is deleted first, so its previous
    content is lost.

    Args:
        mainFolder: output folder to (re)create.
        filenames: list of (source_path, target_name) pairs. The list is not
            modified.
        rate: fraction of the files copied into train_audio; the rest go to
            test_audio.
    """
    paths = [f'{mainFolder}',
             f'{mainFolder}/train_audio',
             f'{mainFolder}/test_audio']

    for path in paths:
        # Start from a clean directory; only an existing directory is removed,
        # so unrelated errors (e.g. permissions) are no longer swallowed.
        if os.path.isdir(path):
            rmtree(path)
        os.makedirs(path)

    # A private generator seeded with 1 yields the same permutation as
    # random.seed(1) + random.shuffle, without reseeding the global generator
    # and without reordering the caller's list.
    shuffled = list(filenames)
    random.Random(1).shuffle(shuffled)
    splitPoint = int(len(shuffled)*rate)
    trainFilenames = shuffled[:splitPoint]
    testFilenames = shuffled[splitPoint:]

    for source, target in trainFilenames:
        copyfile(source, f"{paths[1]}/{target}")

    for source, target in testFilenames:
        copyfile(source, f"{paths[2]}/{target}")

# Setup thư mục chứa data

In [None]:
def formatFilenameFSDD(dir):
    """List the FSDD files as (source_path, target_name) pairs.

    The target name is "<index>_<label>.wav", where <label> is the text before
    the first underscore of the original file name and <index> is the file's
    position in the sorted directory listing.

    Args:
        dir: directory containing the FSDD recordings.

    Returns:
        list of (source_path, target_name) tuples.
    """
    # os.listdir order is arbitrary; sorting makes the indices, and therefore
    # the seeded train/test split built from this list, reproducible.
    return [
        (f"{dir}/{filename}", f"{index}_{filename.split('_')[0]}.wav")
        for index, filename in enumerate(sorted(os.listdir(dir)))
    ]

# Split the FSDD files into fsdd/train_audio and fsdd/test_audio using fsdd_split.
filenames = formatFilenameFSDD('./dataset_2_FSDD')
makeTrainTestDir('fsdd', filenames, fsdd_split)

In [None]:
def formatFilenameWolfram(dir):
    """List the Wolfram files as (source_path, target_name) pairs.

    Every entry of `dir` is treated as a sub-folder of recordings. The target
    name is "<index>_<label>.wav", where <label> is the text before the first
    underscore of the sub-folder name and <index> is a running counter over
    all files.

    Args:
        dir: directory containing one sub-folder per class.

    Returns:
        list of (source_path, target_name) tuples.
    """
    filenames = []
    # Both listings are sorted because os.listdir order is arbitrary; this keeps
    # the counter, and the seeded split built from this list, reproducible.
    for folder in sorted(os.listdir(dir)):
        label = folder.split('_')[0]
        for filename in sorted(os.listdir(f'{dir}/{folder}')):
            target = f"{len(filenames)}_{label}.wav"
            filenames.append((f"{dir}/{folder}/{filename}", target))

    return filenames

# Split the Wolfram files into wolfram/train_audio and wolfram/test_audio using wolfram_split.
filenames = formatFilenameWolfram('./dataset_1_wolfram')
makeTrainTestDir('wolfram', filenames, wolfram_split)

# Huấn luyện mô hình (có thể bỏ qua vì mô hình đã save trên drive)

## Hàm train mô hình

In [None]:
def Generate_DiagGMM(full_fset, n_features, n_states=5, n_cmps=3):
  """Create one randomly initialised diagonal emission distribution per state.

  Each component is an IndependentComponentsDistribution of `n_features`
  NormalDistributions whose two parameters are drawn from np.random.random.
  A state gets a GeneralMixtureModel of `n_cmps` such components when
  n_cmps > 1, otherwise a single component.

  Args:
    full_fset: unused; kept for interface compatibility.
    n_features: dimensionality of one observation.
    n_states: number of distributions to create.
    n_cmps: number of mixture components per state.

  Returns:
    list with `n_states` distributions.
  """
  def random_diag_gaussian():
    # One independent 1-D Gaussian per feature dimension.
    return IndependentComponentsDistribution(tuple(
        NormalDistribution(*np.random.random(2))
        for _ in range(n_features)
        ))

  emissions = []
  for _ in range(n_states):
    if n_cmps > 1:
      emission = GeneralMixtureModel([random_diag_gaussian() for _ in range(n_cmps)])
    else:
      emission = random_diag_gaussian()
    emissions.append(emission)

  return emissions

In [None]:
# Generate progressive HMM model
def left_right_GMMHMM(seed_sample, x_dim, n_states=10, n_modals=9, diag=True, random=0):
  """Build and bake a left-to-right HMM with mixture emissions.

  The model always starts in the first state. Every state has a 0.5 self-loop
  and a 0.5 transition to the next state; the last state moves to the model's
  end with probability 0.5.

  Args:
    seed_sample: forwarded to Generate_DiagGMM (used only when `diag` is true).
    x_dim: dimensionality of one observation.
    n_states: number of hidden states.
    n_modals: number of mixture components per state.
    diag: when true, emissions come from Generate_DiagGMM; otherwise each state
      gets a GeneralMixtureModel of blank MultivariateGaussianDistributions.
    random: accepted for backward compatibility but ignored. The previous
      implementation only built an unused RandomState from it.

  Returns:
    The baked HiddenMarkovModel.
  """
  model = HiddenMarkovModel()
  if diag:
    emissions = Generate_DiagGMM(seed_sample, x_dim, n_states, n_modals)
  else:
    emissions = [GeneralMixtureModel([MultivariateGaussianDistribution.blank(x_dim) for _ in range(n_modals)])
                 for _ in range(n_states)]
  states = [State(emission, name=f"H{i}") for i, emission in enumerate(emissions)]
  model.add_states(states)
  model.add_transition(model.start, states[0], 1)
  for i in range(n_states-1):
    model.add_transition(states[i], states[i], 0.5)
    model.add_transition(states[i], states[i+1], 0.5)
  model.add_transition(states[n_states-1], states[n_states-1], 0.5)
  model.add_transition(states[n_states-1], model.end, 0.5)
  model.bake()
  return model

In [None]:
# Maximum number of random re-initialisations tried per label.
tol = 1000
def train_GMMHMM(dataset, input_dim, n_hidden_state, n_gauss_modal, diag=True, failed_label_return=False, report_fail=None):
    """Train one left-to-right HMM per label, retrying on NaN divergence.

    For each label a freshly initialised model is fitted up to `tol` times; the
    first fit whose last recorded improvement is not NaN is kept.

    Args:
        dataset: dict mapping label to a list of observation sequences.
        input_dim: dimensionality of one observation.
        n_hidden_state: number of hidden states per model.
        n_gauss_modal: number of mixture components per state.
        diag: forwarded to left_right_GMMHMM.
        failed_label_return: when true, labels whose every attempt failed are
            appended to `report_fail`.
        report_fail: list receiving the failed labels; may be None, in which
            case nothing is reported.

    Returns:
        dict mapping label to the trained model, or to None when every attempt
        for that label failed.
    """
    GMMHMM_Models = dict()

    for label, sequences in dataset.items():
        print(f"Training model detect {label}")
        # Stays None unless an attempt succeeds (also covers tol == 0).
        model = None
        for attempt in range(tol):
          candidate = left_right_GMMHMM(sequences[0], input_dim, n_hidden_state, n_gauss_modal, diag=diag)
          _, history = candidate.fit(sequences, verbose=True, multiple_check_input=False,return_history=True)  # get optimal parameters
          # A NaN final improvement marks a diverged fit; retry from a new init.
          if not np.isnan(history.improvements[-1]):
            model = candidate
            break
        if model is None and failed_label_return and report_fail is not None:
          report_fail.append(label)
        GMMHMM_Models[label] = model

    return GMMHMM_Models

In [None]:
def retrain_specific_class(hmmModels, label, dataset, input_dim, n_hidden_state, n_gauss_modal, diag=True):
  """Retrain the model of a single label and store it in `hmmModels`.

  A freshly initialised model is fitted up to `tol` times; the first fit whose
  last recorded improvement is not NaN is kept. If every attempt fails, None is
  stored for the label (the same convention as train_GMMHMM) rather than a
  diverged model.

  Args:
    hmmModels: dict of models, updated in place.
    label: key of the class to retrain.
    dataset: dict mapping label to a list of observation sequences.
    input_dim: dimensionality of one observation.
    n_hidden_state: number of hidden states.
    n_gauss_modal: number of mixture components per state.
    diag: forwarded to left_right_GMMHMM.

  Returns:
    The same `hmmModels` dict.
  """
  model = None
  for attempt in range(tol):
    candidate = left_right_GMMHMM(dataset[label][0], input_dim, n_hidden_state, n_gauss_modal, diag=diag)
    _, history = candidate.fit(dataset[label], verbose=True, multiple_check_input=False,return_history=True)  # get optimal parameters
    # A NaN final improvement marks a diverged fit; retry from a new init.
    if not np.isnan(history.improvements[-1]):
      model = candidate
      break
  hmmModels[label] = model
  return hmmModels

## Hàm test mô hình

In [None]:
# test model
def evaluateModel(testDataset, model):
  """Print per-label and overall recognition rates of the per-label models.

  The models are combined into a BayesClassifier in sorted-label order, so a
  prediction is the position of the winning label in that sorted order.

  Args:
    testDataset: dict mapping label to a list of observation sequences.
    model: dict mapping label to its trained model.

  Returns:
    None. Results are printed; nothing is printed for an empty dataset.
  """
  if (len(testDataset) == 0):
    return
  class_labels = sorted(model)
  digit_clf = BayesClassifier([model[class_label] for class_label in class_labels])
  # Compare predictions with the label's position among the classes instead of
  # int(label): identical for labels "0".."9", and still correct when labels
  # are non-numeric or some class is missing.
  class_index = {class_label: i for i, class_label in enumerate(class_labels)}
  true_cnt = 0
  total = 0
  for label in sorted(testDataset.keys()):
      features = np.array(testDataset[label], dtype='object')
      iter_total = len(features)
      if iter_total == 0:
          # Nothing to score for this label; avoids a division by zero below.
          continue
      pred = digit_clf.predict(features)
      # A label with no trained model maps to -1 and so never counts as correct.
      iter_cnt = (pred == class_index.get(label, -1)).sum()
      total += iter_total
      true_cnt += iter_cnt
      print(f"{label}: {iter_cnt}/{iter_total} ({iter_cnt/iter_total})")
  if total == 0:
    return
  print("Final recognition rate is %.2f"%(100.0*true_cnt/total), "%")

## Đọc và tiền xử lý data

In [None]:
# Extract features for the FSDD train split.
master_path = 'fsdd'

trainDir = master_path + '/train_audio/'
trainDataSet_fsdd = buildDataSet(trainDir)
print("Finish prepare the training data")

# Extract features for the FSDD test split.
testDir = master_path + '/test_audio/'
testDataSet_fsdd = buildDataSet(testDir)
print("Finish prepare the test data")

Finish prepare the training data
Finish prepare the test data


In [None]:
# Extract features for the Wolfram train split.
# Note: master_path, trainDir and testDir are reassigned here (previously FSDD).
master_path = 'wolfram'

trainDir = master_path + '/train_audio/'
trainDataSet_wolfram = buildDataSet(trainDir)
print("Finish prepare the training data")

# Extract features for the Wolfram test split.
testDir = master_path + '/test_audio/'
testDataSet_wolfram = buildDataSet(testDir)
print("Finish prepare the test data")

Finish prepare the training data
Finish prepare the test data


In [None]:
# Build the combined training set as a NEW dict holding NEW lists.
# Plain assignment would alias trainDataSet_fsdd, so the merge would also modify
# the FSDD-only set and re-running this cell would append the Wolfram data again.
trainDataSet = {label: list(features) for label, features in trainDataSet_fsdd.items()}
if wolfram_split > 0:
  # Iterating over the Wolfram labels also covers a label present in only one set.
  for label, features in trainDataSet_wolfram.items():
    trainDataSet.setdefault(label, []).extend(features)

## **Mô hình 1**
- 10 hidden states
- Multivariate Diagonal Gauss cho emission probs

In [None]:
# Kiểm tra đảm bảo chuỗi có độ dài nhỏ nhất không nhỏ hơn số state
for label in trainDataSet.keys():
  print(min(map(len, trainDataSet[label])))

10
10
10
10
10
10
10
10
10
10


In [None]:
%%time
# Train one 10-state model per label; labels whose training failed on every
# attempt are collected in failed_label, and the cell asserts there are none.
failed_label = []
hmmModels = train_GMMHMM(trainDataSet, n_mfcc_features,10,n_mixtures, failed_label_return=True, report_fail=failed_label)
print("Finish training of the GMM_HMM models for digits 0-9")
print(failed_label)
assert len(failed_label) == 0

Training model detect 1
[1] Improvement: 4463547.470881152	Time (s): 1.449
[2] Improvement: nan	Time (s): 1.451
Total Training Improvement: nan
Total Training Time (s): 4.4201
[1] Improvement: 3054640.3234874886	Time (s): 1.445
[2] Improvement: 42635.16983833304	Time (s): 1.452
[3] Improvement: 15422.020972744562	Time (s): 1.444
[4] Improvement: nan	Time (s): 1.431
Total Training Improvement: nan
Total Training Time (s): 7.2746
[1] Improvement: 3720063.2688771645	Time (s): 1.447
[2] Improvement: nan	Time (s): 1.426
Total Training Improvement: nan
Total Training Time (s): 4.3253
[1] Improvement: 7812426.988551942	Time (s): 1.422
[2] Improvement: 28521.161207220284	Time (s): 1.445
[3] Improvement: 12365.303335543955	Time (s): 1.459
[4] Improvement: 8232.478676344035	Time (s): 1.431
[5] Improvement: 5936.138964566169	Time (s): 1.426
[6] Improvement: 3445.732892747503	Time (s): 1.468
[7] Improvement: 1883.5838982649148	Time (s): 1.423
[8] Improvement: 1133.0691953955684	Time (s): 1.441
[9]

In [None]:
# Report recognition rates of the 10-state models on the combined train set
# and on the held-out FSDD and Wolfram test sets.
print("Evaluate on train set")
evaluateModel(trainDataSet, hmmModels)
print("Evaluate on rest of FSDD set")
evaluateModel(testDataSet_fsdd, hmmModels)
print("Evaluate on wolfram test set")
evaluateModel(testDataSet_wolfram, hmmModels)

Evaluate on train set
0: 1278/1600 (0.79875)
1: 1210/1614 (0.7496902106567535)
2: 1025/1562 (0.6562099871959027)
3: 1127/1579 (0.7137428752374921)
4: 1230/1644 (0.7481751824817519)
5: 1324/1594 (0.8306148055207027)
6: 1164/1624 (0.7167487684729064)
7: 1084/1592 (0.6809045226130653)
8: 1273/1573 (0.8092816274634457)
9: 1307/1617 (0.808286951144094)
Final recognition rate is 75.14 %
Evaluate on rest of FSDD set
0: 97/113 (0.8584070796460177)
1: 93/120 (0.775)
2: 77/114 (0.6754385964912281)
3: 77/130 (0.5923076923076923)
4: 101/115 (0.8782608695652174)
5: 119/121 (0.9834710743801653)
6: 41/108 (0.37962962962962965)
7: 118/145 (0.8137931034482758)
8: 101/119 (0.8487394957983193)
9: 77/115 (0.6695652173913044)
Final recognition rate is 75.08 %
Evaluate on wolfram test set
0: 770/963 (0.7995846313603323)
1: 676/936 (0.7222222222222222)
2: 641/997 (0.6429287863590772)
3: 667/947 (0.7043294614572334)
4: 670/913 (0.7338444687842278)
5: 738/942 (0.7834394904458599)
6: 713/937 (0.7609391675560299

In [None]:
# Save the models: upload each one to Google Drive as a JSON file.
# The title encodes the label, feature dimension, state count (10), mixture
# count and experiment id. 'parents' presumably names the target Drive folder.
for model_label in hmmModels.keys():
  file = drive.CreateFile({'title': f'hmm[{model_label}]_{n_mfcc_features}_10_{n_mixtures}[{experiment_id}].json', 'parents': [{'id': '1QPUr4vwYHu3n9iH3iQmnvDUt2Dgx4V3Y'}]})
  file.SetContentString(hmmModels[model_label].to_json())
  file.Upload()

## **Mô hình 2**
- 5 hidden states
- Multivariate Diagonal Gauss cho emission probs

In [None]:
%%time
# Train one 5-state model per label (this replaces hmmModels from the previous
# experiment); labels whose training failed on every attempt are collected in
# failed_label, and the cell asserts there are none.
failed_label = []
hmmModels = train_GMMHMM(trainDataSet, n_mfcc_features,5,n_mixtures, failed_label_return=True, report_fail=failed_label)
print("Finish training of the GMM_HMM models for digits 0-9")
print(failed_label)
assert len(failed_label) == 0

Training model detect 1
[1] Improvement: 3090745.5757925306	Time (s): 0.7523
[2] Improvement: nan	Time (s): 0.7843
Total Training Improvement: nan
Total Training Time (s): 2.3291
[1] Improvement: 2494160.714672424	Time (s): 0.7931
[2] Improvement: nan	Time (s): 0.7501
Total Training Improvement: nan
Total Training Time (s): 2.3479
[1] Improvement: 2987605.3476266456	Time (s): 0.7761
[2] Improvement: 5921.9130305551225	Time (s): 0.8186
[3] Improvement: 24452.177403895417	Time (s): 0.778
[4] Improvement: 19196.6998876445	Time (s): 0.7752
[5] Improvement: 8047.541551130591	Time (s): 0.7812
[6] Improvement: 3615.8969143547583	Time (s): 0.7848
[7] Improvement: 2253.3828309604432	Time (s): 0.7799
[8] Improvement: 1556.2124542673118	Time (s): 0.7709
[9] Improvement: 1511.4792811484076	Time (s): 0.7797
[10] Improvement: 1873.4021943993866	Time (s): 0.7649
[11] Improvement: 2699.397201906191	Time (s): 0.7808
[12] Improvement: 3305.3446915470995	Time (s): 0.7703
[13] Improvement: 3839.6832128949

In [None]:
# Report recognition rates of the 5-state models on the combined train set
# and on the held-out FSDD and Wolfram test sets.
print("Evaluate on train set")
evaluateModel(trainDataSet, hmmModels)
print("Evaluate on rest of FSDD set")
evaluateModel(testDataSet_fsdd, hmmModels)
print("Evaluate on wolfram test set")
evaluateModel(testDataSet_wolfram, hmmModels)

Evaluate on train set
0: 1180/1600 (0.7375)
1: 1069/1614 (0.6623296158612144)
2: 1004/1562 (0.6427656850192062)
3: 1211/1579 (0.7669411019632679)
4: 1207/1644 (0.7341849148418491)
5: 1241/1594 (0.7785445420326224)
6: 1143/1624 (0.7038177339901478)
7: 1121/1592 (0.7041457286432161)
8: 800/1573 (0.5085823267641449)
9: 1024/1617 (0.6332714904143476)
Final recognition rate is 68.75 %
Evaluate on rest of FSDD set
0: 81/113 (0.7168141592920354)
1: 57/120 (0.475)
2: 86/114 (0.7543859649122807)
3: 95/130 (0.7307692307692307)
4: 96/115 (0.8347826086956521)
5: 117/121 (0.9669421487603306)
6: 48/108 (0.4444444444444444)
7: 100/145 (0.6896551724137931)
8: 61/119 (0.5126050420168067)
9: 55/115 (0.4782608695652174)
Final recognition rate is 66.33 %
Evaluate on wolfram test set
0: 745/963 (0.7736240913811008)
1: 636/936 (0.6794871794871795)
2: 657/997 (0.6589769307923772)
3: 726/947 (0.7666314677930306)
4: 658/913 (0.7207009857612268)
5: 706/942 (0.7494692144373672)
6: 676/937 (0.7214514407684098)
7:

In [None]:
# Save the models: upload each one to Google Drive as a JSON file.
# The title encodes the label, feature dimension, state count (5), mixture
# count and experiment id. 'parents' presumably names the target Drive folder.
for model_label in hmmModels.keys():
  file = drive.CreateFile({'title': f'hmm[{model_label}]_{n_mfcc_features}_5_{n_mixtures}[{experiment_id}].json', 'parents': [{'id': '1QPUr4vwYHu3n9iH3iQmnvDUt2Dgx4V3Y'}]})
  file.SetContentString(hmmModels[model_label].to_json())
  file.Upload()

## **Mô hình 3**
- 3 hidden states
- Multivariate Diagonal Gauss cho emission probs

In [None]:
%%time
# Train one 3-state model per label (this replaces hmmModels from the previous
# experiment); labels whose training failed on every attempt are collected in
# failed_label, and the cell asserts there are none.
failed_label = []
hmmModels = train_GMMHMM(trainDataSet, n_mfcc_features,3,n_mixtures, failed_label_return=True, report_fail=failed_label)
print("Finish training of the GMM_HMM models for digits 0-9")
print(failed_label)
assert len(failed_label) == 0

Training model detect 1
[1] Improvement: 2833248.107970586	Time (s): 0.4895
[2] Improvement: 44126.34806062956	Time (s): 0.5062
[3] Improvement: 23696.777385677444	Time (s): 0.4948
[4] Improvement: 10561.75179979601	Time (s): 0.5003
[5] Improvement: 4955.058037675917	Time (s): 0.5454
[6] Improvement: 2140.7736943447962	Time (s): 0.4893
[7] Improvement: 985.4725367147475	Time (s): 0.4943
[8] Improvement: 625.6980115305632	Time (s): 0.4876
[9] Improvement: 482.36813962925225	Time (s): 0.4971
[10] Improvement: 332.30468106945045	Time (s): 0.4815
[11] Improvement: 220.54108794801868	Time (s): 0.5013
[12] Improvement: 153.16981470794417	Time (s): 0.4886
[13] Improvement: 106.79981436650269	Time (s): 0.5018
[14] Improvement: 74.4718236355111	Time (s): 0.4924
[15] Improvement: 67.31897422950715	Time (s): 0.4898
[16] Improvement: 54.86658096942119	Time (s): 0.4915
[17] Improvement: 36.433959268731996	Time (s): 0.4959
[18] Improvement: 30.910779799334705	Time (s): 0.4979
[19] Improvement: 28.84

In [None]:
# Report recognition rates of the 3-state models on the combined train set
# and on the held-out FSDD and Wolfram test sets.
print("Evaluate on train set")
evaluateModel(trainDataSet, hmmModels)
print("Evaluate on rest of FSDD set")
evaluateModel(testDataSet_fsdd, hmmModels)
print("Evaluate on wolfram test set")
evaluateModel(testDataSet_wolfram, hmmModels)

Evaluate on train set
0: 68/1600 (0.0425)
1: 1129/1614 (0.6995043370508055)
2: 740/1562 (0.4737516005121639)
3: 826/1579 (0.5231158961367954)
4: 1284/1644 (0.781021897810219)
5: 1125/1594 (0.705771643663739)
6: 1243/1624 (0.7653940886699507)
7: 616/1592 (0.3869346733668342)
8: 760/1573 (0.4831532104259377)
9: 987/1617 (0.6103896103896104)
Final recognition rate is 54.87 %
Evaluate on rest of FSDD set
0: 15/113 (0.13274336283185842)
1: 64/120 (0.5333333333333333)
2: 64/114 (0.5614035087719298)
3: 72/130 (0.5538461538461539)
4: 107/115 (0.9304347826086956)
5: 101/121 (0.8347107438016529)
6: 75/108 (0.6944444444444444)
7: 68/145 (0.4689655172413793)
8: 60/119 (0.5042016806722689)
9: 64/115 (0.5565217391304348)
Final recognition rate is 57.50 %
Evaluate on wolfram test set
0: 31/963 (0.032191069574247146)
1: 658/936 (0.7029914529914529)
2: 449/997 (0.45035105315947843)
3: 479/947 (0.5058078141499472)
4: 681/913 (0.7458926615553122)
5: 643/942 (0.6825902335456475)
6: 724/937 (0.772678762006

In [None]:
# Save the models: upload each one to Google Drive as a JSON file.
# The title encodes the label, feature dimension, state count (3), mixture
# count and experiment id. 'parents' presumably names the target Drive folder.
for model_label in hmmModels.keys():
  file = drive.CreateFile({'title': f'hmm[{model_label}]_{n_mfcc_features}_3_{n_mixtures}[{experiment_id}].json', 'parents': [{'id': '1QPUr4vwYHu3n9iH3iQmnvDUt2Dgx4V3Y'}]})
  file.SetContentString(hmmModels[model_label].to_json())
  file.Upload()