# 모듈 import 및 전역 변수 설정

In [None]:
import json
import os

import torch
import torch.nn as nn
from tqdm import trange
from transformers import AutoModel, AutoTokenizer
from torch.utils.data import DataLoader, TensorDataset
from transformers import get_linear_schedule_with_warmup
from transformers import AdamW
from datasets import load_metric
from sklearn.metrics import f1_score
import pandas as pd
import copy

# Token id constants. They are not referenced anywhere in the visible code.
# NOTE(review): presumably the tokenizer's pad / sentence-open / sentence-close ids — confirm.
PADDING_TOKEN = 1
S_OPEN_TOKEN = 0
S_CLOSE_TOKEN = 2

# When True, the dev set is evaluated after every training epoch.
do_eval=True

# Directory where the per-epoch model checkpoints are saved.
property_extraction_model_path = '/home/ubuntu/project/big_small_train_dev/small'

# Checkpoint file to load when resuming training from an earlier run
# (see the commented-out load_state_dict call in train_sentiment_analysis).
test_property_extraction_model_path = '/home/ubuntu/project/big_small_train_dev/small/saved_model_epoch_7.pt'

# Data file used for training.
train_data_path = '/home/ubuntu/project/data_train+dev.json'
# Data file used for the per-epoch evaluation during training.
# NOTE(review): this is the same file as train_data_path, so the reported
# dev metrics are measured on training data.
dev_data_path = '/home/ubuntu/project/data_train+dev.json'
# Data file that is labelled with a trained checkpoint to build the prediction file.
test_data_path = '/home/ubuntu/project/kluge-sa-2022-test.jsonl'

# Tokenizer padding/truncation length for training examples.
max_len = 256
batch_size = 8
# Pretrained base model used for fine-tuning.
base_model = 'kykim/electra-kor-base'
# Optimizer settings passed to AdamW.
learning_rate = 3e-6
eps = 1e-8
num_train_epochs = 24
# Width and dropout probability of the classification head (SimpleClassifier).
classifier_hidden_size = 768
classifier_dropout_prob = 0.1

# Property names; every sentence is paired with each of them as a separate example.
property_pair = ['일반', '품질', '가격', '디자인', '편의성', '인지도', '다양성']

# Binary label vocabulary: id 0 -> 'True', id 1 -> 'False'.
tf_id_to_name = ['True', 'False']
tf_name_to_id = {tf_id_to_name[i]: i for i in range(len(tf_id_to_name))}

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Extra special tokens registered on the tokenizer via add_special_tokens.
# NOTE(review): these look like anonymisation placeholders in the corpus — confirm.
special_tokens_dict = {
    'additional_special_tokens': ['&name&', '&affiliation&', '&social-security-num&', '&tel-num&', '&card-num&', '&bank-account&', '&num&', '&online-account&']
}

json 및 jsonl 파일 read, write 함수

In [None]:
def jsonload(fname, encoding="utf-8"):
    """Parse the JSON document stored in ``fname`` and return the decoded object.

    ``encoding`` is the text encoding used to open the file.
    """
    with open(fname, encoding=encoding) as handle:
        return json.load(handle)

# Serialize a JSON-compatible object to the file at the given path.
def jsondump(j, fname):
    """Write ``j`` as JSON to ``fname`` (UTF-8), keeping non-ASCII characters unescaped."""
    with open(fname, "w", encoding="UTF8") as out_file:
        json.dump(j, out_file, ensure_ascii=False)

# Read a JSONL file and return its records as a list.
def jsonlload(fname, encoding="utf-8"):
    """Load a JSON Lines file: one JSON value per line.

    Args:
        fname: Path of the file to read.
        encoding: Text encoding used to open the file.

    Returns:
        A list with the decoded value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    json_list = []
    with open(fname, encoding=encoding) as f:
        # Iterate the file lazily instead of materialising every line first.
        for line in f:
            line = line.strip()
            # Blank lines (e.g. a trailing empty line) carry no record; skipping
            # them avoids a JSONDecodeError on otherwise valid files.
            if not line:
                continue
            json_list.append(json.loads(line))
    return json_list

# 모델 정의
kykim_electra 모델을 기반으로 한 classification 모델 이용

In [None]:
class SimpleClassifier(nn.Module):
    """Classification head: dropout -> linear -> tanh -> dropout -> linear.

    The layer width and dropout probability come from the module-level
    ``classifier_hidden_size`` and ``classifier_dropout_prob`` settings.
    """

    def __init__(self, num_label):
        super().__init__()
        width = classifier_hidden_size
        self.dense = nn.Linear(width, width)
        self.dropout = nn.Dropout(classifier_dropout_prob)
        self.output = nn.Linear(width, num_label)

    def forward(self, features):
        """Return ``num_label`` logits computed from index 0 of the second axis of ``features``."""
        # Only the vector at position 0 of each sequence is classified
        # (presumably the [CLS] token of the encoder output — confirm).
        first_token = features[:, 0, :]
        hidden = torch.tanh(self.dense(self.dropout(first_token)))
        return self.output(self.dropout(hidden))


class ELECTRABaseClassifier(nn.Module):
    """Pretrained encoder (``base_model``) topped with a ``SimpleClassifier`` head."""

    def __init__(self, num_label, len_tokenizer):
        super().__init__()

        self.num_label = num_label

        # Load the encoder and grow its embedding table to the tokenizer size,
        # which includes any special tokens added after pretraining.
        encoder = AutoModel.from_pretrained(base_model)
        encoder.resize_token_embeddings(len_tokenizer)
        self.kykim_electra = encoder

        self.labels_classifier = SimpleClassifier(num_label)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the encoder and the head.

        Returns a ``(loss, logits)`` tuple; ``loss`` is ``None`` when no
        ``labels`` are supplied, otherwise the cross-entropy loss.
        """
        encoder_outputs = self.kykim_electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None,
        )
        # The first element of the encoder output feeds the classification head.
        logits = self.labels_classifier(encoder_outputs[0])

        if labels is None:
            return None, logits

        criterion = nn.CrossEntropyLoss()
        flat_logits = logits.view(-1, self.num_label)
        flat_labels = labels.view(-1)
        return criterion(flat_logits, flat_labels), logits


# 데이터 파싱 및 tokenization 함수 정의


In [None]:
def property_tokenize_and_align_labels(tokenizer, form, annotations, max_len):
    """Build one binary classification example per entry of ``property_pair``.

    The sentence is tokenized together with each property name as a pair, and
    the example is labelled 'True' when some annotation names that property,
    'False' otherwise.

    Args:
        tokenizer: Callable tokenizer returning 'input_ids' and 'attention_mask'.
        form: The sentence text; a missing value (``pd.isna``) yields no examples.
        annotations: Iterable of annotations whose first element is a
            '#'-separated string; the part after the first '#' is the property.
        max_len: Length every example is padded/truncated to.

    Returns:
        A dict with parallel lists under 'input_ids', 'attention_mask' and 'label'.
    """
    property_dict = {
        'input_ids': [],
        'attention_mask': [],
        'label': []
    }

    # A missing sentence cannot be tokenized; the check does not depend on the
    # property, so it is done once up front instead of inside the loop.
    if pd.isna(form):
        return property_dict

    # Parse the annotated property names once rather than once per property.
    annotated_properties = {
        annotation[0].split('#')[1] for annotation in annotations
    }

    for pair in property_pair:
        tokenized_data = tokenizer(form, pair, padding='max_length', max_length=max_len, truncation=True)
        label_name = 'True' if pair in annotated_properties else 'False'

        property_dict['input_ids'].append(tokenized_data['input_ids'])
        property_dict['attention_mask'].append(tokenized_data['attention_mask'])
        property_dict['label'].append(tf_name_to_id[label_name])

    return property_dict

def property_get_dataset(raw_data, tokenizer, max_len):
    """Turn raw utterances into a ``TensorDataset`` of (input_ids, attention_mask, label).

    Every utterance contributes the examples produced by
    ``property_tokenize_and_align_labels`` for its 'sentence_form' and 'annotation'.
    """
    all_input_ids, all_attention_masks, all_labels = [], [], []

    for utterance in raw_data:
        encoded = property_tokenize_and_align_labels(
            tokenizer,
            utterance['sentence_form'],
            utterance['annotation'],
            max_len,
        )
        all_input_ids += encoded['input_ids']
        all_attention_masks += encoded['attention_mask']
        all_labels += encoded['label']

    input_ids_tensor = torch.tensor(all_input_ids)
    attention_mask_tensor = torch.tensor(all_attention_masks)
    labels_tensor = torch.tensor(all_labels)
    return TensorDataset(input_ids_tensor, attention_mask_tensor, labels_tensor)

# 모델 학습

In [None]:
def evaluation(y_true, y_pred, label_len):
    """Print per-class counts, accuracies and F1 scores for a set of predictions.

    Args:
        y_true: Gold label ids (ints or values convertible with ``int``).
        y_pred: Predicted label ids, parallel to ``y_true``.
        label_len: Number of distinct label ids.

    Returns:
        None. All results are printed.
    """
    # Normalise to plain ints once so they can be used as list indices and
    # passed to scikit-learn regardless of whether tensors were supplied.
    y_true = [int(label) for label in y_true]
    y_pred = [int(label) for label in y_pred]

    count_list = [0] * label_len
    hit_list = [0] * label_len
    for gold, predicted in zip(y_true, y_pred):
        count_list[gold] += 1
        if gold == predicted:
            hit_list[gold] += 1

    # A class with no gold examples gets accuracy 0.0 instead of raising
    # ZeroDivisionError.
    acc_list = [
        hit / count if count else 0.0
        for hit, count in zip(hit_list, count_list)
    ]
    total = sum(count_list)

    print(count_list)
    print(hit_list)
    print(acc_list)
    print('accuracy: ', (sum(hit_list) / total) if total else 0.0)
    # Average over the actual number of classes (previously a hard-coded 3).
    print('macro_accuracy: ', (sum(acc_list) / label_len) if label_len else 0.0)

    print('f1_score: ', f1_score(y_true, y_pred, average=None))
    print('f1_score_micro: ', f1_score(y_true, y_pred, average='micro'))
    print('f1_score_macro: ', f1_score(y_true, y_pred, average='macro'))

def train_sentiment_analysis():
    """Fine-tune the property classifier and save a checkpoint after every epoch.

    Reads the files named by ``train_data_path`` and ``dev_data_path``, builds one
    (sentence, property) example per entry of ``property_pair``, and trains an
    ``ELECTRABaseClassifier`` for ``num_train_epochs`` epochs. After each epoch
    the state dict is written to ``property_extraction_model_path`` and, when
    ``do_eval`` is set, dev-set metrics are printed via ``evaluation``.
    """
    print('train_sentiment_analysis')
    print('property_extraction model would be saved at ', property_extraction_model_path)

    print('loading train data')
    # The data is read as JSONL. If the train file is a single JSON document,
    # use the commented-out jsonload call instead.
    train_data = jsonlload(train_data_path)
    # train_data = jsonload(train_data_path)

    # Same for the dev file: JSONL by default, jsonload for a single JSON document.
    dev_data = jsonlload(dev_data_path)
    # dev_data = jsonload(dev_data_path)

    print('tokenizing train data')
    tokenizer = AutoTokenizer.from_pretrained(base_model)
    num_added_toks = tokenizer.add_special_tokens(special_tokens_dict)
    print('We have added', num_added_toks, 'tokens')
    property_train_data = property_get_dataset(train_data, tokenizer, max_len)
    property_dev_data = property_get_dataset(dev_data, tokenizer, max_len)
    property_train_dataloader = DataLoader(property_train_data, shuffle=True,
                                  batch_size=batch_size)
    property_dev_dataloader = DataLoader(property_dev_data, shuffle=True,
                                batch_size=batch_size)

    print('loading model')
    property_model = ELECTRABaseClassifier(len(tf_id_to_name), len(tokenizer))
    # Uncomment to resume training from a previously saved checkpoint.
    # property_model.load_state_dict(torch.load(test_property_extraction_model_path, map_location=device))
    property_model.to(device)

    print('end loading')

    # Optimizer parameter groups.
    FULL_FINETUNING = True
    if FULL_FINETUNING:
        property_param_optimizer = list(property_model.named_parameters())
        # Biases and LayerNorm weights are excluded from weight decay
        # ('gamma'/'beta' are kept for parameters using the older naming).
        no_decay = ['bias', 'LayerNorm.weight', 'gamma', 'beta']
        # The optimizer reads the per-group key 'weight_decay'; the previous key
        # 'weight_decay_rate' was not recognised, so no decay was ever applied.
        property_optimizer_grouped_parameters = [
            {'params': [p for n, p in property_param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': 0.01},
            {'params': [p for n, p in property_param_optimizer if any(nd in n for nd in no_decay)],
             'weight_decay': 0.0}
        ]
    else:
        # Train only the classification head; the model stores it as
        # `labels_classifier` (there is no `classifier` attribute).
        property_param_optimizer = list(property_model.labels_classifier.named_parameters())
        property_optimizer_grouped_parameters = [{"params": [p for n, p in property_param_optimizer]}]

    property_optimizer = AdamW(
        property_optimizer_grouped_parameters,
        lr=learning_rate,
        eps=eps
    )
    epochs = num_train_epochs
    max_grad_norm = 1.0
    total_steps = epochs * len(property_train_dataloader)

    property_scheduler = get_linear_schedule_with_warmup(
        property_optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps
    )

    # Make sure the checkpoint directory exists before the first save; otherwise
    # a whole epoch of training is lost to a failing torch.save.
    os.makedirs(property_extraction_model_path, exist_ok=True)

    for epoch_step in trange(1, epochs + 1, desc="Epoch"):
        property_model.train()

        # property train
        property_total_loss = 0

        for batch in property_train_dataloader:
            batch = tuple(t.to(device) for t in batch)
            b_input_ids, b_input_mask, b_labels = batch

            property_model.zero_grad()

            loss, _ = property_model(b_input_ids, b_input_mask, b_labels)

            loss.backward()

            property_total_loss += loss.item()

            # Clip gradients before the optimizer step.
            torch.nn.utils.clip_grad_norm_(parameters=property_model.parameters(), max_norm=max_grad_norm)
            property_optimizer.step()
            property_scheduler.step()

        avg_train_loss = property_total_loss / len(property_train_dataloader)
        print("Property_Epoch: ", epoch_step)
        print("Average train loss: {}".format(avg_train_loss))

        model_saved_path = property_extraction_model_path + '/saved_model_epoch_' + str(epoch_step) + '.pt'
        torch.save(property_model.state_dict(), model_saved_path)

        if do_eval:
            property_model.eval()

            pred_list = []
            label_list = []

            for batch in property_dev_dataloader:
                batch = tuple(t.to(device) for t in batch)
                b_input_ids, b_input_mask, b_labels = batch

                with torch.no_grad():
                    loss, logits = property_model(b_input_ids, b_input_mask, b_labels)

                predictions = torch.argmax(logits, dim=-1)
                # Collect plain Python ints rather than 0-dim device tensors so the
                # metric code can index and compare them directly.
                pred_list.extend(predictions.tolist())
                label_list.extend(b_labels.tolist())

            evaluation(label_list, pred_list, len(tf_id_to_name))

    print("training is done")


In [None]:
# gc.collect()
# torch.cuda.empty_cache()

In [None]:
# Run the full training loop; a checkpoint is saved after every epoch.
train_sentiment_analysis()

# 모델 평가

학습된 모델을 바탕으로 국어원 데이터 형태를 만드는 방법 예시

In [None]:
def predict_from_korean_form_property(tokenizer, ce_model, data):
    """Annotate every sentence in ``data`` with the properties the model predicts.

    For each sentence, the text is paired with every entry of ``property_pair``;
    when the model predicts 'True' for a pair, ``[property]`` is appended to the
    sentence's 'annotation' list.

    Args:
        tokenizer: Tokenizer matching the one used for training.
        ce_model: Trained classifier returning ``(loss, logits)``.
        data: List of dicts with a 'sentence_form' key. Modified in place:
            each item's 'annotation' is reset and refilled.

    Returns:
        The same ``data`` list, with predictions filled in.
    """
    ce_model.to(device)
    ce_model.eval()
    for sentence in data:
        form = sentence['sentence_form']
        sentence['annotation'] = []
        # Non-string forms (e.g. missing values) cannot be tokenized; they are
        # reported and left with an empty annotation list.
        if not isinstance(form, str):
            print("form type is arong: ", form)
            continue
        # Fixed: the loop previously iterated the misspelled name
        # `protperty_pair`, which raised NameError.
        for pair in property_pair:
            # Use the same max_len as training instead of a hard-coded 256.
            tokenized_data = tokenizer(form, pair, padding='max_length', max_length=max_len, truncation=True)

            input_ids = torch.tensor([tokenized_data['input_ids']]).to(device)
            attention_mask = torch.tensor([tokenized_data['attention_mask']]).to(device)
            with torch.no_grad():
                _, ce_logits = ce_model(input_ids, attention_mask)

            # Batch size is 1, so take the single prediction as a plain int.
            predicted_id = torch.argmax(ce_logits, dim=-1)[0].item()

            if tf_id_to_name[predicted_id] == 'True':
                sentence['annotation'].append([pair])

    return data


F1 score 계산 - 추출 성능 및 전체 성능에 대한 F1 score 따로 계산(추후 제거 가능)


In [None]:
def _find_annotation_match(category, polarity, candidates, polarity_index):
    """Return (category_found, polarity_equal) for the first candidate sharing ``category``."""
    for candidate in candidates:
        if candidate[0] == category:
            return True, candidate[polarity_index] == polarity
    return False, False


def _precision_recall_f1(counts):
    """Compute precision/recall/F1 from TP/FP/FN counts, using 0.0 for empty denominators."""
    tp, fp, fn = counts['TP'], counts['FP'], counts['FN']
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denominator = recall + precision
    f1 = 2 * recall * precision / denominator if denominator else 0.0
    return {'Precision': precision, 'Recall': recall, 'F1': f1}


def evaluation_f1(true_data, pred_data):
    """Score predicted annotations against gold annotations.

    Two scores are computed: category extraction (category matches) and the
    entire pipeline (category and polarity both match).

    Args:
        true_data: List of dicts with an 'annotation' list; each gold
            annotation has the category at index 0 and the polarity at index 2.
        pred_data: List parallel to ``true_data``; each predicted annotation
            has the category at index 0 and the polarity at index 1.

    Returns:
        A dict with keys 'category extraction result' and
        'entire pipeline result', each mapping to a dict with 'Precision',
        'Recall' and 'F1'. A metric whose denominator is zero is reported as 0.0.
    """
    ce_eval = {'TP': 0, 'FP': 0, 'FN': 0}
    pipeline_eval = {'TP': 0, 'FP': 0, 'FN': 0}

    for i in range(len(true_data)):
        gold_annotations = true_data[i]['annotation']
        pred_annotations = pred_data[i]['annotation']

        # TP / FN: every gold annotation is either recovered or missed.
        for y_ano in gold_annotations:
            category_found, polarity_equal = _find_annotation_match(
                y_ano[0], y_ano[2], pred_annotations, polarity_index=1)
            ce_eval['TP' if category_found else 'FN'] += 1
            pipeline_eval['TP' if polarity_equal else 'FN'] += 1

        # FP: every prediction without a gold counterpart. The match is
        # evaluated afresh for each prediction; previously the found-flags
        # leaked from one prediction to the next, hiding later false positives.
        for p_ano in pred_annotations:
            category_found, polarity_equal = _find_annotation_match(
                p_ano[0], p_ano[1], gold_annotations, polarity_index=2)
            if not category_found:
                ce_eval['FP'] += 1
            if not polarity_equal:
                pipeline_eval['FP'] += 1

    return {
        'category extraction result': _precision_recall_f1(ce_eval),
        'entire pipeline result': _precision_recall_f1(pipeline_eval)
    }


테스트 데이터에 대한 평가

In [None]:
# Path of the trained property-model checkpoint (.pt) used to label the test data.
# Left empty here; it must be filled in before running test_sentiment_analysis.
test_property_extraction_model_path = ''
# Path of the test data file (read as JSONL). Also left empty; fill in before use.
test_data_path = ''

In [None]:
def test_sentiment_analysis():
    """Label the test data with a trained checkpoint and write the predictions to disk.

    Loads the JSONL file at ``test_data_path``, restores the model weights from
    ``test_property_extraction_model_path``, predicts properties for a deep copy
    of the test data (the loaded data itself is left untouched), and dumps the
    result with ``jsondump``.
    """
    tokenizer = AutoTokenizer.from_pretrained(base_model)
    # The tokenizer must carry the same special tokens as during training so
    # that its length matches the checkpoint's embedding table.
    tokenizer.add_special_tokens(special_tokens_dict)
    test_data = jsonlload(test_data_path)

    model = ELECTRABaseClassifier(len(tf_id_to_name), len(tokenizer))
    # Fixed: the checkpoint path variable is `test_property_extraction_model_path`;
    # the previously used `test_category_extraction_model_path` is undefined.
    model.load_state_dict(torch.load(test_property_extraction_model_path, map_location=device))
    model.to(device)
    model.eval()

    pred_data = predict_from_korean_form_property(tokenizer, model, copy.deepcopy(test_data))

    # Set the output path (including the file name) for the predicted data here.
    jsondump(pred_data, '')
    # pred_data = jsonload('')

    # print('F1 result: ', evaluation_f1(test_data, pred_data))

In [None]:
# Generate predictions for the test data using the checkpoint and paths configured above.
test_sentiment_analysis()