In [41]:
import tensorflow as tf
import pathlib
import unicodedata
import re
import pickle
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization
import random
import numpy as np

In [3]:
text_file = tf.keras.utils.get_file(
    fname='fra-eng.zip',
    origin="http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip",
    extract=True
)

In [6]:
text_file = pathlib.Path(text_file).parent / 'fra.txt'

In [8]:
def normalize(line):
    # Normalize unicode characters, strip leading/trailing whitespace, convert to lowercase
    line = unicodedata.normalize("NFKC", line.strip().lower())
    # Handle special characters and add start and end tokens for the target language (French)
    line = re.sub(r"^([^ \w])(?!\s)", r"\1", line)
    line = re.sub(r"(\s[^ \w])(?!\s)", r"\1", line)
    line = re.sub(r"(?!\s)([^ \w])$", r"\1", line)
    line = re.sub(r"(?!\s)([^ \w]\s)", r"\1", line)
    eng, fre = line.split("\t")
    fre = '[start] ' + fre + ' [end]'
    return eng, fre

In [9]:
with open(text_file) as fp:
    text_pairs = [normalize(line) for line in fp]


In [12]:
# Initialize sets to store unique tokens for English and French
eng_tokens, fre_tokens = set(), set()
# Initialize variables to store maximum sequence lengths
eng_maxlen, fre_maxlen = 0, 0

In [14]:
for eng, fre in text_pairs:
    eng_token, fre_token = eng.split(), fre.split()
    eng_maxlen = max(eng_maxlen, len(eng_token))
    fre_maxlen = max(fre_maxlen, len(fre_token))
    eng_tokens.update(eng_token)
    fre_tokens.update(fre_token)

In [15]:
print(f"Total tokens in English: {len(eng_tokens)}")
print(f"Total tokens in French: {len(fre_tokens)}")
print(f"Maximum length of English sequence: {eng_maxlen}")
print(f"Maximum length of French sequence: {fre_maxlen}")

Total tokens in English: 25380
Total tokens in French: 44866
Maximum length of English sequence: 47
Maximum length of French sequence: 56


### Embedding Layer

In [18]:
with open("text_pairs.pickle", 'wb') as fp:
    pickle.dump(text_pairs, fp)

In [21]:
# Load preprocessed text pairs
with open("text_pairs.pickle", 'rb') as fp:
    text_pairs = pickle.load(fp)

In [26]:
# Shuffle the data
random.shuffle(text_pairs)

In [27]:
# Split into train and test sets
n_val = int(0.15 * len(text_pairs))
n_train = len(text_pairs) - 2 * n_val
train_pair = text_pairs[:n_train]
test_pair = text_pairs[n_train: n_train + n_val]

In [28]:
# Vocabulary sizes and sequence length
vocab_en = 10000
vocab_fr = 20000
seq_length = 25


In [29]:
# Initialize TextVectorization layers
eng_vect = TextVectorization(
    max_tokens=vocab_en,
    standardize=None,
    split='whitespace',
    output_mode='int',
    output_sequence_length=seq_length
)

fre_vect = TextVectorization(
    max_tokens=vocab_fr,
    standardize=None,
    split='whitespace',
    output_mode='int',
    output_sequence_length=seq_length + 1  # +1 for start token
)

In [30]:
# Adapt TextVectorization layers to training data
train_eng = [pair[0] for pair in train_pair]
train_fre = [pair[1] for pair in train_pair]

eng_vect.adapt(train_eng)
fre_vect.adapt(train_fre)

In [31]:
# Serialize the vectorization layers and training/test data
with open('vectorize.pickle', 'wb') as fp:
    data = {'train': train_pair,
            'test': test_pair,
            'eng_vect': eng_vect.get_config(),
            'fre_vect': fre_vect.get_config(),
            'eng_weights': eng_vect.get_weights(),
            'fre_weights': fre_vect.get_weights()
            }
    pickle.dump(data, fp)


In [32]:
# Load serialized data
with open("vectorize.pickle", 'rb') as fp:
    data = pickle.load(fp)


In [33]:
# Retrieve train and test pairs
train_pair = data['train']
test_pair = data['test']

In [34]:
# Reconstruct TextVectorization layers
eng_vect = TextVectorization.from_config(data['eng_vect'])
eng_vect.set_weights(data['eng_weights'])
fre_vect = TextVectorization.from_config(data['fre_vect'])
fre_vect.set_weights(data['fre_weights'])

In [35]:
# Define function to format dataset
def format_dataset(eng, fre):
    eng = eng_vect(eng)
    fre = fre_vect(fre)
    source = {'encode_inp': eng,
              'decode_inp': fre[:, :-1]
              }
    target = fre[:, 1:]
    return (source, target)

In [36]:
# Define function to create dataset
def make_dataset(pairs, batchsize=64):
    eng_text, fre_text = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_text), list(fre_text)))
    return dataset.shuffle(2048).batch(batchsize).map(format_dataset).prefetch(16).cache()

In [37]:
# Create TensorFlow datasets for training and testing
train_ds = make_dataset(train_pair)
test_ds = make_dataset(test_pair)

### Positional Embedding

In [38]:
# Function to generate positional encoding matrix
def pos_enc_matrix(L, d, n=10000):
    assert d % 2 == 0
    d2 = d // 2

    P = np.zeros((L, d))
    k = np.arange(L).reshape(-1, 1)
    i = np.arange(d2).reshape(1, -1)

    denom = np.power(n, -i / d2)
    args = k * denom

    P[:, ::2] = np.sin(args)
    P[:, 1::2] = np.cos(args)
    return P

In [39]:
# Custom Keras layer for positional embedding
class PositionalEmbedding(tf.keras.layers.Layer):

    def __init__(self, seq_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.seq_length = seq_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        self.token_embeddings = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim, mask_zero=True)
        matrix = pos_enc_matrix(seq_length, embed_dim)

        self.positional_embedding = tf.constant(matrix, dtype='float32')

    def call(self, inputs):
        embedded_tokens = self.token_embeddings(inputs)
        return embedded_tokens + self.positional_embedding

    def compute_mask(self, *args, **kwargs):
        return self.token_embeddings.compute_mask(*args, **kwargs)

    def get_config(self):
        config = super().get_config()
        config.update({
            'seq_length': self.seq_length,
            'vocab_size': self.vocab_size,
            'embed_dim': self.embed_dim
        })

In [42]:
# Usage and Validation
vocab_en = 10000
seq_length = 25

for inputs, targets in train_ds.take(1):
    embed_en = PositionalEmbedding(seq_length, vocab_en, embed_dim=512)
    en_emb = embed_en(inputs['encode_inp'])
    print(en_emb._keras_mask)

tf.Tensor(
[[ True  True  True ... False False False]
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]
 ...
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]
 [ True  True  True ... False False False]], shape=(64, 25), dtype=bool)


### Self-Attention Layer

In [43]:
def self_attention(input_shape, prefix='att', mask=False, **kwargs):
    # Define inputs
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f"{prefix}_in1")

    # Multi-head attention layer
    attention = tf.keras.layers.MultiHeadAttention(name=f"{prefix}_att1", **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm1')
    add = tf.keras.layers.Add(name=f'{prefix}_add1')

    # Apply attention mechanism
    attout = attention(query=inputs, value=inputs, key=inputs, use_causal_mask=mask)

    # Apply normalization and residual connection
    output = norm(add([inputs, attout]))

    # Create the model
    model = tf.keras.Model(inputs=inputs, outputs=output, name=f"{prefix}_att")

    return model

### Cross-Attention Layer

In [44]:
def cross_attention(input_shape, context_shape, prefix='att', **kwargs):
    # Define inputs
    context = tf.keras.layers.Input(shape=context_shape, dtype='float32', name=f"{prefix}_ctx2")
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in2')

    # Multi-head attention layer
    attention = tf.keras.layers.MultiHeadAttention(name=f'{prefix}_att2', **kwargs)
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm2')
    add = tf.keras.layers.Add(name=f'{prefix}_add2')

    # Apply attention mechanism
    attout = attention(query=inputs, key=context, value=context)

    # Apply normalization and residual connection
    output = norm(add([attout, inputs]))

    # Create the model
    model = tf.keras.Model(inputs=[context, inputs], outputs=output, name=f'{prefix}_crs_at')

    return model

### Feed-Forward Layer

In [45]:
def feed_forward(input_shape, model_dim, ff_dim, dropout=.1, prefix='ff'):
    # Define inputs
    inputs = tf.keras.layers.Input(shape=input_shape, dtype='float32', name=f'{prefix}_in3')

    # Dense layers
    dense1 = tf.keras.layers.Dense(ff_dim, name=f'{prefix}_ff1', activation='relu')
    dense2 = tf.keras.layers.Dense(model_dim, name=f'{prefix}_ff2')
    drop = tf.keras.layers.Dropout(dropout, name=f'{prefix}_drop')
    add = tf.keras.layers.Add(name=f"{prefix}_add3")

    # Apply feed-forward transformation
    ffout = drop(dense2(dense1(inputs)))

    # Layer normalization and residual connection
    norm = tf.keras.layers.LayerNormalization(name=f'{prefix}_norm3')
    output = norm(add([inputs, ffout]))

    # Create the model
    model = tf.keras.Model(inputs=inputs, outputs=output, name=f'{prefix}_ff')

    return model