# Info
Last Updated @2022-05-26

### 👨‍💻 Author Info
- Author: Gyeongbin Park(a.k.a., Tony Park)
- Blog: https://heytech.tistory.com/
- Github: https://github.com/park-gb
- Contact: dev.gbpark@gmail.com

### 📚 References
- KLUE-base: https://huggingface.co/klue/bert-base
- Dataset: https://github.com/ukairia777/finance_sentiment_corpus/blob/main/finance_data.csv

# 구글 드라이브 연동

In [None]:
import os
from google.colab import drive
# Mount Google Drive at /content/drive/. Later cells read and write under
# ./drive/MyDrive (assumes the working directory is /content — TODO confirm).
drive.mount('/content/drive/')

:

# 패키지 설치

## Huggingface 패키지 설치

In [None]:
!pip install transformers

## RAdam 패키지 설치

In [None]:
!pip install tensorflow_addons

# 패키지 import

In [None]:
import pandas as pd
import numpy as np
import re
from tqdm import tqdm
import urllib.request
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow_addons as tfa
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from transformers import BertTokenizer, TFBertForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, \
                            roc_auc_score, confusion_matrix, classification_report, \
                            matthews_corrcoef, cohen_kappa_score, log_loss

# 언어모델 및 Tokenizer 불러오기

In [None]:
# Hugging Face hub id of the pretrained checkpoint.
MODEL_NAME = "klue/bert-base"
# Tokenizer and 3-label classification model are loaded from the same checkpoint;
# from_pt=True asks transformers to load the weights from the PyTorch checkpoint.
tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
model = TFBertForSequenceClassification.from_pretrained(
    MODEL_NAME,
    from_pt=True,
    num_labels=3,
)
# From here on MODEL_NAME is only used to build file names, so drop the "/".
MODEL_NAME = "_".join(MODEL_NAME.split("/"))

# GPU 작동확인

In [None]:
# Report whether TensorFlow sees a GPU registered as the first GPU device.
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    print("GPU 미작동 중")
else:
    print("GPU 작동 중")
    # NOTE(review): mirrored_strategy is created here but no other cell in this
    # notebook references it (no strategy.scope() is entered) — confirm whether
    # distributed training was intended.
    mirrored_strategy = tf.distribute.MirroredStrategy()

# 데이터셋 Load

In [None]:
# Remote location of the raw finance sentiment CSV.
DATASET_URL = "https://raw.githubusercontent.com/ukairia777/finance_sentiment_corpus/main/finance_data.csv"
# Directory (inside the mounted Drive) and file name for the local copy.
DATASET_PATH = "./drive/MyDrive/Colab Notebooks/SCFN/data/"
DATASET_NAME = "dataset.csv"

In [None]:
# Download the dataset CSV to DATASET_PATH + DATASET_NAME.
# The target directory is created first because urlretrieve writes straight to
# the given file name and fails if the parent directory does not exist yet.
os.makedirs(DATASET_PATH, exist_ok = True)
urllib.request.urlretrieve(DATASET_URL,
                           filename = DATASET_PATH + DATASET_NAME
                           )

In [None]:
# Load the downloaded CSV into a DataFrame and preview the first rows.
dataset = pd.read_csv(DATASET_PATH + DATASET_NAME)
dataset.head()

# 데이터 전처리

## 영어 뉴스 기사 칼럼 제거

In [None]:
# Drop the 'sentence' column; only 'kor_sentence' is used as model input below.
del dataset['sentence']

## 라벨 데이터 숫자 치환

In [None]:
# Encode the string labels as integer class ids: neutral=0, positive=1, negative=2.
dataset['labels'] = dataset['labels'].replace(
    {'neutral': 0, 'positive': 1, 'negative': 2}
)
dataset.head()

## 결측치 확인

In [None]:
# Print column dtypes and non-null counts to check for missing values.
dataset.info()

## 중복 데이터 제거

In [None]:
# Show rows whose 'kor_sentence' repeats an earlier row.
dataset[dataset['kor_sentence'].duplicated()]

In [None]:
# Remove rows with a duplicated 'kor_sentence', modifying the DataFrame in place.
dataset.drop_duplicates(subset = ['kor_sentence'], inplace = True)

# 라벨 비율 확인

In [None]:
# Bar chart of the number of rows per label.
dataset['labels'].value_counts().plot(kind = 'bar')

In [None]:
# Bar chart of the share of rows per label.
dataset['labels'].value_counts(normalize = True).plot(kind = 'bar', )

In [None]:
# Share of rows per label, as numbers.
dataset['labels'].value_counts(normalize = True)

# 훈련/테스트 데이터 분리

In [None]:
 # Separate the model input (Korean sentences) from the target (label ids).
X_data = dataset['kor_sentence']
y_data = dataset['labels']

In [None]:
# Hold out 20% of the rows as the test set; the fixed seed makes the split repeatable.
TEST_SIZE = 0.2
RANDOM_STATE = 42
# stratify = y_data keeps the label proportions the same in both splits.
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, 
                                                    test_size = TEST_SIZE, 
                                                    random_state = RANDOM_STATE, 
                                                    stratify = y_data)

In [None]:
# Report how many sentences ended up in the train and test splits.
print(f"훈련 입력 데이터 개수: {len(X_train)}")
print(f"테스트 입력 데이터 개수: {len(X_test)}")

In [None]:
# Label proportions in the training split.
y_train.value_counts(normalize = True)

In [None]:
# Label proportions in the test split.
y_test.value_counts(normalize = True)

# BERT용 입력 데이터 포맷 변경

In [None]:
# Fixed sequence length: every sentence is truncated or padded to this many token ids.
MAX_SEQ_LEN = 64

In [None]:
def convert_data(X_data, y_data):
    """Encode sentences and labels into the arrays fed to the BERT model.

    Each sentence is encoded with the module-level ``tokenizer`` and truncated
    or padded to exactly ``MAX_SEQ_LEN`` token ids.

    Args:
        X_data: Iterable of input sentences.
        y_data: Iterable of labels aligned with ``X_data``.

    Returns:
        A pair ``([tokens, masks, segments], targets)`` of numpy arrays:
        ``tokens`` holds the token ids, ``masks`` is 1 on real tokens and 0 on
        padding, ``segments`` is all zeros (single-sentence input), and
        ``targets`` holds the labels in input order.
    """
    # Containers for the token ids, attention masks, segment ids and targets.
    tokens, masks, segments, targets = [], [], [], []

    # Ask the tokenizer for its padding id instead of assuming it is 0.
    pad_id = tokenizer.pad_token_id
    # Give tqdm a total when the input is sized; zip() alone has no length.
    total = len(X_data) if hasattr(X_data, '__len__') else None

    for X, y in tqdm(zip(X_data, y_data), total = total):
        # token: ids of the sentence, truncated/padded to MAX_SEQ_LEN.
        token = tokenizer.encode(X, truncation = True, padding = 'max_length', max_length = MAX_SEQ_LEN)

        # mask: 1 where the position holds a real token, 0 where it holds padding.
        # Built per position so it stays aligned with the ids whatever the
        # tokenizer's padding id or padding side is.
        mask = [int(token_id != pad_id) for token_id in token]

        # segment: all zeros because each example is a single sentence.
        segment = [0] * MAX_SEQ_LEN

        tokens.append(token)
        masks.append(mask)
        segments.append(segment)
        targets.append(y)

    # Convert to numpy arrays.
    tokens = np.array(tokens)
    masks = np.array(masks)
    segments = np.array(segments)
    targets = np.array(targets)

    return [tokens, masks, segments], targets

In [None]:
# Convert the training split into BERT inputs ([tokens, masks, segments]) and targets.
train_x, train_y = convert_data(X_train, y_train)

In [None]:
# Convert the test split into BERT inputs ([tokens, masks, segments]) and targets.
test_x, test_y = convert_data(X_test, y_test)

# BERT 모델링

In [None]:
# Symbolic Keras inputs for the token ids, attention mask and segment ids,
# each a length-MAX_SEQ_LEN int32 vector per example.
token_inputs = tf.keras.layers.Input((MAX_SEQ_LEN,), dtype = tf.int32, name = 'input_word_ids')
mask_inputs = tf.keras.layers.Input((MAX_SEQ_LEN,), dtype = tf.int32, name = 'input_masks')
segment_inputs = tf.keras.layers.Input((MAX_SEQ_LEN,), dtype = tf.int32, name = 'input_segment')
# The three inputs are passed as one positional list; presumably the model reads
# them in the order (input_ids, attention_mask, token_type_ids) — TODO confirm.
bert_outputs = model([token_inputs, mask_inputs, segment_inputs])

In [None]:
# Display the structure returned by the BERT model call.
bert_outputs

In [None]:
# Keep only the first element of the model output.
# NOTE(review): for TFBertForSequenceClassification this is presumably the
# 3-class logits rather than a pooled embedding — confirm this is the intended
# input for the extra classification head built below.
bert_output = bert_outputs[0]

# 파인튜닝: 감정 분류 모델

## 감정 분류 모델 컴파일

In [None]:
# Dropout rate applied before the final classification layer.
DROPOUT_RATE = 0.5
# Number of sentiment classes (neutral / positive / negative).
NUM_CLASS = 3
# Optimizer switch: 'Adam' selects tf.keras Adam, anything else selects RectifiedAdam.
#OPTIMIZER_NAME = 'Adam'
OPTIMIZER_NAME = 'RAdam'

In [None]:
# Classification head: dropout followed by a softmax layer over NUM_CLASS classes.
dropout = tf.keras.layers.Dropout(DROPOUT_RATE)(bert_output)
sentiment_layer = tf.keras.layers.Dense(
    NUM_CLASS,
    activation='softmax',
    kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02),
)(dropout)
sentiment_model = tf.keras.Model(
    inputs=[token_inputs, mask_inputs, segment_inputs],
    outputs=sentiment_layer,
)

# Both optimizer choices share the same base learning rate.
LEARNING_RATE = 5e-5
if OPTIMIZER_NAME == 'Adam':
    optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
else:
    # RectifiedAdam settings: warm-up/decay schedule length, learning-rate
    # floor, numerical epsilon and gradient-norm clipping.
    TOTAL_STEPS = 10000
    MIN_LR = 1e-5
    WARMUP_PROPORTION = 0.1
    EPSILON = 1e-8
    CLIPNORM = 1.0
    optimizer = tfa.optimizers.RectifiedAdam(
        learning_rate=LEARNING_RATE,
        total_steps=TOTAL_STEPS,
        warmup_proportion=WARMUP_PROPORTION,
        min_lr=MIN_LR,
        epsilon=EPSILON,
        clipnorm=CLIPNORM,
    )

# Labels are integer class ids, hence the sparse categorical cross-entropy.
sentiment_model.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

## 조기종료 조건

In [None]:
# Smallest change in the monitored metric that counts as an improvement.
MIN_DELTA = 1e-3
# Number of epochs without improvement tolerated before training stops.
PATIENCE = 5

In [None]:
# Stop training once validation accuracy has not improved by MIN_DELTA for PATIENCE epochs.
# NOTE(review): this monitors val_accuracy while the ModelCheckpoint in this
# notebook monitors val_loss, so stopping and best-model selection use
# different criteria — confirm this is intended.
early_stopping = EarlyStopping(
    monitor = "val_accuracy", 
    min_delta = MIN_DELTA,
    patience = PATIENCE)

## 최고 성능 모델 저장

In [None]:
# Directory and file name for the best checkpoint; the name encodes the
# pretrained model and the optimizer used.
MODEL_BEST_PATH = "./drive/MyDrive/Colab Notebooks/SCFN/model/"
MODEL_BEST_NAME = "best_model_" + MODEL_NAME + "_" + OPTIMIZER_NAME + ".h5"

In [None]:
# Write the model to MODEL_BEST_PATH + MODEL_BEST_NAME only when val_loss
# reaches a new minimum (save_best_only); verbose = 1 logs each save.
model_checkpoint = ModelCheckpoint(
    filepath = MODEL_BEST_PATH + MODEL_BEST_NAME,
    monitor = "val_loss",
    mode = "min",
    save_best_only = True,
    verbose = 1
)

In [None]:
# Callbacks passed to fit(): early stopping and best-model checkpointing.
callbacks = [early_stopping, model_checkpoint]

## 감정 분류 모델 학습

In [None]:
# Upper bound on training epochs.
EPOCHS = 100
# Number of examples per gradient update.
BATCH_SIZE = 32
# Backward-compatible alias: the original misspelled name is kept because the
# training call in this notebook still reads BATCH_SZIE.
BATCH_SZIE = BATCH_SIZE

In [None]:
# Fine-tune the classifier with the early-stopping and checkpoint callbacks.
# NOTE(review): validation_data is the test split, and both callbacks select on
# val_* metrics; the same split is then used for the final evaluation, so the
# reported test metrics may be optimistic — consider a separate validation split.
sentiment_model.fit(train_x, train_y, 
                    epochs = EPOCHS, 
                    shuffle = True, 
                    batch_size = BATCH_SZIE, 
                    validation_data = (test_x, test_y),
                    callbacks = callbacks
                    )

# 감정 분류의 예측값 계산

In [None]:
# Load the best checkpoint written by ModelCheckpoint; the Hugging Face class is
# passed via custom_objects so Keras can resolve it while loading.
sentiment_model_best = tf.keras.models.load_model(MODEL_BEST_PATH + MODEL_BEST_NAME, 
                                                  custom_objects={'TFBertForSequenceClassification': TFBertForSequenceClassification})

In [None]:
# Predict per-class scores for the test inputs, then take the highest-scoring
# class of each row as the predicted label.
predicted_value = sentiment_model_best.predict(test_x)
predicted_label = np.argmax(predicted_value, axis = 1)

# 감정 분류 모델의 성능 평가

## Classification Report

In [None]:
# Print the classification report for the test set.
print(classification_report(test_y, predicted_label))

## Confusion Matrix

In [None]:
# Where the confusion-matrix image is saved; the file name encodes the
# pretrained model and the optimizer used.
FIGURE_PATH = "./drive/MyDrive/Colab Notebooks/SCFN/figure/"
FIGURE_NAME = "cf_matrix_" + MODEL_NAME + "_" + OPTIMIZER_NAME + ".png"

In [None]:
# Confusion matrix of true vs. predicted labels, drawn as an annotated heatmap
# and saved to FIGURE_PATH + FIGURE_NAME before being shown.
cf_matrix = confusion_matrix(test_y, predicted_label)
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cf_matrix, annot=True, fmt='d', ax=ax)
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
fig.savefig(FIGURE_PATH + FIGURE_NAME)
plt.show()

## Accuracy

In [None]:
# Accuracy of the predicted labels on the test set, rounded to 3 decimals.
accuracy_score_v = round(accuracy_score(y_test, predicted_label), 3)

## Precision

In [None]:
# Weighted-average precision on the test set, rounded to 3 decimals.
precision_score_v = round(precision_score(y_test, predicted_label, average="weighted"), 3)

## Recall

In [None]:
# Weighted-average recall on the test set, rounded to 3 decimals.
recall_score_v = round(recall_score(y_test, predicted_label, average="weighted"), 3)

## F1 Score

In [None]:
# Weighted-average F1 score on the test set, rounded to 3 decimals.
f1_score_v = round(f1_score(y_test, predicted_label, average="weighted"), 3)

## ROC AUC Score

In [None]:
# Weighted one-vs-rest ROC AUC computed from the per-class predicted scores,
# rounded to 3 decimals.
roc_auc_score_v = round(roc_auc_score(y_test, predicted_value, average="weighted", multi_class="ovr"), 3)

## Cohen’s Kappa score

In [None]:
# Cohen's kappa between true and predicted labels, rounded to 3 decimals.
cohen_kappa_score_v = round(cohen_kappa_score(y_test, predicted_label), 3)

## Matthew’s correlation coefficient

In [None]:
# Matthews correlation coefficient on the test set, rounded to 3 decimals.
matthews_corrcoef_v = round(matthews_corrcoef(y_test, predicted_label), 3)

## Log loss

In [None]:
# Log loss computed from the per-class predicted scores, rounded to 3 decimals.
log_loss_v = round(log_loss(y_test, predicted_value), 3)

## 전체 평가지표

In [None]:
# Where the metric summary CSV is saved; the file name encodes the pretrained
# model and the optimizer used.
METRIC_PATH = "./drive/MyDrive/Colab Notebooks/SCFN/metric/"
METRIC_NAME = "metric_" + MODEL_NAME + "_" + OPTIMIZER_NAME + ".csv"

In [None]:
# One-row summary table (row label 'score') of the run configuration and every
# evaluation metric computed above.
metric_total = pd.DataFrame(
    data=dict(
        PLM=MODEL_NAME,
        Optimizer=OPTIMIZER_NAME,
        Accuracy=accuracy_score_v,
        Precision=precision_score_v,
        Recall=recall_score_v,
        F1_score=f1_score_v,
        ROC_AUC_score=roc_auc_score_v,
        Cohen_kappa_coef=cohen_kappa_score_v,
        Matthews_corrcoef=matthews_corrcoef_v,
        Log_loss=log_loss_v,
    ),
    index=['score'],
)

In [None]:
# Save the metric summary to METRIC_PATH + METRIC_NAME.
metric_total.to_csv(METRIC_PATH + METRIC_NAME)

In [None]:
# Print the metric summary table.
print(metric_total)