## Seq2Seq를 이용한 번역기 구현하기
seq2seq를 이용해서 기계 번역기를 만들어보겠습니다. 실제 서비스에 사용되는 번역기는 뒤의 챕터에서 배우게 될 어텐션 메커니즘을 사용해야 하고, 최소 수백만 개의 데이터가 필요합니다. 하지만 그럼에도 번역기를 만드는 간단한 토이 프로젝트를 사용해서 seq2seq 구조와 인코더와 디코더의 역할을 이해할 수 있습니다.

### 1. 데이터 로드 및 전처리

실제 성능이 좋은 기계 번역기를 구현하려면 방대한 데이터가 필요하므로 여기서는 seq2seq를 간단히 실습해보는 수준의 간단한 기계 번역기를 구현해보겠습니다. 기계 번역기를 훈련시키기 위해서는 훈련 데이터로 병렬 코퍼스(parallel corpus)가 필요합니다. 병렬 코퍼스란, 두 개 이상의 언어가 병렬적으로 구성된 코퍼스를 의미합니다.

링크 : http://www.manythings.org/anki

이번 실습에서는 프랑스어-영어 병렬 코퍼스인 fra-eng.zip 파일을 사용합니다. 위의 링크에서 해당 파일을 다운받은 후 압축을 풀면 fra.txt라는 파일을 얻을 수 있는데 해당 파일을 이 실습에서 사용합니다.

병렬 코퍼스 데이터에 대해서 이해해봅시다. 병렬 데이터라고 하면 앞서 수행한 태깅 작업 챕터의 개체명 인식과 같은 데이터를 생각할 수 있지만, 앞서 수행한 태깅 작업의 병렬 데이터와 seq2seq가 사용하는 병렬 데이터는 성격이 다릅니다. 태깅 작업의 병렬 데이터는 쌍이 되는 데이터와 레이블이 길이가 동일하였으나 여기서는 쌍이 된다고 해서 반드시 길이가 같지는 않습니다.

실제 번역기를 생각해보면 구글 번역기에 '나는 학생이다.'라는 토큰의 개수가 2인 문장을 넣었을 때 'I am a student.'라는 토큰의 개수가 4인 문장이 나오는 것과 같은 이치입니다. seq2seq는 기본적으로 입력 시퀀스와 출력 시퀀스의 길이가 다를 수 있다고 가정합니다. 지금 구현 예제는 기계 번역기이지만 seq2seq로 구현할 수 있는 또 다른 예제인 챗봇을 만든다고 가정해보면, 대답의 길이가 질문의 길이와 항상 똑같아야 한다고하면 그 또한 이상합니다. 여기서 사용할 fra.txt 데이터는 아래와 같이 왼쪽의 영어 문장과 오른쪽의 프랑스어 문장 사이에 탭으로 구분되는 형식이 하나의 샘플입니다.

- Watch me.           Regardez-moi !


데이터는 위와 동일한 형식의 약 19만개의 병렬 문장 샘플을 포함하고 있습니다. 데이터를 읽고 전처리를 진행해보겠습니다. 앞으로의 코드에서 src는 source의 줄임말로 입력 문장을 나타내며, tar는 target의 줄임말로 번역하고자 하는 문장을 나타냅니다.

In [1]:
import re
import os
import unicodedata
import urllib3
import zipfile
import shutil
import numpy as np
import pandas as pd
import torch
from collections import Counter
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset

이번 실습에서는 약 19만개의 데이터 중 33,000개의 샘플만을 사용할 예정입니다.

In [2]:
num_samples =33000

fra-eng.zip 파일을 다운로드하고 압축을 풀겠습니다.

In [3]:
!wget -c http://www.manythings.org/anki/fra-eng.zip && unzip -o fra-eng.zip

--2025-06-23 08:52:12--  http://www.manythings.org/anki/fra-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8143096 (7.8M) [application/zip]
Saving to: ‘fra-eng.zip’


2025-06-23 08:52:13 (23.9 MB/s) - ‘fra-eng.zip’ saved [8143096/8143096]

Archive:  fra-eng.zip
  inflating: _about.txt              
  inflating: fra.txt                 


전처리 함수들을 구현합니다. 구두점 등을 제거하거나 단어와 구분해주기 위한 전처리입니다.

In [4]:
def unicode_to_ascii(s):
  # 프랑스어 악센트(accent) 삭제
  # 예시 : 'déjà diné' -> deja dine
  return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

In [5]:
def preprocess_sentence(sent):
  # 악센트 삭제 함수 호출
  sent = unicode_to_ascii(sent.lower())

  # 단어와 구두점 사이에 공백을 만듭니다.
  # Ex) "he is a boy." => "he is a boy ."
  sent = re.sub(r"([?.!,¿])", r" \1", sent)

  # (a-z, A-Z, ".", "?", "!", ",") 이들을 제외하고는 전부 공백으로 변환합니다.
  sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent)

  # 다수 개의 공백을 하나의 공백으로 치환
  sent = re.sub(r"\s+", " ", sent)
  return sent

In [6]:
def load_preprocessed_data():
  encoder_input, decoder_input, decoder_target = [], [], []

  with open("fra.txt", "r") as lines:
    for i, line in enumerate(lines):
      # source 데이터와 target 데이터 분리
      src_line, tar_line, _ = line.strip().split('\t')

      # source 데이터 전처리
      src_line = [w for w in preprocess_sentence(src_line).split()]

      # target 데이터 전처리
      tar_line = preprocess_sentence(tar_line)
      tar_line_in = [w for w in ("<sos> " + tar_line).split()]
      tar_line_out = [w for w in (tar_line + " <eos>").split()]

      encoder_input.append(src_line)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if i == num_samples - 1:
        break

  return encoder_input, decoder_input, decoder_target


구현한 전처리 함수들을 임의의 문장을 입력으로 테스트해봅시다.

In [7]:
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?"

print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장 :', fr_sent)
print('전처리 후 프랑스어 문장 :', preprocess_sentence(fr_sent))

전처리 전 영어 문장 : Have you had dinner?
전처리 후 영어 문장 : have you had dinner ?
전처리 전 프랑스어 문장 : Avez-vous déjà diné?
전처리 후 프랑스어 문장 : avez vous deja dine ?


전체 데이터에서 33,000개의 샘플에 대해서 전처리를 수행합니다. 또한 훈련 과정에서 교사 강요(Teacher Forcing)을 사용할 예정이므로, 훈련 시 사용할 디코더의 입력 시퀀스와 실제값. 즉, 레이블에 해당되는 출력 시퀀스를 따로 분리하여 저장합니다. 입력 시퀀스에는 시작을 의미하는 토큰인 를 추가하고, 출력 시퀀스에는 종료를 의미하는 토큰인 를 추가합니다. 이렇게 얻은 3개의 데이터셋 인코더의 입력, 디코더의 입력, 디코더의 레이블을 상위 5개 샘플만 출력해봅시다.

In [8]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
print('인코더의 입력 :',sents_en_in[:5])
print('디코더의 입력 :',sents_fra_in[:5])
print('디코더의 레이블 :',sents_fra_out[:5])

인코더의 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


모델을 설계하기 전 의아한 점이 있을 수 있습니다. 현재 시점의 디코더 셀의 입력은 오직 이전 디코더 셀의 출력을 입력으로 받는다고 설명하였는데 디코더의 입력에 해당하는 데이터인 sents_fra_in이 왜 필요할까요?

훈련 과정에서는 이전 시점의 디코더 셀의 출력을 현재 시점의 디코더 셀의 입력으로 넣어주지 않고, 이전 시점의 실제값을 현재 시점의 디코더 셀의 입력값으로 하는 방법을 사용할 겁니다. 그 이유는 이전 시점의 디코더 셀의 예측이 틀렸는데 이를 현재 시점의 디코더 셀의 입력으로 사용하면 현재 시점의 디코더 셀의 예측도 잘못될 가능성이 높고 이는 연쇄 작용으로 디코더 전체의 예측을 어렵게 합니다. 이런 상황이 반복되면 훈련 시간이 느려집니다. 만약 이 상황을 원하지 않는다면 이전 시점의 디코더 셀의 예측값 대신 실제값을 현재 시점의 디코더 셀의 입력으로 사용하는 방법을 사용할 수 있습니다. 이와 같이 RNN의 모든 시점에 대해서 이전 시점의 예측값 대신 실제값을 입력으로 주는 방법을 교사 강요라고 합니다.

단어로부터 정수를 얻는 딕셔너리. 즉, 단어 집합(Vocabulary)을 만들어봅시다. 이를 위한 함수로 build_vocab()을 구현합니다. build_vocab은 입력된 데이터로부터 단어의 등장 빈도순으로 정렬 후에 등장 빈도가 높은 순서일 수록 낮은 정수를 부여합니다. 이때, 패딩 토큰을 위한 <PAD> 토큰은 0번, OOV에 대응하기 위한 <UNK> 토큰은 1번에 할당합니다. 이렇게 되면 빈도수가 가장 높은 단어는 정수가 2번, 빈도수가 두번 째로 많은 단어는 정수 3번이 할당됩니다.

In [9]:
def build_vocab(sents):
  word_list = []

  for sent in sents:
      for word in sent:
        word_list.append(word)

  # 각 단어별 등장 빈도를 계산하여 등장 빈도가 높은 순서로 정렬
  word_counts = Counter(word_list)
  vocab = sorted(word_counts, key=word_counts.get, reverse=True)

  word_to_index = {}
  word_to_index['<PAD>'] = 0
  word_to_index['<UNK>'] = 1

  # 등장 빈도가 높은 단어일수록 낮은 정수를 부여
  for index, word in enumerate(vocab) :
    word_to_index[word] = index + 2

  return word_to_index

영어를 위한 단어 집합 src_vocab과 프랑스어를 이용한 단어 집합 tar_vocab를 만들어봅시다. 구현 방식에 따라서는 하나의 단어 집합으로 만들어도 상관없으며 이는 선택의 차이입니다.

In [10]:
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_fra_in + sents_fra_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)
print("영어 단어 집합의 크기 : {:d}, 프랑스어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))

영어 단어 집합의 크기 : 4498, 프랑스어 단어 집합의 크기 : 7895


정수로부터 단어를 얻는 딕셔너리를 각각 만들어줍니다. 이들은 훈련을 마치고 예측값과 실제값을 비교하는 단계에서 사용됩니다.

In [11]:
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def texts_to_sequences(sents, word_to_index):
  encoded_X_data = []
  for sent in tqdm(sents):
    index_sequences = []
    for word in sent:
      try:
          index_sequences.append(word_to_index[word])
      except KeyError:
          index_sequences.append(word_to_index['<UNK>'])
    encoded_X_data.append(index_sequences)
  return encoded_X_data

In [12]:
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_fra_in, tar_vocab)
decoder_target = texts_to_sequences(sents_fra_out, tar_vocab)

100%|██████████| 33000/33000 [00:00<00:00, 1019009.29it/s]
100%|██████████| 33000/33000 [00:00<00:00, 786615.25it/s]
100%|██████████| 33000/33000 [00:00<00:00, 176228.79it/s]


In [13]:
# 상위 5개의 샘플에 대해서 정수 인코딩 전, 후 문장 출력
# 인코더 입력이므로 <sos>나 <eos>가 없음
for i, (item1, item2) in zip(range(5), zip(sents_en_in, encoder_input)):
    print(f"Index: {i}, 정수 인코딩 전: {item1}, 정수 인코딩 후: {item2}")

Index: 0, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 1, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 2, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 3, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 4, 정수 인코딩 전: ['hi', '.'], 정수 인코딩 후: [740, 2]


In [14]:
def pad_sequences(sentences, max_len=None):
    # 최대 길이 값이 주어지지 않을 경우 데이터 내 최대 길이로 패딩
    if max_len is None:
        max_len = max([len(sentence) for sentence in sentences])

    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

In [15]:
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)

In [16]:
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)

인코더의 입력의 크기(shape) : (33000, 7)
디코더의 입력의 크기(shape) : (33000, 16)
디코더의 레이블의 크기(shape) : (33000, 16)


테스트 데이터를 분리하기 전 데이터를 섞어줍니다. 이를 위해서 순서가 섞인 정수 시퀀스 리스트를 만듭니다.

In [17]:
indices = np.arange(encoder_input.shape[0]) # 행의 순서를 섞음
np.random.shuffle(indices)
print(indices)

[10581   471  4113 ...  4850 31988 21789]


이를 데이터셋의 순서로 지정해주면 샘플들이 기존 순서와 다른 순서로 섞이게 됩니다.

In [18]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

임의로 30997번째 샘플을 출력해봅시다. 이때 decoder_input과 decoder_target은 데이터의 구조상으로 앞에 붙은 sos 토큰과 뒤에 붙은 eos을 제외하면 동일한 시퀀스를 가져야 합니다.

In [19]:
print([index_to_src[word] for word in encoder_input[30997]])
print([index_to_tar[word] for word in decoder_input[30997]])
print([index_to_tar[word] for word in decoder_target[30997]])

['she', 'can', 'jump', 'high', '.', '<PAD>', '<PAD>']
['<sos>', 'elle', 'peut', 'sauter', 'haut', '.', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['elle', 'peut', 'sauter', 'haut', '.', '<eos>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']


33,000개의 10%에 해당되는 3,300개의 데이터를 테스터 데이터로 사용합니다.

In [20]:
n_of_val = int(33000*0.1)
print('검증 데이터의 개수: ', n_of_val)

검증 데이터의 개수:  3300


In [21]:
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

훈련 데이터와 테스트 데이터의 크기를 출력해봅시다.

In [22]:
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)


훈련 source 데이터의 크기 : (29700, 7)
훈련 target 데이터의 크기 : (29700, 16)
훈련 target 레이블의 크기 : (29700, 16)
테스트 source 데이터의 크기 : (3300, 7)
테스트 target 데이터의 크기 : (3300, 16)
테스트 target 레이블의 크기 : (3300, 16)


###2. 기계 번역기 만들기

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim

embedding_dim = 256
hidden_units = 256

class Encoder(nn.Module):
  def __init__(self, src_vocab_size, embedding_dim, hidden_units):
    super(Encoder, self).__init__()
    self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
    self.lstm = nn.LSTM(embedding_dim, hidden_units,  batch_first=True)

  def forward(self, x):
    x = self.embedding(x)
    _, (hidden, cell) = self.lstm(x)
    return hidden, cell

class Decoder(nn.Module):
  def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
    super(Decoder, self).__init__()
    self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
    self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first=True)
    self.fc = nn.Linear(hidden_units, tar_vocab_size)

  def forward(self, x, hidden, cell):
    x = self.embedding(x)
    output, (hidden, cell) = self.lstm(x, (hidden, cell))
    output = self.fc(output)
    return output, hidden, cell

class Seq2Seq(nn.Module):
  def __init__(self, encoder, decoder):
    super(Seq2Seq, self).__init__()
    self.encoder = encoder
    self.decoder = decoder

  def forward(self, src, trg):
    hidden, cell = self.encoder(src)
    output, _,_ = self.decoder(trg, hidden, cell)
    return output

encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)

loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [24]:
print(model)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(4498, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(7895, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=7895, bias=True)
  )
)


Encoder 클래스는 입력 시퀀스를 받아 해당 시퀀스의 정보를 압축하여 context vector로 변환하는 역할을 합니다. Encoder는 임베딩 레이어와 LSTM 레이어로 구성되어 있습니다. 임베딩 레이어는 입력 시퀀스의 각 토큰을 고정 크기의 벡터로 변환하고, LSTM 레이어는 시퀀스의 순서 정보를 고려하여 해당 시퀀스를 요약합니다. Encoder의 forward 메서드는 입력 시퀀스를 받아 LSTM의 hidden state와 cell state를 반환합니다.

Decoder 클래스는 Encoder에서 생성된 context vector(인코더의 마지막 은닉 상태)를 기반으로 출력 시퀀스를 생성하는 역할을 합니다. Decoder 또한 임베딩 레이어와 LSTM 레이어로 구성되어 있습니다. Decoder의 LSTM은 Encoder에서 전달받은 hidden state와 cell state를 초기 상태로 사용하여 출력 시퀀스를 생성합니다. 생성된 출력 시퀀스는 fully connected 레이어를 통과하여 각 시점의 출력 토큰에 대한 확률 분포를 얻습니다. Decoder의 forward 메서드는 입력 시퀀스, hidden state, cell state를 받아 출력 시퀀스, 업데이트된 hidden state와 cell state를 반환합니다.

Seq2Seq 클래스는 Encoder와 Decoder를 결합하여 전체 모델을 구성합니다. Seq2Seq 모델의 forward 메서드는 입력 시퀀스(src)와 출력 시퀀스(trg)를 받아 Encoder에서 생성된 은닉 상태(hidden state)와 셀 상태(cell state)를 Decoder로 전달하고, Decoder에서 생성된 출력 시퀀스를 반환합니다.

Seq2Seq의 디코더는 기본적으로 각 시점마다 다중 클래스 분류 문제를 풀고있습니다. 매 시점마다 프랑스어 단어 집합의 크기(tar_vocab_size)의 선택지에서 단어를 1개 선택하여 이를 이번 시점에서 예측한 단어로 택합니다. 다중 클래스 분류 문제이므로 모델 학습을 위해 CrossEntropyLoss 함수를 사용하여 손실을 계산하고, Adam 옵티마이저를 사용하여 모델의 파라미터를 최적화합니다. CrossEntropyLoss의 ignore_index 파라미터는 패딩 토큰에 해당하는 인덱스를 무시하도록 설정합니다.

이 코드에서는 임베딩 차원(embedding_dim)과 LSTM의 은닉 상태 크기(hidden_units)를 256으로 설정하였습니다. Encoder와 Decoder의 인스턴스를 생성한 후, 이를 Seq2Seq 모델로 결합하여 전체 모델을 구성합니다. 이렇게 구현된 Seq2Seq 모델은 기계 번역이나 챗봇과 같은 시퀀스-투-시퀀스 문제를 해결하는 데 사용될 수 있습니다. 입력 시퀀스가 Encoder를 통과하여 context vector로 변환되고, 이를 기반으로 Decoder에서 출력 시퀀스를 생성합니다. 모델의 학습은 입력 시퀀스와 해당하는 출력 시퀀스의 쌍을 사용하여 이루어집니다.

In [25]:
def evaluation(model, dataloader, loss_function, device):
  model.eval()
  total_loss = 0.0
  total_correct = 0
  total_count = 0

  with torch.no_grad():
    for encoder_inputs, decoder_inputs, decoder_targets in dataloader:
      encoder_inputs = encoder_inputs.to(device)
      decoder_inputs = decoder_inputs.to(device)
      decoder_targets = decoder_targets.to(device)

      # 손실 계산
      outputs = model(encoder_inputs, decoder_inputs)
      loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
      total_loss += loss.item()

      # 정확도 계산(패딩 토큰 제외)
      mask = decoder_targets !=0
      total_correct +=((outputs.argmax(dim=-1) ==  decoder_targets) * mask).sum().item()
      total_count +=mask.sum().item()

  return total_loss / len(dataloader), total_correct / total_count

평가 함수의 입력으로는 평가할 모델(model), 데이터로더(dataloader), 손실 함수(loss_function), 그리고 모델을 실행할 디바이스(device)가 주어집니다. 먼저, 모델을 평가 모드로 설정합니다. 이는 model.eval()을 호출하여 이루어지며, 드롭아웃(dropout)이나 배치 정규화(batch normalization)와 같은 층의 동작을 조정합니다.

다음으로, 총 손실(total_loss), 총 정확도(total_correct), 그리고 총 토큰 수(total_count)를 초기화합니다. 이 변수들은 전체 데이터셋에 대한 평가 결과를 누적하는 데 사용됩니다.

그 후, torch.no_grad() 컨텍스트 매니저 내에서 데이터로더를 순회합니다. 이는 기울기(gradient) 계산을 비활성화하여 메모리 사용량을 줄이고 평가 속도를 향상시킵니다.

각 배치(batch)에 대해, 인코더 입력(encoder_inputs), 디코더 입력(decoder_inputs), 그리고 디코더 타겟(decoder_targets)을 디바이스로 이동시킵니다. 그런 다음, 모델에 인코더 입력과 디코더 입력을 전달하여 순방향 전파(forward pass)를 수행합니다. 이를 통해 모델의 출력(outputs)을 얻습니다. 그 후, 출력과 디코더 타겟을 사용하여 손실을 계산합니다. 이때, 출력과 타겟의 차원을 조정하기 위해 view() 함수를 사용합니다. 계산된 손실을 총 손실에 누적합니다.

정확도를 계산하기 위해, 패딩 토큰(padding token)을 제외한 실제 토큰들에 대해서만 고려합니다. 이를 위해 디코더 타겟이 0이 아닌 위치에 대한 마스크(mask)를 생성합니다. 출력의 argmax를 취하여 예측된 토큰을 얻고, 이를 디코더 타겟과 비교하여 정확한 예측 수를 계산합니다. 정확한 예측 수와 전체 토큰 수를 누적합니다.

마지막으로, 평균 손실(average loss)과 정확도(accuracy)를 계산하여 반환합니다. 평균 손실은 총 손실을 데이터로더의 배치 수로 나누어 계산하고, 정확도는 총 정확도를 총 토큰 수로 나누어 계산합니다. 이 평가 함수를 사용하여 모델의 성능을 측정할 수 있습니다. 평균 손실이 낮을수록, 그리고 정확도가 높을수록 모델의 성능이 좋다는 것을 나타냅니다. 이를 통해 모델의 학습 진행 상황을 모니터링하고, 최적의 모델을 선택할 수 있습니다.

In [26]:
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)

encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)

batch_size = 128

train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [27]:
# 학습 설정
num_epochs =30
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(4498, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(7895, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=7895, bias=True)
  )
)

학습 설정에서는 학습 에포크 수를 30으로 설정하고, 학습에 사용할 디바이스를 설정합니다. GPU가 사용 가능한 경우 "cuda"로 설정하고, 그렇지 않은 경우 "cpu"로 설정합니다. 그 후, model.to(device)를 사용하여 모델을 설정한 디바이스로 이동시킵니다. 이를 통해 모델의 계산을 해당 디바이스에서 수행할 수 있습니다.

이제 모델을 훈련합니다. 128개의 배치 크기(128개씩 데이터를 병렬로 학습)로 총 30 에포크 학습합니다. 검증 데이터로 훈련이 제대로 되고있는지 모니터링하겠습니다.

In [28]:
torch.batch_norm_gather_stats_with_counts

best_val_loss = float('inf') # Initialize best_val_loss with infinity

for epoch in range(num_epochs):
  model.train()

  for encoder_inputs, decoder_inputs, decoder_targets in train_dataloader:
    encoder_inputs = encoder_inputs.to(device)
    decoder_inputs = decoder_inputs.to(device)
    decoder_targets = decoder_targets.to(device)

    # 기울기 초기화
    optimizer.zero_grad()

    # 순방향 전파
    outputs = model(encoder_inputs, decoder_inputs)

    # 손실 계산 및 역방향 전파
    loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
    loss.backward()

    # 가중치 업데이트
    optimizer.step()

  train_loss, train_acc = evaluation(model, train_dataloader, loss_function, device)
  valid_loss, valid_acc = evaluation(model, valid_dataloader, loss_function, device)

  print(f'Epoch: {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')

  # 검증 손실이 최소일 때 체크포인트 저장
  if valid_loss < best_val_loss:
      print(f'Validation loss improved from {best_val_loss:.4f} to {valid_loss:.4f}. 체크포인트를 저장합니다.')
      best_val_loss = valid_loss
      torch.save(model.state_dict(), 'best_model_checkpoint.pth')

Epoch: 1/30 | Train Loss: 1.1967 | Train Acc: 0.5089 | Valid Loss: 1.2267 | Valid Acc: 0.5093
Validation loss improved from inf to 1.2267. 체크포인트를 저장합니다.
Epoch: 2/30 | Train Loss: 0.9338 | Train Acc: 0.5793 | Valid Loss: 1.0044 | Valid Acc: 0.5719
Validation loss improved from 1.2267 to 1.0044. 체크포인트를 저장합니다.
Epoch: 3/30 | Train Loss: 0.7715 | Train Acc: 0.6261 | Valid Loss: 0.8836 | Valid Acc: 0.6077
Validation loss improved from 1.0044 to 0.8836. 체크포인트를 저장합니다.
Epoch: 4/30 | Train Loss: 0.6540 | Train Acc: 0.6599 | Valid Loss: 0.8063 | Valid Acc: 0.6298
Validation loss improved from 0.8836 to 0.8063. 체크포인트를 저장합니다.
Epoch: 5/30 | Train Loss: 0.5594 | Train Acc: 0.6893 | Valid Loss: 0.7478 | Valid Acc: 0.6485
Validation loss improved from 0.8063 to 0.7478. 체크포인트를 저장합니다.
Epoch: 6/30 | Train Loss: 0.4735 | Train Acc: 0.7306 | Valid Loss: 0.6970 | Valid Acc: 0.6688
Validation loss improved from 0.7478 to 0.6970. 체크포인트를 저장합니다.
Epoch: 7/30 | Train Loss: 0.4043 | Train Acc: 0.7629 | Valid Loss: 

검증 데이터 손실이 가장 최소일 때의 모델을 로드하고 다시 재평가해봅시다.

In [29]:
# 모델 로드
model.load_state_dict(torch.load('best_model_checkpoint.pth'))

# 모델을 device에 올립니다.
model.to(device)

# 검증 데이터에 대한 정확도와 손실 계산
val_loss, val_accuracy = evaluation(model, valid_dataloader, loss_function, device)

print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')

Best model validation loss: 0.5607
Best model validation accuracy: 0.7254


  model.load_state_dict(torch.load('best_model_checkpoint.pth'))


로드 후 재평가를 진행하였더니, 저장할 당시와 검증 데이터의 손실과 정확도가 동일하므로 저장 및 로드가 원활히 되었습니다. sos와 eos 토큰의 정수는 각각 3과 4입니다.

In [30]:
print(tar_vocab['<sos>'])
print(tar_vocab['<eos>'])

3
4


## 3. seq2seq 기계 번역기 동작시키기
seq2seq는 훈련 과정(교사 강요)과 테스트 과정에서의 동작 방식이 다릅니다. 그래서 테스트 과정을 위해 모델을 다시 설계해주어야 합니다. 특히 디코더를 수정해야 합니다. 이번에는 번역 단계를 위해 모델을 수정하고 동작시켜보겠습니다.

전체적인 번역 단계를 정리하면 아래와 같습니다.

1) 번역하고자 하는 입력 문장이 인코더로 입력되어 인코더의 마지막 시점의 은닉 상태와 셀 상태를 얻습니다.

2) 인코더의 은닉 상태와 셀 상태, 그리고 토큰 sos를 디코더로 보냅니다.

3) 디코더가 토큰 eos가 나올 때까지 다음 단어를 예측하는 행동을 반복합니다.

In [31]:
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word !=0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_vocab['<sos>'] and encoded_word != tar_vocab['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

In [32]:
print(encoder_input_test[25])
print(decoder_input_test[25])
print(decoder_target_test[25])

[  3  91 115   2   0   0   0]
[  3   5  31 102 209   2   0   0   0   0   0   0   0   0   0   0]
[  5  31 102 209   2   4   0   0   0   0   0   0   0   0   0   0]


decode_sequence() 함수를 봅시다. 테스트 단계에서는 디코더를 매 시점 별로 컨트롤 하게 됩니다. 각 시점을 for문을 통해서 컨트롤하게 되며, 현재 시점의 예측은 다음 시점의 입력으로 사용됩니다. 여기서 사용될 변수는 decoder_input입니다.

In [33]:
def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
  encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(device)

  #인코더의 초기 상태 설정
  hidden, cell = model.encoder(encoder_inputs)

  # 시작 토큰 <sos>을 디코더의 첫 입력으로 설정
  # unsqueeze(0)은 배치 차원을 추가하기 위함
  decoder_input = torch.tensor([3], dtype=torch.long).unsqueeze(0).to(device)

  decoded_tokens = []

  # 디코더의 각 시점
  for _ in range(max_output_len):
    output, hidden, cell = model.decoder(decoder_input, hidden, cell)

    # softmax회귀를 수행. 예측 단어의 인덱스
    output_token = output.argmax(dim=-1).item()

    # 종료 토큰 <eos>
    if output_token == 4:
      break

    # 각 시점의 단어(정수)는 decoded_tokens에 누적하였다가 최종 번역 시퀀스로 리턴합니다.
    decoded_tokens.append(output_token)

    # 현재 시점의 예측. 다음 시점의 입력으로 사용된다.
    decoder_input = torch.tensor([output_token], dtype=torch.long).unsqueeze(0).to(device)

  return ' '.join(int_to_tar_token[token] for token in decoded_tokens)

결과 확인을 위한 함수를 만듭니다. seq_to_src 함수는 영어 문장에 해당하는 정수 시퀀스를 입력받으면 정수로부터 영어 단어를 리턴하는 index_to_src를 통해 영어 문장으로 변환합니다. seq_to_tar은 프랑스어에 해당하는 정수 시퀀스를 입력받으면 정수로부터 프랑스어 단어를 리턴하는 index_to_tar을 통해 프랑스어 문장으로 변환합니다. 훈련 데이터에 대해서 임의로 선택한 인덱스의 샘플의 결과를 출력해봅시다.

In [34]:
for seq_index in [3, 50, 100, 400, 1001]:
  input_seq = encoder_input_train[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)

입력문장 : are you homeless ? 
정답문장 : etes vous sans abri ? 
번역문장 : es tu sans abri ?
--------------------------------------------------
입력문장 : it s dead . 
정답문장 : c est mort . 
번역문장 : c est mort .
--------------------------------------------------
입력문장 : this is unsafe . 
정답문장 : ce n est pas securise . 
번역문장 : ce n est pas securise .
--------------------------------------------------
입력문장 : you re safe now . 
정답문장 : tu es desormais en securite . 
번역문장 : vous etes desormais en securite .
--------------------------------------------------
입력문장 : don t push . 
정답문장 : ne poussez pas . 
번역문장 : ne poussez pas .
--------------------------------------------------


테스트 데이터에 대해서 임의로 선택한 인덱스의 샘플의 결과를 출력해봅시다.

In [35]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_test[seq_index]
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_test[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_test[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)


입력문장 : i ate apples . 
정답문장 : j ai mange des pommes . 
번역문장 : j ai mange des frites .
--------------------------------------------------
입력문장 : i saw a fight . 
정답문장 : j ai vu une bagarre . 
번역문장 : j ai vu une etoile .
--------------------------------------------------
입력문장 : tom is knocking . 
정답문장 : tom frappe . 
번역문장 : tom a ete frappe .
--------------------------------------------------
입력문장 : i ve had enough . 
정답문장 : j en ai assez gobe . 
번역문장 : j en ai assez avale .
--------------------------------------------------
입력문장 : we got separated . 
정답문장 : nous avons ete separes . 
번역문장 : nous nous sommes separees .
--------------------------------------------------
