# Chinese Word Segmentation using HMM
By: [Richard Cornelius Suwandi](https://richardcsuwandi.github.io)

In this project, we are going to perform word segmentation using the Hidden Markov Model (HMM) on the given dataset. The dataset contains 5500 sentences for training and 1492 sentences for testing, all of them written in Chinese.

In [2]:
# Import the necessary libraries
import os
import numpy as np

import re
# Use regular expression to detect Chinese characters
regex = re.compile("[^\u4e00-\u9fff]")

We first load the training and testing data, as well as the "gold standard" (ground truth):

In [3]:
# Load the data and labels
train_data = open(os.path.join("data", "train.txt"), encoding="utf-8-sig").read().splitlines()
test_data = open(os.path.join("data", "test.txt"), encoding="utf-8-sig").read().splitlines()
test_gold = open(os.path.join("data", "test_gold.txt"), encoding="utf-8-sig").read().splitlines()

The training data is stored in a list, where each element of the list corresponds to a sentence. Let's take a look at some training examples:

In [5]:
# Print some training examples
train_data[:5]

['立會 選情 告一段落 民主 進程 還 看 明天',
 '立法會 選舉 出現 了 戲劇性 的 結果 ， 儘管 投票率 創下 新高 ， 而 過去 經驗 顯示 高 投票率 對 民主派 較 有利 ， 但 由於 名單 協調 不當 及 配票 策略 失誤 ， 加上 醜聞 影響 選情 ， 民主黨 的 議席 比 上 一 屆 減少 ， 由 第 一 大 黨 跌 至 第 三 ；',
 '而 泛民主派 在 30 席 普選 中 亦 只能 取得 18 席 ， 比 選前 預期 的 20 席 少 ；',
 '但 在 功能 組別 選舉 卻 有 意外 收穫 ， 除 保住 原有 的 5 個 議席 ， 還 搶佔 了 醫學 和 會計 兩 個 專業 界別 ， 令 議席 總數 達到 25 席 ， 比 上 一 屆 多 了 3 席 。',
 '更 值得 注意 的 是 ， 泛民主派 候選人 在 普選 中 合共 取得 110萬 張 選票 ， 佔 178萬 選票 總數 的 62 ％ ， 顯示 多數 市民 認同 早日 實現 全面 普選 的 民主 訴求 ， 這 一 點 應 為 政府 及 各 黨派 人士 所 尊重 。']

As we can see, the sentences in the training data have already been segmented according to their labels. In particular, there are 4 labels (hidden states) for Chinese word segmentation:
1. 'B': Beginning character 
2. 'I': Intermediate (internal) character
3. 'E': Ending character
4. 'S': Single character

For example, the sentence '立會 選情 告一段落 民主 進程 還 看 明天' would correspond to the following labels: 'BE BE BIIE BE BE BE S S BE'. Hence, we first need to preprocess our data and create the hidden states for each sentence. We will define a helper function to preprocess both the training and testing data:

In [16]:
def preprocess(data):
    hidden_state_lst = []
    data_lst = []
    word_count = {}

    for sentence in data:
        hidden_state = ""
        words = []
        for token in sentence.split():
            token = regex.sub("", token)
            # Label the token according to its length
            if len(token) == 1:
                hidden_state += "S"
            elif len(token) == 2:
                hidden_state += "BE"
            elif len(token) > 2:
                hidden_state += "B" + (len(token) - 2)*"I" + "E"

        # Remove the spaces between characters
        sentence = sentence.replace(" ", "")
        for word in sentence:
            # Verify if the word is a Chinese character
            word = regex.sub("", word)
            # Update the word count dictionary
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
            words.append(word)
        if len(words) > 0:
            data_lst.append(words)
            states = [s for s in hidden_state]
            hidden_state_lst.append(states)

    return data_lst, hidden_state_lst, word_count

train_data, train_hs, train_wc = preprocess(train_data)
test_data, _, test_wc = preprocess(test_data)
_, test_hs, _ = preprocess(test_gold)

Notice that we also created a dictionary to store the word count, this will be handy for computing the probabilities. In HMM, we need construct the start/initial state matrix, the transition matrix, and the emission matrix. The start probabilities for the initial state matrix can be simply calculated as:
\begin{align}
    P(\text{State } k) = \frac{\text{The frequency of state } k}{\text{The total number of states}}
\end{align}

In [19]:
# Initialize a dictionary to store the state count
state_count = {}
states = ["B", "I", "E", "S"]
for state in states: 
    state_count[state] = 0

for i in range(len(train_hs)):
    length = len(train_hs[i])
    if length > 0:
        for j in range(length - 1):
            # Update the state count
            state_count[train_hs[i][j]] += 1
        state_count[train_hs[i][length - 1]] += 1

total_states = sum(state_count.values())  # Get the total number of states in the sample
start_prob = {}

for state in states:
    # Normalize the state count with the total number of states
    start_prob[state] = state_count[state] / total_states
    
print(start_prob)

{'B': 0.3575882854480201, 'I': 0.0595393888921936, 'E': 0.3575882854480201, 'S': 0.2252840402117661}


Next, we can obtain the transition probabilities by calculating the frequency of the state transitions
on the training data:

In [20]:
# Initialize the transition probabilities
trans_prob = {}
for state in states:
    trans_prob[state] = {}
    for state_i in states:
        trans_prob[state][state_i] = 0

for i in range(len(train_hs)):
    length = len(train_hs[i])
    if length > 0:
        for j in range(length - 1):
            # Update the transition probabilities
            s_from = train_hs[i][j]
            s_to = train_hs[i][j+1]
            trans_prob[s_from][s_to] += 1

for i in states:
    for j in states:
        # Normalize the frequency of the transition with the state counts
        trans_prob[i][j] /= float(state_count[i])

print(trans_prob)

{'B': {'B': 0.0, 'I': 0.13515948819607138, 'E': 0.8648405118039286, 'S': 0.0}, 'I': {'B': 0.0, 'I': 0.18824410956623094, 'E': 0.8117558904337691, 'S': 0.0}, 'E': {'B': 0.5855109028653811, 'I': 0.0, 'E': 0.0, 'S': 0.3511928691240279}, 'S': {'B': 0.5720509604594363, 'I': 0.0, 'E': 0.0, 'S': 0.40755165357449336}}


Finally, the emission probabilities can be calculated as:
\begin{align}
    P(\text{Observation | State } k) = \frac{\text{The frequency of the observation given state } k}{\text{The total number of state } k}
\end{align}

In [24]:
# Initialize the emission probabilities
emission_prob = {}

# Get all the vocabulary in the corpus (train and test sets)
vocab = list(set(train_wc.keys()) | set(test_wc.keys()))

# Initialize the emission probabilities
for state in states:
    emission_prob[state] = {}
    for word in vocab:
        emission_prob[state][word] = 1

for i in range(len(train_hs)):
    length = len(train_hs[i])
    for j in range(length):
        # Update the emission probabilities
        obs = train_data[i][j]
        hidden = train_hs[i][j]
        emission_prob[hidden][obs] += 1

for state in states:
    for word in vocab:
        if emission_prob[state][word] == 0:
            continue
        else:
            # Normalize the emission probabilities
            emission_prob[state][word] /= float(state_count[state])

After calculating the start, transition, and emission probabilities, we can utilize the Viterbi algorithm to label the states in the testing data. Given the sequence of observations, the Viterbi algorithm obtains the maximum a posteriori probability estimate of the most likely sequence of hidden states. 

Initially, the Viterbi algorithm will calculate the probabilities of the initial state which is the product of the start probabilities and the emission probabilities given the first observation. For each observation step, the Viterbi algorithm will calculate these probabilities until the last observation. Finally, it will perform backtrack to get the most probable state at each step. This is also known as the Viterbi path, which will be the output of the algorithm.

In [25]:
def viterbi_decoding(obs):
    obs = [i for i in obs if i]
    if len(obs) > 0:
        # Initialize a list of dictionary to store the probabilites
        V = [{}]
        for st in states:
            # Append the initial probabilites
            V[0][st] = {"prob": start_prob[st] * emission_prob[st][obs[0]], "prev": None}
        for t in range(1, len(obs)):
            # Append a dictionary to store the probabilities at time/step t
            V.append({})
            for st in states:
                max_tr_prob = V[t - 1][states[0]]["prob"] * trans_prob[states[0]][st]
                prev_st_selected = states[0]
                for prev_st in states[1:]:
                    # Calculate the probabilities of each state
                    tr_prob = V[t - 1][prev_st]["prob"] * trans_prob[prev_st][st]
                    if tr_prob > max_tr_prob:
                        max_tr_prob = tr_prob
                        prev_st_selected = prev_st

                # Get the max probability at time/step t
                max_prob = max_tr_prob * emission_prob[st][obs[t]]
                V[t][st] = {"prob": max_prob, "prev": prev_st_selected}

        path = []
        max_prob = -float("inf")
        best_st = None
        # Get most probable state and its backtrack
        for st, data in V[-1].items():
            if data["prob"] > max_prob:
                max_prob = data["prob"]
                best_st = st
        path.append(best_st)
        previous = best_st

        # Follow the backtrack till the first observation
        for t in range(len(V) - 2, -1, -1):
            path.insert(0, V[t + 1][previous]["prev"])
            previous = V[t + 1][previous]["prev"]

        # Return the path
        return path
    else:
        return []

We will use the above Viterbi algorithm to label the sentences (sequences) in the testing data:

In [26]:
test_pred = []
for obs in test_data:
    # Label the sequence using Viterbi algorithm
    test_pred.append(viterbi_decoding(obs))

After obtaining the labels for the test data, we can compute the $F_1$ score to evaluate the model's performance:

In [29]:
# Create mapping for each label
map_dict = {
    "B": 0,
    "I": 1,
    "E": 2,
    "S": 3
}

# Flatten the lists into a 1D list
true = list(np.concatenate(test_hs).flat)
pred = list(np.concatenate(test_pred).flat)

k = len(np.unique(true)) # Number of classes
result = np.zeros((k, k)) # Initialize the confusion matrix

for i in range(len(true)):
    # Calculate the confusion matrix
    result[map_dict[true[i]]][map_dict[pred[i]]] += 1

# Calculate precision for each label
precision_B = result[0][0] / (result[0][0] + result[1][0] + result[2][0] + result[3][0])
precision_I = result[1][1] / (result[0][1] + result[1][1] + result[2][1] + result[3][1])
precision_E = result[2][2] / (result[0][2] + result[1][2] + result[2][2] + result[3][2])
precision_S = result[3][3] / (result[0][3] + result[1][3] + result[2][3] + result[3][3])

# Calculate recall for each label
recall_B = result[0][0] / (result[0][0] + result[0][1] + result[0][2] + result[0][3])
recall_I = result[1][1] / (result[1][0] + result[1][1] + result[1][2] + result[1][3])
recall_E = result[2][2] / (result[2][0] + result[2][1] + result[2][2] + result[2][3])
recall_S = result[3][3] / (result[3][0] + result[3][1] + result[3][2] + result[3][3])

# Calculate F1 for each label
f1_B = 2 *(precision_B * recall_B)/(precision_B + recall_B)
f1_I = 2 *(precision_I * recall_I)/(precision_I + recall_I)
f1_E = 2 *(precision_E * recall_E)/(precision_E + recall_E)
f1_S = 2 *(precision_S * recall_S)/(precision_S + recall_S)

# Calculate Macro-F1
macro_f1 = (f1_B + f1_I + f1_E + f1_S) / k

print(f"F1 score for state B: {f1_B}")
print(f"F1 score for state I: {f1_I}")
print(f"F1 score for state E: {f1_E}")
print(f"F1 score for state S: {f1_S}")
print(f"Macro-F1 score: {macro_f1}")

F1 score for state B: 0.5777712683600452
F1 score for state I: 0.15008726003490402
F1 score for state E: 0.5862872146698852
F1 score for state S: 0.3726343144455116
Macro-F1 score: 0.4216950143775865


Overall, we can observe that most of the misclassification occurs for state 'I', which is the intermediate state. This might be because the intial probability and the transition probability of state 'I' are significantly lower than the other states, i.e., the state 'I' is quite rare.

Finally, we can apply the word segmentation according to the predicted states for the sequence of testinh data:

In [31]:
def word_segmentation(test_data, pred):
    segmented = ""
    i = 0 # Counter for the test data index
    j = 0 # Counter for the prediction index
    while i < len(test_data):
        segmented += test_data[i] 
        # Check for Chinese character
        if test_data[i] > u"\u4e00" and test_data[i] < u"\u9fff":
            # Add space after the character if label
            # is either "E" or "S"
            if pred[j] in ["E", "S"]:
                segmented += " "
            j += 1
        i += 1
    return segmented

segmented = []
for i in range(len(test_data)):
    # Convert BIES to word segmentation
    segmented.append(word_segmentation(test_data[i], test_pred[i]))

In [45]:
# Look at random segmented sentence
i = np.random.randint(0, len(test_gold))
print(test_label[i])
print(test_pred[i])
print(segmented[i])

這 是 記者 近日 從 浙江省 政府 了解 到 的 。
['S', 'S', 'B', 'E', 'B', 'E', 'B', 'I', 'I', 'E', 'B', 'E', 'S', 'B', 'E', 'B']
這 是 記者 近日 從浙江省 政府 了 解到 的


In [46]:
# Save the segmented sentences to my_prediction.txt file
with open("my_prediction.txt", "w", encoding="utf-8") as fout:
    for item in segmented:
        fout.write("%s\n" % item)