# KOBERT를 이용해 감정 분류하기


`필요 환경 및 패키지 설치`

In [None]:
!pip install mxnet
!pip install pandas tqdm
!pip install sentencepiece
!pip install transformers
!pip install torch
!pip install gluonnlp==0.10.0
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd

import gluonnlp as nlp
import numpy as np
from tqdm import tqdm, tqdm_notebook
from kobert_tokenizer import KoBERTTokenizer
from transformers import BertModel

from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup
import urllib.request
from google.colab import drive

drive.mount('/content/drive')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

kobert_tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')

word_tokenizer = kobert_tokenizer.tokenize

kobertmodel = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)

vocab = nlp.vocab.BERTVocab.from_sentencepiece(kobert_tokenizer.vocab_file, padding_token='[PAD]')

`사전 파라미터 지정`

In [None]:
max_len = 128
batch_size = 64
warmup_ratio = 0.1
num_epochs = 5
max_grad_norm = 1
log_interval = 200
learning_rate =  5e-5

`긍부정 데이터셋 불러오기`

In [None]:
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt", filename="ratings_train.txt")
urllib.request.urlretrieve("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt", filename="ratings_test.txt")

# 긍정 1 부정 0
train_data = pd.read_table('ratings_train.txt')
test_data = pd.read_table('ratings_test.txt')


# pn_train_data
pn_train_data_list = []
for sent, label in zip(train_data['document'], train_data['label']):
    pn_train_data = []
    pn_train_data.append(str(sent))
    if label == 1 :
        pn_train_data.append((1,0))
    elif label == 0 :
        pn_train_data.append((0,1))

    pn_train_data_list.append(pn_train_data)

# pn_test_data
pn_test_data_list = []
for sent, label in zip(test_data['document'], test_data['label']):
    pn_test_data = []
    pn_test_data.append(str(sent))
    if label == 1 :
        pn_test_data.append((1,0))
    elif label == 0 :
        pn_test_data.append((0,1))

    pn_test_data_list.append(pn_test_data)

In [None]:
pn_test_data

['마무리는 또 왜이래', (0, 1)]

`pn_dataset 구축 및 dataloader 구축`

In [None]:
class SENT_BERTDataset(Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len,
                 pad, pair):
        self.transform = nlp.data.BERTSentenceTransform(bert_tokenizer, max_seq_length=max_len, vocab = vocab, pad=pad, pair=pair)
        self.sentences = [self.transform([i[sent_idx]]) for i in dataset]
        self.labels = torch.tensor([(int(i[label_idx][0]), int(i[label_idx][1])) for i in dataset])

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))

    def __len__(self):
        return (len(self.labels))


pn_data_train = SENT_BERTDataset(pn_train_data_list, 0, 1, word_tokenizer, vocab, max_len, True, False)
pn_data_test = SENT_BERTDataset(pn_test_data_list, 0, 1, word_tokenizer, vocab, max_len, True, False)

# DataLoader의 역할 : 한번에 batch_size만큼 시키는 iterable 생성
pn_train_dataloader = torch.utils.data.DataLoader(pn_data_train, batch_size = batch_size, num_workers = 5)
pn_test_dataloader = torch.utils.data.DataLoader(pn_data_test, batch_size = batch_size, num_workers = 5)

# ESG 모델 + 긍부정 분류 모델

In [None]:
class BERT_SENT_Classifier(nn.Module):
    def __init__(self,
                 bert, # bert 모델 받아오기
                 hidden_size = 768, # 은닉층의 크기(기준 수)
                 num_classes = 2,   # [E,S,G]
                 dr_rate = None, # dropout_rate : 신경망 학습 중에 일부 뉴런을 무작위로 제거하여 과적합을 방지하고 모델의 일반화 성능을 향상시키는 기법
                 params = None):
        super(BERT_SENT_Classifier, self).__init__()
        self.bert = bert # 사용할 bert 모델 지정
        self.dr_rate = dr_rate # dropout 비율 지정
        self.sent_classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate: # dr_rate가 0과 None이 아니고 (0~1) 사이로 설정되었을 경우에는
            self.dropout = nn.Dropout(p = dr_rate) # nn.dropout을 실행한다.
        self.activation_softmax = nn.Softmax(dim = 1)



    def gen_attention_mask(self, token_ids, valid_length): # attention mask를 생성하는 것 : 이 문장을 바라보는 전문가들 생성한다.
        attention_mask = torch.zeros_like(token_ids) # token_ids와 같은 크기를 가지고 있는 0으로 채워지는 것들  : 뭔가 포지셔널 인코딩일 것 같은 느낌
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1 # 가리지 않는 부분 설정하기
        return attention_mask.float() # dtype 은 float32로 한다.

    def forward(self, token_ids, valid_length, segment_ids): # segment_ids 는 문장단위를 나누는 임베딩 부분인 것 같다. 한 문장일 경우 0000, 두 문장 이상일 경우 00..111
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        # torch.long() : int64 타입
        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device), return_dict = False)
        if self.dr_rate:
            out = self.dropout(pooler)
        posneg = self.sent_classifier(out)
        activation_posneg = self.activation_softmax(posneg)

        return activation_posneg

`모델 정의하기`

In [None]:
# BERT  모델 불러오기
model = BERT_SENT_Classifier(kobertmodel,  dr_rate = 0.5).to(device)



# kober model + esg_classifier freezing 하기
for name, para in model.named_parameters() :
    if not name.count('sent_classifier') :
        para.requires_grad = False

# 옵티마이저 생성 시 전달해줄 파라미터 정의
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [weight for name, weight in model.named_parameters() if not any(nd in name for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [weight for name, weight in model.named_parameters() if any(nd in name for nd in no_decay)], 'weight_decay': 0.0} ]


# 옵티마이저 정의
optimizer = AdamW(optimizer_grouped_parameters, lr = learning_rate)

# 손실함수 정의
loss_fn = nn.BCELoss()

# 스케쥴러 생성 시 전달해줄 파라미터 정의
t_total = len(pn_train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)

# 스케쥴러 정의
scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps = warmup_step, num_training_steps = t_total)

`긍부정 정확도 계산 함수 정의`

In [25]:
# 긍부정 분류 시에 사용한다.
def sent_calc_accuracy(X,Y):
    count =  0
    for pred, result in zip(out, label) :

        temp = []
        for p in pred :
            if p < 0.5 :
                temp.append(0)
            else :
                temp.append(1)
        temp = torch.tensor(temp)
        if abs(temp.sub(result)).sum():
            count +=1
    train_acc = count/batch_size
    return train_acc


`모델 학습 코드`

In [26]:
# 감정 분류 시에 사용한다.
for e in range(1):
    train_acc = 0.0
    model.train() # model을 훈련모드로 바꾸고, 가중치가 업데이트 될 수 있게 한다.
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(pn_train_dataloader)):
        # 옵티마이저의 미분값을 0으로 초기화
        optimizer.zero_grad()

        # model의 forward 인자 설정
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length= valid_length
        label = label.float().to(device)

        # model output 도출
        out = model.forward(token_ids, valid_length, segment_ids)
        break
        # 모델 output과 label(정답)과의 손실함수 정의
        loss = loss_fn(out, label)

        # 손실함수의 기울기 계산
        loss.backward()

        # gradient vanishing 또는 gradient exploding을 방지하기 위한 gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        # 기울기 반영한 가중치 업데이트
        optimizer.step()
        scheduler.step()

        train_acc += sent_calc_accuracy(out, label)

        if batch_id % log_interval == 0:
             print(f'epoch : {e+1} | batch_id : {batch_id + 1} | loss : {loss.data.cpu().numpy()}| accuracy : {train_acc / batch_size}')

    print("epoch : {} train acc : {}".format(e+1, train_acc / batch_size))

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(pn_train_dataloader)):


  0%|          | 0/2344 [00:00<?, ?it/s]

  self.pid = os.fork()


epoch : 1 train acc : 0.0


`학습 완료시킨 model을 이용한 predict 함수`

In [None]:
def sent_predict(predict_sentence): # input = 감정분류하고자 하는 sentence

    data = [predict_sentence, '0']
    dataset_another = [data]

    another_test = SENT_BERTDataset(dataset_another, 0, 1, word_tokenizer, vocab, max_len, True, False) # 토큰화한 문장
    test_dataloader = torch.utils.data.DataLoader(another_test, batch_size = batch_size, num_workers = 5) # torch 형식 변환

    model.eval()

    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        # model의 forward 인자 설정
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        valid_length = valid_length
        label = label.long().to(device)
        # model output 도출
        out = model.forward(token_ids, valid_length, segment_ids)


        test_eval = []
        for i in out: # out = model(token_ids, valid_length, segment_ids)
            logits = i
            logits = logits.detach().cpu().numpy()

            if np.argmax(logits) == 0 :
                test_eval.append(f'긍정 문장 : 문장의 긍정 수치 : {logits[0]}, 문장의 부정 수치 : {logits[1]}')
            elif np.argmax(logits) == 1 :
                test_eval.append(f'긍정 문장 : 문장의 긍정 수치 : {logits[0]}, 문장의 부정 수치 : {logits[1]}')



    return test_eval[0]

`sent_predict 해보기

In [None]:
# 테스트 데이터 불러오기
test_data = pd.read_csv('/content/drive/MyDrive/kobert_modeling/naver_news_test.csv')

# 문장 추출
sents = test_data['content']
for i, sent in enumerate(sents[:10]) :
    esg_output = sent_predict(sent)
    print(f'{i+1}번 문장은' + esg_output)

`모델 저장하기`

In [None]:
# 저장하기(모델 추가 계층 및 옵티마이저)
torch.save({'model_sent_classifier.state_dict': model.sent_classifier.state_dict()}, '/content/drive/MyDrive/model_checkpoint/sent_version1.pt')