# 24장 트랜스포머 직접 만들어 보기

## 트랜스포머 인코더를 활용한 긍정 부정 예측


[<img src="https://raw.githubusercontent.com/taehojo/taehojo.github.io/master/assets/images/linktocolab.png" align="left"/> ](https://colab.research.google.com/github/taehojo/deeplearning_4th/blob/master/colab/ch24-colab.ipynb)

In [7]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# 깃허브에 준비된 데이터를 가져옵니다.
!git clone https://github.com/taehojo/data.git

# CSV 파일 로드
dataframe = pd.read_csv('./data/sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

# 포지셔널 인코딩 함수
def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model)))
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model)))
    return pos_enc

# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)

# 멀티헤드 어텐션 레이어 (마스크 미사용)
class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim):
        super(MultiHeadSelfAttentionLayer, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.norm = LayerNormalization()

    def call(self, x):
        attn_output = self.mha(query=x, value=x, key=x)
        attn_output = self.norm(attn_output + x)
        return attn_output

# 모델 설정
inputs = Input(shape=(max_len,))

# 1. 임베딩 레이어
embedding_layer = Embedding(input_dim=len(word_index) + 1, output_dim=embedding_dim)
embedded_sequences = embedding_layer(inputs)

# 2. 포지셔널 인코딩 추가
embedded_sequences_with_positional_encoding = embedded_sequences + positional_encoding

# 3. 첫 번째 멀티헤드 어텐션 (마스크 없음)
attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim)
attention_output = attention_layer(embedded_sequences_with_positional_encoding)

# 4. 잔차 연결
attention_output_with_residual = Add()([embedded_sequences_with_positional_encoding, attention_output])

# 5. GlobalAveragePooling1D
pooled_output = GlobalAveragePooling1D()(attention_output_with_residual)

# 6. 피드 포워드 네트워크
dense_layer = Dense(128, activation='relu')(pooled_output)
dropout_layer = Dropout(0.5)(dense_layer)
output_layer = Dense(1, activation='sigmoid')(dropout_layer)

model = Model(inputs=inputs, outputs=output_layer)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.fit(X_train, np.array(y_train), epochs=10, batch_size=16, validation_data=(X_val, np.array(y_val)))

# 새 문장 예측
sample_texts = ["I absolutely love this!", "I can't stand this product"]
sample_sequences = tokenizer.texts_to_sequences(sample_texts)
sample_data = tf.keras.preprocessing.sequence.pad_sequences(sample_sequences, maxlen=max_len, padding='post')
predictions = model.predict(sample_data)

for i, text in enumerate(sample_texts):
    print(f"Text: {text}")
    print(f"Prediction: {'Positive' if predictions[i] > 0.5 else 'Negative'}")


Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - accuracy: 0.5680 - loss: 0.8149 - val_accuracy: 0.9975 - val_loss: 0.0172
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 17ms/step - accuracy: 0.9898 - loss: 0.0427 - val_accuracy: 0.9975 - val_loss: 0.0219
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 18ms/step - accuracy: 0.9894 - loss: 0.0393 - val_accuracy: 0.9975 - val_loss: 0.0181
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 17ms/step - accuracy: 0.9961 - loss: 0.0160 - val_accuracy: 1.0000 - val_loss: 0.0027
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 18ms/step - accuracy: 0.9990 - loss: 0.0089 - val_accuracy: 1.0000 - val_loss: 0.0013
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 20ms/step - accuracy: 0.9964 - loss: 0.0082 - val_accuracy: 0.9850 - val_loss: 0.0342
Epoch 7/10
[1m100/100

## 전체 트랜스 포머 실행하기

In [3]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# 깃허브에 준비된 데이터를 가져옵니다.
!git clone https://github.com/taehojo/data.git

# CSV 파일 로드
dataframe = pd.read_csv('./data/sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

# 포지셔널 인코딩 함수
def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model)))
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model)))
    return pos_enc

# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)

# 멀티헤드 어텐션 레이어(마스크 지원)
class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim, masked=False):
        super(MultiHeadSelfAttentionLayer, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.norm = LayerNormalization()
        self.masked = masked

    def call(self, x):
        if self.masked:
            batch_size = tf.shape(x)[0]
            seq_len = tf.shape(x)[1]
            mask = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
            mask = tf.reshape(mask, (1, 1, seq_len, seq_len))
            mask = tf.tile(mask, [batch_size, 1, 1, 1])
            mask = (1-mask) * -1e9
            attn_output = self.mha(query=x, value=x, key=x, attention_mask=mask)
        else:
            attn_output = self.mha(query=x, value=x, key=x)
        attn_output = self.norm(attn_output + x)
        return attn_output

# 모델 설정
inputs = Input(shape=(max_len,))

# 1. 임베딩 레이어
embedding_layer = Embedding(input_dim=len(word_index) + 1, output_dim=embedding_dim)
embedded_sequences = embedding_layer(inputs)

# 2. 포지셔널 인코딩 추가
embedded_sequences_with_positional_encoding = embedded_sequences + positional_encoding

# 3. 첫 번째 멀티헤드 어텐션 (마스크 없음)
attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim)
attention_output = attention_layer(embedded_sequences_with_positional_encoding)

# 4. 잔차 연결
attention_output_with_residual = Add()([embedded_sequences_with_positional_encoding, attention_output])

# 5. 마스크드 멀티헤드 어텐션 (마스크 있음)
masked_attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim, masked=True)
masked_attention_output = masked_attention_layer(attention_output_with_residual)

# 6. 잔차 연결
masked_attention_output_with_residual = Add()([attention_output_with_residual, masked_attention_output])

# 7. GlobalAveragePooling1D
pooled_output = GlobalAveragePooling1D()(masked_attention_output_with_residual)

# 8. 피드 포워드 네트워크
dense_layer = Dense(128, activation='relu')(pooled_output)
dropout_layer = Dropout(0.5)(dense_layer)
output_layer = Dense(1, activation='sigmoid')(dropout_layer)

model = Model(inputs=inputs, outputs=output_layer)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.fit(X_train, np.array(y_train), epochs=10, batch_size=16, validation_data=(X_val, np.array(y_val)))

# 새 문장 예측
sample_texts = ["I absolutely love this!", "I can't stand this product"]
sample_sequences = tokenizer.texts_to_sequences(sample_texts)
sample_data = tf.keras.preprocessing.sequence.pad_sequences(sample_sequences, maxlen=max_len, padding='post')
predictions = model.predict(sample_data)

for i, text in enumerate(sample_texts):
    print(f"Text: {text}")
    print(f"Prediction: {'Positive' if predictions[i] > 0.5 else 'Negative'}")

# 여기서 seq_len=4, batch_size=2로 예를 들어 마스크 행렬을 생성하고 출력해보는 예제
seq_len_example = 4
batch_size_example = 2
mask_example = tf.linalg.band_part(tf.ones((seq_len_example, seq_len_example)), -1, 0)
mask_example = tf.reshape(mask_example, (1, 1, seq_len_example, seq_len_example))
mask_example = tf.tile(mask_example, [batch_size_example, 1, 1, 1])

print("\n예시 마스크 행렬 :")
print(mask_example.numpy()[0, 0, :, :])

mask_example = (1-mask_example) * -1e9
print("\n-∞로 치환한 마스크 행렬:")
print(mask_example.numpy()[0, 0, :, :])


Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 37ms/step - accuracy: 0.5022 - loss: 1.1726 - val_accuracy: 0.9900 - val_loss: 0.0442
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 34ms/step - accuracy: 0.9829 - loss: 0.0847 - val_accuracy: 0.9975 - val_loss: 0.0199
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 33ms/step - accuracy: 0.9907 - loss: 0.0472 - val_accuracy: 0.9975 - val_loss: 0.0131
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - accuracy: 0.9966 - loss: 0.0219 - val_accuracy: 0.9975 - val_loss: 0.0196
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 33ms/step - accuracy: 0.9955 - loss: 0.0380 - val_accuracy: 0.9975 - val_loss: 0.0176
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 33ms/step - accuracy: 0.9965 - loss: 0.0227 - val_accuracy: 0.9975 - val_loss: 0.0179
Epoch 7/10
[1m100/100