<a href="https://colab.research.google.com/github/machine-perception-robotics-group/JDLALectureNotebooks/blob/master/notebooks/15_recurrent_neural_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 回帰結合型のニューラルネットワークによる文章生成

---
## 目的
回帰結合型のニューラルネットワーク，すなわち再帰型ニューラルネットワーク (Recurrent Neural Network; RNN) を用いてPenn Tree Bankデータセットに対する次単語の予測を行う．
また，単方向RNN (uni-directional RNN) と双方向RNN (bidirectional RNN) の両方を用いて学習を行い，その違いを確認する．



## モジュールのインポート
プログラムの実行に必要なモジュールをインポートします．

In [None]:
import os
from time import time

import pickle

import numpy as np

import torch
import torch.nn as nn

# GPUの確認
use_cuda = torch.cuda.is_available()
print('Use CUDA:', use_cuda)

## データセットの読み込み

Penn Tree Bank (PTB) データセットを読み込みます．

読み込んだ学習データのサイズを確認します．
学習，検証，テストデータはそれぞれ929589，73760，82430のサイズの1次元配列になっていることがわかります．

また，`get_ptb_words_vocabulary`関数を用いて，ptbデータセットに存在する英単語の情報を取得します．
`vocab`には英単語とその単語を示すIDが辞書型のオブジェクトとして格納されています．
英単語の数は10000です．

最後に，keyと値の組み合わせを逆にした辞書`inverse_vocab`を作成します．
これはIDで出力された予測結果から英単語を検索する際に使用します．

In [None]:
import gdown
gdown.download('https://drive.google.com/uc?id=1ceEd6c19k9GLFosyqmnfEakuxuTzWV4N', 'ptb_dataset.zip', quiet=False)
!unzip -q -o ptb_dataset.zip

In [None]:
train = np.load('ptb_dataset/ptb_train.npy')
val = np.load('ptb_dataset/ptb_val.npy')
test = np.load('ptb_dataset/ptb_test.npy')

with open('ptb_dataset/ptb_vocab.pkl', 'rb') as f:
    vocab = pickle.load(f)

with open('ptb_dataset/ptb_inverse_vocab.pkl', 'rb') as f:
    inverse_vocab = pickle.load(f)

print(train.shape, val.shape, test.shape)
print(len(vocab))

### Benn Tree Bankデータセットの表示

PTBデータセットの中身を`print`関数を使って表示してみます．

学習用データを表示すると，1次元配列に整数値が格納されていることがわかります．

また，`vocab`のうち，英単語を指定すると，各英単語に対応するIDガ表示されます．

In [None]:
print("train sentence:", train)
print(vocab['player'], vocab['primarily'], vocab['arose'], vocab['generate'], vocab['partnership'])

## データセットクラスの作成

PTBデータセットのためのデータセットクラスを作成します．

In [None]:
class PTBDataset(torch.utils.data.Dataset):
    def __init__(self, root, type='train', time_window=35):

        if type not in ['train', 'val', 'test']:
            raise ValueError('type must be train or val or test')

        self.root = root
        self.time_window = time_window

        self.data = np.load(os.path.join(root, 'ptb_' + type + '.npy'))

        with open(os.path.join(root, 'ptb_vocab.pkl'), 'rb') as f:
            self.vocab = pickle.load(f)

        with open(os.path.join(root, 'ptb_inverse_vocab.pkl'), 'rb') as f:
            self.inverse_vocab = pickle.load(f)

    def __getitem__(self, index):
        x = self.data[index:index+self.time_window]
        y = self.data[index+1:index+self.time_window+1]
        return torch.from_numpy(x), torch.from_numpy(y)

    def __len__(self):
        return len(self.data) - self.time_window - 1

## ネットワークモデルの定義

In [None]:
class RNNLanguageModel(nn.Module):

    def __init__(self, n_vocab, n_units, bidirectional=False):
        super(RNNLanguageModel, self).__init__()

        self.n_vocab = n_vocab
        self.n_units = n_units

        self.embed = nn.Embedding(n_vocab, n_units)
        self.rnn = nn.RNN(n_units, n_units, bidirectional=bidirectional)
        if bidirectional:
            self.fc = nn.Linear(n_units*2, n_vocab)
        else:
            self.fc = nn.Linear(n_units, n_vocab)

    def forward(self, x):
        x = self.embed(x)
        x, h = self.rnn(x)
        x = self.fc(x)
        return x, h

## 学習（単方向RNN）

In [None]:
# パラメータの設定
batch_size = 256
epoch_num = 10

n_vocab = len(vocab)
n_units = 256

# データセットの定義
train_dataset = PTBDataset('ptb_dataset', type='train')
val_dataset = PTBDataset('ptb_dataset', type='val')
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# モデルの定義（単方向RNN）
model_unidirectional = RNNLanguageModel(n_vocab, n_units, bidirectional=False)
if use_cuda:
    model_unidirectional = model_unidirectional.cuda()

# 損失関数と最適化アルゴリズムの設定
criterion = nn.CrossEntropyLoss()
if use_cuda:
    criterion = criterion.cuda()
optimizer = torch.optim.Adam(model_unidirectional.parameters())

model_unidirectional.train()

# 学習の実行
start = time()
for epoch in range(1, epoch_num+1):

    sum_loss = 0.0

    for data, target in train_loader:
        if use_cuda:
            data = data.cuda()
            target = target.cuda()

        optimizer.zero_grad()
        output, _ = model_unidirectional(data)
        loss = criterion(output.view(-1, n_vocab), target.view(-1).long())
        loss.backward()
        optimizer.step()

        sum_loss += loss.data

    elapsed_time = time() - start
    print('epoch: {}, loss: {:.4f}, time: {:.4f}[s]'.format(epoch, sum_loss / len(train_loader), elapsed_time))

学習したモデルで推論を行ってみます．

In [None]:
model_unidirectional.eval()

for i in range(10):
    data, target = val_dataset[i]

    if use_cuda:
        data = data.cuda()

    output, _ = model_unidirectional(data)
    output = output.view(-1, n_vocab)
    _, pred = torch.max(output, 1)

    true_sentence = []
    pred_sentence = []
    for i in range(len(target.data.tolist())):
        true_word = inverse_vocab[target.data.tolist()[i]]
        pred_word = inverse_vocab[pred.cpu().data.tolist()[i]]
        true_sentence.append(true_word)
        pred_sentence.append(pred_word)

    print('true:', ' '.join(true_sentence))
    print('pred:', ' '.join(pred_sentence), '\n')

## 学習（双方向RNN）

In [None]:
# パラメータの設定
batch_size = 256
epoch_num = 10

n_vocab = len(vocab)
n_units = 256

# データセットの定義
train_dataset = PTBDataset('ptb_dataset', type='train')
val_dataset = PTBDataset('ptb_dataset', type='val')
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# モデルの定義（単方向RNN）
model_bidirectional = RNNLanguageModel(n_vocab, n_units, bidirectional=True)
if use_cuda:
    model_bidirectional = model_bidirectional.cuda()

# 損失関数と最適化アルゴリズムの設定
criterion = nn.CrossEntropyLoss()
if use_cuda:
    criterion = criterion.cuda()
optimizer = torch.optim.Adam(model_bidirectional.parameters())

model_bidirectional.train()

# 学習の実行
start = time()
for epoch in range(1, epoch_num+1):

    sum_loss = 0.0

    for data, target in train_loader:
        if use_cuda:
            data = data.cuda()
            target = target.cuda()

        optimizer.zero_grad()
        output, _ = model_bidirectional(data)
        loss = criterion(output.view(-1, n_vocab), target.view(-1).long())
        loss.backward()
        optimizer.step()

        sum_loss += loss.data

    elapsed_time = time() - start
    print('epoch: {}, loss: {:.4f}, time: {:.4f}[s]'.format(epoch, sum_loss / len(train_loader), elapsed_time))

学習したモデルで推論を行ってみます．

In [None]:
model_bidirectional.eval()

for i in range(10):
    data, target = val_dataset[i]

    if use_cuda:
        data = data.cuda()

    output, _ = model_bidirectional(data)
    output = output.view(-1, n_vocab)
    _, pred = torch.max(output, 1)

    true_sentence = []
    pred_sentence = []
    for i in range(len(target.data.tolist())):
        true_word = inverse_vocab[target.data.tolist()[i]]
        pred_word = inverse_vocab[pred.cpu().data.tolist()[i]]
        true_sentence.append(true_word)
        pred_sentence.append(pred_word)

    print('true:', ' '.join(true_sentence))
    print('pred:', ' '.join(pred_sentence), '\n')