## 隐马尔可夫模型(Hidden Markov Model)

- 动态模型根据系统状态可以分为三种:
    - 离散 HMM
    - 线性 Kalman Filter
    - 非线性 Paricle Filter


##### 一个模型

$$ \lambda  = (\pi, A, B)$$

- $\pi$ 为初始时刻的概率分布
- 状态转移矩阵 $A = [a_{ij}], a_{ij} = p(i_{t+1} = q_j|i_t = q_i)$
    - 知道前一个状态的条件下现在状态的概率
- 发射矩阵 $B = [b_j(k)], b_j(k) = p(o_t=v_k|i_t=q_j)$
    - 知道状态变量的条件下观测变量的概率
- 观测变量, 表示为: $ O = o_1,o_2,\cdots o_t,\cdots$ , 状态空间为: $ V = \{v_1,v_2,\cdots v_M\}$
- 状态变量, 表示为: $ I = i_1,i_2,\cdots i_t,\cdots$ , 状态空间为: $ Q = \{q_1,q_2,\cdots q_N\}$

##### 二个假设

- 齐次Markov假设
    - 当前状态只与前一个状态有关 $$ P(i_{t+1} |i_t,\cdots i_2,i_1, o_t,\cdots o_2,o_1)$$
- 观测独立假设
    - 当前观测变量只与当前的状态变量有关 $$ P(o_t |i_t,\cdots i_2,i_1, o_{t-1},\cdots o_2,o_1) = P(o_t|i_t)$$

##### 三个问题

- Evaluation
    - 在知道所有参数的条件下, 观测序列的概率, 即: $ P(O|\lambda)$
    - 通常采用前向后向算法解决
- Learning
    - 如何求解参数, 即: $ \lambda_{mle} = \arg\max P(O|\lambda) $
    - 通常采用一个特殊的EM算法, 即Baum Welch算法
- Decoding
    - 找到状态序列I, 使得 $ P(I|O)$最大, 即: $ I = \arg\max P(I|O)$
    - 预测问题: 求t + 1时刻的隐状态 $ $
    - 滤波问题: 求t时刻的隐状态 $ $

##### 前向算法

- 根据前面的状态推算后面的观测变量 $ \alpha_t(i) = P(o_1,o_2,\cdots,o_t,i_t = q_i|\lambda)$
- 令初值 $(t = 1), \alpha_1(i) = P(o_1,i_1 = q_1|\lambda) = \pi_ib_i(o_1), i= 1,2,\cdots,N$
- $ \alpha_T(i) = P(O,i_T = q_i|\lambda)$
- 因此 $$ P(O|\lambda) = \sum^N_{i=1}P(O,i_T = q_i|\lambda) =\sum^N_{i=1}\alpha_T(i) $$
- 算法递推式: $$ \alpha_{t+1}(j) = \sum^N_{i=1}b_j(O_{t+1})\cdot a_{ij}\cdot \alpha_t(i)$$

##### 后向算法
- 根据后面的状态推算前面时刻的观测变量
- 令 $ \beta_t(i) = P(o_{t+1},\cdots,o_T|i_t = q_i,\lambda)$
- 因此 $$ P(O|\lambda) = \sum^N_{i=1}P(o_1|i_1 = q_i,\lambda) =\sum^N_{i=1}b_i(o_1)\beta_1(i)\cdot \pi_i $$
- 算法递推式: $$ \beta_t(i) = \sum^N_{i=1}b_j(o_{t+1})\cdot a_{ij}\cdot \beta_{t+1}(j)$$

##### Baum Welch算法

- 求解参数, 即: $ \lambda_{mle} = \arg\max P(O|\lambda) $
$$\lambda^{(t+1)} = \arg\max\limits_\lambda \sum_I P(O,I|\lambda)\cdot P(I|O,\lambda^{(t)}) $$

- 计算分解可得: $$ Q(\lambda,\lambda^{(t)}) = \sum_I [(log\pi_{i_1} + \sum^T_{t=2}loga_{i_{t-1},i_t} + \sum^T_{t=1}logb_{i_t}(o_t))\cdot P(O,I|\lambda^{(t)})]$$
- 求解可得:
$$ a_{ij} = \frac{\sum^T_{t=1}\xi_t(i,j)}{\sum^T_{t=1}\gamma_t(i)} $$
$$ b_j(k) = \frac{\sum^T_{t=1,o_t = v_k}\gamma_t(j)}{\sum^T_{t=1}\gamma_{t+1}(j)} $$
$$ \pi(I) = \gamma_1(i) $$
- 其中 $\gamma_t(i)$ 和$ \xi_t(i,j)$是:
$$ \gamma_t(i) = \frac{P(O,i_t=q_i|\lambda^{(t)})}{P(O|\lambda^{(t)})} = \frac{\alpha_t(i)\beta_t(i)}{\sum^N_{j=1}\alpha_t(j)\beta_t(j)}$$
$$ \xi_t(i,j) = \frac{\alpha_t(i)a_{ij}b_j(o_{t+1})\beta_{t+1}(j)}{\sum^N_{i=1}\sum^N_{j=1}\alpha_t(i)a_{ij}b_j(o_{t+1})\beta_{t+1}(j)}$$

##### 维特比算法(Viterbi)

- 通俗来说, 就是计算上一个时刻每个隐状态转换到当前时刻观测变量对应的每一个隐状态的最大概率

In [None]:
    def viterbi(self, obs_seq):
        # N是隐变量状态数量
        N = self.A.shape[0]
        # T是观测序列长度
        T = len(obs_seq)
        # 每一个时刻,每个状态的概率, T x N
        prev = np.zeros((T - 1, N), dtype=int)
        # N x T
        V = np.zeros((N, T))
        # 初始时刻, 为每个隐状态概率 * 每个隐状态发射为初始时刻观测变量的概率
        V[:,0] = self.pi * self.B[:,obs_seq[0]]
        # 每一个时刻
        for t in range(1, T):
            # 每一个状态都需要单独计算
            for n in range(N):
                # 前一个时刻所有隐状态 * 每一种隐状态转换到当前时刻隐状态的概率 * 当前时刻隐状态发射到当前时刻观测变量的概率
                seq_probs = V[:,t-1] * self.A[:,n] * self.B[n, obs_seq[t]]
                # 上一个时刻转到当前最大概率隐状态的索引
                prev[t-1,n] = np.argmax(seq_probs)
                # 当前时刻, 当前隐状态的最大概率
                V[n,t] = np.max(seq_probs)

        return V, prev

##### 参数对应到实际数值的样貌

In [1]:
import numpy as np

# 对应状态集合Q
states = ('Healthy', 'Fever')
# 对应观测集合V
observations = ('normal', 'cold', 'dizzy')
# 初始状态概率向量π
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
# 状态转移矩阵A
transition_probability = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
# 观测概率矩阵B
emission_probability = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

# 随机生成观测序列和状态序列    
def simulate(T):

    def draw_from(probs):
        """
        1.np.random.multinomial:
        按照多项式分布，生成数据
        >>> np.random.multinomial(20, [1/6.]*6, size=2)
                array([[3, 4, 3, 3, 4, 3],
                       [2, 4, 3, 4, 0, 7]])
         For the first run, we threw 3 times 1, 4 times 2, etc.  
         For the second, we threw 2 times 1, 4 times 2, etc.
        2.np.where:
        >>> x = np.arange(9.).reshape(3, 3)
        >>> np.where( x > 5 )
        (array([2, 2, 2]), array([0, 1, 2]))
        """
        return np.where(np.random.multinomial(1,probs) == 1)[0][0]

    observations = np.zeros(T, dtype=int)
    states = np.zeros(T, dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0],:])
    for t in range(1, T):
        states[t] = draw_from(A[states[t-1],:])
        observations[t] = draw_from(B[states[t],:])
    return observations, states

def generate_index_map(lables):
    id2label = {}
    label2id = {}
    i = 0
    for l in lables:
        id2label[i] = l
        label2id[l] = i
        i += 1
    return id2label, label2id
 
states_id2label, states_label2id = generate_index_map(states)
observations_id2label, observations_label2id = generate_index_map(observations)
print(states_id2label, states_label2id)
print(observations_id2label, observations_label2id)

def convert_map_to_vector(map_, label2id):
    """将概率向量从dict转换成一维array"""
    v = np.zeros(len(map_), dtype=float)
    for e in map_:
        v[label2id[e]] = map_[e]
    return v

 
def convert_map_to_matrix(map_, label2id1, label2id2):
    """将概率转移矩阵从dict转换成矩阵"""
    m = np.zeros((len(label2id1), len(label2id2)), dtype=float)
    for line in map_:
        for col in map_[line]:
            m[label2id1[line]][label2id2[col]] = map_[line][col]
    return m

A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
print("转移矩阵\n", A)
B = convert_map_to_matrix(emission_probability, states_label2id, observations_label2id)
print("发射矩阵\n", B)
observations_index = [observations_label2id[o] for o in observations]
pi = convert_map_to_vector(start_probability, states_label2id)
print("初始概率\n", pi)

# 生成模拟数据
observations_data, states_data = simulate(10)
print(observations_data)
print(states_data)
# 相应的label
print("病人的状态: ", [states_id2label[index] for index in states_data])
print("病人的观测: ", [observations_id2label[index] for index in observations_data])

{0: 'Healthy', 1: 'Fever'} {'Healthy': 0, 'Fever': 1}
{0: 'normal', 1: 'cold', 2: 'dizzy'} {'normal': 0, 'cold': 1, 'dizzy': 2}
转移矩阵
 [[0.7 0.3]
 [0.4 0.6]]
发射矩阵
 [[0.5 0.4 0.1]
 [0.1 0.3 0.6]]
初始概率
 [0.6 0.4]
[0 1 1 2 1 1 0 0 0 2]
[0 1 0 1 1 0 0 0 0 1]
病人的状态:  ['Healthy', 'Fever', 'Healthy', 'Fever', 'Fever', 'Healthy', 'Healthy', 'Healthy', 'Healthy', 'Fever']
病人的观测:  ['normal', 'cold', 'cold', 'dizzy', 'cold', 'cold', 'normal', 'normal', 'normal', 'dizzy']


##### 代码

In [None]:
class HMM:
    """
    Order 1 Hidden Markov Model
 
    Attributes
    ----------
    A : numpy.ndarray
        State transition probability matrix
    B: numpy.ndarray
        Output emission probability matrix with shape(N, number of output types)
    pi: numpy.ndarray
        Initial state probablity vector
    """
    def __init__(self, A, B, pi):
        self.A = A
        self.B = B
        self.pi = pi
    
    def simulate(self, T):
 
        def draw_from(probs):
            """
            1.np.random.multinomial:
            按照多项式分布，生成数据
            >>> np.random.multinomial(20, [1/6.]*6, size=2)
                    array([[3, 4, 3, 3, 4, 3],
                           [2, 4, 3, 4, 0, 7]])
             For the first run, we threw 3 times 1, 4 times 2, etc.  
             For the second, we threw 2 times 1, 4 times 2, etc.
            2.np.where:
            >>> x = np.arange(9.).reshape(3, 3)
            >>> np.where( x > 5 )
            (array([2, 2, 2]), array([0, 1, 2]))
            """
            return np.where(np.random.multinomial(1,probs) == 1)[0][0]

        observations = np.zeros(T, dtype=int)
        states = np.zeros(T, dtype=int)
        states[0] = draw_from(self.pi)
        observations[0] = draw_from(self.B[states[0],:])
        for t in range(1, T):
            states[t] = draw_from(self.A[states[t-1],:])
            observations[t] = draw_from(self.B[states[t],:])
        return observations,states
    
    def _forward(self, obs_seq):
        """前向算法"""
        N = self.A.shape[0]
        T = len(obs_seq)

        F = np.zeros((N,T))
        F[:,0] = self.pi * self.B[:, obs_seq[0]]

        for t in range(1, T):
            for n in range(N):
                F[n,t] = np.dot(F[:,t-1], (self.A[:,n])) * self.B[n, obs_seq[t]]

        return F
    
    def _backward(self, obs_seq):
        """后向算法"""
        N = self.A.shape[0]
        T = len(obs_seq)

        X = np.zeros((N,T))
        X[:,-1:] = 1

        for t in reversed(range(T-1)):
            for n in range(N):
                X[n,t] = np.sum(X[:,t+1] * self.A[n,:] * self.B[:, obs_seq[t+1]])

        return X
    
    def baum_welch_train(self, observations, criterion=0.05):
        """无监督学习算法——Baum-Weich算法"""
        n_states = self.A.shape[0]
        n_samples = len(observations)

        done = False
        while not done:
            # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
            # Initialize alpha
            alpha = self._forward(observations)

            # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
            # Initialize beta
            beta = self._backward(observations)

            xi = np.zeros((n_states,n_states,n_samples-1))
            for t in range(n_samples-1):
                # alpha_t*a_ij*b_t+1*beta_t+1
                denom = np.dot(np.dot(alpha[:,t].T, self.A) * self.B[:,observations[t+1]].T, beta[:,t+1])
                for i in range(n_states):
                    # 
                    numer = alpha[i,t] * self.A[i,:] * self.B[:,observations[t+1]].T * beta[:,t+1].T
                    xi[i,:,t] = numer / denom

            # gamma_t(i) = P(q_t = S_i | O, hmm)
            gamma = np.sum(xi,axis=1)
            # Need final gamma element for new B
            prod =  (alpha[:,n_samples-1] * beta[:,n_samples-1]).reshape((-1,1))
            gamma = np.hstack((gamma,  prod / np.sum(prod))) #append one more to gamma!!!

            newpi = gamma[:,0]
            newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
            newB = np.copy(self.B)

            num_levels = self.B.shape[1]
            sumgamma = np.sum(gamma,axis=1)
            for lev in range(num_levels):
                mask = observations == lev
                newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma

            if np.max(abs(self.pi - newpi)) < criterion and \
                            np.max(abs(self.A - newA)) < criterion and \
                            np.max(abs(self.B - newB)) < criterion:
                done = 1

            self.A[:],self.B[:],self.pi[:] = newA,newB,newpi
    
    def observation_prob(self, obs_seq):
        """ P( entire observation sequence | A, B, pi ) """
        return np.sum(self._forward(obs_seq)[:,-1])

    def state_path(self, obs_seq):
        """
        Returns
        -------
        V[last_state, -1] : float
            Probability of the optimal state path
        path : list(int)
            Optimal state path for the observation sequence
        """
        V, prev = self.viterbi(obs_seq)

        # Build state path with greatest probability
        last_state = np.argmax(V[:,-1])
        path = list(self.build_viterbi_path(prev, last_state))

        return V[last_state,-1], reversed(path)

    def viterbi(self, obs_seq):
        # N是隐变量状态数量
        N = self.A.shape[0]
        # T是观测序列长度
        T = len(obs_seq)
        # 每一个时刻,每个状态的概率, T x N
        prev = np.zeros((T - 1, N), dtype=int)
        # N x T
        V = np.zeros((N, T))
        # 初始时刻, 为每个隐状态概率 * 每个隐状态发射为初始时刻观测变量的概率
        V[:,0] = self.pi * self.B[:,obs_seq[0]]
        # 每一个时刻
        for t in range(1, T):
            # 每一个状态都需要单独计算
            for n in range(N):
                # 前一个时刻所有隐状态 * 每一种隐状态转换到当前时刻隐状态的概率 * 当前时刻隐状态发射到当前时刻观测变量的概率
                seq_probs = V[:,t-1] * self.A[:,n] * self.B[n, obs_seq[t]]
                # 上一个时刻转到当前最大概率隐状态的索引
                prev[t-1,n] = np.argmax(seq_probs)
                # 当前时刻, 当前隐状态的最大概率
                V[n,t] = np.max(seq_probs)

        return V, prev

    def build_viterbi_path(self, prev, last_state):
        """Returns a state path ending in last_state in reverse order."""
        T = len(prev)
        yield(last_state)
        for i in range(T-1, -1, -1):
            yield(prev[i, last_state])
            last_state = prev[i, last_state]


In [None]:
A = np.array([[0.5, 0.5],[0.5, 0.5]])
B = np.array([[0.3, 0.3, 0.3],[0.3, 0.3, 0.3]])
pi = np.array([0.5, 0.5])

observations_data, states_data = simulate(100)
newA, newB, newpi = baum_welch_train(observations_data, A, B, pi)
print("newA: ", newA)
print("newB: ", newB)
print("newpi: ", newpi)




states_out = state_path(observations_data, newA, newB, newpi)[1]
p = 0.0
for s in states_data:
    if next(states_out) == s: 
        p += 1

print(p / len(states_data))


A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
B = convert_map_to_matrix(emission_probability, states_label2id, observations_label2id)
observations_index = [observations_label2id[o] for o in observations]
pi = convert_map_to_vector(start_probability, states_label2id)
V, p = viterbi(observations_index, newA, newB, newpi)
print(" " * 7, " ".join(("%10s" % observations_id2label[i]) for i in observations_index))
for s in range(0, 2):
    print("%7s: " % states_id2label[s] + " ".join("%10s" % ("%f" % v) for v in V[s]))
print('\nThe most possible states and probability are:')
p, ss = state_path(observations_index, newA, newB, newpi)
for s in ss:
    print(states_id2label[s])
print(p)