读取数据

In [1]:
with open('./dataset/poetry1.txt', 'r') as f:
    poetry_corpus = f.read()

In [2]:
poetry_corpus[:100]

'寒随穷律变，春逐鸟声开。\n初风飘带柳，晚雪间花梅。\n碧林青旧竹，绿沼翠新苔。\n芝田初雁去，绮树巧莺来。\n晚霞聊自怡，初晴弥可喜。\n日晃百花色，风动千林翠。\n池鱼跃不同，园鸟声还异。\n寄言博通者，知予物'

In [3]:
# 看看字符数
len(poetry_corpus)

942681

将一些其他字符替换成空格

In [4]:
poetry_corpus = poetry_corpus.replace('\n', ' ').replace('\r', ' ').replace('，', ' ').replace('。', ' ')
poetry_corpus[:100]

'寒随穷律变 春逐鸟声开  初风飘带柳 晚雪间花梅  碧林青旧竹 绿沼翠新苔  芝田初雁去 绮树巧莺来  晚霞聊自怡 初晴弥可喜  日晃百花色 风动千林翠  池鱼跃不同 园鸟声还异  寄言博通者 知予物'

文本数值表示


In [5]:
import numpy as np

class TextConverter(object):
    def __init__(self, text_path, max_vocab=10000):
        """建立一个字符索引转换器
        
        Args:
            text_path: 文本位置
            max_vocab: 最大的单词数量
        """
        
        with open(text_path, 'r') as f:
            text = f.read()
        text = text.replace('\n', ' ').replace('\r', ' ').replace('，', ' ').replace('。', ' ')
        # 去掉重复的字符
        vocab = set(text)
        
        # 如果单词总数超过最大数值，去掉频率最低的
        vocab_count = {}
        
        # 计算单词出现频率并排序
        for word in vocab:
            vocab_count[word] = 0
        for word in text:
            vocab_count[word] += 1
        vocab_count_list = []
        for word in vocab_count:
            vocab_count_list.append((word, vocab_count[word]))
        vocab_count_list.sort(key=lambda x: x[1], reverse=True)
        
        # 如果超过最大值，截取频率最低的字符
        if len(vocab_count_list) > max_vocab:
            vocab_count_list = vocab_count_list[:max_vocab]
        vocab = [x[0] for x in vocab_count_list]
        self.vocab = vocab
        
        self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)}
        self.int_to_word_table = dict(enumerate(self.vocab))

    @property
    def vocab_size(self):
        return len(self.vocab) + 1

    def word_to_int(self, word):
        if word in self.word_to_int_table:
            return self.word_to_int_table[word] 
        else:
            return len(self.vocab)

    def int_to_word(self, index):
        if index == len(self.vocab):
            return '<unk>'
        elif index < len(self.vocab):
            return self.int_to_word_table[index]
        else:
            raise Exception('Unknown index!')

    def text_to_arr(self, text):
        arr = []
        for word in text:
            arr.append(self.word_to_int(word))
        return np.array(arr)

    def arr_to_text(self, arr):
        words = []
        for index in arr:
            words.append(self.int_to_word(index))
        return "".join(words)

In [6]:
convert = TextConverter('./dataset/poetry.txt', max_vocab=10000)
print(convert.vocab)

[' ', ':', '不', '人', '山', '一', '日', '风', '无', '中', '云', '上', '有', '春', '天', '何', '来', '花', '月', '时', '水', '相', '长', '君', '年', '归', '为', '秋', '生', '自', '行', '见', '江', '白', '心', '夜', '知', '如', '此', '得', '下', '清', '去', '在', '高', '南', '空', '明', '门', '子', '三', '里', '未', '客', '金', '事', '处', '道', '作', '送', '青', '东', '多', '歌', '别', '寒', '雨', '是', '玉', '落', '家', '城', '声', '远', '新', '千', '朝', '出', '前', '今', '西', '入', '与', '书', '游', '路', '万', '思', '诗', '阳', '草', '寄', '飞', '马', '十', '开', '应', '树', '酒', '同', '深', '我', '尽', '流', '将', '望', '还', '烟', '回', '闻', '和', '已', '地', '成', '闲', '欲', '色', '独', '重', '石', '谁', '雪', '公', '光', '可', '王', '从', '林', '古', '向', '更', '二', '楼', '海', '州', '后', '看', '香', '身', '老', '旧', '满', '情', '愁', '首', '衣', '龙', '头', '之', '方', '过', '红', '五', '难', '黄', '尘', '（', '）', '外', '平', '复', '名', '曲', '大', '言', '台', '乐', '莫', '能', '似', '仙', '分', '北', '安', '犹', '到', '华', '发', '初', '宫', '故', '居', '松', '叶', '离', '当', '晚', '少', '气', '鸟', '起', '竹', '间', '边', '题', '几', '怀', '意', '四', '亦',

In [7]:
# 原始文本字符
txt_char = poetry_corpus[:11]
print(txt_char)

# 转化成数字
print(convert.text_to_arr(txt_char))

寒随穷律变 春逐鸟声开
[  65  217  382 1123  628    0   13  442  189   72   95]


构造时序样本数据


In [8]:
n_step = 20

# 总的序列个数
num_seq = int(len(poetry_corpus) / n_step)

# 去掉最后不足一个序列长度的部分
text = poetry_corpus[:num_seq*n_step]

print(num_seq)

47134


重新排列成 (num_seq x n_step) 的矩阵

In [9]:
import torch

In [10]:
arr = convert.text_to_arr(text)
arr = arr.reshape((num_seq, -1))
arr = torch.from_numpy(arr)

print(arr.shape)
print(arr[0, :])

torch.Size([47134, 20])
tensor([  65,  217,  382, 1123,  628,    0,   13,  442,  189,   72,   95,    0,
           0,  178,    7,  520,  421,  202,    0,  186])


将最后一个字符的输出 label 定为输入的第一个字符，也就是"床前明月光"的输出是"前明月光床"

In [11]:
class TextDataset(object):
    def __init__(self, arr):
        self.arr = arr
        
    def __getitem__(self, item):
        x = self.arr[item, :]
        
        # 构造 label
        y = torch.zeros(x.shape)
        # 将输入的第一个字符作为最后一个输入的 label
        y[:-1], y[-1] = x[1:], x[0]
        return x, y
    
    def __len__(self):
        return self.arr.shape[0]

In [12]:
train_set = TextDataset(arr)

In [13]:
x, y = train_set[-1]
print(convert.arr_to_text(x.numpy()))
print(convert.arr_to_text(y.numpy()))

空 风静林还静  供养及修行 旧话成重省
 风静林还静  供养及修行 旧话成重省空


### 建立模型
模型可以定义成非常简单的三层，第一层是词嵌入，第二层是 RNN 层，因为最后是一个分类问题，所以第三层是线性层，最后输出预测的字符。

In [21]:
import torch
import torch.nn as nn
import torch.nn.functional as F
use_gpu = True
class CharRNN(nn.Module):
    def __init__(self, num_classes, embed_dim, hidden_size, num_layers, dropout=0.5):
        super().__init__()
        self.num_classes = num_classes
        self.embed_dim = embed_dim
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.use_gpu = True
        
        # 嵌入层
        self.word_to_vec = nn.Embedding(num_classes, embed_dim)

        # GRU层参数
        self.gru_weights = []
        for _ in range(num_layers):
            self.gru_weights.append({
                'W_ir': nn.Linear(embed_dim if _ == 0 else hidden_size, hidden_size, bias=False),
                'W_iz': nn.Linear(embed_dim if _ == 0 else hidden_size, hidden_size, bias=False),
                'W_in': nn.Linear(embed_dim if _ == 0 else hidden_size, hidden_size, bias=False),
                'W_hr': nn.Linear(hidden_size, hidden_size, bias=False),
                'W_hz': nn.Linear(hidden_size, hidden_size, bias=False),
                'W_hn': nn.Linear(hidden_size, hidden_size, bias=False),
            })
            # 将权重移动到GPU
            if self.use_gpu:
                for k, v in self.gru_weights[-1].items():
                    v.cuda()

        # 分类线性层
        self.project = nn.Linear(hidden_size, num_classes)
        # print(self.project.weight.shape)
    def forward(self, x, hs=None):
        batch_size, seq_len = x.size()
        
        # 如果没有传递隐藏状态，初始化为零
        if hs is None:
            hs = [torch.zeros(batch_size, self.hidden_size).to(x.device) for _ in range(self.num_layers)]
            if use_gpu:
                hs = [h.cuda() for h in hs]
        # 嵌入层：输入索引转化为词向量
        embed = self.word_to_vec(x)  # (batch, seq_len, embed_dim)
        embed = embed.permute(1, 0, 2)  # (seq_len, batch, embed_dim)

        h_list = []  # 保存每个时间步的隐藏状态
        for t in range(seq_len):
            x_t = embed[t]
            for l in range(self.num_layers):
                h_prev = hs[l]
                
                # GRU单元计算
                r_t = torch.sigmoid(self.gru_weights[l]['W_ir'](x_t) + self.gru_weights[l]['W_hr'](h_prev))
                z_t = torch.sigmoid(self.gru_weights[l]['W_iz'](x_t) + self.gru_weights[l]['W_hz'](h_prev))
                n_t = torch.tanh(self.gru_weights[l]['W_in'](x_t) + r_t * self.gru_weights[l]['W_hn'](h_prev))

                h_t = (1 - z_t) * n_t + z_t * h_prev
                hs[l] = h_t

                # Dropout应用在GRU层之间
                if self.dropout > 0 and l < self.num_layers - 1:
                    h_t = F.dropout(h_t, p=self.dropout, training=self.training)
                
                x_t = h_t

            h_list.append(h_t)
        # 输出层
        all_h = torch.stack(h_list, dim=0)  # (seq_len, batch, hidden)
        # print(out.shape)
        le, mb, hd = all_h.shape
        # out = out.view(le * mb, hd)
        # print(out.shape)
        out = self.project(all_h)  # project to num_classes
        out = out.view(le, mb, -1)
        out = out.permute(1, 0, 2).contiguous()  # (batch, len, num_classes)
        
        return out.view(-1, out.shape[2]), hs ,all_h # return the reshaped output and the last hidden state


    def cross_entropy_loss(self, out, y, num_classes):
        
        batch_size, seq_len = y.shape
        

        # 创建 one-hot 编码并移动到 GPU
        y_one_hot = torch.zeros((batch_size * seq_len, num_classes)).cuda()
        y_flat = y.view(-1)  # 展平标签形状为 (batch_size * seq_len,)
        y_one_hot.scatter_(1, y_flat.unsqueeze(1), 1)  # 在指定的索引位置上填充1

        # 计算 softmax
        out_softmax = F.softmax(out, dim=1) 

        # 计算交叉熵损失
        loss = -torch.sum(y_one_hot * torch.log(out_softmax + 1e-9)) / (batch_size * seq_len)  # 防止 log(0)
        return loss, y_one_hot  # 返回 one-hot 编码后的 y 供后续使用




    def backward(self, x, hs, y, out):
        """
        反向传播计算梯度，更新网络参数
        :param x: 输入序列 (batch_size, seq_len)
        :param hs: 隐藏状态 (seq_len, batch_size, hidden_size)
        :param y: 真实标签 (batch_size, seq_len, num_classes)
        :param out: 模型输出 (batch_size, seq_len, num_classes)
        :return: None, 梯度会存储在各参数的.grad中
        """
        # 在反向传播前初始化梯度
        for layer_weights in self.gru_weights:
            for param in layer_weights.values():
                if param.weight.grad is None:
                    param.weight.grad = torch.zeros_like(param.weight)
                else:
                    param.weight.grad.zero_()
        
        # 分类线性层的梯度清零
        if self.project.weight.grad is None:
            self.project.weight.grad = torch.zeros_like(self.project.weight)
        else:
            self.project.weight.grad.zero_()
        
        # 嵌入层的梯度清零
        if self.word_to_vec.weight.grad is None:
            self.word_to_vec.weight.grad = torch.zeros_like(self.project.weight)
        else:
            self.word_to_vec.weight.grad.zero_()
        
        batch_size, seq_len = x.shape
        hidden_size = self.hidden_size
        
        # 1. 计算交叉熵损失对输出的梯度
        dL_dy = out - y  # (batch_size, seq_len, num_classes)
        

        # 2. 计算输出到投影层的梯度
        dL_dW_project = hs.reshape(batch_size * seq_len, hidden_size).T @ dL_dy  # (hidden_size, num_classes)
        dL_db_project = dL_dy.sum(axis=0)  # (num_classes,)

        # 3. 计算GRU层的梯度
        dL_dh = dL_dy @ self.project.weight  # (batch_size * seq_len, hidden_size)
        dL_dh = dL_dh.view(seq_len, batch_size, -1)  # (seq_len, batch_size, hidden_size)
        # print("dl_dh.shape:",dL_dh[0].shape)
        # GRU反向传播
        for t in reversed(range(seq_len)):
            for l in range(self.num_layers - 1, -1, -1):
                h_prev = hs[l][t-1] if t > 0 else torch.zeros_like(hs[l][t])
                h_t = hs[l][t]

                # 计算每一项的梯度
                r_t = torch.sigmoid(self.gru_weights[l]['W_ir'](h_prev) + self.gru_weights[l]['W_hr'](h_t))
                z_t = torch.sigmoid(self.gru_weights[l]['W_iz'](h_prev) + self.gru_weights[l]['W_hz'](h_t))
                n_t = torch.tanh(self.gru_weights[l]['W_in'](h_prev) + r_t * self.gru_weights[l]['W_hn'](h_t))
                # print(h_t.shape)
                # 计算梯度并反向传播
                dL_dn_t = dL_dh[t] * (1 - z_t)
                dL_dz_t = dL_dh[t] * (h_t - h_prev)
                dL_dr_t = dL_dh[t] * (1 - z_t) * h_prev
                # print(self.gru_weights[l]['W_ir'].weight.shape,dL_dr_t[-1].shape)
                # 更新GRU层参数的梯度
                self.gru_weights[l]['W_ir'].weight.grad += dL_dr_t[-1]
                self.gru_weights[l]['W_iz'].weight.grad += dL_dz_t[-1]
                self.gru_weights[l]['W_in'].weight.grad += dL_dn_t[-1]
                self.gru_weights[l]['W_hr'].weight.grad += dL_dr_t[-1]
                self.gru_weights[l]['W_hz'].weight.grad += dL_dz_t[-1]
                self.gru_weights[l]['W_hn'].weight.grad += dL_dn_t[-1]

        # 4. 嵌入层的梯度
        dL_dx_embed = torch.zeros_like(self.word_to_vec.weight.T)  # (embed_dim,num_classes)
        # print(dL_dy[0].shape,self.word_to_vec.weight.grad.shape)
        for t in range(seq_len):
            dL_dx_embed[x[:, t]] += dL_dy[t]  # 累加每个词的梯度

        self.word_to_vec.weight.grad = dL_dx_embed.T
        return dL_dy, dL_dh
    def UpdatePara(self, lr):
        
        # 更新嵌入层权重
        if self.word_to_vec.weight.grad is not None:
            self.word_to_vec.weight.data -= lr * self.word_to_vec.weight.grad

        # 更新GRU层的权重
        for layer in range(self.num_layers):
            for param_name in ['W_ir', 'W_iz', 'W_in', 'W_hr', 'W_hz', 'W_hn']:
                param = self.gru_weights[layer][param_name]
                if param.weight.grad is not None:
                    param.weight.data -= lr * param.weight.grad

        # 更新投影层权重
        if self.project.weight.grad is not None:
            self.project.weight.data -= lr * self.project.weight.grad
        if self.project.bias.grad is not None:
            self.project.bias.data -= lr * self.project.bias.grad




训练模型


In [15]:
from torch.utils.data import DataLoader

batch_size = 128
train_data = DataLoader(train_set, batch_size, True, num_workers=4)

In [17]:

model = CharRNN(convert.vocab_size, 512, 512, 2, 0.5)
if use_gpu:
    model = model.cuda()



In [18]:
epochs = 20
for e in range(epochs):
    
    train_loss = 0
    for data in train_data:
        x, y = data
        
        y = y.long()
        if use_gpu:
            x = x.cuda()
            y = y.cuda()
        
        # Forward.
        score, hs ,all_h = model(x)
        # print(all_h.shape)
        num_class = score.shape[1]
        
        loss,y = model.cross_entropy_loss(score,y,num_class)
        # print(loss,y)
        
        # Backward.
        model.backward(x,all_h,y,score)
        # Clip gradient.
        # nn.utils.clip_grad_norm_(model.parameters(), 5)
        model.UpdatePara(lr=1e-3)
        # print(loss.item())
        train_loss += loss
    
    print('epoch: {}, perplexity is: {:.3f}'.format(e+1, np.exp(train_loss / len(train_data))))

epoch: 1, perplexity is: 750.065
epoch: 2, perplexity is: 349.313
epoch: 3, perplexity is: 292.008
epoch: 4, perplexity is: 259.686
epoch: 5, perplexity is: 237.979
epoch: 6, perplexity is: 221.654
epoch: 7, perplexity is: 208.528
epoch: 8, perplexity is: 197.734
epoch: 9, perplexity is: 188.491
epoch: 10, perplexity is: 180.772
epoch: 11, perplexity is: 173.879
epoch: 12, perplexity is: 167.959
epoch: 13, perplexity is: 162.731
epoch: 14, perplexity is: 157.893
epoch: 15, perplexity is: 153.639
epoch: 16, perplexity is: 149.758
epoch: 17, perplexity is: 146.101
epoch: 18, perplexity is: 142.877
epoch: 19, perplexity is: 139.935
epoch: 20, perplexity is: 137.123


可以看到，训练完模型之后，我们得到较低的困惑度，下面我们就可以开始生成文本了。

### 生成文本
生成文本的过程非常简单，给定开始的字符，然后不断向后生成字符，将生成的字符作为新的输入再传入网络。

在预测的概率最高的前五个里面依据他们的概率来进行随机选择。

In [33]:
def pick_top_n(preds, top_n=5):
    
    top_pred_prob, top_pred_label = torch.topk(preds, top_n, 1)
    
    top_pred_prob /= torch.sum(top_pred_prob)
    
    top_pred_prob = top_pred_prob.squeeze(0).cpu().numpy()
    
    top_pred_label = top_pred_label.squeeze(0).cpu().numpy()
    
    c = np.random.choice(top_pred_label, size=1, p=top_pred_prob)
    return c

In [71]:
from torch.autograd import Variable
begin = '离离原上草 '
text_len = 30

model = model.eval()
samples = [convert.word_to_int(c) for c in begin]
input_txt = torch.LongTensor(samples)[None]

if use_gpu:
    input_txt = input_txt.cuda()
input_txt = Variable(input_txt)

_, init_state,_ = model(input_txt)

result = samples
model_input = input_txt[:, -1][:, None]

for i in range(text_len):
    out, init_state,_ = model(model_input, init_state)
    
    pred = pick_top_n(out.data)
    
    model_input = Variable(torch.LongTensor(pred))[None]
    
    if use_gpu:
        model_input = model_input.cuda()
    result.append(pred[0])
text = convert.arr_to_text(result)
print('Generate text is: {}'.format(text))

Generate text is: 离离原上草 江城南北去北归路赊去 何时见 山日月照照影斜斜 风雨吹 寒落
