In [None]:
import os
import time
import torch
import random
import numpy as np
import pandas as pd

from tqdm.auto import tqdm
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from transformers import get_cosine_schedule_with_warmup
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    set_seed,
)

In [None]:
# 시드 고정
def set_all_seeds(seed=42):
    """Seed every random number generator this notebook relies on.

    Applies ``seed`` to Python's ``random``, NumPy, PyTorch (CPU, current CUDA
    device and all CUDA devices) and the Transformers ``set_seed`` helper,
    exports it as ``PYTHONHASHSEED``, and switches cuDNN to deterministic mode
    with benchmark auto-tuning disabled.

    Args:
        seed: Integer seed shared by all libraries (default 42).
    """
    # NOTE(review): presumably this only influences processes started later,
    # since the interpreter reads PYTHONHASHSEED at startup — confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Same seed for each library, applied in one pass.
    for apply_seed in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        apply_seed(seed)

    # Trade cuDNN speed for run-to-run repeatability.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

    set_seed(seed)

# Seed setup: fix every RNG before any data or model work happens
seed = 84
set_all_seeds(seed)

# Version tag for this run; it is used later to build the output directory path
version = "electra-kor-base-pt"

# 설정
class Config:
    """Single home for the hyperparameters and runtime settings used in training."""
    # Pretrained checkpoint to fine-tune (Korean-language model)
    MODEL_NAME = "kykim/electra-kor-base"

    # Run on the GPU when CUDA is available, otherwise fall back to the CPU
    DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # Tokenization / batching
    MAX_LENGTH = 256
    BATCH_SIZE = 64

    # Optimization schedule
    EPOCHS = 3
    LEARNING_RATE = 2e-5
    WARMUP_RATIO = 0.1

# Instantiate the settings object and report the selected device and model
cfg = Config()
print(f"디바이스: {cfg.DEVICE}")
print(f"사용 모델: {cfg.MODEL_NAME}")

In [None]:
# Load the training data (path is relative to the working directory).
# Downstream pair generation reads the columns sentence_0..sentence_3 and answer_0..answer_3.
train = pd.read_csv('./project/sentence/data/train.csv')

In [None]:
# 데이터셋 생성 함수 및 클래스 정의
def create_enhanced_pairwise_data(df):
    """Build (first_sentence, second_sentence) pairs with binary order labels.

    Every row of ``df`` must provide ``sentence_0``..``sentence_3`` and
    ``answer_0``..``answer_3``; ``answer_k`` is used as the index of the
    sentence that sits at position ``k`` of the correct order.

    For each row the following are appended, in this order:

    1. For every distance 1, 2, 3 and every valid start position: the pair in
       correct order (label 1) immediately followed by the same pair reversed
       (label 0).
    2. The three adjacent pairs of the fully reversed order (label 0).
    3. Up to two randomly sampled wrong-order pairs (label 0), skipping pairs
       this step has already emitted for any earlier row. Sampling uses the
       global ``random`` module, so results depend on its current state.

    NOTE(review): steps 2 and 3 only repeat pairs that step 1 already emitted
    as negatives, so they add weight to those negatives rather than new pairs.
    Kept as-is to preserve the training data; confirm this is intended.

    Args:
        df: pandas DataFrame with the columns described above.

    Returns:
        tuple: ``(pairs, labels)`` — a list of ``(sentence, sentence)`` tuples
        and a parallel list of 0/1 ints. Both are empty for an empty ``df``.
    """
    num_sentences = 4
    max_random_negatives = 2
    max_attempts = 10

    pairs = []
    labels = []
    # Random negatives emitted so far. The pairs themselves are stored rather
    # than their hash values, so two different pairs whose hashes collide can
    # never be mistaken for duplicates.
    seen_pairs = set()

    for _, row in tqdm(df.iterrows(), total=len(df), desc="개선된 쌍 데이터 생성 중"):
        sentences = [row[f'sentence_{i}'] for i in range(num_sentences)]
        correct_order = [row[f'answer_{i}'] for i in range(num_sentences)]
        # Sentences rearranged into their correct reading order.
        ordered = [sentences[idx] for idx in correct_order]

        # 1. Correct-order positive plus reversed negative, by increasing distance.
        for distance in range(1, num_sentences):
            for start in range(num_sentences - distance):
                earlier = ordered[start]
                later = ordered[start + distance]

                pairs.append((earlier, later))
                labels.append(1)

                pairs.append((later, earlier))
                labels.append(0)

        # 2. Adjacent pairs of the fully reversed order.
        reversed_sentences = ordered[::-1]
        for i in range(num_sentences - 1):
            pairs.append((reversed_sentences[i], reversed_sentences[i + 1]))
            labels.append(0)

        # 3. Random wrong-order pairs; bounded retries because roughly half of
        #    the draws come out in the correct order and are discarded.
        all_indices = list(range(num_sentences))
        negative_count = 0

        for _ in range(max_attempts):
            if negative_count >= max_random_negatives:
                break

            idx1, idx2 = random.sample(all_indices, 2)
            if correct_order.index(idx1) > correct_order.index(idx2):
                pair_tuple = (sentences[idx1], sentences[idx2])

                if pair_tuple not in seen_pairs:
                    pairs.append(pair_tuple)
                    labels.append(0)
                    seen_pairs.add(pair_tuple)
                    negative_count += 1

    return pairs, labels

class SentenceOrderDataset(Dataset):
    """PyTorch dataset that tokenizes sentence pairs for order classification.

    Args:
        pairs: Sequence of ``(sentence_a, sentence_b)`` tuples.
        labels: Sequence of integer labels, parallel to ``pairs``.
        tokenizer: Callable tokenizer (e.g. a Hugging Face tokenizer) that
            accepts a text pair and returns a mapping containing
            ``input_ids``, ``attention_mask`` and ``token_type_ids`` tensors.
        max_len: Length every encoded pair is padded/truncated to.

    Raises:
        ValueError: If ``pairs`` and ``labels`` differ in length.
    """
    def __init__(self, pairs, labels, tokenizer, max_len):
        # A mismatch would otherwise surface much later as an IndexError
        # inside a DataLoader worker, or silently ignore trailing labels.
        if len(pairs) != len(labels):
            raise ValueError(
                f"pairs and labels must have the same length, got {len(pairs)} and {len(labels)}"
            )
        self.pairs = pairs
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Tokenize pair ``idx`` and return its model inputs plus label.

        Returns:
            dict with ``input_ids``, ``attention_mask``, ``token_type_ids``
            (each flattened to 1-D) and ``labels`` (0-dim ``torch.long``).
        """
        sent_A, sent_B = self.pairs[idx]
        label = self.labels[idx]

        # Call the tokenizer directly: `encode_plus` is deprecated in
        # Transformers in favour of `__call__`, which takes the same arguments.
        encoding = self.tokenizer(
            sent_A, sent_B,
            add_special_tokens=True, max_length=self.max_len,
            padding='max_length', truncation=True,
            return_token_type_ids=True, return_attention_mask=True,
            return_tensors='pt',
        )

        # return_tensors='pt' adds a leading batch axis; flatten() drops it so
        # the DataLoader can stack items into a batch itself.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'token_type_ids': encoding['token_type_ids'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# 학습 및 평가 함수 정의
def train_epoch(model, data_loader, optimizer, scheduler, device):
    """Run one full training pass over ``data_loader``.

    For every batch: clear gradients, move the tensors to ``device``, run the
    forward pass with labels, backpropagate, clip the gradient norm to 1.0,
    then step the optimizer and the scheduler.

    Args:
        model: Model whose call returns an object with a ``loss`` attribute.
        data_loader: Iterable of batches holding ``input_ids``,
            ``attention_mask``, ``token_type_ids`` and ``labels`` tensors.
        optimizer: Optimizer updating the model parameters.
        scheduler: Learning-rate scheduler, stepped once per batch.
        device: Device the batch tensors are moved to.

    Returns:
        tuple: ``(avg_loss, avg_grad_norm)`` — loss averaged over
        ``len(data_loader)`` and the gradient norm reported by the clipping
        call averaged over the processed batches.
    """
    model.train()
    tensor_keys = ('input_ids', 'attention_mask', 'token_type_ids', 'labels')
    batch_losses = []
    batch_grad_norms = []

    for batch in tqdm(data_loader, desc="학습 진행 중"):
        optimizer.zero_grad()

        # Only the four model inputs are transferred; other batch keys are ignored.
        inputs = {key: batch[key].to(device) for key in tensor_keys}

        loss = model(**inputs).loss
        batch_losses.append(loss.item())

        loss.backward()
        # clip_grad_norm_ returns the total gradient norm it measured.
        norm = torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        batch_grad_norms.append(norm.item())

        optimizer.step()
        scheduler.step()

    avg_loss = sum(batch_losses) / len(data_loader)
    avg_grad_norm = sum(batch_grad_norms) / len(batch_grad_norms)

    return avg_loss, avg_grad_norm

def evaluate(model, data_loader, device, return_predictions=False):
    """Evaluate ``model`` on ``data_loader`` without gradient tracking.

    Args:
        model: Model whose call returns an object with ``loss`` and ``logits``.
        data_loader: Loader yielding batches with ``input_ids``,
            ``attention_mask``, ``token_type_ids`` and ``labels`` tensors. It
            must expose ``dataset`` (for the sample count) and yield at least
            one batch.
        device: Device the batch tensors are moved to.
        return_predictions: When True, also return the per-sample predicted
            class ids and the true labels. Defaults to False so existing
            two-value callers keep working.

    Returns:
        ``(accuracy, avg_loss)`` by default, where ``accuracy`` is a 0-dim
        double tensor (correct / ``len(data_loader.dataset)``) and
        ``avg_loss`` is the mean batch loss as a float.
        With ``return_predictions=True``:
        ``(accuracy, avg_loss, predictions, labels)`` where the last two are
        1-D ``int64`` NumPy arrays in the order the loader produced them.
    """
    model.eval()
    total_loss = 0
    correct_predictions = 0
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc="평가 진행 중"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            token_type_ids = batch['token_type_ids'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(
                input_ids=input_ids, attention_mask=attention_mask,
                token_type_ids=token_type_ids, labels=labels
            )

            loss = outputs.loss
            logits = outputs.logits
            total_loss += loss.item()

            # Predicted class = index of the largest logit per sample.
            preds = torch.argmax(logits, dim=1)
            correct_predictions += torch.sum(preds == labels)

            # Only pay for the device-to-host copy when the caller wants it.
            if return_predictions:
                all_predictions.extend(preds.cpu().tolist())
                all_labels.extend(labels.cpu().tolist())

    accuracy = correct_predictions.double() / len(data_loader.dataset)
    avg_loss = total_loss / len(data_loader)

    if return_predictions:
        return (
            accuracy,
            avg_loss,
            np.asarray(all_predictions, dtype=np.int64),
            np.asarray(all_labels, dtype=np.int64),
        )
    return accuracy, avg_loss

In [None]:
# Split the rows 90/10 into train and validation sets, seeded for reproducibility.
# The split happens before pair generation, so all pairs built from one row stay on the same side.
train_df, val_df = train_test_split(train, test_size=0.1, random_state=seed)

# Build the pairwise (sentence, sentence) samples and their labels for each split
train_pairs, train_labels = create_enhanced_pairwise_data(train_df)
val_pairs, val_labels = create_enhanced_pairwise_data(val_df)

# Load the tokenizer and a 2-class sequence-classification model, then move the model to the device
tokenizer = AutoTokenizer.from_pretrained(cfg.MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(cfg.MODEL_NAME, num_labels=2)
model.to(cfg.DEVICE)

# Wrap the pairs in datasets and data loaders (only the training loader shuffles)
train_dataset = SentenceOrderDataset(train_pairs, train_labels, tokenizer, cfg.MAX_LENGTH)
val_dataset = SentenceOrderDataset(val_pairs, val_labels, tokenizer, cfg.MAX_LENGTH)

train_loader = DataLoader(train_dataset, batch_size=cfg.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=cfg.BATCH_SIZE)

# Optimizer and cosine learning-rate schedule with warmup.
# The schedule length is one step per training batch across all epochs;
# the first WARMUP_RATIO share of those steps is warmup.
optimizer = AdamW(model.parameters(), lr=cfg.LEARNING_RATE)
total_steps = len(train_loader) * cfg.EPOCHS
warmup_steps = int(total_steps * cfg.WARMUP_RATIO)
scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

In [None]:
# Start training: record the wall-clock start and print a banner
start_time = time.time()
print("=" * 60)
print(f"학습 시작 - 총 {cfg.EPOCHS} 에폭")
print("=" * 60)

for epoch in range(cfg.EPOCHS):
    print(f"\n======== 에폭 {epoch + 1} / {cfg.EPOCHS} ========")
    epoch_start_time = time.time()

    # Train for one epoch
    train_loss, train_grad_norm = train_epoch(model, train_loader, optimizer, scheduler, cfg.DEVICE)

    # Validate on the held-out split
    val_acc, val_loss = evaluate(model, val_loader, cfg.DEVICE)

    # Wall-clock duration of this epoch (training + validation)
    epoch_time = time.time() - epoch_start_time

    # Report the epoch's metrics
    print(f"학습 손실: {train_loss:.4f}")
    print(f"검증 손실: {val_loss:.4f}")
    print(f"검증 정확도: {val_acc:.4f}")
    print(f"그래디언트 노름: {train_grad_norm:.4f}")
    print(f"에폭 실행 시간: {epoch_time:.2f}초")

# Total training time, broken down into hours / minutes / seconds for display
total_time = time.time() - start_time
hours = int(total_time // 3600)
minutes = int((total_time % 3600) // 60)
seconds = int(total_time % 60)

print("\n학습 완료!")
print(f"전체 학습 시간: {hours:02d}:{minutes:02d}:{seconds:02d}")

In [None]:
# Save the fine-tuned model and its tokenizer under a directory named after the version tag
output_dir = f"./project/sentence/{version}"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"모델이 '{output_dir}' 디렉토리에 저장되었습니다.")

# Re-run validation, this time also collecting per-sample predictions and labels.
# NOTE(review): this call needs evaluate() to accept `return_predictions` and to
# return four values — confirm the definition in scope supports that.
# It also rebinds `val_labels`, replacing the label list created during pair generation.
val_acc, val_loss, val_predictions, val_labels = evaluate(model, val_loader, cfg.DEVICE, return_predictions=True)

# Persist predictions and labels as .npy files next to the saved model
np.save(f'{output_dir}/val_predictions.npy', val_predictions)
np.save(f'{output_dir}/val_labels.npy', val_labels)