#### 1. sin/cos 함수 없이, Embedding Layer
#### 2. Decoder Layer 변경
- 인코더-디코더 어텐션 제거
- 멀티 헤드 어텐션 삭제
- FeedForward 네트워크: GPT-1.0에서는 FFN이 각 디코더 레이어마다 적용
#### 3. Decoder에서 Encoder와의 상호작용 제거
#### 4. 디코더 기반의 생성모델을 위한 데이터 전처리
#### 5. 디코더만 있는 transformer 함수

In [51]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow_datasets as tfds

In [52]:
#1. sin/cos 함수 없이, Embedding Layer
class GPT1PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, max_position_embeddings, d_model):
        super(GPT1PositionalEmbedding, self).__init__()
        self.position_embeddings = tf.keras.layers.Embedding(max_position_embeddings, d_model)

    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]
        # position_ids는 배치 차원과 동일하게 확장된 (1, seq_len)의 형태
        position_ids = tf.range(seq_len)[tf.newaxis, :]  # (1, seq_len)
        
        # position_embeddings는 배치 차원만큼 확장되어야 하므로 inputs와 동일한 배치 크기를 가져야 함
        position_embeddings = self.position_embeddings(position_ids)  # (1, seq_len, d_model)
        
        # inputs와 position_embeddings을 더함
        return inputs + position_embeddings



# 포지셔널 인코딩 레이어 - 단어 벡터에 위치 정보를 반영한 벡터를 더해주는 역할
# class PositionalEncoding(tf.keras.layers.Layer):

#   def __init__(self, position, d_model):
#     super(PositionalEncoding, self).__init__()
#     self.pos_encoding = self.positional_encoding(position, d_model)

#   def get_angles(self, position, i, d_model):
#     angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
#     return position * angles

#   def positional_encoding(self, position, d_model):
#     # 각도 배열 생성
#     angle_rads = self.get_angles(
#         position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
#         i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
#         d_model=d_model)

#     # 배열의 짝수 인덱스에는 sin 함수 적용
#     sines = tf.math.sin(angle_rads[:, 0::2])
#     # 배열의 홀수 인덱스에는 cosine 함수 적용
#     cosines = tf.math.cos(angle_rads[:, 1::2])

#     # sin과 cosine이 교차되도록 재배열
#     pos_encoding = tf.stack([sines, cosines], axis=0)
#     pos_encoding = tf.transpose(pos_encoding,[1, 2, 0]) 
#     pos_encoding = tf.reshape(pos_encoding, [position, d_model])

#     pos_encoding = pos_encoding[tf.newaxis, ...]
#     return tf.cast(pos_encoding, tf.float32)

#   def call(self, inputs):
#     return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]


In [53]:
# 스케일드 닷 프로덕트 어텐션 함수 - "입력 간 관계를 파악해서 중요한 정보를 뽑아내는 역할"
def scaled_dot_product_attention(query, key, value, mask):
  # 어텐션 가중치는 Q와 K의 닷 프로덕트
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 가중치를 정규화
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 패딩에 마스크 추가
  if mask is not None:
    logits += (mask * -1e9)

  # softmax적용
  attention_weights = tf.nn.softmax(logits, axis=-1)

  # 최종 어텐션은 가중치와 V의 닷 프로덕트
  output = tf.matmul(attention_weights, value)
  return output


In [55]:
# 입력 (query, key, value) 
#   → 각각 Dense 적용해서 Q, K, V 만들기 
#   → Head 개수만큼 나눔 (split_heads)
#   → Scaled Dot-Product Attention 계산
#   → 여러 head의 결과를 다시 합치기 (concat)
#   → 최종 Dense 레이어로 정리

class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads

    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

    # 머리를 여러 개로 쪼개기 위한 함수
  def split_heads(self, inputs, batch_size): 
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs[
        'value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # Q, K, V에 각각 Dense 레이어 적용
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # 머리를 여러 개로 나누기 (split heads)
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)

    # 스케일드 닷 프로덕트 어텐션 함수
    scaled_attention = scaled_dot_product_attention(query, key, value, mask)

    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # 어텐션 연산 후에 각 결과를 다시 연결(concatenate)합니다
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # 최종 결과에도 Dense를 한 번 더 적용합니다
    outputs = self.dense(concat_attention)

    return outputs

In [56]:
# 멀티헤드 어텐션 (Self-Attention) 에 사용하기 위함
def create_padding_mask(x):
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, sequence length)
  return mask[:, tf.newaxis, tf.newaxis, :]

In [57]:
# 마스크드 멀티헤드 어텐션 (Masked Self-Attention) 에 사용하기 위함
def create_look_ahead_mask(x):
  seq_len = tf.shape(x)[1]

  look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
  padding_mask = create_padding_mask(x)
  return tf.maximum(look_ahead_mask, padding_mask)

In [58]:
# 인코더 하나의 레이어를 함수로 구현.
# # 이 하나의 레이어 안에는 두 개의 서브 레이어가 존재합니다.
# def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
#   inputs = tf.keras.Input(shape=(None, d_model), name="inputs")

#   # 패딩 마스크 사용
#   padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

#   # 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
#   attention = MultiHeadAttention(
#       d_model, num_heads, name="attention")({
#           'query': inputs,
#           'key': inputs,
#           'value': inputs,
#           'mask': padding_mask
#       })

#   # add & norm - 잔차 연결(inputs + attention) + Layer Normalization (정규화) + 드롭아웃 (과적합방지 필요시)
#   attention = tf.keras.layers.Dropout(rate=dropout)(attention)
#   attention = tf.keras.layers.LayerNormalization(
#       epsilon=1e-6)(inputs + attention)

#   # 두 번째 서브 레이어 : Feed Forward Network 
#   outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention) # units 만큼 차원을 늘리고 (relu로 비선형성 부여)
#   outputs = tf.keras.layers.Dense(units=d_model)(outputs)  # 다시 d_model 차원으로 축소해서 출력

#   # add & norm - 잔차 연결(attention + outputs) + Layer Normalization (정규화) + 드롭아웃 (과적합방지 필요시)
#   outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
#   outputs = tf.keras.layers.LayerNormalization(
#       epsilon=1e-6)(attention + outputs)

#   return tf.keras.Model(
#       inputs=[inputs, padding_mask], outputs=outputs, name=name)


In [2]:
# def encoder(vocab_size,
#             num_layers,
#             units,
#             d_model,
#             num_heads,
#             dropout,
#             name="encoder"):
#   inputs = tf.keras.Input(shape=(None,), name="inputs")

#   # 패딩 마스크 사용 - 의미 없는 패딩 토큰들을 무시하게 하기 위함
#   padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

#   # 임베딩 레이어 - 각 단어 ID를 길이 d_model짜리 벡터로 변환
#   embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
#   embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

#   # 포지셔널 인코딩 - 순서 정보를 가진 벡터를 더해줘서, "단어 + 위치 정보"를 같이 전달해줌.
#   embeddings = GPT1PositionalEmbedding(vocab_size, d_model)(embeddings)

#   outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

#   # num_layers만큼 쌓아올린 인코더의 층.
#   for i in range(num_layers):
#     outputs = encoder_layer(
#         units=units,
#         d_model=d_model,
#         num_heads=num_heads,
#         dropout=dropout,
#         name="encoder_layer_{}".format(i),
#     )([outputs, padding_mask])

#   return tf.keras.Model(
#       inputs=[inputs, padding_mask], outputs=outputs, name=name)


In [59]:
#2. Decoder Layer 변경
# - 인코더-디코더 어텐션 제거
# - 멀티 헤드 어텐션 수행 (셀프 어텐션) 삭제
# - FeedForward 네트워크: GPT-1.0에서는 FFN이 각 디코더 레이어마다 적용


def gpt1_decoder_layer(units, d_model, num_heads, dropout, name="gpt1_decoder_layer"):
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")

    # 첫 번째 서브 레이어 : 마스크드 멀티 헤드 어텐션 수행 - look_ahead_mask
    attention1 = MultiHeadAttention(
    d_model, num_heads, name="attention_1")(inputs={
      'query': inputs,
      'key': inputs,
      'value': inputs,
      'mask': look_ahead_mask
    })
    
    # add & norm - 잔차 연결(attention1 + inputs) + Layer Normalization (정규화)
    attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)

    # 두 번째 서브 레이어 : Feed Forward Network (FFN)
    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention1)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)

    # add & norm - 잔차 연결(outputs + attention1) + Layer Normalization (정규화) + 드롭아웃 (과적합방지)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention1)

    return tf.keras.Model(inputs=[inputs, look_ahead_mask], outputs=outputs, name=name)


# 디코더 하나의 레이어를 함수로 구현.
# 이 하나의 레이어 안에는 세 개의 서브 레이어가 존재합니다.
# def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
#   inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
#   enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
#   look_ahead_mask = tf.keras.Input(
#       shape=(1, None, None), name="look_ahead_mask")
#   padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

#   # 첫 번째 서브 레이어 : 마스크드 멀티 헤드 어텐션 수행 - look_ahead_mask
#   attention1 = MultiHeadAttention(
#       d_model, num_heads, name="attention_1")(inputs={
#           'query': inputs,
#           'key': inputs,
#           'value': inputs,
#           'mask': look_ahead_mask
#       })

#   # add & norm - 잔차 연결(attention1 + inputs) + Layer Normalization (정규화)
#   attention1 = tf.keras.layers.LayerNormalization(
#       epsilon=1e-6)(attention1 + inputs)

#   # 두 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션) - padding_mask
#   attention2 = MultiHeadAttention(
#       d_model, num_heads, name="attention_2")(inputs={
#           'query': attention1,
#           'key': enc_outputs,
#           'value': enc_outputs,
#           'mask': padding_mask
#       })

#   # add & norm - 잔차 연결(attention2 + attention1) + Layer Normalization (정규화) + 드롭아웃 (과적합방지)
#   attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
#   attention2 = tf.keras.layers.LayerNormalization(
#       epsilon=1e-6)(attention2 + attention1)


#   # 세 번째 서브 레이어 : Feed Forward Network 
#   outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
#   outputs = tf.keras.layers.Dense(units=d_model)(outputs)


#   # add & norm - 잔차 연결(outputs + attention2) + Layer Normalization (정규화) + 드롭아웃 (과적합방지)
#   outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
#   outputs = tf.keras.layers.LayerNormalization(
#       epsilon=1e-6)(outputs + attention2)

#   return tf.keras.Model(
#       inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
#       outputs=outputs,
#       name=name)


In [60]:
#3. Decoder에서 Encoder와의 상호작용 제거

def gpt1_decoder(vocab_size,
                  num_layers,
                  units,
                  d_model,
                  num_heads,
                  dropout,
                  name='gpt1_decoder'):
    inputs = tf.keras.Input(shape=(None,), name='inputs')
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')

    # 임베딩 레이어 - 각 단어 ID를 길이 d_model짜리 벡터로 변환
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

    # 포지셔널 인코딩 - 순서 정보를 가진 벡터를 더해줘서, "단어 + 위치 정보"를 같이 전달해줌.
    embeddings = GPT1PositionalEmbedding(vocab_size, d_model)(embeddings)

    # Dropout - 훈련을 돕는 테크닉
    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

    # 여러 개의 디코더 레이어 추가
    for i in range(num_layers):
        outputs = gpt1_decoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name='gpt1_decoder_layer_{}'.format(i),
        )(inputs=[outputs, look_ahead_mask])

    return tf.keras.Model(
        inputs=[inputs, look_ahead_mask],
        outputs=outputs,
        name=name
    )





# def decoder(vocab_size,
#             num_layers,
#             units,
#             d_model,
#             num_heads,
#             dropout,
#             name='decoder'):
#   inputs = tf.keras.Input(shape=(None,), name='inputs')
#   enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
#   look_ahead_mask = tf.keras.Input(
#       shape=(1, None, None), name='look_ahead_mask')

#   # 패딩 마스크 사용 - 의미 없는 패딩 토큰들을 무시하게 하기 위함
#   padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
  
#   # 임베딩 레이어 - 각 단어 ID를 길이 d_model짜리 벡터로 변환
#   embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
#   embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

#   # 포지셔널 인코딩 - 순서 정보를 가진 벡터를 더해줘서, "단어 + 위치 정보"를 같이 전달해줌.
#   embeddings = GPT1PositionalEmbedding(vocab_size, d_model)(embeddings)

#   # Dropout이라는 훈련을 돕는 테크닉을 수행
#   outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

#   for i in range(num_layers):
#     outputs = gpt1_decoder_layer(
#         units=units,
#         d_model=d_model,
#         num_heads=num_heads,
#         dropout=dropout,
#         name='gpt1_decoder_layer_{}'.format(i),
#     )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

#   return tf.keras.Model(
#       inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
#       outputs=outputs,
#       name=name)


In [61]:
file_path = os.path.expanduser('/aiffel/aiffel/transformer_chatbot/data/ChatbotData .csv')

data = pd.read_csv(file_path)

In [62]:
#4. 디코더 기반의 생성모델을 위한 데이터 전처리 - "디코더의 입력과 출력 준비 추가"

# 전처리 함수
def preprocess_sentence(sentence):
    sentence = sentence.strip()
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)  # 구두점(마침표, 물음표, 느낌표, 쉼표 등) 양쪽에 공백을 추가하여 처리
    sentence = re.sub(r'[" "]+', " ", sentence)  # 여러 개의 공백을 하나의 공백으로 변환
    sentence = re.sub(r"[^가-힣a-zA-Z?.!,]+", " ", sentence)  # 한글, 알파벳, 구두점 외의 모든 문자를 공백으로 변환
    sentence = sentence.strip()
    return sentence

# CSV 파일 로드 및 전처리 적용
def load_conversations():
    data = pd.read_csv(file_path)
    questions = [preprocess_sentence(q) for q in data['Q']]  # 질문 처리
    answers = [preprocess_sentence(a) for a in data['A']]    # 답변 처리
    
    # 디코더의 입력과 출력 준비
    dec_inputs = [START_TOKEN + ' ' + ans for ans in answers]  # 디코더의 입력: <start> + 답변
    dec_outputs = [ans + ' ' + END_TOKEN for ans in answers]  # 디코더의 출력: 답변 + <end>
    
    return questions, dec_inputs, dec_outputs

# 실행
questions, dec_inputs, dec_outputs = load_conversations()
print('전체 샘플 수:', len(questions))
print('전체 샘플 수:', len(dec_inputs))
print('전체 샘플 수:', len(dec_outputs))



# # 전처리 함수
# def preprocess_sentence(sentence):
#     sentence = sentence.strip()
#     sentence = re.sub(r"([?.!,])", r" \1 ", sentence) # 구두점(마침표, 물음표, 느낌표, 쉼표 등) 양쪽에 공백을 추가하여 처리
#     sentence = re.sub(r'[" "]+', " ", sentence) # 여러 개의 공백을 하나의 공백으로 변환
#     sentence = re.sub(r"[^가-힣a-zA-Z?.!,]+", " ", sentence) # 한글, 알파벳, 구두점 외의 모든 문자를 공백으로 변환
#     sentence = sentence.strip()
#     return sentence

# # CSV 파일 로드 및 전처리 적용
# def load_conversations():
#     data = pd.read_csv(file_path)
#     questions = [preprocess_sentence(q) for q in data['Q']]
#     answers = [preprocess_sentence(a) for a in data['A']]
#     return questions, answers

# # 실행
# questions, answers = load_conversations()
# print('전체 샘플 수:', len(questions))
# print('전체 샘플 수:', len(answers))

전체 샘플 수: 11823
전체 샘플 수: 11823
전체 샘플 수: 11823


In [63]:
print('전처리 후의 22번째 질문 샘플: {}'.format(questions[21]))
print('전처리 후의 22번째 답변 샘플: {}'.format(dec_inputs[21]))
print('전처리 후의 22번째 답변 샘플: {}'.format(dec_outputs[21]))

전처리 후의 22번째 질문 샘플: 가스비 장난 아님
전처리 후의 22번째 답변 샘플: <start> 다음 달에는 더 절약해봐요 .
전처리 후의 22번째 답변 샘플: 다음 달에는 더 절약해봐요 . <end>


In [64]:
# 단어를 더 작은 조각으로 나누는 규칙을 만들고, 그 규칙으로 텍스트를 처리할 수 있게 해주는 토크나이저 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + dec_inputs + dec_outputs,  # 모든 데이터 합쳐서 토크나이저 구축
    target_vocab_size=2**13  # 어휘 크기 설정 (최대 2^13 크기)
)



# # 단어를 더 작은 조각으로 나누는 규칙을 만들고, 그 규칙으로 텍스트를 처리할 수 있게 해주는 토크나이저를 만드는 작업
# tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)


In [65]:
# 시작 토큰과 종료 토큰에 고유한 정수를 부여합니다.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]

In [66]:
print('START_TOKEN의 번호 :' ,[tokenizer.vocab_size])
print('END_TOKEN의 번호 :' ,[tokenizer.vocab_size + 1])

START_TOKEN의 번호 : [7807]
END_TOKEN의 번호 : [7808]


In [67]:
# 시작 토큰과 종료 토큰을 고려하여 +2를 하여 단어장의 크기를 산정합니다.
VOCAB_SIZE = tokenizer.vocab_size + 2
print(VOCAB_SIZE)

7809


In [68]:
# 샘플의 최대 허용 길이 또는 패딩 후의 최종 길이
MAX_LENGTH = 40

# 정수 인코딩, 최대 길이를 초과하는 샘플 제거, 패딩
def tokenize_and_filter(inputs, outputs):
  tokenized_inputs, tokenized_outputs = [], []
  
  for (sentence1, sentence2) in zip(inputs, outputs):
    # 정수 인코딩 과정에서 시작 토큰과 종료 토큰을 추가
    sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
    sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN

    # 최대 길이 40 이하인 경우에만 데이터셋으로 허용
    if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
      tokenized_inputs.append(sentence1)
      tokenized_outputs.append(sentence2)
  
  # 최대 길이 40으로 모든 데이터셋을 패딩
  tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
  tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
  
  return tokenized_inputs, tokenized_outputs

In [69]:
questions, answers = tokenize_and_filter(questions, answers)
print('단어장의 크기 :',(VOCAB_SIZE))
print('필터링 후의 질문 샘플 개수: {}'.format(len(questions)))
print('필터링 후의 답변 샘플 개수: {}'.format(len(answers)))

단어장의 크기 : 7809
필터링 후의 질문 샘플 개수: 11823
필터링 후의 답변 샘플 개수: 11823


In [70]:
BATCH_SIZE = 64
BUFFER_SIZE = 20000

# 디코더는 이전의 target을 다음의 input으로 사용합니다.
# 이에 따라 outputs에서는 START_TOKEN을 제거하겠습니다.
dataset = tf.data.Dataset.from_tensor_slices((
    {
        'inputs': questions,
        'dec_inputs': answers[:, :-1]
    },
    {
        'outputs': answers[:, 1:]
    },
))

dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [71]:
#5.디코더만 있는 transformer

def gpt1_transformer(vocab_size,
                     num_layers,
                     units,
                     d_model,
                     num_heads,
                     dropout,
                     name="gpt1_transformer"):
    # Encoder와 Decoder 입력 정의
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

    # 디코더에서 미래의 토큰을 마스크하기 위한 Look-Ahead Mask 생성
    look_ahead_mask = tf.keras.layers.Lambda(
        create_look_ahead_mask,
        output_shape=(1, None, None),
        name="look_ahead_mask"
    )(dec_inputs)

    # 임베딩 레이어: 각 단어 ID를 길이 d_model짜리 벡터로 변환
    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))  # 차원 크기의 제곱근을 곱해 스케일링

    # 포지셔널 인코딩 추가
    embeddings = GPT1PositionalEmbedding(vocab_size, d_model)(embeddings)

    # Dropout 적용
    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
    
    # GPT-1 디코더 레이어 사용 (여기서 gpt1_decoder는 여러 레이어를 처리)
    outputs = gpt1_decoder(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
    )(inputs=[dec_inputs, look_ahead_mask])
    
    
    # 최종 출력 레이어: vocab_size 차원의 출력 생성
    outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(outputs)

    return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)


# def transformer(vocab_size,
#                 num_layers,
#                 units,
#                 d_model,
#                 num_heads,
#                 dropout,
#                 name="transformer"):
#   inputs = tf.keras.Input(shape=(None,), name="inputs")
#   dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")


#   # 디코더에서 미래의 토큰을 마스크 하기 위해서 사용합니다.
#   # 내부적으로 패딩 마스크도 포함되어져 있습니다.
#   look_ahead_mask = tf.keras.layers.Lambda(
#       create_look_ahead_mask,
#       output_shape=(1, None, None),
#       name='look_ahead_mask')(dec_inputs)

#   # 디코더
#   dec_outputs = gpt1_decoder(
#       vocab_size=vocab_size,
#       num_layers=num_layers,
#       units=units,
#       d_model=d_model,
#       num_heads=num_heads,
#       dropout=dropout,
#   )(inputs=[dec_inputs, look_ahead_mask])

#   # 완전연결층
#   outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

#   return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)


In [72]:
tf.keras.backend.clear_session()

# 하이퍼파라미터
NUM_LAYERS = 2 # 인코더와 디코더의 층의 개수
D_MODEL = 256 # 인코더와 디코더 내부의 입, 출력의 고정 차원
NUM_HEADS = 8 # 멀티 헤드 어텐션에서의 헤드 수 
UNITS = 512 # 피드 포워드 신경망의 은닉층의 크기
DROPOUT = 0.1 # 드롭아웃의 비율

model = gpt1_transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    units=UNITS,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)

model.summary()

Model: "gpt1_transformer"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
dec_inputs (InputLayer)         [(None, None)]       0                                            
__________________________________________________________________________________________________
look_ahead_mask (Lambda)        (None, 1, None, None 0           dec_inputs[0][0]                 
__________________________________________________________________________________________________
gpt1_decoder (Functional)       (None, None, 256)    5052416     dec_inputs[0][0]                 
                                                                 look_ahead_mask[0][0]            
__________________________________________________________________________________________________
inputs (InputLayer)             [(None, None)]       0                             

In [73]:
# 패딩된 부분을 제외하고 손실을 계산하는 함수 - SparseCategoricalCrossentropy 사용
def loss_function(y_true, y_pred):
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')(y_true, y_pred)

  mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
  loss = tf.multiply(loss, mask)

  return tf.reduce_mean(loss)

In [74]:
# 학습률을 조정하는 스케줄러로, 워밍업(warmup) 기간 동안 학습률을 서서히 증가시키고, 이후에는 감소
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

In [75]:
learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

In [76]:
EPOCHS = 15
model.fit(dataset, epochs=EPOCHS, verbose=1)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x7852841bce20>

In [84]:
def decoder_inference(sentence):
  sentence = preprocess_sentence(sentence)

  # 입력된 문장을 정수 인코딩 후, 시작 토큰과 종료 토큰을 앞뒤로 추가.
  # ex) Where have you been? → [[8331   86   30    5 1059    7 8332]]
  sentence = tf.expand_dims(
      START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)

  # 디코더의 현재까지의 예측한 출력 시퀀스가 지속적으로 저장되는 변수.
  # 처음에는 예측한 내용이 없음으로 시작 토큰만 별도 저장. ex) 8331
  output_sequence = tf.expand_dims(START_TOKEN, 0)

  # 디코더의 인퍼런스 단계
  for i in range(MAX_LENGTH):
    # 디코더는 최대 MAX_LENGTH의 길이만큼 다음 단어 예측을 반복합니다.
    predictions = model(inputs=[sentence, output_sequence], training=False)
    predictions = predictions[:, -1:, :]

    # 현재 예측한 단어의 정수
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

    # 만약 현재 예측한 단어가 종료 토큰이라면 for문을 종료
    if tf.equal(predicted_id, END_TOKEN[0]):
      break

    # 예측한 단어들은 지속적으로 output_sequence에 추가됩니다.
    # 이 output_sequence는 다시 디코더의 입력이 됩니다.
    output_sequence = tf.concat([output_sequence, predicted_id], axis=-1)

  return tf.squeeze(output_sequence, axis=0)

In [85]:
def sentence_generation(sentence):
  # 입력 문장에 대해서 디코더를 동작 시켜 예측된 정수 시퀀스를 리턴받습니다.
  prediction = decoder_inference(sentence)

  # 정수 시퀀스를 다시 텍스트 시퀀스로 변환합니다.
  predicted_sentence = tokenizer.decode(
      [i for i in prediction if i < tokenizer.vocab_size])

  print('입력 : {}'.format(sentence))
  print('출력 : {}'.format(predicted_sentence))

  return predicted_sentence

In [86]:
sentence_generation('오늘 날씨 어때요?')

입력 : 오늘 날씨 어때요?
출력 : 많이 지쳤나봐요 .


'많이 지쳤나봐요 .'