## CS310 Natural Language Processing
## Assignment 3 (part 1). Recurrent Neural Networks for Language Modeling

**Total points**: 30

In this assignment, you will train a vanilla RNN language model on《论语》and evaluate its perplexity.

### 0. Import Necessary Libraries

In [7]:
from pprint import pprint
import torch.nn as nn
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

### 2. Build the Model

In [8]:
# Data preparation, followed by a single forward pass through freshly
# initialised (untrained) embedding and RNN layers.
input_file = 'lunyu_20chapters.txt'

from utils import CorpusReader
corpus = CorpusReader(inputFileName=input_file, min_count=1)

# Id 0 is reserved for padding; every corpus id is shifted up by one to make room.
word2id: dict = {'[PAD]': 0}
word2id.update({k: v + 1 for k, v in corpus.word2id.items()})
id2word: dict = {v: k for k, v in word2id.items()}

# One sequence per non-empty line.  Blank lines are skipped because
# pack_padded_sequence raises on zero-length sequences.
lines = []
with open(input_file, 'r', encoding='utf-8') as f:
    for raw_line in f:
        text = raw_line.strip()
        if text:
            lines.append(text)

embedding_lunyu = nn.Embedding(len(word2id), 50)
rnn_lunyu = nn.RNN(50, 100, batch_first=True)

# Each line is tokenised character by character.  A character missing from the
# vocabulary falls back to id 0, i.e. it is treated as padding.
seq_ids = [torch.tensor([word2id.get(w, 0) for w in line], dtype=torch.long) for line in lines]
seq_lens = torch.tensor([len(ids) for ids in seq_ids])
seq_ids_padded = nn.utils.rnn.pad_sequence(seq_ids, batch_first=True)

seq_embs = embedding_lunyu(seq_ids_padded)
seq_embs_packed = nn.utils.rnn.pack_padded_sequence(seq_embs, seq_lens, batch_first=True, enforce_sorted=False)

out_packed, _ = rnn_lunyu(seq_embs_packed)
out_unpacked, _ = nn.utils.rnn.pad_packed_sequence(out_packed, batch_first=True)

# Next-token targets: the inputs shifted left by one position, with the final
# column filled with the padding id (there is nothing left to predict there).
targets_padded = torch.full_like(seq_ids_padded, word2id['[PAD]'])
targets_padded[:, :-1] = seq_ids_padded[:, 1:]





Total vocabulary: 1352
logits: torch.Size([512, 393, 1353])
log_probs: torch.Size([512, 393, 1353])
loss: torch.Size([201216])


In [9]:
class RNNLM(nn.Module):
    """Stacked vanilla-RNN language model: embedding -> RNN layers -> vocabulary logits.

    Keyword Args:
        vocab_size: Number of rows in the embedding table and size of the
            output layer.
        emb_size: Embedding dimension (input size of the first RNN layer).
        hidden_size: Hidden size of every RNN layer.
        num_layers: Optional number of stacked RNN layers. Defaults to 3.

    Raises:
        KeyError: If ``vocab_size``, ``emb_size`` or ``hidden_size`` is missing.
        ValueError: If ``num_layers`` is smaller than 1.
    """

    def __init__(self, **kwargs):
        super().__init__()
        vocab_size = kwargs['vocab_size']
        emb_size = kwargs['emb_size']
        hidden_size = kwargs['hidden_size']
        num_layers = kwargs.get('num_layers', 3)
        if num_layers < 1:
            raise ValueError(f"num_layers must be >= 1, got {num_layers}")

        self.embedding = nn.Embedding(vocab_size, emb_size)

        # Only the first layer consumes embeddings; the rest consume the
        # previous layer's hidden states.
        self.rnn_layers = nn.ModuleList([
            nn.RNN(emb_size if i == 0 else hidden_size, hidden_size, batch_first=True)
            for i in range(num_layers)
        ])

        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, seq, seq_lens):
        """Compute next-token logits for a padded batch.

        Args:
            seq: Token ids of shape (batch, time), batch first.
            seq_lens: True (unpadded) length of each sequence in ``seq``.

        Returns:
            Logits of shape (batch, time, vocab_size).
        """
        embedded = self.embedding(seq)
        # pack_padded_sequence requires the lengths to live on the CPU.
        lengths = seq_lens.cpu() if torch.is_tensor(seq_lens) else seq_lens
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        # Feed the packed sequence through each layer in turn; the final
        # hidden state of each layer is not needed here.
        for rnn_layer in self.rnn_layers:
            packed, _ = rnn_layer(packed)

        # total_length keeps the time dimension equal to the input's, so the
        # logits stay aligned with targets padded to the same width.
        padded, _ = nn.utils.rnn.pad_packed_sequence(
            packed, batch_first=True, total_length=seq.size(1)
        )
        return self.fc(padded)


### 3. Train and Evaluate

In [9]:
# Untrained output layer for the ad-hoc RNN.  Its input size is read from
# rnn_lunyu so the two cannot drift apart.
fc = nn.Linear(rnn_lunyu.hidden_size, len(word2id))
logits = fc(out_unpacked)
# log_probs = F.log_softmax(logits, dim=-1)
#
#
# # Test result
# print('logits:', logits.size())
# print('log_probs:', log_probs.size())
#
# # Report Compute Perplexity
# print('Report Compute Perplexity:')
# loss_fn = nn.NLLLoss(ignore_index=0, reduction='none')
#
#
# # Calculate the loss
# with torch.no_grad():
#     loss = loss_fn(log_probs.view(-1, log_probs.size(-1)), targets_padded.view(-1))
#
# # Test result
# print('loss:', loss.size())



def compute_perplexity(logits, targets):
    """Return the per-token perplexity of ``logits`` with respect to ``targets``.

    Positions whose target id is 0 (the padding id) are excluded from both the
    summed loss and the token count, so padding does not lower the result.

    Args:
        logits: Unnormalised scores whose last dimension indexes the
            vocabulary; the leading dimensions are flattened.
        targets: Integer target ids, one per flattened ``logits`` position.

    Returns:
        A 0-dim tensor holding exp(mean negative log-likelihood over
        non-padding targets). The result is NaN when every target is padding.
    """
    with torch.no_grad():
        log_probs = F.log_softmax(logits, dim=-1)
        token_nll = F.nll_loss(
            log_probs.reshape(-1, log_probs.size(-1)),
            targets.reshape(-1),
            ignore_index=0,
            reduction='none',
        )
        # Ignored positions contribute 0 to token_nll, so divide by the number
        # of real tokens rather than by the number of positions.
        num_words = targets.ne(0).sum()
        perplexity = torch.exp(token_nll.sum() / num_words)
    return perplexity

# Perplexity of the logits computed above.  Those come from freshly
# initialised layers, so this is an untrained baseline.
perplexity = compute_perplexity(logits, targets_padded)
print(f"Perplexity on training set: {perplexity.item()}")

# Model dimensions are taken from the ad-hoc layers so both use the same sizes.
vocab_size = len(word2id)                    # vocabulary size, including [PAD]
emb_size = embedding_lunyu.embedding_dim     # embedding dimension
hidden_size = rnn_lunyu.hidden_size          # RNN hidden dimension

model = RNNLM(vocab_size=vocab_size, emb_size=emb_size, hidden_size=hidden_size)

# Loss and optimiser.  ignore_index=0 excludes padding targets from the loss,
# matching what compute_perplexity ignores.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=0.1)

# Training loop: the whole corpus is one batch, so each epoch is a single
# gradient step.
model.train()
num_epochs = 5  # number of passes over the data
for epoch in range(num_epochs):
    optimizer.zero_grad()
    logits = model(seq_ids_padded, seq_lens)
    loss = criterion(logits.view(-1, logits.shape[-1]), targets_padded.view(-1))
    loss.backward()
    optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}")

# Evaluation (currently disabled)
# model.eval()
# with torch.no_grad():
#     logits = model(seq_ids_padded, seq_lens)
#     loss = criterion(logits.view(-1, logits.shape[-1]), targets_padded.view(-1))
#     perplexity = torch.exp(loss)
#
#     print(f"Perplexity: {perplexity.item()}")


def generate_sentence(model, start_tokens, max_length=20):
    """Greedily extend ``start_tokens`` with the model's most likely next tokens.

    Tokens are converted with the module-level ``word2id`` / ``id2word``
    tables. Decoding is deterministic (argmax), so the same prompt always
    yields the same continuation.

    Args:
        model: Language model exposing ``embedding``, ``fc`` and either
            ``rnn_layers`` (an iterable of RNN layers) or a single ``rnn``.
        start_tokens: Non-empty sequence of prompt tokens; it is not modified.
        max_length: Maximum number of tokens to generate after the prompt.

    Returns:
        A new list holding the prompt followed by the generated tokens.
        Generation stops early once "。" is produced.

    Raises:
        ValueError: If ``start_tokens`` is empty.
    """
    current_tokens = list(start_tokens)
    if not current_tokens:
        raise ValueError("start_tokens must contain at least one token")

    # RNNLM stores its layers in `rnn_layers`; a model with a single `rnn`
    # attribute is still accepted.
    layers = model.rnn_layers if hasattr(model, "rnn_layers") else [model.rnn]
    device = next(model.parameters()).device

    model.eval()
    with torch.no_grad():
        # One hidden state per layer, carried across decoding steps.
        hidden_states = [None] * len(layers)
        # Tokens not yet fed to the model: the whole prompt at first, then only
        # the newest token.  The hidden states already summarise the rest, so
        # re-feeding the full prefix would process earlier tokens twice.
        pending = current_tokens
        for _ in range(max_length):
            input_tensor = torch.tensor(
                [[word2id.get(token, 0) for token in pending]],
                dtype=torch.long,
                device=device,
            )
            layer_out = model.embedding(input_tensor)
            for idx, layer in enumerate(layers):
                layer_out, hidden_states[idx] = layer(layer_out, hidden_states[idx])
            # Softmax is monotonic, so argmax over the raw logits of the last
            # position picks the same token as argmax over probabilities.
            logits = model.fc(layer_out[:, -1, :])
            current_word = torch.argmax(logits, dim=-1).item()
            current_token = id2word.get(current_word, "[UNK]")
            current_tokens.append(current_token)
            if current_token == "。":  # a full stop ends the sentence
                break
            pending = [current_token]
    return current_tokens

# Generate a few sentences, each starting from the same two-token prompt.
start_tokens = ["子", "曰"]
num_sentences = 5
for _ in range(num_sentences):
    sentence = generate_sentence(model, start_tokens)
    # Concatenate the tokens into one string for display.
    print("Generated Sentence:", "".join(sentence))

### 4. Experiments

In [9]:
 import gensim

# NOTE(review): `k` and `window_size` are not defined anywhere in this file as
# shown; they must be set before this line runs — confirm the intended values.
# The vectors get their own name so the RNNLM held in `model`, which every
# statement below relies on, is not overwritten.
word_vectors = gensim.models.KeyedVectors.load_word2vec_format(f'embeddings_{emb_size}_{k}_{window_size}.txt')



vacob_size = len(corpus.id2word)
# model = SkipGram(vacob_size, emb_size)

# Get embeddings as numpy array (shares memory with the embedding weights)
embeddings = model.embedding.weight.detach().cpu().numpy()

# # Truncated SVD
# svd = TruncatedSVD(n_components=2)
# embeddings_2d = svd.fit_transform(embeddings)

# Load the pretrained embeddings; adjust the path to where the file lives.
# NOTE: torch.load unpickles the file, so only load files from a trusted source.
pretrained_embeddings = torch.load('pretrained_embeddings.pth')

# copy_ broadcasts silently, so check the shape explicitly before overwriting
# the embedding table.
if tuple(pretrained_embeddings.shape) != tuple(model.embedding.weight.shape):
    raise ValueError(
        f"pretrained embeddings have shape {tuple(pretrained_embeddings.shape)}, "
        f"expected {tuple(model.embedding.weight.shape)}"
    )

# Apply the pretrained embeddings to the model's embedding layer.
with torch.no_grad():
    model.embedding.weight.copy_(pretrained_embeddings)

# Perplexity with the pretrained embeddings; no gradients are needed for
# evaluation.
model.eval()
with torch.no_grad():
    logits_with_pretrained = model(seq_ids_padded, seq_lens)
perplexity_with_pretrained = compute_perplexity(logits_with_pretrained, targets_padded)
print(f"Perplexity with pretrained embeddings: {perplexity_with_pretrained.item()}")

# Re-initialise the embedding layer at random; the other layers are untouched.
model.embedding.reset_parameters()

# Perplexity with the randomly initialised embeddings.
with torch.no_grad():
    logits_random = model(seq_ids_padded, seq_lens)
perplexity_random = compute_perplexity(logits_random, targets_padded)
print(f"Perplexity with randomly initialized embeddings: {perplexity_random.item()}")