<a href="https://colab.research.google.com/github/ysooch0819/AI16-Projects/blob/main/n424%20%EA%B0%95%EC%9D%98%EB%85%B8%ED%8A%B8%20%ED%95%99%EC%8A%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 트랜스포머
# 병렬화를 위해  모든 단어 벡터를 동시에 입력받음
# 단어의 상대적인 위치정보를 담은 벡터를 만드는 과정: Positional Encoding


In [2]:
import numpy as np

def get_angles(pos, i, d_model):
    """Compute the positional-encoding angles ``pos / 10000^(2*(i//2)/d_model)``.

    Args:
        pos: Position indices (scalar or array); broadcast against ``i``.
        i: Embedding-dimension indices (scalar or array).
        d_model: Dimensionality of the model / embedding vectors.

    Returns:
        ``pos * angle_rates``; the shape follows NumPy broadcasting of
        ``pos`` against ``i``.
    """
    # Dimensions 2k and 2k+1 share one frequency, hence the floor division.
    # (The original called the non-existent ``np.powere``.)
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

In [3]:
import tensorflow as tf
def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: Number of positions (rows) to encode.
        d_model: Dimensionality of each encoding vector (columns).

    Returns:
        A ``tf.float32`` tensor with a leading axis of size 1 added in front
        of the ``(position, d_model)`` table.
    """
    row_positions = np.arange(position)[:, np.newaxis]
    column_dims = np.arange(d_model)[np.newaxis, :]
    table = get_angles(row_positions, column_dims, d_model)

    even_cols = slice(0, None, 2)
    odd_cols = slice(1, None, 2)
    # Even columns carry the sine, odd columns the cosine of the angle.
    table[:, even_cols] = np.sin(table[:, even_cols])
    table[:, odd_cols] = np.cos(table[:, odd_cols])

    batched_table = table[np.newaxis, ...]
    return tf.cast(batched_table, dtype=tf.float32)

In [7]:
# Self-Attention 구현
def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    q, k and v must share the same leading dimensions, and k and v must share
    their penultimate dimension (``seq_len_k == seq_len_v``).

    The mask may have different shapes depending on its type (padding or
    look-ahead), but it must be broadcastable for the addition to the logits.

    Args:
        q: Query, shape ``(..., seq_len_q, depth)``.
        k: Key, shape ``(..., seq_len_k, depth)``.
        v: Value, shape ``(..., seq_len_v, depth_v)``.
        mask: Optional tensor broadcastable to ``(..., seq_len_q, seq_len_k)``
            in which 1 marks a position to hide. Defaults to None (no masking).

    Returns:
        A tuple ``(output, attention_weights)``.
    """
    matmul_qk = tf.matmul(q, k, transpose_b=True)

    # Scale the query-key dot products by sqrt(d_k).
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Masked positions (mask == 1) receive a large negative number (-1e9) in
    # place of -inf, so that softmax drives their weight to ~0 and they do not
    # contribute to the weighted sum over the values.
    if mask is not None:
        # Cast so that integer / boolean masks work as well as float ones.
        mask = tf.cast(mask, scaled_attention_logits.dtype)
        scaled_attention_logits += (mask * -1e9)

    # Softmax over the key axis yields the attention weights.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

    output = tf.matmul(attention_weights, v)

    return output, attention_weights

In [6]:
# Position-wise Feed-Forward Network 구현
def point_wise_feed_forward_network(d_model, dff):
    """Build the position-wise feed-forward network (FFNN).

    Args:
        d_model: Dimensionality of the model; width of the output layer.
        dff: Number of units in the hidden layer (the paper uses 2048).

    Returns:
        A ``tf.keras.Sequential`` of a ReLU ``Dense(dff)`` layer followed by
        a linear ``Dense(d_model)`` layer.
    """
    hidden_layer = tf.keras.layers.Dense(dff, activation='relu')
    output_layer = tf.keras.layers.Dense(d_model)
    return tf.keras.Sequential([hidden_layer, output_layer])

In [8]:
# 트랜스포머 예제
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.layers import TextVectorization

In [9]:
# Download and unpack the English-Spanish sentence-pair archive, then point
# `text_file` at the extracted "spa-eng/spa.txt" next to the downloaded zip.
# The archive is fetched over HTTPS rather than plain HTTP.
text_file = keras.utils.get_file(
    fname='spa-eng.zip',
    origin="https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [10]:
# Read the corpus: one "english<TAB>spanish" pair per line. The encoding is
# given explicitly because the Spanish side contains non-ASCII characters and
# the platform default encoding is not guaranteed to be UTF-8.
with open(text_file, encoding="utf-8") as f:
    # The file ends with a newline, so the last split element is dropped.
    lines = f.read().split("\n")[:-1]

text_pairs = []

for line in lines:
    eng, spa = line.split('\t')
    # Wrap every target sentence in explicit start / end markers.
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))

In [11]:
# Print five randomly drawn (English, Spanish) pairs as a quick sanity check.
for _ in range(5):
    print(random.choice(text_pairs))

('I think he is right.', '[start] Pienso que tiene razón. [end]')
('What have you heard?', '[start] ¿Qué has oído? [end]')
('Generally speaking, the climate of Japan is mild.', '[start] Por lo general, el clima de Japón es suave. [end]')
('Nobody thinks Tom will win the race.', '[start] Nadie cree que Tom gane la carrera. [end]')
('You have one minute to defend your point of view.', '[start] Tenés un minuto para defender tu punto de vista. [end]')


In [13]:
# Dataset split: 15% validation, 15% test, the remainder for training.
# A copy is shuffled with a dedicated, seeded RNG so that the split is
# reproducible and re-running this cell never moves pairs between the
# train / validation / test sets (the original reshuffled `text_pairs` in
# place, without a seed, on every run).
SPLIT_SEED = 42
shuffled_pairs = list(text_pairs)
random.Random(SPLIT_SEED).shuffle(shuffled_pairs)

num_val_samples = int(0.15 * len(shuffled_pairs))
num_train_samples = len(shuffled_pairs) - 2 * num_val_samples
train_pairs = shuffled_pairs[:num_train_samples]
val_pairs = shuffled_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = shuffled_pairs[num_train_samples + num_val_samples :]

# The three subsets must partition the whole corpus.
assert len(train_pairs) + len(val_pairs) + len(test_pairs) == len(text_pairs)

print(f"전체 데이터셋 Pair 개수: {len(text_pairs)}")
print(f"학습 데이터셋 Pair 개수: {len(train_pairs)}")
print(f"검증 데이터셋 Pair 개수: {len(val_pairs)}")
print(f"시험 데이터셋 Pair 개수: {len(test_pairs)}")

전체 데이터셋 Pair 개수: 118964
학습 데이터셋 Pair 개수: 83276
검증 데이터셋 Pair 개수: 17844
시험 데이터셋 Pair 개수: 17844


In [14]:
# 추가 전처리 및 필요한 파라미터 지정, 텍스트 벡터화
# Characters removed during Spanish standardization: ASCII punctuation plus
# "¿", minus the square brackets so that the "[start]" / "[end]" markers survive.
strip_chars = "".join(
    ch for ch in string.punctuation + "¿" if ch not in ("[", "]")
)

# Vectorization and batching parameters.
vocab_size, sequence_length, batch_size = 15000, 20, 64

def custom_standardization(input_string):
    """Lower-case the input and delete every character listed in ``strip_chars``.

    Args:
        input_string: String tensor to standardize.

    Returns:
        The lower-cased string tensor with the stripped characters removed.
    """
    # Character class matching any single character of `strip_chars`.
    strip_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# English (source) vectorizer: the layer's default standardization,
# sequences of `sequence_length` integer token ids.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length,
)

# Spanish (target) vectorizer: one extra position, because the target is
# later split into decoder input (all but last) and label (all but first).
spa_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)

# Fit both vocabularies on the training split only.
train_eng_texts = [eng_text for eng_text, _ in train_pairs]
train_spa_texts = [spa_text for _, spa_text in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)

In [18]:
# 데이터를 모델에 넣을 수 있도록 구성
def format_dataset(eng, spa):
    """Vectorize a batch of sentence pairs into model inputs and labels.

    Args:
        eng: Batch of English strings.
        spa: Batch of Spanish strings.

    Returns:
        ``(inputs, labels)`` where ``inputs`` is a dict holding
        ``"encoder_inputs"`` and ``"decoder_inputs"`` (the target without its
        last token) and ``labels`` is the target without its first token.
    """
    source_ids = eng_vectorization(eng)
    target_ids = spa_vectorization(spa)
    model_inputs = {
        "encoder_inputs": source_ids,
        "decoder_inputs": target_ids[:, :-1],
    }
    labels = target_ids[:, 1:]
    return (model_inputs, labels)

def make_dataset(pairs):
    """Build a batched ``tf.data`` pipeline from (English, Spanish) pairs.

    Args:
        pairs: Sequence of ``(english, spanish)`` string tuples.

    Returns:
        A dataset of ``format_dataset`` outputs, in batches of ``batch_size``.
    """
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)

    # Order matters: cache the vectorized batches first, shuffle after the
    # cache so each epoch sees a fresh order (caching after the shuffle would
    # replay the first epoch's order forever), and prefetch last so it can
    # overlap input preparation with training.
    return dataset.cache().shuffle(2048).prefetch(16)

# Build the batched input pipelines for the training and validation pairs.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [19]:
# Take a single batch and print the shapes of the encoder inputs, decoder
# inputs and targets that the pipeline yields.
for inputs, targets in train_ds.take(1):
    print(f"""인코더 입력의 Shape: {inputs["encoder_inputs"].shape}""")
    print(f"""디코더 입력의 Shape: {inputs["decoder_inputs"].shape}""")
    print(f"""타겟의 Shape: {targets.shape}""")

인코더 입력의 Shape: (64, 20)
디코더 입력의 Shape: (64, 20)
타겟의 Shape: (64, 20)


In [20]:
# 모델의 일부가 되는 클래스 구축
class TransformerEncoder(layers.Layer):
    '''
    Transformer encoder block: multi-head self-attention followed by a
    feed-forward network, each wrapped in a residual connection and layer
    normalization.

    Args:
        embed_dim: Dimensionality of the embedding vectors.
        dense_dim: Number of units in the hidden layer of the feed-forward network.
        num_heads: Number of attention heads.
    '''
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads = num_heads, key_dim = embed_dim
        )
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation='relu'),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward network to ``inputs``.

        Args:
            inputs: Input sequence tensor.
            mask: Optional padding mask for ``inputs``; when omitted, attention
                is left unmasked.
        """
        # Default to "no mask" so the name is always bound; the original
        # raised UnboundLocalError whenever the layer was called without one.
        padding_mask = None
        if mask is not None:
            # Insert two singleton axes so the per-token mask broadcasts
            # against the attention scores.
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype='int32')
        attention_output=self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be rebuilt from its config."""
        config = super(TransformerEncoder, self).get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config

In [26]:
class PositionalEmbedding(layers.Layer):
    """
    Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: Number of distinct positions the position embedding
            table can represent.
        vocab_size: Size of the token vocabulary.
        embed_dim: Dimensionality of the embedding vectors.
    """
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed the token ids and add the embedding of each token's position."""
        # Positions 0 .. length-1 along the last axis of `inputs`.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be rebuilt from its config."""
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config

In [28]:
class TransformerDecoder(layers.Layer):
    """
    Transformer decoder block: causal self-attention over the target,
    cross-attention over the encoder outputs, then a feed-forward network.
    Each sub-layer is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Dimensionality of the embedding vectors.
        latent_dim: Number of units in the hidden layer of the feed-forward network.
        num_heads: Number of attention heads.
    """
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run one decoder block.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Output sequence of the encoder.
            mask: Optional padding mask belonging to ``inputs`` (the target).
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # `mask` describes the *target* tokens, so it is combined with the
            # causal mask and used for the target self-attention. The original
            # applied this combined mask to the cross-attention instead, where
            # its key axis lines up with encoder positions, not target ones.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            # No padding information: fall back to the causal mask alone
            # (the original left `padding_mask` unbound in this case).
            self_attention_mask = causal_mask

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=self_attention_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention: no explicit mask is passed, because the padding mask
        # of the encoder sequence is not available through this signature.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask tiled to ``(batch, seq_len, seq_len)``.

        Entry ``[b, i, j]`` is 1 when ``i >= j``, i.e. position ``i`` may only
        attend to itself and to earlier positions.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples [batch_size, 1, 1]: repeat the mask once per example.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be rebuilt from its config."""
        config = super(TransformerDecoder, self).get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        })
        return config

In [30]:
# 클래스 연결, 모델 정의 후 학습
# Hyper-parameters shared by the encoder and decoder blocks.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: source token ids -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype='int64', name='encoder_inputs')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder: target token ids plus the encoder output sequence -> positional
# embedding -> decoder block -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype='int64', name='decoder_inputs')
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name='decoder_state_inputs')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation='softmax')(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder outputs into the decoder sub-model.
# Note that `decoder_outputs` is rebound here to the output of that call.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name='transformer')

In [31]:
# Number of passes over the training set.
epochs = 10

# Print the architecture, then configure and run training; the validation
# set is evaluated after every epoch.
transformer.summary()
transformer.compile(
    optimizer='rmsprop',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_5 (Positi  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 onalEmbedding)                                                                                   
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder_2 (Transfo  (None, None, 256)   3155456     ['positional_embedding_

<keras.callbacks.History at 0x7f2a7c947b20>

In [34]:
# Map each token id back to its Spanish vocabulary string.
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
# Upper bound on the number of tokens generated per translation.
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    '''
    Greedy inference: translate one English sentence token by token.

    Args:
        input_sentence: English sentence as a plain string.

    Returns:
        The decoded string, beginning with "[start]" and ending with "[end]"
        unless the length limit was reached first.
    '''
    source_tokens = eng_vectorization([input_sentence])
    decoded_sentence = '[start]'
    for step in range(max_decoded_sentence_length):
        # Re-vectorize everything decoded so far; drop the last position so
        # the length matches the decoder input used in training.
        target_tokens = spa_vectorization([decoded_sentence])[:, :-1]
        step_scores = transformer([source_tokens, target_tokens])[0, step, :]

        # Greedy choice: the highest-scoring token at the current step.
        next_token = spa_index_lookup[np.argmax(step_scores)]
        decoded_sentence = decoded_sentence + " " + next_token

        if next_token == '[end]':
            break
    return decoded_sentence

# Translate 30 randomly chosen English sentences from the test split.
test_eng_texts = [eng_text for eng_text, _spa_text in test_pairs]
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(f"{input_sentence} -> {translated}")

I had the boy carry my bag. -> [start] tuve el chico que mi bolsa [end]
Tom took a picture of Mary's new car and sent it to John. -> [start] tom le dio una foto a mary y la [UNK] de mary y la john [end]
We went to Boston by car. -> [start] fue a boston en coche [end]
I suppose you like her. -> [start] supongo que como ella [end]
My father is in his room. -> [start] mi padre está en su habitación [end]
Who first split the atom? -> [start] quién lo [UNK] el [UNK] [end]
She advised him not to buy a used car, but he didn't follow her advice. -> [start] Él le aconsejó que no le [UNK] un auto pero no le habló [end]
I want exactly what you want. -> [start] quiero exactamente lo que quieres [end]
His mother was a singer. -> [start] su madre era un [UNK] [end]
We need to win. -> [start] tenemos que ganar [end]
Tom asked Mary many questions about Australia. -> [start] tom le hizo a mary muchas preguntas sobre australia [end]
Tom doesn't know why Mary quit her job. -> [start] tom no sabe por qué 

In [35]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [40]:
class TransformerBlock(layers.Layer):
    """Encoder-style Transformer block with dropout.

    Multi-head self-attention followed by a feed-forward network; each
    sub-layer output goes through dropout, a residual connection and layer
    normalization.

    Args:
        embed_dim: Dimensionality of the embedding vectors.
        num_heads: Number of attention heads.
        ff_dim: Number of units in the hidden layer of the feed-forward network.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (e.g. ``name``), forwarded to
            the base class like the other custom layers in this notebook do.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation='relu'), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply the block to ``inputs``.

        Args:
            inputs: Input sequence tensor.
            training: Forwarded to the dropout layers. Defaults to None so the
                layer can also be called directly without the argument.
        """
        # Self-attention: the same tensor serves as query and value.
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1+ffn_output)

In [37]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        maxlen: Number of distinct positions the position embedding can represent.
        vocab_size: Size of the token vocabulary.
        embed_dim: Dimensionality of the embedding vectors.
    """
    # The second parameter was misspelled `vocab_sizee`, so the body silently
    # read whatever the module-level `vocab_size` happened to be at
    # construction time and ignored the argument. Positional callers are
    # unaffected by the rename.
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in ``x`` and add the embedding of each position."""
        # Positions 0 .. length-1 along the last axis of `x`.
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [38]:
# NOTE(review): this rebinds the module-level `vocab_size`, which the
# translation cells above set to 15000.
vocab_size = 20000 # tokenize only 20000 words
maxlen = 200 # at most 200 tokens per review

# Load the IMDB data limited to `vocab_size` words. The second tuple is used
# as the validation set throughout the cells below.
(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
print(f"학습 데이터 수 : {len(x_train)}")
print(f"검증 데이터 수 : {len(x_val)}")

# Bring every review to a common length of `maxlen`.
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
학습 데이터 수 : 25000
검증 데이터 수 : 25000


In [43]:
# Hyper-parameters of the classifier.
# NOTE(review): `embed_dim` and `num_heads` rebind the module-level names
# that the translation model cells above also use.
embed_dim=32
num_heads=2
ff_dim=32

# Token ids -> token + position embedding -> one Transformer block ->
# average over the sequence axis -> small dense head -> 2-way softmax.
inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation='relu')(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(2, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=outputs)

In [44]:
# Configure the classifier and train it for two epochs, evaluating on the
# validation arrays after each epoch.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
history = model.fit(
    x_train,
    y_train,
    batch_size=32,
    epochs=2,
    validation_data=(x_val, y_val),
)

Epoch 1/2
Epoch 2/2
