# 단층 GRU 기반 생성 결과 비교

In [12]:
import json
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.nn import functional as F
from transformers import AutoTokenizer
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from tqdm import tqdm

### `Vocabulary`

In [13]:
class Vocabulary(object):
    """매핑을 위해 텍스트를 처리하고 어휘 사전을 만드는 클래스 """
    
    def __init__(self, token_to_idx=None):
        """
        매개변수:
            token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
        """
        
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx
        self._idx_to_token = {idx: token 
                              for token, idx in self._token_to_idx.items()}

    def to_serializable(self):
        """ 직렬화할 수 있는 딕셔너리를 반환 """
        return {'token_to_idx': self._token_to_idx}

    @classmethod
    def from_serializable(cls, contents):
        """ 직렬화된 딕셔너리에서 Vocabulary 객체를 만듬 """
        return cls(**contents)

    def add_token(self, token):
        """ 토큰을 기반으로 매핑 딕셔너리를 업데이트

        매개변수:
            token (str): Vocabulary에 추가할 토큰
        반환값:
            index (int): 토큰에 상응하는 정수
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index
            
    def add_many(self, tokens):
        """토큰 리스트를 Vocabulary에 추가
        
        매개변수:
            tokens (list): 문자열 토큰 리스트
        반환값:
            indices (list): 토큰 리스트에 상응되는 인덱스 리스트
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """토큰에 대응하는 인덱스를 추출
        
        매개변수:
            token (str): 찾을 토큰 
        반환값:
            index (int): 토큰에 해당하는 인덱스
        """
        return self._token_to_idx[token]

    def lookup_index(self, index):
        """ 인덱스에 해당하는 토큰을 반환
        
        매개변수: 
            index (int): 찾을 인덱스
        반환값:
            token (str): 인텍스에 해당하는 토큰
        에러:
            KeyError: 인덱스가 Vocabulary에 없을 때 발생
        """
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

class SequenceVocabulary(Vocabulary):
    def __init__(self, tokenizer, token_to_idx=None):

        super(SequenceVocabulary, self).__init__(token_to_idx)

        # KLUE RoBERTa 토크나이저에서 특수 토큰 설정
        self._mask_token = tokenizer.mask_token
        self._unk_token = tokenizer.unk_token
        self._begin_seq_token = tokenizer.cls_token
        self._end_seq_token = tokenizer.sep_token

        self.mask_index = self.add_token(self._mask_token)
        self.unk_index = self.add_token(self._unk_token)
        self.begin_seq_index = self.add_token(self._begin_seq_token)
        self.end_seq_index = self.add_token(self._end_seq_token)

    def to_serializable(self):
        contents = super(SequenceVocabulary, self).to_serializable()
        contents.update({'unk_token': self._unk_token,
                         'mask_token': self._mask_token,
                         'begin_seq_token': self._begin_seq_token,
                         'end_seq_token': self._end_seq_token})
        return contents
    
    @classmethod
    def from_serializable(cls, contents):
        # KLUE RoBERTa 토크나이저 초기화
        tokenizer = AutoTokenizer.from_pretrained("klue/roberta-base")
        
        # token_to_idx만 추출하여 부모 클래스 초기화
        token_to_idx = contents.get('token_to_idx', {})
        return cls(tokenizer=tokenizer, token_to_idx=token_to_idx)

    def lookup_token(self, token):
        """ 토큰에 대응하는 인덱스를 추출
        토큰이 없으면 UNK 인덱스를 반환
        
        매개변수:
            token (str): 찾을 토큰 
        반환값:
            index (int): 토큰에 해당하는 인덱스
        노트:
            UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해)
            `unk_index`가 0보다 커야 함함
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

### `Vectorizer`

In [14]:
class CommentVectorizer(object):
    """어휘 사전을 생성하고 관리"""
    
    def __init__(self, text_vocab, tokenizer, target_vocab=None):
        """
        매개변수:
            text_vocab (SequenceVocabulary): 댓글 텍스트의 어휘 사전
            target_vocab (Vocabulary, optional): 타겟 레이블의 어휘 사전
            tokenizer (AutoTokenizer, optional): 토큰화를 위한 RoBERTa 토크나이저
        """
        self.tokenizer = tokenizer
        self.text_vocab = text_vocab
        self.target_vocab = target_vocab

    def vectorize(self, text, vector_length=-1):
        """댓글 텍스트를 입력 벡터와 출력 벡터로 변환
        
        매개변수:
            text (str): 벡터로 변환할 댓글 텍스트
            vector_length (int): 벡터의 고정 길이 (기본값: -1, 즉 가변 길이)
        반환값:
            tuple: (from_vector, to_vector)
                from_vector (numpy.ndarray): 입력 텍스트의 벡터화 결과
                to_vector (numpy.ndarray): 타겟 텍스트의 벡터화 결과
        """
        # RoBERTa tokenizer로 토큰화
        tokens = self.tokenizer.tokenize(text)
  
        indices = [self.text_vocab.begin_seq_index]
        indices.extend(self.text_vocab.lookup_token(token) for token in tokens)
        indices.append(self.text_vocab.end_seq_index)
        
        from_vector = np.zeros(vector_length, dtype=np.int64)
        from_indices = indices[:-1]
        from_vector[:len(from_indices)] = from_indices
        from_vector[len(from_indices):] = self.text_vocab.mask_index

        to_vector = np.zeros(vector_length,dtype=np.int64)
        to_indices = indices[1:]
        to_vector[:len(to_indices)] = to_indices
        to_vector[len(to_indices):] = self.text_vocab.mask_index
 
        return from_vector, to_vector

    @classmethod
    def from_dataframe(cls, df):
        """데이터프레임을 이용해 CommentVectorizer 객체를 초기화
        
        매개변수:
            df (pandas.DataFrame): 댓글 데이터프레임
        반환값:
            CommentVectorizer 객체
        """
        tokenizer = AutoTokenizer.from_pretrained("klue/roberta-base")
        text_vocab = SequenceVocabulary(tokenizer=tokenizer)

        for index, row in df.iterrows():
            tokens = tokenizer.tokenize(row.text)
            for char in tokens:
                text_vocab.add_token(char)

        target_vocab = None
        if 'target_label' in df.columns:
            
            for index, row in df.iterrows():
                target_vocab.add_token(row.target_label)

        return cls(text_vocab, tokenizer, target_vocab)

    @classmethod
    def from_serializable(cls, contents):
        """직렬화된 데이터를 사용해 CommentVectorizer 객체를 복원
        
        매개변수:
            contents (dict): 직렬화된 데이터
        반환값:
            CommentVectorizer 객체
        """
        # KLUE RoBERTa 토크나이저 초기화
        tokenizer = AutoTokenizer.from_pretrained("klue/roberta-base")

        text_vocab = SequenceVocabulary.from_serializable(contents['text_vocab'])

        # 타겟 어휘 사전 복원
        target_vocab = None
        if 'target_vocab' in contents:
            target_vocab = Vocabulary.from_serializable(contents['target_vocab'])

        return cls(text_vocab=text_vocab, target_vocab=target_vocab, tokenizer=tokenizer)

    def to_serializable(self):
        """CommentVectorizer 객체를 직렬화 가능한 형태로 변환
        
        반환값:
            dict: 직렬화된 데이터
        """
        contents = {
            'text_vocab': self.text_vocab.to_serializable()
        }
        if self.target_vocab:
            contents['target_vocab'] = self.target_vocab.to_serializable()

        return contents

## 파이토치 데이터셋 

In [15]:
class CommentDataset(Dataset):
    def __init__(self, comment_df, vectorizer):
        """
        매개변수:
            comment_df (pandas.DataFrame): 데이터셋
            vectorizer (CommentVectorizer): 데이터셋에서 만든 Vectorizer 객체
        """
        self.comment_df = comment_df
        self._vectorizer = vectorizer

        # 최대 시퀀스 길이 계산
        self._max_seq_length = max(len(self._vectorizer.tokenizer.tokenize(text)) 
                                 for text in self.comment_df.text) + 2

        # 데이터셋 분할
        self.train_df = self.comment_df[self.comment_df.split == 'train']
        self.train_size = len(self.train_df)

        self.val_df = self.comment_df[self.comment_df.split == 'val']
        self.validation_size = len(self.val_df)

        self.test_df = self.comment_df[self.comment_df.split == 'test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {
            'train': (self.train_df, self.train_size),
            'val': (self.val_df, self.validation_size),
            'test': (self.test_df, self.test_size)
        }

        self.set_split('train')

    @classmethod
    def load_dataset_and_make_vectorizer(cls, dataset_csv):
        """데이터셋을 로드하고 새로운 Vectorizer를 만듭니다.
        
        매개변수:
            dataset_csv (str): 데이터셋의 위치
        반환값:
            CommentDataset 객체
        """
        comment_df = pd.read_csv(dataset_csv)
        return cls(comment_df, CommentVectorizer.from_dataframe(comment_df))
    
    @classmethod
    def load_dataset_and_load_vectorizer(cls, dataset_csv, vectorizer_filepath):
        """데이터셋과 캐싱된 Vectorizer 객체를 로드합니다.
        
        매개변수:
            dataset_csv (str): 데이터셋의 위치
            vectorizer_filepath (str): Vectorizer 객체의 저장 위치
        반환값:
            CommentDataset 객체
        """
        comment_df = pd.read_csv(dataset_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(comment_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """파일에서 Vectorizer 객체를 로드하는 정적 메서드
        
        매개변수:
            vectorizer_filepath (str): 직렬화된 Vectorizer 객체의 위치
        반환값:
            CommentVectorizer 객체
        """
        with open(vectorizer_filepath) as fp:
            return CommentVectorizer.from_serializable(json.load(fp))

    def get_vectorizer(self):
        """벡터 변환 객체를 반환합니다"""
        return self._vectorizer

    def set_split(self, split="train"):
        """데이터셋의 분할을 설정합니다."""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size
    
    def __getitem__(self, index):
        """파이토치 데이터셋의 주요 진입 메서드"""

        row = self._target_df.iloc[index]
        from_vector, to_vector = self._vectorizer.vectorize(row.text, self._max_seq_length)

        return_dict = {
            'x_data': from_vector,
            'y_target': to_vector
        }

        # target_label 컬럼이 있고 vectorizer가 target_vocab을 가지고 있을 때만 처리
        if 'target_label' in self.comment_df.columns and hasattr(self._vectorizer, 'target_vocab') and self._vectorizer.target_vocab is not None:
      
            target_index = self._vectorizer.target_vocab.lookup_token(row.target_label)
            return_dict["target_index"] = target_index

        return return_dict
    
    def get_num_batches(self, batch_size):
        """배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환합니다.
        
        매개변수:
            batch_size (int)
        반환값:
            배치 개수
        """
        return len(self) // batch_size

### `DataLoader`

In [16]:
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"): 
    """파이토치 DataLoader를 감싸고 있는 제너레이터 함수"""
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                          shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

## 단층 GRU 모델

In [17]:
class SingleLayerGRUModel(nn.Module):
    """단층 GRU 기반의 생성 모델"""
    
    def __init__(self, embedding_dim, num_embeddings, hidden_size, dropout_p,
                 embedding_path, num_targets=None, padding_idx=0):
        """
        매개변수:
            embedding_dim (int): KLUE RoBERTa 임베딩 차원
            num_embeddings (int): 임베딩 테이블 크기 (단어장 크기)
            hidden_size (int): GRU의 은닉 상태 크기
            dropout_p (float): 드롭아웃 확률
            embedding_path (str): 정렬된 임베딩 파일 경로
            num_targets (int, optional): 타겟 개수 (조건부 생성 시 사용)
            padding_idx (int): 패딩 토큰의 인덱스
        """
        super(SingleLayerGRUModel, self).__init__()
        
        self.embedding = nn.Embedding.from_pretrained(
            torch.load(embedding_path),
            freeze=True,
            padding_idx=padding_idx
        )
        
        if num_targets is not None:
   
            self.target_emb = nn.Embedding(num_embeddings=num_targets,
                                       embedding_dim=hidden_size)

        self.rnn = nn.GRU(input_size = embedding_dim,
                          hidden_size = hidden_size,
                          batch_first= True)
        
        self.fc = nn.Linear(in_features=hidden_size,
                            out_features=num_embeddings)


        self._dropout_p = dropout_p

        self.has_target = num_targets is not None  # 조건부 생성 여부 확인

    def forward(self, x_in, target_index=None, apply_softmax=False):
        """
        순전파
        
        매개변수:
            x_in (torch.Tensor): 입력 텐서 (batch_size, sequence_length)
            target_index (torch.Tensor, optional): 타겟 텐서 (조건부 생성 시 사용)
            apply_softmax (bool): 소프트맥스 적용 여부
        반환값:
            output (torch.Tensor): 출력 텐서 (batch_size, sequence_length, num_embeddings)
        """
        # 입력 텍스트 임베딩

        x_embedded = self.embedding(x_in)

        if self.has_target and target_index is not None:
   
            target_embedded = self.target_emb(target_index).unsqueeze(0)
            y_out, _ = self.rnn(x_embedded,target_embedded)
   
        else:

            y_out, _ = self.rnn(x_embedded)
    
        batch_size, seq_size, feat_size = y_out.shape
        y_out =y_out.contiguous().view(batch_size * seq_size, feat_size)

        y_out= self.fc(F.dropout(y_out,p=self._dropout_p))
                               
        if apply_softmax:
            y_out = F.softmax(y_out, dim=1)

        new_feat_size = y_out.shape[-1]
     
        output = y_out.view(batch_size, seq_size, new_feat_size)    
  
        return output

### 헬퍼 함수

In [18]:
def normalize_sizes(y_pred, y_true):
    """텐서 크기 정규화

    매개변수:
        y_pred (torch.Tensor): 모델의 출력
            3차원 텐서이면 2차원 행렬로 변환
        y_true (torch.Tensor): 타깃 텐서
            2차원 텐서이면 1차원 벡터로 변환
    """
    if len(y_pred.size()) == 3:
        y_pred = y_pred.contiguous().view(-1, y_pred.size(2))
    if len(y_true.size()) == 2:
        y_true = y_true.contiguous().view(-1)
    return y_pred, y_true

def compute_accuracy(y_pred, y_true, mask_index):
    """정확도 계산

    매개변수:
        y_pred (torch.Tensor): 모델의 예측 결과
        y_true (torch.Tensor): 실제 정답
        mask_index (int): 마스크 토큰의 인덱스
    반환값:
        정확도 (float)
    """
    y_pred, y_true = normalize_sizes(y_pred, y_true)
    _, y_pred_indices = y_pred.max(dim=1)

    correct_indices = torch.eq(y_pred_indices, y_true).float()
    valid_indices = torch.ne(y_true, mask_index).float()

    n_correct = (correct_indices * valid_indices).sum().item()
    n_valid = valid_indices.sum().item()

    return n_correct / n_valid * 100

def sequence_loss(y_pred, y_true, mask_index):
    """시퀀스 손실 계산

    매개변수:
        y_pred (torch.Tensor): 모델의 예측 결과
        y_true (torch.Tensor): 실제 정답
        mask_index (int): 마스크 토큰의 인덱스
    반환값:
        손실 값 (torch.Tensor)
    """
    y_pred, y_true = normalize_sizes(y_pred, y_true)
    return F.cross_entropy(y_pred, y_true, ignore_index=mask_index)

def compute_perplexity(y_pred, y_true, mask_index):
    """Perplexity 계산 함수

    매개변수:
        y_pred (torch.Tensor): 모델의 예측 결과
        y_true (torch.Tensor): 실제 정답
        mask_index (int): 마스크 토큰의 인덱스
    반환값:
        perplexity (float): Perplexity 값
    """
    y_pred, y_true = normalize_sizes(y_pred, y_true)
    loss = F.cross_entropy(y_pred, y_true, ignore_index=mask_index, reduction='sum')
    n_tokens = torch.ne(y_true, mask_index).sum().item()
    perplexity = math.exp(loss.item() / n_tokens)
    return perplexity

In [19]:
def sample_from_model(model, vectorizer, prompts, targets=None, sample_size=50, 
                    temperature=1.0):
   """모델이 만든 인덱스 시퀀스를 샘플링

   매개변수:
       model: 훈련된 모델
       vectorizer: CommentVectorizer 객체  
       prompts (list): 생성 시작 부분 텍스트 리스트
       targets (list): 각 샘플의 target(str)을 나타내는 리스트 (조건부 생성시 사용)
       sample_size (int): 샘플의 최대 길이
       temperature (float): 무작위성 정도
           0.0 < temperature < 1.0 이면 최대 값을 선택할 가능성이 높습니다
           temperature > 1.0 이면 균등 분포에 가깝습니다

   반환값:
       indices (torch.Tensor): 인덱스 행렬 (batch_size, sample_size)
   """
   begin_seq_indices = []
   for prompt in prompts:
       tokens = vectorizer.tokenizer.tokenize(prompt)
       indices = [vectorizer.text_vocab.begin_seq_index]
       indices.extend(vectorizer.text_vocab.lookup_token(token) for token in tokens)
       begin_seq_indices.append(indices)
   
   max_len = max(len(indices) for indices in begin_seq_indices)
   for indices in begin_seq_indices:
       while len(indices) < max_len:
           indices.append(vectorizer.text_vocab.mask_index)
   
   indices = torch.tensor(begin_seq_indices, dtype=torch.int64)
   
   target_tensor = None
   if targets is not None and hasattr(vectorizer, 'target_vocab'):
       target_indices = [vectorizer.target_vocab.lookup_token(target) for target in targets]
       target_tensor = torch.tensor(target_indices, dtype=torch.int64)
   
   generated = indices
   for _ in range(sample_size - max_len):
       if target_tensor is not None:
           y_pred = model(generated, target_tensor, apply_softmax=True)
       else:
           y_pred = model(generated, apply_softmax=True)
           
       next_token_logits = y_pred[:, -1, :] / temperature
       next_token = torch.multinomial(next_token_logits, num_samples=1)
       generated = torch.cat([generated, next_token], dim=1)
       
       if (next_token == vectorizer.text_vocab.end_seq_index).any():
           break
   
   return generated

def decode_samples(sampled_indices, vectorizer):
    """인덱스를 댓글 문자열로 변환

    매개변수:
        sampled_indices (torch.Tensor): sample_from_model 함수에서 얻은 인덱스 
        vectorizer (CommentVectorizer): CommentVectorizer 객체

    반환값:
        decoded_comments (list): 디코딩된 댓글 문자열의 리스트
    """
    decoded_comments = []
    vocab = vectorizer.text_vocab
    
    for sample_index in range(sampled_indices.shape[0]):
        text = ""
        for time_step in range(sampled_indices.shape[1]):
            sample_item = sampled_indices[sample_index, time_step].item()
            if sample_item == vocab.begin_seq_index:
                continue
            elif sample_item == vocab.end_seq_index:
                break
            else:
                text += vocab.lookup_index(sample_item)
        decoded_comments.append(text)
       
    return decoded_comments

def load_models_and_vectorizers():
    models = {}
    vectorizers = {}
    
    model_paths = {
        'normal': Path('model_storage/normal_comment_generation'),
        'hate': Path('model_storage/hate_comment_generation'),
        'conditioned_hate': Path('model_storage/conditioned_hate_comment_generation')
    }
    
    for model_type, path in model_paths.items():
        with open(path / 'vectorizer.json', 'r') as f:
            vectorizer_data = json.load(f)
            vectorizers[model_type] = CommentVectorizer.from_serializable(vectorizer_data)
        
        model_path = path / 'single_gru_model.pth'
        if model_type == 'conditioned_hate':
            models[model_type] = SingleLayerGRUModel(
                embedding_dim=768,
                num_embeddings=len(vectorizers[model_type].text_vocab),
                num_targets=len(vectorizers[model_type].target_vocab),
                dropout_p=0.1,
                hidden_size=512,
                embedding_path=str(path / 'roberta_embedding.pt'),
                padding_idx=vectorizers[model_type].text_vocab.mask_index
            )
        else:
            models[model_type] = SingleLayerGRUModel(
                embedding_dim=768,
                num_embeddings=len(vectorizers[model_type].text_vocab),
                dropout_p=0.1,
                hidden_size=512,
                embedding_path=str(path / 'roberta_embedding.pt'),
                padding_idx=vectorizers[model_type].text_vocab.mask_index
            )
        
        models[model_type].load_state_dict(torch.load(model_path))
        models[model_type].eval()
    
    return models, vectorizers


In [20]:
print("\n=== 정량적 성능 평가 시작 ===")

def evaluate_model_on_testset(model, dataset, vectorizer, target_label=None):
    """특정 데이터셋에 대한 모델의 성능을 평가"""
    model = model.eval()
    dataset.set_split('test')
    batch_generator = generate_batches(dataset, batch_size=64, device='cpu')
    
    running_loss = 0.0
    running_acc = 0.0
    running_ppl = 0.0
    batch_count = 0
    
    test_bar = tqdm(desc='테스트 중', 
                   total=dataset.get_num_batches(64),
                   position=1)
    
    with torch.no_grad():
        for _, batch_dict in enumerate(batch_generator):
            # 모델 예측
            if target_label is not None and hasattr(model, 'has_target'):
                target_indices = torch.tensor([
                    vectorizer.target_vocab.lookup_token(target_label)
                ] * batch_dict['x_data'].size(0))
                y_pred = model(x_in=batch_dict['x_data'], target_index=target_indices)
            else:
                y_pred = model(x_in=batch_dict['x_data'])
            
            # 평가 지표 계산
            loss = sequence_loss(y_pred, batch_dict['y_target'],
                               vectorizer.text_vocab.mask_index)
            acc = compute_accuracy(y_pred, batch_dict['y_target'],
                                 vectorizer.text_vocab.mask_index)
            ppl = compute_perplexity(y_pred, batch_dict['y_target'],
                                   vectorizer.text_vocab.mask_index)
            
            # 통계 업데이트
            batch_count += 1
            running_loss += (loss.item() - running_loss) / batch_count
            running_acc += (acc - running_acc) / batch_count
            running_ppl += (ppl - running_ppl) / batch_count
            
            test_bar.set_postfix(loss=running_loss, acc=running_acc, ppl=running_ppl)
            test_bar.update()
    
    test_bar.close()
    return {
        'loss': running_loss,
        'accuracy': running_acc,
        'perplexity': running_ppl
    }

print("\n1. 테스트 데이터셋 로드 중...")

# 일반 댓글 데이터셋
print("\n일반 댓글 데이터셋 로드 중...")
normal_dataset = CommentDataset.load_dataset_and_load_vectorizer(
    "normal_comments.csv",
    "model_storage/normal_comment_generation/vectorizer.json"
)

# 혐오 댓글 데이터셋
print("\n혐오 댓글 데이터셋 로드 중...")
hate_dataset = CommentDataset.load_dataset_and_load_vectorizer(
    "hate_comments.csv",
    "model_storage/hate_comment_generation/vectorizer.json"
)

print("\n2. 모델 로드 및 평가 시작...")
models, vectorizers = load_models_and_vectorizers()

results = {
    'normal_testset': {},
    'hate_testset': {}
}

# 각 모델의 성능 평가
print("\n일반 댓글 테스트셋 평가:")
print("="*60)
for model_type in ['normal', 'hate']:  # 조건부 모델 제외
    print(f"\n=== {model_type} 모델 평가 중 ===")
    results['normal_testset'][model_type] = evaluate_model_on_testset(
        models[model_type], normal_dataset, vectorizers['normal']
    )

print("\n혐오 댓글 테스트셋 평가:")
print("="*60)
for model_type in ['normal', 'hate', 'conditioned_hate']:  # 모든 모델 포함
    print(f"\n=== {model_type} 모델 평가 중 ===")
    if model_type == 'conditioned_hate':
        target_label = hate_dataset.test_df.iloc[0].target_label
        results['hate_testset'][model_type] = evaluate_model_on_testset(
            models[model_type], 
            hate_dataset,
            vectorizers['conditioned_hate'],
            target_label=target_label
        )
    else:
        results['hate_testset'][model_type] = evaluate_model_on_testset(
            models[model_type], 
            hate_dataset, 
            vectorizers['hate']
        )

print("\n3. 평가 결과 요약")
print("\n=== 모델 성능 비교 ===")

# 일반 댓글 테스트셋 결과
print("\n[일반 댓글 테스트셋 결과]")
print("="*80)
print(f"{'모델':^20} {'Loss':^12} {'Accuracy':^12} {'Perplexity':^12}")
print("="*80)
for model_name, metrics in results['normal_testset'].items():
    print(f"{model_name:^20} {metrics['loss']:^12.4f} "
          f"{metrics['accuracy']:^12.2f} {metrics['perplexity']:^12.2f}")
print("="*80)

# 혐오 댓글 테스트셋 결과
print("\n[혐오 댓글 테스트셋 결과]")
print("="*80)
print(f"{'모델':^20} {'Loss':^12} {'Accuracy':^12} {'Perplexity':^12}")
print("="*80)
for model_name, metrics in results['hate_testset'].items():
    print(f"{model_name:^20} {metrics['loss']:^12.4f} "
          f"{metrics['accuracy']:^12.2f} {metrics['perplexity']:^12.2f}")
print("="*80)


=== 정량적 성능 평가 시작 ===

1. 테스트 데이터셋 로드 중...

일반 댓글 데이터셋 로드 중...

혐오 댓글 데이터셋 로드 중...

2. 모델 로드 및 평가 시작...

일반 댓글 테스트셋 평가:

=== normal 모델 평가 중 ===


테스트 중: 100%|██████████| 31/31 [01:46<00:00,  3.43s/it, acc=61.8, loss=1.64, ppl=5.15]



=== hate 모델 평가 중 ===


테스트 중: 100%|██████████| 31/31 [01:39<00:00,  3.21s/it, acc=0.313, loss=16.8, ppl=2.09e+7]



혐오 댓글 테스트셋 평가:

=== normal 모델 평가 중 ===


테스트 중: 100%|██████████| 31/31 [02:20<00:00,  4.54s/it, acc=0.463, loss=18.6, ppl=1.23e+8]



=== hate 모델 평가 중 ===


테스트 중: 100%|██████████| 31/31 [02:22<00:00,  4.59s/it, acc=56.2, loss=1.96, ppl=7.09]



=== conditioned_hate 모델 평가 중 ===


테스트 중: 100%|██████████| 31/31 [02:31<00:00,  4.88s/it, acc=39.3, loss=3.15, ppl=23.8]


3. 평가 결과 요약

=== 모델 성능 비교 ===

[일반 댓글 테스트셋 결과]
         모델              Loss       Accuracy    Perplexity 
       normal           1.6373       61.79         5.15    
        hate           16.8153        0.31     20870141.19 

[혐오 댓글 테스트셋 결과]
         모델              Loss       Accuracy    Perplexity 
       normal          18.6069        0.46     123459010.18
        hate            1.9569       56.16         7.09    
  conditioned_hate      3.1510       39.29        23.77    





## 추론

In [21]:

# %%
test_samples = pd.DataFrame({
    'text': [
        # 일반 댓글 (1-5)
        "이름 장군님께서 만드신 체육이다 늘 항상 이름 장군님께도 감사하게 여기며 살아라 축하한다",
        "이름 오빠 믿어!!!!!!! 항상 믿어요!!!!!!! 영원히 함께하자!!! 사랑해요",
        "못믿겠는데  숏컷한 여자들이 대한민국에 몇명인데  중년이상 여성은 반이상이 숏컷인데 저걸 뭐라했을 리가 없다 ",
        "사실상 통일이 가능한가? 가능하더라도 해야할까? 둘다 아니라고 본다",
        "세상에 이미 죽은 사람에게 상을 주다니 이게 말이 되냐?",
        # 혐오 댓글 (6-10)
        "50대 이상 오래 살았다 아제 맞고 죽어라 ",  # age
        "정치판 기웃대다가  떡실신되서 수술도 안되는 이름발면지켜보는 접대 못해서 안달난 접대녀  우리 이름도 있다",  # gender
        "서울 분리독립하자  외국인들 특히 홍어 짱깨 조선족들 지네나라로 강제추방해버리자 ",  # region
        "미친거 아니야? 민주당 자체가 공산주의를 동경하는구나 나라 망치지말고 북으로 가라 빨갱이들아",  # political
        "체조복장을 성적대상화로 보는 정신이상자가 있나… ? 왜 지들끼리 육갑떨지… ?",  # disabled
    ],
    'prompt': [
        # 일반 댓글 프롬프트
        "이름 장군님께서 만드신 체육이다 늘 항상 ",
        "이름 오빠 믿어!!!!!!! 항상 ",
        "못믿겠는데  숏컷한 여자들이 대한민국에 몇명인데 ",
        "사실상 통일이 가능한가? 가능하더라도 ",
        "세상에 이미 죽은 사람에게 상을 주다니 ",
        # 혐오 댓글 프롬프트 (앞부분만)
        "50대 이상 ",
        "정치판 기웃대다가  떡실신되서 수술도 안되는 ",
        "서울 분리독립하자  외국인들 특히 ",
        "미친거 아니야? 민주당 자체가 ",
        "체조복장을 성적대상화로 보는 "
    ],
    'target_label': [
        # 일반 댓글은 타겟 없음
        None, None, None, None, None,
        # 혐오 댓글 타겟
        "['age']",
        "['gender']",
        "['region']",
        "['political']",
        "['disabled']"
    ],
    'is_hate': [
        # 댓글 타입 구분
        False, False, False, False, False,
        True, True, True, True, True
    ]
})

In [22]:
print("\n1. 모델과 vectorizer 로드 중...")
models, vectorizers = load_models_and_vectorizers()

print("\n2. 댓글 이어쓰기 테스트...")
for idx, sample in enumerate(test_samples.itertuples(), start=1):
    print("\n" + "=" * 80)
    print(f"\n[샘플 {idx}]")

    text = sample.text
    prompt = sample.prompt
    is_hate = sample.is_hate

    print(f"원본 댓글: {text}")
    if is_hate:
        target = eval(sample.target_label)[0]
        print(f"타겟: {target}")
    print(f"\n입력된 댓글 앞 부분: {prompt}")
    print("\n생성 결과:")
    print("-" * 40)

    # 일반 댓글 모델
    model = models['normal'].cpu()
    sampled_indices = sample_from_model(
        model,
        vectorizers['normal'],
        prompts=[prompt],
        temperature=0.7
    )
    normal_result = decode_samples(sampled_indices, vectorizers['normal'])[0]
    print(f"[일반 댓글 모델]\n{normal_result}\n")

    # 혐오 댓글 모델
    model = models['hate'].cpu()
    sampled_indices = sample_from_model(
        model,
        vectorizers['hate'],
        prompts=[prompt],
        temperature=0.7
    )
    hate_result = decode_samples(sampled_indices, vectorizers['hate'])[0]
    print(f"[혐오 댓글 모델]\n{hate_result}\n")

    # 혐오 댓글인 경우에만 조건부 모델 실행
    if is_hate:
        model = models['conditioned_hate'].cpu()
        sampled_indices = sample_from_model(
            model,
            vectorizers['conditioned_hate'],
            prompts=[prompt],
            targets=[target],
            temperature=0.7
        )
        conditioned_result = decode_samples(sampled_indices, vectorizers['conditioned_hate'])[0]
        print(f"[조건부 혐오 댓글 모델 (타겟: {target})]\n{conditioned_result}")

    print("\n" + "=" * 80)


1. 모델과 vectorizer 로드 중...

2. 댓글 이어쓰기 테스트...


[샘플 1]
원본 댓글: 이름 장군님께서 만드신 체육이다 늘 항상 이름 장군님께도 감사하게 여기며 살아라 축하한다

입력된 댓글 앞 부분: 이름 장군님께서 만드신 체육이다 늘 항상 

생성 결과:
----------------------------------------
[일반 댓글 모델]
이름장군##님##께##서만드##신체육##이다늘항상이름장군##님##께##서알##면서든든##하##게살##면서다른모습##으로보##기좋##습##니다축하##드##님너무큰감동##이그래도국##힘##에국##구##를많이합니다결국대한민국

[혐오 댓글 모델]
이름장군##님##께##서만드##신체육##이다늘항상꽃##을##듯죄##다!!!




[샘플 2]
원본 댓글: 이름 오빠 믿어!!!!!!! 항상 믿어요!!!!!!! 영원히 함께하자!!! 사랑해요

입력된 댓글 앞 부분: 이름 오빠 믿어!!!!!!! 항상 

생성 결과:
----------------------------------------
[일반 댓글 모델]
이름오빠믿##어!!!!!!!항상이게##교육믿##고!!!!믿##고##자##나!!!!믿##고!!!!!이름친구!!!같##은여론##으로##불선동##하##지!

[혐오 댓글 모델]
이름오빠믿##어!!!!!!!항상몰락##한놈##이니##가아니##고대선##까##지해라~




[샘플 3]
원본 댓글: 못믿겠는데  숏컷한 여자들이 대한민국에 몇명인데  중년이상 여성은 반이상이 숏컷인데 저걸 뭐라했을 리가 없다 

입력된 댓글 앞 부분: 못믿겠는데  숏컷한 여자들이 대한민국에 몇명인데 

생성 결과:
----------------------------------------
[일반 댓글 모델]
못##믿##겠##는데숏##컷##한여자##들이대한민국##에몇##명##인##데중년##이상여성##은일본##에서##도숏##컷##이심각##했##다남자##고숏##컷잡##은사람##들##의명##이다

[혐오 댓글 모델]
못##믿##겠##는데숏##