语言模型、HMM模型、CRF模型

隐马尔可夫模型是将分词作为字在字符串中的序列标注任务来完成的，基本思路是：每个字在构成一个特定的词语时都占据一个确定的构词位置（B开始，M中间，E结尾，S独立词）

训练数据是已经分词好了的文本，对文本进行标记，每一个字都有（BMES）四种可能，需要计算在这四种可能下，每一个字的概率，和在这四种可能下，下一个字的四种状态的概率，然后对目标文本进行预测，预测方法是veterbi算法

In [4]:
class HMM(object):
    def __init__(self):
        import os

        # 主要是用于存取算法中间结果，不用每次都训练模型
        self.model_file = './data/hmm_model.pkl'

        # 状态值集合
        self.state_list = ['B', 'M', 'E', 'S']
        # 参数加载,用于判断是否需要重新加载model_file
        self.load_para = False

    # 用于加载已计算的中间结果，当需要重新训练时，需初始化清空结果
    def try_load_model(self, trained):
        if trained:
            import pickle
            with open(self.model_file, 'rb') as f:
                self.A_dic = pickle.load(f)
                self.B_dic = pickle.load(f)
                self.Pi_dic = pickle.load(f)
                self.load_para = True

        else:
            # 状态转移概率（状态->状态的条件概率）
            self.A_dic = {}
            # 发射概率（状态->词语的条件概率）
            self.B_dic = {}
            # 状态的初始概率
            self.Pi_dic = {}
            self.load_para = False

    # 计算转移概率、发射概率以及初始概率
    def train(self, path):

        # 重置几个概率矩阵
        self.try_load_model(False)

        # 统计状态出现次数，求p(o)
        Count_dic = {}

        # 初始化参数
        def init_parameters():
            for state in self.state_list:
                self.A_dic[state] = {s: 0.0 for s in self.state_list}
                self.Pi_dic[state] = 0.0
                self.B_dic[state] = {}

                Count_dic[state] = 0

        def makeLabel(text):
            out_text = []
            if len(text) == 1:
                out_text.append('S')
            else:
                out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']

            return out_text

        init_parameters()
        line_num = -1
        # 观察者集合，主要是字以及标点等
        words = set()
        with open(path, encoding='utf8') as f:
            for line in f:
                line_num += 1

                line = line.strip()
                if not line:
                    continue

                word_list = [i for i in line if i != ' ']
                words |= set(word_list)  # 更新字的集合

                linelist = line.split()

                line_state = []
                for w in linelist:
                    line_state.extend(makeLabel(w))
                
                assert len(word_list) == len(line_state)

                for k, v in enumerate(line_state):
                    Count_dic[v] += 1
                    if k == 0:
                        self.Pi_dic[v] += 1  # 每个句子的第一个字的状态，用于计算初始状态概率
                    else:
                        self.A_dic[line_state[k - 1]][v] += 1  # 计算转移概率
                        self.B_dic[line_state[k]][word_list[k]] = \
                            self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0  # 计算发射概率
        
        self.Pi_dic = {k: v * 1.0 / line_num for k, v in self.Pi_dic.items()}
        self.A_dic = {k: {k1: v1 / Count_dic[k] for k1, v1 in v.items()}
                      for k, v in self.A_dic.items()}
        #加1平滑
        self.B_dic = {k: {k1: (v1 + 1) / Count_dic[k] for k1, v1 in v.items()}
                      for k, v in self.B_dic.items()}
        #序列化
        import pickle
        with open(self.model_file, 'wb') as f:
            pickle.dump(self.A_dic, f)
            pickle.dump(self.B_dic, f)
            pickle.dump(self.Pi_dic, f)

        return self

    def viterbi(self, text, states, start_p, trans_p, emit_p):
        V = [{}]
        path = {}
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]
        for t in range(1, len(text)):
            V.append({})
            newpath = {}
            
            #检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emit_p['S'].keys() and \
                text[t] not in emit_p['M'].keys() and \
                text[t] not in emit_p['E'].keys() and \
                text[t] not in emit_p['B'].keys()
            for y in states:
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #设置未知字单独成词
                (prob, state) = max(
                    [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
                      emitP, y0)
                     for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath
            
        if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        
        return (prob, path[state])

    def cut(self, text):
        import os
        if not self.load_para:
            self.try_load_model(os.path.exists(self.model_file))
        prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)      
        begin, next = 0, 0    
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i+1]
                next = i+1
            elif pos == 'S':
                yield char
                next = i+1
        if next < len(text):
            yield text[next:]

### hmm模型粗糙版

### 预处理

In [63]:
import pickle
import re

#读取文本
with open('train_text.txt','r')as f:
    raw_text=f.read()

#消除'\n'
raw_text=re.sub(r'\n',' ',raw_text)

#建立三个字典，分别存储计算发射概率、转移概率、初始概率
#初始概率是一句话的开头是S还是B的概率

dic_A={}
dic_B={}
dic_Pi={'B':0,'S':0}

#初始化字典A
word_list=[]
for word in raw_text:
    word_list.append(word)
word_list=set(word_list)

for word in word_list:
    dic_A[word]=[0]*4

#初始化字典B
for i in 'BMES':
    dic_B[i]=[0]*4

#完善字典Pi    
sentences=raw_text.split('。')
for sent in sentences:
    if len(sent.split()[0])==0:
        continue
    if len(sent.split()[0])==1:
        dic_Pi['S']+=1
    else:
        dic_Pi['B']+=1

#完善字典A，并为字典B做准备        
text_list=raw_text.split()
Token=[]
sum_of_state=[0]*4
for i,phrase in enumerate(text_list):
    if len(phrase)==0:
        continue
    if len(phrase)==1:
        dic_A[phrase][3]+=1
        Token.append('S')
        sum_of_state[3]+=1
    elif len(phrase)==2:
        dic_A[phrase[0]][0]+=1
        Token.append('B')
        sum_of_state[0]+=1
        dic_A[phrase[1]][2]+=1
        Token.append('E')
        sum_of_state[2]+=1
    else:
        dic_A[phrase[0]][0]+=1
        Token.append('B')
        sum_of_state[0]+=1
        for word in phrase[1:-1]:
            dic_A[word][1]+=1
            Token.append('M')
            sum_of_state[1]+=1
        dic_A[phrase[-1]][2]+=1
        Token.append('E')
        sum_of_state[2]+=1

#完善字典B
pre_Token=Token[:-1]
next_Token=Token[1:]
for i,token in enumerate(pre_Token):
    if next_Token[i]=='B':
        dic_B[token][0]+=1
    elif next_Token[i]=='M':
        dic_B[token][1]+=1
    elif next_Token[i]=='E':
        dic_B[token][2]+=1
    elif next_Token[i]=='S':
        dic_B[token][3]+=1

### 计算转移概率分布、发射概率分布和初始概率分布

已经有了计数，接下来计算转移概率分布、发射概率分布和初始概率分布

In [64]:
#dic_init、dic_launch、dic_change分别记录初始概率，发射概率(当前点是B/W/E/S的概率)和转移概率（上一个点是B/W/E/S时，下一个点是B/W/E/S的概率）
import copy
dic_init={}
dic_init['B']=dic_Pi['B']/(dic_Pi['B']+dic_Pi['S'])
dic_init['S']=1-dic_init['B']
dic_init['M']=0
dic_init['E']=0
dic_launch=copy.deepcopy(dic_A)
for key in dic_A.keys():
    for i in range(4):
        dic_launch[key][i]=dic_A[key][i]/sum_of_state[i]
        
dic_change=copy.deepcopy(dic_B)
for i in 'BMES':
    SUM=dic_B[i][0]+dic_B[i][1]+dic_B[i][2]+dic_B[i][3]
    for j in range(4):
        dic_change[i][j]=dic_B[i][j]/SUM

### viterbi算法

一个点最优路径的前一个节点也是最优路径

In [65]:
test='这是一个非常棒的方案！'

P_now=dic_init.copy()#dict对象，键值BMES的值分别表示当前节点的最大概率

output=[[] for i in range(4)]#记录最优路径

#先计算前两个节点的最优概率
#初始字符的概率是初始字的概率乘以初始概率

word=test[0]
for index,label in enumerate('BMES'):
    P_now[label]=P_now[label]*dic_launch[word][index]
#此时用于计算第一层的分布
word=test[1]
pre_P=copy.deepcopy(P_now)
for index,label in enumerate('BMES'):
    #计算上一节点概率分布和当前节点的概率分布的乘积，当前节点每个位置选择最优路径
    operation= [pre_P['B']*dic_change['B'][index]*dic_launch[word][index],\
            pre_P['M']*dic_change['M'][index]*dic_launch[word][index],\
            pre_P['E']*dic_change['E'][index]*dic_launch[word][index],\
            pre_P['S']*dic_change['S'][index]*dic_launch[word][index]]
    #到达label的最佳路径是从'BMES'[max(operation)]到label的路径
    P_now[label]=max(operation)
    #当前选择的最优节点的index是operation.index(max(operation))
    #上次选择的节点，加上本次到达的节点就是当前节点的最优路径
    output[index]='BMES'[operation.index(max(operation))]+label
for word in test[2:]:
    #用pre_output来记录上一节点中的最优路径
    pre_P=copy.deepcopy(P_now)
    pre_output=copy.deepcopy(output)
    for index,label in enumerate('BMES'):
        operation= [pre_P['B']*dic_change['B'][index]*dic_launch[word][index],\
                pre_P['M']*dic_change['M'][index]*dic_launch[word][index],\
                pre_P['E']*dic_change['E'][index]*dic_launch[word][index],\
                pre_P['S']*dic_change['S'][index]*dic_launch[word][index]]
        P_now[label]=max(operation)
        output[index]=pre_output[operation.index(max(operation))]+label

In [66]:
MAX=0
for key in 'BMES':
    if P_now[key]>MAX:
        P_now[key]
        index=key
for i in output:
    if i[-1]=='S':
        LIST=i 
f=[]
index=0
for i,value in enumerate(LIST):
    if value == 'S':
        f.append(test[index:i+1])
        index=i+1
    if value == 'E':
        f.append(test[index:i+1])
        index=i+1
seq='/'.join(f)
print(seq)

这/是/一个/非常/棒/的/方案/！
