<a href="https://colab.research.google.com/github/jong104b-kr/AIFFEL_quest_cr/blob/master/Exploration/Quest05/SubQuestC_26_ChatGPT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Step 0. 라이브러리 설치 및 임포트
!pip install -q tensorflow tensorflow-datasets

import tensorflow as tf
import tensorflow_datasets as tfds
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt

print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.18.0


In [2]:
# Step 1. 데이터 수집

# Google Drive에서 파일을 직접 업로드하거나 수동 다운로드한 후 Colab으로 업로드
from google.colab import files

uploaded = files.upload()  # ChatbotData.csv 업로드

df = pd.read_csv('ChatbotData.csv')
print(f"데이터 샘플 수: {len(df)}")
df.head()

Saving ChatbotData.csv to ChatbotData.csv
데이터 샘플 수: 11823


Unnamed: 0,Q,A,label
0,12시 땡!,하루가 또 가네요.,0
1,1지망 학교 떨어졌어,위로해 드립니다.,0
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.,0
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.,0
4,PPL 심하네,눈살이 찌푸려지죠.,0


In [3]:
# Step 2. 데이터 전처리

def preprocess_sentence(sentence):
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = sentence.strip()
    return sentence

questions = [preprocess_sentence(q) for q in df['Q']]
answers = ["<start> " + preprocess_sentence(a) + " <end>" for a in df['A']]

In [4]:
# Step 3. SubwordTextEncoder 사용

tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + answers, target_vocab_size=2**13)

VOCAB_SIZE = tokenizer.vocab_size + 2
START_TOKEN = [VOCAB_SIZE - 2]
END_TOKEN = [VOCAB_SIZE - 1]

def tokenize_and_filter(inputs, outputs):
    tokenized_inputs, tokenized_outputs = [], []
    for (sentence1, sentence2) in zip(inputs, outputs):
        sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
        sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
        if len(sentence1) <= 40 and len(sentence2) <= 40:
            tokenized_inputs.append(sentence1)
            tokenized_outputs.append(sentence2)
    return tokenized_inputs, tokenized_outputs

questions, answers = tokenize_and_filter(questions, answers)

# Tensor 변환 및 패딩
MAX_LENGTH = 40
BATCH_SIZE = 64
BUFFER_SIZE = 20000

questions = tf.keras.preprocessing.sequence.pad_sequences(questions, maxlen=MAX_LENGTH, padding='post')
answers = tf.keras.preprocessing.sequence.pad_sequences(answers, maxlen=MAX_LENGTH, padding='post')

dataset = tf.data.Dataset.from_tensor_slices((
    {'inputs': questions, 'dec_inputs': answers[:, :-1]},
    {'outputs': answers[:, 1:]},
))
dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

In [5]:
# Step 4. Transformer 모델 구성

# Positional Encoding
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                            np.arange(d_model)[np.newaxis, :],
                            d_model)

    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

# 마스크 생성
def create_padding_mask(seq):
    return tf.cast(tf.math.equal(seq, 0), tf.float32)[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    return 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)

# Scaled Dot-Product Attention
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)

    return output, attention_weights

# Multi-head Attention
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, _ = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)

        return output

# Encoder & Decoder Layer
def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
        tf.keras.layers.Dense(dff, activation='relu'),
        tf.keras.layers.Dense(d_model)
    ])

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(dropout)
        self.dropout2 = tf.keras.layers.Dropout(dropout)

    def call(self, x, mask):
        attn_output = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(dropout)
        self.dropout2 = tf.keras.layers.Dropout(dropout)
        self.dropout3 = tf.keras.layers.Dropout(dropout)

    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        attn1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1)
        out1 = self.layernorm1(attn1 + x)

        attn2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2)
        out2 = self.layernorm2(attn2 + out1)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output)
        out3 = self.layernorm3(ffn_output + out2)

        return out3

# 전체 Transformer 클래스 정의는 길어서 나눠드릴게요. 다음으로 넘어갈까요?

In [6]:
# 전체 Transformer 구성
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, dropout):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(MAX_LENGTH, d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, dropout) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, mask):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, mask)

        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, dropout):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(MAX_LENGTH, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, dropout) for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout)

    def call(self, x, enc_output, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask)

        return x

class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, dropout)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, dropout)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs, training):
        enc_inputs, dec_inputs = inputs['inputs'], inputs['dec_inputs']
        enc_padding_mask = create_padding_mask(enc_inputs)
        dec_padding_mask = create_padding_mask(enc_inputs)
        look_ahead_mask = create_look_ahead_mask(tf.shape(dec_inputs)[1])
        dec_target_padding_mask = create_padding_mask(dec_inputs)
        combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

        enc_output = self.encoder(enc_inputs, enc_padding_mask)
        dec_output = self.decoder(dec_inputs, enc_output, combined_mask, dec_padding_mask)

        final_output = self.final_layer(dec_output)
        return final_output

In [7]:
# 손실함수 정의
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(y_true, y_pred):
    y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
    loss = loss_object(y_true, y_pred)

    mask = tf.cast(tf.not_equal(y_true, 0), dtype=loss.dtype)
    loss *= mask

    return tf.reduce_mean(loss)

# 옵티마이저
learning_rate = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-3,
    decay_steps=10000,
    decay_rate=0.9)

optimizer = tf.keras.optimizers.Adam(learning_rate)

In [8]:
# 모델 선언
NUM_LAYERS = 2
D_MODEL = 256
NUM_HEADS = 8
DFF = 512
DROPOUT = 0.1

transformer = Transformer(
    num_layers=NUM_LAYERS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dff=DFF,
    input_vocab_size=VOCAB_SIZE,
    target_vocab_size=VOCAB_SIZE,
    dropout=DROPOUT)

In [9]:
# 모델 컴파일
transformer.compile(optimizer=optimizer, loss=loss_function)

# 학습
EPOCHS = 20

transformer.fit(dataset, epochs=EPOCHS)

Epoch 1/20


TypeError: Expected float32, but got outputs of type 'str'.

In [10]:
def evaluate(sentence):
    sentence = preprocess_sentence(sentence)
    sentence = START_TOKEN + tokenizer.encode(sentence) + END_TOKEN
    sentence = tf.expand_dims(sentence, axis=0)

    output = tf.expand_dims(START_TOKEN, 0)

    for i in range(MAX_LENGTH):
        predictions = transformer(inputs={'inputs': sentence, 'dec_inputs': output}, training=False)
        predictions = predictions[:, -1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        if tf.equal(predicted_id, END_TOKEN[0]):
            break

        output = tf.concat([output, predicted_id], axis=-1)

    return tf.squeeze(output, axis=0)

def predict(sentence):
    prediction = evaluate(sentence)
    predicted_sentence = tokenizer.decode(
        [i for i in prediction if i < tokenizer.vocab_size])

    print(f'Q: {sentence}')
    print(f'A: {predicted_sentence}')

In [11]:
# 예시 테스트
predict("오늘 날씨 어때?")
predict("고양이가 너무 귀여워")
predict("너 이름이 뭐야?")
predict("나 너무 우울해")
predict("점심 뭐 먹을까?")

Q: 오늘 날씨 어때?
A: 잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어희생양잃어버렸어희생양잃어버렸어희생양잃어버렸어러가잃어버렸어희생양잃어버렸어희생양잃어버렸어희생양러가러가러가잃어버렸어잃어버렸어
Q: 고양이가 너무 귀여워
A: 잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어러가잃어버렸어잃어버렸어잃어버렸어잃어버렸어희생양잃어버렸어희생양러가러가잃어버렸어잃어버렸어
Q: 너 이름이 뭐야?
A: 잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어러가잃어버렸어러가잃어버렸어러가잃어버렸어러가러가러가러가잃어버렸어러가러가러가러가러가러가러가러가
Q: 나 너무 우울해
A: 잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어려지해보는잃어버렸어려지해보는잃어버렸어려지해보는잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어려지러가잃어버렸어러가잃어버렸어러가러가러가러가잃어버렸어잃어버렸어잃어버렸어러가러가러가러가러가러가러가
Q: 점심 뭐 먹을까?
A: 잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어비밀번호 러가잃어버렸어잃어버렸어잃어버렸어잃어버렸어잃어버렸어러가러가러가잃어버렸어잃어버렸어잃어버렸어
