In [1]:
from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd

import os
import re
import random

In [2]:
torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
raw = ["I feel hungry.	나는 배가 고프다.",
       "Pytorch is very easy.	파이토치는 매우 쉽다.",
       "Pytorch is a framework for deep learning.	파이토치는 딥러닝을 위한 프레임워크이다.",
       "Pytorch is very clear to use.	파이토치는 사용하기 매우 직관적이다."]

In [4]:
SOS_token = 0
EOS_token = 1

In [5]:
# 기계가 번역할 때는 단어보다는 숫자를 더 잘 처리 하기 때문에 숫자로 넣어준 다음 데이터가 나오면 다시 숫자에서 단어로 바꿔주기 위해 
# 아래와 같은 작업을 진행
class Vocab:
    def __init__(self):
        self.vocab2index = {"<SOS>": SOS_token, "<EOS>": EOS_token} # 단어를 키값, 인덱스를 value값으로 저장
        self.index2vocab = {SOS_token: "<SOS>", EOS_token: "<EOS>"} # 인덱스와 단어
        self.vocab_count = {} # 단어가 나온 횟수 저장
        self.n_vocab = len(self.vocab2index) # 단어 집한 개수

    def add_vocab(self, sentence):
        for word in sentence.split(" "): # 공백을 기준으로 단어 쪼개기
            if word not in self.vocab2index: # 새로운 단어가 나오면 단어 인덱스를 부여하고 vocab에 저장
                self.vocab2index[word] = self.n_vocab # 단어에 인덱스 줌
                self.vocab_count[word] = 1 # 단어가 한 번 나옴
                self.index2vocab[self.n_vocab] = word # 인덱스에 단어 
                self.n_vocab += 1 # 단어 집합의 수에 하나 추가
            else:
                self.vocab_count[word] += 1 # 이미 있는 단어라면 단어가 나온 횟수만 증가

In [6]:
# 길이가 긴 문장 처리
def filter_pair(pair, source_max_length, target_max_length): # 최대 길이를 지정
    return len(pair[0].split(" ")) < source_max_length and len(pair[1].split(" ")) < target_max_length # 최대 문장 까지만 리턴한다.

In [60]:
# 모델에 넣기 전 데이터 전처리
def preprocess(corpus, source_max_length, target_max_length):
    print()
    pairs=[]
    for line in corpus: 
        pairs.append([s for s in line.strip().lower().split("\t")]) # 소스와 타켓으로 나눈다
    print("Read {} sentence pairs".format(len(pairs))) # 문장의 개수
    
    pairs = [pair for pair in pairs if filter_pair(pair, source_max_length, target_max_length)] # 최대 문장을 기준으로 데이터 정제/..?
    print("Trimmed to {} sentence pairs".format(len(pairs)))
    
    source_vocab = Vocab()
    target_vocab = Vocab()
    
    print()
    
    for pair in pairs:
        source_vocab.add_vocab(pair[0]) # 자른 문장의 앞부분을 소스 단어 보캡에 저장
        target_vocab.add_vocab(pair[1]) # 자른 문장의 뒷부분을 타겟보캡에 저장
    print("source vocab size =", source_vocab.n_vocab)
    print("target vocab size =", target_vocab.n_vocab)
    
    return pairs, source_vocab, target_vocab

In [61]:
# 인코더
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size): # 인풋 사이즈와 히든사이즈 
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size) # 임베딩은 토치 라이브러리 함수 사용.?
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, x, hidden):
        
        x = self.embedding(x).view(1, 1, -1) # 데이터가 학습할 수 있는 형태로 바꾸기
        x, hidden = self.gru(x, hidden) # GRU의 리턴값은 (배치 크기, 시퀀스 길이, 은닉 상태의 크기)
        return x, hidden # 인코더의 결과값이 디코더의 인풋으로 들어가기 때문에 리턴
                         # 인코더의 히든사이즈를 디코더에서도 쓸 예정이기 때문에 리턴

In [80]:
# 디코더
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size) # 결과값 출력
        self.softmax = nn.LogSoftmax(dim=1) # 활성화 함수

    def forward(self, x, hidden):
        
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
#         print(x[0])
        x = self.softmax(self.out(x[0])) #
        return x, hidden

In [81]:
# 문장들을 인덱스 텐서로 바꿔준다.
# 왜? 네트워크가 텐서 타입 데이터만 읽을 수 있어서
def tensorize(vocab, sentence):
    indexes = [vocab.vocab2index[word] for word in sentence.split(" ")]
    indexes.append(vocab.vocab2index["<EOS>"])
    return torch.Tensor(indexes).long().to(device).view(-1, 1)

In [82]:
# 훈련 seq2seq

def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01): #  n_iter : 반복횟수
    loss_total = 0

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate) 
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate) 

    training_batch = [random.choice(pairs) for _ in range(n_iter)] 
    training_source = [tensorize(source_vocab, pair[0]) for pair in training_batch]
    training_target = [tensorize(target_vocab, pair[1]) for pair in training_batch]

    criterion = nn.NLLLoss()
    
    for i in range(1, n_iter + 1): # 횟수로 나타내기 위해 1 더함
        source_tensor = training_source[i - 1] # 0부터 시작하기 때문에 1 빼줌
        target_tensor = training_target[i - 1] 

        encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device) # 디바이스를 to로 빼서 따로 써줌

        encoder_optimizer.zero_grad() # 역전파 할 때 데이터 업데이트 하기 위해
        decoder_optimizer.zero_grad()

        source_length = source_tensor.size(0) 
        target_length = target_tensor.size(0)

        loss = 0

        for enc_input in range(source_length): # 인코더에 소스데이터 넣어 훈련 진행
#             print(f'인코더에서 인풋 사이즈 : {source_tensor[enc_input]}')
            _, encoder_hidden = encoder(source_tensor[enc_input], encoder_hidden)

        decoder_input = torch.Tensor([[SOS_token]]).long().to(device) # 디코더 sos토큰을 넣어준다.
#         print(f'디코더에서 인풋 : {decoder_input}')
        decoder_hidden = encoder_hidden # 인코더의 출력값은 디코더의 인풋 데이터가 된다.

        for di in range(target_length): # 정답 레이블
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
#             print(f'디코더에서 아웃풋 : {decoder_output}')
            loss += criterion(decoder_output, target_tensor[di]) #오차계산 
            decoder_input = target_tensor[di]  # teacher forcing

        loss.backward()

        encoder_optimizer.step() # 인코더 최적합
        decoder_optimizer.step() # 디코더 최적합

        loss_iter = loss.item() / target_length # epoch
        loss_total += loss_iter 

        if i % print_every == 0:
            loss_avg = loss_total / print_every # 배치별 평균 오차
            loss_total = 0
            print("[{} - {}%] loss = {:05.4f}".format(i, i / n_iter * 100, loss_avg))

In [83]:
# 문장을 넣고 평가하기
def evaluate(pairs, source_vocab, target_vocab, encoder, decoder, target_max_length):
    for pair in pairs:
        print(">", pair[0])
        print("=", pair[1])
        source_tensor = tensorize(source_vocab, pair[0])
        source_length = source_tensor.size()[0]
        encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device)

        for ei in range(source_length):
            _, encoder_hidden = encoder(source_tensor[ei], encoder_hidden) # shaped을 변경하고 인코더에 넣음

        decoder_input = torch.Tensor([[SOS_token]]).to(device).long() # device to에 넣어야 된다... 안에서 정의하면 cpu로 실행된다.
        decoder_hidden = encoder_hidden
        decoded_words = []

        for di in range(target_max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, top_index = decoder_output.data.topk(1) # 디코딩해서 나온 값을 인덱스 형태로 저장되어잇음
            if top_index.item() == EOS_token:
                decoded_words.append("<EOS>") # 문장이 끝나면 eos 넣고 끝
                break
            else:
                decoded_words.append(target_vocab.index2vocab[top_index.item()]) # 인덱스를 단어로 변경해서 아웃풋 저장

            decoder_input = top_index.squeeze().detach() 

        predict_words = decoded_words # 예측한 값 저장
        predict_sentence = " ".join(predict_words) 
        print("<", predict_sentence)
        print("")

In [84]:
# 최대 길이 지정
SOURCE_MAX_LENGTH = 10
TARGET_MAX_LENGTH = 12

In [85]:
# 병렬 코퍼스 형태로 데이터 변환
load_pairs, load_source_vocab, load_target_vocab = preprocess(raw, SOURCE_MAX_LENGTH, TARGET_MAX_LENGTH)
print(random.choice(load_pairs))


Read 4 sentence pairs
Trimmed to 4 sentence pairs

source vocab size = 17
target vocab size = 13
['pytorch is a framework for deep learning.', '파이토치는 딥러닝을 위한 프레임워크이다.']


In [86]:
# 인코딩과 디코딩의 히든 사이즈 지정
# 인코딩과 디코딩 실행
enc_hidden_size = 16
dec_hidden_size = enc_hidden_size
enc = Encoder(load_source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, load_target_vocab.n_vocab).to(device)

In [87]:
train(load_pairs, load_source_vocab, load_target_vocab, enc, dec, 5000, print_every=1000)

[1000 - 20.0%] loss = 0.6863
[2000 - 40.0%] loss = 0.1111
[3000 - 60.0%] loss = 0.0366
[4000 - 80.0%] loss = 0.0208
[5000 - 100.0%] loss = 0.0144


In [88]:
# 주어진 데이터로 모델 평가
evaluate(load_pairs, load_source_vocab, load_target_vocab, enc, dec, TARGET_MAX_LENGTH)

> i feel hungry.
= 나는 배가 고프다.
< 나는 배가 고프다. <EOS>

> pytorch is very easy.
= 파이토치는 매우 쉽다.
< 파이토치는 매우 쉽다. <EOS>

> pytorch is a framework for deep learning.
= 파이토치는 딥러닝을 위한 프레임워크이다.
< 파이토치는 딥러닝을 위한 프레임워크이다. <EOS>

> pytorch is very clear to use.
= 파이토치는 사용하기 매우 직관적이다.
< 파이토치는 사용하기 매우 직관적이다. <EOS>

