In [None]:
# Path to the tab-separated English-German sentence-pair file parsed below.
filename = 'data/deu.txt'

### Setup

In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import pickle

In [None]:
# Display the installed TensorFlow version.
tf.__version__

### Parsing the data

Each line in our dataset contains an English sentence and its corresponding German translation. The English sentence is separated from its German translation by a tab, and each pair sits on its own line.
The English sentence is the source sentence and the German sentence is the target sentence. We want to add the tokens "[start]" and "[end]" at the start and the end of each German sentence, respectively.

In [None]:
# Read the corpus as UTF-8 explicitly: the German side contains non-ASCII
# characters, and the platform default encoding is not guaranteed to be UTF-8.
with open(filename, encoding="utf-8") as f:
    # Skip blank lines instead of blindly dropping the last element, so the
    # final pair is kept even when the file has no trailing newline.
    lines = [line for line in f.read().split("\n") if line.strip()]

# Build (english, "[start] german [end]") tuples.
text_pairs = []
for line in lines:
    # Only the first two tab-separated fields are used; any further columns
    # on the line are ignored.
    eng, deu = line.split("\t")[:2]
    deu = "[start] " + deu + " [end]"
    text_pairs.append((eng, deu))

In [None]:
# Print five randomly chosen (English, German) pairs as a quick sanity check.
for _ in range(5):
    print(random.choice(text_pairs))

In [None]:
# Let's split the data into training, validation and test sets.

# Use a fixed seed so the split is identical on every run. Without it, a model
# reloaded from disk could be evaluated on "test" pairs it was trained on.
split_seed = 42
# Sorting first makes the cell idempotent: re-running it reproduces the same
# permutation instead of shuffling an already shuffled list.
text_pairs.sort()
random.Random(split_seed).shuffle(text_pairs)

# 15% validation, 15% test, and the remainder for training.
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples : ]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

### Vectorizing the text data

Now we have to vectorize the text data. We will use the `TextVectorization` layer for this (one for English and one for German).

In [None]:
# Every ASCII punctuation character except the square brackets; leaving the
# brackets out keeps the "[start]" / "[end]" markers intact.
strip_chars = string.punctuation.replace("[", "").replace("]", "")

@tf.keras.utils.register_keras_serializable()
def custom_standardization(input_string):
    """Lowercase the text and delete every character listed in ``strip_chars``."""
    lowered = tf.strings.lower(input_string)
    punctuation_pattern = f"[{re.escape(strip_chars)}]"
    return tf.strings.regex_replace(lowered, punctuation_pattern, "")

In [None]:
# Vocabulary sizes, padded sequence length and batch size used by later cells.
eng_vocab_size = 16000
deu_vocab_size = 35000
sequence_length = 30
batch_size = 64

# `custom_standardization` (and the `strip_chars` it relies on) is defined once
# in the previous cell. Re-declaring it here would shadow that definition and
# re-run the `register_keras_serializable` decorator under the same name, which
# may be rejected by Keras versions that disallow duplicate registrations.

# English side: default standardization, padded/truncated to `sequence_length`.
eng_vector = TextVectorization(
    max_tokens=eng_vocab_size, output_mode="int", output_sequence_length=sequence_length,
)

# German side: one extra position, because the dataset pipeline slices this
# sequence into decoder inputs ([:, :-1]) and targets ([:, 1:]).
deu_vector = TextVectorization(
    max_tokens=deu_vocab_size, output_mode="int",
    output_sequence_length = sequence_length+1,
    standardize=custom_standardization,
)

# Learn both vocabularies from the training split only.
train_eng_texts = [pair[0] for pair in train_pairs]
train_deu_texts = [pair[1] for pair in train_pairs]
eng_vector.adapt(train_eng_texts)
deu_vector.adapt(train_deu_texts)

### Saving the tokenizer

In [None]:
# Wrap the adapted English vectorizer in a Sequential model that takes raw
# strings, so it can be saved and reloaded like any other Keras model.
eng_vector_model = tf.keras.models.Sequential(
    [
        tf.keras.Input(shape=(None, ), dtype=tf.string),
        eng_vector,
    ]
)
eng_vector_model.summary()

eng_vector_model.save('eng_vector_layer')

In [None]:
# Smoke-test the wrapped vectorizer on a sample sentence and print its output.
print(eng_vector_model.predict(['hello son']))

In [None]:
# Reload the English vectorizer from the location it was saved to above
# (`eng_vector_model.save('eng_vector_layer')`); the previous 'models/' prefix
# pointed at a directory this notebook never writes.
eng_token = tf.keras.models.load_model('eng_vector_layer')
# NOTE(review): `.to_tensor` assumes `predict` returns a ragged tensor here — confirm.
eng_token.predict(['hello son']).to_tensor(default_value=0, shape=[None, 30])

In [None]:
# Wrap the adapted German vectorizer in a Sequential model that takes raw
# strings, so it can be saved and reloaded like any other Keras model.
deu_vector_model = tf.keras.models.Sequential(
    [
        tf.keras.Input(shape=(None, ), dtype=tf.string),
        deu_vector,
    ]
)
deu_vector_model.summary()

deu_vector_model.save('deu_vector_layer')

In [None]:
# Reload the German vectorizer from the location it was saved to above
# (`deu_vector_model.save('deu_vector_layer')`); the previous path had both a
# typo ("later") and a 'models/' prefix that this notebook never writes.
deu_token = tf.keras.models.load_model('deu_vector_layer')

We'll save the tokenizers for later use.

Now, let's create our dataset using the `tf.data.Dataset` API.

At each training step, the model will seek to predict target words N+1 (and beyond) using the source sentence and the target words 0 to N.

As such, the training dataset will yield a tuple (`inputs`, `targets`), where:
* `inputs` consists of the source sentence and the target words 0 to N.
* `targets` is the target sentence offset by one step: it provides the next words in the target sentence -- what the model will try to predict.

In [None]:
def format_dataset(eng, deu):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: batch of English strings, passed through ``eng_vector``.
        deu: batch of German strings, passed through ``deu_vector``.

    Returns:
        A ``(inputs, targets)`` tuple. ``inputs`` is a dict with the keys
        ``"encoder_inputs"`` (the vectorized English batch) and
        ``"decoder_inputs"`` (the vectorized German batch without its last
        position); ``targets`` is the vectorized German batch without its
        first position.
    """
    source_ids = eng_vector(eng)
    target_ids = deu_vector(deu)
    model_inputs = {
        "encoder_inputs": source_ids,
        # Decoder sees positions 0..N-1 ...
        "decoder_inputs": target_ids[:, :-1],
    }
    # ... and is trained to predict positions 1..N.
    labels = target_ids[:, 1:]
    return (model_inputs, labels)

def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data.Dataset`` from sentence pairs.

    Args:
        pairs: sequence of ``(english, german)`` string tuples.

    Returns:
        A dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``, with ``batch_size`` pairs per batch.
    """
    eng_texts, deu_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(deu_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache BEFORE shuffling: caching a shuffled dataset stores the order of
    # the first pass, so later epochs would replay the same batch order.
    # With the cache first, the vectorized batches are computed once and
    # shuffled again each time the dataset is iterated.
    return dataset.cache().shuffle(2048).prefetch(16)

# Build the batched training and validation datasets.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
# Peek at a single batch to check the tensor shapes produced by the pipeline.
for inputs, targets in train_ds.take(1):
    print(f'input["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"target.shape: {targets.shape}")

### Build the model

Our sequence-to-sequence Transformer consists of a `TransformerEncoder` and a `TransformerDecoder` chained together. To make the model aware of word order, we use a `PositionalEmbedding` layer.

The source sequence is passed to the `TransformerEncoder`, which produces a new representation of it. This new representation is then passed to the `TransformerDecoder`, together with the target sequence so far (target words 0 to N). The `TransformerDecoder` then seeks to predict the next words in the target sequence (N+1 and beyond).

A key detail that makes this possible is causal masking (see the method `get_casual_attention_mask()` on the `TransformerDecoder`). The `TransformerDecoder` sees the entire sequence at once, and thus we must make sure that it only uses information from target tokens 0 to N when predicting token N+1 (otherwise, it could use information from the future, which would result in a model that cannot be used at inference time).

In [None]:
class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a two-layer dense projection.

    Each sub-block is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the last axis of the inputs; also used as the
            attention ``key_dim`` and as the output size of the projection.
        dense_dim: number of units in the hidden (ReLU) dense layer.
        num_heads: number of attention heads.
        **kwargs: standard ``layers.Layer`` arguments (e.g. ``name``, ``dtype``).
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        # Forward base-layer kwargs so a config produced by `get_config`
        # (which includes them) can be fed back through `from_config`.
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'), layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply self-attention and the dense projection to ``inputs``.

        Args:
            inputs: tensor to encode.
            mask: optional mask for ``inputs``; when given it is expanded with
                two singleton axes and passed to the attention layer.

        Returns:
            The encoded tensor after the second layer normalization.
        """
        # Default to "no mask" so the layer also works when called without one
        # (previously `padding_mask` was unbound in that case).
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")

        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments, including the base-layer config."""
        config = super().get_config()
        config.update(
            {
                "embed_dim" : self.embed_dim,
                "dense_dim" : self.dense_dim,
                "num_heads" : self.num_heads,
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config dict produced by ``get_config``."""
        return cls(**config)

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions that can be embedded.
        vocab_size: number of distinct token ids that can be embedded.
        embed_dim: size of each embedding vector.
        **kwargs: standard ``layers.Layer`` arguments (e.g. ``name``, ``dtype``).
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        # Forward base-layer kwargs so a config produced by `get_config`
        # (which includes them) can be fed back through `from_config`.
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.positional_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        # Positions are 0..length-1 along the last axis of `inputs`.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.positional_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0."""
        # Id 0 is presumably the padding id — verify against the vectorizer.
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments, including the base-layer config."""
        config = super().get_config()
        config.update(
            {
                "sequence_length" : self.sequence_length,
                "vocab_size" : self.vocab_size,
                "embed_dim" : self.embed_dim,
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config dict produced by ``get_config``."""
        return cls(**config)

In [None]:
class TransformerDecoder(layers.Layer):
    """Transformer decoder block.

    Runs causally masked self-attention over the target sequence, then
    attention over the encoder outputs, then a two-layer dense projection.
    Each sub-block is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the last axis of the inputs; also used as the
            attention ``key_dim`` and as the output size of the projection.
        latent_dim: number of units in the hidden (ReLU) dense layer.
        num_heads: number of attention heads in both attention layers.
        **kwargs: standard ``layers.Layer`` arguments (e.g. ``name``, ``dtype``).
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        # Forward base-layer kwargs so a config produced by `get_config`
        # (which includes them) can be fed back through `from_config`.
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation='relu'), layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode ``inputs`` while attending to ``encoder_outputs``.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: output of the encoder for the source sequence.
            mask: optional mask for ``inputs``.

        Returns:
            The decoded tensor after the third layer normalization.
        """
        casual_mask = self.get_casual_attention_mask(inputs)
        # Default to "no mask" so the layer also works when called without one
        # (previously `padding_mask` was unbound in that case).
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype='int32')
            padding_mask = tf.minimum(padding_mask, casual_mask)

        # NOTE(review): `padding_mask` is derived from the decoder-side `mask`
        # combined with the causal mask, yet it is applied to the attention over
        # `encoder_outputs`, while self-attention only receives the causal
        # mask — confirm this is intended. Behavior is left unchanged here.
        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=casual_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_casual_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise, so position
        ``i`` is only allowed to look at positions up to and including itself.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype='int32')
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
             axis=0,
        )

        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments, including the base-layer config."""
        config = super().get_config()
        config.update(
            {
                "embed_dim" : self.embed_dim,
                "latent_dim" : self.latent_dim,
                "num_heads" : self.num_heads,
            }
        )
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config dict produced by ``get_config``."""
        return cls(**config)

### Assemble the model

In [None]:
# Model hyperparameters shared by the encoder and the decoder.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: token ids -> PositionalEmbedding -> TransformerEncoder.
encoder_inputs = keras.Input(shape=(None, ), dtype='int64', name='encoder_inputs')
x = PositionalEmbedding(sequence_length, eng_vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder, built as a standalone model: `encoded_seq_inputs` is a placeholder
# input standing in for the encoder output.
decoder_inputs = keras.Input(shape=(None, ), dtype='int64', name='decoder_inputs')
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name='decoder_state_inputs')
x = PositionalEmbedding(sequence_length, deu_vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
# Softmax over the German vocabulary at every target position.
decoder_outputs = layers.Dense(deu_vocab_size, activation='softmax')(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the real encoder output into the decoder model.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name='transformer'
)

### Training our model

In [None]:
# Number of passes over the training dataset.
epochs = 30

# Print the architecture, then compile with RMSprop and sparse categorical
# cross-entropy (targets are integer token ids, not one-hot vectors).
transformer.summary()
transformer.compile(
    optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
# Train, evaluating on the validation dataset after each epoch.
history = transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)


In [None]:
# transformer.save_weights()

In [None]:
# Persist the trained transformer to an HDF5 (.h5) file in the working directory.
transformer.save('seq2seq-transformer_full_vocab.h5')

In [None]:
# del transformer

### Loading the trained model

In [None]:
# Reload the trained transformer from the file written by `transformer.save(...)`
# above; the previous 'models/' prefix pointed at a directory this notebook
# never writes. The custom layer classes are passed via `custom_objects` so
# Keras can rebuild them from the saved config.
load_model = tf.keras.models.load_model('seq2seq-transformer_full_vocab.h5', custom_objects={
    "TransformerEncoder" : TransformerEncoder,
    "PositionalEmbedding" : PositionalEmbedding,
    "TransformerDecoder" : TransformerDecoder
})

In [None]:
# load_model.summary()

### Decoding test sentences

In [None]:
# Number of entries in the vocabulary of the reloaded German tokenizer's first layer.
len(deu_token.layers[0].get_vocabulary())

In [None]:
# Keep a handle on the first layer of the reloaded German tokenizer model
# (presumably the TextVectorization layer — verify).
deu_token_layer = deu_token.layers[0]

In [None]:
# Display the layer extracted in the previous cell. The original name
# `deu_token_vector` is never defined in this notebook and raised a NameError.
deu_token_layer

In [None]:
# Upper bound on the number of tokens generated per translation.
max_decode_sentence_length = 30

# Map each token id (its position in the vocabulary list) back to its string.
deu_vocab = deu_token.layers[0].get_vocabulary()
deu_index_lookup = dict(enumerate(deu_vocab))

def decode_sequence(input_sentence):
    """Greedily translate one English sentence with the reloaded model.

    Starting from "[start]", the sentence decoded so far is re-tokenized at
    every step, the model is run, and the highest-scoring token at the current
    position is appended. Decoding stops after "[end]" is produced or after
    ``max_decode_sentence_length`` steps.

    Args:
        input_sentence: the English sentence to translate.

    Returns:
        The decoded string, including the leading "[start]" marker (and the
        trailing "[end]" marker when one was generated).
    """
    source_tokens = eng_token.predict([input_sentence]).to_tensor(
        default_value=0, shape=[None, max_decode_sentence_length]
    )
    decode_sentence = "[start]"
    for step in range(max_decode_sentence_length):
        # Tokenize what has been decoded so far, then drop the last position
        # from the padded result.
        target_tokens = deu_token.predict([decode_sentence]).to_tensor(
            default_value=0, shape=[None, max_decode_sentence_length + 1]
        )
        target_tokens = target_tokens[:, :-1]

        # Scores over the vocabulary for the position being generated.
        step_scores = load_model([source_tokens, target_tokens])[0, step, :]
        next_word = deu_index_lookup[np.argmax(step_scores)]
        decode_sentence = decode_sentence + " " + next_word

        if next_word == "[end]":
            return decode_sentence
    return decode_sentence

In [None]:
# Split the test pairs into separate English / German lists (not used further in this cell).
test_eng_texts = [pair[0] for pair in test_pairs]
test_deu_texts = [pair[1] for pair in test_pairs]
# Translate ten randomly chosen test sentences and print them next to the reference.
for _ in range(10):
    input_sentence, deu_sentence = random.choice(test_pairs)
    translated = decode_sequence(input_sentence)
    
    print(f" \
    English sentence: \n  {input_sentence}\n\n---\n\n \
    German sentence: \n  {deu_sentence}\n\n---\n\n \
    Translated sentence: \n  {translated}\n\n------------------\n\n\n")