# Financial PhraseBank: 금융 감성 분석 모델 학습

이 노트북은 Financial PhraseBank 데이터셋을 사용하여 금융 뉴스 헤드라인의 감성(긍정/중립/부정)을 분류하는 모델을 학습하고 평가합니다.

## 1. 환경 설정 및 라이브러리 임포트

In [None]:
!pip install -q datasets transformers[torch] scikit-learn pandas numpy matplotlib seaborn tensorboard

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch

from datasets import Dataset, DatasetDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, EarlyStoppingCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
from huggingface_hub import notebook_login

In [None]:
# Hugging Face Hub 로그인 (필요시)
# notebook_login()

## 2. 데이터셋 준비

Financial PhraseBank 데이터셋은 일반적으로 `.txt` 파일 형태로 제공되며, 각 라인은 `텍스트.@감성` 형식입니다.
여기서는 `Sentences_50Agree.txt` 파일을 기본으로 사용합니다. 다른 파일을 사용하려면 `file_path` 변수를 수정하세요.
데이터셋은 [여기](https://www.kaggle.com/datasets/ankurzing/sentiment-analysis-for-financial-news/data) 등에서 다운로드 받을 수 있습니다.
다운로드 후, 이 노트북과 같은 디렉토리에 해당 파일을 위치시키거나 `file_path`를 알맞게 수정해주세요.

In [None]:
file_path = 'Sentences_50Agree.txt' # 기본 파일명, 필요시 Sentences_AllAgree.txt 등으로 변경
encoding_to_try = ['utf-8', 'latin1', 'ISO-8859-1'] # 시도해볼 인코딩 목록

df = None
for encoding in encoding_to_try:
    try:
        df = pd.read_csv(file_path, sep='.@', header=None, names=['text', 'sentiment'], engine='python', encoding=encoding)
        print(f"Successfully loaded data with encoding: {encoding}")
        break
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found. Please download it and place it in the correct directory.")
        df = None # 명시적으로 None 처리
        break
    except Exception as e:
        print(f"Failed to load with encoding {encoding}: {e}")

if df is not None and not df.empty:
    print("\nDataset loaded successfully.")
    print(f"Dataset shape: {df.shape}")
    print("\nFirst 5 rows:")
    print(df.head())
    print("\nSentiment distribution:")
    print(df['sentiment'].value_counts())
else:
    print("\nError: Could not load the dataset. Please check the file path and encoding.")
    # 데이터프레임이 로드되지 않았을 경우 이후 코드 실행을 막기 위해 예외 발생 또는 exit()
    # 여기서는 간단히 비어있는 데이터프레임을 만들어서 이후 오류를 유도 (실제로는 더 강력한 처리 필요)
    if df is None: df = pd.DataFrame(columns=['text', 'sentiment'])

### 2.1. 데이터 전처리 및 라벨 인코딩

In [None]:
if not df.empty:
    # 라벨 인코딩 (positive: 0, neutral: 1, negative: 2)
    # 주의: Hugging Face의 AutoModelForSequenceClassification은 일반적으로 레이블을 0부터 시작하는 정수로 기대합니다.
    sentiment_to_id = {'positive': 0, 'neutral': 1, 'negative': 2}
    id_to_sentiment = {v: k for k, v in sentiment_to_id.items()}
    num_fin_labels = len(sentiment_to_id)

    df['label'] = df['sentiment'].map(sentiment_to_id)

    # 누락된 라벨이 있는지 확인
    if df['label'].isnull().any():
        print("\nWarning: Some sentiments were not mapped to labels. Check sentiment_to_id mapping and dataset values.")
        print(df[df['label'].isnull()])
        df.dropna(subset=['label'], inplace=True) # NaN 라벨 제거
        df['label'] = df['label'].astype(int) # 정수형으로 변환
    
    print("\nData with encoded labels:")
    print(df.head())
    print(f"\nNumber of labels: {num_fin_labels}")
    print(f"Label mapping: {sentiment_to_id}")
else:
    print("DataFrame is empty, skipping preprocessing.")
    num_fin_labels = 3 # 기본값, 실제로는 데이터 로드 성공 시 결정됨
    sentiment_to_id = {'positive': 0, 'neutral': 1, 'negative': 2}
    id_to_sentiment = {v: k for k, v in sentiment_to_id.items()}

### 2.2. 데이터 분할 (학습, 검증, 테스트)

In [None]:
if not df.empty and 'label' in df.columns:
    # 학습/테스트 분할 (80% 학습, 20% 테스트)
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])
    # 학습 데이터에서 검증 데이터 분할 (원래 학습 데이터의 12.5% -> 전체의 10%)
    # train_test_split은 test_size 비율을 첫 번째 인자로 전달된 데이터프레임 기준으로 계산합니다.
    # 따라서, train_df에서 0.125를 val_df로 할당하면, (0.8 * 0.125) = 0.1, 즉 전체의 10%가 됩니다.
    train_df, val_df = train_test_split(train_df, test_size=0.125, random_state=42, stratify=train_df['label'])

    print(f"Train set size: {len(train_df)}")
    print(f"Validation set size: {len(val_df)}")
    print(f"Test set size: {len(test_df)}")

    # Hugging Face Dataset 형식으로 변환
    train_dataset_hf = Dataset.from_pandas(train_df.reset_index(drop=True))
    val_dataset_hf = Dataset.from_pandas(val_df.reset_index(drop=True))
    test_dataset_hf = Dataset.from_pandas(test_df.reset_index(drop=True))

    financial_datasets = DatasetDict({
        'train': train_dataset_hf,
        'validation': val_dataset_hf,
        'test': test_dataset_hf
    })
    print("\nFinancial Datasets (Hugging Face format):")
    print(financial_datasets)
else:
    print("DataFrame is empty or 'label' column is missing, skipping data splitting.")
    # 빈 DatasetDict 생성 (오류 방지용)
    financial_datasets = DatasetDict({
        'train': Dataset.from_dict({'text': [], 'label': []}),
        'validation': Dataset.from_dict({'text': [], 'label': []}),
        'test': Dataset.from_dict({'text': [], 'label': []})
    })

### 2.3. 토큰화

In [None]:
MODEL_NAME = "distilbert-base-uncased" # GoEmotions와 동일 모델 사용 또는 변경 가능
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [None]:
def tokenize_function_financial(examples):
    return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=128) # 금융 뉴스는 보통 짧음

In [None]:
if not df.empty and financial_datasets['train']:
    tokenized_financial_datasets = financial_datasets.map(tokenize_function_financial, batched=True)
    # 'text'와 'sentiment' 컬럼은 모델 학습에 직접 사용되지 않으므로 제거 (이미 'label'로 변환됨)
    # 'Unnamed: 0', '__index_level_0__' 등 pandas에서 추가된 인덱스 컬럼도 제거
    columns_to_remove = ['text', 'sentiment']
    # 데이터셋에 따라 추가적으로 생성될 수 있는 인덱스 컬럼명 확인 및 추가
    if 'Unnamed: 0' in tokenized_financial_datasets['train'].column_names:
        columns_to_remove.append('Unnamed: 0')
    if '__index_level_0__' in tokenized_financial_datasets['train'].column_names:
        columns_to_remove.append('__index_level_0__')
        
    tokenized_financial_datasets = tokenized_financial_datasets.remove_columns(columns_to_remove)
    tokenized_financial_datasets.set_format("torch")

    print("\nTokenized Financial Datasets:")
    print(tokenized_financial_datasets)
    if tokenized_financial_datasets['train']:
      print(tokenized_financial_datasets['train'][0])
else:
    print("Dataset is empty, skipping tokenization.")
    # 빈 DatasetDict (오류 방지용)
    tokenized_financial_datasets = DatasetDict({
        'train': Dataset.from_dict({'input_ids': [], 'attention_mask': [], 'label': []}),
        'validation': Dataset.from_dict({'input_ids': [], 'attention_mask': [], 'label': []}),
        'test': Dataset.from_dict({'input_ids': [], 'attention_mask': [], 'label': []})
    })
    tokenized_financial_datasets.set_format("torch")

## 3. 모델 선택 및 학습

In [None]:
model_financial = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME, 
    num_labels=num_fin_labels, 
    id2label=id_to_sentiment, 
    label2id=sentiment_to_id
)

# GPU 사용 가능 여부 확인 및 모델 이동
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_financial.to(device)
print(f"Using device: {device}")

### 3.1. 학습 설정 및 메트릭 정의

In [None]:
def compute_metrics_financial(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return {
        "accuracy": accuracy_score(labels, predictions),
        "f1_macro": f1_score(labels, predictions, average="macro", zero_division=0)
    }

In [None]:
BATCH_SIZE = 16
LEARNING_RATE = 2e-5
NUM_EPOCHS = 3 # 하루 안에 완료를 위해 에포크 수 조정

training_args_financial = TrainingArguments(
    output_dir="./results_financial",
    eval_strategy="epoch",
    save_strategy="epoch",
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    num_train_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    logging_dir="./logs_financial",
    logging_steps=50, # 데이터셋 크기가 작을 수 있으므로 조정
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
    save_total_limit=2,
    fp16=torch.cuda.is_available(),
    report_to="tensorboard"
)

### 3.2. Trainer 정의 및 모델 학습

In [None]:
trainer_financial = Trainer(
    model=model_financial,
    args=training_args_financial,
    train_dataset=tokenized_financial_datasets["train"],
    eval_dataset=tokenized_financial_datasets["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics_financial,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

In [None]:
# TensorBoard 실행 (Colab 또는 로컬 터미널에서)
# %load_ext tensorboard
# %tensorboard --logdir logs_financial

In [None]:
if not df.empty and tokenized_financial_datasets['train']:
    trainer_financial.train()
else:
    print("Skipping training as dataset is not loaded or processed correctly.")

## 4. 모델 평가 및 분석

In [None]:
if not df.empty and tokenized_financial_datasets['test']:
    eval_results_financial = trainer_financial.evaluate(tokenized_financial_datasets["test"])
    print("\nTest Set Evaluation Results (Financial):")
    for key, value in eval_results_financial.items():
        print(f"{key}: {value:.4f}")
else:
    print("Skipping evaluation as dataset is not loaded or processed correctly.")

In [None]:
if not df.empty and tokenized_financial_datasets['test']:
    predictions_output_financial = trainer_financial.predict(tokenized_financial_datasets["test"])
    logits_financial = predictions_output_financial.predictions
    true_labels_financial = predictions_output_financial.label_ids
    
    predicted_labels_financial = np.argmax(logits_financial, axis=-1)
else:
    print("Skipping prediction as dataset is not loaded or processed correctly.")
    # 빈 배열로 초기화하여 이후 시각화 코드에서 오류 방지
    true_labels_financial = np.array([])
    predicted_labels_financial = np.array([])

### 4.1. 분류 보고서

In [None]:
if true_labels_financial.size > 0:
    financial_label_names = [id_to_sentiment[i] for i in range(num_fin_labels)]
    print("\nFinancial PhraseBank Classification Report (Test Set):")
    print(classification_report(true_labels_financial, predicted_labels_financial, target_names=financial_label_names, zero_division=0))
else:
    print("Cannot generate classification report: No predictions available.")

### 4.2. 혼동 행렬

In [None]:
if true_labels_financial.size > 0:
    cm_financial = confusion_matrix(true_labels_financial, predicted_labels_financial)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm_financial, annot=True, fmt="d", cmap="Blues", 
                xticklabels=financial_label_names, yticklabels=financial_label_names)
    plt.title("Financial PhraseBank Confusion Matrix")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
else:
    print("Cannot generate confusion matrix: No predictions available.")

## 5. 샘플 예측

In [None]:
def predict_financial_sentiment(text, model, tokenizer, id2label):
    model.eval()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        logits = model(**inputs).logits
    
    predicted_class_id = logits.argmax().item()
    predicted_label = id2label[predicted_class_id]
    
    return predicted_label, torch.softmax(logits, dim=1).squeeze().cpu().numpy()

In [None]:
if not df.empty and 'label' in df.columns: # 모델 학습이 시도되었는지 확인
    sample_financial_texts = [
        "The company reported strong earnings growth this quarter.",
        "Market sentiment remains neutral amidst global uncertainties.",
        "Analysts predict a downturn in stock prices next week.",
        "Despite the volatile market, our portfolio showed a slight increase.",
        "The new regulations are expected to negatively impact the industry."
    ]

    for text in sample_financial_texts:
        # 학습된 model_financial 사용
        predicted_sentiment, probs = predict_financial_sentiment(text, model_financial, tokenizer, id_to_sentiment)
        print(f"\nSample Financial Text: '{text}'")
        print(f"Predicted Sentiment: {predicted_sentiment}")
        # print(f"Probabilities: {probs}") # 각 클래스에 대한 확률 출력 (선택 사항)
else:
    print("\nSkipping sample prediction as model was not trained due to data loading/processing issues.")

## 6. 모델 저장 (선택 사항)

In [None]:
if not df.empty and tokenized_financial_datasets['train'] and hasattr(trainer_financial, 'model'):
    output_model_dir_financial = "./saved_model_financial"
    os.makedirs(output_model_dir_financial, exist_ok=True)

    trainer_financial.save_model(output_model_dir_financial)
    tokenizer.save_pretrained(output_model_dir_financial)

    print(f"Financial model and tokenizer saved to {output_model_dir_financial}")
else:
    print("Skipping model saving as model was not trained or does not exist.")

### 모델 로드 및 사용 예시 (저장된 모델)

In [None]:
# from transformers import AutoModelForSequenceClassification, AutoTokenizer
# import torch

# # 저장된 모델과 토크나이저 로드
# if os.path.exists(output_model_dir_financial if 'output_model_dir_financial' in locals() else './saved_model_financial_placeholder'):
#     loaded_model_financial = AutoModelForSequenceClassification.from_pretrained(output_model_dir_financial)
#     loaded_tokenizer_financial = AutoTokenizer.from_pretrained(output_model_dir_financial)

#     # id_to_sentiment 맵핑 (실제 사용 시 저장/로드 필요)
#     # 이 예제에서는 위에서 정의된 id_to_sentiment를 사용합니다.

#     sample_text_for_loading_test = "The acquisition is expected to boost profits significantly."
#     predicted_sentiment_loaded, _ = predict_financial_sentiment(sample_text_for_loading_test, loaded_model_financial, loaded_tokenizer_financial, id_to_sentiment)

#     print(f"\nSample Financial Text (loaded model): '{sample_text_for_loading_test}'")
#     print(f"Predicted Sentiment: {predicted_sentiment_loaded}")
# else:
#     print("Saved model directory not found. Skipping loading example.")