<a href="https://colab.research.google.com/github/shaobingbing/AI-For-NLP/blob/master/%E4%B8%AD%E6%96%87%E5%88%86%E8%AF%8D%EF%BC%88HMM%EF%BC%89.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### HMM 建立中文分词模型，使用人们日报的分词语料

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
class HMM(object):
    def __init__(self):
        import os
    
        # 用于存取算法中间结果，不用每次都训练模型
        self.model_file = './drive/Shared drives/shao_bingbing/model/hmm_model.pkl'

        # 状态值集合
        # B: 单词的开头
        # M: 单词的中间
        # E: 单词的末尾
        # S: 单独成词
        self.state_list = ['B', 'M', 'E', 'S']

        # 参数加载，用于判断是否需要重新加载model_file
        self.load_para = False
    
    # 用于加载已计算的中间结果，当需要重新训练时，需初始化清空结果
    def try_load_model(self, trained):
        if trained:
            import pickle
            with open(self.model_file, 'rb') as f:
                self.A_dic = pickle.load(f)
                self.B_dic = pickle.load(f)
                self.pi_dic = pickle.load(f)
                self.load_para = True
        
        else:
            # 状态转移概率：从上一个状态转移到下一个状态的概率 P(q_2|q_1)
            self.A_dic = {}
            # 发射概率 在该状态下对应观测值的概率 P(s_1|q_1)
            self.B_dic = {}
            # 状态的初始概率,对应相应的状态和概率
            self.Pi_dic = {}
            self.load_para = False
        
    # 计算转移概率、发射概率以及初始概率
    def train(self, path):
        self.try_load_model(False)
        
        #统计状态出现次数，求P(o)
        Count_dic = {}
        
        #初始化参数
        def init_parameter():
            for state in self.state_list:
                # 个数为n*n n为状态个数
                self.A_dic[state] = {s: 0.0 for s in self.state_list}
                # 初始状态概率，初始置为0
                self.Pi_dic[state] = 0.0
                # 发射概率,初始置为空字典，因为没有对应单词,找不到对应的键
                self.B_dic[state] = {}
                Count_dic[state] = 0
                
        def makeLabel(text):
            # 对切分好的单词生成对应的标签
            # 若单词长度为1，则为S，若为2，则为BS，多个以上BS中间使用M填充
            out_text = []
            if len(text) == 1:
                out_text.append('S')
            else:
                out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']
                
            return out_text
        
        init_parameter()
        line_num = -1
        
        # 观察者集合，主要是字符以及标点
        words = set()
        with open(path, encoding = 'utf-8') as f:
            for line in f:
                line_num += 1
                
                # 去除两侧的空格
                line = line.strip()
                
                # 跳过空行
                if not line:
                    continue
                    
                word_list = [i for i in line if i != ' ']
                # 取集合的并集，保存了所有的字符，包括标点符号和特殊字符(如果存在的话，取决于语料库)
                words |= set(word_list)
                
                # 分词语料库分词后的单词
                linelist = line.split()
                
                line_state = []
                for w in linelist:
                    line_state.extend(makeLabel(w))
                    
#                 print(len(line_state), line_state)
#                 print(len(word_list), word_list)
                # word_list 和 line_state 分别对应该句话的观测值和状态值
                assert len(word_list) == len(line_state)
                
                for k,v in enumerate(line_state):
                    
                    # 更新状态数目
                    Count_dic[v] += 1
                    
                    if k == 0:
                        # 每个句子的第一个字作为初始状态出现的次数
                        self.Pi_dic[v] += 1
                        
                    else:
                        # q_t和q_t+1出现的联合次数
                        self.A_dic[line_state[k - 1]][v] += 1
                        # 发射概率 P(S_k|q_k),需要对空字典进行更新操作，使用get(key, default_value)进行更新操作（如果该键不存在
                        self.B_dic[line_state[k]][word_list[k]] = self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0
            
                        
        # 上面计算的为统计量，下面计算概率，范围(0,1)
        # 更新初始状态概率，直接除以行数即可，有多少行代表以第一个出现了多少次
        self.Pi_dic = {k: v*1.0/line_num for k,v in self.Pi_dic.items()}
        
        # 更新转移概率
        self.A_dic = {k: {k1: v1/Count_dic[k] for k1,v1 in v.items()} for k, v in self.A_dic.items()}
        
        # 更新发射概率,加1平滑(在使用维特比算法进行计算可能会出现未知词，因此需要对其进行概率上的更新)
        self.B_dict = {k: {k1: (v1 + 1)/Count_dic[k] for k1, v1 in v.items()} for k,v in self.B_dic.items()}
        
        import pickle
        with open(self.model_file, 'wb') as f:
            pickle.dump(self.A_dic, f)
            pickle.dump(self.B_dic, f)
            pickle.dump(self.Pi_dic, f)

        return self
    
    
    def viterbi(self, text, states, start_p, trans_p, emit_p):
        
        # 保存第i步的最佳路径
        V = [{}]
        
        path = {}
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]
        for t in range(1, len(text)):
            V.append({})
            newpath = {}
            
            #检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emit_p['S'].keys() and text[t] not in emit_p['M'].keys() and text[t] not in emit_p['E'].keys() and text[t] not in emit_p['B'].keys()
            for y in states:
                # 如果该单词未在语料库中出现
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0
                (prob, state) = max([(V[t - 1][y0] * trans_p[y0].get(y, 0) *emitP, y0) for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath
            
        if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        
        return (prob, path[state])
    
    def cut(self, text):
        import os
        if not self.load_para:
            self.try_load_model(os.path.exists(self.model_file))
        prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)      
        begin, next = 0, 0    
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i+1]
                next = i+1
            elif pos == 'S':
                yield char
                next = i+1
        if next < len(text):
            yield text[next:]

In [0]:
hmm = HMM()

In [55]:
hmm.train('./drive/Shared drives/shao_bingbing/data/trainCorpus.txt')

<__main__.HMM at 0x7f4a2a1425c0>

In [56]:
text = '这是一个非常棒的方案！'
res = hmm.cut(text)
print(text)
print(str(list(res)))

这是一个非常棒的方案！
['这是', '一个', '非常', '棒', '的', '方案', '！']
