# Seq2Seq Q&A Chatbot 구현

### 데이터 취득

In [1]:
import numpy as np
import pandas as pd

df = pd.read_csv('https://raw.githubusercontent.com/songys/Chatbot_data/refs/heads/master/ChatbotData.csv')

df = df[['Q', 'A']]
df

Unnamed: 0,Q,A
0,12시 땡!,하루가 또 가네요.
1,1지망 학교 떨어졌어,위로해 드립니다.
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.
4,PPL 심하네,눈살이 찌푸려지죠.
...,...,...
11818,훔쳐보는 것도 눈치 보임.,티가 나니까 눈치가 보이는 거죠!
11819,훔쳐보는 것도 눈치 보임.,훔쳐보는 거 티나나봐요.
11820,흑기사 해주는 짝남.,설렜겠어요.
11821,힘든 연애 좋은 연애라는게 무슨 차이일까?,잘 헤어질 수 있는 사이 여부인 거 같아요.


In [2]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
dtypes: object(2)
memory usage: 184.9+ KB


### 데이터 전처리

##### 토커나이저 학습 (sentencepiece 활용)

- 접두사, 접미사 처리 (문장의 처음과 끝에 붙는 bos, eos)
    - Train() 인자 cmd 옵션을 추가 (`<bos>, <eos>, <pad>, <oov> ...`)
    - set_encode_extra_options(':') <br>
        set_encode_extra_options('bos:') <br>
        set_encode_extra_options(':eos') <br>
        set_encode_extra_options('bos:eos') <br>

In [3]:
import sentencepiece as spt

with open("chatbot_corpus.txt", "w", encoding="utf-8") as f:
    for q, a in zip(df['Q'], df['A']):
        f.write(str(q).strip() + "\n")
        f.write(str(a).strip() + "\n")

input_file = 'chatbot_corpus.txt'
vocab_size = 9500
model_prefix = 'chatbot_spt'
cmd = f'--input={input_file} --model_prefix={model_prefix} --vocab_size={vocab_size}'

spt.SentencePieceTrainer.Train(cmd)

In [4]:
sp = spt.SentencePieceProcessor()
sp.Load(f'{model_prefix}.model')

for doc in df['Q'].values[:3]:  # 질문(Q) 데이터 중 앞의 3개만 테스트
    print("원문:", doc)
    print("Pieces:", sp.encode_as_pieces(doc))  # 토큰 단위 출력
    print("Ids:", sp.encode_as_ids(doc))        # 인덱스 번호 출력
    print()

원문: 12시 땡!
Pieces: ['▁12', '시', '▁', '땡', '!']
Ids: [4291, 568, 4, 7826, 63]

원문: 1지망 학교 떨어졌어
Pieces: ['▁1', '지망', '▁학교', '▁떨어졌어']
Ids: [250, 7060, 722, 1585]

원문: 3박4일 놀러가고 싶다
Pieces: ['▁3', '박', '4', '일', '▁놀러가고', '▁싶다']
Ids: [283, 1537, 2625, 94, 2745, 89]



##### 학습용 데이터 Q_input, A_input, A_target 생성

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

VOCAB_SIZE = 9500

Q_inputs, A_inputs, A_targets = [], [], []

for q, a in zip(df['Q'], df['A']):
    Q_inputs.append(str(q))
    A_inputs.append('<bos> ' + str(a))
    A_targets.append(str(a) + ' <eos>')

print(Q_inputs[:5])
print(A_inputs[:5])
print(A_targets[:5])

['12시 땡!', '1지망 학교 떨어졌어', '3박4일 놀러가고 싶다', '3박4일 정도 놀러가고 싶다', 'PPL 심하네']
['<bos> 하루가 또 가네요.', '<bos> 위로해 드립니다.', '<bos> 여행은 언제나 좋죠.', '<bos> 여행은 언제나 좋죠.', '<bos> 눈살이 찌푸려지죠.']
['하루가 또 가네요. <eos>', '위로해 드립니다. <eos>', '여행은 언제나 좋죠. <eos>', '여행은 언제나 좋죠. <eos>', '눈살이 찌푸려지죠. <eos>']


In [6]:
tokenizer = Tokenizer(num_words=VOCAB_SIZE, oov_token='<oov>', filters='')
tokenizer.fit_on_texts(Q_inputs + A_inputs + A_targets)

In [7]:
# 시퀀스 변환
Q_seq = tokenizer.texts_to_sequences(Q_inputs)
A_input_seq = tokenizer.texts_to_sequences(A_inputs)
A_target_seq = tokenizer.texts_to_sequences(A_targets)

In [8]:
# 패딩 처리
max_len_q = 15
max_len_a = 22

Q_seq = pad_sequences(Q_seq, maxlen=max_len_q, padding='pre')
A_input_seq = pad_sequences(A_input_seq, maxlen=max_len_a, padding='post')
A_target_seq = pad_sequences(A_target_seq, maxlen=max_len_a, padding='post')

print(Q_seq.shape, A_input_seq.shape, A_target_seq.shape)
#print(Q_seq[:3], A_input_seq[:3], A_target_seq[:3])
# Q_seq.shape[1] -> 질문 (인코더 입력) 최대 길이
# A_input_seq.shape[1] -> 답변 (디코더 입력) 최대 길이
# A_target_seq.shape[1] -> 디코더 타겟 최대 길이

(11823, 15) (11823, 22) (11823, 22)


### 모델 생성 및 학습

In [9]:
# 임베딩 레이어 생성
from tensorflow.keras.layers import Embedding

EMBEDDING_DIM = 100

embedding_layer = Embedding(
    input_dim = VOCAB_SIZE,
    output_dim = EMBEDDING_DIM,
    trainable = True
)

##### 인코더 생성

In [10]:
from tensorflow.keras import layers, models

LATENT_DIM = 512

encoder_inputs = layers.Input(shape=(max_len_q,))     # 시퀀스 최대 길이 

encoder_embedding = embedding_layer(encoder_inputs)

encoder_outputs, h, c = layers.LSTM(LATENT_DIM, return_state=True)(encoder_embedding)       # 은닉층의 차원을 512로 설정 , 마지막 상태에 대한 값을 반환 
encoder_states = [h, c]

encoder_model = models.Model(inputs=encoder_inputs, outputs=encoder_states)
encoder_model.summary()

##### 디코더 (teacher-forcing 모델) 생성

In [11]:
decoder_inputs = layers.Input(shape=(max_len_a,))

decoder_embedding = embedding_layer(decoder_inputs)

decoder_lstm = layers.LSTM(LATENT_DIM, return_sequences=True, return_state=True)
decoder_outputs, h, c = decoder_lstm(decoder_embedding, initial_state=encoder_states)

decoder_dense = layers.Dense(VOCAB_SIZE, activation='softmax')     # 활성화 함수로 소프트 맥스 , 단어들 사이에서 확률상으로 나올 단어
decoder_outputs = decoder_dense(decoder_outputs)

decoder_teacher_forcing_model = models.Model(
    inputs = [encoder_inputs, decoder_inputs],
    outputs = decoder_outputs
)

decoder_teacher_forcing_model.summary()

##### 학습 진행 compile, fit

In [14]:
decoder_teacher_forcing_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer = 'adam',
    metrics = ['accuracy']
)

history = decoder_teacher_forcing_model.fit(
    [Q_seq, A_input_seq],
    A_target_seq,
    batch_size=64,
    epochs=50,
    validation_split=0.2
)

Epoch 1/50
[1m 20/148[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m52s[0m 413ms/step - accuracy: 0.6529 - loss: 6.2377

KeyboardInterrupt: 

### 모델 추론

##### 디코더 (추론 모델) 생성

In [None]:
decoder_hidden_state = layers.Input(shape=(LATENT_DIM,))
decoder_cell_state = layers.Input(shape=(LATENT_DIM,))
decoder_states_inputs = [decoder_hidden_state, decoder_cell_state]

decoder_single_input = layers.Input(shape=(1,))

single_decoder_embedding = embedding_layer(decoder_single_input)

lstm_outputs, h, c = decoder_lstm(single_decoder_embedding, initial_state=decoder_states_inputs)
decoder_states = [h, c]

decoder_outputs_ = decoder_dense(lstm_outputs)

decoder_inference_model = models.Model(
    inputs = [decoder_single_input] + decoder_states_inputs,
    outputs = [decoder_outputs_] + decoder_states
)

decoder_inference_model.summary()

##### 추론 함수

In [None]:
def translate(input_seq):
  encoder_states_value = encoder_model.predict(input_seq)
  decoder_states_value = encoder_states_value

  bos_index = tokenizer.word_index['<bos>']
  eos_index = tokenizer.word_index['<eos>']

  target_seq = np.zeros((1, 1))
  target_seq[0, 0] = bos_index

  output_sentence = []

  for _ in range(max_len_a):
    output_tokens, h, c = decoder_inference_model.predict([target_seq] + decoder_states_value)

    pred_proba = output_tokens[0, 0, :]
    pred_index = np.argmax(pred_proba)

    if pred_index == eos_index:
      break

    if pred_index > 0:
      word = tokenizer.index_word[pred_index]
      output_sentence.append(word)

    target_seq[0, 0] = pred_index
    decoder_states_value = [h, c]

  return " ".join(output_sentence)

### 테스트

In [None]:
test_sentence = "너무 피곤해"
seq = tokenizer.texts_to_sequences([test_sentence])
seq = pad_sequences(seq, maxlen=max_len_q, padding='pre')

response = translate(seq)
print("Q:", test_sentence)
print("A:", response)

### 간단한 Chatbot 구현

1. 사용자의 입력을 받아 (인코더의 input으로 넣어야 하니까 처리해주고)
2. 입력 받은 걸 추론 함수에 전달해서
3. 응답을 출력해주고
4. 이 과정을 '종료' 전까지 반복

In [None]:
while True:
  user_input = input('질문을 입력하세요:')

  if user_input.lower() == '종료':
    print('Chatbot이 종료됩니다.')
    break

  seq = tokenizer.texts_to_sequences([user_input])
  seq = pad_sequences(seq, maxlen=max_len_q, padding='pre')

  response = translate(seq)

  print(f'Chatbot : {response}')
  print('-' * 50)