In [1]:
import numpy as np

In [2]:
# 对应状态集合Q
states = ('Healthy', 'Fever')
# 对应观测集合V
observations = ('normal', 'cold', 'dizzy')
# 初始状态概率向量π
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
# 状态转移矩阵A
transition_probability = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
# 观测概率矩阵B
emission_probability = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

In [3]:
def generate_index_map(lables):
    id2label = {}
    label2id = {}
    i = 0
    for l in lables:
        id2label[i] = l
        label2id[l] = i
        i += 1
    return id2label, label2id
 
states_id2label, states_label2id = generate_index_map(states)
observations_id2label, observations_label2id = generate_index_map(observations)
print(states_id2label, states_label2id)
print(observations_id2label, observations_label2id)

{0: 'Healthy', 1: 'Fever'} {'Healthy': 0, 'Fever': 1}
{0: 'normal', 1: 'cold', 2: 'dizzy'} {'normal': 0, 'cold': 1, 'dizzy': 2}


In [4]:
def convert_map_to_vector(map_, label2id):
    """将概率向量从dict转换成一维array"""
    v = np.zeros(len(map_), dtype=float)
    for e in map_:
        v[label2id[e]] = map_[e]
    return v

 
def convert_map_to_matrix(map_, label2id1, label2id2):
    """将概率转移矩阵从dict转换成矩阵"""
    m = np.zeros((len(label2id1), len(label2id2)), dtype=float)
    for line in map_:
        for col in map_[line]:
            m[label2id1[line]][label2id2[col]] = map_[line][col]
    return m

In [5]:
A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
print(A)
B = convert_map_to_matrix(emission_probability, states_label2id, observations_label2id)
print(B)
observations_index = [observations_label2id[o] for o in observations]
pi = convert_map_to_vector(start_probability, states_label2id)
print(pi)

[[0.7 0.3]
 [0.4 0.6]]
[[0.5 0.4 0.1]
 [0.1 0.3 0.6]]
[0.6 0.4]


In [6]:
# 随机生成观测序列和状态序列    
def simulate(T):

    def draw_from(probs):
        """
        1.np.random.multinomial:
        按照多项式分布，生成数据
        >>> np.random.multinomial(20, [1/6.]*6, size=2)
                array([[3, 4, 3, 3, 4, 3],
                       [2, 4, 3, 4, 0, 7]])
         For the first run, we threw 3 times 1, 4 times 2, etc.  
         For the second, we threw 2 times 1, 4 times 2, etc.
        2.np.where:
        >>> x = np.arange(9.).reshape(3, 3)
        >>> np.where( x > 5 )
        (array([2, 2, 2]), array([0, 1, 2]))
        """
        return np.where(np.random.multinomial(1,probs) == 1)[0][0]

    observations = np.zeros(T, dtype=int)
    states = np.zeros(T, dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0],:])
    for t in range(1, T):
        states[t] = draw_from(A[states[t-1],:])
        observations[t] = draw_from(B[states[t],:])
    return observations, states

In [8]:
# 生成模拟数据
observations_data, states_data = simulate(10)
print(observations_data)
print(states_data)
# 相应的label
print("病人的状态: ", [states_id2label[index] for index in states_data])
print("病人的观测: ", [observations_id2label[index] for index in observations_data])

[2 0 2 0 0 1 0 0 0 2]
[1 0 1 0 0 0 1 0 1 1]
病人的状态:  ['Fever', 'Healthy', 'Fever', 'Healthy', 'Healthy', 'Healthy', 'Fever', 'Healthy', 'Fever', 'Fever']
病人的观测:  ['dizzy', 'normal', 'dizzy', 'normal', 'normal', 'cold', 'normal', 'normal', 'normal', 'dizzy']


## offline 

In [32]:
def forward(obs_seq):
    """前向算法"""
    N = A.shape[0]
    T = len(obs_seq)
    
    # F保存前向概率矩阵
    F = np.zeros((N,T))
    F[:,0] = pi * B[:, obs_seq[0]]

    for t in range(1, T):
        for n in range(N):
            F[n,t] = np.dot(F[:,t-1], (A[:,n])) * B[n, obs_seq[t]]

    return F

def backward(obs_seq):
    """后向算法"""
    N = A.shape[0]
    T = len(obs_seq)
    # X保存后向概率矩阵
    X = np.zeros((N,T))
    X[:,-1:] = 1
#     print("x: ", X)

    for t in reversed(range(T-1)):
        for n in range(N):
#             dd = X[:,t+1] * A[n,:] * B[:, obs_seq[t+1]]
#             print(dd)
            X[n,t] = np.sum(X[:,t+1] * A[n,:] * B[:, obs_seq[t+1]])

    return X

In [33]:
forward(observations_data)

array([[6.00000000e-02, 6.90000000e-02, 5.47800000e-03, 5.56770000e-03,
        2.20058700e-03, 6.54980136e-04, 2.77587163e-04, 1.03986035e-04,
        3.84704668e-05, 2.84262018e-06],
       [2.40000000e-01, 1.62000000e-02, 1.82520000e-02, 1.25946000e-03,
        2.42598600e-04, 2.41720578e-04, 3.41526388e-05, 1.03767732e-05,
        3.74218744e-06, 8.27187151e-06]])

In [34]:
backward(observations_data)

array([[4.72193484e-05, 1.19866304e-04, 6.41945373e-04, 1.73522452e-03,
        4.63311200e-03, 1.36959500e-02, 3.70450000e-02, 9.95000000e-02,
        2.50000000e-01, 1.00000000e+00],
       [3.45055449e-05, 1.75538070e-04, 4.16278486e-04, 1.15389304e-03,
        3.78784400e-03, 8.86940000e-03, 2.43400000e-02, 7.40000000e-02,
        4.00000000e-01, 1.00000000e+00]])

In [35]:
def baum_welch_train(observations, A, B, pi, criterion=0.05):
    """无监督学习算法——Baum-Weich算法"""
    n_states = A.shape[0]
    n_samples = len(observations)

    done = False
    while not done:
        # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
        # Initialize alpha
        alpha = forward(observations)

        # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
        # Initialize beta
        beta = backward(observations)
        # ξ_t(i,j)=P(i_t=q_i,i_{i+1}=q_j|O,λ)
        xi = np.zeros((n_states,n_states,n_samples-1))
        for t in range(n_samples-1):
            denom = np.dot(np.dot(alpha[:,t].T, A) * B[:,observations[t+1]].T, beta[:,t+1])
            for i in range(n_states):
                numer = alpha[i,t] * A[i,:] * B[:,observations[t+1]].T * beta[:,t+1].T
                xi[i,:,t] = numer / denom

        # γ_t(i)：gamma_t(i) = P(q_t = S_i | O, hmm)
        gamma = np.sum(xi,axis=1)
        # Need final gamma element for new B
        # xi的第三维长度n_samples-1，少一个，所以gamma要计算最后一个
        prod =  (alpha[:,n_samples-1] * beta[:,n_samples-1]).reshape((-1,1))
        gamma = np.hstack((gamma,  prod / np.sum(prod))) #append one more to gamma!!!
        
        # 更新模型参数
        newpi = gamma[:,0]
        newA = np.sum(xi,2) / np.sum(gamma[:,:-1],axis=1).reshape((-1,1))
        newB = np.copy(B)
        num_levels = B.shape[1]
        sumgamma = np.sum(gamma,axis=1)
        for lev in range(num_levels):
            mask = observations == lev
            newB[:,lev] = np.sum(gamma[:,mask],axis=1) / sumgamma
        
        # 检查是否满足阈值
        if np.max(abs(pi - newpi)) < criterion and \
                        np.max(abs(A - newA)) < criterion and \
                        np.max(abs(B - newB)) < criterion:
            done = 1
        A[:], B[:], pi[:] = newA, newB, newpi
    return newA, newB, newpi

In [131]:
A = np.array([[0.5, 0.5],[0.5, 0.5]])
B = np.array([[0.3, 0.3, 0.4],[0.3, 0.3, 0.4]])
pi = np.array([0.4, 0.6])
print("pi: ", pi)
pic = pi.copy()

observations_data, states_data = simulate(200)
newA, newB, newpi = baum_welch_train(observations_data, A, B, pic,0.02)
print("newA: ", newA)
print("newB: ", newB)
print("newpi: ", newpi)
print("pi: ", pi)


pi:  [0.4 0.6]
newA:  [[0.5 0.5]
 [0.5 0.5]]
newB:  [[0.28928929 0.2952953  0.41541542]
 [0.29070929 0.29470529 0.41458541]]
newpi:  [0.4 0.6]
pi:  [0.4 0.6]


## online