# n元语言模型回退算法

n元语言模型采用Good-Turing折扣的Katz回退算法。



### 预处理

首先创建一些预处理函数。

引入必要的模块，定义些类型别名。

In [None]:
import re
import itertools

from typing import List, Dict, Tuple

Sentence = List[str]
IntSentence = List[int]

Corpus = List[Sentence]
IntCorpus = List[IntSentence]

Gram = Tuple[int]

下面的函数用于将文本正则化并词元化。该函数会将所有英文文本转为小写，去除文本中所有的标点，简单起见将所有连续的数字用一个`N`代替，将形如`let's`的词组拆分为`let`和`'s`两个词。

In [None]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")
def normaltokenize(corpus: List[str]) -> Corpus:
    """
    Normalizes and tokenizes the sentences in `corpus`. Turns the letters into
    lower case and removes all the non-alphadigit characters and splits the
    sentence into words and added BOS and EOS marks.

    Args:
        corpus - list of str

    Return:
        list of list of str where each inner list of str represents the word
          sequence in a sentence from the original sentence list
    """

    tokeneds = [ ["<s>"]
               + list(
                   filter(lambda tkn: len(tkn)>0,
                       _splitor_pattern.split(
                           _digit_pattern.sub("N", stc.lower()))))
               + ["</s>"]
                    for stc in corpus
               ]
    return tokeneds

接下来定义两个函数用来从训练语料中构建词表，并将句子中的单词从字符串表示转为整数索引表示。

In [None]:
def extract_vocabulary(corpus: Corpus) -> Dict[str, int]:
    """
    Extracts the vocabulary from `corpus` and returns it as a mapping from the
    word to index. The words will be sorted by the codepoint value.

    Args:
        corpus - list of list of str

    Return:
        dict like {str: int}
    """

    vocabulary = set(itertools.chain.from_iterable(corpus))
    vocabulary = dict(
            map(lambda itm: (itm[1], itm[0]),
                enumerate(
                    sorted(vocabulary))))
    return vocabulary

def words_to_indices(vocabulary: Dict[str, int], sentence: Sentence) -> IntSentence:
    """
    Convert sentence in words to sentence in word indices.

    Args:
        vocabulary - dict like {str: int}
        sentence - list of str

    Return:
        list of int
    """

    return list(map(lambda tkn: vocabulary.get(tkn, len(vocabulary)), sentence))

接下来读入训练数据，将数据预处理。

In [None]:
import functools

with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    texts = list(map(lambda l: l.strip(), f.readlines()))

print("Loaded training set.")

corpus = normaltokenize(texts)
vocabulary = extract_vocabulary(corpus)
corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            corpus))

print("Preprocessed training set.")

### 设计模型

参照公式

$$
P_{\text{bo}}(w_k | W_{k-n+1}^{k-1}) = \begin{cases}
    d(W_{k-n+1}^k) \dfrac{C(W_{k-n+1}^k)}{C(W_{k-n+1}^{k-1})} &  C(W_{k-n+1}^k) > 0 \\
    \alpha(W_{k-n+1}^{k-1}) P_{\text{bo}}(w_k | W_{k-n+2}^{k-1}) &  \text{否则} \\
\end{cases}
$$

实现n元语言模型及采用Good-Turing折扣的Katz回退算法。

实现的功能包括：

1. 统计各词组（gram）在训练语料中的频数
2. 计算同频词组个数$N_r$
3. 计算$d(W_{k-n+1}^k)$
4. 计算$\alpha(W_{k-n+1}^{k-1})$
5. 根据公式计算回退概率
6. 计算概率对数与困惑度（PPL）



In [None]:
import math


class NGramModel:
    def __init__(self, vocab_size: int, n: int = 4):
        """
        Constructs `n`-gram model with a `vocab_size`-size vocabulary.

        Args:
            vocab_size - int
            n - int
        """

        self.vocab_size: int = vocab_size
        self.n: int = n

        self.frequencies: List[Dict[Gram, int]]\
            = [{} for _ in range(n)]
        self.disfrequencies: List[Dict[Gram, float]]\
            = [{} for _ in range(n)]
        self.discount_threshold:int = 7
        self._d: Dict[Gram, Tuple[float, float]] = {}
        self._alpha: List[Dict[Gram, float]]\
            = [{} for _ in range(n)]
        self._D: Dict[Gram, float] \
            = {}
        self.ncounts: Dict[Gram
        , Dict[int, int]
        ] = {} #记录N_r
        self.r = [[] for _ in range(n)] # 对于不同长度词组，从小到大存放词组出现次数
        self.Nr = [[] for _ in range(n)] # 不同的n各自记录全局的Nr
        self.eps = 1e-10

    def learn(self, corpus: IntCorpus):
        """
        Learns the parameters of the n-gram model.

        Args:
            corpus - list of list of int
        """

        for stc in corpus:
            for i in range(1, len(stc)+1):
                for j in range(min(i, self.n)):
                    # TODO: count the frequencies of the grams
                    if tuple([stc[k] for k in range(i - j - 1, i)]) in self.frequencies[j]:
                        self.frequencies[j][tuple([stc[k] for k in range(i - j - 1, i)])] += 1
                    else:
                        self.frequencies[j][tuple([stc[k] for k in range(i - j - 1, i)])] = 1

        # TODO: calculates the value of $N_r$
        """
        采用两种记录N_r的方式
        """
        #针对ngram进行全局记录
        for j in range(self.n):
            val = list(self.frequencies[j].values())
            self.r[j] = list(range(1,sorted(val)[-1] + 1))
            for i in self.r[j]:
                self.Nr[j].append(val.count(i))

        #针对(n-1)前缀记录ngram的N_r
        for i in range(1, self.n):
            grams = itertools.groupby(
                sorted(
                    sorted(
                        map(lambda itm: (itm[0][:-1], itm[1]),
                            self.frequencies[i].items()),
                        key=(lambda itm: itm[1])),
                    key=(lambda itm: itm[0])))
            # TODO: calculates the value of $N_r$
            for k, v in grams:
                tup = k[0]
                cnt = k[1]
                n = 0
                for i in v:
                    n = n + 1
                if tup not in self.ncounts:
                    self.ncounts[tup] = {}
                self.ncounts[tup][cnt] = n
            
    def d(self, gram: Gram) -> float:
        """
        Calculates the interpolation coefficient.

        Args:
            gram - tuple of int

        Return:
            float
        """

        # 两种计算N_r方法对应的代码

        #法一:全局记录
        n = len(gram)
        if gram not in self._D:
            if self.frequencies[n-1][gram] > self.discount_threshold:
                self._D[gram] = 1.0
            else:
                theta = self.discount_threshold
                r = self.frequencies[n-1][gram] # 计算r
                N1 = self.Nr[n-1][0] # 计算N_1
                Nt1 = self.Nr[n-1][theta] if theta<len(self.Nr[n-1]) else 0 # 计算N_(theta+1)，若theta越界则取0
                deno = N1 - (theta + 1)* Nt1 if N1 != (theta+1)*Nt1 else self.eps # 防止分母为0
                lamda = N1 / deno #计算lambda
                self._D[gram] = lamda * (r+1) * self.Nr[n-1][r] / (r * self.Nr[n-1][r-1]) + 1 - lamda # 计算d
                if self._D[gram]<=0: # 如果出现d小于0，则取一个小正数
                    self._D[gram] = self.eps
        return self._D[gram]

        """
        #法二:对前缀记录
        n = len(gram)
        if gram not in self._D:
            if self.frequencies[n - 1][gram] > self.discount_threshold:
                self._D[gram] = 1.0
            else:
                theta = self.discount_threshold
                r = self.frequencies[n - 1][gram]
                if 1 in self.ncounts[gram[0:-1]]:
                    N1 = self.ncounts[gram[0:-1]][1]
                else:
                    N1 = 0
                if (theta + 1) in self.ncounts[gram[0:-1]]:
                    Nt1 = self.ncounts[gram[0:-1]][theta + 1]
                else:
                    Nt1 = 0
                if N1 == (theta + 1) * Nt1:
                    fm = self.eps
                else:
                    fm = N1 - (theta + 1) * Nt1
                lamda = N1 / (fm)
                if (r+1) in self.ncounts[gram[0:-1]]:
                    fz = self.ncounts[gram[0:-1]][r + 1]
                else:
                    fz = 0
                self._D[gram] = lamda * (r + 1) * fz / (r * self.ncounts[gram[0:-1]][r]) + 1 - lamda
                if self._D[gram]<=0:
                    self._D[gram] = self.eps
        return self._D[gram]
        """



    def alpha(self, gram: Gram) -> float:
        """
        Calculates the back-off weight alpha(`gram`)

        Args:
            gram - tuple of int

        Return:
            float
        """
        n = len(gram)
        ret = 0
        if n == 1: # 递归终止条件（计算一元组的alpha值）
            if gram not in self._alpha[n]:
                if gram in self.frequencies[n-1]:
                    for word, count in self.frequencies[n].items():
                        if word[0:-1] == gram:
                            ret += model[word]
                    numerator = 1 - ret
                    ret = 0
                    for word, count in self.frequencies[n-1].items():
                        ret += model[word]
                    denominator = 1 - ret if ret != 1 else self.eps # 防止分母为0
                    self._alpha[n][gram] = abs(numerator / denominator) # 确保alpha为正
                else:
                    self._alpha[n][gram] = 1
            return self._alpha[n][gram]
        else:
            if gram not in self._alpha[n]:
                if gram in self.frequencies[n - 1]:
                    # TODO: calculates the value of $\alpha$
                    for word, count in self.frequencies[n].items():
                        if word[0:-1] == gram:
                            ret += model[word] # 计算P(wk, wk in V+|W_(k-1,k-n+1))的求和
                    numerator = 1 - ret
                    ret = 0
                    W_ = gram[1:] # W_(k-1,k-n+2)
                    for word, count in self.frequencies[n-1].items():
                        if word[0:-1] == W_:
                            ret += model[word] # 计算P(wk, wk in V+|W_(k-1,k-n+2))的求和
                    denominator = 1 - ret if ret != 1 else self.eps # 防止分母为0
                    self._alpha[n][gram] = abs(numerator / denominator) # 确保alpha为正
                else:
                    self._alpha[n][gram] = 1
            return self._alpha[n][gram]


    def __getitem__(self, gram: Gram) -> float:
        """
        Calculates smoothed conditional probability P(`gram[-1]`|`gram[:-1]`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)-1
        if gram not in self.disfrequencies[n]:
            if n>0:
                # TODO: calculates the smoothed probability value according to the formulae
                if gram in self.frequencies[n] and self.frequencies[n][gram]>0:
                    self.disfrequencies[n][gram] = self.d(gram) * self.frequencies[n][gram] / self.frequencies[n-1][gram[:-1]]
                else:
                    self.disfrequencies[n][gram] = self.alpha(gram[:-1]) * model[gram[1:]]

            else:
                self.disfrequencies[n][gram] = self.frequencies[n].get(gram, self.eps) / float(sum(list(self.frequencies[0].values())))
            if self.disfrequencies[n][gram]>=1:
                self.disfrequencies[n][gram] = 1 - self.eps 
            if self.disfrequencies[n][gram]<=0:
                self.disfrequencies[n][gram] = self.eps
        return self.disfrequencies[n][gram]

    def log_prob(self, sentence: IntSentence) -> float:
        """
        Calculates the log probability of the given sentence. Assumes that the
        first token is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        log_prob = 0.
        for i in range(2, len(sentence)+1):
            # TODO: calculates the log probability
            gram = tuple(sentence[0:i])
            if len(gram)>self.n:
                gram = gram[-self.n:] #n-gram操作
            log_prob += math.log2(model[gram])
        return -log_prob/(len(sentence)-1)

    def ppl(self, sentence: IntSentence) -> float:
        """
        Calculates the PPL of the given sentence. Assumes that the first token
        is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        # TODO: calculates the PPL
        return math.pow(2,self.log_prob(sentence))

model = NGramModel(len(vocabulary))

### 训练与测试

现在数据与模型均已齐备，可以训练并测试了。

训练模型：

In [None]:
import pickle as pkl

model = NGramModel(len(vocabulary))
model.learn(corpus)
with open("model.pkl", "wb") as f:
    pkl.dump(vocabulary, f)
    pkl.dump(model, f)

print("Dumped model.")

在测试集上测试计算困惑度：

In [None]:
with open("model.pkl", "rb") as f:
    vocabulary = pkl.load(f)
    model = pkl.load(f)
print("Loaded model.")

with open("data/news.2007.en.shuffled.deduped.test", encoding="utf-8") as f:
    test_set = list(map(lambda l: l.strip(), f.readlines()))
test_corpus = normaltokenize(test_set)
test_corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            test_corpus))
ppls = []
for t in test_corpus:
    ppls.append(model.ppl(t))
    print(ppls[-1])
print("Avg: ", sum(ppls)/len(ppls))


<font size=5>计算结果：</font>

<font size=2.5>

<font size=4> 1.对于$N_r$全局记录 </font>

5344.751531403226

102.28489071818301

75.25474512599256

32.71169743563384

7119.472629896775

29.095278853684203

16.089396931107565

59.96130584714062

133.84074193626577

30.29258968484799

63.673374763269564

33.659361837762205

403.28647593931

209.88016706551244

93.8597331954259

101.6122037944068

6476.128683912222

4320.803162985796

36.60605938560949

55.79183688363482

115.46614386475997

317.097747608931

77.48652397629324

84.61108040536573

36.175327315937324

30.120914427919715

71.06027258195377

10.304202137721173

64.16837866501936

63.4818953583803

98.61092745766682

159.0692514758781

37.74004911227742

50.6691716277016

172.1737981116973

40.699035221553345

107.09350724820948

90.51841419665698

60.88648610311688

525.3593003750514

44.78277346002141

118.60451790200368

48.55693615438766

40.352279797816045

159.97223006767675

187.6050024744979

75.42381654696669

166.20155106955994

53.52862130873786

76.43493482604238

</font>



<font size=4>平均困惑度为558.466</font>


<font size=4> 2.针对(n-1)gram前缀的$N_r$全局记录 </font>

<font size=2.5>

4454.093644879657

20.410009865741657

2389.3801306575424

767.3055527497089

442.67868335289285

119.86684063335655

7254778.269109974

215.3164559319089

164.98251588502765

4541.523312435997

26.01350071795566

117.6948529732758

84.14048135191248

395.25397814273833

4763.884490853237

4289.111836469176

4842.656444412066

342.8224701766212

2760.980066632535

23.307944230692534

71.66593353097068

36.4123045285047

2366.3411174743424

5380.585581755722

2780.31075824106

3522.107110090969

140.92037766362958

417.7542790043179

437.0848266361579

168.21621388448546

7641.424266671785

1879.9349805892707

5668.53541709457

1806.582561746922

19.717753616918113

10694.712172118727

1254.0871484918214

309.13534226415885

2069.20505907461

19.27246166782348

1829.3057494805726

226.05015952653187

946.8021442027409

243083.91630410004

79.24853158576518

637.0752639486178

320.9859402277677

57.96357308392998

119042.41454169252

5141.757933302546

<font size=4>平均困惑度为154071.7843625925</font>

</font>

<font size=5>结果分析：</font>



对于$N_r$，可以采用对不同n的全局记录或对(n-1)gram前缀记录；我尝试了两种策略，计算的困惑度差距很大。