In [None]:
%pip install tensorflow==2.12.*

**LOAD DATA**

In [None]:
import pathlib

import numpy as np
import tensorflow as tf

# Download the English-French sentence pairs (dataset provided by Anki:
# https://www.manythings.org/anki/). The archive is fetched over HTTPS so it
# cannot be altered in transit; get_file keeps a local copy, so re-running this
# cell does not download again.
text_file = tf.keras.utils.get_file(
    fname="fra-eng.zip",
    origin="https://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip",
    extract=True,
)
# get_file returns the path of the zip; the extracted corpus is expected
# next to it as "fra.txt". Show where the file is located now.
text_file = pathlib.Path(text_file).parent / "fra.txt"
print(text_file)

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip
/root/.keras/datasets/fra.txt


**TEXT NORMALIZATION**
> Normalization: convert the text to Unicode NFKC (compatibility decomposition followed by canonical composition)


> Tokenization: separating words into tokens but not punctuation





In [None]:
import pathlib
import pickle
import random
import re
import unicodedata

# Locate the dataset provided by Anki: https://www.manythings.org/anki/
# This repeats the call from the LOAD DATA cell so the cell can run on its own;
# get_file reuses the local copy when it is already present. HTTPS is used so
# the archive cannot be altered in transit.
text_file = tf.keras.utils.get_file(
    fname="fra-eng.zip",
    origin="https://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip",
    extract=True,
)
# the extracted corpus is expected next to the downloaded zip
text_file = pathlib.Path(text_file).parent / "fra.txt"

def normalize(line):
    """Normalize one corpus line and split it into an (English, French) pair.

    The line is stripped, lower-cased and converted to Unicode NFKC. Spaces are
    then inserted around punctuation so that every punctuation mark becomes its
    own space-delimited token, and the French side is wrapped in the
    "[start]" / "[end]" sentinels.

    Args:
        line: Raw text of the form "english<TAB>french". Additional
            tab-separated columns (e.g. an attribution column) are ignored.

    Returns:
        Tuple (eng, fra) of normalized strings whose tokens are separated by
        exactly one space; fra starts with "[start] " and ends with " [end]".

    Raises:
        ValueError: If the line contains no tab character.
    """
    line = unicodedata.normalize("NFKC", line.strip().lower())
    # leading punctuation glued to the next word -> add a space after it
    line = re.sub(r"^([^ \w])(?!\s)", r"\1 ", line)
    # punctuation that follows whitespace but is glued to the next word
    line = re.sub(r"(\s[^ \w])(?!\s)", r"\1 ", line)
    # trailing punctuation -> add a space before it
    line = re.sub(r"(?!\s)([^ \w])$", r" \1", line)
    # punctuation followed by whitespace -> add a space before it
    line = re.sub(r"(?!\s)([^ \w]\s)", r" \1", line)
    # The substitutions above also fire when the punctuation is already
    # surrounded by spaces (e.g. French "police ?"), which used to leave runs
    # of spaces and stray leading/trailing spaces. Collapse them so the output
    # is clean; the space-delimited token sequence is unchanged.
    # Only the first two columns are used, so files with extra columns load.
    eng, fra = (
        re.sub(r" {2,}", " ", part).strip(" ")
        for part in line.split("\t")[:2]
    )
    fra = "[start] " + fra + " [end]"
    return eng, fra

# normalize each line and separate into English and French
# The corpus contains accented characters; decode it explicitly as UTF-8
# instead of relying on the platform's default encoding.
with open(text_file, encoding="utf-8") as fp:
    text_pairs = [normalize(line) for line in fp]

# print some samples
for _ in range(5):
    print(random.choice(text_pairs))

# persist the normalized pairs for the following cells
with open("text_pairs.pickle", "wb") as fp:
    pickle.dump(text_pairs, fp)

("i'll wait in the gym .", "[start] j'attendrai à la salle de sport . [end]")
('i cannot look at this picture without thinking of my dead mother .', '[start] je ne peux regarder cette photo sans penser à ma défunte mère . [end]')
('my nose itches .', '[start] le nez me gratte . [end]')
('are you a policeman ?', '[start] êtes-vous un agent de police  ?  [end]')
('her father works at the bank .', '[start] son père travaille à la banque . [end]')


In [None]:
# Dataset statistics: vocabulary size and longest sentence for each language

with open("text_pairs.pickle", "rb") as fp:
    text_pairs = pickle.load(fp)

# split every sentence on whitespace once
eng_tokenized = [eng.split() for eng, _ in text_pairs]
fra_tokenized = [fra.split() for _, fra in text_pairs]

# distinct tokens per language
eng_tokens = {token for sentence in eng_tokenized for token in sentence}
fra_tokens = {token for sentence in fra_tokenized for token in sentence}

# longest sentence (in tokens) per language; 0 when there are no pairs
eng_maxlen = max(map(len, eng_tokenized), default=0)
fra_maxlen = max(map(len, fra_tokenized), default=0)

print(f"Total English tokens: {len(eng_tokens)}")
print(f"Total French tokens: {len(fra_tokens)}")
print(f"Max English length: {eng_maxlen}")
print(f"Max French length: {fra_maxlen}")
print(f"{len(text_pairs)} total pairs")

Total English tokens: 14969
Total French tokens: 29219
Max English length: 51
Max French length: 60
167130 total pairs


In [None]:
# import matplotlib.pyplot as plt

# with open("text_pairs.pickle", "rb") as fp:
#     text_pairs = pickle.load(fp)

# # histogram of sentence length in tokens
# en_lengths = [len(eng.split()) for eng, fra in text_pairs]
# fr_lengths = [len(fra.split()) for eng, fra in text_pairs]

# plt.hist(en_lengths, label="en", color="red", alpha=0.33)
# plt.hist(fr_lengths, label="fr", color="blue", alpha=0.33)
# plt.yscale("log")     # sentence length fits Benford's law
# plt.ylim(plt.ylim())  # make y-axis consistent for both plots
# plt.plot([max(en_lengths), max(en_lengths)], plt.ylim(), color="red")
# plt.plot([max(fr_lengths), max(fr_lengths)], plt.ylim(), color="blue")
# plt.legend()
# plt.title("Examples count vs Token length")
# plt.show()

**TEXT VECTORIZATION**

In [None]:
from tensorflow.keras.layers import TextVectorization

# Load normalized sentence pairs
with open("text_pairs.pickle", "rb") as fp:
    text_pairs = pickle.load(fp)

# train-test-val split of randomized sentence pairs (70% / 15% / 15%).
# Shuffle with a dedicated, seeded generator: re-running this cell then
# reproduces the same split (and hence the same vocabulary), and the global
# `random` state used by other cells is left untouched.
random.Random(42).shuffle(text_pairs)
n_val = int(0.15*len(text_pairs))
n_train = len(text_pairs) - 2*n_val
train_pairs = text_pairs[:n_train]
val_pairs = text_pairs[n_train:n_train+n_val]
test_pairs = text_pairs[n_train+n_val:]

# Vocabulary caps and sequence length, chosen after analyzing the input data
vocab_size_en, vocab_size_fr = 10000, 20000
seq_length = 20

# Options shared by both vectorizers: the text is already normalized, so no
# further standardization; split on whitespace and emit integer token ids.
_vectorizer_options = dict(standardize=None, split="whitespace", output_mode="int")

eng_vectorizer = TextVectorization(
    max_tokens=vocab_size_en,
    output_sequence_length=seq_length,
    **_vectorizer_options,
)
# French is one token longer than English: format_dataset slices it into
# [:-1] (decoder input) and [1:] (target), each of length seq_length.
fra_vectorizer = TextVectorization(
    max_tokens=vocab_size_fr,
    output_sequence_length=seq_length + 1,
    **_vectorizer_options,
)

# Fit both vocabularies on the training split only
train_eng_texts = [eng_text for eng_text, _ in train_pairs]
train_fra_texts = [fra_text for _, fra_text in train_pairs]
eng_vectorizer.adapt(train_eng_texts)
fra_vectorizer.adapt(train_fra_texts)

# Bundle the split and both vectorizers (config + weights) for later cells
data = {
    "train": train_pairs,
    "val":   val_pairs,
    "test":  test_pairs,
    "engvec_config":  eng_vectorizer.get_config(),
    "engvec_weights": eng_vectorizer.get_weights(),
    "fravec_config":  fra_vectorizer.get_config(),
    "fravec_weights": fra_vectorizer.get_weights(),
}
with open("vectorize.pickle", "wb") as fp:
    pickle.dump(data, fp)

In [None]:
# Restore the data split and the fitted vectorizers saved by the previous cell
with open("vectorize.pickle", "rb") as fp:
    data = pickle.load(fp)

# test_pairs is not used for training; the TEST section samples from it
train_pairs, val_pairs, test_pairs = (
    data[split] for split in ("train", "val", "test")
)

# Rebuild each layer from its config, then load its saved weights
eng_vectorizer = TextVectorization.from_config(data["engvec_config"])
fra_vectorizer = TextVectorization.from_config(data["fravec_config"])
eng_vectorizer.set_weights(data["engvec_weights"])
fra_vectorizer.set_weights(data["fravec_weights"])

# set up Dataset object
def format_dataset(eng, fra):
    """Vectorize a batch of sentence pairs into (model inputs, target).

    Args:
        eng: Batch of English sentences (strings).
        fra: Batch of French sentences (strings).

    Returns:
        Tuple (source, target): source is a dict with "encoder_inputs"
        (English token ids) and "decoder_inputs" (French ids without the last
        position); target is the French ids without the first position, i.e.
        the decoder input shifted left by one token.
    """
    encoder_ids = eng_vectorizer(eng)
    french_ids = fra_vectorizer(fra)
    decoder_ids = french_ids[:, :-1]   # drop the last position
    target = french_ids[:, 1:]         # drop the first position
    return {"encoder_inputs": encoder_ids, "decoder_inputs": decoder_ids}, target

def make_dataset(pairs, batch_size=64):
    """Build a shuffled, batched tf.data pipeline from sentence pairs.

    Args:
        pairs: Sequence of (eng, fra) string pairs.
        batch_size: Number of sentence pairs per batch.

    Returns:
        A tf.data.Dataset yielding (source, target) batches in the structure
        produced by format_dataset.
    """
    # aggregate sentences using zip(*pairs)
    eng_texts, fra_texts = zip(*pairs)
    # convert them into list, and then create tensors
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(fra_texts)))
    # format_dataset slices along axis 1, so it needs batched input: vectorize
    # in batches, then unbatch and cache the per-example token ids.
    # cache() replays exactly what it stored, so shuffle() has to come AFTER it
    # to get a new order every epoch (previously the first epoch's order was
    # frozen). prefetch() goes last so it overlaps with the training step.
    return dataset.batch(batch_size).map(format_dataset) \
                  .unbatch().cache() \
                  .shuffle(2048).batch(batch_size) \
                  .prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

# Inspect one training batch: shape and first row of every tensor
for inputs, targets in train_ds.take(1):
    for key in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{key}"].shape: {inputs[key].shape}')
        print(f'inputs["{key}"][0]: {inputs[key][0]}')
    print(f"targets.shape: {targets.shape}")
    print(f"targets[0]: {targets[0]}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["encoder_inputs"][0]: [ 10 670  18   5  45   2   0   0   0   0   0   0   0   0   0   0   0   0
   0   0]
inputs["decoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"][0]: [   2   16  108 1114    6  205    4    3    0    0    0    0    0    0
    0    0    0    0    0    0]
targets.shape: (64, 20)
targets[0]: [  16  108 1114    6  205    4    3    0    0    0    0    0    0    0
    0    0    0    0    0    0]


**POSITIONAL ENCODING MATRIX**

In [None]:
#POSITIONAL ENCODING MATRIX
def pos_enc_matrix(L, d, n=10000):
    """Build the sinusoidal positional-encoding matrix.

    For position k and frequency index i (0 <= i < d/2):
        P[k, 2i]   = sin(k / n**(2i/d))
        P[k, 2i+1] = cos(k / n**(2i/d))

    Args:
        L: Number of positions (rows).
        d: Encoding width (columns); must be even.
        n: Base of the geometric frequency progression.

    Returns:
        NumPy array of shape (L, d).

    Raises:
        AssertionError: If d is odd.
    """
    assert d % 2 == 0, "Output dimension needs to be an even integer"
    half = d//2
    positions = np.arange(L).reshape(-1, 1)     # column vector, shape (L, 1)
    freq_index = np.arange(half).reshape(1, -1)  # row vector, shape (1, d/2)
    inv_freq = np.power(n, -freq_index/half)     # n**(-2*i/d)
    angles = positions * inv_freq                # broadcast to (L, d/2)
    encoding = np.zeros((L, d))
    encoding[:, 0::2] = np.sin(angles)           # even columns
    encoding[:, 1::2] = np.cos(angles)           # odd columns
    return encoding

#POSITIONAL EMBEDDING LAYER
class PositionalEmbedding(tf.keras.layers.Layer):
    """Token embedding plus a fixed sinusoidal positional encoding.

    Args:
        sequence_length: Number of positions in the encoding matrix.
        vocab_size: Size of the token vocabulary (embedding input_dim).
        embed_dim: Width of the embedding vectors (d_model in the paper).
        **kwargs: Forwarded to tf.keras.layers.Layer.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # constructor arguments are kept so get_config() can report them
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # learned lookup: integer token id -> embed_dim float vector;
        # mask_zero=True makes id 0 produce a False entry in the mask
        self.token_embeddings = tf.keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim, mask_zero=True
        )
        # constant (non-trainable) matrix of sine/cosine values
        self.position_embeddings = tf.constant(
            pos_enc_matrix(sequence_length, embed_dim), dtype="float32"
        )

    def call(self, inputs):
        """Return token embeddings with the positional matrix added."""
        return self.token_embeddings(inputs) + self.position_embeddings

    def compute_mask(self, *args, **kwargs):
        """Delegate mask computation to the token embedding layer."""
        return self.token_embeddings.compute_mask(*args, **kwargs)

    def get_config(self):
        """Return the layer config, including the constructor arguments."""
        # needed so a model using this custom layer can be saved and loaded
        return {
            **super().get_config(),
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        }

In [None]:
vocab_size_en, seq_length = 10000, 20

# Embed one training batch and inspect the output shape and the padding mask
for inputs, targets in train_ds.take(1):
    encoder_ids = inputs["encoder_inputs"]
    print(encoder_ids)
    embed_en = PositionalEmbedding(seq_length, vocab_size_en, embed_dim=512)
    en_emb = embed_en(encoder_ids)
    print(en_emb.shape)
    print(en_emb._keras_mask)

tf.Tensor(
[[   3   31    5 ...    0    0    0]
 [  12 4977  338 ...    0    0    0]
 [  27   41    4 ...    0    0    0]
 ...
 [  93   13    4 ...    0    0    0]
 [   4   38   45 ...    0    0    0]
 [ 140  118   11 ...    0    0    0]], shape=(64, 20), dtype=int64)
(64, 20, 512)
tf.Tensor(
[[ True  True  True ... False False False]
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]
 ...
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]], shape=(64, 20), dtype=bool)


**TRANSFORMER BLOCK**

*Self-Attention Model*

In [None]:
"""The attention mechanism is applied to the inputs tensor, where the query, value, and key are all set to the inputs tensor itself.
The purpose is to capture dependencies within the input sequence and attend to different parts of the sequence based on their relevance to each other.
(This function assumes its input is the output from positional encoding layer.)"""

def self_attention(input_shape, prefix="att", mask=False, **kwargs):
    """Build a self-attention sub-model: attention, residual add, layer norm.

    Args:
        input_shape: Shape of one input example (without the batch axis).
        prefix: String prepended to every layer name and to the model name.
        mask: Passed to the attention call as use_causal_mask.
        **kwargs: Forwarded to tf.keras.layers.MultiHeadAttention
            (e.g. num_heads, key_dim).

    Returns:
        A tf.keras.Model named "<prefix>_att" mapping the input tensor to
        LayerNormalization(input + attention(input)).
    """
    # --- layers ---
    inputs = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_in1"
    )
    attention = tf.keras.layers.MultiHeadAttention(name=f"{prefix}_attn1", **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f"{prefix}_norm1")
    add = tf.keras.layers.Add(name=f"{prefix}_add1")

    # --- wiring (functional API) ---
    # query, key and value are all the same tensor
    attended = attention(
        query=inputs, value=inputs, key=inputs, use_causal_mask=mask
    )
    residual = add([inputs, attended])
    outputs = norm(residual)

    return tf.keras.Model(inputs=inputs, outputs=outputs, name=f"{prefix}_att")

# Build one stand-alone self-attention sub-model with small demo dimensions
seq_length, key_dim, num_heads = 20, 128, 8

model = self_attention(
    input_shape=(seq_length, key_dim),
    num_heads=num_heads,
    key_dim=key_dim,
)

# Visualization of self_attention function
# tf.keras.utils.plot_model(model, "self-attention.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Cross-Attention Model*

In [None]:
"""The attention mechanism is applied between two tensors: inputs (eng) and context (fre).
The inputs tensor serves as the query, while the context tensor serves as both the value and the key.
The purpose is to attend to relevant information in the context tensor based on the query in the inputs tensor.
This allows the model to consider information from a different context, such as attending to relevant information in an encoder-decoder architecture.
In this case, this function allows the model to attend to the relevant parts of the English input sentence while generating each word of the French context sentence during the translation process.
"""

def cross_attention(input_shape, context_shape, prefix="att", **kwargs):
    """Build a cross-attention sub-model: attention, residual add, layer norm.

    The query comes from `inputs`; key and value both come from `context`.
    In this notebook's decoder, `inputs` is the decoder-side tensor and
    `context` is the tensor handed over from the encoder stack.

    Args:
        input_shape: Shape of one query example (without the batch axis).
        context_shape: Shape of one context example (without the batch axis).
        prefix: String prepended to every layer name and to the model name.
        **kwargs: Forwarded to tf.keras.layers.MultiHeadAttention
            (e.g. num_heads, key_dim).

    Returns:
        A tf.keras.Model named "<prefix>_cross" whose input structure is
        [(context, inputs)] and whose output is
        LayerNormalization(attention(inputs, context) + inputs).
    """
    # --- layers ---
    # key/value source
    context = tf.keras.layers.Input(
        shape=context_shape, dtype="float32", name=f"{prefix}_ctx2"
    )
    # query source
    inputs = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_in2"
    )
    attention = tf.keras.layers.MultiHeadAttention(name=f"{prefix}_attn2", **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f"{prefix}_norm2")
    add = tf.keras.layers.Add(name=f"{prefix}_add2")

    # --- wiring (functional API) ---
    attended = attention(query=inputs, value=context, key=context)
    residual = add([attended, inputs])
    outputs = norm(residual)

    # callers must pass the inputs in the same nested order: [(context, inputs)]
    return tf.keras.Model(
        inputs=[(context, inputs)], outputs=outputs, name=f"{prefix}_cross"
    )

# Build one stand-alone cross-attention sub-model with small demo dimensions
seq_length, key_dim, num_heads = 20, 128, 8

model = cross_attention(
    input_shape=(seq_length, key_dim),
    context_shape=(seq_length, key_dim),
    num_heads=num_heads,
    key_dim=key_dim,
)
# tf.keras.utils.plot_model(model, "cross-attention.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Feed-Forward Model*

In [None]:
def feed_forward(input_shape, model_dim, ff_dim, dropout=0.1, prefix="ff"):
    """Build the position-wise feed-forward sub-model.

    Args:
        input_shape: Shape of one input example (without the batch axis).
        model_dim: Units of the second Dense layer (output width).
        ff_dim: Units of the first (ReLU) Dense layer.
        dropout: Rate of the Dropout layer applied after the second Dense.
        prefix: String prepended to every layer name and to the model name.

    Returns:
        A tf.keras.Model named "<prefix>_ff" computing
        LayerNormalization(input + Dropout(Dense(Dense_relu(input)))).
    """
    # --- layers ---
    inputs = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_in3"
    )
    dense1 = tf.keras.layers.Dense(ff_dim, name=f"{prefix}_ff1", activation="relu")
    dense2 = tf.keras.layers.Dense(model_dim, name=f"{prefix}_ff2")
    drop = tf.keras.layers.Dropout(dropout, name=f"{prefix}_drop")
    add = tf.keras.layers.Add(name=f"{prefix}_add3")

    # --- wiring (functional API) ---
    hidden = dense1(inputs)
    projected = dense2(hidden)
    ffout = drop(projected)
    norm = tf.keras.layers.LayerNormalization(name=f"{prefix}_norm3")
    outputs = norm(add([inputs, ffout]))

    return tf.keras.Model(inputs=inputs, outputs=outputs, name=f"{prefix}_ff")

# Build one stand-alone feed-forward sub-model with small demo dimensions
seq_length, key_dim, ff_dim = 20, 128, 512

model = feed_forward(
    input_shape=(seq_length, key_dim),
    model_dim=key_dim,
    ff_dim=ff_dim,
)
# tf.keras.utils.plot_model(model, "feedforward.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Encoder Block*

In [None]:
def encoder(input_shape, key_dim, ff_dim, dropout=0.1, prefix="enc", **kwargs):
    """Build one encoder block: unmasked self-attention, then feed-forward.

    Args:
        input_shape: Shape of one input example (without the batch axis).
        key_dim: Passed as key_dim to the attention layer and as the
            feed-forward output width.
        ff_dim: Hidden width of the feed-forward sub-model.
        dropout: Dropout rate of the feed-forward sub-model.
        prefix: Model name, also used as prefix for all inner layer names.
        **kwargs: Forwarded to self_attention (e.g. num_heads).

    Returns:
        A tf.keras.models.Sequential model named `prefix`.
    """
    entry = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_in0"
    )
    attention_block = self_attention(
        input_shape, prefix=prefix, key_dim=key_dim, mask=False, **kwargs
    )
    ff_block = feed_forward(input_shape, key_dim, ff_dim, dropout, prefix)
    return tf.keras.models.Sequential(
        [entry, attention_block, ff_block], name=prefix
    )


# Build one stand-alone encoder block with small demo dimensions
seq_length, key_dim = 20, 128
ff_dim, num_heads = 512, 8

model = encoder(
    input_shape=(seq_length, key_dim),
    key_dim=key_dim,
    ff_dim=ff_dim,
    num_heads=num_heads,
)
# tf.keras.utils.plot_model(model, "encoder.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Decoder Block*

In [None]:
def decoder(input_shape, key_dim, ff_dim, dropout=0.1, prefix="dec", **kwargs):
    """Build one decoder block.

    The block chains causal self-attention over `inputs`, cross-attention
    that queries `context`, and a feed-forward sub-model.

    Args:
        input_shape: Shape of one example (without the batch axis); used for
            both the decoder input and the context.
        key_dim: Passed as key_dim to both attention layers and as the
            feed-forward output width.
        ff_dim: Hidden width of the feed-forward sub-model.
        dropout: Dropout rate of the feed-forward sub-model.
        prefix: Model name, also used as prefix for all inner layer names.
        **kwargs: Forwarded to self_attention and cross_attention
            (e.g. num_heads).

    Returns:
        A tf.keras.Model named `prefix` with input structure
        [(inputs, context)].
    """
    # --- inputs ---
    inputs = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_in0"
    )
    context = tf.keras.layers.Input(
        shape=input_shape, dtype="float32", name=f"{prefix}_ctx0"
    )

    # --- sub-models ---
    attmodel = self_attention(
        input_shape, key_dim=key_dim, mask=True, prefix=prefix, **kwargs
    )
    crossmodel = cross_attention(
        input_shape, input_shape, key_dim=key_dim, prefix=prefix, **kwargs
    )
    ffmodel = feed_forward(input_shape, key_dim, ff_dim, dropout, prefix)

    # --- wiring ---
    self_attended = attmodel(inputs)
    # cross_attention expects its inputs as [(context, query)]
    cross_attended = crossmodel([(context, self_attended)])
    output = ffmodel(cross_attended)

    return tf.keras.Model(inputs=[(inputs, context)], outputs=output, name=prefix)


# Build one stand-alone decoder block with small demo dimensions
seq_length, key_dim = 20, 128
ff_dim, num_heads = 512, 8

model = decoder(
    input_shape=(seq_length, key_dim),
    key_dim=key_dim,
    ff_dim=ff_dim,
    num_heads=num_heads,
)
# tf.keras.utils.plot_model(model, "decoder.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Transformer Block*

In [None]:
def transformer(num_layers, num_heads, seq_len, key_dim, ff_dim, vocab_size_src,
                vocab_size_tgt, dropout=0.1, name="transformer"):
    """Assemble the full encoder-decoder transformer.

    Args:
        num_layers: Number of encoder blocks and of decoder blocks.
        num_heads: Attention heads in every attention layer.
        seq_len: Length of the encoder and decoder token sequences.
        key_dim: Embedding width, also used as key_dim of the attention layers.
        ff_dim: Hidden width of the feed-forward sub-models.
        vocab_size_src: Vocabulary size of the encoder (source) embedding.
        vocab_size_tgt: Vocabulary size of the decoder (target) embedding and
            number of units of the final Dense layer.
        dropout: Dropout rate used in the feed-forward sub-models.
        name: Name of the returned model.

    Returns:
        A tf.keras.Model with inputs "encoder_inputs" and "decoder_inputs"
        (integer token ids of shape (seq_len,)) whose output is the final
        Dense layer's activations of width vocab_size_tgt per position
        (no softmax is applied here).
    """
    embed_shape = (seq_len, key_dim)  # output shape of the positional embedding layer

    # set up layers
    # Define an input layer for the encoder inputs.
    input_enc = tf.keras.layers.Input(shape=(seq_len,), dtype="int32",
                                      name="encoder_inputs")

    # Define an input layer for the decoder inputs
    input_dec = tf.keras.layers.Input(shape=(seq_len,), dtype="int32",
                                      name="decoder_inputs")

    # Create a positional embedding layer for the encoder inputs with the specified sequence length, source vocabulary size, and key dimension.
    embed_enc = PositionalEmbedding(seq_len, vocab_size_src, key_dim, name="embed_enc")

    # Create a positional embedding layer for the decoder inputs
    embed_dec = PositionalEmbedding(seq_len, vocab_size_tgt, key_dim, name="embed_dec")

    # Create a list of encoder layers.
    # Each encoder layer is created using the 'encoder' function with a unique prefix name.
    encoders = [encoder(input_shape=embed_shape, key_dim=key_dim,
                        ff_dim=ff_dim, dropout=dropout, prefix=f"enc{i}",
                        num_heads=num_heads)
                for i in range(num_layers)]

    # Create a list of decoder layers
    # Each decoder layer is created using the 'decoder' function with a unique prefix name.
    decoders = [decoder(input_shape=embed_shape, key_dim=key_dim,
                        ff_dim=ff_dim, dropout=dropout, prefix=f"dec{i}",
                        num_heads=num_heads)
                for i in range(num_layers)]

    # Create a Dense layer with the specified target vocabulary size and name.
    final = tf.keras.layers.Dense(vocab_size_tgt, name="linear")

    # build output
    # Apply the encoder positional embedding to the encoder inputs.
    x1 = embed_enc(input_enc)

    # Apply the decoder positional embedding to the decoder inputs.
    x2 = embed_dec(input_dec)

    # The for loops are necessary to ensure the sequential application of
    # multiple layers, allowing the model to benefit from the depth and
    # complexity of the transformer architecture.
    # (This note used to be a triple-quoted string at column 0, which ended the
    # function body early and made the following lines an IndentationError.)

    # Pass the encoded inputs through each encoder layer in a loop.
    for layer in encoders:
        x1 = layer(x1)

    # Pass the decoded inputs and the encoded inputs through each decoder layer in a loop.
    # Every decoder block receives the output of the LAST encoder block.
    for layer in decoders:
        x2 = layer([x2, x1])

    # Apply the final Dense layer to the output of the last decoder layer.
    output = final(x2)

    # This try-except block removes the _keras_mask attribute from the output tensor, if it exists.
    # This is necessary for compatibility with certain versions of TensorFlow.
    try:
        del output._keras_mask
    except AttributeError:
        pass
    model = tf.keras.Model(inputs=[input_enc, input_dec], outputs=output, name=name)
    return model


# Build the full model once with the tutorial's hyper-parameters
seq_len, num_layers, num_heads = 20, 4, 8
key_dim, ff_dim, dropout = 128, 512, 0.1
vocab_size_en, vocab_size_fr = 10000, 20000

model = transformer(
    num_layers=num_layers,
    num_heads=num_heads,
    seq_len=seq_len,
    key_dim=key_dim,
    ff_dim=ff_dim,
    vocab_size_src=vocab_size_en,
    vocab_size_tgt=vocab_size_fr,
    dropout=dropout,
)
# tf.keras.utils.plot_model(model, "transformer.png",
#                           show_shapes=True, show_dtype=True, show_layer_names=True,
#                           rankdir='BT', show_layer_activations=True)

*Adam the Optimizer*

In [None]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule with warm-up followed by inverse-sqrt decay.

    lr(step) = key_dim**-0.5 * min(step**-0.5, step * warmup_steps**-1.5)

    Args:
        key_dim: Model width used to scale the learning rate.
        warmup_steps: Step at which the two arguments of min() are equal.
    """

    def __init__(self, key_dim, warmup_steps=4000):
        super().__init__()
        self.key_dim = key_dim
        self.warmup_steps = warmup_steps
        # float copy of key_dim for the tensor arithmetic in __call__
        self.d = tf.cast(self.key_dim, tf.float32)

    def __call__(self, step):
        """Return the learning rate for the given training step."""
        step = tf.cast(step, dtype=tf.float32)
        decay = tf.math.rsqrt(step)                      # step**-0.5
        warmup = step * (self.warmup_steps ** -1.5)      # linear in step
        return tf.math.rsqrt(self.d) * tf.math.minimum(decay, warmup)

    def get_config(self):
        """Return the constructor arguments as a dict."""
        # needed so a model compiled with this schedule can be saved and loaded
        return {
            "key_dim": self.key_dim,
            "warmup_steps": self.warmup_steps,
        }

# Adam driven by the custom schedule
key_dim = 128
lr = CustomSchedule(key_dim)
optimizer = tf.keras.optimizers.Adam(
    learning_rate=lr, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)

# plt.plot(lr(tf.range(50000, dtype=tf.float32)))
# plt.ylabel('Learning Rate')
# plt.xlabel('Train Step')
# plt.show()

*Loss and Accuracy Metric*

In [None]:
def masked_loss(label, pred):
    """Sparse categorical cross-entropy averaged over non-padding positions.

    Args:
        label: Integer target token ids; id 0 marks positions to ignore.
        pred: Logits, with the class scores on the last axis.

    Returns:
        Scalar tensor: sum of the losses at positions where label != 0,
        divided by the number of such positions.
    """
    keep = label != 0

    # reduction='none' keeps one loss value per position
    per_position = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')(label, pred)

    keep = tf.cast(keep, dtype=per_position.dtype)
    return tf.reduce_sum(per_position * keep) / tf.reduce_sum(keep)


def masked_accuracy(label, pred):
    """Token-level accuracy computed over non-padding positions only.

    Args:
        label: Integer target token ids; id 0 marks positions to ignore.
        pred: Scores with the classes on axis 2.

    Returns:
        Scalar float32 tensor: correctly predicted non-zero positions divided
        by the number of non-zero positions.
    """
    predicted = tf.argmax(pred, axis=2)
    label = tf.cast(label, predicted.dtype)

    valid = label != 0
    hits = (label == predicted) & valid

    n_hits = tf.reduce_sum(tf.cast(hits, dtype=tf.float32))
    n_valid = tf.reduce_sum(tf.cast(valid, dtype=tf.float32))
    return n_hits / n_valid

In [None]:
# Build the model and compile it with the masked loss and metric
vocab_size_en, vocab_size_fr = 10000, 20000
seq_len, num_layers, num_heads = 20, 4, 8
key_dim, ff_dim, dropout = 128, 512, 0.1

model = transformer(
    num_layers=num_layers,
    num_heads=num_heads,
    seq_len=seq_len,
    key_dim=key_dim,
    ff_dim=ff_dim,
    vocab_size_src=vocab_size_en,
    vocab_size_tgt=vocab_size_fr,
    dropout=dropout,
)
lr = CustomSchedule(key_dim)
optimizer = tf.keras.optimizers.Adam(
    learning_rate=lr, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)
model.compile(optimizer=optimizer, loss=masked_loss, metrics=[masked_accuracy])
# model.summary()
# model.summary()

**TRAINING THE MODEL**

In [None]:
# Create and train the model
# hyper-parameters
seq_len, num_layers, num_heads = 20, 4, 8
key_dim, ff_dim, dropout = 128, 512, 0.1
vocab_size_en, vocab_size_fr = 10000, 20000

# a fresh model, schedule and optimizer are created for this training run
model = transformer(
    num_layers=num_layers,
    num_heads=num_heads,
    seq_len=seq_len,
    key_dim=key_dim,
    ff_dim=ff_dim,
    vocab_size_src=vocab_size_en,
    vocab_size_tgt=vocab_size_fr,
    dropout=dropout,
)
lr = CustomSchedule(key_dim)
optimizer = tf.keras.optimizers.Adam(
    learning_rate=lr, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)
model.compile(optimizer=optimizer, loss=masked_loss, metrics=[masked_accuracy])

epochs = 5
history = model.fit(train_ds, epochs=epochs, validation_data=val_ds)

# Save the trained model
model.save("eng-fra-transformer.h5")

# # Plot the loss and accuracy history
# fig, axs = plt.subplots(2, figsize=(6, 8), sharex=True)
# fig.suptitle('Training history')
# x = list(range(1, epochs+1))
# axs[0].plot(x, history.history["loss"], alpha=0.5, label="loss")
# axs[0].plot(x, history.history["val_loss"], alpha=0.5, label="val_loss")
# axs[0].set_ylabel("Loss")
# axs[0].legend(loc="upper right")
# axs[1].plot(x, history.history["masked_accuracy"], alpha=0.5, label="acc")
# axs[1].plot(x, history.history["val_masked_accuracy"], alpha=0.5, label="val_acc")
# axs[1].set_ylabel("Accuracy")
# axs[1].set_xlabel("epoch")
# axs[1].legend(loc="lower right")
# plt.show()

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5

**TEST**

In [None]:
# Load the trained model. The custom layer, schedule, loss and metric are
# registered under their own names so they can be resolved while loading.
custom_objects = {
    obj.__name__: obj
    for obj in (PositionalEmbedding, CustomSchedule, masked_loss, masked_accuracy)
}
with tf.keras.utils.custom_object_scope(custom_objects):
    model = tf.keras.models.load_model("eng-fra-transformer.h5")

# training parameters used
seq_len = 20
vocab_size_en, vocab_size_fr = 10000, 20000

def translate(sentence):
    """Greedily translate an English sentence one token at a time.

    Args:
        sentence: English sentence; assumed to be normalized the same way as
            the training data (the function does not normalize it).

    Returns:
        List of French tokens starting with "[start]". It ends with "[end]"
        if the model produced that token within seq_len steps.
    """
    start_sentinel, end_sentinel = "[start]", "[end]"
    vocabulary = list(fra_vectorizer.get_vocabulary())
    enc_tokens = eng_vectorizer([sentence])
    output_sentence = [start_sentinel]
    # at step `position`, feed everything generated so far and read the
    # prediction made at that position
    for position in range(seq_len):
        partial = " ".join(output_sentence)
        vector = fra_vectorizer([partial])
        assert vector.shape == (1, seq_len+1)
        dec_tokens = vector[:, :-1]
        assert dec_tokens.shape == (1, seq_len)
        pred = model([enc_tokens, dec_tokens])
        assert pred.shape == (1, seq_len, vocab_size_fr)
        # greedy choice: highest-scoring vocabulary entry
        word = vocabulary[np.argmax(pred[0, position, :])]
        output_sentence.append(word)
        if word == end_sentinel:
            break
    return output_sentence

# Translate a few random held-out pairs and print source, reference and output
test_count = 20
for n in range(test_count):
    english_sentence, french_sentence = random.choice(test_pairs)
    translated = translate(english_sentence)
    print(
        f"Test {n}:",
        english_sentence,
        f"== {french_sentence}",
        f"-> {' '.join(translated)}",
        "",
        sep="\n",
    )