# English-to-Spanish translation with a sequence-to-sequence Transformer

**Author:** [fchollet](https://twitter.com/fchollet)<br>
**Date created:** 2021/05/26<br>
**Last modified:** 2024/11/18<br>
**Description:** Implementing a sequence-to-sequence Transformer and training it on a machine translation task.

## Introduction

In this example, we'll build a sequence-to-sequence Transformer model, which
we'll train on an English-to-Spanish machine translation task.

You'll learn how to:

- Vectorize text using the Keras `TextVectorization` layer.
- Implement a `TransformerEncoder` layer, a `TransformerDecoder` layer,
and a `PositionalEmbedding` layer.
- Prepare data for training a sequence-to-sequence model.
- Use the trained model to generate translations of never-seen-before
input sentences (sequence-to-sequence inference).

The code featured here is adapted from the book
[Deep Learning with Python, Second Edition](https://www.manning.com/books/deep-learning-with-python-second-edition)
(chapter 11: Deep learning for text).
The present example is fairly barebones, so for detailed explanations of
how each building block works, as well as the theory behind Transformers,
I recommend reading the book.

## Setup

In [1]:
# We set the backend to TensorFlow. The code works with
# both `tensorflow` and `torch`. It does not work with JAX
# due to the behavior of `jax.numpy.tile` in a jit scope
# (used when building the causal attention mask in `TransformerDecoder`):
# `tile` in JAX does not support a dynamic `reps` argument.
# You can make the code work in JAX by wrapping the
# mask-building code in a context manager that prevents
# jit compilation: `with jax.ensure_compile_time_eval():`.
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import pathlib
import random
import string
import re
import numpy as np

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

## Downloading the data

We'll be working with an English-to-Spanish translation dataset
provided by [Anki](https://www.manythings.org/anki/). Let's download it:

In [2]:
text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
2638744/2638744 ━━━━━━━━━━━━━━━━━━━━ 1s 0us/step


## Parsing the data

Each line contains an English sentence and its corresponding Spanish sentence.
The English sentence is the *source sequence* and the Spanish one is the *target sequence*.
We prepend the token `"[start]"` and we append the token `"[end]"` to the Spanish sentence.

In [3]:
import os

data_dir = "/root/.keras/datasets/spa-eng_extracted"
for root, dirs, files in os.walk(data_dir):
    print(root, files)


/root/.keras/datasets/spa-eng_extracted []
/root/.keras/datasets/spa-eng_extracted/spa-eng ['_about.txt', 'spa.txt']


In [4]:
import zipfile
zip_path = "/root/.keras/datasets/spa-eng.zip"
extract_dir = "/root/.keras/datasets/spa-eng_manual"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print(os.listdir(extract_dir))


['spa-eng']


In [5]:
path_to_file = "/root/.keras/datasets/spa-eng_manual/spa-eng/spa.txt"

with open(path_to_file, encoding="utf-8") as f:
    lines = f.read().split("\n")[:-1]


Here's what our sentence pairs look like:

In [6]:
path_to_file = "/root/.keras/datasets/spa-eng_extracted/spa-eng/spa.txt"

with open(path_to_file, encoding="utf-8") as f:
    lines = f.read().split("\n")[:-1]

text_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))

print("Number of samples:", len(text_pairs))


Number of samples: 118964


In [7]:
import random

def show_random_pairs(pairs, n=5):
    samples = random.sample(pairs, n)
    for eng, spa in samples:
        print(f"ENG: {eng}")
        print(f"SPA: {spa}")
        print("-" * 40)

# Show five random sentence pairs
show_random_pairs(text_pairs, 5)


ENG: Man is the only animal that laughs.
SPA: [start] El único animal que ríe es el hombre. [end]
----------------------------------------
ENG: They robbed the man of all his belongings.
SPA: [start] Le robaron al hombre todas sus pertenencias. [end]
----------------------------------------
ENG: Tom made a necklace for Mary.
SPA: [start] Tom hizo un collar para Mary. [end]
----------------------------------------
ENG: Tom went shopping with his family.
SPA: [start] Tom se fue de compras con su familia. [end]
----------------------------------------
ENG: Please say hello to him for me.
SPA: [start] Por favor, salúdale por mí. [end]
----------------------------------------


Now, let's split the sentence pairs into a training set, a validation set,
and a test set.

In [8]:
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

118964 total pairs
83276 training pairs
17844 validation pairs
17844 test pairs


## Vectorizing the text data

We'll use two instances of the `TextVectorization` layer to vectorize the text
data (one for English and one for Spanish),
that is to say, to turn the original strings into integer sequences
where each integer represents the index of a word in a vocabulary.
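
To make the idea concrete, here is a minimal pure-Python sketch of what vectorization does. It is not how `TextVectorization` is implemented, and the helper names `build_vocab` and `vectorize` are hypothetical. It does follow the layer's convention of reserving index 0 for padding and index 1 for out-of-vocabulary words (`[UNK]`).

```python
from collections import Counter


def build_vocab(sentences, max_tokens):
    # Count lowercase, whitespace-separated words across all sentences.
    counts = Counter(word for s in sentences for word in s.lower().split())
    # Index 0 is reserved for padding and index 1 for unknown words.
    return ["", "[UNK]"] + [w for w, _ in counts.most_common(max_tokens - 2)]


def vectorize(sentence, vocab, sequence_length):
    index = {word: i for i, word in enumerate(vocab)}
    # Unknown words map to 1; the result is truncated or zero-padded.
    ids = [index.get(w, 1) for w in sentence.lower().split()][:sequence_length]
    return ids + [0] * (sequence_length - len(ids))


vocab = build_vocab(["the cat sat", "the dog sat", "the end"], max_tokens=4)
print(vocab)                                               # ['', '[UNK]', 'the', 'sat']
print(vectorize("the bird sat", vocab, sequence_length=5)) # [2, 1, 3, 0, 0]
```

The word "bird" is not in the vocabulary, so it becomes index 1, and the two trailing zeros are padding up to the requested length.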

The English layer will use the default string standardization (strip punctuation characters)
and splitting scheme (split on whitespace), while
the Spanish layer will use a custom standardization, where we add the character
`"¿"` to the set of punctuation characters to be stripped.

Note: in a production-grade machine translation model, I would not recommend
stripping the punctuation characters in either language. Instead, I would recommend turning
each punctuation character into its own token,
which you could achieve by providing a custom `split` function to the `TextVectorization` layer.
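
As a rough illustration of that idea, the sketch below splits a sentence so that each punctuation character becomes its own token. It is plain Python rather than a `TextVectorization`-compatible `split` callable (which would need to operate on string tensors), and the name `split_with_punctuation` is hypothetical.

```python
import re

# Match the special tokens first, then runs of word characters,
# then any single character that is neither a word character nor whitespace.
TOKEN_PATTERN = re.compile(r"\[start\]|\[end\]|\w+|[^\w\s]")


def split_with_punctuation(text):
    # Lowercase, then return the tokens in order of appearance.
    return TOKEN_PATTERN.findall(text.lower())


print(split_with_punctuation("[start] ¿Dónde está Tom? [end]"))
# ['[start]', '¿', 'dónde', 'está', 'tom', '?', '[end]']
```

Note that this simple pattern also splits contractions such as "I'll" into three tokens, which a production tokenizer would likely handle differently.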

In [9]:
import re
import string
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization

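# Characters to strip: ASCII punctuation plus "¿", but keep "[" and "]"
# so that the "[start]" and "[end]" tokens survive standardization.
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "").replace("]", "")

vocab_size = 15000
sequence_length = 20
batch_size = 64

# Custom standardization for Spanish: lowercase, then strip `strip_chars`.
def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

# Text vectorization layers
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)

spa_vectorization = TextVectorization(
    max_tokens=20000,
    output_mode="int",
    # One extra step (21): the sequence is later sliced into decoder input and target.
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)

# Re-split `text_pairs` 80%/20% into training and validation pairs.
# Note: this overrides the `train_pairs` and `val_pairs` created earlier,
# so `val_pairs` now overlaps with `test_pairs`.
split = int(len(text_pairs) * 0.8)
train_pairs = text_pairs[:split]
val_pairs = text_pairs[split:]

train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]

# Fit the vocabularies on the training texts
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)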


Next, we'll format our datasets.

At each training step, the model will seek to predict target words N+1 (and beyond)
using the source sentence and the target words 0 to N.

As such, the training dataset will yield a tuple `(inputs, targets)`, where:

- `inputs` is a dictionary with the keys `encoder_inputs` and `decoder_inputs`.
`encoder_inputs` is the vectorized source sentence and `decoder_inputs` is the target sentence "so far",
that is to say, the words 0 to N used to predict word N+1 (and beyond) in the target sentence.
- `targets` is the target sentence offset by one step:
it provides the next words in the target sentence -- what the model will try to predict.
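
A toy example shows how one vectorized target sentence is sliced into these two views. The token ids below are made up for illustration; only the slicing mirrors what the training data needs.

```python
# Hypothetical ids: 2 = "[start]", 3 = "[end]", 0 = padding.
spa = [2, 17, 45, 8, 3, 0, 0]

decoder_inputs = spa[:-1]  # the target sentence "so far" (drops the last step)
targets = spa[1:]          # the same sentence shifted left by one step

# At each step, the decoder may use the tokens up to that step
# and is trained to predict the token that comes next.
for step, expected in enumerate(targets):
    print(f"step {step}: sees {decoder_inputs[: step + 1]} -> predicts {expected}")
```

Both views have the same length, which is why a Spanish sequence of 21 steps produces decoder inputs and targets of 20 steps each.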

In [10]:
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE

def format_dataset(eng, spa):
    eng = eng_vectorization(eng)    # (batch, 20)
    spa = spa_vectorization(spa)    # (batch, 21)

    # Decoder input: drop the last step -> (batch, 20)
    decoder_in = spa[:, :-1]

    # Target: drop the first token ([start]) -> (batch, 20)
    target = spa[:, 1:]

    return {"encoder_inputs": eng, "decoder_inputs": decoder_in}, target


def make_dataset(pairs, batch_size=batch_size, shuffle=True):
    eng_texts, spa_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(spa_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=AUTOTUNE)
    # Cache before shuffling so that every epoch is reshuffled;
    # caching after `shuffle` would freeze the first epoch's order.
    dataset = dataset.cache()
    if shuffle:
        dataset = dataset.shuffle(2048, reshuffle_each_iteration=True)
    return dataset.prefetch(buffer_size=AUTOTUNE)

# Build the training and validation datasets
train_ds = make_dataset(train_pairs, batch_size=batch_size, shuffle=True)
val_ds = make_dataset(val_pairs, batch_size=batch_size, shuffle=False)

# Sanity check
for batch in train_ds.take(1):
    inputs, targets = batch
    print("encoder_inputs:", inputs["encoder_inputs"].shape)
    print("decoder_inputs:", inputs["decoder_inputs"].shape)
    print("targets:", targets.shape)


encoder_inputs: (64, 20)
decoder_inputs: (64, 20)
targets: (64, 20)


Let's take a quick look at the sequence shapes
(we have batches of 64 pairs, and all sequences are 20 steps long):

In [11]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Building the model

Our sequence-to-sequence Transformer consists of a `TransformerEncoder`
and a `TransformerDecoder` chained together. To make the model aware of word order,
we also use a `PositionalEmbedding` layer.

The source sequence will be passed to the `TransformerEncoder`,
which will produce a new representation of it.
This new representation will then be passed
to the `TransformerDecoder`, together with the target sequence so far (target words 0 to N).
The `TransformerDecoder` will then seek to predict the next words in the target sequence (N+1 and beyond).

A key detail that makes this possible is causal masking
(see the `_causal_mask()` method on the `TransformerDecoder`).
The `TransformerDecoder` sees the entire sequence at once, and thus we must make
sure that it only uses information from target tokens 0 to N when predicting token N+1
(otherwise, it could use information from the future, which would
result in a model that cannot be used at inference time).
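
The sketch below builds such a mask in plain Python for a short sequence and combines it with a padding mask. It only illustrates the masking logic: the helper names are hypothetical, and the layer itself works on tensors rather than nested lists.

```python
def causal_mask(length):
    # mask[i][j] is True when query position i may attend to key position j,
    # that is, when j is not in the future relative to i.
    return [[j <= i for j in range(length)] for i in range(length)]


def combine_with_padding(causal, is_real_token):
    # A key position is usable only if it is allowed causally AND is not padding.
    return [
        [allowed and is_real_token[j] for j, allowed in enumerate(row)]
        for row in causal
    ]


mask = causal_mask(4)
for row in mask:
    print(" ".join("x" if allowed else "." for allowed in row))
# x . . .
# x x . .
# x x x .
# x x x x

# Suppose the last position of the sequence is padding.
padded = combine_with_padding(mask, [True, True, True, False])
print(padded[3])  # [True, True, True, False]
```

Row `i` of the mask lists which target positions token `i` may look at, so the lower-triangular shape is what prevents the decoder from peeking at future tokens.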

In [12]:
# Refactored for Keras 3 with the TensorFlow backend; runs as-is
import keras
from keras import layers
import keras.ops as ops

# ---------------------------
# Positional + Token Embedding
# ---------------------------
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, dropout=0.0, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(vocab_size, embed_dim, mask_zero=True)
        self.position_embeddings = layers.Embedding(sequence_length, embed_dim)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.dropout = layers.Dropout(dropout)

    def call(self, inputs, training=False):
        # inputs: (B, T)
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)  # (T,)
        x = self.token_embeddings(inputs)                     # (B, T, D)
        pos = self.position_embeddings(positions)[None, ...]  # (1, T, D)
        x = x + pos
        return self.dropout(x, training=training)

    # Propagate a padding mask: True wherever the token id is non-zero
    def compute_mask(self, inputs, mask=None):
        return ops.not_equal(inputs, 0)

    def get_config(self):
        cfg = super().get_config()
        cfg.update(dict(sequence_length=self.sequence_length,
                        vocab_size=self.vocab_size,
                        embed_dim=self.embed_dim))
        return cfg


# ---------------------------
# Transformer Encoder
# ---------------------------
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, ff_dim, num_heads, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        self.attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, dropout=dropout)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dropout(dropout),
            layers.Dense(embed_dim),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-5)
        self.norm2 = layers.LayerNormalization(epsilon=1e-5)
        self.drop = layers.Dropout(dropout)
        self.supports_masking = True

    def call(self, x, mask=None, training=False):
        # mask: (B, T) boolean, expanded to (B, 1, T) before being passed to MHA
        attn_mask = None
        if mask is not None:
            attn_mask = ops.expand_dims(ops.cast(mask, "bool"), axis=1)  # (B,1,T)

        attn_out = self.attn(query=x, value=x, key=x, attention_mask=attn_mask, training=training)
        x = self.norm1(x + self.drop(attn_out, training=training))
        ffn_out = self.ffn(x, training=training)
        x = self.norm2(x + self.drop(ffn_out, training=training))
        return x

    def get_config(self):
        cfg = super().get_config()
        cfg.update(dict(embed_dim=self.embed_dim, ff_dim=self.ff_dim, num_heads=self.num_heads))
        return cfg


# ---------------------------
# Transformer Decoder
# ---------------------------
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, ff_dim, num_heads, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        self.self_attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, dropout=dropout)
        self.cross_attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim, dropout=dropout)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation="relu"),
            layers.Dropout(dropout),
            layers.Dense(embed_dim),
        ])
        self.norm1 = layers.LayerNormalization(epsilon=1e-5)
        self.norm2 = layers.LayerNormalization(epsilon=1e-5)
        self.norm3 = layers.LayerNormalization(epsilon=1e-5)
        self.drop = layers.Dropout(dropout)
        self.supports_masking = True

    @staticmethod
    def _causal_mask(x):
        # x: (B, T, D) or (B, T)
        T = ops.shape(x)[-2] if len(x.shape) == 3 else ops.shape(x)[-1]
        i = ops.arange(T)[:, None]
        j = ops.arange(T)[None, :]
        mask = i >= j  # (T,T) lower-triangular
        return mask  # boolean

    def call(self, inputs, mask=None, training=False):
        # inputs: (dec_inputs, enc_outputs)
        dec, enc = inputs
        # mask: (dec_mask, enc_mask) from Keras masking (boolean, shape (B,T))
        dec_mask = enc_mask = None
        if mask is not None:
            dec_mask, enc_mask = mask

        # 1) Causal + padding mask for self-attn
        causal = self._causal_mask(dec)                      # (T,T)
        causal = ops.expand_dims(causal, 0)                  # (1,T,T)
        if dec_mask is not None:
            # dec_mask: (B,T) -> (B,1,T) to allow padding on keys
            pad_k = ops.expand_dims(ops.cast(dec_mask, "bool"), 1)  # (B,1,T)
            # broadcast causal to (B,T,T)
            B = ops.shape(dec)[0]
            causal = ops.tile(causal, (B, 1, 1))             # (B,T,T)
            # combine: attend only where the causal mask allows it and the key is not padding
            self_attn_mask = ops.logical_and(causal, pad_k)  # (B,T,T): pad_k broadcasts over the query dim
        else:
            self_attn_mask = causal  # (1,T,T) -> broadcast

        sa = self.self_attn(dec, dec, dec, attention_mask=self_attn_mask, training=training)
        x = self.norm1(dec + self.drop(sa, training=training))

        # 2) Cross-attn: query=x (B,Td,D), key/value=enc (B,Te,D)
        cross_mask = None
        if enc_mask is not None:
            cross_mask = ops.expand_dims(ops.cast(enc_mask, "bool"), 1)  # (B,1,Te)

        ca = self.cross_attn(x, enc, enc, attention_mask=cross_mask, training=training)
        x2 = self.norm2(x + self.drop(ca, training=training))

        ffn_out = self.ffn(x2, training=training)
        out = self.norm3(x2 + self.drop(ffn_out, training=training))
        return out

    def get_config(self):
        cfg = super().get_config()
        cfg.update(dict(embed_dim=self.embed_dim, ff_dim=self.ff_dim, num_heads=self.num_heads))
        return cfg


# ---------------------------
# Model assembly helper
# ---------------------------
def build_nmt_model(
    src_vocab_size, tgt_vocab_size,
    src_seq_len=20, tgt_seq_len=20,   # decoder_inputs length is 20
    embed_dim=256, ff_dim=512, num_heads=4, dropout=0.1
):
    # Encoder
    enc_inputs = layers.Input(shape=(src_seq_len,), dtype="int32", name="encoder_inputs")
    enc_embed = PositionalEmbedding(src_seq_len, src_vocab_size, embed_dim, dropout=dropout)(enc_inputs)
    enc_out = TransformerEncoder(embed_dim, ff_dim, num_heads, dropout=dropout)(enc_embed)

    # Decoder (length 20)
    dec_inputs = layers.Input(shape=(tgt_seq_len,), dtype="int32", name="decoder_inputs")
    dec_embed = PositionalEmbedding(tgt_seq_len, tgt_vocab_size, embed_dim, dropout=dropout)(dec_inputs)
    dec_out = TransformerDecoder(embed_dim, ff_dim, num_heads, dropout=dropout)([dec_embed, enc_out])

    # LM head
    logits = layers.Dense(tgt_vocab_size, name="logits")(dec_out)

    model = keras.Model([enc_inputs, dec_inputs], logits, name="TransformerNMT")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=3e-4),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=0),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")]
    )
    return model



Next, we assemble the end-to-end model.

In [13]:
embed_dim = 256
ff_dim = 2048
num_heads = 8

# Encoder
encoder_inputs = keras.Input(shape=(sequence_length,), dtype="int32", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, ff_dim, num_heads)(x)

# Decoder
# The decoder receives the target sequence minus its last step, so its length
# is `sequence_length` (20), matching what `format_dataset` yields.
decoder_inputs = keras.Input(shape=(sequence_length,), dtype="int32", name="decoder_inputs")
# 20000 is the `max_tokens` of `spa_vectorization` (the Spanish vocabulary size).
x = PositionalEmbedding(sequence_length, 20000, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, ff_dim, num_heads)([x, encoder_outputs])
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(20000)(x)  # no softmax: the loss uses from_logits=True

# Final model
transformer = keras.Model(
    inputs={"encoder_inputs": encoder_inputs, "decoder_inputs": decoder_inputs},
    outputs=decoder_outputs,
    name="transformer",
)

# Compile (the model outputs logits, hence from_logits=True)
transformer.compile(
    optimizer=keras.optimizers.Adam(learning_rate=3e-4),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()]
)

transformer.summary()


## Training our model

We'll use accuracy as a quick way to monitor training progress on the validation data.
Note that machine translation typically uses BLEU scores as well as other metrics, rather than accuracy.

Here we only train for 1 epoch, but to get the model to actually converge
you should train for at least 30 epochs.

In [14]:
epochs = 1  # train for at least 30 epochs to actually converge

transformer.summary()

transformer.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=0),
    metrics=[keras.metrics.SparseCategoricalAccuracy(name="accuracy")],
)

history = transformer.fit(
    train_ds,
    epochs=epochs,
    validation_data=val_ds,
)

In [15]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# -----------------------------
# 0. GPU check and setup
# -----------------------------
print("TensorFlow version:", tf.__version__)
print("Num GPUs Available:", len(tf.config.list_physical_devices("GPU")))
print("GPU Device:", tf.test.gpu_device_name())

# Enable GPU memory growth (to avoid OOM errors)
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU memory growth enabled")
    except RuntimeError as e:
        print(e)

# -----------------------------
# 1. Data preparation (vectorization)
# -----------------------------
def format_dataset(eng, spa):
    eng = eng_vectorization(eng)      # (batch, 20)
    spa = spa_vectorization(spa)      # (batch, 21)

    decoder_in = spa[:, :-1]          # (batch, 20)
    target = spa[:, 1:]               # (batch, 20)

    return {"encoder_inputs": eng, "decoder_inputs": decoder_in}, target

train_ds = (
    tf.data.Dataset.from_tensor_slices((train_eng_texts, train_spa_texts))
    .batch(64)
    .map(format_dataset)
    .shuffle(2048)
    .prefetch(tf.data.AUTOTUNE)
)

val_ds = (
    tf.data.Dataset.from_tensor_slices(([p[0] for p in val_pairs], [p[1] for p in val_pairs]))
    .batch(64)
    .map(format_dataset)
    .prefetch(tf.data.AUTOTUNE)
)

# -----------------------------
# 2. Model definition (assumes PositionalEmbedding / Encoder / Decoder are already defined)
# -----------------------------
def build_nmt_model(
    src_vocab_size, tgt_vocab_size,
    src_seq_len=20, tgt_seq_len=20,
    embed_dim=256, ff_dim=512, num_heads=4, dropout=0.1
):
    # Encoder
    enc_inputs = layers.Input(shape=(src_seq_len,), dtype="int32", name="encoder_inputs")
    enc_embed = PositionalEmbedding(src_seq_len, src_vocab_size, embed_dim, dropout=dropout)(enc_inputs)
    enc_out = TransformerEncoder(embed_dim, ff_dim, num_heads, dropout=dropout)(enc_embed)

    # Decoder
    dec_inputs = layers.Input(shape=(tgt_seq_len,), dtype="int32", name="decoder_inputs")
    dec_embed = PositionalEmbedding(tgt_seq_len, tgt_vocab_size, embed_dim, dropout=dropout)(dec_inputs)
    dec_out = TransformerDecoder(embed_dim, ff_dim, num_heads, dropout=dropout)([dec_embed, enc_out])

    # LM head
    logits = layers.Dense(tgt_vocab_size, name="logits")(dec_out)

    model = keras.Model([enc_inputs, dec_inputs], logits, name="TransformerNMT")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=3e-4),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True, ignore_class=0),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="acc")]
    )
    return model

# -----------------------------
# 3. Model creation and training (the GPU is used automatically)
# -----------------------------
transformer = build_nmt_model(
    src_vocab_size=vocab_size,
    tgt_vocab_size=20000,
    src_seq_len=20,
    tgt_seq_len=20
)

transformer.summary()

with tf.device("/GPU:0"):   # use this to place the computation on the GPU explicitly
    history = transformer.fit(
        train_ds,
        epochs=30,              # use 1 for a quick test; 30 or more is recommended for convergence
        validation_data=val_ds
    )


TensorFlow version: 2.19.0
Num GPUs Available: 1
GPU Device: /device:GPU:0
Physical devices cannot be modified after being initialized


Epoch 1/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 96s 37ms/step - acc: 0.0978 - loss: 5.3468 - val_acc: 0.1946 - val_loss: 2.6567
Epoch 2/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 16s 10ms/step - acc: 0.2051 - loss: 2.4625 - val_acc: 0.2257 - val_loss: 1.9543
Epoch 3/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 16s 10ms/step - acc: 0.2353 - loss: 1.7297 - val_acc: 0.2363 - val_loss: 1.7129
Epoch 4/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 16s 10ms/step - acc: 0.2510 - loss: 1.3598 - val_acc: 0.2421 - val_loss: 1.5972
Epoch 5/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 16s 10ms/step - acc: 0.2633 - loss: 1.1161 - val_acc: 0.2438 - val_loss: 1.5782
Epoch 6/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━ 16s 10ms/step - acc: 0.2721 - loss: 0.9568 - val_acc: 0.2457 - val_loss: 1.5542
Epoch 7/30
1488/1488 ━━━━━━━━━━━━━━━━━━━━

## Decoding test sentences

Finally, let's demonstrate how to translate brand new English sentences.
We simply feed into the model the vectorized English sentence
as well as the target token `"[start]"`, then we repeatedly generate the next token, until
we hit the token `"[end]"`.
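
The control flow of this loop can be sketched independently of the model. Below, a hypothetical lookup table `NEXT_TOKEN` stands in for the trained Transformer (it simply maps the last token to the next one), so only the structure of greedy decoding is shown.

```python
# Stand-in for the model: maps the previous token to the "predicted" next token.
NEXT_TOKEN = {
    "[start]": "me",
    "me": "encanta",
    "encanta": "escribir",
    "escribir": "[end]",
}


def greedy_decode(max_length=20):
    decoded = ["[start]"]
    for _ in range(max_length):
        # Pick the single most likely next token given what has been decoded so far.
        next_token = NEXT_TOKEN.get(decoded[-1], "[end]")
        decoded.append(next_token)
        # Stop as soon as the end token is produced.
        if next_token == "[end]":
            break
    return " ".join(decoded)


print(greedy_decode())  # [start] me encanta escribir [end]
```

The `max_length` cap matters: if the end token is never produced, the loop still terminates after a fixed number of steps.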

In [16]:
import numpy as np
import random
from keras import ops

spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    # Vectorize the English sentence
    tokenized_input_sentence = eng_vectorization([input_sentence])

    # Seed the translation with the start token
    decoded_sentence = "[start]"

    for i in range(max_decoded_sentence_length):
        # Vectorize the translation so far; drop the last step to get length 20
        tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]

        # Run the model (it returns logits)
        predictions = transformer(
            {
                "encoder_inputs": tokenized_input_sentence,
                "decoder_inputs": tokenized_target_sentence,
            },
            training=False,
        )

        # Greedy decoding: take the highest-scoring token at step i
        sampled_token_index = int(
            np.argmax(predictions[0, i, :].numpy())
        )
        sampled_token = spa_index_lookup[sampled_token_index]

        # Append the token to the translation
        decoded_sentence += " " + sampled_token

        # Stop once the end token is generated
        if sampled_token == "[end]":
            break

    return decoded_sentence


In [17]:
test_eng_texts = [pair[0] for pair in test_pairs]

for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(f"EN: {input_sentence}")
    print(f"ES: {translated}")
    print("-" * 50)


EN: I like to eat Korean food.
ES: [start] me gusta comer comida coreana end end end end end end end end end end end end end end end
--------------------------------------------------
EN: He enjoyed playing baseball.
ES: [start] Él disfrutaba jugar béisbol end end end end end end end end end end end end end end end end
--------------------------------------------------
EN: I feel like going on a trip.
ES: [start] tengo ganas de ir en un viaje end end end end end end end end end end end end end
--------------------------------------------------
EN: Tom is right, of course.
ES: [start] tom tiene razón por supuesto end end end end end end end end end end end end end end end
--------------------------------------------------
EN: Tom laughs at his own jokes.
ES: [start] tom se ríe de sus propias bromas end end end end end end end end end end end end end
--------------------------------------------------
EN: I'll be back by 2:30.
ES: [start] estaré de vuelta para las dos y media end end end 

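In the run recorded above, the end token is printed as `end` and generation never stops early. That run's Spanish `TextVectorization` layer used the default standardization, which strips the square brackets, so the `"[end]"` check in `decode_sequence` never matched. With the brackets preserved, after 30 epochs we get results such as: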

> She handed him the money.
> [start] ella le pasó el dinero [end]

> Tom has never heard Mary sing.
> [start] tom nunca ha oído cantar a mary [end]

> Perhaps she will come tomorrow.
> [start] tal vez ella vendrá mañana [end]

> I love to write.
> [start] me encanta escribir [end]

> His French is improving little by little.
> [start] su francés va a [UNK] sólo un poco [end]

> My hotel told me to call you.
> [start] mi hotel me dijo que te [UNK] [end]
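
As noted in the training section, translation quality is usually measured with BLEU-style metrics rather than token accuracy. The sketch below computes only the clipped (modified) n-gram precision that BLEU is built on. It omits the brevity penalty and the geometric mean over n-gram orders, so it is not a full BLEU implementation. The function name and the reference sentence are illustrative assumptions, not part of the dataset.

```python
from collections import Counter


def modified_ngram_precision(candidate, reference, n):
    cand = candidate.split()
    ref = reference.split()
    cand_ngrams = Counter(tuple(cand[i : i + n]) for i in range(len(cand) - n + 1))
    ref_ngrams = Counter(tuple(ref[i : i + n]) for i in range(len(ref) - n + 1))
    total = sum(cand_ngrams.values())
    if total == 0:
        return 0.0
    # Each candidate n-gram is credited at most as often as it occurs in the reference.
    clipped = sum(min(count, ref_ngrams[ngram]) for ngram, count in cand_ngrams.items())
    return clipped / total


candidate = "ella le pasó el dinero"   # model output from the examples above
reference = "ella le dio el dinero"    # made-up reference translation
print(modified_ngram_precision(candidate, reference, 1))  # 0.8
print(modified_ngram_precision(candidate, reference, 2))  # 0.5
```

Four of the five candidate words appear in the reference, and two of its four bigrams do, which is where the two scores come from.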