In [1]:
#隐马尔科夫中文分词
class HMM(object):
    def __init__(self):
        #做一些初始化的工作，例如加载模型文件，状态参数等
        self.status = ['B','I','E','S']#词位状态集合
        self.A = {}#状态转移矩阵
        self.B = {}#发射矩阵(混淆矩阵)
        self.PI = {}#状态初始概率
        
    def train(self,path):
        #传入语料文件路径，训练HMM模型
         #统计状态出现次数,用于求初始化概率
        stateCounter = {}
        def initParameters():#初始化参数
            for state in self.status:
                self.A[state] = {s:0.0 for s in self.status}
                self.PI[state] = 0.0
                self.B[state] = {}
                stateCounter[state] = 0
                
        def getLabel(text):#传入词，返回词位
            out = []
            if(len(text) == 1):
                out.append('S')
            else:
                out += ['B']+["I"]*(len(text)-2)+["E"]
            return out
        
        initParameters()
        lineCounter = 0
        # 观察者集合，主要是字以及标点等
        with open(path, encoding='utf8') as f:
            for line in f.readlines():
                lineCounter += 1
                line = line.strip()
                if not line:
                    continue
                word_list = [i for i in line if i != ' ']
                linelist = line.split()
                line_state = []
                for w in linelist:
                    line_state.extend(getLabel(w))
                try:
                    assert len(word_list) == len(line_state)
                except :
                    continue
                for k, v in enumerate(line_state):
                    stateCounter[v] += 1
                    if k == 0:
                        self.PI[v] += 1  # 每个句子的第一个字的状态，用于计算初始状态概率
                    else:
                        self.A[line_state[k - 1]][v] += 1  # 计算转移概率
                        self.B[line_state[k]][word_list[k]] = \
                            self.B[line_state[k]].get(word_list[k], 0) + 1.0  # 计算发射概率
        self.PI= {k: v * 1.0 / lineCounter for k, v in self.PI.items()}
        self.A = {k: {k1: v1 / stateCounter[k] for k1, v1 in v.items()}
                      for k, v in self.A.items()}
        #加1平滑
        self.B = {k: {k1: (v1 + 1) / stateCounter[k] for k1, v1 in v.items()}
                      for k, v in self.B.items()}
        
    def viterbi(self,text,states,startP,transP,emitP):
        V = [{}]
        path = {}
        for state in states:
            V[0][state] = startP[state] * emitP[state].get(text[0], 0)
            path[state] = [state]
        for t in range(1, len(text)):
            V.append({})
            newpath = {}
            #检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emitP['S'].keys() and \
                text[t] not in emitP['I'].keys() and \
                text[t] not in emitP['E'].keys() and \
                text[t] not in emitP['B'].keys()
            for y in states:
                emitP_ = emitP[y].get(text[t], 0) if not neverSeen else 1.0 #设置未知字单独成词
                (prob, state) = max([(V[t - 1][y0] * transP[y0].get(y, 0) *emitP_, y0) for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath
        if emitP['I'].get(text[-1], 0)> emitP['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','I')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        return (prob, path[state])
    
    def cut(self,text):
        #加载训练好的模型，进行分词
        prob, pos_list = self.viterbi(text, self.status, self.PI, self.A, self.B)      
        begin, next = 0, 0    
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i+1]
                next = i+1
            elif pos == 'S':
                yield char
                next = i+1
        if next < len(text):
            yield text[next:]

In [6]:
hmm1 = HMM()
hmm1.train("./datas/Corpus_utf8.txt")

In [8]:
text = "我总是尽我的精力和才能来摆脱那种繁重而单调的计算。"
print(str(list(hmm1.cut(text))))

['我', '总是', '尽我', '的', '精力', '和', '才', '能来', '摆脱', '那种', '繁重', '而', '单调', '的', '计算', '。']


In [9]:
hmm1.A

{'B': {'B': 0.0, 'I': 0.173818605218333, 'E': 0.826181394781667, 'S': 0.0},
 'I': {'B': 0.0, 'I': 0.3331339038862623, 'E': 0.6668660961137377, 'S': 0.0},
 'E': {'B': 0.5282171424006276, 'I': 0.0, 'E': 0.0, 'S': 0.46560773066087907},
 'S': {'B': 0.6219373199357607, 'I': 0.0, 'E': 0.0, 'S': 0.32832106717991055}}

In [10]:
hmm1.B

{'B': {'清': 0.0009306603675294129,
  '同': 0.003218851914983312,
  '安': 0.002798638617240977,
  '人': 0.006302183913052407,
  '门': 0.00043883177092372225,
  '不': 0.009172136924539666,
  '民': 0.0022072105923841494,
  '专': 0.0022784121473990292,
  '拍': 0.0003205386433369299,
  '留': 0.0004548173287057212,
  '甚': 0.00048156022654807714,
  '穿': 0.00017038723941754195,
  '统': 0.0007867903474914221,
  '服': 0.0021847179722579483,
  '集': 0.0014743950105801824,
  '中': 0.01237015119479068,
  '三': 0.0021478195435893814,
  '1': 0.00963985553870239,
  '日': 0.003825588462233725,
  '周': 0.0010351118827308509,
  '曾': 0.00025870274452844446,
  '海': 0.0018946835344767858,
  '导': 0.0009322025036919116,
  '驱': 0.0001708009832660172,
  '长': 0.0014022531286369494,
  '综': 0.000635924295106486,
  '补': 0.0005957159156501168,
  '骆': 1.824234241004588e-05,
  '组': 0.0015540971210273725,
  '远': 0.00030169449169273815,
  '训': 0.0002741617192305658,
  '编': 0.0003603332789448238,
  '下': 0.0021478571566665157,
  '驶': 5.2

In [11]:
hmm1.PI

{'B': 0.6885825943318098, 'I': 0.0, 'E': 0.0, 'S': 0.3014821811475153}