<img src="../img/hmm_forward.png" style="zoom:45%;" align="left"/>

<img src="../img/hmm_backward.png" style="zoom:45%;" align="left"/>

In [17]:
import numpy as np

class HMM:
    def __init__(self, pi, A, B):
        self.pi = pi  # 初始隐状态概率矩阵
        self.A = A  # 转移概率矩阵
        self.B = B  # 发射概率矩阵
        self.alpha = None
        self.beta = None

    def forward(self, O):
        T = len(O)  # 观测序列长度
        N = len(self.A)  # 状态数
        alpha = np.zeros([T, N])  # 初始化alpha矩阵

        # alpha_1(i)=p_i*b_i(o_1)
        alpha[0, :] = self.pi * self.B[:, O[0]]
        for t in range(1, T):
            # alpha_t+1(i)=\sum_j{alpha_t(j)*a_ji}*b_i(o_t+1)
            alpha[t, :] = np.sum(alpha[t - 1, :] * self.A.T, axis=1) * self.B[:, O[t]]
        # print(alpha)
        self.alpha = alpha  # shape: N * T

        return np.sum(alpha[-1, :])
    
    def backward(self, O):
        T = len(O)  # 观测序列长度
        N = len(self.A)  # 状态数
        beta = np.zeros([T, N])  # 初始化beta矩阵

        # beta_T=1
        beta[-1, :] = 1
        for t in range(T - 2, -1, -1):
            # beta_t(i)=\sum_j{beta_t+1(j)*a_ij*b_j(o_t+1)}
            beta[t, :] = np.sum(beta[t + 1, :] * self.A * self.B[:, O[t + 1]], axis=1)
        # print(beta)
        self.beta = beta  # shape: N * T

        # \sum_i{pi_i*b_i(o_1)*beta_1(i}
        return np.sum(self.pi * self.B[:, O[0]] * beta[0, :])
    
     def baumwelch(self, O, criterion=0.001):
        T = len(O)
        N = len(self.A)
        while True:
            self.forward(O)
            self.backward(O)

            # 求gamma, shape: T * N
            molecular1 = self.alpha * self.beta
            denominator1 = np.sum(molecular1, axis=1).reshape(-1, 1)
            gamma = molecular1 / denominator1

            # 求xi, shape: T * N * N
            xi = np.zeros([T - 1, N, N])
            for t in range(T - 1):
                molecular2 = self.alpha[t, :].reshape(1, -1) * self.A.T * self.B[:, O[t + 1]].reshape(-1,
                                                                                                      1) * self.beta[
                                                                                                           t + 1,
                                                                                                           :].reshape(
                    -1, 1)
                denominator2 = np.sum(molecular2)
                xi[t, :, :] = molecular2.T / denominator2

            # 递推
            newpi = gamma[0, :]
            newA = np.sum(xi, axis=0) / np.sum(gamma[:-1, :], axis=0).reshape(-1, 1)
            newB = np.zeros(self.B.shape)
            for k in range(self.B.shape[1]):
                mask = (O == k)
                newB[:, k] = np.sum(gamma[mask, :], axis=0) / np.sum(gamma, axis=0)

            # 终止
            if np.max(abs(self.pi - newpi)) < criterion and \
                    np.max(abs(self.A - newA)) < criterion and \
                    np.max(abs(self.B - newB)) < criterion:
                break

            self.A, self.B, self.pi = newA, newB, newpi

In [2]:
import numpy as np
from tqdm import tqdm
import json

In [3]:
def load_dict(path):
    """
    加载字典
    :param path:
    :return:
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def load_data(path):
    """
    读取txt文件, 加载训练数据
    :param path:
    :return:
    [{'text': ['当', '希', '望', ...],
     'label': ... }, {...}, ... ]
    """
    with open(path, "r", encoding="utf-8") as f:
        return [eval(i) for i in f.readlines()]


In [4]:
class HMM_NER:
    def __init__(self, char2idx_path, tag2idx_path):
        # 载入一些字典
        # char2idx: 字 转换为 token
        self.char2idx = load_dict(char2idx_path)
        # tag2idx: 标签转换为 token
        self.tag2idx = load_dict(tag2idx_path)
        # idx2tag: token转换为标签
        self.idx2tag = {v: k for k, v in self.tag2idx.items()}
        # 初始化隐状态数量(实体标签数)和观测数量(字数)
        self.tag_size = len(self.tag2idx)
        self.vocab_size = max([v for _, v in self.char2idx.items()]) + 1
        # 初始化A, B, pi为全0
        self.transition = np.zeros([self.tag_size,
                                    self.tag_size])
        self.emission = np.zeros([self.tag_size,
                                  self.vocab_size])
        self.pi = np.zeros(self.tag_size)
        # 偏置, 用来防止log(0)或乘0的情况
        self.epsilon = 1e-8

    def fit(self, train_dic_path):
        """
        fit用来训练HMM模型
        :param train_dic_path: 训练数据目录
        """
        print("initialize training...")
        train_dic = load_data(train_dic_path)
        # 估计转移概率矩阵, 发射概率矩阵和初始概率矩阵的参数
        self.estimate_transition_and_initial_probs(train_dic)
        self.estimate_emission_probs(train_dic)
        # take the logarithm
        # 取log防止计算结果下溢
        self.pi = np.log(self.pi)
        self.transition = np.log(self.transition)
        self.emission = np.log(self.emission)
        print("DONE!")

    def estimate_emission_probs(self, train_dic):
        """
        发射矩阵参数的估计
        estimate p( Observation | Hidden_state )
        :param train_dic:
        :return:
        """
        print("estimating emission probabilities...")
        for dic in tqdm(train_dic):
            for char, tag in zip(dic["text"], dic["label"]):
                self.emission[self.tag2idx[tag],
                              self.char2idx[char]] += 1
        self.emission[self.emission == 0] = self.epsilon
        self.emission /= np.sum(self.emission, axis=1, keepdims=True)

    def estimate_transition_and_initial_probs(self, train_dic):
        """
        转移矩阵和初始概率的参数估计, 也就是bigram二元模型
        estimate p( Y_t+1 | Y_t )
        :param train_dic:
        :return:
        """
        print("estimating transition and initial probabilities...")
        for dic in tqdm(train_dic):
            for i, tag in enumerate(dic["label"][:-1]):
                if i == 0:
                    self.pi[self.tag2idx[tag]] += 1
                curr_tag = self.tag2idx[tag]
                next_tag = self.tag2idx[dic["label"][i + 1]]
                self.transition[curr_tag, next_tag] += 1
        self.transition[self.transition == 0] = self.epsilon
        self.transition /= np.sum(self.transition, axis=1, keepdims=True)
        self.pi[self.pi == 0] = self.epsilon
        self.pi /= np.sum(self.pi)

    def get_p_Obs_State(self, char):
        # 计算p( observation | state)
        # 如果当前字属于未知, 则讲p( observation | state)设为均匀分布
        char_token = self.char2idx.get(char, 0)
        if char_token == 0:
            return np.log(np.ones(self.tag_size) / self.tag_size)
        return np.ravel(self.emission[:, char_token])

    def predict(self, text):
        # 预测并打印出预测结果
        # 维特比算法解码
        if len(text) == 0:
            raise NotImplementedError("输入文本为空!")
        best_tag_id = self.viterbi_decode(text)
        self.print_func(text, best_tag_id)

    def print_func(self, text, best_tags_id):
        # 用来打印预测结果
        for char, tag_id in zip(text, best_tags_id):
            print(char + "_" + self.idx2tag[tag_id] + "|", end="")

    def viterbi_decode(self, text):
        """
        维特比解码, 详见视频教程或文字版教程
        :param text: 一段文本string
        :return: 最可能的隐状态路径
        """
        # 得到序列长度
        seq_len = len(text)
        # 初始化T1和T2表格
        T1_table = np.zeros([seq_len, self.tag_size])
        T2_table = np.zeros([seq_len, self.tag_size])
        # 得到第1时刻的发射概率
        start_p_Obs_State = self.get_p_Obs_State(text[0])
        # 计算第一步初始概率, 填入表中
        T1_table[0, :] = self.pi + start_p_Obs_State
        T2_table[0, :] = np.nan

        for i in range(1, seq_len):
            # 维特比算法在每一时刻计算落到每一个隐状态的最大概率和路径
            # 并把他们暂存起来
            # 这里用到了矩阵化计算方法, 详见视频教程
            p_Obs_State = self.get_p_Obs_State(text[i])
            p_Obs_State = np.expand_dims(p_Obs_State, axis=0)
            prev_score = np.expand_dims(T1_table[i - 1, :], axis=-1)
            # 广播算法, 发射概率和转移概率广播 + 转移概率
            curr_score = prev_score + self.transition + p_Obs_State
            # 存入T1 T2中
            T1_table[i, :] = np.max(curr_score, axis=0)
            T2_table[i, :] = np.argmax(curr_score, axis=0)
        # 回溯
        best_tag_id = int(np.argmax(T1_table[-1, :]))
        best_tags = [best_tag_id, ]
        for i in range(seq_len - 1, 0, -1):
            best_tag_id = int(T2_table[i, best_tag_id])
            best_tags.append(best_tag_id)
        return list(reversed(best_tags))

In [8]:
model = HMM_NER(char2idx_path="./corpus/char2idx.json",
                tag2idx_path="./corpus/label2idx.json")
model.fit("./corpus/train.txt")
model.predict("中国周五宣布其北斗三号全球卫星导航系统正式开通，并做好为全球提供服务的准备，从而跻身由美国、俄罗斯和欧盟组成的提供空基（导航）系统的精英行列。")

initialize training...


  6%|▌         | 3163/55290 [00:00<00:01, 31624.00it/s]

estimating transition and initial probabilities...


100%|██████████| 55290/55290 [00:01<00:00, 34675.45it/s]
 14%|█▍        | 8007/55290 [00:00<00:01, 40097.97it/s]

estimating emission probabilities...


100%|██████████| 55290/55290 [00:01<00:00, 37821.96it/s]


DONE!
中_B-LOC|国_I-LOC|周_O|五_O|宣_O|布_O|其_O|北_B-ORG|斗_I-ORG|三_I-ORG|号_I-ORG|全_I-ORG|球_I-ORG|卫_I-ORG|星_I-ORG|导_I-ORG|航_I-ORG|系_O|统_O|正_O|式_O|开_O|通_O|，_O|并_O|做_O|好_O|为_O|全_O|球_O|提_O|供_O|服_O|务_O|的_O|准_O|备_O|，_O|从_O|而_O|跻_O|身_O|由_O|美_B-LOC|国_I-LOC|、_O|俄_B-LOC|罗_I-LOC|斯_I-LOC|和_O|欧_B-ORG|盟_I-ORG|组_I-ORG|成_O|的_O|提_O|供_O|空_O|基_O|（_O|导_O|航_O|）_O|系_O|统_O|的_O|精_O|英_B-LOC|行_I-LOC|列_I-LOC|。_O|

In [13]:
model.predict("中国科学院大学计算机网络信息中心的研究生董继平")

中_B-ORG|国_I-ORG|科_I-ORG|学_I-ORG|院_I-ORG|大_I-ORG|学_I-ORG|计_I-ORG|算_I-ORG|机_I-ORG|网_I-ORG|络_I-ORG|信_I-ORG|息_I-ORG|中_I-ORG|心_I-ORG|的_O|研_O|究_O|生_O|董_B-PER|继_I-PER|平_I-PER|