In [1]:
from pathlib import Path

import numpy as np
import tensorflow as tf
from tensorflow import data
from tensorflow import keras
from keras import layers

K = keras.backend


In [2]:
# Download the spa-eng archive (cached under the local "datasets" directory),
# extract it, and read the sentence-pair file into memory as one string.
url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
path = keras.utils.get_file(
    fname="spa-eng.zip",
    origin=url,
    cache_dir="datasets",
    extract=True,
)
# The text file is looked up in a "spa-eng" folder sitting next to the
# path returned by `get_file`.
text = (
    Path(path)
    .with_name("spa-eng")
    .joinpath("spa.txt")
    .read_text(encoding="utf-8")
)


In [3]:
# Strip the inverted Spanish punctuation marks in a single pass.
text = text.translate(str.maketrans("", "", "¡¿"))

# Every line holds one tab-separated sentence pair.
pairs = [row.split("\t") for row in text.splitlines()]

# Seed right before shuffling so the in-place shuffle is repeatable.
np.random.seed(42)
np.random.shuffle(pairs)

# Unzip the shuffled pairs into two parallel tuples.
sentences_en, sentences_es = zip(*pairs)


In [4]:
# Encoder input: the raw English sentences.
X_encoder = tf.constant(sentences_en)
# Decoder input: each Spanish sentence prefixed with the start marker.
X_decoder = tf.constant(["startofseq " + sentence for sentence in sentences_es])
# Target: each Spanish sentence followed by the end marker.
Y = tf.constant([sentence + " endofseq" for sentence in sentences_es])


In [5]:
def adapt_compile_and_fit(
    model, X_encoder, X_decoder, Y, batch_size=32, epochs=5, validation_size=0.2
):
    """Adapt a model's text vectorizers, then compile and train the model.

    The data is split sequentially (no shuffling here): the first
    ``len(Y) - int(validation_size * len(Y))`` samples are used for training
    and the remainder for validation.

    Args:
        model: A Keras model exposing ``vectorization_layer_en`` and
            ``vectorization_layer_es`` attributes and accepting an
            ``(encoder_text, decoder_text)`` pair as input.
        X_encoder: Source-language sentences (string tensor or sequence).
        X_decoder: Target-language sentences as fed to the decoder.
        Y: Target sentences the model must predict; they are converted to
            token ids with ``model.vectorization_layer_es`` before training.
        batch_size: Batch size passed to ``model.fit``.
        epochs: Number of epochs passed to ``model.fit``.
        validation_size: Fraction of the samples held out for validation.
    """
    valid_len = int(validation_size * len(Y))
    train_len = len(Y) - valid_len

    X_train_encoder = X_encoder[:train_len]
    X_train_decoder = X_decoder[:train_len]

    X_valid_encoder = X_encoder[train_len:]
    X_valid_decoder = X_decoder[train_len:]

    Y_train = Y[:train_len]
    Y_valid = Y[train_len:]

    model.vectorization_layer_en.adapt(X_train_encoder)
    # Append the end marker with a TensorFlow string op. Iterating over a
    # string tensor in Python and formatting each element with an f-string
    # embeds the tensor's repr ("tf.Tensor(b'...', shape=(), dtype=string)")
    # rather than its text, which would corrupt the learned vocabulary.
    model.vectorization_layer_es.adapt(
        tf.strings.join([X_train_decoder, " endofseq"])
    )

    # Targets must be token ids for the sparse cross-entropy loss.
    Y_train = model.vectorization_layer_es(Y_train)
    Y_valid = model.vectorization_layer_es(Y_valid)

    model.compile(
        loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]
    )

    # Only hand Keras a validation set when the split actually produced one.
    validation_data = None
    if valid_len > 0:
        validation_data = ((X_valid_encoder, X_valid_decoder), Y_valid)

    model.fit(
        (X_train_encoder, X_train_decoder),
        Y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=validation_data,
    )


In [23]:
class BasicEncoderDecoder(keras.Model):
    """LSTM encoder-decoder that maps raw source/target text to token probabilities.

    The encoder's final LSTM state is used as the decoder's initial state.

    Args:
        vocabulary_size: Maximum vocabulary size of both vectorizers; also the
            embedding input dimension and the width of the softmax output.
        max_length: Length every vectorized sequence is padded/truncated to.
        embedding_size: Dimension of the token embeddings.
        n_units_lstm: Number of units in the encoder and decoder LSTMs.
        **kwargs: Forwarded to `keras.Model`.
    """

    def __init__(
        self,
        vocabulary_size=1000,
        max_length=50,
        embedding_size=128,
        n_units_lstm=512,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Both languages share the same vectorizer and embedding settings.
        vectorizer_config = dict(
            max_tokens=vocabulary_size, output_sequence_length=max_length
        )
        embedding_config = dict(
            input_dim=vocabulary_size, output_dim=embedding_size, mask_zero=True
        )

        self.vectorization_layer_en = layers.TextVectorization(**vectorizer_config)
        self.vectorization_layer_es = layers.TextVectorization(**vectorizer_config)

        self.encoder_embedding_layer = layers.Embedding(**embedding_config)
        self.decoder_embedding_layer = layers.Embedding(**embedding_config)

        # The encoder exposes its final state; the decoder emits one output
        # per time step.
        self.encoder = layers.LSTM(units=n_units_lstm, return_state=True)
        self.decoder = layers.LSTM(units=n_units_lstm, return_sequences=True)

        self.output_layer = layers.Dense(units=vocabulary_size, activation="softmax")

    def call(self, inputs):
        """Run the model on an `(encoder_text, decoder_text)` pair.

        Returns the softmax output of the final dense layer for every decoder
        time step.
        """
        source_text, target_text = inputs

        source_ids = self.vectorization_layer_en(source_text)
        target_ids = self.vectorization_layer_es(target_text)

        source_vectors = self.encoder_embedding_layer(source_ids)
        target_vectors = self.decoder_embedding_layer(target_ids)

        # Discard the encoder output; only its final state seeds the decoder.
        _, *final_state = self.encoder(source_vectors)
        hidden_sequence = self.decoder(target_vectors, initial_state=final_state)

        return self.output_layer(hidden_sequence)


In [24]:
# Reset Keras' global state and fix the TensorFlow seed before building the model.
K.clear_session()
tf.random.set_seed(42)

basic_encoder_decoder = BasicEncoderDecoder()
adapt_compile_and_fit(
    model=basic_encoder_decoder,
    X_encoder=X_encoder,
    X_decoder=X_decoder,
    Y=Y,
    epochs=1,
)




In [25]:
class BidirectionalEncoderDecoderWithAttention(keras.Model):
    """Encoder-decoder with a bidirectional LSTM encoder and dot-product attention.

    The decoder output sequence attends over the encoder output sequence and
    the attention result is fed to the softmax output layer.

    Args:
        vocabulary_size: Maximum vocabulary size of both vectorizers; also the
            embedding input dimension and the width of the softmax output.
        max_length: Length every vectorized sequence is padded/truncated to.
        embedding_size: Dimension of the token embeddings.
        n_units_lstm: Number of decoder LSTM units. Each encoder direction
            uses `n_units_lstm // 2` units.
        **kwargs: Forwarded to `keras.Model`.
    """

    def __init__(
        self,
        vocabulary_size=1000,
        max_length=50,
        embedding_size=128,
        n_units_lstm=512,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Both languages share the same vectorizer and embedding settings.
        vectorizer_config = dict(
            max_tokens=vocabulary_size, output_sequence_length=max_length
        )
        embedding_config = dict(
            input_dim=vocabulary_size, output_dim=embedding_size, mask_zero=True
        )

        self.vectorization_layer_en = layers.TextVectorization(**vectorizer_config)
        self.vectorization_layer_es = layers.TextVectorization(**vectorizer_config)

        self.encoder_embedding_layer = layers.Embedding(**embedding_config)
        self.decoder_embedding_layer = layers.Embedding(**embedding_config)

        # Half the units per direction, so the concatenated forward/backward
        # states line up with the decoder's state size.
        encoder_cell = layers.LSTM(
            units=n_units_lstm // 2, return_sequences=True, return_state=True
        )
        self.encoder = layers.Bidirectional(encoder_cell)
        self.decoder = layers.LSTM(units=n_units_lstm, return_sequences=True)
        self.attention_layer = layers.Attention()
        self.output_layer = layers.Dense(units=vocabulary_size, activation="softmax")

    def call(self, inputs):
        """Run the model on an `(encoder_text, decoder_text)` pair.

        Returns the softmax output of the final dense layer for every decoder
        time step.
        """
        source_text, target_text = inputs

        source_ids = self.vectorization_layer_en(source_text)
        target_ids = self.vectorization_layer_es(target_text)

        source_vectors = self.encoder_embedding_layer(source_ids)
        target_vectors = self.decoder_embedding_layer(target_ids)

        # The wrapper returns the output sequence followed by four state
        # tensors: (h, c) of the forward pass, then (h, c) of the backward pass.
        encoder_output, fwd_h, fwd_c, bwd_h, bwd_c = self.encoder(source_vectors)
        merged_state = [
            tf.concat([fwd_h, bwd_h], axis=-1),
            tf.concat([fwd_c, bwd_c], axis=-1),
        ]

        hidden_sequence = self.decoder(target_vectors, initial_state=merged_state)
        # Query = decoder outputs, value = encoder outputs.
        context = self.attention_layer([hidden_sequence, encoder_output])

        return self.output_layer(context)


In [60]:
# Reset Keras' global state and fix the TensorFlow seed before building the model.
K.clear_session()
tf.random.set_seed(42)

bidirect_encoder_decoder_with_attention = BidirectionalEncoderDecoderWithAttention()
adapt_compile_and_fit(
    model=bidirect_encoder_decoder_with_attention,
    X_encoder=X_encoder,
    X_decoder=X_decoder,
    Y=Y,
    epochs=1,
)




In [68]:
class PositionalEncoding(layers.Layer):
    """Adds fixed sinusoidal positional encodings to its input.

    A `(1, max_length, embed_size)` table is precomputed once: even feature
    indices hold sines and odd indices hold cosines of
    `position / 10000 ** (2k / embed_size)`.

    Args:
        max_length: Number of positions in the precomputed table.
        embed_size: Size of the last axis of the inputs; must be even.
        dtype: Dtype of the layer and of the precomputed table.
        **kwargs: Forwarded to `layers.Layer`.

    Raises:
        ValueError: If `embed_size` is odd.
    """

    def __init__(self, max_length=50, embed_size=128, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        if embed_size % 2 != 0:
            raise ValueError("The `embed_size` must be even.")

        p, i = np.meshgrid(np.arange(max_length), 2 * np.arange(embed_size // 2))
        # Shared argument of the sine and cosine terms, transposed to
        # (max_length, embed_size // 2); computed once for both.
        angles = (p / 10_000 ** (i / embed_size)).T
        pos_emb = np.empty((1, max_length, embed_size))
        pos_emb[:, :, 0::2] = np.sin(angles)
        pos_emb[:, :, 1::2] = np.cos(angles)
        self.positional_embedding = tf.constant(pos_emb.astype(self.dtype))
        # Let an incoming Keras mask pass through this layer unchanged.
        self.supports_masking = True

    def call(self, inputs):
        """Return `inputs` plus the encodings of its first `inputs.shape[1]` positions."""
        batch_max_length = tf.shape(inputs)[1]
        # Slice the table to the actual sequence length; the leading axis of
        # size 1 broadcasts over the batch.
        return inputs + self.positional_embedding[:, :batch_max_length]


In [79]:
class Encoder(layers.Layer):
    """One Transformer encoder block: self-attention, then a feed-forward network.

    Each of the two sub-layers is followed by a residual connection and its
    own layer normalization, so the two normalizations do not share their
    learned scale and offset.

    Args:
        embed_size: Size of the block's output features; also passed to the
            attention layer as its second positional argument.
        n_heads: Number of attention heads.
        n_units: Width of the hidden ReLU layer of the feed-forward network.
        dropout: Dropout rate used inside the attention layer and after the
            feed-forward network.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embed_size=128, n_heads=8, n_units=128, dropout=0.1, **kwargs):
        super().__init__(**kwargs)
        self.attn = layers.MultiHeadAttention(n_heads, embed_size, dropout=dropout)
        self.add = layers.Add()
        # One normalization per sub-layer (attention / feed-forward).
        self.norm1 = layers.LayerNormalization()
        self.norm2 = layers.LayerNormalization()
        self.dense1 = layers.Dense(n_units, "relu", kernel_initializer="he_normal")
        self.dense2 = layers.Dense(embed_size)
        self.dropout = layers.Dropout(dropout)

    def call(self, inputs, mask):
        """Apply the block.

        Args:
            inputs: Sequence tensor to encode.
            mask: Mask forwarded to the attention layer as `attention_mask`.

        Returns:
            The normalized output of the feed-forward sub-layer.
        """
        Z = inputs

        # Self-attention sub-layer with residual connection.
        skip_Z = Z
        Z = self.attn(Z, value=Z, attention_mask=mask)
        Z = self.norm1(self.add([Z, skip_Z]))

        # Feed-forward sub-layer with residual connection.
        skip_Z = Z
        Z = self.dense1(Z)
        Z = self.dense2(Z)
        Z = self.dropout(Z)
        return self.norm2(self.add([Z, skip_Z]))


In [91]:
class Decoder(layers.Layer):
    """One Transformer decoder block.

    The block chains three sub-layers: self-attention over the decoder
    sequence, attention over the encoder output, and a feed-forward network.
    Each sub-layer is followed by a residual connection and its own layer
    normalization.

    Args:
        embed_size: Size of the block's output features; also passed to both
            attention layers as their second positional argument.
        n_heads: Number of attention heads of both attention layers.
        n_units: Width of the hidden ReLU layer of the feed-forward network.
        dropout_rate: Dropout rate used inside both attention layers and after
            the feed-forward network.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(
        self, embed_size=128, n_heads=8, n_units=128, dropout_rate=0.1, **kwargs
    ):
        super().__init__(**kwargs)
        self.attn1 = layers.MultiHeadAttention(n_heads, embed_size, dropout=dropout_rate)
        self.attn2 = layers.MultiHeadAttention(n_heads, embed_size, dropout=dropout_rate)
        self.add = layers.Add()
        # One normalization per sub-layer (self-attn / cross-attn / feed-forward).
        self.norm1 = layers.LayerNormalization()
        self.norm2 = layers.LayerNormalization()
        self.norm3 = layers.LayerNormalization()
        self.dense1 = layers.Dense(n_units, "relu", kernel_initializer="he_normal")
        self.dense2 = layers.Dense(embed_size)
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, inputs, mask):
        """Apply the block.

        Args:
            inputs: Pair `(decoder_sequence, encoder_output)`.
            mask: Pair `(decoder_mask, encoder_mask)`; the first is forwarded
                to the self-attention layer and the second to the attention
                over the encoder output, both as `attention_mask`.

        Returns:
            The normalized output of the feed-forward sub-layer.
        """
        decoder_mask, encoder_mask = mask
        Z, encoder_output = inputs

        # Self-attention over the decoder sequence.
        Z_skip = Z
        Z = self.attn1(Z, value=Z, attention_mask=decoder_mask)
        Z = self.norm1(self.add([Z, Z_skip]))

        # Attention over the encoder output (queries come from the decoder).
        Z_skip = Z
        Z = self.attn2(Z, value=encoder_output, attention_mask=encoder_mask)
        Z = self.norm2(self.add([Z, Z_skip]))

        # Feed-forward network, with the same dropout placement as `Encoder`.
        Z_skip = Z
        Z = self.dense1(Z)
        Z = self.dense2(Z)
        Z = self.dropout(Z)
        return self.norm3(self.add([Z, Z_skip]))


In [92]:
from keras.layers import TextVectorization, Embedding


class Transformer(keras.Model):
    """Encoder-decoder Transformer operating directly on raw text.

    Both inputs are vectorized, embedded and given positional encodings, then
    passed through stacks of `Encoder` and `Decoder` blocks. The final dense
    layer outputs a softmax over the vocabulary for every decoder position.

    Args:
        vocab_size: Maximum vocabulary size of both vectorizers; also the
            embedding input dimension and the width of the softmax output.
        max_length: Length every vectorized sequence is padded/truncated to.
        embed_size: Dimension of the token embeddings.
        n_blocks: Number of encoder blocks and of decoder blocks.
        n_heads: Number of attention heads per attention layer.
        n_units: Hidden width of each block's feed-forward network.
        dropout_rate: Dropout rate passed to every block.
        **kwargs: Forwarded to `keras.Model`.
    """

    def __init__(
        self,
        vocab_size=1000,
        max_length=50,
        embed_size=128,
        n_blocks=2,
        n_heads=8,
        n_units=128,
        dropout_rate=0.1,
        **kwargs,
    ):
        super().__init__(**kwargs)

        # Layers are taken from the `layers` namespace, like the other models
        # in this notebook, so the class only relies on the top-level imports.
        self.vectorization_layer_en = layers.TextVectorization(
            vocab_size, output_sequence_length=max_length
        )
        self.vectorization_layer_es = layers.TextVectorization(
            vocab_size, output_sequence_length=max_length
        )
        self.encoder_embedding = layers.Embedding(
            vocab_size, embed_size, mask_zero=True
        )
        self.decoder_embedding = layers.Embedding(
            vocab_size, embed_size, mask_zero=True
        )
        # A single positional-encoding layer is shared by both sides.
        self.positional_embedding = PositionalEncoding(max_length, embed_size)
        self.encoder_blocks = [
            Encoder(embed_size, n_heads, n_units, dropout_rate) for _ in range(n_blocks)
        ]
        self.decoder_blocks = [
            Decoder(embed_size, n_heads, n_units, dropout_rate) for _ in range(n_blocks)
        ]
        self.output_layer = layers.Dense(vocab_size, activation="softmax")

    def call(self, inputs):
        """Run the model on an `(encoder_text, decoder_text)` pair.

        Returns the softmax output of the final dense layer for every decoder
        position.
        """
        encoder_inputs, decoder_inputs = inputs

        encoder_input_ids = self.vectorization_layer_en(encoder_inputs)
        decoder_input_ids = self.vectorization_layer_es(decoder_inputs)

        encoder_embeddings = self.encoder_embedding(encoder_input_ids)
        decoder_embeddings = self.decoder_embedding(decoder_input_ids)

        encoder_pos_embeddings = self.positional_embedding(encoder_embeddings)
        decoder_pos_embeddings = self.positional_embedding(decoder_embeddings)

        # Padding masks: True where the token id is non-zero. The inserted
        # axis 1 lets each mask broadcast over the query positions.
        encoder_mask = tf.math.not_equal(encoder_input_ids, 0)[:, tf.newaxis]
        decoder_pad_mask = tf.math.not_equal(decoder_input_ids, 0)[:, tf.newaxis]
        batch_max_len_decoder = tf.shape(decoder_embeddings)[1]
        decoder_causal_mask = tf.linalg.band_part(  # Lower triangular matrix.
            tf.ones((batch_max_len_decoder, batch_max_len_decoder), tf.bool), -1, 0
        )
        # A decoder position may attend only to non-padding positions that
        # are not after it.
        decoder_mask = decoder_causal_mask & decoder_pad_mask

        Z = encoder_pos_embeddings
        for encoder_block in self.encoder_blocks:
            Z = encoder_block(Z, mask=encoder_mask)

        encoder_output = Z
        Z = decoder_pos_embeddings
        # Every decoder block attends to the output of the last encoder block.
        for decoder_block in self.decoder_blocks:
            Z = decoder_block([Z, encoder_output], mask=[decoder_mask, encoder_mask])

        return self.output_layer(Z)


In [None]:
# Reset Keras' global state and fix the TensorFlow seed before building the model.
K.clear_session()
tf.random.set_seed(42)

transformer = Transformer()
adapt_compile_and_fit(
    model=transformer,
    X_encoder=X_encoder,
    X_decoder=X_decoder,
    Y=Y,
    epochs=1,
)
