# HMM 的实现

本教程改编自Github[链接](https://github.com/shanguanma/Aligners/blob/master/HMM.ipynb)

本教程利用 *Python* 实现 HMM 算法的前向过程和后向过程，并通过 Viterbi算法，在给定 HMM 模型和观测序列的条件下，求出可能性最大的状态序列

- 状态序列是马尔可夫链，用转移概率描述
- 每一状态对应一个可以观察的事件，用观测概率描述
- Viterbi 算法利用动态规划求解最优路径，最优路径对应最大概率

## 选择合适的数据结构

在算法实现的过程中选择合适的数据结构，注意模型中需要包含：

- N 个状态

  用字符、字符串或数字表示，注意初始状态和最终状态可以显式表示

- 状态转移概率
  
  初始状态转移到其他状态的概率：$\pi = (\pi_1, ..., \pi_N)$

  与时间无关的状态转移概率矩阵：${a_{ij}} = P(q_t= j |q_{t-1} =i) $

  即假设$t-1$ 时刻的状态为 $i$，下一时刻转移到状态 $j$ 的概率
  
- 发射概率
  
  $b_i(O_t) = P(O_t| q_t = i)$


  即在 $t$ 时刻的状态为 $i$ 的条件下，观测到符号 $O_t$ 的概率


## 状态

用字符表示各个状态


In [61]:
states = ['Begin','S1','S2']

## 状态转移概率矩阵


用如下字典表示状态转移概率矩阵，字典的键(key)为包含两个字符的元组，表示从第一个字符所代表的状态转移到第二个字符所代表的状态，字典的值(value)为浮点数，表示转移概率

In [62]:
transitions = {
               ('Begin', 'S1'): 0.5,
               ('Begin', 'S2'): 0.5,
               ('S1','S1') : 0.7,
               ('S1','S2') : 0.3,
               ('S2','S2') : 0.8,
               ('S2','S1') : 0.2}
    

## 发射概率矩阵

用如下字典表示发射概率矩阵，字典的键(key)为状态对应的字符，字典的值(value)为该状态下观测到各个字符的概率

In [63]:
emissions = {'S1' : {'A':0.4, 'C':0.1, 'G':0.4, 'T':0.1},
            'S2': {'A': 0.1, 'C': 0.4,'G':0.1,'T':0.4}}

## 样本集合的观测序列

观测序列用如下字符串表示

In [64]:
sequence = 'ACT'

## 初始化

初始化一个全 0 矩阵

In [65]:
def initialize_matrix(dim1, dim2, value=0):
    F = []
    for i in range(0, dim1):
        F.append([])
        for j in range(0,dim2):
            F[i].append(value)
    return F
            

## 输出格式设计

为了更方便地观察算法的计算过程和计算结果，定义了如下函数，\<sos\>(start of sequence) 对应于初始状态

In [66]:
def print_matrix(matrix, axis1,axis2):
    w = '{:<10}'
    print(w.format('') + w.format('<sos>') + ''.join([w.format(char) for char in axis2]))
    for i, row in enumerate(matrix):
        print(w.format(axis1[i]) + ''.join(['{:<10.3e}'.format(item) for item in row]))

def print_matrix_p(matrix, axis1, axis2):
    w = '{:<10}'
    print(w.format('') + w.format('<sos>') + ''.join([w.format(char) for char in axis2]))
    for i, row in enumerate(matrix):
        print(w.format(axis1[i]) + ''.join(['{:<10s}'.format(item) for item in row]))


## 前向算法

初始化矩阵 $F$，行数和状态数对应，列数等于观测序列中的字符数加 1（第一列表示初始状态）

在初始状态的概率为1，即矩阵的第(1, 1) 个元素的值为1

In [67]:
F = initialize_matrix(len(states), len(sequence)+1)
F[0][0] = 1
print_matrix(F,states, sequence)

          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 0.000e+00 0.000e+00 0.000e+00 
S2        0.000e+00 0.000e+00 0.000e+00 0.000e+00 


采用前向变量$\alpha_t (i)=P(O_1,O_2,...,O_t,q_t=S_i \vert \lambda$)，$1\le t \le T, 1\le i \le N$

初始化$t=1$时刻不同状态的前向变量：对于每一个状态，初始状态转移到当前状态$i$得到的概率$\pi_i$乘以当前状态下观测到第一个字符$O_1$的概率$b_i (O_1)$，得到对应的值: $\alpha_1 (i)=\pi_i b_i (O_1)$

In [68]:
for i in range(1, len(states)):
    F[i][1] = transitions[(states[0],states[i])] * emissions[states[i]][sequence[0]]

print_matrix(F, states, sequence)
# F[1][1] = transitions[states[0],states[1]] * emissions[states[1]][sequence[0]]
# F[1][1] = transitions[('b','S1')] * emissions['S1']['A']
# F[1][1] = 0.5 * 0.4
# F[1][1] = 0.02
# F[1][1] = 2.00e-01
# F[2][1] = transitions[states[0],states[2]] * emissions[states[2]][sequence[0]]
# F[2][1] = transitions[('b','S2')] * emissioms['S2']['A']
# F[2][1] = 0.5 * 0.1
# F[2][1] = 0.05
# F[2][1] = 5.00e-02

          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 2.000e-01 0.000e+00 0.000e+00 
S2        0.000e+00 5.000e-02 0.000e+00 0.000e+00 




计算$t+1$时刻不同状态的前向变量：$t$时刻状态$i$转移到$t+1$时刻状态$j$的概率$a_{ij}$乘以$t+1$时刻状态$j$观测到对应字符$O_{t+1}$的概率$b_j (O_{t+1})$，再乘以$t$时刻状态$i$的前向变量$\alpha_t (i)$，对于$t$时刻所有可能状态求和得到对应的值：$\alpha_{t+1}(j) = \sum_{i=1}^N \alpha_t (i) a_{ij} b_j(O_{t+1})$

In [69]:
# loops on the symbols ,from second symbol to last specify symbol (e.g: 0)
for j in range(2, len(sequence) + 1):
    # loops on the states, from second state to last state
    for i in range(1, len(states)):
        p_sum = 0
        # loops on all of the possible previous states 
        for k in range(1, len(states)):
            p_sum += F[k][j-1] * transitions[(states[k],states[i])] * emissions[states[i]][sequence[j-1]]
        F[i][j] = p_sum

print_matrix(F, states, sequence)




          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 2.000e-01 1.500e-02 1.850e-03 
S2        0.000e+00 5.000e-02 4.000e-02 1.460e-02 



最后，对于不同的最终状态下观测到序列sequence的概率求和, 得到观测到序列ACT的概率，前向计算完成：$P(O\vert \lambda)=\sum_{i=1}^N \alpha_T (i)$

In [70]:
# The final value  is equivalent to the end marker <eos>
# of the sentence in end-to-end speech recognition.
p_sum = 0
for k in range(1, len(states)):
    p_sum += F[k][len(sequence)]

print('观测到序列ACT的概率为：', '{:<10.3e}'.format(p_sum))


观测到序列ACT的概率为： 1.645e-02 


将上述前向计算过程封装为函数，输入状态列表、状态转移概率、发射概率以及观测序列，返回观测序列出现的概率

In [71]:
def forward(states, transitions, emissions, sequence):
    # initialize matrix, set <sos> to 1.
    F = initialize_matrix(len(states), len(sequence)+1)
    F[0][0] = 1
    # first symbol(e.g: 'A') of sequence , initialize hmm state
    for i in range(1, len(states)):
        F[i][1] = transitions[(states[0],states[i])] * emissions[states[i]][sequence[0]]
    
    # Question:Why len(squence) + 1?
    # Reply: Because if you want to compute last symbol(e.g.: 'G').
    #        range function is [start,stop).
    for j in range(2, len(sequence) + 1 ):
        for i in range(1, len(states)):
            p_sum = 0
            for k in range(1, len(states) ):
                # probabilty of previous symbol  * 
                # probablity from previous state to current state * 
                # observing symbol j-1 given current state i 
                p_sum += F[k][j-1] * transitions[(states[k],states[i])] * emissions[states[i]][sequence[j-1]]
            F[i][j] = p_sum
    
    p_sum = 0
    for k in range(1, len(states)):
        p_sum += F[k][len(sequence)]
    print('观测到序列ACT的概率为：', '{:<10.4e}'.format(p_sum))
    return p_sum


## 后向算法

采用后向变量$\beta_t (i)=P(O_{t+1},O_{t+2},...,O_T \vert q_t=S_i, \lambda)$，$1\le t \le T, 1\le i \le N$

初始化$T$时刻的后向变量：$\beta_T (i)=1, 1\le i \le N$，即矩阵最后一列除了初始状态外，后向变量$\beta$均为1

In [72]:
F = initialize_matrix(len(states), len(sequence) + 1)
for i in range(1, len(states)):
    F[i][-1] = 1
print_matrix(F, states, sequence)


          <sos>     A         C         T         
Begin     0.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 0.000e+00 0.000e+00 1.000e+00 
S2        0.000e+00 0.000e+00 0.000e+00 1.000e+00 




计算t时刻的后向变量：$t$时刻状态$i$的后向变量，由$t$时刻状态$i$转移到$t+1$时刻状态$j$的概率$a_{ij}$，乘以下一时刻状态$j$中观测到对应字符的概率$b_j (O_{t+1})$，再乘以到$t+1$时刻为状态$j$的概率$\beta_{t+1}(j)$，对于所有可能的状态求和得到矩阵中对应的值: $\beta_t (i) = \sum_{j=1}^N a_{ij} b_j(O_{t+1})\beta_{t+1}(j)$，$t=T-1, T-2, ..., 1$，$ 1\le i \le N$

In [73]:
# loops on the symbol
for j in range(len(sequence) -1, 0, -1):
    # loops on the states
    for i in range(1, len(states)):
        p_sum = 0
        # loops on all of the possible succesive states
        for k in range(1, len(states)):
            p_sum += F[k][j+1] * transitions[(states[i],states[k])] * \
                     emissions[states[k]][sequence[j]]                                                                
        F[i][j] = p_sum
print_matrix(F, states,sequence)

          <sos>     A         C         T         
Begin     0.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 5.410e-02 1.900e-01 1.000e+00 
S2        0.000e+00 1.126e-01 3.400e-01 1.000e+00 



计算观测序列出现的概率：初始状态转移到状态$i$的概率$\pi_i$乘以在该状态中观测到序列中第一个字符的概率$b_i(O_1)$，再乘以该状态在$t=1$时刻的后向变量$\beta_1 (i)$，对于所有可能的状态求和得到观测到序列ACT的概率，后向计算完成：$P(O\vert \lambda) = \sum_{i=1}^N \beta_1 (i) \pi_i b_i(o_1)$

In [74]:
p_sum = 0
for k in range(1, len(states)):
    p_sum += F[k][1] * transitions[(states[0],states[k])] * emissions[states[k]][sequence[0]]
F[0][0] = p_sum
print_matrix(F, states, sequence)
print('观测到序列ACT的概率为：', '{:<10.4e}'.format(p_sum))

          <sos>     A         C         T         
Begin     1.645e-02 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 5.410e-02 1.900e-01 1.000e+00 
S2        0.000e+00 1.126e-01 3.400e-01 1.000e+00 
观测到序列ACT的概率为： 1.6450e-02


将上述后向计算过程封装为函数，输入状态列表、状态转移概率、发射概率以及观测序列，返回观测序列出现的概率

In [75]:
def backward(states, transitions, emissions,sequence):
    # initialize matrix, set last symbol (e.g: T) to 1.
    F = initialize_matrix(len(states), len(sequence) + 1)
    for i in range(1, len(states)):
        F[i][-1] = 1
    
    for j in range(len(sequence) -1, 0 , -1):
        for i in range(1, len(states)):
            p_sum = 0
            for k in range(1, len(states)):
                p_sum += F[k][j+1] * \
                         transitions[(states[i], states[k])] * \
                         emissions[states[k]][sequence[j]]
            F[i][j] = p_sum


    p_sum = 0
    for k in range(1, len(states)):
        p_sum += F[k][1] * \
                 transitions[(states[0],states[k])] * \
                 emissions[states[k]][sequence[0]]
    F[0][0] = p_sum
    print('观测到序列ACT的概率为：', '{:<10.4e}'.format(p_sum))
    return p_sum

# Viterbi 算法

初始化矩阵 $F$，行数和状态数对应，列数等于观测序列中的字符数加 1（第一列表示初始状态）

在初始状态的概率为1，即矩阵的第(1, 1) 个元素的值为1，矩阵 $FP$ 初始化元素全为初始状态

In [76]:
F = initialize_matrix(len(states), len(sequence) + 1)
FP = initialize_matrix(len(states), len(sequence) + 1, states[0])
F[0][0] = 1
print_matrix(F, states, sequence)
print("\n\n")
print_matrix_p(FP, states, sequence)

          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 0.000e+00 0.000e+00 0.000e+00 
S2        0.000e+00 0.000e+00 0.000e+00 0.000e+00 



          <sos>     A         C         T         
Begin     Begin     Begin     Begin     Begin     
S1        Begin     Begin     Begin     Begin     
S2        Begin     Begin     Begin     Begin     


与前向计算算法类似，初始化$t=1$时刻不同状态的值：对于每一个状态，初始状态转移到当前状态$i$得到的概率$\pi_i$乘以当前状态下观测到第一个字符$O_1$的概率$b_i (O_1)$，得到对应的值: $\delta_1 (i)=\pi_i b_i (O_1)$

In [77]:
for i in range(1, len(states)):
    F[i][1] = transitions[(states[0], states[i])] * \
              emissions[states[i]][sequence[0]]
print_matrix(F,states, sequence)


          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 2.000e-01 0.000e+00 0.000e+00 
S2        0.000e+00 5.000e-02 0.000e+00 0.000e+00 




为计算每一时刻状态序列出现相应观测值的最大概率，找到最大概率对应的状态索引，在 Viterbi 算法中，只计算最优路径的概率

In [78]:
def get_max_val_index(values):
    max_val = values[0]
    max_index = 0 
    for index, val in enumerate(values):
        if val > max_val:
            max_val = val
            max_index = index
    return max_val, max_index



对于$t$时刻的状态$j$，由$t-1$时刻不同状态转移到$t$时刻状态$j$的最大概率${max}_{1\le i \le N}[\delta_{t-1} (i) a_{ij}]$乘以$t$时刻状态$j$下观测值为$O_t$的概率$b_j(O_t)$: $\delta_{t}(j)={max}_{1\le i \le N}[\delta_{t-1} (i) a_{ij}] b_j(O_t)$。同时更新矩阵FP对应位置为最大概率所对应的状态：$\phi_t(j) = {argmax}_{1\le i \le N}[\delta_{t-1} (i) a_{ij}]$

In [79]:
# loops on the symbol, from the second to the last, it excluldes <eos>.
for j in range(2, len(sequence) + 1):
    # loops on the states, from second state(e.g: 'S1')
    # to third state(e.g.: 'S2'), 
    # it excludes the last state(e.g.: 'e').
    for i in range(1, len(states)):
        values = []
        # loops on all of the possible previous states
        for k in range(1, len(states)):
            # appends the value to the list(e.g.: values)
            values.append(F[k][j-1] * 
                          transitions[(states[k],states[i])] * 
                          emissions[states[i]][sequence[j-1]])
        # finds the maximum and the index of the maximum in the list
        max_val, max_index = get_max_val_index(values)
        # sets the probability to the maximum probability
        F[i][j] = max_val
        # because now it starts from second state, 
        # so it should be puls one. 
        FP[i][j] = states[max_index + 1]
    
print_matrix(F, states, sequence)
print("\n\n")
print_matrix_p(FP, states, sequence)



          <sos>     A         C         T         
Begin     1.000e+00 0.000e+00 0.000e+00 0.000e+00 
S1        0.000e+00 2.000e-01 1.400e-02 9.800e-04 
S2        0.000e+00 5.000e-02 2.400e-02 7.680e-03 



          <sos>     A         C         T         
Begin     Begin     Begin     Begin     Begin     
S1        Begin     Begin     S1        S1        
S2        Begin     Begin     S1        S2        



最可能的最终状态为概率矩阵F中最后一列中最大概率所对应的状态：$q^*_T = {argmax}_{1\le i \le N}[\delta_{T} (i)]$

In [80]:

max_val, max_index = get_max_val_index(F[1:][-1])
print('最可能的最终状态为：', states[max_index + 1])


最可能的最终状态为： S2


将上述过程封装成函数

In [81]:
def viterbi(states, transitions, emissions, sequence):
    # initialize two matrix,
    F = initialize_matrix(len(states), len(sequence) + 1)
    FP = initialize_matrix(len(states), len(sequence) + 1, states[0])
    F[0][0] = 1 

    for i in range(1, len(states)):
        F[i][1] = transitions[(states[0], states[i])] * \
                  emissions[states[i]][sequence[0]]
    
    for j in range(2, len(sequence) + 1 ):
        for i in range(1, len(states)):
            values = []
            for k in range(1, len(states)):
                values.append(F[k][j-1] * 
                              transitions[(states[k], states[i])] *
                              emissions[states[i]][sequence[j-1]])
            max_val, max_index = get_max_val_index(values)
            F[i][j] = max_val
            FP[i][j] = states[max_index + 1]
    return F, FP

为了追溯路径，我们需要从最可能的最终状态开始并遍历 $FP$ 矩阵，将每一时刻最可能的状态加入到路径中：$q^*_t = \phi_{t+1}(q^*_{t+1}), t=T-1, T-2, ..., 1$

In [82]:
def traceback(states, F, FP):
    # initialize an emtpy list
    path = []
    # the last state is the one with the maximum probability of the last columns
    _, max_index = get_max_val_index(F[1:][-1])
    current = states[max_index + 1]
    # Loops on the symbols 
    for i in range(len(FP[0]) - 1, 0, -1):
        # appends the current state to the path
        path = [current] + path
        # finds the index of the current state in the list
        # of states and moves to the corresponding row of FP
        current = FP[states.index(current)][i]
        # the first element of the path is the begin state
    path = ['Begin'] + path
    # coverts the  list to a string where elements 
    # are separated by space
    return ' '.join(path)

得到最终路径

In [83]:
path = traceback(states, F, FP)
print('<sos> '+ ' '.join(sequence))
print(path)

<sos> A C T
Begin S1 S2 S2
