In [None]:
from transformers import AutoModel, AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from datasets import Dataset
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split
import random
import torch

from pykospacing import Spacing
import re

import numpy as np
import pandas as pd
from tqdm import tqdm

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# KF-DeBERTa 모델
model_name = 'kakaobank/kf-deberta-base'
model = AutoModel.from_pretrained(model_name)
Classification_model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)  # 2진분류
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [4]:
data_df = pd.read_excel('./data/New_samsung_news.xlsx')

In [5]:
data_df.loc[data_df['Outcome'] == '악재', 'labels'] = 0
data_df.loc[data_df['Outcome'] == '호재', 'labels'] = 1
data_df['labels'] = data_df['labels'].astype(int)

In [6]:
# 띄어쓰기, 대소문자 보정 함수
spacing = Spacing()
def preprocessing(text):
    text = spacing(text)
    text = text.lower()  # 소문자 변경
    text = re.sub(r'[^\w\s]', '', text)
    return text

# 기본 불용어 불러오기
korean_stopwords_path = "data/stopwords-ko.txt"
with open(korean_stopwords_path, encoding='utf8') as f:
    stopwords = f.readlines()
stopwords = [x.strip() for x in stopwords]

# 불용어 처리 함수
def remove_stopwords(text, stopwords):
    words = text.split()
    filtered_text = []
    for word in words:
        if text not in stopwords:
            filtered_text.append(text)
    return ' '.join(filtered_text)

In [None]:
# 문장 전처리 및 불용어 처리
for i in tqdm(range(len(data_df))):
    feature_text = data_df.loc[i, 'summary_content']
    processed_text = preprocessing(feature_text)
    cleaned_text = remove_stopwords(processed_text, stopwords)
    data_df.loc[i, 'processed'] = cleaned_text

In [7]:
# data_df.to_excel('./data/data_samsung_processed.xlsx')
# data_df = pd.read_excel('./data/data_apple_processed.xlsx')

In [24]:
# KF_DeBERT모델 텍스트 토큰화
encodings = tokenizer(data_df['processed'].tolist(), truncation=True, padding=True)  # truncation=최대길이를 초과하는 것에 대한 처리
data_df['encoding'] = encodings
data_df['input_ids'] = encodings['input_ids']  # input_ids=문장을 토크나이저가 일정 단위로 쪼개서 vocab에 들어있는 숫자로 치환한 것
data_df['attention_mask'] = encodings['attention_mask']  # attention_mask=attention대상인지 아닌지를 나타냄. 대상이면1, 아니면0, 패딩된건 0으로 학습대상x

In [None]:
# data_df.to_excel('./data/data_apple_encoding.xlsx')
# data_df = pd.read_excel('./data/data_apple_encoding.xlsx')

# # 문자열을 리스트로 변환
# data_df['input_ids'] = data_df['input_ids'].str.strip('[]').str.split(', ').apply(lambda x: list(map(int, x)))
# data_df['attention_mask'] = data_df['attention_mask'].str.strip('[]').str.split(', ').apply(lambda x: list(map(int, x)))

In [None]:
# 데이터 비중 조정
total_sample = len(data_df)
negative = data_df[data_df['labels']==0]  # 부정(악재) 클래스 5,849
positive = data_df[data_df['labels']==1]  # 긍정(호재) 클래스 10,672

# 데이터 비율 설정
total_rate = 1
rate = 1
negative_sample_size = round(len(negative) * total_rate)
positive_sample_size = round(negative_sample_size * total_rate * rate)
print(negative_sample_size)
print(positive_sample_size)

In [None]:
# 데이터 선택
negative_data = random.sample(list(negative.index), negative_sample_size)
positive_data = random.sample(list(positive.index), positive_sample_size)

# 최종 데이터 인덱스
sample_index = negative_data + positive_data
random.shuffle(sample_index)

# 데이터프레임 생성
sample_df = data_df.loc[sample_index]

# train_test_split 데이터set
train_columns = sample_df[['summary_content', 'input_ids', 'attention_mask']]
test_colums = sample_df[['labels']]

X_train, X_test, y_train, y_test = train_test_split(train_columns, test_colums, test_size=0.2)

train_dataset = Dataset.from_pandas(X_train.join(y_train))
test_dataset = Dataset.from_pandas(X_test.join(y_test))

In [28]:
# 하이퍼파라미터 옵션 설정
param_grid = {
    'learning_rate': [3e-5, 2e-5, 5e-5],  # 학습률
    'per_device_train_batch_size': [4, 8, 16, 32],  # 베치크기
    'num_train_epochs': [2, 4, 6, 8]  # 에포크 수
}

# Best Parameters: {'learning_rate': 3e-05, 'batch_size': 16, 'num_train_epochs': 8}
# Best Accuracy: 0.55

In [None]:
# 수치 초기화
best_accuracy = 0
best_params = None
results = []

for learning_rate in tqdm(param_grid['learning_rate']):
    for batch_size in param_grid['per_device_train_batch_size']:
        for epochs in param_grid['num_train_epochs']:
            try:
                if batch_size > len(train_dataset):
                    print(f"Skipping batch size {batch_size} as it is larger than the dataset size {len(train_dataset)}")
                    continue

                # TrainingArguments 설정
                training_args = TrainingArguments(
                    output_dir="./results",  # 학습된 모델과 결과를 저장할 경로 설정
                    evaluation_strategy="epoch",  # 각 에포크마다 평가 수행
                    learning_rate=learning_rate,  # 학습률 설정
                    per_device_train_batch_size=batch_size,  # 학습 배치 크기 설정
                    per_device_eval_batch_size=batch_size,  # 평가 배치 크기 설정
                    num_train_epochs=epochs,  # 현재 학습 에포크 수 설정
                    weight_decay=0.01,  # 가중치 감쇠 설정
                    logging_dir='./logs',  # 로그 저장 경로 설정
                    logging_steps=10,  # 로그를 기록할 단계 수 설정
                )

                # Trainer 생성
                trainer = Trainer(
                    model=Classification_model,  # 훈련모델 설정
                    args=training_args,
                    train_dataset=train_dataset,  # 훈련 데이터셋
                    eval_dataset=test_dataset,  # 평가 데이터셋
                )

                # 모델 학습
                trainer.train()

                # 모델 평가
                eval_results = trainer.evaluate()

                # 예측 수행
                predictions = trainer.predict(test_dataset)
                preds = np.argmax(predictions.predictions, axis=1)  # 모델이 예측한 클래스의 인덱스를 포함하는 배열
                labels = predictions.label_ids  # 실제 정답 레이블의 인덱스를 포함하는 배열

                # 평가지표
                accuracy = accuracy_score(labels, preds)
                precision = precision_score(labels, preds, average='binary')
                recall = recall_score(labels, preds, average='binary')
                f1 = f1_score(labels, preds, average='binary')
                tn, fp, fn, tp = confusion_matrix(labels, preds).ravel()
                specificity = tn / (tn + fp)

                results.append({
                    'learning_rate': learning_rate,
                    'batch_size': batch_size,
                    'num_train_epochs': epochs,
                    'accuracy': accuracy,
                    'precision': precision,
                    'recall': recall,
                    'specificity': specificity,
                    'f1_score': f1
                })

                # 정확도를 기준으로 최고모델 저장
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    best_params = {
                        'learning_rate': learning_rate,
                        'batch_size': batch_size,
                        'num_train_epochs': epochs
                    }
                    best_confusion_matrix = confusion_matrix(labels, preds)

            except Exception as e:
                print(f"Error with parameters: learning_rate={learning_rate}, batch_size={batch_size}, epochs={epochs}")
                print(f"Exception: {e}")
                continue  # 에러 발생 시 다음 파라미터 조합으로 넘어감

# 최적 하이퍼파라미터 조합과 정확도 출력
print("Best Parameters:", best_params)
print("Best Accuracy:", best_accuracy)

In [None]:
# 혼동행렬 시각화
plt.figure(figsize=(8, 6))
sns.heatmap(best_confusion_matrix, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

# 모든 결과 출력
for result in results:
    print(result)

In [None]:
# 최적 하이퍼파라미터로 다시 학습
best_training_args = TrainingArguments(
    output_dir='./best_model',
    learning_rate=best_params['learning_rate'],
    per_device_train_batch_size=best_params['per_device_train_batch_size'],
    num_train_epochs=best_params['num_train_epochs'],
    evaluation_strategy="epoch",
    save_strategy="epoch",
)

best_trainer = Trainer(
    model=model,
    args=best_training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

best_trainer.train()
best_trainer.save_model('./best_model')

In [None]:
# 모델 저장
model.save_pretrained("./kf-deberta-finetuned")
tokenizer.save_pretrained("./kf-deberta-finetuned")