In [233]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gc
gc.collect()

10517

In [234]:
A = np.array([[0.5, 0.2, 0.3],
                    [0.3, 0.5, 0.2],
                    [0.2, 0.3, 0.5]])

B = np.array([[0.5, 0.5],
                    [0.4, 0.6],
                    [0.7, 0.3]])

pi = np.array([0.2, 0.4, 0.4])

V = {"red":0, "white":1}

O = np.array([0, 1, 0]) #红 白 红

In [272]:
class HMM:
    def __init__(self, A, B, pi, n_component, V):
        #lambda (A, B, pi)
        self.A = A #状态转移概率矩阵 公式10.1 N*N
        self.B = B #观测概率矩阵 公式10.3 N*M
        self.pi = pi #初始状态概率向量 公式10.5 N
        
        self.T = None  #长度为T 观测序列长度(len(O))
        self.N = n_component #可能的状态数
        self.M = len(V) #可能的观测数
        #I 是长度为T的状态序列，O是对应的观测序列
        
    def forward(self, O): #算法10.2
        self.T = len(O)
        alpha = np.zeros((self.T, self.N))   #前向概率矩阵初始化 公式10.14
        
        for i in range(self.N):  #公式10.15
            alpha[0,i] = self.pi[i] * self.B[i, O[0]]
        
        for t in range(self.T-1):  #公式10.16
            for i in range(self.N):
                for j in range(self.N):
                    alpha[t+1, i]  += (alpha[t, j] * self.A[j, i]) * self.B[i, O[t+1]] 
    
        p = np.sum(alpha[self.T-1, :])  #公式10.17
        
        return p, alpha
    
    def backward(self, O): #算法10.3
        self.T = len(O)
        beta = np.zeros((self.T, self.N)) #后向概率矩阵初始化 公式10.18
        
        beta[self.T-1, :] = 1 #公式10.19
        
        for t in range(self.T-2, -1, -1): #公式10.20   #注意range不包含最后一个元素，递减需要写间隔
            for i in range(self.N):
                for j in range(self.N):
                    beta[t, i] += (self.A[i, j] * self.B[j, O[t+1]]) * beta[t+1, j]
              
        p = 0
        for i in range(self.N): #公式10.21
            p += self.pi[i] * self.B[i, O[0]] * beta[0, i]
            
        return p, beta
                    
    def fit(self, O, maxIter): #算法10.4

        #初始化 假定都为均值初始化
        n = 0
        
        self.A = np.ones((self.N, self.N)) / self.N
        self.B = np.ones((self.N, self.M)) / self.M
        self.pi = np.ones(self.N) / self.N
        self.T = len(O)
        
        for _ in range(maxIter):
            
            self.eta = np.zeros((self.T, self.N))
            self.epsilon = np.zeros((self.T, self.N, self.N))
        
            _, alpha = self.forward(O)
            _, beta = self.backward(O)
            
            self.eta = (alpha*beta)  #公式10.24
            for t in range(self.T):
                self.eta[t, :] /= np.sum(self.eta[t, :]) 

            for t in range(self.T-1):   #公式10.26
                s = 0
                for i in range(self.N):
                    for j in range(self.N):
                        self.epsilon[t, i, j]  += (alpha[t, i]*self.A[i, j]*self.B[j, O[t+1]]*beta[t+1, j]) 
                        s +=  (alpha[t, i]*self.A[i, j]*self.B[j, O[t+1]]*beta[t+1, j]) 
                for i in range(self.N):
                    for j in range(self.N):
                        self.epsilon[t, i, j] /= s

                
            #以下按照递推公式
            for i in range(self.N): 
                for j in range(self.N):
                    self.A[i, j] = np.sum(self.epsilon[:self.T-1, i, j]) / np.sum(self.eta[:self.T-1, i])
            
            for j in range(self.N):
                for k in range(self.M):
                    o = np.array(O)
                    indices = np.where(o==k)[0]
                    self.B[j, k] = np.sum(self.eta[indices, j]) / np.sum(self.eta[:, j])
                    
            for i in range(self.N):
                self.pi[i] = self.eta[0, i]

        return self.A, self.B, self.pi
    
    def predict(self, O):
        #算法10.5
        self.T = len(O)
        
        delta = np.zeros((self.T, self.N))
        psa = np.zeros((self.T, self.N))
        
        #初始化
        for i in range(self.N):
            delta[0, i] = self.pi[i] * self.B[i, O[0]] 
          
        #递推
        for t in range(1, self.T):
            for i in range(self.N):
                delta[t, i] = np.max(delta[t-1, :]*self.A[:, i].flatten()*self.B[i, O[t]])
                psa[t, i] = np.argmax(delta[t-1, :]*self.A[:, i].flatten())
        
        #终止
        P = np.max(delta[self.T-1, :])
        iT = np.argmax(delta[self.T-1, :])
        
        it_root = iT
        root = [iT]

        for t in range(self.T-2, -1, -1):
            
            it = psa[t+1, it_root]
            it_root = int(it)
            root.append(it_root)
            
        return np.array(root[::-1])
      
                

In [273]:
#calculate
hmm = HMM(A=A, B=B, pi=pi, n_component=3, V=V)
f, _ = hmm.forward(O)
b, _ = hmm.backward(O)
print(f, b)


0.130218 0.130218


In [274]:
#learn
#T=3 为观测序列0的长度，n_component为可能的状态数（盒子数量），V为观测类型，len（V）为观测数
hmm_em = HMM(A=None, B=None, pi=None, n_component=3, V=V)
A_hat, B_hat, pi_hat = hmm_em.fit(O, 5)
print(A_hat)
print(B_hat)
print(pi_hat)

[[0.33333333 0.33333333 0.33333333]
 [0.33333333 0.33333333 0.33333333]
 [0.33333333 0.33333333 0.33333333]]
[[0.66666667 0.33333333]
 [0.66666667 0.33333333]
 [0.66666667 0.33333333]]
[0.33333333 0.33333333 0.33333333]


In [275]:
#predict
hmm_predict = HMM(A=A, B=B, pi=pi, n_component=3, V=V)
root = hmm_predict.predict(O)
print(root)



[2 2 2]
