In [1]:
import torch
import random
import zipfile


In [2]:
import time
import math
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F

## 语言模型数据集

### 数据

In [3]:
def load_data_jay_lyrics(length,
                         zip_path='F:/Jupyter_note/DL_pytorch/FashionMNIST/jaychou_lyrics.txt.zip',
                         member='jaychou_lyrics.txt'):
    """Load the lyrics corpus and build a character-level vocabulary.

    Args:
        length: Number of leading characters of the corpus to keep (applied
            after line breaks have been replaced by spaces).
        zip_path: Location of the zip archive. The default is the
            machine-specific path the notebook was written with; pass your
            own path when running elsewhere.
        member: Name of the UTF-8 text file inside the archive.

    Returns:
        A tuple ``(corpus_indices, char_to_idx, idx_to_char, vocab_size)``:
        the truncated corpus as a list of integer ids, the char -> id dict,
        the id -> char list, and the number of distinct characters.
    """
    with zipfile.ZipFile(zip_path) as zin:
        with zin.open(member) as f:
            corpus_chars = f.read().decode('utf-8')

    # Turn the text into one long line so that line breaks become ordinary
    # separator characters.
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    corpus_chars = corpus_chars[:length]

    # Sort the vocabulary: iterating a bare set of strings yields an order
    # that changes from one interpreter run to the next, which would make
    # the ids (and anything trained on them) irreproducible.
    idx_to_char = sorted(set(corpus_chars))
    char_to_idx = {char: i for i, char in enumerate(idx_to_char)}
    vocab_size = len(char_to_idx)

    # Encode every character of the corpus as its vocabulary id.
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    return corpus_indices, char_to_idx, idx_to_char, vocab_size

### 随机采样

下面的代码每次从数据里随机采样一个小批量。其中批量大小batch_size指每个小批量的样本数，num_steps为每个样本所包含的时间步数。  
在随机采样中，每个样本是原始序列上任意截取的一段序列。相邻的两个随机小批量在原始序列上的位置不一定相毗邻。  
因此，我们无法用一个小批量最终时间步的隐藏状态来初始化下一个小批量的隐藏状态。  
在训练模型时，每次随机采样前都需要重新初始化隐藏状态。  

In [25]:
def data_iter_random(corpus_indices, batch_size, num_steps, device=None):
    """Yield minibatches of randomly chosen, non-overlapping windows.

    The sequence is cut into consecutive windows of ``num_steps`` items, the
    windows are shuffled, and ``batch_size`` of them are grouped per batch.

    Args:
        corpus_indices: Sequence of integer ids.
        batch_size: Number of windows per minibatch.
        num_steps: Number of time steps in each window.
        device: Target device; defaults to CUDA when available, else CPU.

    Yields:
        ``(X, Y)`` float32 tensors of shape ``(batch_size, num_steps)``,
        where each row of ``Y`` is the matching row of ``X`` moved one
        position further along the sequence.
    """
    # One item is held back because each label window starts one position
    # after its input window.
    n_windows = (len(corpus_indices) - 1) // num_steps
    n_batches = n_windows // batch_size
    window_ids = list(range(n_windows))
    random.shuffle(window_ids)

    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    for batch_no in range(n_batches):
        # Take the next batch_size shuffled windows.
        lo = batch_no * batch_size
        chosen = window_ids[lo: lo + batch_size]
        inputs, labels = [], []
        for window in chosen:
            start = window * num_steps
            inputs.append(corpus_indices[start: start + num_steps])
            labels.append(corpus_indices[start + 1: start + 1 + num_steps])
        yield (torch.tensor(inputs, dtype=torch.float32, device=device),
               torch.tensor(labels, dtype=torch.float32, device=device))

In [28]:
# Toy sequence 0..39 to make the sampling visible: every row of Y is the
# matching row of X shifted one position ahead. Batches change between runs
# because the windows are shuffled.
my_seq = list(range(40))
for X, Y in data_iter_random(my_seq, batch_size=3, num_steps=5):
    print('X: ', X, '\nY:', Y, '\n')

X:  tensor([[ 5.,  6.,  7.,  8.,  9.],
        [10., 11., 12., 13., 14.],
        [25., 26., 27., 28., 29.]]) 
Y: tensor([[ 6.,  7.,  8.,  9., 10.],
        [11., 12., 13., 14., 15.],
        [26., 27., 28., 29., 30.]]) 

X:  tensor([[15., 16., 17., 18., 19.],
        [20., 21., 22., 23., 24.],
        [30., 31., 32., 33., 34.]]) 
Y: tensor([[16., 17., 18., 19., 20.],
        [21., 22., 23., 24., 25.],
        [31., 32., 33., 34., 35.]]) 



### 相邻采样

In [4]:
def data_iter_consecutive(corpus_indices, batch_size, num_steps, device=None):
    """Yield minibatches whose rows continue where the previous batch ended.

    The sequence is split into ``batch_size`` equally long rows (leftover
    items at the end are dropped) and each batch is the next ``num_steps``
    columns of that matrix.

    Args:
        corpus_indices: Sequence of integer ids.
        batch_size: Number of parallel rows.
        num_steps: Number of time steps per minibatch.
        device: Target device; defaults to CUDA when available, else CPU.

    Yields:
        ``(X, Y)`` float32 tensors of shape ``(batch_size, num_steps)``,
        with ``Y`` being ``X`` moved one column to the right.
    """
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    flat = torch.tensor(corpus_indices, dtype=torch.float32, device=device)
    row_len = len(flat) // batch_size
    grid = flat[0: batch_size * row_len].view(batch_size, row_len)

    # The last column can only serve as a label, hence the "- 1".
    n_batches = (row_len - 1) // num_steps
    for batch_no in range(n_batches):
        left = batch_no * num_steps
        right = left + num_steps
        yield grid[:, left: right], grid[:, left + 1: right + 1]

In [6]:
# Same toy sequence with consecutive sampling: row k of the second batch
# starts exactly where row k of the first batch stopped.
my_seq = list(range(40))
for X, Y in data_iter_consecutive(my_seq, batch_size=3, num_steps=5):
    print('X: ', X, '\nY:', Y, '\n')

X:  tensor([[ 0.,  1.,  2.,  3.,  4.],
        [13., 14., 15., 16., 17.],
        [26., 27., 28., 29., 30.]]) 
Y: tensor([[ 1.,  2.,  3.,  4.,  5.],
        [14., 15., 16., 17., 18.],
        [27., 28., 29., 30., 31.]]) 

X:  tensor([[ 5.,  6.,  7.,  8.,  9.],
        [18., 19., 20., 21., 22.],
        [31., 32., 33., 34., 35.]]) 
Y: tensor([[ 6.,  7.,  8.,  9., 10.],
        [19., 20., 21., 22., 23.],
        [32., 33., 34., 35., 36.]]) 



### nn.RNN

In [5]:
# Only the first `size` characters of the corpus are kept.
size = 10000
corpus_indices, char_to_idx, idx_to_char, vocab_size = load_data_jay_lyrics(size)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
# Number of distinct characters in the truncated corpus.
vocab_size

1027

In [8]:
# rnn_layer expects input of shape (num_steps, batch_size, input_size).
# input_size = vocab_size (1027 here), hidden_size = 256.
num_hiddens = 256
rnn_layer = nn.RNN(input_size=vocab_size, hidden_size=num_hiddens)

In [9]:
# Show the layer's configuration (input size, hidden size).
print(rnn_layer)

RNN(1027, 256)


In [29]:
# List every learnable parameter of the recurrent layer with its shape.
for name, param in rnn_layer.named_parameters():
    print(name, param.shape)

weight_ih_l0 torch.Size([256, 1027])
weight_hh_l0 torch.Size([256, 256])
bias_ih_l0 torch.Size([256])
bias_hh_l0 torch.Size([256])


In [11]:
# Output shape is (num_steps, batch_size, num_hiddens); the hidden state h
# has shape (num_layers, batch_size, num_hiddens).
num_steps = 5
batch_size = 3
# Passing None as the state lets the layer start from its default initial state.
state = None
# Random dummy input, used only to inspect the output shapes.
X = torch.rand(num_steps, batch_size, vocab_size)
Y, state_new = rnn_layer(X, state)

In [14]:
# Compare the input shape with the output shape of the recurrent layer.
print(X.shape)
print(Y.shape)

torch.Size([5, 3, 1027])
torch.Size([5, 3, 256])


In [18]:
# The hidden state returned by nn.RNN's forward pass is the hidden state at
# the final time step.
# With several hidden layers, the final hidden state of every layer is
# stored in this variable.
# State of the first layer:
state_new[0].shape

torch.Size([3, 256])

In [20]:
def one_hot(x, n_class, dtype=torch.float32):
    """One-hot encode a batch of class ids.

    Args:
        x: Tensor of shape ``(batch,)`` holding class ids; it is cast to
            int64 before use, so float-valued ids are accepted.
        n_class: Width of the encoding (number of classes).
        dtype: Data type of the returned tensor.

    Returns:
        Tensor of shape ``(batch, n_class)`` on the same device as ``x``
        with a single 1 per row.
    """
    class_ids = x.long().view(-1, 1)
    encoded = torch.zeros((x.shape[0], n_class), dtype=dtype, device=x.device)
    # Write a 1 into column class_ids[r] of every row r, in place.
    return encoded.scatter_(1, class_ids, 1)
def to_onehot(X, n_class):
    """One-hot encode a batch of sequences, one tensor per time step.

    Args:
        X: Tensor of shape ``(batch, seq_len)`` holding class ids.
        n_class: Width of the encoding (number of classes).

    Returns:
        A list of ``seq_len`` tensors, each of shape ``(batch, n_class)``.
    """
    per_step = []
    for step in range(X.shape[1]):
        # Column `step` holds the ids of every sequence at that time step.
        per_step.append(one_hot(X[:, step], n_class))
    return per_step

In [21]:
# Encode ids 0 and 2: row 0 has its 1 in column 0, row 1 in column 2.
x = torch.tensor([0, 2])
one_hot(x, vocab_size)

tensor([[1., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.]])

In [30]:
class RNNModel(nn.Module):
    """Character-level language model: a recurrent layer plus a linear decoder."""

    def __init__(self, rnn_layer, vocab_size):
        """Wrap ``rnn_layer`` and add a dense layer that maps to ``vocab_size`` scores.

        Args:
            rnn_layer: Recurrent module exposing ``hidden_size`` and
                ``bidirectional`` attributes.
            vocab_size: Number of classes; used both as the one-hot width
                and as the output size of the dense layer.
        """
        super().__init__()
        self.rnn = rnn_layer
        # The feature width fed to the dense layer doubles for a bidirectional layer.
        num_directions = 2 if rnn_layer.bidirectional else 1
        self.hidden_size = num_directions * rnn_layer.hidden_size
        self.vocab_size = vocab_size
        self.dense = nn.Linear(self.hidden_size, vocab_size)
        self.state = None

    def forward(self, inputs, state):
        """Run one forward pass.

        Args:
            inputs: Tensor of shape ``(batch, seq_len)`` holding class ids.
            state: Initial recurrent state, or ``None``.

        Returns:
            ``(output, state)`` where ``output`` has shape
            ``(seq_len * batch, vocab_size)`` and ``state`` is the state
            returned by the recurrent layer (also kept in ``self.state``).
        """
        # to_onehot returns a list with one (batch, vocab_size) tensor per step;
        # stacking yields the (seq_len, batch, vocab_size) input for the RNN.
        one_hot_steps = to_onehot(inputs, self.vocab_size)
        rnn_input = torch.stack(one_hot_steps)
        rnn_out, new_state = self.rnn(rnn_input, state)
        self.state = new_state
        # Merge the time and batch axes so the dense layer scores every step at once.
        flat = rnn_out.view(-1, rnn_out.shape[-1])
        return self.dense(flat), self.state

In [31]:
# Wrap the recurrent layer in the language model and move it to the chosen device.
model = RNNModel(rnn_layer, vocab_size).to(device)

In [32]:
# Show the model's sub-modules.
print(model)

RNNModel(
  (rnn): RNN(1027, 256)
  (dense): Linear(in_features=256, out_features=1027, bias=True)
)


### 预测函数

In [33]:
def predict_rnn_pytorch(prefix, num_chars, model, vocab_size, device, idx_to_char,
                      char_to_idx):
    """Greedily generate ``num_chars`` characters that continue ``prefix``.

    Args:
        prefix: Non-empty seed string; every character must be a key of
            ``char_to_idx``.
        num_chars: Number of characters to generate after the prefix.
        model: Callable ``model(X, state) -> (scores, state)`` where ``X`` is
            a ``(1, 1)`` tensor holding one character id and ``scores`` has
            one row of per-class scores.
        vocab_size: Unused; kept so existing callers keep working.
        device: Device on which the input tensor and state are placed.
        idx_to_char: Sequence mapping id -> character.
        char_to_idx: Mapping character -> id.

    Returns:
        The prefix followed by the generated characters, as one string.

    Raises:
        IndexError: If ``prefix`` is empty.
    """
    state = None
    output = [char_to_idx[prefix[0]]]  # ids of the prefix plus everything generated
    # Generation needs no gradients; without this guard autograd would keep
    # extending a graph through the carried state at every step.
    with torch.no_grad():
        for t in range(num_chars + len(prefix) - 1):
            # Feed the most recent character as a batch of one, one step long.
            X = torch.tensor([output[-1]], device=device).view(1, 1)
            if state is not None:
                if isinstance(state, tuple):  # LSTM, state: (h, c)
                    state = (state[0].to(device), state[1].to(device))
                else:
                    state = state.to(device)

            (Y, state) = model(X, state)
            if t < len(prefix) - 1:
                # Still inside the prefix: ignore the prediction and feed
                # the next prefix character instead.
                output.append(char_to_idx[prefix[t + 1]])
            else:
                # Greedy choice: the highest-scoring class.
                output.append(int(Y.argmax(dim=1).item()))
    return ''.join([idx_to_char[i] for i in output])

In [35]:
# Generate 10 characters after the prefix; the model's weights are still at
# their random initial values here, so the continuation is not meaningful.
predict_rnn_pytorch('想要', 10, model, vocab_size, device, idx_to_char, char_to_idx)

'想要J右毛毛育毛育毛育毛'