In [3]:
import numpy as np
# 前向算法
class ForwardHmm:
    def __init__(self,n_components,observations):
        self.n_components=n_components
        self.observations=observations
    def fit(self,start_probability,transition_probability,emission_probability):
        self.start_probability=start_probability
        self.transition_probability=transition_probability
        self.emission_probability=emission_probability
    def predict(self,test_data):
        t=len(test_data)
        if t<1:
            print("t<1")
            return
        target_type=test_data[0]
        target_index=self.observations.index(target_type)
        
        self.start_probability=self.start_probability*self.emission_probability[:,target_index]
        if t==1:
            return np.sum(self.start_probability)
        for i in range(t-1):
            target_type=test_data[i+1]
            target_index=self.observations.index(target_type)
            pi=[]
            for i in range(len(self.transition_probability)):
                pi.append(self.transition_probability[:,i].dot(self.start_probability)*self.emission_probability[:,target_index][i])
            self.start_probability=np.array(pi)
            print(self.start_probability)
        return np.sum(self.start_probability)
                
# 设定隐藏状态的集合 
states = ["box 1", "box 2", "box3"] 
n_states = len(states) 
# 设定观察状态的集合 
observations = ["red", "white"] 
n_observations = len(observations)
# 设定初始状态分布 
start_probability = np.array([0.2, 0.4, 0.4])
# 设定状态转移概率分布矩阵 
transition_probability = np.array([ [0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])
# 设定观测状态概率矩阵 
emission_probability = np.array([ [0.5, 0.5], [0.4, 0.6], [0.7, 0.3] ])

my_hmm=ForwardHmm(states,observations)
my_hmm.fit(start_probability,transition_probability,emission_probability)
test_data=["red", "white","red"]
my_hmm.predict(test_data)

[0.077  0.1104 0.0606]
[0.04187  0.035512 0.052836]


0.130218

In [2]:
import numpy as np
# 维比特算法
class ViterbiHmm:
    def __init__(self,n_components,observations):
        self.n_components=n_components
        self.observations=observations
    def fit(self,start_probability,transition_probability,emission_probability):
        self.start_probability=start_probability
        self.transition_probability=transition_probability
        self.emission_probability=emission_probability
    def predict(self,test_data):
        t=len(test_data)
        if t<1:
            print("t<1")
            return
        target_type=test_data[0]
        target_index=self.observations.index(target_type)
        
        self.start_probability=self.start_probability*self.emission_probability[:,target_index]
        first_select=self.transform_arg(np.argmax(self.start_probability))
        if t==1: 
            return self.transform_arg(np.argmax(self.start_probability))
        loop_result=[]
        pis=[]
        for i in range(t-1):
            loop_mid_result=[]
            target_type=test_data[i+1]
            target_index=self.observations.index(target_type)
            pi=[]
            for i in range(len(self.transition_probability)):
                mid_result=self.transition_probability[:,i]*self.start_probability*self.emission_probability[:,target_index][i]
                pi.append((self.transition_probability[:,i]*self.start_probability*self.emission_probability[:,target_index][i])[np.argmax(mid_result)])
                loop_mid_result.append(self.transition_probability[:,i]*self.start_probability*self.emission_probability[:,target_index][i])
            loop_result.append(loop_mid_result)
            self.start_probability=np.array(pi)
            pis.append(pi)
        select_list=[]
        select_list.append(self.transform_arg(np.argmax(self.start_probability)))
        for i in reversed(range(len(pis)-1)):
            select_list.append(self.transform_arg(np.argmax(loop_result[i][np.argmax(pis[i])])))
        select_list.append(first_select)
        
        return list(reversed(select_list))
            
    def transform_arg(self,args):
        return args+1
# 设定隐藏状态的集合 
states = ["box 1", "box 2", "box3"] 
n_states = len(states) 
# 设定观察状态的集合 
observations = ["red", "white"] 
n_observations = len(observations)
# 设定初始状态分布 
start_probability = np.array([0.2, 0.4, 0.4])
# 设定状态转移概率分布矩阵 
transition_probability = np.array([ [0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]])
# 设定观测状态概率矩阵 
emission_probability = np.array([ [0.5, 0.5], [0.4, 0.6], [0.7, 0.3] ])

vHmm=ViterbiHmm(states,observations)
vHmm.fit(start_probability,transition_probability,emission_probability)
test_data=np.array(["red", "white","red"])
vHmm.predict(test_data)

[3, 3, 3]