# 02. 계층적 문서-문단 모델링

성능 목표: 0.85 → 0.90 (계층적 구조 활용)

## 핵심 개선사항
- 글 단위 컨텍스트 모델링  
- 문단 간 관계 학습
- 순서 정보 활용
- 일관성 강제 메커니즘

In [None]:
# 필수 라이브러리 및 이전 단계 데이터 로딩
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
import pickle
import json
from sklearn.metrics import roc_auc_score

# 이전 단계 데이터 로딩
train_para_df = pd.read_pickle('train_para_df.pkl')
test_df = pd.read_pickle('test_df.pkl')

with open('data_info.pkl', 'rb') as f:
    data_info = pickle.load(f)
    train_titles = data_info['train_titles']
    val_titles = data_info['val_titles']
    class_weight_dict = data_info['class_weight_dict']

with open('step1_metadata.json', 'r') as f:
    step1_metadata = json.load(f)
    MODEL_NAME = step1_metadata['model_name']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"데이터 로딩 완료: {len(train_para_df)}개 문단, {len(test_df)}개 테스트 문단")
print(f"모델: {MODEL_NAME}, 디바이스: {device}")


In [None]:
# 계층적 AI 탐지 모델 정의
class HierarchicalAIDetector(nn.Module):
    """글 단위 컨텍스트를 고려한 계층적 AI 탐지 모델"""
    
    def __init__(self, model_name, hidden_size=1024, num_heads=16):
        super().__init__()
        
        # 기본 인코더 (문단 수준)
        self.roberta = AutoModel.from_pretrained(model_name)
        self.hidden_size = hidden_size
        
        # 위치 임베딩 (문단 순서)
        self.position_embedding = nn.Embedding(50, hidden_size)
        
        # 문단 간 어텐션 레이어 
        self.inter_paragraph_attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            batch_first=True,
            dropout=0.1
        )
        
        # 문서 레벨 트랜스포머
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=hidden_size * 2,
            dropout=0.1,
            batch_first=True
        )
        self.document_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)
        
        # 일관성 체크 레이어
        self.consistency_layer = nn.Linear(hidden_size * 2, hidden_size)
        
        # 최종 분류기
        self.classifier = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.GELU(),
            nn.BatchNorm1d(hidden_size // 2),
            nn.Dropout(0.2),
            nn.Linear(hidden_size // 2, 1)
        )
        
    def forward(self, paragraph_inputs, document_context=None, paragraph_positions=None):
        batch_size, seq_len = paragraph_inputs['input_ids'].shape
        
        # 문단 수준 인코딩
        with torch.cuda.amp.autocast():
            paragraph_outputs = self.roberta(**paragraph_inputs)
            paragraph_embeddings = paragraph_outputs.last_hidden_state[:, 0, :]  # [CLS]
        
        # 위치 정보 추가
        if paragraph_positions is not None:
            pos_embeddings = self.position_embedding(paragraph_positions)
            paragraph_embeddings = paragraph_embeddings + pos_embeddings
        
        # 문서 컨텍스트가 있는 경우 문단 간 관계 모델링
        if document_context is not None:
            # 문단들을 시퀀스로 처리
            paragraph_sequence = paragraph_embeddings.unsqueeze(0)  # (1, num_paragraphs, hidden)
            
            # 문단 간 어텐션
            attended_paragraphs, _ = self.inter_paragraph_attention(
                paragraph_sequence, paragraph_sequence, paragraph_sequence
            )
            
            # 문서 레벨 인코딩
            document_representation = self.document_encoder(attended_paragraphs)
            
            # 원본과 문서 컨텍스트 결합
            combined = torch.cat([
                paragraph_embeddings,
                document_representation.squeeze(0)
            ], dim=-1)
            
            final_embeddings = self.consistency_layer(combined)
        else:
            final_embeddings = paragraph_embeddings
        
        # 분류
        logits = self.classifier(final_embeddings)
        return logits.squeeze(-1)

print("계층적 모델 클래스 정의 완료")


In [None]:
# 계층적 데이터셋 클래스
class HierarchicalDataset(Dataset):
    """문서 컨텍스트를 고려한 데이터셋"""
    
    def __init__(self, para_df, tokenizer, max_length=512, mode='train'):
        self.para_df = para_df
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.mode = mode
        
        # title별로 그룹화
        self.grouped = para_df.groupby('title')
        self.samples = []
        
        for title, group in self.grouped:
            group = group.sort_values('paragraph_index')
            for idx, row in group.iterrows():
                self.samples.append({
                    'title': title,
                    'paragraph_text': row['paragraph_text'],
                    'paragraph_index': row['paragraph_index'],
                    'generated': row['generated'] if mode == 'train' else None,
                    'other_paragraphs': group[group.index != idx]['paragraph_text'].tolist()
                })
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        
        # 현재 문단 토크나이징
        paragraph_encoding = self.tokenizer(
            sample['paragraph_text'],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        
        # 문서 컨텍스트 (다른 문단들)
        if sample['other_paragraphs']:
            # 다른 문단들을 합쳐서 컨텍스트로 사용 (일부만)
            context_text = ' '.join(sample['other_paragraphs'][:3])  # 최대 3개 문단
            context_encoding = self.tokenizer(
                context_text,
                truncation=True,
                padding='max_length',
                max_length=self.max_length,
                return_tensors='pt'
            )
        else:
            # 컨텍스트가 없으면 현재 문단으로 대체
            context_encoding = paragraph_encoding
        
        result = {
            'paragraph_input_ids': paragraph_encoding['input_ids'].flatten(),
            'paragraph_attention_mask': paragraph_encoding['attention_mask'].flatten(),
            'context_input_ids': context_encoding['input_ids'].flatten(),
            'context_attention_mask': context_encoding['attention_mask'].flatten(),
            'paragraph_position': torch.tensor(sample['paragraph_index'], dtype=torch.long)
        }
        
        if self.mode == 'train':
            result['labels'] = torch.tensor(sample['generated'], dtype=torch.float)
        
        return result

print("계층적 데이터셋 클래스 정의 완료")


In [None]:
# 계층적 모델 훈련 함수
def train_hierarchical_model(model, train_loader, val_loader, num_epochs=3, lr=1e-5):
    """계층적 모델 훈련"""
    
    # 옵티마이저 및 스케줄러
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
    
    # 클래스 가중치 적용한 손실 함수
    class_weights_tensor = torch.tensor([
        class_weight_dict[0], class_weight_dict[1]
    ], dtype=torch.float).to(device)
    
    criterion = nn.BCEWithLogitsLoss(
        pos_weight=class_weights_tensor[1] / class_weights_tensor[0]
    )
    
    model.to(device)
    best_auc = 0
    
    for epoch in range(num_epochs):
        # 훈련
        model.train()
        train_loss = 0
        
        for batch in train_loader:
            optimizer.zero_grad()
            
            # 입력 준비
            paragraph_inputs = {
                'input_ids': batch['paragraph_input_ids'].to(device),
                'attention_mask': batch['paragraph_attention_mask'].to(device)
            }
            context_inputs = {
                'input_ids': batch['context_input_ids'].to(device),
                'attention_mask': batch['context_attention_mask'].to(device)
            }
            positions = batch['paragraph_position'].to(device)
            labels = batch['labels'].to(device)
            
            # 순전파
            with torch.cuda.amp.autocast():
                outputs = model(
                    paragraph_inputs,
                    document_context=context_inputs,
                    paragraph_positions=positions
                )
                loss = criterion(outputs, labels)
            
            # 역전파
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            train_loss += loss.item()
        
        # 검증
        model.eval()
        val_predictions = []
        val_labels = []
        
        with torch.no_grad():
            for batch in val_loader:
                paragraph_inputs = {
                    'input_ids': batch['paragraph_input_ids'].to(device),
                    'attention_mask': batch['paragraph_attention_mask'].to(device)
                }
                context_inputs = {
                    'input_ids': batch['context_input_ids'].to(device),
                    'attention_mask': batch['context_attention_mask'].to(device)
                }
                positions = batch['paragraph_position'].to(device)
                labels = batch['labels']
                
                with torch.cuda.amp.autocast():
                    outputs = model(
                        paragraph_inputs,
                        document_context=context_inputs,
                        paragraph_positions=positions
                    )
                
                predictions = torch.sigmoid(outputs).cpu().numpy()
                val_predictions.extend(predictions)
                val_labels.extend(labels.numpy())
        
        # AUC 계산
        val_auc = roc_auc_score(val_labels, val_predictions)
        
        print(f"Epoch {epoch+1}/{num_epochs}:")
        print(f"  Train Loss: {train_loss/len(train_loader):.4f}")
        print(f"  Val AUC: {val_auc:.4f}")
        
        # 최고 성능 모델 저장
        if val_auc > best_auc:
            best_auc = val_auc
            torch.save(model.state_dict(), 'hierarchical_model_best.pth')
    
    return best_auc

print("훈련 함수 정의 완료")


In [None]:
# 데이터 준비 및 모델 생성
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# 훈련/검증 데이터 분할
train_mask = train_para_df['title'].isin(train_titles)
val_mask = train_para_df['title'].isin(val_titles)

train_data = train_para_df[train_mask]
val_data = train_para_df[val_mask]

print(f"훈련 데이터: {len(train_data)}개 문단")
print(f"검증 데이터: {len(val_data)}개 문단")

# 데이터셋 생성
train_dataset = HierarchicalDataset(train_data, tokenizer, mode='train')
val_dataset = HierarchicalDataset(val_data, tokenizer, mode='train')

# 데이터로더 생성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# 계층적 모델 생성
hierarchical_model = HierarchicalAIDetector(MODEL_NAME)

print(f"계층적 모델 생성 완료")
print(f"훈련 배치 수: {len(train_loader)}")
print(f"검증 배치 수: {len(val_loader)}")


In [None]:
# 계층적 모델 훈련 실행
print("계층적 모델 훈련 시작...")

try:
    best_auc = train_hierarchical_model(
        hierarchical_model, 
        train_loader, 
        val_loader,
        num_epochs=3,
        lr=1e-5
    )
    
    print(f"계층적 모델 훈련 완료")
    print(f"최고 검증 AUC: {best_auc:.4f}")
    
    # 메타데이터 저장
    hierarchical_metadata = {
        'model_type': 'hierarchical',
        'best_auc': best_auc,
        'model_name': MODEL_NAME,
        'improvements': [
            'document_context_modeling',
            'inter_paragraph_attention', 
            'position_encoding',
            'consistency_layer'
        ]
    }
    
    with open('step2_metadata.json', 'w') as f:
        json.dump(hierarchical_metadata, f, indent=2)
    
    print("2단계 완료 - 계층적 모델링")
    print("다음 단계: 03_korean_features.ipynb 실행")
    
except Exception as e:
    print(f"훈련 중 오류: {e}")
    import traceback
    traceback.print_exc()
