In [0]:
from keras import models
from keras import layers
from keras import optimizers, losses, metrics
from keras import preprocessing

import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import re

from konlpy.tag import Okt

Using TensorFlow backend.


## 데이터 로드 및 파라미터 지정

In [0]:
# 태그 단어
PAD = "<PADDING>"
STA = "<START>"
END = "<END>"
OOV = "<OOV>"

# 태그 인덱스
PAD_INDEX = 0
STA_INDEX = 1
END_INDEX = 2
OOV_INDEX = 3

# 데이터 타입
ENCODER_INPUT = 0
DECODER_INPUT = 1
DECODER_TARGET = 2

# 한 문장에서 단어 시퀀스의 최대 개수
max_sequences = 30

# 임베딩 벡터 차원
embedding_dim = 100

# LSTM 히든레이어 차원
lstm_hidden_layer = 128

# 정규표현식 필터
RE_FILTER = re.compile("[.,!?\"':;~()]")

# 챗봇 데이터 로드
chatbot_data = pd.read_csv('./data_in/ChatbotData .csv', encoding='utf-8')
question, answer = list(chatbot_data['Q']), list(chatbot_data['A'])

코랩 RAM 제한으로 데이터 개수 제한

In [0]:
question = question[:100]
answer = answer[:100]

In [0]:
# 형태소 분석 함수
def pos_tag(sentences):
  
  okt = Okt()
  sentences_pos = []
  
  for sentence in sentences:
    # 특수기호 제거
    sentence = re.sub(RE_FILTER, "", sentence)
    
    # 배열인 형태소분석의 출력을 띄어쓰기로 구분하여 붙임
    sentence = " ".join(okt.morphs(sentence))
    sentences_pos.append(sentence)
    
  return sentences_pos

In [0]:
question = pos_tag(question)
answer = pos_tag(answer)

In [0]:
# 질문과 대답 문장들을 하나로 합침
sentences = []
sentences.extend(question)
sentences.extend(answer)

words = []

# 단어들의 배열 생성
for sentence in sentences:
  for word in sentence.split():
    words.append(word)
    
# 길이가 0인 단어는 삭제
words = [word for word in words if len(word) > 0]

# 중복된 단어 삭제
words = list(set(words))

# 제일 앞에 태그 단어 삽입
words[:0] = [PAD, STA, END, OOV]

In [0]:
# 단어 개수
len(words)

454

In [0]:
# 단어 출력
words[:10]

['<PADDING>', '<START>', '<END>', '<OOV>', '하겠어', '참', '가고', '졸려', '친구', '나갔어']

In [0]:
# 단어와 인덱스의 딕셔너리 생성
word_to_index = {word: index for index, word in enumerate(words)}
index_to_word = {index: word for index, word in enumerate(words)}

In [0]:
# 문장을 인덱스로 변환
def convert_text_to_index(sentences, vocabulary, type):
  
  sentences_index = []
  
  # 모든 문장에 대해 반복
  for sentence in sentences:
    sentence_index = []
    
    # 디코더 입력일 경우 맨 앞에 START 태그 추가
    if type == DECODER_INPUT:
      sentence_index.extend([vocabulary[STA]])
      
    # 문장의 단어들을 띄어쓰기로 분리
    for word in sentence.split():
      if vocabulary.get(word) is not None:
        sentence_index.extend([vocabulary[word]])
      else:
        sentence_index.extend([vocabulary[OOV]])
        
    # 최대 길이 검사
    if type == DECODER_TARGET:
      # 디코더 타겟값일 경우 맨 뒤에 END 태그 추가
      if len(sentence_index) >= max_sequences:
        sentence_index = sentence_index[:max_sequences - 1] + [vocabulary[END]]
      else:
        sentence_index += [vocabulary[END]]
    else:
      if len(sentence_index) > max_sequences:
        sentence_index = sentence_index[:max_sequences]
        
    # 최대 길이에 없는 공간은 패딩 인덱스로 채움
    sentence_index += (max_sequences - len(sentence_index)) * [vocabulary[PAD]]
    
    # 문장의 인덱스 배열을 추가
    sentences_index.append(sentence_index)
  
  return np.asarray(sentences_index)

In [0]:
# 인코더 입력 인덱스 반환
x_encoder = convert_text_to_index(question, word_to_index, ENCODER_INPUT)

# 디코더 입력 인덱스 반환
x_decoder = convert_text_to_index(answer, word_to_index, DECODER_INPUT)

# 디코더 타겟 인덱스 반환
y_decoder = convert_text_to_index(answer, word_to_index, DECODER_TARGET)

In [0]:
y_decoder = tf.one_hot(y_decoder, len(word_to_index))

In [0]:
y_decoder.shape

TensorShape([Dimension(100), Dimension(30), Dimension(454)])

## 모델 정의 및 학습하기

In [0]:
# ---------------------------------
# 훈련모델 인코더 정의
# ---------------------------------

# 입력 문장의 인덱스 시퀀스를 입력으로 받음
encoder_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
encoder_outputs = layers.Embedding(len(words), embedding_dim)(encoder_inputs)

# return_state가 True면 상태값 리턴
# LSTM은 state_h(hidden state)와 state_c(cell state) 2개의 상태 존재
encoder_outputs, state_h, state_c = layers.LSTM(lstm_hidden_layer,
                                                dropout=0.1,
                                                recurrent_dropout=0.5,
                                                return_state=True)(encoder_outputs)

# 히든 상태와 셀 상태를 하나로 묶음
encoder_states = [state_h, state_c]


# ---------------------------------
# 훈련모델 디코더 정의
# ---------------------------------

decoder_inputs = layers.Input(shape=(None,))

# 임베딩 레이어
decoder_embedding = layers.Embedding(len(words), embedding_dim)
decoder_outputs = decoder_embedding(decoder_inputs)

# 인코더와 달리 return_sequences를 True로 설정하여 모든 타임 스텝 출력값 리턴
# 모든 타임 스텝의 출력값들을 다음 레이어의 Dense()로 처리하기 위함
decoder_lstm = layers.LSTM(lstm_hidden_layer,
                           dropout=0.1,
                           recurrent_dropout=0.5,
                           return_state=True,
                           return_sequences=True)

# initial_state를 인코더의 상태로 초기화
decoder_outputs, _, _ = decoder_lstm(decoder_outputs,
                                     initial_state=encoder_states)

# 단어의 개수만큼 노드의 개수를 설정하여 원-핫 형식으로 각 단어 인덱스를 출력
decoder_dense = layers.Dense(len(words), activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)


# ---------------------------------
# 훈련모델 정의
# ---------------------------------

# 입력과 출력으로 함수형 API 모델 생성
model = models.Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 학습 방법 설정
model.compile(optimizer='rmsprop',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

W0820 06:07:24.388791 139659571902336 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:74: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

W0820 06:07:24.391278 139659571902336 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:517: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

W0820 06:07:24.395896 139659571902336 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:4138: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

W0820 06:07:24.550677 139659571902336 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:133: The name tf.placeholder_with_default is deprecated. Please use tf.compat.v1.placeholder_with_default instead.

W0820 06:07:24.562307 

In [0]:
# ---------------------------------
# 예측모델 인코더 정의
# ---------------------------------

# 훈련모델의 인코더 상태를 사용하여 예측 모델 인코더 설정
encoder_model = models.Model(encoder_inputs, encoder_states)


# ---------------------------------
# 예측모델 디코더 정의
# ---------------------------------

# 예측시에는 훈련시와 달리 타임 스텝을 한 단계씩 수행
# 매번 이전 디코더 상태를 입력으로 받아서 새로 설정
decoder_state_input_h = layers.Input(shape=(lstm_hidden_layer,))
decoder_state_input_c = layers.Input(shape=(lstm_hidden_layer,))
decoder_state_inputs = [decoder_state_input_h, decoder_state_input_c]

# 임베딩 레이어
decoder_outputs = decoder_embedding(decoder_inputs)

# LSTM 레이어
decoder_outputs, state_h, state_c = decoder_lstm(decoder_outputs, initial_state=decoder_state_inputs)

# 히든 상태와 셀 상태를 하나로 묶음
decoder_states = [state_h, state_c]

# Dense 레이어를 통해 원-핫 형식으로 각 단어 인덱스를 출력
decoder_outputs  = decoder_dense(decoder_outputs)

# 예측 모델 디코더 설정
decoder_model = models.Model([decoder_inputs] + decoder_state_inputs,
                      [decoder_outputs] + decoder_states)

In [0]:
# 인덱스를 문장으로 변환
def convert_index_to_text(indexs, vocabulary): 
    
    sentence = ''
    
    # 모든 문장에 대해서 반복
    for index in indexs:
        if index == END_INDEX:
            # 종료 인덱스면 중지
            break;
        if vocabulary.get(index) is not None:
            # 사전에 있는 인덱스면 해당 단어를 추가
            sentence += vocabulary[index]
        else:
            # 사전에 없는 인덱스면 OOV 단어를 추가
            sentence.extend([vocabulary[OOV_INDEX]])
            
        # 빈칸 추가
        sentence += ' '

    return sentence

In [0]:
# 에폭 반복
# 훈련 시작
history = model.fit([x_encoder, x_decoder],
                    y_decoder,
                    epochs=100,
                    steps_per_epoch=64,
                    verbose=0)
    
# 정확도와 손실 출력
print('accuracy :', history.history['acc'][-1])
print('loss :', history.history['loss'][-1])
    
# 문장 예측 테스트
# (3 박 4일 놀러 가고 싶다) -> (여행 은 언제나 좋죠)
input_encoder = x_encoder[2].reshape(1, x_encoder[2].shape[0])
input_decoder = x_decoder[2].reshape(1, x_decoder[2].shape[0])
results = model.predict([input_encoder, input_decoder])
    
# 결과의 원핫인코딩 형식을 인덱스로 변환
# 1축을 기준으로 가장 높은 값의 위치를 구함
indexs = np.argmax(results[0], 1) 
    
# 인덱스를 문장으로 변환
sentence = convert_index_to_text(indexs, index_to_word)
print(sentence)
print()

accuracy : 1.0
loss : 3.7689041907373877e-06
여행 은 언제나 좋죠 



In [0]:
# 예측을 위한 입력 생성
def make_predict_input(sentence):

    sentences = []
    sentences.append(sentence)
    sentences = pos_tag(sentences)
    input_seq = convert_text_to_index(sentences, word_to_index, ENCODER_INPUT)
    
    return input_seq

In [0]:
# 텍스트 생성
def generate_text(input_seq):
    
    # 입력을 인코더에 넣어 마지막 상태 구함
    states = encoder_model.predict(input_seq)

    # 목표 시퀀스 초기화
    target_seq = np.zeros((1, 1))
    
    # 목표 시퀀스의 첫 번째에 <START> 태그 추가
    target_seq[0, 0] = STA_INDEX
    
    # 인덱스 초기화
    indexs = []
    
    # 디코더 타임 스텝 반복
    while 1:
        # 디코더로 현재 타임 스텝 출력 구함
        # 처음에는 인코더 상태를, 다음부터 이전 디코더 상태로 초기화
        decoder_outputs, state_h, state_c = decoder_model.predict(
                                                [target_seq] + states)

        # 결과의 원핫인코딩 형식을 인덱스로 변환
        index = np.argmax(decoder_outputs[0, 0, :])
        indexs.append(index)
        
        # 종료 검사
        if index == END_INDEX or len(indexs) >= max_sequences:
            break

        # 목표 시퀀스를 바로 이전의 출력으로 설정
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = index
        
        # 디코더의 이전 상태를 다음 디코더 예측에 사용
        states = [state_h, state_c]

    # 인덱스를 문장으로 변환
    sentence = convert_index_to_text(indexs, index_to_word)
        
    return sentence

## 예측하기

* 데이터셋에 있는 '3박4일 놀러가고 싶다'는 예측을 잘 하였다

In [0]:
input_seq = make_predict_input('3박4일 놀러가고 싶다')
input_seq

array([[440, 438, 429, 161,   6,  86,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]])

In [0]:
sentence = generate_text(input_seq)
sentence

'여행 은 언제나 좋죠 '

* '아무것도 하기싫다'도 잘 예측하는 모습

In [0]:
input_seq = make_predict_input('아무것도 하기싫다')
sentence = generate_text(input_seq)
sentence

'이럴 때 잘 쉬는 게 중요해요 '

* 하지만 다른 문장들은 쉽게 예측하지 못하였다. 이런 이유는 데이터셋을 100개 정도밖에 사용하지 않았기 때문이고 
데이터셋을 모두 학습시키면 성능이 올라갈 것이다.

In [0]:
input_seq = make_predict_input('집에 불 키고 나왔다')
sentence = generate_text(input_seq)
sentence

'무모한 결정 을 내 리지 마세요 '

데이터셋을 모두 돌리고 싶었지만 코랩 RAM이 자꾸 나가서 세션이 종료되는 바람에 모두 돌리지는 못한게 아쉬움이 크다. 하지만 tensorflow estimator로 구현된 것을 케라스로 이해하면서 바꾸니 쉽게 바꿀 수 있었다.