In [None]:
# 라이브러리 불러오기 및 함수화
import os
import re
import shutil
import itertools
import numpy as np
import pandas as pd
from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix

import torch
from konlpy.tag import Komoran
from pykospacing import Spacing
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from transformers import BertTokenizer, BertModel, ProgressCallback, Trainer, BertForSequenceClassification, TrainingArguments

komoran = Komoran()
spacing = Spacing()
label_encoder = LabelEncoder()

# tqdm과 pandas 통합
tqdm.pandas()

In [None]:
df = pd.read_excel(r'..\..\data\filtered_samsung_news_with_outcome.xlsx')
# df = df[5801:5900]
# 기본 불용어 불러오기
korean_stopwords_path = '../../data/stopwords-ko.txt'
with open(korean_stopwords_path, encoding='utf8') as f:
    stopwords = f.readlines()
stopwords = [x.strip() for x in stopwords]

In [None]:
# 클래스별로 데이터 분리
df_positive = df[df['Outcome'] == '악재']
df_negative = df[df['Outcome'] == '호재']

# 최소 클래스의 샘플 수 확인
min_class_count = min(len(df_positive), len(df_negative))

# 다운샘플링 적용
df_positive_downsampled = resample(df_positive, 
                                   replace=False,                               # 샘플을 복원하지 않고
                                   n_samples=min_class_count,                   # 최소 클래스의 개수로 맞추기
                                   random_state=42)                             # 재현성을 위해 random_state 사용

df_negative_downsampled = resample(df_negative, 
                                   replace=False, 
                                   n_samples=min_class_count, 
                                   random_state=42)

df_balanced = pd.concat([df_positive_downsampled, df_negative_downsampled])     # 다운샘플링된 데이터 결합
df = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)         # 데이터 셔플
print(df_balanced['Outcome'].value_counts())                                    # 결과 확인

In [None]:
# 텍스트 전처리 함수
def preprocessing(text):
    text = spacing(text)
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    return text

# komoran토큰화 &불용어 처리 함수
def remove_stopwords(text, stopwords):
    tokens = []
    morphs = komoran.morphs(text)
    for token in morphs:
        if token not in stopwords:
            tokens.append(token)
    return tokens

# 텍스트 전처리 및 토큰화, 불용어 처리
cleaned_data = []
for i in tqdm(range(len(df))):
    feature_text = df.loc[i, 'summary_content']
    processed_text = preprocessing(feature_text)
    cleaned_text = remove_stopwords(processed_text, stopwords)
    cleaned_data.append(cleaned_text)
df['cleaned'] = cleaned_data

In [None]:
# num_labels는 분류할 클래스의 수를 지정합니다.
tokenizer = BertTokenizer.from_pretrained('monologg/kobert')
model = BertForSequenceClassification.from_pretrained('monologg/kobert', num_labels=2)

In [None]:
# Dataset 클래스를 정의하여 데이터를 모델에 맞게 전처리합니다.
class TextDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts                                          # 전처리된 텍스트 데이터 리스트
        self.labels = labels                                        # 라벨 데이터 리스트
        self.tokenizer = tokenizer                                  # KoBERT 토큰나이저
        self.max_len = max_len                                      # 최대 토큰 길이 (128로 설정)

    def __len__(self):
        return len(self.texts)                                      # 데이터셋의 크기를 반환
    
    def __getitem__(self, idx):
        text = self.texts[idx]                                      # 주어진 인덱스에 해당하는 텍스트 가져오기
        label = self.labels[idx]                                    # 주어진 인덱스에 해당하는 라벨 가져오기

        # 텍스트를 KoBERT 토크나이저로 인토딩하여 입력 데이터로 변환
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,                                # 특별 토큰([CLS], [SEP]) 추가
            max_length=self.max_len,                                # 최대 토큰 길이만큼 패딩 또는 자르기
            return_token_type_ids=False,                            # token_type_ids 반환하지 않음
            padding='max_length',                                   # max_length만큼 패딩 적용
            truncation=True,                                        # max_length를 초과하는 부분을 잘라냄
            return_attention_mask=True,                             # 어텐션 마스크 반환 (패딩된 부분은 0, 나머지는 1)
            return_tensors='pt',                                    # PyTorch 텐서로 변환
        )       
        return {
            'input_ids': encoding['input_ids'].flatten(),           # 인코딩된 입력 ID 텐서
            'attention_mask': encoding['attention_mask'].flatten(), # 어텐션 마스크 텐서
            'labels': torch.tensor(label, dtype=torch.long)         # 라벨 텐서 (정수형)
        }
    
# 평가지표를 계산하는 함수 정의
def compute_metrics(eval_pred):
    logits, labels = eval_pred

    # logits이 numpy.ndarry인 경우 PyTorch 텐서로 변환
    if isinstance(logits, np.ndarray):
        logits = torch.tensor(logits)
      
    predictions = torch.argmax(logits, dim=1)
    acc = accuracy_score(labels, predictions.numpy())
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions.numpy(), average='weighted')
    return {
        'accuracy'  : acc,
        'f1'        : f1,
        'precision' : precision,
        'recall'    : recall
    }

In [None]:
# 전처리된 텍스트와 라벨로 학습 및 평가 데이터셋 인스턴스 생성
train_df, eval_df = train_test_split(df, test_size=0.2, random_state=42)

train_texts   = train_df['cleaned'].tolist()
train_labels  = label_encoder.fit_transform(train_df['Outcome'])

eval_texts    = eval_df['cleaned'].tolist()
eval_labels   = label_encoder.transform(eval_df['Outcome'])

train_dataset = TextDataset(train_texts, train_labels, tokenizer)
eval_dataset  = TextDataset(eval_texts, eval_labels, tokenizer)

In [None]:
# 짧은 경로 설정 (예: C:/logs)
logging_dir = os.path.abspath('C:/logs')

if os.path.exists(logging_dir):
    shutil.rmtree(logging_dir)

os.makedirs(logging_dir)

In [None]:
# 학습 파라미터 설정
training_args = TrainingArguments(
    output_dir                 = './result',                         # 학습 결과가 저장될 디렉토리
    num_train_epochs           =3,                                   # 학습을 반복할 에폭 수
    per_device_train_batch_size=16,                                  # 학습 시 배치 크기
    per_device_eval_batch_size =16,                                  # 평가 시 배치 크기
    warmup_steps               =500,                                 # 학습 초기 단계에서 학습률을 서서히 증가하는 단계 수
    weight_decay               =0.01,                                # 가중치 감쇠 (L2 정규화) 비율
    logging_dir                =logging_dir,                         # 학습 로그가 저장될 디렉토리
    logging_steps              =10,                                  # 몇 스텝마다 로그를 남길지 설정
    evaluation_strategy        ='steps',                             # 평가 전략 (학습 중 주기적으로 평가)       
    save_total_limit           =2                                    # 저장할 체크포인트 파일의 개수를 제한
)

In [None]:
# Trainer 클래스 설정
# Trainer는 학습을 쉽게 관리할 수 있게 해주는 Hugging Face의 유틸리티 클래스
trainer = Trainer(
    model          = model,
    args           = training_args,
    train_dataset  = train_dataset,
eval_dataset   = eval_dataset,
    compute_metrics= compute_metrics,
    # callbacks    = [ProgressCallback]
)

trainer.train()

In [None]:
# 하이퍼 파라미터 그리드 정의
learning_rates = [2e-5, 3e-5]
batch_sizes    = [16, 32, 64]
epochs         = [2, 4]

best_accuracy = 0
best_params   = None
param_grid = list(itertools.product(learning_rates, batch_sizes, epochs))

In [None]:
# 각 파라미터 조합에 대해 학습 및 평가
for lr, batch_size, epoch in tqdm(param_grid, desc="Find Best Param"):
    print(f"Testing: Learning Rate={lr}, Batch_Size={batch_size}, Epochs={epoch}")

    training_args = TrainingArguments(
        output_dir                 ='./results',
        num_train_epochs           = epoch,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size =batch_size,
        warmup_steps               =500,
        weight_decay               =0.01,
        logging_dir                =logging_dir,
        logging_steps              =10,
        evaluation_strategy        ='epoch',
        learning_rate              =lr
    )

    # Trainer 설정
    trainer = Trainer(
        model                      =model,
        args                       =training_args,
        train_dataset              =train_dataset,
        eval_dataset               =eval_dataset,
        compute_metrics            =compute_metrics
    )

    # 모델 학습
    trainer.train()

    # 모델 평가
    eval_results = trainer.evaluate()

    # 최적의 하이퍼파라미터 조합을 업데이트
    if eval_results['eval_accuracy'] > best_accuracy:
        best_accuracy = eval_results['eval_accuracy']
        best_params = (lr, batch_size, epochs)

    print(f"Accuracy: {eval_results['eval_accuracy']}")

print(f"Best Params: Learning Rate={best_params[0]}, Batch_size={best_params[1]}, Epochs={best_params[2]}")

In [None]:
# 최적의 하이퍼파라미터 모델 학습
best_lr, best_batch_size, best_epoc = best_params

training_args = TrainingArguments(
    output_dir                 ='./results',
    num_train_epochs           = epoch,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size =batch_size,
    warmup_steps               =500,
    weight_decay               =0.01,
    logging_dir                =logging_dir,
    logging_steps              =10,
    evaluation_strategy        ='epoch',
    learning_rate              =lr
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics
)

trainer.train()

# 모델 평가 및 예측
predictions, labels, _ = trainer.predict(eval_dataset)
predictions = torch.tensor(predictions)
predictions = torch.argmax(predictions, dim=1)

In [None]:
# 혼동행렬 계산
conf_matrix = confusion_matrix(labels, predictions)

# 혼동행렬 시각화
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

In [None]:
# 라벨 인덱스와 실제 라벨명을 매핑한 딕셔너리
label_map = {0: '악재', 1: '호재'}

# 텍스트를 입력 받아 예측하는 함수 정의
def predict_text(text, model, tokenizer, max_len=128):
    encoding = tokenizer.encode_plus(
        text,
        add_special_tokens   =True,
        max_length           =max_len,
        return_token_type_ids=False,
        padding              ='max_length',
        truncation           =True,
        return_attention_mask=True,
        return_tensors       ='pt',
    )
    input_ids      = encoding['input_ids']
    attention_mask = encoding['attention_mask']
    
    with torch.no_grad():
        outputs    = model(input_ids, attention_mask=attention_mask)
    
    logits     = outputs.logits
    prediction = torch.argmax(logits, dim=-1)
    
    return prediction.item()

In [None]:
# 테스트할 텍스트 입력
text_to_predict = '''[서울와이어 천성윤 기자] 삼성전자 최대 노조인 전국삼성전자노동조합(삼성노조)이 29일 파업을 선언했다. 삼성전자에서 파업은 창사 이래 처음이다.  

삼성노조는 이날 서울 서초구 삼성전자 서초사옥 앞에서 기자회견을 열고 “노동자들을 무시하는 사측의 태도에 파업을 선언한다”고 밝혔다.

삼성노조의 파업 선언은 전날 올해 임금협상을 위한 교섭에서 사측과 이견이 좁혀지지 않으며 파행한 지 하루만에 이뤄졌다. 전날 교섭에서 노사 양측은 사측 위원 2명의 교섭 참여를 놓고 갈등을 빚었다. 이 문제 때문에 정작 핵심인 임금협상 관련 중요 내용은 오가지도 못한 것으로 알려졌다.

삼성노조는 “사측이 교섭에 아무런 안건도 준비하지 않고 나왔다”며 파업 선언에 이르기까지의 책임을 사측에 돌렸다.

현재 삼성노조 조합원 수는 2만8000여명으로 삼성전자 전체 직원(약 12만5000명)의 22% 수준이다. 이들이 파업에 돌입함으로서 실적 개선을 이어가야 하는 삼성전자는 큰 타격을 입을 것으로 예상된다. 

삼성전자는 지난해 반도체 업황 부진으로 디바이스솔루션(DS) 부문에서 14조8800억원의 적자를 기록했다. 올해 1분기는 매출 71조9200억원, 영업이익 6조6100억원으로 상승세에 올라탔다.

삼성노조는 즉각적인 총파업에 나서는 대신 연차 소진 등의 방식으로 단체행동을 이어갈 예정이다. 삼성노조 집행부는 조합원들에게 오는 6월 7일 하루 연차를 소진하라는 지침을 전달했다.

또 이날부터 서초사옥 앞에서 버스 숙박 농성을 진행한다. 삼성노조 측은 “아직은 소극적인 파업으로 볼 수 있지만, 단계를 밟아나가겠다”면서 “총파업까지 갈 수 있고, 파업이 실패할 수도 있지만 1호 파업 행동 자체가 의미 있다”고 밝혔다.

출처 : 서울와이어(http://www.seoulwire.com)'''

# 예측
predicted_label_index = predict_text(text_to_predict, model, tokenizer)

# 예측 결과 출력
predicted_label = label_map[predicted_label_index]
print(f"Predicted Label: {predicted_label}")

In [None]:
# 모델의 가중치 저장
torch.save(model.state_dict(), "model.pth")

print("Model has been saved as model.pth")