In [None]:
# Step 1: 데이터 수집하기

In [36]:
import pandas as pd
import urllib.request

def download_chatbot_data(url, save_path):
    """
    챗봇 데이터를 GitHub에서 다운로드하여 로컬에 저장하는 함수.
    
    Args:
        url (str): 데이터 다운로드 URL.
        save_path (str): 저장할 로컬 파일 경로.
    """
    try:
        urllib.request.urlretrieve(url, save_path)
        print(f"데이터 다운로드 성공: {save_path}")
    except Exception as e:
        print(f"데이터 다운로드 실패: {e}")

def load_chatbot_data(file_path):
    """
    송영숙님의 챗봇 데이터를 로드하는 함수.
    
    Args:
        file_path (str): 데이터 파일 경로.
        
    Returns:
        pandas.DataFrame: 로드된 데이터프레임.
    """
    try:
        data = pd.read_csv(file_path)
        print("데이터 로드 성공!")
        return data
    except Exception as e:
        print(f"데이터 로드 실패: {e}")
        return None

# 데이터 다운로드 URL 및 저장 경로
data_url = "https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv"
local_file_path = "ChatbotData.csv"

# 데이터 다운로드
download_chatbot_data(data_url, local_file_path)

# 데이터 로드
chatbot_data = load_chatbot_data(local_file_path)

# 데이터 확인
if chatbot_data is not None:
    print(chatbot_data.head())  # 상위 5개 데이터 출력


데이터 다운로드 성공: ChatbotData.csv
데이터 로드 성공!
                 Q            A  label
0           12시 땡!   하루가 또 가네요.      0
1      1지망 학교 떨어졌어    위로해 드립니다.      0
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠.      0
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠.      0
4          PPL 심하네   눈살이 찌푸려지죠.      0


In [None]:
# Step 2: 데이터 전처리하기

In [37]:
data_info = chatbot_data.shape  # (행 개수, 열 개수)
print(f"전체 데이터 크기: {data_info[0]}행, {data_info[1]}열")

전체 데이터 크기: 11823행, 3열


In [38]:
import re

def preprocess_chatbot_data(data):
    """
    챗봇 데이터를 전처리하는 함수.
    - 특수문자 제거 (숫자와 한글, 공백은 유지)
    - 텍스트 정규화
    - 중복 및 결측치 제거

    Args:
        data (pandas.DataFrame): 원본 데이터프레임

    Returns:
        pandas.DataFrame: 전처리된 데이터프레임
    """
    # 특수문자 제거 (숫자, 한글, 공백만 남김)
    data['Q'] = data['Q'].apply(lambda x: re.sub(r'[^가-힣ㄱ-ㅎㅏ-ㅣ0-9\s]', '', x).strip())
    data['A'] = data['A'].apply(lambda x: re.sub(r'[^가-힣ㄱ-ㅎㅏ-ㅣ0-9\s]', '', x).strip())

    # 중복 제거
    data = data.drop_duplicates()

    # 결측치 제거
    data = data.dropna()

    return data

# 데이터 전처리 실행
preprocessed_data = preprocess_chatbot_data(chatbot_data)

# 전처리 결과 확인
print(preprocessed_data.head())


                 Q           A  label
0            12시 땡   하루가 또 가네요      0
1      1지망 학교 떨어졌어    위로해 드립니다      0
2     3박4일 놀러가고 싶다  여행은 언제나 좋죠      0
3  3박4일 정도 놀러가고 싶다  여행은 언제나 좋죠      0
4              심하네   눈살이 찌푸려지죠      0


In [None]:
# Step 3: SubwordTextEncoder 사용하기

In [39]:
import tensorflow_datasets as tfds

# SubwordTextEncoder를 사용하여 데이터 토크나이징
def tokenize_with_subword(data, column):
    """
    SubwordTextEncoder를 사용하여 데이터를 토크나이징하는 함수.

    Args:
        data (pandas.DataFrame): 전처리된 데이터프레임
        column (str): 토크나이징할 열 이름 (예: 'Q' 또는 'A')

    Returns:
        encoder, tokenized_data: SubwordTextEncoder 객체와 토크나이징된 데이터 리스트
    """
    # 모든 문장을 리스트로 변환
    sentences = data[column].tolist()

    # SubwordTextEncoder 학습
    encoder = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
        sentences, target_vocab_size=2**13
    )

    # 데이터를 토큰화
    tokenized_data = [encoder.encode(sentence) for sentence in sentences]

    return encoder, tokenized_data

# Q와 A 열 각각에 대해 토크나이징 수행
q_encoder, q_tokenized = tokenize_with_subword(preprocessed_data, 'Q')
a_encoder, a_tokenized = tokenize_with_subword(preprocessed_data, 'A')

# 결과 확인
print(f"Q 데이터 토큰화 개수: {len(q_tokenized)}")
print(f"A 데이터 토큰화 개수: {len(a_tokenized)}")
print(f"Q 데이터 예시: {q_tokenized[:2]}")
print(f"A 데이터 예시: {a_tokenized[:2]}")


Q 데이터 토큰화 개수: 11820
A 데이터 토큰화 개수: 11820
Q 데이터 예시: [[8354, 8393, 8596, 8510, 8522], [8339, 8393, 504, 642]]
A 데이터 예시: [[4095, 162, 2838], [760, 2618]]


In [None]:
# Step 4: 트랜스포머 모델 구성하기

In [40]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Embedding, MultiHeadAttention, LayerNormalization, Dropout
from tensorflow.keras.models import Model

# 트랜스포머 블록 클래스 정의
class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# 토큰 및 포지션 임베딩 클래스 정의
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

# 트랜스포머 모델 구성 함수
def build_transformer_model(vocab_size, maxlen, embed_dim=128, num_heads=4, ff_dim=512, num_blocks=4):
    """
    트랜스포머 기반 모델을 구성하는 함수.

    Args:
        vocab_size (int): 어휘 크기
        maxlen (int): 입력 시퀀스 최대 길이
        embed_dim (int): 임베딩 차원 크기
        num_heads (int): 멀티헤드 어텐션의 헤드 수
        ff_dim (int): 피드포워드 네트워크의 차원 크기
        num_blocks (int): 트랜스포머 블록 수

    Returns:
        tensorflow.keras.Model: 트랜스포머 모델 객체
    """
    inputs = Input(shape=(maxlen,))
    embedding_layer = TokenAndPositionEmbedding(maxlen=maxlen, vocab_size=vocab_size, embed_dim=embed_dim)
    x = embedding_layer(inputs)

    for _ in range(num_blocks):
        x = TransformerBlock(embed_dim=embed_dim, num_heads=num_heads, ff_dim=ff_dim)(x)

    outputs = Dense(vocab_size, activation="softmax")(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model

# 하이퍼파라미터 설정 및 모델 생성
vocab_size = q_encoder.vocab_size  # SubwordTextEncoder에서 생성된 어휘 크기 사용
maxlen = 40  # 입력 시퀀스 최대 길이 설정
transformer_model = build_transformer_model(vocab_size=vocab_size, maxlen=maxlen)

# 모델 요약 출력
transformer_model.summary()


Model: "model_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_5 (InputLayer)         [(None, 40)]              0         
_________________________________________________________________
token_and_position_embedding (None, 40, 128)           1108096   
_________________________________________________________________
transformer_block (Transform (None, 40, 128)           396032    
_________________________________________________________________
transformer_block_1 (Transfo (None, 40, 128)           396032    
_________________________________________________________________
transformer_block_2 (Transfo (None, 40, 128)           396032    
_________________________________________________________________
transformer_block_3 (Transfo (None, 40, 128)           396032    
_________________________________________________________________
dense_30 (Dense)             (None, 40, 8617)          1111

In [None]:
# Step 5: 모델 학습 및 평가

In [41]:
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

# 데이터 준비 (Step 3에서 생성된 토큰화된 데이터 사용)
q_data = pad_sequences(q_tokenized, maxlen=maxlen, padding='post')  # 질문 데이터 패딩
a_data = pad_sequences(a_tokenized, maxlen=maxlen, padding='post')  # 답변 데이터 패딩

# 학습 데이터와 테스트 데이터 분리
q_train, q_test, a_train, a_test = train_test_split(q_data, a_data, test_size=0.2, random_state=42)

# 데이터 준비 결과 출력
print(f"훈련 데이터 질문 개수: {len(q_train)}")
print(f"테스트 데이터 질문 개수: {len(q_test)}")
print(f"훈련 데이터 답변 개수: {len(a_train)}")
print(f"테스트 데이터 답변 개수: {len(a_test)}")


훈련 데이터 질문 개수: 9456
테스트 데이터 질문 개수: 2364
훈련 데이터 답변 개수: 9456
테스트 데이터 답변 개수: 2364


In [42]:
from tensorflow.keras.optimizers import Adam

# 모델 컴파일
transformer_model.compile(optimizer=Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
history = transformer_model.fit(
    q_train,
    a_train,
    validation_data=(q_test, a_test),
    batch_size=64,
    epochs=10
)

# 학습 결과 출력
print("모델 학습 완료!")


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
모델 학습 완료!


In [43]:
# 테스트 데이터로 모델 평가
test_loss, test_accuracy = transformer_model.evaluate(q_test, a_test)
print(f"테스트 손실: {test_loss}")
print(f"테스트 정확도: {test_accuracy}")


테스트 손실: 1.0854589939117432
테스트 정확도: 0.8894141316413879


In [None]:
# Step 5-2: 한국어 입력 문장에 대한 답변 생성 함수 구현

In [44]:
def predict_answer(input_sentence, model, q_encoder, a_encoder, maxlen):
    """
    입력 문장에 대해 모델이 예측한 답변을 생성하는 함수.

    Args:
        input_sentence (str): 사용자 입력 문장
        model (tensorflow.keras.Model): 학습된 트랜스포머 모델
        q_encoder (SubwordTextEncoder): 질문 토크나이저
        a_encoder (SubwordTextEncoder): 답변 토크나이저
        maxlen (int): 입력 시퀀스 최대 길이

    Returns:
        str: 모델이 생성한 답변
    """
    # 입력 문장을 토큰화
    input_tokens = q_encoder.encode(input_sentence)
    input_tokens = pad_sequences([input_tokens], maxlen=maxlen, padding='post')

    # 모델 예측 수행
    predictions = model.predict(input_tokens)

    # 가장 높은 확률의 토큰 선택
    predicted_tokens = np.argmax(predictions[0], axis=-1)

    # 종료 토큰(<end>) 이후의 토큰 제거 (수동 설정 필요 시 사용)
    if 0 in predicted_tokens:  # 0은 일반적으로 패딩 토큰으로 사용됨
        predicted_tokens = predicted_tokens[:predicted_tokens.tolist().index(0)]

    # 토큰을 텍스트로 디코딩
    answer = a_encoder.decode(predicted_tokens)

    return answer




In [45]:
# 예시 입력 문장
input_sentence = " 이봐요"
predicted_answer = predict_answer(input_sentence, transformer_model, q_encoder, a_encoder, maxlen)

# 결과 출력
print(f"입력: {input_sentence}")
print(f"출력: {predicted_answer}")

입력:  이봐요
출력: 좋은 찾아보세요


In [None]:







# 모델을 인코더, 디코더 구조로 해서 다시 시도








In [46]:
from tensorflow.keras.layers import Input, Dense, Embedding, MultiHeadAttention, LayerNormalization, Dropout, Concatenate
from tensorflow.keras.models import Model

# 인코더 정의
def build_encoder(vocab_size, embed_dim, num_heads, ff_dim, num_blocks, maxlen):
    inputs = Input(shape=(maxlen,))
    embedding_layer = TokenAndPositionEmbedding(maxlen=maxlen, vocab_size=vocab_size, embed_dim=embed_dim)
    x = embedding_layer(inputs)

    for _ in range(num_blocks):
        x = TransformerBlock(embed_dim=embed_dim, num_heads=num_heads, ff_dim=ff_dim)(x)

    return Model(inputs=inputs, outputs=x)

# 디코더 정의
def build_decoder(vocab_size, embed_dim, num_heads, ff_dim, num_blocks, maxlen):
    inputs = Input(shape=(maxlen,))
    encoder_outputs = Input(shape=(maxlen, embed_dim))
    
    embedding_layer = TokenAndPositionEmbedding(maxlen=maxlen, vocab_size=vocab_size, embed_dim=embed_dim)
    x = embedding_layer(inputs)

    for _ in range(num_blocks):
        # 디코더는 인코더 출력과 자기 자신에 대해 어텐션 수행
        attn1 = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(x, x)
        attn1 = Dropout(0.1)(attn1)
        out1 = LayerNormalization(epsilon=1e-6)(x + attn1)

        attn2 = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)(out1, encoder_outputs)
        attn2 = Dropout(0.1)(attn2)
        out2 = LayerNormalization(epsilon=1e-6)(out1 + attn2)

        ffn_output = Dense(ff_dim, activation="relu")(out2)
        ffn_output = Dense(embed_dim)(ffn_output)
        x = LayerNormalization(epsilon=1e-6)(out2 + ffn_output)

    outputs = Dense(vocab_size, activation="softmax")(x)
    return Model(inputs=[inputs, encoder_outputs], outputs=outputs)

# 인코더-디코더 모델 통합
def build_encoder_decoder_model(vocab_size, embed_dim=128, num_heads=4, ff_dim=512, num_blocks=4, maxlen=40):
    encoder = build_encoder(vocab_size=vocab_size, embed_dim=embed_dim,
                            num_heads=num_heads, ff_dim=ff_dim,
                            num_blocks=num_blocks, maxlen=maxlen)
    
    decoder = build_decoder(vocab_size=vocab_size, embed_dim=embed_dim,
                            num_heads=num_heads, ff_dim=ff_dim,
                            num_blocks=num_blocks, maxlen=maxlen)
    
    encoder_inputs = encoder.input
    decoder_inputs = decoder.input[0]
    
    # 디코더에 인코더 출력 연결
    decoder_outputs = decoder([decoder_inputs, encoder.output])
    
    model = Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)
    return model

# 모델 생성
vocab_size = q_encoder.vocab_size  # 기존 SubwordTextEncoder에서 가져옴
maxlen = 40  # 기존 설정 유지
encoder_decoder_model = build_encoder_decoder_model(vocab_size=vocab_size)

# 모델 요약 출력
encoder_decoder_model.summary()


Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_6 (InputLayer)            [(None, 40)]         0                                            
__________________________________________________________________________________________________
token_and_position_embedding_1  (None, 40, 128)      1108096     input_6[0][0]                    
__________________________________________________________________________________________________
transformer_block_4 (Transforme (None, 40, 128)      396032      token_and_position_embedding_1[0]
__________________________________________________________________________________________________
transformer_block_5 (Transforme (None, 40, 128)      396032      transformer_block_4[0][0]        
____________________________________________________________________________________________

In [47]:
# 디코더 입력과 출력 데이터 생성
start_token_id = a_encoder.encode('<start>')[0]
end_token_id = a_encoder.encode('<end>')[0]

decoder_input_data = [[start_token_id] + seq for seq in a_tokenized]
decoder_output_data = [seq + [end_token_id] for seq in a_tokenized]

# 패딩 처리
decoder_input_data = pad_sequences(decoder_input_data, maxlen=maxlen, padding='post')
decoder_output_data = pad_sequences(decoder_output_data, maxlen=maxlen, padding='post')

# 학습 데이터 분리 (기존 질문 데이터도 사용)
q_train_enc_inp, q_test_enc_inp, a_train_dec_inp, a_test_dec_inp,\
a_train_dec_outp, a_test_dec_outp = train_test_split(
    q_data,
    decoder_input_data,
    decoder_output_data,
    test_size=0.2,
    random_state=42
)


In [48]:
from tensorflow.keras.optimizers import Adam

# 모델 컴파일 및 학습
encoder_decoder_model.compile(optimizer=Adam(learning_rate=0.001),
                              loss='sparse_categorical_crossentropy',
                              metrics=['accuracy'])

history = encoder_decoder_model.fit(
    [q_train_enc_inp, a_train_dec_inp],
    a_train_dec_outp,
    validation_data=([q_test_enc_inp, a_test_dec_inp], a_test_dec_outp),
    batch_size=64,
    epochs=10
)

# 테스트 평가
test_loss, test_accuracy = encoder_decoder_model.evaluate(
    [q_test_enc_inp, a_test_dec_inp], a_test_dec_outp)
print(f"테스트 손실: {test_loss}")
print(f"테스트 정확도: {test_accuracy}")


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
테스트 손실: 0.1271609514951706
테스트 정확도: 0.9871721863746643


In [53]:
# 예시 입력 문장
input_sentence = "이봐요"

# 모델을 사용하여 답변 생성
predicted_answer = predict_answer(input_sentence, transformer_model, q_encoder, a_encoder, maxlen)

# 결과 출력
print(f"입력: {input_sentence}")
print(f"출력: {predicted_answer}")


입력: 이봐요
출력: 제가 치과에 
