In this [exercise](G_Machine_Translation_with_Encoder_Decoder_Attention.ipynb), I built an English-to-Portuguese neural machine translation (NMT) model using LSTM networks with attention, based on the starting code, instructions, and utility functions from the [Natural Language Processing with Attention Models](https://www.coursera.org/learn/attention-models-in-nlp) course (by DeepLearning.AI on Coursera).

The Coursera starter code handles text preprocessing: reading the text files, performing the train-test split, tokenizing, and creating the TensorFlow datasets.

In this model, the decoder has access to every part of the input sentence, because the encoder produces a hidden state at each timestep. All of the encoder's hidden states are passed to the attention layer, which lets the decoder learn which of them to pay more attention to as it tries to produce the next word.

![NMT_model.png](NMT_model.png)

The attention layer implemented in this model is the Scaled Dot Product Attention (please refer to the Coursera lecture for further details).

![QKV_attention.png](QKV_attention.png)
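
As a rough illustration of the computation in the figure, the sketch below implements scaled dot-product attention, `softmax(Q·Kᵀ / sqrt(d_k))·V`, in plain Python. It is not the notebook's implementation: the notebook uses Keras `MultiHeadAttention`, which also applies learned linear projections that are omitted here. The function names and toy vectors are made up for the example.

```python
import math

def softmax(row):
    """Numerically stable softmax over a list of floats."""
    m = max(row)
    exps = [math.exp(v - m) for v in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(queries, keys, values):
    """Return (outputs, weights) for lists of equal-width vectors."""
    d_k = len(keys[0])
    outputs, weights = [], []
    for q in queries:
        # Similarity of this query with every key, scaled by sqrt(d_k)
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d_k) for k in keys]
        w = softmax(scores)
        # Weighted sum of the value vectors
        out = [sum(wj * v[i] for wj, v in zip(w, values)) for i in range(len(values[0]))]
        weights.append(w)
        outputs.append(out)
    return outputs, weights

# Toy example (hypothetical numbers): one decoder query attending over
# three encoder hidden states, which serve as both keys and values.
encoder_states = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
decoder_query = [[2.0, 0.0]]
outputs, weights = scaled_dot_product_attention(decoder_query, encoder_states, encoder_states)
print(weights[0])
```

The first and third encoder states align equally well with the query, so they receive equal weight, and the second state receives less.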

## Sample translations

After the NMT model is trained, change this line and run the cell following it to translate:
```
english_sentence = "I love reading books"
```

Sample translations:

| **English**                                    | **Portuguese**                                                 |
|------------------------------------------------|----------------------------------------------------------------|
| I love reading books                           | eu eu adoro ler os livros de voces                             |
| The cat is lying on the sofa                   | o gato esta deitado no sofa                                    |
| The teacher gives me a lot of homework         | a professora me entregou muito deveres                         |
| I have been studying math for the past 2 years | eu estive estudando matematica pelo passado pelas ultimos anos |
| You will get a good job if you work hard       | voce vai buscar um bom trabalho se voce trabalham              |



In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:

# %cd /content/drive/Othercomputers/My Laptop/My_NLP_notebooks/C4W1 Assignment/Files/tf
# %ls

/content/drive/Othercomputers/My Laptop/My_NLP_notebooks/C4W1 Assignment/Files/tf
C4W1_Assignment.ipynb                                       __pycache__/
G_Machine_Translation_with_Encoder_Decoder_Attention.ipynb  PythonScript/
images/                                                     QKV_attention.png
img.png                                                     readme.md
NMT_model.png                                               T_C4W1_Assignment_passed_version.ipynb
notebook2script.py                                          utils.py
por-eng/                                                    w1_unittest.py


In [3]:
# %pip install tensorflow-text



In [4]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Setting this env variable prevents TF warnings from showing up

import numpy as np
import tensorflow as tf
from collections import Counter
from utils import (sentences, train_data, val_data, english_vectorizer, portuguese_vectorizer,
                   masked_loss, masked_acc, tokens_to_text)

# Data Preparation

In [5]:
# This helps you convert from words to ids
word_to_id = tf.keras.layers.StringLookup(
    vocabulary=portuguese_vectorizer.get_vocabulary(),
    mask_token="",
    oov_token="[UNK]"
)

# This helps you convert from ids to words
id_to_word = tf.keras.layers.StringLookup(
    vocabulary=portuguese_vectorizer.get_vocabulary(),
    mask_token="",
    oov_token="[UNK]",
    invert=True,
)

In [6]:
unk_id = word_to_id("[UNK]")
sos_id = word_to_id("[SOS]")
eos_id = word_to_id("[EOS]")
baunilha_id = word_to_id("baunilha")

print(f"The id for the [UNK] token is {unk_id}")
print(f"The id for the [SOS] token is {sos_id}")
print(f"The id for the [EOS] token is {eos_id}")
print(f"The id for baunilha (vanilla) is {baunilha_id}")

The id for the [UNK] token is 1
The id for the [SOS] token is 2
The id for the [EOS] token is 3
The id for baunilha (vanilla) is 7079


In [7]:
# `train_data` is what is fed into the neural network.
# Padding has been applied to the tensors, represented by the value 0.
# Each example has 3 tensors: (1) the English sentence, (2) the translated
# sentence shifted to the right, and (3) the translated sentence, so that we
# can perform "teacher forcing" as described in the Coursera lecture.
for (to_translate, sr_translation), translation in train_data.take(1):
    print(f"Tokenized english sentence:\n{to_translate[0, :].numpy()}\n\n")
    print(f"Tokenized portuguese sentence (shifted to the right):\n{sr_translation[0, :].numpy()}\n\n")
    print(f"Tokenized portuguese sentence:\n{translation[0, :].numpy()}\n\n")

Tokenized english sentence:
[   2  210    9  146  123   38    9 1672    4    3    0    0    0    0]


Tokenized portuguese sentence (shifted to the right):
[   2 1085    7  128   11  389   37 2038    4    0    0    0    0    0
    0]


Tokenized portuguese sentence:
[1085    7  128   11  389   37 2038    4    3    0    0    0    0    0
    0]
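
The relationship between the two Portuguese tensors above can be reproduced by hand. The sketch below assumes the ids printed earlier (`[SOS]` = 2, `[EOS]` = 3, padding = 0); the helper name `make_decoder_pair` is hypothetical and is not part of `utils.py`.

```python
SOS_ID, EOS_ID, PAD_ID = 2, 3, 0  # ids printed earlier in this notebook; 0 is padding

def make_decoder_pair(token_ids, max_len):
    """Build (decoder input, decoder target) from an unpadded sentence."""
    decoder_input = [SOS_ID] + token_ids       # shifted right: starts with [SOS]
    decoder_target = token_ids + [EOS_ID]      # what the decoder should predict
    pad = [PAD_ID] * (max_len - len(decoder_input))
    return decoder_input + pad, decoder_target + pad

# Token ids of the Portuguese sentence shown in the output above
sentence = [1085, 7, 128, 11, 389, 37, 2038, 4]
decoder_input, decoder_target = make_decoder_pair(sentence, max_len=15)
print(decoder_input)
print(decoder_target)
```

At every position the decoder is fed the correct previous token and asked to predict the next one, which is what teacher forcing means here.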



# Neural Machine Translation model with Attention

In [8]:
VOCAB_SIZE = 12000
UNITS = 256

In [9]:
# The Encoder layer
from tensorflow import keras  # public import path; `keras.api._v2` is a private module
class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, units):
        """Initializes an instance of this class

        Args:
            vocab_size (int): Size of the vocabulary
            units (int): Number of units in the LSTM layer
        """
        super(Encoder, self).__init__()

        self.embedding = keras.layers.Embedding(
            input_dim=vocab_size,
            output_dim=units,
            mask_zero=True
        )
        self.rnn = keras.layers.Bidirectional(
            merge_mode="sum",
            layer=keras.layers.LSTM(
                units=units,
                return_sequences=True
            ),
        )

    def call(self, context):
        """Forward pass of this layer

        Args:
            context (tf.Tensor): The sentence to translate

        Returns:
            tf.Tensor: Encoded sentence to translate
        """

        # Pass the context through the embedding layer
        x = self.embedding(context)

        # Pass the output of the embedding through the RNN
        x = self.rnn(x)

        return x

In [10]:
# Implement the cross attention between the original sentences and the translations
class CrossAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        """Initializes an instance of this class

        Args:
            units (int): Number of units in the LSTM layer
        """
        super().__init__()

        self.mha = (
            keras.layers.MultiHeadAttention(
                key_dim=units,
                num_heads=1
            )
        )
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

    def call(self, context, target):
        """Forward pass of this layer

        Args:
            context (tf.Tensor): Encoded sentence to translate
            target (tf.Tensor): The embedded shifted-to-the-right translation

        Returns:
            tf.Tensor: Cross attention between context and target
        """
        # Call the MH attention by passing in the query and value
        # For this case the query should be the translation and the
        # value the encoded sentence to translate
        attn_output = self.mha(
            query=target,
            value=context
        )

        x = self.add([target, attn_output])

        x = self.layernorm(x)

        return x
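
The last two steps of `CrossAttention.call` are a residual addition followed by layer normalization. A minimal sketch of that arithmetic for a single feature vector follows. It omits the learned scale and offset that Keras `LayerNormalization` also applies, and the input numbers are invented for illustration.

```python
import math

def add_and_layer_norm(target_vec, attn_vec, epsilon=1e-3):
    """Residual add followed by layer normalization of one feature vector."""
    summed = [t + a for t, a in zip(target_vec, attn_vec)]
    mean = sum(summed) / len(summed)
    var = sum((s - mean) ** 2 for s in summed) / len(summed)
    # Normalize to roughly zero mean and unit variance across the features
    return [(s - mean) / math.sqrt(var + epsilon) for s in summed]

# Hypothetical decoder features and attention output
normalized = add_and_layer_norm([1.0, 2.0, 3.0, 4.0], [0.5, -0.5, 0.5, -0.5])
print(normalized)
```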

In [11]:
# The decoder
from tensorflow import keras  # public import path; `keras.api._v2` is a private module
class Decoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, units):
        """Initializes an instance of this class

        Args:
            vocab_size (int): Size of the vocabulary
            units (int): Number of units in the LSTM layer
        """
        super(Decoder, self).__init__()

        # The embedding layer
        self.embedding = keras.layers.Embedding(
            input_dim=vocab_size,
            mask_zero=True,
            output_dim=units,
        )

        # The RNN before attention
        self.pre_attention_rnn = keras.layers.LSTM(
            units=units,
            return_sequences=True,
            return_state=True
        )
        # The attention layer
        self.attention = CrossAttention(units=units)
        # The RNN after attention
        self.post_attention_rnn = keras.layers.LSTM(
            units=units,
            return_sequences=True,
        )
        # The dense layer with logsoftmax activation
        self.output_layer = keras.layers.Dense(
            units=vocab_size,
            activation="log_softmax",
        )

    def call(self, context, target, state=None, return_state=False):
        """Forward pass of this layer

        Args:
            context (tf.Tensor): Encoded sentence to translate
            target (tf.Tensor): The shifted-to-the-right translation
            state (list[tf.Tensor, tf.Tensor], optional): Hidden state of the pre-attention LSTM. Defaults to None.
            return_state (bool, optional): If set to true return the hidden states of the LSTM. Defaults to False.

        Returns:
            tf.Tensor: The log_softmax probabilities of predicting a particular token
        """
        # Get the embedding of the input
        x = self.embedding(target)
        # Pass the embedded input into the pre attention LSTM

        x, hidden_state, cell_state = self.pre_attention_rnn(x, initial_state=state)
        # Perform cross attention between the context and the output of the LSTM (in that order)
        x = self.attention(context, x)
        # Do a pass through the post attention LSTM
        x = self.post_attention_rnn(x)
        # Compute the output scores (log-probabilities, since the output layer uses log_softmax)
        logits = self.output_layer(x)

        if return_state:
            return logits, [hidden_state, cell_state]
        return logits
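
Because the output layer uses a `log_softmax` activation, the values the decoder returns are log-probabilities even though the variable is named `logits`. The sketch below shows the underlying formula, `x_i - log(sum_j exp(x_j))`, on made-up scores; it is an illustration, not the Keras implementation.

```python
import math

def log_softmax(scores):
    """log_softmax(x)_i = x_i - log(sum_j exp(x_j)), computed stably."""
    m = max(scores)
    log_sum_exp = m + math.log(sum(math.exp(v - m) for v in scores))
    return [v - log_sum_exp for v in scores]

# Hypothetical raw scores for a 3-token vocabulary
log_probs = log_softmax([2.0, 1.0, 0.1])
probs = [math.exp(lp) for lp in log_probs]
print(log_probs, probs)
```

Exponentiating the outputs recovers a probability distribution, and the ranking of the tokens is unchanged.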

In [12]:
# Putting the encoder and the decoder together
class Translator(tf.keras.Model):
    def __init__(self, vocab_size, units):
        """Initializes an instance of this class

        Args:
            vocab_size (int): Size of the vocabulary
            units (int): Number of units in the LSTM layer
        """
        super().__init__()

        # Define the encoder with the appropriate vocab_size and number of units
        self.encoder = Encoder(vocab_size=vocab_size, units=units)
        # Define the decoder with the appropriate vocab_size and number of units
        self.decoder = Decoder(vocab_size=vocab_size, units=units)

    def call(self, inputs):
        """Forward pass of this layer

        Args:
            inputs (tuple(tf.Tensor, tf.Tensor)): Tuple containing the context (sentence to translate) and the target (shifted-to-the-right translation)

        Returns:
            tf.Tensor: The log_softmax probabilities of predicting a particular token
        """

        # In this case inputs is a tuple consisting of the context and the target, unpack it into single variables
        context, target = inputs
        # Pass the context through the encoder
        encoded_context = self.encoder(context)
        # Compute the logits by passing the encoded context and the target to the decoder
        logits = self.decoder(encoded_context, target)

        return logits

# Training

In [13]:
def compile_and_train(model, epochs=20, steps_per_epoch=500):
    model.compile(optimizer="adam", loss=masked_loss, metrics=[masked_acc, masked_loss])

    history = model.fit(
        train_data.repeat(),
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=val_data,
        validation_steps=50,
        callbacks=[tf.keras.callbacks.EarlyStopping(patience=3)],
    )

    return model, history
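
`masked_loss` and `masked_acc` come from `utils.py`, which is not shown here. As a guess at the idea behind them, the sketch below computes an accuracy that ignores padded positions, assuming padding is id 0 as in the tensors printed earlier. The function name and the token ids are hypothetical.

```python
def masked_accuracy(true_ids, predicted_ids, pad_id=0):
    """Fraction of non-padding positions where the prediction matches."""
    kept = [(t, p) for t, p in zip(true_ids, predicted_ids) if t != pad_id]
    if not kept:
        return 0.0
    return sum(1 for t, p in kept if t == p) / len(kept)

# Hypothetical target and prediction; the last three positions are padding
true_ids = [1085, 7, 128, 4, 3, 0, 0, 0]
pred_ids = [1085, 7, 99, 4, 3, 5, 5, 5]
acc = masked_accuracy(true_ids, pred_ids)
print(acc)
```

Without the mask, whatever the model emits at padded positions would also be scored, which would distort the metric.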


In [15]:
translator = Translator(VOCAB_SIZE, UNITS)
trained_translator, history = compile_and_train(translator, epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
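
The log ends at epoch 28 of 50, which is what one would expect if the `EarlyStopping(patience=3)` callback ended training. A simplified sketch of patience-based stopping is shown below. It assumes the monitored value is a validation loss where lower is better, and the loss values are invented rather than taken from this run.

```python
def epochs_run(val_losses, patience=3):
    """Return how many epochs run before patience-based early stopping fires."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best:
            best, wait = loss, 0   # improvement: reset the counter
        else:
            wait += 1              # no improvement this epoch
            if wait >= patience:
                return epoch       # stop after `patience` stale epochs
    return len(val_losses)

# Hypothetical validation losses, not taken from this run
losses = [1.0, 0.8, 0.7, 0.71, 0.72, 0.73, 0.6]
stopped_at = epochs_run(losses)
print(stopped_at)
```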


# Using the Model for Translation

In [16]:
def generate_next_token(decoder, context, next_token, done, state, temperature=0.0):
    """Generates the next token in the sequence

    Args:
        decoder (Decoder): The decoder
        context (tf.Tensor): Encoded sentence to translate
        next_token (tf.Tensor): The predicted next token
        done (bool): True if the translation is complete
        state (list[tf.Tensor, tf.Tensor]): Hidden states of the pre-attention LSTM layer
        temperature (float, optional): The temperature that controls the randomness of the predicted tokens. Defaults to 0.0.

    Returns:
        tuple(tf.Tensor, float, list[tf.Tensor, tf.Tensor], bool): The next token, the logit of that token, the state of the pre-attention LSTM, and whether the translation is done
    """
    # Get the logits and state from the decoder
    logits, state = decoder(context, next_token, state=state, return_state=True)

    # Trim the intermediate dimension
    logits = logits[:, -1, :]

    # If temp is 0 then next_token is the argmax of logits
    if temperature == 0.0:
        next_token = tf.argmax(logits, axis=-1)

    # If temp is not 0 then next_token is sampled out of logits
    else:
        logits = logits / temperature
        next_token = tf.random.categorical(logits, num_samples=1)

    # Trim dimensions of size 1
    logits = tf.squeeze(logits)
    next_token = tf.squeeze(next_token)

    # Get the logit of the selected next_token
    logit = logits[next_token].numpy()

    # Reshape to (1,1) since this is the expected shape for text encoded as TF tensors
    next_token = tf.reshape(next_token, shape=(1,1))

    # If next_token is End-of-Sentence token you are done
    if next_token == eos_id:
        done = True

    return next_token, logit, state, done
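
To see what dividing by `temperature` does before sampling, the sketch below applies a softmax to temperature-scaled log-probabilities for a made-up three-token distribution. `tf.random.categorical` samples from this kind of distribution; the helper name and the numbers are illustrative only.

```python
import math

def temperature_probs(log_probs, temperature):
    """Softmax of log_probs / temperature."""
    scaled = [lp / temperature for lp in log_probs]
    m = max(scaled)
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical next-token distribution
log_probs = [math.log(0.6), math.log(0.3), math.log(0.1)]
sharp = temperature_probs(log_probs, 0.5)   # more peaked than the original
same = temperature_probs(log_probs, 1.0)    # unchanged
flat = temperature_probs(log_probs, 2.0)    # closer to uniform
print(sharp, same, flat)
```

Lower temperatures concentrate probability on the most likely token, which is why a temperature of 0 is handled separately as a plain `argmax`.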

In [17]:
def translate(model, text, max_length=50, temperature=0.0):
    """Translate a given sentence from English to Portuguese

    Args:
        model (tf.keras.Model): The trained translator
        text (string): The sentence to translate
        max_length (int, optional): The maximum length of the translation. Defaults to 50.
        temperature (float, optional): The temperature that controls the randomness of the predicted tokens. Defaults to 0.0.

    Returns:
        tuple(str, float, tf.Tensor): The translation, the logit of the last token appended to the translation, and the tokenized translation
    """
    # Lists to save tokens and logits
    tokens, logits = [], []

    # PROCESS THE SENTENCE TO TRANSLATE

    # Convert the original string into a tensor
    text = tf.convert_to_tensor(text)[tf.newaxis]
    # Vectorize the text using the correct vectorizer
    context = english_vectorizer(text).to_tensor()
    # Get the encoded context (pass the context through the encoder)
    context = model.encoder(context)

    # INITIAL STATE OF THE DECODER

    # First token should be SOS token with shape (1,1)
    next_token = tf.fill((1, 1), sos_id)
    # Initial hidden and cell states should be tensors of zeros with shape (1, UNITS)
    state = [tf.zeros((1, UNITS)), tf.zeros((1, UNITS))]
    # You are done when you draw a EOS token as next token (initial state is False)
    done = False
    # Iterate for max_length iterations
    for i in range(max_length):
        # Generate the next token
        next_token, logit, state, done = generate_next_token(
            decoder=model.decoder,
            context=context,
            next_token=next_token,
            done=done,
            state=state,
            temperature=temperature
        )
        # If done then break out of the loop
        if done:
            break
        # Add next_token to the list of tokens
        tokens.append(next_token)
        # Add logit to the list of logits
        logits.append(logit)

    # Concatenate all tokens into a tensor
    tokens = tf.concat(tokens, axis=-1)

    # Convert the translated tokens into text
    translation = tf.squeeze(tokens_to_text(tokens, id_to_word))
    translation = translation.numpy().decode()

    return translation, logits[-1], tokens

In [18]:
# `temperature` is a variable that determines the randomness with which we sample from the decoder's output distributions to determine the next word
# `temperature` of 0 will yield a deterministic output - equivalent to greedy decoding
temp = 0.0
original_sentence = "I love languages"

translation, logit, tokens = translate(trained_translator, original_sentence, temperature=temp)

print(f"Temperature: {temp}\n\nOriginal sentence: {original_sentence}\nTranslation: {translation}\nTranslation tokens:{tokens}\nLogit: {logit:.3f}")

Temperature: 0.0

Original sentence: I love languages
Translation: eu adoro idiomas de idade .
Translation tokens:[[  9 564 850  11 514   4]]
Logit: -0.802


In [19]:
# `temperature` of 0.7 will give stochastic output
temp = 0.7
original_sentence = "I love languages"

translation, logit, tokens = translate(trained_translator, original_sentence, temperature=temp)

print(f"Temperature: {temp}\n\nOriginal sentence: {original_sentence}\nTranslation: {translation}\nTranslation tokens:{tokens}\nLogit: {logit:.3f}")

Temperature: 0.7

Original sentence: I love languages
Translation: eu adoro idiomas de frente o gosto do amor .
Translation tokens:[[  9 564 850  11 510   7  98  31 811   4]]
Logit: -0.077


In [20]:
# Below, we will generate several translations, score each translated sentence against all other versions, and select the one with the highest similarity score
def generate_samples(model, text, n_samples=4, temperature=0.6):

    samples, log_probs = [], []

    # Iterate for n_samples iterations
    for _ in range(n_samples):

        # Save the logit and the translated tensor
        _, logp, sample = translate(model, text, temperature=temperature)

        # Save the translated tensors
        samples.append(np.squeeze(sample.numpy()).tolist())

        # Save the logits
        log_probs.append(logp)

    return samples, log_probs

def jaccard_similarity(candidate, reference):

    # Convert the lists to sets to get the unique tokens
    candidate_set = set(candidate)
    reference_set = set(reference)

    # Get the set of tokens common to both candidate and reference
    common_tokens = candidate_set.intersection(reference_set)

    # Get the set of all tokens found in either candidate or reference
    all_tokens = candidate_set.union(reference_set)

    # Compute the percentage of overlap (divide the number of common tokens by the number of all tokens)
    overlap = len(common_tokens) / len(all_tokens)

    return overlap

def weighted_avg_overlap(samples, log_probs, similarity_fn):

    # Scores dictionary
    scores = {}

    # Iterate over the samples
    for index_candidate, candidate in enumerate(samples):

        # Initialize overlap and weighted sum
        overlap, weight_sum = 0.0, 0.0

        # Iterate over all samples and log probabilities
        for index_sample, (sample, logp) in enumerate(zip(samples, log_probs)):

            # Skip if the candidate index is the same as the sample index
            if index_candidate == index_sample:
                continue

            # Convert log probability to linear scale
            sample_p = float(np.exp(logp))

            # Update the weighted sum
            weight_sum += sample_p

            # Get the unigram overlap between candidate and sample
            sample_overlap = similarity_fn(candidate, sample)

            # Update the overlap
            overlap += sample_p * sample_overlap

        # Compute the score for the candidate
        score = overlap / weight_sum

        # Only use 3 decimal points
        score = round(score, 3)

        # Save the score in the dictionary. use index as the key.
        scores[index_candidate] = score

    return scores
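
As a worked example of the scoring above, the sketch below computes the Jaccard similarity of the two "I love languages" token lists printed earlier, then the probability-weighted average overlap for three samples. The third sample and the log-probability attached to it are hypothetical, and the code is a compact restatement for illustration rather than a replacement for the functions in this cell.

```python
import math

def jaccard(a, b):
    """Size of the intersection over size of the union of two token lists."""
    a, b = set(a), set(b)
    return len(a & b) / len(a | b)

# Token ids of the two "I love languages" translations printed earlier
greedy = [9, 564, 850, 11, 514, 4]
sampled = [9, 564, 850, 11, 510, 7, 98, 31, 811, 4]
pair_score = jaccard(greedy, sampled)  # 5 shared ids out of 11 distinct ids

# Hypothetical third sample and log-probabilities, for illustration only
samples = [greedy, sampled, [9, 564, 850, 4]]
log_probs = [-0.802, -0.077, -0.5]

def score(index):
    """Probability-weighted mean overlap of samples[index] with the others."""
    others = [(s, math.exp(lp))
              for j, (s, lp) in enumerate(zip(samples, log_probs)) if j != index]
    return sum(w * jaccard(samples[index], s) for s, w in others) / sum(w for _, w in others)

scores = [score(i) for i in range(len(samples))]
best = max(range(len(samples)), key=lambda i: scores[i])
print(pair_score, scores, best)
```

The candidate that overlaps most with the other samples wins, so a translation close to the "consensus" of the samples is preferred over an outlier.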

In [21]:
# putting these steps together into a function
def mbr_decode(model, text, n_samples=5, temperature=0.6, similarity_fn=jaccard_similarity):

    # Generate samples
    samples, log_probs = generate_samples(model, text, n_samples=n_samples, temperature=temperature)

    # Compute the overlap scores
    scores = weighted_avg_overlap(samples, log_probs, similarity_fn)

    # Decode samples
    decoded_translations = [tokens_to_text(s, id_to_word).numpy().decode('utf-8') for s in samples]

    # Find the key with the highest score
    max_score_key = max(scores, key=lambda k: scores[k])

    # Get the translation
    translation = decoded_translations[max_score_key]

    return translation, decoded_translations

In [22]:
english_sentence = "I love reading books"

In [23]:

translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)

print("Translation candidates:")
for c in candidates:
    print(c)

print(f"\nSelected translation: {translation}")

Translation candidates:
eu adoro ler livros em livros .
eu adoro ler as livros dagua .
eu adoro ler livros para voce .
eu eu gosto de ler livros .
eu adoro ler os livros de voces salgada .
eu adoro ler livros em frances .
eu adoro ler os livros de vez em quando eu gosto de problemas . eu eu amo .
eu eu gosto de ler livros .
eu eu adoro ler os livros de voces .
eu adoro ler os livros de olho .

Selected translation: eu eu adoro ler os livros de voces .


In [24]:
english_sentence = "The cat is lying on the sofa"


translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)

print("Translation candidates:")
for c in candidates:
    print(c)

print(f"\nSelected translation: {translation}")

Translation candidates:
o gato esta deitado no sofa .
o gato esta deitado no sofa .
o gato esta deitado no sofa de lugar .
o gato esta mentindo no sofa .
o gato esta deitado no sofa dagua e o sofa .
o gato esta mentindo no sofa .
o gato esta deitado no sofa deles .
o gato esta deitado no sofa .
o gato esta deitado no sofa de plataforma .
o gato esta deitado no sofa .

Selected translation: o gato esta deitado no sofa .


In [25]:
english_sentence = "The teacher gives me a lot of homework."


translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)

print("Translation candidates:")
for c in candidates:
    print(c)

print(f"\nSelected translation: {translation}")

Translation candidates:
a professora me da muita licao de casa .
o professor me ensinando muito deveres .
a professora me entregou bastante escolar .
a professora me entregou muito deveres .
a professora me entregou muito mais deveres .
o professor me primeiro de licao de licao de licao de ensino .
o professor me da muita licao de licao de tarefa .
o professor me da muita licao de licao de licao .
a professora me entregou bastante bebendo licao de dever .
a professor me entregou muito deveres .

Selected translation: a professora me entregou muito deveres .


In [32]:
english_sentence = "I have been studying math for the past 2 years."


translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)

print("Translation candidates:")
for c in candidates:
    print(c)

print(f"\nSelected translation: {translation}")

Translation candidates:
eu ja possuia matematica aos estudantes daquele passado .
eu estive estudando matematica pelo passado pelas ultimos anos .
eu estive estudando matematica pelos ultimos anos .
eu estive estudando matematica no ano outro passada .
eu ja houve matematica os ultimos anos .
eu ja estudei matematica aos ultimos treze anos .
tenho estudado matematica pelo time dos estudantes .
tenho estudado matematica pelo passado fui os ultimos anos .
estou estudando matematica pelo passado dos ultimos anos .
eu tenho estudado matematica durante os ultimas os ultimos carros .

Selected translation: eu estive estudando matematica pelo passado pelas ultimos anos .


In [33]:
english_sentence = "You will get a good job if you work hard"


translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)

print("Translation candidates:")
for c in candidates:
    print(c)

print(f"\nSelected translation: {translation}")

Translation candidates:
voce vai buscar um sabe trabalho se voce [UNK] dificil .
voce vai obter um bom emprego se voce trabalha duro .
voce vai melhorar se voce trabalha muito bem se o voce trabalha .
voce vai pegar um bom estudio se voce trabalha duro .
voce vai [UNK] um bom trabalho se voce tomou dificuldade em paz .
voce vai buscar um boa trabalho se voce funciona cada vez de vontade .
voce vai melhorar vale um bom metodo se voce trabalhar duro .
voce vai obter um bom trabalho se voce trabalha com o trabalho vantagem .
voce vai melhorar se voce trabalhar uma boa partida se voce trabalha em inveja dos senhores tao bom .
voce vai buscar um bom trabalho se voce trabalham .

Selected translation: voce vai buscar um bom trabalho se voce trabalham .
