In [53]:

import numpy as np
class HMM(object):
    """Discrete hidden Markov model with parameters lambda = (pi, A, B).

    Hidden states and observation symbols are integer indices starting at 0.
    The three parameters stay ``None`` until they are fitted or assigned
    directly by the caller:

    * ``pi`` -- column vector of shape (N, 1): initial state distribution.
    * ``A``  -- matrix of shape (N, N): ``A[i, j]`` is the probability of
      moving from hidden state ``i`` to hidden state ``j``.
    * ``B``  -- matrix of shape (N, M): ``B[i, k]`` is the probability of
      emitting symbol ``k`` while in hidden state ``i``.

    The forward/backward variables are not rescaled, so very long
    observation sequences may lose precision through floating-point
    underflow.
    """

    def __init__(self, hidden_status_num=None, visible_status_num=None):
        """Store the number of hidden states (N) and visible symbols (M)."""
        self.hidden_status_num = hidden_status_num
        self.visible_status_num = visible_status_num
        self.pi = None
        self.A = None
        self.B = None

    @staticmethod
    def _normalize_rows(counts):
        """Scale every row to sum to 1; rows whose sum is 0 stay all-zero."""
        totals = np.sum(counts, axis=1, keepdims=True)
        return np.divide(counts, totals, out=np.zeros_like(counts), where=totals > 0)

    def predict_joint_visible_prob(self, visible_list=None, forward_type="forward"):
        """Return P(O | lambda) for the observation sequence ``visible_list``.

        Args:
            visible_list: sequence of observation symbol indices.
            forward_type: ``'forward'`` selects the forward algorithm; any
                other value selects the backward algorithm.

        Returns:
            The probability of the observation sequence under the model.
        """
        if forward_type == 'forward':
            # alpha_0 = pi * b(o_0), kept as a column vector.
            alpha = self.pi * self.B[:, [visible_list[0]]]
            # alpha_t = (A^T alpha_{t-1}) * b(o_t) for t = 1 .. T-1.
            for t in range(1, len(visible_list)):
                alpha = self.A.T @ alpha * self.B[:, [visible_list[t]]]
            # Sum the last alpha over all hidden states.
            return np.sum(alpha)

        # Backward algorithm: beta_{T-1} is a column vector of ones.
        beta = np.ones_like(self.pi)
        # beta_t = A (b(o_{t+1}) * beta_{t+1}) for t = T-2 .. 0.
        for t in range(len(visible_list) - 2, -1, -1):
            beta = self.A @ (self.B[:, [visible_list[t + 1]]] * beta)
        # Combine with the initial distribution and the first emission.
        return np.sum(self.pi * beta * self.B[:, [visible_list[0]]])

    def fit_with_hidden_status(self, visible_list, hidden_list):
        """Estimate pi, A and B by counting (supervised learning).

        Args:
            visible_list: list of observation sequences (symbol indices).
            hidden_list: list of hidden-state sequences aligned with
                ``visible_list``.

        States that never occur as a transition source or never emit keep
        an all-zero row instead of producing NaN through a 0/0 division.
        """
        self.pi = np.zeros(shape=(self.hidden_status_num, 1))
        self.A = np.zeros(shape=(self.hidden_status_num, self.hidden_status_num))
        self.B = np.zeros(shape=(self.hidden_status_num, self.visible_status_num))

        # Count initial states, transitions and emissions.
        for visible_i, hidden_i in zip(visible_list, hidden_list):
            self.pi[hidden_i[0]] += 1
            T = len(visible_i)
            for t in range(T):
                self.B[hidden_i[t], visible_i[t]] += 1
                # The last position has an emission but no outgoing transition.
                if t + 1 < T:
                    self.A[hidden_i[t], hidden_i[t + 1]] += 1

        # Turn the counts into probabilities.
        total = np.sum(self.pi)
        if total > 0:
            self.pi /= total
        self.A = self._normalize_rows(self.A)
        self.B = self._normalize_rows(self.B)

    def fit_without_hidden_status(self, visible_list=None, tol=1e-5, n_iter=10):
        """Estimate pi, A and B from one observation sequence with EM (Baum-Welch).

        Parameters that are already set (for example by a previous
        supervised fit) are used as the starting point; missing ones are
        initialised randomly. EM is sensitive to this initialisation.

        Args:
            visible_list: a single sequence of observation symbol indices.
            tol: stop early once the largest absolute change of any entry
                of pi, A or B in one iteration drops below this value.
                Pass ``None`` to always run ``n_iter`` iterations.
            n_iter: maximum number of EM iterations.
        """
        if self.pi is None:
            self.pi = np.ones(shape=(self.hidden_status_num, 1)) + \
                np.random.random(size=(self.hidden_status_num, 1))
            self.pi /= np.sum(self.pi)

        if self.A is None:
            self.A = np.ones(shape=(self.hidden_status_num, self.hidden_status_num)) + \
                np.random.random(size=(self.hidden_status_num, self.hidden_status_num))
            self.A = self.A / np.sum(self.A, axis=1, keepdims=True)

        if self.B is None:
            self.B = np.ones(shape=(self.hidden_status_num, self.visible_status_num)) + \
                np.random.random(size=(self.hidden_status_num, self.visible_status_num))
            self.B = self.B / np.sum(self.B, axis=1, keepdims=True)

        T = len(visible_list)
        state_num = self.A.shape[0]
        # indicator[t, k] == 1 where the t-th observation is symbol k.
        indicator = (np.asarray(visible_list)[:, None] == np.arange(self.B.shape[1])).astype(float)

        for _ in range(n_iter):
            # ---- E step: forward variables alpha_0 .. alpha_{T-1} ----
            alphas = [self.pi * self.B[:, [visible_list[0]]]]
            for t in range(1, T):
                alphas.append(self.A.T @ alphas[-1] * self.B[:, [visible_list[t]]])

            # Backward variables, built from beta_{T-1} down to beta_0.
            betas = [np.ones_like(self.pi)]
            for t in range(T - 2, -1, -1):
                betas.append(self.A @ (self.B[:, [visible_list[t + 1]]] * betas[-1]))
            betas.reverse()

            # gammas[t, i]: posterior probability of state i at time t.
            gammas = np.hstack(alphas).T * np.hstack(betas).T
            gammas /= np.sum(gammas, axis=1, keepdims=True)

            # ksi[t, i, j]: posterior probability of the transition i -> j
            # between time t and t + 1.
            ksi = np.zeros(shape=(T - 1, state_num, state_num))
            for t in range(T - 1):
                joint = alphas[t] * self.A * self.B[:, visible_list[t + 1]] * betas[t + 1][:, 0]
                ksi[t] = joint / np.sum(joint)

            # ---- M step ----
            new_pi = gammas[0].reshape(-1, 1)
            new_B = (gammas.T @ indicator) / np.sum(gammas, axis=0)[:, None]
            if T > 1:
                # Expected transitions over expected visits during [0, T-2].
                new_A = np.sum(ksi, axis=0) / np.sum(gammas[:-1], axis=0)[:, None]
            else:
                # A single observation carries no transition information.
                new_A = self.A.copy()

            change = max(
                np.max(np.abs(new_pi - self.pi)),
                np.max(np.abs(new_A - self.A)),
                np.max(np.abs(new_B - self.B)),
            )

            # Write in place, as the element-wise updates always did.
            self.pi[...] = new_pi
            self.A[...] = new_A
            self.B[...] = new_B

            if tol is not None and change < tol:
                break

    def predict_hidden_status(self, visible_list):
        """Return the most likely hidden-state path (Viterbi algorithm).

        Args:
            visible_list: sequence of observation symbol indices.

        Returns:
            A list of ``int`` state indices in chronological order, one
            per observation.
        """
        # delta_0 = pi * b(o_0), kept as a column vector.
        delta = self.pi * self.B[:, [visible_list[0]]]
        T = len(visible_list)
        # back_pointers[t - 1][i]: best predecessor of state i at time t.
        back_pointers = []
        for t in range(1, T):
            # scores[j, i] = delta[j] * A[j, i] * B[i, o_t]
            scores = delta * self.A * self.B[:, visible_list[t]]
            back_pointers.append(np.argmax(scores, axis=0))
            delta = np.max(scores, axis=0).reshape(-1, 1)

        best_hidden_list = [int(np.argmax(delta))]
        # Debug output kept on purpose: the notebook's recorded output
        # shows this final delta.
        print(delta)
        # Walk the back-pointers from the last time step to the first ...
        for pointers in reversed(back_pointers):
            best_hidden_list.append(int(pointers[best_hidden_list[-1]]))
        # ... then restore chronological order.
        best_hidden_list.reverse()
        return best_hidden_list

In [54]:
# Compute the observation probability with the forward and backward algorithms.
# Build the toy model parameters.
pi = np.array([0.2, 0.4, 0.4]).reshape(-1, 1)
A = np.array([
    [0.5, 0.2, 0.3],
    [0.3, 0.5, 0.2],
    [0.2, 0.3, 0.5],
])
B = np.array([
    [0.5, 0.5],
    [0.4, 0.6],
    [0.7, 0.3],
])

hmm = HMM()
hmm.pi, hmm.A, hmm.B = pi, A, B
# Evaluate the same sequence with both algorithms, one result per line.
print(
    *(
        hmm.predict_joint_visible_prob([0, 1, 0], forward_type=direction)
        for direction in ('forward', 'backward')
    ),
    sep='\n',
)

0.130218
0.130218


In [55]:
# Estimate the HMM parameters with supervised learning.
O = [
    [1, 2, 3, 0, 1, 3, 4],
    [1, 2, 3],
    [0, 2, 4, 2],
    [4, 3, 2, 1],
    [3, 1, 1, 1, 1],
    [2, 1, 3, 2, 1, 3, 4],
]
# The hidden sequences are simply the observation sequences themselves.
I = O
# Supervised fit: five hidden states, five visible symbols.
hmm = HMM(5, 5)
hmm.fit_with_hidden_status(O, I)
print(hmm.pi, hmm.A, hmm.B, sep='\n')

[[0.16666667]
 [0.33333333]
 [0.16666667]
 [0.16666667]
 [0.16666667]]
[[0.         0.5        0.5        0.         0.        ]
 [0.         0.375      0.25       0.375      0.        ]
 [0.         0.5        0.         0.33333333 0.16666667]
 [0.16666667 0.16666667 0.33333333 0.         0.33333333]
 [0.         0.         0.5        0.5        0.        ]]
[[1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1.]]


In [56]:
# Unsupervised learning on all six sequences joined into one long sequence.
hmm = HMM(5, 5)
hmm.fit_without_hidden_status([symbol for sequence in O[:6] for symbol in sequence], n_iter=10)
print(hmm.pi, hmm.A, hmm.B, sep='\n')

[[2.81919114e-03]
 [1.70107609e-04]
 [3.93910865e-04]
 [9.83781494e-01]
 [1.28352961e-02]]
[[0.14639938 0.23976884 0.17421697 0.17550724 0.26410757]
 [0.24261686 0.14629225 0.12302975 0.20343418 0.28462696]
 [0.29308868 0.16468167 0.15168812 0.21798993 0.1725516 ]
 [0.10919495 0.30386934 0.29426509 0.19554399 0.09712663]
 [0.18032675 0.18328383 0.29057587 0.16695637 0.17885718]]
[[0.07764967 0.25623728 0.20852755 0.18857236 0.26901314]
 [0.04834797 0.16047594 0.3943583  0.2899254  0.10689239]
 [0.07870007 0.16737282 0.35872905 0.32557941 0.06961866]
 [0.03507071 0.72193914 0.09221933 0.09731933 0.05345149]
 [0.09900301 0.32082335 0.1166343  0.27655068 0.18698865]]


In [58]:
# Test the Viterbi decoder with Example 10.3.
pi = np.array([0.2, 0.4, 0.4]).reshape(-1, 1)
A = np.array([
    [0.5, 0.2, 0.3],
    [0.3, 0.5, 0.2],
    [0.2, 0.3, 0.5],
])
B = np.array([
    [0.5, 0.5],
    [0.4, 0.6],
    [0.7, 0.3],
])

hmm = HMM(3, 2)
hmm.pi, hmm.A, hmm.B = pi, A, B
print(hmm.predict_hidden_status([0, 1, 0]))

[[0.00756]
 [0.01008]
 [0.0147 ]]
[2, 2, 2]
