In [67]:
import torch
import torch.nn as nn
import pandas as pd
import re
import torch.optim as optim
import torch.nn.functional as F
import pickle
import math
from sklearn.model_selection import train_test_split
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import Dataset, DataLoader
import numpy as np
import sentencepiece as spm
import os

In [68]:
data = pd.read_csv('/Users/wansookim/Downloads/code_implementation/transformer_project_submit/data/ChatbotData.csv')

In [69]:
# ============================================
# Step 2: 데이터 전처리하기
# ============================================
def load_data(file_path):
    """
    CSV 파일에서 챗봇 데이터를 로드합니다.
    
    Args:
        file_path: CSV 파일 경로
    
    Returns:
        DataFrame: 로드된 데이터프레임
    """
    print("=" * 50)
    print("데이터 로딩 중...")
    print("=" * 50)
    
    # CSV 파일 읽기
    df = pd.read_csv(file_path)
    
    print(f"전체 데이터 수: {len(df)}")
    print(f"\n데이터 샘플:")
    print(df.head())
    print(f"\n컬럼: {df.columns.tolist()}")
    
    return df


In [70]:
def preprocess_sentence(sentence):
    """
    단일 문장을 전처리합니다.
    
    전처리 과정:
    1. 소문자 변환 제거 (한국어는 대소문자 구분이 없음)
    2. 특수문자 처리 (일부는 유지, 일부는 제거)
    3. 중복 공백 제거
    4. 문장 앞뒤 공백 제거
    
    Args:
        sentence: 전처리할 문장
    
    Returns:
        str: 전처리된 문장
    """
    # None이나 NaN 값 처리
    if pd.isna(sentence) or sentence is None:
        return ""
    
    # 문자열로 변환
    sentence = str(sentence)
    
    # 1. 한국어, 영어, 숫자, 기본 구두점만 남기기
    # ㄱ-ㅎ: 한글 자음
    # ㅏ-ㅣ: 한글 모음  
    # 가-힣: 완성형 한글
    # a-zA-Z: 영어
    # 0-9: 숫자
    # \s: 공백
    # .,!?~: 기본 구두점
    sentence = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣a-zA-Z0-9\s.,!?~ㅠㅜ]', ' ', sentence)
    
    # 2. ㅠ, ㅜ 같은 감정 표현 자음/모음은 제거하거나 유지 (여기서는 유지)
    # 필요시 제거: sentence = re.sub(r'[ㅠㅜ]+', '', sentence)
    
    # 3. 여러 개의 공백을 하나로 통일
    sentence = re.sub(r'\s+', ' ', sentence)
    
    # 4. 문장 앞뒤 공백 제거
    sentence = sentence.strip()
    
    # 5. 연속된 특수문자 제거 (예: !!!!! -> !)
    sentence = re.sub(r'([!?.])\1+', r'\1', sentence)
    
    return sentence

In [71]:
def create_dataset(df):
    """
    DataFrame을 질문-답변 쌍으로 변환하고 전처리합니다.
    
    Args:
        df: 원본 데이터프레임
    
    Returns:
        tuple: (questions, answers) - 전처리된 질문과 답변 리스트
    """
    print("\n" + "=" * 50)
    print("데이터 전처리 중...")
    print("=" * 50)
    
    # 질문(Q)과 답변(A) 컬럼 추출
    questions = df['Q'].tolist()
    answers = df['A'].tolist()
    
    # 각 문장 전처리
    preprocessed_questions = []
    preprocessed_answers = []
    
    for i, (q, a) in enumerate(zip(questions, answers)):
        # 전처리 수행
        clean_q = preprocess_sentence(q)
        clean_a = preprocess_sentence(a)
        
        # 빈 문장이 아닌 경우만 추가
        if clean_q and clean_a:
            preprocessed_questions.append(clean_q)
            preprocessed_answers.append(clean_a)
        
        # 진행상황 출력 (1000개마다)
        if (i + 1) % 1000 == 0:
            print(f"전처리 진행: {i + 1}/{len(questions)}")
    
    print(f"\n전처리 완료!")
    print(f"유효한 데이터 쌍: {len(preprocessed_questions)}")
    print(f"\n전처리 예시:")
    for i in range(min(5, len(preprocessed_questions))):
        print(f"Q: {preprocessed_questions[i]}")
        print(f"A: {preprocessed_answers[i]}")
        print()
    
    return preprocessed_questions, preprocessed_answers

In [72]:
def save_processed_data(questions, answers, output_dir='./data'):
    """
    전처리된 데이터를 파일로 저장합니다.
    
    Args:
        questions: 질문 리스트
        answers: 답변 리스트
        output_dir: 출력 디렉토리
    """
    # 출력 디렉토리 생성
    os.makedirs(output_dir, exist_ok=True)
    
    # 질문과 답변을 각각 파일로 저장
    questions_path = os.path.join(output_dir, 'questions.txt')
    answers_path = os.path.join(output_dir, 'answers.txt')
    
    with open(questions_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(questions))
    
    with open(answers_path, 'w', encoding='utf-8') as f:
        f.write('\n'.join(answers))
    
    print(f"\n전처리된 데이터 저장 완료:")
    print(f"  - 질문: {questions_path}")
    print(f"  - 답변: {answers_path}")
    
    # SentencePiece 학습을 위한 통합 파일 생성
    all_sentences_path = os.path.join(output_dir, 'all_sentences.txt')
    with open(all_sentences_path, 'w', encoding='utf-8') as f:
        # 질문과 답변 모두 포함
        f.write('\n'.join(questions + answers))
    
    print(f"  - 통합 문장: {all_sentences_path}")
    
    return questions_path, answers_path, all_sentences_path


In [73]:
# ============================================
# Step 3: SentencePiece 사용하기
# ============================================

def train_sentencepiece(input_file, model_prefix, vocab_size=8000):
    """
    SentencePiece 모델을 학습합니다.
    
    SentencePiece는 서브워드 단위로 토크나이징을 수행하는 비지도 학습 기반 토크나이저입니다.
    형태소 분석기 없이도 효과적으로 한국어를 토크나이징할 수 있습니다.
    
    장점:
    - 언어에 독립적 (Language-agnostic)
    - Out-of-vocabulary(OOV) 문제 해결
    - 서브워드 단위로 분리하여 의미 있는 단위 학습
    
    Args:
        input_file: 학습할 텍스트 파일 경로
        model_prefix: 저장될 모델 이름 (prefix)
        vocab_size: 어휘 사전 크기 (기본값: 8000)
    
    Returns:
        tuple: (모델 파일 경로, 어휘 사전 파일 경로)
    """
    print("\n" + "=" * 50)
    print("SentencePiece 모델 학습 시작")
    print("=" * 50)
    
    # SentencePiece 학습 파라미터 설정
    # --input: 학습할 텍스트 파일
    # --model_prefix: 저장될 모델 이름
    # --vocab_size: 어휘 사전 크기 (서브워드 개수)
    # --model_type: 모델 타입 (unigram, bpe, char, word 중 선택)
    #               - unigram: 확률 기반, 가장 일반적
    #               - bpe: Byte Pair Encoding
    # --max_sentence_length: 최대 문장 길이
    # --pad_id, --unk_id, --bos_id, --eos_id: 특수 토큰 ID 설정
    # --user_defined_symbols: 사용자 정의 특수 토큰
    
    templates = '--input={} \
                 --model_prefix={} \
                 --vocab_size={} \
                 --model_type=unigram \
                 --max_sentence_length=999999 \
                 --pad_id=0 \
                 --unk_id=1 \
                 --bos_id=2 \
                 --eos_id=3 \
                 --user_defined_symbols=[SEP],[CLS],[MASK]'
    
    # 학습 명령어 생성
    cmd = templates.format(input_file, model_prefix, vocab_size)
    
    print(f"학습 설정:")
    print(f"  - 입력 파일: {input_file}")
    print(f"  - 모델 이름: {model_prefix}")
    print(f"  - 어휘 크기: {vocab_size}")
    print(f"  - 모델 타입: unigram")
    print(f"\n학습 시작... (시간이 소요될 수 있습니다)")
    
    # SentencePiece 학습 실행
    spm.SentencePieceTrainer.Train(cmd)
    
    model_file = f"{model_prefix}.model"
    vocab_file = f"{model_prefix}.vocab"
    
    print(f"\n학습 완료!")
    print(f"  - 모델 파일: {model_file}")
    print(f"  - 어휘 파일: {vocab_file}")
    
    return model_file, vocab_file


In [74]:
def test_sentencepiece(model_file, test_sentences):
    """
    학습된 SentencePiece 모델을 테스트합니다.
    
    Args:
        model_file: 학습된 모델 파일 경로
        test_sentences: 테스트할 문장 리스트
    """
    print("\n" + "=" * 50)
    print("SentencePiece 토크나이저 테스트")
    print("=" * 50)
    
    # SentencePiece 프로세서 로드
    sp = spm.SentencePieceProcessor()
    sp.Load(model_file)
    
    print(f"어휘 사전 크기: {sp.GetPieceSize()}")
    print(f"특수 토큰:")
    print(f"  - PAD: {sp.IdToPiece(0)} (ID: 0)")
    print(f"  - UNK: {sp.IdToPiece(1)} (ID: 1)")
    print(f"  - BOS: {sp.IdToPiece(2)} (ID: 2)")
    print(f"  - EOS: {sp.IdToPiece(3)} (ID: 3)")
    
    print("\n" + "=" * 50)
    print("토크나이징 예시")
    print("=" * 50)
    
    for sentence in test_sentences:
        # 1. 문장을 서브워드로 분리
        pieces = sp.EncodeAsPieces(sentence)
        
        # 2. 문장을 ID로 변환
        ids = sp.EncodeAsIds(sentence)
        
        # 3. ID를 다시 문장으로 변환 (디코딩)
        decoded = sp.DecodeIds(ids)
        
        print(f"\n원본 문장: {sentence}")
        print(f"서브워드:  {pieces}")
        print(f"ID:        {ids}")
        print(f"디코딩:    {decoded}")
        print(f"토큰 수:   {len(pieces)}")

In [75]:
def encode_dataset(questions, answers, sp, max_length=40):
    """
    질문과 답변을 SentencePiece로 인코딩합니다.
    
    Args:
        questions: 질문 리스트
        answers: 답변 리스트
        sp: SentencePiece 프로세서
        max_length: 최대 시퀀스 길이
    
    Returns:
        tuple: (인코딩된 질문, 인코딩된 답변)
    """
    print("\n" + "=" * 50)
    print("데이터셋 인코딩 중...")
    print("=" * 50)
    
    encoded_questions = []
    encoded_answers = []
    
    # BOS(Beginning of Sentence), EOS(End of Sentence) 토큰 ID
    BOS_ID = 2
    EOS_ID = 3
    
    for i, (q, a) in enumerate(zip(questions, answers)):
        # 질문 인코딩: [BOS] + 질문 + [EOS]
        q_ids = [BOS_ID] + sp.EncodeAsIds(q) + [EOS_ID]
        
        # 답변 인코딩: [BOS] + 답변 + [EOS]
        a_ids = [BOS_ID] + sp.EncodeAsIds(a) + [EOS_ID]
        
        # 최대 길이 체크 (너무 긴 문장 제외)
        if len(q_ids) <= max_length and len(a_ids) <= max_length:
            encoded_questions.append(q_ids)
            encoded_answers.append(a_ids)
        
        # 진행상황 출력
        if (i + 1) % 1000 == 0:
            print(f"인코딩 진행: {i + 1}/{len(questions)}")
    
    print(f"\n인코딩 완료!")
    print(f"유효한 시퀀스 쌍: {len(encoded_questions)}")
    print(f"최대 질문 길이: {max(len(q) for q in encoded_questions)}")
    print(f"최대 답변 길이: {max(len(a) for a in encoded_answers)}")
    print(f"평균 질문 길이: {sum(len(q) for q in encoded_questions) / len(encoded_questions):.2f}")
    print(f"평균 답변 길이: {sum(len(a) for a in encoded_answers) / len(encoded_answers):.2f}")
    
    return encoded_questions, encoded_answers

In [76]:
data = pd.read_csv('/Users/wansookim/Downloads/code_implementation/transformer_project_submit/data/ChatbotData.csv')

# ============================================
# 메인 실행 코드
# ============================================

def main():
    """
    전체 전처리 및 토크나이저 학습 파이프라인
    """
    # 1. 데이터 로드
    df = data
    # 2. 데이터 전처리
    questions, answers = create_dataset(df)
    
    # 3. 전처리된 데이터 저장
    questions_path, answers_path, all_sentences_path = save_processed_data(
        questions, answers, output_dir='./data'
    )
    
    # 4. SentencePiece 모델 학습
    model_file, vocab_file = train_sentencepiece(
        input_file=all_sentences_path,
        model_prefix='./data/korean_chatbot_sp',
        vocab_size=8000
    )
    
    # 5. SentencePiece 모델 테스트
    test_sentences = [
        "안녕하세요",
        "오늘 날씨가 정말 좋네요",
        "챗봇 만들기 재미있어요",
        questions[0],
        answers[0]
    ]
    test_sentencepiece(model_file, test_sentences)
    
    # 6. 전체 데이터셋 인코딩
    sp = spm.SentencePieceProcessor()
    sp.Load(model_file)
    
    encoded_questions, encoded_answers = encode_dataset(
        questions, answers, sp, max_length=40
    )
    
    # 7. 학습/검증 데이터 분리
    train_q, val_q, train_a, val_a = train_test_split(
        encoded_questions, encoded_answers, 
        test_size=0.1, random_state=42
    )
    
    print("\n" + "=" * 50)
    print("데이터 분리 완료")
    print("=" * 50)
    print(f"학습 데이터: {len(train_q)}쌍")
    print(f"검증 데이터: {len(val_q)}쌍")
    
    # 8. 인코딩된 데이터 저장 (numpy 배열로)
    np.save('./data/train_questions.npy', np.array(train_q, dtype=object))
    np.save('./data/train_answers.npy', np.array(train_a, dtype=object))
    np.save('./data/val_questions.npy', np.array(val_q, dtype=object))
    np.save('./data/val_answers.npy', np.array(val_a, dtype=object))
    
    print("\n인코딩된 데이터 저장 완료!")
    print("  - ./data/train_questions.npy")
    print("  - ./data/train_answers.npy")
    print("  - ./data/val_questions.npy")
    print("  - ./data/val_answers.npy")
    
    print("\n" + "=" * 50)
    print("전처리 및 토크나이저 학습 완료!")
    print("=" * 50)
    print("\n다음 단계: 트랜스포머 모델 구성 및 학습")


if __name__ == "__main__":
    main()



데이터 전처리 중...
전처리 진행: 1000/11823
전처리 진행: 2000/11823
전처리 진행: 3000/11823
전처리 진행: 4000/11823
전처리 진행: 5000/11823
전처리 진행: 6000/11823
전처리 진행: 7000/11823
전처리 진행: 8000/11823
전처리 진행: 9000/11823
전처리 진행: 10000/11823
전처리 진행: 11000/11823

전처리 완료!
유효한 데이터 쌍: 11823

전처리 예시:
Q: 12시 땡!
A: 하루가 또 가네요.

Q: 1지망 학교 떨어졌어
A: 위로해 드립니다.

Q: 3박4일 놀러가고 싶다
A: 여행은 언제나 좋죠.

Q: 3박4일 정도 놀러가고 싶다
A: 여행은 언제나 좋죠.

Q: PPL 심하네
A: 눈살이 찌푸려지죠.


전처리된 데이터 저장 완료:
  - 질문: ./data/questions.txt
  - 답변: ./data/answers.txt
  - 통합 문장: ./data/all_sentences.txt

SentencePiece 모델 학습 시작
학습 설정:
  - 입력 파일: ./data/all_sentences.txt
  - 모델 이름: ./data/korean_chatbot_sp
  - 어휘 크기: 8000
  - 모델 타입: unigram

학습 시작... (시간이 소요될 수 있습니다)

학습 완료!
  - 모델 파일: ./data/korean_chatbot_sp.model
  - 어휘 파일: ./data/korean_chatbot_sp.vocab

SentencePiece 토크나이저 테스트
어휘 사전 크기: 8000
특수 토큰:
  - PAD: <pad> (ID: 0)
  - UNK: <unk> (ID: 1)
  - BOS: <s> (ID: 2)
  - EOS: </s> (ID: 3)

토크나이징 예시

원본 문장: 안녕하세요
서브워드:  ['▁안녕하세요']
ID:        [3162]
디코딩:    안녕하세요
토큰 수:   1

원본 문장:

sentencepiece_trainer.cc(178) LOG(INFO) Running command: --input=./data/all_sentences.txt                  --model_prefix=./data/korean_chatbot_sp                  --vocab_size=8000                  --model_type=unigram                  --max_sentence_length=999999                  --pad_id=0                  --unk_id=1                  --bos_id=2                  --eos_id=3                  --user_defined_symbols=[SEP],[CLS],[MASK]
sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: ./data/all_sentences.txt
  input_format: 
  model_prefix: ./data/korean_chatbot_sp
  model_type: UNIGRAM
  vocab_size: 8000
  self_test_sample_size: 0
  character_coverage: 0.9995
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 999999
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  spl

In [77]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import time

class MultiHeadAttention(nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(
        self,
        emb_dim,
        num_heads,
        dropout=0.0,
        bias=False,
        encoder_decoder_attention=False,  # otherwise self_attention
        causal = False
    ):
        super().__init__()
        self.emb_dim = emb_dim
        self.num_heads = num_heads
        self.dropout = dropout
        self.head_dim = emb_dim // num_heads
        assert self.head_dim * num_heads == self.emb_dim, "emb_dim must be divisible by num_heads"

        self.encoder_decoder_attention = encoder_decoder_attention
        self.causal = causal
        self.q_proj = nn.Linear(emb_dim, emb_dim, bias=bias)
        self.k_proj = nn.Linear(emb_dim, emb_dim, bias=bias)
        self.v_proj = nn.Linear(emb_dim, emb_dim, bias=bias)
        self.out_proj = nn.Linear(emb_dim, emb_dim, bias=bias)


    def transpose_for_scores(self, x):
        # (batch_size, seq_len, emb_dim) -> (batch_size, num_heads, seq_len, head_dim) for multi-headed attention
        new_x_shape = x.size()[:-1] + (
            self.num_heads,
            self.head_dim,
        )
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)
        # This is equivalent to
        # return x.transpose(1,2)


    def scaled_dot_product(self,
                           query: torch.Tensor,
                           key: torch.Tensor,
                           value: torch.Tensor,
                           attention_mask: torch.BoolTensor):

        attn_weights = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(self.emb_dim) # QK^T/sqrt(d)

        if attention_mask is not None:
            attn_weights = attn_weights.masked_fill(attention_mask.unsqueeze(1), float("-inf"))

        attn_weights = F.softmax(attn_weights, dim=-1)  # softmax(QK^T/sqrt(d))
        attn_probs = F.dropout(attn_weights, p=self.dropout, training=self.training)
        attn_output = torch.matmul(attn_probs, value) # softmax(QK^T/sqrt(d))V

        return attn_output, attn_probs


    def MultiHead_scaled_dot_product(self,
                       query: torch.Tensor,
                       key: torch.Tensor,
                       value: torch.Tensor,
                       attention_mask: torch.BoolTensor):

        attn_weights = torch.matmul(query, key.transpose(-1, -2)) / math.sqrt(self.head_dim) # QK^T/sqrt(d)

        # Attention mask
        if attention_mask is not None:
            if self.causal:
              # (seq_len x seq_len)
                attn_weights = attn_weights.masked_fill(attention_mask.unsqueeze(0).unsqueeze(1), float("-inf"))
            else:
              # (batch_size x seq_len)
                attn_weights = attn_weights.masked_fill(attention_mask.unsqueeze(1).unsqueeze(2), float("-inf"))


        attn_weights = F.softmax(attn_weights, dim=-1)  # softmax(QK^T/sqrt(d))
        attn_probs = F.dropout(attn_weights, p=self.dropout, training=self.training)

        attn_output = torch.matmul(attn_probs, value) # softmax(QK^T/sqrt(d))V
        attn_output = attn_output.permute(0, 2, 1, 3).contiguous()
        concat_attn_output_shape = attn_output.size()[:-2] + (self.emb_dim,)
        attn_output = attn_output.view(*concat_attn_output_shape)
        attn_output = self.out_proj(attn_output)

        return attn_output, attn_weights


    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        attention_mask: torch.Tensor = None,
        ):

        q = self.q_proj(query)
        # Enc-Dec attention
        if self.encoder_decoder_attention:
            k = self.k_proj(key)
            v = self.v_proj(key)
        # Self attention
        else:
            k = self.k_proj(query)
            v = self.v_proj(query)

        q = self.transpose_for_scores(q)
        k = self.transpose_for_scores(k)
        v = self.transpose_for_scores(v)

        attn_output, attn_weights = self.MultiHead_scaled_dot_product(q,k,v,attention_mask)
        return attn_output, attn_weights


In [78]:
class PositionWiseFeedForward(nn.Module):

    def __init__(self, emb_dim: int, d_ff: int, dropout: float = 0.1):
        super(PositionWiseFeedForward, self).__init__()

        self.activation = nn.ReLU()
        self.w_1 = nn.Linear(emb_dim, d_ff)
        self.w_2 = nn.Linear(d_ff, emb_dim)
        self.dropout = dropout

    def forward(self, x):
        residual = x
        x = self.activation(self.w_1(x))
        x = F.dropout(x, p=self.dropout, training=self.training)

        x = self.w_2(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        return x + residual # residual connection for preventing gradient vanishing

In [79]:
import numpy as np

# Since Transformer contains no recurrence and no convolution,
# in order for the model to make use of the order of the sequence,
# we must inject some information about the relative or absolute position of the tokens in the sequence.
# To this end, we add “positional encodings” to the input embeddings at the bottoms of the encoder and decoder stacks.
# There are many choices of positional encodings, learned and fixed

class SinusoidalPositionalEmbedding(nn.Embedding):

    def __init__(self, num_positions, embedding_dim, padding_idx=None):
        super().__init__(num_positions, embedding_dim) # torch.nn.Embedding(num_embeddings, embedding_dim)
        self.weight = self._init_weight(self.weight) # self.weight => nn.Embedding(num_positions, embedding_dim).weight

    @staticmethod
    def _init_weight(out: nn.Parameter):
        n_pos, embed_dim = out.shape
        pe = nn.Parameter(torch.zeros(out.shape))
        for pos in range(n_pos):
            for i in range(0, embed_dim, 2):
                pe[pos, i].data.copy_( torch.tensor( np.sin(pos / (10000 ** ( i / embed_dim)))) )
                pe[pos, i + 1].data.copy_( torch.tensor( np.cos(pos / (10000 ** ((i + 1) / embed_dim)))) )
        pe.detach_()

        return pe

    @torch.no_grad()
    def forward(self, input_ids):
      bsz, seq_len = input_ids.shape[:2]
      positions = torch.arange(seq_len, dtype=torch.long, device=self.weight.device)
      return super().forward(positions)


In [80]:

class EncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.emb_dim = config.emb_dim
        self.ffn_dim = config.ffn_dim
        self.self_attn = MultiHeadAttention(
            emb_dim=self.emb_dim,
            num_heads=config.attention_heads,
            dropout=config.attention_dropout)
        self.self_attn_layer_norm = nn.LayerNorm(self.emb_dim)
        self.dropout = config.dropout
        self.activation_fn = nn.ReLU()
        self.PositionWiseFeedForward = PositionWiseFeedForward(self.emb_dim, self.ffn_dim, config.dropout)
        self.final_layer_norm = nn.LayerNorm(self.emb_dim)

    def forward(self, x, encoder_padding_mask):

        residual = x
        x, attn_weights = self.self_attn(query=x, key=x, attention_mask=encoder_padding_mask)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = residual + x
        x = self.self_attn_layer_norm(x)
        x = self.PositionWiseFeedForward(x)
        x = self.final_layer_norm(x)
        if torch.isinf(x).any() or torch.isnan(x).any():
            clamp_value = torch.finfo(x.dtype).max - 1000
            x = torch.clamp(x, min=-clamp_value, max=clamp_value)
        return x, attn_weights


In [81]:
class Encoder(nn.Module):
    def __init__(self, config, embed_tokens):
        super().__init__()

        self.dropout = config.dropout

        emb_dim = embed_tokens.embedding_dim
        self.padding_idx = embed_tokens.padding_idx
        self.max_source_positions = config.max_position_embeddings

        self.embed_tokens = embed_tokens
        self.embed_positions = SinusoidalPositionalEmbedding(
                config.max_position_embeddings, config.emb_dim, self.padding_idx
            )

        self.layers = nn.ModuleList([EncoderLayer(config) for _ in range(config.encoder_layers)])

    def forward(self, input_ids, attention_mask=None):

        inputs_embeds = self.embed_tokens(input_ids)
        embed_pos = self.embed_positions(input_ids)
        x = inputs_embeds + embed_pos
        x = F.dropout(x, p=self.dropout, training=self.training)

        self_attn_scores = []
        for encoder_layer in self.layers:
            x, attn = encoder_layer(x, attention_mask)
            self_attn_scores.append(attn.detach())

        return x, self_attn_scores


In [82]:
class DecoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.emb_dim = config.emb_dim
        self.ffn_dim = config.ffn_dim
        self.self_attn = MultiHeadAttention(
            emb_dim=self.emb_dim,
            num_heads=config.attention_heads,
            dropout=config.attention_dropout,
            causal=True,
        )
        self.dropout = config.dropout
        self.self_attn_layer_norm = nn.LayerNorm(self.emb_dim)
        self.encoder_attn = MultiHeadAttention(
            emb_dim=self.emb_dim,
            num_heads=config.attention_heads,
            dropout=config.attention_dropout,
            encoder_decoder_attention=True,
        )
        self.encoder_attn_layer_norm = nn.LayerNorm(self.emb_dim)
        self.PositionWiseFeedForward = PositionWiseFeedForward(self.emb_dim, self.ffn_dim, config.dropout)
        self.final_layer_norm = nn.LayerNorm(self.emb_dim)


    def forward(
        self,
        x,
        encoder_hidden_states,
        encoder_attention_mask=None,
        causal_mask=None,
    ):
        residual = x
        # Self Attention
        x, self_attn_weights = self.self_attn(
            query=x,
            key=x, # adds keys to layer state
            attention_mask=causal_mask,
        )
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = residual + x
        x = self.self_attn_layer_norm(x)

        # Cross-Attention Block
        residual = x
        x, cross_attn_weights = self.encoder_attn(
            query=x,
            key=encoder_hidden_states,
            attention_mask=encoder_attention_mask,
        )
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = residual + x
        x = self.encoder_attn_layer_norm(x)

        # Fully Connected
        x = self.PositionWiseFeedForward(x)
        x = self.final_layer_norm(x)

        return (
            x,
            self_attn_weights,
            cross_attn_weights,
        )

In [83]:
class Decoder(nn.Module):

    def __init__(self, config, embed_tokens: nn.Embedding):
        super().__init__()
        self.dropout = config.dropout
        self.padding_idx = embed_tokens.padding_idx
        self.max_target_positions = config.max_position_embeddings
        self.embed_tokens = embed_tokens
        self.embed_positions = SinusoidalPositionalEmbedding(
            config.max_position_embeddings, config.emb_dim, self.padding_idx
        )
        self.layers = nn.ModuleList([DecoderLayer(config) for _ in range(config.decoder_layers)])  # type: List[DecoderLayer]

    def forward(
        self,
        input_ids,
        encoder_hidden_states,
        encoder_attention_mask,
        decoder_causal_mask,
    ):

        # embed positions
        positions = self.embed_positions(input_ids)
        x = self.embed_tokens(input_ids)
        x += positions

        x = F.dropout(x, p=self.dropout, training=self.training)

        # decoder layers
        cross_attention_scores = []
        for idx, decoder_layer in enumerate(self.layers):
            x, layer_self_attn, layer_cross_attn = decoder_layer(
                x,
                encoder_hidden_states,
                encoder_attention_mask=encoder_attention_mask,
                causal_mask=decoder_causal_mask,
            )
            cross_attention_scores.append(layer_cross_attn.detach())

        return x, cross_attention_scores

In [84]:
questions, answers = create_dataset(data)



데이터 전처리 중...
전처리 진행: 1000/11823
전처리 진행: 2000/11823
전처리 진행: 3000/11823
전처리 진행: 4000/11823
전처리 진행: 5000/11823
전처리 진행: 6000/11823
전처리 진행: 7000/11823
전처리 진행: 8000/11823
전처리 진행: 9000/11823
전처리 진행: 10000/11823
전처리 진행: 11000/11823

전처리 완료!
유효한 데이터 쌍: 11823

전처리 예시:
Q: 12시 땡!
A: 하루가 또 가네요.

Q: 1지망 학교 떨어졌어
A: 위로해 드립니다.

Q: 3박4일 놀러가고 싶다
A: 여행은 언제나 좋죠.

Q: 3박4일 정도 놀러가고 싶다
A: 여행은 언제나 좋죠.

Q: PPL 심하네
A: 눈살이 찌푸려지죠.



In [85]:
# === SentencePieceVocab 클래스 ===
import sentencepiece as spm

class SentencePieceVocab:
    def __init__(self, sp_model_path):
        self.sp = spm.SentencePieceProcessor()
        self.sp.Load(sp_model_path)
        self.PAD_ID = 0
        self.UNK_ID = 1
        self.BOS_ID = 2
        self.EOS_ID = 3
        self.stoi = {'<pad>': 0, '<unk>': 1, '<s>': 2, '</s>': 3}
        self.itos = [self.sp.IdToPiece(i) for i in range(self.sp.GetPieceSize())]
    
    def encode(self, sentence):
        return self.sp.EncodeAsIds(sentence)
    
    def decode(self, ids):
        return self.sp.DecodeIds([i for i in ids if i not in [0, 2, 3]])
    
    def __len__(self):
        return self.sp.GetPieceSize()


# === ChatbotDataset 클래스 ===
from torch.utils.data import Dataset

class ChatbotDataset(Dataset):
    def __init__(self, questions, answers, vocab, max_length=40):
        self.questions = questions
        self.answers = answers
        self.vocab = vocab
        self.max_length = max_length
    
    def __len__(self):
        return len(self.questions)
    
    def __getitem__(self, idx):
        q = self.questions[idx]
        a = self.answers[idx]
        src = [self.vocab.BOS_ID] + self.vocab.encode(q) + [self.vocab.EOS_ID]
        trg = [self.vocab.BOS_ID] + self.vocab.encode(a) + [self.vocab.EOS_ID]
        return {
            'SRC': torch.tensor(src[:self.max_length], dtype=torch.long),
            'TRG': torch.tensor(trg[:self.max_length], dtype=torch.long)
        }


# === Collate 함수 ===
def collate_fn(batch, pad_idx=0):
    src_batch = [item['SRC'] for item in batch]
    trg_batch = [item['TRG'] for item in batch]
    src_max = max(len(s) for s in src_batch)
    trg_max = max(len(t) for t in trg_batch)
    
    src_padded = [torch.cat([s, torch.tensor([pad_idx]*(src_max-len(s)))]) 
                  for s in src_batch]
    trg_padded = [torch.cat([t, torch.tensor([pad_idx]*(trg_max-len(t)))]) 
                  for t in trg_batch]
    
    return {'SRC': torch.stack(src_padded), 'TRG': torch.stack(trg_padded)}


# === 실제 객체 생성 ⭐ ===
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Vocab 생성
SRC_vocab = SentencePieceVocab('./data/korean_chatbot_sp.model')
TRG_vocab = SRC_vocab

# 데이터 분할
train_q, temp_q, train_a, temp_a = train_test_split(
    questions, answers, test_size=0.2, random_state=42
)
val_q, test_q, val_a, test_a = train_test_split(
    temp_q, temp_a, test_size=0.5, random_state=42
)

# Dataset
train_dataset = ChatbotDataset(train_q, train_a, SRC_vocab, 40)
val_dataset = ChatbotDataset(val_q, val_a, SRC_vocab, 40)
test_dataset = ChatbotDataset(test_q, test_a, SRC_vocab, 40)

# DataLoader
train_iterator = DataLoader(train_dataset, batch_size=32, shuffle=True,
                            collate_fn=lambda b: collate_fn(b, SRC_vocab.PAD_ID))
valid_iterator = DataLoader(val_dataset, batch_size=32, shuffle=False,
                            collate_fn=lambda b: collate_fn(b, SRC_vocab.PAD_ID))
test_iterator = DataLoader(test_dataset, batch_size=32, shuffle=False,
                           collate_fn=lambda b: collate_fn(b, SRC_vocab.PAD_ID))

# device 설정
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')

print(f"✓ Vocab 크기: {len(SRC_vocab):,}")
print(f"✓ 학습: {len(train_q):,}쌍")
print(f"✓ Device: {device}")

✓ Vocab 크기: 8,000
✓ 학습: 9,458쌍
✓ Device: mps


In [86]:
import torch
import torch.nn as nn

# Assuming 'Encoder' and 'Decoder' are defined elsewhere in your code
# If not, you'll need to define these classes as well

class Transformer(nn.Module):
    def __init__(self, SRC_vocab, TRG_vocab, config):
        super().__init__()

        self.SRC_vocab = SRC_vocab
        self.TRG_vocab = TRG_vocab

        self.enc_embedding = nn.Embedding(len(SRC_vocab.itos), config.emb_dim, padding_idx=SRC_vocab.stoi['<pad>'])
        self.dec_embedding = nn.Embedding(len(TRG_vocab.itos), config.emb_dim, padding_idx=TRG_vocab.stoi['<pad>'])

        self.encoder = Encoder(config, self.enc_embedding)
        self.decoder = Decoder(config, self.dec_embedding)

        self.prediction_head = nn.Linear(config.emb_dim, len(TRG_vocab.itos))

        self.init_weights()

    def generate_mask(self, src, trg):
        # Mask encoder attention to ignore padding
        enc_attention_mask = src.eq(self.SRC_vocab.stoi['<pad>']).to(device)
        # Mask decoder attention for causality
        tmp = torch.ones(trg.size(1), trg.size(1), dtype=torch.bool, device=device)
        mask = torch.arange(tmp.size(-1), device=device)
        dec_attention_mask = tmp.masked_fill_(mask < (mask + 1).view(tmp.size(-1), 1), False).to(device)

        return enc_attention_mask, dec_attention_mask

    def init_weights(self):
        for name, param in self.named_parameters():
            if param.requires_grad:
                if 'weight' in name:
                    nn.init.normal_(param.data, mean=0, std=0.01)
                else:
                    nn.init.constant_(param.data, 0)

    def forward(self, src, trg):
        enc_attention_mask, dec_causal_mask = self.generate_mask(src, trg)
        encoder_output, encoder_attention_scores = self.encoder(
            input_ids=src,
            attention_mask=enc_attention_mask
        )

        decoder_output, decoder_attention_scores = self.decoder(
            trg,
            encoder_output,
            encoder_attention_mask=enc_attention_mask,
            decoder_causal_mask=dec_causal_mask,
        )
        decoder_output = self.prediction_head(decoder_output)

        return decoder_output, encoder_attention_scores, decoder_attention_scores



In [87]:
import easydict
import torch.nn as nn
import torch.optim as optim

# Create the configuration for the transformer model
config = easydict.EasyDict({
    "emb_dim": 64,
    "ffn_dim": 256,
    "attention_heads": 4,
    "attention_dropout": 0.0,
    "dropout": 0.2,
    "max_position_embeddings": 512,
    "encoder_layers": 3,
    "decoder_layers": 3,
})

# Constants for training
N_EPOCHS = 100
learning_rate = 5e-4
CLIP = 1

# Updated PAD_IDX to use the new Vocab instance
PAD_IDX = SRC_vocab.stoi['<pad>']

# Instantiate the model using the new Vocab instances instead of the Fields
model = Transformer(SRC_vocab, TRG_vocab, config)
model.to(device)

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Define the loss function, ignoring the index of the padding token
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Initialize the best validation loss
best_valid_loss = float('inf')

In [88]:
import math
import time
from tqdm import tqdm

def train(model: nn.Module,
          iterator: DataLoader,
          optimizer: optim.Optimizer,
          criterion: nn.Module,
          clip: float):

    model.train()
    epoch_loss = 0

    for batch in iterator:
        src = batch['SRC'].to(device)
        trg = batch['TRG'].to(device)

        # Assuming src and trg are already tensorized and padded
        # If not, you should perform those steps here

        optimizer.zero_grad()

        output, enc_attention_scores, _ = model(src, trg)

        # Flatten the output and target tensors to compute the loss
        output = output[:,:-1,:].reshape(-1, output.shape[-1])
        trg = trg[:,1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)


def evaluate(model: nn.Module,
             iterator: DataLoader,
             criterion: nn.Module):

    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for batch in iterator:
            src = batch['SRC'].to(device)
            trg = batch['TRG'].to(device)

            # Assuming src and trg are already tensorized and padded
            # If not, you should perform those steps here

            output, attention_score, _ = model(src, trg)

            # Flatten the output and target tensors to compute the loss
            output = output[:,:-1,:].reshape(-1, output.shape[-1])
            trg = trg[:,1:].reshape(-1)

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Training loop
for epoch in tqdm(range(N_EPOCHS), total=N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
    else: # early stopping condition
        break

    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

# Evaluation on test set
test_loss = evaluate(model, test_iterator, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')


  0%|          | 0/100 [00:00<?, ?it/s]


RuntimeError: Expected tensor for argument #1 'indices' to have one of the following scalar types: Long, Int; but got MPSFloatType instead (while checking arguments for embedding)