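1. Model classes:
   - `KoBERTRubricScorer`: KoBERT regression model that predicts rubric scores
   - `KoBERTRubricClassifier`: KoBERT classification model that predicts a rubric score class
   - `KoBERTRubricMTLScorer`: multi-task (MTL) KoBERT model with one regression head per rubric item plus a classification head
   - `KoBERTRubricMTLRegressor`: multi-task (MTL) KoBERT regression model with one regression head per rubric item

2. Dataset class:
   - `RubricScoringDataset`: loads and preprocesses the rubric scoring data

3. Data loader function:
   - `create_data_loader`: creates a data loader that serves a dataset in batches

4. Trainer class:
   - `RubricScorerTrainer`: trains a model and validates it after every epoch

5. Ensemble class:
   - `RubricScorerEnsemble`: combines the predictions of several models

6. Evaluation functions:
   - `evaluate_regression`: computes the regression metric (MSE)
   - `evaluate_classification`: computes the classification metrics (accuracy, F1 score)

7. Preprocessing and augmentation functions:
   - `preprocess_text`: preprocesses text (morphological analysis and stopword removal)
   - `augment_text`: augments text (random synonym replacement)

8. Main function:
   - Runs the whole pipeline: data loading, preprocessing, model creation, training, ensembling, and evaluation.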
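
The metrics named in item 6 can be illustrated without any library. The sketch below recomputes MSE, accuracy, and the support-weighted F1 score in plain Python. The helper names (`mse`, `accuracy`, `weighted_f1`) and the toy labels are assumptions made for illustration; the notebook itself delegates these computations to scikit-learn.

```python
from collections import Counter

def mse(y_true, y_pred):
    """Mean squared error over two flat, equally long lists of numbers."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)

def accuracy(y_true, y_pred):
    """Fraction of samples whose predicted class equals the true class."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's count in y_true."""
    support = Counter(y_true)
    total = 0.0
    for cls, n in support.items():
        tp = sum(t == cls and p == cls for t, p in zip(y_true, y_pred))
        fp = sum(t != cls and p == cls for t, p in zip(y_true, y_pred))
        fn = n - tp
        denom = 2 * tp + fp + fn
        f1 = 2 * tp / denom if denom else 0.0
        total += f1 * n
    return total / len(y_true)

# Toy example (hypothetical values): four samples, classes 0..2
true_classes = [0, 1, 1, 2]
pred_classes = [0, 1, 2, 2]
print(accuracy(true_classes, pred_classes), weighted_f1(true_classes, pred_classes))
```

Weighting by support keeps frequent score classes from being drowned out by rare ones, which is presumably why the notebook calls `f1_score` with `average='weighted'`.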

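The code executes in the following order:

1. Load and preprocess the data
2. Create the models and set up their optimizers and loss functions
3. Train each model with `RubricScorerTrainer`
4. Build the ensemble and generate predictions
5. Compute and print the evaluation metrics for regression and classification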
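
Step 4 combines per-model predictions into a single prediction. As a minimal sketch, independent of the notebook's classes, the two simplest combination rules are an element-wise mean for regression scores and a majority vote for class labels. The function names and the toy numbers below are illustrative assumptions, not part of the notebook.

```python
from collections import Counter

def mean_ensemble(per_model_scores):
    """Average regression outputs across models.

    per_model_scores[m][i][k] is the score from model m for sample i, rubric item k.
    """
    n_models = len(per_model_scores)
    return [
        [sum(values) / n_models for values in zip(*sample_rows)]
        for sample_rows in zip(*per_model_scores)
    ]

def majority_vote(per_model_labels):
    """Pick the most frequent class per sample.

    per_model_labels[m][i] is the class predicted by model m for sample i.
    Ties go to the class that was voted for first.
    """
    return [Counter(votes).most_common(1)[0][0] for votes in zip(*per_model_labels)]

# Toy example (hypothetical values): two regressors, two samples, two rubric items
scores = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[3.0, 4.0], [5.0, 6.0]],
]
print(mean_ensemble(scores))

# Three classifiers voting on three samples
print(majority_vote([[0, 1, 2], [0, 2, 2], [1, 1, 2]]))
```

Both rules only make sense when every model scores the samples in the same order, so the loader used for ensembling should not shuffle.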

In [2]:
import re
import random
import numpy as np
import torch
import torch.nn as nn
from transformers import BertModel, BertConfig, BertTokenizer
from torch.utils.data import DataLoader, random_split
from konlpy.tag import Mecab, Okt
from sklearn.metrics import mean_squared_error, accuracy_score, f1_score
import pandas as pd

# Models
class KoBERTRubricScorer(nn.Module):
    def __init__(self, num_tasks, hidden_size=768, dropout_rate=0.1):
        super().__init__()
        config = BertConfig.from_pretrained('monologg/kobert')
        config.num_labels = num_tasks
        self.bert = BertModel.from_pretrained('monologg/kobert', config=config)
        self.dropout = nn.Dropout(dropout_rate)
        self.regressor = nn.Linear(hidden_size, num_tasks)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.regressor(pooled_output)
        return logits

class KoBERTRubricClassifier(nn.Module):
    def __init__(self, num_classes, hidden_size=768, dropout_rate=0.1):
        super().__init__()
        config = BertConfig.from_pretrained('monologg/kobert')
        config.num_labels = num_classes
        self.bert = BertModel.from_pretrained('monologg/kobert', config=config)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits

class KoBERTRubricMTLScorer(nn.Module):
    def __init__(self, num_tasks, hidden_size=768, dropout_rate=0.1):
        super().__init__()
        config = BertConfig.from_pretrained('monologg/kobert')
        self.bert = BertModel.from_pretrained('monologg/kobert', config=config)
        self.dropout = nn.Dropout(dropout_rate)
        self.regression_heads = nn.ModuleList([nn.Linear(hidden_size, 1) for _ in range(num_tasks)])
        self.classification_head = nn.Linear(hidden_size, 5)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        regression_outputs = [head(pooled_output) for head in self.regression_heads]
        regression_outputs = torch.cat(regression_outputs, dim=-1)
        classification_output = self.classification_head(pooled_output)
        return regression_outputs, classification_output

class KoBERTRubricMTLRegressor(nn.Module):
    def __init__(self, num_tasks, hidden_size=768, dropout_rate=0.1):
        super().__init__()
        config = BertConfig.from_pretrained('monologg/kobert')
        self.bert = BertModel.from_pretrained('monologg/kobert', config=config)
        self.dropout = nn.Dropout(dropout_rate)
        self.regression_heads = nn.ModuleList([nn.Linear(hidden_size, 1) for _ in range(num_tasks)])

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        regression_outputs = [head(pooled_output) for head in self.regression_heads]
        regression_outputs = torch.cat(regression_outputs, dim=-1)
        return regression_outputs

# Dataset
class RubricScoringDataset(torch.utils.data.Dataset):
    def __init__(self, data, tokenizer, max_length, okt=None, stopwords=None, synonym_dict=None, augment_ratio=0.0):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.okt = okt
        self.stopwords = stopwords
        self.synonym_dict = synonym_dict
        self.augment_ratio = augment_ratio

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data.iloc[idx]
        text = self.preprocess_text(item['text'])
        if self.augment_ratio > 0 and random.random() < self.augment_ratio:
            text = self.augment_text(text)
        inputs = self.tokenizer(text, max_length=self.max_length, truncation=True, padding='max_length',
                                return_tensors='pt')
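        # Assumes item['labels'] is a numeric sequence: three rubric scores followed by a class index
        labels = torch.tensor(item['labels'], dtype=torch.float32)
        # squeeze(0) removes only the batch dimension added by return_tensors='pt'
        return inputs['input_ids'].squeeze(0), inputs['attention_mask'].squeeze(0), labels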

    def preprocess_text(self, text):
        if self.okt is not None and self.stopwords is not None:
            tokens = self.okt.morphs(text)
            tokens = [token for token in tokens if token not in self.stopwords]
            text = ' '.join(tokens)
        return text

    def augment_text(self, text):
        if self.synonym_dict is not None:
            words = text.split()
            for i, word in enumerate(words):
                if word in self.synonym_dict and random.random() < 0.1:
                    words[i] = random.choice(self.synonym_dict[word])
            text = ' '.join(words)
        return text

# Data loader
def create_data_loader(dataset, batch_size, shuffle=True):
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

# Trainer
class RubricScorerTrainer:
    def __init__(self, model, train_dataloader, val_dataloader, optimizer, criterion, device):
        self.model = model
        self.train_dataloader = train_dataloader
        self.val_dataloader = val_dataloader
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device

    def train(self, num_epochs):
        self.model.to(self.device)

        for epoch in range(num_epochs):
            self.model.train()
            train_loss = 0.0
            train_correct = 0
            train_total = 0

            for batch in self.train_dataloader:
                input_ids, attention_mask, labels = [data.to(self.device) for data in batch]

                self.optimizer.zero_grad()
                outputs = self.model(input_ids, attention_mask)

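                if isinstance(self.model, KoBERTRubricClassifier):
                    loss = self.criterion(outputs, labels[:, 3].long())
                    _, predicted = torch.max(outputs, 1)
                    train_correct += (predicted == labels[:, 3].long()).sum().item()
                    train_total += labels.size(0)
                elif isinstance(outputs, tuple):
                    # KoBERTRubricMTLScorer returns (regression, classification) outputs.
                    # Assumption: add an unweighted cross-entropy term to the regression loss.
                    regression_outputs, classification_output = outputs
                    loss = self.criterion(regression_outputs, labels[:, :3])
                    loss = loss + nn.functional.cross_entropy(classification_output, labels[:, 3].long())
                else:
                    loss = self.criterion(outputs, labels[:, :3])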

                loss.backward()
                self.optimizer.step()

                train_loss += loss.item()

            train_loss /= len(self.train_dataloader)
            train_accuracy = train_correct / train_total if isinstance(self.model, KoBERTRubricClassifier) else None

            val_loss, val_accuracy = self.validate()

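            # Compare against None so that an accuracy of exactly 0.0 is still printed
            train_acc_text = f"{train_accuracy:.4f}" if train_accuracy is not None else "-"
            val_acc_text = f"{val_accuracy:.4f}" if val_accuracy is not None else "-"
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc_text}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc_text}")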

    def validate(self):
        self.model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for batch in self.val_dataloader:
                input_ids, attention_mask, labels = [data.to(self.device) for data in batch]

                outputs = self.model(input_ids, attention_mask)

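                if isinstance(self.model, KoBERTRubricClassifier):
                    loss = self.criterion(outputs, labels[:, 3].long())
                    _, predicted = torch.max(outputs, 1)
                    val_correct += (predicted == labels[:, 3].long()).sum().item()
                    val_total += labels.size(0)
                elif isinstance(outputs, tuple):
                    # KoBERTRubricMTLScorer returns (regression, classification) outputs.
                    # Assumption: add an unweighted cross-entropy term to the regression loss.
                    regression_outputs, classification_output = outputs
                    loss = self.criterion(regression_outputs, labels[:, :3])
                    loss = loss + nn.functional.cross_entropy(classification_output, labels[:, 3].long())
                else:
                    loss = self.criterion(outputs, labels[:, :3])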

                val_loss += loss.item()

        val_loss /= len(self.val_dataloader)
        val_accuracy = val_correct / val_total if isinstance(self.model, KoBERTRubricClassifier) else None

        return val_loss, val_accuracy

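# Ensemble
class RubricScorerEnsemble:
    def __init__(self, models, ensemble_method='mean'):
        self.models = models
        self.ensemble_method = ensemble_method

    def predict(self, dataloader, device):
        # The dataloader must not shuffle; otherwise rows from different models do not line up.
        # Regression scores and class probabilities have different widths, so they are
        # collected and combined separately. At least one model of each kind is expected.
        regression_preds, class_probs = [], []
        for model in self.models:
            model.eval()
            reg_batches, cls_batches = [], []
            with torch.no_grad():
                for batch in dataloader:
                    input_ids, attention_mask, _ = [data.to(device) for data in batch]
                    outputs = model(input_ids, attention_mask)
                    if isinstance(model, KoBERTRubricClassifier):
                        cls_batches.append(torch.softmax(outputs, dim=1).cpu().numpy())
                    elif isinstance(outputs, tuple):
                        # KoBERTRubricMTLScorer returns (regression, classification) outputs.
                        reg_batches.append(outputs[0].cpu().numpy())
                        cls_batches.append(torch.softmax(outputs[1], dim=1).cpu().numpy())
                    else:
                        reg_batches.append(outputs.cpu().numpy())
            if reg_batches:
                regression_preds.append(np.concatenate(reg_batches))
            if cls_batches:
                class_probs.append(np.concatenate(cls_batches))

        regression_preds = np.stack(regression_preds)  # (n_regression_models, n_samples, n_tasks)
        class_probs = np.stack(class_probs)            # (n_classification_models, n_samples, n_classes)

        if self.ensemble_method == 'mean':
            regression = regression_preds.mean(axis=0)
            classification = class_probs.mean(axis=0)
        elif self.ensemble_method == 'weighted_mean':
            regression = np.average(regression_preds, axis=0, weights=self._calculate_weights(regression_preds))
            classification = np.average(class_probs, axis=0, weights=self._calculate_weights(class_probs))
        elif self.ensemble_method == 'voting':
            # Voting only applies to class labels; regression scores fall back to the mean.
            regression = regression_preds.mean(axis=0)
            classification = self._voting_ensemble(class_probs)
        else:
            raise ValueError(f"Unsupported ensemble method: {self.ensemble_method}")

        # Columns [:n_tasks] hold regression scores; the remaining columns hold class scores.
        return np.concatenate([regression, classification], axis=1)

    def _calculate_weights(self, predictions):
        # Example heuristic: weight each model by the inverse standard deviation of its predictions.
        # The small epsilon guards against division by zero for constant predictions.
        weights = np.array([1.0 / (np.std(pred) + 1e-8) for pred in predictions])
        return weights / weights.sum()

    def _voting_ensemble(self, class_probs):
        # Majority vote: each model votes for its argmax class; returns per-class vote shares.
        votes = np.argmax(class_probs, axis=2)  # (n_models, n_samples)
        n_classes = class_probs.shape[2]
        counts = np.stack([np.bincount(sample_votes, minlength=n_classes) for sample_votes in votes.T])
        return counts / counts.sum(axis=1, keepdims=True)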

# Evaluation
def evaluate_regression(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)
    return mse

def evaluate_classification(y_true, y_pred):
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='weighted')
    return accuracy, f1

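# Text preprocessing and augmentation
def preprocess_text(text, okt, stopwords):
    # Morphological analysis followed by stopword removal
    tokens = okt.morphs(text)
    tokens = [token for token in tokens if token not in stopwords]
    return ' '.join(tokens)

def augment_text(text, okt, synonym_dict, augment_ratio):
    # Augment by synonym replacement only (insertion and deletion are not implemented); `okt` is unused
    if random.random() < augment_ratio:
        # Replace each word that has a known synonym with probability 0.1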
        words = text.split()
        for i, word in enumerate(words):
            if word in synonym_dict and random.random() < 0.1:
                words[i] = random.choice(synonym_dict[word])
        text = ' '.join(words)
    return text

In [None]:
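import ast

# Main function
def main():
    # Load the data
    train_data = pd.read_csv('train_data.csv')
    val_data = pd.read_csv('val_data.csv')
    test_data = pd.read_csv('test_data.csv')

    # Assumption: each 'labels' cell is a list serialized as text, e.g. "[3.0, 4.0, 2.5, 1]"
    # (three rubric scores followed by a class index); parse it back into a Python list.
    for frame in (train_data, val_data, test_data):
        frame['labels'] = frame['labels'].apply(ast.literal_eval)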

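    # Load the tokenizer and preprocessing tools
    # NOTE: 'monologg/kobert' uses a SentencePiece vocabulary; if BertTokenizer cannot load it,
    # use the tokenizer recommended in that model's documentation instead.
    tokenizer = BertTokenizer.from_pretrained('monologg/kobert')
    okt = Okt()
    stopwords = ['은', '는', '이', '가', '을', '를']
    synonym_dict = {'좋아하다': ['즐기다', '기뻐하다'], '훌륭하다': ['우수하다', '뛰어나다']}

    # Create the datasets (same preprocessing for every split; augmentation only for training)
    train_dataset = RubricScoringDataset(train_data, tokenizer, max_length=128, okt=okt, stopwords=stopwords, synonym_dict=synonym_dict, augment_ratio=0.1)
    val_dataset = RubricScoringDataset(val_data, tokenizer, max_length=128, okt=okt, stopwords=stopwords)
    test_dataset = RubricScoringDataset(test_data, tokenizer, max_length=128, okt=okt, stopwords=stopwords)

    # Create the data loaders (no shuffling for validation/test so predictions stay aligned with labels)
    train_dataloader = create_data_loader(train_dataset, batch_size=16)
    val_dataloader = create_data_loader(val_dataset, batch_size=16, shuffle=False)
    test_dataloader = create_data_loader(test_dataset, batch_size=16, shuffle=False)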

    # Create the models
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    rubric_scorer = KoBERTRubricScorer(num_tasks=3).to(device)
    rubric_classifier = KoBERTRubricClassifier(num_classes=5).to(device)
    mtl_rubric_scorer = KoBERTRubricMTLScorer(num_tasks=3).to(device)
    mtl_rubric_regressor = KoBERTRubricMTLRegressor(num_tasks=3).to(device)

    # Loss function for each model
    training_setups = [
        ("Rubric Scorer", rubric_scorer, nn.MSELoss()),
        ("Rubric Classifier", rubric_classifier, nn.CrossEntropyLoss()),
        ("MTL Rubric Scorer", mtl_rubric_scorer, nn.MSELoss()),
        ("MTL Rubric Regressor", mtl_rubric_regressor, nn.MSELoss()),
    ]

    # Training: every model gets its own Adam optimizer with the same learning rate
    num_epochs = 10
    for name, model, criterion in training_setups:
        print(f"Training {name}...")
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
        trainer = RubricScorerTrainer(model, train_dataloader, val_dataloader,
                                      optimizer, criterion, device)
        trainer.train(num_epochs)

    # Ensemble setup
    ensemble_method = 'weighted_mean'
    models = [rubric_scorer, rubric_classifier, mtl_rubric_scorer, mtl_rubric_regressor]
    ensemble = RubricScorerEnsemble(models, ensemble_method)

    # Evaluation
    ensemble_predictions = ensemble.predict(test_dataloader, device)
    # Stack the per-row label lists into an (n_samples, 4) array: 3 rubric scores + 1 class index.
    # This assumes every 'labels' entry is already a list of numbers, not a string.
    test_labels = np.array(test_data['labels'].tolist(), dtype=np.float32)

    # Regression evaluation
    regression_labels = test_labels[:, :3]
    regression_mse = evaluate_regression(regression_labels, ensemble_predictions[:, :3])
    print(f"Ensemble Regression MSE: {regression_mse:.4f}")

    # Classification evaluation
    classification_labels = test_labels[:, 3].astype(int)
    classification_predictions = np.argmax(ensemble_predictions[:, 3:], axis=1)
    accuracy, f1 = evaluate_classification(classification_labels, classification_predictions)
    print(f"Ensemble Classification Accuracy: {accuracy:.4f}")
    print(f"Ensemble Classification F1 Score: {f1:.4f}")