In [6]:
import numpy as np
import json
import os

In [18]:
class MonoPhoneHMM:
    def __init__(self):
        ## 考慮されうる音素リスト
        self.phones = []
        ## 考慮されうる音素数
        self.num_phones = 1
        # 各音素ごとの状態数
        self.num_states = 1
        # 各状態ごとのGMMにおける混合数
        self.num_mixtures = 1
        # 入力特徴量ベクトルの次元数
        self.num_dims = 1
        
        # 学習するパラメータ
        ## 尤度モデルのパラメーター
        self.pdf = None
        ## 遷移確率（対数）パラメーター
        self.trans = None

        # 学習時・認識時に計算する値
        ## 各状態において混合されている各分布ごとの尤度
        self.elem_probs = None
        ## 各情報ごとの尤度
        self.state_prob = None
        ## 前向き確率
        self.alpha = None
        ## 後ろ向き確率
        self.beta = None
        ## 音素列仮説に対する対数尤度(Forwardによるもの)   
        self.loglikelihood = 0
        ## ビタビアルゴリズムにおける累積確率
        self.score = None
        ## ビタビパス
        self.track = None
        ## ビタビアルゴリズムにおけるスコア
        self.viterbi_score = 0

        # その他
        ## logをとるときの発散防止
        self.LZERO = 1e-10
        self.LSMALL = 0.5e10
        self.ZERO = 1e-100
        self.MINVAR = 1e-4
    
    def make_proto(self, phone_list, num_states, prob_loop, num_dims):
        self.phones = phone_list
        self.num_phones = len(phone_list)
        self.num_states = num_states
        self.num_mixtures = 1
        self.num_dims = num_dims
        
        ## 尤度モデルの初期パラメータ作成
        ## 音素p, 状態s, 混合要素mのパラメーターは pdf[p][s][m]
        self.pdf = []
        for p in range(self.num_phones):
            tmp_p = []
            for s in range(self.num_states):
                tmp_s = []
                for m in range(self.num_mixtures):
                    mu = np.zeros(self.num_dims)
                    ## 共分散行列は対角行列と仮定->分散のみ学習する
                    var = np.ones(self.num_dims)
                    ## 混合重み
                    weight = 1.0
                    ## 対数尤度における定数部分
                    logGaussianConst = self.num_dims * np.log(2. * np.pi) + np.sum(np.log(var))
                    ## パラメーター
                    gaussian = {
                        'weight': weight,
                        'mu': mu,
                        'var': var,
                        'logGaussianConst': logGaussianConst
                    }
                    tmp_s.append(gaussian)
                tmp_p.append(tmp_s)
            self.pdf.append(tmp_p)
        
        ## 遷移確率の初期パラメータ作成
        ## 音素p, 状態sの遷移確率は trans[p][s]であり、[log_prob_loop][log_prob_next]を得る
        prob_next = 1. - prob_loop
        log_prob_loop = np.log(prob_loop) if prob_loop > self.ZERO else self.LZERO
        log_prob_next = np.log(prob_next) if prob_next > self.ZERO else self.LZERO

        self.trans = []
        for p in range(self.num_phones):
            tmp_p = []
            for s in range(self.num_states):
                tmp_p.append([log_prob_loop, log_prob_next])
            self.trans.append(tmp_p)

    def flat_set_gaussian_param(self, mean, var):
        for p in range(self.num_phones):
            for s in range(self.num_states):
                for m in range(self.num_mixtures):
                    pdf = self.pdf[p][s][m]
                    pdf['mu'] = mean
                    pdf['var'] = var
                    logGaussianConst = self.num_dims * np.log(2. * np.pi) + np.sum(np.log(var))
                    pdf['logGaussianConst'] = logGaussianConst


    def save_hmm(self, filename):
        ''' HMMパラメータをjson形式で保存
        filename: 保存ファイル名
        '''
        # json形式で保存するため，
        # HMMの情報を辞書形式に変換する
        hmmjson = {}
        # 基本情報を入力
        hmmjson['num_phones'] = self.num_phones
        hmmjson['num_states'] = self.num_states
        hmmjson['num_mixture'] = self.num_mixtures
        hmmjson['num_dims'] = self.num_dims
        # 音素モデルリスト
        hmmjson['hmms'] = []
        for p, phone in enumerate(self.phones):
            model_p = {}
            # 音素名
            model_p['phone'] = phone
            # HMMリスト
            model_p['hmm'] = []
            for s in range(self.num_states):
                model_s = {}
                # 状態番号
                model_s['state'] = s
                # 遷移確率(対数値から戻す)
                model_s['trans'] = \
                    list(np.exp(self.trans[p][s]))
                # GMMリスト
                model_s['gmm'] = []
                for m in range(self.num_mixtures):
                    model_m = {}
                    # 混合要素番号
                    model_m['mixture'] = m
                    # 混合重み
                    model_m['weight'] = \
                        self.pdf[p][s][m]['weight']
                    # 平均値ベクトル
                    # jsonはndarrayを扱えないので
                    # list型に変換しておく
                    model_m['mean'] = \
                        list(self.pdf[p][s][m]['mu'])
                    # 対角共分散
                    model_m['variance'] = \
                        list(self.pdf[p][s][m]['var'])
                    # gConst
                    model_m['gConst'] = \
                        self.pdf[p][s][m]['logGaussianConst']
                    # gmmリストに加える
                    model_s['gmm'].append(model_m)
                # hmmリストに加える
                model_p['hmm'].append(model_s)
            # 音素モデルリストに加える
            hmmjson['hmms'].append(model_p)

        # JSON形式で保存する
        with open(filename, mode='w') as f:
            json.dump(hmmjson, f, indent=4)

    def load_hmm(self, filename):
        ''' json形式のHMMファイルを読み込む
        filename: 読み込みファイル名
        '''
        # JSON形式のHMMファイルを読み込む
        with open(filename, mode='r') as f:
            hmmjson = json.load(f)

        # 辞書の値を読み込んでいく
        self.num_phones = hmmjson['num_phones']
        self.num_states = hmmjson['num_states']
        self.num_mixture = hmmjson['num_mixture']
        self.num_dims = hmmjson['num_dims']

        # 音素情報の読み込み
        self.phones = []
        for p in range(self.num_phones):
            hmms = hmmjson['hmms'][p]
            self.phones.append(hmms['phone'])

        #遷移確率の読み込み
        # 音素番号p, 状態番号s の遷移確率は
        # trans[p][s] = [loop, next]
        self.trans = []
        for p in range(self.num_phones):
            tmp_p = []
            hmms = hmmjson['hmms'][p]
            for s in range(self.num_states):
                hmm = hmms['hmm'][s]
                # 遷移確率の読み込み
                tmp_trans = np.array(hmm['trans'])
                # 総和が1になるよう正規化
                tmp_trans /= np.sum(tmp_trans)
                # 対数に変換
                for i in [0, 1]:
                    tmp_trans[i] = np.log(tmp_trans[i]) \
                        if tmp_trans[i] > self.ZERO \
                        else self.LZERO
                tmp_p.append(tmp_trans)
            # self.transに追加
            self.trans.append(tmp_p)

        # 正規分布パラメータの読み込み
        # 音素番号p, 状態番号s, 混合要素番号m
        # の正規分布はpdf[p][s][m]でアクセスする
        # pdf[p][s][m] = gaussian
        self.pdf = []
        for p in range(self.num_phones):
            tmp_p = []
            hmms = hmmjson['hmms'][p]
            for s in range(self.num_states):
                tmp_s = []
                hmm = hmms['hmm'][s]
                for m in range(self.num_mixture):
                    gmm = hmm['gmm'][m]
                    # 重み，平均，分散，gConstを取得
                    weight = gmm['weight']
                    mu = np.array(gmm['mean'])
                    var = np.array(gmm['variance'])
                    gconst = gmm['gConst']
                    # 正規分布を作成
                    gaussian = {'weight': weight, 
                                'mu': mu, 
                                'var': var,
                                'gConst': gconst}
                    tmp_s.append(gaussian)
                tmp_p.append(tmp_s)
            # self.pdfに追加
            self.pdf.append(tmp_p)



In [9]:
# 学習データの用意
original_label_file = "../data/label/train_small/text_phone"
int_label_file = "../data/label/train_small/text_int"
phone_list_file = "./phones.txt"
## 音素リストを作成する
phone_dic = {}
phone_dic['pau'] = 0 
with open(phone_list_file, mode='r') as f:
    for i, line in enumerate(f):
        phone = line.strip()
        phone_dic[phone] = i + 1

## 音素リストをベースにして、教師データの音素を数値に変換

with open(original_label_file, mode='r') as f_in, open (int_label_file, mode='w') as f_out:
    for line in f_in:
        text = line.split()
        f_out.write(f"{text[0]}")
        f_out.write(f" {0}")
        for u in text[1:]:
            f_out.write(f" {phone_dic[u]}")
        f_out.write(f" {0}")
        f_out.write("\n")
        

### HMMのプロトタイプ作成（初期化も）

In [17]:
phone_list = list(phone_dic.keys())
num_states = 3
num_dims = 13
prob_loop = 0.7
save_dir = f"./model_{num_states}state_1mix"
os.makedirs(save_dir, exist_ok=True)
hmm = MonoPhoneHMM()
hmm.make_proto(phone_list, num_states, prob_loop, num_dims)
hmm.save_hmm(os.path.join(save_dir, "hmm_proto"))

### プロトタイプをベースとしてFlatStartにより、混合正規分布のパラメータを初期化

In [19]:
hmm_proto_file = os.path.join(save_dir, "hmm_proto")
mean_std_file = "/Users/shibu/Workspace/Text/python_asr/mfcc/train_small/mean_std.txt"
with open(mean_std_file, mode='r') as f:
    # 全行読み込み
    lines = f.readlines()
    # 1行目(0始まり)が平均値ベクトル(mean)，
    # 3行目が標準偏差ベクトル(std)
    mean_line = lines[1]
    std_line = lines[3]
    # スペース区切りのリストに変換
    mean = mean_line.split()
    std = std_line.split()
    # numpy arrayに変換
    mean = np.array(mean, dtype=np.float64)
    std = np.array(std, dtype=np.float64)
    # 標準偏差を分散に変換
    var = std ** 2

hmm = MonoPhoneHMM()

# HMMプロトタイプを読み込む
hmm.load_hmm(hmm_proto_file)

# フラットスタート初期化を実行
hmm.flat_set_gaussian_param(mean, var)

# HMMのプロトタイプをjson形式で保存
hmm.save_hmm(os.path.join(save_dir, '0.hmm'))

### 学習

In [None]:
base_hmm_file = os.path.join(save_dir, '0.hmm')
feat_scp_file = "/Users/shibu/Workspace/Text/python_asr/mfcc/train_small/feats.scp"
label_file = "/Users/shibu/Workspace/Text/python_asr/data/label/train_small/text_int"

# 更新回数
num_iter = 10
# 学習に使う発話数
num_utters = 50

hmm = MonoPhoneHMM()
hmm.load_hmm(base_hmm_file)

# 各発話ごとの教師データをまとめる
label_list = {}
with open(label_file, mode='r') as f:
    for line in f:
        utt = line.split()[0]
        labels = np.int64(line.split()[1:])
        label_list[utt] = labels

# 各発話ごとの特徴量のファイルパスをまとめる
feat_list = {}
with open(feat_scp_file, mode='r') as f:
    for i, line in enumerate(f):
        if i >= num_utters:
            break
        utt = line.split()[0]
        feat_file_path = line.split()[1]
        feat_list[utt] = feat_file_path

# 学習する
for iter in range(num_iter):
    hmm.train(feat_list, label_list)
    hmm.save_hmm(os.path.join(save_dir, f"{iter+1}.hmm"))