In [None]:
!pip install kiwipiepy

Collecting kiwipiepy
  Downloading kiwipiepy-0.21.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Collecting kiwipiepy_model<0.22,>=0.21 (from kiwipiepy)
  Downloading kiwipiepy_model-0.21.0.tar.gz (35.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.5/35.5 MB[0m [31m22.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading kiwipiepy-0.21.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m25.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: kiwipiepy_model
  Building wheel for kiwipiepy_model (setup.py) ... [?25l[?25hdone
  Created wheel for kiwipiepy_model: filename=kiwipiepy_model-0.21.0-py3-none-any.whl size=35593192 sha256=cb164999e3d9f877cef2ca41897c4b448f8677bc089aa40126934a195b36178b
  Stored in directory: /root/.cache/pip/wheels/b0/16/3d/95053ab5

In [None]:
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout, InputLayer
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

import re
from kiwipiepy import Kiwi

#1.Seq2Seq 모델을 활용한 챗봇


##1-1.데이터 로드 및 확인
- 한글 자연어처리 데이터 셋인 Korpora 중 챗봇용 데이터 셋인 KoreanChatbotKorpus를 사용
- 데이터 출처 : https://github.com/songys/Chatbot_data
- 챗봇 트레이닝용 문답 페어 : 11,823
- 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2로 레이블링

In [None]:
corpus=pd.read_csv('/content/drive/MyDrive/Colab Notebooks/RNN/data/ChatbotData.csv')
corpus.shape

(11823, 3)

In [None]:
corpus.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
 2   label   11823 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 277.2+ KB


In [None]:
# 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2로 레이블링
corpus['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
0,5290
1,3570
2,2963


In [None]:
# 질의 Q와 답변 A로 구성 >> 데이터를 리스트 형태로 저장

df_Q=corpus['Q']
texts=df_Q.values.tolist()

df_A=corpus['A']
pairs=df_A.values.tolist()

list(zip(texts, pairs))[:5]

[('12시 땡!', '하루가 또 가네요.'),
 ('1지망 학교 떨어졌어', '위로해 드립니다.'),
 ('3박4일 놀러가고 싶다', '여행은 언제나 좋죠.'),
 ('3박4일 정도 놀러가고 싶다', '여행은 언제나 좋죠.'),
 ('PPL 심하네', '눈살이 찌푸려지죠.')]

##1-2.데이터 전처리

In [None]:
# 1) 데이터 정제 >> 특수문자 제거
def clean_sentence(sentence):
    sentence=re.sub(r'[^0-9ㄱ-ㅎㅏ-ㅣ가-힣 ]',r'', sentence)
    return sentence

In [None]:
clean_sentence('12시 땡^^!??')

'12시 땡'

In [None]:
clean_sentence('abcef가나다^^$%@12시 땡^^!??')

'가나다12시 땡'

In [None]:
# 2) 한글 형태소 분석기
kiwi=Kiwi()
def process_morph(sentence):   # 모프
    tokens=kiwi.tokenize(sentence)
    morphs=[token.form for token in tokens]
    return ' '.join(morphs)   # 추출된 형태소들을 공백(' ')으로 이어붙여 하나의 문자열

##1-3. Seq2Seq모델에 사용되는 3가지 데이터셋을 구성
- question : encoder input 데이터셋(질의)
- answer_input : decoder input 데이터셋, SOS 토큰을 문장 처음에 추가
- answer_output : decoder output 데이터셋, EOS 토큰을 문장 마지막에 추가

In [None]:
# 3) 데이터셋 구성
def clean_and_morph(sentence, is_question=True):  # 문장, 질문답변
    sentence=clean_sentence(sentence)    #  1) 데이터 정제
    sentence=process_morph(sentence)     #  2) 형태소 변환

    if is_question:  # 질문
        return sentence
    else:  #  답변은 시작/끝 토큰을 붙여야 디코더 학습시 유용
        return ('<SOS> ' + sentence, sentence + ' <EOS>')    # 공백 Check

In [None]:
# 4) 데이터셋 생성
def preprocess(texts, pairs):
    questions=[]
    answer_in=[]
    answer_out=[]

    # 질의에 대한 전처리
    for text in texts:
        # 전처리와 morph 수행
        question=clean_and_morph(text, is_question=True)  # 3) 호출 >> 1), 2) 호출
        questions.append(question)

    # 답변에 대한 전처리
    for pair in pairs:
        # 전처리와 morph 수행
        in_, out_ = clean_and_morph(pair, is_question=False)
        answer_in.append(in_)
        answer_out.append(out_)

    return questions, answer_in, answer_out

In [None]:
questions, answer_in, answer_out=preprocess(texts, pairs)   # 4) 호출
questions[:5]

['12 시 땡',
 '1 지망 학교 떨어지 었 어',
 '3 박 4 일 놀 러 가 고 싶 다',
 '3 박 4 일 정도 놀 러 가 고 싶 다',
 '심 하 네']

In [None]:
answer_in[:5]

['<SOS> 하루 가 또 가 네요',
 '<SOS> 위로 하 어 드리 ᆸ니다',
 '<SOS> 여행 은 언제나 좋 죠',
 '<SOS> 여행 은 언제나 좋 죠',
 '<SOS> 눈살 이 찌푸리 어 지 죠']

In [None]:
answer_out[:5]

['하루 가 또 가 네요 <EOS>',
 '위로 하 어 드리 ᆸ니다 <EOS>',
 '여행 은 언제나 좋 죠 <EOS>',
 '여행 은 언제나 좋 죠 <EOS>',
 '눈살 이 찌푸리 어 지 죠 <EOS>']

##1-4.토큰화

In [None]:
# len(texts)   # 11823개의 데이터 전부를 학습하기에는 램이 부족하기 때문에 데이터 수를 조절
train_texts=texts[:5000]
train_pairs=pairs[:5000]

questions, answer_in, answer_out=preprocess(train_texts, train_pairs)   # 4) 호출
len(questions), len(answer_in), len(answer_out)

(5000, 5000, 5000)

In [None]:
# 토큰화를 위해 모든 문장을 합쳐줌
all_sentences=questions + answer_in + answer_out
a=(' '.join(questions) + ' '.join(answer_in) + ' '.join(answer_out)).split()
len(a), len(set(a))

(120016, 3352)

In [None]:
# 토큰의 정의
# shift + tab 확인 >> 아무 문자도 제거하지 않음, 대소문자를 구분
tokenizer=Tokenizer(filters='', lower=False, oov_token='<OOV>')   # 아무 것도 필터링하지 않음, 대소문자를 구분,

# 토근 문장에 대한 Word-Index Vocabulary(단어 사전)을 만듭니다.
tokenizer.fit_on_texts(all_sentences)

In [None]:
for word, idx in tokenizer.word_index.items():   # 빈도수 기반으로 idx값 부여
    print(f'{word}\t\t => \t{idx}')
    if idx > 10:
        break

<OOV>		 => 	1
하		 => 	2
어		 => 	3
이		 => 	4
<SOS>		 => 	5
<EOS>		 => 	6
어요		 => 	7
보		 => 	8
는		 => 	9
세요		 => 	10
ᆯ		 => 	11


In [None]:
print(len(tokenizer.word_index))

3351


In [None]:
# 텍스트를 시퀀스로 인코딩
question_seq=tokenizer.texts_to_sequences(questions)
answer_in_seq=tokenizer.texts_to_sequences(answer_in)
answer_out_seq=tokenizer.texts_to_sequences(answer_out)
# len(question_seq), len(answer_in_seq), len(answer_out_seq)

In [None]:
# 2) 문장의 길이 맞추기
MAX_LENGTH=30

# 문장이 30을 넘어갈 경우 뒤에서 자르기, 뒤에서부터 0으로 패딩
question_pad=pad_sequences(question_seq, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_in_pad=pad_sequences(answer_in_seq, maxlen=MAX_LENGTH, truncating='post', padding='post')
answer_out_pad=pad_sequences(answer_out_seq, maxlen=MAX_LENGTH, truncating='post', padding='post')

question_pad.shape, answer_in_pad.shape, answer_out_pad.shape

((5000, 30), (5000, 30), (5000, 30))

#2.모델 생성

##2-1.Encoder & Decoder & Seq2Seq
- 추후 : 추론, 정수형 인코딩 텍스트 변환 등 확장성

In [None]:
class Encoder(tf.keras.Model):
    # 객체 생성 시 (한번) >> 초기화
    def __init__(self, units, vocab_size, embedding_dim, max_len):
        super(Encoder, self).__init__()
        self.input_layer=InputLayer(shape=(max_len,))
        self.embedding=Embedding(input_dim=vocab_size, output_dim=embedding_dim)
        self.dropout=Dropout(0.2)
        # self.lstm=LSTM(units, return_state=True,  return_sequences=True) # (32, 10, 64)
        self.lstm=LSTM(units, return_state=True) # (32, 10, 64)

    # 모델 호출 시 (매번) >> 순전파 로직
    def call(self, inputs):
        x=self.embedding(inputs)
        x=self.dropout(x)
        x, hidden_state, cell_state=self.lstm(x)
        return [hidden_state, cell_state]

In [None]:
class Decoder(tf.keras.Model):
    # 객체 생성 시 (한번) >> 초기화
    def __init__(self, units, vocab_size, embedding_dim, max_len):
        super(Decoder, self).__init__()
        self.input_layer=InputLayer(shape=(max_len,))
        self.embedding=Embedding(input_dim=vocab_size, output_dim=embedding_dim)
        self.dropout=Dropout(0.2)
        self.lstm=LSTM(units, return_state=True,  return_sequences=True)
        self.dense=Dense(vocab_size, activation='softmax')

   # 모델 호출 시 (매번) >> 순전파 로직
    def call(self, inputs, initial_state):   # 인코더의 마지막 상태를 디코더의 초기 상태
        x=self.embedding(inputs)
        x=self.dropout(x)
        x, hidden_state, cell_state=self.lstm(x, initial_state=initial_state)
        x=self.dense(x)
        return x, hidden_state, cell_state

In [None]:
class Seq2Seq(tf.keras.Model):
    def __init__(self, units, vocab_size, embedding_dim, max_len, start_token, end_token):  # (128, 3364, 100, 30, 5, 6)
        super(Seq2Seq, self).__init__()
        self.start_token=start_token
        self.end_token=end_token
        self.time_steps=max_len

        self.encoder=Encoder(units, vocab_size, embedding_dim, max_len)
        self.decoder=Decoder(units, vocab_size, embedding_dim, max_len)

    def call(self, inputs, training=True):
        if training:   # 학습
            encoder_inputs, decoder_inputs=inputs         # 질문문장, 정답문장
            context_vector=self.encoder(encoder_inputs)   # 문맥벡터를 얻는다. (h, c) >> 디코더의 초기상태
            decoder_outputs, _, _ = self.decoder(inputs=decoder_inputs, initial_state=context_vector)  # 각 시점에서의 softmax 예측값, h, c
            return decoder_outputs
        else:         # 예측(추론)
            context_vector=self.encoder(inputs)  # [hidden_state, cell_state]

            # 상수형 : 2차원 / 디코더에 입력할 첫번째 토큰은 <sos>
            target_seq=tf.constant([[self.start_token]], dtype=tf.float32)


            # 주의). call() 안에서는 numpy 배열 사용하면 안된다.
            #      내부적으로 graph 가 형성되어야 하기 때문에 TF 의 TensorArray
            results=tf.TensorArray(tf.int32, self.time_steps)  # 단어 하나하나를 예측하여 배열에 담아준다.


            # decoder 에 차례대로 토큰 넣고, 결과 내고, 그 결과를 다음 타임스텝에 넣고...를 반복.
            # 결과가 <eos> 일때까지!
            for i in tf.range(self.time_steps):
                decoder_output, decoder_hidden, decoder_cell=self.decoder(target_seq, initial_state=context_vector)

                # decoder 출력을 배열에 담기
                decoder_output=tf.cast(tf.argmax(decoder_output, axis=-1), dtype=tf.int32)   # 확률 벡터(가장 높은 확률을 가진 인덱스)
                # 차원을 바꾸는 이유  (batch_size, sequence_length)
                decoder_output=tf.reshape(decoder_output, shape=(1, 1))   # 다음 시점의 디코더 입력
                results=results.write(i, decoder_output)                  # 예측된 단어(토큰)를 차례대로 저장

                if decoder_output == self.end_token:
                    break

                # 다음 타임스텝에 전달할 입력토큰
                target_seq=decoder_output

                 # 다음 타임스텝에 전달할 context vector <- decoder 가 현재 타임스텝에서 출력한 은닉상태와 셀상태로 만들어 준다.
                context_vector=[decoder_hidden, decoder_cell]

            return tf.reshape(results.stack(), shape=(1, self.time_steps))   # 예측 결과를 한꺼번에 텐서로 묶는다. 1차원형태 패딩으로 채워짐

#3.학습

In [None]:
# 1) 모델 컴파일
seq2seq=Seq2Seq(units=128,
                vocab_size=len(tokenizer.word_index)+1,
                embedding_dim=100,
                max_len=MAX_LENGTH,
                start_token=tokenizer.word_index['<SOS>'],
                end_token=tokenizer.word_index['<EOS>'])

seq2seq.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [None]:
# 2) 예측 함수 : 미리 만들어 놓고 학습 중간중간 예측
def make_prediction(model, question_inputs):
  results=model(inputs=question_inputs, training=False)

  # results의 인덱스를 문장으로 변환하기 위해 1차원으로 변환
  results=np.asarray(results).reshape(-1)
  return results    # 나중에 이 리턴값을 문장으로 변형할 예정.

In [None]:
# index => word 변환
# 나중에 딥러닝 모델이 예측을 하게 되면, 숫자값들이 나오는데,
# 이를 다시 문자열로 바꾸기 위한 함수를 만들어 두자.

def convert_index_to_text(indexes, end_token):
  words=[]

  # 모든 문장에 대해서 반복
  for index in indexes:
    if index == end_token: break   # 문장 끝이면 중지

    # 사전에 존재하는 단어의 경우 추가 - ERROR 방지 예측시 해당단어가 실제 단어로 등록되어 있을때만 처리
    if index > 0 and tokenizer.index_word[index] is not None:   # 0번 인덱스 무시,
      words.append(tokenizer.index_word[index])

  return ' '.join(words)

In [None]:
# 3) 학습
es=EarlyStopping(monitor='loss', patience=5)

for epoch in range(2):   # 20
  print(f'epoch: {epoch * 10 + 1}...')

  seq2seq.fit([question_pad, answer_in_pad],
              answer_out_pad, batch_size=10,
              epochs=10, callbacks=[es])   # 10

    # 랜덤 샘플 추출 : 배열의 길이 내에서 무작위 인덱스 3개
  samples=np.random.randint(len(questions), size=3)

  for idx in samples:
    question_inputs=question_pad[idx]   # 1차원

    # 문자예측
    results=make_prediction(seq2seq, question_inputs.reshape(1, -1))  # 2차원

    # 반환된 인덱스를 문장으로 변환
    results=convert_index_to_text(results, tokenizer.word_index['<EOS>'])

    print(f'Q: {questions[idx]}')
    print(f'A: {results}\n')
    print()

epoch: 1...
Epoch 1/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 110ms/step - accuracy: 0.7768 - loss: 1.2744
Epoch 2/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 109ms/step - accuracy: 0.7999 - loss: 1.1291
Epoch 3/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 110ms/step - accuracy: 0.8107 - loss: 1.0288
Epoch 4/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 110ms/step - accuracy: 0.8212 - loss: 0.9610
Epoch 5/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 106ms/step - accuracy: 0.8328 - loss: 0.8957
Epoch 6/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 106ms/step - accuracy: 0.8367 - loss: 0.8604
Epoch 7/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 107ms/step - accuracy: 0.8449 - loss: 0.8016
Epoch 8/10
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 108ms/step - accuracy: 0.8458 - loss: 0.7804
Epoc

#4.예측

In [None]:
# 전처리 함수 (정제, 형태소변환 등)
def make_question(sentence):
  sentence=clean_and_morph(sentence)
  question_sequence=tokenizer.texts_to_sequences([sentence])
  question_padded=pad_sequences(question_sequence, maxlen=MAX_LENGTH, truncating="post", padding="post")
  return question_padded

In [None]:
make_question('3박4일 놀러가고 싶다')

array([[ 891,  965, 2186,   63,  112,  158,   14,   16,   40,   32,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0]], dtype=int32)

In [None]:
make_question('커피 마시고 싶다.')

array([[218, 186,  16,  40,  32,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
          0,   0,   0,   0]], dtype=int32)

In [None]:
# 예측 결과
def run_chatbot(question):
  question_inputs=make_question(question)
  results=make_prediction(seq2seq, question_inputs)
  results=convert_index_to_text(results, tokenizer.word_index['<EOS>'])
  return results

run_chatbot('안녕하세요')

'잘 하 ᆯ 수 있 을 거 이 예요'

In [None]:
while True:
  user_input=input('<< 말을 걸어보세요!\n')
  if user_input == 'q': break
  print(f'>> 챗봇 응답 : {run_chatbot(user_input)}')

<< 말을 걸어보세요!
공부하기 싫어요
>> 챗봇 응답 : 저 도 요
<< 말을 걸어보세요!
배고파
>> 챗봇 응답 : 저 도 하 고 싶 네요
<< 말을 걸어보세요!
게임할래요?
>> 챗봇 응답 : 저 도 하 고 싶 네요
<< 말을 걸어보세요!
q


- 챗봇 seq2 테디 : https://github.com/teddylee777/machine-learning/blob/master/04-TensorFlow2.0/13-chatbot/02-seq2seq-chatbot-attention.ipynb
- 챗봇 seq2 티스토리 : https://study-oon.tistory.com/entry/Seq2Seq-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%B1%97%EB%B4%87-%EC%83%9D%EC%84%B1
- https://m.blog.naver.com/sungeun09160/223775192091