In [1]:
#주요 변경사항:
 #인코더 제거 - 디코더만 사용하는 GPT-1 구조
 #masked Self-Attention 추가 - 미래 토큰 참조 방지
 #학습 가능한 Position Embedding (sinusoidal → learned)
 #GELU 활성화 함수 적용 (ReLU → GELU)
 #Pre-LayerNorm 구조 적용
 #단일 시퀀스 처리 방식으로 변경
 #언어 모델링 목적함수로 변경

In [2]:
!pip install sentencepiece scikit-learn

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import sentencepiece as spm
import math
import os
import re
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split



In [3]:
# ===== 1. Path configuration =====
# Every artifact (raw CSV, cleaned corpus, SentencePiece model prefix) lives in
# one data directory under the user's home; the directory is created if missing.
base_dir = os.path.expanduser("~/work/transformer_chatbot/data")
os.makedirs(base_dir, exist_ok=True)

# Build the three artifact paths in one pass from their file names.
csv_path, train_txt, model_pref = (
    os.path.join(base_dir, artifact_name)
    for artifact_name in ("ChatbotData.csv", "ChatbotData_clean.txt", "spm_chatbot")
)

In [4]:
# ===== 2. Data preprocessing =====
# Load the raw question/answer table and report its size before any filtering.
df = pd.read_csv(csv_path)
print("원본 데이터 크기:", df.shape)

# Keep only rows where both Q and A are present, then drop exact (Q, A) repeats.
df = df.dropna(subset=['Q', 'A']).drop_duplicates(subset=['Q', 'A'])

def clean_text(text):
    """Normalise one utterance for tokenizer training.

    Steps, in order:
      1. Coerce the input to ``str``.
      2. Replace every run of characters outside the whitelist (Hangul syllables,
         digits, Hangul jamo, space and ``. , ! ? ~``) with a single space.
      3. Collapse whitespace runs to one space and trim both ends.
      4. Shorten runs of two or more "ㅋ" / "ㅎ" to exactly two.

    Args:
        text: Any object; it is converted with ``str()`` first.

    Returns:
        The cleaned string (possibly empty if nothing survived the whitelist).
    """
    cleaned = re.sub(r"[^가-힣0-9ㄱ-ㅎㅏ-ㅣ .,!?~]+", " ", str(text))
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    # Laughter jamo are squeezed after whitespace handling, one rule at a time.
    for pattern, replacement in ((r"(ㅋ){2,}", "ㅋㅋ"), (r"(ㅎ){2,}", "ㅎㅎ")):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned

# Clean both text columns with the same normaliser.
df['Q'] = df['Q'].apply(clean_text)
df['A'] = df['A'].apply(clean_text)
print("전처리 후 데이터 크기:", df.shape)

# Write the tokenizer training corpus: one line per pair, with the question and
# the answer joined into a single sequence around a literal " <SEP> " marker.
with open(train_txt, "w", encoding="utf-8") as corpus_file:
    corpus_file.writelines(
        f"{question} <SEP> {answer}\n"
        for question, answer in zip(df['Q'], df['A'])
    )

원본 데이터 크기: (11823, 3)
전처리 후 데이터 크기: (11750, 3)


In [5]:
# ===== 3. SentencePiece training (special tokens) =====
# <BOS>, <EOS>, <PAD> and <SEP> are registered as user-defined symbols so that each
# is always encoded as a single piece. The tokenizer's built-in pad/bos/eos/unk
# pieces are pinned to ids 0-3 by the flags below; the user-defined symbols get
# their own ids after those (the ids are printed at the end of this cell).
spm.SentencePieceTrainer.Train(
    f"--input={train_txt} "
    f"--model_prefix={model_pref} "
    f"--vocab_size=8000 "
    f"--character_coverage=0.9995 "
    f"--model_type=bpe "
    f"--user_defined_symbols=<BOS>,<EOS>,<PAD>,<SEP> "  # <SEP> separates question and answer
    f"--pad_id=0 --bos_id=1 --eos_id=2 --unk_id=3"
)

# Load the tokenizer that was just trained.
sp = spm.SentencePieceProcessor()
sp.load(f"{model_pref}.model")

# Unknown-token id: ask the model directly. No '<UNK>' piece was requested in the
# trainer flags above, so looking that string up by name would only return the
# right id through the unknown-piece fallback of piece_to_id().
UNK_ID = sp.unk_id()

# Ids of the user-defined special tokens.
PAD_ID = sp.piece_to_id('<PAD>')
BOS_ID = sp.piece_to_id('<BOS>')
EOS_ID = sp.piece_to_id('<EOS>')
SEP_ID = sp.piece_to_id('<SEP>')  # separator between question and answer
vocab_size = sp.get_piece_size()

# Fail fast if any special token is missing from the vocabulary: a missing piece
# resolves to the unknown id, which would silently corrupt padding and loss masking.
for _piece, _piece_id in (('<PAD>', PAD_ID), ('<BOS>', BOS_ID), ('<EOS>', EOS_ID), ('<SEP>', SEP_ID)):
    if _piece_id == UNK_ID:
        raise ValueError(f"special token {_piece} is missing from the SentencePiece vocabulary")

print(f"PAD_ID: {PAD_ID}, BOS_ID: {BOS_ID}, EOS_ID: {EOS_ID}, SEP_ID: {SEP_ID}, vocab_size: {vocab_size}")

PAD_ID: 6, BOS_ID: 4, EOS_ID: 5, SEP_ID: 7, vocab_size: 8000


sentencepiece_trainer.cc(178) LOG(INFO) Running command: --input=/home/jovyan/work/transformer_chatbot/data/ChatbotData_clean.txt --model_prefix=/home/jovyan/work/transformer_chatbot/data/spm_chatbot --vocab_size=8000 --character_coverage=0.9995 --model_type=bpe --user_defined_symbols=<BOS>,<EOS>,<PAD>,<SEP> --pad_id=0 --bos_id=1 --eos_id=2 --unk_id=3
sentencepiece_trainer.cc(78) LOG(INFO) Starts training with : 
trainer_spec {
  input: /home/jovyan/work/transformer_chatbot/data/ChatbotData_clean.txt
  input_format: 
  model_prefix: /home/jovyan/work/transformer_chatbot/data/spm_chatbot
  model_type: BPE
  vocab_size: 8000
  self_test_sample_size: 0
  character_coverage: 0.9995
  input_sentence_size: 0
  shuffle_input_sentence: 1
  seed_sentencepiece_size: 1000000
  shrinking_factor: 0.75
  max_sentence_length: 4192
  num_threads: 16
  num_sub_iterations: 2
  max_sentencepiece_length: 16
  split_by_unicode_script: 1
  split_by_number: 1
  split_by_whitespace: 1
  split_digits: 0
  pret

In [6]:
# ===== 4. GPT-1 방식 데이터셋 클래스 =====
class GPTChatbotDataset(Dataset):
    """Language-modelling dataset over "question <SEP> answer" sequences.

    Each item is tokenised as ``<BOS> + ids(Q + " <SEP> " + A) + <EOS>``, brought to
    exactly ``max_length`` tokens (right-padded, or truncated while keeping the
    final ``<EOS>``), and then split into next-token-prediction inputs and targets.

    Args:
        dataframe: Table with string columns ``'Q'`` and ``'A'``.
        tokenizer: Object exposing ``encode_as_ids(text) -> list[int]``.
        max_length: Length of the token sequence before the input/target shift;
            the returned tensors have ``max_length - 1`` elements. Must be >= 2.
        bos_id, eos_id, pad_id: Special-token ids. When left as ``None`` the
            module-level ``BOS_ID`` / ``EOS_ID`` / ``PAD_ID`` are used.

    Raises:
        ValueError: If ``max_length`` is smaller than 2.
    """

    def __init__(self, dataframe, tokenizer, max_length=128,
                 bos_id=None, eos_id=None, pad_id=None):
        if max_length < 2:
            raise ValueError("max_length must be at least 2 to form an input/target pair")
        self.data = dataframe.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length
        # Fall back to the notebook-level ids only when none were supplied.
        self.bos_id = BOS_ID if bos_id is None else bos_id
        self.eos_id = EOS_ID if eos_id is None else eos_id
        self.pad_id = PAD_ID if pad_id is None else pad_id

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]

        # One sequence per pair: question, separator marker, answer.
        combined_text = row['Q'] + " <SEP> " + row['A']
        tokens = [self.bos_id] + self.tokenizer.encode_as_ids(combined_text) + [self.eos_id]

        if len(tokens) > self.max_length:
            # Keep the sequence terminated: a plain slice would cut <EOS> off, so
            # long samples would never show the model where an answer ends.
            tokens = tokens[:self.max_length - 1] + [self.eos_id]
        else:
            tokens = tokens + [self.pad_id] * (self.max_length - len(tokens))

        # Next-token prediction: targets are the inputs shifted left by one.
        return {
            'input_ids': torch.tensor(tokens[:-1], dtype=torch.long),
            'target_ids': torch.tensor(tokens[1:], dtype=torch.long)
        }


In [7]:
# ===== 5. GPT-1 트랜스포머 모델 구현 =====

class LearnedPositionalEncoding(nn.Module):
    """Trainable position embeddings looked up by absolute position index.

    Args:
        d_model: Width of each position vector.
        max_seq_length: Number of rows in the embedding table, i.e. the largest
            number of positions that can be looked up.
    """

    def __init__(self, d_model, max_seq_length=512):
        super().__init__()
        self.position_embeddings = nn.Embedding(max_seq_length, d_model)

    def forward(self, x):
        """Return position vectors for every slot of ``x``.

        Only the first two dimensions of ``x`` (batch, sequence) and its device
        are used; its values are ignored. The result has shape
        ``(batch, sequence, d_model)``.
        """
        n_batch, n_positions = x.shape[0], x.shape[1]
        positions = torch.arange(n_positions, device=x.device)
        # Same 0..n-1 index row for every batch element.
        index_grid = positions.unsqueeze(0).expand(n_batch, n_positions)
        return self.position_embeddings(index_grid)

class GELU(nn.Module):
    """GELU activation computed with the tanh-based approximation formula."""

    def forward(self, x):
        # 0.5 * x * (1 + tanh( sqrt(2/pi) * (x + 0.044715 * x^3) ))
        cubic_term = 0.044715 * torch.pow(x, 3.0)
        tanh_arg = math.sqrt(2.0 / math.pi) * (x + cubic_term)
        return 0.5 * x * (1.0 + torch.tanh(tanh_arg))

class CausalMultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention with an optional mask.

    Queries, keys and values come from one fused linear projection. The module
    does not build a mask itself: causality (or padding suppression) is applied
    only through the ``mask`` argument of ``forward``.

    Args:
        d_model: Model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.qkv_proj = nn.Linear(d_model, 3 * d_model, bias=True)
        self.out_proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def _to_heads(self, tensor, n_batch, n_tokens):
        """Reshape (batch, tokens, d_model) -> (batch, heads, tokens, d_k)."""
        return tensor.view(n_batch, n_tokens, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, x, mask=None):
        """Attend over ``x`` of shape (batch, tokens, d_model).

        ``mask``, when given, must broadcast against the score tensor of shape
        (batch, heads, tokens, tokens); entries equal to 0 are blocked.
        """
        n_batch, n_tokens = x.shape[0], x.shape[1]

        # One projection, then split the last dimension into Q, K and V.
        queries, keys, values = (
            self._to_heads(part, n_batch, n_tokens)
            for part in self.qkv_proj(x).chunk(3, dim=-1)
        )

        scores = torch.matmul(queries, keys.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # Blocked positions get a large negative score so softmax sends them to ~0.
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of values, then merge the heads back into d_model.
        context = torch.matmul(weights, values)
        merged = context.transpose(1, 2).contiguous().view(n_batch, n_tokens, self.d_model)

        return self.out_proj(merged)

class GPTFeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> GELU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.gelu = GELU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Expand to d_ff, apply the non-linearity and dropout, project back to d_model.
        hidden = self.dropout(self.gelu(self.fc1(x)))
        return self.fc2(hidden)

class GPTDecoderLayer(nn.Module):
    """One decoder block: self-attention and feed-forward, each as a residual branch.

    LayerNorm is applied to the input of each sub-layer (pre-norm), and dropout
    is applied to each sub-layer's output before it is added back.

    Args:
        d_model: Model width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability shared by the sub-layers and residual branches.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.self_attn = CausalMultiHeadAttention(d_model, num_heads, dropout)

        self.ln2 = nn.LayerNorm(d_model)
        self.feed_forward = GPTFeedForward(d_model, d_ff, dropout)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Attention branch: normalise, attend (with the optional mask), add back.
        attended = self.self_attn(self.ln1(x), mask)
        x = x + self.dropout(attended)

        # Feed-forward branch: normalise, transform, add back.
        transformed = self.feed_forward(self.ln2(x))
        return x + self.dropout(transformed)

class GPTChatbot(nn.Module):
    """Decoder-only transformer language model.

    Token and learned position embeddings feed a stack of ``GPTDecoderLayer``
    blocks, followed by a final LayerNorm and a bias-free linear head that
    produces next-token logits.

    Args:
        vocab_size: Number of token ids (size of the embedding table and of the
            output logits).
        d_model: Model width.
        num_heads: Attention heads per layer.
        num_layers: Number of decoder layers.
        d_ff: Hidden width of each feed-forward sub-layer.
        max_seq_length: Number of positions in the position-embedding table;
            ``forward`` rejects longer inputs.
        dropout: Dropout probability used throughout the model.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.d_model = d_model
        self.max_seq_length = max_seq_length

        # Embedding layers
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = LearnedPositionalEncoding(d_model, max_seq_length)

        # Decoder stack
        self.decoder_layers = nn.ModuleList([
            GPTDecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # Final LayerNorm
        self.ln_final = nn.LayerNorm(d_model)

        # Language-modelling head
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)

        self.dropout = nn.Dropout(dropout)

        # Re-initialise every sub-module's weights.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Normal(0, 0.02) for Linear/Embedding weights, zero biases, identity LayerNorm."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        elif isinstance(module, nn.LayerNorm):
            torch.nn.init.zeros_(module.bias)
            torch.nn.init.ones_(module.weight)

    def generate_causal_mask(self, seq_len, device):
        """Return a boolean lower-triangular mask of shape (1, 1, seq_len, seq_len)."""
        mask = torch.tril(torch.ones(seq_len, seq_len, device=device, dtype=torch.bool))
        return mask.unsqueeze(0).unsqueeze(0)  # (1, 1, seq_len, seq_len)

    def forward(self, input_ids):
        """Compute next-token logits.

        Args:
            input_ids: 2-D integer tensor of token ids, (batch, seq_len).

        Returns:
            Tensor of logits with shape (batch, seq_len, vocab_size).

        Raises:
            ValueError: If ``seq_len`` exceeds ``max_seq_length``.
        """
        _, seq_len = input_ids.size()

        # The position table only has max_seq_length rows; a longer input would
        # index past it and fail with an opaque indexing error inside the
        # embedding lookup. Report the real cause instead.
        if seq_len > self.max_seq_length:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_length {self.max_seq_length}"
            )

        # Padding mask over key positions: (batch, 1, 1, seq_len), True for real tokens.
        pad_mask = (input_ids != PAD_ID).unsqueeze(1).unsqueeze(2)

        # Causal mask: (1, 1, seq_len, seq_len), True where a query may look.
        causal_mask = self.generate_causal_mask(seq_len, input_ids.device)

        # A key is visible only if it is both non-padding and not in the future.
        combined_mask = pad_mask & causal_mask

        # Embeddings: scaled token vectors plus position vectors.
        token_embeds = self.token_embedding(input_ids) * math.sqrt(self.d_model)
        pos_embeds = self.pos_encoding(input_ids)

        x = self.dropout(token_embeds + pos_embeds)

        # Decoder stack
        for layer in self.decoder_layers:
            x = layer(x, combined_mask)

        # Final LayerNorm, then project to vocabulary logits.
        x = self.ln_final(x)
        logits = self.lm_head(x)

        return logits

In [8]:
# ===== 6. Data loaders =====
# 90/10 train/validation split with a fixed seed.
train_df, val_df = train_test_split(df, test_size=0.1, random_state=42)

# Both splits use the same dataset class and sequence length.
train_dataset, val_dataset = (
    GPTChatbotDataset(split_frame, sp, max_length=128)
    for split_frame in (train_df, val_df)
)

# Only the training batches are shuffled.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

print(f"학습 데이터: {len(train_dataset)}개")
print(f"검증 데이터: {len(val_dataset)}개")

학습 데이터: 10575개
검증 데이터: 1175개


In [9]:
# ===== 7. Model configuration =====
# Pick the device first so the model can be placed on it before the optimizer
# is built.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = GPTChatbot(
    vocab_size=vocab_size,
    d_model=128,
    num_heads=4,
    num_layers=4,
    d_ff=512,
    max_seq_length=128,
    dropout=0.3
)
model.to(device)

# Padding positions contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)

# The optimizer is constructed after the model is on its final device, which is
# the order the torch.optim documentation asks for.
# NOTE(review): lr=0.005 is kept as in the original run; it may be worth tuning.
optimizer = optim.Adam(model.parameters(), lr=0.005)

print(f"Using device: {device}")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

Using device: cuda
Model parameters: 2,857,728


In [10]:
# ===== 8. 학습 함수 (GPT-1 방식으로 수정) =====
def train_epoch(model, train_loader, criterion, optimizer, device):
    """Run one training pass with the next-token language-modelling objective.

    Args:
        model: Module mapping ``input_ids`` (batch, seq) to logits
            (batch, seq, vocab).
        train_loader: Iterable of dicts with ``'input_ids'`` and ``'target_ids'``
            tensors.
        criterion: Loss taking flattened logits (N, vocab) and targets (N,).
        optimizer: Optimizer over ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        Mean of the per-batch losses as a float.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        target_ids = batch['target_ids'].to(device)

        optimizer.zero_grad()

        logits = model(input_ids)

        # Flatten using the model's own output width rather than a global
        # vocab_size, so the function works for whatever model is passed in.
        loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("train_loader yielded no batches")

    return total_loss / num_batches

def validate_epoch(model, val_loader, criterion, device):
    """Evaluate the next-token language-modelling loss without updating weights.

    Args:
        model: Module mapping ``input_ids`` (batch, seq) to logits
            (batch, seq, vocab).
        val_loader: Iterable of dicts with ``'input_ids'`` and ``'target_ids'``
            tensors.
        criterion: Loss taking flattened logits (N, vocab) and targets (N,).
        device: Device the batches are moved to.

    Returns:
        Mean of the per-batch losses as a float.

    Raises:
        ValueError: If ``val_loader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            target_ids = batch['target_ids'].to(device)

            logits = model(input_ids)
            # Use the model's own output width instead of a global vocab_size.
            loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1))

            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("val_loader yielded no batches")

    return total_loss / num_batches

In [11]:
# ===== 9. Training run =====
# Early-stopping bookkeeping: stop after `patience` consecutive epochs in which
# the validation loss fails to beat the best value seen so far.
best_val_loss = float('inf')
patience = 5
patience_counter = 0

num_epochs = 30
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss = validate_epoch(model, val_loader, criterion, device)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

    improved = val_loss < best_val_loss
    if not improved:
        # No improvement: spend one unit of patience, stop once it runs out.
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping!")
            break
        continue

    # New best: reset patience and checkpoint the weights.
    best_val_loss = val_loss
    patience_counter = 0
    torch.save(model.state_dict(), 'best_gpt_chatbot.pth')

print("학습 완료!")

Epoch 1/30: Train Loss: 5.7529, Val Loss: 5.3375
Epoch 2/30: Train Loss: 5.1055, Val Loss: 5.0284
Epoch 3/30: Train Loss: 4.8061, Val Loss: 4.8858
Epoch 4/30: Train Loss: 4.6004, Val Loss: 4.7626
Epoch 5/30: Train Loss: 4.4279, Val Loss: 4.7143
Epoch 6/30: Train Loss: 4.2805, Val Loss: 4.6507
Epoch 7/30: Train Loss: 4.1677, Val Loss: 4.6495
Epoch 8/30: Train Loss: 4.1145, Val Loss: 4.6396
Epoch 9/30: Train Loss: 4.0122, Val Loss: 4.6289
Epoch 10/30: Train Loss: 3.9340, Val Loss: 4.6309
Epoch 11/30: Train Loss: 3.8730, Val Loss: 4.6082
Epoch 12/30: Train Loss: 3.8253, Val Loss: 4.6046
Epoch 13/30: Train Loss: 3.7627, Val Loss: 4.5988
Epoch 14/30: Train Loss: 3.7216, Val Loss: 4.6037
Epoch 15/30: Train Loss: 3.6667, Val Loss: 4.6355
Epoch 16/30: Train Loss: 3.6467, Val Loss: 4.6197
Epoch 17/30: Train Loss: 3.6214, Val Loss: 4.5680
Epoch 18/30: Train Loss: 3.5846, Val Loss: 4.6161
Epoch 19/30: Train Loss: 3.5657, Val Loss: 4.6193
Epoch 20/30: Train Loss: 3.5345, Val Loss: 4.6394
Epoch 21/

In [12]:
# ===== 10. GPT-1 방식 추론 함수 =====
def generate_response(model, question, max_length=64, temperature=0.8, top_k=50):
    """Generate an answer by autoregressive decoding after "question <SEP>".

    The prompt is ``<BOS> + ids(question + " <SEP>")``. Tokens are then produced
    one at a time until ``<EOS>`` is produced, ``max_length`` new tokens have been
    generated, or the model's context window (``model.max_seq_length``, when the
    model exposes it) is full.

    Args:
        model: Language model mapping (1, seq) token ids to (1, seq, vocab) logits.
        question: User utterance as text.
        max_length: Maximum number of new tokens to generate.
        temperature: Softmax temperature. Values <= 0 select greedy decoding
            instead of dividing by zero.
        top_k: If > 0, sample only from the ``top_k`` highest-scoring tokens
            (clamped to the vocabulary size).

    Returns:
        The decoded answer text, or a fixed apology string when no token was
        generated.
    """
    model.eval()

    # Follow the model's device instead of relying on a global `device`.
    model_device = next(model.parameters()).device
    context_limit = getattr(model, 'max_seq_length', None)

    # Prompt: question followed by the separator marker.
    prompt_ids = [BOS_ID] + sp.encode_as_ids(question + " <SEP>")
    if context_limit is not None and len(prompt_ids) > context_limit:
        # Keep the most recent tokens so the trailing <SEP> survives truncation.
        prompt_ids = prompt_ids[-context_limit:]
    prompt_len = len(prompt_ids)

    with torch.no_grad():
        sequence = torch.tensor([prompt_ids], dtype=torch.long, device=model_device)

        for _ in range(max_length):
            # Never feed the model more positions than it has embeddings for.
            if context_limit is not None and sequence.size(1) > context_limit:
                break

            next_token_logits = model(sequence)[0, -1, :]

            if temperature <= 0:
                # Greedy decoding.
                next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            else:
                next_token_logits = next_token_logits / temperature

                if top_k > 0:
                    # Clamp k so an oversized top_k cannot crash topk().
                    k = min(top_k, next_token_logits.size(-1))
                    kth_value = torch.topk(next_token_logits, k).values[-1]
                    next_token_logits = next_token_logits.masked_fill(
                        next_token_logits < kth_value, float('-inf')
                    )

                probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, 1)

            if next_token.item() == EOS_ID:
                break

            sequence = torch.cat([sequence, next_token.unsqueeze(0)], dim=1)

    # Everything after the prompt (which ends with <SEP>) is the answer.
    generated_ids = sequence[0, prompt_len:].tolist()
    if not generated_ids:
        return "죄송합니다. 답변을 생성할 수 없습니다."

    # Drop BOS/EOS/PAD before decoding.
    response_ids = [token_id for token_id in generated_ids
                    if token_id not in (BOS_ID, EOS_ID, PAD_ID)]
    return sp.decode_ids(response_ids).strip()


In [13]:
# ===== 11. Test =====
# Restore the best checkpoint written by the training loop. map_location remaps
# the stored tensors onto the current device, so a checkpoint saved on a GPU
# also loads on a CPU-only machine.
model.load_state_dict(torch.load('best_gpt_chatbot.pth', map_location=device))

test_questions = [
    "안녕하세요",
    "오늘 날씨 어때?",
    "뭐하고 있어?",
    "고마워",
    "미안해",
    "사랑이란 무엇일까?"
]

print("=== GPT-1 챗봇 테스트 ===")
for q in test_questions:
    # Sampling is stochastic, so answers differ from run to run.
    response = generate_response(model, q, max_length=32, temperature=0.8)
    print(f"Q: {q}")
    print(f"A: {response}\n")

=== GPT-1 챗봇 테스트 ===
Q: 안녕하세요
A: 회원정보 찾기를 해보세요.

Q: 오늘 날씨 어때?
A: 저도 해보고 싶네요.

Q: 뭐하고 있어?
A: 어떤 말도 위로가 되지 않으니까요.

Q: 고마워
A: 저도 좀 더 힘들거예요.

Q: 미안해
A: 저도 받아들여야 할 거예요.

Q: 사랑이란 무엇일까?
A: 당신이 옆에 있어요.

