In [1]:
import pandas as pd
import tensorflow as tf
import re
from sklearn.model_selection import train_test_split

# 1. 데이터 로드 및 전처리

In [2]:
# Read the chatbot CSV into a DataFrame; later cells use its 'Q', 'A' and
# 'label' columns. The path below contains a space before ".csv" and is kept
# exactly as written.
file_path = './data/ChatbotData .csv'
data = pd.read_csv(filepath_or_buffer=file_path)

In [3]:
# Pull the question ('Q') and answer ('A') columns out as plain Python lists.
questions, answers = (data[column].tolist() for column in ('Q', 'A'))

In [4]:
def preprocess_sentence(sentence):
    """Normalize one raw sentence before tokenization.

    The text is lower-cased and trimmed, then three regex substitutions are
    applied in order:
      1. surround each of ? . ! , with spaces so they become separate tokens,
      2. collapse runs of spaces and double-quote characters into one space,
      3. replace every run of characters that are not Hangul syllables,
         ASCII letters or ? . ! , with a single space (digits are dropped).

    Args:
        sentence: the raw input string.

    Returns:
        The cleaned string with leading/trailing whitespace removed.
    """
    substitutions = (
        (r"([?.!,])", r" \1 "),
        (r'[" "]+', " "),
        (r"[^가-힣a-zA-Z?.!,]+", " "),
    )
    cleaned = sentence.lower().strip()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

In [5]:
# Run every question and every answer through the shared text normalizer.
questions = list(map(preprocess_sentence, questions))
answers = list(map(preprocess_sentence, answers))

In [6]:
# Wrap each answer in explicit start/end marker tokens.
answers = [f"<start> {answer} <end>" for answer in answers]

In [7]:
# One vocabulary shared by questions and answers. filters='' disables the
# tokenizer's character filtering so the "<start>" / "<end>" markers are kept
# as written.
tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
tokenizer.fit_on_texts([*questions, *answers])

# One more than the number of distinct words; id 0 is the value that
# create_padding_mask treats as padding.
VOCAB_SIZE = 1 + len(tokenizer.word_index)

In [8]:
# Convert both text lists into lists of integer word ids.
questions_seq, answers_seq = map(tokenizer.texts_to_sequences, (questions, answers))

In [9]:
# Every sequence is forced to exactly MAX_LENGTH ids; padding='post' appends
# the padding after the tokens.
MAX_LENGTH = 40
questions_seq, answers_seq = (
    tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=MAX_LENGTH, padding='post')
    for sequences in (questions_seq, answers_seq)
)

In [10]:
# Hold out 20% of the question/answer pairs for validation.
# random_state is fixed so the split is reproducible across kernel restarts;
# 42 is the same seed the later label-aware split in this notebook uses.
questions_train, questions_val, answers_train, answers_val = train_test_split(
    questions_seq, answers_seq, test_size=0.2, random_state=42
)

In [11]:
# Decoder inputs: each answer row with its final position dropped.
dec_inputs_train, dec_inputs_val = answers_train[:, :-1], answers_val[:, :-1]

In [12]:
# Shifted targets: each answer row with its first position dropped, so
# position i of the decoder input lines up with target position i + 1.
answers_train_shifted, answers_val_shifted = answers_train[:, 1:], answers_val[:, 1:]

In [13]:
BUFFER_SIZE = 20000  # shuffle buffer size passed to Dataset.shuffle
BATCH_SIZE = 64

# Training pipeline: (question, decoder input, shifted answer) triples,
# shuffled and batched; the final incomplete batch is dropped.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((questions_train, dec_inputs_train, answers_train_shifted))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
)

# Validation pipeline: same triple layout, batched in order without shuffling.
val_dataset = tf.data.Dataset.from_tensor_slices(
    (questions_val, dec_inputs_val, answers_val_shifted)
).batch(BATCH_SIZE)

In [14]:
# Class label of every Q/A row; used as the training target when the
# classification datasets are built.
labels = data['label'].values

# Re-split with the labels included so questions, answers and labels of one
# row always land in the same partition.
questions_train, questions_val, answers_train, answers_val, labels_train, labels_val = train_test_split(
    questions_seq, answers_seq, labels, test_size=0.2, random_state=42
)

# The decoder inputs computed earlier belong to the earlier, separate split.
# Rebuild them from this split so they stay row-aligned with
# questions_train / labels_train (and the *_val counterparts).
dec_inputs_train = answers_train[:, :-1]
dec_inputs_val = answers_val[:, :-1]

In [15]:
# Classification pipelines: features are the (question, decoder input) pair,
# the target is the row's class label. These replace the earlier seq2seq
# datasets bound to the same names.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(((questions_train, dec_inputs_train), labels_train))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

val_dataset = tf.data.Dataset.from_tensor_slices(
    ((questions_val, dec_inputs_val), labels_val)
).batch(BATCH_SIZE)


# 2. 트랜스포머 모델 정의

In [16]:
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
    """Build one encoder block as a Keras functional model.

    The block is self-attention followed by a two-layer feed-forward network;
    each sub-layer is followed by dropout, a residual addition and layer
    normalization. No attention mask is applied inside this block.

    Args:
        units: width of the hidden feed-forward layer.
        d_model: feature size of the block's input and output.
        num_heads: number of attention heads.
        dropout: dropout rate used inside attention and after each sub-layer.
        name: name given to the returned model.

    Returns:
        tf.keras.Model mapping (batch, seq_len, d_model) to the same shape.
    """
    layer_in = tf.keras.Input(shape=(None, d_model), name="inputs")

    # Self-attention: the layer input is used as both query and value.
    # key_dim is the per-head size, so every head here is d_model wide.
    self_attention = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=d_model, dropout=dropout)
    attended = self_attention(layer_in, layer_in)
    attended = tf.keras.layers.Dropout(rate=dropout)(attended)
    attended = tf.keras.layers.LayerNormalization(epsilon=1e-6)(layer_in + attended)

    # Position-wise feed-forward network with its own residual + norm.
    transformed = tf.keras.layers.Dense(units=units, activation='relu')(attended)
    transformed = tf.keras.layers.Dense(units=d_model)(transformed)
    transformed = tf.keras.layers.Dropout(rate=dropout)(transformed)
    transformed = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attended + transformed)

    return tf.keras.Model(inputs=layer_in, outputs=transformed, name=name)

def encoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="encoder"):
    """Build the encoder stack: scaled token embedding plus `num_layers` blocks.

    Args:
        vocab_size: number of rows in the embedding table.
        num_layers: how many encoder_layer blocks to stack.
        units: feed-forward hidden width passed to each block.
        d_model: embedding / feature size.
        num_heads: attention heads per block.
        dropout: dropout rate for the embedding and for each block.
        name: name given to the returned model.

    Returns:
        tf.keras.Model taking [token ids, padding_mask] and returning
        features of shape (batch, seq_len, d_model).

    Note:
        `padding_mask` is declared as a model input but is not consumed by
        any layer in this function, so the encoder's self-attention is
        unmasked. No positional encoding is added to the embeddings here.
    """
    token_ids = tf.keras.Input(shape=(None,), name="inputs")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    # Embed the ids and scale by sqrt(d_model) before dropout.
    scale = tf.math.sqrt(tf.cast(d_model, tf.float32))
    hidden = tf.keras.layers.Embedding(vocab_size, d_model)(token_ids)
    hidden = hidden * scale
    hidden = tf.keras.layers.Dropout(rate=dropout)(hidden)

    for layer_idx in range(num_layers):
        block = encoder_layer(units, d_model, num_heads, dropout, name=f"encoder_layer_{layer_idx}")
        hidden = block(hidden)

    return tf.keras.Model(inputs=[token_ids, padding_mask], outputs=hidden, name=name)

def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
    """Build one decoder block as a Keras functional model.

    The block runs self-attention over the decoder features, then attention
    over the encoder output, then a two-layer feed-forward network. Each
    sub-layer ends with a residual addition and layer normalization; dropout
    is applied after the second attention and after the feed-forward network.

    Args:
        units: width of the hidden feed-forward layer.
        d_model: feature size of the block's inputs and output.
        num_heads: number of attention heads.
        dropout: dropout rate.
        name: name given to the returned model.

    Returns:
        tf.keras.Model taking [inputs, encoder_outputs, look_ahead_mask,
        padding_mask] and returning features of shape
        (batch, target_len, d_model).
    """
    dec_in = tf.keras.Input(shape=(None, d_model), name="inputs")
    enc_out = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
    causal_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
    memory_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    # Sub-layer 1: self-attention; look_ahead_mask is forwarded unchanged as
    # the layer's attention_mask.
    self_attention = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=d_model, dropout=dropout)
    self_out = self_attention(dec_in, dec_in, attention_mask=causal_mask)
    self_out = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self_out + dec_in)

    # Sub-layer 2: queries come from the decoder, values from the encoder
    # output; padding_mask is forwarded unchanged as attention_mask.
    cross_attention = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=d_model, dropout=dropout)
    cross_out = cross_attention(self_out, enc_out, attention_mask=memory_mask)
    cross_out = tf.keras.layers.Dropout(rate=dropout)(cross_out)
    cross_out = tf.keras.layers.LayerNormalization(epsilon=1e-6)(cross_out + self_out)

    # Sub-layer 3: position-wise feed-forward network.
    ffn_out = tf.keras.layers.Dense(units=units, activation='relu')(cross_out)
    ffn_out = tf.keras.layers.Dense(units=d_model)(ffn_out)
    ffn_out = tf.keras.layers.Dropout(rate=dropout)(ffn_out)
    ffn_out = tf.keras.layers.LayerNormalization(epsilon=1e-6)(ffn_out + cross_out)

    return tf.keras.Model(
        inputs=[dec_in, enc_out, causal_mask, memory_mask],
        outputs=ffn_out,
        name=name)

def decoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="decoder"):
    """Build the decoder stack: scaled token embedding plus `num_layers` blocks.

    Args:
        vocab_size: number of rows in the embedding table.
        num_layers: how many decoder_layer blocks to stack.
        units: feed-forward hidden width passed to each block.
        d_model: embedding / feature size.
        num_heads: attention heads per block.
        dropout: dropout rate for the embedding and for each block.
        name: name given to the returned model.

    Returns:
        tf.keras.Model taking [token ids, encoder_outputs, look_ahead_mask,
        padding_mask] and returning features of shape
        (batch, target_len, d_model). No positional encoding is added to the
        embeddings here.
    """
    token_ids = tf.keras.Input(shape=(None,), name="inputs")
    enc_out = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
    causal_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
    memory_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    # Embed the ids and scale by sqrt(d_model) before dropout.
    scale = tf.math.sqrt(tf.cast(d_model, tf.float32))
    hidden = tf.keras.layers.Embedding(vocab_size, d_model)(token_ids)
    hidden = hidden * scale
    hidden = tf.keras.layers.Dropout(rate=dropout)(hidden)

    # Every block receives the same encoder output and the same two masks.
    for layer_idx in range(num_layers):
        block = decoder_layer(units, d_model, num_heads, dropout, name=f"decoder_layer_{layer_idx}")
        hidden = block(inputs=[hidden, enc_out, causal_mask, memory_mask])

    return tf.keras.Model(
        inputs=[token_ids, enc_out, causal_mask, memory_mask],
        outputs=hidden,
        name=name)


In [17]:
def create_padding_mask(seq):
    """Build a key-padding mask usable as a Keras attention_mask.

    Keras MultiHeadAttention interprets attention_mask as 1 = "may attend"
    and 0 = "blocked". The mask therefore marks real tokens with 1 and
    padding with 0; flagging the padding with 1 instead would make attention
    look only at padding positions.

    Args:
        seq: integer token ids of shape (batch, seq_len); id 0 is padding.

    Returns:
        float32 tensor of shape (batch, 1, 1, seq_len) holding 1.0 at real
        tokens and 0.0 at padding, broadcastable over heads and query
        positions.
    """
    keep = tf.cast(tf.math.not_equal(seq, 0), tf.float32)
    return keep[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(seq):
    """Build a causal (look-ahead) mask usable as a Keras attention_mask.

    Keras MultiHeadAttention interprets attention_mask as 1 = "may attend"
    and 0 = "blocked", so the mask is the lower triangle including the
    diagonal: query position i may attend to key positions 0..i only.
    Returning the strict upper triangle instead would let each position see
    nothing but future positions.

    Args:
        seq: tensor of shape (batch, seq_len); only seq_len is read.

    Returns:
        float32 tensor of shape (1, 1, seq_len, seq_len), broadcastable over
        batch and heads.
    """
    seq_len = tf.shape(seq)[1]
    # band_part(x, -1, 0) keeps the whole lower triangle and the diagonal.
    causal = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    return causal[tf.newaxis, tf.newaxis, :, :]


def transformer(vocab_size, num_layers, units, d_model, num_heads, dropout, name="transformer", num_classes=3):
    """Assemble the encoder-decoder sentence classifier.

    Args:
        vocab_size: size of the embedding tables of encoder and decoder.
        num_layers: number of blocks in the encoder and in the decoder.
        units: feed-forward hidden width inside each block.
        d_model: embedding / feature size.
        num_heads: attention heads per block.
        dropout: dropout rate.
        name: name given to the returned model.
        num_classes: width of the final softmax layer. Defaults to 3, the
            value that used to be hard-coded, so existing calls are
            unaffected.

    Returns:
        tf.keras.Model taking [inputs, dec_inputs] (two integer id tensors
        of shape (batch, length)) and returning softmax probabilities of
        shape (batch, num_classes). The probabilities are computed from the
        decoder feature at the last time step only.
    """
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

    # Both padding masks are derived from the encoder token ids: the one
    # handed to the decoder masks encoder positions in its second attention.
    enc_padding_mask = tf.keras.layers.Lambda(
        create_padding_mask, output_shape=(1, 1, None), name='enc_padding_mask')(inputs)
    look_ahead_mask = tf.keras.layers.Lambda(
        create_look_ahead_mask, output_shape=(1, None, None), name='look_ahead_mask')(dec_inputs)
    dec_padding_mask = tf.keras.layers.Lambda(
        create_padding_mask, output_shape=(1, 1, None), name='dec_padding_mask')(inputs)

    enc_outputs = encoder(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
    )(inputs=[inputs, enc_padding_mask])

    dec_outputs = decoder(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
    )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

    # Classification head on the last decoder position. The activation is
    # softmax, so the model emits probabilities rather than logits.
    outputs = tf.keras.layers.Dense(
        units=num_classes, activation='softmax', name="outputs")(dec_outputs[:, -1, :])

    return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)

# 3. 학습 설정 및 컴파일

In [18]:
# Model hyperparameters.
NUM_LAYERS = 2   # blocks in the encoder and in the decoder
D_MODEL = 256    # embedding / feature size
NUM_HEADS = 8    # attention heads per block
UNITS = 512      # feed-forward hidden width
DROPOUT = 0.1    # dropout rate

# Arguments are passed in the positional order of transformer's signature:
# vocab_size, num_layers, units, d_model, num_heads, dropout.
model = transformer(VOCAB_SIZE, NUM_LAYERS, UNITS, D_MODEL, NUM_HEADS, DROPOUT)

def loss_function(y_true, y_pred):
    """Sparse categorical cross-entropy between integer labels and class probabilities.

    Args:
        y_true: integer class labels; flattened to shape (batch,) here.
        y_pred: model output of shape (batch, num_classes).

    Returns:
        Scalar loss tensor.
    """
    y_true = tf.reshape(y_true, shape=(-1,))
    # from_logits=False because the model's final Dense layer already applies
    # softmax: y_pred holds probabilities, and treating them as logits would
    # apply softmax a second time and distort the loss.
    return tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)(y_true, y_pred)

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up learning-rate schedule.

    lr(step) = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)

    The rate grows linearly for the first `warmup_steps` steps and then
    decays proportionally to the inverse square root of the step.

    Args:
        d_model: model feature size used as the scaling factor.
        warmup_steps: number of linear warm-up steps.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        # Stored as float32 so it can be fed to tf.math.rsqrt directly.
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # Optimizers may hand over an integer step counter; rsqrt is only
        # defined for floating-point tensors, so cast first.
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps**-1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {"d_model": float(self.d_model), "warmup_steps": self.warmup_steps}

# Adam driven by the warm-up schedule instead of a constant learning rate.
learning_rate = CustomSchedule(d_model=D_MODEL)
optimizer = tf.keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

def accuracy(y_true, y_pred):
    """Per-example accuracy for the sentence classifier.

    Args:
        y_true: integer class labels, one per example.
        y_pred: model output of shape (batch, num_classes).

    Returns:
        Tensor of per-example 0/1 matches between the label and the arg-max
        of y_pred.
    """
    # One label per example: flatten to (batch,). Reshaping to
    # (-1, MAX_LENGTH - 1) only suits per-token sequence targets and does not
    # match the (batch, num_classes) predictions of this model.
    y_true = tf.reshape(y_true, shape=(-1,))
    return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

# metrics=['accuracy'] is the string alias, so Keras selects its built-in
# accuracy metric rather than the `accuracy` function defined in this notebook.
model.compile(
    loss=loss_function,
    optimizer=optimizer,
    metrics=['accuracy'],
)

model.summary()


Model: "transformer"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
inputs (InputLayer)             [(None, None)]       0                                            
__________________________________________________________________________________________________
dec_inputs (InputLayer)         [(None, None)]       0                                            
__________________________________________________________________________________________________
enc_padding_mask (Lambda)       (None, 1, 1, None)   0           inputs[0][0]                     
__________________________________________________________________________________________________
encoder (Functional)            (None, None, 256)    9982208     inputs[0][0]                     
                                                                 enc_padding_mask[0][0] 

# 4. 모델 훈련

In [19]:
# Train for a fixed number of passes, evaluating on the validation set after
# each epoch. The call is left as the cell's last expression so the returned
# History object is displayed.
EPOCHS = 10
model.fit(x=train_dataset, validation_data=val_dataset, epochs=EPOCHS)

Epoch 1/10




Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7d01f960ee50>

# 5. 챗봇 테스트

In [21]:
# Canned reply for each predicted class id; the position in the tuple is the
# class id (0, 1, 2).
response_dict = dict(enumerate((
    "일상적인 대화입니다.",
    "이별 관련 대화입니다.",
    "사랑에 관한 대화네요.",
)))

def predict_label(sentence):
    """Predict the class id of a single raw sentence.

    The sentence is normalized with preprocess_sentence, converted to word
    ids with the fitted tokenizer, post-padded to MAX_LENGTH and fed to the
    model as both the encoder input and the decoder input.

    Args:
        sentence: raw input text.

    Returns:
        The index of the highest-scoring class for the sentence.
    """
    cleaned = preprocess_sentence(sentence)
    padded_ids = tf.keras.preprocessing.sequence.pad_sequences(
        tokenizer.texts_to_sequences([cleaned]), maxlen=MAX_LENGTH, padding='post')

    # There is no answer text at inference time, so the same padded ids are
    # supplied for both model inputs.
    class_scores = model([padded_ids, padded_ids], training=False)

    # Batch of one: take the arg-max of the single row.
    return tf.argmax(class_scores, axis=-1).numpy()[0]

# Sample sentences used to try the classifier out.
test_sentences = [
    "이별한 지 열흘 되었어요",
    "1지망 학교 떨어졌어",
    "사랑에 빠진 것 같아요.",
    "최근에 힘든 이별을 겪었어요."
]

# Print each sentence followed by the canned reply for its predicted class;
# the trailing "\n" leaves a blank line between pairs.
for sentence in test_sentences:
    predicted_label = predict_label(sentence)
    response = response_dict[predicted_label]
    print(f"질문: {sentence}", f"챗봇: {response}\n", sep="\n")


질문: 이별한 지 열흘 되었어요
챗봇: 이별 관련 대화입니다.

질문: 1지망 학교 떨어졌어
챗봇: 일상적인 대화입니다.

질문: 사랑에 빠진 것 같아요.
챗봇: 사랑에 관한 대화네요.

질문: 최근에 힘든 이별을 겪었어요.
챗봇: 이별 관련 대화입니다.

