# 6교시 2. 트랜스포머의 기초

입력 데이터 준비

In [2]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# 깃허브에 준비된 데이터를 가져옵니다.
!git clone https://github.com/taehojo/data.git

# CSV 파일 로드
dataframe = pd.read_csv('./data/sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)


fatal: destination path 'data' already exists and is not an empty directory.


포지셔널 인코딩

<img src="https://github.com/taehojo/fastcampus_ai/blob/master/data/img/06-05.png?raw=1" width="600"/> <img src="https://github.com/taehojo/fastcampus_ai/blob/master/data/img/06-09.png?raw=1" width="200"/>

In [3]:
# 포지셔널 인코딩 함수

def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model)) # 포지셔널 인코딩 배열 초기화. max_len: 최대 시퀀스 길이, d_model:임베딩 벡터의 차원

    # 시퀀스의 각 위치에 대해 포지셔널 인코딩 값 계산
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model))) # 짝수 인덱스
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model))) # 홀수 인덱스

# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)


정규화, 잔차 연결

<img src="https://github.com/taehojo/fastcampus_ai/blob/master/data/img/06-06.png?raw=1" width="500"/><img src="https://github.com/taehojo/fastcampus_ai/blob/master/data/img/06-07.png?raw=1" width="400"/>

In [4]:
# 멀티헤드 어텐션 레이어

class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim):
        super(MultiHeadSelfAttentionLayer, self).__init__() # 레이어가 생성될 때 한 번 실행.
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim) # 멀티헤드 어텐션 레이어 생성. num_heads: 어텐션 헤드의 수, key_dim은 각 헤드의 차원 수
        self.norm = LayerNormalization() #레이어 정규화

    def call(self, x):
        attn_output = self.mha(query=x, value=x, key=x) # 멀티헤드 어텐션 적용
        attn_output = self.norm(attn_output + x) # 잔차 연결 적용
        return attn_output


## 전체 코드

<img src="https://github.com/taehojo/fastcampus_ai/blob/master/data/img/06-08.png?raw=1" width="250"/>

In [6]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# CSV 파일 로드
dataframe = pd.read_csv('./data/sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

# 포지셔널 인코딩 함수
def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model)) # 포지셔널 인코딩 배열 초기화. max_len: 최대 시퀀스 길이, d_model:임베딩 벡터의 차원

    # 시퀀스의 각 위치에 대해 포지셔널 인코딩 값 계산
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model))) # 짝수 인덱스
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model))) # 홀수 인덱스
    return pos_enc  # 포지셔널 인코딩 값을 반환
# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)

# 멀티헤드 어텐션 레이어
class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim):
        super(MultiHeadSelfAttentionLayer, self).__init__() # 레이어가 생성될 때 한 번 실행.
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim) # 멀티헤드 어텐션 레이어 생성. num_heads: 어텐션 헤드의 수, key_dim은 각 헤드의 차원 수
        self.norm = LayerNormalization() #레이어 정규화

    def call(self, x):
        attn_output = self.mha(query=x, value=x, key=x) # 멀티헤드 어텐션 적용
        attn_output = self.norm(attn_output + x) # 잔차 연결 적용 후 레이어 정규화
        return attn_output

# 모델 설정
inputs = Input(shape=(max_len,))

# 1. 임베딩 레이어: 텍스트 데이터를 임베딩 벡터로 변환합니다.
embedding_layer = Embedding(input_dim=len(word_index) + 1, output_dim=embedding_dim, input_length=max_len)
embedded_sequences = embedding_layer(inputs)

# 2. 포지셔널 인코딩 추가
embedded_sequences_with_positional_encoding = embedded_sequences + positional_encoding

# 3. 멀티헤드 어텐션 레이어 추가
attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim)
attention_output = attention_layer(embedded_sequences_with_positional_encoding)

# 4. 잔차 연결
attention_output_with_residual = Add()([embedded_sequences_with_positional_encoding, attention_output])

# 5. GlobalAveragePooling1D 레이어 추가 (시퀀스의 전체 정보를 요약하여 다음 레이어로 전달)
pooled_output = GlobalAveragePooling1D()(attention_output_with_residual)

# 6. 피드 포워드 신경망
dense_layer = Dense(128, activation='relu')(pooled_output)
dropout_layer = Dropout(0.5)(dense_layer)
output_layer = Dense(1, activation='sigmoid')(dropout_layer)

# 모델 생성
model = Model(inputs=inputs, outputs=output_layer)

# 모델 컴파일
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# 모델 학습
history = model.fit(X_train, np.array(y_train), epochs=10, batch_size=16, validation_data=(X_val, np.array(y_val)))

# 샘플 데이터 예측
sample_texts = ["I absolutely love this!", "I can't stand this product"]
sample_sequences = tokenizer.texts_to_sequences(sample_texts)
sample_data = tf.keras.preprocessing.sequence.pad_sequences(sample_sequences, maxlen=max_len, padding='post')
predictions = model.predict(sample_data)

for i, text in enumerate(sample_texts):
    print(f"Text: {text}")
    print(f"Prediction: {'Positive' if predictions[i] > 0.5 else 'Negative'}")


Epoch 1/10




[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 11ms/step - accuracy: 0.5021 - loss: 0.9174 - val_accuracy: 0.9175 - val_loss: 0.3087
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 13ms/step - accuracy: 0.9781 - loss: 0.1498 - val_accuracy: 1.0000 - val_loss: 0.0116
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.9907 - loss: 0.0580 - val_accuracy: 1.0000 - val_loss: 0.0011
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 13ms/step - accuracy: 0.9946 - loss: 0.0335 - val_accuracy: 0.9925 - val_loss: 0.0212
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 15ms/step - accuracy: 0.9932 - loss: 0.0363 - val_accuracy: 1.0000 - val_loss: 0.0026
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 14ms/step - accuracy: 0.9990 - loss: 0.0073 - val_accuracy: 0.9975 - val_loss: 0.0034
Epoch 7/10
[1m100/100[0m [32m━