In [3]:
import tensorflow as tf
import tensorflow_datasets as tfds
import os
import re
import numpy as np
import matplotlib.pyplot as plt

# Positional embedding

위치 정보를 반영하기 위해 positional encoding 대신 positional embedding 을 사용.

위치 정보를 주기함수를 통해 직접 코드로 반영하지 않고 임베딩 레이어를 통해 학습.

In [4]:
# vocab_size : 단어사전 크기
# d_model : 임베딩 차원
# max_position_embeddings : 최대 시퀀스 길이

class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model, max_position_embeddings):
        super(PositionalEmbedding, self).__init__()
        self.token_embeddings = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=d_model)
        self.position_embeddings = tf.keras.layers.Embedding(input_dim=max_position_embeddings, output_dim=d_model)
        self.max_position_embeddings = max_position_embeddings
        self.d_model = d_model

    def call(self, inputs):
        seq_length = tf.shape(inputs)[1]    # inputs_shape = (문장개수(batch), seq_lenth=max_pos , vocab_size)
        positions = tf.range(start=0, limit=seq_length, delta=1)    # [0, 1, .. , seq_l - 1] : 1D-tensor with shape (seq_l,)
        embedded_tokens = self.token_embeddings(inputs) # shape = (batch, seq, d_model)
        embedded_positions = self.position_embeddings(positions)    # shape = (seq_lenth, d_model)
        return embedded_tokens + embedded_positions     
            # 모든 문장에 대해 같은 positional embedding 벡터를 더해줌
            # broadcast 로 더해준다 (0-th index 에 대해)

# Scaled dot product attention

In [5]:
def scaled_dot_product_attention(query, key, value, mask):
  # 어텐션 가중치는 Q와 K의 내적
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 가중치를 정규화
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 패딩에 마스크 추가
  if mask is not None:
    logits += (mask * -1e9)

  # softmax적용
  attention_weights = tf.nn.softmax(logits, axis=-1)

  # 최종 어텐션은 가중치와 V의 닷 프로덕트
  output = tf.matmul(attention_weights, value)
  return output


# Multi-head attention

In [6]:
class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    self.depth = d_model // self.num_heads

    # 각 어텐션 헤드는 서로 다른 가중치를 이용하여 Q, K, V 를 생성함을 상기
    # 서로 다른 어텐션 헤드들을 동시에 병렬로 늘어놓으면 결국 d_model 과 같아지므로
    # 병렬 처리 시 출력 차원이 d_model 임.
    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    self.dense = tf.keras.layers.Dense(units=d_model)

  def split_heads(self, inputs, batch_size): # batch_size : 한 번에 처리할 문장의 개수.
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth)) # 둘 째 차원은 seq_len
    return tf.transpose(inputs, perm=[0, 2, 1, 3]) # shape becomes (batch_size, num_heads, seq_len, depth)
    # 결론적으로 (batch, seq_len, d_model) |-> (batch, num_heads, seq_len, depth) 로 reshape 함.

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs[
        'value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # Q, K, V에 각각 Dense를 적용합니다
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value) # 모두 shape 가 (batch, seq_len, d_model) 이 된다.

    # 병렬 연산을 위한 머리를 여러 개 만듭니다
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size) # shape = (batch, num_heads, seq_len, depth)

    # 스케일드 닷 프로덕트 어텐션 함수
    scaled_attention = scaled_dot_product_attention(query, key, value, mask)

    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # 어텐션 연산 후에 각 결과를 다시 연결(concatenate)합니다
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # 최종 결과에도 Dense를 한 번 더 적용합니다
    outputs = self.dense(concat_attention)

    return outputs

# Mask

In [8]:
def create_padding_mask(x): # 
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, sequence length)
  return mask[:, tf.newaxis, tf.newaxis, :]

In [9]:
def create_look_ahead_mask(x): # 다음 단어를 가려주는 마스크
  seq_len = tf.shape(x)[1]
  look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
  padding_mask = create_padding_mask(x)
  return tf.maximum(look_ahead_mask, padding_mask)

# Decoder without Encoding : Transformer for GPT

GPT 모델에서는 인코딩 없이 디코딩 레이어만 쌓은 트랜스포머 모델을 사용

In [10]:
# 디코더 하나의 레이어를 함수로 구현.
# 기존의 Encoder 의 output 을 받는 구조와 달리,
# 그를 받는 어텐션 레이어는 삭제하고 나머지만 사용.

def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  
  look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
  

  # 첫 번째 서브 레이어 : 멀티 헤드 어텐션 (셀프 어텐션)
  attention1 = MultiHeadAttention(
      d_model, num_heads, name="attention_1")(inputs={
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })

  # 멀티 헤드 어텐션의 결과는 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
  attention1 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention1 + inputs)



  # 두 번째 서브 레이어 : 2개의 완전연결층
  outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention1)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  # 완전연결층의 결과는 Dropout과 LayerNormalization 수행
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(outputs + attention1)

  return tf.keras.Model(
      inputs=[inputs, look_ahead_mask],
      outputs=outputs,
      name=name)

In [11]:
# encoder 없이 decoder layer 를 쌓은 것이 GPT 를 위한 Transformer 구조.
def decoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name='decoder'):
  
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
  
  
  # 임베딩 레이어
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))

  # 포지셔널 인코딩
  embeddings = PositionalEmbedding(vocab_size, d_model)(embeddings)

  # Dropout
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

# GPT 디코더-레이어 스택
  for i in range(num_layers):
    outputs = decoder_layer(
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout,
        name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, look_ahead_mask])

  return tf.keras.Model(
      inputs=[inputs, look_ahead_mask],
      outputs=outputs,
      name=name)