# 第10章 隐马尔可夫模型

1．隐马尔可夫模型是关于时序的概率模型，描述由一个隐藏的马尔可夫链随机生成不可观测的状态的序列，再由各个状态随机生成一个观测而产生观测的序列的过程。

隐马尔可夫模型由初始状态概率向$\pi$、状态转移概率矩阵$A$和观测概率矩阵$B$决定。因此，隐马尔可夫模型可以写成$\lambda=(A, B, \pi)$。

隐马尔可夫模型是一个生成模型，表示状态序列和观测序列的联合分布，但是状态序列是隐藏的，不可观测的。

隐马尔可夫模型可以用于标注，这时状态对应着标记。标注问题是给定观测序列预测其对应的标记序列。

2．概率计算问题。给定模型$\lambda=(A, B, \pi)$和观测序列$O＝(o_1，o_2,…,o_T)$，计算在模型$\lambda$下观测序列$O$出现的概率$P(O|\lambda)$。前向-后向算法是通过递推地计算前向-后向概率可以高效地进行隐马尔可夫模型的概率计算。
 
3．学习问题。已知观测序列$O＝(o_1，o_2,…,o_T)$，估计模型$\lambda=(A, B, \pi)$参数，使得在该模型下观测序列概率$P(O|\lambda)$最大。即用极大似然估计的方法估计参数。Baum-Welch算法，也就是EM算法可以高效地对隐马尔可夫模型进行训练。它是一种非监督学习算法。

4．预测问题。已知模型$\lambda=(A, B, \pi)$和观测序列$O＝(o_1，o_2,…,o_T)$，求对给定观测序列条件概率$P(I|O)$最大的状态序列$I＝(i_1，i_2,…,i_T)$。维特比算法应用动态规划高效地求解最优路径，即概率最大的状态序列。


5. 公式：<br>
前向概率：
$$
\begin{gather*}
\alpha_{1}(i)=\pi_{i}b_{i}(o_{1})\tag{1.1} \\ 
\alpha_{t+1}(i)=[\sum_{j=1}^{N}\alpha_{t}(j)a_{ji}]b_{i}(o_{t+1})\tag{1.2} \\
P(O|\lambda) = \sum_{i=1}^{N}\alpha_{T}(i)\tag{1.3}
\end{gather*}
$$
后向概率：
$$
\begin{gather*}
\beta_{T}(i)=1\tag{2.1} \\
\beta_t(i)=\sum_{j=1}^{N}[a_{ij}b_{j}(o_{t+1})\beta_{t+1}(j)]\tag{2.2} \\
P(O|\lambda) = \sum_{i=1}^{N}[\pi_{i}b_{i}(o_{1})\beta_{1}(i)]\tag{2.3}
\end{gather*}
$$
两个状态的概率计算公式：
$$
\begin{equation}
\begin{split}
\xi_{t}(i,j)&=P(i_{t}=q_{i},i_{t+1}=q_{j}|O,\lambda)\\
&=\frac{P(i_{t}=q_{i},i_{t+1}=q_{j},O|\lambda)}{P(O|\lambda)}\\
&=\frac{P(i_{t}=q_{i},i_{t+1}=q_{j},O|\lambda)}{\sum_{i=1}^{N}\sum_{j=1}^{N}P(i_{t}=q_{i},i_{t+1}=q_{j},O|\lambda)}\\
&=\frac{\alpha_{t}(i)a_{ij}b_{j}(o_{t+1})\beta_{t+1}(j)}{\sum_{i=1}^{N}\sum_{j=1}^{N}\alpha_{t}(i)a_{ij}b_{j}(o_{t+1})\beta_{t+1}(j)}
\end{split} \tag{3}
\end{equation}
$$
单个状态的概率计算公式：
$$
\begin{equation}
\begin{split}
\gamma_{t}(i)&=P(i_{t}=q_{i}|O,\lambda)\\
&=\frac{P(i_{t}=q_{i},O|\lambda)}{P(O|\lambda)}\\
&=\frac{P(i_{t}=q_{i},O|\lambda)}{\sum_{j=1}^{T}P(i_{t}=q_{j},O|\lambda)}\\
&=\frac{\alpha_{t}(i)\beta_{t}(i)}{\sum_{j=1}^{N}\alpha_{t}(j)\beta_{t}(j)}
\end{split}\tag{4}
\end{equation}
$$
EM算法对Baum-Welch模型参数估计公式：
$$
\begin{gather*}
a_{ij}=\frac{\sum_{t=1}^{T-1}\xi_{t}(i,j)}{\sum_{t=1}^{T-1}\gamma_{t}(i)}\tag{5.1} \\
b_{j}(k)=\frac{\sum_{t=1,o_{t}=v_{k}}^{T}\gamma_{t}(j)}{\sum_{t=1}^{T}\gamma_{t}(j)}\tag{5.2}\\
\pi_{i}=\gamma_{1}(i)\tag{5.3}
\end{gather*}
$$

<font color='red' size=5>Cautions:</font><br>
即使使用Baum-Welch算法也无法做到完全无监督的训练。比如要预测单词词性并且没有标注语料集，那么必须具备<br>
* 词性集合
* 词典

根据词性集合和词典构建初始的观测概率分布矩阵，构建状态到词性的映射，状态转移矩阵可使用高斯分布随机初始化。<br>
概率直接相乘可能会浮点下溢，可以取对数计算，但是对数不能出现0概率，所以需要对概率进行平滑处理。<br>
对于每个输入的观测序列都需要使用EM算法对参数重新进行训练，然后在使用viterbi算法输出预测的状态序列。

In [1]:
import numpy as np

In [2]:
class HiddenMarkov:
    def __init__(self, Q, V, A, B, PI):
        self.Q = Q  # 状态数组或者状态映射也可以，N
        self.V = np.array(V) # 观测集数组，包含数据集中所有可能的观测项，T
        self.A = np.array(A) # 状态转移概率分布矩阵，N*N
        self.B = np.array(B) # 观测概率分布矩阵，N*T
        self.PI = np.array(PI) # 初始状态概率分布数组，N
        
    def forward(self, O, logs=False):  # 使用前向算法，O为观察序列，logs用于控制是否输出计算过程
        N = len(self.Q)  #可能存在的状态数量
        M = len(O)  # 观测序列的大小
        self.alphas = np.zeros((N, M))  # 前向概率：alphas[i][j]表示t时刻部分观测序列为o1,o2,o3...,ot且状态为qi的概率
        T = M  # 有几个时刻，有几个观测序列，就有几个时刻
        for t in range(T):  # 遍历每一时刻，算出alpha值
            indexOfO = np.where(self.V == O[t])[0][0]  # 找出序列对应的索引
            for i in range(N):
                if t == 0:  # 计算初值
                    self.alphas[i][t] = self.PI[i] * self.B[i][indexOfO]  # P176（10.15）
                    if logs:
                        print('alpha1(%d)=p%db%db(o1)=%f' % (i, i, i, self.alphas[i][t]))
                else:
                    self.alphas[i][t] = np.dot(
                        [alpha[t - 1] for alpha in self.alphas],
                        [a[i] for a in self.A]) * self.B[i][indexOfO]  # 对应P176（10.16）
                    if logs:
                        print('alpha%d(%d)=sigma [alpha%d(i)ai%d]b%d(o%d)=%f' %
                              (t, i, t - 1, i, i, t, self.alphas[i][t]))
                        # print(alphas)
        P = np.sum([alpha[M - 1] for alpha in self.alphas])  # P176(10.17)
        if logs:
            print("P(O|lambda)=", end="")
            for i in range(N):
                print("%.3f+" % self.alphas[i][M - 1], end="")
            print("0=%.6f" % P)
        # alpha11 = pi[0][0] * B[0][0]    #代表a1(1)
        # alpha12 = pi[0][1] * B[1][0]    #代表a1(2)
        # alpha13 = pi[0][2] * B[2][0]    #代表a1(3)

    def backward(self, O, logs=False):  # 后向算法，O为观察序列,logs用于控制是否输出计算过程
        N = len(self.Q)  # 可能存在的状态数量
        M = len(O)  # 观测序列的大小
        self.betas = np.ones((N, M))  # 后向概率：时刻t状态为qi的条件下，从t+1到T的部分观测序列为ot+1,ot+2,...,oT的概率
        if logs:
            for i in range(N):
                print('beta%d(%d)=1' % (M, i))
        for t in range(M - 2, -1, -1):
            indexOfO = np.where(self.V == O[t + 1])[0][0]  # 找出序列对应的索引
            for i in range(N):
                self.betas[i][t] = np.dot(
                    np.multiply(self.A[i], [b[indexOfO] for b in self.B]),
                    [beta[t + 1] for beta in self.betas])
                realT = t + 1
                realI = i + 1
                if logs:
                    print(
                        'beta%d(%d)=sigma [a%djbj(o%d)beta%d(j)]=(' %
                        (realT, realI, realI, realT + 1, realT + 1),
                        end='')
                    for j in range(N):
                        print(
                            "%.3f*%.3f*%.3f+" % (self.A[i][j], self.B[j][indexOfO],
                                                 self.betas[j][t + 1]),
                            end='')
                    print("0)=%.6f" % self.betas[i][t])
        # print(betas)
        if logs:
            indexOfO = np.where(self.V == O[0])[0][0]
            P = np.dot(
                np.multiply(self.PI, [b[indexOfO] for b in self.B]),
                [beta[0] for beta in self.betas])
            print("P(O|lambda)=", end="")
            for i in range(N):
                print(
                    "%.3f*%.3f*%.3f+" % (self.PI[i], self.B[i][indexOfO], self.betas[i][0]),
                    end="")
            print("0=%.6f" % P)

    def viterbi(self, O, logs=False): # viterbi算法进行状态decode，O为观测序列，logs用于控制是否输出计算过程
        N = len(self.Q)  #可能存在的状态数量
        M = len(O)  # 观测序列的大小
        self.deltas = np.zeros((N, M)) # deltas[i][t]表示t时刻状态为qi的所有状态序列中的最大概率
        self.psis = np.zeros((N, M)) # psis[i][t]使t时刻状态为qi最大化的t-1时刻的状态
        I = np.zeros(M, dtype=np.int32)
        for t in range(M):
            realT = t + 1
            indexOfO = np.where(self.V == O[t])[0][0]  # 找出序列对应的索引
            for i in range(N):
                realI = i + 1
                if t == 0:
                    self.deltas[i][t] = self.PI[i] * self.B[i][indexOfO]
                    self.psis[i][t] = 0
                    if logs:
                        print('delta1(%d)=pi%d * b%d(o1)=%.3f * %.3f=%.6f' %
                              (realI, realI, realI, self.PI[i], self.B[i][indexOfO],
                               self.deltas[i][t]))
                        print('psis1(%d)=0' % (realI))
                else:
                    self.deltas[i][t] = np.max(
                        np.multiply([delta[t - 1] for delta in self.deltas],
                                    [a[i] for a in self.A])) * self.B[i][indexOfO]
                    if logs:
                        print(
                            'delta%d(%d)=max[delta%d(j)aj%d]b%d(o%d)=%.3f*%.3f=%.6f'
                            % (realT, realI, realT - 1, realI, realI, realT,
                               np.max(
                                   np.multiply([delta[t - 1] for delta in self.deltas],
                                               [a[i] for a in self.A])), self.B[i][indexOfO],
                               self.deltas[i][t]))
                    self.psis[i][t] = np.argmax(
                        np.multiply(
                            [delta[t - 1] for delta in self.deltas],
                            [a[i]
                             for a in self.A])) + 1  #由于其返回的是索引，因此应+1才能和正常的下标值相符合。
                    if logs:
                        print('psis%d(%d)=argmax[delta%d(j)aj%d]=%d' %
                              (realT, realI, realT - 1, realI, self.psis[i][t]))
        if logs:
            print(self.deltas)
            print(self.psis)
        I[M - 1] = np.argmax([delta[M - 1] for delta in self.deltas
                                 ]) + 1  #由于其返回的是索引，因此应+1才能和正常的下标值相符合。
        print('i%d=argmax[deltaT(i)]=%d' % (M, I[M - 1]))
        for t in range(M - 2, -1, -1):
            I[t] = self.psis[int(I[t + 1]) - 1][t + 1]
            print('i%d=psis%d(i%d)=%d' % (t + 1, t + 2, t + 2, I[t]))
        print("状态序列I：", I)
        return I
    
    def train(self, O, criterion=0.05, logs=False): 
        '''
        Baum-Welch无监督参数学习，EM算法进行训练，训练之前必须已计算前向forward和后向backward概率
        O为观察序列
        criterion为前后两次训练参数相差允许的最小值，用于控制迭代次数
        logs为真则会打印forward和backward详细的计算过程
        '''
        N = len(self.Q)
        M = len(O)
        xi = np.zeros((M - 1, N, N))
        gamma = np.zeros((M, N))
        done = False
        O_index = np.zeros(M, dtype=np.int32)
        for t in range(M):
            O_index[t] = np.where(self.V == O[t])[0][0]
        while not done:
            # 计算更新参数后的前向概率alphas和后向概率betas
            self.forward(O,logs=logs)
            self.backward(O,logs=logs)
            # EM算法的E step
            # 计算xi
            for t in range(M - 1):
                indexofO = O_index[t + 1]
                xi_divisor = np.dot([alpha[t] for alpha in self.alphas], 
                                    [np.dot(np.multiply(self.A[i], [b[indexofO] for b in self.B]), 
                                            [beta[t + 1] for beta in self.betas]) for i in range(N)])
                xi_dividend = np.array([self.alphas[i][t] * np.multiply(np.multiply(self.A[i], [b[indexofO] for b in self.B]), 
                                                  [beta[t + 1] for beta in self.betas]) for i in range(N)])
                xi[t] = xi_dividend / xi_divisor
            # 计算gamma
            for t in range(M):
                gamma[t] = np.multiply([alpha[t] for alpha in self.alphas], [beta[t] for beta in self.betas]) / np.dot(
                    [alpha[t] for alpha in self.alphas], [beta[t] for beta in self.betas])
            # EM算法的M step
            # 更新状态转移概率分布矩阵A
            new_A = np.zeros(self.A.shape)
            for i in range(N):
                new_A_divisor = np.sum([g[i] for g in gamma]) - gamma[M - 1][i]
                for j in range(N):
                    new_A[i][j] = np.sum([xit[i, j] for xit in xi]) / new_A_divisor
            # 更新观测概率分布矩阵B
            new_B = np.zeros(self.B.shape)
            for j in range(new_B.shape[0]):
                new_B_divisor = np.sum([g[j] for g in gamma])
                for k in range(new_B.shape[1]):
                    new_B[j][k] = np.sum([gamma[t][j] for t in range(M) if O_index[t] == k]) / new_B_divisor
            # 更新初始状态概率分布数组PI
            new_PI = np.zeros(self.PI.shape)
            new_PI = gamma[0]
            # 对比前后两次更新幅度
            if (np.max(np.abs(new_A - self.A)) < criterion 
                and np.max(np.abs(new_B - self.B)) < criterion 
                and np.max(np.abs(new_PI - self.PI)) < criterion):
                done = True
            self.A[:,:], self.B[:,:], self.PI[:] = new_A, new_B, new_PI


### 习题10.1

In [3]:
#习题10.1
Q = [1, 2, 3]
V = ['红', '白']
A = [[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]
B = [[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]]
# O = ['红', '白', '红', '红', '白', '红', '白', '白']
O = ['红', '白', '红', '白']    #习题10.1的例子
PI = [0.2, 0.4, 0.4]

In [4]:
HMM = HiddenMarkov(Q, V, A, B, PI)
# HMM.forward(O)
# HMM.backward(Q)
HMM.viterbi(O)

i4=argmax[deltaT(i)]=2
i3=psis4(i4)=2
i2=psis3(i3)=2
i1=psis2(i2)=3
状态序列I： [3 2 2 2]


array([3, 2, 2, 2], dtype=int32)

### 习题10.2

In [5]:
Q = [1, 2, 3]
V = ['红', '白']
A = [[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]
B = [[0.5, 0.5], [0.4, 0.6], [0.7, 0.3]]
O = ['红', '白', '红', '红', '白', '红', '白', '白']
PI = [0.2, 0.3, 0.5]

In [6]:
HMM = HiddenMarkov(Q, V, A, B, PI)
print('forward:')
HMM.forward(O, logs=True)
print('\nbackward:')
HMM.backward(O, logs=True)
print('\nviterbi:')
HMM.viterbi(O)

forward:
alpha1(0)=p0b0b(o1)=0.100000
alpha1(1)=p1b1b(o1)=0.120000
alpha1(2)=p2b2b(o1)=0.350000
alpha1(0)=sigma [alpha0(i)ai0]b0(o1)=0.078000
alpha1(1)=sigma [alpha0(i)ai1]b1(o1)=0.111000
alpha1(2)=sigma [alpha0(i)ai2]b2(o1)=0.068700
alpha2(0)=sigma [alpha1(i)ai0]b0(o2)=0.043020
alpha2(1)=sigma [alpha1(i)ai1]b1(o2)=0.036684
alpha2(2)=sigma [alpha1(i)ai2]b2(o2)=0.055965
alpha3(0)=sigma [alpha2(i)ai0]b0(o3)=0.021854
alpha3(1)=sigma [alpha2(i)ai1]b1(o3)=0.017494
alpha3(2)=sigma [alpha2(i)ai2]b2(o3)=0.033758
alpha4(0)=sigma [alpha3(i)ai0]b0(o4)=0.011463
alpha4(1)=sigma [alpha3(i)ai1]b1(o4)=0.013947
alpha4(2)=sigma [alpha3(i)ai2]b2(o4)=0.008080
alpha5(0)=sigma [alpha4(i)ai0]b0(o5)=0.005766
alpha5(1)=sigma [alpha4(i)ai1]b1(o5)=0.004676
alpha5(2)=sigma [alpha4(i)ai2]b2(o5)=0.007188
alpha6(0)=sigma [alpha5(i)ai0]b0(o6)=0.002862
alpha6(1)=sigma [alpha5(i)ai1]b1(o6)=0.003389
alpha6(2)=sigma [alpha5(i)ai2]b2(o6)=0.001878
alpha7(0)=sigma [alpha6(i)ai0]b0(o7)=0.001411
alpha7(1)=sigma [alpha6(i)ai1]

array([3, 3, 3, 3, 2, 2, 2, 2], dtype=int32)

In [7]:
HMM.train(O) 
print('viterbi:')
HMM.viterbi(O)

viterbi:
i8=argmax[deltaT(i)]=2
i7=psis8(i8)=2
i6=psis7(i7)=3
i5=psis6(i6)=2
i4=psis5(i5)=2
i3=psis4(i4)=3
i2=psis3(i3)=2
i1=psis2(i2)=3
状态序列I： [3 2 3 2 2 3 2 2]


array([3, 2, 3, 2, 2, 3, 2, 2], dtype=int32)

----
参考代码：https://blog.csdn.net/tudaodiaozhale

代码全部测试通过。