In [1]:
import torch 
from torch import nn,optim
from torch.utils.data import(Dataset, DataLoader, TensorDataset)
import tqdm
import re 
import collections
import itertools
import MeCab

In [2]:
remove_marks_regex = re.compile("[\,\(\)\[\]\*:;]|<.*?>")
shift_marks_regex = re.compile("([?!\.])")

unk = 0
sos = 1
eos = 2

def normalize(text):
    text = text.lower()
    #不要な文字を削除
    text = remove_marks_regex.sub("", text)
    #?!.と単語の間に空白を挿入
    text = shift_marks_regex.sub(r"\1", text)
    return text

def parse_line(line):
    trg = normalize(line.strip())
    #翻訳元と翻訳先それぞれのトークンリストを作成する
    #日本語分かち書き
    mecab = MeCab.Tagger ("-Owakati")
    trg = mecab.parse(trg)

    trg_tokens = trg.strip().split()
    return trg_tokens

def build_vocab(tokens):
    #ファイル中のすべての文章でのトークン数を数える
    counts = collections.Counter(tokens)
    #トークンの出現数の多い順に並べる
    sorted_counts = sorted(counts.items(), key=lambda c: c[1], reverse=True)
    #3つのタグを追加して正引きリストと逆引き用辞書を作る
    word_list = ["<UNK>", "<SOS>", "<EOS>"] \
    + [x[0] for x in sorted_counts]
    word_dict = dict((w, i) for i, w in enumerate(word_list))
    return word_list, word_dict

def words2tensor(words, word_dict, max_len, padding=0):
    #末尾に終了タグをつける
    words = words + ["<EOS>"]
    #辞書を利用して数値のリストに変換する
    words = [word_dict.get(w,0) for w in words]
    seq_len = len(words)
    #長さがmax_len以下の場合はパディングする
    if seq_len < max_len + 1:
        words = words + [padding] * (max_len + 1 - seq_len)
    #Tensorに変換して返す
    return torch.tensor(words, dtype=torch.int64), seq_len

In [4]:
class TranslationPairDataset(Dataset):
    def __init__(self, path, max_len=15):
        #単語数が多い文書をフィルタリングする関数
        def filter_pair(p):
            return not(len(p[0]) > max_len or len(p[1]) > max_len)
        #ファイルを開き、パース/フィルタリングする
        with open(path["train"])as fp:
            print(fp)
            pairs = map(parse_line, fp)
            pairs = filter(filter_pair, pairs)
            pairs = list(pairs)
        #文書のペアをソースとターゲットに分ける
        src = [p for p in pairs]
        with open(path["label"])as fp:
            pairs = map(parse_line, fp)
            pairs = filter(filter_pair, pairs)
            pairs = list(pairs)
        trg = [p for p in pairs]
        #それぞれの語彙集を作成する
        self.src_word_list, self.src_word_dict = build_vocab(itertools.chain.from_iterable(src))
        self.trg_word_list, self.trg_word_dict = build_vocab(itertools.chain.from_iterable(trg))
        #語彙集を使用してTensorに変換する
        self.src_data = [words2tensor(words, self.src_word_dict, max_len) for words in src]
        self.trg_data = [words2tensor(words, self.trg_word_dict, max_len, -100) for words in trg]
        
    def __len__(self):
        return len(self.src_data)
    
    def __getitem__(self, idx):
        src, Isrc = self.src_data[idx]
        trg, Itrg = self.trg_data[idx]
        return src, Isrc, trg, Itrg

In [5]:
class Encoder(nn.Module):
    def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50, num_layers=10, dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout)
    def forward(self, x, h0=None,l=None):
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(x, l, batch_first=True)
        _, h = self.lstm(x, h0)
        return h

In [6]:
class Decoder(nn.Module):
    def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50, num_layers=1, dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, num_embeddings)
        
    def forward(self, x, h, l=None):
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(x, l, batch_first=True)
        x, h = self.lstm(x, h)
        if l is not None:
            x =nn.utils.rnn.pad_packed_sequence(x, batch_first=True, padding_value=0)[0]
        x = self.linear(x)
        return x, h

In [7]:
def translate(input_str, enc, dec, max_len=15, device="cpu"):
    words = normalize(input_str).split()
    input_tensor, seq_len = words2tensor(words, ds.src_word_dict, max_len=max_len)
    input_tensor = input_tensor.unsqueeze(0)
    #Encoderで使用するので入力の長さもリストにしておく
    seq_len = [seq_len]
    #開始トークン準備
    sos_inputs = torch.tensor(sos, dtype=torch.int64)
    input_tensor = input_tensor.to(device)
    sos_inputs = sos_inputs.to(device)
    #入力文字配列をEncoderに入れてコンテキストを得る
    ctx = enc(input_tensor, l=seq_len)
    #開始トークンとコンテキストをDecoderの初期値にセット
    z = sos_inputs
    h = ctx
    results = []
    for i in range(max_len):
        #Decoderで次の単語を予測
        o,h = dec(z.view(1,1), h)
        #線形層の出力が最も大きい場所が次の単語のID
        wi = o.detach().view(-1).max(0)[1]
        if wi.item() == eos:
            break
        results.append(wi.item())
        #次の入力は今回の出力のIDを使用する
        z = wi
    #記録しておいた出力のIDを文字列に変換
    return " ".join(ds.trg_word_list[i] for i in results)

In [None]:
batch_size = 1
max_len = 200
path = {"train":"train_data/Text.txt", "label":"train_data/label.txt"}
ds = TranslationPairDataset(path, max_len=max_len)
loder = DataLoader(ds, batch_size=batch_size, shuffle=True, num_workers=0)


<_io.TextIOWrapper name='train_data/Text.txt' mode='r' encoding='UTF-8'>


In [None]:
print(len(ds.src_word_list))
print(len(ds.trg_word_list))

In [9]:
enc = Encoder(len(ds.src_word_list), 150, 150, 3)
dec = Decoder(len(ds.trg_word_list), 150, 150, 3)
enc.to("cuda:0")
dec.to("cuda:0")
opt_enc = optim.Adam(enc.parameters(), 0.009)
opt_dec = optim.Adam(dec.parameters(), 0.009)
loss_f =  nn.CrossEntropyLoss()

In [None]:
from statistics import mean

def to2D(x):
    shapes = x.shape
    return x.reshape(shapes[0] * shapes[1], -1)

for epoch in range(200):
    enc.train(), dec.train()
    losses = []
    
    for x, lx, y, ly in tqdm.tqdm(loder):
        
        #xのPackedSequenceを作るために翻訳元の長さで降順にソート
        lx, sort_idx = lx.sort(descending = True)
        x, y, ly = x[sort_idx], y[sort_idx], ly[sort_idx]
        x = x.to("cuda:0")
        y = y.to("cuda:0")
        #翻訳元をEncoderに入れてコンテキストを得る
        ctx = enc(x, l=lx)
    
        #yのpackedSequenceを作るために翻訳先の長さで降順にソート
        ly, sort_idx = ly.sort(descending=True)
        y = y[sort_idx]
        #Decoderの初期値をセット
        h0 = (ctx[0][:,sort_idx,:],ctx[1][:,sort_idx,:])
        z = y[:,:-1].detach()
        #-100のままだとEmbeddingの計算でエラーが出るので値を0にしておく
        z[z==-100] = 0
        #Decoderに通して損失関数を計算
        o, _ = dec(z, h0,l=ly-1)
        loss = loss_f(to2D(o[:]), to2D(y[:,1:max(ly)]).squeeze())
        #誤差逆伝播
        enc.zero_grad()
        dec.zero_grad()
        loss.backward()
        opt_enc.step()
        opt_dec.step()
        losses.append(loss.item())
       
    enc.eval()
    dec.eval()
    print(epoch, mean(losses))
    with torch.no_grad():
        print(translate("I am student", enc, dec, max_len=max_len, device="cuda:0"))
        print(translate("she is my mother", enc, dec, max_len=max_len, device="cuda:0"))
        print(translate("Where is the registrar's office?", enc, dec, max_len=max_len, device="cuda:0"))
        
torch.save(enc.state_dict(), './enc.pth')
torch.save(dec.state_dict(), './dec.pth')

 75%|███████▍  | 5520/7367 [08:03<02:31, 12.21it/s]