#[01] Seq2Seq

##(1) Data loading & preprocessing

In [1]:
import re, os, unicodedata, urllib3, zipfile, shutil
import numpy as np
import pandas as pd
import torch
from collections import Counter
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset

In [2]:
num_samples = 33000

In [3]:
!wget -c http://www.manythings.org/anki/fra-eng.zip && unzip -o fra-eng.zip

--2024-07-17 07:01:28--  http://www.manythings.org/anki/fra-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7943074 (7.6M) [application/zip]
Saving to: ‘fra-eng.zip’


2024-07-17 07:01:31 (4.14 MB/s) - ‘fra-eng.zip’ saved [7943074/7943074]

Archive:  fra-eng.zip
  inflating: _about.txt              
  inflating: fra.txt                 


In [4]:
def unicode_to_ascii(s):
    return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

In [5]:
def preprocess_sentence(sent):
    # 프랑스어 악센트(accent) 삭제
    sent = unicode_to_ascii(sent.lower())

    # 단어와 구두점 사이 공백 생성
    # Ex) "he is a boy." => "he is a boy ."
    sent = re.sub(r"([?.!,¿])", r" \1", sent)

    # (a-z, A-Z, ".", "?", "!", ",") 를 제외하고 전부 공백으로 변환
    sent = re.sub(r"[^a-zA-Z!.?,]+", r" ", sent)

    # 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent

* 교사 강요(Teacher Forcing) 훈련을 위해, 디코더의 입력 시퀀스와, 레이블에 해당하는 출력 시퀀스를 분리
* 입력 시퀀스에는 시작을 의미하는 토큰을 추가
* 출력 시퀀스에는 종료를 의미하는 토큰을 추가

In [6]:
def load_preprocessed_data():
    encoder_input, decoder_input, decoder_target = [], [], []

    with open('fra.txt', "r") as lines:
        for i, line in enumerate(lines):
            # source 데이터와 target 데이터 분리
            src_line, tar_line, _ = line.strip().split('\t')

            # source 데이터 전처리
            src_line = [w for w in preprocess_sentence(src_line).split()]

            # target 데이터 전처리
            tar_line = preprocess_sentence(tar_line)
            tar_line_in = [w for w in ("<sos> " + tar_line).split()]
            tar_line_out = [w for w in (tar_line + " <eos>").split()]

            encoder_input.append(src_line)
            decoder_input.append(tar_line_in)
            decoder_target.append(tar_line_out)

            if i == num_samples - 1:
                break

    return encoder_input, decoder_input, decoder_target

In [7]:
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?"

print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :', preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장 :', fr_sent)
print('전처리 후 프랑스어 문장 :', preprocess_sentence(fr_sent))

전처리 전 영어 문장 : Have you had dinner?
전처리 후 영어 문장 : have you had dinner ?
전처리 전 프랑스어 문장 : Avez-vous déjà diné?
전처리 후 프랑스어 문장 : avez vous deja dine ?


In [8]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()

In [9]:
print('인코더 입력 :', sents_en_in[:5])
print('디코더 입력 :', sents_fra_in[:5])
print('디코더 레이블 :', sents_fra_out[:5])

인코더 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


* 단어집합(Vocabulary): 단어로부터 정수를 얻는 딕셔너리
* 패딩 토큰 <PAD> : 0번 & OOV 토큰 <UNK> : 1번

In [10]:
def build_vocab(sents):
    word_list = []

    for sent in sents:
        for word in sent:
            word_list.append(word)

    # 각 단어별 등장 빈도를 계산하여 등장 빈도가 높은 순서로 정렬
    word_counts = Counter(word_list)
    vocab = sorted(word_counts, key=word_counts.get, reverse=True)

    word_to_index = {}
    word_to_index['<PAD>'] = 0
    word_to_index['<UNK>'] = 1

    # 등장 빈도가 높은 단어일수록 낮은 정수를 부여
    for index, word in enumerate(vocab):
        word_to_index[word] = index + 2

    return word_to_index

In [11]:
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_fra_in + sents_fra_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)

print('영어 단어 집합의 크기 : {:d}, 프랑스어 단어 집합의 크기 : {:d}'.format(src_vocab_size, tar_vocab_size))

영어 단어 집합의 크기 : 4487, 프랑스어 단어 집합의 크기 : 7880


In [12]:
index_to_src = {v : k for k, v in src_vocab.items()}
index_to_tar = {v : k for k, v in tar_vocab.items()}

def texts_to_sequences(sents, word_to_index):
    encoded_X_data = []
    for sent in tqdm(sents):
        index_sequence = []
        for word in sent:
            try:
                index_sequence.append(word_to_index[word])
            except KeyError:
                index_sequence.append(word_to_index['<UNK>'])
        encoded_X_data.append(index_sequence)
    return encoded_X_data

In [13]:
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_fra_in, tar_vocab)
decoder_target = texts_to_sequences(sents_fra_out, tar_vocab)

100%|██████████| 33000/33000 [00:00<00:00, 191067.85it/s]
100%|██████████| 33000/33000 [00:00<00:00, 685856.59it/s]
100%|██████████| 33000/33000 [00:00<00:00, 721538.62it/s]


In [14]:
for i, (item1, item2) in zip(range(5), zip(sents_en_in, encoder_input)):
    print(f"Index: {i}, 정수 인코딩 전: {item1}, 정수 인코딩 후: {item2}")

Index: 0, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 1, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 2, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 3, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 4, 정수 인코딩 전: ['hi', '.'], 정수 인코딩 후: [737, 2]


In [15]:
def pad_sequences(sentences, max_len=None):
    # 최대 길이 값이 주어지지 않을 경우 데이터 내 최대 길이로 패딩
    if max_len is None:
        max_len = max([len(sentence) for sentence in sentences])

    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

In [16]:
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)

In [17]:
print('인코더 입력의 크기(shape) :', encoder_input.shape)
print('디코더 입력의 크기(shape) :', decoder_input.shape)
print('디코더 레이블의 크기(shape) :', decoder_target.shape)

인코더 입력의 크기(shape) : (33000, 7)
디코더 입력의 크기(shape) : (33000, 16)
디코더 레이블의 크기(shape) : (33000, 16)


In [18]:
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스 :', indices)

랜덤 시퀀스 : [  474  5410  4220 ... 20286 27464 10058]


In [19]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [20]:
print([index_to_src[word] for word in encoder_input[30997]])
print([index_to_tar[word] for word in decoder_input[30997]])
print([index_to_tar[word] for word in decoder_target[30997]])

['i', 'play', 'rugby', '.', '<PAD>', '<PAD>', '<PAD>']
['<sos>', 'je', 'joue', 'au', 'rugby', '.', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['je', 'joue', 'au', 'rugby', '.', '<eos>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']


In [21]:
n_of_val = int(33000 * 0.1)
print('검증 데이터의 갯수 :', n_of_val)

검증 데이터의 갯수 : 3300


In [22]:
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [23]:
print('훈련 source 데이터의 크기 :', encoder_input_train.shape)
print('훈련 target 데이터의 크기 :', decoder_input_train.shape)
print('훈련 target 레이블의 크기 :', decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)

훈련 source 데이터의 크기 : (29700, 7)
훈련 target 데이터의 크기 : (29700, 16)
훈련 target 레이블의 크기 : (29700, 16)
테스트 source 데이터의 크기 : (3300, 7)
테스트 target 데이터의 크기 : (3300, 16)
테스트 target 레이블의 크기 : (3300, 16)


##(2) Model Training

* Encoder: 입력 시퀀스를 받아 해당 시퀀스의 정보를 압축하여 contexxt vector로 변환.
* Encoder는 Embedding layer와 LSTM layer로 구성. Embedding layer는 입력 시퀀스의 각 토큰을 고정 크기의 벡터로 변환 / LSTM layer는 시퀀스의 순서 정보를 고려하여 해당 시퀀스를 요약
* Encoder의 forward 메서드는 입력 시퀀스를 받아 LSTM의 hidden state와 cell state를 반환


* Decoder: encoder에서 생성한 context vector(인코더의 마지막 은닉 상태)를 기반으로 출력 시퀀스 생성.
* Decoder는 Embedding layer와 LSTM layer로 구성. Decoder의 LSTM은 encoder에서 전달 받은 hidden state와 cell state를 초기상태로 사용하여 출력 시퀀스를 생성.
* 생성된 출력 시퀀스는 fully conntected layer를 통과하여 각 시점의 출력 토큰에 대한 확률 분포를 얻음
* Decoder의 forward 메서드는 입력 시퀀스, hidden state, cell state를 받아 출력 시퀀스, 업데이트된 hidden state와 cell state를 반환

* Seq2Seq 클래스는 Encoder와 Decoder를 결합하여 전체 모델 구성
* Seq2Seq 모델의 forward 메서드는 입력 시퀀스(src)와 출력 시퀀스(trg)를 받아 Encoder에서 생성한 은닉 상태(hidden state)와 셀 상태(cell state)를 Decoder로 전달하고, Decoder에서 생성한 출력 시퀀스를 반환

* Seq2Seq의 Decoder는 각 시점마다 다중 클래스 분류 문제를 풀고 있음. 매 시점마다 프랑스어 단어 집합의 크기(tar_vocab_size)의 선택지에서 단어를 1개 선택하여 이를 이번 시점에서 예측한 단어로 택함.
* 다중 클래스 분류 문제이므로 모델 학습을 위해 CrossEntropyLoss 손실 함수 사용, Adam optimizer로 파라미터 최적화. CrossEntropyLoss의 ignore_index는 패딩 토큰의 인덱스를 무시하도록 설정

In [24]:
import torch
import torch.nn as nn
import torch.optim as optim

embedding_dim = 256
hidden_units = 256

class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_units):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first=True)

    def forward(self, x):
        # x.shape == (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)
        # hidden.shape == (1, batch_size, hidden_units), cell.shape == (1, batch_size, hidden_units)
        _, (hidden, cell) = self.lstm(x)
        # 인코더의 출력은 hidden state, cell state
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first=True)
        self.fc = nn.Linear(hidden_units, tar_vocab_size)

    def forward(self, x, hidden, cell):
        # x.shape == (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)

        # 디코더의 LSTM으로 인코더의 hidden state, cell state 전달
        # output.shape == (batch_size, seq_len, hidden_units)
        # hidden.shape == (1, batch_size, hidden_units)
        # cell.shape == (1, batch_size, hidden_units)
        output, (hidden, cell) = self.lstm(x, (hidden, cell))

        # output.shape == (batch_size, seq_len, tar_vocab_size)
        output = self.fc(output)

        # 디코더의 출력은 예측값, hidden state, cell state
        return output, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        hidden, cell = self.encoder(src)

        # 훈련 중에는 디코더의 출력 중 오직 output만 사용한다.
        output, _, _ = self.decoder(trg, hidden, cell)
        return output

encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)

loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())

In [25]:
print(model)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(4487, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(7880, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=7880, bias=True)
  )
)


In [30]:
def evaluation(model, dataloader, loss_function, device):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    with torch.no_grad():
        for encoder_inputs, decoder_inputs, decoder_targets in dataloader:
            encoder_inputs = encoder_inputs.to(device)
            decoder_inputs = decoder_inputs.to(device)
            decoder_targets = decoder_targets.to(device)

            # Forward propagation
            # outputs.shape == (batch_size, seq_len, tar_vocab_size)
            outputs = model(encoder_inputs, decoder_inputs)

            # Calculate loss
            # outputs.view(-1, outputs.size(-1))의 shape : (batch_size * seq_len, tar_vocab_size)
            # decoder_targets.view(-1) shape : (batch_size * seq_len)
            loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
            total_loss += loss.item()

            # Accuracy (without padding token)
            mask = decoder_targets != 0
            total_correct += ((outputs.argmax(dim=-1) == decoder_targets) * mask).sum().item()
            total_count += mask.sum().item()

    return total_loss / len(dataloader), total_correct / total_count

In [24]:
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)

encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)

# Dataset & DataLoader
batch_size = 128

train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [None]:
num_epochs = 30
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

In [31]:
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training
    model.train()

    for encoder_inputs, decoder_inputs, decoder_targets in train_dataloader:
        encoder_inputs = encoder_inputs.to(device)
        decoder_inputs = decoder_inputs.to(device)
        decoder_targets = decoder_targets.to(device)

        optimizer.zero_grad()

        # Forward pass
        # outputs.shape == (batch_size, seq_len, tar_vocab_size)
        outputs = model(encoder_inputs, decoder_inputs)

        # Loss & Backpropagation
        # outputs.view(-1, outputs.size(-1)) shape : (batch_size * seq_len, tar_vocab_size)
        # decoder_targets.view(-1) shape : (batch_size * seq_len)
        loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
        loss.backward()

        # weight update
        optimizer.step()

    train_loss, train_acc = evaluation(model, train_dataloader, loss_function, device)
    valid_loss, valid_acc = evaluation(model, valid_dataloader, loss_function, device)

    print(f'Epoch: {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')

    # Save checkpoint
    if valid_loss < best_val_loss:
        print(f'Validation loss improved from {best_val_loss:.4f} to {valid_loss:.4f}. Saving checkpoint....')

        best_val_loss = valid_loss
        torch.save(model.state_dict(), 'best_model_checkpoint.pth')

Epoch: 1/30 | Train Loss: 2.2758 | Train Acc: 0.6016 | Valid Loss: 2.5136 | Valid Acc: 0.5919
Validation loss improved from inf to 2.5136. Saving checkpoint....
Epoch: 2/30 | Train Loss: 1.8709 | Train Acc: 0.6437 | Valid Loss: 2.2296 | Valid Acc: 0.6207
Validation loss improved from 2.5136 to 2.2296. Saving checkpoint....
Epoch: 3/30 | Train Loss: 1.5645 | Train Acc: 0.6820 | Valid Loss: 2.0326 | Valid Acc: 0.6430
Validation loss improved from 2.2296 to 2.0326. Saving checkpoint....
Epoch: 4/30 | Train Loss: 1.3264 | Train Acc: 0.7123 | Valid Loss: 1.9004 | Valid Acc: 0.6602
Validation loss improved from 2.0326 to 1.9004. Saving checkpoint....
Epoch: 5/30 | Train Loss: 1.1007 | Train Acc: 0.7563 | Valid Loss: 1.7782 | Valid Acc: 0.6762
Validation loss improved from 1.9004 to 1.7782. Saving checkpoint....
Epoch: 6/30 | Train Loss: 0.9178 | Train Acc: 0.7903 | Valid Loss: 1.6954 | Valid Acc: 0.6881
Validation loss improved from 1.7782 to 1.6954. Saving checkpoint....
Epoch: 7/30 | Train

In [32]:
# Model loading
model.load_state_dict(torch.load('best_model_checkpoint.pth'))
model.to(device)

val_loss, val_accuracy = evaluation(model, valid_dataloader, loss_function, device)

print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')

Best model validation loss: 1.5063
Best model validation accuracy: 0.7202


In [33]:
print(tar_vocab['<sos>'])
print(tar_vocab['<eos>'])

3
4


##(3) Seq2seq Machine Translation

* seq2seq는 훈련과정(교사 강요)와 테스트 과정에서의 동작 방식이 다름
* 번역 단계
* 1) 번역하고자 하는 입력 문장에 인코더로 입력되어 인코더의 마지막 시점의 은닉 상태와 셀 상태 얻음
* 2) 인코더의 은닉 상태와 셀 상태, 그리고 토큰 \<sos\>를 디코더로 보냄
* 3) 디코더가 토큰 \<eos\>가 나올 때까지 다음 단어 예측

In [34]:
index_to_src = {v : k for k, v in src_vocab.items()}
index_to_tar = {v : k for k, v in tar_vocab.items()}

# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
    sentence = ''
    for encoded_word in input_seq:
        if (encoded_word != 0):
            sentence = sentence + index_to_src[encoded_word] + ' '
    return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
    sentence = ''
    for encoded_word in input_seq:
        if (encoded_word != 0 and encoded_word != tar_vocab['<sos>'] and encoded_word != tar_vocab['<eos>']):
            sentence = sentence + index_to_tar[encoded_word] + ' '
    return sentence

In [35]:
print(encoder_input_test[25])
print(decoder_input_test[25])
print(decoder_target_test[25])

[  3  26  14 486 117   2   0]
[   3    5   20   65   12 7187    2    0    0    0    0    0    0    0
    0    0]
[   5   20   65   12 7187    2    4    0    0    0    0    0    0    0
    0    0]


In [39]:
def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
    encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

    # 인코더의 초기 상태 설정
    hidden, cell = model.encoder(encoder_inputs)

    # 시작 토큰 <sos>을 디코더의 첫 입력으로 설정
    # unsqueeze(0)는 배치 차원을 추가하기 위함.
    decoder_input = torch.tensor([3], dtype=torch.long).unsqueeze(0).to(device)

    decoded_tokens = []

    # for문을 도는 것 == 디코더의 각 시점
    for _ in range(max_output_len):
        output, hidden, cell = model.decoder(decoder_input, hidden, cell)

        # 소프트맥스 회귀를 수행. 예측 단어의 인덱스
        output_token = output.argmax(dim=-1).item()

        # 종료 토큰 <eos>
        if output_token == 4:
            break

        # 각 시점의 단어(정수)는 decoded_tokens에 누적하였다가 최종 번역 시퀀스로 리턴합니다.
        decoded_tokens.append(output_token)

        # 현재 시점의 예측. 다음 시점의 입력으로 사용된다.
        decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)

    return ' '.join(int_to_tar_token[token] for token in decoded_tokens)

In [40]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)

입력문장 : what do you say ? 
정답문장 : que dis tu ? 
번역문장 : que dis tu ?
--------------------------------------------------
입력문장 : i missed supper . 
정답문장 : j ai loupe le diner . 
번역문장 : j ai loupe le souper .
--------------------------------------------------
입력문장 : you re mistaken . 
정답문장 : vous faites erreur . 
번역문장 : vous faites erreur .
--------------------------------------------------
입력문장 : how do we stop it ? 
정답문장 : comment l arretons nous ? 
번역문장 : comment le stoppons nous ?
--------------------------------------------------
입력문장 : look around you . 
정답문장 : regardez autour de vous . 
번역문장 : regardez autour de vous .
--------------------------------------------------


In [41]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_test[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_test[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_test[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)

입력문장 : i used to be lazy . 
정답문장 : j etais paresseux avant . 
번역문장 : j ai voulu la fete .
--------------------------------------------------
입력문장 : be objective . 
정답문장 : sois objective . 
번역문장 : soyez objectifs .
--------------------------------------------------
입력문장 : it smells burnt . 
정답문장 : ca sent le brule . 
번역문장 : ca semble etrange .
--------------------------------------------------
입력문장 : they re trapped . 
정답문장 : ils sont coinces . 
번역문장 : elles sont pieges .
--------------------------------------------------
입력문장 : here s your key . 
정답문장 : voici votre cle . 
번역문장 : ton probleme .
--------------------------------------------------


#[02] Attention

##(1) Model Training

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim

embedding_dim = 256
hidden_units = 256

### Encoder class

In [28]:
class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_units):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first=True)

    def forward(self, x):
        # x.shape == (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)
        # hidden.shape == (1, batch_size, hidden_units), cell.shape == (1, batch_size, hidden_units)
        outputs, (hidden, cell) = self.lstm(x)
        return outputs, hidden, cell

### Decoder class

* Decoder class는 앞선 Seq2Seq와 달라짐.
* Attention mechanism:
* 1) Decoder의 hidden state와 Encoder의 모든 시점의 hidden state 간의 내적(dot product를 통해서 Attention scores 산출
* 2) attention score를 softmax에 통과시켜 attension weights 산출
* 3) attention weights를, value에 해당하는 encoder의 모든 시점의 hidden state와 각각 곱한 후, 이를 모두 더한 weighted sum => context vector 산출
* 4) 이 context vector를 embedding vector와 concatenate하여 입력으로 사용

In [29]:
class Decoder(nn.Module):
    def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim + hidden_units, hidden_units, batch_first=True)
        self.fc = nn.Linear(hidden_units, tar_vocab_size)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, encoder_outputs, hidden, cell):
        x = self.embedding(x)

        # Dot product attention
        # attention_scores.shape: (batch_size, source_seq_len, 1)
        attention_scores = torch.bmm(encoder_outputs, hidden.transpose(0, 1).transpose(1, 2))

        # attetion_weights.shape: (batch_size, source_seq_len, 1)
        attention_weights = self.softmax(attention_scores)

        # context_vector.shape: (batch_size, 1, hidden_units)
        context_vector = torch.bmm(attention_weights.transpose(1, 2), encoder_outputs)

        # Repeat context_vector to match seq_len
        # context_vector_repeated.shape: (batch_size, target_seq_len, hidden_units)
        seq_len = x.shape[1]
        context_vector_repeated = context_vector.repeat(1, seq_len, 1)

        # Concatenate context vector and embedded input
        # x.shape: (batch_size, target_seq_len, embedding_dim + hidden_units)
        x = torch.cat((x, context_vector_repeated), dim=2)

        # output.shape: (batch_size, target_seq_len, hidden_units)
        # hidden.shape: (1, batch_size, hidden_units)
        # cell.shape: (1, batch_size, hidden_units)
        output, (hidden, cell) = self.lstm(x, (hidden, cell))

        # output.shape: (batch_size, target_seq_len, tar_vocab_size)
        output = self.fc(output)

        return output, hidden, cell

In [34]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        encoder_outputs, hidden, cell = self.encoder(src)
        output, _, _ = self.decoder(trg, encoder_outputs, hidden, cell)
        return output

encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)

loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())

In [31]:
def evaluation(model, dataloader, loss_function, device):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_count = 0

    with torch.no_grad():
        for encoder_inputs, decoder_inputs, decoder_targets in dataloader:
            encoder_inputs = encoder_inputs.to(device)
            decoder_inputs = decoder_inputs.to(device)
            decoder_targets = decoder_targets.to(device)

            # forward pass
            # outputs.shape == (batch_size, seq_len, tar_vocab_size)
            outputs = model(encoder_inputs, decoder_inputs)

            # Calculate loss
            # outputs.view(-1, outputs.size(-1)) shape: (batch_size * seq_len, tar_vocab_size)
            # decoder_targets.view(-1) shape: (batch_size * seq_len)
            loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
            total_loss += loss.item()

            # Calculate accuracy (without padding tokens)
            mask = decoder_targets != 0
            total_correct += ((outputs.argmax(dim=-1) == decoder_targets) * mask).sum().item()
            total_count += mask.sum().item()

    return total_loss / len(dataloader), total_correct / total_count

In [35]:
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)

encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)

# 데이터셋 및 데이터로더 생성
batch_size = 128

train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

# 학습 설정
num_epochs = 30
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(4487, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(7880, 256, padding_idx=0)
    (lstm): LSTM(512, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=7880, bias=True)
    (softmax): Softmax(dim=2)
  )
)

In [37]:
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training
    model.train()

    for encoder_inputs, decoder_inputs, decoder_targets in train_dataloader:
        encoder_inputs = encoder_inputs.to(device)
        decoder_inputs = decoder_inputs.to(device)
        decoder_targets = decoder_targets.to(device)

        # initialize gradient
        optimizer.zero_grad()

        # forward pass
        # outputs.shape == (batch_size, seq_len, tar_vocab_size)
        outputs = model(encoder_inputs, decoder_inputs)

        # calculate loss and backpropagation
        # outputs.view(-1, outputs.size(-1)) shape == (batch_size * seq_len, tar_vocab_size)
        # decoder_targets.view(-1) shape == (batch_size * seq_len)
        loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
        loss.backward()

        # update weights
        optimizer.step()

    train_loss, train_acc = evaluation(model, train_dataloader, loss_function, device)
    valid_loss, valid_acc = evaluation(model, valid_dataloader, loss_function, device)

    print(f'\nEpoch: {epoch+1} / {num_epochs} || Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} || Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')
    print('-' * 99)
    print()

    # Saving checkpoint
    if valid_loss < best_val_loss:
        print(f'\nValidation loss improved from {best_val_loss:.4f} to {valid_loss:.4f}. Saving checkpoint....')
        best_val_loss = valid_loss
        torch.save(model.state_dict(), 'best_model_checkpoint.pth')


Epoch: 1 / 30 || Train Loss: 2.8614 | Train Acc: 0.5472 || Valid Loss: 2.9906 | Valid Acc: 0.5435
---------------------------------------------------------------------------------------------------


Validation loss improved from inf to 2.9906. Saving checkpoint....

Epoch: 2 / 30 || Train Loss: 2.1380 | Train Acc: 0.6220 || Valid Loss: 2.3884 | Valid Acc: 0.6061
---------------------------------------------------------------------------------------------------


Validation loss improved from 2.9906 to 2.3884. Saving checkpoint....

Epoch: 3 / 30 || Train Loss: 1.6690 | Train Acc: 0.6771 || Valid Loss: 2.0538 | Valid Acc: 0.6423
---------------------------------------------------------------------------------------------------


Validation loss improved from 2.3884 to 2.0538. Saving checkpoint....

Epoch: 4 / 30 || Train Loss: 1.3263 | Train Acc: 0.7226 || Valid Loss: 1.8445 | Valid Acc: 0.6664
-------------------------------------------------------------------------------------------

In [38]:
# Load model
model.load_state_dict(torch.load('best_model_checkpoint.pth'))
model.to(device)

val_loss, val_accuracy = evaluation(model, valid_dataloader, loss_function, device)

print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')

Best model validation loss: 1.4041
Best model validation accuracy: 0.7311


##(2) Attention Machine Translation

In [39]:
print(tar_vocab['<sos>'])
print(tar_vocab['<eos>'])

3
4


In [40]:
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
    sentence = ''
    for encoded_word in input_seq:
        if (encoded_word != 0):
            sentence = sentence + index_to_src[encoded_word] + ' '
    return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
    sentence = ''
    for encoded_word in input_seq:
        if (encoded_word != 0 and encoded_word != tar_vocab['<sos>'] and encoded_word != tar_vocab['<eos>']):
            sentence = sentence + index_to_tar[encoded_word] + ' '
    return sentence

In [41]:
print(encoder_input_test[25])
print(decoder_input_test[25])
print(decoder_target_test[25])

[ 63   8  37 751   2   0   0]
[  3  14  28  41   9  12  19 804   2   0   0   0   0   0   0   0]
[ 14  28  41   9  12  19 804   2   4   0   0   0   0   0   0   0]


In [45]:
def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
    encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

    # 인코더 초기 상태 설정
    encoder_outputs, hidden, cell = model.encoder(encoder_inputs)

    # 시작 토큰 <sos>를 디코더의 첫 입력으로 설정
    # unsqueeze(0): batch 차원 추가
    decoder_input = torch.tensor([3], dtype=torch.long).unsqueeze(0).to(device)

    decoded_tokens = []

    # iterating every timestep
    for _ in range(max_output_len):
        output, hidden, cell = model.decoder(decoder_input, encoder_outputs, hidden, cell)

        # 소프트맥스 회귀 수행. 예측 단어의 인덱스
        output_token = output.argmax(dim=-1).item()

        # 종료 토큰 <eos>
        if output_token == 4:
            break

        # 각 시점의 단어(정수)는 decoded_tokens에 누적하였다가 최종 번역 시퀀스로 리턴
        decoded_tokens.append(output_token)

        # 현재 시점의 예측을 다음 시점의 입력으로 사용
        decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)

    return ' '.join(int_to_tar_token[token] for token in decoded_tokens)

In [46]:
for seq_index in [3, 50, 100, 300, 1001]:
    input_seq = encoder_input_train[seq_index]
    translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

    print('입력 문장 :', seq_to_src(encoder_input_train[seq_index]))
    print('정답 문장 :', seq_to_tar(decoder_input_train[seq_index]))
    print('번역 문장 :', translated_text)
    print('-' * 49)

입력 문장 : what do you want ? 
정답 문장 : que veux tu ? 
번역 문장 : que voulez vous ?
-------------------------------------------------
입력 문장 : we re normal . 
정답 문장 : nous sommes normaux . 
번역 문장 : nous sommes normales .
-------------------------------------------------
입력 문장 : i talk to myself . 
정답 문장 : je parle tout seul . 
번역 문장 : je parle tout seul .
-------------------------------------------------
입력 문장 : they are arguing . 
정답 문장 : ils se disputent . 
번역 문장 : elles sont en train de se disputer .
-------------------------------------------------
입력 문장 : tom likes country . 
정답 문장 : tom aime la country . 
번역 문장 : tom aime la country .
-------------------------------------------------


In [47]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_test[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_test[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_test[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)

입력문장 : tom is making tea . 
정답문장 : tom fait du the . 
번역문장 : tom est en train de faire du the .
--------------------------------------------------
입력문장 : do it this way . 
정답문장 : fais le de cette maniere . 
번역문장 : faites le de ca !
--------------------------------------------------
입력문장 : what woke you up ? 
정답문장 : qu est ce qui vous a reveilles ? 
번역문장 : qu est ce qui vous a reveillee ?
--------------------------------------------------
입력문장 : check this out . 
정답문장 : regarde moi ca . 
번역문장 : verifie ca .
--------------------------------------------------
입력문장 : let it be . 
정답문장 : ainsi soit il . 
번역문장 : laisse tomber .
--------------------------------------------------
