<a href="https://colab.research.google.com/github/tomoya-ichikawa/NMT-LSTM-Attention-/blob/main/seq2seq_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab-specific helper module; only importable inside a Google Colab runtime.
from google.colab import drive 
# Mount Google Drive at /content/drive so the corpus files read further down
# (paths starting with "drive/MyDrive/...") can be opened.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
from torch import nn, optim
from torch.utils.data import(Dataset, DataLoader, TensorDataset)
import tqdm
import re
import collections
import itertools

In [None]:


def normalizeString(s,l):
    """Insert a space before sentence punctuation in ``s``.

    Args:
        s: The text field to normalize.
        l: Indexable object whose first element is compared with ``s``. The
            visible caller passes the whole tab-separated line (a string), in
            which case ``l[0]`` is that line's first character.

    Returns:
        ``s`` with a single space inserted before every ``.``, ``!`` or ``?``.
        When ``s == l[0]`` the text is lower-cased and stripped first.
    """
    # NOTE(review): with a string `l` this branch only fires for fields that
    # are exactly one character long; presumably a list of fields was meant.
    # Confirm the intent before changing it -- it affects the vocabulary.
    text = s.lower().strip() if s == l[0] else s
    # A stricter filter (collapsing every run outside a-zA-Z.!? into a space)
    # exists only as disabled code in the notebook and is not applied here.
    return re.sub(r"([.!?])", r" \1", text)

def build_vocab(tokens):
  """Build the index->word list and word->index dict for a token stream.

  Args:
    tokens: Iterable of hashable tokens (it is consumed once).

  Returns:
    ``(word_list, word_dict)``. ``word_list`` starts with the four special
    tokens ``<PAD>``, ``<SOS>``, ``<EOS>``, ``<UNK>`` (indices 0-3) followed
    by the distinct tokens in descending frequency order; tokens with equal
    counts keep their first-seen order. ``word_dict`` maps each entry of
    ``word_list`` to its position.
  """
  specials = ["<PAD>","<SOS>", "<EOS>", "<UNK>"]
  # most_common() is a stable sort on the count, highest first.
  ranked = [word for word, _ in collections.Counter(tokens).most_common()]
  word_list = specials + ranked
  word_dict = {word: index for index, word in enumerate(word_list)}
  return word_list, word_dict

def word2tensor(words, word_dict, max_len, trgg=False, padding=0):
  """Encode a token list as a right-padded int64 tensor.

  Args:
    words: List of tokens; it is not modified.
    word_dict: Mapping from token to integer index.
    max_len: Length to pad up to. Longer sequences are returned as-is
      (neither padded nor truncated).
    trgg: When true, ``<SOS>`` is prepended before encoding.
    padding: Value used to fill positions past the real sequence.

  Returns:
    ``(tensor, seq_len)`` where ``seq_len`` is the unpadded length
    (including ``<SOS>`` when ``trgg`` is set).
  """
  tokens = ["<SOS>"] + words if trgg else words
  # Unknown tokens fall back to index 3, which is <UNK> in build_vocab().
  ids = [word_dict.get(token, 3) for token in tokens]
  seq_len = len(ids)
  # A non-positive repeat count yields an empty list, so long inputs pass through.
  ids.extend([padding] * (max_len - seq_len))
  return torch.tensor(ids, dtype=torch.int64), seq_len
def word2ltensor(words, word_dict, max_len, trgg=False, padding=0):
  """Encode a token list as a right-padded int64 label tensor.

  Args:
    words: List of tokens; it is not modified.
    word_dict: Mapping from token to integer index.
    max_len: Length to pad up to. Longer sequences are returned as-is
      (neither padded nor truncated).
    trgg: When true, ``<EOS>`` is appended before encoding.
    padding: Value used to fill positions past the real sequence.

  Returns:
    ``(tensor, seq_len)`` where ``seq_len`` is the unpadded length
    (including ``<EOS>`` when ``trgg`` is set).
  """
  tokens = words + ["<EOS>"] if trgg else words
  # Unknown tokens fall back to index 3, which is <UNK> in build_vocab().
  ids = [word_dict.get(token, 3) for token in tokens]
  seq_len = len(ids)
  # A non-positive repeat count yields an empty list, so long inputs pass through.
  ids.extend([padding] * (max_len - seq_len))
  return torch.tensor(ids, dtype=torch.int64), seq_len


In [None]:
class MyDataset(torch.utils.data.Dataset):
  """Parallel-sentence dataset read from a tab-separated text file.

  Each line of the file holds a source sentence and a target sentence
  separated by a tab; only the first two fields are used. Sentences are
  normalized with ``normalizeString`` and split on single spaces.

  Attributes set by ``__init__``:
    src_word_list / src_word_dict: source vocabulary (index->word, word->index).
    trg_word_list / trg_word_dict: target vocabulary.
    src_data: list of ``(tensor, length)`` for the encoded source sentences.
    trg_data: list of ``(tensor, length)`` for ``<SOS>``-prefixed targets
      (decoder input), padded with 0.
    trg_label: list of ``(tensor, length)`` for ``<EOS>``-suffixed targets
      (decoder labels), padded with -100.
  """

  def __init__(self, path, max_len=22, test=False, vocab_from=None):
    """Load and encode the corpus at ``path``.

    Args:
      path: Path of the UTF-8, tab-separated corpus file.
      max_len: Length every encoded sentence is padded up to.
      test: When false, vocabularies are built from this file. When true,
        they are copied from another dataset instead of being rebuilt.
      vocab_from: Dataset whose vocabularies are reused when ``test`` is
        true. If omitted, the module-level ``train_data`` is used, which
        must already exist at that point.

    Raises:
      IndexError: If a line has fewer than two tab-separated fields.
    """
    # Read inside a context manager so the file handle is closed right away
    # instead of being left for the garbage collector.
    with open(path, encoding='utf-8') as f:
      lines = f.read().strip().split('\n')
    pairs = [[normalizeString(s,l) for s in l.split('\t')] for l in lines]

    src = [p[0].split(" ") for p in pairs]
    trg = [p[1].split(" ") for p in pairs]
    if test:
      # Reuse an existing vocabulary so indices agree with the trained model.
      vocab = train_data if vocab_from is None else vocab_from
      self.src_word_list, self.src_word_dict = vocab.src_word_list, vocab.src_word_dict
      self.trg_word_list, self.trg_word_dict = vocab.trg_word_list, vocab.trg_word_dict
    else:
      # Flatten the list of token lists (a 2-D list) into one token stream.
      self.src_word_list,self.src_word_dict = build_vocab(itertools.chain.from_iterable(src))
      self.trg_word_list,self.trg_word_dict = build_vocab(itertools.chain.from_iterable(trg))

    self.src_data = [word2tensor(words, self.src_word_dict, max_len) for words in src]
    self.trg_data = [word2tensor(words, self.trg_word_dict, max_len, trgg=True) for words in trg]
    # Labels are padded with -100 rather than 0 (see word2ltensor's padding argument).
    self.trg_label = [word2ltensor(words, self.trg_word_dict, max_len, trgg=True, padding = -100) for words in trg]

  def __len__(self):
    """Return the number of sentence pairs."""
    return len(self.trg_data)

  def __getitem__(self, idx):
    """Return ``(src, src_len, trg, trg_len, label, label_len)`` for pair ``idx``."""
    src, lsrc = self.src_data[idx]
    trg, ltrg = self.trg_data[idx]
    label, llabel = self.trg_label[idx]
    return src, lsrc, trg, ltrg,label, llabel

In [None]:
# Corpus files, relative to the working directory; the second file is loaded
# with test=True and therefore reuses the vocabulary of the first.
path = "drive/MyDrive/Colab-Notebooks/eng-ja.txt"
path2 = "drive/MyDrive/Colab-Notebooks/eng-ja2.txt"
batch_size, max_len = 128, 30

# train_data must be created first: MyDataset(test=True) reads it as a global.
train_data = MyDataset(path, max_len=max_len)
test_data = MyDataset(path2, max_len=max_len, test=True)

train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True, num_workers=4)
# Default DataLoader settings: one sample per batch, original order.
test_loader = DataLoader(dataset=test_data)

In [None]:
# Decode one test source sentence and one training label back to text as a
# sanity check. Both tensors are cut to their stored lengths first: the source
# is padded with index 0 (<PAD>) and the label with -100, and indexing the word
# list with -100 would silently pick words from the end of the vocabulary.
src_ids, src_len = test_data[0][:2]
w = "".join(train_data.src_word_list[i] for i in src_ids[:src_len].tolist())
sample = train_data[0]
label_ids, label_len = sample[4], sample[5]
w2 = " ".join(train_data.trg_word_list[i] for i in label_ids[:label_len].tolist())


In [None]:
class Encoder(nn.Module):
  """Embedding layer followed by a batch-first LSTM."""

  def __init__(self, vocab_size, embedding_dim=50,hidden_size=50, num_layer=1, dropout=0.1):
    """Create the layers.

    Args:
      vocab_size: Number of rows in the embedding table.
      embedding_dim: Size of each embedding vector.
      hidden_size: LSTM hidden size.
      num_layer: Number of stacked LSTM layers.
      dropout: Dropout value handed to ``nn.LSTM``.
    """
    super().__init__()
    # Index 0 is the padding index, so its embedding stays at zero.
    self.emb = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
    self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layer, batch_first=True, dropout=dropout)

  def forward(self, x, h0=None, l=None):
    """Encode a batch of index sequences.

    Args:
      x: Index tensor fed to the embedding (batch dimension first).
      h0: Optional initial LSTM state.
      l: Optional sequence lengths. When given, the input is packed before
        the LSTM and the output is padded back with zeros;
        ``pack_padded_sequence`` is called with its default settings.

    Returns:
      ``(output, state)`` as produced by the LSTM, with ``output`` unpacked
      to a padded tensor when ``l`` was supplied.
    """
    embedded = self.emb(x)
    if l is None:
      return self.lstm(embedded, h0)

    packed_in = nn.utils.rnn.pack_padded_sequence(embedded, l, batch_first=True)
    packed_out, state = self.lstm(packed_in, h0)
    padded_out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True, padding_value=0)
    return padded_out, state

In [None]:
class Decoder(nn.Module):
  """LSTM decoder with dot-product attention over the encoder outputs."""

  def __init__(self, vocab_size, embedding_dim=50, hidden_size=50, num_layer=1, dropout=0.1):
    """Create the layers.

    Args:
      vocab_size: Size of the target vocabulary (embedding rows and logits).
      embedding_dim: Size of each embedding vector.
      hidden_size: LSTM hidden size; must equal the encoder's for the
        dot product in ``forward`` to be defined.
      num_layer: Number of stacked LSTM layers.
      dropout: Dropout value handed to ``nn.LSTM``.
    """
    super().__init__()
    self.emb = nn.Embedding(vocab_size, embedding_dim)
    self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layer, batch_first=True, dropout=dropout)
    # The projection sees the LSTM output concatenated with the context vector.
    self.linear = nn.Linear(hidden_size*2, vocab_size)
    self.softmax = nn.Softmax(dim=2)

  def forward(self, x, h0=None, hs=None, l=None):
    """Run the decoder and return vocabulary logits.

    Args:
      x: Target-side index tensor (batch dimension first).
      h0: Optional initial LSTM state, e.g. the encoder's final state.
      hs: Encoder outputs to attend over; required despite the ``None``
        default because it is used unconditionally.
      l: Optional target lengths; when given the input is packed before
        the LSTM and the output padded back with zeros.

    Returns:
      ``(logits, state)`` where ``logits`` has one score per vocabulary
      entry for every decoder position.
    """
    embedded = self.emb(x)
    if l is not None:
      embedded = nn.utils.rnn.pack_padded_sequence(embedded, l, batch_first=True)
    dec_out, state = self.lstm(embedded, h0)
    if l is not None:
      dec_out, _ = nn.utils.rnn.pad_packed_sequence(dec_out, batch_first=True, padding_value=0)

    # Dot-product scores between every encoder and decoder position, then
    # rearranged so the last axis runs over encoder positions.
    scores = torch.bmm(hs, dec_out.transpose(1, 2)).transpose(1, 2)
    # NOTE(review): no mask is applied, so zero-padded encoder positions
    # still receive attention weight.
    weights = self.softmax(scores)
    context = torch.bmm(weights, hs)

    logits = self.linear(torch.cat([dec_out, context], dim=2))
    return logits, state

In [None]:
# Encoder/decoder with 512-d embeddings, 512-d hidden state and 2 LSTM layers,
# both placed on the first CUDA device.
enc = Encoder(len(train_data.src_word_list), embedding_dim=512, hidden_size=512, num_layer=2).to("cuda:0")
dec = Decoder(len(train_data.trg_word_list), embedding_dim=512, hidden_size=512, num_layer=2).to("cuda:0")

# One Adam optimizer per module, same learning rate.
opt_enc = optim.Adam(enc.parameters(), lr=0.002)
opt_dec = optim.Adam(dec.parameters(), lr=0.002)

# Default CrossEntropyLoss settings are used.
loss_f = nn.CrossEntropyLoss()

In [None]:
def to2D(x):
  """Merge the first two dimensions of ``x`` and return a 2-D tensor.

  A tensor of shape ``(a, b, ...)`` becomes ``(a*b, rest)`` where ``rest``
  is the product of the remaining dimensions (1 for a 2-D input).
  """
  n_rows = x.size(0) * x.size(1)
  return x.reshape(n_rows, -1)

In [None]:
from statistics import mean

# Training: 20 passes over train_loader with teacher forcing; prints the mean
# batch loss after every epoch.
# NOTE(review): `i` is not used anywhere in the visible code.
i =0
enc.train()
dec.train()
for epoc in range(20):
  losses = []
  for x, lx, y, ly, label, llabel in train_loader:
    # Order the batch by source length, longest first, because the encoder
    # packs its input with pack_padded_sequence's default (sorted) setting.
    lx, sort_idx = lx.sort(descending=True)
    x, y, ly, label, llabel= x[sort_idx], y[sort_idx], ly[sort_idx], label[sort_idx], llabel[sort_idx]
    # Only the token tensors go to the GPU; the length tensors stay where the
    # DataLoader put them.
    x, y , label = x.to("cuda:0"), y.to("cuda:0"), label.to("cuda:0")
    
    out, ctx = enc(x, l=lx)
    # Re-order the batch by target length for the decoder's packing, and
    # apply the same permutation to everything coming from the encoder so
    # each target still lines up with its own source.
    ly, sort_idx = ly.sort(descending=True)
    y, label, llabel = y[sort_idx],label[sort_idx], llabel[sort_idx]
    # The LSTM state tensors are indexed on dim 1 here (the batch dimension of
    # the state), while the encoder outputs are indexed on dim 0.
    h0 = (ctx[0][:,sort_idx,:], ctx[1][:, sort_idx, :])
    out = out[sort_idx,:,:]
    output,_ = dec(y, h0, out, l=ly)
    # Crop the labels to the longest target in the batch so they match the
    # decoder output, then flatten both to one row per token position.
    # Label padding is -100 (set in MyDataset); presumably it is skipped via
    # CrossEntropyLoss's default ignore_index -- confirm against loss_f.
    loss = loss_f(to2D(output), to2D(label[:, :max(ly)]).squeeze())
    # Clear gradients left over from the previous step before backprop.
    enc.zero_grad()
    dec.zero_grad()
    loss.backward()
    # Both optimizers are stepped; the tuple expression is only a way of
    # writing the two calls on one line.
    opt_enc.step(), opt_dec.step()
    losses.append(loss.item())

  print(epoc, mean(losses))

  

  cpuset_checked))


0 2.7853371124438313
1 1.467399000816638
2 0.9660628965443663
3 0.6798749598853119
4 0.5038627501186508
5 0.39553467201454867
6 0.3323540522543061
7 0.28723837176094885
8 0.25244237562579575
9 0.23295301980222277
10 0.22221132838512625
11 0.2062469698355326
12 0.19598310755189421
13 0.18656859952775415
14 0.1862150082350387
15 0.18610288566716796
16 0.1771948013044989
17 0.1720039608419094
18 0.17088862053116263
19 0.167032426904382


In [None]:
def translate(input_str, input_len, enc, dec, max_len=22, device = "cpu"):
  """Greedily decode a translation for one encoded source sentence.

  Args:
    input_str: Source index tensor; it is moved to ``device``.
    input_len: Source length(s), forwarded to the encoder as ``l``.
    enc: Encoder called as ``enc(input, l=...)`` returning ``(outputs, state)``.
    dec: Decoder called as ``dec(token, state, enc_outputs)`` returning
      ``(logits, state)``.
    max_len: Maximum number of tokens to generate.
    device: Device the input and the start token are placed on.

  Returns:
    The generated target words joined by single spaces, looked up in the
    module-level ``train_data.trg_word_list``.
  """
  source = input_str.to(device)
  # Index 1 is <SOS> and index 2 is <EOS> in vocabularies from build_vocab().
  token = torch.tensor(1, dtype=torch.int64).to(device)
  enc_out, state = enc(source, l=input_len)

  generated = []
  for _ in range(max_len):
    # One decoding step on a 1x1 batch, feeding back the previous prediction.
    logits, state = dec(token.view(1,1), state, enc_out)
    token = logits.detach().view(-1).max(0)[1]
    token_id = token.item()
    if token_id == 2:
      break
    generated.append(token_id)

  return " ".join(train_data.trg_word_list[i] for i in generated)

In [None]:
# Translate every test sentence and collect (prediction, reference) text pairs.
enc.eval()
dec.eval()
ptrgs=[]
with torch.no_grad():
  # test_loader yields one sentence per batch, so row 0 is the whole batch.
  for x, lx, y, ly, label, llabel in test_loader:
    ptrg = translate(x, lx, enc, dec, device="cuda:0")
    # Reference tokens are the decoder input without the leading <SOS> and
    # without padding. Indexing row 0 (rather than squeeze()) keeps a 1-D
    # sequence even when the reference has a single token; squeeze() would
    # collapse that case to a 0-d tensor, which cannot be iterated.
    ref_ids = y[0, 1:int(ly)].tolist()
    w = " ".join(train_data.trg_word_list[i] for i in ref_ids)

    ptrgs.append((ptrg, w))


生成文と参照訳




In [None]:
# Print each generated translation ("翻訳文") above its reference ("参照訳"),
# followed by an empty line.
for hypothesis, reference in ptrgs:
    print("翻訳文："+hypothesis, "参照訳："+reference, "", sep="\n")

翻訳文：they asserted that it could be true  .
参照訳：they finally acknowledged it as true  .

翻訳文：he was not good at swimming  .
参照訳：he didn 't care for swimming  .

翻訳文：he is said to have his sister on his back  .
参照訳：he is no less kind than his sister  .

翻訳文：you must be back before ten o 'clock  .
参照訳：you must be back before ten  .

翻訳文：i hope you will succeed  .
参照訳：break a leg  .

翻訳文：she lives next door to us  .
参照訳：she lives next door to us  .

翻訳文：i am trying to answer you  .
参照訳：i 'm about to tell you the answer  .

翻訳文：i am as rich as yoshio  .
参照訳：i 'm a person who lives for the moment  .

翻訳文：i will put this game  .
参照訳：we have this game on ice  .

翻訳文：tell me the reason you 've ever done such a thing  .
参照訳：will you give me your reasons for doing this  ?

翻訳文：she likes the teacher  .
参照訳：she likes the teacher  .

翻訳文：that 's enough job  .
参照訳：it 's business  .

翻訳文：this pair of shoes will two years  .
参照訳：these shoes will last you two years  .

翻訳文：he adapted himself to the meet