<a href="https://colab.research.google.com/github/zq0315/blog/blob/master/NNModelScratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install d2l
import random
import torch
from d2l import torch as d2l
import re
import collections
from torch.nn import functional as F

d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():
  """将时间机器数据集加载到文本行的列表中"""
  with open(d2l.download('time_machine'), 'r') as f:
    lines = f.readlines()
  return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]



class Vocab:
  """文本词表"""
  def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
    if tokens is None:
      tokens = []
    if reserved_tokens is None:
      reserved_tokens = []
    # 按出现频率排序
    counter = count_corpus(tokens)
    self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
    reverse=True)
    # 未知词元的索引为0
    self.idx_to_token = ['<unk>'] + reserved_tokens
    self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
    for token, freq in self._token_freqs:
      if freq < min_freq:
        break
      if token not in self.token_to_idx:
        self.idx_to_token.append(token)
        self.token_to_idx[token] = len(self.idx_to_token) - 1
  def __len__(self):
    return len(self.idx_to_token)
  def __getitem__(self, tokens):
    if not isinstance(tokens, (list, tuple)):
      return self.token_to_idx.get(tokens, self.unk)
    return [self.__getitem__(token) for token in tokens]
  def to_tokens(self, indices):
    if not isinstance(indices, (list, tuple)):
      return self.idx_to_token[indices]
    return [self.idx_to_token[index] for index in indices]
  @property
  def unk(self): # 未知词元的索引为0
    return 0
  @property
  def token_freqs(self):
    return self._token_freqs

  def count_corpus(tokens):
    """统计词元的频率"""
    # 这里的tokens是1D列表或2D列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
    # 将词元列表展平成一个列表
      tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

def count_corpus(tokens):
  """统计词元的频率"""
  # 这里的tokens是1D列表或2D列表
  if len(tokens) == 0 or isinstance(tokens[0], list):
    # 将词元列表展平成一个列表
    tokens = [token for line in tokens for token in line]
  return collections.Counter(tokens)

def seq_data_iter_random(corpus, batch_size, num_steps):
  """使用随机抽样生成一个小批量子序列"""
  # 从随机偏移量开始对序列进行分区，随机范围包括num_steps-1
  corpus = corpus[random.randint(0, num_steps - 1):]
  # 减去1，是因为我们需要考虑标签
  num_subseqs = (len(corpus) - 1) // num_steps
  # 长度为num_steps的子序列的起始索引
  initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
  # 在随机抽样的迭代过程中，
  # 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
  random.shuffle(initial_indices)
  def data(pos):
    # 返回从pos位置开始的长度为num_steps的序列
    return corpus[pos: pos + num_steps]
  num_batches = num_subseqs // batch_size
  for i in range(0, batch_size * num_batches, batch_size):
    # 在这里，initial_indices包含子序列的随机起始索引
    initial_indices_per_batch = initial_indices[i: i + batch_size]
    X = [data(j) for j in initial_indices_per_batch]
    Y = [data(j + 1) for j in initial_indices_per_batch]
    yield torch.tensor(X), torch.tensor(Y)

def seq_data_iter_sequential(corpus, batch_size, num_steps):
  """使用顺序分区生成一个小批量子序列"""
  # 从随机偏移量开始划分序列
  offset = random.randint(0, num_steps)
  num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
  Xs = torch.tensor(corpus[offset: offset + num_tokens])
  Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
  Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
  num_batches = Xs.shape[1] // num_steps
  for i in range(0, num_steps * num_batches, num_steps):
    X = Xs[:, i: i + num_steps]
    Y = Ys[:, i: i + num_steps]
    yield X, Y

def tokenize(lines, token='word'):
  """将文本行拆分为单词或字符词元"""
  if token == 'word':
    return [line.split() for line in lines]
  elif token == 'char':
    return [list(line) for line in lines]
  else:
    print('错误：未知词元类型：' + token)

def load_corpus_time_machine(max_tokens=-1):
  """返回时光机器数据集的词元索引列表和词表"""
  lines = read_time_machine()
  tokens = tokenize(lines, 'char')
  vocab = Vocab(tokens)
  # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落，
  # 所以将所有文本行展平到一个列表中
  corpus = [vocab[token] for line in tokens for token in line]
  if max_tokens > 0:
    corpus = corpus[:max_tokens]
  return corpus, vocab

class SeqDataLoader:
  """加载序列数据的迭代器"""
  def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
    if use_random_iter:
      self.data_iter_fn = seq_data_iter_random
    else:
      self.data_iter_fn = seq_data_iter_sequential
      self.corpus, self.vocab = load_corpus_time_machine(max_tokens)
      self.batch_size, self.num_steps = batch_size, num_steps
  def __iter__(self):
    return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)

def load_data_time_machine(batch_size, num_steps,
  use_random_iter=False, max_tokens=10000):
  """返回时光机器数据集的迭代器和词表"""
  data_iter = SeqDataLoader(
  batch_size, num_steps, use_random_iter, max_tokens)
  return data_iter, data_iter.vocab

batch_size, num_steps = 32, 35
train_iter, vocab = load_data_time_machine(batch_size, num_steps)

def get_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # 隐藏层参数
    W_xh = normal((num_inputs, num_hiddens))
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    # 输出层参数
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # 附加梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )

def rnn(inputs, state, params):
    # inputs的形状：(时间步数量，批量大小，词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    # X的形状：(批量大小，词表大小)
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)

class RNNModelScratch:
    """从零开始实现的循环神经网络模型"""
    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    def __call__(self, X, state):
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)



def predict_ch8(prefix, num_preds, net, vocab, device):
    """在prefix后面生成新字符"""
    state = net.begin_state(batch_size=1, device=device)
    outputs = [vocab[prefix[0]]]
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:  # 预热期
        _, state = net(get_input(), state)
        outputs.append(vocab[y])
    for _ in range(num_preds):  # 预测num_preds步
        y, state = net(get_input(), state)
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    return ''.join([vocab.idx_to_token[i] for i in outputs])

num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu())