In [1]:
from argparse import Namespace
from collections import Counter
import os
import re
import json
import string

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

## Dataset

In [2]:
class ReviewDataset(Dataset):
    def __init__(self, review_df, vectorizer):
        '''

        :param review_df: 리뷰 데이터셋
        :param vectorizer: ReivewVectorizer 객체
        '''
        self.review_df = review_df
        self._vectorizer = vectorizer

        self.train_df = self.review_df[self.review_df.split == 'train']
        self.train_size = len(self.train_df)

        self.valid_df = self.review_df[self.review_df.split == 'val']
        self.valid_size = len(self.valid_df)

        self.test_df = self.review_df[self.review_df.split == 'test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.valid_df, self.valid_size),
                             'test': (self.test_df, self.test_size)}

    @classmethod
    def load_dataset_make_vectorizer(cls, review_csv):
        '''
        데이터셋을 로드하고 새로운 ReviewVectorizer 객체를 생성
        :param review_csv: 데이터셋 file_path
        :return:
            ReviewDataset의 인스턴스
        '''
        review_df = pd.read_csv(review_csv)
        return cls(review_df, ReviewVectorizer.from_dataframe(review_df))

    def get_vectorizer(self):
        '''ReviewVectorizer 객체 반환'''
        return self._vectorizer

    def set_split(self, split='train'):
        '''
        데이터 프레임에 있는 열을 사용하여 분할세트 선택
        :param split: 'train', 'val', 'test'
        '''
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __getitem__(self, index):
        '''
        :param index: 데이터 인덱스
        :return:
            feature, label 쌍으로 이루어진 딕셔너리
        '''
        row = self._target_df.iloc[index]

        # review vector
        review_vecotr = self._vectorizer.vectorize(row.review)

        # positive, negative의 인덱스 추출
        rating_index = self._vectorizer.rating_vocab.lookup_token(row.rating)

        return {'x_data': review_vecotr,
                'y_target': rating_index}

    def __len__(self):
        return self._target_size

    def get_num_batches(self, batch_size):
        '''배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환'''
        return len(self) // batch_size

## Vocabulary

In [3]:
class Vocabulary(object):
    def __init__(self, token_to_idx=None, add_unk=True, unk_token='<UNK>'):
        '''
        머신러닝 파이프라인에 필요한 토큰과 정수 매핑을 관리하는 클래스
        :param token_to_idx: 기존 토큰-인덱스 매핑 딕셔너리
        :param add_unk: UNK토큰을 추가할지 지정하는 플래그
        :param unk_token:
        '''

        if token_to_idx is None:
            token_to_idx = {}

        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx:token for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def add_token(self, token):
        '''
        토큰을 기반으로 매핑 딕셔너리를 업데이트
        :param token: 추가할 토큰
        :return:
            토큰에 상응하는 정수
        '''
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def lookup_token(self, token):
        '''
        토큰에 대응하는 인덱스를 추출
        토큰이 없다면 UNK 반환
        '''

        if self.unk_index >= 0:
            # token이 존재하면 인덱스를 가져오고 없으면 unk_index를 가져옴
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        '''인덱스에 해당하는 토큰을 반환'''
        if index not in self._idx_to_token:
            raise KeyError('Vocabulary에 인덱스(%d)가 없습니다.' % index)
        return self._idx_to_token[index]

    def __str__(self):
        return '<Vocabulary(size=%d)' % len(self)

    def __len__(self):
        return len(self._token_to_idx)

## Vectorizer

In [4]:
class ReviewVectorizer(object):
    '''어휘 사전을 생성하고 관리'''
    def __init__(self, reivew_vocab, rating_vocab):
        '''
        :param reivew_vocab: 단어를 정수에 매핑하는 Vocabulary
        :param rating_vocab: 클래스 레이블을 정수에 매핑하는 Vocabulary
        '''
        self.review_vocab = reivew_vocab
        self.rating_vocab = rating_vocab

    def vectorize(self, review):
        '''리뷰에 대한 원-핫 벡터를 만듦'''
        one_hot = np.zeros(len(self.review_vocab), dtype=np.float32)

        for token in review.split(' '):
            if token not in string.punctuation:
                one_hot[self.review_vocab.lookup_token(token)] = 1

        return one_hot

    @classmethod
    def from_dataframe(cls, review_df, cut_off=25):
        '''
        데이터셋 데이터프레임에서 Vectorizer 객체를 만듦
        :param review_df: 리뷰 데이터셋
        :param cut_off: 빈도 기반 필터링 설정값
        :return:
            ReviewVectorizer 객체
        '''
        review_vocab = Vocabulary(add_unk=True)
        rating_vocab = Vocabulary(add_unk=False)

        # 점수를 추가
        for rating in sorted(set(review_df.rating)):
            rating_vocab.add_token(rating)

        # count > cut_off인 단어 추가
        word_counts = Counter()
        for review in review_df.review:
            for word in review.split(' '):
                if word not in string.punctuation:
                    word_counts[word] += 1

        for word, count in word_counts.items():
            if count > cut_off:
                review_vocab.add_token(word)

        return cls(review_vocab, rating_vocab)

## DataLoader

In [5]:
def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device='cpu'):
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

## ReviewClassifier 모델

In [6]:
class ReviewClassifier(nn.Module):
    '''간단한 퍼셉트론 기반 분류기'''
    def __init__(self, num_features):
        '''
        :param num_features: 입력 특성 벡터의 크기
        '''
        super(ReviewClassifier, self).__init__()
        self.fc1 = nn.Linear(in_features=num_features,
                             out_features=1)

    def forward(self, x_in, apply_sigmoid=False):
        '''
        분류기 정방향 계산
        :param x_in:
            입력 데이터 텐서
            x_in.shape = (batch, num_features)
        :param apply_sigmoid:
            시그모이드 활성화 함수를 위한 플래그
            크로스 엔트로피 손실을 사용하려면 False로 지정
        :return:
            결과 텐서
            tensor.shape = (batch,)
        '''
        y_out = self.fc1(x_in).squeeze()
        if apply_sigmoid:
            y_out = torch.sigmoid(y_out)
        return y_out

## 설정

In [7]:
args = Namespace(
    frequency_cutoff=25,
    model_state_file='model.pth',
    review_csv='data/reviews_with_splits_lite.csv',
    save_dir='model_storage',
    # 모델 하이퍼파라미터 없음
    # 훈련 파라미터
    batch_size=128,
    early_stopping=5,
    lr=0.001,
    num_epochs=100,
    sedd=1337
)

In [8]:
def make_train_state(args):
    return {'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1}

def compute_accuracy(y_pred, y_target):
    y_target = y_target.cpu()
    y_pred_indices = (torch.sigmoid(y_pred)>0.5).cpu().long()#.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

train_state = make_train_state(args)

device = torch.device('cpu')

# dataset, vectorizer
dataset = ReviewDataset.load_dataset_make_vectorizer(args.review_csv)
vectorizer = dataset.get_vectorizer()

# model
classifier = ReviewClassifier(num_features=len(vectorizer.review_vocab))
classifier = classifier.to(device)

# loss, optim
loss_func = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(classifier.parameters(), lr=args.lr)

In [9]:
epoch_bar = tqdm(desc='training routine',
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size),
                          position=1,
                          leave=True)

dataset.set_split('val')
val_bar = tqdm(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size),
                        position=1,
                        leave=True)

for epoch_index in range(args.num_epochs):
    train_state['epoch_index'] = epoch_index

    # 훈련 세트 순회
    # 훈련 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
    dataset.set_split('train')
    batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=device)

    running_loss = 0.0
    running_acc = 0.0
    classifier.train()

    for batch_index, batch_dict in enumerate(batch_generator):
        # 훈련 과정은 5단계로 이루어짐

        # 1단계 그레이디언트를 0으로 초기화
        optimizer.zero_grad()

        # 2단계 출력을 계산
        y_pred = classifier(x_in=batch_dict['x_data'].float())

        # 3단계 손실을 계산
        loss = loss_func(y_pred, batch_dict['y_target'].float())
        loss_batch = loss.item()
        running_loss += (loss_batch - running_loss) / (batch_index + 1)

        # 4단계 손실을 사용해 그레이디언트를 계산
        loss.backward()

        # 5단계 옵티마이저로 가중치를 업데이트
        optimizer.step()

        # 정확도를 계산
        acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_batch - running_acc) / (batch_index + 1)

        # 진행바 업데이트
        train_bar.set_postfix(loss=running_loss,
                                  acc=running_acc,
                                  epoch=epoch_index)
        train_bar.update()

    train_state['train_loss'].append(running_loss)
    train_state['train_acc'].append(running_acc)

    # 검증 세트 순회
    # 검증 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
    dataset.set_split('val')
    batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=device)

    running_loss = 0.
    running_acc = 0.
    classifier.eval()

    for batch_index, batch_dict in enumerate(batch_generator):
        # 1단계 출력을 계산
        y_pred = classifier(x_in=batch_dict['x_data'].float())

        # 2단계 손실을 게산
        loss = loss_func(y_pred, batch_dict['y_target'].float())
        loss_batch = loss.item()
        running_loss += (loss_batch - running_loss) / (batch_index + 1)

        # 3단계 정확도를 계산
        acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_batch - running_acc) / (batch_index + 1)

        # 진행바 업데이트
        val_bar.set_postfix(loss=running_loss,
                                  acc=running_acc,
                                  epoch=epoch_index)
        val_bar.update()

    train_state['val_loss'].append(running_loss)
    train_state['val_acc'].append(running_acc)

    train_bar.n = 0
    val_bar.n = 0
    epoch_bar.update()

training routine:   0%|          | 0/100 [00:00<?, ?it/s]

split=train:   0%|          | 0/306 [00:00<?, ?it/s]

split=val:   0%|          | 0/65 [00:00<?, ?it/s]

In [10]:
dataset.set_split('test')
batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=device)

running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # 출력을 계산
    y_pred = classifier(x_in=batch_dict['x_data'].float())

    # 손실을 계산
    loss = loss_func(y_pred, batch_dict['y_target'].float())
    loss_batch = loss.item()
    running_loss += (loss_batch - running_loss) / (batch_index + 1)

    # 정확도를 계산
    acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
    running_acc += (acc_batch - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

In [11]:
print("Test Loss: {:.3f}".format(train_state['test_loss']))
print("Test Acc: {:.2f}".format(train_state['test_acc']))

Test Loss: 0.340
Test Acc: 89.88


## 추론

In [12]:
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r"([.,!?])", r" \1 ", text)
    text = re.sub(r"[^a-zA-Z.,!?]+", r" ", text)
    return text

In [13]:
def predict_rating(review, classifier, vectorizer, decision_threshold=0.5):
    '''
    리뷰 점수 예측하기
    :param review: 리뷰 텍스트
    :param classifier: 훈련된 모델
    :param vectorizer: Vectorizer 객체
    :param decision_threshold: 클래스를 나눌 결정 경계
    '''
    review = preprocess_text(review)
    vectorized_review = torch.tensor(vectorizer.vectorize(review))
    result = classifier(vectorized_review.view(1, -1))

    probability_value = torch.sigmoid(result).item()

    index = 1
    if probability_value < decision_threshold:
        index = 0
    return vectorizer.rating_vocab.lookup_index(index)

In [17]:
test_review = 'this is a worst book'
prediction = predict_rating(test_review, classifier, vectorizer)
print('{} -> {}'.format(test_review, prediction))

this is a worst book -> negative


## 모델 가중치 분석

In [18]:
# 가중치 정렬
fc1_weights = classifier.fc1.weight.detach()[0]
_, indices = torch.sort(fc1_weights, dim=0, descending=True)
indices = indices.numpy().tolist()

# 긍정적인 단어 상위 20개
print('긍정적인 영향을 미치는 단어:')
print('----------------------------')
for i in range(20):
    print(vectorizer.review_vocab.lookup_index(indices[i]))

긍정적인 영향을 미치는 단어:
----------------------------
pleasantly
painless
limo
talkative
squid
rocked
unassuming
artsy
mmmmmm
relaxed
fo
superb
komol
unbeatable
ichiza
nbest
luv
deliciousness
eclectic
watering


In [19]:
# 부정적인 단어 상위 20개
print('부정적인 영향을 미치는 단어:')
print('----------------------------')
indices.reverse()
for i in range(20):
    print(vectorizer.review_vocab.lookup_index(indices[i]))

부정적인 영향을 미치는 단어:
----------------------------
underwhelmed
roach
lukewarm
unimpressed
horrid
burden
peeling
proudly
assed
worst
redeeming
unorganized
appalled
uninspired
musty
disorganized
meh
rotten
scam
clowns
