# 06-2 T5

<table align="left"><tr><td>
<a href="https://colab.research.google.com/github/rickiepark/hm-dl/blob/main/06-2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="코랩에서 실행하기"/></a>
</td></tr></table>

## T5의 상대 위치 임베딩

In [1]:
import numpy as np

In [2]:
int(5 * (1 + np.emath.logn(20/5, 6/5)))

5

In [3]:
int(5 * (1 + np.emath.logn(20/5, 7/5)))

6

In [4]:
int(5 * (1 + np.emath.logn(20/5, 8/5)))

6

In [5]:
# 상대 위치 임베딩을 위한
def relpos_bucket_index(query_key_len, num_buckets,
                        max_pos, bidirectional):
    query_pos = np.arange(query_key_len).reshape(-1, 1)
    key_pos = np.arange(query_key_len).reshape(1, -1)
    # 쿼리와 키의 상대 위치를 나타내는 (len, len) 크기의 배열을 만듭니다.
    # 주대각선 위는 양수이고 아래에는 음수입니다.
    rel_pos = (key_pos - query_pos)
    # 양방향 셀프 어텐션은 주대각선 위와 아래를 위해 버킷을 절반으로 나눕니다.
    if bidirectional:
        num_buckets //= 2
    # 마스크드 셀프 어텐션은 주대각선 위의 값을 모두 삭제합니다.
    else:
        rel_pos = np.minimum(rel_pos, 0)
    # 상대 거리를 모두 양수로 바꿉니다.
    rel_pos = np.abs(rel_pos)
    # 버킷의 절반부터는 로그 스케일로 인덱스를 할당합니다.
    start_log_index = num_buckets // 2
    # 로그 인덱스 부분을 표시한 행렬을 만듭니다.
    is_log_index = rel_pos >= start_log_index
    # 로그 인덱스를 생성합니다.
    base = max_pos/start_log_index
    value = rel_pos/start_log_index
    # 로그 계산 log_{base}(value)를 ln(value)/ln(base)로 바꿉니다.
    log_index = start_log_index * \
                (1 + np.log(value, where=(value!=0))/np.log(base))
    log_index = log_index.astype('int')
    # 로그 인덱스가 전체 버킷 개수를 넘어가면 마지막 버킷 인덱스를 사용합니다.
    log_index = np.minimum(log_index, num_buckets - 1)
    # start_log_index부터는 로그 인덱스를 사용하고,
    # 그 이전은 상대 위치를 버킷 인덱스로 사용합니다.
    rel_pos = np.where(is_log_index, log_index, rel_pos)
    # 양방향 셀프 어텐션일 경우 주대각선 위의 값은 버킷의 중간부터 사용합니다.
    if bidirectional:
        upper_indexes = np.triu_indices(query_key_len, 1)
        rel_pos[upper_indexes] += num_buckets
    return rel_pos

relpos_bucket_index(10, 20, 20, True)

array([[ 0, 11, 12, 13, 14, 15, 15, 16, 16, 17],
       [ 1,  0, 11, 12, 13, 14, 15, 15, 16, 16],
       [ 2,  1,  0, 11, 12, 13, 14, 15, 15, 16],
       [ 3,  2,  1,  0, 11, 12, 13, 14, 15, 15],
       [ 4,  3,  2,  1,  0, 11, 12, 13, 14, 15],
       [ 5,  4,  3,  2,  1,  0, 11, 12, 13, 14],
       [ 5,  5,  4,  3,  2,  1,  0, 11, 12, 13],
       [ 6,  5,  5,  4,  3,  2,  1,  0, 11, 12],
       [ 6,  6,  5,  5,  4,  3,  2,  1,  0, 11],
       [ 7,  6,  6,  5,  5,  4,  3,  2,  1,  0]])

## T5 구현하기

In [6]:
import keras
import keras_nlp
from keras import layers

In [7]:
def make_causal_mask(seq_len):
    n_hori = keras.ops.arange(seq_len)
    n_vert = keras.ops.expand_dims(n_hori, axis=-1)
    mask = n_vert >= n_hori
    return mask

def make_attention_mask(padding_mask):
    # padding_mask 크기가 (2, 5)라고 가정해 보죠.
    batch_size, seq_len = keras.ops.shape(padding_mask)
    # causal_mask 크기는 (5, 5)가 됩니다.
    causal_mask = make_causal_mask(seq_len)
    # 배치 차원을 추가해 (2, 5, 5)로 만듭니다.
    causal_mask = keras.ops.broadcast_to(causal_mask, (batch_size, seq_len, seq_len))
    # 브로드캐스팅을 위해 padding_mask 크기를 (2, 1, 5)로 만듭니다.
    padding_mask = keras.ops.expand_dims(padding_mask, axis=1)
    return keras.ops.minimum(causal_mask, padding_mask)

class AttentionMask(keras.Layer):
    def call(self, padding_mask):
        return make_attention_mask(padding_mask)

In [8]:
from keras_nlp.src.models.t5.t5_layer_norm import T5LayerNorm
from keras_nlp.src.models.t5.t5_multi_head_attention import T5MultiHeadAttention

def t5_encoder(x, position_bias,
               padding_mask, dropout, activation='relu'):
    residual = x
    x = T5LayerNorm()(x)
    # position_bias가 None이면 use_relative_attention_bias을 True로 지정합니다.
    if position_bias is None:
        use_relative_attention_bias = True
    else:
        use_relative_attention_bias = False
    # use_relative_attention_bias로 position_bias를 재사용할지 결정합니다.
    self_attention = T5MultiHeadAttention(
        is_decoder=False, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=use_relative_attention_bias,
        dropout=dropout
        )
    padding_mask = keras.ops.expand_dims(padding_mask, axis=1)
    x, position_bias = self_attention(x, mask=padding_mask,
                                      position_bias=position_bias)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    # 위치별 피드 포워드 네트워크
    x = T5LayerNorm()(x)
    x = layers.Dense(intermediate_dim, activation=activation, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(hidden_dim, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 다음 층에서 재사용할 수 있도록 어텐션 편향을 반환합니다.
    return x, position_bias

In [9]:
def t5_decoder(x, encoder_output, position_bias,
               padding_mask, encoder_padding_mask, dropout, activation='relu'):
    # 어텐션 마스크를 계산합니다.
    attention_mask = AttentionMask()(padding_mask)
    # 스킵 연결을 준비합니다.
    residual = x
    x = T5LayerNorm()(x)
    # position_bias가 None이면 use_relative_attention_bias을 True로 지정합니다.
    if position_bias is None:
        use_relative_attention_bias = True
    else:
        use_relative_attention_bias = False
    # use_relative_attention_bias로 position_bias를 재사용할지 결정합니다.
    self_attention = T5MultiHeadAttention(
        is_decoder=True, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=use_relative_attention_bias,
        dropout=dropout
        )
    x, position_bias = self_attention(x, mask=attention_mask,
                                      position_bias=position_bias)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    x = T5LayerNorm()(x)
    # 크로스 어텐션에는 상대 위치 임베딩을 적용하지 않으므로
    # use_relative_attention_bias를 False로 지정합니다.
    cross_attention = T5MultiHeadAttention(
        is_decoder=True, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=False,
        dropout=dropout
        )
    encoder_padding_mask = keras.ops.expand_dims(encoder_padding_mask, axis=1)
    x, _ = cross_attention(x, key_value_states=encoder_output,
                           mask=encoder_padding_mask)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    # 위치별 피드 포워드 네트워크
    x = T5LayerNorm()(x)
    x = layers.Dense(intermediate_dim, activation=activation, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(hidden_dim, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 다음 층에서 재사용할 수 있도록 어텐션 편향을 반환합니다.
    return x, position_bias

In [10]:
# T5
vocab_size = 32128
num_layers = 6
num_heads = 8
key_value_dim = 64
hidden_dim = 512
intermediate_dim = 2048
dropout = 0.1
activation = 'relu'

encoder_token_ids = keras.Input(shape=(None,))
encoder_padding_mask = keras.Input(shape=(None,))
decoder_token_ids = keras.Input(shape=(None,))
decoder_padding_mask = keras.Input(shape=(None,))

token_embedding_layer = keras_nlp.layers.ReversibleEmbedding(vocab_size, hidden_dim)
encoder_token_embedding = token_embedding_layer(encoder_token_ids)
x = layers.Dropout(dropout)(encoder_token_embedding)

# 어텐션 편향 배열을 초기화합니다.
position_bias = None
for i in range(num_layers):
    # 첫 번째 층에서만 어텐션 편향을 계산하고 다른 층은 맨 처음 계산하여 구한 값을 재사용합니다.
    x, position_bias = t5_encoder(
        x, position_bias=position_bias,
        padding_mask=encoder_padding_mask, dropout=dropout)
x = T5LayerNorm()(x)
x = layers.Dropout(dropout)(x)
encoder_output = x

decoder_token_embedding = token_embedding_layer(decoder_token_ids)
x = layers.Dropout(dropout)(decoder_token_embedding)

# 어텐션 편향 배열을 초기화합니다.
position_bias = None
for i in range(num_layers):
    # 첫 번째 층에서만 어텐션 편향을 계산하고 다른 층은 맨 처음 계산하여 구한 값을 재사용합니다.
    x, position_bias = t5_decoder(
        x, encoder_output=encoder_output,
        position_bias=position_bias,
        padding_mask=decoder_padding_mask,
        encoder_padding_mask=encoder_padding_mask, dropout=dropout)
x = T5LayerNorm()(x)
x = layers.Dropout(dropout)(x)
decoder_output = token_embedding_layer(x, reverse=True)

model = keras.Model(inputs=(encoder_token_ids, encoder_padding_mask,
                            decoder_token_ids, decoder_padding_mask),
                    outputs=(encoder_output, decoder_output))
model.summary()

## 사전 훈련된 T5 사용하기

In [11]:
from transformers import pipeline, set_seed

t5_pipe = pipeline("summarization", model="google-t5/t5-small")

Device set to use cuda:0


In [12]:
total_params = sum(p.numel() for p in t5_pipe.model.parameters())
print(total_params)

60506624


In [13]:
ENG_TEXT = """
Voyager 1 is a space probe launched by NASA on September 5, 1977, as part of the Voyager program to study the outer Solar System and the interstellar space beyond the Sun's heliosphere. It was launched 16 days after its twin, Voyager 2. It communicates through the NASA Deep Space Network (DSN) to receive routine commands and to transmit data to Earth. Real-time distance and velocity data are provided by NASA and JPL. At a distance of 162.7 AU (24.3 billion km; 15.1 billion mi) from Earth as of May 2024, it is the most distant humanmade object from Earth.
"""

In [14]:
set_seed(42)
t5_pipe(ENG_TEXT, max_length=70, do_sample=True, top_k=10, temperature=3.0)

[{'summary_text': 'Voyager was launched 16 days after its twin . it communicates through the NASA Deep Space Network . It is a distance of 162.7 AU (24.3 billion km; 15.1 billion mi) from Earth as of May 2024 .'}]

In [15]:
KOR_TEXT = """
2023-2024년 쉰드흐누퀴르 분화는 2023년 12월 18일 저녁 아이슬란드 그린다비크에 있는 쉰드흐누퀴르 분화구에서 화산 폭발이 발생해 지상에 있는 열극에서 용암이 분출한 사건이다. 용암 분출과 뒤따른 지진 활동 빈도는 다음 날인 2023년 12월 19일부터 감소했으나 새로 열린 열극의 양쪽에서 용암이 옆으로 넓게 퍼져나갔다. 이번 분화는 2021년 분화 시작 이래 쉬뒤르네스에서 일어난 가장 큰 분화로 최대 100 m 높이의 용암 분수가 관측되었으며 분화지에서 약 42 km 떨어진 아이슬란드의 수도 레이캬비크에서도 화산 분화 장면을 볼 수 있었다. 화산 분화는 2023년 12월 21일 화산 상공 관측 결과 더 이상의 용암 분출이 보이지 않아 종료되었으나 아이슬란드 기상청은 "분화 종식을 선언하기에는 너무 이르다"며 지속적으로 관측하겠다고 말했다. 쉰드흐누퀴르는 현재 화산지대이자 쉬뒤르네스 열곡대의 활성 열극에 속한다.
"""

In [16]:
t5_ko_pipe = pipeline("summarization", model="csebuetnlp/mT5_multilingual_XLSum")
set_seed(42)
t5_ko_pipe(KOR_TEXT, max_length=70)

config.json:   0%|          | 0.00/730 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.33G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/375 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/4.31M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.33G [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/65.0 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Device set to use cuda:0


[{'summary_text': '아이슬란드에서 화산 폭발로 일어난 최대 용암 분화가 사실상 종식됐다.'}]

## 심화 예제 솔루션

In [17]:
def t5_1_1_encoder(x, position_bias,
                   padding_mask, dropout):
    residual = x
    x = T5LayerNorm()(x)
    # position_bias가 None이면 use_relative_attention_bias을 True로 지정합니다.
    if position_bias is None:
        use_relative_attention_bias = True
    else:
        use_relative_attention_bias = False
    # use_relative_attention_bias로 position_bias를 재사용할지 결정합니다.
    self_attention = T5MultiHeadAttention(
        is_decoder=False, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=use_relative_attention_bias,
        dropout=dropout
        )
    padding_mask = keras.ops.expand_dims(padding_mask, axis=1)
    x, position_bias = self_attention(x, mask=padding_mask,
                                      position_bias=position_bias)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    # 위치별 피드 포워드 네트워크
    x = T5LayerNorm()(x)
    x1 = layers.Dense(intermediate_dim, activation='gelu', use_bias=False)(x)
    x2 = layers.Dense(intermediate_dim, use_bias=False)(x)
    x = x1 * x2
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(hidden_dim, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 다음 층에서 재사용할 수 있도록 어텐션 편향을 반환합니다.
    return x, position_bias

In [18]:
def t5_1_1_decoder(x, encoder_output, position_bias,
                   padding_mask, encoder_padding_mask, dropout):
    # 어텐션 마스크를 계산합니다.
    attention_mask = AttentionMask()(padding_mask)
    # 스킵 연결을 준비합니다.
    residual = x
    x = T5LayerNorm()(x)
    # position_bias가 None이면 use_relative_attention_bias을 True로 지정합니다.
    if position_bias is None:
        use_relative_attention_bias = True
    else:
        use_relative_attention_bias = False
    # use_relative_attention_bias로 position_bias를 재사용할지 결정합니다.
    self_attention = T5MultiHeadAttention(
        is_decoder=True, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=use_relative_attention_bias,
        dropout=dropout
        )
    x, position_bias = self_attention(x, mask=attention_mask,
                                      position_bias=position_bias)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    x = T5LayerNorm()(x)
    # 크로스 어텐션에는 상대 위치 임베딩을 적용하지 않으므로
    # use_relative_attention_bias를 False로 지정합니다.
    cross_attention = T5MultiHeadAttention(
        is_decoder=True, hidden_dim=hidden_dim,
        key_value_dim=key_value_dim, num_heads=num_heads,
        use_relative_attention_bias=False,
        dropout=dropout
        )
    encoder_padding_mask = keras.ops.expand_dims(encoder_padding_mask, axis=1)
    x, _ = cross_attention(x, key_value_states=encoder_output,
                           mask=encoder_padding_mask)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 스킵 연결을 준비합니다.
    residual = x
    # 위치별 피드 포워드 네트워크
    x = T5LayerNorm()(x)
    x1 = layers.Dense(intermediate_dim, activation='gelu', use_bias=False)(x)
    x2 = layers.Dense(intermediate_dim, use_bias=False)(x)
    x = x1 * x2
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(hidden_dim, use_bias=False)(x)
    x = layers.Dropout(dropout)(x)
    # 스킵 연결
    x = x + residual
    # 다음 층에서 재사용할 수 있도록 어텐션 편향을 반환합니다.
    return x, position_bias

In [19]:
# T5 1.1
vocab_size = 32128
num_layers = 8
num_heads = 6
key_value_dim = 64
hidden_dim = 512
intermediate_dim = 1024
dropout = 0.1

encoder_token_ids = keras.Input(shape=(None,))
encoder_padding_mask = keras.Input(shape=(None,))
decoder_token_ids = keras.Input(shape=(None,))
decoder_padding_mask = keras.Input(shape=(None,))

token_embedding_layer = keras_nlp.layers.ReversibleEmbedding(
    vocab_size, hidden_dim,
    tie_weights=False)
encoder_token_embedding = token_embedding_layer(encoder_token_ids)
x = layers.Dropout(dropout)(encoder_token_embedding)

# 어텐션 편향 배열을 초기화합니다.
position_bias = None
for i in range(num_layers):
    # 첫 번째 층에서만 어텐션 편향을 계산하고 다른 층은 맨 처음 계산하여 구한 값을 재사용합니다.
    x, position_bias = t5_1_1_encoder(
        x, position_bias=position_bias,
        padding_mask=encoder_padding_mask, dropout=dropout)
x = T5LayerNorm()(x)
x = layers.Dropout(dropout)(x)
encoder_output = x

decoder_token_embedding = token_embedding_layer(decoder_token_ids)
x = layers.Dropout(dropout)(decoder_token_embedding)

# 어텐션 편향 배열을 초기화합니다.
position_bias = None
for i in range(num_layers):
    # 첫 번째 층에서만 어텐션 편향을 계산하고 다른 층은 맨 처음 계산하여 구한 값을 재사용합니다.
    x, position_bias = t5_1_1_decoder(
        x, encoder_output=encoder_output,
        position_bias=position_bias,
        padding_mask=decoder_padding_mask,
        encoder_padding_mask=encoder_padding_mask, dropout=dropout)
x = T5LayerNorm()(x)
x = layers.Dropout(dropout)(x)
decoder_output = token_embedding_layer(x, reverse=True)

model = keras.Model(inputs=(encoder_token_ids, encoder_padding_mask,
                            decoder_token_ids, decoder_padding_mask),
                    outputs=(encoder_output, decoder_output))
model.summary()