In [None]:
# Fetch the project repository: later cells import helpers from the Aiffel_DLThon
# package and read its config and data files through relative paths.
!git clone https://github.com/kmasta/Aiffel_DLThon.git

# Baseline Code

## 필요한 라이브러리 임포트

In [None]:
import yaml
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback, ElectraForSequenceClassification
from sklearn.metrics import f1_score, accuracy_score
from torch.utils.data import Dataset
import torch
import os

from Aiffel_DLThon.pytorch_code.callback.logger import log_message, log_target_distribution, LoggingCallback
from Aiffel_DLThon.pytorch_code.callback.early_stopping import EarlyStopping
from Aiffel_DLThon.pytorch_code.callback.checkpoint import save_model
from Aiffel_DLThon.pytorch_code.callback.save_results import save_submission

## 모델 정보 불러오기

In [None]:
# Step 1: the pointer file names which model configuration is active.
model_name_path = "./Aiffel_DLThon/pytorch_code/config/model_name.yaml"
with open(model_name_path, "r") as f:
    model_name_config = yaml.safe_load(f)
model_key = model_name_config["model_name"]

# Step 2: read the YAML file that belongs to that model key.
model_config_path = f"./Aiffel_DLThon/pytorch_code/config/{model_key}.yaml"
with open(model_config_path, "r") as f:
    config = yaml.safe_load(f)

# Announce the experiment through the project logger.
experiment_banner = f"Running experiment: {config['experiment_name']}"
log_message(experiment_banner, config["log_dir"])

## 토크나이저, 모델 구현

In [None]:
from transformers import ElectraPreTrainedModel, ElectraModel
from transformers.modeling_outputs import SequenceClassifierOutput
import torch.nn.functional as F
import torch.nn as nn

class ElectraTwoStageModel(ElectraPreTrainedModel):
    """ELECTRA encoder with two linear heads fed by the same first-token vector.

    * ``stage1_classifier`` produces 2 logits (binary decision).
    * ``stage2_classifier`` produces 4 logits (fine-grained decision).

    Only the stage-1 logits are placed in the returned output; the stage-2
    head contributes to the loss during training.
    """

    def __init__(self, config):
        super().__init__(config)
        self.electra = ElectraModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.stage1_classifier = nn.Linear(config.hidden_size, 2)  # binary head
        self.stage2_classifier = nn.Linear(config.hidden_size, 4)  # 4-class head

        self.init_weights()

    def forward(self, input_ids, attention_mask=None, token_type_ids=None,
                binary_labels=None, multiclass_labels=None):
        """Encode the batch and score it with both heads.

        Args:
            input_ids, attention_mask, token_type_ids: forwarded to the encoder.
            binary_labels: targets for the stage-1 head (optional).
            multiclass_labels: targets for the stage-2 head (optional); only
                rows whose ``binary_labels`` equal 0 are used.

        Returns:
            SequenceClassifierOutput with ``logits`` set to the stage-1 logits
            and ``loss`` set to the stage-1 cross-entropy plus, when at least
            one row has ``binary_labels == 0``, the stage-2 cross-entropy on
            those rows. ``loss`` is None unless both label tensors are given.
        """
        encoder_out = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        # Hidden state at sequence position 0, with dropout applied.
        pooled = self.dropout(encoder_out.last_hidden_state[:, 0, :])

        stage1_logits = self.stage1_classifier(pooled)
        stage2_logits = self.stage2_classifier(pooled)

        labels_given = binary_labels is not None and multiclass_labels is not None
        if not labels_given:
            return SequenceClassifierOutput(loss=None, logits=stage1_logits)

        criterion = nn.CrossEntropyLoss()
        loss = criterion(stage1_logits, binary_labels)

        # Stage 2 is trained only on rows that stage 1 labels as class 0.
        stage2_rows = (binary_labels == 0)
        if stage2_rows.sum() > 0:
            loss = loss + criterion(stage2_logits[stage2_rows], multiclass_labels[stage2_rows])

        return SequenceClassifierOutput(loss=loss, logits=stage1_logits)

In [None]:
# The tokenizer checkpoint is taken from the "tokenizer_name" entry of the loaded YAML config.
tokenizer = AutoTokenizer.from_pretrained(config["tokenizer_name"])

## 커스텀 데이터셋 클래스(토크나이징)

In [None]:
class TwoStageDataset(Dataset):
    """Map-style dataset that pairs tokenized inputs with two label sets.

    Args:
        encodings: mapping from field name (e.g. ``input_ids``) to an
            indexable collection of per-sample rows. Rows may be plain Python
            lists or tensors (as produced with ``return_tensors="pt"``).
        binary_labels: per-sample stage-1 labels.
        multiclass_labels: per-sample stage-2 labels.
    """

    def __init__(self, encodings, binary_labels, multiclass_labels):
        self.encodings = encodings
        self.binary_labels = binary_labels
        self.multiclass_labels = multiclass_labels

    def __len__(self):
        return len(self.binary_labels)

    @staticmethod
    def _row_to_tensor(row):
        """Return an independent tensor copy of ``row``.

        ``torch.tensor(existing_tensor)`` emits a UserWarning on every call,
        so tensors are copied with ``clone().detach()`` instead; anything else
        goes through ``torch.tensor`` as before.
        """
        if isinstance(row, torch.Tensor):
            return row.clone().detach()
        return torch.tensor(row)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors.

        Besides the encoding fields, the dict carries ``labels`` and
        ``binary_labels`` (both the stage-1 label) and ``multiclass_labels``.
        """
        item = {key: self._row_to_tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = torch.tensor(self.binary_labels[idx])
        item["binary_labels"] = torch.tensor(self.binary_labels[idx])
        item["multiclass_labels"] = torch.tensor(self.multiclass_labels[idx])
        return item

## 데이터 불러오기

In [None]:
from sklearn.model_selection import train_test_split

def prepare_two_stage_dataset(texts, labels, tokenizer, max_length=128, test_size=0.2):
    """Derive both label sets, split, tokenize, and wrap as TwoStageDataset objects.

    Args:
        texts: list of input strings.
        labels: per-text class ids; values 0-3 are kept for stage 2, any other
            value is treated as the positive class of stage 1.
        tokenizer: callable tokenizer applied to each split.
        max_length: truncation length passed to the tokenizer.
        test_size: validation fraction passed to ``train_test_split``.

    Returns:
        ``(train_dataset, val_dataset)``.
    """
    stage2_classes = [0, 1, 2, 3]

    # Stage 1 target: 0 for classes 0-3, 1 otherwise.
    # Stage 2 target: the class itself for 0-3, -1 otherwise.
    binary_labels = []
    multiclass_labels = []
    for label in labels:
        belongs_to_stage2 = label in stage2_classes
        binary_labels.append(0 if belongs_to_stage2 else 1)
        multiclass_labels.append(label if belongs_to_stage2 else -1)

    # Split first, stratified on the original labels, with a fixed seed.
    split = train_test_split(
        texts, binary_labels, multiclass_labels,
        test_size=test_size, stratify=labels, random_state=42,
    )
    train_texts, val_texts, train_binary, val_binary, train_multi, val_multi = split

    # Tokenize each split separately with identical settings.
    tokenize_kwargs = dict(truncation=True, padding=True, max_length=max_length, return_tensors="pt")
    train_encodings = tokenizer(train_texts, **tokenize_kwargs)
    val_encodings = tokenizer(val_texts, **tokenize_kwargs)

    return (
        TwoStageDataset(train_encodings, train_binary, train_multi),
        TwoStageDataset(val_encodings, val_binary, val_multi),
    )

In [None]:
# Base training data: text and integer labels read from config["train_file"].
train_df = pd.read_csv(config["train_file"])
texts = train_df["clean_text"].tolist()
labels = train_df["label"].tolist()

# Every row of config["daily_file"] is appended as class 4, whatever its own label column holds.
daily_df = pd.read_csv(config["daily_file"])
texts.extend(daily_df["clean_text"].tolist())
labels.extend([4] * len(daily_df))

# Disabled extra class-4 sources. They are kept as real comments rather than a
# string literal, which would otherwise be echoed as this cell's output.
#gen1_df = pd.read_csv(config["gen1_file"])
#texts.extend(gen1_df["clean_text"].tolist())
#labels.extend([4 for _ in gen1_df["label"].tolist()])

#gen2_df = pd.read_csv(config["gen2_file"])
#texts.extend(gen2_df["clean_text"].tolist())
#labels.extend([4 for _ in gen2_df["label"].tolist()])

#ai_daily_df = pd.read_csv(config["aihub_daily_file"])
#texts.extend(ai_daily_df["conversation"].tolist())
#labels.extend([4 for _ in ai_daily_df["class"].tolist()])

#dataset = ChatDataset(texts, labels, tokenizer, config["max_length"])

In [None]:
# Build the stage-1 train/validation datasets using the function defaults
# (max_length=128, test_size=0.2).
train_dataset, val_dataset = prepare_two_stage_dataset(texts, labels, tokenizer)

In [None]:
# Count samples per label value, ordered by label id.
# `labels` is the full list assembled before the train/validation split.
label_series = pd.Series(labels)
label_counts = label_series.value_counts().sort_index()

print("📊 train label 클래스 분포:", label_counts, sep="\n")

In [None]:
# Disabled cell: the code is wrapped in a string literal, so it is never executed.
# `val_labels` is not assigned by any earlier cell of this notebook.
"""label_counts = pd.Series(val_labels).value_counts().sort_index()

print("📊 validation label 클래스 분포:")
print(label_counts)"""

## 메트릭 함수

In [None]:
from sklearn.metrics import f1_score, accuracy_score
import numpy as np
from collections import Counter

def evaluate_stage1(trainer, val_dataset):
    """Evaluate stage-1 (binary) predictions produced by ``trainer.predict``.

    Args:
        trainer: object exposing ``predict(dataset)`` whose result has
            ``label_ids`` and ``predictions`` attributes.
        val_dataset: dataset handed to ``trainer.predict``.

    Returns:
        dict with ``f1_macro`` and ``accuracy``.
    """
    pred_output = trainer.predict(val_dataset)

    # When the model receives several label tensors, label_ids arrives as a
    # tuple; the first element is used, matching compute_metrics.
    label_ids = pred_output.label_ids
    y_true = label_ids[0] if isinstance(label_ids, tuple) else label_ids
    y_pred = np.argmax(pred_output.predictions, axis=1)

    # Metrics
    f1 = f1_score(y_true, y_pred, average="macro")
    acc = accuracy_score(y_true, y_pred)

    # Distribution check
    print("📊 Stage 1 정답 분포:", Counter(y_true))
    print("📊 Stage 1 예측 분포:", Counter(y_pred))
    print("✅ Stage 1 Accuracy:", acc)
    print("✅ Stage 1 F1 (macro):", f1)

    return {
        "f1_macro": f1,
        "accuracy": acc
    }

In [None]:
from sklearn.metrics import f1_score, accuracy_score
import numpy as np
from collections import Counter

def evaluate_stage2(trainer, val_dataset):
    """Run ``trainer.predict`` on ``val_dataset`` and report stage-2 metrics.

    Prints the true and predicted label distributions together with accuracy
    and macro F1, then returns ``{"f1_macro": ..., "accuracy": ...}``.
    """
    prediction = trainer.predict(val_dataset)
    y_true = prediction.label_ids
    y_pred = np.argmax(prediction.predictions, axis=1)

    metrics = {
        "f1_macro": f1_score(y_true, y_pred, average="macro"),
        "accuracy": accuracy_score(y_true, y_pred),
    }

    # Distribution check followed by the headline numbers.
    print("📊 정답 분포:", Counter(y_true))
    print("📊 예측 분포:", Counter(y_pred))
    print("✅ Stage 2 Accuracy:", metrics["accuracy"])
    print("✅ Stage 2 F1 (macro):", metrics["f1_macro"])

    return metrics


In [None]:
# Disabled earlier version of compute_metrics, kept as a string literal (never executed).
# Unlike the active definition in the next cell, it does not unwrap tuple-valued label_ids.
"""def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    return {
        "accuracy": accuracy_score(labels, preds),
        "f1_macro": f1_score(labels, preds, average="macro")
    }"""

In [None]:
def compute_metrics(pred):
    """Compute accuracy and macro F1 for the stage-1 (binary) predictions.

    ``pred.label_ids`` may be a tuple when several label tensors are fed to
    the model; in that case only its first element is scored.
    """
    label_ids = pred.label_ids
    if isinstance(label_ids, tuple):
        label_ids = label_ids[0]

    preds = np.argmax(pred.predictions, axis=1)

    accuracy = accuracy_score(label_ids, preds)
    f1_macro = f1_score(label_ids, preds, average="macro")
    return {"accuracy": accuracy, "f1_macro": f1_macro}

## 모델 준비

In [None]:
# Hyperparameters for the stage-1 (two-head) fine-tuning run.
# NOTE(review): logging_dir mentions klue_roberta while the checkpoint below is
# tunib/electra-ko-en-base — confirm the intended log location.
training_args = TrainingArguments(
    output_dir='./Aiffel_DLThon/ckpoints/tunib',
    num_train_epochs=5,
    per_device_train_batch_size=32,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=float(2e-5),
    weight_decay=0.01,
    warmup_ratio=0.1,
    logging_dir='./Aiffel_DLThon//logs/klue_roberta',
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    logging_strategy="steps",
    logging_steps=50,
    report_to="tensorboard",
)

from transformers import AutoConfig

# A dedicated name keeps the YAML `config` dict loaded earlier intact, so cells
# that read config[...] can be re-run after this one.
electra_config = AutoConfig.from_pretrained("tunib/electra-ko-en-base")

# from_pretrained() loads the pretrained encoder weights. Calling the class
# constructor directly would start from randomly initialised weights; only the
# two new classifier heads are expected to be freshly initialised.
two_stage_model = ElectraTwoStageModel.from_pretrained(
    "tunib/electra-ko-en-base", config=electra_config
)

trainer = Trainer(
    model=two_stage_model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[
        EarlyStoppingCallback(early_stopping_patience=2),
        LoggingCallback(),
    ]
)

## 모델 학습

In [None]:
# Run stage-1 fine-tuning with the Trainer configured in the previous cell.
trainer.train()
# The explicit checkpoint export below is disabled.
#save_model(two_stage_model, config["output_dir"], epoch=config["epochs"])

In [None]:
from torch.utils.data import Dataset, DataLoader

# Short alias for the two-stage model, used by the probability loop further down in this cell.
model = two_stage_model
class PlainTextDataset(Dataset):
    """Label-free dataset: all texts are tokenized once in the constructor.

    Args:
        texts: list of strings to encode.
        tokenizer: callable invoked with ``truncation=True``, ``padding=True``
            and the given ``max_length``; must return a mapping that contains
            an ``input_ids`` field.
        max_length: truncation length.
    """

    def __init__(self, texts, tokenizer, max_length):
        self.encodings = tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=max_length,
        )

    def __len__(self):
        input_rows = self.encodings["input_ids"]
        return len(input_rows)

    def __getitem__(self, idx):
        # One tensor per encoding field for the requested sample.
        sample = {}
        for field, rows in self.encodings.items():
            sample[field] = torch.tensor(rows[idx])
        return sample


test_df = pd.read_csv("./Aiffel_DLThon/data/original_data/test.csv")
test_texts = test_df["text"].tolist()

# Wrap the raw test texts and iterate over them in their original order.
test_dataset = PlainTextDataset(test_texts, tokenizer, 200)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

import torch
from torch.nn.functional import softmax

model.eval()
label4_probs = []

for batch in test_loader:
    on_device = {name: tensor.to(model.device) for name, tensor in batch.items()}
    with torch.no_grad():
        stage1_probs = softmax(model(**on_device).logits, dim=-1)
    # Column 1 of the stage-1 softmax is the probability collected here.
    label4_probs.extend(stage1_probs[:, 1].cpu().numpy())

import matplotlib.pyplot as plt
import seaborn as sns

# Histogram (with KDE overlay) of the collected probabilities.
plt.figure(figsize=(8, 4))
ax = sns.histplot(label4_probs, bins=30, kde=True)
ax.set_title("Distribution of Label 4 Probabilities on Test Set")
ax.set_xlabel("Predicted Probability of Label 4")
ax.set_ylabel("Number of Samples")
ax.grid(True)
plt.show()


In [None]:
from sklearn.metrics import accuracy_score, f1_score, classification_report
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch

def evaluate_stage1(model, dataset, batch_size=32):
    """Score the stage-1 head of ``model`` on ``dataset`` without a Trainer.

    Each batch must provide ``input_ids``, ``attention_mask``,
    ``token_type_ids`` and ``binary_labels``. Prints accuracy, macro F1 and a
    classification report, and returns ``{"accuracy": ..., "f1_macro": ...}``.
    """
    model.eval()
    preds, labels = [], []

    loader = DataLoader(dataset, batch_size=batch_size)
    for batch in tqdm(loader, desc="Stage 1 Evaluation"):
        device = model.device
        model_inputs = {
            name: batch[name].to(device)
            for name in ("input_ids", "attention_mask", "token_type_ids")
        }
        binary_labels = batch["binary_labels"].to(device)

        # No labels are passed, so the model returns stage-1 logits only.
        with torch.no_grad():
            logits = model(**model_inputs).logits

        preds.extend(torch.argmax(logits, dim=1).cpu().tolist())
        labels.extend(binary_labels.cpu().tolist())

    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, average='macro')
    report = classification_report(labels, preds, digits=4)

    print("📊 Stage 1 Evaluation Results")
    print(f"✅ Accuracy: {acc:.4f}")
    print(f"✅ F1 Score (macro): {f1:.4f}")
    print("📋 Classification Report:\n", report)
    return {"accuracy": acc, "f1_macro": f1}
stage1_metrics = evaluate_stage1(two_stage_model, val_dataset)


In [None]:
from collections import Counter

# Predicted stage-1 class per validation sample, then a tally of those classes.
stage1_prediction_output = trainer.predict(val_dataset)
binary_preds = np.argmax(stage1_prediction_output.predictions, axis=1)
counter = Counter(binary_preds)
print("✅ Stage 1 예측 분포:", counter)

In [None]:
# Stage-2 training data: rows of the training CSV whose label is not 4.
stage2_df = train_df[train_df["label"] != 4].copy()
stage2_df = stage2_df.reset_index(drop=True)

# NOTE: these rebind the notebook-level `texts` / `labels` to the stage-2 subset.
texts = stage2_df["clean_text"].tolist()
labels = stage2_df["label"].tolist()  # label: 0~3

# The previous whole-corpus tokenization into `encodings` was removed: nothing
# read it, and each split is tokenized separately right after train_test_split.
class Stage2Dataset(Dataset):
    """Map-style dataset pairing tokenizer output with a single label list.

    Args:
        encodings: mapping from field name to an indexable collection of
            per-sample rows.
        labels: per-sample class ids.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Convert every encoding field of this sample, then attach its label.
        sample = {}
        for field, rows in self.encodings.items():
            sample[field] = torch.tensor(rows[idx])
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample
from sklearn.model_selection import train_test_split

# Stratified 80/20 split of the stage-2 samples with a fixed seed.
stage2_split = train_test_split(
    texts, labels, test_size=0.2, stratify=labels, random_state=42
)
train_texts, val_texts, train_labels, val_labels = stage2_split

# Both splits are tokenized with the same settings.
stage2_tokenize_kwargs = {"truncation": True, "padding": True, "max_length": 128}
train_encodings = tokenizer(train_texts, **stage2_tokenize_kwargs)
val_encodings = tokenizer(val_texts, **stage2_tokenize_kwargs)

# NOTE: rebinds the notebook-level train_dataset / val_dataset to the stage-2 data.
train_dataset = Stage2Dataset(train_encodings, train_labels)
val_dataset = Stage2Dataset(val_encodings, val_labels)

from transformers import ElectraModel, ElectraPreTrainedModel
import torch.nn as nn

class ElectraStage2Classifier(ElectraPreTrainedModel):
    """ELECTRA encoder followed by dropout and one 4-way linear classifier."""

    def __init__(self, config):
        super().__init__(config)
        self.electra = ElectraModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, 4)  # four output classes

        self.init_weights()

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        """Return 4-class logits and, when ``labels`` is given, the cross-entropy loss."""
        hidden_states = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        ).last_hidden_state

        # Classify from the hidden state at sequence position 0.
        first_token = self.dropout(hidden_states[:, 0, :])
        logits = self.classifier(first_token)

        if labels is None:
            loss = None
        else:
            loss = nn.CrossEntropyLoss()(logits, labels)

        return SequenceClassifierOutput(loss=loss, logits=logits)
from transformers import TrainingArguments, Trainer

# Stage-2 training settings, collected in a dict first so they read as a table.
# NOTE: rebinds the notebook-level `training_args` used by the stage-1 Trainer.
stage2_training_kwargs = {
    "output_dir": "./stage2_model",
    "eval_strategy": "epoch",
    "save_strategy": "epoch",
    "per_device_train_batch_size": 16,
    "per_device_eval_batch_size": 16,
    "num_train_epochs": 5,
    "logging_dir": "./logs",
    "logging_steps": 50,
    "save_total_limit": 2,
    "load_best_model_at_end": True,
    "metric_for_best_model": "f1_macro",
    "report_to": "tensorboard",
}
training_args = TrainingArguments(**stage2_training_kwargs)
from sklearn.metrics import accuracy_score, f1_score

def compute_metrics_stage2(pred):
    """Return accuracy and macro F1 of the argmax predictions against ``pred.label_ids``."""
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)

    accuracy = accuracy_score(y_true, y_pred)
    f1_macro = f1_score(y_true, y_pred, average="macro")
    return {"accuracy": accuracy, "f1_macro": f1_macro}
from transformers import AutoConfig

# Load the pretrained checkpoint into the stage-2 classifier (4 labels).
stage2_checkpoint = "tunib/electra-ko-en-base"
config = AutoConfig.from_pretrained(stage2_checkpoint, num_labels=4)
model = ElectraStage2Classifier.from_pretrained(stage2_checkpoint, config=config)

# Wire the stage-2 datasets, arguments and metric function into a new Trainer.
stage2_trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics_stage2,
)
trainer = Trainer(**stage2_trainer_kwargs)

trainer.train()


In [None]:
from collections import Counter
import matplotlib.pyplot as plt

# Point this at the label list that was actually used for stage-2 training,
# e.g. labels = stage2_df["label"].tolist()
stage2_labels = labels  # ✅ ← only this line needs editing

# Tally the labels.
label_counts = Counter(stage2_labels)

print("📊 Stage 2 학습용 라벨 분포:")
for label in sorted(label_counts):
    count = label_counts[label]
    print(f"Label {label}: {count}")

# Bar chart of the same counts.
bar_positions = label_counts.keys()
bar_heights = label_counts.values()
plt.bar(bar_positions, bar_heights)
plt.title("Stage 2 Class Distribution")
plt.xlabel("Label")
plt.ylabel("Count")
plt.xticks([0, 1, 2, 3])
plt.show()

## 테스트

In [None]:
from tqdm import tqdm

batch_size = 32  # can be lowered (e.g. to 16) if memory is tight
two_stage_model.eval()
model.eval()
final_preds = []
test_df = pd.read_csv("./Aiffel_DLThon/data/original_data/test.csv")
test_texts = test_df["text"].tolist()

for i in tqdm(range(0, len(test_texts), batch_size), desc="Stage 1 + Stage 2 추론"):
    batch_texts = test_texts[i:i + batch_size]

    # Encode the batch and move it to the stage-1 model's device.
    batch_encodings = tokenizer(
        batch_texts,
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt"
    )
    batch_encodings = {k: v.to(two_stage_model.device) for k, v in batch_encodings.items()}

    with torch.no_grad():
        # Stage 1: a prediction of 1 becomes final label 4.
        s1_logits = two_stage_model(**batch_encodings).logits
        s1_preds = torch.argmax(s1_logits, dim=1)

        # Start with label 4 everywhere, then overwrite the rows that stage 1
        # sends on to stage 2.
        batch_preds = torch.full_like(s1_preds, 4)
        needs_stage2 = s1_preds != 1

        if needs_stage2.any():
            # Stage 2: one batched forward pass over the selected rows instead
            # of one forward pass per sample.
            stage2_inputs = {
                key: val[needs_stage2].to(model.device)
                for key, val in batch_encodings.items()
            }
            s2_preds = torch.argmax(model(**stage2_inputs).logits, dim=1)
            batch_preds[needs_stage2] = s2_preds.to(batch_preds.device)

    # tolist() yields plain Python ints, in the original sample order.
    final_preds.extend(batch_preds.tolist())

In [None]:
from collections import Counter

# Tally of the final (combined stage 1 + stage 2) predictions.
prediction_counts = Counter(final_preds)
print("📊 예측 결과 분포:", prediction_counts, sep="\n")

In [None]:
# Show up to five sentence/prediction pairs. Slicing avoids an IndexError when
# fewer than five samples are available.
for sample_text, sample_pred in zip(test_texts[:5], final_preds[:5]):
    print(f"문장: {sample_text}")
    print(f"예측 라벨: {sample_pred}")
    print("-" * 30)

In [None]:
# `final_preds` is the prediction list built by the inference loop above.
# The previous argument `preds` is not defined at notebook level (NameError).
save_submission(final_preds, "./Aiffel_DLThon/data/original_data/submission.csv", './submission.csv')
log_message("Experiment complete.", '.')

In [None]:
import torch
import pandas as pd
from transformers import AutoTokenizer, ElectraPreTrainedModel, ElectraModel
from transformers.modeling_outputs import SequenceClassifierOutput
from tqdm import tqdm

# ============================
# ✅ 모델 정의 (통합 모델)
# ============================
class ElectraTwoStageModel(ElectraPreTrainedModel):
    """Combined model: one ELECTRA encoder feeding a 2-way and a 4-way linear head.

    The returned output exposes only the 2-way (stage-1) logits. This cell
    re-declares the class so that it can run on its own.
    """

    def __init__(self, config):
        super().__init__(config)
        self.electra = ElectraModel(config)
        self.dropout = torch.nn.Dropout(config.hidden_dropout_prob)
        self.stage1_classifier = torch.nn.Linear(config.hidden_size, 2)
        self.stage2_classifier = torch.nn.Linear(config.hidden_size, 4)
        self.init_weights()

    def forward(self, input_ids, attention_mask=None, token_type_ids=None,
                binary_labels=None, multiclass_labels=None):
        """Return stage-1 logits and, when both label tensors are given, the joint loss."""
        encoder_out = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        first_token = self.dropout(encoder_out.last_hidden_state[:, 0, :])

        stage1_logits = self.stage1_classifier(first_token)
        stage2_logits = self.stage2_classifier(first_token)

        if binary_labels is None or multiclass_labels is None:
            return SequenceClassifierOutput(loss=None, logits=stage1_logits)

        criterion = torch.nn.CrossEntropyLoss()
        loss = criterion(stage1_logits, binary_labels)

        # The stage-2 term covers only rows whose binary label is 0.
        stage2_rows = (binary_labels == 0)
        if stage2_rows.sum() > 0:
            loss = loss + criterion(stage2_logits[stage2_rows], multiclass_labels[stage2_rows])

        return SequenceClassifierOutput(loss=loss, logits=stage1_logits)


# ============================
# ✅ 모델 및 토크나이저 로드
# ============================
from transformers import AutoConfig
MODEL_NAME = "tunib/electra-ko-en-base"
# NOTE(review): this is the same identifier as MODEL_NAME, i.e. the base
# pretrained checkpoint. Loaded this way, the two classifier heads would not
# carry fine-tuned weights. Point it at the directory of the trained model
# before relying on the submission written below — confirm the intended path.
MODEL_PATH = "tunib/electra-ko-en-base"  # directory that holds the trained model

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
config = AutoConfig.from_pretrained(MODEL_NAME)
model = ElectraTwoStageModel.from_pretrained(MODEL_PATH, config=config).to(device)
model.eval()

# ============================
# ✅ Load the test data
# ============================
test_df = pd.read_csv("./Aiffel_DLThon/data/original_data/test.csv")
test_texts = test_df["text"].tolist()

# ============================
# ✅ Run prediction
# ============================
final_preds = []

for text in tqdm(test_texts):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128).to(model.device)

    with torch.no_grad():
        # Encode once and reuse the representation for both heads; dropout is
        # inactive because the model is in eval mode.
        cls_output = model.dropout(model.electra(**inputs).last_hidden_state[:, 0, :])

        # Stage 1 prediction
        stage1_logits = model.stage1_classifier(cls_output)
        stage1_pred = torch.argmax(stage1_logits, dim=1).item()

        if stage1_pred == 1:
            final_preds.append(4)
        else:
            # Stage 2 prediction
            stage2_logits = model.stage2_classifier(cls_output)
            stage2_pred = torch.argmax(stage2_logits, dim=1).item()
            final_preds.append(stage2_pred)

# ============================
# ✅ Save the submission file
# ============================
submission = pd.DataFrame({
    "idx": test_df["idx"],
    "label": final_preds
})
submission.to_csv("submission.csv", index=False)
print("\n✅ submission.csv 저장 완료!")


In [None]:
def log_target_distribution(df: pd.DataFrame, log_dir: str) -> None:
    """Report the class distribution of the ``label`` column through ``log_message``.

    Args:
        df: frame expected to contain a ``label`` column.
        log_dir: forwarded unchanged as the second argument of every
            ``log_message`` call.

    When the column is missing, one error line is logged and the function
    returns without raising.
    """
    if "label" not in df.columns:
        # The message names the column that is actually checked.
        log_message("❌ 'label' 컬럼이 데이터프레임에 없습니다.", log_dir)
        return

    # One line per class, ordered by class id.
    counts = df["label"].value_counts().sort_index()
    log_message("📊 Target 클래스 분포:", log_dir)
    for label, count in counts.items():
        log_message(f"Label {label}: {count}", log_dir)

In [None]:
# 파일 로딩
# Read the submission back from the relative path it was written to
# ("submission.csv" in the working directory), not a hardcoded absolute path.
df = pd.read_csv('submission.csv')
log_target_distribution(df, '.')