<a href="https://colab.research.google.com/github/chacha86/pythonai2/blob/main/chatbot_exam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
## 챗봇 트레이닝용 데이터 얻기 -> korpora에서 다운로드 받기

In [None]:
## 필요한 라이브러리 임포트
import pandas as pd
import numpy as np


In [None]:
## 데이터 불러오기

df = pd.read_csv('/content/ChatbotData.csv')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
 2   label   11823 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 277.2+ KB


In [None]:
# 질문과 답변 문장을 따로 저장

texts = [] # 질문 저장 리스트
pairs = [] # 답변 저장 리스트

for text, pair in zip(df['Q'], df['A']) :
  texts.append(text)
  pairs.append(pair)

In [None]:
# 질문과 답변 쌍을 5개 확인
for text, pair in zip(texts[:5], pairs[:5]):
  print(text, ' : ', pair)

In [None]:
## 빠르고 간단한 테스트를 위해 특수문자와 영어 제거
import re

def clean_sentence(sentence) :
  sentence = re.sub(r'[^0-9ㄱ-ㅎㅏ-ㅣ가-힣 ]', r'', sentence)
  return sentence

## 함수 잘 작동하는지 테스트
sentence = clean_sentence('12시 땡^^!!??')
sentence

'12시 땡'

In [None]:
## 한국어 문장을 분해하기 위한 라이브러리(형태소 분석)
!pip install konlpy

In [None]:
## 형태소 추출
### 형태소란? 의미를 가지는 요소로서는 더 이상 쪼갤 수 없는 가장 작은 말의 단위
from konlpy.tag import Okt

okt = Okt()

def process_morph(sentence):
  return ' '.join(okt.morphs(sentence))



'안녕하세요 저 는 홍길동 입니다 . 당신 의 성공 을 항상 기원 합니다 . 사랑 합니다 .'

In [None]:
## 위 함수 잘 작동하는지 확인
process_morph('안녕하세요 저는 홍길동입니다. 당신의 성공을 항상 기원합니다. 사랑합니다.')

In [None]:
## 문장을 입력받아 형태소로 쪼개주는 함수
def clean_and_morph(sentence, is_question=True):
  ## 한글만 남기기
  sentence = clean_sentence(sentence)

  ## 형태소로 쪼개기
  sentence = process_morph(sentence)

  if is_question:
    return sentence

  else :
    ## 후에 토크나이저하기 위해서는 공백이 꼭 들어가야 함.
    return ('<S> ' + sentence, sentence + ' <E>')

In [None]:
test_txt = '안녕하세요 저는 홍길동입니다. 당신의 성공을 항상 기원합니다. 사랑합니다.'
clean_and_morph(test_txt, is_question=False)

('<S>안녕하세요 저 는 홍길동 입니다 당신 의 성공 을 항상 기원 합니다 사랑 합니다',
 '안녕하세요 저 는 홍길동 입니다 당신 의 성공 을 항상 기원 합니다 사랑 합니다<E>')

In [None]:
def preprocess(texts, pairs):
  questions = [] # 질문 전처리 결과
  answer_in = [] # 답변 입력 전처리 결과
  answer_out = [] # 답변 출력 전처리 결과

  ## 질문에 대한 전처리
  for text in texts :
    question = clean_and_morph(text, is_question=True)
    questions.append(question)

  ## 답변에 대한 전처리
  for pair in pairs:
    in_, out_ = clean_and_morph(pair, is_question=False) # 디코더에 들어가는 인풋과 아웃풋.
    answer_in.append(in_)
    answer_out.append(out_)

  return questions, answer_in, answer_out

In [None]:
q, ai, ao = preprocess(texts[:3], pairs[:3]) # 전처리 함수 테스트
for s1, s2, s3 in zip(q, ai, ao) :
  print(s1, ':' , s2, ':' , s3)

12시 땡 : <S> 하루 가 또 가네요 : 하루 가 또 가네요 <E>
1 지망 학교 떨어졌어 : <S> 위로 해 드립니다 : 위로 해 드립니다 <E>
3 박 4일 놀러 가고 싶다 : <S> 여행 은 언제나 좋죠 : 여행 은 언제나 좋죠 <E>


In [None]:
test_texts = texts[:1000]
test_pairs = pairs[:1000]

questions, answers_in, answers_out = preprocess(test_texts, test_pairs)

In [None]:
answers_in[:5]
answers_out[:5]

['하루 가 또 가네요 <E>',
 '위로 해 드립니다 <E>',
 '여행 은 언제나 좋죠 <E>',
 '여행 은 언제나 좋죠 <E>',
 '눈살 이 찌푸려지죠 <E>']

In [None]:
## 후에 토크나이저를 한번에 하기 위해 문장을 합쳐줌(리스트 합치기)
all_sentences = questions + answers_in + answers_out

In [None]:
# 전체 형태소 개수 -> 이거 왜 하지?
a = (' '.join(questions) + ' '.join(answers_in) + ' '.join(answers_out)).split()
len(set(a))

2300

In [None]:
import warnings
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
## filter => 문장의 특수기호등을 임의로 처리하지 말라. 필터링 하지 말라
## lower => 소문자로 변경하지 마라
## oov_token => 단어 사전에 존재하지 않는 단어라면 ''로 대체
tokenizer = Tokenizer(filters='', lower=False, oov_token='<OOV>')

In [None]:
## 단어 사전 만들기
## 공백을 기준으로 쪼개주는 듯하다
tokenizer.fit_on_texts(all_sentences)

In [None]:
## 각 단어들에 넘버링(벡터화) -> 고유한 단어만 남기기 때문에 2300개임
tokenizer.word_index

2300

In [None]:
## 각 단어와 단어의 인덱스 번호를 확인
for word, idx in tokenizer.word_index.items():
  print(f'{word}\t\t => \t{idx}')
  if idx > 10:
    break

<OOV>		 => 	1
<S>		 => 	2
<E>		 => 	3
이		 => 	4
거		 => 	5
을		 => 	6
가		 => 	7
나		 => 	8
예요		 => 	9
사람		 => 	10
요		 => 	11


In [None]:
VOCAB_SIZE = len(tokenizer.word_index) + 1 # <OOV> 까지 합하면 +1 해야함
VOCAB_SIZE

2301

In [None]:
question_sequences = tokenizer.texts_to_sequences(questions)

for idx in question_sequences[2]:
  print(tokenizer.index_word[idx])

In [None]:
answer_in_sequences = tokenizer.texts_to_sequences(answers_in)
answer_out_sequences = tokenizer.texts_to_sequences(answers_out)

In [None]:
questions[0], answer_in_sequences[0],answer_out_sequences[0], pairs[0], tokenizer.word_index['하루']

('12시 땡', [2, 391, 7, 356, 1234], [391, 7, 356, 1234, 3], '하루가 또 가네요.', 391)

In [None]:
## 딥러닝의 경우 입력값이 항상 일정해야 하므로(네트워크 모델은 입력값에 의해 모양이 바뀌므로 입력값은 바뀌면 안된다.)
MAX_LENGTH = 30 # 최대 몇개의 단어
TRUNCATING = 'post' # 문장이 MAX_LENGTH를 넘어갈 때 앞(pre), 뒤(post) 어디를 자를지 여부
PADDING = 'post' # 문장이 MAX_LENGTH를 채우지 못했을 때 앞(pre), 뒤(post) 어디로 채울지 여부

In [None]:
## 트런케이팅과 패딩 적용하기
question_padded = pad_sequences(question_sequences, maxlen=MAX_LENGTH, truncating=TRUNCATING, padding=PADDING)
answer_in_padded = pad_sequences(answer_in_sequences, maxlen=MAX_LENGTH, truncating=TRUNCATING, padding=PADDING)
answer_out_padded = pad_sequences(answer_out_sequences, maxlen=MAX_LENGTH, truncating=TRUNCATING, padding=PADDING)

In [None]:
## 잘 적용 되었는지 shape로 확인
question_padded.shape, answer_in_padded.shape, answer_out_padded.shape

((1000, 30), (1000, 30), (1000, 30))

In [None]:
## 12 땡이라는 문장이기 때문에 앞에 2개만 존재하고 나머지 28개는 padding 때문에 비어 있음.
question_padded[0]

array([1608, 1609,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0], dtype=int32)

In [None]:
## 단어에는 비교우위가 없으므로 카테고리컬로 데이터로 보고 원핫 인코딩을 해준다.
from tensorflow.keras.utils import to_categorical

# 우선, 단어 인덱스의 최대값을 찾아야 합니다.
# 이 값이 원-핫 벡터의 길이가 됩니다.

# 원-핫 인코딩 수행
data_one_hot = to_categorical(answer_in_padded, num_classes=VOCAB_SIZE)

In [None]:
data_one_hot.shape
data_one_hot

In [None]:
## 모델이 예측한 인코딩된 값을 다시 문자로 디코딩 해주는 함수
def convert_index_to_text(indexes, end_token):
  sentence = ''
  for index in indexes: ## 문장의 순서
    if index == end_token:  ## 문장의 마지막이면 종료
      break
    if index > 0 and tokenizer.index_word[index] is not None: ## 단아 사전에 존재하고 올바른 인덱스라면
      sentence += tokenizer.index_word[index] # 최종 문자열에 이어 붙인다.
    else:
      sentence += '' # 없는 거면 공백.

    sentence += ' ' # 한 형태소가 끝나면 띄어쓰기
  return sentence

In [None]:
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint

## Embbeding -> 카테고리컬 단어값을 고차원으로 바꾸는 것.
## Embbeding을 해야 하는 이유
### 1. 차원의 저주. 원핫 인코딩의 경우 어마어마한 차원이 생성됨. 이를 줄이기 위해 임베딩 공간을 사용
### 2. 단어간의 의미와 유사성 추론 가능. 유사한 단어들 간의 벡터 거리를 계산해 유사도 측정.
### 3. sparse 벡터를 dense 벡터로 변환하여 사용하므로 성능 이점.

## LSTM -> Long Short Term Memory(순환 신경망의 일종)
### RNN은 이전 셀의 내용을 다음 셀이 전이하는데 이 전이 과정이 길어질 수록 기억이 사라지는 현상이 있다.(그래디언트 소실 때문) 시그모이드나 하이퍼볼릭 탄젠트는 미분 될 때마다 기울기가 작아진다.(출력값이 -1 ~ 0 ~ 1 사이이므로. 기울기를 곱하면 작은 값이나 0이 된다.)
### LSTM은 기억을 장기 보존하기 위해 게이트를 구성한 버전 -> 어떻게 해결했나? 여러 게이트를 두어 게이트에 추가 정보를 저장. 학습 과정에서 추가 정보를 살릴지 말지. 얼마나 반영할지 결정한다.


## Dense -> 뉴런 layer를 의미. 뉴런의 개수와 활성화 함수를 정할 수 있다.

## Dropout -> 딥러닝의 규제 방법. 하이퍼 파라미터. 과도한 뉴런은 과적합을 유발할 수 있기 때문에, x% 만큼 뉴런을 비활성화 시켜 일반화 성능을 높인다.

## Model
### 텐서플로 기본 신경망 모델. 여러 Layer를 정의할 수 있고, compile 메서드를 이용해 최적화 알고리즘 및 손실함수, 평가지표등을 정할 수 있다.
### 예측과 모델 저장 및 로드도 가능.
### 서브 클래싱 -> 상속 받아 사용자 정의 모델도 생성 가능. 여기서는 서브클래싱용으로 사용.






In [None]:
## 모델 객체
class Encoder(tf.keras.Model):
  def __init__(self, units, vocab_size, embedding_dim, time_steps):
    super(Encoder, self).__init__()

    ## Embedding -> 카테고리컬 단어값을 고차원으로 바꾸는 것(우리는 원핫을 사용함)
    self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps) ## 단어 개수, 변환하고자 하는 임베딩 차원, 한 문장의 길이
    self.dropout = Dropout(0.2) ## 과적합을 방지하기 위한 하이퍼파라미터. 임의로 20% 뉴런을 잡아서 비활성화 시킴
    self.lstm = LSTM(units, return_state=True) ## 최종 히든 스테이트를 얻어야 벡터 콘텍스트에 넣을 수 있음.

  def call(self, inputs):
    x = self.embedding(inputs) ## 임베딩 세팅
    x = self.dropout(x) ## 과적합 방지 파라미터 세팅
    x, hidden_state, cell_state =self.lstm(x) ## 정답, 히든 스테이트, 셀 스테이트

    return [hidden_state, cell_state]

In [None]:
class Decoder(tf.keras.Model):
  def __init__(self, units, vocab_size, embedding_dim, time_steps):
    super(Decoder, self).__init__()
    self.embedding = Embedding(vocab_size, embedding_dim, input_length=time_steps)
    self.dropout = Dropout(0.2)
    self.lstm = LSTM(units,
                     return_state=True, # 스테이트값을 알아야 다음 셀에서 진행 가능
                     return_sequences=True ## 각 유닛의 스테이트값을 다 얻어서 결과를 얻어야 하므로
    )
    self.dense = Dense(vocab_size, activation='softmax') ## 결과를 얻기 위한 출력층

  def call(self, inputs, initial_state): ## initial_state는 encoder의 출력값
    x = self.embedding(inputs)
    x = self.dropout(x)
    x, hidden_state, cell_state = self.lstm(x, initial_state=initial_state)
    x = self.dense(x) # 최종 결과값은 출력층을 거쳐 결과를 낸다

    return x, hidden_state, cell_state


In [None]:
class Seq2seq(tf.keras.Model):
  def __init__(self, units, vocab_size, embedding_dim, time_steps, start_token, end_token):
    super(Seq2seq, self).__init__()

    self.start_token = start_token
    self.end_token = end_token
    self.time_steps = time_steps # 문장의 길이(30)

    self.encoder = Encoder(units, vocab_size, embedding_dim, time_steps)
    self.decoder = Decoder(units, vocab_size, embedding_dim, time_steps)

  def call(self, inputs, training=True): ## training: 학습용, 예측용 구별
    if training: ## 학습일 땐,
      encoder_inputs, decoder_inputs = inputs ## 인코더, 디코더 모두 동일한 입력값 넣는다.
      context_vector = self.encoder(encoder_inputs) ## 인코더에 넣어서 벡터 얻어냄
      decoder_outputs, _, _ = self.decoder(inputs=decoder_inputs, initial_state=context_vector) ## 얻어낸 인코더의 벡터값을 디코더에 사용

      return decoder_outputs

    else: ## 예측일 땐,
      context_vector = self.encoder(inputs) ##
      target_seq = tf.constant([[self.start_token]], dtype=tf.float32) ## 첫번째는 무조건 ,
      results = tf.TensorArray(tf.int32, self.time_steps) ## 결과 배열. 그래프 그리기 위해 텐서 배열로 담는다.

      for i in tf.range(self.time_steps):
        decoder_output, decoder_hidden, decoder_cell = self.decoder(target_seq, initial_state=context_vector)
        decoder_output = tf.cast(tf.argmax(decoder_output, axis=-1), dtype=tf.int32)
        decoder_output = tf.reshape(decoder_output, shape=(1, 1))
        results = results.write(i, decoder_output)

        if decoder_output == self.end_token:
          break

        target_seq = decoder_output
        context_vector = [decoder_hidden, decoder_cell]

      return tf.reshape(results.stack(), shape=(1, self.time_steps))

In [None]:
BUFFER_SIZE = 1000
BATCH_SIZE = 16
EMBEDDING_DIM = 100
TIME_STEPS = MAX_LENGTH

START_TOKEN = tokenizer.word_index['']
END_TOKEN = tokenizer.word_index['']
UNITS = 128

VOCAB_SIZE = len(tokenizer.word_index) + 1
DATA_LENGTH = len(questions)
SAMPLE_SIZE = 3
NUM_EPOCHS = 10

In [None]:
checkpoint_path = 'sample-checkpoint.h5'
checkpoint = ModelCheckpoint(filepath=checkpoint_path,
                             save_best_only=True,
                             monitor='loss',
                             verbose=1,
                             save_weights_only=True)

In [None]:
seq2seq = Seq2seq(UNITS, VOCAB_SIZE, EMBEDDING_DIM, TIME_STEPS, START_TOKEN, END_TOKEN)
seq2seq.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])

In [None]:
for epoch in range(NUM_EPOCHS):
  print(f"processing epoch : {epoch * 10 + 1} ...")
  seq2seq.fit([question_padded, answer_in_padded], answer_out_one_hot, epochs=10, batch_size=BATCH_SIZE, callbacks=[checkpoint])

  samples = np.random.randint(DATA_LENGTH, size=SAMPLE_SIZE)

  for idx in samples:
    question_inputs = question_padded[idx]
    results = make_prediction(seq2seq, np.expand_dims(question_inputs, 0))

    results = convert_index_to_text(results, END_TOKEN)