## Encoder-Decoderモデルによる機械翻訳

RNNの章として最後にencoder-decoderという2つのニューラルネットワークを用いたモデルを使用して機械翻訳を実装します。

Encoder-Decoderモデルは任意の2つの対象を用意して、片方からもう一方を生成することができます。

今回の例では英語とフランス語の文章を用意することで英語からフランス語への翻訳モデルを作成します。また質問文と回答を用意すれば自動Q&Aシステムをつくることもできます。

さらにCNNと組み合わせることで、画像から説明文を作成するといった応用てきなことも可能です。


## Encoder-Decoderモデルの概要

Encoder-Decoderモデルは自由度の高いモデルなので様々な派生が存在しますが、だいたい次のように動作します。

1. 翻訳元のデータをEncoderに入力し特徴量ベクトルを得る
2. 特徴量ベクトルをDecoderに入力し翻訳先のデータを得る




In [1]:
DATA_DIR_PATH = "../data"

In [2]:
import torch
from torch import nn, optim
from torch.utils.data import (Dataset, 
                              DataLoader,
                              TensorDataset)
import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import re
import collections
import itertools

remove_marks_regex = re.compile(
    "[\,\(\)\[\]\*:;¿¡]|<.*?>")
shift_marks_regex = re.compile("([?!\.])")

unk = 0
sos = 1
eos = 2

def normalize(text):
    text = text.lower()
    # 不要な文字を除去
    text = remove_marks_regex.sub("", text)
    # ?!.と単語の間に空白を挿入
    text = shift_marks_regex.sub(r" \1", text)
    return text

def parse_line(line):
    line = normalize(line.strip())
    # print(line.split("\t"))
    # 翻訳元(src)と翻訳先(trg)それぞれのトークンのリストを作る
    try :
        src, trg = line.split("\t")
        src_tokens = src.strip().split()
        trg_tokens = trg.strip().split()
        return src_tokens, trg_tokens
    except :
        print("parse_error")
        return 

def build_vocab(tokens):
    # ファイル中のすべての文章でのトークンの出現数を数える
    counts = collections.Counter(tokens)
    # トークンの出現数の多い順に並べる
    sorted_counts = sorted(counts.items(), 
                           key=lambda c: c[1], reverse=True)
    # 3つのタグを追加して正引きリストと逆引き用辞書を作る
    word_list = ["<UNK>", "<SOS>", "<EOS>"] \
        + [x[0] for x in sorted_counts]
    word_dict = dict((w, i) for i, w in enumerate(word_list))
    return word_list, word_dict
    
def words2tensor(words, word_dict, max_len, padding=0):
    # 末尾に終了タグを付ける
    words = words + ["<EOS>"]
    # 辞書を利用して数値のリストに変換する
    words = [word_dict.get(w, 0) for w in words]
    seq_len = len(words)
    # 長さがmax_len以下の場合はパディングする
    if seq_len < max_len + 1:
        words = words + [padding] * (max_len + 1 - seq_len)
    # Tensorに変換して返す
    return torch.tensor(words, dtype=torch.int64), seq_len

リスト5.18　TranslationPairDatasetクラスの作成

In [4]:
class TranslationPairDataset(Dataset):
    def __init__(self, path, max_len=15):
        # 単語数が多い文章をフィルタリングする関数
        def filter_pair(p):
            if p is None :
                return False
            return not (len(p[0]) > max_len 
                        or len(p[1]) > max_len)
        # ファイルを開き、パース/フィルタリングをする       
        with open(path) as fp:
            pairs = map(parse_line, fp)
            pairs = filter(filter_pair, pairs)
            pairs = list(pairs)
        # 文章のペアをソースとターゲットに分ける
        src = [p[0] for p in pairs]
        trg = [p[1] for p in pairs]
        #それぞれの語彙集を作成する
        self.src_word_list, self.src_word_dict = \
            build_vocab(itertools.chain.from_iterable(src))
        self.trg_word_list, self.trg_word_dict = \
            build_vocab(itertools.chain.from_iterable(trg))
        # 語彙集を使用してTensorに変換する
        self.src_data = [words2tensor(
            words, self.src_word_dict, max_len)
                for words in src]
        self.trg_data = [words2tensor(
            words, self.trg_word_dict, max_len, -100)
                for words in trg]
        
    def __len__(self):
        return len(self.src_data)
    
    def __getitem__(self, idx):
        src, lsrc = self.src_data[idx]
        trg, ltrg = self.trg_data[idx]
        return src, lsrc, trg, ltrg

リスト5.19　DatasetとDataLoaderの作成（your_path>は任意のディレクトリを指定）

In [15]:
batch_size = 64
max_len = 10
# このスクリプトだとデータセットのうち \tCC.+$ -> "" で不要な部分を置換しないと動かない。
path = f"{DATA_DIR_PATH}/jpn.txt"
ds = TranslationPairDataset(path, max_len=max_len)
loader = DataLoader(ds, batch_size=batch_size, shuffle=True,
                    num_workers=4)

parse_error
parse_error
parse_error
parse_error


リスト5.20　Encoderの作成

In [16]:
class Encoder(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=50, 
                 hidden_size=50,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim,
                                padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_size, num_layers,
                            batch_first=True, dropout=dropout)

    def forward(self, x, h0=None, l=None):
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(
                x, l, batch_first=True)
        _, h = self.lstm(x, h0)
        return h

リスト5.21　Decoderの作成（要再チェック）

In [17]:
class Decoder(nn.Module):
    def __init__(self, num_embeddings,
                 embedding_dim=50, 
                 hidden_size=50,
                 num_layers=1,
                 dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim,
                                padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size,
                            num_layers, batch_first=True,
                            dropout=dropout)
        self.linear = nn.Linear(hidden_size, num_embeddings)
    def forward(self, x, h, l=None):
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(
                x, l, batch_first=True)
        x, h = self.lstm(x, h)
        if l is not None:
            x = nn.utils.rnn.pad_packed_sequence(x, batch_first=True, padding_value=0)[0]
        x = self.linear(x)
        return x, h

リスト5.22　翻訳する関数の作成

In [45]:
def translate(input_str, enc, dec, max_len=15, device="cpu"):
    # 入力文字列を数値化してTensorに変換
    words = normalize(input_str).split()
    input_tensor, seq_len = words2tensor(words, 
        ds.src_word_dict, max_len=max_len)
    input_tensor = input_tensor.unsqueeze(0)
    # Encoderで使用するので入力の長さもリストにしておく
    seq_len = [seq_len]
    # 開始トークンを準備
    sos_inputs = torch.tensor(sos, dtype=torch.int64)
    input_tensor = input_tensor.to(device)
    sos_inputs = sos_inputs.to(device)
    # 入力文字列をEncoderに入れてコンテキストを得る
    ctx = enc(input_tensor, l=seq_len)
    # 開始トークンとコンテキストをDecoderの初期値にセット
    z = sos_inputs
    h = ctx
    results = []
    for i in range(max_len):
        # Decoderで次の単語を予測
        o, h = dec(z.view(1, 1), h)
        print(o)
        print(o.shape)
        # print("o:", o, "h:", h)
        # 線形層の出力が最も大きい場所が次の単語のID
        wi = o.detach().view(-1).max(0)[1]
        print()
        print(wi.item())
        if wi.item() == eos:
            break
        results.append(wi.item())
        # 次の入力は今回の出力のIDを使用する
        z = wi
    # 記録しておいた出力のIDを文字列に変換
    print(results)
    return " ".join(ds.trg_word_list[i] for i in results)

リスト5.23　関数の動作の確認

In [46]:
enc = Encoder(len(ds.src_word_list), 100, 100, 2)
dec = Decoder(len(ds.trg_word_list), 100, 100, 2)
translate("I am a student.", enc, dec)

tensor([[[-0.0823,  0.0704,  0.0465,  ...,  0.0661, -0.0842,  0.0838]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

29048
tensor([[[-0.1012,  0.0663,  0.0618,  ...,  0.0504, -0.0704,  0.0888]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

29048
tensor([[[-0.1006,  0.0476,  0.0726,  ...,  0.0479, -0.0579,  0.0818]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

63469
tensor([[[-0.0832,  0.0636,  0.0742,  ...,  0.0437, -0.0665,  0.0756]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

34945
tensor([[[-0.0852,  0.0738,  0.0984,  ...,  0.0317, -0.0685,  0.0779]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

423
tensor([[[-0.0945,  0.0697,  0.0997,  ...,  0.0235, -0.0868,  0.0891]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

34945
tensor([[[-0.0838,  0.0697,  0.1056,  ...,  0.0251, -0.0965,  0.0932]]],
       grad_fn=<ViewBackward0>)
torch.Size([1, 1, 69651])

37641
tensor([[[-0.0891,  0.0580,  0.0927,

'お兄さんの背ってどれぐらいなの？ お兄さんの背ってどれぐらいなの？ 幸福を求める権利は誰にもある。 それが知りたい。 みんなはどこ？ それが知りたい。 トムはお礼を言い忘れてしまった。 ほとんどの人が行きました。 ほとんどの人が行きました。 まだ宵の口だ。 彼が帰ってきたらすぐに私に知らせてくれ。 彼が帰ってきたらすぐに私に知らせてくれ。 まだ宵の口だ。 まだ宵の口だ。 彼が帰ってきたらすぐに私に知らせてくれ。'

リスト5.24　オプティマイザーのパラメータ

In [43]:
enc = Encoder(len(ds.src_word_list), 100, 100, 2)
dec = Decoder(len(ds.trg_word_list), 100, 100, 2)
enc.to("cuda:0")
dec.to("cuda:0")
opt_enc = optim.Adam(enc.parameters(), 0.002)
opt_dec = optim.Adam(dec.parameters(), 0.002)
loss_f = nn.CrossEntropyLoss()

リスト5.25　モデルの学習部分（損失関数など）

In [44]:
from statistics import mean

def to2D(x):
    shapes = x.shape
    return x.reshape(shapes[0] * shapes[1], -1)

for epoc in range(30):
    # ネットワークを訓練モードにする
    enc.train(), dec.train()
    losses = []
    for x, lx, y, ly in tqdm.tqdm(loader):
        # xのPackedSequenceを作るために翻訳元の長さで降順にソート
        lx, sort_idx = lx.sort(descending=True)
        x, y, ly = x[sort_idx], y[sort_idx], ly[sort_idx]
        x, y = x.to("cuda:0"), y.to("cuda:0")
        # 翻訳元をEncoderに入れてコンテキストを得る
        ctx = enc(x, l=lx)
        
        # yのPackedSequenceを作るために翻訳先の長さで降順にソート
        ly, sort_idx = ly.sort(descending=True)
        y = y[sort_idx]
        # Decoderの初期値をセット
        h0 = (ctx[0][:, sort_idx, :], ctx[1][:, sort_idx, :])
        z = y[:, :-1].detach()
        # -100のままだとEmbeddingの計算でエラーが出てしまうので値を0に変更しておく
        z[z==-100] = 0
        # Decoderに通して損失関数を計算
        o, _ = dec(z, h0, l=ly-1)
        loss = loss_f(to2D(o[:]), to2D(y[:, 1:max(ly)]).squeeze())
        # Backpropagation（誤差逆伝播法）を実行
        enc.zero_grad(), dec.zero_grad()
        loss.backward()
        opt_enc.step(), opt_dec.step()
        losses.append(loss.item())
        
    # データセットに対して一通り計算したら現在の
    # 損失関数の値や翻訳結果を表示
    enc.eval(), dec.eval()
    print(epoc, mean(losses))
    with torch.no_grad():
        print(translate("I am a student.",
                         enc, dec, max_len=max_len, device="cuda:0"))
        print(translate("He likes to eat pizza.",
                         enc, dec, max_len=max_len, device="cuda:0"))
        print(translate("She is my mother.",
                         enc, dec, max_len=max_len, device="cuda:0"))
    break

100%|██████████| 1301/1301 [00:11<00:00, 110.04it/s]

0 0.8380975039392802
torch.Size([1, 1, 69651])
tensor(2, device='cuda:0')
2
[]

torch.Size([1, 1, 69651])
tensor(2, device='cuda:0')
2
[]

torch.Size([1, 1, 69651])
tensor(2, device='cuda:0')
2
[]






In [None]:
print(translate("I am a student.",enc, dec, max_len=max_len, device="cuda:0"))

o: tensor([[[-7.5893, -8.7107, 11.6988,  ..., -7.6975, -7.7253, -8.1550]]],
       device='cuda:0', grad_fn=<ViewBackward0>) h: (tensor([[[ 0.9761, -0.9887, -0.9873,  0.9826, -0.9919, -0.9949,  0.9930,
          -0.9780, -0.9609,  0.9829, -0.9925,  0.9937, -0.9926, -0.9816,
          -0.9965,  0.9882, -0.7353, -0.9915, -0.9819, -0.9938, -0.9969,
           0.9804, -0.9466,  0.9902, -0.9942, -0.9966,  0.9733, -0.9836,
          -0.9815,  0.9933,  0.9942,  0.9832,  0.9899, -0.9833,  0.9715,
           0.9964,  0.9941, -0.9902, -0.9959, -0.9847,  0.9841,  0.9950,
          -0.9960,  0.9891,  0.9888, -0.9962, -0.9916, -0.1004, -0.9605,
           0.9899, -0.8829, -0.9870, -0.9946, -0.9900, -0.9871, -0.9937,
          -0.9865, -0.9950,  0.9923, -0.9938, -0.9953, -0.9918, -0.9914,
           0.7199, -0.9944, -0.9837, -0.9871,  0.9991,  0.7222, -0.9759,
           0.9357,  0.9226,  0.9975,  0.9979, -0.9638, -0.9883,  0.7235,
           0.9520, -0.9796, -0.9542, -0.7431, -0.9487, -0.9678, -0.9