# 보충 마스크드 어텐션 전후 코드 비교

In [1]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# CSV 파일 로드
dataframe = pd.read_csv('sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

# 포지셔널 인코딩 함수
def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model)))
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model)))
    return pos_enc

# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)

# 사용자 정의 레이어: MultiHeadAttention을 포함한 레이어 정의
class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim):
        super(MultiHeadSelfAttentionLayer, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.norm = LayerNormalization()

    def call(self, x):
        attn_output = self.mha(query=x, value=x, key=x)
        attn_output = self.norm(attn_output + x)
        return attn_output

# 모델 생성 함수
def create_model():
    inputs = Input(shape=(max_len,))
    embedding_layer = Embedding(input_dim=len(word_index) + 1, output_dim=embedding_dim, input_length=max_len)
    embedded_sequences = embedding_layer(inputs)
    embedded_sequences_with_positional_encoding = embedded_sequences + positional_encoding
    attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim)
    attention_output = attention_layer(embedded_sequences_with_positional_encoding)
    attention_output_with_residual = Add()([embedded_sequences_with_positional_encoding, attention_output])
    pooled_output = GlobalAveragePooling1D()(attention_output_with_residual)
    dense_layer = Dense(128, activation='relu')(pooled_output)
    dropout_layer = Dropout(0.5)(dense_layer)
    output_layer = Dense(1, activation='sigmoid')(dropout_layer)
    model = Model(inputs=inputs, outputs=output_layer)
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model

# 샘플 양을 100%, 20%, 10%, 5%, 1%씩 줄였을 때의 정확도 추적
accuracies = []
sample_sizes = [1.0, 0.2, 0.1, 0.05, 0.01]

for sample_size in sample_sizes:
    # 데이터셋의 일부를 샘플링
    sample_indices = np.random.choice(len(X_train), int(len(X_train) * sample_size), replace=False)
    X_train_sample = X_train[sample_indices]
    y_train_sample = np.array(y_train)[sample_indices]

    # 모델 생성 및 컴파일
    model_sample = create_model()
    
    # 모델 학습
    history_sample = model_sample.fit(X_train_sample, y_train_sample, epochs=10, batch_size=16, validation_data=(X_val, np.array(y_val)), verbose=0)

    # 샘플링된 데이터셋에서의 정확도 기록
    accuracies.append(max(history_sample.history['val_accuracy']))

# 정확도 출력
for i, sample_size in enumerate(sample_sizes):
    print(f"Sample Size: {int(sample_size * 100)}% - Validation Accuracy: {accuracies[i]}")


Sample Size: 100% - Validation Accuracy: 0.9975000023841858
Sample Size: 20% - Validation Accuracy: 0.9975000023841858
Sample Size: 10% - Validation Accuracy: 0.9775000214576721
Sample Size: 5% - Validation Accuracy: 0.824999988079071
Sample Size: 1% - Validation Accuracy: 0.5174999833106995


In [2]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, LayerNormalization, Dropout, Add, Input, Lambda
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# CSV 파일 로드
dataframe = pd.read_csv('sentiment_data.csv')

# 데이터와 라벨 추출
sentences = dataframe['sentence'].tolist()
labels = dataframe['label'].tolist()

# 임베딩 벡터 크기와 최대 문장 길이 설정
embedding_dim = 128
max_len = 10

# 토크나이저 초기화 및 텍스트를 시퀀스로 변환
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(sentences)
sequences = tokenizer.texts_to_sequences(sentences)
word_index = tokenizer.word_index

# 패딩을 사용하여 시퀀스 길이를 동일하게 맞춤
data = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=max_len, padding='post')

# 데이터셋을 훈련 세트와 검증 세트로 분리
X_train, X_val, y_train, y_val = train_test_split(data, labels, test_size=0.2, random_state=42)

# 포지셔널 인코딩 함수
def get_positional_encoding(max_len, d_model):
    pos_enc = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            pos_enc[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model)))
            if i + 1 < d_model:
                pos_enc[pos, i + 1] = np.cos(pos / (10000 ** (2 * (i + 1) / d_model)))
    return pos_enc

# 포지셔널 인코딩 생성
positional_encoding = get_positional_encoding(max_len, embedding_dim)

# 사용자 정의 레이어: MultiHeadAttention을 포함한 레이어 정의
class MultiHeadSelfAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, num_heads, key_dim, masked=False):
        super(MultiHeadSelfAttentionLayer, self).__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.norm = LayerNormalization()
        self.masked = masked

    def call(self, x):
        if self.masked:
            batch_size = tf.shape(x)[0]
            seq_len = tf.shape(x)[1]
            mask = tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
            mask = tf.reshape(mask, (1, 1, seq_len, seq_len))
            mask = tf.tile(mask, [batch_size, 1, 1, 1])
            attn_output = self.mha(query=x, value=x, key=x, attention_mask=mask)
        else:
            attn_output = self.mha(query=x, value=x, key=x)
        attn_output = self.norm(attn_output + x)
        return attn_output

# 모델 생성 함수
def create_model():
    inputs = Input(shape=(max_len,))
    embedding_layer = Embedding(input_dim=len(word_index) + 1, output_dim=embedding_dim, input_length=max_len)
    embedded_sequences = embedding_layer(inputs)
    embedded_sequences_with_positional_encoding = embedded_sequences + positional_encoding
    attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim)
    attention_output = attention_layer(embedded_sequences_with_positional_encoding)
    attention_output_with_residual = Add()([embedded_sequences_with_positional_encoding, attention_output])
    masked_attention_layer = MultiHeadSelfAttentionLayer(num_heads=8, key_dim=embedding_dim, masked=True)
    masked_attention_output = masked_attention_layer(attention_output_with_residual)
    masked_attention_output_with_residual = Add()([attention_output_with_residual, masked_attention_output])
    pooled_output = GlobalAveragePooling1D()(masked_attention_output_with_residual)
    dense_layer = Dense(128, activation='relu')(pooled_output)
    dropout_layer = Dropout(0.5)(dense_layer)
    output_layer = Dense(1, activation='sigmoid')(dropout_layer)
    model = Model(inputs=inputs, outputs=output_layer)
    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    return model

# 샘플 양을 100%, 20%, 10%, 5%, 1%씩 줄였을 때의 정확도 추적
accuracies = []
sample_sizes = [1.0, 0.2, 0.1, 0.05, 0.01]

for sample_size in sample_sizes:
    # 데이터셋의 일부를 샘플링
    sample_indices = np.random.choice(len(X_train), int(len(X_train) * sample_size), replace=False)
    X_train_sample = X_train[sample_indices]
    y_train_sample = np.array(y_train)[sample_indices]

    # 모델 생성 및 컴파일
    model_sample = create_model()
    
    # 모델 학습
    history_sample = model_sample.fit(X_train_sample, y_train_sample, epochs=10, batch_size=16, validation_data=(X_val, np.array(y_val)), verbose=0)

    # 샘플링된 데이터셋에서의 정확도 기록
    accuracies.append(max(history_sample.history['val_accuracy']))

# 정확도 출력
for i, sample_size in enumerate(sample_sizes):
    print(f"Sample Size: {int(sample_size * 100)}% - Validation Accuracy: {accuracies[i]}")


Sample Size: 100% - Validation Accuracy: 1.0
Sample Size: 20% - Validation Accuracy: 0.9925000071525574
Sample Size: 10% - Validation Accuracy: 0.6850000023841858
Sample Size: 5% - Validation Accuracy: 0.5174999833106995
Sample Size: 1% - Validation Accuracy: 0.5174999833106995


<img src="../data/img/validation_accuracy_comparison_smooth.png" alt="Validation Accuracy Comparison" width="600"/>