# Attention 신경망 구현 및 학습

In [None]:
!pip install konlpy

Collecting konlpy
  Downloading konlpy-0.5.2-py2.py3-none-any.whl (19.4 MB)
[K     |████████████████████████████████| 19.4 MB 227 kB/s 
Collecting colorama
  Downloading colorama-0.4.4-py2.py3-none-any.whl (16 kB)
Collecting JPype1>=0.7.0
  Downloading JPype1-1.3.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (448 kB)
[K     |████████████████████████████████| 448 kB 60.1 MB/s 
Collecting beautifulsoup4==4.6.0
  Downloading beautifulsoup4-4.6.0-py3-none-any.whl (86 kB)
[K     |████████████████████████████████| 86 kB 7.3 MB/s 
Installing collected packages: JPype1, colorama, beautifulsoup4, konlpy
  Attempting uninstall: beautifulsoup4
    Found existing installation: beautifulsoup4 4.6.3
    Uninstalling beautifulsoup4-4.6.3:
      Successfully uninstalled beautifulsoup4-4.6.3
Successfully installed JPype1-1.3.0 beautifulsoup4-4.6.0 colorama-0.4.4 konlpy-0.5.2


In [None]:
import random
import tensorflow as tf
from konlpy.tag import Okt

## 하이퍼 파라미터

In [None]:
num_epochs=200
vocab_size=2000

# Encoder

In [None]:
class Encoder(tf.keras.Model):
  def __init__(self):
    super(Encoder, self).__init__()
    self.emb  = tf.keras.layers.Embedding(vocab_size, 64)

    # 인코더의 매 시퀀스의 리턴을 받아서 이를 K, V로 사용한다.
    self.lstm = tf.keras.layers.LSTM(512, return_sequences=True, return_state=True)

  def call(self, x, training=False):
    x = self.emb(x)
    
    hidden_states, last_hidden_state, last_cell_state = self.lstm(x) # 매 시퀀스의 hidden state들을 K, V로 사용해야 한다.
    return hidden_states, last_hidden_state, last_cell_state # 리턴

# Decoder

In [None]:
class Decoder(tf.keras.Model):
  def __init__(self):
    super(Decoder, self).__init__()
    self.emb = tf.keras.layers.Embedding(vocab_size, 64)
    self.lstm = tf.keras.layers.LSTM(512, return_sequences=True, return_state=True)

    # Attention 매커니즘 추가
    # LSTM 출력에 Attention Value를 concat 해서 dense쪽으로 넘겨주는 것 -> Attention Mechanism
    self.att = tf.keras.layers.Attention()
    self.dense = tf.keras.layers.Dense(vocab_size, activation='softmax')

  def call(self, inputs, training=False):
    # x : Shifted Output
    # s0 : t-1 번째 Decoder의 Hidden State
    # c0 : t-1 번째 Decoder의 Cell State
    # H : 인코더의 Hidden States. 얘를 K, V로 사용한다.
    x, s0, c0, H = inputs

    # s0 : shape=(32, 512)

    x = self.emb(x)
    S, h, c = self.lstm(x, initial_state=[s0, c0])

    # S : (32, 64, 512) - Training=True면 64개의 x가 순서대로 들어가서 TimeStep이 쌓이니까..
    #                   - Training=False면 쌓여가는 Sequence 별로 쌓이게 된다. (32, 1, 512) - (32, 2, 512) - (32, 3, 512)

    # Attention 으로 넘어가는 State를 따로 편집해야 한다..
    # 이전 스텝의 모든 내용을 이어서 Query(S_)로 만들기

    # s0[:, tf.newaxis, :] : 1개의 time step(tf.newaxis)을 추가!
    # S[:, -1, :] : 마지막 Hidden State의 출력은 내지 않는다.
    S_ = tf.concat([s0[:, tf.newaxis, :], S[:, :-1, :]], axis=1) # axis차원 1로 합치기
    
    # S_ : ( 32, 64, 512 ) - 훈련 시에...
    
    A = self.att([S_, H]) # S_ (Query)를 먼저 입력하고, K, V -> H를 입력해 주면 된다.
    # A : 각 스텝 마다의 Attention Value 등장 (32, 64, 512)
    
    # Dense로 넣어 주기 전에 Dense로 들어가게 될 Hidden State (S), 
    V = tf.concat([S, A], axis=-1) # 한 줄로 쭉 이어질 수 있게 axis는 마지막 차원으로

    y = self.dense(V)

    return y, h, c

# Seq2Seq

In [None]:
class Seq2seq(tf.keras.Model):
  
  def __init__(self, sos, eos):
    super(Seq2seq, self).__init__()
    self.sos = sos # decoder에서 사용되어질 sos
    self.eos = eos # encoder에서 사용되어질 eos

    self.enc = Encoder()
    self.dec = Decoder()

  def call(self, inputs, training=False):
    
    if training:

      x, y = inputs
      H, h, c = self.enc(x)
      
      # K, V로 사용되어지는 hidden_states(H) 를 디코더에 넣어줌.
      y, _, __ = self.dec((y, h, c, H))
      
      return y
    else:
      x = inputs # x : 질문 문장
      H, h, c = self.enc(x) # 전체 Hidden States(H) 리턴 - K, V
      
      y = tf.convert_to_tensor(self.sos)
      y = tf.reshape(y, (1, 1)) 

      seq = tf.TensorArray(tf.int32, 64)

      for idx in tf.range(64):
        y, h, c = self.dec([y, h, c, H])  # 마지막에 K, V만 넣어줌
        y = tf.cast(tf.argmax(y, axis=-1), dtype=tf.int32)
        y = tf.reshape(y, (1, 1))
        seq = seq.write(idx, y)

        if y == self.eos:
          break
      
      return tf.reshape(seq.stack(), (1, 64))

# 학습, 테스트 루프 정의

In [None]:
@tf.function
def train_step(model, inputs, labels, loss_object, optimizer, train_loss, train_accuarcy):
  # labels는 <sos>, <eos> 를 포함한 정보
  # output_labels : <sos>를 제외하고 <eos>를 포함해서 만든다.
  output_labels = labels[:, 1:]
  # shifted_lables : <sos>를 포함하고 <eos>를 제외해서 만든다.
  shifted_labels = labels[:, :-1]

  with tf.GradientTape() as tape:
    # inputs : x의 역할. Encoder에 들어감
    # shifted_labels : Encoder가 예측하고, 예측해야 할 데이터
    predictions = model([inputs, shifted_labels], training=True) # 예측을 하고
    loss = loss_object(output_labels, predictions) # 정답이 이거였어~ 라고 이야기 하는 것
  
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients,model.trainable_variables))
  
  train_loss(loss)
  train_accuracy(output_labels, predictions)

@tf.function
def test_step(model, inputs):
  # 입력 데이터만 주고 추론은 모델이 알아서 할 수 있도록...
  return model(inputs, training=False)

# 데이터셋 준비
* http://www.aihub.or.kr

In [None]:
from konlpy.tag import Okt

dataset_file = "chatbot_data.csv"
okt = Okt()

In [None]:
with open(dataset_file, 'r') as file:
  lines = file.readlines()
  seq = [line for line in lines]

In [None]:
seq[:6]

['아이스아메리카노 하나요\n',
 '테이크아웃하실 건가요?\n',
 '저 카푸치노로 주문할게요\n',
 '시럽은 얼마나 뿌려드릴까요?\n',
 '저 도장 다 모았는데 나중에 써도 되나요?\n',
 '네 다음에 써도 됩니다\n']

In [None]:
questions = seq[::2]
answers = ["\t " + lines for lines in seq[1::2]] # \t : <sos>

print(questions[:3])
print(answers[:3])

['아이스아메리카노 하나요\n', '저 카푸치노로 주문할게요\n', '저 도장 다 모았는데 나중에 써도 되나요?\n']
['\t 테이크아웃하실 건가요?\n', '\t 시럽은 얼마나 뿌려드릴까요?\n', '\t 네 다음에 써도 됩니다\n']


# 데이터 잘라내기

In [None]:
num_samples = len(questions)
print(num_samples)

500


In [None]:
term = list(range(num_samples))
print("섞이기 전 : {}".format(term[:10]))
# 랜덤 시드 고정
random.seed(0)
random.shuffle(term)

print("섞인 후 : {}".format(term[:10]))

섞이기 전 : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
섞인 후 : [419, 459, 130, 431, 370, 26, 201, 56, 366, 108]


* questions : 입력 데이터(inputs)
* answers : 예측 레이블 (outputs)

In [None]:
train_q = [] # X_train
train_a = [] # y_train

test_q = [] # X_test
test_a = [] # y_test

In [None]:
test_ratio = 0.2
test_cnt = int(len(questions) * test_ratio)

train_indices = term[test_cnt: ]
test_indices  = term[:test_cnt]

for idx in train_indices:
  train_q.append(questions[idx])
  train_a.append(answers[idx])

for idx in test_indices:
  test_q.append(questions[idx])
  test_a.append(answers[idx])

In [None]:
test_q[:3], test_a[:3]

(['사이즈업해서 주세요\n', '캐러멜 드리블이랑 통 잡아 칩이요\n', '시즌 메뉴와 함께 구성되어 있는 세트 메뉴가 있나요?\n'],
 ['\t 네 결제는 어떻게 도와드릴까요?\n',
  '\t 6700원 결제 도와드리겠습니다\n',
  '\t 네 치즈 케이크와 시즌 메뉴 두 잔으로 구성된 세트 메뉴 있습니다\n'])

# 토크나이징

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

tokenizer = Tokenizer(num_words=vocab_size, filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~')

In [None]:
tokenizer.fit_on_texts(train_q + train_a) # 질문과 대답의 모든 내용을 토큰화
print(tokenizer.word_index)

{'\t': 1, '\n': 2, '네': 3, '주세요\n': 4, '아메리카노': 5, '한': 6, '아이스': 7, '있나요': 8, '수': 9, '드시고': 10, '드릴까요': 11, '잔': 12, '따뜻한': 13, '있습니다\n': 14, '아니요': 15, '그럼': 16, '하나': 17, '가시나요': 18, '어떤': 19, '여기': 20, '다': 21, '더': 22, '걸로': 23, '포인트': 24, '건': 25, '되나요': 26, '라테': 27, '한잔': 28, '가능한가요': 29, '알겠습니다\n': 30, '할게요\n': 31, '음료': 32, '쿠폰': 33, '안': 34, '사용': 35, '아이스로': 36, '결제': 37, '게': 38, '사이즈로': 39, '같이': 40, '사이즈는': 41, '제일': 42, '카드로': 43, '카페라테': 44, '먹고': 45, '해주세요\n': 46, '몇': 47, '어떻게': 48, '주문': 49, '도와드리겠습니다\n': 50, '됩니다\n': 51, '케이크': 52, '얼마인가요': 53, '거': 54, '이': 55, '매장에서': 56, '두': 57, '건가요': 58, '테이크': 59, '많이': 60, '다른': 61, '울리면': 62, '가능합니다\n': 63, '얼마에요': 64, '사이즈업': 65, '개': 66, '휘핑크림': 67, '샷': 68, '디카페인': 69, '번호': 70, '주세요': 71, '없으신가요': 72, '4500원입니다\n': 73, '가능합니다': 74, '있어요\n': 75, '총': 76, '가능하세요\n': 77, '거예요\n': 78, '카드': 79, '커피는': 80, '혹시': 81, '결제는': 82, '영수증': 83, '갈': 84, '둘': 85, '잘': 86, '할인': 87, '현금영수증': 88, '치즈케이크': 89, '드릴게요\n': 90, '필요한': 91

정수 인코딩

In [None]:
train_q_seq = tokenizer.texts_to_sequences(train_q)
train_a_seq = tokenizer.texts_to_sequences(train_a)

test_q_seq  = tokenizer.texts_to_sequences(test_q)
test_a_seq  = tokenizer.texts_to_sequences(test_a)

train_q_seq[:3], train_a_seq[:3]

([[170, 239, 4], [3, 171], [3, 240, 52, 4]],
 [[1, 774, 775, 776, 777, 778, 9, 368],
  [1, 22, 91, 25, 72, 2],
  [1, 123, 162, 11, 2]])

패딩 후 최종 데이터 마련하기

In [None]:
# 문장의 최대길이 64로 설정 했음!
X_train = pad_sequences(
    train_q_seq,
    value=0,
    padding='pre',
    maxlen=64
)

y_train = pad_sequences(
    train_a_seq,
    value=0,
    padding='post',
    maxlen=65 # <sos>, <eos>
)

X_test = pad_sequences( test_q_seq, value=0, padding='pre', maxlen=64 )
y_test = pad_sequences( test_a_seq, value=0, padding='post', maxlen=65 )

In [None]:
X_train[0], y_train[0]

(array([  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0, 170, 239,   4],
       dtype=int32),
 array([  1, 774, 775, 776, 777, 778,   9, 368,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0],
       dtype=int32))

In [None]:
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(1024).batch(32).prefetch(1024) # prefetch : 데이터를 미리 저장할 공간을 의미
test_ds  = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(1).prefetch(1024)

# 학습 환경 정의
모델 생성, 손실 함수, 최적화 알고리즘, 평가지표 정의

In [None]:
# 모델 생성
model = Seq2seq(
    sos=tokenizer.word_index["\t"],
    eos=tokenizer.word_index["\n"]
)

# Loss 선정. 정수 인코딩된 결과를 t로 사용, softmax 이용한 정수값을 예측으로 쓰니까 sparse_categorical_crossentropy
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# 모델 평가 방식
train_loss = tf.keras.metrics.Mean(name="train_loss")
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# 학습 루프 동작

In [None]:
EPOCHS = 200
for epoch in range(EPOCHS):
  for seqs, labels in train_ds:
    train_step(model, seqs, labels, loss_object, optimizer, train_loss, train_accuracy)
  
  print("Epoch : {}, Loss : {:.3f}, Accuracy : {:.3f}".format(epoch + 1,
                                                      train_loss.result(),
                                                      train_accuracy.result() * 100))
  
  train_loss.reset_states()
  train_accuracy.reset_states()

ValueError: ignored

Accuracy가 좋은 이유는?? `Teacher Forcing` 했으니까 좋을 수 밖에..

# 테스트 루프 만들기

In [None]:
for test_seq, test_labels in test_ds:
  prediction = test_step(model, test_seq)
  
  test_q = tokenizer.sequences_to_texts(test_seq.numpy()) # 질문
  test_a = tokenizer.sequences_to_texts(test_labels.numpy()) # 실제 대답
  test_p = tokenizer.sequences_to_texts(prediction.numpy()) # 챗봇의 대답

  print("______")
  print("질문 : \t{}".format(test_q))
  print("실제 대답 : {}".format(test_a))
  print("챗봇 대답 : {}".format(test_p))
