PyTorch official tutorial 에서는 Recurrent Neural Network 를 이용한 Part of Speech tagger 의 구현 예시가 있습니다. 우리는 이를 통하여 RNN, 그 중 LSTM 을 이용하는 연습을 합니다.

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

print(torch.__version__)

1.0.1.post2


데이터는 두 개의 문장, 총 3 개의 tags 입니다. 구현 과정을 연습할 때에는 이처럼 확인 가능한 작은 데이터나, 경향을 잘 알고 있는 데이터로 하는 것이 좋습니다.

In [2]:
training_data = [
    ("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
    ("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]

utils.py 파일에 기본적인 유틸 함수들을 만들어 두었습니다. Sentence sequence 와 tag sequence 를 각각 나눠서 vocabulary scanning 을 합니다.

In [3]:
from utils import decode_sequence
from utils import encode_sequence
from utils import scan_vocabulary

sents, tags = zip(*training_data)

idx_to_vocab, vocab_to_idx = scan_vocabulary(sents)
idx_to_tag, tag_to_idx = scan_vocabulary(tags)

In [4]:
print(idx_to_vocab)
print(idx_to_tag)

['The', 'dog', 'ate', 'the', 'apple', 'Everybody', 'read', 'that', 'book']
['NN', 'DET', 'V']


In [5]:
print(encode_sequence(sents[0], vocab_to_idx))
print(encode_sequence(tags[0], tag_to_idx))

tensor([0, 1, 2, 3, 4])
tensor([1, 0, 2, 1, 0])


In [6]:
X = [encode_sequence(sent, vocab_to_idx) for sent in sents]
Y = [encode_sequence(tag, tag_to_idx) for tag in tags]

nn.LSTM.forward 함수의 input, output 의 size 는 각각 (seq_len, batch_size, input dim), (seq_len, batch_size, output dim) 입니다. 여기에서는 batch_size = 1 인 경우의 모델을 만들어봅니다.

우리는 word embedding vectors 도 LSTM model 안에 구현할 것입니다. lookup 이 이뤄지면 LSTM layer 에 input 을 입력하고, 매 시점의 output 마다 linear layer 를 이용하여 softmax 수행 (hidden2tag ) 할 것입니다.

```python
class Model(nn.LSTM):
    def __init__(self, ... ):
        # ...
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)
```

그리고 LSTM 이 이용할 hidden layer 값을 저장할 준비를 합니다. hidden 의 format 은 (num_layers, minibatch_size, hidden_dim) 입니다. 우리는 deep RNN 이 아니기 때문에 num_layers 를 1 로 정의하였으며, 하나의 문장을 입력하여 하나의 품사열을 얻을 걷이시 때문에 minibatch_size 도 1 로 정의하였습니다.

```python
class Model(nn.LSTM):
    # ...
    def init_hidden(self):
        return (torch.zeros(1, 1, self.hidden_dim),
                torch.zeros(1, 1, self.hidden_dim))
```

PyTorch 에서의 LSTM.forward 함수의 입력/출력 값의 형식은 아래와 같습니다. (hidden, cell) 에는 마지막 hidden 과 memory cell 의 값이 저장되어 있습니다. 그렇기 때문에 우리는 init_hidden 에서 두 개의 (1, 1, hidden_dim) 의 zero tensor 를 만듭니다.

    output, (hidden, cell) = LSTM.forward(input, (hidden, cell))

output 은 각 input 에 대한 모든 output 이 출력됩니다.

In [7]:
class Model(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(Model, self).__init__()

        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)

        # The linear layer that maps from hidden state space to tag space
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def init_hidden(self):
        # (num_layers, minibatch_size, hidden_dim)
        return (torch.zeros(1, 1, self.hidden_dim),
                torch.zeros(1, 1, self.hidden_dim))

    def forward(self, sentence, debug=False):
        len_sen = len(sentence)
        lstm_input = self.word_embeddings(sentence)
        lstm_input = lstm_input.view(len_sen, 1, -1)
        hidden, cell = self.init_hidden()

        lstm_out, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))

        lstm_out_ = lstm_out.view(len(sentence), -1)
        tag_space = self.hidden2tag(lstm_out_)
        tag_scores = F.log_softmax(tag_space, dim=1)

        if debug:
            print('input size       : {}'.format(lstm_input.size()))            
            print('lstm hidden size : {}'.format(hidden.size()))
            print('lstm cell size   : {}'.format(cell.size()))
            print('lstm_out size    : {}'.format(lstm_out.size()))
            print('lstm_out_ size   : {}'.format(lstm_out_.size()))
            print('tag scores size  : {}'.format(tag_scores.size()))

        return tag_scores

In [8]:
# 개발용 데이터이기 때문에 embedding dimension 이나 hidden dimension 이 작아도 됩니다만, 실제 모델에서는 이 값을 충분히 크게 설정해야 합니다. 
embedding_dim = 7
hidden_dim = 8
vocab_size = len(vocab_to_idx)
tagset_size = len(tag_to_idx)

model = Model(embedding_dim, hidden_dim, vocab_size, tagset_size)
model(X[0], debug=True)

input size       : torch.Size([5, 1, 7])
lstm hidden size : torch.Size([1, 1, 8])
lstm cell size   : torch.Size([1, 1, 8])
lstm_out size    : torch.Size([5, 1, 8])
lstm_out_ size   : torch.Size([5, 8])
tag scores size  : torch.Size([5, 3])


tensor([[-0.8630, -1.1220, -1.3765],
        [-0.8189, -1.1604, -1.4035],
        [-0.7957, -1.2365, -1.3535],
        [-0.8032, -1.2118, -1.3686],
        [-0.8371, -1.1185, -1.4259]], grad_fn=<LogSoftmaxBackward>)

만약 현재의 모델에 forward 를 하여 중간 값을 확인하고 싶다면 torch.no_grad() 를 이용할 수 있습니다. 모델의 gradient 에 어떤 값도 저장하지 않습니다. debugging 용 방법입니다.

In [9]:
with torch.no_grad():
    tag_scores = model.forward(X[0], debug=True)
    tag_pred_idx = torch.argmax(tag_scores, dim=1)
    tag_pred = decode_sequence(tag_pred_idx, idx_to_tag)
    print('\n## tag true\n{}'.format(Y[0]))
    print('\n## tag predicted idx\n{}'.format(tag_pred_idx))
    print('\n## tag predicted\n{}'.format(tag_pred))

input size       : torch.Size([5, 1, 7])
lstm hidden size : torch.Size([1, 1, 8])
lstm cell size   : torch.Size([1, 1, 8])
lstm_out size    : torch.Size([5, 1, 8])
lstm_out_ size   : torch.Size([5, 8])
tag scores size  : torch.Size([5, 3])

## tag true
tensor([1, 0, 2, 1, 0])

## tag predicted idx
tensor([0, 0, 0, 0, 0])

## tag predicted
['NN', 'NN', 'NN', 'NN', 'NN']


In [10]:
def train(model, X, Y, optimizer, loss_func, epoch):
    loss_sum = 0
    for words, tags in zip(X, Y):
        model.zero_grad()
        tag_scores = model(words)
        loss = loss_func(tag_scores, tags)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item()
    return model, loss_sum

Loss function 은 negative log likelihood loss 를 이용합니다.

In [11]:
# Negative Log Likelihood Loss
loss_func = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

max_epoch = 300
for epoch in range(1, max_epoch+1):
    model, loss = train(model, X, Y, optimizer, loss_func, epoch)
    loss /= len(X)
    if epoch % 10 == 0:
        print('epoch = {}, loss = {:.4}'.format(epoch, loss))

epoch = 10, loss = 1.059
epoch = 20, loss = 1.026
epoch = 30, loss = 0.9874
epoch = 40, loss = 0.936
epoch = 50, loss = 0.8653
epoch = 60, loss = 0.7707
epoch = 70, loss = 0.6577
epoch = 80, loss = 0.5444
epoch = 90, loss = 0.4456
epoch = 100, loss = 0.3645
epoch = 110, loss = 0.299
epoch = 120, loss = 0.2463
epoch = 130, loss = 0.2041
epoch = 140, loss = 0.1704
epoch = 150, loss = 0.1436
epoch = 160, loss = 0.1222
epoch = 170, loss = 0.1049
epoch = 180, loss = 0.09101
epoch = 190, loss = 0.07966
epoch = 200, loss = 0.07032
epoch = 210, loss = 0.06257
epoch = 220, loss = 0.05609
epoch = 230, loss = 0.05062
epoch = 240, loss = 0.04596
epoch = 250, loss = 0.04197
epoch = 260, loss = 0.03853
epoch = 270, loss = 0.03553
epoch = 280, loss = 0.03291
epoch = 290, loss = 0.0306
epoch = 300, loss = 0.02856


학습 이후에는 tagging 이 제대로 됨을 확인할 수 있습니다.

In [13]:
with torch.no_grad():
    tag_scores = model.forward(X[0], debug=True)
    tag_pred_idx = torch.argmax(tag_scores, dim=1)
    tag_pred = decode_sequence(tag_pred_idx, idx_to_tag)
    print('\n## tag true\n{}'.format(Y[0]))
    print('\n## tag predicted idx\n{}'.format(tag_pred_idx))
    print('\n## tag predicted\n{}'.format(tag_pred))

input size       : torch.Size([5, 1, 7])
lstm hidden size : torch.Size([1, 1, 8])
lstm cell size   : torch.Size([1, 1, 8])
lstm_out size    : torch.Size([5, 1, 8])
lstm_out_ size   : torch.Size([5, 8])
tag scores size  : torch.Size([5, 3])

## tag true
tensor([1, 0, 2, 1, 0])

## tag predicted idx
tensor([1, 0, 2, 1, 0])

## tag predicted
['DET', 'NN', 'V', 'DET', 'NN']
