Load the pretrained KoBERT model (a BERT model trained on Korean) from GitHub and fine-tune it.  
Also install the required libraries with pip.

In [None]:
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

In [None]:
!pip install gluonnlp pandas tqdm

In [None]:
!pip install mxnet

In [None]:
!pip install sentencepiece

In [None]:
import torch
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd
import gluonnlp as nlp
from tqdm import tqdm, tqdm_notebook

from torch.optim import AdamW  # the AdamW class shipped with transformers is deprecated
from transformers.optimization import get_cosine_schedule_with_warmup
from transformers import BertModel

from kobert_tokenizer import KoBERTTokenizer

No separate database is used, so Google Drive is mounted and used for storage.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
df = pd.read_csv('/content/drive/MyDrive/aipro/new_dataset.csv')

In [None]:
df

In [None]:
df['감정'].unique()

In [None]:
df['감정'].value_counts()

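The original data was collected from AIhub and several other sources.  
There were initially 7 classes, but there was too little data for that many classes, so emotions similar to sadness, such as hurt (상처), were merged into sadness.  
In addition, the original CSV data was labeled in Korean ("분노", "슬픔", etc.), but when it was merged with the other data the labels were manually converted to consecutive integers.

0 => 분노 (anger), 1 => 슬픔 (sadness), 2 => 불안 (anxiety), 3 => 행복 (happiness)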
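
The relabeling above was done by hand, but the same mapping could be scripted. The following is a minimal sketch under stated assumptions: `encode_label`, `LABEL_TO_ID`, and `MERGE_INTO_SADNESS` are hypothetical names, and only "상처" (hurt) is listed as a merged class because it is the only one the text names.

```python
# Target classes and ids taken from the mapping above.
LABEL_TO_ID = {"분노": 0, "슬픔": 1, "불안": 2, "행복": 3}

# Assumption: only "상처" (hurt) is named in the text as merged into sadness;
# any other merged classes would have to be added here.
MERGE_INTO_SADNESS = {"상처"}


def encode_label(raw_label: str) -> int:
    """Map a Korean emotion label to its integer class id."""
    label = raw_label.strip()
    if label in MERGE_INTO_SADNESS:
        label = "슬픔"
    return LABEL_TO_ID[label]


print([encode_label(x) for x in ["분노", "상처", "슬픔", "불안", "행복"]])
# [0, 1, 1, 2, 3]
```

An unknown label raises a `KeyError`, which makes any class missing from the mapping visible immediately instead of silently mislabeling rows.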

In [None]:
# Preprocessing: convert the two columns of each CSV row ('문장' = sentence, '감정' = emotion)
# into a 2-D list of [sentence, label] pairs.
data = [[q, str(label)] for q, label in zip(df['문장'], df['감정'])]

As the DataFrame shows, the rows are grouped in order starting from label 0.  
Sklearn's train_test_split function is therefore used to shuffle the data and split it into train and test sets.

In [None]:

from sklearn.model_selection import train_test_split
train, test = train_test_split(data, test_size = 0.25, random_state = 0)

In [None]:
print(len(train), len(test))
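
To illustrate why the shuffle matters for label-sorted data, here is a small standard-library sketch. It is not scikit-learn's implementation; `shuffle_split` and the toy rows are hypothetical and only mimic the shape of `data`.

```python
import random


def shuffle_split(rows, test_size=0.25, seed=0):
    """Shuffle a copy of rows, then split off the last test_size fraction."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    n_test = int(round(len(rows) * test_size))
    return rows[:len(rows) - n_test], rows[len(rows) - n_test:]


# Toy data sorted by label, like the DataFrame: 4 labels x 10 rows each.
toy = [[f"sentence {i}", str(i // 10)] for i in range(40)]

# Without shuffling, the last 25% of the rows contains only label '3'.
unshuffled_test_labels = {label for _, label in toy[30:]}
print(unshuffled_test_labels)  # {'3'}

train_rows, test_rows = shuffle_split(toy)
print(len(train_rows), len(test_rows))  # 30 10
```

A test set drawn from the unshuffled tail would measure accuracy on a single class only, which is why the split has to shuffle first.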

In [None]:
# Class that defines the Dataset
class BERTDataset(Dataset):
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len,
                 pad, pair):
        transform = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair)

        self.sentences = [transform([i[sent_idx]]) for i in dataset]
        self.labels = [np.int32(i[label_idx]) for i in dataset]

    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i], ))

    def __len__(self):
        return (len(self.labels))
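
Each item of `BERTDataset` is a tuple of token ids, valid length, segment ids, and label. The sketch below is a simplified pure-Python illustration of that per-sentence layout, not the gluonnlp implementation; `transform_sentence`, the toy vocabulary, and its ids are hypothetical.

```python
def transform_sentence(tokens, vocab, max_len, pad_id=1):
    """Return (token_ids, valid_length, segment_ids) for a single sentence.

    Simplified illustration only: truncates to max_len - 2 tokens, wraps them
    in [CLS] ... [SEP], then right-pads with pad_id up to max_len.
    """
    tokens = ["[CLS]"] + tokens[: max_len - 2] + ["[SEP]"]
    token_ids = [vocab[t] for t in tokens]
    valid_length = len(token_ids)
    token_ids += [pad_id] * (max_len - valid_length)
    segment_ids = [0] * max_len  # single sentence: every position is segment 0
    return token_ids, valid_length, segment_ids


# Hypothetical toy vocabulary; the real ids come from the KoBERT vocab.
toy_vocab = {"[PAD]": 1, "[CLS]": 2, "[SEP]": 3, "▁배": 10, "고": 11, "프다": 12}
ids, valid_len, segs = transform_sentence(["▁배", "고", "프다"], toy_vocab, max_len=8)
print(ids)        # [2, 10, 11, 12, 3, 1, 1, 1]
print(valid_len)  # 5
print(segs)       # [0, 0, 0, 0, 0, 0, 0, 0]
```

The valid length is what later lets the classifier tell real tokens from padding.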

In [None]:
# Hyperparameters
max_len = 64
batch_size = 64
warmup_ratio = 0.1
num_epochs = 10
max_grad_norm = 1
log_interval = 200
learning_rate =  5e-5

The tokenizer pretrained with KoBERT is used.

In [None]:
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
bertmodel = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower = False)

In [None]:
tok = tokenizer.tokenize  # override tok with the Hugging Face tokenizer's tokenize method, which BERTDataset uses below

In [None]:
senti_data_train = BERTDataset(train, 0, 1, tok, vocab,max_len, True, False)
senti_data_test = BERTDataset(test, 0, 1, tok, vocab,max_len, True, False)

In [None]:
train_dataloader = torch.utils.data.DataLoader(senti_data_train, batch_size=batch_size, num_workers=5)
test_dataloader = torch.utils.data.DataLoader(senti_data_test, batch_size=batch_size, num_workers=5)

In [None]:
class BERTClassifier(nn.Module):
    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes = 4,
                 dr_rate=None,
                 params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate

        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        attention_mask = self.gen_attention_mask(token_ids, valid_length)

        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device))
        # Fall back to the pooled output when no dropout rate is given,
        # so that out is always defined.
        out = pooler
        if self.dr_rate:
            out = self.dropout(pooler)
        return self.classifier(out)
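
As an illustration of what `gen_attention_mask` computes, the following pure-Python sketch builds the same 0/1 mask from plain lists instead of tensors. The toy token ids are hypothetical.

```python
def gen_attention_mask(batch_token_ids, valid_lengths):
    """Return one mask row per sequence: 1.0 for real tokens, 0.0 for padding."""
    masks = []
    for token_ids, valid in zip(batch_token_ids, valid_lengths):
        masks.append([1.0 if pos < valid else 0.0 for pos in range(len(token_ids))])
    return masks


# Two padded sequences of length 6 with 4 and 3 real tokens respectively.
batch = [[2, 10, 11, 3, 1, 1], [2, 10, 3, 1, 1, 1]]
print(gen_attention_mask(batch, [4, 3]))
# [[1.0, 1.0, 1.0, 1.0, 0.0, 0.0], [1.0, 1.0, 1.0, 0.0, 0.0, 0.0]]
```

The mask tells BERT's self-attention to ignore the padded positions, so the padding added to reach `max_len` does not influence the pooled output.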

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
model = BERTClassifier(bertmodel,  dr_rate=0.5).to(device)
# Prepare optimizer and schedule (linear warmup, then cosine decay)
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
    {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
    {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()
t_total = len(train_dataloader) * num_epochs
warmup_step = int(t_total * warmup_ratio)
scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total)
def calc_accuracy(X,Y):
    max_vals, max_indices = torch.max(X, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy()/max_indices.size()[0]
    return train_acc
train_dataloader
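
To make the schedule concrete, here is a standard-library sketch of the learning-rate multiplier that a cosine schedule with linear warmup applies. It follows my understanding of the default half-cycle behavior of `get_cosine_schedule_with_warmup` and is not copied from the library; `cosine_with_warmup` is a hypothetical name, and the 100 batches per epoch is an assumed figure, since the real value depends on `len(train_dataloader)`.

```python
import math


def cosine_with_warmup(step, warmup_steps, total_steps, num_cycles=0.5):
    """Multiplier applied to the base learning rate at a given step."""
    if step < warmup_steps:
        # Linear warmup from 0 to 1.
        return step / max(1, warmup_steps)
    # Cosine decay from 1 to 0 over the remaining steps.
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return max(0.0, 0.5 * (1.0 + math.cos(math.pi * num_cycles * 2.0 * progress)))


learning_rate = 5e-5
num_epochs = 10
warmup_ratio = 0.1
batches_per_epoch = 100  # assumption for illustration only

t_total = batches_per_epoch * num_epochs
warmup_step = int(t_total * warmup_ratio)
for step in (0, warmup_step // 2, warmup_step, t_total // 2, t_total):
    print(step, learning_rate * cosine_with_warmup(step, warmup_step, t_total))
```

With `warmup_ratio = 0.1`, the learning rate climbs for the first 10% of the steps, peaks at `learning_rate`, and then decays towards zero by the final step.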

Training

In [None]:
train_score = []
test_score = []
for e in range(num_epochs):
    train_acc = 0.0
    test_acc = 0.0
    model.train()
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(train_dataloader)):
        optimizer.zero_grad()
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)
        # valid_length stays on the CPU; the model only uses it to build the attention mask
        label = label.long().to(device)
        out = model(token_ids, valid_length, segment_ids)
        loss = loss_fn(out, label)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()  # Update learning rate schedule
        train_acc += calc_accuracy(out, label)
        if batch_id % log_interval == 0:
            print("epoch {} batch id {} loss {} train acc {}".format(e+1, batch_id+1, loss.data.cpu().numpy(), train_acc / (batch_id+1)))
    train_score.append(train_acc / (batch_id+1))
    print("epoch {} train acc {}".format(e+1, train_acc / (batch_id+1)))

    model.eval()
    # Gradients are not needed for evaluation; disabling them saves memory and time.
    with torch.no_grad():
        for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(test_dataloader)):
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)
            label = label.long().to(device)
            out = model(token_ids, valid_length, segment_ids)
            test_acc += calc_accuracy(out, label)
    test_score.append(test_acc/(batch_id+1))
    print("epoch {} test acc {}".format(e+1, test_acc / (batch_id+1)))
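
The loop above reports accuracy as the mean of per-batch accuracies. When the last batch is smaller than `batch_size`, that mean differs slightly from the accuracy over all samples. The sketch below shows the difference with hypothetical (correct, batch size) counts; both function names are illustrative.

```python
def batch_mean_accuracy(batches):
    """Mean of per-batch accuracies, as computed in the training loop."""
    return sum(correct / size for correct, size in batches) / len(batches)


def sample_weighted_accuracy(batches):
    """Accuracy over all samples, weighting each batch by its size."""
    return sum(correct for correct, _ in batches) / sum(size for _, size in batches)


# Hypothetical counts: two full batches of 64 and a final batch of 2.
batches = [(48, 64), (48, 64), (1, 2)]
print(round(batch_mean_accuracy(batches), 3))       # 0.667
print(round(sample_weighted_accuracy(batches), 3))  # 0.746
```

With many full batches the gap is small, but on a small test set a tiny final batch can shift the reported number noticeably.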


In [None]:
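import matplotlib.pyplot as plt
num = np.arange(1, num_epochs + 1)  # one point per epoch
plt.plot(num, train_score, label='train accuracy')
plt.plot(num, test_score, label='test accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()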

0 => 분노 (anger), 1 => 슬픔 (sadness), 2 => 불안 (anxiety), 3 => 행복 (happiness)



In [None]:
torch.save(model.state_dict(),'new_bert.pth') # Saves only the model weights; the BERTClassifier class must be defined separately before loading

In [None]:
torch.save(model,'new_full_kobert_model.pth') # Saves both the model architecture and the weights

In [None]:
def new_softmax(a):
    c = np.max(a)  # maximum value
    exp_a = np.exp(a - c)  # subtract the maximum from each element before exp (this prevents overflow)
    sum_exp_a = np.sum(exp_a)
    y = (exp_a / sum_exp_a) * 100  # probabilities expressed as percentages
    return np.round(y, 3)

def predict(predict_sentence):
    # A raw sentence cannot be fed to the model directly; it has to be tokenized first,
    # so it is wrapped in a BERTDataset together with a dummy label of '0'.
    data = [predict_sentence, '0']
    dataset_another = [data]

    another_test = BERTDataset(dataset_another, 0, 1, tok, vocab, max_len, True, False)
    test_dataloader = torch.utils.data.DataLoader(another_test, batch_size=batch_size, num_workers=5)

    model.eval()
    # Class names in label order: 0 = 분노, 1 = 슬픔, 2 = 불안, 3 = 행복
    emotions = ["anger", "sadness", "anxiety", "happiness"]

    with torch.no_grad():  # no gradients are needed for inference
        for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)
            label = label.long().to(device)

            out = model(token_ids, valid_length, segment_ids)

            test_eval = []
            for logits in out:
                logits = logits.detach().cpu().numpy()
                # Per-class probabilities in percent, rounded to 3 decimals
                probability = new_softmax(logits).tolist()
                test_eval.append(emotions[int(np.argmax(probability))])
                print(probability)

            print(">> The input conveys " + test_eval[0] + ".")
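
The max-subtraction step in `new_softmax` can be illustrated without NumPy. The following standard-library sketch uses the hypothetical name `softmax_percent` and made-up logits; it shows that shifting by the maximum keeps every exponent at or below zero, whereas exponentiating a large logit directly overflows.

```python
import math


def softmax_percent(logits):
    """Numerically stable softmax, returned as percentages rounded to 3 decimals."""
    c = max(logits)
    exps = [math.exp(x - c) for x in logits]  # every exponent is <= 0
    total = sum(exps)
    return [round(100 * e / total, 3) for e in exps]


print(softmax_percent([1000.0, 999.0, 0.0, 0.0]))
# [73.106, 26.894, 0.0, 0.0]

# Without the shift, exponentiating the largest logit fails outright.
try:
    math.exp(1000.0)
    overflowed = False
except OverflowError:
    overflowed = True
print(overflowed)  # True
```

Subtracting a constant from every logit leaves the softmax result unchanged, so the shift is purely a numerical safeguard.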

In [None]:
predict("오늘 과제가 많아서 화가나")  # "I'm angry because I have so much homework today"

In [None]:
predict("성적이 너무 낮게 나왔어")  # "My grades came out too low"

In [None]:
predict("성적이 너무 낮게 나왔어...")  # "My grades came out too low..."

In [None]:
predict("시험을 잘쳤을까")  # "I wonder if I did well on the exam"

In [None]:
predict("배고프다")  # "I'm hungry"

Loading the model

In [None]:
filepath = '/content/drive/MyDrive/bert.pth'
model = BERTClassifier(bertmodel, dr_rate=0.5)
model.load_state_dict(torch.load(filepath, map_location=device))
model.to(device)  # keep the model on the same device as the inputs built in predict
model.eval()