# 作业一：实现N-gram语言模型平滑算法

### 预处理

首先创建一些预处理函数。

引入必要的模块，定义些类型别名。

In [1]:
import re
import itertools

from typing import List, Dict, Tuple

Sentence = List[str]
IntSentence = List[int]

Corpus = List[Sentence]
IntCorpus = List[IntSentence]

Gram = Tuple[int]

下面的函数用于将文本正则化并词元化。该函数会将所有英文文本转为小写，去除文本中所有的标点，简单起见将所有连续的数字用一个`N`代替，将形如`let's`的词组拆分为`let`和`'s`两个词。

In [2]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")
def normaltokenize(corpus: List[str]) -> Corpus:
    """
    Normalizes and tokenizes the sentences in `corpus`. Turns the letters into
    lower case and removes all the non-alphadigit characters and splits the
    sentence into words and added BOS and EOS marks.

    Args:
        corpus - list of str

    Return:
        list of list of str where each inner list of str represents the word
          sequence in a sentence from the original sentence list
    """

    tokeneds = [ ["<s>"]
               + list(
                   filter(lambda tkn: len(tkn)>0,
                       _splitor_pattern.split(
                           _digit_pattern.sub("N", stc.lower()))))
               + ["</s>"]
                    for stc in corpus
               ]
    return tokeneds

接下来定义两个函数用来从训练语料中构建词表，并将句子中的单词从字符串表示转为整数索引表示。

In [3]:
def extract_vocabulary(corpus: Corpus) -> Dict[str, int]:
    """
    Extracts the vocabulary from `corpus` and returns it as a mapping from the
    word to index. The words will be sorted by the codepoint value.

    Args:
        corpus - list of list of str

    Return:
        dict like {str: int}
    """

    vocabulary = set(itertools.chain.from_iterable(corpus))
    vocabulary = dict(
            map(lambda itm: (itm[1], itm[0]),
                enumerate(
                    sorted(vocabulary))))
    return vocabulary

def words_to_indices(vocabulary: Dict[str, int], sentence: Sentence) -> IntSentence:
    """
    Convert sentence in words to sentence in word indices.

    Args:
        vocabulary - dict like {str: int}
        sentence - list of str

    Return:
        list of int
    """

    return list(map(lambda tkn: vocabulary.get(tkn, len(vocabulary)), sentence))

接下来读入训练数据，将数据预处理。

In [4]:
import functools

with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    texts = list(map(lambda l: l.strip(), f.readlines()))

print("Loaded training set.")

corpus = normaltokenize(texts)
vocabulary = extract_vocabulary(corpus)
corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            corpus))

print("Preprocessed training set.")

Loaded training set.
Preprocessed training set.


### 设计模型

参照公式

$$
P(w_k | W_{k-n+1}^{k-1}) = \begin{cases}
    d(W_{k-n+1}^k) * \dfrac{C(W_{k-n+1}^k)}{C(W_{k-n+1}^{k-1})} &  C(W_{k-n+1}^k) > 0 \\
    \alpha(W_{k-n+1}^{k-1}) * P(w_k | W_{k-n+2}^{k-1}) &  \text{否则} \\
\end{cases}
$$

实现N-gram语言模型及采用带Good-Turing折扣的Katz回退算法。

需要实现的功能包括：

1. 统计各词组（gram）在训练语料中的频数
2. 计算同频词组个数$N_r$
3. 计算$d(W_{k-n+1}^k)$
4. 计算$\alpha(W_{k-n+1}^{k-1})$
5. 根据公式计算回退概率
6. 计算概率对数与困惑度（PPL）


In [5]:
import math

class NGramModel:
    def __init__(self, vocab_size: int, n: int = 4):
        """
        Constructs `n`-gram model with a `vocab_size`-size vocabulary.

        Args:
            vocab_size - int
            n - int
        """

        # 字典的大小
        self.vocab_size: int = vocab_size
        
        # 表示该语言模型n-gram的取值
        self.n: int = n
        
        # 记录语料库中，某个gram的出现的频次
        # 例如 self.frequencies[1][(12, 23)] 应等于 (12, 23) 这个 2-gram 在语料库中出现的次数
        # 注：第一个下标表示长度，调用时下标要减一
        self.frequencies: List[Dict[Gram, int]] = [{} for _ in range(n)]
        
        # 缓存语言模型提供的概率值，即对于某个gram而言 P(`gram[-1]`|`gram[:-1]`)的值
        # 例如 self.disfrequencies[2][(12, 23, 77)] 应等于 P(`77`|`12, 23`) 的值
        # 注：第一个下标表示长度，调用时下标要减一
        self.disfrequencies: List[Dict[Gram, float]] = [{} for _ in range(n)]

        # 记录出现次数为r次的N-gram种类数
        self.N_r: Dict[int, int] = {}

        # 说明文档中所提到的折扣阈值θ
        self.discount_threshold:int = 7
        
        # 对于某个gram而言的$d'$
        self._d: List[Dict[Gram, float]] = [{} for _ in range(n)]
        
        # 缓存对于某个gram而言的回退系数α
        # 例如 self._alpha[2][(122, 323)] 应等于α(`122, 323`)
        # 注意：调用时，第一个下标虽然表示长度，但不需要减一，因为此处的gram是上层调用时输入的某个gram的前缀。
        self._alpha: List[Dict[Gram, float]] = [{} for _ in range(n)]
        
        # 当计算结果为0时，应取该值，避免潜在的除零错误
        self.eps = 1e-10

        # $\lambda = \frac{N_1}{N_1-(\theta+1)N_{\theta+1}}$，用于计算$d'$
        self.Lambda: float

        # 这是__getitem__中计算长度为1的gram的 $P(gram[-1])$时用到的所有长度为1的gram出现的次数。为了避免重复计算，在`learn` 函数中缓存。
        self.num_of_gram_with_length_1: float


    def learn(self, corpus: IntCorpus):
        """
        Learns the parameters of the n-gram model.

        Args:
            corpus - list of list of int
        """

        for stc in corpus:
            for i in range(1, len(stc)+1):
                for j in range(min(i, self.n)):
                    # TODO: count the frequencies of the grams
                    if tuple(stc[i-j-1:i]) not in self.frequencies[j].keys():
                        self.frequencies[j][tuple(stc[i-j-1:i])] = 1
                    else:
                        self.frequencies[j][tuple(stc[i-j-1:i])] += 1               

        for i in range(0, self.n):
            # NOTE: calculates the value of $N_r$
            grams = itertools.groupby(
                sorted(map(lambda itm: (itm[0], itm[1]),self.frequencies[i].items()),key=lambda itm: itm[1]),lambda itm: itm[1]
            )
            
            for k, gr in grams:
                if k not in self.N_r:
                    self.N_r[k] = len(list(gr))
                else:
                    self.N_r[k] += len(list(gr))

        # NOTE: 这两个缓存的参数说明见__init__函数
        lambda_numerator = self.N_r.get(1,self.eps)
        lambda_denominator = (self.N_r.get(1,self.eps)-(self.discount_threshold+1)*self.N_r.get(self.discount_threshold+1,self.eps))
        if lambda_denominator == 0:
            self.Lambda = lambda_numerator / self.eps
        else:
            self.Lambda = lambda_numerator / lambda_denominator        
        self.num_of_gram_with_length_1 = float(sum([item[1] for item in self.frequencies[0].items()]))


    def d(self, gram: Gram) -> float:
        """
        Calculates the discounting factor $d'$.

        Args:
            gram - tuple of int

        Return:
            float
        """
        n = len(gram)-1
        if gram not in self._d[n]:
            # 对于某个$gram=(W_{k-n+1}^{k-1},w_k)$而言，返回$d(W_{k-n+1}^{k-1},w_k)$.
            # TODO: calculates the value of $d$

            if self.frequencies[len(gram)-1].get(gram,self.eps) > self.discount_threshold:
                self._d[n][gram] = 1
            else:
                # NOTE: calculates the value of $d'$
                # $ r = C(W_{k-n+1}^{k-1},w_k) $
                r = self.frequencies[n].get(gram,self.eps)
                self._d[n][gram] = self.Lambda * (r+1) * self.N_r.get(r+1,self.eps) / r / self.N_r.get(r,self.eps) +1 - self.Lambda

        return self._d[n][gram]

    def alpha(self, gram: Gram) -> float:
        """
        Calculates the back-off weight alpha(`gram`)

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)
        if gram not in self._alpha[n]:
            if gram in self.frequencies[n-1]:
                # 按照定义计算分子分母的 1 - sum (...) 的值即可，
                # 记得考虑分母为0应该怎么处理
                # TODO: calculates the value of $\alpha$

                numerator = [gram_[1] for gram_ in self.disfrequencies[n].items() if gram == gram_[0][:-1]]
                numerator = 1-sum(numerator)

                denominator = [gram_[1] for gram_ in self.disfrequencies[n-1].items() if gram[1:] == gram_[0][:-1]]
                denominator = 1-sum(denominator)

                denominator = self.eps if denominator == 0 else denominator
                self._alpha[n][gram] = numerator / denominator
            else:
                self._alpha[n][gram] = 1.
        return self._alpha[n][gram]

    def __getitem__(self, gram: Gram) -> float:
        """
        Calculates smoothed conditional probability P(`gram[-1]`|`gram[:-1]`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)-1
        if gram not in self.disfrequencies[n]:
            if n>0:
                # 按照P的公式，分类讨论count(gram)>0和count(gram)=0的情况
                # TODO: calculates the smoothed probability value according to the formulae
                if gram in self.frequencies[n]:
                    d_ = self.d(gram)
                    self.disfrequencies[n][gram] = d_ * self.frequencies[n][gram] / self.frequencies[n-1][gram[:-1]]
                else:
                    alpha_ = self.alpha(gram[:-1])
                    self.disfrequencies[n][gram] = alpha_ * self[gram[:-1]]
            else:
                self.disfrequencies[n][gram] = self.frequencies[n].get(gram, self.eps) / self.num_of_gram_with_length_1
        return self.disfrequencies[n][gram]
        
    def log_prob(self, sentence: IntSentence) -> float:
        """
        Calculates the log probability of the given sentence. Assumes that the
        first token is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        log_prob = 0.
        for i in range(2, len(sentence)+1):
            # 注意这是对数运算的情况
            # 这里i的取值会把<s>和</s>考虑进来，可以自行修改使得不考虑这两个词
            # TODO: calculates the log probability
            start = max(0,i-4)
            gram = tuple(sentence[start:i])
            log_prob += math.log(self[gram])
        return log_prob

    def ppl(self, sentence: IntSentence) -> float:
        """
        Calculates the PPL of the given sentence. Assumes that the first token
        is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """
        # 和我们课件中的PPW是同一个东西，利用上面的log_prob加以计算
        # TODO: calculates the PPL
        return math.exp(-1/len(sentence) * self.log_prob(sentence))

### 训练与测试

现在数据与模型均已齐备，可以训练并测试了。

训练模型：

In [6]:
import pickle as pkl

model = NGramModel(len(vocabulary))
model.learn(corpus)
with open("model.pkl", "wb") as f:
    pkl.dump(vocabulary, f)
    pkl.dump(model, f)

print("Dumped model.")

Dumped model.


在测试集上测试计算困惑度：

In [7]:
import pickle as pkl
with open("model.pkl", "rb") as f:
    vocabulary = pkl.load(f)
    model = pkl.load(f)
print("Loaded model.")

with open("data/news.2007.en.shuffled.deduped.test", encoding="utf-8") as f:
    test_set = list(map(lambda l: l.strip(), f.readlines()))
test_corpus = normaltokenize(test_set)
test_corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            test_corpus))
ppls = []
for t in test_corpus:
    ppls.append(model.ppl(t))
    print(ppls[-1])
print("Avg: ", sum(ppls)/len(ppls))  

Loaded model.
4024.4285854408345
329.98308100051946
115.98473767631742
93.1752138770833
541.1954858948097
38.25865859400025
13.033512752523269
96.36248023325777
1697.405435379303
66.00653225865175
59.7516412469696
21.75448961948782
713.4413376691816
1893.5053083857292
205.6865390162769
71.58770935595584
32893.155407496335
19911.18192579337
67.28307420810295
97.65383255155868
202.2452518656914
1843.7025454955794
149.72395745980316
134.9285685522969
95.7396117042547
20.044531779737692
83.38641757019148
7.187039591498948
66.9100171685885
223.93527067981415
869.3689805891278
1631.5191645126
42.881305194744876
88.36889717265751
497.28404823815663
50.24954303479835
391.1455808785404
447.59020273304395
119.7904630862471
280.4110647283188
75.56217850055091
402.5829961855147
75.56654580995954
45.06544173377153
383.744006022129
355.3712430543032
116.03356226245106
244.56038506716578
45.04898134658861
98.21639408492136
Avg:  1440.7799836910665


<font color="red">总结</font>

这是本学期NLP课程的第一次作业，实现了课堂上所述的n-gram模型的一种算法。我实现算法的过程如下：
1. 补全`learn`函数。函数需要根据数据集进行学习，这个过程分为以下几步：
   - 计算`self.frequencies`。统计每一个gram出现的次数，根据gram长度存入`self.frequencies`中的一个字典中。
   - 计算`self.N_r`。统计出现次数是$r$次的n-gram种类数，用于计算修正后的折扣因子$d'$。
   - 计算`self.Lambda`。$\lambda = \frac{N_1}{N_1-(\theta+1)N_{\theta+1}}$，用于计算修正后的折扣因子。
   - 计算`self.num_of_gram_with_length_1`。统计所有长度为1的gram出现的次数，用于计算gram长度为1时的 $P(gram[-1]|gram[:-1])$。原因在于此时的gram的前缀可看作`<s>`，需要统计所有长度为1的gram作为分母。

2. 补全`d`函数。`d`函数返回修正后的折扣因子$d'$，根据算法可得，
$$
d(W_{k-n+1}^{k-1}, w_k) = \begin{cases}
    1 &  C(W_{k-n+1}^{k-1}, w_k) > \theta \\
    d'(W_{k-n+1}^{k-1}, w_k) &  \text{否则} \\
\end{cases}
$$

3. 补全`alpha`函数。此函数返回某个`gram`的前缀`gram[:-1]`的$\alpha$值。根据算法，
$$
\alpha(W_{k-n+1}^{k-1}) = \frac{1-\sum_{w_k\in V_+} P(w_k|W_{k-n+1}^{k-1})}{1-\sum_{w_k\in V_+} P(w_k|W_{k-n+2}^{k-1})} 
$$

4. 补全`__getitem__`函数。这个函数用于计算 $P(gram[-1]|gram[:-1])$，展开条件概率公式即可计算。

5. 补全`log_prob`函数。针对输入的一个sentence，计算其出现的对数概率，只需循环调用`__getitem__`函数即可

6. 补全`ppl`函数，根据算法，
$$
PPL = \frac{1}{\sqrt[T]{\prod_{t=1}^T P(x^{(t)} | x^{(t-1)}, \dots, x^{(1)})}}
$$
即可得到每个句子的混淆度。注意，由于前文计算得到对数概率，此处应将对数概率求指数后再代入公式计算。

<font color="red"> 心得体会 </font>

这次作业对数学、编程以及课堂知识的要求都比较高，在实现过程中，我也不断地考虑如何高效实现算法，并且尝试稍微修改了框架提供的参数。同时，我也和助教老师进行了交流，得到了他的帮助和指导。总体上来说，这次作业还是比较有挑战性，我也在其中锻炼了自己的能力。

<font color="red">附录</font>：实现字典树(前缀树) `DictTree`

由于助教老师说不需要应用字典树，故在此只将字典树的实现附上。

In [None]:
# OPTIONAL: implement a DictTree instead of using a flattern dict
class DictTree():
    def __init__(self):
        self.root = {}
        self.end = -1
        self.len = 0
        self.all_gram = []
        # pass

    def __len__(self) -> int:
        self.len = 0
        def number_of_end(node):
            if node is not None:
                if self.end in node:
                    self.len += 1
                for c in node.keys():
                    if c != self.end:
                        number_of_end(node[c])
        number_of_end(self.root)
        return self.len
        # pass

    def __iter__(self):
        self.all_gram = []
        self.now_index = -1
        def all(node, word):
            if node is not None:
                # print(node)
                if self.end in node.keys():
                    self.all_gram.append(tuple(word))
                for c in node.keys():
                    if c != self.end:
                        all(node[c],word+[c])
        word = [] 
        all(self.root,word)
        return self
        # pass
    def __next__(self):
        self.now_index += 1
        if self.now_index < len(self.all_gram):
            return self.all_gram[self.now_index]
        else:
            raise StopIteration()

    def __contains__(self, key: Gram):
        curNode = self.root
        for c in key:
            if not c in curNode:
                return False
            curNode = curNode[c]
        if not self.end in curNode:
            return False
        return True
        # pass

    def __getitem__(self, key: Gram) -> int:
        curNode = self.root
        # print(key)
        for c in key:
            if c not in curNode:
                return -1
            curNode = curNode[c]
        if not self.end in curNode:
            return -1
        return curNode[self.end]
        # pass

    def __setitem__(self, key: Gram, frequency: int):
        curNode = self.root
        for c in key:
            if c not in curNode:
                curNode[c] = {}
            curNode = curNode[c]
        curNode[self.end] = frequency
        # pass

    def __delitem__(self, key: Gram):
        curNode = self.root
        for c in key:
            curNode = curNode[c]
        del curNode[self.end]
        # pass
    def get(self, key: Gram, other):
        if self.__contains__(key):
            return self.__getitem__(key)
        else:
            return other