In [23]:
from keras import models
from keras import layers
from keras import optimizers, losses, metrics
from keras import preprocessing

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import re

from konlpy.tag import Okt

In [24]:
# 태그 단어
PAD = "<PADDING>"   # 패딩
STA = "<START>"     # 시작
END = "<END>"       # 끝
OOV = "<OOV>"       # 없는 단어(Out of Vocabulary)

# 태그 인덱스
PAD_INDEX = 0
STA_INDEX = 1
END_INDEX = 2
OOV_INDEX = 3

# 데이터 타입
ENCODER_INPUT  = 0
DECODER_INPUT  = 1
DECODER_TARGET = 2

# 한 문장에서 단어 시퀀스의 최대 개수
max_sequences = 30

# 임베딩 벡터 차원
embedding_dim = 100

# LSTM 히든레이어 차원
lstm_hidden_dim = 128

# 정규 표현식 필터
RE_FILTER = re.compile("[.,!?\"':;~()]")

# 챗봇 데이터 로드
chatbot_data = pd.read_csv('chatbot/ChatbotData.csv', encoding='utf-8')
question, answer = list(chatbot_data['Q']), list(chatbot_data['A'])

In [25]:
# 데이터 개수
len(question)


11823

In [26]:
# 데이터의 일부만 학습에 사용
question = question[:100]
answer = answer[:100]

# 챗봇 데이터 출력
for i in range(10):
    print('Q : ' + question[i])
    print('A : ' + answer[i])
    print()


Q : 12시 땡!
A : 하루가 또 가네요.

Q : 1지망 학교 떨어졌어
A : 위로해 드립니다.

Q : 3박4일 놀러가고 싶다
A : 여행은 언제나 좋죠.

Q : 3박4일 정도 놀러가고 싶다
A : 여행은 언제나 좋죠.

Q : PPL 심하네
A : 눈살이 찌푸려지죠.

Q : SD카드 망가졌어
A : 다시 새로 사는 게 마음 편해요.

Q : SD카드 안돼
A : 다시 새로 사는 게 마음 편해요.

Q : SNS 맞팔 왜 안하지ㅠㅠ
A : 잘 모르고 있을 수도 있어요.

Q : SNS 시간낭비인 거 아는데 매일 하는 중
A : 시간을 정하고 해보세요.

Q : SNS 시간낭비인데 자꾸 보게됨
A : 시간을 정하고 해보세요.



<br>
<br>
# 단어 사전 생성

In [27]:
# 형태소분석 함수
def pos_tag(sentences):
    
    # KoNLPy 형태소분석기 설정
    tagger = Okt()
    
    # 문장 품사 변수 초기화
    sentences_pos = []
    
    # 모든 문장 반복
    for sentence in sentences:
        # 특수기호 제거
        sentence = re.sub(RE_FILTER, "", sentence)
        
        # 배열인 형태소분석의 출력을 띄어쓰기로 구분하여 붙임
        sentence = " ".join(tagger.morphs(sentence))
        sentences_pos.append(sentence)
        
    return sentences_pos

In [28]:
# 형태소분석 수행
question = pos_tag(question)
answer = pos_tag(answer)

# 형태소분석으로 변환된 챗봇 데이터 출력
for i in range(10):
    print('Q : ' + question[i])
    print('A : ' + answer[i])
    print()


Q : 12시 땡
A : 하루 가 또 가네요

Q : 1 지망 학교 떨어졌어
A : 위로 해 드립니다

Q : 3 박 4일 놀러 가고 싶다
A : 여행 은 언제나 좋죠

Q : 3 박 4일 정도 놀러 가고 싶다
A : 여행 은 언제나 좋죠

Q : PPL 심하네
A : 눈살 이 찌푸려지죠

Q : SD 카드 망가졌어
A : 다시 새로 사는 게 마음 편해요

Q : SD 카드 안 돼
A : 다시 새로 사는 게 마음 편해요

Q : SNS 맞팔 왜 안 하지 ㅠㅠ
A : 잘 모르고 있을 수도 있어요

Q : SNS 시간 낭비 인 거 아는데 매일 하는 중
A : 시간 을 정 하고 해보세요

Q : SNS 시간 낭비 인데 자꾸 보게 됨
A : 시간 을 정 하고 해보세요



In [29]:
# 질문과 대답 문장들을 하나로 합침
sentences = []
sentences.extend(question)
sentences.extend(answer)

words = []

# 단어들의 배열 생성
for sentence in sentences:
    for word in sentence.split():
        words.append(word)

# 길이가 0인 단어는 삭제
words = [word for word in words if len(word) > 0]

# 중복된 단어 삭제
words = list(set(words))

# 제일 앞에 태그 단어 삽입
words[:0] = [PAD, STA, END, OOV]

In [30]:
# 단어 개수
len(words)


454

In [31]:
# 단어 출력
words[:20]


['<PADDING>',
 '<START>',
 '<END>',
 '<OOV>',
 '들',
 '살찐',
 '그게',
 '필요하죠',
 '그런거니',
 '나',
 '그럴',
 '에는',
 '낮잠',
 '너무',
 '장난',
 '스트레스',
 '학교',
 '컨트롤',
 '지',
 '까지']

In [32]:
# 단어와 인덱스의 딕셔너리 생성
word_to_index = {word: index for index, word in enumerate(words)}
index_to_word = {index: word for index, word in enumerate(words)}

In [33]:
# 단어 -> 인덱스
# 문장을 인덱스로 변환하여 모델 입력으로 사용
dict(list(word_to_index.items())[:20])


{'<PADDING>': 0,
 '<START>': 1,
 '<END>': 2,
 '<OOV>': 3,
 '들': 4,
 '살찐': 5,
 '그게': 6,
 '필요하죠': 7,
 '그런거니': 8,
 '나': 9,
 '그럴': 10,
 '에는': 11,
 '낮잠': 12,
 '너무': 13,
 '장난': 14,
 '스트레스': 15,
 '학교': 16,
 '컨트롤': 17,
 '지': 18,
 '까지': 19}

In [34]:
# 인덱스 -> 단어
# 모델의 예측 결과인 인덱스를 문장으로 변환시 사용
dict(list(index_to_word.items())[:20])


{0: '<PADDING>',
 1: '<START>',
 2: '<END>',
 3: '<OOV>',
 4: '들',
 5: '살찐',
 6: '그게',
 7: '필요하죠',
 8: '그런거니',
 9: '나',
 10: '그럴',
 11: '에는',
 12: '낮잠',
 13: '너무',
 14: '장난',
 15: '스트레스',
 16: '학교',
 17: '컨트롤',
 18: '지',
 19: '까지'}

<br>
<br>
# 전처리

In [35]:
# 문장을 인덱스로 변환
def convert_text_to_index(sentences, vocabulary, type): 
    
    sentences_index = []
    
    # 모든 문장에 대해서 반복
    for sentence in sentences:
        sentence_index = []
        
        # 디코더 입력일 경우 맨 앞에 START 태그 추가
        if type == DECODER_INPUT:
            sentence_index.extend([vocabulary[STA]])
        
        # 문장의 단어들을 띄어쓰기로 분리
        for word in sentence.split():
            if vocabulary.get(word) is not None:
                # 사전에 있는 단어면 해당 인덱스를 추가
                sentence_index.extend([vocabulary[word]])
            else:
                # 사전에 없는 단어면 OOV 인덱스를 추가
                sentence_index.extend([vocabulary[OOV]])

        # 최대 길이 검사
        if type == DECODER_TARGET:
            # 디코더 목표일 경우 맨 뒤에 END 태그 추가
            if len(sentence_index) >= max_sequences:
                sentence_index = sentence_index[:max_sequences-1] + [vocabulary[END]]
            else:
                sentence_index += [vocabulary[END]]
        else:
            if len(sentence_index) > max_sequences:
                sentence_index = sentence_index[:max_sequences]
            
        # 최대 길이에 없는 공간은 패딩 인덱스로 채움
        sentence_index += (max_sequences - len(sentence_index)) * [vocabulary[PAD]]
        
        # 문장의 인덱스 배열을 추가
        sentences_index.append(sentence_index)

    return np.asarray(sentences_index)

In [36]:
# 인코더 입력 인덱스 변환
x_encoder = convert_text_to_index(question, word_to_index, ENCODER_INPUT)

# 첫 번째 인코더 입력 출력 (12시 땡)
x_encoder[0]


array([323, 436,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0])

In [37]:
# 디코더 입력 인덱스 변환
x_decoder = convert_text_to_index(answer, word_to_index, DECODER_INPUT)

# 첫 번째 디코더 입력 출력 (START 하루 가 또 가네요)
x_decoder[0]


array([  1,  59, 310, 245,  97,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0])

In [38]:
# 디코더 목표 인덱스 변환
y_decoder = convert_text_to_index(answer, word_to_index, DECODER_TARGET)

# 첫 번째 디코더 목표 출력 (하루 가 또 가네요 END)
y_decoder[0]


array([ 59, 310, 245,  97,   2,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0])

In [39]:
# 원핫인코딩 초기화
one_hot_data = np.zeros((len(y_decoder), max_sequences, len(words)))

# 디코더 목표를 원핫인코딩으로 변환
# 학습시 입력은 인덱스이지만, 출력은 원핫인코딩 형식임
for i, sequence in enumerate(y_decoder):
    for j, index in enumerate(sequence):
        one_hot_data[i, j, index] = 1

# 디코더 목표 설정
y_decoder = one_hot_data

# 첫 번째 디코더 목표 출력
y_decoder[0]

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.]])

<br>
<br>
# 모델 생성

In [40]:
#--------------------------------------------
# 훈련 모델 인코더 정의
#--------------------------------------------

# 입력 문장의 인덱스 시퀀스를 입력으로 받음
encoder_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
encoder_outputs = layers.Embedding(len(words), embedding_dim)(encoder_inputs)

# return_state가 True면 상태값 리턴
# LSTM은 state_h(hidden state)와 state_c(cell state) 2개의 상태 존재
encoder_outputs, state_h, state_c = layers.LSTM(lstm_hidden_dim,
                                                dropout=0.1,
                                                recurrent_dropout=0.5,
                                                return_state=True)(encoder_outputs)

# 히든 상태와 셀 상태를 하나로 묶음
encoder_states = [state_h, state_c]



#--------------------------------------------
# 훈련 모델 디코더 정의
#--------------------------------------------

# 목표 문장의 인덱스 시퀀스를 입력으로 받음
decoder_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
decoder_embedding = layers.Embedding(len(words), embedding_dim)
decoder_outputs = decoder_embedding(decoder_inputs)

# 인코더와 달리 return_sequences를 True로 설정하여 모든 타임 스텝 출력값 리턴
# 모든 타임 스텝의 출력값들을 다음 레이어의 Dense()로 처리하기 위함
decoder_lstm = layers.LSTM(lstm_hidden_dim,
                           dropout=0.1,
                           recurrent_dropout=0.5,
                           return_state=True,
                           return_sequences=True)

# initial_state를 인코더의 상태로 초기화
decoder_outputs, _, _ = decoder_lstm(decoder_outputs,
                                     initial_state=encoder_states)

# 단어의 개수만큼 노드의 개수를 설정하여 원핫 형식으로 각 단어 인덱스를 출력
decoder_dense = layers.Dense(len(words), activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)



#--------------------------------------------
# 훈련 모델 정의
#--------------------------------------------

# 입력과 출력으로 함수형 API 모델 생성
model = models.Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 학습 방법 설정
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])    

지금까지의 예제는 Sequential 방식의 모델이었습니다. 하지만 이번에는 함수형 API 모델을 사용했습니다. 인코더와 디코더가 따로 분리되어야 하는데, 단순히 레이어를 추가하여 붙이는 순차형으로는 구현이 불가능하기 때문입니다. 

Model() 함수로 입력과 출력을 따로 설정하여 모델을 만듭니다. 그다음 compile과 fit은 이전과 동일하게 적용하시면 됩니다.
<br>
<br>
<br>

In [41]:
#--------------------------------------------
#  예측 모델 인코더 정의
#--------------------------------------------

# 훈련 모델의 인코더 상태를 사용하여 예측 모델 인코더 설정
encoder_model = models.Model(encoder_inputs, encoder_states)



#--------------------------------------------
# 예측 모델 디코더 정의
#--------------------------------------------

# 예측시에는 훈련시와 달리 타임 스텝을 한 단계씩 수행
# 매번 이전 디코더 상태를 입력으로 받아서 새로 설정
decoder_state_input_h = layers.Input(shape=(lstm_hidden_dim,))
decoder_state_input_c = layers.Input(shape=(lstm_hidden_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]    

# 임베딩 레이어
decoder_outputs = decoder_embedding(decoder_inputs)

# LSTM 레이어
decoder_outputs, state_h, state_c = decoder_lstm(decoder_outputs,
                                                 initial_state=decoder_states_inputs)

# 히든 상태와 셀 상태를 하나로 묶음
decoder_states = [state_h, state_c]

# Dense 레이어를 통해 원핫 형식으로 각 단어 인덱스를 출력
decoder_outputs = decoder_dense(decoder_outputs)

# 예측 모델 디코더 설정
decoder_model = models.Model([decoder_inputs] + decoder_states_inputs,
                      [decoder_outputs] + decoder_states)

<br>
<br>
# 훈련 및 테스트

In [42]:
# 인덱스를 문장으로 변환
def convert_index_to_text(indexs, vocabulary): 
    
    sentence = ''
    
    # 모든 문장에 대해서 반복
    for index in indexs:
        if index == END_INDEX:
            # 종료 인덱스면 중지
            break;
        if vocabulary.get(index) is not None:
            # 사전에 있는 인덱스면 해당 단어를 추가
            sentence += vocabulary[index]
        else:
            # 사전에 없는 인덱스면 OOV 단어를 추가
            sentence.extend([vocabulary[OOV_INDEX]])
            
        # 빈칸 추가
        sentence += ' '

    return sentence

In [52]:
# 에폭 반복
for epoch in range(20):
    print('Total Epoch :', epoch + 1)

    # 훈련 시작
    history = model.fit([x_encoder, x_decoder],
                        y_decoder,
                        epochs=100,
                        batch_size=64,
                        verbose=0)
    
    # 정확도와 손실 출력
    print('accuracy :', history.history['accuracy'][-1])
    print('loss :', history.history['loss'][-1])
    
    # 문장 예측 테스트
    # (3 박 4일 놀러 가고 싶다) -> (여행 은 언제나 좋죠)
    input_encoder = x_encoder[2].reshape(1, x_encoder[2].shape[0])
    input_decoder = x_decoder[2].reshape(1, x_decoder[2].shape[0])
    results = model.predict([input_encoder, input_decoder])
    
    # 결과의 원핫인코딩 형식을 인덱스로 변환
    # 1축을 기준으로 가장 높은 값의 위치를 구함
    indexs = np.argmax(results[0], 1) 
    
    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
    print(sentence)
    print()


Total Epoch : 1
accuracy : 0.9240000247955322
loss : 0.3688793182373047
저 은 언제나 좋죠 

Total Epoch : 2
accuracy : 0.9663333296775818
loss : 0.15087465941905975
저 은 언제나 좋죠 

Total Epoch : 3
accuracy : 0.9733333587646484
loss : 0.08889476954936981
저 은 언제나 좋죠 

Total Epoch : 4
accuracy : 0.9789999723434448
loss : 0.0636691004037857
여행 은 언제나 좋죠 

Total Epoch : 5
accuracy : 0.984333336353302
loss : 0.04499061405658722
여행 은 언제나 좋죠 

Total Epoch : 6
accuracy : 0.9903333187103271
loss : 0.03434133902192116
여행 은 언제나 좋죠 

Total Epoch : 7
accuracy : 0.9959999918937683
loss : 0.015586639754474163
여행 은 언제나 좋죠 

Total Epoch : 8
accuracy : 0.9976666569709778
loss : 0.0077430265955626965
여행 은 언제나 좋죠 

Total Epoch : 9
accuracy : 0.9993333220481873
loss : 0.00396965304389596
여행 은 언제나 좋죠 

Total Epoch : 10
accuracy : 0.9993333220481873
loss : 0.00318046472966671
여행 은 언제나 좋죠 

Total Epoch : 11
accuracy : 0.9993333220481873
loss : 0.0028662020340561867
여행 은 언제나 좋죠 

Total Epoch : 12
accuracy : 1.0
loss : 0.0

<br>
<br>
# 문장 생성

In [53]:
# 예측을 위한 입력 생성
def make_predict_input(sentence):

    sentences = []
    sentences.append(sentence)
    sentences = pos_tag(sentences)
    input_seq = convert_text_to_index(sentences, word_to_index, ENCODER_INPUT)
    
    return input_seq

In [55]:
# 텍스트 생성
def generate_text(input_seq):
    
    # 입력을 인코더에 넣어 마지막 상태 구함
    states = encoder_model.predict(input_seq)

    # 목표 시퀀스 초기화
    target_seq = np.zeros((1, 1))
    
    # 목표 시퀀스의 첫 번째에 <START> 태그 추가
    target_seq[0, 0] = STA_INDEX
    
    # 인덱스 초기화
    indexs = []
    
    # 디코더 타임 스텝 반복
    while 1:
        # 디코더로 현재 타임 스텝 출력 구함
        # 처음에는 인코더 상태를, 다음부터 이전 디코더 상태로 초기화
        decoder_outputs, state_h, state_c = decoder_model.predict(
                                                [target_seq] + states)

        # 결과의 원핫인코딩 형식을 인덱스로 변환
        index = np.argmax(decoder_outputs[0, 0, :])
        indexs.append(index)
        
        # 종료 검사
        if index == END_INDEX or len(indexs) >= max_sequences:
            break

        # 목표 시퀀스를 바로 이전의 출력으로 설정
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = index
        
        # 디코더의 이전 상태를 다음 디코더 예측에 사용
        states = [state_h, state_c]

    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
        
    return sentence

In [56]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('3박4일 놀러가고 싶다')
input_seq


array([[428, 384, 117, 167,  23,  60,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [57]:
# 예측 모델로 텍스트 생성
sentence = generate_text(input_seq)
sentence


'여행 은 언제나 좋죠 '

In [58]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('3박4일 같이 놀러가고 싶다')
input_seq


array([[428, 384, 117,  81, 167,  23,  60,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [59]:
# 예측 모델로 텍스트 생성
sentence = generate_text(input_seq)
sentence


'여행 은 언제나 좋죠 '

In [60]:
# 문장을 인덱스로 변환
input_seq = make_predict_input('3박4일 놀러가려고')
input_seq


array([[428, 384, 117, 167,  22,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [61]:
# 예측 모델로 텍스트 생성
sentence = generate_text(input_seq)
sentence


'여행 은 언제나 좋죠 '