# PKO-T5를 이용한 문장 순서 예측

이 노트북에서는 paust/pko-t5-base 모델을 사용하여 문장 순서 예측을 구현합니다. 다음과 같은 과정을 수행합니다:

1. 데이터 로드 및 전처리
2. 모델 및 학습 설정
3. Early stopping과 체크포인팅을 적용한 학습
4. 모델 평가
5. 하이퍼파라미터 튜닝

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torch.optim import AdamW
from tqdm.notebook import tqdm
import os
import gc
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import T5ForConditionalGeneration, T5TokenizerFast, AutoTokenizer, AutoModelForSeq2SeqLM, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
import json
import re


# 한글 폰트 설정
plt.rc('font', family='Malgun Gothic')

# 결과 저장을 위한 디렉토리 생성
os.makedirs('checkpoints', exist_ok=True)
os.makedirs('history', exist_ok=True)
os.makedirs('grid_search_results', exist_ok=True)

## 1. 데이터 로드 및 전처리

In [None]:
# 데이터 로드
train_df = pd.read_csv('../data/train.csv')
test_df = pd.read_csv('../data/test.csv')
submission_df = pd.read_csv('../data/sample_submission.csv')

# 데이터 확인
print("데이터셋 크기:", len(train_df))
print("\n샘플 데이터:")
train_df.head(5)

In [None]:
# 텍스트 정제
def clean_text(text):
  # 특수문자 제거
  text = re.sub(r'[^\w\s]', '', text)
  # 소문자 변환: 한글에는 무의미
  text = text.lower()
  # 불필요한 공백 제거
  text = ' '.join(text.split())
  return text

In [None]:
# 텍스트 정제
for i in range(4):
    train_df[f'sentence_{i}'] = train_df[f'sentence_{i}'].apply(clean_text)
    test_df[f'sentence_{i}'] = test_df[f'sentence_{i}'].apply(clean_text)

## 2. 모델 설정 및 데이터 준비

In [None]:
class SentenceOrderDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=200, device='cuda'):
        """
        문장 순서 예측을 위한 데이터셋 클래스
        
        Args:
            texts: 입력 문장들 ([SEP] 토큰으로 구분됨)
            labels: 정답 순서 (공백으로 구분된 숫자 시퀀스)
            tokenizer: T5TokenizerFast 또는 AutoTokenizer 인스턴스
            max_length: 최대 입력 길이
            device: 데이터를 전송할 디바이스
        """
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
        
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = str(self.labels[idx])
        
        inputs = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        targets = self.tokenizer(
            label,
            max_length=8,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        
        # 패딩 토큰을 -100으로 설정하여 손실 계산에서 제외
        labels = targets['input_ids'].squeeze()
        labels[labels == self.tokenizer.pad_token_id] = -100
        
        return {
            'input_ids': inputs['input_ids'].squeeze(),
            'attention_mask': inputs['attention_mask'].squeeze(),
            'labels': labels
        }

    def predict_order(self, text):
        """
        문장 순서 예측
        
        Args:
            text: 입력 문장들 ([SEP] 토큰으로 구분됨)
        Returns:
            predicted_order: 예측된 순서 (정수 리스트)
        """
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                max_length=200,
                padding='max_length',
                truncation=True,
                return_tensors='pt'
            ).to(self.device)
            
            outputs = self.model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                max_length=8,
                num_beams=4,
                no_repeat_ngram_size=4
            )
            
            predicted_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            predicted_order = [int(x) for x in predicted_text.split()]
            
            return predicted_order

In [None]:
class SentenceOrderPredictor:
    def __init__(self, model_name="paust/pko-t5-large", device=None, use_auto_classes=False):
        """
        문장 순서 예측 모델 클래스
        
        Args:
            model_name: 사용할 사전학습 모델 이름
            device: 학습에 사용할 디바이스 (GPU/CPU)
            use_auto_classes: AutoTokenizer와 AutoModelForSeq2SeqLM 사용 여부
        """
        self.model_name = model_name
        self.device = device if device else ('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")  # 디바이스 확인용 출력
        
        # 모델과 토크나이저 초기화
        if use_auto_classes:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        else:
            self.tokenizer = T5TokenizerFast.from_pretrained(model_name)
            self.model = T5ForConditionalGeneration.from_pretrained(model_name)
        
        # 패딩 토큰 설정
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model.config.pad_token_id = self.tokenizer.pad_token_id
        
        # 모델을 명시적으로 지정된 디바이스로 이동
        self.model = self.model.to(self.device)
        
        # 모델이 실제로 지정된 디바이스에 있는지 확인
        print(f"Model device: {next(self.model.parameters()).device}")
    
    def prepare_data(self, df, train_size=0.9, random_state=42):
        """
        학습 및 검증 데이터셋 준비
        
        Args:
            df: 입력 데이터프레임
            train_size: 학습 데이터 비율
            random_state: 랜덤 시드
        """
        texts = df['input_text'].tolist()
        labels = df['target_text'].tolist()
        
        # 데이터 분할
        train_texts, val_texts, train_labels, val_labels = train_test_split(
            texts, labels, train_size=train_size, random_state=random_state
        )
        
        train_dataset = SentenceOrderDataset(
            train_texts, 
            train_labels, 
            self.tokenizer, 
            device=self.device  # device 전달
        )
        val_dataset = SentenceOrderDataset(
            val_texts, 
            val_labels, 
            self.tokenizer,
            device=self.device  # device 전달
        )
        
        return train_dataset, val_dataset
    
    def create_dataloaders(self, train_dataset, val_dataset, batch_size=8):
        """
        학습 및 검증용 데이터로더 생성
        
        Args:
            train_dataset: 학습 데이터셋
            val_dataset: 검증 데이터셋
            batch_size: 배치 크기
        """
        train_loader = DataLoader(
            train_dataset, 
            batch_size=batch_size, 
            shuffle=True,
            num_workers=8,
            pin_memory=True  # CPU-GPU 전송 최적화
        )
        val_loader = DataLoader(
            val_dataset, 
            batch_size=batch_size, 
            shuffle=False,
            num_workers=8,
            pin_memory=True  # CPU-GPU 전송 최적화
        )
        return train_loader, val_loader
    
    def save_checkpoint(self, epoch, model, optimizer, loss, path):
        """
        모델 체크포인트 저장
        
        Args:
            epoch: 현재 에폭
            model: 현재 모델 상태
            optimizer: 현재 옵티마이저 상태
            loss: 현재 손실값
            path: 저장 경로
        """
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
        }, path)
    
    def load_checkpoint(self, path):
        """
        모델 체크포인트 불러오기
        
        Args:
            path: 체크포인트 파일 경로
        """
        checkpoint = torch.load(path)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        return checkpoint
    
    def save_history(self, history, path):
        """
        학습 히스토리 저장
        
        Args:
            history: 학습 히스토리 딕셔너리
            path: 저장 경로
        """
        with open(path, 'w') as f:
            json.dump(history, f)
    
    def load_history(self, path):
        """
        학습 히스토리 불러오기
        
        Args:
            path: 히스토리 파일 경로
        """
        with open(path, 'r') as f:
            return json.load(f)

    def predict_order(self, text):
        """
        문장 순서 예측
        
        Args:
            text: 입력 문장들 ([SEP] 토큰으로 구분됨)
        Returns:
            predicted_order: 예측된 순서 (정수 리스트)
        """
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                max_length=200,
                padding='max_length',
                truncation=True,
                return_tensors='pt'
            ).to(self.device)
            
            outputs = self.model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                max_length=8,
                num_beams=4,
                no_repeat_ngram_size=4
            )
            
            predicted_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            predicted_order = [int(x) for x in predicted_text.split()]
            
            return predicted_order

In [None]:
def compute_accuracy(pred_order, true_order):
    """
    예측 순서와 실제 순서 간의 정확도 계산
    
    Args:
        pred_order: 예측된 순서
        true_order: 실제 순서
    Returns:
        정확도 (0 또는 1)
    """
    if isinstance(true_order, str):
        true_order = [int(x) for x in true_order.split()]
    return int(np.array_equal(pred_order, true_order))

In [None]:
# pko-t5-large 모델 초기화
predictor = SentenceOrderPredictor()

# 데이터셋 준비 (9:1 비율로 분할)
train_dataset, val_dataset = predictor.prepare_data(train_df)

# 데이터로더 생성
batch_size = 4
train_loader, val_loader = predictor.create_dataloaders(train_dataset, val_dataset, batch_size=batch_size)

print(f"학습 데이터 크기: {len(train_dataset)}")
print(f"검증 데이터 크기: {len(val_dataset)}")

## 3. 학습 설정

In [None]:
# 학습 파라미터
num_epochs = 5
learning_rate = 5e-5
warmup_steps = 1000  # Warmup 스텝 수
gradient_accumulation_steps = 4  # Gradient accumulation 스텝 수
patience = 3  # Early stopping 인내심
min_delta = 1e-4  # 최소 개선 기준

# 옵티마이저 설정
optimizer = AdamW(predictor.model.parameters(), lr=learning_rate)

# Learning rate scheduler 설정
num_training_steps = len(train_loader) * num_epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=num_training_steps
)

# Mixed precision training을 위한 scaler 설정
scaler = torch.amp.GradScaler('cuda')

# 학습 히스토리 초기화
history = {
    'train_loss': [],
    'val_loss': [],
    'val_accuracy': [],
    'learning_rates': []  # learning rate 기록 추가
}

## 4. 학습 실행

In [None]:
def validate(model, val_loader, device, tokenizer):
    """검증 데이터에 대한 모델 평가"""
    model.eval()
    total_loss = 0
    total_correct = 0
    total_samples = 0
    
    with torch.no_grad():
        for batch in tqdm(val_loader, desc='Validation'):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            
            loss = outputs.loss
            total_loss += loss.item()
            
            # 예측 생성
            predictions = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=8,
                num_beams=4,
                no_repeat_ngram_size=4  # 숫자 중복 방지
            )
            
            # 예측과 레이블을 순서 시퀀스로 변환
            pred_texts = [tokenizer.decode(pred, skip_special_tokens=True) for pred in predictions]
            label_texts = [tokenizer.decode(label, skip_special_tokens=True) for label in labels]
            
            try:
                # 예측과 레이블을 정수 리스트로 변환
                pred_orders = [list(map(int, text.split())) for text in pred_texts]
                label_orders = [list(map(int, text.split())) for text in label_texts]
                
                # 정확도 계산
                correct = sum(compute_accuracy(pred, label) for pred, label in zip(pred_orders, label_orders))
                total_correct += correct
                total_samples += len(input_ids)
            except ValueError as e:
                print(f"Warning: 잘못된 예측 형식 발견 - {e}")
                print(f"Predictions: {pred_texts}")
                print(f"Labels: {label_texts}")
                continue
    
    avg_loss = total_loss / len(val_loader)  # 배치 수로 나누기
    accuracy = total_correct / total_samples if total_samples > 0 else 0
    
    return avg_loss, accuracy

In [None]:
# 학습 시작 전에 실행
# CUDA 캐시 초기화
torch.cuda.empty_cache()

# 가비지 컬렉션 강제 실행
gc.collect()

# CUDA 초기화
if torch.cuda.is_available():
    torch.cuda.init()
    torch.cuda.synchronize()

In [None]:
# 학습 루프
best_val_loss = float('inf')
best_val_accuracy = 0
best_epoch = -1
patience_counter = 0

for epoch in range(num_epochs):
    # 학습 단계
    predictor.model.train()
    total_train_loss = 0
    optimizer.zero_grad()  # 에폭 시작 시 gradient 초기화
    
    for batch_idx, batch in enumerate(tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Training')):
        input_ids = batch['input_ids'].to(predictor.device)
        attention_mask = batch['attention_mask'].to(predictor.device)
        labels = batch['labels'].to(predictor.device)
        
        # Mixed precision training
        with torch.amp.autocast('cuda'):
            outputs = predictor.model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            loss = outputs.loss / gradient_accumulation_steps  # loss를 gradient accumulation 스텝으로 나눔
        
        # Scaled backward pass
        scaler.scale(loss).backward()
        
        if (batch_idx + 1) % gradient_accumulation_steps == 0:
            # Gradient clipping
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(predictor.model.parameters(), max_norm=1.0)
            
            # Optimizer step with scaler
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            
            # Learning rate scheduler step
            scheduler.step()
        
        total_train_loss += loss.item() * gradient_accumulation_steps
    
    avg_train_loss = total_train_loss / len(train_loader)
    
    # 검증
    val_loss, val_accuracy = validate(predictor.model, val_loader, predictor.device, predictor.tokenizer)
    
    # 히스토리 업데이트
    history['train_loss'].append(avg_train_loss)
    history['val_loss'].append(val_loss)
    history['val_accuracy'].append(val_accuracy)
    history['learning_rates'].append(scheduler.get_last_lr()[0])  # 현재 learning rate 기록
    
    # 현재 성능 출력
    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Train Loss: {avg_train_loss:.4f}')
    print(f'Validation Loss: {val_loss:.4f}')
    print(f'Validation Accuracy: {val_accuracy:.4f}')
    print(f'Learning Rate: {scheduler.get_last_lr()[0]:.2e}')
    
    # Early stopping 및 체크포인트 저장 로직
    if val_loss < best_val_loss - min_delta:  # 유의미한 개선이 있는 경우
        print(f'Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}')
        best_val_loss = val_loss
        best_val_accuracy = val_accuracy
        best_epoch = epoch
        patience_counter = 0
        
        # 체크포인트 저장
        predictor.save_checkpoint(
            epoch=epoch,
            model=predictor.model,
            optimizer=optimizer,
            loss=val_loss,
            path=f'checkpoints/pko_t5_best_model.pt'
        )
    else:
        patience_counter += 1
        print(f'No improvement for {patience_counter} epochs (best val_loss: {best_val_loss:.4f} at epoch {best_epoch+1})')
        
    if patience_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

# 학습 완료 후 최종 결과 출력
print('\n학습 완료!')
print(f'Best epoch: {best_epoch+1}')
print(f'Best validation loss: {best_val_loss:.4f}')
print(f'Best validation accuracy: {best_val_accuracy:.4f}')

# 최종 학습 히스토리 저장
predictor.save_history(history, 'history/pko_t5_history.json')

In [None]:
# 저장
predictor.save_history(history, 'history/pko_t5_history.json')

## 5. 학습 결과 시각화

In [None]:
plt.figure(figsize=(12, 4))

# Loss 그래프
plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Train 및 Validation Loss')
plt.legend()

# Accuracy 그래프
plt.subplot(1, 2, 2)
plt.plot(history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Validation Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

## 6. 테스트 데이터 예측

In [None]:
# 모델 로드
predictor = SentenceOrderPredictor()

# 체크포인트 로드 시 메모리 효율적인 방식 사용
checkpoint = torch.load('./checkpoints/pko_t5_best_model.pt', map_location='cpu')
predictor.model.load_state_dict(checkpoint['model_state_dict'])
predictor.model.to('cuda')

# 불필요한 메모리 해제
del checkpoint
torch.cuda.empty_cache()

# 예측 결과를 저장할 리스트
predictions = []

# 각 테스트 케이스에 대해 예측 수행
for idx, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Predicting"):
    # 입력 텍스트 준비 ([SEP]로 문장들 연결)
    input_text = f"{row['sentence_0']} [SEP] {row['sentence_1']} [SEP] {row['sentence_2']} [SEP] {row['sentence_3']}"
    
    # 순서 예측
    predicted_order = predictor.predict_order(input_text)
    
    # 예측 결과를 문자열로 변환 (공백으로 구분)
    prediction_str = ' '.join(map(str, predicted_order))
    predictions.append(prediction_str)

In [None]:
test_df.head(2)

In [None]:
predictions[:2]

In [None]:
# 예측 결과를 제출 형식에 맞게 변환
submission = pd.DataFrame({
    'ID': test_df['ID'],
    'answer_0': [pred[0] for pred in predictions],
    'answer_1': [pred[2] for pred in predictions],
    'answer_2': [pred[4] for pred in predictions],
    'answer_3': [pred[6] for pred in predictions]
})

# 결과를 CSV 파일로 저장
submission.to_csv('../data/submission_t5_seq.csv', index=False)

In [None]:
submission