Ben Trevett 의 [Simple Sentiment Analysis](https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/1%20-%20Simple%20Sentiment%20Analysis.ipynb) 튜토리얼을 한글 데이터셋에 적용해보는 연습이다. 데이터셋은 [네이버 영화 평점 데이터](https://github.com/e9t/nsmc)을 이용한다.

이 튜토리얼에서는 RNN 모델을 이용하여 문장을 순차적으로 입력한 후 최종 hidden vector를 이용하여 문장이 긍정적인지 혹은 부정적인지 판단한다.

먼저 필요한 패키지와 랜덤 시드를 설정하자.

In [66]:
import torch
from torchtext import data

SEED = 1234

torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# 전처리

`Field`를 지정하자. 우리는 한글 데이터를 다루므로 토크나이저로 `spaCy`를 쓸 수가 없다. 여기서는 [KoNLPy](https://konlpy-ko.readthedocs.io/ko/v0.4.3/)의 은전한닢 tokenizer를 이용한다.

In [67]:
from konlpy.tag import Mecab
mecab = Mecab()

In [68]:
TEXT = data.Field(tokenize=mecab.morphs)
LABEL = data.LabelField(dtype = torch.float)

`torchtext`에 내장된 데이터셋을 이용하는 게 아니므로 각 컬럼별로 해당하는 `Field`를 지정해줘야 한다.

In [69]:
fields = {'text': ('text',TEXT), 'label': ('label',LABEL)}
# dictionary 형식은 {csv컬럼명 : (데이터 컬럼명, Field이름)}

In [71]:
train_data, test_data = data.TabularDataset.splits(
                            path = 'data',
                            train = 'train_data.csv',
                            test = 'test_data.csv',
                            format = 'csv',
                            fields = fields,  
)

불러온 데이터는 다음과 같은 형태이다.

In [72]:
vars(train_data[0]), vars(train_data[1])

({'text': ['아', '더', '빙', '.', '.', '진짜', '짜증', '나', '네요', '목소리'],
  'label': '0'},
 {'text': ['흠',
   '.',
   '..',
   '포스터',
   '보고',
   '초딩',
   '영화',
   '줄',
   '.',
   '...',
   '오버',
   '연기',
   '조차',
   '가볍',
   '지',
   '않',
   '구나'],
  'label': '1'})

훈련 데이터의 갯수는 다음과 같다.

In [73]:
print(f'훈련 데이터 수 : {len(train_data)}')
print(f'테스트 데이터 수 : {len(test_data)}')

훈련 데이터 수 : 150000
테스트 데이터 수 : 50000


데이터에 검증 데이터가 따로 주어져 있지 않으므로 생성해준다.

In [74]:
import random
train_data, valid_data = train_data.split(random_state=random.seed(SEED))

In [75]:
print(f'훈련 데이터 수 : {len(train_data)}')
print(f'검증 데이터 수 : {len(valid_data)}')
print(f'테스트 데이터 수 : {len(test_data)}')

훈련 데이터 수 : 105000
검증 데이터 수 : 45000
테스트 데이터 수 : 50000


원 튜토리얼에서는 단어를 다짜고짜 25,000개로 끊는데, 일단 총 단어가 몇 개인지 확인해보자.

In [76]:
TEXT.build_vocab(train_data)
len(TEXT.vocab)

45963

이중 최소 두 번 이상 등장하는 단어의 갯수를 세보자.

In [77]:
TEXT.build_vocab(train_data, min_freq=2)
len(TEXT.vocab)

25210

이 정도면 그냥 25,000개로 끊어도 될 것 같다.

In [78]:
MAX_VOCAB_SIZE = 25000

TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE)
LABEL.build_vocab(train_data)

In [79]:
print(f"TEXT 단어장의 갯수 : {len(TEXT.vocab)}")
print(f"LABEL 단어장의 갯수 : {len(LABEL.vocab)}")

TEXT 단어장의 갯수 : 25002
LABEL 단어장의 갯수 : 2


`<unk>`와 `<pad>` 토큰이 추가되어 있으므로 단어의 갯수가 25,002개이다. 이제 최빈 단어들이 어떤 것인지 확인해보자.

In [80]:
print(TEXT.vocab.freqs.most_common(20))

[('.', 112232), ('이', 51194), ('는', 46784), ('영화', 40412), ('다', 38942), ('고', 33023), ('하', 31175), ('도', 23975), ('의', 23819), ('가', 23360), ('은', 21871), ('에', 21472), ('을', 21007), ('보', 17937), ('한', 17744), ('..', 16006), (',', 15682), ('게', 15512), ('들', 15013), ('!', 13573)]


단어와 인덱스 사이 매핑도 확인해볼 수 있다.

In [81]:
print(TEXT.vocab.itos[:10])

['<unk>', '<pad>', '.', '이', '는', '영화', '다', '고', '하', '도']


In [82]:
print(LABEL.vocab.stoi)

defaultdict(None, {'0': 0, '1': 1})


이제 `BucketIterator`를 이용하여 데이터 생성자를 만들자.

In [98]:
BATCH_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device,
    # 원래 튜토리얼은 아래 두줄이 없는데 에러가 나서 추가
    sort_key = lambda x: len(x.text),
    sort_within_batch = False,
)

생성된 데이터의 크기를 확인해보자.

In [99]:
next(iter(train_iterator)).text.shape
# 문장의 길이 * 배치 사이즈

torch.Size([68, 64])

# 모델 생성

바닐라 RNN을 이용하여 간단한 모델을 생성한다. 모델의 각 input/output 벡터의 사이즈는 다음과 같다.

* `text` : `[sentence length, batch size]`
* `embedded` : `[sentence length, batch size, embedding dim]`
* `output` : `[sentence length, batch size, hidden dim]`
* `hidden` : `[1, batch size, hidden dim]`

In [85]:
import torch.nn as nn

In [86]:
class RNN(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, text):
        # text : [sent_len, batch_size]
        embedded = self.embedding(text)
        # embedded : [sent_len, batch_size, emb_dim]
        output, hidden = self.rnn(embedded)
        # output : [sent_len, batch_size, hidden_dim]
        # hidden : [1, batch_size, hidden_dim]
        assert torch.equal(output[-1,:,:], hidden.squeeze(0))
        
        return self.fc(hidden.squeeze(0)) # [batch_size, output_dim]

모델의 하이퍼파라미터를 다음과 같이 설정한 후 불러오자.

In [87]:
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1

model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM)

이 모델의 파라미터수는 다음과 같이 추출할 수 있다.

In [88]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'이 모델은 {count_parameters(model):,} 개의 파라미터를 가지고 있다.')

이 모델은 2,592,105 개의 파라미터를 가지고 있다.


# 모델 훈련

가장 기초적인 SGD 옵티마이저를 이용하여 모델을 훈련시켜보자.

In [89]:
import torch.optim as optim

optimizer = optim.SGD(model.parameters(), lr=1e-3)

손실 함수는 `binary cross entropy with logits`로 하자. 이 함수는 임의의 실수를 입력으로 받아서 sigmoid 함수를 취해 0과 1 사이의 값으로 변환한 뒤 `label`과의 `binary cross entropy`를 계산한다.

In [90]:
criterion = nn.BCEWithLogitsLoss()

모델과 손실함수를 GPU에 올리자.

In [91]:
model = model.to(device)
criterion = criterion.to(device)

이제 평가를 위해 임의의 실수를 0과 1 두 정수 중 하나로 변환하는 함수를 만들자.

In [92]:
def binary_accuracy(preds,y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float()
    acc = correct.sum() / len(correct)
    return acc

이제 모델의 훈련과 평가를 위한 함수를 만들자.

model의 output size는 `[batch_size, output_dim]`인데, output_dim = 1 이므로 이 차원을 없애줘야 `label`과 비교할 수 있다. 따라서 `squeeze(1)`를 적용한다.

In [93]:
def train(model, iterator, optimizer, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.train() # 이 모델은 상관없음
    
    for batch in iterator:
        optimizer.zero_grad()
        predictions = model(batch.text).squeeze(1)
        loss = criterion(predictions, batch.label)
        acc = binary_accuracy(predictions, batch.label)

        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item() # tensor에 item()을 취하면 value를 반환
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

평가를 위한 함수는 그래디언트 업데이트를 하지 않아야 하므로 `with torch.no_grad():` 구문으로 감싸도록 한다.

In [94]:
def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    model.eval() # 이 모델은 상관없음
    
    with torch.no_grad():
        for batch in iterator:
            predictions = model(batch.text).squeeze(1)
            loss = criterion(predictions, batch.label)
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

에폭마다 걸린 훈련시간을 측정하는 함수를 만든다.

In [95]:
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

이제 실제로 훈련을 시켜보자. 검증셋의 손실이 개선되면 모델을 저장하도록 구현한다.

In [96]:
N_EPOCHS = 5
best_valid_loss = float('inf')

In [100]:
for epoch in range(N_EPOCHS):
    start_time = time.time()
    
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut1-model.pt')
        
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\t Train Loss: {train_loss:.3f} | Train Acc: {train_acc*100:0.2f}%')
    print(f'\t  Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:0.2f}%')

Epoch: 01 | Epoch Time: 0m 7s
	 Train Loss: 0.693 | Train Acc: 49.96%
	  Val. Loss: 0.694 |  Val. Acc: 51.81%
Epoch: 02 | Epoch Time: 0m 7s
	 Train Loss: 0.693 | Train Acc: 50.06%
	  Val. Loss: 0.694 |  Val. Acc: 52.10%
Epoch: 03 | Epoch Time: 0m 7s
	 Train Loss: 0.693 | Train Acc: 50.07%
	  Val. Loss: 0.693 |  Val. Acc: 52.30%
Epoch: 04 | Epoch Time: 0m 7s
	 Train Loss: 0.693 | Train Acc: 50.19%
	  Val. Loss: 0.693 |  Val. Acc: 52.39%
Epoch: 05 | Epoch Time: 0m 7s
	 Train Loss: 0.693 | Train Acc: 50.10%
	  Val. Loss: 0.693 |  Val. Acc: 52.40%


사실상 훈련이 거의 되지 않는 것을 알 수 있다. 이는 모델의 성능이 좋지 않기 떄문인데, 이 튜토리얼에서 차차 개선해보자.

마지막으로, 검증 셋에서 가장 좋은 결과를 받은 모델 기준으로 테스트셋의 결과를 측정해보자.

In [101]:
model.load_state_dict(torch.load('tut1-model.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.693 | Test Acc: 52.00%
