<a href="https://colab.research.google.com/github/juhumkwon/Defense_Cloud/blob/main/9_1_1_ScaledDotProductAttention_MultiHeadSelfAttention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense

class ScaledDotProductAttention(Layer):
    def __init__(self):
        super().__init__()

    def call(self, q, k, v, mask=None):
        matmul_qk = tf.matmul(q, k, transpose_b=True)  # (batch, num_heads, seq_len, seq_len)
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        if mask is not None:
            scaled_attention_logits += (mask * -1e9)  # mask된 위치는 매우 작은 값으로 만들어 무시

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)  # (batch, num_heads, seq_len, depth)

        return output, attention_weights

In [None]:

class MultiHeadSelfAttention(Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0 # assert는 Python에서 조건이 참인지 검사하는 키워드. 만약 조건이 False면 프로그램을 즉시 멈추고 에러를 발생시킴
        self.num_heads = num_heads
        self.depth = d_model // num_heads # d_model = 512, num_heads = 8 이면 각 head가 64차원짜리 쿼리/키/밸류 벡터를 가진다는 뜻

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)
        self.attention = ScaledDotProductAttention()

# 이 함수는 입력 텐서를 "head 개수만큼 쪼개는 역할"을
# 단계	텐서 shape
# 입력 x	(batch_size, seq_len, d_model)
# reshape 후	(batch_size, seq_len, num_heads, depth)
# transpose 후 (return)	(batch_size, num_heads, seq_len, depth) ← 최종 결과
# 이렇게 바꾸는 이유는 각 head를 병렬로 처리하기 위함 --> num_heads 차원을 앞쪽으로 빼내서 각 head마다 독립적으로 attention을 계산하게 함

    def split_heads(self, x, batch_size):
        # x: (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, depth)
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth)) # -1은 TensorFlow에게 "이 자리는 자동으로 계산해줘"라는 의미
        return tf.transpose(x, perm=[0, 2, 1, 3]) # batch_size, seq_len, num_heads, depth의 순서를 --> batch_size, num_heads, seq_len, depth로 바꿈

    def call(self, x, mask=None):
        batch_size = tf.shape(x)[0] # 입력 텐서 x의 첫 번째 차원(배치 크기)을 실행 중에 안전하게 가져오는 코드

        q = self.split_heads(self.wq(x), batch_size)
        k = self.split_heads(self.wk(x), batch_size)
        v = self.split_heads(self.wv(x), batch_size)

        scaled_attention, attention_weights = self.attention(q, k, v, mask)

        # Concatenate heads
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch, seq_len, num_heads, depth)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.num_heads * self.depth))

        output = self.dense(concat_attention)  # (batch_size, seq_len, d_model)
        return output, attention_weights

In [None]:

# 하이퍼파라미터
batch_size = 2
seq_len = 5
d_model = 512
num_heads = 8

# 입력 예시
dummy_input = tf.random.uniform((batch_size, seq_len, d_model))

# 셀프 어텐션 레이어 실행
self_attention = MultiHeadSelfAttention(d_model=d_model, num_heads=num_heads)
output, attn_weights = self_attention(dummy_input)

print("출력 shape:", output.shape)            # (2, 5, 512)
print("어텐션 가중치 shape:", attn_weights.shape)  # (2, 8, 5, 5)

출력 shape: (2, 5, 512)
어텐션 가중치 shape: (2, 8, 5, 5)


In [None]:
"""
코드	설명
x.shape[0]     	정적(static) shape — 컴파일 시점에만 사용 가능
tf.shape(x)[0]	동적(dynamic) shape — 실행 중에도 항상 안전하게 사용 가능
"""

x = tf.random.uniform((32, 50, 512))  # batch_size=32
batch_size = tf.shape(x)[0]
print(batch_size)  # → tf.Tensor(32, shape=(), dtype=int32)

tf.Tensor(32, shape=(), dtype=int32)
