# 뉴스 편향성 분석 모델

이 노트북은 뉴스 기사의 민주당과 국힘에 대한 편향성을 분석하는 딥러닝 모델을 구현합니다.

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer,
    AutoModel,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback,
    DataCollatorWithPadding
)
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import os
os.environ["WANDB_DISABLED"] = "true"

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## 1. 데이터 로드 및 전처리

In [2]:
# 베이스라인 결과 로드
baseline_results = pd.read_csv('./baseline_results/baseline_results.csv').iloc[0]

In [None]:
# 데이터 로드
df = pd.read_csv('../data/정당_관점_라벨링_최종_업데이트.csv')  # 파일 경로는 실제 경로에 맞게 수정해주세요

# 정당 레이블 매핑
party_mapping = {'국민의힘': 0, '민주당': 1, '그외': 2}

# 정당 레이블 변환
df['party_label'] = df['party'].map(party_mapping)

# NaN 값 처리
df = df.dropna(subset=['title_cleaned', 'content_cleaned', 'party_label'])

# 제목과 본문 결합
df['text'] = df['title_cleaned'] + ' ' + df['content_cleaned']

print(f"전체 데이터 수: {len(df)}")
print("\n정당별 기사 수:")
print(df['party'].value_counts())
print("\n정당 레이블 분포:")
print(df['party_label'].value_counts())

## 2. 데이터셋 클래스 정의

In [4]:
class NewsDataset(Dataset):
    def __init__(self, texts, party_labels, tokenizer, max_length=1600):
        self.texts = texts
        self.party_labels = party_labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # 텐서 차원 조정
        input_ids = encoding['input_ids'].squeeze(0)  # [max_length]
        attention_mask = encoding['attention_mask'].squeeze(0)  # [max_length]

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'party_label': torch.tensor(self.party_labels[idx], dtype=torch.long)
        }

## 3. 모델 정의

In [5]:
class NewsBiasModel(torch.nn.Module):
    def __init__(self, model_name, num_party_labels=3, class_weights=None,
                 dropout_rate=0.2, hidden_size=512):
        super().__init__()
        self.longformer = AutoModel.from_pretrained(model_name)
        self.register_buffer("class_weights", torch.tensor(class_weights, dtype=torch.float))

        # 드롭아웃 레이어
        self.dropout = torch.nn.Dropout(dropout_rate)

        # 특성 추출 레이어
        hidden_size_longformer = self.longformer.config.hidden_size
        self.feature_layer = torch.nn.Sequential(
            torch.nn.Linear(hidden_size_longformer, hidden_size),
            torch.nn.LayerNorm(hidden_size),
            torch.nn.ReLU(),
            torch.nn.Dropout(dropout_rate)
        )

        # BiLSTM 레이어
        self.bilstm = torch.nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size // 2,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
            dropout=dropout_rate+0.1
        )

        # 정당 분류기
        self.party_classifier = torch.nn.Sequential(
            torch.nn.Linear(hidden_size, hidden_size // 2),
            torch.nn.LayerNorm(hidden_size // 2),
            torch.nn.ReLU(),
            torch.nn.Dropout(dropout_rate),
            torch.nn.Linear(hidden_size // 2, num_party_labels)
        )

    def forward(self, input_ids, attention_mask, party_label=None):
        # Longformer 출력
        outputs = self.longformer(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = outputs.last_hidden_state  # (batch_size, seq_len, 768)

        # 특성 추출
        features = self.dropout(sequence_output)
        features = self.feature_layer(features)  # (batch_size, seq_len, hidden_size)

        # BiLSTM 적용
        lstm_output, _ = self.bilstm(features)  # (batch_size, seq_len, hidden_size)
        
        # 최종 특성 추출 (평균 풀링)
        final_features = torch.mean(lstm_output, dim=1)  # (batch_size, hidden_size)

        # 정당 예측
        party_logits = self.party_classifier(final_features)  # (batch_size, num_party_labels)

        if party_label is not None:
            loss_fct = torch.nn.CrossEntropyLoss(weight=self.class_weights)
            party_loss = loss_fct(party_logits, party_label)
            return {'loss': party_loss, 'party_logits': party_logits}

        return {'party_logits': party_logits}

## 4. 학습 준비

In [6]:
# 데이터 분할
train_texts, val_texts, train_party_labels, val_party_labels = train_test_split(
    df['text'].values,
    df['party_label'].values,
    test_size=0.1,
    random_state=42
)

In [None]:
# 토크나이저 초기화
tokenizer = AutoTokenizer.from_pretrained('allenai/longformer-base-4096')

# 데이터셋 생성
train_dataset = NewsDataset(train_texts, train_party_labels, tokenizer, max_length=1600)
val_dataset = NewsDataset(val_texts, val_party_labels, tokenizer, max_length=1600)

In [None]:
label_list = [example['party_label'] for example in train_dataset]
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.array([0, 1, 2]),  # 0: 국민의힘, 1: 민주당, 2: 그외
    y=np.array(label_list)
)
print("클래스 가중치:", class_weights)

In [None]:
# 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 모델 초기화 및 GPU 이동을 분리
model = NewsBiasModel('allenai/longformer-base-4096', class_weights=class_weights)
model = model.to(device)

# 커스텀 데이터 콜레이터
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [10]:
def compute_metrics(eval_pred):
    party_preds = eval_pred.predictions
    party_labels = eval_pred.label_ids

    party_preds = np.argmax(party_preds, axis=1)

    party_report = classification_report(
        party_labels,
        party_preds,
        target_names=['국민의힘', '민주당', '그외'],
        output_dict=True,
        zero_division=0
    )

    return {
        'party_f1': party_report['weighted avg']['f1-score'],
        'party_accuracy': party_report['accuracy'],
    }

In [None]:
# 학습 인자 설정
training_args = TrainingArguments(
    output_dir='./news_bias_longformer_results',  # 디렉토리 이름 변경
    num_train_epochs=9,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    learning_rate=3.3541e-05,
    warmup_ratio=0.1135,
    weight_decay=0.0256,
    logging_dir='./news_bias_longformer_logs',  # 로그 디렉토리 이름 변경
    logging_steps=100,
    eval_strategy='epoch',
    save_strategy='epoch',
    load_best_model_at_end=True,
    metric_for_best_model='party_f1',  # 평가 메트릭 이름 변경
    gradient_accumulation_steps=3,  # 그래디언트 누적
    max_grad_norm=0.7155,  # 그래디언트 클리핑
    fp16=True,  # 혼합 정밀도 학습
    label_smoothing_factor=0.0672,  # 레이블 스무딩
    optim='adamw_torch',  # AdamW 옵티마이저 사용
    lr_scheduler_type='cosine'
)

# 트레이너 초기화
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3, early_stopping_threshold=0.001)]
)

## 5. 학습 및 평가

In [None]:
# 학습 실행
trainer.train()

# 최종 평가
final_metrics = trainer.evaluate()
print("\n최종 평가 결과:")
print(f"정당 분류 F1 점수: {final_metrics['eval_party_f1']:.4f}")
print(f"정당 분류 정확도: {final_metrics['eval_party_accuracy']:.4f}")

In [None]:
# 베이스라인과 성능 비교
comparison_results = {
    'Model': ['Baseline', 'News Bias BERT'],  # 모델 이름 변경
    'Party F1': [baseline_results['party_f1'], final_metrics['eval_party_f1']],  # 메트릭 이름 변경
    'Party Accuracy': [baseline_results['party_accuracy'], final_metrics['eval_party_accuracy']],  # 메트릭 이름 변경
}

comparison_df = pd.DataFrame(comparison_results)
comparison_df.to_csv('./news_bias_results/model_comparison.csv', index=False)  # 저장 경로 변경
comparison_df.head()

In [None]:
# 성능 향상 시각화
plt.figure(figsize=(12, 6))
metrics = ['Party F1', 'Party Accuracy']  # 메트릭 이름 변경
x = np.arange(len(metrics))
width = 0.35

plt.bar(x - width/2, comparison_df.iloc[0, 1:], width, label='Baseline')
plt.bar(x + width/2, comparison_df.iloc[1, 1:], width, label='News Bias BERT')  # 모델 이름 변경

plt.xlabel('Metrics')
plt.ylabel('Score')
plt.title('Model Performance Comparison')
plt.xticks(x, metrics)
plt.legend()

plt.savefig('./news_bias_results/performance_comparison.png')  # 저장 경로 변경
plt.show()

## 6. 하이퍼파라미터 튜닝

In [None]:
!pip install optuna
!pip install optuna-integration[pytorch_lightning]  # 설치 후 세션을 다시 시작해야 함

In [15]:
import optuna
from optuna.integration import PyTorchLightningPruningCallback
from sklearn.model_selection import KFold

In [16]:
def objective(trial):
    # 하이퍼파라미터 정의
    params = {
        'learning_rate': trial.suggest_float('learning_rate', 1e-6, 1e-4, log=True),
        'weight_decay': trial.suggest_float('weight_decay', 0.01, 0.04),
        'dropout_rate': trial.suggest_float('dropout_rate', 0.1, 0.3),
        'hidden_size': trial.suggest_int('hidden_size', 256, 1024),
        'num_epochs': trial.suggest_int('num_epochs', 5, 15),
        'batch_size': trial.suggest_categorical('batch_size', [16, 32, 64]),
        'warmup_ratio': trial.suggest_float('warmup_ratio', 0.1, 0.2),
        'label_smoothing': trial.suggest_float('label_smoothing', 0.05, 0.10),
        'gradient_accumulation_steps': trial.suggest_int('gradient_accumulation_steps', 1, 4),
        'max_grad_norm': trial.suggest_float('max_grad_norm', 0.5, 1.0)
    }

    # K-fold 교차 검증
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    scores = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(df)):
        # 데이터 분할
        train_texts = df.iloc[train_idx]['text'].values
        val_texts = df.iloc[val_idx]['text'].values
        train_labels = df.iloc[train_idx]['party_label'].values
        val_labels = df.iloc[val_idx]['party_label'].values

        # 데이터셋 생성
        train_dataset = NewsDataset(train_texts, train_labels, tokenizer)
        val_dataset = NewsDataset(val_texts, val_labels, tokenizer)

        # 모델 초기화
        model = NewsBiasModel(
            'klue/roberta-base',
            class_weights=class_weights,
            dropout_rate=params['dropout_rate'],
            hidden_size=params['hidden_size']
        )
        model = model.to(device)

        # 학습 인자 설정
        training_args = TrainingArguments(
            output_dir='./temp_results',
            num_train_epochs=params['num_epochs'],
            per_device_train_batch_size=params['batch_size'],
            per_device_eval_batch_size=params['batch_size'] * 2,
            learning_rate=params['learning_rate'],
            warmup_ratio=params['warmup_ratio'],
            weight_decay=params['weight_decay'],
            logging_dir='./temp_logs',
            logging_steps=100,
            eval_strategy='epoch',
            save_strategy='no',
            load_best_model_at_end=False,
            save_total_limit=0,
            metric_for_best_model='party_f1',
            gradient_accumulation_steps=params['gradient_accumulation_steps'],
            max_grad_norm=params['max_grad_norm'],
            fp16=True,
            label_smoothing_factor=params['label_smoothing'],
            optim='adamw_torch',
            lr_scheduler_type='cosine'
        )

        # 트레이너 초기화
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
            compute_metrics=compute_metrics,
            callbacks=[
                EarlyStoppingCallback(
                    early_stopping_patience=3,
                    early_stopping_threshold=0.001
                )
            ]
        )

        # 학습
        trainer.train()

        # 평가
        metrics = trainer.evaluate()
        scores.append(metrics['eval_party_f1'])

    # 평균 F1 점수 반환
    return np.mean(scores)

In [None]:
# Optuna 스터디 생성 및 최적화 실행
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)  # 20회 시도

# 최적의 하이퍼파라미터 출력
print("Best trial:")
trial = study.best_trial
print("  Value: ", trial.value)
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

In [None]:
# 모든 trial의 결과를 DataFrame으로 저장
trials_df = study.trials_dataframe()
trials_df.to_csv('hyperparameter_trials.csv', index=False)
trials_df.head()

In [None]:
print(trials_df.sort_values(by='value', ascending=False).iloc[0])

In [18]:
# 최적의 하이퍼파라미터로 모델 학습
best_params = trial.params

In [None]:
best_params

## 7. 모델 저장 및 로드

In [24]:
# 모델 저장
trainer.save_model('./news_bias_longformer_model')
tokenizer.save_pretrained('./news_bias_longformer_model')

# 모델 로드
def load_model(model_path):
    model = NewsBiasModel('allenai/longformer-base-4096')
    model.load_state_dict(torch.load(f'{model_path}/pytorch_model.bin'))
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    return model, tokenizer

## 8. 새로운 기사에 대한 예측

In [None]:
def predict_news(text, model, tokenizer, device):
    model.eval()
    encoding = tokenizer(
        text,
        add_special_tokens=True,
        max_length=1600,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        party_pred = torch.argmax(outputs['party_logits'], dim=1).item()

    # 정당 레이블 매핑
    party_mapping = {0: '국민의힘', 1: '민주당', 2: '그외'}
    return party_mapping[party_pred]  # 숫자 레이블을 정당 이름으로 변환하여 반환

In [None]:
def predict_csv_file(csv_path, model, tokenizer, device):
    # CSV 파일 로드
    df = pd.read_csv(csv_path)

    # 예측 결과를 저장할 리스트
    party_predictions = []

    # 각 텍스트에 대해 예측 수행
    for text in tqdm(df['text'], desc="Predicting"):
        party_pred = predict_news(text, model, tokenizer, device)
        party_predictions.append(party_pred)

    # 예측 결과를 DataFrame에 추가
    df['party'] = party_predictions

    # 결과 저장
    output_path = csv_path.replace('.csv', '_predicted.csv')
    df.to_csv(output_path, index=False)

    # 예측 결과 통계 출력
    print(f"\n{csv_path} 예측 결과:")
    print("\n정당 예측 분포:")
    print(df['party'].value_counts())

    return df

In [None]:
# 모델 로드
model, tokenizer = load_model('./news_bias_longformer_model')  # 모델 경로 변경
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# CSV 파일 예측
csv_path = "../data/전체1.csv"  # 여기에 실제 CSV 파일 경로를 입력하시면 됩니다
predicted_df = predict_csv_file(csv_path, model, tokenizer, device)
predicted_df.head()

In [None]:
# 저장
predicted_df.to_csv(csv_path, index=False)