This is a companion notebook for the book [Deep Learning with Python, Second Edition](https://www.manning.com/books/deep-learning-with-python-second-edition?a_aid=keras&a_bid=76564dff). For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.

**If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.**

This notebook was generated for TensorFlow 2.6.

## Beyond text classification: Sequence-to-sequence learning

### A machine translation example

In [4]:
# !wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip



In [52]:
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split("\t")
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))
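
To make the line format concrete, here is a minimal sketch of the parsing step applied to a single hard-coded line. The sample sentence is made up for illustration, and `parse_line` is a hypothetical helper that is not part of the book's code.

```python
def parse_line(line):
    """Split one tab-separated line into an (english, spanish) pair."""
    english, spanish = line.split("\t")
    # Wrap the target so the decoder sees explicit start and end markers.
    return english, "[start] " + spanish + " [end]"

sample_line = "Hello.\tHola."  # illustrative line, not read from spa.txt
pair = parse_line(sample_line)
print(pair)
```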

In [2]:
import pandas as pd

In [3]:
for i in range(2):
    eng_span = pd.read_pickle(f"~/Desktop/deep_learning/MT/train_en_es/train_{i}.pkl")
    port_eng = pd.read_pickle(f"~/Desktop/deep_learning/MT/train_en_pt/train_{i}.pkl")
    # Note: text_pairs is re-created on every iteration, so only the pairs
    # from the last file (i = 1) are kept.
    text_pairs = []
    for en, es in zip(eng_span["en_sent"], eng_span["span_sent"]):
        en = "<sp2> " + en  # tag marking the target language (Spanish)
        es = "[start] " + es + " [end]"
        text_pairs.append((en, es))

    for pt, en in zip(port_eng["port_sent"], port_eng["en_sent"]):
        pt = "<en2> " + pt  # tag marking the target language (English)
        en = "[start] " + en + " [end]"
        text_pairs.append((pt, en))
# text_pairs = pd.concat((eng_span["en_sent"] ,eng_span["span_sent"]),axis=1)

In [4]:
import random
print(random.choice(text_pairs))

('<en2> O carro de Tom está estacionado lá fora.', "[start] Tom's car is parked outside. [end]")


In [4]:
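import random
# random.shuffle() works in place, so shuffling a slice (a copy) leaves
# text_pairs unchanged. Shuffle each part separately and put them back.
first_part = text_pairs[:30001]
second_part = text_pairs[30001:30000*2]
random.shuffle(first_part)
random.shuffle(second_part)
text_pairs = first_part + second_part + text_pairs[30000*2:]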

In [5]:
val_num = int(0.2 * len(text_pairs))//2
val_pairs = text_pairs[:val_num] + text_pairs[30001:val_num+30001]
train_pairs = text_pairs[val_num:30001] + text_pairs[val_num+30001:]
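
The split above takes the same number of validation pairs from each translation direction. A minimal sketch of that idea on toy data follows; `split_half` and the generated sentences are hypothetical stand-ins, and the sketch assumes the two directions are held in separate lists rather than addressed by fixed indices.

```python
def split_half(pairs, val_fraction):
    """Return (train, val) lists taken from one direction's pairs."""
    n_val = int(val_fraction * len(pairs))
    return pairs[n_val:], pairs[:n_val]

# Toy stand-ins for the two translation directions (hypothetical data).
en_es = [(f"<sp2> en {i}", f"[start] es {i} [end]") for i in range(10)]
pt_en = [(f"<en2> pt {i}", f"[start] en {i} [end]") for i in range(10)]

train_a, val_a = split_half(en_es, 0.2)
train_b, val_b = split_half(pt_en, 0.2)
train_pairs_demo = train_a + train_b
val_pairs_demo = val_a + val_b
print(len(train_pairs_demo), len(val_pairs_demo))
```

Keeping the two directions in separate lists avoids hard-coding a boundary index into the slicing.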

In [21]:
len(val_pairs) + len(train_pairs)

60000

In [5]:
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [4]:
train_pairs = text_pairs

**Vectorizing the English and Spanish text pairs**

In [6]:
import tensorflow as tf
import string
import re
from tensorflow import keras
from tensorflow.keras import layers

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)
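
As a rough illustration of what `custom_standardization` does, the sketch below applies the same lowercase-and-strip rule with Python's `re` module instead of `tf.strings`. The `standardize` helper and the sample sentence are assumptions made for this example. The TensorFlow version runs on tensors inside the `TextVectorization` layer.

```python
import re
import string

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "").replace("]", "")

def standardize(text):
    """Lowercase and drop punctuation, but keep the brackets of [start]/[end]."""
    return re.sub(f"[{re.escape(strip_chars)}]", "", text.lower())

print(standardize("[start] ¿Dónde está Tom? [end]"))
```

Keeping `[` and `]` out of `strip_chars` is what lets the `[start]` and `[end]` markers survive as distinct tokens.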


**Preparing datasets for the translation task**

In [7]:
batch_size = 16

def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1],
    }, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
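
The slicing in `format_dataset` offsets the decoder input and the target by one step. A minimal sketch with plain Python lists shows the alignment; the token ids are made up for illustration.

```python
# Token ids for one padded target sequence: [start] w1 w2 [end] pad pad
# (the ids are made up for illustration).
spa = [[2, 15, 27, 3, 0, 0]]

decoder_inputs = [row[:-1] for row in spa]   # what the decoder reads
decoder_targets = [row[1:] for row in spa]   # what it must predict

for step, (seen, wanted) in enumerate(zip(decoder_inputs[0], decoder_targets[0])):
    print(f"step {step}: input token {seen} -> target token {wanted}")
```

At every step the target is the token that follows the one the decoder has just read, which is why the target vectorizer uses `sequence_length + 1`.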

In [7]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (16, 20)
inputs['spanish'].shape: (16, 20)
targets.shape: (16, 20)




### Sequence-to-sequence learning with RNNs

**GRU-based encoder**

In [10]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024


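source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
# Note: despite the heading, this encoder uses a bidirectional LSTM, not a GRU.
# Without return_state, the layer returns the summed final outputs of the two
# directions, shape (batch, latent_dim), which the GRU decoder below takes as
# its initial state.
encoded_source = layers.Bidirectional(
    layers.LSTM(latent_dim), merge_mode="sum")(x)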

**GRU-based decoder and the end-to-end model**

In [11]:
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

In [8]:
embed_dim = 256
latent_dim = 1024

In [11]:
import tensorflow as tf
import numpy as np


class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.enc_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')

    def call(self, x, hidden):
        x = self.embedding(x)
        output, state = self.gru(x, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.enc_units))


class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        # query shape == (batch_size, hidden size)
        # query_with_time_axis shape == (batch_size, 1, hidden size)
        # values shape == (batch_size, max_len, hidden size)
        query_with_time_axis = tf.expand_dims(query, 1)

        # score shape == (batch_size, max_length, 1)
        # we get 1 at the last axis because we are applying score to self.V
        # the shape of the tensor before applying self.V is (batch_size, max_length, units)
        score = self.V(tf.nn.tanh(
            self.W1(query_with_time_axis) + self.W2(values)))

        # attention_weights shape == (batch_size, max_length, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # context_vector shape after sum == (batch_size, hidden_size)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights


class Decoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.dec_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)

        # used for attention
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        # enc_output shape == (batch_size, max_length, hidden_size)
        context_vector, attention_weights = self.attention(hidden, enc_output)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # passing the concatenated vector to the GRU
        output, state = self.gru(x, initial_state=hidden)

        # output shape == (batch_size * 1, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))

        # output shape == (batch_size, vocab)
        x = self.fc(output)

        return x, state, attention_weights


class Seq2SeqWithAttention(tf.keras.Model):
    def __init__(self, inp_vocab_size, targ_vocab_size, embedding_dim, 
                 enc_units, dec_units, batch_sz, targ_lang_tokenizer):
        super(Seq2SeqWithAttention, self).__init__()
        self.encoder = Encoder(inp_vocab_size, embedding_dim, enc_units, batch_sz)
        self.decoder = Decoder(targ_vocab_size, embedding_dim, dec_units, batch_sz)
        self.targ_lang_tokenizer = targ_lang_tokenizer
        self.batch_sz = batch_sz

    def call(self, inputs, training=True):
        # Unpack the inputs
        inp, targ = inputs

        # Size the initial state from the actual batch, so the model also
        # works when the dataset batch size differs from batch_sz.
        enc_hidden = tf.zeros((tf.shape(inp)[0], self.encoder.enc_units))
        enc_output, enc_hidden = self.encoder(inp, enc_hidden)

        dec_hidden = enc_hidden

        # The first column of targ holds the start token, so use it as the
        # first decoder input instead of looking it up in the tokenizer.
        dec_input = targ[:, :1]
        
        # Initialize the outputs container
        outputs = []
        
        # For each timestep in the target sequence
        for t in range(1, targ.shape[1]):
            # Passing enc_output to the decoder
            predictions, dec_hidden, _ = self.decoder(dec_input, dec_hidden, enc_output)
            
            outputs.append(predictions)
            
            if training:
                # Teacher forcing - use actual target tokens as next input
                dec_input = tf.expand_dims(targ[:, t], 1)
            else:
                # Use predictions as next input
                predicted_id = tf.argmax(predictions, axis=1)
                dec_input = tf.expand_dims(predicted_id, 1)
        
        return tf.stack(outputs, axis=1)  # Shape: [batch_size, targ_seq_len-1, targ_vocab_size]

# Example of how to use the model for training
def train_model(dataset, epochs, model, optimizer, loss_function):
    for epoch in range(epochs):
        total_loss = 0
        
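        for (batch, (inputs, targets)) in enumerate(dataset):
            # train_ds yields ({"english": ..., "spanish": ...}, targets);
            # passing the dict straight to the model fails in Embedding.call().
            inp = inputs["english"]
            # Rebuild the full target: start token followed by the shifted targets.
            targ = tf.concat([inputs["spanish"][:, :1], targets], axis=1)
            with tf.GradientTape() as tape:
                predictions = model((inp, targ), training=True)
                
                # Calculate loss (targ[:, 1:] is the target without start token)
                loss = loss_function(targ[:, 1:], predictions)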
                
            # Apply gradients
            variables = model.trainable_variables
            gradients = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(gradients, variables))
            
            total_loss += loss
            
        print(f'Epoch {epoch+1} Loss {total_loss/len(dataset):.4f}')

# Usage example:
if __name__ == "__main__":
    # These values would be set based on your dataset
    BATCH_SIZE = 64
    embedding_dim = 256
    units = 1024
    
    # Example tokenizers and vocab sizes (these would come from preprocessing your data)
    inp_lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    targ_lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    
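    # The tokenizers above are unfitted placeholders with an empty word_index.
    # The TextVectorization layers cap both vocabularies at vocab_size, so use
    # that value for the embedding and output layers.
    inp_vocab_size = vocab_size
    targ_vocab_size = vocab_size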
    
    # Create and compile model
    model = Seq2SeqWithAttention(inp_vocab_size, targ_vocab_size, 
                                embedding_dim, units, units, 
                                BATCH_SIZE, targ_lang_tokenizer)
    
    optimizer = tf.keras.optimizers.Adam()
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    
    def loss_function(real, pred):
        mask = tf.math.logical_not(tf.math.equal(real, 0))
        loss_ = loss_object(real, pred)
        
        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask
        
        return tf.reduce_mean(loss_)
    
    # Example of how you might create a dataset (you would use your actual data)
    # dataset = tf.data.Dataset.from_tensor_slices((input_tensor, target_tensor)).shuffle(BUFFER_SIZE)
    # dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    
    # Then you would call:
    train_model(train_ds, 5, model, optimizer, loss_function)
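
The `loss_function` in this cell zeroes the loss at padded positions and then calls `tf.reduce_mean` over every position, so the padding still counts in the denominator. The sketch below contrasts that average with one taken over the non-padded positions only, using plain Python and made-up numbers. Both helper functions are hypothetical and are shown only to make the difference visible.

```python
def masked_mean_all(losses, labels):
    """Zero out padded positions, then average over every position."""
    masked = [loss if label != 0 else 0.0 for loss, label in zip(losses, labels)]
    return sum(masked) / len(masked)

def masked_mean_valid(losses, labels):
    """Zero out padded positions, then average over non-padded positions only."""
    masked = [loss if label != 0 else 0.0 for loss, label in zip(losses, labels)]
    n_valid = sum(1 for label in labels if label != 0)
    return sum(masked) / max(n_valid, 1)

labels = [5, 9, 3, 0, 0, 0]              # 0 marks padding
losses = [1.0, 2.0, 3.0, 4.0, 4.0, 4.0]  # per-token losses, made up

print(masked_mean_all(losses, labels))
print(masked_mean_valid(losses, labels))
```

Averaging over all positions makes the reported loss depend on how much padding each batch contains. Dividing by the number of real tokens removes that dependence.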


In [None]:
import tensorflow as tf
import numpy as np


class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        
        # Bidirectional LSTM layer
        self.bilstm = tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(self.enc_units,
                                return_sequences=True,
                                return_state=True,
                                recurrent_initializer='glorot_uniform')
        )

    def call(self, x):
        x = self.embedding(x)
        
        # Bidirectional LSTM returns outputs and states from both directions
        output, forward_h, forward_c, backward_h, backward_c = self.bilstm(x)
        
        # Concatenate the forward and backward states
        state_h = tf.concat([forward_h, backward_h], axis=-1)
        state_c = tf.concat([forward_c, backward_c], axis=-1)
        
        return output, state_h, state_c


class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        # query shape == (batch_size, hidden size)
        # query_with_time_axis shape == (batch_size, 1, hidden size)
        # values shape == (batch_size, max_len, hidden size)
        query_with_time_axis = tf.expand_dims(query, 1)

        # score shape == (batch_size, max_length, 1)
        score = self.V(tf.nn.tanh(
            self.W1(query_with_time_axis) + self.W2(values)))

        # attention_weights shape == (batch_size, max_length, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # context_vector shape after sum == (batch_size, hidden_size)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights


class Decoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        
        # Use LSTM for decoder (note: not bidirectional in decoder)
        self.lstm = tf.keras.layers.LSTM(self.dec_units * 2,  # *2 because encoder output is bidirectional
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
                                        
        self.fc = tf.keras.layers.Dense(vocab_size)

        # Attention mechanism
        self.attention = BahdanauAttention(self.dec_units * 2)

    def call(self, x, hidden_state, cell_state, enc_output):
        # enc_output shape == (batch_size, max_length, hidden_size*2)
        context_vector, attention_weights = self.attention(hidden_state, enc_output)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size*2)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # passing the concatenated vector to the LSTM
        output, state_h, state_c = self.lstm(x, initial_state=[hidden_state, cell_state])

        # output shape == (batch_size * 1, hidden_size*2)
        output = tf.reshape(output, (-1, output.shape[2]))

        # output shape == (batch_size, vocab)
        x = self.fc(output)

        return x, state_h, state_c, attention_weights


class Seq2SeqWithAttention(tf.keras.Model):
    def __init__(self, inp_vocab_size, targ_vocab_size, embedding_dim, 
                enc_units, dec_units, batch_sz, targ_lang_tokenizer):
        super(Seq2SeqWithAttention, self).__init__()
        self.encoder = Encoder(inp_vocab_size, embedding_dim, enc_units, batch_sz)
        self.decoder = Decoder(targ_vocab_size, embedding_dim, dec_units, batch_sz)
        self.targ_lang_tokenizer = targ_lang_tokenizer
        self.batch_sz = batch_sz

    def call(self, inputs, training=True):
        # Unpack the inputs
        inp, targ = inputs

        # Encoder output
        enc_output, enc_hidden, enc_cell = self.encoder(inp)

        # Initialize decoder state with encoder final states
        dec_hidden = enc_hidden
        dec_cell = enc_cell
        
        # The first column of targ holds the start token, so use it as the
        # first decoder input instead of looking it up in the tokenizer.
        dec_input = targ[:, :1]
        
        # Initialize the outputs container
        outputs = []
        
        # For each timestep in the target sequence
        
        for t in range(1, targ.shape[1]):
            # Passing enc_output to the decoder
            predictions, dec_hidden, dec_cell, _ = self.decoder(
                dec_input, dec_hidden, dec_cell, enc_output)
            
            outputs.append(predictions)
            
            if training:
                # Teacher forcing - use actual target tokens as next input
                dec_input = tf.expand_dims(targ[:, t], 1)
            else:
                # Use predictions as next input
                predicted_id = tf.argmax(predictions, axis=1)
                dec_input = tf.expand_dims(predicted_id, 1)
        
        return tf.stack(outputs, axis=1)  # Shape: [batch_size, targ_seq_len-1, targ_vocab_size]

    def initialize_states(self, batch_size):
        # This method can be used during inference to initialize states
        return tf.zeros((batch_size, self.decoder.dec_units * 2)), tf.zeros((batch_size, self.decoder.dec_units * 2))


# Training function
def train_model(dataset, epochs, model, optimizer, loss_function):
    for epoch in range(epochs):
        total_loss = 0
        
        for (batch, (inp, targ)) in enumerate(dataset):
            with tf.GradientTape() as tape:
                predictions = model((inp, targ), training=True)
                
                # Calculate loss (targ[:, 1:] is the target without start token)
                loss = loss_function(targ[:, 1:], predictions)
                
            # Apply gradients
            variables = model.trainable_variables
            gradients = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(gradients, variables))
            
            total_loss += loss
            
        print(f'Epoch {epoch+1} Loss {total_loss/len(dataset):.4f}')


# Example usage
if __name__ == "__main__":
    # These values would be set based on your dataset
    BATCH_SIZE = 64
    embedding_dim = 256
    units = 512  # Each direction will have this many units
    
    # Example tokenizers and vocab sizes (these would come from preprocessing your data)
    inp_lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    targ_lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    
    # After fitting tokenizers on your data:
    inp_vocab_size = len(inp_lang_tokenizer.word_index) + 1
    targ_vocab_size = len(targ_lang_tokenizer.word_index) + 1
    
    # Create and compile model
    model = Seq2SeqWithAttention(inp_vocab_size, targ_vocab_size, 
                                embedding_dim, units, units, 
                                BATCH_SIZE, targ_lang_tokenizer)
    
    optimizer = tf.keras.optimizers.Adam()
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    
    def loss_function(real, pred):
        mask = tf.math.logical_not(tf.math.equal(real, 0))
        loss_ = loss_object(real, pred)
        
        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask
        
        return tf.reduce_mean(loss_)
    
    # Example of creating a dataset (use your actual data)
    # dataset = tf.data.Dataset.from_tensor_slices((input_tensor, target_tensor)).shuffle(BUFFER_SIZE)
    # dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    
    # Call the training function
    # train_model(dataset, EPOCHS, model, optimizer, loss_function)


# Inference function (for generating translations)
def translate(model, sentence, max_length=40):
    # Preprocess the sentence (tokenize, pad, etc.)
    # ...
    
    # Initialize decoder input and states
    dec_input = tf.expand_dims([targ_lang_tokenizer.word_index['<start>']], 0)
    dec_hidden, dec_cell = model.initialize_states(1)
    
    # Get encoder output
    inp_tensor = tf.convert_to_tensor([sentence])
    enc_output, enc_hidden, enc_cell = model.encoder(inp_tensor)
    
    dec_hidden = enc_hidden
    dec_cell = enc_cell
    
    result = []
    
    for t in range(max_length):
        predictions, dec_hidden, dec_cell, attention_weights = model.decoder(
            dec_input, dec_hidden, dec_cell, enc_output)
        
        # Get the predicted ID
        predicted_id = tf.argmax(predictions[0]).numpy()
        
        # Append to result
        if targ_lang_tokenizer.index_word[predicted_id] == '<end>':
            break
        result.append(targ_lang_tokenizer.index_word[predicted_id])
        
        # Use the predicted ID as the next input
        dec_input = tf.expand_dims([predicted_id], 0)
    
    return ' '.join(result)
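
The `BahdanauAttention` layer scores each encoder position with `V(tanh(W1(query) + W2(value)))`, normalizes the scores with a softmax over time, and returns the weighted sum of the values. The sketch below reproduces those three steps for a single query in plain Python. The weight matrices and vectors are toy numbers chosen for illustration, and the bias terms of the `Dense` layers are omitted.

```python
import math

def matvec(matrix, vector):
    """Multiply a matrix (list of rows) by a vector."""
    return [sum(w * x for w, x in zip(row, vector)) for row in matrix]

def additive_attention(query, values, W1, W2, v):
    """Return (context, weights) for one query over a list of value vectors."""
    q_proj = matvec(W1, query)
    scores = []
    for value in values:
        v_proj = matvec(W2, value)
        hidden = [math.tanh(a + b) for a, b in zip(q_proj, v_proj)]
        scores.append(sum(w * h for w, h in zip(v, hidden)))
    # Softmax over the time axis (shifted by the max for numerical stability).
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    weights = [e / total for e in exps]
    # Context vector: weighted sum of the value vectors.
    dim = len(values[0])
    context = [sum(w * val[d] for w, val in zip(weights, values)) for d in range(dim)]
    return context, weights

# Toy inputs for illustration only.
query = [0.5, -0.5]
values = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
W1 = [[1.0, 0.0], [0.0, 1.0]]
W2 = [[1.0, 0.0], [0.0, 1.0]]
v = [1.0, 1.0]

context, weights = additive_attention(query, values, W1, W2, v)
print(weights)
print(context)
```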

In [16]:
# summary() prints the table itself and returns None, so don't wrap it in print().
seq2seq_rnn.summary()


**Training our recurrent sequence-to-sequence model**

In [12]:
seq2seq_rnn.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
seq2seq_rnn.fit(train_ds, epochs=5, validation_data=val_ds)

Epoch 1/5


2025-03-15 23:29:51.643424: W tensorflow/core/framework/op_kernel.cc:1829] INVALID_ARGUMENT: required broadcastable shapes


InvalidArgumentError: Graph execution error:

Detected at node functional_1_1/attention_1_1/sub defined at (most recent call last):
  File "/tmp/ipykernel_12844/4134485903.py", line 5, in <module>

  File "/home/admin/Desktop/deep_learning/.venv/lib/python3.11/site-packages/keras/src/layers/attention/attention.py", line 233, in call

  File "/home/admin/Desktop/deep_learning/.venv/lib/python3.11/site-packages/keras/src/layers/attention/attention.py", line 180, in _apply_scores

required broadcastable shapes
	 [[{{node functional_1_1/attention_1_1/sub}}]] [Op:__inference_multi_step_on_iterator_9552]

**Translating new sentences with our RNN encoder and decoder**

In [11]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
# Map each integer token index back to its string token.
spa_index_lookup = dict(enumerate(spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

# test_eng_texts = [pair[0] for pair in test_pairs]
# for _ in range(20):
#     input_sentence = random.choice(test_eng_texts)
#     print("-")
#     print(input_sentence)
#     print(decode_sequence(input_sentence))
print(decode_sequence("<sp2> O mercúrio é também conhecido como azougue."))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 135ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[start] muchas palabras son [UNK] y [UNK] [end]


### Sequence-to-sequence learning with Transformer

#### The Transformer decoder

**The `TransformerDecoder`**

In [129]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        # Build a (batch, seq, seq) lower-triangular mask of 0s and 1s so that
        # target position i can only attend to positions j <= i.
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, sequence_length, sequence_length))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
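
To make the output of `get_causal_attention_mask` concrete, here is a minimal NumPy sketch that builds the same lower-triangular pattern for a toy batch. It is an illustration only, not part of the book's code; the helper name `causal_mask` and the toy sizes are assumptions.

```python
import numpy as np

def causal_mask(batch_size, sequence_length):
    # Hypothetical NumPy counterpart of get_causal_attention_mask.
    i = np.arange(sequence_length)[:, np.newaxis]  # query positions (column)
    j = np.arange(sequence_length)                 # key positions (row)
    mask = (i >= j).astype("int32")                # 1 where key j is not in the future of i
    # Repeat the (seq, seq) pattern once per batch element.
    return np.tile(mask[np.newaxis, :, :], (batch_size, 1, 1))

mask = causal_mask(batch_size=2, sequence_length=4)
print(mask.shape)  # (2, 4, 4)
print(mask[0])
# [[1 0 0 0]
#  [1 1 0 0]
#  [1 1 1 0]
#  [1 1 1 1]]
```

Row `i` lists the positions that target token `i` may attend to, so each token sees itself and everything before it, and nothing after it.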

#### Putting it all together: A Transformer for machine translation

**PositionalEmbedding layer**

In [134]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

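    def compute_mask(self, inputs, mask=None):
        # keras.ops accepts symbolic KerasTensors as well as eager tensors;
        # tf.math.not_equal raises a ValueError when given a KerasTensor.
        return keras.ops.not_equal(inputs, 0)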

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config
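
As a rough illustration of what `PositionalEmbedding` computes, the NumPy sketch below adds a per-token vector to a per-position vector and derives a padding mask from token index 0. The random lookup tables, the toy sizes, and the helper names `positional_embedding` and `padding_mask` are assumptions made for this example; in the real layer the tables are trained `Embedding` weights.

```python
import numpy as np

rng = np.random.default_rng(0)
vocab_size, sequence_length, embed_dim = 10, 6, 4  # toy sizes (assumed)

# Random stand-ins for the two trainable Embedding tables.
token_table = rng.normal(size=(vocab_size, embed_dim))
position_table = rng.normal(size=(sequence_length, embed_dim))

def positional_embedding(token_ids):
    # token_ids: integer array of shape (batch, length); 0 marks padding.
    length = token_ids.shape[-1]
    positions = np.arange(length)
    embedded_tokens = token_table[token_ids]        # (batch, length, embed_dim)
    embedded_positions = position_table[positions]  # (length, embed_dim)
    # Broadcasting adds the same position vectors to every batch element.
    return embedded_tokens + embedded_positions

def padding_mask(token_ids):
    # True for real tokens, False for padding, as in compute_mask.
    return token_ids != 0

batch = np.array([[3, 7, 2, 0, 0],
                  [5, 1, 0, 0, 0]])
out = positional_embedding(batch)
pad = padding_mask(batch)
print(out.shape)  # (2, 5, 4)
print(pad)
```

The same token therefore gets a different vector depending on where it appears in the sequence, which is how the model receives word-order information.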

**End-to-end Transformer**

In [135]:
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


**Training the sequence-to-sequence Transformer**

In [133]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

NameError: name 'transformer' is not defined

**Translating new sentences with our Transformer model**

In [0]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
# Map each integer token index back to its string token.
spa_index_lookup = dict(enumerate(spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))
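
The greedy decoding loop in `decode_sequence` can be tried without a trained model. The sketch below swaps the Transformer for a hypothetical stand-in, `toy_next_token_probs`, which follows a fixed script; the toy vocabulary and all names are assumptions for illustration. Only the loop structure (append the argmax token, stop at `[end]` or at the length limit) mirrors the cell above.

```python
import numpy as np

# Toy vocabulary (assumed); a real one comes from target_vectorization.
vocab = ["", "[UNK]", "[start]", "[end]", "hola", "mundo"]
token_to_index = {token: index for index, token in enumerate(vocab)}

def toy_next_token_probs(target_tokens):
    # Hypothetical stand-in for a trained model: emits a fixed script.
    script = ["hola", "mundo", "[end]"]
    step = len(target_tokens) - 1  # tokens generated so far, excluding [start]
    probs = np.full(len(vocab), 0.01)
    probs[token_to_index[script[min(step, len(script) - 1)]]] = 1.0
    return probs / probs.sum()

def greedy_decode(max_length=20):
    decoded = ["[start]"]
    for _ in range(max_length):
        probs = toy_next_token_probs(decoded)
        next_token = vocab[int(np.argmax(probs))]  # pick the most likely token
        decoded.append(next_token)
        if next_token == "[end]":
            break
    return " ".join(decoded)

result = greedy_decode()
print(result)  # [start] hola mundo [end]
```

Greedy decoding keeps only the single most probable token at each step, so it is simple and fast but cannot recover from an early mistake.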

## Summary