<a href="https://colab.research.google.com/github/komazawa-deep-learning/komazawa-deep-learning.github.io/blob/master/2025notebooks/2025_1104seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import numpy as np
from termcolor import colored
import random

# Choose the compute device: prefer the MPS backend, then the first CUDA GPU,
# and fall back to the CPU when neither is available.
device = (
    'mps' if torch.backends.mps.is_available()
    else 'cuda:0' if torch.cuda.is_available()
    else 'cpu'
)
print(f'device:{device}')

import matplotlib.pyplot as plt
try:
    import japanize_matplotlib
except ImportError:
    !pip install japanize_matplotlib
    import japanize_matplotlib

# Seed the random number generators
def init_seed(seed:int=42):
    """Seed Python's, NumPy's and PyTorch's random number generators with `seed`.

    Also sets the cuDNN `deterministic` flag to True and the `benchmark`
    flag to False.
    """
    # Same order as before: random, NumPy, torch (CPU), torch (CUDA).
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed = 42
init_seed(seed=seed)

# データ生成

In [2]:
def gen_randint(_min:int=0, _max:int=1000):
    """Return one random integer n with `_min <= n < _max` (upper bound exclusive)."""
    span = _max - _min
    offset = np.random.randint(span)  # drawn from 0 .. span-1
    return offset + _min

def gen_addition_data(_min:int=0, _max:int=100):
    """Create one addition problem and return it with its answer.

    Two operands are drawn with `gen_randint(_min, _max)`. The return value is
    the pair `(question, answer)`, both strings: the question has the form
    ``"X+Y="`` and the answer is the decimal sum.
    """
    lhs = gen_randint(_min=_min, _max=_max)
    rhs = gen_randint(_min=_min, _max=_max)
    question = f'{lhs}+{rhs}='
    answer = str(lhs + rhs)
    return question, answer


class _digit_tokenizer():
    """Character-level tokenizer for arithmetic strings, for use with PyTorch.

    The id of a token is its position in `self.tokens`. The default vocabulary
    is the characters ``0123456789.+= `` followed by the special tokens
    ``<PAD>``, ``<SOS>``, ``<EOS>`` and ``<UNK>``.

    Implements `encode()` and `decode()`; calling the instance is the same as
    calling `encode()`.
    """
    def __init__(self, N:int=10000, tokens:list=None):
        """
        Args:
            N: Unused; kept so existing calls that pass it keep working.
            tokens: Vocabulary list. When omitted, a fresh copy of the default
                vocabulary is built for this instance, so mutating one
                tokenizer's `tokens` cannot leak into other instances.
        """
        if tokens is None:
            tokens = list('0123456789.+= ') + ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        self.tokens = tokens

    def encode(self, string:str):
        """Return the list of token ids for the characters of `string`.

        A character that is not in the vocabulary is mapped to the id of
        ``<UNK>``. If the vocabulary has no ``<UNK>`` entry, the original
        `ValueError` is raised instead.
        """
        unk_id = self.tokens.index('<UNK>') if '<UNK>' in self.tokens else None
        ids = []
        for ch in string:
            try:
                ids.append(self.tokens.index(ch))
            except ValueError:
                if unk_id is None:
                    raise
                ids.append(unk_id)
        return ids

    def decode(self, ids):
        """Return the list of tokens (strings) for the given ids."""
        return [self.tokens[idx] for idx in ids]

    def __call__(self, X):
        """Shorthand for `encode(X)`."""
        return self.encode(X)


# Module-level tokenizer instance; later cells use it as a default argument.
_tokenizer = _digit_tokenizer()

# Generate one sample problem and show its token ids and raw strings.
inps, tchs = gen_addition_data()
print(_tokenizer(inps), _tokenizer(tchs))
print(f'inps:{inps}')
print(f'tchs:{tchs}')
#_tokenizer(inps), _tokenizer.decode(_tokenizer(inps))

[5, 1, 11, 9, 2, 12] [1, 4, 3]
inps:51+92=
tchs:143


In [3]:
class addition_ds(torch.utils.data.Dataset):
    """PyTorch dataset holding addition problems (inputs) and their answers (targets)."""
    def __init__(
        self, N:int=1000, # total number of samples to generate
        tokenizer=_tokenizer, # tokenizer used to turn strings into ids
        generator=gen_addition_data # callable returning one (question, answer) pair
            ):
        super().__init__()
        self.tokenizer = tokenizer

        # Draw all N problems up front and keep them as raw strings.
        samples = [generator() for _ in range(N)]
        self.inps = [question for question, _ in samples]
        self.tchs = [answer for _, answer in samples]

    def __len__(self):
        """Number of stored problems."""
        return len(self.inps)

    def __getitem__(self, idx):
        """Return `(input_ids, target_ids)` as tensors for sample `idx`.

        The target ids are wrapped as ``<SOS> ... <EOS>``; the input ids are
        returned as-is.
        """
        vocab = self.tokenizer.tokens
        src_ids = self.tokenizer(self.inps[idx])
        sos_id = vocab.index('<SOS>')
        answer_ids = self.tokenizer(self.tchs[idx])
        eos_id = vocab.index('<EOS>')
        tgt_ids = [sos_id] + answer_ids + [eos_id]
        return torch.tensor(src_ids), torch.tensor(tgt_ids)

# Build the dataset
ds = addition_ds(N=10000)
#ds.__len__(), ds.__getitem__(99), ds.tokenizer.decode(ds.__getitem__(99)[0])

# Split it 90% / 10% into training and test subsets (split seeded with `seed`)
N_train = int(len(ds) * 0.9)
N_test = len(ds) - N_train
train_ds, test_ds = torch.utils.data.random_split(
    dataset=ds,
    lengths=(N_train, N_test),
    generator=torch.Generator().manual_seed(seed),
)

def _collate_fn(batch):
    """Turn a batch of `(input, target)` pairs into two parallel lists.

    No padding or stacking is done here; the items are returned unchanged as
    `(list_of_inputs, list_of_targets)`.
    """
    inps, tgts = zip(*batch)
    return list(inps), list(tgts)

# Data loaders that handle mini-batching
batch_size = 512
train_dl = torch.utils.data.DataLoader(
    train_ds, batch_size=batch_size, shuffle=True, collate_fn=_collate_fn)
test_dl = torch.utils.data.DataLoader(
    test_ds, batch_size=8, shuffle=False, collate_fn=_collate_fn)
print(f'訓練データセット数:{len(train_ds):,d}')
print(f'検査データセット数:{len(test_ds):,d}')
#next(iter(train_dl))

訓練データセット数:9,000
検査データセット数:1,000


# Encoder, Decoder の定義

In [4]:
import torch.nn as nn

# Encoder class
class Encoder(nn.Module):
    """Embed input token ids and run them through a (stacked) LSTM.

    Args:
        n_vocab: Vocabulary size (number of embedding rows).
        n_emb: Embedding dimension.
        n_hid: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
        padding_idx: Token id treated as padding by the embedding layer.
    """
    def __init__(self, n_vocab:int, n_emb:int, n_hid:int,
                 num_layers:int=1, dropout:float=0.5,
                 padding_idx:int=_tokenizer.tokens.index('<PAD>')):

        super().__init__()
        self.n_hid = n_hid
        self.emb_layer = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=n_emb,
            padding_idx=padding_idx)
        self.rnn = nn.LSTM(
            input_size=n_emb,
            hidden_size=n_hid,
            num_layers=num_layers,
            # nn.LSTM only applies dropout between stacked layers. With a
            # single layer a non-zero value has no effect and just triggers a
            # UserWarning, so pass 0.0 in that case.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True)

    def forward(self, inp:torch.Tensor):
        """Encode a batch of token ids.

        Args:
            inp: Integer tensor of token ids, batch-first (the LSTM is built
                with `batch_first=True`).

        Returns:
            `(out, state)` exactly as returned by `nn.LSTM`: the per-step
            outputs and the final `(h_n, c_n)` state.
        """
        emb = self.emb_layer(inp)
        out, state = self.rnn(emb)
        return out, state

# Decoder class
class Decoder(nn.Module):
    """Embed target token ids, run an LSTM seeded with the encoder state, and
    project every step onto the vocabulary.

    Args:
        n_vocab: Vocabulary size (embedding rows and output logits).
        n_emb: Embedding dimension.
        n_hid: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
        padding_idx: Token id treated as padding by the embedding layer.
    """
    def __init__(self, n_vocab:int, n_emb:int, n_hid:int,
        num_layers:int=1, dropout:float=0.5,
        padding_idx:int=_tokenizer.tokens.index('<PAD>')):

        super().__init__()
        self.n_hid = n_hid
        self.emb_layer = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=n_emb,
            padding_idx=padding_idx)
        self.rnn = nn.LSTM(
            input_size=n_emb,
            hidden_size=n_hid,
            num_layers=num_layers,
            # nn.LSTM only applies dropout between stacked layers. With a
            # single layer a non-zero value has no effect and just triggers a
            # UserWarning, so pass 0.0 in that case.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True)
        self.fc = nn.Linear(in_features=n_hid, out_features=n_vocab)

    def forward(self, inp:torch.Tensor, enc_state):
        """Decode a batch of token ids starting from `enc_state`.

        Args:
            inp: Integer tensor of token ids, batch-first.
            enc_state: Initial LSTM state `(h_0, c_0)`, e.g. the encoder's
                final state.

        Returns:
            `(out, state)`: vocabulary logits for every time step and the
            LSTM's final state.
        """
        emb = self.emb_layer(inp)
        # The second return value is what inference uses to generate the next character.
        out, state = self.rnn(emb, enc_state)
        out = self.fc(out)
        return out, state

In [5]:
# #n_emb, n_hid = 64, 64     # 文字の埋め込み次元数
# #n_emb, n_hid = 32, 32   # 文字の埋め込み次元数
# #n_emb, n_hid =  128, 128  # 文字の埋め込み次元数
# n_emb, n_hid = 256, 256   # 文字の埋め込み次元数
# #n_emb, n_hid = 512, 512   # 文字の埋め込み次元数

# n_vocab = len(_tokenizer.tokens) # 扱う文字の数。今回は 18 文字
# num_layers, dropout = 2, 0.5
# num_layers, dropout = 3, 0.25
# # num_layers, dropout = 3, 0.5
# # num_layers, dropout = 2, 0.01
# #num_layers, dropout = 1, 0.0

# encoder = Encoder(n_vocab, n_emb, n_hid, num_layers=num_layers, dropout=dropout).to(device)
# decoder = Decoder(n_vocab, n_emb, n_hid, num_layers=num_layers, dropout=dropout).to(device)

class _Seq2Seq(torch.nn.Module):
    """Encoder-decoder (seq2seq) model built from two LSTMs.

    Args:
        Encoder: Ready-made encoder module. When None, `_Seq2Seq.Encoder` is
            built from the hyper-parameters below and moved to `device`.
        Decoder: Ready-made decoder module. When None, `_Seq2Seq.Decoder` is
            built from the hyper-parameters below and moved to `device`.
        n_vocab: Vocabulary size.
        n_emb: Embedding dimension.
        n_hid: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
        padding_idx: Token id treated as padding. It is forwarded to the
            internally built encoder/decoder.
    """
    def __init__(self, Encoder:torch.nn.Module=None, Decoder:torch.nn.Module=None,
                 n_vocab:int=0, n_emb:int=0, n_hid:int=0, num_layers:int=1, dropout:float=0.0,
                 padding_idx=_tokenizer.tokens.index('<PAD>')):

        super().__init__()
        # `self.Encoder` / `self.Decoder` are the nested classes defined below;
        # the `Encoder` / `Decoder` arguments are optional pre-built instances.
        if Encoder is None:
            self.encoder = self.Encoder(
                n_vocab, n_emb, n_hid, num_layers=num_layers, dropout=dropout,
                padding_idx=padding_idx).to(device)
        else:
            self.encoder = Encoder
        if Decoder is None:
            self.decoder = self.Decoder(
                n_vocab, n_emb, n_hid, num_layers=num_layers, dropout=dropout,
                padding_idx=padding_idx).to(device)
        else:
            self.decoder = Decoder

        self.n_vocab=n_vocab
        self.n_emb=n_emb
        self.n_hid=n_hid
        self.padding_idx=padding_idx
        self.num_layers=num_layers

    def forward(self, inps:torch.Tensor, tchs:torch.Tensor):
        """Encode `inps`, then decode `tchs` starting from the encoder state.

        Returns:
            `(dec_out, dec_state)` as returned by the decoder. The encoder's
            per-step output is not used.
        """
        enc_out, enc_state = self.encoder(inps)
        dec_out, dec_state = self.decoder(tchs, enc_state)
        return dec_out, dec_state

    # Encoder class
    class Encoder(torch.nn.Module):
        """Embedding + LSTM encoder (with a linear projection on its outputs)."""
        def __init__(
            self, n_vocab:int, n_emb:int, n_hid:int, num_layers:int=1, dropout:float=0.5,
                 padding_idx:int=_tokenizer.tokens.index('<PAD>')):

            super().__init__()
            self.n_hid = n_hid
            self.emb_layer = torch.nn.Embedding(num_embeddings=n_vocab, embedding_dim=n_emb, padding_idx=padding_idx)
            # Dropout in nn.LSTM only acts between stacked layers; pass 0.0 for
            # a single layer to avoid a no-op setting and its UserWarning.
            self.rnn = torch.nn.LSTM(input_size=n_emb, hidden_size=n_hid, num_layers=num_layers,
                                     dropout=dropout if num_layers > 1 else 0.0, batch_first=True)
            # NOTE: `_Seq2Seq.forward` discards this projected output. The layer
            # is kept so parameter names and initialization order stay unchanged.
            self.fc = torch.nn.Linear(in_features=n_hid, out_features=n_vocab)

        def forward(self, inp:torch.Tensor):
            """Return `(projected_outputs, lstm_state)` for a batch of token ids."""
            emb = self.emb_layer(inp)
            out, state = self.rnn(emb)
            out = self.fc(out)
            return out, state

    # Decoder class
    class Decoder(torch.nn.Module):
        """Embedding + LSTM decoder with a linear projection onto the vocabulary."""
        def __init__(
            self, n_vocab:int, n_emb:int, n_hid:int, num_layers:int=1, dropout:float=0.5,
            padding_idx:int=_tokenizer.tokens.index('<PAD>')):

            super().__init__()
            self.n_hid = n_hid
            self.emb_layer = torch.nn.Embedding(num_embeddings=n_vocab, embedding_dim=n_emb, padding_idx=padding_idx)
            # Dropout in nn.LSTM only acts between stacked layers; pass 0.0 for
            # a single layer to avoid a no-op setting and its UserWarning.
            self.rnn = torch.nn.LSTM(input_size=n_emb, hidden_size=n_hid, num_layers=num_layers,
                                     dropout=dropout if num_layers > 1 else 0.0, batch_first=True)
            self.fc = torch.nn.Linear(in_features=n_hid, out_features=n_vocab)

        def forward(self, inp:torch.Tensor, enc_state):
            """Return `(logits, lstm_state)` for `inp`, starting from `enc_state`."""
            emb = self.emb_layer(inp)
            # The second return value is what inference uses to generate the next character.
            out, state = self.rnn(emb, enc_state)
            out = self.fc(out)
            return out, state

In [6]:
# Loss function
criterion = torch.nn.CrossEntropyLoss()

n_emb, n_hid = 256, 256   # embedding dimension and LSTM hidden size
#n_emb, n_hid = 512, 512   # embedding dimension and LSTM hidden size

n_vocab = len(_tokenizer.tokens) # number of distinct tokens handled
# Experiment toggles: the last uncommented assignment is the one in effect.
num_layers, dropout = 2, 0.5
num_layers, dropout = 2, 0.0
# num_layers, dropout = 3, 0.25
#num_layers, dropout = 3, 0.5
# num_layers, dropout = 2, 0.01
# num_layers, dropout = 1, 0.0

#seq2seq = _Seq2Seq()
#seq2seq = _Seq2Seq(Encoder=Encoder, Decoder=Decoder, n_vocab=n_vocab, n_emb=n_emb, n_hid=n_hid, num_layers=num_layers, dropout=dropout).to(device)
# Passing None for Encoder/Decoder makes _Seq2Seq build its own nested encoder/decoder.
seq2seq = _Seq2Seq(Encoder=None, Decoder=None, n_vocab=n_vocab, n_emb=n_emb, n_hid=n_hid, num_layers=num_layers, dropout=dropout).to(device)
_model = seq2seq

# Optimizer definition
lr = 1e-2
_optimizer = torch.optim.Adam(_model.parameters(), lr=lr)
# encoder_optimizer = torch.optim.Adam(encoder.parameters(), lr=lr)
# decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=lr)

# Switch to evaluation mode; as the cell's last expression this also displays the module structure.
_model.eval()

_Seq2Seq(
  (encoder): Encoder(
    (emb_layer): Embedding(18, 256, padding_idx=14)
    (rnn): LSTM(256, 256, num_layers=2, batch_first=True)
    (fc): Linear(in_features=256, out_features=18, bias=True)
  )
  (decoder): Decoder(
    (emb_layer): Embedding(18, 256, padding_idx=14)
    (rnn): LSTM(256, 256, num_layers=2, batch_first=True)
    (fc): Linear(in_features=256, out_features=18, bias=True)
  )
)

## fit() の定義

In [None]:
from torch.nn.utils.rnn import pad_sequence

def count_correct(outs:torch.Tensor, tchs:torch.Tensor):
    """Count how many sequences in a batch are predicted entirely correctly.

    Only digit tokens (ids below 10) are compared: the predicted digits, in
    order, must equal the target digits, in order. Special tokens in either
    sequence are ignored.

    Args:
        outs: Teacher-forced decoder logits of shape (batch, time, vocab),
            where step `j` predicts `tchs[:, j+1]`.
        tchs: Target token ids of shape (batch, time), including the leading
            ``<SOS>`` and any trailing ``<EOS>`` / padding.

    Returns:
        `(n_correct, n_total)` for the batch.
    """
    n_digits = 10  # ids 0..9 are the digit characters

    # The final decoder step has no target (the training loss pairs step j
    # with tchs[:, j+1]), so its output is never trained. Drop it, otherwise
    # a stray digit there makes a correct answer count as wrong.
    preds = outs.argmax(dim=-1)[:, :-1].detach().cpu().numpy()
    golds = tchs.detach().cpu().numpy()

    n_correct = 0
    for pred_row, gold_row in zip(preds, golds):
        pred_digits = pred_row[pred_row < n_digits]
        gold_digits = gold_row[gold_row < n_digits]
        # array_equal is False on a length mismatch, so no separate length check is needed.
        if np.array_equal(pred_digits, gold_digits):
            n_correct += 1

    return n_correct, len(golds)

def _pad_batch(inps, tchs, padding_value):
    """Pad two lists of 1-D id tensors to batch-first tensors and move them to `device`."""
    inps = pad_sequence(inps, batch_first=True, padding_value=padding_value).to(device)
    tchs = pad_sequence(tchs, batch_first=True, padding_value=padding_value).to(device)
    return inps, tchs


def _teacher_forced_loss(dec_out, tchs):
    """Sum `criterion` over time steps, pairing output step j with target step j+1."""
    loss = 0
    for j in range(dec_out.size()[1]-1):
        loss += criterion(dec_out[:, j, :], tchs[:, j+1])
    return loss


def _count_correct_over(_model, dl, padding_value):
    """Accumulate `count_correct` over every batch of `dl`; returns `(n_correct, n_total)`."""
    Nc, N = 0, 0
    for inps, tchs in dl:
        inps, tchs = _pad_batch(inps, tchs, padding_value)
        dec_out, _ = _model(inps, tchs)
        _Nc, _N = count_correct(dec_out, tchs)
        Nc += _Nc
        N += _N
    return Nc, N


def _fit(
    _model:torch.nn.Module=None, _optimizer:torch.optim.Optimizer=None, train_dl=train_dl, test_dl=test_dl,
    epochs:int=200, interval:int=10,
    train_losses:list=None, test_losses:list=None, train_crs:list=None, test_crs:list=None):
    """Train `_model` with teacher forcing and track loss and accuracy per epoch.

    Each epoch performs, in this order: one training pass over `train_dl`, a
    loss pass over `test_dl`, an accuracy pass over `train_dl` and an accuracy
    pass over `test_dl` (the last three in eval mode, without gradients).
    Training stops early once the summed training loss of an epoch drops
    below 0.01.

    Args:
        _model: Model called as `_model(inps, tchs)` and returning
            `(dec_out, dec_state)`.
        _optimizer: Optimizer over the model's parameters.
        train_dl, test_dl: Data loaders yielding `(inps, tchs)` lists of
            1-D id tensors.
        epochs: Maximum number of epochs.
        interval: Progress is printed on epoch 1 and every `interval` epochs.
        train_losses, test_losses: Per-epoch loss histories to append to.
        train_crs, test_crs: Per-epoch `(n_correct, n_total)` histories to
            append to.
            For all four history arguments, a new list is created when the
            argument is omitted; pass the lists returned by a previous call
            to continue an existing history.

    Returns:
        `(_model, _optimizer, train_losses, test_losses, train_crs, test_crs)`.

    Raises:
        ValueError: If `_model` or `_optimizer` is not given.
    """
    if _model is None or _optimizer is None:
        raise ValueError('_fit() requires both _model and _optimizer')

    # Fresh histories per call unless the caller hands some in; mutable
    # defaults would silently be shared between unrelated runs.
    train_losses = [] if train_losses is None else train_losses
    test_losses = [] if test_losses is None else test_losses
    train_crs = [] if train_crs is None else train_crs
    test_crs = [] if test_crs is None else test_crs

    pad_id = _tokenizer.tokens.index('<PAD>')  # loop-invariant, looked up once

    for epoch in range(1, epochs+1):

        # --- training pass ---
        train_epoch_loss = 0  # loss summed over the batches of this epoch
        _model.train()
        for inps, tchs in train_dl:
            _optimizer.zero_grad()                    # reset gradients

            # Pad the sequences in the batch to a common length
            inps, tchs = _pad_batch(inps, tchs, pad_id)

            dec_out, dec_state = _model(inps, tchs)   # forward pass
            loss = _teacher_forced_loss(dec_out, tchs)
            train_epoch_loss += loss.item()
            loss.backward()                           # backpropagation
            _optimizer.step()                         # parameter update
        train_losses.append(train_epoch_loss)

        # --- evaluation: no gradients are needed, so do not build graphs ---
        _model.eval()
        with torch.no_grad():
            test_epoch_loss = 0.
            for inps, tchs in test_dl:
                inps, tchs = _pad_batch(inps, tchs, pad_id)
                dec_out, dec_state = _model(inps, tchs)
                test_epoch_loss += _teacher_forced_loss(dec_out, tchs).item()
            test_losses.append(test_epoch_loss)

            train_crs.append(_count_correct_over(_model, train_dl, pad_id))
            test_crs.append(_count_correct_over(_model, test_dl, pad_id))

        # Report losses and accuracies
        if ((epoch % interval) == 0) or (epoch == 1):
            print(f"Epoch {epoch:03d}:", f"訓練損失:{train_epoch_loss:7.3f}", f"検査損失:{test_epoch_loss:7.3f}",
                  f"訓練データ正解率:{train_crs[-1][0]/train_crs[-1][1]:.02f}", f"検査データ正解率:{test_crs[-1][0]/test_crs[-1][1]:.02f}")
        if train_epoch_loss < 0.01:
            break

    return _model, _optimizer, train_losses, test_losses, train_crs, test_crs

# Loss function
criterion = torch.nn.CrossEntropyLoss()

# Experiment toggles: the last uncommented assignment is the one in effect.
n_emb, n_hid = 256, 256   # embedding dimension and LSTM hidden size
n_emb, n_hid = 32, 128   # embedding dimension and LSTM hidden size
n_emb, n_hid = 64, 256    # embedding dimension and LSTM hidden size
#n_emb, n_hid = 512, 512   # embedding dimension and LSTM hidden size
n_emb, n_hid = 64, 512

n_vocab = len(_tokenizer.tokens) # number of distinct tokens handled
num_layers, dropout = 2, 0.1
#num_layers, dropout = 3, 0.5
#num_layers, dropout = 2, 0.0
# num_layers, dropout = 3, 0.25
#num_layers, dropout = 3, 0.5
# num_layers, dropout = 2, 0.01
num_layers, dropout = 2, 0.25
#num_layers, dropout = 2, 0.5

# Passing None for Encoder/Decoder makes _Seq2Seq build its own nested encoder/decoder.
seq2seq = _Seq2Seq(Encoder=None, Decoder=None, n_vocab=n_vocab, n_emb=n_emb, n_hid=n_hid, num_layers=num_layers, dropout=dropout).to(device)
_model = seq2seq

# Optimizer definition
lr = 1e-3
_optimizer = torch.optim.Adam(_model.parameters(), lr=lr)
_model.eval()
# First, short training run (20 epochs, progress printed every 2 epochs).
_model, _optimizer, train_losses, test_losses, train_crs, test_crs = _fit(
    _model=_model, _optimizer=_optimizer, train_dl=train_dl, test_dl=test_dl, epochs=20, interval=2)

In [None]:
# Continue training the same model for up to 300 more epochs. The histories
# from the previous run are handed back in explicitly so the loss/accuracy
# curves keep growing, instead of relying on _fit's default arguments.
_model, _optimizer, train_losses, test_losses, train_crs, test_crs = _fit(
    _model=_model, _optimizer=_optimizer, train_dl=train_dl, test_dl=test_dl, epochs=300, interval=10,
    train_losses=train_losses, test_losses=test_losses, train_crs=train_crs, test_crs=test_crs)

In [None]:
# IPython magic: render matplotlib figures inline in the notebook.
%matplotlib inline
import matplotlib.pyplot as plt

# Plot the per-epoch training and test losses recorded by _fit.
plt.plot(train_losses, label="訓練損失")
plt.plot(test_losses, label="検査損失")
plt.xlabel('エポック')
# The title records the hyper-parameters currently set in the notebook.
plt.title(f'dropout:{dropout:.2f}, n_hid:{n_hid}, num_layers:{num_layers}, lr:{lr:.3f}')
plt.legend()
#plt.xkcd()
plt.show()