In [11]:
'''
E代表词语中最后一个字
B代表词的首个字
M代表词中间的字
S代表单字成词
'''
STATES = {'B', 'M', 'E', 'S'}

In [12]:
import pickle
import json

EPS = 0.0001


class HMModel:
    def __init__(self):
        self.trans_mat = {}  # trans_mat[status][status] = int
        self.emit_mat = {}  # emit_mat[status][observe] = int
        self.init_vec = {}  # init_vec[status] = int
        self.state_count = {}  # state_count[status] = int
        self.states = {}
        self.inited = False

    def setup(self):
        for state in self.states:
            # build trans_mat
            self.trans_mat[state] = {}
            for target in self.states:
                self.trans_mat[state][target] = 0.0
            # build emit_mat
            self.emit_mat[state] = {}
            # build init_vec
            self.init_vec[state] = 0
            # build state_count
            self.state_count[state] = 0
        self.inited = True

    def do_train(self, observes, states):
        if not self.inited:
            self.setup()

        for i in range(len(states)):
            if i == 0:
                self.init_vec[states[0]] += 1
                self.state_count[states[0]] += 1
            else:
                self.trans_mat[states[i - 1]][states[i]] += 1
                self.state_count[states[i]] += 1
                if observes[i] not in self.emit_mat[states[i]]:
                    self.emit_mat[states[i]][observes[i]] = 1
                else:
                    self.emit_mat[states[i]][observes[i]] += 1

    def get_prob(self):
        init_vec = {}
        trans_mat = {}
        emit_mat = {}
        default = max(self.state_count.values())  # avoid ZeroDivisionError
        # convert init_vec to prob
        for key in self.init_vec:
            if self.state_count[key] != 0:
                init_vec[key] = float(self.init_vec[key]) / \
                                self.state_count[key]
            else:
                init_vec[key] = float(self.init_vec[key]) / default
        # convert trans_mat to prob
        for key1 in self.trans_mat:
            trans_mat[key1] = {}
            for key2 in self.trans_mat[key1]:
                if self.state_count[key1] != 0:
                    trans_mat[key1][key2] = float(
                        self.trans_mat[key1][key2]) / self.state_count[key1]
                else:
                    trans_mat[key1][key2] = float(
                        self.trans_mat[key1][key2]) / default
        # convert emit_mat to prob
        for key1 in self.emit_mat:
            emit_mat[key1] = {}
            for key2 in self.emit_mat[key1]:
                if self.state_count[key1] != 0:
                    emit_mat[key1][key2] = float(
                        self.emit_mat[key1][key2]) / self.state_count[key1]
                else:
                    emit_mat[key1][key2] = float(
                        self.emit_mat[key1][key2]) / default
        return init_vec, trans_mat, emit_mat

    def do_predict(self, sequence):
        tab = [{}]
        path = {}
        init_vec, trans_mat, emit_mat = self.get_prob()

        # init
        for state in self.states:
            tab[0][state] = init_vec[state] * \
                            emit_mat[state].get(sequence[0], EPS)
            path[state] = [state]

        # build dynamic search table
        for t in range(1, len(sequence)):
            tab.append({})
            new_path = {}
            for state1 in self.states:
                items = []
                for state2 in self.states:
                    if tab[t - 1][state2] == 0:
                        continue
                    prob = tab[t - 1][state2] * trans_mat[state2].get(
                        state1, EPS) * emit_mat[state1].get(sequence[t], EPS)
                    items.append((prob, state2))
                best = max(items)  # best: (prob, state)
                tab[t][state1] = best[0]
                new_path[state1] = path[best[1]] + [state1]
            path = new_path

        # search best path
        prob, state = max([(tab[len(sequence) - 1][state], state)
                           for state in self.states])
        return path[state]

In [13]:
def get_tags(src):
    tags = []
    if len(src) == 1:
        tags = ['S']
    elif len(src) == 2:
        tags = ['B', 'E']
    else:
        m_num = len(src) - 2
        tags.append('B')
        tags.extend(['M'] * m_num)
        tags.append('S')
    return tags


def cut_sent(src, tags):
    word_list = []
    start = -1
    started = False

    if len(tags) != len(src):
        return None

    if tags[-1] not in {'S', 'E'}:
        if tags[-2] in {'S', 'E'}:
            tags[-1] = 'S'  # for tags: r".*(S|E)(B|M)"
        else:
            tags[-1] = 'E'  # for tags: r".*(B|M)(B|M)"

    for i in range(len(tags)):
        if tags[i] == 'S':
            if started:
                started = False
                word_list.append(src[start:i])  # for tags: r"BM*S"
            word_list.append(src[i])
        elif tags[i] == 'B':
            if started:
                word_list.append(src[start:i])  # for tags: r"BM*B"
            start = i
            started = True
        elif tags[i] == 'E':
            started = False
            word = src[start:i + 1]
            word_list.append(word)
        elif tags[i] == 'M':
            continue
    return word_list

In [14]:
class HMMSegger(HMModel):

    def __init__(self, *args, **kwargs):
        super(HMMSegger, self).__init__(*args, **kwargs)
        self.states = STATES
        self.data = None

    def load_data(self, filename):
        self.data = open(filename, 'r', encoding="utf-8")

    def train(self):
        if not self.inited:
            self.setup()

        # train
        for line in self.data:
            # pre processing
            line = line.strip()
            if not line:
                continue

            # get observes
            observes = []
            for i in range(len(line)):
                if line[i] == " ":
                    continue
                observes.append(line[i])

            # get states
            words = line.split(" ")  # spilt word by whitespace
            states = []
            for word in words:
                if '/' in word:
                    word = word.split('/')[0]
                states.extend(get_tags(word))

            # resume train
            self.do_train(observes, states)

    def cut(self, sentence):
        try:
            tags = self.do_predict(sentence)
            return cut_sent(sentence, tags)
        except:
            return sentence

In [15]:
segger = HMMSegger()
segger.load_data("data/seg_data.txt")
segger.train()
segger.cut("长春市长春节讲话")

['长春', '市长', '春节', '讲话']

In [16]:
cases = [
    "我来到北京清华大学",
    "长春市长春节讲话",
    "我们去野生动物园玩",
    "我只是做了一些微小的工作",
]
for case in cases:
    result = segger.cut(case)
    print(result)

['我', '来到', '北京', '清华', '大学']
['长春', '市长', '春节', '讲话']
['我们', '去', '野生动物', '园', '玩']
['我只', '是做', '了一', '些微', '小的', '工作']
