In [1]:
import numpy as np
import math
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
class PositionalEncoding(layers.Layer):
    def __init__(self, max_len, dim):
        super(PositionalEncoding, self).__init__()
        pos_encoding = np.zeros((max_len, dim))
        positions = np.arange(0, max_len)[:, np.newaxis]

        N = 10000.0
        div_term =  1 / np.power(N, np.arange(0, dim, 2) / dim)
        
        pos_encoding[:, 0::2] = np.sin(positions * div_term)
        pos_encoding[:, 1::2] = np.cos(positions * div_term)
        
        pos_encoding = pos_encoding[np.newaxis, ...]
        self.pos_encoding = tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

# 인코더 모델 구성
def transformer_encoder(inputs, num_heads, key_dim, ff_dim, dropout=0.1):
    # === Self-Attention Block ===
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    MHA_layer = layers.MultiHeadAttention(key_dim=key_dim, num_heads=num_heads, dropout=dropout)
    
    x = MHA_layer(query=x, value=x, key=x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

     # === Feed-Forward Block ===
    ff = layers.LayerNormalization(epsilon=1e-6)(res)
    ff = layers.Dense(units=ff_dim, activation="relu")(ff)  # ff_dim으로 확장
    ff = layers.Dropout(dropout)(ff)
    ff = layers.Dense(units=inputs.shape[-1])(ff) # 원래 차원으로 복원
    return ff + res


def create_look_ahead_mask(size):
    # band_part(A, -1, 0)은 lower triangular 행렬을 추출
    return 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)

# Transformer 디코더 블록
def transformer_decoder(inputs, encoder_output, num_heads, key_dim, ff_dim, dropout=0):
    seq_len = tf.shape(inputs)[1]
    look_ahead_mask = create_look_ahead_mask(seq_len)

    # Masked Multi-Head Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    MHA_layer1 = layers.MultiHeadAttention(key_dim=key_dim, num_heads=num_heads, dropout=dropout)
    x = MHA_layer1(query=x, value=x, key=x, attention_mask=look_ahead_mask)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Encoder-Decoder Attention
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    MHA_layer2 = layers.MultiHeadAttention(key_dim=key_dim, num_heads=num_heads, dropout=dropout)
    x = MHA_layer2(query=x, value=encoder_output, key=encoder_output)
    x = layers.Dropout(dropout)(x)
    res = x + res

    # Feed Forward Part
    ff = layers.LayerNormalization(epsilon=1e-6)(res)
    ff = layers.Dense(units=ff_dim, activation="relu")(ff)
    ff = layers.Dropout(dropout)(ff)
    ff = layers.Dense(units=inputs.shape[-1])(ff)
    return x + res

def build_transformer_model(input_shape, key_dim, num_heads, ff_dim, num_encoder_blocks, num_decoder_blocks, dropout=0):
    encoder_inputs = keras.Input(shape=input_shape)
    x = PositionalEncoding(input_shape[0], input_shape[1])(encoder_inputs)
    for _ in range(num_encoder_blocks):
        x = transformer_encoder(x, key_dim, num_heads, ff_dim, dropout)
    encoder_outputs = x

    decoder_inputs = keras.Input(shape=(input_shape[0] + 1, input_shape[1]))
    x = PositionalEncoding(input_shape[0] + 1, input_shape[1])(decoder_inputs)
    for _ in range(num_decoder_blocks):
        x = transformer_decoder(x, encoder_outputs, key_dim, num_heads, ff_dim, dropout)

    outputs = layers.Dense(input_shape[1], activation="linear")(x)
    return keras.Model([encoder_inputs, decoder_inputs], outputs)


In [14]:
# 모델 하이퍼파라미터
timestep = 6
dim = 4

key_dim = 16
num_heads = 2
ff_dim = 64
num_encoder_blocks = 1
num_decoder_blocks = 1
dropout = 0.1

input_shape = (timestep, dim)
model = build_transformer_model(input_shape, key_dim, num_heads, ff_dim, num_encoder_blocks, num_decoder_blocks, dropout)

In [6]:
# 데이터 생성
num_samples = 10000
input_sequences = np.random.uniform(0, 100, size=(num_samples, timestep, dim)).astype(np.float32)
timesteps_indices = np.arange(1, timestep + 1)  # [1, 2, 3, ..., timesteps]

reversed_sequences = np.flip(input_sequences, axis=1)

token = np.zeros((num_samples, 1, dim), dtype=np.float32)
decoder_input_sequences = np.concatenate([token, reversed_sequences], axis=1)
target_sequences = np.concatenate([reversed_sequences, token], axis=1)

print(input_sequences[0,:,:])
print(target_sequences[0,:,:])

[[76.55356  47.615036 61.180607 30.141968]
 [49.69878  79.40159  30.16528  39.7405  ]
 [23.894135 56.280968 46.571846 19.79676 ]
 [87.69412  36.32097  61.23566   8.343727]
 [57.564095 82.21122  93.45595  60.515392]
 [ 6.847743 92.08058   3.661294 19.053959]]
[[ 6.847743 92.08058   3.661294 19.053959]
 [57.564095 82.21122  93.45595  60.515392]
 [87.69412  36.32097  61.23566   8.343727]
 [23.894135 56.280968 46.571846 19.79676 ]
 [49.69878  79.40159  30.16528  39.7405  ]
 [76.55356  47.615036 61.180607 30.141968]
 [ 0.        0.        0.        0.      ]]


In [18]:
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=3e-1,
    decay_steps=1000,
    decay_rate=0.95
)
optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(
    [input_sequences, decoder_input_sequences],
    target_sequences,
    epochs=500,
    batch_size=32
)

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78

<keras.callbacks.History at 0x21b53424be0>

In [12]:
# 1개 샘플에 대해 예측
sample_idx = 0

x_enc = input_sequences[sample_idx:sample_idx+1]              # (1, 6, 4)
x_dec = decoder_input_sequences[sample_idx:sample_idx+1]      # (1, 7, 4)

# 예측
y_pred = model.predict([x_enc, x_dec], verbose=0)             # (1, 7, 4)
y_true = target_sequences[sample_idx:sample_idx+1]            # (1, 7, 4)

# 차원 제거
y_pred = y_pred[0]   # (7, 4)
y_true = y_true[0]   # (7, 4)

# 3. 비교 출력
print("예측 결과 (y_pred):")
print(np.round(y_pred, 2))

print("\n정답 (y_true):")
print(np.round(y_true, 2))


예측 결과 (y_pred):
[[ 47.53  58.36  64.36  30.26]
 [ 46.9   59.72  67.96  34.54]
 [ 49.59  61.64  51.67  30.29]
 [ 44.59  65.78  60.42  36.65]
 [ 50.92  59.81  43.55  31.49]
 [ 73.23  57.38  49.84  23.99]
 [ -0.1   22.61   6.54 -10.55]]

정답 (y_true):
[[ 6.85 92.08  3.66 19.05]
 [57.56 82.21 93.46 60.52]
 [87.69 36.32 61.24  8.34]
 [23.89 56.28 46.57 19.8 ]
 [49.7  79.4  30.17 39.74]
 [76.55 47.62 61.18 30.14]
 [ 0.    0.    0.    0.  ]]


In [13]:
x_enc

array([[[76.55356 , 47.615036, 61.180607, 30.141968],
        [49.69878 , 79.40159 , 30.16528 , 39.7405  ],
        [23.894135, 56.280968, 46.571846, 19.79676 ],
        [87.69412 , 36.32097 , 61.23566 ,  8.343727],
        [57.564095, 82.21122 , 93.45595 , 60.515392],
        [ 6.847743, 92.08058 ,  3.661294, 19.053959]]], dtype=float32)