# Ponderada: Transformer

In [42]:
#importações
import tensorflow as tf
import tensorflow_text as tf_text
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
import time
import os

In [None]:
# Carregar dataset TED Talk ->EN)
examples, metadata = tfds.load('ted_hrlr_translate/pt_to_en', with_info=True, as_supervised=True)
train_examples, val_examples = examples['train'], examples['validation']

# Subamostragem para treino 
train_examples = train_examples.take(2000)
val_examples = val_examples.take(500)

In [44]:
# Tokenizador oficial
model_name = "ted_hrlr_translate_pt_en_converter"
tf.keras.utils.get_file(
    f"{model_name}.zip",
    f"https://storage.googleapis.com/download.tensorflow.org/models/{model_name}.zip",
    cache_dir='.', cache_subdir='', extract=True
)
tokenizers = tf.saved_model.load(
    "./ted_hrlr_translate_pt_en_converter_extracted/ted_hrlr_translate_pt_en_converter"
)

In [45]:
# Preparar batches
MAX_TOKENS = 128
BATCH_SIZE = 32

def encode_and_tensorize(pt, en):
    """
    Tokeniza, corta e prepara tensores para treino de modelo Transformer.

    Parâmetros:
    -----------
    pt : tf.Tensor ou str
        Frases em português a serem tokenizadas.
    en : tf.Tensor ou str
        Frases correspondentes em inglês a serem tokenizadas.

    Retorno:
    --------
    tuple:
        - inputs: tupla contendo:
            - pt: tensor das frases em português tokenizadas e truncadas.
            - decoder_input: tensor das frases em inglês tokenizadas, truncadas e deslocadas para entrada do decodificador.
        - decoder_target: tensor das frases em inglês tokenizadas e truncadas, deslocadas para servir como alvo do decodificador.

    Observações:
    ------------
    - O comprimento máximo de tokens do português é definido por MAX_TOKENS.
    - O comprimento máximo de tokens do inglês é MAX_TOKENS + 1, devido ao deslocamento do decoder.
    """
    pt = tokenizers.pt.tokenize(pt)[:, :MAX_TOKENS].to_tensor()
    en = tokenizers.en.tokenize(en)[:, :MAX_TOKENS+1].to_tensor()
    decoder_input = en[:, :-1]
    decoder_target = en[:, 1:]
    return (pt, decoder_input), decoder_target


train_batches_for_fit = (train_examples
                         .shuffle(1024)
                         .batch(BATCH_SIZE)
                         .map(encode_and_tensorize, num_parallel_calls=tf.data.AUTOTUNE)
                         .prefetch(tf.data.AUTOTUNE))

val_batches_for_fit = (val_examples
                       .batch(BATCH_SIZE)
                       .map(encode_and_tensorize, num_parallel_calls=tf.data.AUTOTUNE)
                       .prefetch(tf.data.AUTOTUNE))

In [46]:
# definir camadas transformer
def positional_encoding(length, depth):
    """
    Gera codificação posicional usando funções seno e cosseno.

    Parâmetros:
    -----------
    length : int
        Comprimento máximo das sequências.
    depth : int
        Dimensão do embedding (d_model).

    Retorno:
    --------
    tf.Tensor
        Tensor de codificação posicional de forma (length, depth), do tipo float32.
    """
    depth = depth/2
    positions = np.arange(length)[:, np.newaxis]
    depths = np.arange(depth)[np.newaxis, :]/depth
    angle_rates = 1 / (10000**depths)
    angle_rads = positions * angle_rates
    pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
    return tf.cast(pos_encoding, dtype=tf.float32)


class PositionalEmbedding(tf.keras.layers.Layer):
    """
    Camada de embedding que combina embedding de tokens com codificação posicional.

    Parâmetros:
    -----------
    vocab_size : int
        Tamanho do vocabulário.
    d_model : int
        Dimensão do embedding.
    """
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(2048, d_model)

    def call(self, x):
        """
        Aplica embedding e adiciona codificação posicional.

        Parâmetros:
        -----------
        x : tf.Tensor ou tf.RaggedTensor
            Sequência de tokens de entrada.

        Retorno:
        --------
        tf.Tensor
            Sequência com embedding e codificação posicional aplicada.
        """
        if isinstance(x, tf.RaggedTensor):
            x = x.to_tensor()
        length = tf.shape(x)[1]
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x


class BaseAttention(tf.keras.layers.Layer):
    """
    Camada base para atenção, encapsulando MultiHeadAttention,
    adição residual e normalização.
    """
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.add = tf.keras.layers.Add()
        self.layernorm = tf.keras.layers.LayerNormalization()


class CrossAttention(BaseAttention):
    """
    Camada de atenção cruzada (decoder attends to encoder).

    Métodos:
    --------
    call(x, context)
        Aplica atenção cruzada entre `x` e `context`.
    """
    def call(self, x, context):
        attn_output, attn_scores = self.mha(query=x, value=context, return_attention_scores=True)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x, attn_scores


class GlobalSelfAttention(BaseAttention):
    """
    Camada de auto-atenção global (self-attention).

    Métodos:
    --------
    call(x)
        Aplica auto-atenção sobre `x`.
    """
    def call(self, x):
        attn_output = self.mha(query=x, value=x)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x


class FeedForward(tf.keras.layers.Layer):
    """
    Camada feed-forward totalmente conectada com dropout, adição residual e normalização.

    Parâmetros:
    -----------
    d_model : int
        Dimensão da saída.
    dff : int
        Dimensão oculta da camada interna.
    dropout_rate : float
        Taxa de dropout.
    """
    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layernorm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        """
        Aplica a camada feed-forward com residual connection e normalização.

        Parâmetros:
        -----------
        x : tf.Tensor
            Entrada da camada.

        Retorno:
        --------
        tf.Tensor
            Saída normalizada e com conexão residual.
        """
        x = self.add([x, self.seq(x)])
        x = self.layernorm(x)
        return x


class EncoderLayer(tf.keras.layers.Layer):
    """
    Camada de encoder Transformer contendo self-attention e feed-forward.

    Parâmetros:
    -----------
    d_model : int
        Dimensão do embedding.
    num_heads : int
        Número de cabeças da atenção.
    dff : int
        Dimensão da camada feed-forward.
    dropout_rate : float
        Taxa de dropout.
    """
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = GlobalSelfAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = FeedForward(d_model, dff, dropout_rate)

    def call(self, x):
        """
        Aplica atenção e feed-forward no encoder.

        Parâmetros:
        -----------
        x : tf.Tensor
            Entrada do encoder.

        Retorno:
        --------
        tf.Tensor
            Saída processada pelo encoder.
        """
        x = self.self_attention(x)
        x = self.ffn(x)
        return x


class DecoderLayer(tf.keras.layers.Layer):
    """
    Camada de decoder Transformer com self-attention, cross-attention e feed-forward.

    Parâmetros:
    -----------
    d_model : int
        Dimensão do embedding.
    num_heads : int
        Número de cabeças da atenção.
    dff : int
        Dimensão da camada feed-forward.
    dropout_rate : float
        Taxa de dropout.
    """
    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = GlobalSelfAttention(num_heads=num_heads, key_dim=d_model)
        self.cross_attention = CrossAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = FeedForward(d_model, dff, dropout_rate)

    def call(self, x, context):
        """
        Aplica self-attention, cross-attention e feed-forward no decoder.

        Parâmetros:
        -----------
        x : tf.Tensor
            Entrada do decoder.
        context : tf.Tensor
            Saída do encoder usada para atenção cruzada.

        Retorno:
        --------
        x : tf.Tensor
            Saída processada pelo decoder.
        attn_scores : tf.Tensor
            Scores da atenção cruzada.
        """
        x = self.self_attention(x)
        x, attn_scores = self.cross_attention(x, context)
        x = self.ffn(x)
        return x, attn_scores


class Encoder(tf.keras.layers.Layer):
    """
    Encoder completo do Transformer.

    Parâmetros:
    -----------
    num_layers : int
        Número de camadas de encoder.
    d_model : int
        Dimensão do embedding.
    num_heads : int
        Número de cabeças da atenção.
    dff : int
        Dimensão da camada feed-forward.
    vocab_size : int
        Tamanho do vocabulário de entrada.
    """
    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size):
        super().__init__()
        self.pos_embedding = PositionalEmbedding(vocab_size, d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff) for _ in range(num_layers)]

    def call(self, x):
        """
        Aplica embedding posicional e todas as camadas do encoder.

        Parâmetros:
        -----------
        x : tf.Tensor
            Entrada do encoder.

        Retorno:
        --------
        tf.Tensor
            Saída final do encoder.
        """
        x = self.pos_embedding(x)
        for enc_layer in self.enc_layers:
            x = enc_layer(x)
        return x


class Decoder(tf.keras.layers.Layer):
    """
    Decoder completo do Transformer.

    Parâmetros:
    -----------
    num_layers : int
        Número de camadas de decoder.
    d_model : int
        Dimensão do embedding.
    num_heads : int
        Número de cabeças da atenção.
    dff : int
        Dimensão da camada feed-forward.
    vocab_size : int
        Tamanho do vocabulário de saída.
    """
    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size):
        super().__init__()
        self.pos_embedding = PositionalEmbedding(vocab_size, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff) for _ in range(num_layers)]
        self.output_layer = tf.keras.layers.Dense(vocab_size)

    def call(self, x, context):
        """
        Aplica embedding posicional, todas as camadas do decoder e projeta para o vocabulário.

        Parâmetros:
        -----------
        x : tf.Tensor
            Entrada do decoder.
        context : tf.Tensor
            Saída do encoder para atenção cruzada.

        Retorno:
        --------
        x : tf.Tensor
            Logits sobre o vocabulário.
        attn_scores : tf.Tensor
            Scores da atenção cruzada da última camada.
        """
        x = self.pos_embedding(x)
        for dec_layer in self.dec_layers:
            x, attn_scores = dec_layer(x, context)
        x = self.output_layer(x)
        return x, attn_scores


class Transformer(tf.keras.Model):
    """
    Modelo Transformer completo para tradução de sequência para sequência.

    Parâmetros:
    -----------
    num_layers : int
        Número de camadas no encoder e decoder.
    d_model : int
        Dimensão do embedding.
    num_heads : int
        Número de cabeças da atenção.
    dff : int
        Dimensão da camada feed-forward.
    input_vocab_size : int
        Tamanho do vocabulário de entrada.
    target_vocab_size : int
        Tamanho do vocabulário de saída.
    """
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size):
        super().__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size)

    def call(self, inputs, training=False):
        """
        Realiza a passagem forward do Transformer.

        Parâmetros:
        -----------
        inputs : tuple
            Tupla (encoder_input, decoder_input).
        training : bool
            Indica se está em modo treino (não usado diretamente neste método).

        Retorno:
        --------
        tf.Tensor
            Logits do modelo sobre o vocabulário de saída.
        """
        encoder_input, decoder_input = inputs
        context = self.encoder(encoder_input)
        x, _ = self.decoder(decoder_input, context)
        return x


In [47]:
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)
"""
Função de perda base para classificação categórica esparsa.

Parâmetros:
-----------
from_logits : bool
    Indica que os logits são passados diretamente (não aplicamos softmax antes).
reduction : str
    Define que a redução será feita manualmente (nenhuma redução automática).
"""


def masked_loss(y_true, y_pred):
    """
    Calcula a perda categórica esparsa ignorando os tokens de padding (0).

    Parâmetros:
    -----------
    y_true : tf.Tensor
        Sequência alvo com índices inteiros dos tokens.
    y_pred : tf.Tensor
        Logits preditos pelo modelo.

    Retorno:
    --------
    tf.Tensor
        Perda média considerando apenas tokens não nulos.
    """
    mask = tf.math.logical_not(tf.math.equal(y_true, 0))  # Ignora padding
    loss_ = loss_object(y_true, y_pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)


# Obtendo tamanhos de vocabulário
VOCAB_SIZE_PT = tokenizers.pt.get_vocab_size().numpy()
VOCAB_SIZE_EN = tokenizers.en.get_vocab_size().numpy()


# Inicializando o modelo Transformer
model = Transformer(
    num_layers=2,
    d_model=128,
    num_heads=4,
    dff=512,
    input_vocab_size=VOCAB_SIZE_PT,
    target_vocab_size=VOCAB_SIZE_EN
)

# Compilando o modelo
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=masked_loss
)

# Treinamento
history = model.fit(
    train_batches_for_fit,
    validation_data=val_batches_for_fit,
    epochs=20
)
"""
Treina o modelo Transformer usando batches de treino e validação.

Parâmetros principais:
---------------------
train_batches_for_fit : tf.data.Dataset
    Dataset de treino preparado com batches e pré-processamento.
val_batches_for_fit : tf.data.Dataset
    Dataset de validação.
epochs : int
    Número de épocas de treinamento.

Retorno:
--------
history : tf.keras.callbacks.History
    Objeto contendo métricas de treino e validação por época.
"""


Epoch 1/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m155s[0m 2s/step - loss: 7.4559 - val_loss: 5.8764
Epoch 2/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m131s[0m 2s/step - loss: 5.5415 - val_loss: 5.2746
Epoch 3/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m124s[0m 2s/step - loss: 4.7160 - val_loss: 4.4639
Epoch 4/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m125s[0m 2s/step - loss: 3.6595 - val_loss: 3.6626
Epoch 5/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 2s/step - loss: 2.7171 - val_loss: 2.9489
Epoch 6/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m143s[0m 2s/step - loss: 1.9341 - val_loss: 2.4159
Epoch 7/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m141s[0m 2s/step - loss: 1.3249 - val_loss: 1.9726
Epoch 8/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m126s[0m 2s/step - loss: 0.8159 - val_loss: 1.6782
Epoch 9/20
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━

'\nTreina o modelo Transformer usando batches de treino e validação.\n\nParâmetros principais:\n---------------------\ntrain_batches_for_fit : tf.data.Dataset\n    Dataset de treino preparado com batches e pré-processamento.\nval_batches_for_fit : tf.data.Dataset\n    Dataset de validação.\nepochs : int\n    Número de épocas de treinamento.\n\nRetorno:\n--------\nhistory : tf.keras.callbacks.History\n    Objeto contendo métricas de treino e validação por época.\n'

In [48]:
# Gráfico de perda
os.makedirs("img", exist_ok=True)
plt.figure(figsize=(8,5))
plt.plot(history.history["loss"], label="Treino")
plt.plot(history.history["val_loss"], label="Validação")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.title("Curva de perda")
plt.legend()
plt.savefig("img/loss.png")
plt.close()
print("✅ Gráfico salvo em img/loss.png")

✅ Gráfico salvo em img/loss.png


In [49]:
# Função de tradução
def translate(sentence):
    """
    Traduz uma frase do português para o inglês usando o modelo Transformer treinado.

    Parâmetros:
    -----------
    sentence : str
        Frase em português a ser traduzida.

    Retorno:
    --------
    str
        Tradução gerada em inglês.

    Descrição:
    -----------
    - A frase é tokenizada e truncada para o comprimento máximo (MAX_TOKENS).
    - A decodificação é feita de forma autoregressiva:
        1. Inicializa-se o decoder com o token de início (start_token).
        2. Em cada passo, o modelo prevê o próximo token.
        3. O token predito é concatenado à sequência de saída.
        4. O loop continua até atingir o token de fim (end_token) ou MAX_TOKENS.
    - A sequência final é detokenizada para retornar a tradução em formato de texto.
    """
    pt_tokens = tokenizers.pt.tokenize([sentence])[:, :MAX_TOKENS].to_tensor()
    start_token = 1
    end_token = 2

    decoder_input = tf.constant([[start_token]])
    output = decoder_input

    for i in range(MAX_TOKENS):
        logits = model((pt_tokens, output))
        next_token = tf.argmax(logits[:, -1:, :], axis=-1)
        output = tf.concat([output, next_token], axis=-1)
        if next_token.numpy()[0][0] == end_token:
            break

    translation = tokenizers.en.detokenize(output[:, 1:])
    return translation.numpy()[0].decode('utf-8')


In [50]:
def train_with_device(device_name):
    """
    Treina o modelo Transformer em um dispositivo específico (CPU ou GPU) e mede o tempo de execução.

    Parâmetros:
    -----------
    device_name : str
        Nome do dispositivo a ser usado, por exemplo '/CPU:0' ou '/GPU:0'.

    Retorno:
    --------
    tuple
        - duration : float
            Tempo total de treinamento em segundos.
        - history : tf.keras.callbacks.History
            Histórico de métricas de treino e validação por época.

    Descrição:
    -----------
    - Define o dispositivo a ser usado com `tf.device`.
    - Compila o modelo usando o otimizador Adam e a função de perda `masked_loss`.
    - Executa o treinamento por 10 épocas.
    - Mede e retorna a duração total do treinamento juntamente com o histórico do treino.
    """
    with tf.device(device_name):
        model.compile(optimizer=tf.keras.optimizers.Adam(), loss=masked_loss)
        start = time.time()
        history = model.fit(train_batches_for_fit, validation_data=val_batches_for_fit, epochs=10)
        duration = time.time() - start
        return duration, history



In [51]:
cpu_time, _ = train_with_device("/CPU:0")
gpu_time, _ = train_with_device("/GPU:0")

print(f"CPU time: {cpu_time:.2f}s")
print(f"GPU time: {gpu_time:.2f}s")


Epoch 1/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m146s[0m 2s/step - loss: 0.0228 - val_loss: 1.1445
Epoch 2/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m139s[0m 2s/step - loss: 0.0178 - val_loss: 1.1787
Epoch 3/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m154s[0m 2s/step - loss: 0.0264 - val_loss: 1.1945
Epoch 4/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m123s[0m 2s/step - loss: 0.0225 - val_loss: 1.2311
Epoch 5/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m125s[0m 2s/step - loss: 0.0180 - val_loss: 1.1967
Epoch 6/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m143s[0m 2s/step - loss: 0.0145 - val_loss: 1.1696
Epoch 7/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m139s[0m 2s/step - loss: 0.0103 - val_loss: 1.1662
Epoch 8/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m147s[0m 2s/step - loss: 0.0119 - val_loss: 1.3013
Epoch 9/10
[1m63/63[0m [32m━━━━━━━━━━━━━━━━━━