In [2]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchtext import data, datasets

In [3]:
# Hyperparameters
Batch_size: int = 64  # mini-batch size
lr: float = 1e-3      # learning rate
epochs: int = 40      # number of training epochs

In [4]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [37]:
# Field definitions: LABEL is a single non-sequential value, TEXT is a
# lower-cased token sequence; both are configured as batch-first.
LABEL = data.Field(batch_first=True, sequential=False)
TEXT = data.Field(lower=True, batch_first=True, sequential=True)

In [None]:
# Load the IMDB train/test splits, parsed with the TEXT and LABEL fields.
trainset, testset = datasets.IMDB.splits(text_field=TEXT, label_field=LABEL)

In [None]:
# Build both vocabularies from the training split. For TEXT, min_freq=5 sets
# the minimum token frequency required for a word to enter the vocabulary.
LABEL.build_vocab(trainset)
TEXT.build_vocab(trainset, min_freq=5)

In [None]:
# Carve a validation set out of the training data (split_ratio=0.8).
# NOTE: this rebinds `trainset`, so re-running this cell splits the
# already-reduced training set again.
trainset, validation_set = trainset.split(split_ratio=0.8)
train_iter, validation_iter, test_iter = data.BucketIterator.splits(
    (trainset, validation_set, testset),
    repeat=False,
    shuffle=True,
    batch_size=Batch_size,
)

In [None]:
# Model dimensions: two target classes, and the size of the TEXT vocabulary.
n_classes = 2
vocab_size = len(TEXT.vocab)

In [None]:
# Report the sizes of the train/validation/test splits, the vocabulary size and the class count.
print(f"학습셋 : {len(trainset)}, 검증셋 : {len(validation_set)}, 테스트셋 : {len(testset)}, 단어수 : {vocab_size}, 클래스 : {n_classes}")

In [None]:
# Training uses a GRU rather than a vanilla RNN.
class BasicGRU(nn.Module):
    """Embedding -> GRU -> dropout -> linear classifier over token-id sequences.

    Args:
        n_layers: number of stacked GRU layers.
        hidden_dim: size of the GRU hidden state.
        n_vocab: number of rows in the embedding table.
        embed_dim: embedding dimension fed to the GRU.
        n_classes: number of output logits.
        dropout_p: dropout probability applied to the last GRU output.
    """

    def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p = 0.2):
        super().__init__()
        print("building basic GRU model..")
        self.n_layers = n_layers
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.hidden_dim = hidden_dim
        self.dropout = nn.Dropout(dropout_p)
        self.gru = nn.GRU(embed_dim, self.hidden_dim, num_layers = self.n_layers, batch_first=True)
        self.out = nn.Linear(self.hidden_dim, n_classes)

    def forward(self, x):
        """Return logits of shape (batch, n_classes) for token ids ``x`` of shape (batch, seq_len)."""
        x = self.embed(x)
        h_0 = self._init_state(batch_size=x.size(0))  # initial hidden state (all zeros)
        x, _ = self.gru(x, h_0)
        # Keep only the GRU output at the last time step of every sequence.
        h_t = x[:, -1, :]
        # nn.Dropout returns a new tensor; the result must be reassigned,
        # otherwise dropout is silently never applied.
        h_t = self.dropout(h_t)
        logit = self.out(h_t)
        return logit

    def _init_state(self, batch_size=1):
        """Return a zero tensor of shape (n_layers, batch_size, hidden_dim).

        The tensor is created from the first model parameter so it shares that
        parameter's dtype and device.
        """
        weight = next(self.parameters()).data
        return weight.new(self.n_layers, batch_size, self.hidden_dim).zero_()

In [None]:
def train(model, optimizer, train_iter):
    """Run one optimisation pass over every batch yielded by ``train_iter``.

    Each batch must expose ``text`` and ``label`` tensors. The model is put in
    training mode and updated with a cross-entropy loss; nothing is returned.
    """
    model.train()
    for batch in train_iter:
        inputs = batch.text.to(device)
        targets = batch.label.to(device)
        # Shift labels down by one, in place, so they are zero-based class indices.
        targets.data.sub_(1)
        optimizer.zero_grad()
        batch_loss = F.cross_entropy(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()
            
def evaluate(model, val_iter):
    """Compute average loss and accuracy of ``model`` over ``val_iter``.

    Args:
        model: module mapping a batch of inputs to per-class logits.
        val_iter: iterable of batches exposing ``text`` and ``label`` tensors,
            with a ``dataset`` attribute whose length is the example count.

    Returns:
        Tuple ``(avg_loss, avg_accuracy)`` of Python floats: summed
        cross-entropy divided by the dataset size, and accuracy in percent.
    """
    model.eval()
    corrects, total_loss = 0, 0
    # No gradients are needed for evaluation; skip building the autograd graph.
    with torch.no_grad():
        for batch in val_iter:
            x, y = batch.text.to(device), batch.label.to(device)
            # Shift labels down by one, in place, so they are zero-based class
            # indices (same adjustment as in train()).
            y.sub_(1)
            logit = model(x)
            loss = F.cross_entropy(logit, y, reduction='sum')
            total_loss += loss.item()
            # .item() keeps the running count a plain int instead of a tensor.
            corrects += (logit.argmax(dim=1) == y).sum().item()
    size = len(val_iter.dataset)
    avg_loss = total_loss / size
    avg_accuracy = 100.0 * corrects / size
    return avg_loss, avg_accuracy

In [None]:
# Single-layer GRU classifier and its Adam optimizer.
model = BasicGRU(
    n_layers=1,
    hidden_dim=256,
    n_vocab=vocab_size,
    embed_dim=128,
    n_classes=n_classes,
    dropout_p=0.5,
)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [None]:
# Train for `epochs` epochs and checkpoint the model whenever the validation loss improves.
best_val_loss = None
for e in range(1, epochs+1):
    print("시작")
    train(model, optimizer, train_iter)
    # The validation iterator is bound to `validation_iter`; `val_iter` does not exist.
    val_loss, val_accuracy = evaluate(model, validation_iter)
    print(f"에폭 : {e}, 검증 오차 : {val_loss}, 검증 정확도 : {val_accuracy}")
    # Compare against None explicitly so a loss of exactly 0.0 is not mistaken for "no best yet".
    if best_val_loss is None or val_loss < best_val_loss:
        os.makedirs("snapshot", exist_ok=True)
        torch.save(model.state_dict(), './snapshot/txtclassification.pt')
        best_val_loss = val_loss

In [None]:
# Reload the checkpoint saved during training and report test-set metrics.
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only kernel.
model.load_state_dict(torch.load('./snapshot/txtclassification.pt', map_location=device))
test_loss, test_acc = evaluate(model, test_iter)
print(f'테스트 오차 : {test_loss}, 테스트 정확도 : {test_acc}')