# Translation using a Transformer

In [None]:
import random
import datetime
import string
import re

import tensorflow as tf
import tensorflow.keras as keras
import numpy as np

## Loading data

To use the data for translation purposes from English to Spanish, we must add [start] and [end] markers to all spanish sentences.

We directly put all the sentences into a list of pairs to constitute a dataset.

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

In [None]:
validation_split = 0.2

text_pairs = []

with open('spa-eng/spa.txt', 'r') as f:
  for line in f.readlines():
    eng, spa = line.strip('\n').split('	')
    text_pairs.append((eng, '[start] ' + spa + ' [end]'))
    
nb_samples = len(text_pairs)
    
random.Random().shuffle(text_pairs)
validation_index = int(nb_samples * validation_split)

train_pairs = text_pairs[:validation_index]
validation_pairs = text_pairs[validation_index:]

## Text vectorization

As the vocabulary is different in both cases, we use separate vectorization layers.

BE CAREFUL:

- Spanish vectorizer should have one more sequence element, as we'll be padding these sentences
- We map BATCHES, so "english" and "spanish" are size (batch_size, sequence_length)

In [None]:
num_features = 20_000
max_sequence_length = 20

strip_chars = (string.punctuation + "¿¡").replace('[', '').replace(']', '')

def target_standardize(input):
  lower = tf.strings.lower(input)
  return tf.strings.regex_replace(lower, f'[{re.escape(strip_chars)}]', '')

# We use 'int' output as we want sequences (each text is short enough for this purpose)
english_vectorizer = keras.layers.TextVectorization(
  max_tokens=num_features,
  output_sequence_length=max_sequence_length,
  output_mode='int'
)
spanish_vectorizer = keras.layers.TextVectorization(
  max_tokens=num_features,
  output_sequence_length=max_sequence_length + 1,  # We'll need to offset spanish sentences
  output_mode='int',
  standardize=target_standardize
)

# Automatically learning the vocabulary from the sentences
english_sentences = [pair[0] for pair in train_pairs]
spanish_sentences = [pair[1] for pair in train_pairs]
english_vectorizer.adapt(english_sentences)
spanish_vectorizer.adapt(spanish_sentences)

In [None]:
batch_size = 64

def get_dataset_element(english, spanish):
  # We map BATCHES, so "english" and "spanish" are size (batch_size, sequence_length)
  english = english_vectorizer(english)
  spanish = spanish_vectorizer(spanish)
  return (
      {
        'english': english,
        'spanish': spanish[:, :-1]
      },
    spanish[:, 1:]
  )

def make_dataset(pairs):
  english_texts, spanish_texts = zip(*pairs)
  english_texts = list(english_texts)
  spanish_texts = list(spanish_texts)
  dataset = tf.data.Dataset.from_tensor_slices((english_texts, spanish_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(get_dataset_element, num_parallel_calls=4)
  # Prefetching enables faster performances when taking elements of the dataset
  # Caching is also beneficial for performance
  return dataset.shuffle(buffer_size=2048).prefetch(16).cache()
  
train_dataset = make_dataset(train_pairs)
validation_dataset = make_dataset(validation_pairs)

## Useful layers

### Positional encoding

In [None]:
class PositionalEmbedding(keras.layers.Layer):
  def __init__(self, input_dim, sequence_length, output_dim, **kwargs):
    super(PositionalEmbedding, self).__init__(**kwargs)
    self.token_embedding = keras.layers.Embedding(
      input_dim=input_dim,
      output_dim=output_dim,
    )
    self.position_embedding = keras.layers.Embedding(
      input_dim=sequence_length,
      output_dim=output_dim,
    )
      
    # Storing variables is always useful
    self.input_dim = input_dim
    self.sequence_length = sequence_length
    self.output_dim = output_dim
      
  def get_config(self):
    config = super().get_config()
    config.update({
      'input_dim': self.input_dim,
      'sequence_length': self.sequence_length,
      'output_dim': self.output_dim,
    })
    return config
      
  def call(self, inputs):
    # Computing the position is as simple as creating a range with the same size...
    batch_size, sequence_length = inputs.shape[0], inputs.shape[-1]
    positions = inputs
    if sequence_length is not None:
      positions = tf.zeros((batch_size, max_sequence_length), dtype=np.int32) + tf.range(0, max_sequence_length)
    return keras.layers.add((self.position_embedding(positions), self.token_embedding(inputs)))
  
  # The mask is a (num_samples, embedding_dim) 2D matrix to indicate which value should be kept (1) or not (0)
  # This option is necessary for embedding layers
  def compute_mask(self, inputs, mask=None):
    return keras.ops.not_equal(inputs, 0)

### Transformer encoder

In [None]:
class TransformerEncoder(keras.layers.Layer):
  # Always pass **kwargs to the top class
  def __init__(self, embedding_dim, dense_dim, num_heads, **kwargs):
    super(TransformerEncoder, self).__init__(**kwargs)
    self.attention = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embedding_dim)
    self.dense = keras.models.Sequential([
      tf.keras.layers.Dense(units=dense_dim, activation='relu'),
      tf.keras.layers.Dense(embedding_dim)
    ])
    
    self.normalization1 = keras.layers.LayerNormalization()
    self.normalization2 = keras.layers.LayerNormalization()
      
    self.embedding_dim = embedding_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads
      
    self.supports_masking=True
  
  def call(self, inputs, mask=None):
    if mask is not None:
      mask = mask[:, tf.newaxis, :]
    
    connection = inputs
    x = self.attention(query=inputs, key=inputs, value=inputs, attention_mask=mask)
    x = keras.layers.add((connection, x))
    x = self.normalization1(x)
    
    residual = x
    x = self.dense(x)
    x = keras.layers.add((residual, x))

    output = self.normalization2(x)
    return output
    
  def get_config(self):
    config = super().get_config()
    config.update({
      'num_heads': self.num_heads,
      'embedding_dim': self.embedding_dim,
      'dense_dim': self.dense_dim,
    })
    return config

### Transformer decoder

In [None]:
class TransformerDecoder(keras.layers.Layer):
  def __init__(self, embedding_dim, dense_dim, num_heads, **kwargs):
    super(TransformerDecoder, self).__init__(**kwargs)
    self.self_attention = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embedding_dim)
    self.cross_attention = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embedding_dim)
    
    self.dense = keras.models.Sequential([
      tf.keras.layers.Dense(units=dense_dim, activation='relu'),
      tf.keras.layers.Dense(embedding_dim)
    ])
    
    self.normalization1 = keras.layers.LayerNormalization()
    self.normalization2 = keras.layers.LayerNormalization()
    self.normalization3 = keras.layers.LayerNormalization()
      
    self.num_heads = num_heads
    self.embedding_dim = embedding_dim
    self.dense_dim = dense_dim
      
    self.supports_masking=True
      
  def get_causal_attention_mask(self, inputs):
    batch_size, sequence_length = inputs.shape[0], inputs.shape[1]
    if sequence_length is None:
      return None
    
    # Step 1: creating the (sequence_length, sequence_length) matrix for 1 sequence
    i = tf.range(sequence_length)[:, tf.newaxis]
    j = tf.range(sequence_length)
    mask = tf.cast(i >= j, dtype=np.int32)
    
    # Step 2: replicating it for (batch_size) inputs
    
    # Formally adding the dimension
    mask = tf.reshape(mask, (1, sequence_length, sequence_length))
    # Tiling in the batch_size direction
    return tf.tile(mask, [batch_size, 1, 1])
      

  def call(self, inputs, encoded_source, mask=None):
    causal_mask = self.get_causal_attention_mask(inputs)
    if mask is not None:
      mask = mask[:, tf.newaxis, :]
      mask = tf.minimum(mask, causal_mask)

    connection1 = inputs
    x = self.self_attention(
      query=inputs,
      key=inputs, 
      value=inputs,
      attention_mask=causal_mask
    )
    x = tf.keras.layers.add((connection1, x))
    x = self.normalization1(x)

    connection2 = x
    # We put a special mask here for each word to ignore future words
    x = self.cross_attention(
      query=x,
      key=encoded_source,
      value=encoded_source,
      attention_mask=mask
    )
    x = tf.keras.layers.add((connection2, x))
    x = self.normalization2(x)
    
    residual = x
    x = self.dense(x)
    x = tf.keras.layers.add((residual, x))
    return self.normalization3(x)
    
  def get_config(self):
    config = super().get_config()
    config.update({
      'num_heads': self.num_heads,
      'embedding_dim': self.embedding_dim,
      'dense_dim': self.dense_dim,
    })
    return config

## Transformer model

In [None]:
class Transformer(tf.keras.Model):
  def __init__(self, vocabulary_size, sequence_length, embedding_dim, dense_dim, num_heads, encoder_name, decoder_name, dropout_rate=0.5, **kwargs):
    super().__init__(**kwargs)
    self.encoder_embedding = PositionalEmbedding(vocabulary_size, sequence_length, embedding_dim)
    self.decoder_embedding = PositionalEmbedding(vocabulary_size, sequence_length, embedding_dim)
      
    self.encoder = TransformerEncoder(embedding_dim, dense_dim, num_heads)
    self.decoder = TransformerDecoder(embedding_dim, dense_dim, num_heads)
    self.probabilities_output = tf.keras.models.Sequential([
      tf.keras.layers.Dropout(rate=dropout_rate),
      tf.keras.layers.Dense(units=vocabulary_size, activation='softmax')
    ])
      
    self.encoder_name = encoder_name
    self.decoder_name = decoder_name
      
  def call(self, inputs):
    print(inputs)
    encoder_input = self.encoder_embedding(inputs[self.encoder_name])
    decoder_input = self.decoder_embedding(inputs[self.decoder_name])
    
    encoder_output = self.encoder(encoder_input)
    decoder_output = self.decoder(decoder_input, encoder_output)
    return self.probabilities_output(decoder_output)

In [None]:
embedding_dim = 256
dense_dim = 2048
num_heads = 8

dropout_rate = 0.5

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(num_features, max_sequence_length, embedding_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embedding_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(num_features, max_sequence_length, embedding_dim)(decoder_inputs)
x = TransformerDecoder(embedding_dim, dense_dim, num_heads)(x, encoder_outputs)
x = keras.layers.Dropout(rate=dropout_rate)(x)
outputs = keras.layers.Dense(units=num_features, activation='softmax')(x)

transformer_model = keras.Model(inputs=(encoder_inputs, decoder_inputs), outputs=outputs)

transformer_model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

In [None]:
transformer_model.summary()
keras.utils.plot_model(
  transformer_model,
  show_shapes=True,
  show_dtype=True,
  show_layer_names=True,
  show_layer_activations=True,
  expand_nested=True
)

In [None]:
date = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
transformer_model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=30,
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint(f'models/transformer_best.keras', save_best_only=True, monitor='val_loss'),
        tf.keras.callbacks.ModelCheckpoint('models/transformer{epoch:02d}-{val_loss:.2f}.keras'),
        tf.keras.callbacks.BackupAndRestore(backup_dir=f'/tmp/backup/transformer--{date}'),
        tf.keras.callbacks.TensorBoard(log_dir=f'logs/fit/transformer--{date}', histogram_freq=1)
    ]
)

## Testing with never-before seen data

In [17]:
best_transformer = keras.models.load_model(
  'models/transformer10-2.43.keras',
  custom_objects={
    "TransformerEncoder": TransformerEncoder,
    "TransformerDecoder": TransformerDecoder,
    "PositionalEmbedding": PositionalEmbedding
  }
)

In [18]:
best_transformer.summary()

Output of the model is of shape (batch_size, sequence_length, nb_features). Each value (:, i, j) corresponds to the probability that word number i is at position j in the vocabulary.

In [22]:
spanish_vocabulary = spanish_vectorizer.get_vocabulary()

def get_translation(english_sentence):
  tokenized_english = english_vectorizer([english_sentence])
  output_sentence = "[start]"
  for i in range(max_sequence_length):
    tokenized_spanish = spanish_vectorizer([output_sentence])[:, :-1]  # Remember to apply the offset
    prediction = best_transformer({
      'english': tokenized_english,
      'spanish': tokenized_spanish
    })
    prediction_index = np.argmax(prediction[0, i, :])
    prediction_word = spanish_vocabulary[prediction_index]
    output_sentence += f' {prediction_word}'
    if prediction_word == '[end]':
      break    
      
  return output_sentence

for sentence in random.sample(english_sentences, 5):
  print(sentence)
  print(f'- {get_translation(sentence)}')
  print()

Spring is in the air.
- [start] [end]

She was too proud to ask him for help.
- [start] [end]

Tom didn't know Mary had decided to leave him.
- [start] [end]

They are leaving Japan tomorrow.
- [start] [end]

What time do you want me to pick you up?
- [start] [end]



In [19]:
best_transformer.evaluate(validation_dataset)

W0000 00:00:1723986508.931206   35980 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert


[1m 178/1488[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m54s[0m 42ms/step - accuracy: 0.6802 - loss: 6.3533

W0000 00:00:1723986518.412503   35980 assert_op.cc:38] Ignoring Assert operator compile_loss/sparse_categorical_crossentropy/SparseSoftmaxCrossEntropyWithLogits/assert_equal_1/Assert/Assert


[1m 460/1488[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m50s[0m 49ms/step - accuracy: 0.6814 - loss: 6.3405


KeyboardInterrupt

