<a href="https://colab.research.google.com/github/sleepyMS/Transformer-chatbot/blob/main/%EA%B8%B0%EA%B3%84%ED%95%99%EC%8A%B5_%EB%AA%A8%EB%8D%B8%EC%8B%9C%EC%97%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# load_and_run_model.py

# -*- coding: utf-8 -*-
"""Load and Run Transformer Chatbot Model

This script loads the trained Transformer chatbot model and tokenizer, and starts a conversation with the user.
"""

# 필요한 라이브러리 설치 및 임포트
!pip install konlpy
!pip install git+https://github.com/ssut/py-hanspell.git

import tensorflow as tf
import numpy as np
import re
import pickle
from konlpy.tag import Okt
from hanspell import spell_checker

Collecting konlpy
  Downloading konlpy-0.6.0-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting JPype1>=0.7.0 (from konlpy)
  Downloading jpype1-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading konlpy-0.6.0-py2.py3-none-any.whl (19.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.4/19.4 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jpype1-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (493 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m493.8/493.8 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: JPype1, konlpy
Successfully installed JPype1-1.5.1 konlpy-0.6.0
Collecting git+https://github.com/ssut/py-hanspell.git
  Cloning https://github.com/ssut/py-hanspell.git to /tmp/pip-req-build-2he1fh8h
  Running command git clone --filter=blob:none --quiet https://github.com/ssut/py-hanspell.git /tmp/pip-req-build-2he1fh8h
  Resolved https:/

In [None]:


# Create the KoNLPy Okt morphological analyzer instance used by the
# tokenize/detokenize helpers in this file.
okt = Okt()

# 문장 전처리 함수 정의
def preprocess_sentence(sentence):
    """Normalize a raw sentence before tokenization.

    The text is lower-cased, every run of characters other than Hangul
    (consonant jamo and syllables), ASCII letters, digits and the symbols
    ``? . ! , /`` is replaced by a single space, and leading/trailing
    whitespace is removed.

    Args:
        sentence: The raw input string.

    Returns:
        The cleaned string.
    """
    # '/' is deliberately part of the allowed character set.
    disallowed_run = r"[^ㄱ-ㅎ가-힣a-zA-Z0-9?.!,/]+"
    lowered = sentence.lower()
    return re.sub(disallowed_run, " ", lowered).strip()

# 형태소 분석 및 품사 태깅 함수 정의
def tokenize_sentence(sentence):
    """Run the module-level ``okt`` analyzer's ``pos`` method on *sentence*.

    Returns:
        Whatever ``okt.pos`` returns; ``reconstruct_sentence`` consumes it
        as an iterable of ``(word, pos_tag)`` pairs.
    """
    tagged_tokens = okt.pos(sentence)
    return tagged_tokens

# 토큰 및 품사 태그를 사용하여 문장 복원 함수 정의
def reconstruct_sentence(pos_tokens):
    """Rebuild a sentence string from ``(word, pos_tag)`` pairs.

    Tokens tagged 'Josa', 'Suffix', 'Eomi' or 'Punctuation' are attached
    directly to the text built so far (any trailing whitespace is removed
    first). Every other token is appended followed by a single space.

    Note that no space is emitted after an attached token, so the token
    that follows it is joined without a separator. This behaviour is kept
    exactly as is.

    Args:
        pos_tokens: Iterable of ``(word, pos_tag)`` pairs.

    Returns:
        The rebuilt sentence with surrounding whitespace stripped.
    """
    attached_tags = ('Josa', 'Suffix', 'Eomi', 'Punctuation')
    rebuilt = ''
    for token, tag in pos_tokens:
        if tag not in attached_tags:
            # Free-standing token: append it with a trailing separator.
            rebuilt = rebuilt + token + ' '
            continue
        # Attached token: drop the pending separator and glue it on.
        rebuilt = rebuilt.rstrip() + token
    return rebuilt.strip()

# 디토큰화 함수 정의
def detokenize_sentence(tokens):
    """Turn a list of token strings back into a sentence.

    The tokens are joined with single spaces, the joined text is passed
    through ``okt.pos``, and the result is handed to
    ``reconstruct_sentence``.

    Args:
        tokens: Iterable of token strings.

    Returns:
        The string produced by ``reconstruct_sentence``.
    """
    joined = ' '.join(tokens)
    return reconstruct_sentence(okt.pos(joined))

# Load the tokenizer.
# NOTE(review): pickle.load can execute arbitrary code while unpickling;
# only load a tokenizer file that comes from a trusted source.
with open('/content/drive/MyDrive/Colab Notebooks/연애챗봇데이터/tokenizer.pkl', 'rb') as handle:
    tokenizer = pickle.load(handle)

# Vocabulary size: number of entries in the tokenizer's word index plus one.
# The extra slot presumably accounts for id 0, which create_padding_mask
# treats as padding — TODO confirm against the training code.
vocab_size = len(tokenizer.word_index) + 1

# Hyperparameters (set to the same values that were used for training).
num_layers = 4
d_model = 128
dff = 512
num_heads = 4
dropout_rate = 0.1
MAX_LENGTH = 60

# 필요한 함수 및 클래스 정의 (훈련 코드에서 사용한 모든 함수와 클래스를 동일하게 정의)

# 포지셔널 인코딩 함수 정의
def get_angles(pos, i, d_model):
    """Compute the positional-encoding angles ``pos / 10000^(2*(i//2)/d_model)``.

    Args:
        pos: Position index or array of position indices.
        i: Dimension index or array of dimension indices.
        d_model: Model dimensionality used to scale the exponent.

    Returns:
        ``pos`` multiplied by the per-dimension angle rate.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    angle_rates = 1 / np.power(10000, exponent)
    return pos * angle_rates

def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: Number of positions (rows) to generate.
        d_model: Number of encoding dimensions (columns).

    Returns:
        A float32 tensor of shape ``(1, position, d_model)``.
    """
    row_positions = np.arange(position)[:, np.newaxis]
    column_dims = np.arange(d_model)[np.newaxis, :]
    table = get_angles(row_positions, column_dims, d_model)

    # Even columns take the sine of the angle, odd columns the cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Prepend a leading axis of size 1 before converting to a tensor.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

# 마스킹 함수 정의
def create_padding_mask(seq):
    """Return a float mask that is 1.0 where *seq* equals 0 and 0.0 elsewhere.

    Two singleton axes are inserted after the first axis, giving
    ``(batch_size, 1, 1, seq_len)`` for a ``(batch_size, seq_len)`` input.
    """
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(seq):
    """Build a mask that hides future positions and padding tokens.

    The strictly-upper-triangular "future" mask (1 above the diagonal) is
    merged with the padding mask of *seq* via an element-wise maximum, so a
    position is masked when either mask marks it.
    """
    length = tf.shape(seq)[1]
    lower_triangle = tf.linalg.band_part(tf.ones((length, length)), -1, 0)
    future_mask = 1 - lower_triangle
    return tf.maximum(future_mask, create_padding_mask(seq))

# 스케일드 닷 프로덕트 어텐션 함수 정의
def scaled_dot_product_attention(q, k, v, mask):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor, ``(..., seq_len_q, depth)``.
        k: Key tensor, ``(..., seq_len_k, depth)``.
        v: Value tensor, ``(..., seq_len_k, depth_v)``.
        mask: Tensor broadcastable to the logits, with 1 at positions to
            suppress, or ``None`` for no masking.

    Returns:
        A tuple ``(output, attention_weights)``.
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)

    # Raw similarity scores, scaled by sqrt(depth of the keys).
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)

    # Masked positions get a large negative offset so softmax drives them to ~0.
    if mask is not None:
        logits += (mask * -1e9)

    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights

# 멀티헤드 어텐션 클래스 정의
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention layer.

    Queries, keys and values are linearly projected to ``d_model`` units,
    split into ``num_heads`` heads of size ``d_model // num_heads``, passed
    through scaled dot-product attention, re-merged and projected once more.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        super().__init__(**kwargs)

        # The model width must split evenly across the heads.
        assert d_model % num_heads == 0

        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads

        # Input projections for query, key and value.
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        # Output projection applied after the heads are merged.
        self.dense = tf.keras.layers.Dense(d_model)

    def get_config(self):
        """Return the layer config extended with the constructor arguments."""
        base_config = super().get_config()
        base_config.update(d_model=self.d_model, num_heads=self.num_heads)
        return base_config

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq_len, d_model)`` to ``(batch, num_heads, seq_len, depth)``."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        """Apply multi-head attention.

        Note the argument order: value, key, query.

        Returns:
            Tensor of shape ``(batch_size, seq_len_q, d_model)``.
        """
        batch_size = tf.shape(q)[0]

        # Linear projections, each (batch_size, seq_len, d_model).
        q_proj = self.wq(q)
        k_proj = self.wk(k)
        v_proj = self.wv(v)

        # Split into heads: (batch_size, num_heads, seq_len, depth).
        q_heads = self.split_heads(q_proj, batch_size)
        k_heads = self.split_heads(k_proj, batch_size)
        v_heads = self.split_heads(v_proj, batch_size)

        attended, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        # Merge heads back: (batch_size, seq_len_q, num_heads, depth) -> (batch_size, seq_len_q, d_model).
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))

        return self.dense(merged)

# 포지션 와이즈 피드 포워드 네트워크
def point_wise_feed_forward_network(d_model, dff):
    """Build the position-wise feed-forward network.

    Returns:
        A ``tf.keras.Sequential`` of a ReLU ``Dense(dff)`` layer followed
        by a linear ``Dense(d_model)`` layer.
    """
    expand = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    project = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([expand, project])

# 인코더 레이어 정의
class EncoderLayer(tf.keras.layers.Layer):
    """Single encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-blocks applies dropout to its output, adds the
    result to the sub-block input (residual connection) and
    layer-normalizes the sum.
    """

    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

        # Constructor arguments are kept for get_config().
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout_rate

    def get_config(self):
        """Return the layer config extended with the constructor arguments."""
        base_config = super().get_config()
        base_config.update(
            d_model=self.d_model,
            num_heads=self.num_heads,
            dff=self.dff,
            dropout_rate=self.dropout_rate,
        )
        return base_config

    def call(self, x, training=None, mask=None):
        """Run the block on *x*; *mask* is forwarded to the self-attention."""
        # Self-attention sub-block with residual connection.
        attended = self.dropout1(self.mha(x, x, x, mask), training=training)
        attn_normed = self.layernorm1(x + attended)

        # Feed-forward sub-block with residual connection.
        transformed = self.dropout2(self.ffn(attn_normed), training=training)
        return self.layernorm2(attn_normed + transformed)

# 인코더 정의
class Encoder(tf.keras.layers.Layer):
    """Transformer encoder: token embedding + positional encoding + N encoder layers.

    Args:
        num_layers: Number of stacked ``EncoderLayer`` blocks.
        d_model: Embedding / model width.
        num_heads: Number of attention heads in each layer.
        dff: Hidden width of each position-wise feed-forward network.
        input_vocab_size: Size of the input embedding table.
        maximum_position_encoding: Number of positions in the precomputed
            positional-encoding table.
        dropout_rate: Dropout rate used after the embedding and inside
            each layer.
    """

    def __init__(self, num_layers, d_model, num_heads, dff,
                 input_vocab_size, maximum_position_encoding, dropout_rate=0.1, **kwargs):
        super(Encoder, self).__init__(**kwargs)

        self.d_model = d_model
        self.num_layers = num_layers
        # Kept so that get_config() can reproduce every required
        # constructor argument.
        self.num_heads = num_heads
        self.dff = dff

        # Embedding layer and positional encoding.
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        # Stack of encoder layers.
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, dropout_rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(dropout_rate)

        self.input_vocab_size = input_vocab_size
        self.maximum_position_encoding = maximum_position_encoding
        self.dropout_rate = dropout_rate

    def get_config(self):
        """Return a config from which ``from_config`` can rebuild the layer.

        ``num_heads`` and ``dff`` are required positional arguments of
        ``__init__``, so they must be part of the config; without them
        ``Encoder.from_config(layer.get_config())`` raises ``TypeError``.
        """
        config = super(Encoder, self).get_config()
        config.update({
            'num_layers': self.num_layers,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'dff': self.dff,
            'input_vocab_size': self.input_vocab_size,
            'maximum_position_encoding': self.maximum_position_encoding,
            'dropout_rate': self.dropout_rate,
        })
        return config

    def call(self, x, training=None, mask=None):
        """Encode token ids *x*.

        Args:
            x: Token-id tensor, ``(batch_size, input_seq_len)``.
            training: Forwarded to the dropout layers and encoder layers.
            mask: Padding mask forwarded to every encoder layer.

        Returns:
            Tensor of shape ``(batch_size, input_seq_len, d_model)``.
        """
        seq_len = tf.shape(x)[1]

        # Embed, scale by sqrt(d_model), then add the positional encoding
        # for the first seq_len positions.
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        # Pass through the encoder layers in order.
        for enc_layer in self.enc_layers:
            x = enc_layer(x, training=training, mask=mask)

        return x  # (batch_size, input_seq_len, d_model)

# 디코더 레이어 정의
class DecoderLayer(tf.keras.layers.Layer):
    """Single decoder block.

    Three sub-blocks run in sequence: masked self-attention, attention
    over the encoder output, and a feed-forward network. Each sub-block
    applies dropout, a residual connection and layer normalization.
    """

    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)

        self.mha1 = MultiHeadAttention(d_model, num_heads)  # masked self-attention
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # encoder-decoder attention

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)

        # Constructor arguments are kept for get_config().
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.dropout_rate = dropout_rate

    def get_config(self):
        """Return the layer config extended with the constructor arguments."""
        base_config = super().get_config()
        base_config.update(
            d_model=self.d_model,
            num_heads=self.num_heads,
            dff=self.dff,
            dropout_rate=self.dropout_rate,
        )
        return base_config

    def call(self, x, enc_output, training=None,
             look_ahead_mask=None, padding_mask=None):
        """Run the block on decoder input *x* and encoder output *enc_output*."""
        # Masked self-attention over the decoder input.
        self_attended = self.dropout1(self.mha1(x, x, x, look_ahead_mask),
                                      training=training)
        self_normed = self.layernorm1(x + self_attended)

        # Cross-attention: values and keys come from the encoder output,
        # queries from the decoder state.
        cross_attended = self.dropout2(
            self.mha2(enc_output, enc_output, self_normed, padding_mask),
            training=training)
        cross_normed = self.layernorm2(self_normed + cross_attended)

        # Position-wise feed-forward network.
        transformed = self.dropout3(self.ffn(cross_normed), training=training)
        return self.layernorm3(cross_normed + transformed)

# 디코더 정의
class Decoder(tf.keras.layers.Layer):
    """Transformer decoder: token embedding + positional encoding + N decoder layers.

    Args:
        num_layers: Number of stacked ``DecoderLayer`` blocks.
        d_model: Embedding / model width.
        num_heads: Number of attention heads in each layer.
        dff: Hidden width of each position-wise feed-forward network.
        target_vocab_size: Size of the target embedding table.
        maximum_position_encoding: Number of positions in the precomputed
            positional-encoding table.
        dropout_rate: Dropout rate used after the embedding and inside
            each layer.
    """

    def __init__(self, num_layers, d_model, num_heads, dff,
                 target_vocab_size, maximum_position_encoding, dropout_rate=0.1, **kwargs):
        super(Decoder, self).__init__(**kwargs)

        self.d_model = d_model
        self.num_layers = num_layers
        # Kept so that get_config() can reproduce every required
        # constructor argument.
        self.num_heads = num_heads
        self.dff = dff

        # Embedding layer and positional encoding.
        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        # Stack of decoder layers.
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, dropout_rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(dropout_rate)

        self.target_vocab_size = target_vocab_size
        self.maximum_position_encoding = maximum_position_encoding
        self.dropout_rate = dropout_rate

    def get_config(self):
        """Return a config from which ``from_config`` can rebuild the layer.

        ``num_heads`` and ``dff`` are required positional arguments of
        ``__init__``, so they must be part of the config; without them
        ``Decoder.from_config(layer.get_config())`` raises ``TypeError``.
        """
        config = super(Decoder, self).get_config()
        config.update({
            'num_layers': self.num_layers,
            'd_model': self.d_model,
            'num_heads': self.num_heads,
            'dff': self.dff,
            'target_vocab_size': self.target_vocab_size,
            'maximum_position_encoding': self.maximum_position_encoding,
            'dropout_rate': self.dropout_rate,
        })
        return config

    def call(self, x, enc_output, training=None,
             look_ahead_mask=None, padding_mask=None):
        """Decode target token ids *x* against *enc_output*.

        Args:
            x: Target token-id tensor, ``(batch_size, target_seq_len)``.
            enc_output: Encoder output passed to every decoder layer.
            training: Forwarded to the dropout layers and decoder layers.
            look_ahead_mask: Mask for the decoder self-attention.
            padding_mask: Mask for the encoder-decoder attention.

        Returns:
            Tensor of shape ``(batch_size, target_seq_len, d_model)``.
        """
        seq_len = tf.shape(x)[1]

        # Embed, scale by sqrt(d_model), then add the positional encoding
        # for the first seq_len positions.
        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        # Pass through the decoder layers in order.
        for dec_layer in self.dec_layers:
            x = dec_layer(x, enc_output, training=training,
                          look_ahead_mask=look_ahead_mask,
                          padding_mask=padding_mask)

        return x  # (batch_size, target_seq_len, d_model)

# 트랜스포머 모델 정의
class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with a final vocabulary projection.

    The encoder and decoder share one ``vocab_size``; ``pe_input`` and
    ``pe_target`` set the sizes of their positional-encoding tables.
    """

    def __init__(self, num_layers, d_model, num_heads, dff,
                 vocab_size, pe_input, pe_target, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)

        self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                               vocab_size, pe_input, dropout_rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               vocab_size, pe_target, dropout_rate)

        # Projects decoder states to per-token vocabulary logits.
        self.final_layer = tf.keras.layers.Dense(vocab_size)

        # Constructor arguments are kept for get_config().
        self.num_layers = num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size
        self.pe_input = pe_input
        self.pe_target = pe_target
        self.dropout_rate = dropout_rate

    def get_config(self):
        """Return the model config extended with every constructor argument."""
        base_config = super().get_config()
        base_config.update(
            num_layers=self.num_layers,
            d_model=self.d_model,
            num_heads=self.num_heads,
            dff=self.dff,
            vocab_size=self.vocab_size,
            pe_input=self.pe_input,
            pe_target=self.pe_target,
            dropout_rate=self.dropout_rate,
        )
        return base_config

    def call(self, inputs, training=None):
        """Run the model.

        Args:
            inputs: Dict with keys ``'inputs'`` (encoder token ids) and
                ``'dec_inputs'`` (decoder token ids).
            training: Forwarded to the encoder and decoder.

        Returns:
            Logits of shape ``(batch_size, tar_seq_len, vocab_size)``.
        """
        source_ids = inputs['inputs']
        target_ids = inputs['dec_inputs']

        enc_mask, self_attn_mask, cross_attn_mask = self.create_masks(source_ids, target_ids)

        # (batch_size, inp_seq_len, d_model)
        encoded = self.encoder(source_ids, training=training, mask=enc_mask)

        # (batch_size, tar_seq_len, d_model)
        decoded = self.decoder(target_ids, encoded, training=training,
                               look_ahead_mask=self_attn_mask,
                               padding_mask=cross_attn_mask)

        return self.final_layer(decoded)

    def create_masks(self, enc_input, dec_input):
        """Build the three attention masks.

        Returns:
            ``(enc_padding_mask, combined_mask, dec_padding_mask)`` where
            both padding masks are derived from *enc_input* and
            ``combined_mask`` hides future and padded positions of
            *dec_input*.
        """
        # Padding mask for the encoder self-attention.
        source_padding = create_padding_mask(enc_input)

        # Padding mask for the encoder-decoder attention (also built from
        # the encoder input, since it masks encoder positions).
        cross_padding = create_padding_mask(enc_input)

        # create_look_ahead_mask already merges the padding mask of its
        # argument, so the maximum below leaves the values unchanged.
        future_mask = create_look_ahead_mask(dec_input)
        target_padding = create_padding_mask(dec_input)
        merged_mask = tf.maximum(target_padding, future_mask)

        return source_padding, merged_mask, cross_padding

# Dictionary of custom objects handed to load_model so the saved model's
# custom classes can be resolved by name.
custom_objects = {
    'Transformer': Transformer,
    'Encoder': Encoder,
    'Decoder': Decoder,
    'EncoderLayer': EncoderLayer,
    'DecoderLayer': DecoderLayer,
    'MultiHeadAttention': MultiHeadAttention,
    'tf': tf,  # added in case it is needed
}

# Load the model.
# compile=False is passed to load_model; presumably the training compile
# state is skipped because the model is only used for inference here.
# model = tf.keras.models.load_model('transformer_chatbot_model.keras', custom_objects=custom_objects)
model = tf.keras.models.load_model('/content/drive/MyDrive/Colab Notebooks/연애챗봇데이터/transformer_chatbot_model.keras', custom_objects=custom_objects, compile=False)
# 응답 생성 함수 정의 (맥락을 기억하도록 수정)


In [None]:
def _trim_history(history, max_history):
    """Return *history* limited to its last ``max_history * 2`` entries."""
    window = max_history * 2  # user and system utterances both count
    if len(history) > window:
        return history[-window:]
    return history


def evaluate(sentence, history, max_history=1):
    """Generate a reply to *sentence* by greedy decoding with dialogue context.

    Args:
        sentence: Raw user utterance.
        history: List of previous turns, each a string prefixed with
            ``'<usr> '`` or ``'<sys> '``. The list passed in is not
            modified; use the returned history for the next call.
        max_history: Number of (user, system) turn pairs to keep; the
            context is trimmed to its last ``max_history * 2`` entries.

    Returns:
        A tuple ``(predicted_sentence, history)`` with the detokenized
        reply and the updated, trimmed history list.
    """
    # Work on a copy. The original appended to the caller's list and then
    # rebound the name to a trimmed slice, leaving the caller's list in a
    # partially updated state.
    history = list(history)

    # Preprocess the input the same way as the other helpers expect it.
    sentence = reconstruct_sentence(tokenize_sentence(preprocess_sentence(sentence)))
    history.append('<usr> ' + sentence)
    history = _trim_history(history, max_history)
    combined_sentence = ' '.join(history)

    input_sequence = tokenizer.texts_to_sequences([combined_sentence])
    input_sequence = tf.keras.preprocessing.sequence.pad_sequences(
        input_sequence, maxlen=MAX_LENGTH, padding='post')
    input_tensor = tf.convert_to_tensor(input_sequence)

    # Look the special token ids up once instead of on every decoding step.
    start_id = tokenizer.word_index['<sys>']
    end_id = tokenizer.word_index['<end>']

    # Greedy decoding: start from '<sys>' and append the most likely next
    # token until '<end>' is predicted or MAX_LENGTH steps have run.
    output = tf.expand_dims([start_id], 0)
    for _ in range(MAX_LENGTH):
        predictions = model(inputs={'inputs': input_tensor, 'dec_inputs': output}, training=False)
        predictions = predictions[:, -1:, :]  # keep only the last position
        predicted_id = tf.argmax(predictions, axis=-1, output_type=tf.int32)

        if predicted_id == end_id:
            break

        output = tf.concat([output, predicted_id], axis=-1)

    predicted_sentence = tokenizer.sequences_to_texts(output.numpy())[0]
    predicted_sentence = predicted_sentence.replace('<sys>', '').replace('<end>', '').strip()

    # Detokenize the generated tokens into a readable sentence.
    predicted_sentence = detokenize_sentence(predicted_sentence.split())

    # Add the reply to the dialogue context.
    history.append('<sys> ' + predicted_sentence)
    history = _trim_history(history, max_history)

    return predicted_sentence, history


In [None]:

# 채팅 함수 정의
def chat():
    """Run an interactive console chat loop.

    Reads user input until one of the exit commands is entered
    (compared case-insensitively), printing the reply returned by
    ``evaluate`` for every other line and carrying the conversation
    history from one turn to the next.
    """
    exit_commands = ['종료', 'quit', 'exit']
    print("챗봇과의 대화를 시작합니다. 종료하려면 '종료'를 입력하세요.\n")
    conversation_history = []
    while True:
        user_input = input("당신: ")
        if user_input.lower() not in exit_commands:
            response, conversation_history = evaluate(user_input, conversation_history)
            print("여자친구:", response)
            continue
        print("대화를 종료합니다.")
        break

# Start the chat loop when this code is run as the main module.
if __name__ == "__main__":
    chat()


챗봇과의 대화를 시작합니다. 종료하려면 '종료'를 입력하세요.

