## Install KoBERT

In [None]:
!pip install git+https://git@github.com/SKTBrain/KoBERT.git@master
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

## Libraries Used



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm import tqdm, tqdm_notebook
import pandas as pd
import matplotlib.pyplot as plt
import csv
import os
import collections
import json

from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup
from transformers import BertModel

from kobert.utils import get_tokenizer
from kobert.pytorch_kobert import get_pytorch_kobert_model

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Class and function definitions

In [None]:
class BERTDataset(Dataset):
    """Dataset that eagerly converts every row into BERT input fields.

    Each element of ``dataset`` is indexed with ``sent_idx`` to obtain the
    sentence and with ``label_idx`` to obtain the label. All rows are
    transformed once, in ``__init__``, with
    ``nlp.data.BERTSentenceTransform``; labels are stored as ``np.int32``.
    """

    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, max_len,
                 pad, pair):
        encode = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, pad=pad, pair=pair)
        # The transform is called with a one-element list holding the sentence.
        self.sentences = [encode([row[sent_idx]]) for row in dataset]
        self.labels = [np.int32(row[label_idx]) for row in dataset]

    def __len__(self):
        """Return the number of stored examples."""
        return len(self.labels)

    def __getitem__(self, i):
        """Return the transformed fields of example ``i`` with its label appended."""
        label_tail = (self.labels[i],)
        return self.sentences[i] + label_tail

In [None]:
class BERTClassifier(nn.Module):
    """BERT encoder followed by optional dropout and a linear classification head.

    Args:
        bert: Encoder module called as ``bert(input_ids=..., token_type_ids=...,
            attention_mask=...)``. The element at index 1 of its output is
            used as the pooled sentence representation.
        hidden_size: Width of the pooled representation fed to the classifier.
        num_classes: Number of output classes.
        dr_rate: Dropout probability applied to the pooled output. Falsy
            values (``None`` or ``0``) disable dropout.
        params: Unused; kept so existing callers keep working.
    """

    def __init__(self,
                 bert,
                 hidden_size=768,
                 num_classes=7,
                 dr_rate=None,
                 params=None):
        super().__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        """Build a float mask with ones on the first ``valid_length[i]`` positions of row ``i``.

        Args:
            token_ids: Tensor of shape ``(batch, seq_len)``; only its shape and
                device are used.
            valid_length: One length per row (tensor or sequence of ints).

        Returns:
            Float tensor shaped like ``token_ids`` on the same device.
        """
        seq_len = token_ids.size(1)
        lengths = torch.as_tensor(valid_length, device=token_ids.device).reshape(-1, 1)
        positions = torch.arange(seq_len, device=token_ids.device).unsqueeze(0)
        # One broadcast comparison replaces the per-row Python loop.
        return (positions < lengths).float()

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits of shape ``(batch, num_classes)``."""
        attention_mask = self.gen_attention_mask(token_ids, valid_length)
        outputs = self.bert(input_ids=token_ids,
                            token_type_ids=segment_ids.long(),
                            attention_mask=attention_mask)
        # Index instead of tuple-unpacking: unpacking a dict-like model output
        # iterates over its keys, and it also fails when the encoder returns
        # more than two elements. Indexing works for plain tuples as well.
        pooler = outputs[1]

        out = self.dropout(pooler) if self.dr_rate else pooler
        return self.classifier(out)

In [None]:
class FocalLoss(nn.Module):
    """Multi-class focal loss built on per-sample cross-entropy.

    Each sample's cross-entropy ``ce`` is scaled by
    ``alpha * (1 - exp(-ce)) ** gamma``.

    Args:
        alpha: Constant factor applied to every sample's loss.
        gamma: Exponent of the ``(1 - pt)`` modulating term.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample losses.
    """

    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super().__init__()
        self.alpha, self.gamma, self.reduction = alpha, gamma, reduction

    def forward(self, input, target):
        """Return the focal loss of logits ``input`` against class indices ``target``."""
        per_sample_ce = F.cross_entropy(input, target, reduction='none')
        # exp(-CE) is the probability the model assigned to the target class.
        target_prob = torch.exp(-per_sample_ce)
        scale = self.alpha * (1 - target_prob) ** self.gamma
        focal = scale * per_sample_ce

        if self.reduction == 'sum':
            return focal.sum()
        if self.reduction == 'mean':
            return focal.mean()
        return focal

In [None]:
def calc_accuracy(X, Y):
    """Return the fraction of rows of ``X`` whose arg-max along dim 1 equals ``Y``.

    The count of matches is moved to the CPU and converted to NumPy before
    dividing, so the result is a NumPy scalar rather than a tensor.
    """
    _, predicted = torch.max(X, 1)
    num_correct = (predicted == Y).sum().data.cpu().numpy()
    num_rows = predicted.size()[0]
    return num_correct / num_rows

In [None]:
def test_models(state_dict_filepath):
    """Evaluate one saved checkpoint on the module-level ``test_dataloader``.

    A new ``BERTClassifier`` is built around the module-level ``bertmodel``,
    the state dict stored at ``state_dict_filepath`` is loaded into it, and
    focal loss and accuracy are averaged over the batches of
    ``test_dataloader``. Both averages are means of per-batch values.

    Args:
        state_dict_filepath: Path of a file written with
            ``torch.save(model.state_dict(), ...)``.

    Returns:
        The mean per-batch accuracy on the test set.

    Raises:
        ValueError: If ``test_dataloader`` yields no batches.
    """
    model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(state_dict_filepath, map_location=device))
    model.eval()

    loss_fn = FocalLoss()

    test_acc = 0.0
    test_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for token_ids, valid_length, segment_ids, label in test_dataloader:
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)
            label = label.long().to(device)
            out = model(token_ids, valid_length, segment_ids)
            test_loss += loss_fn(out, label).item()
            test_acc += calc_accuracy(out, label)
            num_batches += 1

    if num_batches == 0:
        raise ValueError("test_dataloader yielded no batches; nothing to evaluate")

    test_loss /= num_batches
    test_acc /= num_batches

    print(f"Test Loss: {test_loss}, Test Accuracy: {test_acc}")

    return test_acc

In [None]:
def new_softmax(a):
    """Return the softmax of ``a`` as percentages rounded to three decimals.

    The maximum of ``a`` is subtracted before exponentiating so large inputs
    do not overflow. Normalisation runs over all elements of ``a``.
    """
    shifted = a - np.max(a)
    weights = np.exp(shifted)
    percentages = (weights / np.sum(weights)) * 100
    return np.round(percentages, 3)

In [None]:
def predict(predict_sentence, true_label):
    """Classify one sentence with the module-level ``model``.

    The sentence is wrapped in a one-row ``BERTDataset`` using the
    module-level ``tok``, ``max_len`` and ``batch_size``, and run through
    ``model`` in eval mode without gradient tracking.

    Args:
        predict_sentence: Sentence to classify.
        true_label: Ground-truth class id; must be convertible with ``int()``.

    Returns:
        A tuple ``(y_true, y_pred, probabilities)`` of lists with one entry
        per model output row: the true class id, the predicted class id, and
        the per-class probabilities in percent rounded to three decimals.
    """
    single_row = BERTDataset([[predict_sentence, true_label]], 0, 1, tok, max_len, True, False)
    # num_workers=0: starting worker processes for a one-sentence dataset
    # costs far more than loading the single item in the main process.
    loader = torch.utils.data.DataLoader(single_row, batch_size=batch_size, num_workers=0)

    model.eval()

    y_true = []
    y_pred = []
    probabilities = []

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for token_ids, valid_length, segment_ids, _ in loader:
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)

            out = model(token_ids, valid_length, segment_ids)

            for logits in out:
                # new_softmax already rounds to three decimals.
                percentages = new_softmax(logits.cpu().numpy()).tolist()

                y_true.append(int(true_label))
                y_pred.append(int(np.argmax(percentages)))
                probabilities.append(percentages)

    return y_true, y_pred, probabilities


## Model Training

In [None]:
# Pretrained KoBERT encoder with its vocabulary, plus the tokenizer built on them.
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

### Set this path to the training CSV.
dataset_train = pd.read_csv('train_text_dataset.csv', encoding='utf-8').dropna()
# Blank out cells equal to '.' and drop the rows that contained them.
dataset_train = dataset_train.where(dataset_train != '.').dropna()

# [sentence, label] pairs in the layout BERTDataset indexes into.
dataset_train_list = [
    [str(sentence), int(emotion_id)]
    for sentence, emotion_id in zip(dataset_train['Text'], dataset_train['Lable'])
]

# Hyperparameters.
max_len = 64          # passed to BERTDataset as the maximum sequence length
batch_size = 64
num_epochs = 10
learning_rate = 5e-5
warmup_ratio = 0.1    # fraction of all optimizer steps used for warm-up
max_grad_norm = 1     # gradient-norm clipping threshold
log_interval = 200    # batches between training progress prints

# Stratified cross-validation splitter with a fixed seed.
k_folds = 5
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

import copy  # used below to give every fold its own copy of the pretrained encoder

### Path setting is required: directory that receives the checkpoints and the plot.
PATH = 'your/path'
if PATH:
    # Create the directory up front so saving cannot fail after a fold has trained.
    os.makedirs(PATH, exist_ok=True)

# One state-dict checkpoint path per fold; the test cell iterates over this list.
state_dict_filepaths = []
state_dict_paths = state_dict_filepaths  # alias for the name this cell used before

fold_labels = [item[1] for item in dataset_train_list]
for fold, (train_index, val_index) in enumerate(skf.split(dataset_train_list, fold_labels)):
    print(f"Fold {fold + 1}")

    train_data = [dataset_train_list[i] for i in train_index]
    val_data = [dataset_train_list[i] for i in val_index]
    data_train = BERTDataset(train_data, 0, 1, tok, max_len, True, False)
    data_val = BERTDataset(val_data, 0, 1, tok, max_len, True, False)
    # Shuffle the training batches each epoch; the fold indices keep the CSV order.
    train_dataloader = torch.utils.data.DataLoader(
        data_train, batch_size=batch_size, num_workers=5, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(data_val, batch_size=batch_size, num_workers=5)

    # Every fold starts from its own copy of the pretrained encoder. Wrapping
    # the shared `bertmodel` object directly would keep fine-tuning the same
    # weights, so later folds would start from an encoder already trained on
    # rows that are now in their validation split.
    model = BERTClassifier(copy.deepcopy(bertmodel), dr_rate=0.5).to(device)

    t_total = len(train_dataloader) * num_epochs
    warmup_step = int(t_total * warmup_ratio)
    # Biases and LayerNorm weights are excluded from weight decay.
    no_decay = ['bias', 'LayerNorm.weight']
    optimizer_grouped_parameters = [
        {'params': [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
        {'params': [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
    ]
    optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)
    loss_fn = FocalLoss()
    scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total)

    # Per-epoch history of the current fold (re-created for every fold).
    train_losses, train_accs, val_losses, val_accs = [], [], [], []

    for e in range(num_epochs):
        train_acc, train_loss, val_acc, val_loss = 0.0, 0.0, 0.0, 0.0
        model.train()

        for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(tqdm_notebook(train_dataloader)):
            optimizer.zero_grad()
            token_ids = token_ids.long().to(device)
            segment_ids = segment_ids.long().to(device)
            label = label.long().to(device)
            out = model(token_ids, valid_length, segment_ids)
            loss = loss_fn(out, label)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
            optimizer.step()
            scheduler.step()  # the schedule advances once per batch
            train_acc += calc_accuracy(out, label)
            train_loss += loss.item()
            if batch_id % log_interval == 0:
                print(f"epoch {e+1} batch id {batch_id+1} loss {loss.data.cpu().numpy()} train acc {train_acc / (batch_id+1)}")
        train_loss /= (batch_id + 1)
        train_acc /= (batch_id + 1)
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        print(f"epoch {e+1} train loss {train_loss} train acc {train_acc}")

        model.eval()
        with torch.no_grad():
            for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(val_dataloader):
                token_ids = token_ids.long().to(device)
                segment_ids = segment_ids.long().to(device)
                label = label.long().to(device)
                out = model(token_ids, valid_length, segment_ids)
                loss = loss_fn(out, label)
                val_acc += calc_accuracy(out, label)
                val_loss += loss.item()
        val_loss /= (batch_id + 1)
        val_acc /= (batch_id + 1)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        print(f"epoch {e+1} val loss {val_loss} val acc {val_acc}")

    # Save the fold three ways: the pickled module, its state dict, and a
    # combined model + optimizer checkpoint.
    fold_path = os.path.join(PATH, f"Focal_KoBERT_e10_b64_fold_{fold + 1}")
    state_dict_path = fold_path + "_state_dict.pt"
    torch.save(model, fold_path + ".pt")
    torch.save(model.state_dict(), state_dict_path)
    torch.save({
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict()
    }, fold_path + "_all.tar")

    state_dict_filepaths.append(state_dict_path)


# The curves show the last fold only: the history lists are re-created at the
# start of every fold.
fig, axs = plt.subplots(figsize=(10, 10))
axs.plot(train_losses, label="Train loss")
axs.plot(val_losses, label="Val loss")
axs.plot(train_accs, label="Train acc")
axs.plot(val_accs, label="Val acc")
axs.set_title("Focal_e10_b64_f5")
axs.legend()
plt.savefig(os.path.join(PATH, "Focal_e10_b64_f5.png"))
plt.show()

## Testing the Trained Model

In [None]:
### Set this path to the test CSV.
csv_file_path = 'test_text_dataset.csv'
dataset_test = pd.read_csv(csv_file_path, encoding='utf-8').dropna()
# Blank out cells equal to '.' and drop the rows that contained them.
dataset_test = dataset_test.where(dataset_test != '.').dropna()

# [sentence, label] pairs in the layout BERTDataset indexes into.
dataset_test_list = [
    [str(sentence), int(emotion_id)]
    for sentence, emotion_id in zip(dataset_test['Text'], dataset_test['Lable'])
]

test_data = BERTDataset(dataset_test_list, 0, 1, tok, max_len, True, False)
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, num_workers=5)

# Score every fold's checkpoint on the test set and keep the first best one.
accuracies = [test_models(checkpoint) for checkpoint in state_dict_filepaths]

best_model_index = max(range(len(accuracies)), key=accuracies.__getitem__)
best_model_filepath = state_dict_filepaths[best_model_index]
print("Best model filepath:", best_model_filepath)

In [None]:
# Reload the best fold: the pickled module first, then its state dict.
# map_location lets checkpoints saved on a GPU load on a CPU-only host.
model = torch.load(best_model_filepath.replace("_state_dict", ""), map_location=device)
model.load_state_dict(torch.load(best_model_filepath, map_location=device))
# Make sure dropout is disabled regardless of the mode the module was saved in.
model.eval()

predictions = []
with torch.no_grad():
    for token_ids, valid_length, segment_ids, _ in test_dataloader:
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)

        outputs = model(token_ids, valid_length, segment_ids)
        _, predicted = torch.max(outputs, dim=1)
        predictions.extend(predicted.tolist())

# test_dataloader does not shuffle, so predictions line up with test_data.labels.
target_names = ["fear", "surprise", "angry", "sad", "neutral", "happy", "disgust"]
print(classification_report(test_data.labels, predictions, target_names=target_names))

## Prediction with the Trained Model

In [None]:
# Reload the best fold: the pickled module first, then its state dict.
# map_location lets checkpoints saved on a GPU load on a CPU-only host.
model = torch.load(best_model_filepath.replace("_state_dict", ""), map_location=device)
model.load_state_dict(torch.load(best_model_filepath, map_location=device))

### Path setting is required.
csv_file_path = 'test_text_dataset.csv'

# Class names indexed by class id.
target_names = ["fear", "surprise", "angry", "sad", "neutral", "happy", "disgust"]

y_true_list = []
y_pred_list = []

with open(csv_file_path, newline='', encoding='utf-8') as csvfile:
    csv_reader = csv.reader(csvfile)

    # Skip the header row; the default keeps an empty file from raising.
    next(csv_reader, None)

    for row in csv_reader:
        # NOTE(review): assumes column 2 holds the sentence and column 3 the
        # label id - confirm against the CSV layout.
        sentence = row[2]
        true_label = row[3]

        y_true, y_pred, probabilities = predict(sentence, true_label)

        y_true_names = [target_names[i] for i in y_true]
        y_pred_names = [target_names[i] for i in y_pred]

        y_true_list.append(y_true_names)
        y_pred_list.append(y_pred_names)

        # probabilities is a list, so it is interpolated into the f-string;
        # concatenating it to a str with `+` raises TypeError.
        print(f"{row[0]}, {sentence}: True Label = {y_true}, Prediction = {y_pred}, Probability = {probabilities}")