# 作业一：实现HMM中文分词和BPE英文分词
姓名：

学号：

## 任务一：HMM模型用于中文分词

任务一评分标准：
1. 共有8处TODO需要填写，每个TODO计1-2分，共9分，预计代码量30行。
2. 允许自行修改、编写代码完成。对于该情况，请补充注释以便于评分，否则结果不正确将导致较多的扣分。
3. 用于说明实验的文字和总结不额外计分，但不写会导致扣分。

注：本任务仅在短句子上进行效果测试，因此对概率的计算可直接进行连乘。在实践中，常先对概率取对数，将连乘变为加法来计算，以避免出现数值溢出的情况。

> 你可以像这样在Markdown单元格中使用引用符号`>`，  
以及在代码单元格中使用注释来说明你的实验。  

---

In [1]:
import pickle
import numpy as np

导入HMM参数，初始化所需的起始概率矩阵、转移概率矩阵、发射概率矩阵。

In [2]:
with open("hmm_parameters.pkl", "rb") as f:
    hmm_parameters = pickle.load(f)

# 非断字（B）为第0行，断字（I）为第1行
# 发射概率矩阵中，词典大小为65536，以汉字的Unicode码点（一个整数值）作为行索引
start_probability = hmm_parameters["start_prob"]  # shape(2,)
trans_matrix = hmm_parameters["trans_mat"]  # shape(2, 2)
emission_matrix = hmm_parameters["emission_mat"]  # shape(2, 65536)

定义待处理的句子：

In [3]:
# TODO: 将your_name中的xxx替换为你的姓名【1分】
your_name = "萧明远"
input_sentence = f"{your_name}是一名优秀的学生"

实现Viterbi算法，并以此进行中文分词。

In [4]:
def viterbi(sent_orig: str, start_prob: np.ndarray, trans_mat: np.ndarray, emission_mat: np.ndarray) -> str:
    """
    Viterbi算法进行中文分词。

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        str - 中文分词的结果
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种标注（B/I）的最大概率值
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # `path`用来储存最大概率对应的上步B/I选择
    #  例如 path[1][7] == 1 意味着第8个（从1开始计数）字符标注I对应的最大概率，其前一步的隐状态为1（I）
    #  例如 path[0][5] == 1 意味着第6个字符标注B对应的最大概率，其前一步的隐状态为1（I）
    #  例如 path[1][1] == 0 意味着第2个字符标注I对应的最大概率，其前一步的隐状态为0（B）
    path = np.zeros((2, len(sent_ord)), dtype=int)

    #  TODO: 第一个位置的最大概率值计算【1分】
    dp[0, 0] = start_prob[0] * emission_mat[0, sent_ord[0]]
    dp[1, 0] = start_prob[1] * emission_mat[1, sent_ord[0]]

    #  TODO: 其余位置的最大概率值计算（填充dp和path矩阵）【2分】
    for t in range(1, len(sent_ord)):
        for s in range(2):
            prob0 = dp[0, t-1] * trans_mat[0, s] * emission_mat[s, sent_ord[t]]
            prob1 = dp[1, t-1] * trans_mat[1, s] * emission_mat[s, sent_ord[t]]
            if prob0 > prob1:
                dp[s, t] = prob0
                path[s, t] = 0
            else:
                dp[s, t] = prob1
                path[s, t] = 1

    #  `labels`用来储存每个位置最有可能的隐状态
    labels = [0 for _ in range(len(sent_ord))]

    #  TODO: 计算labels每个位置上的值（填充labels矩阵）【1分】
    labels[-1] = np.argmax(dp[:, -1])
    for t in range(len(sent_ord) - 2, -1, -1):
        labels[t] = path[labels[t + 1], t + 1]

    #  根据lalels生成切分好的字符串
    sent_split = []
    for idx, label in enumerate(labels):
        if label == 1:
            sent_split += [sent_ord[idx], ord("/")]
        else:
            sent_split += [sent_ord[idx]]
    sent_split_str = "".join([chr(x) for x in sent_split])

    return sent_split_str

测试样例

In [5]:
print("Viterbi算法分词结果：", viterbi(input_sentence, start_probability, trans_matrix, emission_matrix))

Viterbi算法分词结果： 萧明/远是/一名/优秀/的/学生/


实现前向算法，计算该句子的概率值。

In [6]:
def compute_prob_by_forward(sent_orig: str, start_prob: np.ndarray, trans_mat: np.ndarray, emission_mat: np.ndarray) -> float:
    """
    前向算法，计算输入中文句子的概率值。

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        float - 概率值
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种隐状态（B/I）下，到该位置为止的句子的概率
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # TODO: 初始位置概率的计算【1分】
    dp[0, 0] = start_prob[0] * emission_mat[0, sent_ord[0]]
    dp[1, 0] = start_prob[1] * emission_mat[1, sent_ord[0]]

    # TODO: 先计算其余位置的概率（填充dp矩阵），然后返回概率值【1分】
    for t in range(1, len(sent_ord)):
        dp[0, t] = (dp[0, t-1] * trans_mat[0, 0] + dp[1, t-1] * trans_mat[1, 0]) * emission_mat[0, sent_ord[t]]
        dp[1, t] = (dp[0, t-1] * trans_mat[0, 1] + dp[1, t-1] * trans_mat[1, 1]) * emission_mat[1, sent_ord[t]]

    return sum([dp[i][len(sent_ord) - 1] for i in range(2)])

实现后向算法，计算该句子的概率值。

In [7]:
def compute_prob_by_backward(sent_orig: str, start_prob: np.ndarray, trans_mat: np.ndarray, emission_mat: np.ndarray) -> float:
    """
    后向算法，计算输入中文句子的概率值。

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        float - 概率值
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种隐状态（B/I）下，从结尾到该位置为止的句子的概率
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # TODO: 终末位置概率的初始化【1分】
    dp[0, -1] = 1.0
    dp[1, -1] = 1.0

    # TODO: 先计算其余位置的概率（填充dp矩阵），然后返回概率值【1分】
    for t in range(len(sent_ord) - 2, -1, -1):
        dp[0, t] = dp[0, t + 1] * trans_mat[0, 0] * emission_mat[0, sent_ord[t + 1]] + \
                   dp[1, t + 1] * trans_mat[0, 1] * emission_mat[1, sent_ord[t + 1]]
        dp[1, t] = dp[0, t + 1] * trans_mat[1, 0] * emission_mat[0, sent_ord[t + 1]] + \
                   dp[1, t + 1] * trans_mat[1, 1] * emission_mat[1, sent_ord[t + 1]]

    return sum([dp[i][0] * start_prob[i] * emission_mat[i][sent_ord[0]] for i in range(2)])

测试样例

In [8]:
print("前向算法概率：", compute_prob_by_forward(input_sentence, start_probability, trans_matrix, emission_matrix))
print("后向算法概率：", compute_prob_by_backward(input_sentence, start_probability, trans_matrix, emission_matrix))

前向算法概率： 9.014396408449741e-34
后向算法概率： 9.014396408449741e-34


### 实验总结
> 
在本次实验中，我们分别使用了前向算法（Forward Algorithm）和后向算法（Backward Algorithm）来计算中文句子的概率，并使用了Viterbi算法（Viterbi Algorithm）进行分词。\
其中，前向算法通过从句子的开头开始逐步计算每个位置的概率，最终累积到整个句子的概率。\
后向算法则从句子的结尾开始逐步计算每个位置的概率，最终累积到整个句子的概率。\
Viterbi算法则通过动态规划的思想，找到最优路径，从而实现分词。\
本次实验深入理解了HMM在中文分词中的应用。前向和后向算法为HMM提供了概率计算工具，而Viterbi算法在解码时提供了最优路径的确定。通过这些算法，我们能够准确地获取句子的分词概率和最优分词路径。


## 任务二：BPE算法用于英文分词

任务二评分标准：

1. 共有6处TODO需要填写，每个TODO计1-2分，共9分，预计代码量50行。
2. 允许自行修改、编写代码完成。对于该情况，请补充注释以便于评分，否则结果不正确将导致较多的扣分。
3. 用于说明实验的文字和总结不额外计分，但不写会导致扣分。

> 你可以像这样在Markdown单元格中使用引用符号`>`，  
以及在代码单元格中使用注释来说明你的实验。  

---

In [9]:
import re
from typing import List, Tuple, Dict

构建空格分词器，将语料中的句子以空格切分成单词，然后将单词拆分成字母加`</w>`的形式。例如`apple`将变为`a p p l e </w>`。

In [10]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")


def white_space_tokenize(corpus: List[str]) -> List[List[str]]:
    """
    先正则化（字母转小写、数字转为N、除去标点符号），然后以空格分词语料中的句子，例如：  
    输入 `corpus = ["I am happy.", "I have 10 apples!"]`，  
    得到 `[["i", "am", "happy"], ["i", "have", "N", "apples"]]`

    Args:
        corpus: List[str] - 待处理的语料

    Return:
        List[List[str]] - 二维List，内部的List由每个句子的单词str构成
    """

    tokeneds = [list(filter(lambda token: len(token) > 0, _splitor_pattern.split(_digit_pattern.sub("N", sentence.lower())))) for sentence in corpus]

    return tokeneds

编写相应函数构建BPE算法需要用到的初始状态词典。

In [11]:
def build_bpe_vocab(corpus: List[str]) -> Dict[str, int]:
    """
    将语料进行空格分词处理后，将单词每个字母以空格隔开、结尾加上</w>后，构建带频数的字典。
    
    例如：
    输入 `corpus = ["I am happy.", "I have 10 apples!"]`，
    返回结果为:
    {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        '1 0 </w>': 1,
        'a p p l e s </w>': 1
    }

    Args:
        corpus: List[str] - 待处理的语料

    Return:
        Dict[str, int] - "单词分词状态->频数"的词典
    """

    bpe_vocab = {}

    for sentence in corpus:
        # 去除标点符号并转换为小写
        sentence = re.sub(r"[^\w\s]", "", sentence).lower()
        # 分词
        words = sentence.split()

        for word in words:
            # 将每个单词的字母分开并在末尾加上 </w>
            bpe_word = ' '.join(list(word)) + ' </w>'
            # 构建频数词典
            if bpe_word in bpe_vocab:
                bpe_vocab[bpe_word] += 1
            else:
                bpe_vocab[bpe_word] = 1

    return bpe_vocab

测试样例

In [12]:
corpus = ["I am happy.", "I have 10 apples!"]
print(build_bpe_vocab(corpus))

{'i </w>': 2, 'a m </w>': 1, 'h a p p y </w>': 1, 'h a v e </w>': 1, '1 0 </w>': 1, 'a p p l e s </w>': 1}


编写所需的其他函数：

In [13]:
def get_bigram_freq(bpe_vocab: Dict[str, int]) -> Dict[Tuple[str, str], int]:
    """
    统计"单词分词状态->频数"的词典中，各bigram的频次（假设该词典中，各个unigram以空格间隔），例如：  
    输入 
    ```python
    bpe_vocab = {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
    ```
    得到
    ```python
    {
        ('i', '</w>'): 2,
        ('a', 'm'): 1,
        ('m', '</w>'): 1,
        ('h', 'a'): 2,
        ('a', 'p'): 2,
        ('p', 'p'): 2,
        ('p', 'y'): 1,
        ('y', '</w>'): 1,
        ('a', 'v'): 1,
        ('v', 'e'): 1,
        ('e', '</w>'): 1,
        ('N', '</w>'): 1,
        ('p', 'l'): 1,
        ('l', 'e'): 1,
        ('e', 's'): 1,
        ('s', '</w>'): 1
    }
    ```

    Args:
        bpe_vocab: Dict[str, int] - "单词分词状态->频数"的词典

    Return:
        Dict[Tuple[str, str], int] - "bigram->频数"的词典
    """

    bigram_freq = dict()

    # TODO: 完成函数体【1分】
    for word, freq in bpe_vocab.items():
        # 将单词按照空格分隔为各个 unigram
        tokens = word.split()
        
        # 统计相邻 unigram 组成的 bigram
        for i in range(len(tokens) - 1):
            bigram = (tokens[i], tokens[i + 1])
            if bigram in bigram_freq:
                bigram_freq[bigram] += freq
            else:
                bigram_freq[bigram] = freq

    return bigram_freq

测试样例

In [14]:
bpe_vocab = {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
print(get_bigram_freq(bpe_vocab))

{('i', '</w>'): 2, ('a', 'm'): 1, ('m', '</w>'): 1, ('h', 'a'): 2, ('a', 'p'): 2, ('p', 'p'): 2, ('p', 'y'): 1, ('y', '</w>'): 1, ('a', 'v'): 1, ('v', 'e'): 1, ('e', '</w>'): 1, ('N', '</w>'): 1, ('p', 'l'): 1, ('l', 'e'): 1, ('e', 's'): 1, ('s', '</w>'): 1}


In [15]:
def refresh_bpe_vocab_by_merging_bigram(bigram: Tuple[str, str], old_bpe_vocab: Dict[str, int]) -> Dict[str, int]:
    """
    在"单词分词状态->频数"的词典中，合并指定的bigram（即去掉对应的相邻unigram之间的空格），最后返回新的词典，例如：  
    输入 
    ```python
    bigram = ('i', '</w>'), old_bpe_vocab = {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
    ```
    得到
    ```python
    {
        'i</w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
    ```
    
    Args:
        bigram: Tuple[str, str] - 待合并的bigram
        old_bpe_vocab: Dict[str, int] - 初始"单词分词状态->频数"的词典

    Return:
        Dict[str, int] - 合并后的"单词分词状态->频数"的词典
    """

    new_bpe_vocab = dict()

    # TODO: 完成函数体【1分】
    bigram_str = " ".join(bigram)  # 将 bigram 组合成一个字符串，例如 'i </w>'
    merged_bigram = "".join(bigram)  # 合并 bigram，例如 'i</w>'

    for word, freq in old_bpe_vocab.items():
        # 替换 bigram 为合并形式
        new_word = word.replace(bigram_str, merged_bigram)
        new_bpe_vocab[new_word] = freq

    return new_bpe_vocab



测试样例

In [16]:
bigram = ('i', '</w>')
old_bpe_vocab = {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
result = refresh_bpe_vocab_by_merging_bigram(bigram, old_bpe_vocab)
print(result)

{'i</w>': 2, 'a m </w>': 1, 'h a p p y </w>': 1, 'h a v e </w>': 1, 'N </w>': 1, 'a p p l e s </w>': 1}


In [17]:
def get_bpe_tokens(bpe_vocab: Dict[str, int]) -> List[Tuple[str, int]]:
    """
    根据"单词分词状态->频数"的词典，返回所得到的BPE分词列表，并将该列表按照分词长度降序排序返回，例如：  
    输入 
    ```python
    bpe_vocab = {
        'i</w>': 2,
        'a m </w>': 1,
        'ha pp y </w>': 1,
        'ha v e </w>': 1,
        'N </w>': 1,
        'a pp l e s </w>': 1
    }
    ```
    得到
    ```
    [
        ('i</w>', 2),
        ('ha', 2),
        ('pp', 2),
        ('a', 2),
        ('m', 1),
        ('</w>', 5),
        ('y', 1),
        ('v', 1),
        ('e', 2),
        ('N', 1),
        ('l', 1),
        ('s', 1)
    ]
    ```

    Args:
        bpe_vocab: Dict[str, int] - "单词分词状态->频数"的词典

    Return:
        List[Tuple[str, int]] - BPE分词和对应频数组成的List
    """

    bpe_tokens = []
    # TODO: 完成函数体【2分】
    from collections import defaultdict
    token_freq = defaultdict(int)

    # 遍历词典，计算各个 BPE token 的频数
    for word, freq in bpe_vocab.items():
        tokens = word.split()  # 分隔成单个 token
        for token in tokens:
            token_freq[token] += freq

    # 将 token 和频数转为列表，并按 token 长度降序排序
    bpe_tokens = sorted(token_freq.items(), key=lambda x: (-len(x[0]), x[0]))

    return bpe_tokens


测试样例

In [18]:
bpe_vocab = {
        'i</w>': 2,
        'a m </w>': 1,
        'ha pp y </w>': 1,
        'ha v e </w>': 1,
        'N </w>': 1,
        'a pp l e s </w>': 1
    }
result = get_bpe_tokens(bpe_vocab)
print(result)

[('i</w>', 2), ('</w>', 5), ('ha', 2), ('pp', 2), ('N', 1), ('a', 2), ('e', 2), ('l', 1), ('m', 1), ('s', 1), ('v', 1), ('y', 1)]


In [19]:
from typing import List, Tuple

def print_bpe_tokenize(word: str, bpe_tokens: List[Tuple[str, int]]):
    """
    根据按长度降序的BPE分词列表，将所给单词进行BPE分词，最后打印结果。
    
    思想是，对于一个待BPE分词的单词，按照长度顺序从列表中寻找BPE分词进行子串匹配，  
    若成功匹配，则对该子串左右的剩余部分递归地进行下一轮匹配，直到剩余部分长度为0，  
    或者剩余部分无法匹配（该部分整体由`"<unknown>"`代替）
    
    例1：  
    输入 `word = "supermarket"`, `bpe_tokens=[("su", 20), ("are", 10), ("per", 30)]`  
    最终打印 `"su per <unknown>"`

    例2：  
    输入 `word = "shanghai"`, `bpe_tokens=[("hai", 1), ("sh", 1), ("an", 1), ("</w>", 1), ("g", 1)]`  
    最终打印 `"sh an g hai </w>"`

    Args:
        word: str - 待分词的单词
        bpe_tokens: List[Tuple(str, int)] - BPE分词和对应频数组成的列表
    """
    
    def bpe_tokenize(sub_word: str) -> str:
        # 若剩余部分为空，则返回空字符串
        if not sub_word:
            return ""
        
        # 遍历 bpe_tokens 找到匹配的 token
        for token, _ in bpe_tokens:
            if sub_word.startswith(token):
                # 匹配成功，递归处理剩余部分
                remaining_result = bpe_tokenize(sub_word[len(token):])
                return f"{token} {remaining_result}".strip()
        
        # 若无法匹配任何 token，则返回 "<unknown>" 并终止对当前子串的进一步处理，不进行进一步切分
        return "<unknown>"
    
    # 开始递归分词并输出结果
    res = bpe_tokenize(word + "</w>")
    print(res.strip())


测试样例

In [20]:
word = "shanghai"
bpe_tokens=[
        ("hai", 1),
        ("sh", 1),
        ("an", 1),
        ("</w>", 1),
        ("g", 1)]
print_bpe_tokenize(word, bpe_tokens)

sh an g hai </w>


In [21]:
word = "supermarket"
bpe_tokens=[
        ("su", 20),
        ("are", 10),
        ("per", 30),
    ]
print_bpe_tokenize(word, bpe_tokens)

su per <unknown>


开始读取数据集并训练BPE分词器：

In [22]:
with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    training_corpus = list(map(lambda l: l.strip(), f.readlines()[:1000]))

print("Loaded training corpus.")

Loaded training corpus.


训练与测试样例

In [23]:
training_iter_num = 299

training_bpe_vocab = build_bpe_vocab(training_corpus)
for i in range(training_iter_num):
    
    # 获取当前 BPE 词典中的 bigram 频次
    bigram_freq = get_bigram_freq(training_bpe_vocab)
    
    # 找到频次最高的 bigram
    if not bigram_freq:  # 若没有可合并的 bigram 则停止
        break
    most_frequent_bigram = max(bigram_freq, key=bigram_freq.get)
    
    # 合并该 bigram，得到更新后的 BPE 词典
    training_bpe_vocab = refresh_bpe_vocab_by_merging_bigram(most_frequent_bigram, training_bpe_vocab)

training_bpe_tokens = get_bpe_tokens(training_bpe_vocab)
print(training_bpe_tokens)

[('demolishing</w>', 1), ('economists</w>', 1), ('monitoring</w>', 1), ('soliciting</w>', 1), ('agination</w>', 2), ('americans</w>', 5), ('asurement</w>', 1), ('economist</w>', 1), ('ministers</w>', 4), ('prominent</w>', 1), ('punishing</w>', 1), ('stination</w>', 1), ('stonecold</w>', 1), ('africans</w>', 1), ('american</w>', 11), ('asteland</w>', 1), ('becoming</w>', 1), ('benefits</w>', 2), ('decision</w>', 3), ('declined</w>', 1), ('dination</w>', 1), ('discount</w>', 2), ('discover</w>', 1), ('donation</w>', 1), ('finished</w>', 2), ('homeland</w>', 1), ('honoring</w>', 2), ('lowering</w>', 1), ('maritime</w>', 1), ('michelin</w>', 1), ('minister</w>', 16), ('minitour</w>', 1), ('national</w>', 25), ('parisons</w>', 1), ('promised</w>', 1), ('risoners</w>', 1), ('rouching</w>', 1), ('scheming</w>', 1), ('security</w>', 6), ('sination</w>', 1), ('sorority</w>', 1), ('timeline</w>', 1), ('tirement</w>', 1), ('tisement</w>', 1), ('tremists</w>', 1), ('verished</w>', 1), ('visiting</

测试BPE分词器的分词效果：

In [24]:
test_word = "naturallanguageprocessing"
print("naturallanguageprocessing 的分词结果为：")
print_bpe_tokenize(test_word, training_bpe_tokens)

naturallanguageprocessing 的分词结果为：
n atur al lan gu age pro ce s sing</w>


### 实验总结
> 在本次实验中，我们使用了字节对编码（Byte Pair Encoding, BPE）算法对英文句子进行分词处理。在示例中，我们的输入句子为 "natural language processing"，通过BPE算法处理后得到的分词结果为：n atur al lan gu age pro ce s sing</w>\
表明BPE算法能够有效地将较长的词汇拆分为多个子词，但是分割较为零碎，未能保留词汇的语义和结构。在该结果中，句中单词被正确识别为独立的亚词tokens。该分词结果较为有效的分离了信息，但是由于模型侧重，单词结构不能被有效识别和保留。\
通过本次实验，我们验证了BPE算法在英文分词任务中的有效性。