#### project 과정
transformer chatbot(한글)
- PositionalEncoding: transformer는 병령 처리를 위해 한 번에 입력을 준다 따라서 모델이 순서(맥락)을 알 수 없기 때문에 필요하다
- scaled dot product: 차원이 많아질 수록 softmax를 취하면 값의 차이가 극단적으로 나타난다(뽀족해짐) 따라서 차원의 제곱근으로 나눠 그 값의 차이를 평탄화해줘야 한다.
- MultiHeadAttention: 차원을 nheads로 나눠서 연산을 한다.
- create_look_ahead_mask: 디코더가 예측을 할 때 현재 time보다 나중에 나올말을 보면 안 되기 때문에 mask로 가려주는 역할
- 데이터 불러오기 & 간단한 불용어 처리
- sentencepiece 모델 학습 (vocab만들기)
- 데이터셋 구현
- 모델 정의 및 학습하기
- 챗봇 테스트하기

실험 방법
- NUM_LAYERS =2 > 3, DROPOUT: 0.1 > 0.2, warmup_steps=EPOCH*0.1, BATCH_SIZE = 32

1. epoch: 50 > 100 > 
    - 충분한 학습이 이루어지지 못한 것 같다고 생각해서 epoch를 늘려 학습을 다시 실행했을 때 loss는 꾸준히 감소하지만, acc가 0.3XX 이상으로 증가하지 않는 것을 확인하고 epoch를 늘려도 모델의 성능에 영향을 주지 못한 것으로 판단함
    - 초반 10~20에포크 acc 업데이트가 안 되었음 | 결과적으로 마지막 테스트에서 의미있는 출력이 나오지 못함
    > 코사인 워밍업 방식으로 변경 후 초기 에포크만으로 충분한 학습이 이루어지는 것을 확인됨 

2. 전처리 subword(bpe) > subword(unigram)
    - 기존 bpe방식은 전체 corpus에서 많이 나온 순서대로 vocab을 만들지만, unigram은 확률적으로 문장에서 가장 정보량이 많은 서브워드 단위로 vocab을 만들기 때문에 변경했다.
    > 학습 결과만 보면 acc가 .3xx로 비슷해서 어느 것이 좋다고는 못하겠으나 테스트 결과만 보면 bpe: 1번과 2번 모두 출력이 좋은 좋은 ., unigram: 1번과 2번이 출력이 다른 것을 보면 현재 상황에선 unigram이 조금 더 좋은 것을 판단됨
    
3. vocab_sise: 8000 > 9000 > 5000 
    - vocab_sise를 가능한 최대로 늘렸을 때 이전 보다 출력이 단답이 되는 경우가 발생함. 모델 성능의 개선이 없는 것으 오히려 성능이 저하된 것으로 확인됨

3. NUM_LAYERS 수를 2에서 3로 늘리기
    - 레이어를 늘림으로 문장의 특징을 좀 더 추출하고자 변경

4. oprimizer Adam > AdamW로 변경
    - 모델의 과대적합을 방지하기 위해가 가중치 감쇠가 적응형 학습률에 의해 왜곡되지 않고 독립적으로 작동하는 AdamW로 변경했습니다.
    > Adam : epoch 30 > Avg Loss: 4.0630, Avg Acc: 0.3868
    > AdamW:epoch 30 > Avg Loss: 0.4636, Avg Acc: 0.8826

5. 학습률 스케줄러 변경
    - 선형 워밍업 > 코사인 어닐링 > 코사인 워밍업 
    - 기존 선형 워밍업 방식은 워밍업동안 빠르게 학습률이 높아지고 빠르게 낮아지는 것을 그래프로 학인할 수 있다. 이 과정에서 충분한 학습이 이루어지지 못하는 것으로 생각했다
    - 다음으로 코사인어닐링방식을 사용했는데 초기 학습률에서 점진적으로 감소하지만 pre_train에서 과도하게 낮은 학습률은 학습에 방해가 되기 때문에 좋은 성능의 모델을 만들지 못했습니다.
    - 다음으로 코사인 워밍업 방식으로 변경했을 때, 기존 선형 워밍업 그래프와 마찬가지로 워밍업동안 학습률이 증가하고 

---

In [None]:
# !pip install sentencepiece

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import sentencepiece as spm

import copy
import math
import os
import re
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# MAX_SAMPLES = 50000                     # 최대 샘플 수 제한
file_path = './data/augmented_ChatbotData.csv'    # 파일 경로 설정 
corpus_file = "clean_corpus.txt"        # sentencepiece corpus 파일

# 하이퍼파라미터 설정
NUM_LAYERS = 4     # 인코더/디코더 층 수
D_MODEL = 512     # 임베딩 및 내부 표현 차원
NUM_HEADS = 8      # 멀티헤드 어텐션에서의 헤드 수
UNITS = 2048      # 피드포워드 신경망의 은닉 차원
DROPOUT = 0.2      # 드롭아웃 비율
VOCAB_SIZE = 7000 # 단어 집합 크기
BATCH_SIZE = 32   #배치사이즈
EPOCH = 100       # 원하는 에포크
lr= 5e-4

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

---
#### 위치 인코딩

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.d_model = d_model
        self.position = position

        self.pos_encoding = self._build_pos_encoding(position, d_model)

    def _get_angles(self, position, i, d_model):
        return 1.0 / (10000.0 ** ((2.0 * (i // 2)) / d_model)) * position

    def _build_pos_encoding(self, position, d_model):
        pos = torch.arange(position, dtype=torch.float32).unsqueeze(1)
        i = torch.arange(d_model, dtype=torch.float32).unsqueeze(0)

        angle_rads = self._get_angles(pos, i, d_model)
        sines = torch.sin(angle_rads[:, 0::2])
        cosines = torch.cos(angle_rads[:, 1::2])

        pos_encoding = torch.zeros(position, d_model)
        pos_encoding[:, 0::2] = sines
        pos_encoding[:, 1::2] = cosines

        pos_encoding = pos_encoding.unsqueeze(0)  # shape: [1, position, d_model]
        return pos_encoding

    def forward(self, x):
        return x + self.pos_encoding[:, :x.size(1), :].to(x.device)

In [None]:
# PositionalEncoding 시각화
sample_pos_encoding = PositionalEncoding(25, 512)

plt.pcolormesh(sample_pos_encoding.pos_encoding.numpy()[0], cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()

---
#### scaled dot product

In [None]:
def scaled_dot_product_attention(query, key, value, mask=None):

    # 1) Q와 K의 내적을 통해 score(유사도) 계산
    # key.transpose(-1, -2): (batch_size, heads, depth, seq_len)
    # matmul 결과 shape: (batch_size, heads, seq_len, seq_len)
    matmul_qk = torch.matmul(query, key.transpose(-1, -2))

    # 2) depth에 따라 정규화
    depth = key.size(-1)  # depth = d_model / heads
    logits = matmul_qk / math.sqrt(depth)

    # 3) 마스크가 주어졌다면 -1e9(아주 작은 값)를 더해 소프트맥스에서 제외시키도록 함
    if mask is not None:
        # 텐서플로우: logits += (mask * -1e9)
        # 파이토치 동일 적용
        logits = logits + (mask * -1e9)

    # 4) 소프트맥스 계산해 attention weights 생성
    attention_weights = F.softmax(logits, dim=-1)

    # 5) attention weights와 value의 내적
    output = torch.matmul(attention_weights, value)

    return output, attention_weights

---
#### MultiHeadAttention

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # d_model은 num_heads로 나누어떨어져야 함
        assert d_model % num_heads == 0

        self.depth = d_model // num_heads

        # 파이토치에서 Dense는 nn.Linear로 대응
        self.query_dense = nn.Linear(d_model, d_model)
        self.key_dense = nn.Linear(d_model, d_model)
        self.value_dense = nn.Linear(d_model, d_model)

        self.out_dense = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """
        x: (batch_size, seq_len, d_model)
        => (batch_size, num_heads, seq_len, depth) 형태로 변환
        """
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        x = x.permute(0, 2, 1, 3)  # (batch_size, num_heads, seq_len, depth)
        return x

    def forward(self, query, key, value, mask=None):
        """
        query, key, value: (batch_size, seq_len, d_model)
        mask: (batch_size, 1, seq_len, seq_len) 등으로 broadcast 가능하도록 구성
        """
        batch_size = query.size(0)

        # Q, K, V에 각각 Linear 적용
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # Head 분할
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # 스케일드 닷 프로덕트 어텐션
        scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)

        # (batch_size, num_heads, seq_len, depth) -> (batch_size, seq_len, num_heads, depth)
        scaled_attention = scaled_attention.permute(0, 2, 1, 3).contiguous()

        # 다시 (batch_size, seq_len, d_model)로 합치기
        concat_attention = scaled_attention.view(batch_size, -1, self.d_model)

        # 최종 Dense
        output = self.out_dense(concat_attention)
        return output


---
#### masked

In [None]:
def create_padding_mask(x):
    # x == 0 위치를 찾아 float형 1로 변환
    mask = (x == 0).float()
    # (batch_size, seq_len) -> (batch_size, 1, 1, seq_len)
    mask = mask.unsqueeze(1).unsqueeze(2)
    return mask

In [None]:
def create_look_ahead_mask(x):
    seq_len = x.size(1)

    # (seq_len, seq_len) 크기의 하삼각 행렬(tril) 생성 후 1에서 빼서
    # 상삼각이 1, 하삼각(자기 자신 포함)이 0이 되도록 설정
    # => 미래 토큰(자신 인덱스보다 큰 위치) 마스킹
    look_ahead_mask = 1 - torch.tril(torch.ones((seq_len, seq_len)))

    # 패딩 마스크 생성 (shape: (batch_size, 1, 1, seq_len))
    padding_mask = create_padding_mask(x)

    # look_ahead_mask: (seq_len, seq_len) -> (1, seq_len, seq_len)
    look_ahead_mask = look_ahead_mask.unsqueeze(0)
    # -> (1, seq_len, seq_len) -> (1, 1, seq_len, seq_len)
    look_ahead_mask = look_ahead_mask.unsqueeze(1)
    look_ahead_mask = look_ahead_mask.to(x.device)

    # look-ahead 마스크와 패딩 마스크를 합성 (둘 중 하나라도 1이면 마스킹)
    # 최종 shape은 브로드캐스팅으로 (batch_size, 1, seq_len, seq_len)
    combined_mask = torch.max(look_ahead_mask, padding_mask)
    return combined_mask

In [None]:
x = torch.tensor([[1, 2, 0, 3, 0],
                  [0, 0, 0, 4, 5]])
mask = create_padding_mask(x)
print("입력 텐서 크기 :", x.shape)    # (2, 5)
print("생성된 마스크 크기 :", mask.shape)  # (2, 1, 1, 5)
print(mask)

In [None]:
x = torch.tensor([[1, 2, 3, 4, 5]])
mask_1 = create_look_ahead_mask(x)
print("첫 번째 시퀀스:\n", mask_1, mask_1.shape)

In [None]:
x2 = torch.tensor([[0, 5, 1, 5, 5]])
mask_2 = create_look_ahead_mask(x2)
print("두 번째 시퀀스:\n", mask_2, mask_2.shape)

---
#### Encoder&Decoder

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)  # 이전에 구현한 MHA
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)

        # 피드포워드 부분 (Dense -> ReLU -> Dense)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model)
        )
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)

    def forward(self, x, mask=None):
        # (1) 멀티 헤드 어텐션 (셀프 어텐션)
        attn_output = self.mha(x, x, x, mask)  # (batch_size, seq_len, d_model)
        attn_output = self.dropout1(attn_output)
        out1 = self.norm1(x + attn_output)     # 잔차 연결 + LayerNorm

        # (2) 피드포워드 신경망
        ffn_output = self.ffn(out1)            # (batch_size, seq_len, d_model)
        ffn_output = self.dropout2(ffn_output)
        out2 = self.norm2(out1 + ffn_output)   # 잔차 연결 + LayerNorm

        return out2


In [None]:
class Encoder(nn.Module):
    def __init__(self,
                 vocab_size,
                 num_layers,
                 ff_dim,
                 d_model,
                 num_heads,
                 dropout=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model

        # (1) 임베딩 레이어
        self.embedding = nn.Embedding(vocab_size, d_model)

        # (2) 포지셔널 인코딩
        self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)

        self.dropout = nn.Dropout(dropout)

        # (3) EncoderLayer 쌓기
        self.enc_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, mask=None):
        # (1) 임베딩 & sqrt(d_model)로 스케일링
        x = self.embedding(x) * math.sqrt(self.d_model)

        # (2) 포지셔널 인코딩 적용 + 드롭아웃
        x = self.pos_encoding(x)  # shape: (batch_size, seq_len, d_model)
        x = self.dropout(x)

        # (3) num_layers만큼 쌓아올린 EncoderLayer 통과
        for layer in self.enc_layers:
            x = layer(x, mask)

        return x

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ff_dim, dropout=0.1):
        super(DecoderLayer, self).__init__()

        # 첫 번째 서브 레이어 (디코더 내부 셀프 어텐션)
        self.self_mha = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model, eps=1e-6)

        # 두 번째 서브 레이어 (인코더-디코더 어텐션)
        self.encdec_mha = MultiHeadAttention(d_model, num_heads)
        self.norm2 = nn.LayerNorm(d_model, eps=1e-6)

        # 세 번째 서브 레이어 (피드포워드 네트워크)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, ff_dim),  # Dense(units=ff_dim)
            nn.ReLU(),                   # activation='relu'
            nn.Linear(ff_dim, d_model)   # Dense(units=d_model)
        )
        self.norm3 = nn.LayerNorm(d_model, eps=1e-6)

        # 드롭아웃
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
        # 1) 셀프 어텐션 (디코더 내부)
        self_attn_out = self.self_mha(x, x, x, mask=look_ahead_mask)
        self_attn_out = self.dropout1(self_attn_out)
        out1 = self.norm1(x + self_attn_out)  # 잔차 연결 + LayerNorm

        # 2) 인코더-디코더 어텐션
        encdec_attn_out = self.encdec_mha(out1, enc_outputs, enc_outputs, mask=padding_mask)
        encdec_attn_out = self.dropout2(encdec_attn_out)
        out2 = self.norm2(out1 + encdec_attn_out)  # 잔차 연결 + LayerNorm

        # 3) 피드포워드 (Dense -> ReLU -> Dense)
        ffn_out = self.ffn(out2)
        ffn_out = self.dropout3(ffn_out)
        out3 = self.norm3(out2 + ffn_out)  # 잔차 연결 + LayerNorm

        return out3

In [None]:
class Decoder(nn.Module):
    def __init__(self,
                 vocab_size,
                 num_layers,
                 ff_dim,
                 d_model,
                 num_heads,
                 dropout=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model

        # (1) 임베딩 레이어
        self.embedding = nn.Embedding(vocab_size, d_model)

        # (2) 포지셔널 인코딩
        # 실제 학습 시에는 최대 시퀀스 길이에 맞추어 쓰기도 함
        self.pos_encoding = PositionalEncoding(position=vocab_size, d_model=d_model)

        self.dropout = nn.Dropout(dropout)

        # (3) DecoderLayer 쌓기
        self.dec_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, enc_outputs, look_ahead_mask=None, padding_mask=None):
        # (1) 임베딩 + sqrt(d_model)로 스케일링
        x = self.embedding(x) * math.sqrt(self.d_model)

        # (2) 포지셔널 인코딩 + 드롭아웃
        x = self.pos_encoding(x)    # (batch_size, tgt_seq_len, d_model)
        x = self.dropout(x)

        # (3) num_layers만큼 쌓인 DecoderLayer 통과
        for layer in self.dec_layers:
            x = layer(x, enc_outputs, look_ahead_mask, padding_mask)

        return x

--- 
#### 데이터 불러오기

In [None]:
try:
    df = pd.read_csv(file_path, encoding='utf-8-sig')

    # 구조 확인 (Q, A, label 컬럼이 있는지 확인)
    expected_columns = ['Q', 'A', 'label']
    if all(col in df.columns for col in expected_columns):
        print("데이터 로드 성공: 지정된 컬럼(Q, A, label)을 모두 포함하고 있습니다.")
    else:
        print(f"주의: 컬럼 구성이 다릅니다. 현재 컬럼: {list(df.columns)}")

    # 데이터 샘플 확인
    print(df.head())

except FileNotFoundError:
    print(f"오류: '{file_path}' 파일을 찾을 수 없습니다. 경로를 확인해주세요.")
except Exception as e:
    print(f"오류 발생: {e}")

In [None]:
def preprocess_sentence(sentence):
    # 1. 양쪽 공백 제거
    sentence = sentence.strip()

    # 2. 단어와 구두점(?.!,) 사이의 거리를 만듭니다.
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)

    # 3. 한글(가-힣, ㄱ-ㅎ, ㅏ-ㅣ)과 구두점(?.!,)을 제외한 모든 문자를 공백으로 대체
    # 영어도 함께 남기고 싶다면 a-zA-Z를 추가하세요.
    sentence = re.sub(r"[^가-힣ㄱ-ㅎㅏ-ㅣa-zA-Z0-9?.!,]+", " ", sentence)
    
    # 4. 다시 양쪽 공백 제거 및 불필요한 공백 정리
    sentence = sentence.strip()
    return sentence

In [None]:
# 사용할 샘플의 최대 개수
# df = df[:MAX_SAMPLES]

df['Q'] = df['Q'].apply(preprocess_sentence)
df['A'] = df['A'].apply(preprocess_sentence)

questions = df['Q'].tolist()
answers = df['A'].tolist()
pairs = list(zip(questions, answers))

print('전체 샘플 수 :', len(questions))
print('전처리 후 Q 샘플 :', questions[0])
print('전처리 후 A 샘플 :', answers[0])

---
#### sentencepiece 모델 학습

In [None]:
with open(corpus_file, 'w', encoding='utf-8') as f:
    for q, a in pairs:
        f.write(q + "\n")
        f.write(a + "\n")

In [None]:
spm.SentencePieceTrainer.Train(
    input=corpus_file,
    model_prefix="spm_uni_cornell",
    vocab_size=VOCAB_SIZE,
    character_coverage=0.9995,
    model_type="unigram",
    max_sentence_length=999999,
    bos_id=1,  # <s> (Beginning of Sentence) 설정
    eos_id=2,  # </s> (End of Sentence) 설정
    pad_id=0,  # Padding ID 설정
    unk_id=3   # Unknown Token ID 설정
)


In [None]:
sp = spm.SentencePieceProcessor()
sp.load('spm_uni_cornell.model')

In [None]:
# 예제 문장
sentence = "12시 땡! 하루가 또 가네요..ㅠㅠ"

sentence = preprocess_sentence(sentence)
print("전처리 후의 문장:", sentence)

# 1. 토크나이징 (subword 단위로 분할)
tokens = sp.encode(sentence, out_type=str)
print("Tokenized:", tokens)

# 2. 인코딩 (서브워드를 정수 ID로 변환)
encoded = sp.encode(sentence, out_type=int)
print("Encoded:", encoded)

# 3. 디코딩 (정수 ID → 원본 문장 복원)
decoded = sp.decode(encoded)
print("Decoded:", decoded)


데이터셋 구현

In [None]:
def analyze_and_plot(df, columns, limit_len=25):
    """
    텍스트 데이터의 길이 통계, 특정 길이 포함 비율 확인 및 시각화를 수행합니다.
    """
    plt.figure(figsize=(12, 5))
    
    for i, col in enumerate(columns):
        # 1. 길이 계산
        lengths = df[col].apply(len)
        
        # 2. 통계치 산출
        mean_val = lengths.mean()
        median_val = lengths.median()
        max_val = lengths.max()
        
        # 3. 포함 비율 계산 (평균 기준 & 사용자 입력 기준)
        mean_coverage = (lengths <= mean_val).sum() / len(lengths) * 100
        limit_coverage = (lengths <= limit_len).sum() / len(lengths) * 100
        
        # 4. 텍스트 통계 출력
        print(f"[{col} 데이터 통계]")
        print(f" - 평균: {mean_val:.2f} / 중앙값: {median_val} / 최대: {max_val}")
        print(f" - 평균({mean_val:.2f}) 이하 포함 비율: {mean_coverage:.2f}%")
        print(f" - 설정 길이({limit_len}) 이하 포함 비율: {limit_coverage:.2f}%")
        print("-" * 40)
        
        # 5. 히스토그램 시각화
        plt.subplot(1, len(columns), i + 1)
        plt.hist(lengths, bins=30, alpha=0.7, color='skyblue' if i==0 else 'salmon')
        plt.axvline(limit_len, color='red', linestyle='--', label=f'Limit ({limit_len})')
        plt.title(f'{col} Length Distribution')
        plt.xlabel('Length')
        plt.ylabel('Count')
        plt.legend()

    plt.tight_layout()
    plt.show()

analyze_and_plot(df, columns=['Q', 'A'], limit_len=25)

In [None]:
class CornellDataset(Dataset):
    def __init__(self, pairs, sp, max_length=40):
        super().__init__()
        self.sp = sp
        self.max_length = max_length
        self.data = []

        for q_text, a_text in pairs:
            # 1) 토크나이즈
            q_ids = sp.EncodeAsIds(q_text)
            a_ids = sp.EncodeAsIds(a_text)

            # 2) [CLS]/[SEP] 같은 별도 스페셜 토큰을 쓸 수도 있으나,
            #    여기서는 SentencePiece 기본 <s>, </s> 등 혹은 사용자 정의 토큰 활용 가능
            #    간단히 <s>=sp.bos_id(), </s>=sp.eos_id()로 가정해본다면:
            #    sp.SetEncodeExtraOptions("bos:eos") 등으로 설정하는 방법도 있음.
            # 여기서는 수동으로 bos/eos id를 붙인다고 가정
            bos_id = sp.bos_id() if sp.bos_id() >= 0 else 1  # 혹은 임의값
            eos_id = sp.eos_id() if sp.eos_id() >= 0 else 2

            q_tokens = [bos_id] + q_ids + [eos_id]
            a_tokens = [bos_id] + a_ids + [eos_id]

            # 3) 길이 제한
            if len(q_tokens) > max_length or len(a_tokens) > max_length:
                continue

            # 4) 고정 길이 패딩
            q_tokens += [0]*(max_length - len(q_tokens))  # 0 -> <pad> 가정
            a_tokens += [0]*(max_length - len(a_tokens))

            # 5) 디코더 입력(dec_input): a_tokens[:-1], 타겟(outputs): a_tokens[1:]
            #    (teacher forcing용)
            dec_input = a_tokens[:-1]
            target = a_tokens[1:]

            self.data.append({
                "enc_input": q_tokens,
                "dec_input": dec_input,
                "target": target
            })

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        enc_input = torch.tensor(sample["enc_input"], dtype=torch.long)
        dec_input = torch.tensor(sample["dec_input"], dtype=torch.long)
        target = torch.tensor(sample["target"], dtype=torch.long)
        return enc_input, dec_input, target

In [None]:
dataset = CornellDataset(pairs, sp, max_length=25)

In [None]:
for encoder_input, decoder_input, decoder_label  in dataset:
    print("텐서 크기 :",encoder_input.size())
    print(encoder_input)
    print(sp.decode(encoder_input.tolist()))
    print(decoder_input)
    print(sp.decode(decoder_input.tolist()))
    print(decoder_label)
    print(sp.decode(decoder_label.tolist()))
    break

 DataLoader 구성하기

In [None]:
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE,shuffle=True)

In [None]:
for encoder_input, decoder_input, decoder_label in dataloader:
    print(encoder_input.size())
    print(decoder_input.size())
    print(decoder_label.size())
    break

---
#### model 정의

In [None]:
class Transformer(nn.Module):
    def __init__(self,
                 vocab_size,
                 num_layers,      # 인코더/디코더 층 수
                 units,           # feed-forward 네트워크의 중간 차원(ff_dim)
                 d_model,         # 임베딩 및 내부 표현 차원
                 num_heads,       # 멀티헤드 어텐션의 헤드 수
                 dropout=0.1):
        super(Transformer, self).__init__()

        # 인코더
        self.encoder = Encoder(
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout
        )

        # 디코더
        self.decoder = Decoder(
            vocab_size=vocab_size,
            num_layers=num_layers,
            ff_dim=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout
        )

        # 최종 출력층: (d_model) -> (vocab_size)
        self.final_linear = nn.Linear(d_model, vocab_size)

        # 참고: 텐서플로우 코드의 `name="transformer"`는 파이토치에선 보통 사용 안 함

    def forward(self, inputs, dec_inputs):
        # 1) 인코더 패딩 마스크 생성
        enc_padding_mask = create_padding_mask(inputs)     # shape (batch_size, 1, 1, src_seq_len)

        # 2) 디코더 look-ahead + 패딩 마스크
        look_ahead_mask = create_look_ahead_mask(dec_inputs)  # shape (batch_size, 1, tgt_seq_len, tgt_seq_len)

        # 3) 디코더에서 인코더 출력 쪽을 마스킹할 때 쓸 패딩 마스크
        dec_padding_mask = create_padding_mask(inputs)        # shape (batch_size, 1, 1, src_seq_len)

        # 4) 인코더 수행
        enc_outputs = self.encoder(
            x=inputs,
            mask=enc_padding_mask
        )  # shape: (batch_size, src_seq_len, d_model)

        # 5) 디코더 수행
        dec_outputs = self.decoder(
            x=dec_inputs,           # (batch_size, tgt_seq_len)
            enc_outputs=enc_outputs,# (batch_size, src_seq_len, d_model)
            look_ahead_mask=look_ahead_mask,
            padding_mask=dec_padding_mask
        )  # shape: (batch_size, tgt_seq_len, d_model)

        # 6) 최종 Dense (vocab_size)
        logits = self.final_linear(dec_outputs)  # (batch_size, tgt_seq_len, vocab_size)
        return logits

In [None]:
# 모델 생성
model = Transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT
)

print(model)

In [None]:
# MultiHeadAttention의 가중치 check
target_mha = model.encoder.enc_layers[0].mha
weight = target_mha.query_dense.weight

# 1번 헤드(0~31행)와 2번 헤드(32~63행) 가중치 비교
head1_w = weight[:64, :]
head2_w = weight[64:128, :]

print("두 가중치가 같은가? :", torch.equal(head1_w, head2_w))

---
#### scheduler 정의

In [None]:
# 하이퍼파라미터 설정
TOTAL_STEPS = len(dataloader) * EPOCH
WARMUP_STEPS  = int(TOTAL_STEPS * 0.1)

In [None]:
# Linear Warmup lr
noam_lr = lr * math.sqrt(D_MODEL) * math.sqrt(WARMUP_STEPS)

In [None]:
# Noam Scheduler
def get_lr_lambda(d_model, warmup_steps=4000):
    d_model = float(d_model)
    def lr_lambda(step):
        # step은 0부터 시작하므로 +1로 보정
        step = step + 1
        return (d_model ** -0.5) * min(step ** -0.5, step * (warmup_steps ** -1.5))
    return lr_lambda

In [None]:
# cosine Warmup
def get_cosine_with_warmup_lr_lambda(total_steps, warmup_steps, min_lr_ratio=1e-7):
    def lr_lambda(current_step):
        # 1. Linear Warmup 구간
        if current_step < warmup_steps:
            return float(current_step) / float(max(1, warmup_steps))
        
        # 2. Cosine Annealing 구간
        progress = float(current_step - warmup_steps) / float(max(1, total_steps - warmup_steps))
        
        # cosine_decay는 1.0 ~ min_lr_ratio로 변함
        cosine_decay = 0.5 * (1.0 + math.cos(math.pi * progress))
        
        # (1.0 - min_lr_ratio) 범위를 곱해주고 마지막에 min_lr_ratio를 더함
        return min_lr_ratio + (1.0 - min_lr_ratio) * cosine_decay
        
    return lr_lambda

In [None]:
def get_lr_history(scheduler, optimizer, total_steps):
    history = []
    for _ in range(total_steps):
        history.append(optimizer.param_groups[0]['lr'])
        # 실제 학습 시에는 optimizer.step()이 먼저 호출됨
        optimizer.step()
        scheduler.step()
    return history

# 2. 각 케이스별 시뮬레이션
# Case 1: LambdaLR (Custom Warmup)
model_1 = nn.Linear(D_MODEL, VOCAB_SIZE)
opt_1 = optim.AdamW(model_1.parameters(), lr = noam_lr, betas=(0.9, 0.98), weight_decay=0.01)
sched_1 = torch.optim.lr_scheduler.LambdaLR(opt_1, lr_lambda=get_lr_lambda(D_MODEL, WARMUP_STEPS))
hist_1 = get_lr_history(sched_1, opt_1, TOTAL_STEPS)

# Case 2: CosineAnnealingLR
model_2 = nn.Linear(D_MODEL, VOCAB_SIZE)
opt_2 = optim.AdamW(model_2.parameters(), lr=lr)
sched_2 = torch.optim.lr_scheduler.CosineAnnealingLR(opt_2, T_max=TOTAL_STEPS, eta_min=1e-6)
hist_2 = get_lr_history(sched_2, opt_2, TOTAL_STEPS)

# Case 3: Cosine Decay with Warmup
model_3 = nn.Linear(D_MODEL, VOCAB_SIZE)
opt_3 = optim.AdamW(model_3.parameters(), lr=lr)
sched_3 = torch.optim.lr_scheduler.LambdaLR(opt_3, lr_lambda=get_cosine_with_warmup_lr_lambda(TOTAL_STEPS, WARMUP_STEPS, min_lr_ratio=2e-3))
hist_3 = get_lr_history(sched_3, opt_3, TOTAL_STEPS)

# 3. 가로로 3개 출력 (시각화)
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
titles = ['LambdaLR (Warmup)', 'CosineAnnealingLR', 'Cosine with Warmup']
histories = [hist_1, hist_2, hist_3]
colors = ['#1f77b4', '#ff7f0e', '#2ca02c']

for i, ax in enumerate(axes):
    ax.plot(range(TOTAL_STEPS), histories[i], color=colors[i], linewidth=2)
    ax.set_title(titles[i], fontsize=14)
    ax.set_xlabel('Steps (Epochs)', fontsize=12)
    ax.set_ylabel('Learning Rate', fontsize=12)
    # ax.set_yscale('log')
    # ax.set_ylim(1e-9, 2e-3)
    ax.grid(True, linestyle='--', alpha=0.7)

plt.tight_layout()
plt.show()

---

In [None]:
# AdamW + 선형 워밍업
optimizer = optim.AdamW(model.parameters(), lr = noam_lr, betas=(0.9, 0.98), weight_decay=0.01)
scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=get_lr_lambda(D_MODEL, warmup_steps=WARMUP_STEPS))

In [None]:
# # AdamW + 코사인 어닐링
# optimizer = optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.98), weight_decay=0.01)

# scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=TOTAL_STEPS, eta_min=1e-6)

In [None]:
# # AdamW +  코사인 워밍업
# optimizer = optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.98), eps=1e-9,  weight_decay=0.01)
# scheduler = lr_scheduler.LambdaLR(
#     optimizer,
#     lr_lambda=get_cosine_with_warmup_lr_lambda(
#         total_steps=TOTAL_STEPS,
#         warmup_steps=WARMUP_STEPS,
#         min_lr_ratio=2e-3
#     )
# )

In [None]:
loss_function = nn.CrossEntropyLoss(ignore_index=sp.pad_id())

In [None]:
def accuracy_function(y_pred, y_true, pad_id=0):
    """
    y_pred: (batch_size, seq_len, vocab_size)
    y_true: (batch_size, seq_len)|
    """
    preds = y_pred.argmax(dim=-1)  # (batch_size, seq_len)
    mask = (y_true != pad_id)
    correct = (preds == y_true) & mask
    acc = correct.float().sum() / mask.float().sum()
    return acc

In [None]:
model = model.to(device)

훈련하기

In [None]:
def train_step(model, batch, optimizer, loss_function, device):
    model.train()
    enc_input, dec_input, target = [x.to(device) for x in batch]

    optimizer.zero_grad()

    # 모델 포워드 패스
    logits = model(enc_input, dec_input)  # (batch_size, seq_len, vocab_size)

    # Loss 계산 (패딩 토큰 무시)
    loss = loss_function(logits.permute(0, 2, 1), target)  # (batch_size, vocab_size, seq_len) 필요

    # Backpropagation
    loss.backward()
    optimizer.step()

    return loss.item(), accuracy_function(logits, target, pad_id=sp.pad_id())

In [None]:
def train(model, dataloader, optimizer, loss_function, scheduler, num_epochs, device, save_path="checkpoints"):
    model.to(device)

    # best_avg_acc = 0
    # best_model_wts = copy.deepcopy(model.state_dict())

    # 1. 저장 디렉토리 생성
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    for epoch in range(num_epochs):
        total_loss, total_acc = 0, 0
        for step, batch in enumerate(dataloader):
            loss, acc = train_step(model, batch, optimizer, loss_function, device)
            total_loss += loss
            total_acc += acc

            if step % 100 == 0:
                print(f"[Epoch {epoch+1}, Step {step}] Loss: {loss:.4f}, Acc: {acc:.4f}")

            # 학습률 스케줄러 업데이트
            scheduler.step()

        avg_loss = total_loss / len(dataloader)
        avg_acc = total_acc / len(dataloader)
        print(f"Epoch {epoch+1} Completed - Avg Loss: {avg_loss:.4f}, Avg Acc: {avg_acc:.4f}")
        
        # --- 2. 10 에포크마다 체크포인트 저장 ---
        if (epoch + 1) % 10 == 0:
            checkpoint_file = os.path.join(save_path, f"checkpoint_epoch_{epoch+1}.pt")
            
            # 모델 가중치, 옵티마이저, 스케줄러 상태를 모두 저장 (재개 가능하도록)
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'loss': avg_loss,
            }, checkpoint_file)
            
            print(f"==> Checkpoint saved: {checkpoint_file}")
        
        # if avg_acc > best_avg_acc:
        #     best_val_loss = avg_acc
        #     best_epoch = epoch+1
        #     best_model_wts = copy.deepcopy(model.state_dict()) 

        # print(f"best{best_epoch}")
        # model.load_state_dict(best_model_wts)

# checkpoint = torch.load('checkpoints/checkpoint_epoch_10.pt')
# model.load_state_dict(checkpoint['model_state_dict'])
# optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

In [None]:
%%time

history = train(
    model=model,
    dataloader=dataloader,
    optimizer=optimizer,
    loss_function=loss_function,
    scheduler=scheduler,
    num_epochs=EPOCH,  # 원하는 에폭 수
    device=device
)

[Epoch 49, Step 600] Loss: 0.0268, Acc: 0.9958
[Epoch 49, Step 700] Loss: 0.0302, Acc: 0.9966
Epoch 49 Completed - Avg Loss: 0.0443, Avg Acc: 0.9909
best49
[Epoch 50, Step 0] Loss: 0.0191, Acc: 0.9963
[Epoch 50, Step 100] Loss: 0.0618, Acc: 0.9883
[Epoch 50, Step 200] Loss: 0.0454, Acc: 0.9894
[Epoch 50, Step 300] Loss: 0.0137, Acc: 1.0000
[Epoch 50, Step 400] Loss: 0.0515, Acc: 0.9898
[Epoch 50, Step 500] Loss: 0.0270, Acc: 0.9923
[Epoch 50, Step 600] Loss: 0.0630, Acc: 0.9926
[Epoch 50, Step 700] Loss: 0.0900, Acc: 0.9822
Epoch 50 Completed - Avg Loss: 0.0432, Avg Acc: 0.9916
==> Checkpoint saved: checkpoints/checkpoint_epoch_50.pt
best50
[Epoch 51, Step 0] Loss: 0.0550, Acc: 0.9921
[Epoch 51, Step 100] Loss: 0.0325, Acc: 0.9928
[Epoch 51, Step 200] Loss: 0.0194, Acc: 1.0000
[Epoch 51, Step 300] Loss: 0.0209, Acc: 0.9921
[Epoch 51, Step 400] Loss: 0.0907, Acc: 0.9735
[Epoch 51, Step 500] Loss: 0.0355, Acc: 0.9853
[Epoch 51, Step 600] Loss: 0.0594, Acc: 0.9929
[Epoch 51, Step 700] Los

---
#### 챗봇 테스트하기

In [None]:
def decoder_inference(model, sentence, tokenizer, device='cpu'):
    START_TOKEN = tokenizer.bos_id()
    END_TOKEN = tokenizer.eos_id()
    MAX_LENGTH = 40

    # 전처리
    sentence = preprocess_sentence(sentence)

    # 인코더 입력: [START] + 인코딩 + [END]
    enc_input_ids = [START_TOKEN] + tokenizer.encode(sentence) + [END_TOKEN]
    # 차원 확장: (batch_size=1, seq_len)
    enc_input = torch.tensor([enc_input_ids], dtype=torch.long, device=device)

    # 디코더 입력(dec_input)을 START_TOKEN만 포함한 상태로 시작
    dec_input = torch.tensor([[START_TOKEN]], dtype=torch.long, device=device)

    model.eval()  # 모델 평가 모드
    with torch.no_grad():
        for i in range(MAX_LENGTH):
            # 모델 forward: (enc_input, dec_input) -> (batch_size=1, seq_len, vocab_size)
            logits = model(enc_input, dec_input)

            # 마지막 타임스텝의 예측만 추출: shape (1, 1, vocab_size)
            # logits[:, -1, :] -> (1, vocab_size)
            last_step_logits = logits[:, -1, :]

            # argmax로 가장 높은 확률의 토큰 선택
            predicted_id = torch.argmax(last_step_logits, dim=-1)  # shape: (1,)

            # 종료 토큰이면 중단
            if predicted_id.item() == END_TOKEN:
                break

            # 디코더 입력(dec_input)에 예측 토큰을 이어붙임
            predicted_id = predicted_id.unsqueeze(0)  # shape (1,1)
            dec_input = torch.cat([dec_input, predicted_id], dim=1)

    # 최종 시퀀스: dec_input: (1, seq_len)에서 (seq_len,)로
    output_sequence = dec_input.squeeze(0).tolist()  # e.g. [START_TOKEN, ..., 토큰들...]

    return output_sequence

In [None]:
def sentence_generation(model, sentence, tokenizer, device='cpu'):
    # 디코더 인퍼런스 -> 예측된 토큰 시퀀스
    output_seq = decoder_inference(model, sentence, tokenizer, device=device)

    # 토크나이저로 디코딩 (패딩, START/END 토큰 등은 제외하거나 처리)
    # 여기서는 단순히 tokenizer.decode() 직접 호출
    predicted_sentence = tokenizer.decode(
        [token for token in output_seq if token < tokenizer.GetPieceSize()]
    )

    print("입력 :", sentence)
    print("출력 :", predicted_sentence)
    return predicted_sentence

In [None]:
sentence = '안녕'
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = '너 괜찮아'
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "요즘 살이 찐 거 같은데 운동이나 해볼까?"
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "요즘 살이 찐 거 같은데"
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = '바보야'
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = '멍청아'
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "너는 누구야?"
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "사랑해"
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "오늘 날씨 어때?"
sentence_generation(model, sentence, sp, device)

In [None]:
sentence = "뭘까"
sentence_generation(model, sentence, sp, device)