In [None]:
#Install Libraries
!pip install evaluate sacrebleu

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting evaluate
  Downloading evaluate-0.4.0-py3-none-any.whl (81 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.4/81.4 KB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting sacrebleu
  Downloading sacrebleu-2.3.1-py3-none-any.whl (118 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m118.9/118.9 KB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill
  Downloading dill-0.3.6-py3-none-any.whl (110 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m110.5/110.5 KB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash
  Downloading xxhash-3.2.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (212 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.2/212.2 KB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
Collecting multiprocess
  Downloading multiprocess-0.70.14-py39-none-any.whl 

In [None]:
#Import libraries
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.callbacks import ModelCheckpoint
from google.colab import drive
import evaluate
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#I uploaded a parallel Italian English dataset to google drive for training, the dataset can be found here: https://www.manythings.org/anki/
text_file = "/content/drive/MyDrive/ita-eng/ita.txt"

In [None]:
#Function to read in data from the raw text file
def get_data(data_file):
  """Read the tab-separated parallel corpus and return its sentence pairs.

  Args:
    data_file: Path to the raw text file, one example per line, with
      tab-separated fields.

  Returns:
    A list with one entry per non-blank line. Each entry is a list holding
    the first two tab-separated fields of that line (the parallel sentences).
  """
  #Decode explicitly as UTF-8: the Italian side contains accented characters and the
  #platform default encoding is not guaranteed to handle them
  with open(data_file, encoding="utf-8") as f:
    lines = f.read().split("\n")
  #Skip blank lines (such as the one produced by a trailing newline) instead of blindly
  #dropping the last entry, which would lose a real pair when the file has no trailing newline
  #The first two fields of each line are the actual parallel sentences, so only grab those
  return [line.split("\t")[:2] for line in lines if line.strip()]

#Import data
raw_data = get_data(text_file)

In [None]:
#Special tokens that mark the start and the end of every target sentence
start_token, end_token = "[sos]", "[eos]"
#Source and target language codes
SRC_LANG, TGT_LANG = "EN", "IT"

#Function to generate a list of tuples containing the parallel sentences
def generate_pairs(sentence_list):
  """Turn the raw field lists into (English, Italian) tuples.

  The English sentence is the first field of each entry and is kept as is.
  The Italian sentence is the last field and is wrapped as
  "<start_token> sentence <end_token>".
  """
  return [
      (fields[0], " ".join((start_token, fields[-1], end_token)))
      for fields in sentence_list
  ]

#Get the sentence pairs
sentence_pairs = generate_pairs(raw_data)

In [None]:
#Shuffle data and split it into training (80%), test (10%) and validation (10%) sets.
#A dedicated seeded generator shuffles a copy of the pairs, so re-running this cell
#(or the whole notebook) always gives the same split and sentence_pairs itself is never re-shuffled.
SPLIT_SEED = 42
shuffled_pairs = list(sentence_pairs)
random.Random(SPLIT_SEED).shuffle(shuffled_pairs)
num_sent = len(shuffled_pairs)
train_end = int(num_sent*0.8)
test_end = int(num_sent*0.9)
train_sent = shuffled_pairs[:train_end]
test_sent = shuffled_pairs[train_end:test_end]
val_sent = shuffled_pairs[test_end:]

In [None]:
#Punctuation is stripped from the target sentences (a simplification for this small example),
#except for the square brackets, because the start and end tokens contain them
strip_chars = "".join(char for char in string.punctuation if char not in "[]")

#Vocabulary size, sequence length and batch size
vocab_size = 20000
sequence_length = 40
batch_size = 64

#Standardization function for the Italian text vectorization (tokenizer)
def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in strip_chars."""
    # Character class built from the (regex-escaped) punctuation to remove
    punctuation_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, punctuation_pattern, "")

#Text vectorizers (tokenizers) for both languages.
#English relies on the layer's default standardization; Italian uses custom_standardization
#and is given one extra position compared with the English sequence length.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
ita_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

#Separate the training pairs into an English list and an Italian list
train_eng_texts = [sentence_pair[0] for sentence_pair in train_sent]
train_ita_texts = [sentence_pair[1] for sentence_pair in train_sent]
#Build each vocabulary from the training sentences only
eng_vectorization.adapt(train_eng_texts)
ita_vectorization.adapt(train_ita_texts)

In [None]:
#Function to use when calling map on a dataset object (this and the make_dataset function were taken from the TensorFlow documentation and modified slightly)
def format_dataset(eng, ita):
    """Vectorize a batch of sentence pairs into (model inputs, targets).

    The Italian token ids are used twice: without their last position as the
    decoder input, and without their first position as the target, so the
    target at each step is the token that follows the decoder input.
    """
    encoder_tokens = eng_vectorization(eng)
    target_tokens = ita_vectorization(ita)
    model_inputs = {
        "encoder_inputs": encoder_tokens,
        "decoder_inputs": target_tokens[:, :-1],
    }
    shifted_targets = target_tokens[:, 1:]
    return (model_inputs, shifted_targets)

#Function that takes the list of parallel sentences for the training or validation set and converts it
#to a TensorFlow dataset object to be used in training.
def make_dataset(pairs):
    """Build a batched, vectorized tf.data pipeline from sentence pairs.

    Args:
        pairs: Sequence of (English, Italian) sentence tuples.

    Returns:
        A tf.data.Dataset yielding the (inputs, targets) batches produced by
        format_dataset, with batch_size sentences per batch.
    """
    eng_texts, ita_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(ita_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache the vectorized batches first and shuffle afterwards: caching after the shuffle
    # would replay the same batch order every epoch. Prefetch goes last so it overlaps
    # input preparation with the training step.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds = make_dataset(train_sent)
val_ds = make_dataset(val_sent)

In [None]:
#Set up the transformer model classes (encoder, decoder and positional embedding; these were taken directly from the TensorFlow documentation)
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block: self-attention followed by a feed-forward projection.

    Each of the two sub-layers is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Width of the token representations; also used as the key_dim of
            the attention layer and as the output width of the feed-forward block.
        dense_dim: Width of the hidden ReLU layer of the feed-forward block.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply the block to `inputs`; `mask`, when given, is used as the attention padding mask."""
        # Default to no mask so the layer also works when no mask is passed in
        # (previously padding_mask was left unbound in that case).
        padding_mask = None
        if mask is not None:
            # Add an axis so the per-position mask broadcasts over the query positions
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so a saved model can re-create this layer."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: Number of distinct positions the position embedding can represent.
        vocab_size: Number of distinct token ids the token embedding can represent.
        embed_dim: Width of both embeddings.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed the token ids in `inputs` and add the embedding of each position index."""
        # Positions are 0..length-1 along the last axis of the input
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so a saved model can re-create this layer."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Causal self-attention over the target sequence, then cross-attention over the
    encoder outputs, then a feed-forward projection. Each sub-layer is wrapped in a
    residual connection and layer normalization.

    Args:
        embed_dim: Width of the token representations; also used as the key_dim of
            both attention layers and as the output width of the feed-forward block.
        latent_dim: Width of the hidden ReLU layer of the feed-forward block.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Output of the encoder for the source sentence.
            mask: Optional mask for `inputs` (presumably the padding mask
                propagated by Keras from the embedding layer).
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # `mask` describes the target sequence, so it belongs to the target
            # self-attention: a position may attend to earlier, non-padded targets only.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # The target-side causal/padding mask must not be applied here: it would stop
        # target position i from attending to source positions after i, and it only
        # fits when source and target lengths are equal.
        # NOTE(review): a source padding mask is not available through this signature,
        # so cross-attention gets no explicit mask. Models trained with the previous
        # masking should be retrained.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return a (batch, length, length) int32 lower-triangular mask for `inputs`."""
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        # 1 where the key position j is not after the query position i
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single mask once per batch element
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so a saved model can re-create this layer."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [None]:
#Set up the actual model: an encoder model, a decoder model, and the end-to-end transformer that chains them
#Model hyper-parameters: embedding width, feed-forward hidden width and number of attention heads
embed_dim = 256
latent_dim = 2048
num_heads = 8

#Encoder: token ids -> positional embedding -> one encoder block
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

#Decoder: target token ids plus the encoded source sequence -> one decoder block -> dropout -> softmax over the vocabulary
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

#End-to-end model: feed the encoder outputs into the decoder (decoder_outputs is rebound to the chained output here)
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [None]:
#Train for 30 epochs
epochs = 30

#Folder on Google Drive where the best model checkpoint is saved during training
checkpoint_folder = "/content/drive/MyDrive/small_transformer_keras"
#Monitor val_loss (lower is better) to decide which model to checkpoint, since accuracy is not a great metric;
#save_best_only=True keeps only the best model seen so far
callback = ModelCheckpoint(checkpoint_folder, monitor='val_loss', verbose=0, save_best_only=True, mode='min')

#Callback list to pass to the fit method
callbacks_list = [callback]

#Print summary, compile and train the model
transformer.summary()
#Targets are integer token ids, hence the sparse categorical cross-entropy loss
transformer.compile(
    "adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds, callbacks = callbacks_list)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   5130240     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 2/30





  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 3/30



Epoch 4/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)






Epoch 5/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)






Epoch 6/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)






Epoch 7/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)








  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 8/30



Epoch 9/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)








  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 10/30





  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 11/30
Epoch 12/30
Epoch 13/30



Epoch 14/30


  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 15/30





  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 16/30





  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.callbacks.History at 0x7f11e01fd8e0>

In [None]:
#Also save the model as it is after the last epoch, just in case
#(this is written to a different folder than the best-val_loss checkpoint saved during training)
transformer.save("/content/drive/MyDrive/Final_keras_small_transformer")

  layer_config = serialize_layer_fn(layer)
  return serialization.serialize_keras_object(obj)


In [None]:
max_decoded_sentence_length = 40
#Method to decode an input sentence and generate the translation
def decode_sequence(input_sentence, transformer):
    """Greedily translate one English sentence with the given model.

    Starting from the start token, the model is called repeatedly and the most
    likely next token is appended each time, until the end token is predicted
    or max_decoded_sentence_length tokens have been generated.

    Args:
        input_sentence: The English sentence to translate.
        transformer: Model called with [source token ids, target token ids]; its
            output is indexed as [batch, position, vocabulary index].

    Returns:
        The predicted tokens joined by single spaces, without the start and end
        tokens. This is an empty string when the end token is predicted first.
    """
    tokenized_input_sentence = eng_vectorization([input_sentence])
    # Keep the generated tokens in a list rather than stripping the special tokens from a
    # string afterwards: the string approach returned "[eos]" when it was the first prediction.
    decoded_tokens = []
    for i in range(max_decoded_sentence_length):
        decoded_sentence = " ".join([start_token] + decoded_tokens)
        tokenized_target_sentence = ita_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
        #Find the most likely next token and add it to the decoded sentence
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = ita_index_lookup[sampled_token_index]
        if sampled_token == end_token:
            break
        decoded_tokens.append(sampled_token)
    return " ".join(decoded_tokens)

In [None]:
#Function to simply get a list of all test labels, removing the SOS and EOS tokens to make it look cleaner
def prepare_test_label(test_sent):
  """Return the target sentence of every pair with the "[sos] " prefix and " [eos]" suffix text removed."""
  return [
      pair[1].replace("[sos] ", "").replace(" [eos]", "")
      for pair in test_sent
  ]

test_labels = prepare_test_label(test_sent)

In [None]:
#Vocabulary of the Italian text vectorizer, plus a dictionary mapping each token index to its token
ita_vocab = ita_vectorization.get_vocabulary()
ita_index_lookup = dict(enumerate(ita_vocab))

#Function to generate the translations for the test set
def translate_test_set(test_sent, transformer):
  """Translate the English side of every test pair, one sentence at a time, and return the predictions in order."""
  return [decode_sequence(pair[0], transformer) for pair in test_sent]

#Translate every test sentence (decode_sequence is called once per sentence)
pred_test_translations = translate_test_set(test_sent, transformer)

#Get the BLEU score on the test set
#NOTE(review): the sample predictions shown below are lowercase and without punctuation, while the
#reference labels keep their original casing and punctuation. This presumably lowers the score; confirm
#whether the references should be normalized the same way before comparing.
bleu = evaluate.load("sacrebleu")
results = bleu.compute(predictions=pred_test_translations, references=test_labels)
print("Bleu score on test set: ", results["score"])

Downloading builder script:   0%|          | 0.00/8.15k [00:00<?, ?B/s]

Bleu score on test set:  28.91200447679424


Not too bad for English to Italian given the dataset and model used.

In [None]:
#Function to randomly sample 10 sentences from the test set and show the input sentence, predicted translation and actual translation 
#to get a general idea of how close the translations are
def sample_test_translations(test_sent, transformer, test_labels, num_examples=10):
  """Print randomly chosen test sentences with their predicted and reference translations.

  Args:
    test_sent: List of (English, Italian) test pairs.
    transformer: Model passed on to decode_sequence.
    test_labels: Reference translations, aligned index by index with test_sent.
    num_examples: How many distinct examples to show (capped at the size of test_sent).
  """
  num_examples = max(0, min(num_examples, len(test_sent)))
  #random.sample draws distinct indices from the valid range only. The previous
  #random.randint(0, len(test_sent)) included len(test_sent) itself (IndexError) and the
  #retry loop never ended when fewer than 10 sentences were available.
  sampled_indices = random.sample(range(len(test_sent)), num_examples)
  for example_number, sent_index in enumerate(sampled_indices, start=1):
    test_input = test_sent[sent_index][0]
    test_label = test_labels[sent_index]
    prediction = decode_sequence(test_input, transformer)
    print("Example #",str(example_number))
    print("Input English sentence: ", test_input)
    print("Predicted translation: ", prediction)
    print("Ground truth translation: ",test_label,"\n")

sample_test_translations(test_sent, transformer, test_labels)

Example # 1
Input English sentence:  Do you want anything, Tom?
Predicted translation:  vuoi qualcosa
Ground truth translation:  Vuoi qualcosa, Tom? 

Example # 2
Input English sentence:  He has a flower in his hand.
Predicted translation:  lui ha un fiore in mano
Ground truth translation:  Ha un fiore in mano. 

Example # 3
Input English sentence:  Tom followed the instructions.
Predicted translation:  tom ha seguito le istruzioni
Ground truth translation:  Tom ha seguito le istruzioni. 

Example # 4
Input English sentence:  He said, "It's nine o'clock."
Predicted translation:  ha detto le nove
Ground truth translation:  Ha detto: "Sono le nove." 

Example # 5
Input English sentence:  Tom has been studying French for about three years.
Predicted translation:  tom ha studiato il francese per circa tre anni
Ground truth translation:  Tom ha studiato francese per circa tre anni. 

Example # 6
Input English sentence:  Circumstances did not permit me to help you.
Predicted translation:  le

So again, not perfect but not bad for the model and the dataset size (~362k parallel sentences), as well as the simple tokenization used.