In [1]:
from transformers import AutoModelForSequenceClassification, TrainingArguments, DistilBertConfig
from tokenization_kobert import KoBertTokenizer
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
import os
import numpy as np
from torch.nn.utils.rnn import pad_sequence

# 1. Load the pre-split train / validation / test sets (tab-separated files).
train_data, val_data, test_data = (
    pd.read_csv(csv_path, sep='\t')
    for csv_path in (
        "../data/sentiment_data/train.csv",
        "../data/sentiment_data/validation.csv",
        "../data/sentiment_data/test.csv",
    )
)

# Disabled alternative: load a single combined file and split it in code.
#data = pd.read_csv("../data/sentiment_data/korean_investment_data.csv", sep='\t')  # initialise the `data` variable

In [2]:
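# # 2. Split into train / test sets (disabled: the pre-split CSV files are loaded directly instead)
# train, test = train_test_split(data, test_size=0.2, random_state=42)

# # 3. Split the train set into train / validation sets
# train, val = train_test_split(train, test_size=0.1, random_state=42)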

In [2]:
# 4. Label definitions: the position of a name in `label_list` is its integer class id.
label_list = ["안정형", "안정추구형", "위험중립형", "공격투자형", "공격형"]
label2id = dict(zip(label_list, range(len(label_list))))
id2label = dict(enumerate(label_list))
# Same name -> id mapping, kept as its own dict object for the dataset lookup.
label_dict = dict(label2id)

In [3]:
# 5. Dataset definition (text preprocessing helper + torch Dataset wrapper)
def preprocess_text(text, max_length):
    """Clip ``text`` to at most ``max_length`` characters.

    The limit is counted in characters, not tokens.

    Args:
        text: The raw text. ``None`` and float NaN (what pandas yields for an
            empty CSV cell) are treated as missing text.
        max_length: Maximum number of characters to keep.

    Returns:
        ``text`` unchanged when it is already short enough, its first
        ``max_length`` characters otherwise, or ``""`` for missing text.
    """
    # `text != text` is only true for NaN; checking it here avoids the
    # TypeError that len() would raise on a float.
    if text is None or (isinstance(text, float) and text != text):
        return ""
    if len(text) > max_length:
        text = text[:max_length]  # enforce the character limit
    return text

class InvestmentDataset(Dataset):
    """Torch dataset that turns rows of a dataframe into model-ready tensors.

    Each row must provide a ``"text"`` and a ``"label"`` column. The label
    name is converted to an integer through the module-level ``label_dict``.
    """

    def __init__(self, dataframe, tokenizer, max_length=512):
        """Store the source frame, the tokenizer and the length limit.

        Args:
            dataframe: Object indexed with ``.iloc[idx]["text"]`` /
                ``.iloc[idx]["label"]`` and supporting ``len()``.
            tokenizer: Tokenizer exposing ``encode_plus``.
            max_length: Limit used both for the character-level clipping and
                as the tokenizer's ``max_length``.
        """
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Build the tensors for the row at position ``idx``.

        Returns:
            dict with ``'input_ids'``, ``'attention_mask'`` (leading batch
            axis squeezed away) and ``'labels'`` (a ``torch.long`` scalar).
        """
        row = self.data.iloc[idx]
        raw_text = row["text"]
        label_id = label_dict[row["label"]]

        # Character-level clipping happens before tokenization.
        clipped = preprocess_text(raw_text, self.max_length)

        encoded = self.tokenizer.encode_plus(
            clipped,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        # return_tensors='pt' yields a leading axis of size 1; drop it so the
        # DataLoader can batch samples itself.
        sample = {
            field: encoded[field].squeeze(0)
            for field in ('input_ids', 'attention_mask')
        }
        sample['labels'] = torch.tensor(label_id, dtype=torch.long)
        return sample


In [4]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision / recall / F1.

    Args:
        pred: Either an object exposing ``label_ids`` and ``predictions``
            attributes, or a mapping with ``'label_ids'`` and
            ``'predictions'`` keys. ``predictions`` may be a 1-D sequence of
            class ids, or an array of per-class scores, in which case the
            argmax over the last axis is taken.

    Returns:
        dict with the keys ``'accuracy'``, ``'f1'``, ``'precision'`` and
        ``'recall'``.
    """
    # Accept attribute-style containers as well as plain dicts.
    if hasattr(pred, 'label_ids') and hasattr(pred, 'predictions'):
        labels, raw_predictions = pred.label_ids, pred.predictions
    else:
        labels, raw_predictions = pred['label_ids'], pred['predictions']

    predictions = np.array(raw_predictions)
    # A 1-D array is assumed to already contain integer class ids.
    if predictions.ndim == 1:
        preds = predictions
    else:
        preds = predictions.argmax(-1)

    # zero_division=0: a class that is never predicted has an undefined
    # precision. Scoring it as 0 explicitly gives the same value as the
    # default but without the undefined-metric warning.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='macro', zero_division=0
    )
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [5]:
# 7. Load the tokenizer for the DistilKoBERT checkpoint, naming the padding token explicitly.
tokenizer = KoBertTokenizer.from_pretrained(
    "monologg/distilkobert",
    pad_token='[PAD]',
)

# 8. Model configuration: one output per label plus the id <-> name mappings.
config = DistilBertConfig.from_pretrained(
    "monologg/distilkobert", num_labels=len(label_list), id2label=id2label, label2id=label2id
)

# 9. Instantiate the sequence-classification model from the same checkpoint using `config`.
model = AutoModelForSequenceClassification.from_pretrained("monologg/distilkobert", config=config)

# Ask the model to return its outputs in dictionary-style form.
model.config.return_dict = True

In [6]:
# 10. Wrap each split in an InvestmentDataset (default max_length) sharing one tokenizer.
train_dataset, val_dataset, test_dataset = (
    InvestmentDataset(split_frame, tokenizer)
    for split_frame in (train_data, val_data, test_data)
)

In [7]:
# 11. Training hyper-parameters.
# The manual training loop later in the notebook reads settings such as the
# learning rate and the epoch count from this object.
training_args = TrainingArguments(
    output_dir="./models/classifier_model/distilkobert_investment",
    num_train_epochs=5,
    learning_rate=2e-5,
    weight_decay=0.01,
    save_total_limit=3,
    save_steps=500,
)

In [8]:
# 12. Data loader construction
def collate_fn(batch):
    """Merge a list of dataset samples into one padded batch.

    Args:
        batch: List of dicts with ``'input_ids'``, ``'attention_mask'`` and
            ``'labels'`` entries, as produced by the dataset.

    Returns:
        dict with the same three keys. Sequences are padded to the longest
        one in the batch (batch axis first) and labels are gathered into a
        single tensor.
    """
    id_seqs, mask_seqs, label_items = [], [], []
    for sample in batch:
        id_seqs.append(sample['input_ids'])
        mask_seqs.append(sample['attention_mask'])
        label_items.append(sample['labels'])

    # Token ids are padded with the tokenizer's pad id, masks with 0.
    # NOTE(review): InvestmentDataset already pads every sample to max_length,
    # so this padding looks like a no-op unless that changes — confirm.
    return {
        'input_ids': pad_sequence(id_seqs, batch_first=True, padding_value=tokenizer.pad_token_id),
        'attention_mask': pad_sequence(mask_seqs, batch_first=True, padding_value=0),
        'labels': torch.tensor(label_items)
    }

# Batch iterators for the three splits; only the training split is shuffled.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=16,
    shuffle=True,
    collate_fn=collate_fn,
)
eval_dataloader = DataLoader(dataset=val_dataset, collate_fn=collate_fn, batch_size=16)
test_dataloader = DataLoader(dataset=test_dataset, collate_fn=collate_fn, batch_size=16)

In [9]:
# 13. Optimizer setup
# Take both the learning rate and the weight decay from `training_args`, so
# that changing the configured weight decay actually affects training rather
# than silently falling back to the optimizer's own default.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=training_args.learning_rate,
    weight_decay=training_args.weight_decay,
)

In [10]:
# 14. Training loop with a validation pass after every epoch
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

for epoch in range(training_args.num_train_epochs):
    # ---- Training pass ----
    # Re-enter training mode at the start of every epoch: the validation pass
    # at the end of the previous epoch calls model.eval().
    model.train()
    for batch in train_dataloader:
        # Move the batch onto the model's device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

        # Integer indexing is used so that tuple-style outputs work too;
        # the first element is the loss.
        loss = outputs[0]

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # ---- Validation pass (once per epoch) ----
    model.eval()
    # Accumulators are reset here so each epoch's report covers only that epoch.
    eval_loss = 0
    eval_preds = []
    eval_labels = []

    with torch.no_grad():
        for batch in eval_dataloader:
            # Move the batch onto the model's device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            # Loss first, logits second
            loss = outputs[0]
            logits = outputs[1]

            # Predicted class id per sample
            preds = torch.argmax(logits, dim=-1).cpu().numpy()
            labels = labels.cpu().numpy()

            # Collect loss, predictions and gold labels
            eval_loss += loss.item()
            eval_preds.extend(preds)
            eval_labels.extend(labels)

    # Mean loss per batch; max(..., 1) avoids dividing by zero when the
    # validation loader yields no batches.
    avg_eval_loss = eval_loss / max(len(eval_dataloader), 1)

    # Metrics are only computed when more than one distinct gold label is present.
    unique_labels = np.unique(eval_labels)
    if len(unique_labels) > 1:
        try:
            eval_metrics = compute_metrics({"predictions": eval_preds, "label_ids": eval_labels})
            print(f"Epoch {epoch+1}: Eval Loss={avg_eval_loss:.4f}, Eval Accuracy={eval_metrics['accuracy']:.4f}")
        except ValueError as e:
            print(f"ValueError during compute_metrics: {e}")
            print("Skipping evaluation for this epoch.")
    else:
        print(f"Epoch {epoch+1}: Eval Loss={avg_eval_loss:.4f}, Eval Accuracy=N/A (Single label)")


KeyboardInterrupt: 

In [28]:
# 15. Evaluate on the test split
model.eval()
test_loss = 0
test_preds, test_labels = [], []

with torch.no_grad():
    for batch in test_dataloader:
        # Place every tensor of the batch on the model's device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass with labels supplied
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

        # Positional access: loss first, logits second
        loss, logits = outputs[0], outputs[1]

        # Predicted class ids and gold labels as NumPy arrays
        preds = logits.argmax(dim=-1).cpu().numpy()
        labels = labels.cpu().numpy()

        # Accumulate the batch loss and collect predictions / gold labels
        test_loss += loss.item()
        test_preds.extend(preds)
        test_labels.extend(labels)

In [29]:
# Compute the evaluation metrics on the test split.
# `test_preds` already holds class ids (argmax was applied per batch), so
# compute_metrics receives a 1-D prediction list here.
test_metrics = compute_metrics({"predictions": test_preds, "label_ids": test_labels})
# Report the summed batch losses divided by the number of test batches, plus accuracy.
print(f"Test Loss={test_loss/len(test_dataloader):.4f}, Test Accuracy={test_metrics['accuracy']:.4f}")

Test Loss=1.6030, Test Accuracy=0.2100


In [None]:
# Save the model
# Target directory for the fine-tuned classifier (a sub-folder of the training output directory).
model_path = "./models/classifier_model/distilkobert_investment/kobert_fined_model"

model.save_pretrained(model_path)
# Save the tokenizer vocabulary
# NOTE(review): only save_vocabulary() is called, so presumably just the
# vocabulary files are written and not the tokenizer configuration — confirm
# this is enough to reload the tokenizer from `model_path`.
tokenizer.save_vocabulary(model_path)