<a href="https://colab.research.google.com/github/tom2rd/textmining/blob/master/Pytorch_seq2seq_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyTorchによるSeq2seqの実装

参考記事: https://www.pytry3g.com/entry/pytorch-seq2seq

Google Colab にドライブをマウント

In [1]:
# Mount Google Drive into the Colab runtime at /content/drive.
# This is interactive: it asks for an authorization code.
from google.colab import drive
drive.mount('/content/drive')


Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


マウントしたドライブ内の Googlecolabdata フォルダに移動

In [3]:
cd drive/My\ Drive/Googlecolabdata

/content/drive/My Drive/Googlecolabdata


PyTorchのインストール

In [0]:
# Install PyTorch; stdout is discarded, so only messages written to stderr show up.
!pip install torch > /dev/null

# データの用意

In [0]:
import random
from sklearn.model_selection import train_test_split

# Vocabulary: the digit characters "0".."9" map to 0..9, followed by the
# special tokens for padding, the plus sign and end-of-sequence.
word2id = dict(zip(map(str, range(10)), range(10)))
word2id["<pad>"] = 10
word2id["+"] = 11
word2id["<eos>"] = 12
# Reverse lookup: id -> token.
id2word = dict(zip(word2id.values(), word2id.keys()))

def load_dataset(N=20000, seed=None):
    """Generate N random addition problems encoded as token-id sequences.

    Each sample pairs an expression "x+y" (x and y are integers in 0..999)
    with its sum, both encoded through the module-level ``word2id`` vocabulary.

    Args:
        N: Number of samples to generate.
        seed: Optional seed. When given, a private ``random.Random(seed)`` is
            used so the dataset is reproducible; when ``None`` (the default)
            the global ``random`` module state is used.

    Returns:
        A ``(data, target)`` tuple of lists of equal length N:
        * ``data[i]``: the ids of the expression, right-padded with ``<pad>``
          to 7 entries.
        * ``target[i]``: 6 ids made of ``<eos>`` (used as the start token),
          the digits of the sum, a terminating ``<eos>``, then ``<pad>``.
    """
    rng = random if seed is None else random.Random(seed)
    pad_id = word2id["<pad>"]
    eos_id = word2id["<eos>"]
    input_len = 7   # longest expression is "999+999"
    target_len = 6  # <eos> + at most 4 digits ("1998") + <eos>

    def generate_number():
        # 1 to 3 random digits; leading zeros disappear in int(), so the
        # value lies in 0..999. randint is inclusive on both ends.
        n_digits = rng.randint(1, 3)
        return int("".join(rng.choice("0123456789") for _ in range(n_digits)))

    def encode(text, length):
        # Map every character to its id and right-pad with <pad> up to `length`.
        ids = [word2id[ch] for ch in text]
        return ids + [pad_id] * (length - len(ids))

    data = []
    target = []
    for _ in range(N):
        x = generate_number()
        y = generate_number()
        data.append(encode("{}+{}".format(x, y), input_len))
        # Decoder sequence: start marker, answer digits, end marker, padding.
        answer = [eos_id] + [word2id[ch] for ch in str(x + y)] + [eos_id]
        target.append(answer + [pad_id] * (target_len - len(answer)))

    return data, target

# Seed both the sample generator and the split so that the dataset is the
# same after every "Restart & Run All".
random.seed(0)
data, target = load_dataset()
# Hold out 10% of the samples for evaluation.
train_x, test_x, train_t, test_t = train_test_split(data, target, test_size=0.1, random_state=0)

# EncoderとDecoder

In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


# Size of each token embedding vector.
embedding_dim = 16
# Size of the GRU hidden state.
hidden_dim = 128
# Number of entries in the vocabulary (digits plus the special tokens).
vocab_size = len(word2id)
# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Encoder(nn.Module):
    """GRU encoder that turns a batch of token-id sequences into a hidden state.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: Size of the GRU hidden state.
        batch_size: Stored on the instance for backward compatibility only;
            ``forward`` works with any batch size.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, batch_size=100):
        super(Encoder, self).__init__()
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size

        # The <pad> row of the embedding table is excluded from gradient updates.
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=word2id["<pad>"])
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, indices):
        """Encode ``indices`` and return the final GRU hidden state.

        Args:
            indices: Integer tensor of token ids, shaped ``(batch, seq_len)``
                or ``(batch,)`` (treated as sequences of length 1).

        Returns:
            Tensor of shape ``(1, batch, hidden_dim)``.
        """
        embedding = self.word_embeddings(indices)
        if embedding.dim() == 2:
            embedding = torch.unsqueeze(embedding, 1)
        # With no initial state given, nn.GRU starts from zeros sized to the
        # actual input batch and placed on the input's device. Building the
        # zeros from self.batch_size and a global device broke as soon as a
        # batch of a different size was fed in.
        _, state = self.gru(embedding)

        return state


class Decoder(nn.Module):
    """GRU decoder: embeds input tokens, advances the GRU and scores the vocabulary."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, batch_size=100):
        super().__init__()
        self.hidden_dim, self.batch_size = hidden_dim, batch_size

        pad_index = word2id["<pad>"]
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_index)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.output = nn.Linear(hidden_dim, vocab_size)

    def forward(self, index, state):
        """Run the GRU on ``index`` starting from ``state``.

        Returns the linear projection of the GRU outputs together with the
        updated hidden state.
        """
        embedded = self.word_embeddings(index)
        # A 2-D embedding gets a length-1 axis inserted at position 1 before
        # it is handed to the batch-first GRU.
        if embedded.dim() == 2:
            embedded = embedded.unsqueeze(1)
        hidden_seq, next_state = self.gru(embedded, state)
        return self.output(hidden_seq), next_state


# Build the encoder/decoder pair and move their parameters to the selected device.
encoder = Encoder(vocab_size, embedding_dim, hidden_dim).to(device)
decoder = Decoder(vocab_size, embedding_dim, hidden_dim).to(device)
# Target positions holding the <pad> id are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=word2id["<pad>"])

# Initialize optimizers (one Adam optimizer per network)
encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)

# 学習する

In [10]:
from datetime import datetime
from sklearn.utils import shuffle

# Intended mini-batch size for training; train2batch's default batch_size has the same value.
batch_size=100
def train2batch(data, target, batch_size=100):
    """Shuffle the paired samples and split them into mini-batches.

    Args:
        data: Sequence of input samples.
        target: Sequence of target samples, aligned index-by-index with ``data``.
        batch_size: Maximum number of samples per batch.

    Returns:
        ``(input_batch, output_batch)``: two lists of batches. Every batch
        holds ``batch_size`` samples except possibly the last one, which holds
        the remainder when ``len(data)`` is not a multiple of ``batch_size``.
    """
    # Both sequences are shuffled together so data[i] still pairs with target[i].
    data, target = shuffle(data, target)

    input_batch = []
    output_batch = []
    for start in range(0, len(data), batch_size):
        stop = start + batch_size
        # Slicing clamps at the end of the sequence, so a short final batch
        # no longer raises IndexError.
        input_batch.append(list(data[start:stop]))
        output_batch.append(list(target[start:stop]))
    return input_batch, output_batch

def get_current_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    return format(now, "%Y-%m-%d %H:%M:%S")


print("Training...")
n_epoch = 100
for epoch in range(1, n_epoch+1):
    # Reshuffle and re-batch the training set at the start of every epoch.
    input_batch, output_batch = train2batch(train_x, train_t, batch_size=batch_size)
    for batch_inputs, batch_outputs in zip(input_batch, output_batch):
        # Zero gradients
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        # Prepare tensors
        inputs = torch.tensor(batch_inputs, device=device)
        outputs = torch.tensor(batch_outputs, device=device)
        # Forward pass through encoder
        encoder_hidden = encoder(inputs)
        # Teacher forcing: the decoder is fed the ground-truth token of step t
        # and is trained to predict the token of step t+1. Distinct names are
        # used so the dataset variable `target` is not overwritten.
        decoder_source = outputs[:, :-1]
        decoder_target = outputs[:, 1:]
        decoder_hidden = encoder_hidden

        # Forward batch of sequences through decoder one time step at a time
        loss = 0
        for step in range(decoder_source.size(1)):
            decoder_output, decoder_hidden = decoder(decoder_source[:, step], decoder_hidden)
            # Drop only the length-1 time axis; a bare squeeze() would also
            # remove the batch axis when a batch holds a single sample.
            decoder_output = torch.squeeze(decoder_output, 1)
            loss += criterion(decoder_output, decoder_target[:, step])

        # Perform backpropagation
        loss.backward()

        # Adjust model weights
        encoder_optimizer.step()
        decoder_optimizer.step()

    if epoch % 10 == 0:
        # The reported value is the loss of the last mini-batch of the epoch.
        print(get_current_time(), "Epoch %d: %.2f" % (epoch, loss.item()))

        model_name = "seq2seq_calculator_v{}.pt".format(epoch)
        torch.save({
            'encoder_model': encoder.state_dict(),
            'decoder_model': decoder.state_dict(),
        }, model_name)
        print("Saving the checkpoint...")

Training...


RuntimeError: ignored

# テストする

In [9]:
import numpy as np


# Template for printing a single question/answer pair (used by the optional
# debug print inside the loop).
result = """---------------
Q:{:>10s}
A:{:>10s}
T/F: {}
---------------"""

# Dedicated evaluation instances: rebinding `encoder`/`decoder` here would
# leave the training cell's optimizers attached to models that are no longer
# the ones being run if the training cell is executed again afterwards.
eval_encoder = Encoder(vocab_size, embedding_dim, hidden_dim, batch_size=1).to(device)
eval_decoder = Decoder(vocab_size, embedding_dim, hidden_dim, batch_size=1).to(device)
eval_encoder.eval()
eval_decoder.eval()

# Upper bound on the number of tokens generated per answer.
max_decode_steps = 7


for checkpoint_epoch in range(10, 101, 10):
    model_name = "seq2seq_calculator_v{}.pt".format(checkpoint_epoch)
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
    checkpoint = torch.load(model_name, map_location=device)
    eval_encoder.load_state_dict(checkpoint["encoder_model"])
    eval_decoder.load_state_dict(checkpoint["decoder_model"])

    print("Checkpoint {:>3d}".format(checkpoint_epoch))
    print("-"*30)
    accuracy = 0
    with torch.no_grad():
        for x in test_x:
            state = eval_encoder(torch.tensor([x], device=device))

            # Rebuild the question text from the ids that precede the padding.
            try:
                padded_idx_x = x.index(word2id["<pad>"])
            except ValueError:
                padded_idx_x = len(x)
            left = "".join(id2word[c] for c in x[:padded_idx_x])

            # Greedy decoding, starting from <eos> as the start-of-sequence token.
            token = "<eos>"
            right = []
            for _ in range(max_decode_steps):
                input_tensor = torch.tensor([word2id[token]], device=device)
                output, state = eval_decoder(input_tensor, state)
                # argmax over the raw scores selects the same token as argmax
                # over their softmax, so the softmax call is not needed.
                index = torch.argmax(output, dim=-1).item()
                token = id2word[index]
                if token == "<eos>":
                    break
                right.append(token)
            right = "".join(right)

            # Compute the expected sum by parsing the operands instead of eval().
            expected = sum(int(term) for term in left.split("+"))
            try:
                is_correct = int(right) == expected
            except ValueError:
                # Empty or non-numeric prediction (e.g. contains "+" or "<pad>").
                is_correct = False
            flag = "T" if is_correct else "F"
            #print(result.format(left, right, flag))
            if is_correct:
                accuracy += 1
    print("Accuracy: {:.2f}".format(accuracy / len(test_x)))
    print("-"*30)

Checkpoint  10
------------------------------




Accuracy: 0.31
------------------------------
Checkpoint  20
------------------------------
Accuracy: 0.55
------------------------------
Checkpoint  30
------------------------------
Accuracy: 0.72
------------------------------
Checkpoint  40
------------------------------
Accuracy: 0.75
------------------------------
Checkpoint  50
------------------------------
Accuracy: 0.79
------------------------------
Checkpoint  60
------------------------------
Accuracy: 0.81
------------------------------
Checkpoint  70
------------------------------
Accuracy: 0.83
------------------------------
Checkpoint  80
------------------------------
Accuracy: 0.82
------------------------------
Checkpoint  90
------------------------------
Accuracy: 0.80
------------------------------
Checkpoint 100
------------------------------
Accuracy: 0.83
------------------------------
