# 作业二：实现Word2Vec的连续词袋模型

姓名：

学号：

---

本次作业将使用PyTorch构建CBOW模型。Pytorch是当下最主流的深度学习框架，在大作业中我们将继续使用torch完成语言模型的载入、训练、推理等操作。希望同学们能够通过这次作业对torch的张量操作以及常用函数有一个基础的理解，以便应用于之后的作业以及其他的深度学习实践当中。

依据计算平台的不同，PyTorch提供了多种版本可供安装。本次作业我们只需要使用CPU版本，可以通过通过`pip install torch`直接安装。

> 关于GPU版本的安装可以参见[官网](https://pytorch.org/get-started/locally/)。对于本次作业，由于模型参数太小，使用GPU进行运算与CPU相比并无优势，**请不要使用GPU训练模型。**

In [None]:
!pip install torch tqdm

需要Python版本大于等于3.6，并检查是否已安装所依赖的第三方库。（若没有安装可以执行上面的代码块）

In [None]:
import importlib
import sys

assert sys.version_info[0] == 3
assert sys.version_info[1] >= 6

requirements = ["torch", "tqdm"]
_OK = True

for name in requirements:
    try:
        importlib.import_module(name)
    except ImportError:
        print(f"Require: {name}")
        _OK = False

if not _OK:
    exit(-1)
else:
    print("All libraries are satisfied.")

## 辅助代码

该部分包含：用于给句子分词的分词器`tokenizer`、用于构造数据的数据集类`Dataset`和用于构建词表的词表类`Vocab`。

> 注：该部分无需实现。

### 分词器

该分词器会：
1. 将所有字母转为小写；
2. 将句子分为连续的字母序列（word）

In [None]:
import re
from typing import List, Tuple

def tokenizer(line: str) -> List[str]:
    line = line.lower()
    tokens = list(filter(lambda x: len(x) > 0, re.split(r"\W", line)))
    return tokens

print(tokenizer("It's  useful. "))

### 数据集类

语料数据集类`CorpusDataset`读取`corpus`中的行，并依据设定的窗口长度`window_size`解析返回`(context, target)`元组。

假如一个句子序列为`a b c d e`，且此时`window_size=2`，`CorpusDataset`会返回：

```
([b, c], a)
([a, c, d], b)
([a, b, d, e], c)
([b, c, e], d)
([c, d], e)
```

> 该`CorpusDataset`类继承自torch提供的数据集类`Dataset`。torch对该类提供了多种工具函数，配合`DataLoader`可以便捷地完成批次加载、数据打乱等数据集处理工作。

In [4]:
import torch
from torch.utils.data import Dataset

class CorpusDataset(Dataset):
    def __init__(self, corpus_path: str, window_size: int) -> None:
        """
        :param corpus: 语料路径
        :param window_size: 窗口长度
        """
        self.corpus_path = corpus_path
        self.window_size = window_size

        self.data = self._load_data()

    def _load_data(self) -> List:
        data = []
        with open(self.corpus_path, encoding="utf-8") as f:
            for line in f:
                tokens = tokenizer(line)
                if len(tokens) <= 1:
                    continue
                for i, target in enumerate(tokens):
                    left_context = tokens[max(0, i - self.window_size): i]
                    right_context = tokens[i + 1: i + 1 + self.window_size]
                    context = left_context + right_context
                    data.append((context, target))
        return data

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx) -> Tuple[List[str], str]:
        return self.data[idx]

In [None]:
debug_dataset = CorpusDataset("./data/debug2.txt", window_size=3)
print(len(debug_dataset))

for i, pair in enumerate(iter(debug_dataset)):
    print(pair)
    if i >= 3:
        break

del debug_dataset

### 词表类

`Vocab`可以用`token_to_idx`把token(str)映射为索引(int)，也可以用`idx_to_token`找到索引对应的token。

实例化`Vocab`有两种方法：
1. 读取`corpus`构建词表。
2. 通过调用`Vocab.load_vocab`，可以从已训练的词表中构建`Vocab`实例。

In [6]:
import os
import warnings
from collections import Counter
from typing import Dict

class Vocab:
    VOCAB_FILE = "vocab.txt"
    UNK = "<unk>"

    def __init__(self, corpus: str=None, max_vocab_size: int=-1):
        """
        :param corpus:         语料文件路径
        :param max_vocab_size: 最大词表数量，-1表示不做任何限制
        """
        self._token_to_idx: Dict[str, int] = {}
        self.token_freq: List[Tuple[str, int]] = []

        if corpus is not None:
            self.build_vocab(corpus, max_vocab_size)

    def build_vocab(self, corpus: str, max_vocab_size: int=-1):
        """ 统计词频，并保留高频词 """
        counter = Counter()
        with open(corpus, encoding="utf-8") as f:
            for line in f:
                tokens = tokenizer(line)
                counter.update(tokens)

        print(f"总Token数: {sum(counter.values())}")

        # 将找到的词按照词频从高到低排序
        self.token_freq = [(self.UNK, 1)] + sorted(counter.items(), key=lambda x: x[1], reverse=True)
        if max_vocab_size > 0:
            self.token_freq = self.token_freq[:max_vocab_size]

        print(f"词表大小: {len(self.token_freq)}")

        for i, (token, _freq) in enumerate(self.token_freq):
            self._token_to_idx[token] = i

    def __len__(self):
        return len(self.token_freq)

    def __contains__(self, token: str):
        return token in self._token_to_idx

    def token_to_idx(self, token: str, warn: bool = False) -> int:
        """ 将token映射至索引 """
        token = token.lower()
        if token not in self._token_to_idx:
            if warn:
                warnings.warn(f"{token} => {self.UNK}")
            token = self.UNK
        return self._token_to_idx[token]

    def idx_to_token(self, idx: int) -> str:
        """ 将索引映射至token """
        assert 0 <= idx < len(self), f"Index {idx} out of vocab size {len(self)}"
        return self.token_freq[idx][0]

    def save_vocab(self, path: str):
        with open(os.path.join(path, self.VOCAB_FILE), "w", encoding="utf-8") as f:
            lines = [f"{token} {freq}" for token, freq in self.token_freq]
            f.write("\n".join(lines))

    @classmethod
    def load_vocab(cls, path: str):
        vocab = cls()

        with open(os.path.join(path, cls.VOCAB_FILE), encoding="utf-8") as f:
            lines = f.read().split("\n")

        for i, line in enumerate(lines):
            token, freq = line.split()
            vocab.token_freq.append((token, int(freq)))
            vocab._token_to_idx[token] = i

        return vocab

In [None]:
debug_vocab = Vocab("./data/debug2.txt")
print(debug_vocab.token_freq)
del debug_vocab

## Word2Vec实现

本节将实现Word2Vec的CBOW模型，为了便于实现，本实验不引入`Hierarchical Softmax`和`Negative Sampling`等加速技巧，若同学们对这些技术感兴趣，可参考：[word2vec Parameter Learning Explained](https://arxiv.org/pdf/1411.2738.pdf)。

### 1. 实现one-hot向量构建函数

需求：指定词向量的维度和需要置1的索引，返回`torch.Tensor`张量格式的one-hot行向量。

请手动操作张量实现该需求， **不要直接使用库中已有的`torch.nn.functional.one_hot`函数，否则不得分！** 你可以在实现后与库函数的结果相比对来验证正确性。

In [8]:
def one_hot(idx: int, dim: int) -> torch.Tensor:
    # [1] TODO: 实现one-hot函数【1分】
    pass

In [None]:
import torch.nn.functional as F
src = one_hot(1, 4)
ref = F.one_hot(torch.tensor(1), num_classes=4)
print(f"参考值：{ref}")
print(f"测试值：{src}")

### 2. 实现softmax函数
请手动操作张量，实现softmax函数。**直接使用torch的softmax方法不得分！**

In [10]:
def softmax(x: torch.Tensor) -> torch.Tensor:
    # [2] TODO: 实现softmax函数【2分】
    pass

In [None]:
src = softmax(torch.arange(10).float())
ref = F.softmax(torch.arange(10).float(), dim=0)
print(f"参考值：{ref}")
print(f"测试值：{src}")

### 3. 实现CBOW类并训练模型

推荐按照TODO描述的步骤以及限定的代码块区域来实现（预计15行代码），也可在保证结果正确的前提下按照自己的思路来实现。请手动操作张量实现反向传播与模型训练，**直接使用loss.backward()、optimizer等torch内置方法不得分！**

> 建议利用torch提供的张量操作（点积、外积、矩阵乘等）替代python的循环，高效处理数据。

> `torch.nn.Module`是torch中神经网络模型的基类，大多数模型的定义都继承于此。其中的`forward`函数相当于`__call__`方法，一般用于处理模型的前向传播步骤。因此如果你定义了一个实例`cbow = CBOW()`，你可以直接用`cbow(input)`来调用它的`forward`函数并获得模型输出。

> 一般来说，模型接受的输入往往是一个批次（batch）；本次作业为实现方便起见不使用batch，只需考虑单条输入的前向与反向传播即可。

In [12]:
import os
import time

from tqdm import tqdm

class CBOW(torch.nn.Module):
    def __init__(self, vocab: Vocab, vector_dim: int):
        super().__init__()
        self.vocab = vocab
        self.vector_dim = vector_dim

        self.U_proj = torch.nn.Linear(len(self.vocab), vector_dim, bias=False)  # (vocab_size, vector_dim)
        self.V_proj = torch.nn.Linear(vector_dim, len(self.vocab), bias=False)  # (vector_dim, vocab_size)
        torch.nn.init.uniform_(self.U_proj.weight, -1, 1)
        torch.nn.init.uniform_(self.V_proj.weight, -1, 1)

    def forward(self, x):
        h, o, y = None, None, None
        # [3] TODO: 实现前向传播逻辑【3分】 ==========================>>>
        # 使用之前定义的softmax函数完成输出概率的归一化
        # 注意返回中间结果，以便于在训练时反向传播使用
        pass
        # [3] <<<======================= END ==========================
        return y, (h, o)

    def train(self, corpus: str, window_size: int, train_epoch: int, learning_rate: float=1e-1, save_path: str = None):
        dataset = CorpusDataset(corpus, window_size)
        start_time = time.time()

        for epoch in range(1, train_epoch + 1):
            avg_loss = self.train_one_epoch(epoch, dataset, learning_rate)
            if save_path is not None:
                self.save_model(save_path)

        end_time = time.time()
        print(f"总耗时 {end_time - start_time:.2f}s")

    def train_one_epoch(self, epoch: int, dataset: CorpusDataset, learning_rate: float) -> float:
        steps, total_loss = 0, 0.0

        with tqdm(dataset, desc=f"Epoch {epoch}") as pbar:
            for sample in pbar:
                context_tokens, target_token = sample
                loss = self.train_one_step(context_tokens, target_token, learning_rate)
                total_loss += loss
                steps += 1
                if steps % 10 == 0:
                    pbar.set_postfix({"Avg. loss": f"{total_loss / steps:.4f}"})

        return total_loss / steps

    def train_one_step(self, context_tokens: List[str], target_token: str, learning_rate: float, debug: bool=False) -> float:
        """
        :param context_tokens:  目标词周围的词
        :param target_token:    目标词
        :param learning_rate:   学习率
        :return:    loss值 (标量)
        """
        context, target = None, None
        # [4] TODO: 使用one_hot函数，构建输入与输出的0-1向量【2分】 ===>>>
        pass
        # [4] <<<======================= END ==========================

        pred, (h, o) = self.forward(context)

        loss = None
        # [5] TODO: 计算交叉熵损失loss【1分】 ========================>>>
        pass
        # [5] <<<======================= END ==========================

        dV_proj, dU_proj = None, None
        # [6] TODO: 计算U与V的梯度【3分】 ============================>>>
        pass
        # [6] <<<======================= END ==========================
        
        # [7] TODO: 更新U与V的参数【2分】 ============================>>>
        pass
        # [7] <<<======================= END ==========================

        if debug:
            print(f"Loss: {loss.item()}")
            print(f"Gradient of U_proj:\n{dU_proj.T.detach()}")
            print(f"Gradient of V_proj:\n{dV_proj.T.detach()}")

        return loss.item()

    def similarity(self, token1: str, token2: str) -> float:
        """ 计算两个词的相似性 """
        v1 = self.U_proj.weight.T[self.vocab.token_to_idx(token1)]
        v2 = self.U_proj.weight.T[self.vocab.token_to_idx(token2)]
        return torch.cosine_similarity(v1, v2).item()

    def most_similar_tokens(self, token: str, n: int):
        """ 召回与token最相似的n个token """
        idx = self.vocab.token_to_idx(token, warn=True)
        token_v = self.U_proj.weight.T[idx]

        similarities = torch.cosine_similarity(token_v, self.U_proj.weight.T)
        nbest_idx = similarities.argsort(descending=True)[:n]

        results = []
        for idx in nbest_idx:
            _token = self.vocab.idx_to_token(idx)
            results.append((_token, similarities[idx].item()))

        return results

    def save_model(self, path: str):
        """ 将模型保存到`path`路径下，如果不存在`path`会主动创建 """
        os.makedirs(path, exist_ok=True)
        self.vocab.save_vocab(path)
        torch.save(self.state_dict(), os.path.join(path, "model.pth"))

    @classmethod
    def load_model(cls, path: str):
        """ 从`path`加载模型 """
        vocab = Vocab.load_vocab(path)
        state_dict = torch.load(os.path.join(path, "model.pth"))
        model = cls(vocab, state_dict["U_proj.weight"].size(0))
        model.load_state_dict(state_dict)

        return model

## 测试

测试部分可用于验证CBOW实现的正确性。为了方便检查结果，请不要对训练的参数做修改。

### 测试1：loss计算与反向传播

本测试使用torch自带的损失函数与梯度反传功能对张量进行计算。如果你的实现正确，应当可以看到手动计算与自动计算得到的损失与梯度值相等或几近相等。

In [None]:
import random

def test1():
    random.seed(42)
    torch.manual_seed(42)

    vocab = Vocab(corpus="./data/debug1.txt")
    cbow = CBOW(vocab, vector_dim=3)

    print("********** 参考值 **********")
    x = F.one_hot(
        torch.tensor([cbow.vocab.token_to_idx("1"), cbow.vocab.token_to_idx("3")]), num_classes=len(vocab)
    ).float().sum(dim=0)
    label = F.one_hot(torch.tensor(cbow.vocab.token_to_idx("2")), num_classes=len(vocab)).float()
    y, (h, o) = cbow(x)
    loss_fct = torch.nn.CrossEntropyLoss()
    loss = loss_fct(o.unsqueeze(0), torch.argmax(label).unsqueeze(0))
    loss.backward()
    print("Loss:", loss.item())
    print(f"Gradient of U_proj:\n{cbow.U_proj.weight.grad}")
    print(f"Gradient of V_proj:\n{cbow.V_proj.weight.grad}")

    print("\n********** 测试值 **********")
    cbow.train_one_step(["1", "3"], "2", 1, debug=True)
    
test1()

### 测试2：CBOW的简单训练

本测试可用于验证CBOW的整个训练流程。如果你的实现正确，可以看到最终一个epoch的平均loss约在0.5~0.6，并且“i”、“he”和“she”的相似性较高。

In [None]:
def test2():
    random.seed(42)
    torch.manual_seed(42)
    
    vocab = Vocab(corpus="./data/debug2.txt")
    cbow = CBOW(vocab, vector_dim=8)
    cbow.train(corpus="./data/debug2.txt", window_size=3, train_epoch=10, learning_rate=1.0)

    print(cbow.most_similar_tokens("i", 5))
    print(cbow.most_similar_tokens("he", 5))
    print(cbow.most_similar_tokens("she", 5))

test2()

### 测试3：正式训练CBOW模型

本测试将会在`treebank.txt`上训练词向量，为了加快训练流程，实验只保留高频的4000词，且词向量维度为20。

在每个epoch结束后，会在`data/treebank.txt`中测试词向量的召回能力。如下所示，`data/treebank.txt`中每个样例为`word`以及对应的同义词，同义词从wordnet中获取。

```python
[
    "about",
    [
        "most",
        "virtually",
        "around",
        "almost",
        "near",
        "nearly",
        "some"
    ]
]
```

> 本阶段预计消耗40分钟，具体时间与代码实现有关。最后一个epoch平均loss降至5.1左右，并且在同义词上的召回率约为17~18%左右。

In [None]:
import json

def calculate_recall_rate(model: CBOW, word_synonyms: List[Tuple[str, List[str]]], topn: int) -> float:
    """ 测试CBOW的召回率 """
    hit, total = 0, 1e-9
    for word, synonyms in word_synonyms:
        synonyms = set(synonyms)
        recalled = set([w for w, _ in model.most_similar_tokens(word, topn)])
        hit += len(synonyms & recalled)
        total += len(synonyms)

    print(f"Recall rate: {hit / total:.2%}")
    return hit / total

def test3():
    random.seed(42)
    torch.manual_seed(42)

    corpus = "./data/treebank.txt"
    lr = 1e-1
    topn = 40

    vocab = Vocab(corpus, max_vocab_size=4000)
    model = CBOW(vocab, vector_dim=20)

    dataset = CorpusDataset(corpus, window_size=4)

    with open("data/synonyms.json", encoding="utf-8") as f:
        word_synonyms: List[Tuple[str, List[str]]] = json.load(f)

    for epoch in range(1, 11):
        model.train_one_epoch(epoch, dataset, learning_rate=lr)
        calculate_recall_rate(model, word_synonyms, topn)

test3()

## 实验总结

> [8] TODO：请在这里写下你的实验总结。**【1分】**