In [112]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# SinusoidalPositionalEncoding

<img src="image/sinusoidal.png" width="300"/>

In [113]:
class SinusoidalPositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, max_seq_len, d_model, **kwargs):
        super().__init__(**kwargs)

        # 입력 마스크를 다음 층으로 자동으로 전파
        self.support_masking = True

        if d_model % 2 != 0:
            raise ValueError("d_model must be even to use sinusoidal positional encoding.")

        # 생성자에서 `모델의 최대 입력 시퀀스` X `임베딩 차원` 크기의 위치 인코딩 행렬을 미리 계산한다.

        # 위치와 차원에 대한 격자(grid) 생성
        position = np.arange(max_seq_len)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))

        # 3차원 포지셔널 인코딩 행렬 생성
        pos_emb = np.zeros((1, max_seq_len, d_model))

        # sin 함수를 짝수 인덱스에 적용
        pos_emb[0, :, 0::2] = np.sin(position * div_term)
        # cos 함수를 홀수 인덱스에 적용
        pos_emb[0, :, 1::2] = np.cos(position * div_term)

        # tensorflow 상수로 변환하고 float32로 타입 지정
        self.pos_emb = tf.constant(pos_emb, dtype=tf.float32)

    def call(self, inputs):
        batch_max_seq_len = tf.shape(inputs)[1] # 인코더로 입력된 시퀀스의 길이를 동적으로 계산한다. [batch_size, sequence_length, embed_size]
        return inputs + self.pos_emb[:, :batch_max_seq_len] # 필요한 만큼 위치인코딩을 잘라서 더한다.

# Scaled Dot-Product Attention

<img src="image/Screenshot 2025-02-20 at 8.43.14 PM.png" width="500"/>

In [114]:
class ScaledDotProductAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.support_masking = True # 입력 마스크를 다음 층으로 전파.

    def call(self, q, k, v, mask=None):
        # 쿼리와 키의 점곱으로 어텐션 스코어 계산
        # 이 층의 입력으로는 h개의 헤드로 쪼개진 상태의 [batch, n_heads, seq_len, d_k] shape의 텐서가 들어온다.
        # 같은 배치, 같은 헤드의 쿼리와 키 끼리 행렬곱을 수행한다.

        # k의 전치는 마지막 2개 차원에 대해서만 수행해야 한다.
        attention_score = tf.matmul(q, tf.transpose(k, perm=[0,1,3,2]))

        # 패딩 토큰 부분에 극단적인 음수값 부여, 소프트맥스 함수를 통과했을 때 0에 수렴한다.
        if mask is not None:
            # mask의 shape는 [batch, 1, 1, key_seq_len] 또는 브로드캐스팅 가능한 형태여야 함
            # ID가 0인 위치가 False다.
            attention_score += (mask - 1) * 1e9

        # 스케일
        # 마지막 차원의 제곱근으로 나누어서 스케일링 한다. 마지막 차원은 한 헤드의 차원과 같다.
        d_k = tf.cast(tf.shape(k)[-1], dtype=tf.float32)
        attention_score = attention_score / tf.math.sqrt(d_k)

        # 소프트맥스 함수 통과
        attention_weight = tf.keras.activations.softmax(attention_score)

        # weighted sum, value에 가중치를 곱한다.
        output = tf.matmul(attention_weight, v)

        return output

# Multi Head Attention

<p float="left">
  <img src="https://wikidocs.net/images/page/159310/mha_img_original.png" width="500"/>
  <img src="https://wikidocs.net/images/page/159310/mha_visualization-930x1030.png" width="300"/>
</p>

In [115]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model ,n_heads, **kwargs):
        super().__init__(**kwargs)

        self.support_masking=True # 다음 층으로 마스킹 자동 전파

        # Wq Wk Wv 선형 변환 층
        # 입력 텐서가 이 선형 변환 층을 통과한 다음에 여러개의 헤드로 쪼개진다.
        self.Wq = tf.keras.layers.Dense(d_model)
        self.Wk = tf.keras.layers.Dense(d_model)
        self.Wv = tf.keras.layers.Dense(d_model)

        # n_heads개의 헤드로 쪼갰을 떄 d_k를 계산한다.
        if d_model % n_heads != 0 : # 나누어 떨어지지 않으면 예외를 발생시킴
            raise ValueError(f"d_model % n_heads should be 0 d_model: {d_model}, n_heads: {n_heads}")

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads # 한 헤드의 차원

        # 마지막으로 concat된 행렬을 통과시키는 선형변환 층, 입력과 출력의 크기가 똑같아야 한다는 점을 명심하자.
        self.Wo = tf.keras.layers.Dense(d_model)

    def call(self, q, k, v, mask=None): # 마스크를 입력으로 받음.
        # 선형 변환
        q = self.Wq(q)
        k = self.Wk(k)
        v = self.Wv(v)

        batch_size = tf.shape(q)[0]

        # n_heads개의 헤드로 쪼개는 과정

        # reshape, 마지막 차원 d_k를 (self.n_heads X self.d_k) 형태로 분리함.
        # 시퀀스 길이는 -1로 표시, 자동계산된다.
        q = tf.reshape(q, [batch_size, -1, self.n_heads, self.d_k])
        k = tf.reshape(k, [batch_size, -1, self.n_heads, self.d_k])
        v = tf.reshape(v, [batch_size, -1, self.n_heads, self.d_k])

        # 차원 순서를 바꾼다. 같은 배치, 같은 헤드의 쿼리와 키 끼리 행렬곱하게 된다.
        # 덕분에 매우 효율적으로 어텐션 연산을 수행할 수 있음.
        q = tf.transpose(q, [0,2,1,3])
        k = tf.transpose(k, [0,2,1,3])
        v = tf.transpose(v, [0,2,1,3])

        # scaled dot-product attention층에 집어넣어서 어텐션 연산을 수행한다.
        attention_layer = ScaledDotProductAttention()
        scaled_attention = attention_layer(q,k,v,mask) # 어텐션 층에 마스크를 넣어줌

        # concat, 헤드를 쪼개는 과정을 반대로 해주면 된다.
        scaled_attention = tf.transpose(scaled_attention, [0,2,1,3]) # 다시 [batch, seq_length, n_heads, d_k]
        scaled_attention = tf.reshape(scaled_attention, [batch_size, -1, self.d_model]) # 다시 [batch, seq_length, d_model]

        # 마지막 선형 변환
        output = self.Wo(scaled_attention)

        return output

# Encoder Layer

<img src="https://wikidocs.net/images/page/159310/img_original_paper-726x1030.png" width="300"/>

In [116]:
class EncoderLayer(tf.keras.layers.Layer) :
    def __init__(self, d_model, d_ff, n_heads, dropout, **kwargs):
        super().__init__(**kwargs)

        self.support_masking=True # 다음 층으로 마스킹 자동 전파

        # attention layer normalization
        self.attention_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        # ffn layer normalization
        self.ffn_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        # ffn 레이어
        self.ffn_layer1 = tf.keras.layers.Dense(d_ff)
        self.activation = tf.keras.layers.Activation('relu')
        self.ffn_layer2 = tf.keras.layers.Dense(d_model)

        # 멀티 헤드 어텐션 층 정의
        self.multi_head_attention_layer = MultiHeadAttention(d_model, n_heads)

        # 정규화를 위한 Dropout층 정의
        self.dropout1 = tf.keras.layers.Dropout(dropout) # 어텐션에 사용
        self.dropout2 = tf.keras.layers.Dropout(dropout) # ffn에 사용

    def call(self, inputs, mask=None):
        skip = inputs

        # 멀티 헤드 어텐션 연산
        attention = self.multi_head_attention_layer(inputs, inputs, inputs, mask)

        # Dropout
        attention = self.dropout1(attention)

        # skip connection
        attention = attention + skip

        # layer normalization
        attention = self.attention_norm(attention)

        skip = attention

        # ffn 통과
        ffn_output = self.ffn_layer1(attention)
        ffn_output = self.activation(ffn_output)
        ffn_output = self.ffn_layer2(ffn_output)

        # Dropout
        ffn_output = self.dropout2(ffn_output)

        # skip connection
        ffn_output = ffn_output + skip

        # layer normalization
        ffn_output = self.ffn_norm(ffn_output)

        return ffn_output

# Encoder

In [117]:
vocab_size = 10000  # 어휘 사전 크기
max_seq_len = 500 # 최대 시퀀스 길이
d_model = 128
d_ff = 512
n_heads = 8
dropout = 0.1

# 사용자 정의 전처리 함수: 문자를 모두 소문자로 바꾸고, 구두점을 제거, [CLS]토큰을 추가
def custom_standardize(text):
    text = tf.strings.lower(text)  # Lowercase
    text = tf.strings.regex_replace(text, r"[^\w\s]", "")  # Remove punctuation
    return tf.strings.join(["[CLS]", text], separator=" ") # Add [CLS] token

text_vec_layer = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=max_seq_len,
    standardize=custom_standardize
)

# 모델 정의

inputs = tf.keras.Input(shape=(1,), dtype=tf.string)  # 입력은 문자열, Keras의 Input 레이어에서 지정하는 shape 인자는 각 샘플의 shape을 나타내며, 배치 차원은 별도로 자동 추가

# 텍스트 벡터화: [batch, seq_len] (정수 인덱스로 변환됨)
x = text_vec_layer(inputs)

# 임베딩, 패딩 토큰에 대해 마스킹 적용
x = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=d_model, mask_zero=True)(x)

# 위치 인코딩
x = SinusoidalPositionalEncoding(max_seq_len=max_seq_len, d_model=d_model)(x)

# 여러 EncoderLayer 적용
x = EncoderLayer(d_model=d_model, d_ff=d_ff, n_heads=n_heads, dropout=dropout)(x)
x = EncoderLayer(d_model=d_model, d_ff=d_ff, n_heads=n_heads, dropout=dropout)(x)

# [CLS] 토큰의 벡터만 추출 (시퀀스의 첫 번째 토큰)
cls_token = tf.keras.layers.Lambda(lambda t: t[:, 0, :])(x)

# 최종 출력, 이진 분류라면 Sigmoid 사용
outputs = tf.keras.layers.Dense(1, activation='sigmoid')(cls_token)

# 모델 생성
model = tf.keras.Model(inputs=inputs, outputs=outputs)



In [118]:
model.summary()

# Datasets

In [119]:
import tensorflow_datasets as tfds

raw_train_set, raw_valid_set, raw_test_set = tfds.load(
    name="imdb_reviews",
    split=["train[:90%]", "train[90%:]", "test"],
    as_supervised=True
)

tf.random.set_seed(42)

train_set = raw_train_set.shuffle(5000, seed=42).batch(32).prefetch(1)
valid_set = raw_valid_set.batch(32).prefetch(1)
test_set = raw_test_set.batch(32).prefetch(1)

# Training

In [120]:
# training dataset에서 텍스트만 추출하여 adapt 수행
train_text = train_set.map(lambda text, label: text)
text_vec_layer.adapt(train_text)

In [121]:
model.compile(optimizer="adam",
              loss="binary_crossentropy",
              metrics=["accuracy"])

In [122]:
import datetime

# 텐서보드 로그 디렉토리 설정
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")


# 콜백 설정
callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True),
    tf.keras.callbacks.TensorBoard(log_dir=log_dir,
                                   histogram_freq=1,
                                   write_graph=True,
                                   update_freq='epoch')
    ]

history = model.fit(
    train_set,
    validation_data=valid_set,
    epochs=30,
    callbacks=callbacks,
    verbose=1
)

Epoch 1/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m154s[0m 214ms/step - accuracy: 0.4963 - loss: 0.7832 - val_accuracy: 0.5024 - val_loss: 0.6968
Epoch 2/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m150s[0m 213ms/step - accuracy: 0.4965 - loss: 0.6968 - val_accuracy: 0.4976 - val_loss: 0.6963
Epoch 3/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m153s[0m 217ms/step - accuracy: 0.5361 - loss: 0.6865 - val_accuracy: 0.6940 - val_loss: 0.6766
Epoch 4/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m155s[0m 220ms/step - accuracy: 0.7058 - loss: 0.5755 - val_accuracy: 0.8132 - val_loss: 0.4289
Epoch 5/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m157s[0m 223ms/step - accuracy: 0.8305 - loss: 0.3860 - val_accuracy: 0.8580 - val_loss: 0.3380
Epoch 6/30
[1m704/704[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m159s[0m 227ms/step - accuracy: 0.9041 - loss: 0.2581 - val_accuracy: 0.8696 - val_loss: 0.3365
Epoc

In [124]:
model.evaluate(test_set)

[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 61ms/step - accuracy: 0.8602 - loss: 0.3635


[0.3617170751094818, 0.859000027179718]