# INTRODUCTION

We will use the Transformer architecture for English-to-Italian machine translation. The Transformer's encoder processes the input sequence to create context-aware representations, the decoder generates the output sequence from these representations, and the attention mechanism helps both components focus on the relevant parts of the sequences.

## UTILITY

In [1]:
import random
import numpy as np
import string
import re
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import layers, Model, Input

# DATA DOWNLOAD AND REVIEW

**Download data from the source**

In [3]:
# -nc (--no-clobber): skip the download when the archive already exists, so
# re-running the cell does not fetch a second copy (ita-eng.zip.1).
!wget -nc https://www.manythings.org/anki/ita-eng.zip
# -o: overwrite previously extracted files without prompting, so re-running the
# cell never stops to ask for confirmation.
!unzip -o -q ita-eng.zip

--2024-08-02 00:24:45--  https://www.manythings.org/anki/ita-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8326901 (7.9M) [application/zip]
Saving to: ‘ita-eng.zip’


2024-08-02 00:24:48 (4.31 MB/s) - ‘ita-eng.zip’ saved [8326901/8326901]



**Open and read the file and create pairs**

In [6]:
# The archive is extracted into the current working directory by the download
# cell, so a relative path works wherever the notebook is run.
text_file = 'ita.txt'

# The corpus contains accented characters; decode it explicitly as UTF-8
# instead of relying on the platform's default encoding.
with open(text_file, encoding='utf-8') as f:
    # Drop empty lines (such as the one left by a trailing newline) without
    # discarding a final sentence when the file has no trailing newline.
    lines = [line for line in f.read().split('\n') if line]

text_pairs = []

# Create English and Italian pairs of sentences.
for line in lines:
    # Only the first two tab-separated fields are used; any extra fields after
    # them are ignored, however many there are.
    english, italian = line.split('\t')[:2]
    # Wrap the target sentence in explicit start/end markers for the decoder.
    italian = '[start] ' + italian + ' [end]'
    text_pairs.append((english, italian))

In [7]:
# Show one randomly chosen (English, Italian) pair as a sanity check.
sample_pair = random.choice(text_pairs)
print(sample_pair)

# Report how many sentence pairs were loaded.
pair_count = len(text_pairs)
print(pair_count)

("Tom said he's proud of his children.", '[start] Tom ha detto di essere orgoglioso dei suoi figli. [end]')
377937


# DATA SPLIT

In [8]:
# Shuffle in place so that the three splits below are drawn at random.
random.shuffle(text_pairs)

# 15% of the pairs go to validation, another 15% to testing, and the remainder
# to training.
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples

# Slice boundaries: [0, train_end) is training, [train_end, val_end) is
# validation, and everything from val_end onwards is the test set.
train_end = num_train_samples
val_end = train_end + num_val_samples

train_pairs = text_pairs[:train_end]
val_pairs = text_pairs[train_end:val_end]
test_pairs = text_pairs[val_end:]

# DATA PROCESSING

**Vectorizing the English and Italian text pairs**



In [9]:
# Characters removed from the target text: every ASCII punctuation mark except
# the square brackets, which must survive so that the "[start]" and "[end]"
# tokens we inserted stay intact.
strip_chars = ''.join(
    char for char in string.punctuation if char not in '[]')


def custom_standardization(input_string):
    """Lowercase `input_string` and delete every character in `strip_chars`.

    Args:
        input_string: String tensor passed in by the TextVectorization layer.

    Returns:
        The lowercased string tensor with the listed punctuation removed.
    """
    # Character class matching any single character listed in `strip_chars`.
    punctuation_pattern = f'[{re.escape(strip_chars)}]'
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, punctuation_pattern, '')

# Vocabulary cap passed as `max_tokens` to both vectorization layers.
vocab_size = 15000

# Source sentences are padded or truncated to this many tokens.
sequence_length = 20

# English (source) side: no `standardize` argument, so the layer's default
# standardization applies.
source_vectorization = layers.TextVectorization(
    output_mode='int',                       # emit integer token indices
    max_tokens=vocab_size,
    output_sequence_length=sequence_length,
)

# Italian (target) side: one token longer than the source because
# format_dataset() derives the decoder input and the prediction target from it
# by dropping the last and the first token respectively.
target_vectorization = layers.TextVectorization(
    standardize=custom_standardization,      # keeps "[start]" / "[end]" intact
    output_mode='int',                       # emit integer token indices
    max_tokens=vocab_size,
    output_sequence_length=sequence_length + 1,
)

# Separate the training pairs into one list per language.
train_english_texts = [english for english, italian in train_pairs]
train_italian_texts = [italian for english, italian in train_pairs]

# Learn each language's vocabulary from the training split only.
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_italian_texts)

**Preparing datasets for the translation task**

In [10]:
# Number of sentence pairs per batch.
batch_size = 64


def format_dataset(eng, ita):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: Batch of English sentences (strings).
        ita: Batch of Italian sentences (strings).

    Returns:
        A tuple `(inputs, targets)`. `inputs` is a dict with the vectorized
        English batch under 'english' and the vectorized Italian batch without
        its last token under 'italian'. `targets` is the vectorized Italian
        batch without its first token.
    """
    source_tokens = source_vectorization(eng)
    target_tokens = target_vectorization(ita)

    # The decoder input and the target are the same sequence offset by one
    # position, so the model learns to predict the next Italian token.
    model_inputs = {
        'english': source_tokens,
        'italian': target_tokens[:, :-1],
    }
    next_token_targets = target_tokens[:, 1:]
    return model_inputs, next_token_targets

def make_dataset(pairs):
    """Build a batched, vectorized tf.data pipeline from sentence pairs.

    Args:
        pairs: Non-empty sequence of `(english, italian)` string tuples.

    Returns:
        A `tf.data.Dataset` yielding `(inputs, targets)` batches as produced by
        `format_dataset`.
    """
    # Unpack the pairs into one sequence per language.
    eng_texts, ita_texts = zip(*pairs)

    # Create a TensorFlow dataset from the texts.
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(eng_texts), list(ita_texts)))

    # Group texts into batches, then vectorize each batch.
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)

    # Order matters here:
    #   * cache() comes first so the vectorized batches are computed once and
    #     reused on later epochs;
    #   * shuffle() comes after the cache so the batch order is re-drawn every
    #     epoch (shuffling before caching would store the first epoch's order
    #     and replay it unchanged);
    #   * prefetch() comes last so it overlaps input preparation with training.
    return dataset.cache().shuffle(2048).prefetch(16)

# Create training and validation datasets
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [12]:
# Peek at a single batch to confirm the tensor shapes fed to the model.
for inputs, targets in train_ds.take(1):
    labelled_tensors = (
        ("inputs['english']", inputs['english']),
        ("inputs['italian']", inputs['italian']),
        ("targets", targets),
    )
    for label, tensor in labelled_tensors:
        print(f"{label}.shape: {tensor.shape}")

inputs['english'].shape: (64, 20)
inputs['italian'].shape: (64, 20)
targets.shape: (64, 20)


# SEQUENCE-TO-SEQUENCE LEARNING WITH A TRANSFORMER

In [13]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention over the target sequence, cross-attention
    over the encoder outputs, and a two-layer feed-forward projection. Each of
    the three sub-layers is followed by a residual connection and layer
    normalization.

    Args:
        embed_dim: Size of the input/output feature vectors; also used as
            `key_dim` for both attention layers.
        dense_dim: Width of the hidden layer in the feed-forward projection.
        num_heads: Number of attention heads in both attention layers.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyperparameters are kept so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # attention_1: self-attention over the target sequence.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # attention_2: cross-attention from the target to the encoder outputs.
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the layer config extended with this block's hyperparameters."""
        config = super().get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'dense_dim': self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular mask that hides future positions.

        Args:
            inputs: Tensor whose first two dimensions are read as
                (batch, sequence length).

        Returns:
            An int32 tensor of shape (batch, sequence length, sequence length)
            where entry [b, i, j] is 1 when i >= j and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype='int32')
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Output of the encoder for the source sequence.
            mask: Optional padding mask for `inputs`; it is indexed as
                `mask[:, tf.newaxis, :]`, i.e. one entry per target position.

        Returns:
            The decoder block output, with the same feature size as `inputs`.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # Combine the target padding mask with the causal mask: a position
            # may be attended to only if it is neither padding nor in the future.
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype='int32')
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask

        # Self-attention over the target sequence. Both the causal and the
        # padding restrictions describe target positions, so this is where the
        # combined mask belongs.
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention over the encoder outputs. `mask` describes target
        # positions, so it must not be applied along the encoder axis: doing so
        # would hide source tokens based on target padding and target order
        # (and would only even be shape-compatible when both sequences have the
        # same length). No explicit mask is passed here.
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)

        # Position-wise feed-forward projection with its own residual + norm.
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [14]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Self-attention followed by a two-layer feed-forward projection; each
    sub-layer is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Size of the input/output feature vectors; also used as
            `key_dim` for the attention layer.
        dense_dim: Width of the hidden layer in the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyperparameters are kept so get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        feed_forward_layers = [
            layers.Dense(dense_dim, activation='relu'),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward_layers)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward projection to `inputs`.

        Args:
            inputs: Embedded source sequence.
            mask: Optional padding mask for `inputs`; when given, an axis is
                inserted so it can be used as the attention mask.

        Returns:
            The encoder block output.
        """
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(
            inputs, inputs, attention_mask=attention_mask)
        normalized = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normalized)
        return self.layernorm_2(normalized + projected)

    def get_config(self):
        """Return the layer config extended with this block's hyperparameters."""
        return {
            **super().get_config(),
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'dense_dim': self.dense_dim,
        }

In [15]:
class PositionalEmbedding(layers.Layer):
    """Embed token ids and add a learned embedding of each token's position.

    Token embeddings turn each word into a vector representing its meaning;
    position embeddings turn each word's position in the sentence into a
    vector. Summing the two lets the model see both the content of each word
    and its order in the sentence.

    Args:
        sequence_length: Number of distinct positions the position embedding
            table can represent.
        input_dim: Number of distinct token ids (vocabulary size).
        output_dim: Size of each embedding vector.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Kept so get_config() can report them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Return token embeddings plus position embeddings for `inputs`."""
        # The last axis of `inputs` is the sequence axis.
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)

        token_vectors = self.token_embeddings(inputs)
        # `position_vectors` has no batch axis; the addition below broadcasts
        # the same position vectors across every sequence in the batch.
        position_vectors = self.position_embeddings(position_ids)

        return token_vectors + position_vectors

    def get_config(self):
        """Return the layer config extended with the embedding hyperparameters."""
        base_config = super().get_config()
        return {
            **base_config,
            'output_dim': self.output_dim,
            'sequence_length': self.sequence_length,
            'input_dim': self.input_dim,
        }

**End-to-end Transformer**

In [16]:
# Model hyperparameters.
embed_dim = 256    # size of every token/position embedding vector
dense_dim = 2048   # hidden width of each block's feed-forward projection
num_heads = 8      # attention heads per attention layer

# Encoder: English token ids -> embeddings -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype='int64', name='english')
encoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_dim, dense_dim, num_heads)(encoder_embedded)

# Decoder: Italian token ids -> embeddings -> decoder block (attending to the
# encoder outputs) -> dropout -> per-position distribution over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype='int64', name='italian')
decoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_hidden = TransformerDecoder(
    embed_dim, dense_dim, num_heads)(decoder_embedded, encoder_outputs)
x = layers.Dropout(0.5)(decoder_hidden)
decoder_outputs = layers.Dense(vocab_size, activation='softmax')(x)

# The input names ('english', 'italian') match the dict keys produced by
# format_dataset().
transformer = keras.Model(
    inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

**Training the sequence-to-sequence Transformer**

In [17]:
# Targets are integer token ids and the model outputs a softmax distribution
# per position, hence the sparse categorical cross-entropy loss.
# NOTE(review): targets are padded to a fixed length and no sample weighting is
# configured here, so the reported accuracy presumably also counts padded
# positions — confirm before comparing it with other models.
transformer.compile(
    optimizer='Adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'])

# Train for 15 epochs, evaluating on the validation split after each epoch.
# fit() is left as the cell's last expression, so its History object is displayed.
transformer.fit(train_ds, epochs=15, validation_data=val_ds)

Epoch 1/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m132s[0m 24ms/step - accuracy: 0.7618 - loss: 1.6380 - val_accuracy: 0.8365 - val_loss: 0.8576
Epoch 2/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 16ms/step - accuracy: 0.8451 - loss: 0.8170 - val_accuracy: 0.8829 - val_loss: 0.5453
Epoch 3/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 16ms/step - accuracy: 0.8825 - loss: 0.5567 - val_accuracy: 0.9035 - val_loss: 0.4210
Epoch 4/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 16ms/step - accuracy: 0.9014 - loss: 0.4380 - val_accuracy: 0.9137 - val_loss: 0.3627
Epoch 5/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 16ms/step - accuracy: 0.9117 - loss: 0.3715 - val_accuracy: 0.9197 - val_loss: 0.3288
Epoch 6/15
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 16ms/step - accuracy: 0.9186 - loss: 0.3277 - val_accuracy: 0.9227 - val_loss: 0.3079
Epo

<keras.src.callbacks.history.History at 0x78fa8c49cf10>

**Translating new sentences with our Transformer model**

In [18]:
# Lookup table from integer token id back to the Italian vocabulary entry.
ita_vocab = target_vectorization.get_vocabulary()
ita_index_lookup = dict(enumerate(ita_vocab))

# Upper bound on the number of tokens generated for one translation.
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Translate one English sentence with greedy, token-by-token decoding.

    Starting from '[start]', the model is run on the partial translation at
    every step and the highest-scoring next token is appended. Decoding stops
    after '[end]' is produced or after `max_decoded_sentence_length` steps.

    Args:
        input_sentence: English sentence as a plain string.

    Returns:
        The decoded Italian sentence, including the leading '[start]' and, if
        it was generated, the trailing '[end]'.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = '[start]'
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation; dropping the last position
        # mirrors the decoder input built in format_dataset().
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        # Most likely token at the position currently being generated.
        next_token_id = np.argmax(predictions[0, step, :])
        next_token = ita_index_lookup[next_token_id]
        decoded_sentence = f'{decoded_sentence} {next_token}'
        if next_token == '[end]':
            break
    return decoded_sentence

# Translate 20 randomly drawn English sentences from the held-out test split,
# printing a separator, the source sentence and the model's translation.
test_eng_texts = [english for english, italian in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print('-', input_sentence, decode_sequence(input_sentence), sep='\n')

-
When the sun comes up, I'll get out of bed.
[start] quando arriva il sole a letto [end]
-
We're biased.
[start] siamo di parte [end]
-
Why are you so excited?
[start] perché è così emozionato [end]
-
I was wrong about you.
[start] mi sbagliavo su di te [end]
-
The criminal is still at large.
[start] il criminale è ancora grande [end]
-
You're not sick.
[start] non è malata [end]
-
The box was empty when I opened it.
[start] la scatola era vuota quando lho aperta [end]
-
I'd like to check out tomorrow morning.
[start] mi piacerebbe fare lo spesa domani mattina [end]
-
Is Tom going to live?
[start] tom vivrà [end]
-
That's a pretty tune.
[start] È una melodia carina [end]
-
You won everything.
[start] hai vinto tutto [end]
-
The vegetables are fresh.
[start] le verdure sono fresche [end]
-
Please tell us about your family.
[start] per piacere dicci della vostra famiglia [end]
-
Update the app.
[start] [UNK] lapp [end]
-
What's your poison?
[start] qual è il tuo veleno [end]
-
Just watc

**Translating a custom sentence**

In [23]:
# Translate a hand-written English sentence with the trained model.
input_sentence = "The morning is beautiful."
translated_sentence = decode_sequence(input_sentence)
print(translated_sentence)

[start] la mattina è bella [end]
