# LSTM仿写莎士比亚十四行诗

**任务要求：利用LSTM网络实现莎士比亚风格的十四行诗文本生成。请勿使用Copilot等深度学习代码提示工具完成此次作业。**  
**数据: shakespeare_sonnets.txt**  
**你将学会：**
1. 对文本进行单词级别的编码和解码
2. 以LSTM网络为核心部件的神经网络的定义和使用
3. 文本生成任务的基本训练流程（基于交叉熵损失函数）
4. 文本生成网络的基本采样流程

导入必要的库，若缺少库请自行安装。其中`simple_tokenizer`是文件夹中定义好的模块文件，无需安装。

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import string
import matplotlib.pyplot as plt
from simple_tokenizer import tokenize, detokenize
from einops import rearrange,repeat
%matplotlib inline

ModuleNotFoundError: No module named 'simple_tokenizer'

In [None]:
# 设置运输设备
use_gpu = True if torch.cuda.is_available() else False # 如果GPU显存不够，可以手动设置为False
device_id = torch.device("cuda" if use_gpu else "cpu")
print('Device:', device_id)

Device: cuda


## 1. 数据导入和预处理

In [None]:
sonnet_list = [] # 列表中的每一个元素是一首诗的字符串
with open('shakespeares_sonnets.txt', 'r') as f:
    while True:
        text = ''
        line1 = f.readline() # ignore the first line
        if not line1:
            break
        assert eval(line1) == len(sonnet_list) + 1
        line2 = f.readline() # ignore the second line
        while True:
            line = f.readline()
            if line == '\n' or not line:
                break
            text += line.lower().strip() + '\n'
        sonnet_list.append(text)
print('first sonnet:\n%s' % sonnet_list[0])

first sonnet:
from fairest creatures we desire increase,
that thereby beauty's rose might never die,
but, as the riper should by time decease,
his tender heir might bear his memory.
but thou, contracted to thine own bright eyes,
feed'st thy light's flame with self-substantial fuel,
making a famine where abundance lies,
thyself thy foe, to thy sweet self too cruel.
thou that art now the world's fresh ornament
and only herald to the gaudy spring
within thine own bud buriest thy content
and, tender churl, mak'st waste in niggarding.
pity the world, or else this glutton be--
to eat the world's due, by the grave and thee.



我们使用在`simple_tokenizer.py`中定义好的`tokenizer`函数把文本分割成单词列表，其中每个单词称为一个token。

In [None]:
tokens = tokenize(sonnet_list[0])
print('Number of words in first sonnet: ', len(tokens))
print(tokens[:10]) # 打印前10个token
string = detokenize(tokens)
print('Reconstructed text:\n%s' % string)

Number of words in first sonnet:  145
['Afrom', 'Afairest', 'Acreatures', 'Awe', 'Adesire', 'Aincrease', 'S,', 'S\n', 'Athat', 'Athereby']
Reconstructed text:
from fairest creatures we desire increase,
that thereby beauty's rose might never die,
but, as the riper should by time decease,
his tender heir might bear his memory.
but thou, contracted to thine own bright eyes,
feed'st thy light's flame with self-substantial fuel,
making a famine where abundance lies,
thyself thy foe, to thy sweet self too cruel.
thou that art now the world's fresh ornament
and only herald to the gaudy spring
within thine own bud buriest thy content
and, tender churl, mak'st waste in niggarding.
pity the world, or else this glutton be--
to eat the world's due, by the grave and thee.



对数据集中的所有字符串都转化为token列表，并统计所有出现过至少一次的token--这些token将组成模型的词汇表（vocabulary）。

然后在词表中添加三个特殊的token：
+ `<pad>`: 用于填充序列，使得模型训练时，所有输入序列的长度相同
+ `<start>`: 用于标记序列的开始
+ `<end>`: 用于标记序列的结束

例如，当规定序列的最大长度为8时，对于输入token序列`['hello', 'world']`，我们将预处理为`['<start>', 'hello', 'world', '<end>', '<pad>', '<pad>', '<pad>', '<pad>']`（该预处理操作将会在dataset类中实现）。

In [None]:
sonnets_in_tokens = [] # 每一个元素是一个token列表
vocab = set()          # 词表先采用set数据结构来构建，方便去重
max_length = 0         # 按token数量来计算，最长的诗的长度
for son in sonnet_list:
    tokens = tokenize(son)
    sonnets_in_tokens.append(tokens)
    for tok in tokens:
        vocab.add(tok)
    length = len(tokens)
    if length > max_length:
        max_length = length

vocab = list(vocab)   # 词表转换为列表，方便索引
vocab += ['<PAD>', '<START>', '<END>'] # 添加三个特殊token
print('Number of unique words in the vocabulary:', len(vocab))
print('Maximum length of tokens of sonnets in dataset:', max_length)

Number of unique words in the vocabulary: 3115
Maximum length of tokens of sonnets in dataset: 169


token可以转化为自然数形式的索引（id），即其在`vocab`列表中的index，从而形成一一映射。神经网络将以id序列作为输入，并预测下一个token的id，因此为了方便实现，我们将所有的诗歌转化为id序列的形式。

In [None]:
token_to_id = {tok: i for i, tok in enumerate(vocab)} # 用dict结构构造反向索引
sonnets_in_ids = []  # 每一个元素是一个id列表
for son in sonnets_in_tokens:
    ids = []
    for tok in son:
        # 请完善此处代码，将诗歌数据集中的token转换为id
        ids.append(token_to_id[tok])
    sonnets_in_ids.append(ids)    

当神经网络预测了一个id序列，我们需要一个函数完成从id序列到string的解码。

In [None]:
def decode(ids):
    tokens = []
    for i in ids: 
        tok = vocab[i]
        if tok in ['<START>']:
            continue
        if tok in ['<END>', '<PAD>']:
            break
        tokens.append(tok)
    string = detokenize(tokens)
    return string
print(decode(sonnets_in_ids[0]))

from fairest creatures we desire increase,
that thereby beauty's rose might never die,
but, as the riper should by time decease,
his tender heir might bear his memory.
but thou, contracted to thine own bright eyes,
feed'st thy light's flame with self-substantial fuel,
making a famine where abundance lies,
thyself thy foe, to thy sweet self too cruel.
thou that art now the world's fresh ornament
and only herald to the gaudy spring
within thine own bud buriest thy content
and, tender churl, mak'st waste in niggarding.
pity the world, or else this glutton be--
to eat the world's due, by the grave and thee.



## 2. 数据集类定义

按照使用PyTorch进行深度学习的惯例，我们会定义一个`Dataset`类，该类继承自`torch.utils.data.Dataset`，并要求实现`__len__`和`__getitem__`方法。然后使用`torch.utils.data.DataLoader`类来提供数据迭代器，该迭代器将自动完成数据的批量化和打乱等操作，并在背后提供了多线程数据加载的功能。建议阅读[资料](https://blog.csdn.net/flyconley/article/details/119119817)。

请根据提示完善代码中`__getitem__`方法的定义。注意，当你正确实现了`__getitem__`方法，应当可以通过下面的`assert`检查。

In [None]:
class dataset(Dataset):
    def __init__(self, sonnets_in_ids, vocab, max_seq_length):
        super().__init__()
        self.data = sonnets_in_ids
        self.vocab = vocab
        self.vocab_size = len(vocab)
        self.pad_id = self.vocab.index('<PAD>')
        self.start_id = self.vocab.index('<START>')
        self.end_id = self.vocab.index('<END>')
        self.max_seq_length = max_seq_length + 2
    
    def __len__(self):
        """返回数据集大小"""
        return len(self.data)
    
    def __getitem__(self, index):
        """抽取第index个样本，对其进行处理（加上`<START>`，`<END>`和`<PAD>`相应的id），
        然后转化为torch.LongTensor（维度为[max_seq_length]），最后返回该tensor
        """
        x = self.data[index]# 取data中的第index个元素
        x = [self.start_id] + x + [self.end_id]
        x += [self.pad_id] * (self.max_seq_length - len(x))
        x = torch.LongTensor(x)# 转化为torch.LongTensor
        return x

In [None]:
BATCH_SIZE = 4  # 可以自行调整batch size
train_set = dataset(sonnets_in_ids, vocab, max_length)
assert train_set[0].shape == torch.Size([max_length + 2])
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, drop_last=True)

## 3. 模型定义

我们将定义SonNet类，它拟合了条件概率：
$$
P(y_i|y_{i-1}, y_{i-2}, \cdots, y_1)
$$
其中$y_i$表示生成序列中第$i$个token的id，$y_{i-1}$表示第$i-1$个token的id，以此类推。简单来说，输入一个已经生成的id序列，模型将预测下一个序列元素是某id的概率（类似于分类网络中的分类概率）。我们通常使用`softmax`函数将分类线性层的输出（称为logits，值域为$\mathbb{R}$）转化为概率（值域为$[0, 1]$），但是在实际实现中我们把`softmax`函数操作放在`forward`函数外部，因为：
1. 损失函数`nn.CrossEntropyLoss`已经包含了`softmax`函数，因此我们不需要再次调用`softmax`函数
2. 方便在采样（文本生成）时对概率的计算进行调整（例如引入`temperature`参数）

在训练和生成时，模型具有不同的输入输出形式（但可以共享同一套forward流程）。如下图所示，在训练时（左图），输入为`[0:seq_len-1]`的id序列，输出标签（label）为`[1:seq_len]`的id序列，即相同的序列长度但是向左偏移一个位置。在生成时（右图），一次生成一个id，输出重新作为输入。
![lstm](./imgs/lstm.png)
阅读[LSTM](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html#torch.nn.LSTM)和[Embedding](https://pytorch.org/docs/stable/generated/torch.nn.Embedding.html#torch.nn.Embedding)的文档，完成`forward`方法的定义。

In [None]:
class SonNet(nn.Module):
    def __init__(self, vocab_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, inputs, hidden=None):
        """完成模型的前向传播过程
        输入
        inputs: tensor，维度为`[batch_size, max_seq_length]`
        hidden: tuple，仅在采样时使用，包含了LSTM的初始隐状态向量和细胞状态向量，见输出`hidden`的说明
        输出
        outputs: tensor，维度为`[batch_size, max_seq_length, vocab_size]`，值域为$\mathbb{R}$
        hidden: tuple，包含了LSTM的最终隐状态向量和细胞状态向量，仅在采样时使用，一次`forward`的hidden输出作为
            下一次forward的`hidden`输入（第一次仍然输入`None`）
        """
        inputs = self.embedding(inputs) # [batch_size, max_seq_length, hidden_size]
        if inputs.ndim == 2:
            inputs = repeat(inputs, 'a b -> c a b', c=1)
        # inputs.to(device_id)
        # outputs, self.hidden = nn.LSTM(inputs,self.hidden)
        outputs, hidden = self.lstm(inputs, hidden)
        
        return logits, hidden

## 4. 模型训练

实例化模型、优化器和损失函数。可自行调整超参数，例如学习率、隐状态向量维度等。

In [None]:
HIDDEN_SIZE = 256
LR = 0.001
EPOCHS = 10  # 可以自行调整训练轮数，建议先设置较少轮数验证实现的准确性，再调整轮数使模型loss收敛
model = SonNet(vocab_size=len(vocab), hidden_size=HIDDEN_SIZE, output_size=len(vocab))
model.to(device_id)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
criterion = nn.CrossEntropyLoss(ignore_index=train_set.pad_id)

根据最大化似然函数方法，我们要最大化采样（生成）到训练样本的概率，这等价于最小化交叉熵损失函数。如第三节中的图所示，我们可以一次性输入整个序列，并在输出上对每一个位置计算交叉熵损失函数（`nn.CrossEntropyLoss(ignore_index=train_set.pad_id)`会忽略输出label为`<PAD>`的loss）。请完成loss计算和模型权重更新的代码。

In [None]:
loss_cache = []
print(1)
for epoch in range(EPOCHS):
    print(1)
    epoch_loss_sum = 0
    for i, batch in enumerate(train_loader):
        batch = batch.to(device_id)
        inputs = batch[:, :-1].contiguous()
        labels = batch[:, 1:].contiguous()
        print(inputs,labels)
        logits, _ = model(inputs)
        # 完成loss的计算，反向传播和权重更新
        print(1)
        loss = criterion(logits, labels)# .mean()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(i,loss)
        loss_cache.append(loss.item())
        epoch_loss_sum += loss.item()
    epoch_loss_avg = epoch_loss_sum / len(train_loader)
    print('Epoch: {}, Loss: {}'.format(epoch, epoch_loss_avg))

1
1


In [None]:
# 画出loss曲线
plt.plot(loss_cache)
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.show()

NameError: name 'loss_cache' is not defined

## 5. 序列采样

本节没有代码实现任务。请阅读并理解生成代码，实验模型的生成效果并回答相关问题。

In [None]:
@torch.no_grad()
def generate(model, start_token='<START>', max_length=500, temperature=1.0):
    model.eval()
    start_id = token_to_id[start_token]
    x = torch.LongTensor([start_id]).to(device_id)
    hidden = None
    ids = []
    logprob = 0.
    for i in range(max_length):
        logits, hidden = model(x, hidden)
        logits = logits.view(-1)
        probs = torch.softmax(logits / temperature, dim=-1)
        next_id = torch.multinomial(probs, 1)
        next_id = next_id.item()
        next_prob = probs[next_id]
        logprob += torch.log(next_prob)
        ids.append(next_id)
        x = torch.LongTensor([next_id]).to(device_id)
        if next_id == token_to_id['<END>']:
            break
    string = decode(ids)
    return string, logprob.item()

In [None]:
string, logprob = generate(model, temperature=0.3)
print('Logprob of generated string:', logprob)
print('Generated string:\n%s' % string)

回答下列问题：
1. 请简要评价`generate`函数的生成效果？有哪些可能的改进（对模型改进、对生成函数的改进均可）？
2. 分析并解释参数`temperature`的作用？它是如何影响生成结果的多样性和可靠性的？
3. （附加题）`logprob`表示了生成某个字符串的对数概率值，`logprob`越大，则模型认为该字符串有更大的概率生成，字符串也就越可靠（语句更通顺，更接近训练样本的语言风格）。给定一个已训练好的模型，怎样改进`generate`函数可以使得生成字符串的`logprob`更大？实现你所描述的改进可直接获得本次作业的满分。