In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
train_path = "/content/drive/MyDrive/AIFFEL_Exploration/5/개인및관계_train.json"
valid_path = "/content/drive/MyDrive/AIFFEL_Exploration/5/개인및관계_valid.json"


In [3]:
import json

def load_json_pairs(json_path):
    with open(json_path, 'r', encoding='utf-8') as f:
        raw = json.load(f)

    pairs = []
    for dialog in raw['data']:
        utterances = dialog.get('body', [])
        utter_texts = [u.get('utterance', '').strip() for u in utterances if u.get('utterance', '').strip()]

        for i in range(len(utter_texts) - 1):
            q = utter_texts[i]
            a = utter_texts[i + 1]
            pairs.append((q, a))

    return pairs


In [4]:
train_pairs = load_json_pairs(train_path)
valid_pairs = load_json_pairs(valid_path)

print("train_pairs 수:", len(train_pairs))  # 드디어 0보다 커야 해! 🎉


train_pairs 수: 7919850


In [5]:
with open(train_path, 'r', encoding='utf-8') as f:
    sample = json.load(f)

print(sample.keys())  # 'data' 있는지 확인
print(sample['data'][0])  # 한 개만 보기


dict_keys(['numberOfItems', 'data'])
{'header': {'dialogueInfo': {'dialogueID': '565b3753-3543-56c6-9201-fca177bccfcc', 'numberOfParticipants': 2, 'numberOfUtterances': 30, 'numberOfTurns': 12, 'type': '일상 대화', 'topic': '개인 및 관계'}, 'participantsInfo': [{'participantID': 'P01', 'gender': '여성', 'age': '20대', 'residentialProvince': '경기도'}, {'participantID': 'P02', 'gender': '남성', 'age': '20대', 'residentialProvince': '경기도'}]}, 'body': [{'utteranceID': 'U1', 'turnID': 'T1', 'participantID': 'P01', 'date': '2017-07-24', 'time': '06:27:00', 'utterance': '나지금밥머거2시간걸어서'}, {'utteranceID': 'U2', 'turnID': 'T1', 'participantID': 'P01', 'date': '2017-07-24', 'time': '06:27:00', 'utterance': '번화가찾았어..ㅜㅜ'}, {'utteranceID': 'U3', 'turnID': 'T1', 'participantID': 'P01', 'date': '2017-07-24', 'time': '06:27:00', 'utterance': '잉ㅜㅜ'}, {'utteranceID': 'U4', 'turnID': 'T2', 'participantID': 'P02', 'date': '2017-07-24', 'time': '09:36:00', 'utterance': '헐 ㅠㅠ'}, {'utteranceID': 'U5', 'turnID': 'T2', 'particip

#전처리

In [6]:
import re

def clean_text(text):
    text = str(text).strip()
    text = re.sub(r"[^ㄱ-ㅎ가-힣0-9a-zA-Z .,!?]", "", text)
    return text

def apply_cleaning(pairs):
    cleaned = []
    for q, a in pairs:
        q_clean = clean_text(q)
        a_clean = clean_text(a)
        if q_clean and a_clean:
            cleaned.append((q_clean, a_clean))
    return cleaned

In [7]:
train_pairs = load_json_pairs(train_path)
valid_pairs = load_json_pairs(valid_path)

cleaned_train_pairs = apply_cleaning(train_pairs)
cleaned_valid_pairs = apply_cleaning(valid_pairs)

print("✅ cleaned_train_pairs 개수:", len(cleaned_train_pairs))



✅ cleaned_train_pairs 개수: 7814353


In [18]:
with open("train_texts_ko.txt", "w", encoding="utf-8") as f:
    for q, a in cleaned_train_pairs:
        f.write(q + "\n")
        f.write(a + "\n")


In [8]:
!pip install tokenizers




#✅ 대화 데이터를 숫자 시퀀스로 변환하고, 학습용 Dataset 만들기

In [27]:
from tokenizers import ByteLevelBPETokenizer
import os

save_dir = "tokenizer_korean"
os.makedirs(save_dir, exist_ok=True)

tokenizer = ByteLevelBPETokenizer()
tokenizer.train(
    files="train_texts_ko.txt",
    vocab_size=8000,
    min_frequency=2,
    special_tokens=["<s>", "<pad>", "</s>", "<unk>", "<mask>"]
)

tokenizer.save_model(save_dir)


['tokenizer_korean/vocab.json', 'tokenizer_korean/merges.txt']

In [31]:
from tokenizers.implementations import ByteLevelBPETokenizer
from transformers import PreTrainedTokenizerFast

# 1. 기존 BPE 토크나이저 다시 불러오기
bpe_tokenizer = ByteLevelBPETokenizer(
    "tokenizer_korean/vocab.json",
    "tokenizer_korean/merges.txt"
)

# 2. HuggingFace용으로 변환
hf_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=bpe_tokenizer,
    unk_token="<unk>",
    pad_token="<pad>",
    cls_token="<s>",
    sep_token="</s>",
    mask_token="<mask>"
)

# 3. tokenizer.json 생성 및 저장
hf_tokenizer.save_pretrained("tokenizer_korean")


('tokenizer_korean/tokenizer_config.json',
 'tokenizer_korean/special_tokens_map.json',
 'tokenizer_korean/tokenizer.json')

In [38]:
from transformers import PreTrainedTokenizerFast

tokenizer = PreTrainedTokenizerFast.from_pretrained(
    "./tokenizer_korean",
    unk_token="<unk>",
    pad_token="<pad>",
    cls_token="<s>",
    sep_token="</s>",       # 여기 설정하면...
    mask_token="<mask>"
)

print("sep_token 확인:", tokenizer.sep_token)  # → '</s>' 나와야 해!




sep_token 확인: </s>


In [39]:
!ls tokenizer_korean

merges.txt		 tokenizer_config.json	vocab.json
special_tokens_map.json  tokenizer.json


#토크나이즈

In [40]:
def tokenize_conversations_batch(pairs, tokenizer, max_length=64):
    texts = [q + tokenizer.sep_token + a for q, a in pairs]
    encoded = tokenizer(
        texts,
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt"
    )
    return list(encoded["input_ids"]), list(encoded["attention_mask"])


In [94]:
sample_pairs = cleaned_train_pairs[:50000]
valid_sample_pairs = cleaned_valid_pairs[:5000]
train_inputs = [q + tokenizer.sep_token for q, a in cleaned_train_pairs[:50000]]
train_labels = [a for q, a in cleaned_train_pairs[:50000]]


In [95]:
train_input_ids, train_attention_masks = tokenize_conversations_fast(sample_pairs, tokenizer)
valid_input_ids, valid_attention_masks = tokenize_conversations_fast(cleaned_valid_pairs[:5000], tokenizer)



In [96]:
import torch
from torch.utils.data import Dataset

class ChatDataset(Dataset):
    def __init__(self, input_ids, attention_masks):
        self.input_ids = input_ids
        self.attention_masks = attention_masks

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return {
            "input_ids": torch.tensor(self.input_ids[idx], dtype=torch.long),
            "attention_mask": torch.tensor(self.attention_masks[idx], dtype=torch.long),
            "labels": torch.tensor(self.input_ids[idx], dtype=torch.long)
        }

train_dataset = ChatDataset(train_input_ids, train_attention_masks)
valid_dataset = ChatDataset(valid_input_ids, valid_attention_masks)




In [97]:
train_dataset = ChatDataset(train_input_ids, train_attention_masks)
valid_dataset = ChatDataset(valid_input_ids, valid_attention_masks)


In [102]:
from torch.utils.data import Dataset

class QAChatDataset(Dataset):
    def __init__(self, questions, answers, tokenizer, max_length=64):
        self.questions = questions
        self.answers = answers
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        q = self.questions[idx]
        a = self.answers[idx]

        enc_input = self.tokenizer(
            q, truncation=True, padding="max_length",
            max_length=self.max_length, return_tensors="pt"
        )
        enc_label = self.tokenizer(
            a, truncation=True, padding="max_length",
            max_length=self.max_length, return_tensors="pt"
        )

        return {
            "input_ids": enc_input["input_ids"].squeeze(),
            "labels": enc_label["input_ids"].squeeze()
        }


In [103]:
from torch.utils.data import DataLoader

train_dataset = QAChatDataset(train_inputs, train_labels, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)


#모델 만들기

positional Encoding

In [46]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]

        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)].to(x.device)
        return x


TokenEmbbeding

In [47]:
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size, d_model):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        return self.embedding(x)


Multi-Head Self-Attention

In [48]:
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        self.qkv_proj = nn.Linear(d_model, d_model * 3)
        self.out_proj = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        batch_size, seq_len, d_model = x.size()
        qkv = self.qkv_proj(x)  # [B, S, 3*D]
        qkv = qkv.view(batch_size, seq_len, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)  # [3, B, H, S, D]
        q, k, v = qkv[0], qkv[1], qkv[2]

        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)  # [B, H, S, S]
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

        attn_weights = torch.softmax(attn_scores, dim=-1)
        out = torch.matmul(attn_weights, v)  # [B, H, S, D]
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

        return self.out_proj(out)


Feed Forward

In [49]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear2(self.dropout(self.relu(self.linear1(x))))


Decoder Block

In [50]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadSelfAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_out = self.attn(self.norm1(x), mask)
        x = x + self.dropout(attn_out)
        ffn_out = self.ffn(self.norm2(x))
        x = x + self.dropout(ffn_out)
        return x


In [51]:
class GPT(nn.Module):
    def __init__(self, vocab_size, d_model=512, n_layers=6, n_heads=8, d_ff=2048, max_len=512, dropout=0.1):
        super().__init__()

        self.token_embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        self.blocks = nn.ModuleList([
            DecoderBlock(d_model, n_heads, d_ff, dropout)
            for _ in range(n_layers)
        ])

        self.norm = nn.LayerNorm(d_model)
        self.lm_head = nn.Linear(d_model, vocab_size)

    def forward(self, input_ids):
        mask = self.generate_causal_mask(input_ids.size(1)).to(input_ids.device)
        x = self.token_embedding(input_ids)
        x = self.pos_encoding(x)

        for block in self.blocks:
            x = block(x, mask)

        x = self.norm(x)
        return self.lm_head(x)

    def generate_causal_mask(self, seq_len):
        return torch.tril(torch.ones((1, 1, seq_len, seq_len), device='cpu')).to(torch.bool)


In [52]:
import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GPT(vocab_size=tokenizer.vocab_size).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = AdamW(model.parameters(), lr=5e-4)


In [98]:
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16)


In [110]:
import torch.nn as nn
from torch.optim import AdamW

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GPT(vocab_size=tokenizer.vocab_size).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = AdamW(model.parameters(), lr=5e-4)

epochs = 10

for epoch in range(epochs):
    model.train()
    total_loss = 0

    for batch in train_loader:
        input_ids = batch["input_ids"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids)  # (batch, seq, vocab)

        loss = criterion(outputs.view(-1, outputs.size(-1)), labels.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"📘 Epoch {epoch+1} | Train Loss: {total_loss / len(train_loader):.4f}")




📘 Epoch 1 | Train Loss: 7.9094
📘 Epoch 2 | Train Loss: 7.7035
📘 Epoch 3 | Train Loss: 7.6686
📘 Epoch 4 | Train Loss: 7.6362
📘 Epoch 5 | Train Loss: 7.6026
📘 Epoch 6 | Train Loss: 7.5621
📘 Epoch 7 | Train Loss: 7.5081
📘 Epoch 8 | Train Loss: 7.4337
📘 Epoch 9 | Train Loss: 7.3384
📘 Epoch 10 | Train Loss: 7.2236


In [118]:
import torch.nn.functional as F
import random

def generate_response(prompt, tokenizer, model, max_length=50, top_k=10, temperature=1.0):
    model.eval()
    device = model.lm_head.weight.device

    input_text = prompt + tokenizer.sep_token
    input_ids = tokenizer(input_text, return_tensors="pt")["input_ids"].to(device)

    for _ in range(max_length):
        with torch.no_grad():
            outputs = model(input_ids)
            next_token_logits = outputs[:, -1, :] / temperature

            # Top-k 샘플링
            top_k_probs, top_k_indices = torch.topk(next_token_logits, top_k, dim=-1)
            probs = F.softmax(top_k_probs, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            next_token_id = top_k_indices.gather(-1, next_token)

        input_ids = torch.cat([input_ids, next_token_id], dim=1)

        if next_token_id.item() == tokenizer.convert_tokens_to_ids("</s>"):
            break

    full_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    if tokenizer.sep_token in full_text:
        response = full_text.split(tokenizer.sep_token)[-1].strip()
    else:
        response = full_text.strip()

    return response




In [121]:
response = generate_response("안녕", tokenizer, model, top_k=20, temperature=0.9)
print("🤖", response)



🤖 안녕은게게..........고.....??...?..다..는 왜!만게..이?..가게..구?을?이..야게고시는 내가.. 기... 안..하고만이


#회고

* 챗봇이 아니라 스카이넷을 만들어냈다.

1.   배운 점 : 챗봇 구조를 직접 실험해보며 알아볼 수 있었다.
2.   어려웠던 점 : 전부 다 어려웠다. 토큰화 하는 것부터 어려웠다.

