<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#手写HiddenMarkovModel" data-toc-modified-id="手写HiddenMarkovModel-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>手写HiddenMarkovModel</a></span></li><li><span><a href="#sklearn-库实现" data-toc-modified-id="sklearn-库实现-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>sklearn 库实现</a></span></li></ul></div>

In [1]:
import numpy as np
import copy

## 手写HiddenMarkovModel

In [2]:
class HiddenMarkovModel:
    def __init__(self, Q, V, threshold=1e-6):
        self.Q = Q
        self.V = V
        self.threshold = threshold
    
    def forward(self, O, A, B, pi):
        N, T = len(self.Q), len(O)
        self.alphas = np.zeros((T, N)) 
        self.alphas[0] = pi * B[:, self.V.index(O[0])] 
        for t in range(1, T): 
            self.alphas[t] = np.dot(self.alphas[t-1], A) * B[:, self.V.index(O[t])]
        return np.sum(self.alphas[T-1]) 
    
    def backward(self, O, A, B, pi):
        N, T = len(self.Q), len(O)
        self.betas = np.zeros((T, N)) 
        self.betas[T-1] = np.ones(N) 
        for t in range(T-2, -1, -1): 
            self.betas[t] = np.dot(self.betas[t+1] * B[:, self.V.index(O[t+1])], A.T)
        return np.dot(pi * B[:, self.V.index(O[0])], self.betas[0])
    
    def viterbi(self, O, A, B, pi):
        N, T = len(self.Q), len(O)
        self.deltas = np.zeros((T, N))
        self.psis = np.zeros((T, N), dtype=np.int16)
        self.deltas[0] = pi*B[:, self.V.index(O[0])]
        for t in range(1, T):
            last = self.deltas[t-1].reshape(-1, 1) * A
            self.deltas[t] = np.max(last, axis=0) * B[:, self.V.index(O[t])]
            self.psis[t] = np.argmax(last, axis=0)
        pstar = np.max(self.deltas[T-1])
        path = [np.argmax(self.deltas[T-1])]
        for t in range(T-1, 0, -1):
            path.insert(0, self.psis[t, path[0]])
        return pstar, [index+1 for index in path]
        
    def fit(self, O, A, B, pi, n_iter = 100):
        N, M, D, T = len(self.Q), len(self.V), len(O), len(O[0])
        self.A, self.B, self.pi = copy.deepcopy(A), copy.deepcopy(B), copy.deepcopy(pi)
        gamma, xi, mask = np.zeros((D, T, N)), np.zeros((D, T, N, N)), np.zeros((D, T, N, M))
        for step in range(n_iter):
            for d, o in enumerate(O): 
                p = self.forward(o, self.A, self.B, self.pi)
                p = self.backward(o, self.A, self.B, self.pi)
                gamma[d] = self.alphas * self.betas / p
                for t in range(T-1):
                    xi[d, t] = self.alphas[t].reshape(-1, 1) * self.A * \
                        self.B[:, self.V.index(o[t+1])].reshape(1, -1) * self.betas[t+1] / p
                    mask[d, t, :, self.V.index(o[t])] = np.ones(N)
            mask[d, T-1, :, self.V.index(o[T-1])] = np.ones(N)
            old_A, old_B, old_pi = copy.deepcopy(self.A), copy.deepcopy(self.B), copy.deepcopy(self.pi)
            self.A = np.sum(np.sum(xi[:, :-1, :, :], axis = 1), axis = 0) / \
            np.sum(np.sum(gamma[:, :-1, :], axis = 1), axis = 0).reshape(-1, 1)
            self.B = np.sum(np.sum(gamma.reshape(D, T, N, 1)*mask, axis = 1),
            axis = 0) / np.sum(np.sum(gamma, axis = 1), axis = 0).reshape(-1, 1)
            self.pi = np.sum(gamma[:, 0, :], axis = 0) / D
            if max(np.abs(self.A - old_A).max(), np.abs(self.B - old_B).max(), np.abs(self.pi - old_pi).max()) < self.threshold:
                break
        print(self.A)
        print(self.B)
        print(self.pi)

In [3]:
Q = ['1', '2', '3'] # 状态集合
V = ['红', '白'] # 观测集合
# 模型参数
A = [[0.5, 0.2, 0.3],
[0.3, 0.5, 0.2],
[0.2, 0.3, 0.5]]
B = [[0.5, 0.5],
[0.4, 0.6],
[0.7, 0.3]]
pi = [0.2, 0.4, 0.4]

In [4]:
myhmm = HiddenMarkovModel(Q, V)
O = ['红', '白', '红']
print(myhmm.forward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.backward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.viterbi(O, np.array(A), np.array(B), np.array(pi)))

0.130218
0.130218
(0.014699999999999998, [3, 3, 3])


## sklearn 库实现

In [5]:
from hmmlearn import hmm

In [6]:
model = hmm.MultinomialHMM(n_components = len(Q))
model.startprob_ = np.array(pi)
model.transmat_ = np.array(A)
model.emissionprob_ = np.array(B)
O = np.array([0, 1, 0]).reshape(-1, 1) #0：红， 1：白
logprob = model.score(O) # 观测序列的 log 概率
print(np.exp(logprob))
# 观测序列对应的最佳隐状态
logprob, states = model.decode(O, algorithm = 'viterbi')
print(np.exp(logprob), [state+1 for state in states])

0.13021800000000003
0.014699999999999996 [3, 3, 3]


In [7]:
O = ['红', '白', '红', '白']
print(myhmm.forward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.backward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.viterbi(O, np.array(A), np.array(B), np.array(pi)))
print("*"*40)
# 库 HMM
O = np.array([0, 1, 0, 1]).reshape(-1, 1) #0：红， 1：白
print(np.exp(model.score(O)))
logprob, states = model.decode(O, algorithm = 'viterbi')
print(np.exp(logprob), [state+1 for state in states])

0.06009079999999999
0.06009079999999999
(0.0030239999999999993, [3, 2, 2, 2])
****************************************
0.06009079999999997
0.003024 [3, 2, 2, 2]


In [8]:
Q = ['1', '2', '3'] # 状态集合
V = ['红', '白'] # 观测集合
# 模型参数
A = [[0.5, 0.1, 0.4],
[0.3, 0.5, 0.2],
[0.2, 0.2, 0.6]]
B = [[0.5, 0.5],
[0.4, 0.6],
[0.7, 0.3]]
pi = [0.2, 0.3, 0.5]

In [9]:
myhmm = HiddenMarkovModel(Q, V)
O = ['红', '白', '红', '红', '白', '红', '白', '白']
print(myhmm.forward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.backward(O, np.array(A), np.array(B), np.array(pi)))
print(myhmm.viterbi(O, np.array(A), np.array(B), np.array(pi)))
# 库 HMM
model = hmm.MultinomialHMM(n_components = len(Q))
O = np.array([0, 1, 0, 0, 1, 0, 1, 1]).reshape(-1, 1) #0：红， 1：白
model.startprob_ = np.array(pi)
model.transmat_ = np.array(A)
model.emissionprob_ = np.array(B)
print(np.exp(model.score(O)))
logprob, states = model.decode(O, algorithm = 'viterbi')
print(np.exp(logprob), [state+1 for state in states])

0.0034767094492823987
0.0034767094492824
(3.024568511999999e-05, [3, 3, 3, 3, 3, 3, 2, 2])
0.003476709449282401
3.024568512e-05 [3, 3, 3, 3, 3, 3, 2, 2]


In [10]:
import pandas as pd
data = pd.read_csv('data_python.csv')
O = data['Visible'].values

In [11]:
Q = ['A', 'B'] # 状态集合
V = [0, 1, 2] # 观测集合
N, M = len(Q), len(V)
# 初始化 A、 B、 pi：使用固定参数
A = np.ones((N, N))
A = A / np.sum(A, axis = 1).reshape(-1, 1)
B = np.array(((1, 2, 3), (4, 5, 6)))
B = B / np.sum(B, axis = 1).reshape(-1, 1)
pi = np.array((0.5, 0.5))
# 自定义 HMM
myhmm = HiddenMarkovModel(Q, V)
myhmm.fit([O], A, B, pi)
print('*'*40)
# 库 HMM
model = hmm.MultinomialHMM(n_components = N, n_iter = 100, tol = 1e-6)
model.startprob = pi
model.transmat = A
model.emissionprob_ = B
model.init_params = 'st'
model.fit(O.reshape(-1, 1))
print(model.transmat_)
print(model.emissionprob_)
print(model.startprob_)

[[0.77384134 0.22615866]
 [0.2278316  0.7721684 ]]
[[0.07801696 0.17193173 0.7500513 ]
 [0.33355442 0.36773984 0.29870574]]
[9.39942699e-51 1.00000000e+00]
****************************************
[[0.77384134 0.22615866]
 [0.2278316  0.7721684 ]]
[[0.07801696 0.17193173 0.7500513 ]
 [0.33355442 0.36773984 0.29870574]]
[9.39942699e-51 1.00000000e+00]


In [12]:
# 初始化 A、 B、 pi：全部使用随机参数
A = np.random.rand(N, N)
A = A / np.sum(A, axis = 1).reshape(-1, 1)
B = np.random.rand(N, M)
B = B / np.sum(B, axis = 1).reshape(-1, 1)
pi = np.random.rand(N)
pi = pi / np.sum(pi)
# 自定义 HMM
myhmm = HiddenMarkovModel(Q, V)
myhmm.fit([O], A, B, pi)
print('*'*40)
# 库 HMM
model = hmm.MultinomialHMM(n_components = N, n_iter = 100, tol = 1e-6)
model.startprob = pi
model.transmat = A
model.emissionprob_ = B
model.init_params = 'st'
model.fit(O.reshape(-1, 1))
print(model.transmat_)
print(model.emissionprob_)
print(model.startprob_)

[[0.86785984 0.13214016]
 [0.09332514 0.90667486]]
[[0.08752113 0.16290358 0.74957529]
 [0.28820287 0.34430551 0.36749162]]
[3.67450001e-44 1.00000000e+00]
****************************************
[[0.85294957 0.14705043]
 [0.11130601 0.88869399]]
[[0.08778356 0.16258662 0.74962982]
 [0.29402465 0.34998063 0.35599472]]
[2.1906687e-49 1.0000000e+00]
