Spanish to MSL translation taken from the Example of English-to-Spanish translation with a sequence-to-sequence Transformer

**Author:** [fchollet](https://twitter.com/fchollet)<br>
**Date created:** 2021/05/26<br>
**Last modified:** 2023/02/25<br>
**Description:** Implementing a sequence-to-sequene Transformer and training it on a machine translation task.

In [1]:
!!pip install -q rouge-score
!!pip install -q git+https://github.com/keras-team/keras-nlp.git --upgrade

['  Installing build dependencies ... \x1b[?25l\x1b[?25hdone',
 '  Getting requirements to build wheel ... \x1b[?25l\x1b[?25hdone',
 '  Preparing metadata (pyproject.toml) ... \x1b[?25l\x1b[?25hdone',
 '\x1b[?25l     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m0.0/950.8 kB\x1b[0m \x1b[31m?\x1b[0m eta \x1b[36m-:--:--\x1b[0m',
 '\x1b[2K     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m0.0/950.8 kB\x1b[0m \x1b[31m?\x1b[0m eta \x1b[36m-:--:--\x1b[0m',
 '\x1b[2K     \x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m10.2/950.8 kB\x1b[0m \x1b[31m?\x1b[0m eta \x1b[36m-:--:--\x1b[0m',
 '\x1b[2K     \x1b[91m━\x1b[0m\x1b[90m╺\x1b[0m\x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m30.7/950.8 kB\x1b[0m \x1b[31m486.9 kB/s\x1b[0m eta \x1b[36m0:00:02\x1b[0m',
 '\x1b[2K     \x1b[91m━━━\x1b[0m\x1b[90m╺\x1b[0m\x1b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[32m81.9/950.8 kB\x1b[0m \x1b[31m740.7 kB/s\x1b[0m eta \x1b[36m0:00:02\x1b[0

In [2]:
!pip install nltk

[0m

In [3]:
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from nltk.tokenize import word_tokenize

In [4]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

## Introduction

In this example, we'll build a sequence-to-sequence Transformer model, which
we'll train on an English-to-Spanish machine translation task.

You'll learn how to:

- Vectorize text using the Keras `TextVectorization` layer.
- Implement a `TransformerEncoder` layer, a `TransformerDecoder` layer,
and a `PositionalEmbedding` layer.
- Prepare data for training a sequence-to-sequence model.
- Use the trained model to generate translations of never-seen-before
input sentences (sequence-to-sequence inference).


## Setup

In [5]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
import keras_nlp
from nltk.translate.bleu_score import sentence_bleu

ImportError: ignored

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Downloading the data



In [None]:
text_file = "/content/drive/MyDrive/esp-lsm.txt"

## Parsing the data

Each line contains an Spanish sentence and its corresponding MSL gloss sentence.
The Spanish sentence is the *source sequence* and MSL one is the *target sequence*.
We prepend the token `"[start]"` and we append the token `"[end]"` to the Spanish sentence.

In [None]:
with open(text_file, encoding="utf_8") as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    spa, msl = line.split("\t")
    msl = "[start] " + msl+ " [end]"
    text_pairs.append((spa, msl))

Here's what our sentence pairs look like:

In [None]:
for _ in range(5):
    print(random.choice(text_pairs))

Now, let's split the sentence pairs into a training set, a validation set,
and a test set.

In [None]:
random.shuffle(text_pairs)
num_val_samples = int(0.10 * len(text_pairs))
num_train_samples = len(text_pairs) - num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
#test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
#print(f"{len(test_pairs)} test pairs")

## Vectorizing the text data



In [None]:
from tensorflow.python.ops.gen_linalg_ops import matrix_solve
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 1000
sequence_length = 20
batch_size = 64


def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")


spa_vectorization = TextVectorization(
    max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
msl_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_spa_texts = [pair[0] for pair in train_pairs]
train_msl_texts = [pair[1] for pair in train_pairs]
spa_vectorization.adapt(train_spa_texts)
msl_vectorization.adapt(train_msl_texts)

Next, we'll format our datasets.

At each training step, the model will seek to predict target words N+1 (and beyond)
using the source sentence and the target words 0 to N.

As such, the training dataset will yield a tuple `(inputs, targets)`, where:

- `inputs` is a dictionary with the keys `encoder_inputs` and `decoder_inputs`.
`encoder_inputs` is the vectorized source sentence and `encoder_inputs` is the target sentence "so far",
that is to say, the words 0 to N used to predict word N+1 (and beyond) in the target sentence.
- `target` is the target sentence offset by one step:
it provides the next words in the target sentence -- what the model will try to predict.

In [None]:

def format_dataset(spa, msl):
    spa = spa_vectorization(spa)
    msl =msl_vectorization(msl)
    return ({"encoder_inputs": spa, "decoder_inputs": msl[:, :-1],}, msl[:, 1:])


def make_dataset(pairs):
    spa_texts, msl_texts = zip(*pairs)
    spa_texts = list(spa_texts)
    msl_texts = list(msl_texts)
    dataset = tf.data.Dataset.from_tensor_slices((spa_texts, msl_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

Let's take a quick look at the sequence shapes
(we have batches of 64 pairs, and all sequences are 20 steps long):

In [None]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

## Building the model

Our sequence-to-sequence Transformer consists of a `TransformerEncoder`
and a `TransformerDecoder` chained together. To make the model aware of word order,
we also use a `PositionalEmbedding` layer.

The source sequence will be pass to the `TransformerEncoder`,
which will produce a new representation of it.
This new representation will then be passed
to the `TransformerDecoder`, together with the target sequence so far (target words 0 to N).
The `TransformerDecoder` will then seek to predict the next words in the target sequence (N+1 and beyond).

A key detail that makes this possible is causal masking
(see method `get_causal_attention_mask()` on the `TransformerDecoder`).
The `TransformerDecoder` sees the entire sequences at once, and thus we must make
sure that it only uses information from target tokens 0 to N when predicting token N+1
(otherwise, it could use information from the future, which would
result in a model that cannot be used at inference time).

In [None]:

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config


class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)
    def get_config(self):
        config = super().get_config()
        config.update({
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config


class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "latent_dim": self.latent_dim,
            "num_heads": self.num_heads,
        })
        return config


In [None]:

embed_dim = 512
latent_dim = 2048
num_heads = 8
num_layers=1

encoder_inputs=keras.layers.Input(shape=(None,),dtype="int64", name="encoder_inputs")
decoder_inputs=keras.layers.Input(shape=(None,), dtype="int64", name="decoder_inputs")

embed_enc=PositionalEmbedding(sequence_length, vocab_size, embed_dim)
embed_dec=PositionalEmbedding(sequence_length, vocab_size, embed_dim)

encoders=[TransformerEncoder(embed_dim, latent_dim, num_heads) for i in range(num_layers)]
decoders=[TransformerDecoder(embed_dim, latent_dim, num_heads) for i in range(num_layers)]

final=keras.layers.Dense(vocab_size, activation="softmax")

#Building output
x1=embed_enc(encoder_inputs)
x2=embed_dec(decoder_inputs)

for layer in encoders:
  x1=layer(x1)

for layer in decoders:
  x2=layer(x2,x1)


x2 = layers.Dropout(0.5)(x2)

output=final(x2)

transformer = keras.Model(
    [encoder_inputs, decoder_inputs], output, name="transformer"
)

Next, we assemble the end-to-end model.

## Training our model

We'll use accuracy as a quick way to monitor training progress on the validation data.
Note that machine translation typically uses BLEU scores as well as other metrics, rather than accuracy.

In [None]:
epochs = 40
transformer.summary()
optimizer=keras.optimizers.Adam(learning_rate=1.5e-4,beta_1=0.999)
transformer.compile(
    optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
history=transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)


In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.ylim((0.4,1.0))
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()

## Decoding test sentences

Finally, let's demonstrate how to translate brand new Spanish sentences.
We simply feed into the model the vectorized Spanish sentence
as well as the target token `"[start]"`, then we repeatedly generated the next token, until
we hit the token `"[end]"`.

In [None]:
msl_vocab = msl_vectorization.get_vocabulary()
msl_index_lookup = dict(zip(range(len(msl_vocab)), msl_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    tokenized_input_sentence = spa_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = msl_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = msl_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence


In [None]:

rouge_1 = keras_nlp.metrics.RougeN(order=1)
rouge_2 = keras_nlp.metrics.RougeN(order=2)
ref=[]
translated=[]
for pair in train_pairs:
   input_sentence = pair[0]
   reference_sentence = pair[1]
   reference_sentence=(reference_sentence.lower().replace("[start]", "").replace("[end]", "").replace("+", "").replace("¿", "").replace("?", "").strip())
   translated_sentence = decode_sequence(input_sentence)
   translated_sentence = (translated_sentence.replace("[start]", "").replace("[end]", "").replace("¿", "").replace("?", "").strip())
   ref.append(reference_sentence)
   translated.append(translated_sentence)
   rouge_1(reference_sentence, translated_sentence)
   rouge_2(reference_sentence, translated_sentence)

references_list=[[word_tokenize(ref)] for ref in ref]
print(references_list)

trans=[word_tokenize(w) for w in translated]
print(trans)

bleu_score_corpus=corpus_bleu(references_list,trans)
print("Corpus BLEU Score", bleu_score_corpus)

#for i in range(len(ref)):


print("ROUGE-1 Score: ", rouge_1.result())
print("ROUGE-2 Score: ", rouge_2.result())




In [None]:
print(ref[-1])
print(translated[-1])