In [1]:
import string

l = []

with open("kor.txt", 'r', encoding="utf-8") as f:
    lines = f.read().split("\n")
    for line in lines:
        txt = "".join(v for v in line if v not in string.punctuation).lower()
        l.append(txt)
        
print(l[:5])

['go\t가', 'hi\t안녕', 'run\t뛰어', 'run\t뛰어', 'who\t누구']


In [2]:
import numpy as np
import torch

from torch.utils.data.dataset import Dataset

def get_BOW(corpus):
    BOW = {"<SOS>":0, "<EOS>":1}
    
    for line in corpus:
        for word in line.split():
            if word not in BOW.keys():
                BOW[word] = len(BOW.keys())

    return BOW

  from .autonotebook import tqdm as notebook_tqdm


get_BOW는 문장들로부터 BOW (Bag Of Words)를 만들어주는 함수이다.

In [3]:
class Eng2Kor(Dataset):
    def __init__(self, pth2txt="kor.txt"):
        self.eng_corpus = []
        self.kor_corpus = []
        
        with open(pth2txt, 'r', encoding="utf-8") as f:
            lines = f.read().split("\n")
            for line in lines:
                txt = "".join(v for v in line if v not in string.punctuation).lower()
                engtxt = txt.split("\t")[0]
                kortxt = txt.split("\t")[1]
                
                if len(engtxt.split()) <= 10 and len(kortxt.split()) <= 10:
                    self.eng_corpus.append(engtxt)
                    self.kor_corpus.append(kortxt)
                    
        self.engBOW = get_BOW(self.eng_corpus)
        self.korBOW = get_BOW(self.kor_corpus)
    
    def gen_seq(self, line):
        seq = line.split()
        seq.append("<EOS>")
        return seq
    
    def __len__(self):
        return len(self.eng_corpus)
    
    def __getitem__(self, i):
        data = np.array([self.engBOW[txt] for txt in self.gen_seq(self.eng_corpus[i])])
        label = np.array([self.korBOW[txt] for txt in self.gen_seq(self.kor_corpus[i])])
        return data, label

gen_seq: 문장을 단어별로 분리하고 마지막에 \<EOS\>를 추가해주는 함수

Eng2Kor를 이용하여 만들어지는 dataset의 예시는 다음과 같다.

In [4]:
dataset = Eng2Kor()
print(len(dataset))
print(dataset.eng_corpus[2000])
print(dataset[2000][0])
print(dataset.kor_corpus[2000])
print(dataset[2000][1])

3592
im not sure what they want
[ 54  67 913 103  91 214   1]
그들이 원하는 게 뭔지 모르겠다
[  99 2658 1656 2659 2660    1]


engBOW와 korBOW는 다음과 같이 생성된다.

In [5]:
print(dataset.engBOW)
print(len(dataset.engBOW))
print(dataset.korBOW)
print(len(dataset.korBOW))

2445
{'<SOS>': 0, '<EOS>': 1, '가': 2, '안녕': 3, '뛰어': 4, '누구': 5, '우와': 6, '쏴': 7, '도와줘': 8, '점프': 9, '점프해': 10, '기다려': 11, '잠깐': 12, '시작해': 13, '알았어': 14, '시도해볼게': 15, '내가': 16, '이겼어': 17, '아니': 18, '이런': 19, '진정해': 20, '웃어': 21, '공격': 22, '공격해': 23, '꼼짝마': 24, '일어나': 25, '알겠어': 26, '안아줘': 27, '알아': 28, '나': 29, '일해': 30, '들어': 31, '절대': 32, '아니야': 33, '그럴리가': 34, '고마워': 35, '우리는': 36, '시도할거야': 37, '우리가': 38, '왜': 39, '나야': 40, '굉장해': 41, '공정하게': 42, '해': 43, '저리': 44, '우리한테': 45, '연락해': 46, '들어와': 47, '어서': 48, '나가': 49, '그가': 50, '왔어': 51, '그': 52, '사람이': 53, '톰을': 54, '때려': 55, '동의해': 56, '슬퍼': 57, '나도': 58, '열어': 59, '완벽해': 60, '보여줘': 61, '시끄러워': 62, '건너뛰어': 63, '그만해': 64, '말해': 65, '톰이': 66, '설거지': 67, '어서오세요': 68, '환영합니다': 69, '누가': 70, '안돼': 71, '힘내': 72, '꺼져': 73, '계속해': 74, '잘했어': 75, '잡아': 76, '귀엽잖아': 77, '이렇게': 78, '귀엽다니': 79, '얼마나': 80, '깊게': 81, '서둘러': 82, '잊어버렸어': 83, '나는': 84, '못': 85, '생겼다': 86, '아파': 87, '동작하네': 88, '작동하네': 89, '되네': 90, '가자': 91, '조심해': 92, '앉아': 93, 

In [6]:
def loader(dataset):
    for i in range(len(dataset)):
        data, label = dataset[i]
        yield torch.tensor(data), torch.tensor(label)

loader: 데이터셋의 문장을 한 문장씩 불러오는 함수

yield 구문은 return과 비슷하지만, 값을 반복적으로 반환하는데 사용된다.

In [7]:
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        
    def forward(self, x, h):                    # x: scala, h: (1, 1, 64)
        x = self.embedding(x).view(1, 1, -1)    # x: (1, 1, 64)
        output, hidden = self.gru(x, h)         # output, hidden: (1, 1, 64)
        return output, hidden

Encoder는 임베딩층과 GRU층으로 구성되어 있어 영단어를 나타내는 x와 hidden state h가 input으로 들어오면 output (encoder output)과 hidden (hidden state)을 반환한다.

In [20]:
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=11):
        super(Decoder, self).__init__()
        
        self.embedding = nn.Embedding(output_size, hidden_size)
        
        self.attention = nn.Linear(hidden_size * 2, max_length)
        
        self.context = nn.Linear(hidden_size * 2, hidden_size)
        
        self.dropout = nn.Dropout(dropout_p)
        
        self.gru = nn.GRU(hidden_size, hidden_size)
        
        self.out = nn.Linear(hidden_size, output_size)
        
        self.relu = nn.ReLU()
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, x, h, encoder_outputs):   # x: scala, h: (1, 1, 64), encoder_outputs: (11, 64)
        x = self.embedding(x).view(1, 1, -1)    # x: (1, 1, 64)
        x = self.dropout(x)
        
        attn_weights = self.softmax(self.attention(torch.cat((x[0], h[0]), -1)))
        # attn_weights: (1, 11)
        
        attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))
        # torch.bmm: batch matrix-matrix product -> attn_applied: (1, 1, 64)
        
        output = torch.cat((x[0], attn_applied[0]), 1)  # output: (1, 128)
        output = self.context(output).unsqueeze(0)      # output: (1, 1, 64)
        output = self.relu(output)
        
        output, hidden = self.gru(output, h)    # output: (1, 1, 64)
        
        output = self.out(output[0])            # output: (1, 5263)
        
        return output

Decoder에서는 Encoder에서 받은 output과 hidden을 사용한다. 한글 단어를 나타내는 x가 input으로 들어오면 임베딩층을 거쳐 hidden_size로 변환되고, MLP와 softmax를 거쳐 attn_weights (어텐션 가중치)를 얻는다. encoder_outputs와 attn_weights를 내적해 encoder 각 시점의 중요도를 계산하면 attn_applied를 얻을 수 있다. 초기 입력값 x와 attn_applied를 합친 다음 MLP와 ReLU를 거치면 output (특징)을 추출할 수 있다. 마지막으로 추출한 특징과 이전 시점의 hidden state를 GRU층에 넣어 시계열 정보를 학습하게 된다.

In [21]:
import random
import tqdm

from torch.optim.adam import Adam

device = "cuda" if torch.cuda.is_available() else "cpu"

dataset = Eng2Kor()

encoder = Encoder(input_size=len(dataset.engBOW), hidden_size=64).to(device)
decoder = Decoder(64, len(dataset.korBOW), dropout_p=0.1).to(device)

encoder_optimizer = Adam(encoder.parameters(), lr=0.0001)
decoder_optimizer = Adam(decoder.parameters(), lr=0.0001)

In [24]:
for epoch in range(200):
    iterator = tqdm.tqdm(loader(dataset), total=len(dataset))
    total_loss = 0
    
    for data, label in iterator:    # len(dataset)번 반복
        data = torch.tensor(data, dtype=torch.long).to(device)      # ex) data = tensor([52, 94, 1])
        label = torch.tensor(label, dtype=torch.long).to(device)    # ex) label = tensor([66, 104, 105, 1])
        
        encoder_hidden = torch.zeros(1, 1, 64).to(device)
        encoder_outputs = torch.zeros(11, 64).to(device)
        
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        
        loss = 0
        
        for ei in range(len(data)):     # len(data)번 반복
            encoder_output, encoder_hidden = encoder(data[ei], encoder_hidden)
            # shape of encoder_output, encoder_hidden: (1, 1, 64)
            encoder_outputs[ei] = encoder_output[0, 0]  # len(data)만큼 채워지고 나머지는 0
        
        decoder_input = torch.tensor([[0]]).to(device)  # 문장의 시작을 나타내는 <SOS> 토큰
        
        decoder_hidden = encoder_hidden     # encoder의 마지막 hidden state -> decoder의 초기 hidden state
        
        use_teacher_forcing = True if random.random() < 0.5 else False
        
        if use_teacher_forcing:     # teacher forcing을 사용하는 loop
            for di in range(len(label)):    # len(label)번 반복
                decoder_output = decoder(decoder_input, decoder_hidden, encoder_outputs)
                # decoder_output: (1, 5263)
                
                target = torch.tensor(label[di], dtype=torch.long).to(device)
                target = torch.unsqueeze(target, dim=0).to(device)
                loss += nn.CrossEntropyLoss()(decoder_output, target)
                decoder_input = target  # 다음 정답 단어를 강제적으로 decoder_input으로 설정
                
        else:   # teacher forcing을 사용하지 않는 loop
            for di in range(len(label)):    # len(label)번 반복
                decoder_output = decoder(decoder_input, decoder_hidden, encoder_outputs)
                # decoder_output: (1, 5263)
                
                topv, topi = decoder_output.topk(1)
                # decoder_output에서 가장 높은 확률을 갖는 단어의 확률값과 index 반환
                decoder_input = topi.squeeze().detach()     # 이 단어를 decoder_input으로 설정
                
                target = torch.tensor(label[di], dtype=torch.long).to(device)
                target = torch.unsqueeze(target, dim=0).to(device)
                loss += nn.CrossEntropyLoss()(decoder_output, target)
                
                if decoder_input.item() == 1:   # <EOS> 토큰을 만나면 중지
                    break
                
        total_loss += loss.item()/len(dataset)
        iterator.set_description(f"epoch:{epoch+1} loss:{total_loss}")
        loss.backward()
        
        encoder_optimizer.step()
        decoder_optimizer.step()

torch.save(encoder.state_dict(), "attn_enc.pth")
torch.save(decoder.state_dict(), "attn_dec.pth")

  
  import sys
epoch:1 loss:22.240539454139384: 100%|██████████| 3592/3592 [02:09<00:00, 27.81it/s]
epoch:2 loss:22.538080265965387: 100%|██████████| 3592/3592 [02:22<00:00, 25.19it/s]
epoch:3 loss:21.900623210952627: 100%|██████████| 3592/3592 [02:49<00:00, 21.25it/s]
epoch:4 loss:21.623567492765382: 100%|██████████| 3592/3592 [02:48<00:00, 21.34it/s]
epoch:5 loss:11.525647709390894:  65%|██████▍   | 2328/3592 [01:38<00:53, 23.61it/s]


KeyboardInterrupt: 

In [75]:
encoder.load_state_dict(torch.load("attn_enc.pth", map_location=device))
decoder.load_state_dict(torch.load("attn_dec.pth", map_location=device))

idx = random.randint(0, len(dataset))
input_sentence = dataset.eng_corpus[idx]
pred_sentence = ""

data, label = dataset[idx]
data = torch.tensor(data, dtype=torch.long).to(device)
label = torch.tensor(label, dtype=torch.long).to(device)

encoder_hidden = torch.zeros(1, 1, 64).to(device)
encoder_outputs = torch.zeros(11, 64).to(device)

In [76]:
for ei in range(len(data)):
    encoder_output, encoder_hidden = encoder(data[ei], encoder_hidden)  # 한 단어씩 encoder에 입력
    
    encoder_outputs[ei] = encoder_output[0, 0]  # encoder의 출력 저장
    
decoder_input = torch.tensor([[0]]).to(device)  # 문장의 시작을 나타내는 <SOS> 토큰

decoder_hidden = encoder_hidden     # encoder의 마지막 hidden state -> decoder의 초기 hidden state

In [77]:
for di in range(11):
    decoder_output = decoder(decoder_input, decoder_hidden, encoder_outputs)
    topv, topi = decoder_output.topk(1)
    decoder_input = topi.squeeze().detach()     # 가장 높은 확률을 갖는 단어의 요소 추출
    
    if decoder_input.item() == 1:       # <EOS> 토큰을 만나면 중지
        break
    
    pred_sentence += list(dataset.korBOW.keys())[decoder_input] + " "   # 가장 높은 확률의 단어를 문자열에 추가
    
print(input_sentence)
print(pred_sentence)

i live very close to the subway station
난 지하철 역에서 아주 가까운 곳에 살아 
