In [42]:
import os
import re
import pandas as pd
import numpy as np
import sentencepiece as spm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, ops
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 데이터 다운로드 및 도구 설치
!mkdir -p data
if not os.path.exists('data/ChatbotData.csv'):
    !wget https://github.com/songys/Chatbot_data/raw/master/ChatbotData.csv -O data/ChatbotData.csv
!pip install sentencepiece

# 전처리 함수
def preprocess_sentence(sentence):
    sentence = sentence.strip()
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^가-힣?.!,0-9a-zA-Z]+", " ", sentence)
    return sentence.strip()

data = pd.read_csv('data/ChatbotData.csv')
questions = [preprocess_sentence(q) for q in data['Q']]
answers = [preprocess_sentence(a) for a in data['A']]

with open('chatbot.txt', 'w', encoding='utf-8') as f:
    for line in questions + answers:
        f.write(line + '\n')



In [43]:
# Step 2: 데이터 결합 방식 변경
tokenized_data = []
for q, a in zip(questions, answers):
    # 구조: BOS + 질문 + | + 답변 + EOS
    # 여기서 '|'는 질문이 끝나고 답변이 시작됨을 알리는 아주 강력한 신호입니다.
    combined = [s.bos_id()] + s.EncodeAsIds(q) + s.EncodeAsIds("|") + s.EncodeAsIds(a) + [s.eos_id()]
    tokenized_data.append(combined)

# 길이를 넉넉히 60으로 잡고 패딩 처리
tokenized_data = pad_sequences(tokenized_data, maxlen=60, padding='post', value=0)

# [중요] 데이터셋을 새로 만들고 모델을 반드시 다시 'fit' 시켜야 합니다!
dataset = tf.data.Dataset.from_tensor_slices((tokenized_data[:, :-1], tokenized_data[:, 1:])).shuffle(20000).batch(64)
model.fit(dataset, epochs=30) # 이 구조를 다시 학습시켜야 모델이 '벽'을 인식합니다.

Epoch 1/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 51ms/step - accuracy: 0.9638 - loss: 0.1927
Epoch 2/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 53ms/step - accuracy: 0.9640 - loss: 0.1895
Epoch 3/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 55ms/step - accuracy: 0.9643 - loss: 0.1882
Epoch 4/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 57ms/step - accuracy: 0.9642 - loss: 0.1879
Epoch 5/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 54ms/step - accuracy: 0.9642 - loss: 0.1875
Epoch 6/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 52ms/step - accuracy: 0.9644 - loss: 0.1863
Epoch 7/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 51ms/step - accuracy: 0.9641 - loss: 0.1857
Epoch 8/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 51ms/step - accuracy: 0.9643 - loss: 0.1851
Epoch 9/30
[1m185/185[0m

<keras.src.callbacks.history.History at 0x7878d8b854f0>

In [44]:
# Step 3: GPT-1 모델 설계
def gelu(x):
    return 0.5 * x * (1 + ops.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * ops.power(x, 3))))

class GPTCausalMask(layers.Layer):
    def call(self, inputs):
        seq_len = ops.shape(inputs)[1]
        mask = 1.0 - ops.tri(seq_len, seq_len, k=0, dtype="float32")
        return mask[None, None, :, :]
    def compute_output_shape(self, input_shape):
        return (None, 1, input_shape[1], input_shape[1])

class PositionalEncoding(layers.Layer):
    def __init__(self, position, d_model, **kwargs):
        super().__init__(**kwargs)
        self.pos_encoding = self.positional_encoding(position, d_model)
    def positional_encoding(self, position, d_model):
        angle_rads = (ops.cast(ops.arange(position), "float32")[:, None] /
                      ops.power(10000.0, (2.0 * (ops.cast(ops.arange(d_model), "float32")[None, :] // 2.0)) / ops.cast(d_model, "float32")))
        sines, cosines = ops.sin(angle_rads[:, 0::2]), ops.cos(angle_rads[:, 1::2])
        return ops.concatenate([sines, cosines], axis=-1)[None, ...]
    def call(self, inputs):
        return ops.cast(inputs, "float32") + self.pos_encoding[:, :ops.shape(inputs)[1], :]

class MultiHeadAttention(layers.Layer):
    def __init__(self, d_model, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.num_heads, self.d_model, self.depth = num_heads, d_model, d_model // num_heads
        self.q_dense, self.k_dense, self.v_dense, self.dense = [layers.Dense(d_model) for _ in range(4)]
    def split_heads(self, x, batch_size):
        return ops.transpose(ops.reshape(x, (batch_size, -1, self.num_heads, self.depth)), (0, 2, 1, 3))
    def call(self, inputs):
        q, k, v, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = ops.shape(q)[0]
        q, k, v = self.split_heads(self.q_dense(q), batch_size), self.split_heads(self.k_dense(k), batch_size), self.split_heads(self.v_dense(v), batch_size)
        logits = ops.matmul(q, ops.transpose(k, (0, 1, 3, 2))) / np.sqrt(self.depth)
        if mask is not None: logits += (mask * -1e9)
        out = ops.transpose(ops.matmul(ops.softmax(logits, axis=-1), v), (0, 2, 1, 3))
        return self.dense(ops.reshape(out, (batch_size, -1, self.d_model)))

def build_gpt(vocab_size, num_layers, dff, d_model, num_heads):
    inputs = layers.Input(shape=(None,), name="inputs")
    look_ahead_mask = GPTCausalMask()(inputs)
    x = layers.Embedding(vocab_size, d_model)(inputs)
    x = PositionalEncoding(vocab_size, d_model)(x * np.sqrt(d_model))
    for i in range(num_layers):
        attn = MultiHeadAttention(d_model, num_heads, name=f"mha_{i}")({'query': x, 'key': x, 'value': x, 'mask': look_ahead_mask})
        x = layers.LayerNormalization(epsilon=1e-6)(x + attn)
        ffn = layers.Dense(dff, activation=gelu)(x)
        x = layers.LayerNormalization(epsilon=1e-6)(x + layers.Dense(d_model)(ffn))
    return keras.Model(inputs=inputs, outputs=layers.Dense(vocab_size)(x))

In [45]:
# Step 4: 모델 학습 및 결과 확인
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        self.d_model, self.warmup_steps = ops.cast(d_model, "float32"), warmup_steps
    def __call__(self, step):
        step = ops.cast(step, "float32")
        return ops.rsqrt(self.d_model) * ops.minimum(ops.rsqrt(step), step * (self.warmup_steps**-1.5))

model = build_gpt(8000, 4, 512, 256, 8)
model.compile(optimizer=keras.optimizers.Adam(CustomSchedule(256), beta_1=0.9, beta_2=0.98, epsilon=1e-9),
              loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

dataset = tf.data.Dataset.from_tensor_slices((tokenized_data[:, :-1], tokenized_data[:, 1:])).shuffle(20000).batch(64)
model.fit(dataset, epochs=30)

Epoch 1/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 82ms/step - accuracy: 0.4848 - loss: 7.4078
Epoch 2/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 51ms/step - accuracy: 0.7572 - loss: 3.6745
Epoch 3/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 53ms/step - accuracy: 0.8047 - loss: 1.6549
Epoch 4/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 56ms/step - accuracy: 0.8148 - loss: 1.3750
Epoch 5/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 56ms/step - accuracy: 0.8264 - loss: 1.2453
Epoch 6/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 53ms/step - accuracy: 0.8344 - loss: 1.1417
Epoch 7/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 51ms/step - accuracy: 0.8416 - loss: 1.0528
Epoch 8/30
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 51ms/step - accuracy: 0.8479 - loss: 0.9741
Epoch 9/30
[1m185/185[0

<keras.src.callbacks.history.History at 0x7878154e7890>

In [46]:

# Step 5: 한글 깨짐 방지 및 답변 추출 로직 강화
def gpt_inference(sentence, temp=0.8):
    sentence = preprocess_sentence(sentence)
    # 질문 뒤에 학습 시 사용한 구분자 '|'를 반드시 붙여줍니다.
    input_ids = [s.bos_id()] + s.EncodeAsIds(sentence) + s.EncodeAsIds("|")
    input_seq = tf.convert_to_tensor([input_ids], dtype=tf.int32)

    for _ in range(40): # 최대 40단어 생성
        predictions = model(inputs=input_seq, training=False)[:, -1:, :] / temp

        # 확률 기반 샘플링으로 답변의 다양성 확보
        predicted_id = tf.random.categorical(tf.reshape(predictions, [1, -1]), num_samples=1)
        predicted_id = tf.cast(predicted_id, tf.int32)

        curr_id = predicted_id.numpy().item()
        # EOS가 나오거나 패딩(0)이 나오면 즉시 중단
        if curr_id == s.eos_id() or curr_id == 0:
            break

        input_seq = tf.concat([input_seq, predicted_id], axis=-1)

    # 전체 시퀀스 복원
    full_sentence = s.Decode(input_seq.numpy().squeeze().tolist())

    # 구분자 '|' 뒷부분만 답변으로 추출
    if "|" in full_sentence:
        answer = full_sentence.split("|")[-1].strip()
    else:
        answer = full_sentence.replace(sentence, "").strip()

    # 결과가 비어있거나 이상하면 예외 처리
    answer = answer.replace("??", "").strip()
    return answer if len(answer) > 0 else "조금 더 구체적으로 물어봐 주실래요?"

# 최종 성능 검증 테스트
test_questions = [
    "나 너무 피곤해",
    "갑자기 화가나",
    "나는 지금 배가 고파. 저녁 메뉴를 추천해줄래?",
    "AI는 뭐야?",
    "앞으로 AI 시장은 어떻게 변할까?"
]

print("\n--- 최종 모델 성능 검증 결과 ---")
for q in test_questions:
    print(f"Q: {q}\nA: {gpt_inference(q)}\n{'-'*30}")


--- 최종 모델 성능 검증 결과 ---
Q: 나 너무 피곤해
A: ⁇  좀 더 일찍 잠자리에 들어보세요 .
------------------------------
Q: 갑자기 화가나
A: ⁇  자신의 삶의 낙이죠 .
------------------------------
Q: 나는 지금 배가 고파. 저녁 메뉴를 추천해줄래?
A: ⁇  더 참았어요 .
------------------------------
Q: AI는 뭐야?
A: A ⁇ 는 뭐야 ?  ⁇  그 사람의 사랑의 고민를 찾아
------------------------------
Q: 앞으로 AI 시장은 어떻게 변할까?
A: 앞으로 A ⁇  시장은 어떻게 변할까 ?  ⁇  하세요 .
------------------------------


In [None]:
#회고
# Transformer는 질문을 해석하는 인코더와 답변을 만드는 디코더가 있는 것임. 하지만 GPT는 디코더만 사용함. GPT의 핵심은 앞에 나온 단어를 보고 다음에 올 가장 자연스러운 단어를 예측하는 것임. 따라서 인코더를 생략하고 디코더 스택만 쌓은 GPT 스타일로 변경,
# 기존 FeLU 대신 GPT-1에서 사용된 GELU를 적용함. GELU는 0 근처에서 더 부드럽게 꺾이는 특성이 있기 때문에 인경 신경망이 복잡한 언어 패턴을 유연하게 학습하도록 도움
# 지난주 과제는 데이터의 한계점이라 생각하고 이번엔 데이터를 바꿔봤는데도 여전히 답이 이상한 것들이 많았음. 학습된 데이터에서는 AI라는 단어가 없어서 그런지 AI 자체를 인지하지 못함.
# 추후에는 더 많은 양을 학습시키는 등의 노력이 필요해보임