| [02_lexical_analysis/02_从头实现中文分词.ipynb](https://github.com/shibing624/nlp-tutorial/blob/main/02_lexical_analysis/02_%E4%BB%8E%E5%A4%B4%E5%AE%9E%E7%8E%B0%E4%B8%AD%E6%96%87%E5%88%86%E8%AF%8D.ipynb)  | 从头实现中文分词模型  |[Open In Colab](https://colab.research.google.com/github/shibing624/nlp-tutorial/blob/main/02_lexical_analysis/02_从头实现中文分词.ipynb) |

# 从头实现中文分词模型

在中文里面，词是最小的能够独立活动的有意义的语言成分，分词和词性标注都是中文自然语言处理的基础工作，能够后续如句法分析带来很大的便利性。

在中文需要分词，英文中单词以空格作为自然分界，虽然也有短语划分的问题，但中文词没有一个形式上的分界，相对而言难度大了许多，分词作为中文自然语言处理的基础工作，质量的好坏对后面的工作影响很大。

## 分词的难点
1. 分词歧义
南京市长江大桥

可以切分为：南京市/长江大桥

可以切分为：南京/市长/江大桥

可以看到，第二个切词结果是歧义的。

2. 未登录词问题
未登录词指的是在已有的词典中，或者训练语料里面没有出现过的词，分为实体名词，专有名词及新词。

## 分词的方法

1. 基于字典、词库匹配的分词机械分词算法，将待分的字符串与一个充分大的机器词典中的词条进行匹配。实际应用中，将机械分词作为初分手段，再利用其他方法提高准确率。

2. 基于词频统计的分词统计分词，是一种全切分方法。切分出待分语句中所有的词，基于训练语料词表中每个词出现的频率，运用统计模型和决策算法决定最优的切分结果。

3. 利用深度学习方法，通过对上下文内容所提供信息的分析对词进行定界。这类方法试图让机器具有人类的理解能力，需要使用大量的语言知识和信息，利用Bert预训练模型finetune分词任务实现。

这里用第二种分词方法，即利用隐马尔可夫模型（HMM）来进行中文分词。

### HMM方法介绍
中文分词问题可以表示为一个序列标注问题，定义两个类别：

- E代表词语中最后一个字
- B代表词的首个字
- M代表词中间的字
- S代表单字成词

对于分词结果："我/只是/做了/一些/微小/的/工作"，可以标注为"我E只B是E做B了E一B些E微B小E的S工B作E".

将标记序列"EBEBEBEBESBE"作为状态序列， 原始文本"我只是做了一些微小的工作"为观测序列. 分词过程即变成了求使给定观测序列出现概率最大的状态序列， 即解码问题。

这里需要说明一下，所谓出现概率最大是指在自然语言中出现概率最大。

## 程序实现

In [1]:
STATES = {'B', 'M', 'E', 'S'}

In [2]:
import pickle
import json

EPS = 0.0001


class HMModel:
    def __init__(self):
        self.trans_mat = {}  # trans_mat[status][status] = int
        self.emit_mat = {}  # emit_mat[status][observe] = int
        self.init_vec = {}  # init_vec[status] = int
        self.state_count = {}  # state_count[status] = int
        self.states = {}
        self.inited = False

    def setup(self):
        for state in self.states:
            # build trans_mat
            self.trans_mat[state] = {}
            for target in self.states:
                self.trans_mat[state][target] = 0.0
            # build emit_mat
            self.emit_mat[state] = {}
            # build init_vec
            self.init_vec[state] = 0
            # build state_count
            self.state_count[state] = 0
        self.inited = True

    def save(self, filename="hmm.json", code='json'):
        fw = open(filename, 'w', encoding='utf-8')
        data = {
            "trans_mat": self.trans_mat,
            "emit_mat": self.emit_mat,
            "init_vec": self.init_vec,
            "state_count": self.state_count
        }
        if code == "json":
            txt = json.dumps(data)
            txt = txt.encode('utf-8').decode('unicode-escape')
            fw.write(txt)
        elif code == "pickle":
            pickle.dump(data, fw)
        fw.close()

    def load(self, filename="hmm.json", code="json"):
        fr = open(filename, 'r', encoding='utf-8')
        if code == "json":
            txt = fr.read()
            model = json.loads(txt)
        elif code == "pickle":
            model = pickle.load(fr)
        self.trans_mat = model["trans_mat"]
        self.emit_mat = model["emit_mat"]
        self.init_vec = model["init_vec"]
        self.state_count = model["state_count"]
        self.inited = True
        fr.close()

    def do_train(self, observes, states):
        if not self.inited:
            self.setup()

        for i in range(len(states)):
            if i == 0:
                self.init_vec[states[0]] += 1
                self.state_count[states[0]] += 1
            else:
                self.trans_mat[states[i - 1]][states[i]] += 1
                self.state_count[states[i]] += 1
                if observes[i] not in self.emit_mat[states[i]]:
                    self.emit_mat[states[i]][observes[i]] = 1
                else:
                    self.emit_mat[states[i]][observes[i]] += 1

    def get_prob(self):
        init_vec = {}
        trans_mat = {}
        emit_mat = {}
        default = max(self.state_count.values())  # avoid ZeroDivisionError
        # convert init_vec to prob
        for key in self.init_vec:
            if self.state_count[key] != 0:
                init_vec[key] = float(self.init_vec[key]) / \
                    self.state_count[key]
            else:
                init_vec[key] = float(self.init_vec[key]) / default
        # convert trans_mat to prob
        for key1 in self.trans_mat:
            trans_mat[key1] = {}
            for key2 in self.trans_mat[key1]:
                if self.state_count[key1] != 0:
                    trans_mat[key1][key2] = float(
                        self.trans_mat[key1][key2]) / self.state_count[key1]
                else:
                    trans_mat[key1][key2] = float(
                        self.trans_mat[key1][key2]) / default
        # convert emit_mat to prob
        for key1 in self.emit_mat:
            emit_mat[key1] = {}
            for key2 in self.emit_mat[key1]:
                if self.state_count[key1] != 0:
                    emit_mat[key1][key2] = float(
                        self.emit_mat[key1][key2]) / self.state_count[key1]
                else:
                    emit_mat[key1][key2] = float(
                        self.emit_mat[key1][key2]) / default
        return init_vec, trans_mat, emit_mat

    def do_predict(self, sequence):
        tab = [{}]
        path = {}
        init_vec, trans_mat, emit_mat = self.get_prob()

        # init
        for state in self.states:
            tab[0][state] = init_vec[state] * \
                emit_mat[state].get(sequence[0], EPS)
            path[state] = [state]

        # build dynamic search table
        for t in range(1, len(sequence)):
            tab.append({})
            new_path = {}
            for state1 in self.states:
                items = []
                for state2 in self.states:
                    if tab[t - 1][state2] == 0:
                        continue
                    prob = tab[t - 1][state2] * trans_mat[state2].get(
                        state1, EPS) * emit_mat[state1].get(sequence[t], EPS)
                    items.append((prob, state2))
                best = max(items)  # best: (prob, state)
                tab[t][state1] = best[0]
                new_path[state1] = path[best[1]] + [state1]
            path = new_path

        # search best path
        prob, state = max([(tab[len(sequence) - 1][state], state)
                          for state in self.states])
        return path[state]

In [3]:
def get_tags(src):
    tags = []
    if len(src) == 1:
        tags = ['S']
    elif len(src) == 2:
        tags = ['B', 'E']
    else:
        m_num = len(src) - 2
        tags.append('B')
        tags.extend(['M'] * m_num)
        tags.append('S')
    return tags


def cut_sent(src, tags):
    word_list = []
    start = -1
    started = False

    if len(tags) != len(src):
        return None

    if tags[-1] not in {'S', 'E'}:
        if tags[-2] in {'S', 'E'}:
            tags[-1] = 'S'  # for tags: r".*(S|E)(B|M)"
        else:
            tags[-1] = 'E'  # for tags: r".*(B|M)(B|M)"

    for i in range(len(tags)):
        if tags[i] == 'S':
            if started:
                started = False
                word_list.append(src[start:i])  # for tags: r"BM*S"
            word_list.append(src[i])
        elif tags[i] == 'B':
            if started:
                word_list.append(src[start:i])  # for tags: r"BM*B"
            start = i
            started = True
        elif tags[i] == 'E':
            started = False
            word = src[start:i+1]
            word_list.append(word)
        elif tags[i] == 'M':
            continue
    return word_list



In [4]:
class HMMSegger(HMModel):

    def __init__(self, *args, **kwargs):
        super(HMMSegger, self).__init__(*args, **kwargs)
        self.states = STATES
        self.data = None

    def load_data(self, filename):
        self.data = open(filename, 'r', encoding="utf-8")

    def train(self):
        if not self.inited:
            self.setup()

        # train
        for line in self.data:
            # pre processing
            line = line.strip()
            if not line:
                continue

            # get observes
            observes = []
            for i in range(len(line)):
                if line[i] == " ":
                    continue
                observes.append(line[i])

            # get states
            words = line.split(" ")  # spilt word by whitespace
            states = []
            for word in words:
                if '/' in word:
                    word = word.split('/')[0]
                states.extend(get_tags(word))

            # resume train
            self.do_train(observes, states)

    def cut(self, sentence):
        try:
            tags = self.do_predict(sentence)
            return cut_sent(sentence, tags)
        except:
            return sentence

In [5]:
segger = HMMSegger()
segger.load_data("data/seg_data.txt")
segger.train()
segger.save()
segger.cut("长春市长春节讲话")

['长春', '市长', '春节', '讲话']

更多例子来测试下：

In [6]:
cases = [
    "我来到北京清华大学",
    "长春市长春节讲话",
    "我们去野生动物园玩",
    "我只是做了一些微小的工作",
]
for case in cases:
    result = segger.cut(case)
    print(result)

['我', '来到', '北京', '清华', '大学']
['长春', '市长', '春节', '讲话']
['我们', '去', '野生动物', '园', '玩']
['我只', '是做', '了一', '些微', '小的', '工作']


重新加载模型，并预测：

In [7]:
m = HMMSegger()
m.load()
segger.cut("长春市长春节讲话")

['长春', '市长', '春节', '讲话']

#### reference

[自制基于HMM的python中文分词器](https://www.cnblogs.com/finley/p/6358097.html)


本节完。