第08课：从自然语言处理角度看 HMM 和 CRF

HMM 模型是由一个“五元组”组成的集合：

StatusSet：状态值集合，状态值集合为 (B, M, E, S)，其中 B 为词的首个字，M 为词中间的字，E 为词语中最后一个字，S 为单个字，B、M、E、S 每个状态代表的是该字在词语中的位置。

举个例子，对“中国的人工智能发展进入高潮阶段”，分词可以标注为：“中B国E的S人B工E智B能E发B展E进B入E高B潮E阶B段E”，最后的分词结果为：['中国', '的', '人工', '智能', '发展', '进入', '高潮', '阶段']。

ObservedSet：观察值集合，观察值集合就是所有语料的汉字，甚至包括标点符号所组成的集合。

TransProbMatrix：转移概率矩阵，状态转移概率矩阵的含义就是从状态 X 转移到状态 Y 的概率，是一个4×4的矩阵，即 {B,E,M,S}×{B,E,M,S}。

EmitProbMatrix：发射概率矩阵，发射概率矩阵的每个元素都是一个条件概率，代表 P(Observed[i]|Status[j]) 概率。

InitStatus：初始状态分布，初始状态概率分布表示句子的第一个字属于 {B,E,M,S} 这四种状态的概率。



In [1]:
import pickle
import json 

In [2]:
STATES = {'B', 'M', 'E', 'S'}
EPS = 0.0001
#定义停顿标点
seg_stop_words = {" ","，","。","“","”",'“', "？", "！", "：", "《", "》", "、", "；", "·", "‘ ", "’", "──", ",", ".", "?", "!", "`", "~", "@", "#", "$", "%", "^", "&", "*", "(", ")", "-", "_", "+", "=", "[", "]", "{", "}", '"', "'", "<", ">", "\\", "|" "\r", "\n","\t"}

其中的数据结构定义：

trans_mat：状态转移矩阵，trans_mat[state1][state2] 表示训练集中由 state1 转移到 state2 的次数。

emit_mat：观测矩阵，emit_mat[state][char] 表示训练集中单字 char 被标注为 state 的次数。

init_vec：初始状态分布向量，init_vec[state] 表示状态 state 在训练集中出现的次数。

state_count：状态统计向量，state_count[state]表示状态 state 出现的次数。

word_set：词集合，包含所有单词。

In [9]:
class HMM_Model:
    def __init__(self):
            self.trans_mat = {}  
            self.emit_mat = {} 
            self.init_vec = {}  
            self.state_count = {} 
            self.states = {}
            self.inited = False
            
     #初始化数据结构    
    def setup(self):
        for state in self.states:
            # build trans_mat
            self.trans_mat[state] = {}
            for target in self.states:
                self.trans_mat[state][target] = 0.0
            self.emit_mat[state] = {}
            self.init_vec[state] = 0
            self.state_count[state] = 0
        self.inited = True
        
        
    #模型保存   
    def save(self, filename="../data/08/hmm.json", code='json'):
        fw = open(filename, 'w', encoding='utf-8')
        data = {
            "trans_mat": self.trans_mat,
            "emit_mat": self.emit_mat,
            "init_vec": self.init_vec,
            "state_count": self.state_count
        }
        if code == "json":
            txt = json.dumps(data)
            txt = txt.encode('utf-8').decode('unicode-escape')
            fw.write(txt)
        elif code == "pickle":
            pickle.dump(data, fw)
        fw.close()
        
        
     #模型加载
    def load(self, filename="../data/08/hmm.json", code="json"):
        fr = open(filename, 'r', encoding='utf-8')
        if code == "json":
            txt = fr.read()
            model = json.loads(txt)
        elif code == "pickle":
            model = pickle.load(fr)
        self.trans_mat = model["trans_mat"]
        self.emit_mat = model["emit_mat"]
        self.init_vec = model["init_vec"]
        self.state_count = model["state_count"]
        self.inited = True
        fr.close()
        
    def do_train(self, observes, states):
        if not self.inited:
            self.setup()

        for i in range(len(states)):
            if i == 0:
                self.init_vec[states[0]] += 1
                self.state_count[states[0]] += 1
            else:
                self.trans_mat[states[i - 1]][states[i]] += 1
                self.state_count[states[i]] += 1
                if observes[i] not in self.emit_mat[states[i]]:
                    self.emit_mat[states[i]][observes[i]] = 1
                else:
                    self.emit_mat[states[i]][observes[i]] += 1
                    
                    
    #频数转频率
    def get_prob(self):
        init_vec = {}
        trans_mat = {}
        emit_mat = {}
        default = max(self.state_count.values())  

        for key in self.init_vec:
            if self.state_count[key] != 0:
                init_vec[key] = float(self.init_vec[key]) / self.state_count[key]
            else:
                init_vec[key] = float(self.init_vec[key]) / default

        for key1 in self.trans_mat:
            trans_mat[key1] = {}
            for key2 in self.trans_mat[key1]:
                if self.state_count[key1] != 0:
                    trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / self.state_count[key1]
                else:
                    trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / default

        for key1 in self.emit_mat:
            emit_mat[key1] = {}
            for key2 in self.emit_mat[key1]:
                if self.state_count[key1] != 0:
                    emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / self.state_count[key1]
                else:
                    emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / default
        return init_vec, trans_mat, emit_mat
    
#     预测采用 Viterbi 算法求得最优路径
 #模型预测
    def do_predict(self, sequence):
        tab = [{}]
        path = {}
        init_vec, trans_mat, emit_mat = self.get_prob()

        # 初始化
        for state in self.states:
            tab[0][state] = init_vec[state] * emit_mat[state].get(sequence[0], EPS)
            path[state] = [state]

        # 创建动态搜索表
        for t in range(1, len(sequence)):
            tab.append({})
            new_path = {}
            for state1 in self.states:
                items = []
                for state2 in self.states:
                    if tab[t - 1][state2] == 0:
                        continue
                    prob = tab[t - 1][state2] * trans_mat[state2].get(state1, EPS) * emit_mat[state1].get(sequence[t], EPS)
                    items.append((prob, state2))
                best = max(items)  
                tab[t][state1] = best[0]
                new_path[state1] = path[best[1]] + [state1]
            path = new_path

        # 搜索最有路径
        prob, state = max([(tab[len(sequence) - 1][state], state) for state in self.states])
        return path[state]

In [10]:
 def get_tags(src):
        tags = []
        if len(src) == 1:
            tags = ['S']
        elif len(src) == 2:
            tags = ['B', 'E']
        else:
            m_num = len(src) - 2
            tags.append('B')
            tags.extend(['M'] * m_num)
            tags.append('E')
        return tags

In [11]:
def cut_sent(src, tags):
        word_list = []
        start = -1
        started = False

        if len(tags) != len(src):
            return None

        if tags[-1] not in {'S', 'E'}:
            if tags[-2] in {'S', 'E'}:
                tags[-1] = 'S'  
            else:
                tags[-1] = 'E'  

        for i in range(len(tags)):
            if tags[i] == 'S':
                if started:
                    started = False
                    word_list.append(src[start:i])  
                word_list.append(src[i])
            elif tags[i] == 'B':
                if started:
                    word_list.append(src[start:i])  
                start = i
                started = True
            elif tags[i] == 'E':
                started = False
                word = src[start:i+1]
                word_list.append(word)
            elif tags[i] == 'M':
                continue
        return word_list

In [12]:
 class HMMSoyoger(HMM_Model):
        def __init__(self, *args, **kwargs):
                super(HMMSoyoger, self).__init__(*args, **kwargs)
                self.states = STATES
                self.data = None
                
     #加载语料
        def read_txt(self, filename):
            self.data = open(filename, 'r', encoding="utf-8")
            
        def train(self):
            if not self.inited:
                self.setup()

            for line in self.data:
                line = line.strip()
                if not line:
                    continue

               #观测序列
                observes = []
                for i in range(len(line)):
                    if line[i] == " ":
                        continue
                    observes.append(line[i])

                #状态序列
                words = line.split(" ")  

                states = []
                for word in words:
                    if word in seg_stop_words:
                        continue
                    states.extend(get_tags(word))
                #开始训练
                if(len(observes) >= len(states)):
                    self.do_train(observes, states)
                else:
                    pass
                
        def lcut(self, sentence):
            try:
                tags = self.do_predict(sentence)
                return cut_sent(sentence, tags)
            except:
                return sentence

In [13]:
soyoger = HMMSoyoger()
soyoger.read_txt("../data/08/syj_trainCorpus_utf8.txt")
soyoger.train()

In [14]:
soyoger.lcut("中国的人工智能发展进入高潮阶段。")

['中国', '的', '人工', '智能', '发展', '进入', '高潮', '阶段', '。']

In [16]:
import genius
text = u"""中文自然语言处理是人工智能技术的一个重要分支。"""
seg_list = genius.seg_text(
    text,
    use_combine=True,
    use_pinyin_segment=True,
    use_tagging=True,
    use_break=True
)


In [18]:
seg_list

[<genius.word.Word at 0x105645588>,
 <genius.word.Word at 0x1273c1a90>,
 <genius.word.Word at 0x1273c19b0>,
 <genius.word.Word at 0x1273c1b00>,
 <genius.word.Word at 0x1273c1ac8>,
 <genius.word.Word at 0x1273c1a58>,
 <genius.word.Word at 0x1273c1b38>,
 <genius.word.Word at 0x1273c1ba8>,
 <genius.word.Word at 0x1273c1be0>,
 <genius.word.Word at 0x1273c1b70>,
 <genius.word.Word at 0x105645b70>]

In [20]:
seg_list[0].text

'中文'

In [24]:
[word.text  for word in seg_list]

['中文', '自然语言', '处理', '是', '人工智能', '技术', '的', '一个', '重要', '分支', '。']

其中，genius.seg_text 函数接受5个参数，其中 text 是必填参数：

text 第一个参数为需要分词的字。
use_break 代表对分词结构进行打断处理，默认值 True。\n
use_combine 代表是否使用字典进行词合并，默认值 False。\n
use_tagging 代表是否进行词性标注，默认值 True。\n
use_pinyin_segment 代表是否对拼音进行分词处理，默认值 True。