# Sequence-to-Sequence Learning: English-German Translator

This notebook illustrates how to build an English-to-German translation model, framed as a sequence-to-sequence (seq2seq) learning problem, using TensorFlow and Keras. It covers the core components of an encoder-decoder architecture built on GRU layers (with a *bidirectional* encoder), text preprocessing, and evaluation using BLEU scores.

---

### Sequence-to-Sequence Learning Overview

Sequence-to-sequence (seq2seq) learning trains a deep learning model to map an input sequence to an output sequence whose length may differ from the input's. It has wide applications in:

- Machine Translation (e.g., English-to-German)
- Text Summarization
- Chatbots and Conversational Agents

Key components:
- **Encoder**: Encodes the input sequence into a fixed-length context vector.
- **Decoder**: Decodes the context vector to generate the target sequence.
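
As a toy illustration of these two roles (plain Python, not the Keras model built later in this notebook), the sketch below folds token vectors into one fixed-length state with a simple recurrence and then unrolls a decoder for a chosen number of steps. The token vectors, the recurrence, and every function name here are assumptions made for illustration only.

```python
import math

# Hypothetical 3-dimensional token vectors, chosen only for illustration.
TOKEN_VECTORS = {
    "go": [1.0, 0.0, 0.0],
    "run": [0.0, 1.0, 0.0],
    "home": [0.0, 0.0, 1.0],
}
STATE_SIZE = 3


def encode(tokens):
    """Fold a variable-length token sequence into one fixed-length state."""
    state = [0.0] * STATE_SIZE
    for token in tokens:
        vector = TOKEN_VECTORS.get(token, [0.0] * STATE_SIZE)
        # Simple recurrence: mix the previous state with the current input.
        state = [math.tanh(0.5 * s + v) for s, v in zip(state, vector)]
    return state


def decode(context, n_steps):
    """Unroll a toy decoder from the context vector for n_steps steps."""
    state = list(context)
    outputs = []
    for _ in range(n_steps):
        state = [math.tanh(0.5 * s) for s in state]
        # Emit the index of the largest state component as the "token".
        outputs.append(max(range(STATE_SIZE), key=lambda i: state[i]))
    return outputs


short_context = encode(["go"])
long_context = encode(["go", "run", "home", "run"])
print(len(short_context), len(long_context))  # both equal STATE_SIZE
print(decode(long_context, n_steps=5))
```

The context vector has the same size for a one-word and a four-word input, and the decoder can emit any number of steps from it, which is what allows input and output lengths to differ.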


***Reference***: [TensorFlow in Action](https://www.google.de/books/edition/TensorFlow_in_Action/JYyKEAAAQBAJ?hl=en&gbpv=0).

### **Step 1**: Download and Extract the Dataset

In [1]:
import os
import pandas as pd
import zipfile
from typing import List, Tuple

# Ensure the required dataset is available and extracted
def prepare_data() -> pd.DataFrame:
    """
    Prepares the English-German dataset for translation.
    Downloads and extracts the data if not already present.

    Returns:
        pd.DataFrame: A dataframe containing English and German sentences.
    """
    data_dir = 'data'
    zip_path = os.path.join(data_dir, 'deu-eng.zip')
    extracted_path = os.path.join(data_dir, 'deu.txt')

    if not os.path.exists(zip_path):
        raise FileNotFoundError(
            f"Please download 'deu-eng.zip' from "
            f"http://www.manythings.org/anki/deu-eng.zip and place it in the {data_dir} folder."
        )

    if not os.path.exists(extracted_path):
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(data_dir)
        print("Dataset extracted.")
    else:
        print("Dataset already extracted.")

    # Load and preprocess the data
    df = pd.read_csv(extracted_path, delimiter='\t', header=None, names=["EN", "DE", "Attribution"])
    df = df[["EN", "DE"]]
    return df

df = prepare_data()

Dataset already extracted.


**Explanation**: This step checks that the dataset archive is present and extracts it if needed. If `deu-eng.zip` is missing from the `data` folder, the function raises a `FileNotFoundError` that asks the user to download it manually.

---

### **Step 2**: Data Setup and Preprocessing
---

Download the [deu-eng dataset](http://www.manythings.org/anki/deu-eng.zip) manually and place it in the ```data``` folder.

In [2]:
import os
import pandas as pd

# Read the tab-separated file
df = pd.read_csv(os.path.join('data', 'deu.txt'), delimiter='\t', header=None)

# Set column names
df.columns = ["EN", "DE", "Attribution"]
df = df[["EN", "DE"]]

print('df.shape = {}'.format(df.shape))

clean_inds = [i for i in range(len(df)) if b"\xc2" not in df.iloc[i]["DE"].encode("utf-8")]
df = df.iloc[clean_inds]

df.head()

df.shape = (277891, 2)


|   | EN   | DE         |
|---|------|------------|
| 0 | Go.  | Geh.       |
| 1 | Hi.  | Hallo!     |
| 2 | Hi.  | Grüß Gott! |
| 3 | Run! | Lauf!      |
| 4 | Run. | Lauf!      |


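**Explanation**: In this step, we clean the data by dropping every row whose German sentence contains the byte `0xC2` in its UTF-8 encoding (produced by characters such as the non-breaking space), and we retain only the English and German sentence pairs for the translation task.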
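
To make the byte-level filter concrete, here is a minimal sketch in plain Python. The helper name `contains_c2_byte` and the sample sentences are hypothetical; only the `b"\xc2"` test mirrors the cell above.

```python
def contains_c2_byte(text: str) -> bool:
    """Return True if the UTF-8 encoding of text contains the byte 0xC2."""
    return b"\xc2" in text.encode("utf-8")


# Hypothetical sample sentences, not taken from the dataset.
samples = [
    "Grüß Gott!",               # ü and ß are encoded with lead byte 0xC3
    "Das kostet 5\u00a0Euro.",  # U+00A0 (non-breaking space) encodes as C2 A0
    "\u00abHallo\u00bb",        # guillemets encode as C2 AB and C2 BB
]
flags = [contains_c2_byte(s) for s in samples]
kept = [s for s, flagged in zip(samples, flags) if not flagged]
print(flags)
print(kept)
```

Ordinary German umlauts and `ß` pass the filter, because their UTF-8 encodings start with `0xC3` rather than `0xC2`.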

---

### **Step 3**: Random Sampling and Token Addition

In [3]:
n_samples = 50000
random_seed = 4321
df = df.sample(n=n_samples, random_state=random_seed)

# Wrap each German sentence in start and end tokens
start_token = 'sos'
end_token = 'eos'
df["DE"] = start_token + ' ' + df["DE"] + ' ' + end_token

# Randomly sample 10% of the 50,000 examples as the test set
test_df = df.sample(n=int(n_samples/10), random_state=random_seed)

# Randomly sample another 10% of the 50,000 examples from the remaining rows as the validation set
valid_df = df.loc[~df.index.isin(test_df.index)].sample(n=int(n_samples/10), random_state=random_seed)

# Assign the rest to the training set
train_df = df.loc[~(df.index.isin(test_df.index) | df.index.isin(valid_df.index))]

**Explanation**: This step draws a random subset of ```50,000``` sentence pairs, prepends a start token (```sos```) and appends an end token (```eos```) to each German sentence, and splits the data into training (40,000), validation (5,000), and test (5,000) sets.
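
The same idea can be sketched without pandas. The version below shuffles indices once and cuts them into disjoint parts, which is a different mechanism from the `DataFrame.sample` calls above but gives the same kind of 80/10/10 split. The function names and the percentage parameters are assumptions for this sketch.

```python
import random


def split_indices(n_items, test_percent=10, valid_percent=10, seed=4321):
    """Shuffle item indices once and cut them into disjoint train/valid/test parts."""
    indices = list(range(n_items))
    random.Random(seed).shuffle(indices)
    n_test = n_items * test_percent // 100
    n_valid = n_items * valid_percent // 100
    test = indices[:n_test]
    valid = indices[n_test:n_test + n_valid]
    train = indices[n_test + n_valid:]
    return train, valid, test


def add_boundary_tokens(sentence, start_token="sos", end_token="eos"):
    """Wrap a target sentence in start and end tokens."""
    return f"{start_token} {sentence} {end_token}"


train, valid, test = split_indices(50)
print(len(train), len(valid), len(test))
print(add_boundary_tokens("Geh."))
```

Because the three parts are slices of one shuffled list, no example can appear in more than one set.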

---

### **Step 4**: Analyze Sequence Lengths

In [4]:
def print_sequence_length(str_ser):
    """
    Print summary statistics of the sequence lengths in a Series of sentences
    """
    seq_length_ser = str_ser.str.split(' ').str.len()
    print("\nSome summary statistics")
    print("Median length: {}\n".format(seq_length_ser.median()))
    print(seq_length_ser.describe())

    print("\nComputing the statistics between the 1% and 99% quantiles (to ignore outliers)")

    p_01 = seq_length_ser.quantile(0.01)
    p_99 = seq_length_ser.quantile(0.99)

    print(seq_length_ser[
        (seq_length_ser >= p_01) & (seq_length_ser < p_99)
    ].describe())

print("English corpus")
print('='*50)
print_sequence_length(train_df["EN"])
print("\nGerman corpus")
print('='*50)
print_sequence_length(train_df["DE"])

**Explanation**: Here, we analyze the sequence lengths for both corpora. The results help determine appropriate sequence lengths for the encoder and decoder.
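
One way to turn such statistics into a maximum sequence length is to take a high percentile of the token counts and add some headroom. The sketch below uses a nearest-rank percentile in plain Python, which differs from the interpolating `quantile` used above. The helper names, the sample sentences, and the headroom of 2 are assumptions.

```python
def sequence_lengths(sentences):
    """Number of whitespace-separated tokens in each sentence."""
    return [len(sentence.split()) for sentence in sentences]


def nearest_rank_percentile(values, percent):
    """Nearest-rank percentile; percent is an integer between 1 and 100."""
    ordered = sorted(values)
    rank = -(-percent * len(ordered) // 100)  # ceiling division
    return ordered[rank - 1]


# Hypothetical sample sentences, not taken from the dataset.
sentences = ["Go.", "Tom is here.", "I do not know what to say.", "Run!"]
lengths = sequence_lengths(sentences)
cutoff = nearest_rank_percentile(lengths, 99)
padded_length = cutoff + 2  # headroom of 2 is an arbitrary choice for this sketch
print(lengths, cutoff, padded_length)
```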

---

### **Step 5**: Vocabulary Analysis

In [5]:
from collections import Counter

# Flatten the tokenized sentences into one list of words per language
en_words = [word for sent in train_df["EN"].str.split() for word in sent]
de_words = [word for sent in train_df["DE"].str.split() for word in sent]
n = 10

def get_vocabulary_size_greater_than(words, n, verbose=True):
    """
    Get the number of distinct words that occur at least n times
    """
    counter = Counter(words)

    freq_df = pd.Series(
        list(counter.values()),
        index=list(counter.keys())
    ).sort_values(ascending=False)

    if verbose:
        print(freq_df.head(n=10))
    n_vocab = (freq_df >= n).sum()
    if verbose:
        print("\nVocabulary size (>={} frequent): {}".format(n, n_vocab))
    return n_vocab

# print("English corpus")
# print('='*50)
en_vocab = get_vocabulary_size_greater_than(en_words, n)

# print("\nGerman corpus")
# print('='*50)
de_vocab = get_vocabulary_size_greater_than(de_words, n)

Tom    9228
to     8700
I      8620
the    6766
you    6136
a      5741
is     4141
in     2639
of     2470
was    2380
dtype: int64

Vocabulary size (>=10 frequent): 2218
sos      40000
eos      40000
Tom       9713
Ich       7964
ist       4735
nicht     4616
zu        3606
Sie       3441
du        3132
das       2987
dtype: int64

Vocabulary size (>=10 frequent): 2483


**Explanation**: We compute the vocabulary size for both languages, focusing on words that appear at least 10 times. This informs the TextVectorization step later in the pipeline.
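
A frequency threshold trades vocabulary size against how many token occurrences end up out of vocabulary. The sketch below measures both on a toy corpus; the function `vocabulary_coverage` and the corpus are hypothetical and are not part of the notebook's pipeline.

```python
from collections import Counter


def vocabulary_coverage(sentences, min_count):
    """Return (vocabulary size, fraction of token occurrences kept)."""
    counts = Counter(token for sentence in sentences for token in sentence.split())
    kept = {token: c for token, c in counts.items() if c >= min_count}
    total = sum(counts.values())
    covered = sum(kept.values())
    return len(kept), (covered / total if total else 0.0)


# Hypothetical toy corpus.
corpus = ["Tom is here", "Tom is not here", "Mary is away"]
size, coverage = vocabulary_coverage(corpus, min_count=2)
print(size, round(coverage, 2))
```

Tokens below the threshold would be mapped to the out-of-vocabulary token, so the coverage figure indicates how much of the running text the model would still see as real words.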

---

In [6]:
# Define sequence lengths with some extra space for longer sequences
en_seq_length = 19
de_seq_length = 21

print("EN vocabulary size: {}".format(en_vocab))
print("DE vocabulary size: {}".format(de_vocab))
print("\n")

print("EN max sequence length: {}".format(en_seq_length))
print("DE max sequence length: {}".format(de_seq_length))


EN vocabulary size: 2218
DE vocabulary size: 2483


EN max sequence length: 19
DE max sequence length: 21



In [8]:
"""
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization

en_vectorize_layer = TextVectorization(
max_tokens=en_vocab,
output_mode='int',
output_sequence_length=None
)

en_vectorize_layer.adapt(np.array(train_df["EN"].tolist()).astype('str'))
print(en_vectorize_layer.get_vocabulary()[:10])
print(len(en_vectorize_layer.get_vocabulary()))
"""


In [9]:
"""
import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import TextVectorization

# Vocabulary size and sequence length
en_vocab = 1000

# Define the TextVectorization layer
en_vectorize_layer = TextVectorization(
    max_tokens=en_vocab,
    output_mode='int',
    output_sequence_length=10  # Adjust sequence length as needed
)

# Adapt the TextVectorization layer to a sample dataset
sample_text = ["run", "how are you", "ectoplasmic residue"]
en_vectorize_layer.adapt(sample_text)

# Create a toy model with the TextVectorization layer
toy_model = Sequential([en_vectorize_layer])

# Input data (ensure it's a TensorFlow tensor of strings)
input_data = tf.constant(["run", "how are you", "ectoplasmic residue"])

# Predict token IDs using the model
pred = toy_model.predict(input_data)

# Display results
print("Input data: \n{}\n".format(input_data.numpy()))
print("\nToken IDs: \n{}".format(pred))
"""


In [10]:
def get_vectorizer(
    corpus, n_vocab=100, max_length=None, return_vocabulary=True, name=None
):
    """
    Return a model that wraps an adapted TextVectorization layer and,
    if return_vocabulary is True, the layer's vocabulary as well.
    """
    inp = tf.keras.Input(shape=(1,), dtype=tf.string, name='encoder_input')
    vectorize_layer = tf.keras.layers.TextVectorization(
        max_tokens=n_vocab + 2,  # +2 for the padding ('') and '[UNK]' tokens
        output_mode='int',
        output_sequence_length=max_length,
    )
    # Learn the vocabulary from the corpus
    vectorize_layer.adapt(corpus)
    vectorized_out = vectorize_layer(inp)

    model = tf.keras.models.Model(inputs=inp, outputs=vectorized_out, name=name)
    if not return_vocabulary:
        return model
    return model, vectorize_layer.get_vocabulary()

In [11]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import TextVectorization

# Get the English vectorizer/vocabulary
en_vectorizer, en_vocabulary = get_vectorizer(
    corpus=np.array(train_df["EN"].tolist()), n_vocab=en_vocab,
    max_length=en_seq_length, name='en_vectorizer'
)

# Get the German vectorizer/vocabulary
# (one token shorter, because decoder inputs and labels each drop one boundary token)
de_vectorizer, de_vocabulary = get_vectorizer(
    corpus=np.array(train_df["DE"].tolist()), n_vocab=de_vocab,
    max_length=de_seq_length-1, name='de_vectorizer'
)
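
For intuition about what the vectorizers produce, the following plain-Python sketch approximates integer-mode text vectorization: lowercase, strip punctuation, split on whitespace, map tokens to IDs with `0` reserved for padding and `1` for out-of-vocabulary tokens, then truncate or pad to a fixed length. It is a rough approximation under those assumptions, not the Keras implementation, and the tie-breaking order of the vocabulary is chosen only for this sketch.

```python
import string
from collections import Counter

PAD_ID, UNK_ID = 0, 1  # reserved IDs, mirroring '' and '[UNK]'


def standardize(text):
    """Lowercase the text and drop ASCII punctuation."""
    return text.lower().translate(str.maketrans("", "", string.punctuation))


def build_vocabulary(corpus, n_vocab):
    """Keep the n_vocab most frequent tokens after the two reserved entries."""
    counts = Counter(tok for text in corpus for tok in standardize(text).split())
    ranked = sorted(counts, key=lambda tok: (-counts[tok], tok))
    return ["", "[UNK]"] + ranked[:n_vocab]


def vectorize(text, vocabulary, max_length):
    """Map tokens to IDs, then truncate or zero-pad to max_length."""
    token_to_id = {tok: i for i, tok in enumerate(vocabulary)}
    ids = [token_to_id.get(tok, UNK_ID) for tok in standardize(text).split()]
    ids = ids[:max_length]
    return ids + [PAD_ID] * (max_length - len(ids))


vocab = build_vocabulary(["Run!", "How are you?", "Are you here?"], n_vocab=3)
print(vocab)
print(vectorize("Are you there?", vocab, max_length=5))
```

The two reserved entries are the reason `get_vectorizer` passes `max_tokens=n_vocab+2`.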

In [12]:
'''
n_vocab = 100

# The input is (None,1) shaped and accepts an array of strings
inp = tf.keras.Input(shape=(1,), dtype=tf.string, name='e_input')

# Vectorize the data (assign token IDs)
vectorized_out = en_vectorizer(inp)

# Define an embedding layer to convert IDs to word vectors
emb_layer = tf.keras.layers.Embedding(
input_dim=n_vocab+2, output_dim=128, mask_zero=True, name='e_embedding')

# Get the embeddings of the token IDs
emb_out = emb_layer(vectorized_out)

gru_layer = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(128))
gru_out = gru_layer(emb_out)

encoder = tf.keras.models.Model(inputs=inp, outputs=gru_out)
'''

In [13]:
import tensorflow as tf

def build_encoder(n_vocab, vectorizer):
    """
    Build the encoder for the seq2seq model.
    """
    # Input layer
    encoder_input = tf.keras.Input(shape=(1,), dtype=tf.string, name='encoder_input')
    
    # Text vectorization
    vectorized_output = vectorizer(encoder_input)
    
    # Embedding layer
    embedding_layer = tf.keras.layers.Embedding(
        input_dim=n_vocab + 2, 
        output_dim=128, 
        mask_zero=True, 
        name='encoder_embedding'
    )
    embedded_output = embedding_layer(vectorized_output)
    
    # Bidirectional GRU
    bidirectional_gru = tf.keras.layers.Bidirectional(
        tf.keras.layers.GRU(128, name='encoder_gru'), 
        name='encoder_bidirectional_gru'
    )
    encoder_output = bidirectional_gru(embedded_output)
    
    # Define encoder model
    encoder_model = tf.keras.models.Model(
        inputs=encoder_input, 
        outputs=encoder_output, 
        name='encoder'
    )
    return encoder_model


def build_decoder(n_vocab, encoder, vectorizer):
    """
    Build the decoder for the seq2seq model.
    """
    # Encoder input for initializing the decoder's GRU state
    encoder_input = tf.keras.Input(shape=(1,), dtype=tf.string, name='encoder_input_final')
    decoder_initial_state = encoder(encoder_input)

    # Decoder input
    decoder_input = tf.keras.Input(shape=(1,), dtype=tf.string, name='decoder_input')
    
    # Text vectorization for decoder input
    vectorized_output = vectorizer(decoder_input)
    
    # Embedding layer
    embedding_layer = tf.keras.layers.Embedding(
        input_dim=n_vocab + 2, 
        output_dim=128, 
        mask_zero=True, 
        name='decoder_embedding'
    )
    embedded_output = embedding_layer(vectorized_output)
    
    # GRU layer
    gru_layer = tf.keras.layers.GRU(
        256, 
        return_sequences=True, 
        name='decoder_gru'
    )
    gru_output = gru_layer(embedded_output, initial_state=decoder_initial_state)
    
    # Dense layers for output
    dense_layer_1 = tf.keras.layers.Dense(
        512, 
        activation='relu', 
        name='decoder_dense_1'
    )
    dense_output_1 = dense_layer_1(gru_output)
    
    dense_layer_final = tf.keras.layers.Dense(
        n_vocab + 2, 
        activation='softmax', 
        name='decoder_dense_final'
    )
    decoder_output = dense_layer_final(dense_output_1)
    
    # Define decoder model
    decoder_model = tf.keras.models.Model(
        inputs=[encoder_input, decoder_input], 
        outputs=decoder_output, 
        name='decoder'
    )
    return decoder_model

In [14]:
"""
e_inp = tf.keras.Input(shape=(1,), dtype=tf.string, name='e_input_final')
d_init_state = encoder(e_inp)

d_inp = tf.keras.Input(shape=(1,), dtype=tf.string, name='d_input')
vectorized_out = de_vectorizer(d_inp)

emb_layer = tf.keras.layers.Embedding(
input_dim=n_vocab+2, output_dim=128, mask_zero=True, name='d_embedding'
)
emb_out = emb_layer(vectorized_out)

gru_layer = tf.keras.layers.GRU(256, return_sequences=True)
gru_out = gru_layer(emb_out, initial_state=d_init_state)
"""

In [15]:
def build_seq2seq_model(encoder, decoder):
    """
    Build the final seq2seq model using the pre-built encoder and decoder.

    The decoder returned by build_decoder already calls the encoder internally,
    so `encoder` is accepted here only to keep the call signature explicit.
    """
    # Encoder input (raw English strings)
    encoder_input = tf.keras.Input(shape=(1,), dtype=tf.string, name='encoder_input_final')

    # Decoder input (raw German strings)
    decoder_input = tf.keras.Input(shape=(1,), dtype=tf.string, name='decoder_input')

    # The decoder runs the encoder on encoder_input to obtain its initial state
    decoder_output = decoder([encoder_input, decoder_input])

    # Define the seq2seq model
    seq2seq_model = tf.keras.models.Model(
        inputs=[encoder_input, decoder_input],
        outputs=decoder_output,
        name='seq2seq_model'
    )
    return seq2seq_model


# Usage
# Build the models from the vocabulary sizes and vectorizers defined above
encoder = build_encoder(en_vocab, en_vectorizer)
decoder = build_decoder(de_vocab, encoder, de_vectorizer)
seq2seq_model = build_seq2seq_model(encoder, decoder)

# Summaries
encoder.summary()
decoder.summary()
seq2seq_model.summary()

In [16]:
"""
# Get the English vectorizer/vocabulary
en_vectorizer, en_vocabulary = get_vectorizer(
corpus=np.array(train_df["EN"].tolist()), n_vocab=en_vocab,
max_length=en_seq_length, name='e_vectorizer'
)
# Get the German vectorizer/vocabulary
de_vectorizer, de_vocabulary = get_vectorizer(corpus=np.array(train_df["DE"].tolist()), n_vocab=de_vocab,
max_length=de_seq_length-1, name='d_vectorizer'
)
# Define the final model
encoder = get_encoder(n_vocab=en_vocab, vectorizer=en_vectorizer)
final_model = get_final_seq2seq_model(
n_vocab=de_vocab, encoder=encoder, vectorizer=de_vectorizer
)
"""


In [17]:
# Move [UNK] to the beginning of the vocabulary.
# The vectorizer's vocabulary starts with '' (padding) followed by '[UNK]',
# so this swaps the first two entries.
en_vocabulary = [v for v in en_vocabulary if v != '[UNK]']  # Remove existing [UNK]
en_vocabulary = ['[UNK]'] + en_vocabulary  # Add [UNK] at the start

de_vocabulary = [v for v in de_vocabulary if v != '[UNK]']  # Remove existing [UNK]
de_vocabulary = ['[UNK]'] + de_vocabulary  # Add [UNK] at the start

In [18]:
from tensorflow.keras.metrics import SparseCategoricalAccuracy

seq2seq_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

In [19]:
seq2seq_model.summary()

In [20]:
def prepare_data(train_df, valid_df, test_df):
    """
    Create a data dictionary from the dataframes containing data.

    Note: this redefines the `prepare_data` name used in Step 1 to load the dataset.
    """
    data_dict = {}
    for label, df in zip(
        ['train', 'valid', 'test'], [train_df, valid_df, test_df]
    ):
        en_inputs = np.array(df["EN"].tolist())
        # Decoder inputs: drop the last token (eos)
        de_inputs = np.array(
            df["DE"].str.rsplit(n=1, expand=True).iloc[:, 0].tolist()
        )
        # Decoder labels: drop the first token (sos)
        de_labels = np.array(
            df["DE"].str.split(n=1, expand=True).iloc[:, 1].tolist()
        )
        data_dict[label] = {
            'encoder_inputs': en_inputs,
            'decoder_inputs': de_inputs,
            'decoder_labels': de_labels
        }
    return data_dict
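
The input/label split above is the usual teacher-forcing setup: at each step the decoder sees the previous target token and is trained to predict the next one. A minimal plain-Python sketch of that shift follows; the helper name `make_decoder_pair` and the example sentence are hypothetical.

```python
def make_decoder_pair(target_sentence):
    """Split 'sos ... eos' into (decoder input, decoder label)."""
    decoder_input = target_sentence.rsplit(maxsplit=1)[0]  # drop the last token
    decoder_label = target_sentence.split(maxsplit=1)[1]   # drop the first token
    return decoder_input, decoder_label


decoder_input, decoder_label = make_decoder_pair("sos ich bin müde eos")
for step, (seen, expected) in enumerate(zip(decoder_input.split(), decoder_label.split())):
    # At each step the decoder reads `seen` and should predict `expected`.
    print(step, seen, "->", expected)
```

Both sequences are one token shorter than the original sentence, which matches the `de_seq_length-1` passed to the German vectorizer.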

In [21]:
def shuffle_data(en_inputs, de_inputs, de_labels, shuffle_indices=None):
    """
    Shuffle the data randomly (applying the same order to all inputs and labels at once)
    """
    if shuffle_indices is None:
        shuffle_indices = np.random.permutation(np.arange(en_inputs.shape[0]))
    else:
        shuffle_indices = np.random.permutation(shuffle_indices)
    return (
        en_inputs[shuffle_indices],
        de_inputs[shuffle_indices],
        de_labels[shuffle_indices]
    ), shuffle_indices

In [22]:
from tensorflow.keras.layers import StringLookup
from nmt_bleu import compute_bleu

class BLEUMetric:
    def __init__(self, vocabulary, name='bleu', **kwargs):
        """
        Computes the BLEU score for machine translation.
        """
        super().__init__()
        self.vocab = vocabulary
        self.id_to_token_layer = StringLookup(
            vocabulary=self.vocab,
            invert=True,
            num_oov_indices=1  # Allow one OOV token
        )

    def calculate_bleu_from_predictions(self, real, pred):
        """
        Calculate BLEU score for targets and predictions.
        """
        pred_argmax = tf.argmax(pred, axis=-1)  # Get predicted IDs
        pred_tokens = self.id_to_token_layer(pred_argmax)  # Convert to tokens
        real_tokens = self.id_to_token_layer(real)  # Convert to tokens
        
        # Clean and process tokens
        def clean_text(tokens):
            # Remove padding, strip unwanted tokens
            t = tf.strings.strip(
                tf.strings.regex_replace(
                    tf.strings.join(tf.transpose(tokens), separator=' '),
                    "eos.*", ""  # Remove everything after "eos"
                )
            )
            t = np.char.decode(t.numpy().astype(np.bytes_), encoding='utf-8')
            t = [doc if len(doc) > 0 else '[UNK]' for doc in t]
            return np.char.split(t).tolist()
        
        pred_tokens = clean_text(pred_tokens)
        real_tokens = [[r] for r in clean_text(real_tokens)]  # Format for BLEU

        # Compute BLEU using the provided `compute_bleu` function
        bleu, _, _, _, _, _ = compute_bleu(real_tokens, pred_tokens)
        return bleu
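
The `compute_bleu` function comes from the external `nmt_bleu` module, which is not shown in this notebook. As a rough guide to what a BLEU score measures, the sketch below computes an unsmoothed, single-reference BLEU in plain Python from clipped n-gram precisions and a brevity penalty. It is an illustrative approximation under those assumptions, not the `nmt_bleu` implementation, and `simple_bleu` is a hypothetical name.

```python
import math
from collections import Counter


def ngram_counts(tokens, order):
    """Count all n-grams of the given order in a token list."""
    return Counter(tuple(tokens[i:i + order]) for i in range(len(tokens) - order + 1))


def simple_bleu(reference, candidate, max_order=4):
    """Unsmoothed BLEU of one candidate against one reference (token lists)."""
    log_precision_sum = 0.0
    for order in range(1, max_order + 1):
        cand = ngram_counts(candidate, order)
        ref = ngram_counts(reference, order)
        # Clipped matches: each n-gram counts at most as often as in the reference.
        matches = sum(min(count, ref[gram]) for gram, count in cand.items())
        total = sum(cand.values())
        if matches == 0 or total == 0:
            return 0.0  # without smoothing, one empty order zeroes the score
        log_precision_sum += math.log(matches / total)
    geometric_mean = math.exp(log_precision_sum / max_order)
    # Brevity penalty discourages candidates shorter than the reference.
    if len(candidate) >= len(reference):
        brevity_penalty = 1.0
    else:
        brevity_penalty = math.exp(1.0 - len(reference) / len(candidate))
    return brevity_penalty * geometric_mean


reference = "als müssen müssen wir in erfahrung bringen wo sie wohnen".split()
long_match = "[UNK] [UNK] müssen wir in erfahrung bringen wo sie wohnen".split()
broken_match = "[UNK] einmal müssen [UNK] in erfahrung bringen wo sie wohnen".split()
print(simple_bleu(reference, long_match) > simple_bleu(reference, broken_match))
```

Because higher-order n-grams reward longer runs of correct tokens, the candidate with one long correct phrase scores higher than the one whose matches are interrupted. Without smoothing, a candidate with no matching 4-gram scores exactly zero.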

In [23]:
"""
translation = [['[UNK]', '[UNK]', 'müssen', 'wir', 'in', 'erfahrung', 'bringen', 'wo', 'sie', 'wohnen']]
reference = [[['als', 'müssen', 'müssen', 'wir', 'in', 'erfahrung', 'bringen', 'wo', 'sie', 'wohnen']]]

bleu1, _, _, _, _, _ = compute_bleu(reference, translation)

translation = [['[UNK]', 'einmal', 'müssen', '[UNK]', 'in', 'erfahrung', 'bringen', 'wo', 'sie', 'wohnen']]
reference = [[['als', 'müssen', 'müssen', 'wir', 'in', 'erfahrung', 'bringen', 'wo', 'sie', 'wohnen']]]


bleu2, _, _, _, _, _ = compute_bleu(reference, translation)

print("BLEU score with longer correctly predicted phrases: {}".format(bleu1))
print("BLEU score without longer correctly predicted phrases: {}".format(bleu2))
"""


In [24]:
def clean_text(tokens):
    """
    Convert a tensor of string tokens into a list of token lists,
    dropping the eos token and everything after it.
    """
    # 3. Strip the string of any extra white spaces
    translations_in_bytes = tf.strings.strip(
        # 2. Replace the eos token and everything after it with blank
        tf.strings.regex_replace(
            # 1. Join all the tokens to one string in each sequence
            tf.strings.join(tf.transpose(tokens), separator=' '),
            "eos.*", ""
        ),
    )
    # Decode the byte stream to a string
    translations = np.char.decode(translations_in_bytes.numpy().astype(np.bytes_), encoding='utf-8')

    # If the string is empty, add a [UNK] token
    # Otherwise get a Division by zero error
    translations = [sent if len(sent) > 0 else '[UNK]' for sent in translations]

    # Split the sequences to individual tokens
    translations = np.char.split(translations).tolist()
    return translations
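
The same clean-up can be tried on plain Python lists, which makes the effect of the `"eos.*"` pattern easy to inspect. This sketch assumes each row is already one sequence of tokens (no transpose needed); `truncate_at_eos` and the sample rows are hypothetical.

```python
import re


def truncate_at_eos(token_rows, end_token="eos", fallback="[UNK]"):
    """Join tokens per row, cut at the end token, and split back into tokens."""
    cleaned = []
    for tokens in token_rows:
        sentence = " ".join(tokens)
        # Same pattern as above: drop the end token and everything after it.
        sentence = re.sub(end_token + ".*", "", sentence).strip()
        # An empty result is replaced by the fallback token.
        cleaned.append((sentence or fallback).split())
    return cleaned


rows = [
    ["ich", "bin", "müde", "eos", "", ""],
    ["eos", "", "", "", "", ""],
]
print(truncate_at_eos(rows))
```

Because the pattern is not anchored to word boundaries, it would also cut inside any token that happens to contain the letters `eos`; a pattern such as `r"\beos\b.*"` would be one way to avoid that.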

In [25]:
def evaluate_model(model, vectorizer, en_inputs_raw, de_inputs_raw, de_labels_raw, epochs, batch_size):
    """Evaluate the model on various metrics."""
    loss_log, accuracy_log, bleu_log = [], [], []
    bleu_metric = BLEUMetric(de_vocabulary)
    n_batches = en_inputs_raw.shape[0] // batch_size

    for i in range(n_batches):
        print(f"Evaluating batch {i + 1}/{n_batches}", end="\r")

        # Convert inputs to tensors
        x = [
            tf.convert_to_tensor(en_inputs_raw[i * batch_size:(i + 1) * batch_size], dtype=tf.string),
            tf.convert_to_tensor(de_inputs_raw[i * batch_size:(i + 1) * batch_size], dtype=tf.string),
        ]
        # Convert labels to integer token IDs
        y = tf.convert_to_tensor(vectorizer(de_labels_raw[i * batch_size:(i + 1) * batch_size]))

        # Evaluate model
        loss, accuracy = model.evaluate(x, y, verbose=0)
        pred_y = model.predict(x)

        # Compute BLEU score
        bleu = bleu_metric.calculate_bleu_from_predictions(y, pred_y)

        loss_log.append(loss)
        accuracy_log.append(accuracy)
        bleu_log.append(bleu)

    return np.mean(loss_log), np.mean(accuracy_log), np.mean(bleu_log)

In [26]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.models import load_model

# Function to save the model
def save_model(model, model_path='seq2seq_translatorX.h5'):
    """
    Saves the trained model to the specified file.
    
    Args:
        model: Trained Keras model to save.
        model_path (str): Path where the model will be saved.
    """
    model.save(model_path)
    print(f'Model saved to {model_path}')
    
def train_model(model, vectorizer, train_df, valid_df, test_df, epochs, batch_size):
    """Train the model and evaluate on validation/test sets."""
    bleu_metric = BLEUMetric(de_vocabulary)
    data_dict = prepare_data(train_df, valid_df, test_df)
    shuffle_inds = None

    for epoch in range(epochs):
        bleu_log, accuracy_log, loss_log = [], [], []

        # Shuffle the training data
        (en_inputs_raw, de_inputs_raw, de_labels_raw), shuffle_inds = shuffle_data(
            data_dict['train']['encoder_inputs'],
            data_dict['train']['decoder_inputs'],
            data_dict['train']['decoder_labels'],
            shuffle_inds
        )
        n_train_batches = en_inputs_raw.shape[0] // batch_size

        print(f"\nEpoch {epoch + 1}/{epochs}")

        # Training Loop
        for i in range(n_train_batches):
            print(f"Training batch {i + 1}/{n_train_batches}", end="\r")
            x = [
                tf.convert_to_tensor(en_inputs_raw[i * batch_size:(i + 1) * batch_size], dtype=tf.string),
                tf.convert_to_tensor(de_inputs_raw[i * batch_size:(i + 1) * batch_size], dtype=tf.string),
            ]
            y = tf.convert_to_tensor(vectorizer(de_labels_raw[i * batch_size:(i + 1) * batch_size]))

            # Train on batch
            model.train_on_batch(x, y)

            # Track metrics
            loss, accuracy = model.evaluate(x, y, verbose=0)
            pred_y = model.predict(x)
            bleu = bleu_metric.calculate_bleu_from_predictions(y, pred_y)

            loss_log.append(loss)
            accuracy_log.append(accuracy)
            bleu_log.append(bleu)

        print(f"\t(train) loss: {np.mean(loss_log):.4f} - accuracy: {np.mean(accuracy_log):.4f} - bleu: {np.mean(bleu_log):.4f}")

        # Validation after the epoch
        val_loss, val_accuracy, val_bleu = evaluate_model(
            model,
            vectorizer,
            data_dict['valid']['encoder_inputs'],
            data_dict['valid']['decoder_inputs'],
            data_dict['valid']['decoder_labels'],
            epochs=1,
            batch_size=batch_size
        )
        print(f"\t(valid) loss: {val_loss:.4f} - accuracy: {val_accuracy:.4f} - bleu: {val_bleu:.4f}")

    # Test evaluation after all epochs
    test_loss, test_accuracy, test_bleu = evaluate_model(
        model,
        vectorizer,
        data_dict['test']['encoder_inputs'],
        data_dict['test']['decoder_inputs'],
        data_dict['test']['decoder_labels'],
        epochs=1,
        batch_size=batch_size
    )
    print(f"\n(test) loss: {test_loss:.4f} - accuracy: {test_accuracy:.4f} - bleu: {test_bleu:.4f}")
    print("Training complete.")

    # Save the model that was passed in (not the global seq2seq_model)
    model_path = "en2de_translatorX.h5"
    save_model(model, model_path)

epochs = 2
batch_size = 1028

In [27]:
train_model(seq2seq_model, de_vectorizer, train_df, valid_df, test_df,epochs, batch_size)


Epoch 1/2
Training batch 1/38

(test) loss: 4.7856 - accuracy: 0.0853 - bleu: 0.0000
Training complete.
Model saved to en2de_translatorX.h5


In [28]:
def load_model_from_file(model_path='seq2seq_translatorX.h5'):
    """
    Loads the model from the specified file.
    
    Args:
        model_path (str): Path from where the model will be loaded.
        
    Returns:
        Loaded Keras model.
    """
    model = load_model(model_path, custom_objects={'TextVectorization': tf.keras.layers.TextVectorization})
    print(f'Model loaded from {model_path}')
    return model

In [29]:
def make_inference(model, en_vectorizer, input_sentence, de_vocab, max_output_length=20):
    """
    Greedily translate one English sentence with the trained seq2seq model.

    Args:
        model: Trained seq2seq model that takes [English strings, German strings].
        en_vectorizer: Unused; kept for compatibility, since the model vectorizes raw strings itself.
        input_sentence (str): Input English sentence to translate.
        de_vocab (list): List of German vocabulary words.
        max_output_length (int): Maximum number of tokens to generate; must not
            exceed the decoder's sequence length.

    Returns:
        str: Translated German sentence.
    """
    # The model takes raw strings with shape (batch, 1)
    encoder_input = tf.constant([[input_sentence]], dtype=tf.string)

    # Start decoding from the start token ("sos")
    decoded_tokens = ["sos"]
    output_tokens = []
    for step in range(max_output_length):
        decoder_input = tf.constant([[" ".join(decoded_tokens)]], dtype=tf.string)

        # Probabilities have shape (1, sequence length, vocabulary size)
        probs = model.predict([encoder_input, decoder_input], verbose=0)
        predicted_token_id = int(np.argmax(probs[0, step]))

        # ID 0 is padding and ID 1 is the out-of-vocabulary token in TextVectorization
        if predicted_token_id == 0:
            break
        token = "[UNK]" if predicted_token_id == 1 else de_vocab[predicted_token_id]

        # If the end token is predicted, stop
        if token == "eos":
            break

        output_tokens.append(token)
        # Feed the prediction back as the next decoder input token
        decoded_tokens.append(token)

    # Join the output tokens to form the translated sentence
    return " ".join(output_tokens)

In [30]:
loaded_model = load_model_from_file(model_path='en2de_translatorX.h5')


ValueError: Unknown layer: 'NotEqual'. Please ensure you are using a `keras.utils.custom_object_scope` and that this object is included in the scope. See https://www.tensorflow.org/guide/keras/save_and_serialize#registering_the_custom_object for details.

In [None]:
input_sentence = "How are you?"
translated_sentence = make_inference(loaded_model, en_vectorizer, input_sentence, de_vocabulary)
print(f'Translated Sentence: {translated_sentence}')