**Sequence to Sequence 네트워크와 Attention을 이용한 Machine Translation 모델 구현**

- 한국어를 영어로 번역하도록 모델 학습

In [None]:
import string
import re
import random

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
seed = 21
random.seed(seed)
torch.manual_seed(seed)

언어별 정보 저장을 위한 클래스 생성

- 단어->아이디, 아이디->단어 사전 생성 (word2idx)
- addSentence 함수는 문장을 띄어쓰기를 기준으로 토큰화
- addWord 함수
    - word2idx, idx2word 만들기
    - 단어별 출현 횟수 구하기
    - 전체 단어의 개수 구하기


In [None]:
SOS_token = 0
EOS_token = 1
UNK_token = 2

class Lang:
    def __init__(self, name):
        self.name = name
        ## 단어->아이디, 단어별 개수 사전 선언

        self.word2index = {}
        self.word2count = {}

        self.index2word = {0: "SOS", 1: "EOS", 2:"UNK"}
        self.n_words = 3  # Count SOS and EOS and UNK

    def addSentence(self, sentence):
        # sentence를 띄어쓰기 단위로 토큰화 하고, 토큰은 addWord 함수를 이용해서 정보 저장

        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        # word2idx, word2count, idx2word, nwords 정보 저장

        ####################  빈칸  ####################
        
            
        ###############################################

- 병렬 코퍼스 준비 : 영어-한국어
- 영어는 normalize를 진행
- readLangs 함수
    - 파일을 읽고 쌍으로 분리
    - Lang class를 이용하여 각 언어 정보 저장

In [None]:
def normalizeString(s):
    s = s.lower().strip()
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s

def readLangs(lang1, lang2):
    print("Reading lines...")

    # git clone을 통해 데이터를 다운로드 받는 경우, 파일 경로 수정 필요
    f = open('%s-%s.txt' % (lang1, lang2), encoding='utf-8')
    lines = f.readlines()
    
    pairs = []
    for l in lines:
        line = l.split("\t")
        # 한국어, 영어
        pair = [line[1], normalizeString(line[0])]
        pairs.append(pair)

    input_lang = Lang(lang2)
    output_lang = Lang(lang1)
    f.close()
    return input_lang, output_lang, pairs

- MAX_LENGTH : 학습에 사용할 최대문장길이
- eng_prefixes : 해당 단어들로 시작하는 문장만 번역에 포함

- filterPair : 위 조건에 만족하는 pair만 반환
- filterPairs : 필터링된 pair만 저장해서 반환

In [None]:
MAX_LENGTH = 30
eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

def filterPair(p):
    ####################  빈칸  ####################

    # 영어는 해당하는 구문으로 시작하고 영어와 한국어 둘다 max_length보다 작은 데이터일 경우에만 True 반환
    
    ###############################################

def filterPairs(pairs):
    # filterPair가 True인 데이터만 저장후 반환
    return [pair for pair in pairs if filterPair(pair)]

데이터 준비 및 결과 확인
- **학습과 검증에 필요한 데이터쌍을 구분**
- 이전 단계에서 만든 클래스와 함수를 통해 결과 확인
- 전체 문장 개수(pair 개수)
- 필터링 후 문장개수(pair 개수)
- 각 언어별로, 사전에 등록된 단어 개수

In [None]:
def prepareData(lang1, lang2):

    # 이전에 만든 클래스를 활용해서 영어 한국어의 정보를 저장
    # 조건에 맞게 데이터쌍을 필터링
    # 필터링된 데이터쌍의 80%는 학습용으로, 나머지 20%는 검증용으로 구분
    # 전체 데이터쌍의 수, 필터링 후 데이터쌍의 수 그리고 각 언어별 단어(토큰)의 개수를 출력
    
    input_lang, output_lang, pairs = readLangs(lang1, lang2)
    print("Read %s sentence pairs" % len(pairs))
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))

    ####################  빈칸  ####################
    
    # 데이터 shuffle

    # 데이터를 8:2로 split

    # 각 언어 객체에 문장 추가
    
    ###############################################

    return input_lang, output_lang, train_pairs, valid_pairs


input_lang, output_lang, train_pairs, valid_pairs = prepareData('eng', 'kor')
print(random.choice(train_pairs))

인코더 역할의 RNN 생성
- torch.nn.Embedding으로 랜덤 임베딩 생성
- 인풋의 임베딩을 만들고 배치차원 늘려주기
- 임베딩을 gru에 넣고 output과 hidden state 얻기

In [None]:
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        embedding_size = hidden_size
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.gru = nn.GRU(embedding_size, hidden_size)

    def forward(self, input, hidden):
        ####################  빈칸  ####################

        # 임베딩 만들고, gru에 입력으로 넣어주고 output 과 hidden 얻기

        ###############################################

        return output, hidden

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

디코더 역할의 RNN 생성

- torch.nn.Embedding으로 랜덤 임베딩 생성
- 인풋의 임베딩을 만들고 배치차원 늘려주기
- 임베딩을 gru에 넣고 output과 hidden state 얻기
- softmax를 이용하여 전체 단어에서 가장 확률높은 단어 뽑기


In [None]:
class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()

        ####################  빈칸  ####################
        # GRU의 결과가 linear layer 와 softmax를 거쳐야 한다
        # 차원 정의

        # layer 정의

        # log prob 계산
        
        ###############################################

    def forward(self, input, hidden):

        ####################  빈칸  ####################

        # 임베딩 만들고, gru 입력으로 넣어주고, 
        # output을 linear 와 softmax의 입력으로 넣어주기        

        ###############################################

        return output, hidden

학습 데이터 준비
- 각 문장마다 word2idx를 이용하여 tensor로 변환(벡터화)



In [None]:
def indexesFromSentence(lang, sentence):

    ####################  빈칸  ####################

    # 문장을 띄어쓰기 단위로 토큰화 하고, 해당 단어(토큰)의 id로 변환
    # 단, 검증때 사전에 없는 단어가 출현할 수 있는 상황도 처리할 수 있어야 함

    ###############################################


def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)

    ####################  빈칸  ####################

    # 실제 데이터 외에 추가로 들어가야할 토큰이 있다!

    ###############################################

    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(pair):
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

모델 학습 코드


In [None]:
teacher_forcing_ratio = 0.5

# train for each step
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer,
          decoder_optimizer, criterion, max_length=MAX_LENGTH):
    # encoder 초기값 설정, gradient 0 으로 초기화
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)

    loss = 0

    ####################  빈칸  ####################

    # encoder 호출

        # 각 token에 대해서 hidden과 output을 계산

    # decoder의 초기 input과 hidden값 결정

    ###############################################

    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    if use_teacher_forcing:
        ####################  빈칸  ####################

        # 디코더의 다음 입력으로 실제값 사용

            # decoder로부터 hidden과 output을 계산

            # 다음 decoder의 input은 정답으로부터 가져옴

        ###############################################

    else:
        ####################  빈칸  ####################

        # 디코더의 다음 입력으로 예측값 사용

            # decoder로부터 hidden과 output을 계산

            # decoder의 output으로부터 가장 확률이 높은 token을 다음 decoder input으로 사용

            # decoder가 EOS token을 생성하면 종료

        ###############################################

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

시간측정 출력 함수

In [None]:
import time
import math

def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

학습 진행 과정
- 타이머 시작
- optimizer, criterion 선언
- 전체 학습 데이터에 iteration 수만큼 랜덤하게 데이터 구축

In [None]:
def trainIters(encoder, decoder, n_iters, print_every=1000, learning_rate=0.01):
    start = time.time()
    print_loss_total = 0  # Reset every print_every

    ####################  빈칸  ####################

    # optimizer 선언

    # 학습 데이터에서 random하게 선택

    # Negative Log Likelihood Loss 정의

    ###############################################

    # n_iter 횟수만큼 모델 학습 및 로스 출력
    for iter in range(1, n_iters + 1):
      
        ####################  빈칸  ####################

        # 학습에 사용할 데이터 input, target 가져오기

        # 학습 및 loss 반환
        
        ###############################################

        print_loss_total += loss

        if iter % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, iter / n_iters),
                                         iter, iter / n_iters * 100, print_loss_avg))


In [None]:
hidden_size = 256

####################  빈칸  ####################
# encoder, decoder를 선언하고, 7500번의 iteration으로 학습하고 1000번마다 loss 출력

# 모델 정의

# 학습

###############################################

모델 검증 코드

In [None]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    with torch.no_grad():

        ####################  빈칸  ####################

        # input 데이터에서 문장 가져와서 텐서로 변환 (train과 동일)

        # 인코더의 초기 hidden state를 결정 (train과 동일)
        
        ###############################################

        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                     encoder_hidden)

        ####################  빈칸  ####################

        # decoder의 초기 input 과 hidden state 결정 (train과 동일)

        ###############################################
        
        decoded_words = []
        for di in range(max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            topv, topi = decoder_output.data.topk(1)

            ####################  빈칸  ####################

            # decoding 멈출 시기 결정, 예측단어 decoded_words에 저장

            # EOS_token을 생성하면 종료

                # 예측한 단어를 추가

            ###############################################
            
            decoder_input = topi
        return decoded_words

검증 데이터쌍에서 결과 확인

In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
smoothie = SmoothingFunction().method4
eval_pair = [['너 건망증이 좀 있구나.',"You're quite forgetful."],
             ['미안한데, 내가 도와줄 수가 없어.',"I'm sorry, I can't help you."],
             ['너 떨고 있네.',"You're shivering."],
             ['그는 우울하다.',"He is depressed."],
             ['난 네 선생이다.',"I'm your teacher."],
             ['내 피가 끓고 있었다.	',"My blood was boiling."]]

def evaluatePrint(encoder, decoder):
    for pair in eval_pair:
        print('원본: ', pair[0])
        print('정답 변역: ', pair[1])
        output_words = evaluate(encoder, decoder, pair[0])
        output_sentence = ' '.join(output_words[:-1])
        print('예측 번역: ', output_sentence)
        print("sentence_bleu: %.3f" % (sentence_bleu([normalizeString(pair[1]).split()], output_sentence.split(), smoothing_function=smoothie)))
        print('')

전체 검증 데이터의 BLEU score 평균 확인

In [None]:
def evaluateScore(encoder, decoder):
    score = 0.0
    for pair in valid_pairs:
        output_words = evaluate(encoder, decoder, pair[0])
        output_sentence = ' '.join(output_words[:-1])
        score += sentence_bleu([normalizeString(pair[1]).split()], output_sentence.split(), smoothing_function=smoothie)
    print('Score : ',score/len(valid_pairs))

In [None]:
evaluatePrint(encoder1, decoder1)
evaluateScore(encoder1, decoder1)

Attention 기법을 적용한 디코더 RNN
- dot production 으로 attention score 구하기

In [None]:
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.softmax = nn.Softmax(dim=1)
        self.out = nn.Linear(self.hidden_size*2, self.output_size)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden, encoder_hiddens, input_length):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        ####################  빈칸  ####################
        # gru를 돌고 나온 hidden state로 어텐션 구하기
        # attn_score (dot) -> attn distribution -> attn value -> concat (attn value;hidden state)
        # 이때, 실제 input 길이를 제외한 나머지 부분은 매우 작은 값(-9e10)으로 마스킹해준 후 softmax를 돌아야함
        # concat 된 벡터를 linear 하나 돌아서 output 도출

        # output, hidden 계산
        
        # attn score 구하기 (내적)
        
        # input을 제외한 부분은 masking

        # softmax를 통해 encoder의 hidden embedding들을 weight sum
        
        # hidden과 encoder hidden에 어텐션을 적용한 결과를 concat하여 최종 단어 예측
        
        ###############################################


        return output, hidden

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

어텐션 모델 학습 코드

In [None]:
teacher_forcing_ratio = 0.5

# train for each step
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):

    ####################  빈칸  ####################

    # 기본 decoder를 사용했을 때와 거의 유사하다.
    # decoder 인자에 주의하며, 위에서 작성한 코드를 참고해서 작성
    # encoder 의 hiddens state 값들을 모아두어야 함, 미리 transpose 시키기

        # 기존의 decoder 학습과 달라진 점: 모든 encdoer의 hidden을 저장 (decoder의 attn에 사용)

    # decoder의 input에 encoder의 모든 hidden embedding을 사용
    
    ###############################################

    return loss.item() / target_length

어텐션 모델 검증 코드

In [None]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):

    ####################  빈칸  ####################

    # 기본 decoder를 사용했을 때와 거의 유사하다.
    # decoder 인자에 주의하며, 위에서 작성한 코드를 참고해서 작성
    # encoder 의 hiddens state 값들을 모아두어야 함, 미리 transpose 시키기

    # 기존의 decoder 학습과 달라진 점: 모든 encdoer의 hidden을 저장 (decoder의 attn에 사용)

        # encoder 호출

    ###############################################
      
        return decoded_words

In [None]:
hidden_size = 256
encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device)

trainIters(encoder1, attn_decoder1, 7500, print_every=1000) 

In [None]:
evaluatePrint(encoder1, attn_decoder1)
evaluateScore(encoder1, attn_decoder1)