In [1]:
import os

text_file = 'spa.txt'

with open(text_file, encoding='utf-8') as f:
    lines = f.read().split('\n')[:-1]

text_pairs = []

for line in lines:
    english, spanish = line.split('\t')
    spanish = '[start] ' + spanish + ' [end]'
    text_pairs.append((english, spanish))

In [2]:
import random

print(random.choice(text_pairs))

('I need to ask you for a little favor.', '[start] Tengo que pedirte un favorcito. [end]')


In [3]:
random.shuffle(text_pairs)

num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2*num_val_samples

train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [4]:
train_pairs[0]

('I usually go home at four.',
 '[start] Generalmente me voy a casa a las cuatro. [end]')

In [5]:
# 텍스트 벡터화
import tensorflow as tf
from tensorflow.keras import layers
import string
import re

strip_chars = string.punctuation + '¿'
strip_chars = strip_chars.replace('[', '')
strip_chars = strip_chars.replace(']', '')

def standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, f'[{re.escape(strip_chars)}]', '')

In [6]:
vocab_size = 15000
sequence_length = 20

# output_mode : 벡터화했을 때 정수 인덱스가 될 수 있도록 int로 설정
source_vectorization = layers.TextVectorization(max_tokens=vocab_size, output_mode='int', output_sequence_length=sequence_length)
target_vectorization = layers.TextVectorization(max_tokens=vocab_size, output_mode='int', output_sequence_length=sequence_length+1,
                                                standardize=standardization)

train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]

source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts) # 벡터화 처리

2024-07-19 14:43:57.860741: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


In [7]:
batch_size = 64

def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)

    return ({'english': eng, 'spanish': spa[:, :-1]}, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)

    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts)) # 리스트를 텐서 슬라이스로 변환
    dataset = dataset.batch(batch_size) # batch_size 분할
    dataset = dataset.map(format_dataset, num_parallel_calls=4) # 벡터화
    return dataset.shuffle(42).prefetch(16).cache() # cache() : 메모리에 캐싱하여 데이터의 반복을 제거, prefetch : 배치를 미리 세팅

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
# (데이터 셋 분할)

In [8]:
for inputs, targets in train_ds.take(1): # 첫번째 배치를 가져온다.
    print('english shape :', inputs['english'].shape)
    print('spanish shape :', inputs['spanish'].shape)
    print('target shape :', targets.shape)

english shape : (64, 20)
spanish shape : (64, 20)
target shape : (64, 20)


2024-07-19 14:44:04.262461: W tensorflow/core/kernels/data/cache_dataset_ops.cc:856] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [9]:
# GRU 기반 디코더
past_target = keras.Input(shape=(None,), dtype='int64', name='spanish')
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)

x = decoder_gru(x, initial_state=encoded_source) # 인코더의 출력을 디코더 초기 상태로 사용
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation='softmax')(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

NameError: name 'keras' is not defined

In [None]:
seq2seq_rnn.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy', metrics=['acc'])

seq2seq_rnn.fit(train_ds, epochs=30, validation_data=val_ds)

In [None]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence]) # 영어 문장 벡터화 처리
    decoded_sentence = '[start]' # 번역 문장 저장
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence]) # 스페인어 문장 벡터화 처리
        next_token_predictions = seq2seq_rnn.predict([tokenized_input_sentence, tokenized_target_sentence]) # seq2seq_rnn 모델로 예측
        sampled_token_index = np.argmax(next_token_predictions[0, i, :]) # 예측된 토큰 찾기
        sampled_token = spa_index_lookup[sampled_token_index] # 토큰으로 단어 찾기
        decoded_sentence += ' ' + sampled_token
        if sampled_token == '[end]':
            break
    return decoded_sentence

In [10]:
test_eng_texts = [pair[0] for pair in test_pairs]

for _ in range(5):
    input_sentence = random.choice(test_eng_texts)
    print('english : ', input_sentence)
    print('spanish : ', decode_sequence(input_sentence))

english :  Please put this into English.


NameError: name 'decode_sequence' is not defined

In [11]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim = output_dim)
        self.position_embeddings = layers.Embedding(input_dim=input_dim, output_dim = output_dim)
        
        self.sequence = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({'output_dim': self.output_dim, 'sequence_length': self.sequence_length, 'input_dim': self.input_dim})
        return config

In [12]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation='relu'), layers.Dense(embed_dim)])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]

        attention_output = self.attention(inputs, inputs, attention_mask = mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({'output_dim': self.output_dim, 'sequence_length': self.sequence_length, 'input_dim': self.input_dim})
        return config

In [13]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation='relu'), layers.Dense(embed_dim)])

        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({'embed_dim':self.embed_dim, 'num_heads': self.num_heads, 'dense_dim':self.dense_dim})
        return config

    def call(self, inputs, encoder_outputs, mask=None):
        attention_output_1 = self.attention_1(query=inputs, value=inputs, key=inputs, use_causal_mask=True)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        
        attention_output_2 = self.attention_2(query=attention_output_1, value=encoder_outputs, key=encoder_outputs)
        attention_output_2 = self.layernorm_2(attention_output_1 + attention_output_2)

        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [14]:
# 숙제 : 위에서 만든 transformer을 이용해서 모델 만들어서 학습시키고 번역 예측해보기

In [15]:
vocab_size = 15000
sequence_length = 20
embed_dim = 256
dense_dim = 2048
num_heads = 8

# encoder_inputs
# PositionalEmbedding
# TransformerEncoder

# decoder_inputs
# PositionalEmbedding
# TransformerDecoder

# Dense(activation='softmax')
# 이런 형태로 진행되면 된다.

In [16]:
encoder_inputs = keras.Input(shape=(None,), dtype='int64', name='english')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype='int64', name='spanish')  # 'sqanish'를 'spanish'로 변경
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_outputs = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)

decoder_outputs = layers.Dense(vocab_size, activation='softmax')(decoder_outputs)

transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

NameError: name 'keras' is not defined

In [17]:
transformer.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy', metrics=['acc'])
transformer.fit(train_ds, epochs=50, validation_data=val_ds)

NameError: name 'transformer' is not defined

In [None]:
import numpy

spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(inputs_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])[:, :-1] # 3차원 세팅
    decoded_sentnece = '[start]' # 처음 시작 시 start 토큰을 들고 시작될 수 있도록

    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = source_vectorization([decode_sentence])
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += ' ' + sampled_token
        if sampled_token == '[end]':
            break

    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]

for _ in range(3):
    input_sentence = random.choice(test_eng_texts)
    print('english : ', input_sentence)
    print('spanish : ', decode_sequence(input_sentence))