# 1. Import Libraries

In [44]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
from collections import Counter
import pathlib

path_to_file = pathlib.Path("C:/Users/Langat Kevin/Documents/New Bible Pair/lingala_english.txt")

np.random.seed(1234)
tf.random.set_seed(1234)

# 2. Loading and Preprocessing Data

In [45]:
def load_data(path):
    text = path.read_text(encoding="utf-8")
    lines = text.splitlines()
    pairs = [line.split("\t") for line in lines if len(line.split("\t")) == 2]
    context = np.array([context for context, target in pairs])
    target = np.array([target for context, target in pairs])
    return context, target

lingala_sentences, english_sentences = load_data(path_to_file)
sentences = (lingala_sentences, english_sentences)
print(lingala_sentences, "\n\n", english_sentences)

['lingala english'
 'Kasi mabele ezalaki kaka bongobongo mpe ezalaki mpamba, mpe molili ezalaki likoló ya mai mozindo; nguya ya Nzambe ezalaki kotambola epai na epai likoló ya mai. Now the earth was formless and desolate, and there was darkness upon the surface of the watery deep, and Godâ€™s active force was moving about over the surface of the waters.'
 'Nsima na yango, Nzambe amonaki ete pole ezalaki malamu, mpe Nzambe abandaki kokabola pole na molili. After that God saw that the light was good, and God began to divide the light from the darkness.'
 'Na nsima, Nzambe alobaki ete: “Etando ezala kati na mai, mpe mai ekabwana na mibale.” Then God said: “Let there be an expanse between the waters, and let there be a division between the waters and the waters.”'
 'Nzambe abengaki etando Likoló. Mpe mpokwa ekómaki mpe ntɔngɔ etanaki, wana mokolo ya mibale. God called the expanse Heaven. And there was evening and there was morning, a second day.'
 'Nzambe abengaki mokili oyo ekauki Mabele,

# 3. Creating Datasets for Training and Validation

In [46]:
BUFFER_SIZE = len(english_sentences)
BATCH_SIZE = 64

is_train = np.random.uniform(size=(len(lingala_sentences),)) < 0.8
print(is_train)

train_raw = (
    tf.data.Dataset.from_tensor_slices(
        (english_sentences[is_train], lingala_sentences[is_train])
    )
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

val_raw = (
    tf.data.Dataset.from_tensor_slices(
        (english_sentences[~is_train], lingala_sentences[~is_train])
    )
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

[ True  True  True  True  True  True  True False False False  True  True
  True  True  True  True  True  True  True False  True  True  True  True
 False  True  True  True  True  True False  True False  True  True  True
  True False  True False  True  True  True  True  True  True  True  True
  True  True]


# 4. Text Vectorization

In [47]:
def tf_lower_and_split_punct(text):
    text = tf.strings.lower(text)
    text = tf.strings.regex_replace(text, r"[^ a-z.?!,¿]", "")
    text = tf.strings.regex_replace(text, r"[.?!,¿]", r" \0 ")
    text = tf.strings.strip(text)
    text = tf.strings.join(["[SOS]", text, "[EOS]"], separator=" ")
    return text

max_vocab_size = 12000

english_vectorizer = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct, max_tokens=max_vocab_size, output_mode='int', ragged=True
)

english_vectorizer.adapt(train_raw.map(lambda context, target: context))

lingala_vectorizer = tf.keras.layers.TextVectorization(
    standardize=tf_lower_and_split_punct, max_tokens=max_vocab_size, output_mode='int', ragged=True
)

lingala_vectorizer.adapt(train_raw.map(lambda context, target: target))

# 5. Processing Text Model Input

In [48]:
def process_text(context, target):
    context = english_vectorizer(context).to_tensor()
    target = lingala_vectorizer(target)
    targ_in = target[:, :-1].to_tensor()
    targ_out = target[:, 1:].to_tensor()
    return (context, targ_in), targ_out

train_data = train_raw.map(lambda x, y: process_text(x, y), tf.data.AUTOTUNE).repeat()
val_data = val_raw.map(lambda x, y: process_text(x, y), tf.data.AUTOTUNE).repeat()

del train_raw
del val_raw

# 6. Define Loss and Accuracy Functions

In [49]:
def masked_loss(y_true, y_pred):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    loss = loss_fn(y_true, y_pred)
    mask = tf.cast(y_true != 0, loss.dtype)
    loss *= mask
    return tf.reduce_sum(loss) / tf.reduce_sum(mask)

def masked_acc(y_true, y_pred):
    y_pred = tf.argmax(y_pred, axis=-1)
    y_pred = tf.cast(y_pred, y_true.dtype)
    match = tf.cast(y_true == y_pred, tf.float32)
    mask = tf.cast(y_true != 0, tf.float32)
    return tf.reduce_sum(match) / tf.reduce_sum(mask)

# 7. Define the Encoder

In [50]:
VOCAB_SIZE = 12000
UNITS = 256

class Encoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, units):
        super(Encoder, self).__init__()
        self.embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=units, mask_zero=True)
        self.rnn = tf.keras.layers.Bidirectional(
            merge_mode="sum",
            layer=tf.keras.layers.LSTM(units=units, return_sequences=True),
        )

    def call(self, context):
        x = self.embedding(context)
        x = self.rnn(x)
        return x

encoder = Encoder(VOCAB_SIZE, UNITS)

for (to_translate, sr_translation), _ in train_data.take(1):
    encoder_output = encoder(to_translate)
    print(f'Tensor of sentences in english has shape: {to_translate.shape}\n')
    print(f'Encoder output has shape: {encoder_output.shape}')

Tensor of sentences in english has shape: (41, 107)

Encoder output has shape: (41, 107, 256)


# 8. Define Cross Attention Layer

In [51]:
class CrossAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(key_dim=units, num_heads=1)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

    def call(self, context, target, context_mask=None, target_mask=None):
        attn_output, _ = self.mha(query=target, value=context, key=context, attention_mask=context_mask)
        x = self.add([target, attn_output])
        x = self.layernorm(x)
        return x

attention_layer = CrossAttention(UNITS)
sr_translation_embed = tf.keras.layers.Embedding(VOCAB_SIZE, output_dim=UNITS, mask_zero=True)(sr_translation)

context_mask = tf.cast(tf.sequence_mask(tf.reduce_sum(tf.cast(to_translate != 0, tf.int32), axis=1), maxlen=tf.shape(to_translate)[1]), dtype=tf.float32)
target_mask = tf.cast(tf.sequence_mask(tf.reduce_sum(tf.cast(sr_translation != 0, tf.int32), axis=1), maxlen=tf.shape(sr_translation)[1]), dtype=tf.float32)

# Ensure the mask is in the correct shape for MultiHeadAttention
context_mask = context_mask[:, tf.newaxis, tf.newaxis, :]
target_mask = target_mask[:, tf.newaxis, tf.newaxis, :]

attention_result = attention_layer(encoder_output, sr_translation_embed, context_mask=context_mask, target_mask=target_mask)

print(f'Tensor of contexts has shape: {encoder_output.shape}')
print(f'Tensor of translations has shape: {sr_translation_embed.shape}')
print(f'Tensor of attention scores has shape: {attention_result.shape}')

InvalidArgumentError: Exception encountered when calling EinsumDense.call().

[1m{{function_node __wrapped____MklBatchMatMulV2_device_/job:localhost/replica:0/task:0/device:CPU:0}} Matrix size-incompatible: In[0]: [41,117,10496], In[1]: [256,256] 0 0 [Op:BatchMatMulV2] name: [0m

Arguments received by EinsumDense.call():
  • inputs=tf.Tensor(shape=(41, 117, 41, 256), dtype=float32)
  • training=None

# Defining the Decoder

In [None]:
class Decoder(tf.keras.layers.Layer):
    def __init__(self, vocab_size, units):
        super(Decoder, self).__init__()
        self.embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=units, mask_zero=True)
        self.pre_attention_rnn = tf.keras.layers.LSTM(units=units, return_sequences=True, return_state=True)
        self.attention = CrossAttention(units)
        self.post_attention_rnn = tf.keras.layers.LSTM(units=units, return_sequences=True)
        self.output_layer = tf.keras.layers.Dense(units=vocab_size, activation=tf.nn.log_softmax)

    def call(self, context, target, state=None, return_state=False):
        x = self.embedding(target)
        x, hidden_state, cell_state = self.pre_attention_rnn(x, initial_state=state)
        x = self.attention(context, x)
        x = self.post_attention_rnn(x)
        logits = self.output_layer(x)
        if return_state:
            return logits, [hidden_state, cell_state]
        return logits

decoder = Decoder(VOCAB_SIZE, UNITS)
logits = decoder(encoder_output, sr_translation)

print(f'Tensor of contexts has shape: {encoder_output.shape}')
print(f'Tensor of right-shifted translations has shape: {sr_translation.shape}')
print(f'Tensor of logits has shape: {logits.shape}')



Tensor of contexts has shape: (41, 107, 256)
Tensor of right-shifted translations has shape: (41, 117)
Tensor of logits has shape: (41, 117, 12000)




# 10. Defining The Tranformer Model

In [None]:
class Translator(tf.keras.Model):
    def __init__(self, vocab_size, units):
        super().__init__()
        self.encoder = Encoder(vocab_size, units)
        self.decoder = Decoder(vocab_size, units)

    def call(self, inputs):
        context, target = inputs
        encoded_context = self.encoder(context)
        logits = self.decoder(encoded_context, target)
        return logits

translator = Translator(VOCAB_SIZE, UNITS)
logits = translator((to_translate, sr_translation))

print(f'Tensor of sentences to translate has shape: {to_translate.shape}')
print(f'Tensor of right-shifted translations has shape: {sr_translation.shape}')
print(f'Tensor of logits has shape: {logits.shape}')



Tensor of sentences to translate has shape: (41, 107)
Tensor of right-shifted translations has shape: (41, 117)
Tensor of logits has shape: (41, 117, 12000)


# 11. Training the Model

In [None]:
def compile_and_train(model, epochs=10, steps_per_epoch=500):
    model.compile(optimizer="adam", loss=masked_loss, metrics=[masked_acc, masked_loss])
    history = model.fit(
        train_data.repeat(),
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=val_data,
        validation_steps=50,
        callbacks=[tf.keras.callbacks.EarlyStopping(patience=3)],
    )
    return model, history

trained_translator, history = compile_and_train(translator)

Epoch 1/20
[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3068s[0m 6s/step - loss: 5.1215 - masked_acc: 0.0994 - masked_loss: 5.1215 - val_loss: 7.7612 - val_masked_acc: 0.0391 - val_masked_loss: 3.8806
Epoch 2/20


  self.gen.throw(value)


[1m500/500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4163s[0m 8s/step - loss: 0.5553 - masked_acc: 0.9042 - masked_loss: 0.5553 - val_loss: 8.8777 - val_masked_acc: 0.0374 - val_masked_loss: 4.4388
Epoch 3/20
[1m413/500[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m9:00[0m 6s/step - loss: 0.0136 - masked_acc: 1.0000 - masked_loss: 0.0136

KeyboardInterrupt: 

# Generating Next Token

In [None]:
def generate_next_token(decoder, context, next_token, done, state, temperature=0.0):
    logits, state = decoder(context, next_token, state=state, return_state=True)
    logits = logits[:, -1, :]
    if temperature == 0.0:
        next_token = tf.argmax(logits, axis=-1)
    else:
        logits = logits / temperature
        next_token = tf.random.categorical(logits, num_samples=1)
    logits = tf.squeeze(logits)
    next_token = tf.squeeze(next_token)
    logit = logits[next_token].numpy()
    next_token = tf.reshape(next_token, shape=(1,1))
    if next_token == eos_id:
        done = True
    return next_token, logit, state, done

eng_sentence = "I love languages"
texts = tf.convert_to_tensor(eng_sentence)[tf.newaxis]
context = english_vectorizer(texts).to_tensor()
context = encoder(context)
next_token = tf.fill((1,1), sos_id)
state = [tf.random.uniform((1, UNITS)), tf.random.uniform((1, UNITS))]
done = False
next_token, logit, state, done = generate_next_token(decoder, context, next_token, done, state, temperature=0.5)
print(f"Next token: {next_token}\nLogit: {logit:.4f}\nDone? {done}")

# 13. Translating the Sentences

In [None]:
def translate(model, text, max_length=50, temperature=0.0):
    tokens, logits = [], []
    text = tf.convert_to_tensor([text])[tf.newaxis]
    context = english_vectorizer(text).to_tensor()
    context = model.encoder(context)
    next_token = tf.fill((1, 1), sos_id)
    state = [tf.zeros((1, UNITS)), tf.zeros((1, UNITS))]
    done = False
    for _ in range(max_length):
        try:
            next_token, logit, state, done = generate_next_token(
                decoder=model.decoder,
                context=context,
                next_token=next_token,
                done=done,
                state=state,
                temperature=temperature
            )
        except:
            raise Exception("Problem generating the next token")
        if done:
            break
        tokens.append(next_token)
        logits.append(logit)
    tokens = tf.concat(tokens, axis=-1)
    translation = tf.squeeze(tokens_to_text(tokens, id_to_word))
    translation = translation.numpy().decode()
    return translation, logits[-1], tokens

temp = 0.0 
original_sentence = "I love languages"
translation, logit, tokens = translate(trained_translator, original_sentence, temperature=temp)
print(f"Temperature: {temp}\n\nOriginal sentence: {original_sentence}\nTranslation: {translation}\nTranslation tokens:{tokens}\nLogit: {logit:.3f}")

temp = 0.7
original_sentence = "I love languages"
translation, logit, tokens = translate(trained_translator, original_sentence, temperature=temp)
print(f"Temperature: {temp}\n\nOriginal sentence: {original_sentence}\nTranslation: {translation}\nTranslation tokens:{tokens}\nLogit: {logit:.3f}")

# 14. Generating Samples and Similarity Metrics

In [None]:
def generate_samples(model, text, n_samples=4, temperature=0.6):
    samples, log_probs = [], []
    for _ in range(n_samples):
        _, logp, sample = translate(model, text, temperature=temperature)
        samples.append(np.squeeze(sample.numpy()).tolist())
        log_probs.append(logp)
    return samples, log_probs

samples, log_probs = generate_samples(trained_translator, 'I love languages')
for s, l in zip(samples, log_probs):
    print(f"Translated tensor: {s} has logit: {l:.3f}")

def jaccard_similarity(candidate, reference):
    candidate_set = set(candidate)
    reference_set = set(reference)
    common_tokens = candidate_set.intersection(reference_set)
    all_tokens = candidate_set.union(reference_set)
    overlap = len(common_tokens) / len(all_tokens)
    return overlap

l1 = [1, 2, 3]
l2 = [1, 2, 3, 4]
js = jaccard_similarity(l1, l2)
print(f"jaccard similarity between lists: {l1} and {l2} is {js:.3f}")

def rouge1_similarity(candidate, reference):
    candidate_word_counts = Counter(candidate)
    reference_word_counts = Counter(reference)
    overlap = 0
    for token in candidate_word_counts.keys():
        token_count_candidate = candidate_word_counts[token]
        token_count_reference = reference_word_counts[token]
        overlap += min(token_count_candidate, token_count_reference)
    precision = overlap / len(candidate)
    recall = overlap / len(reference)
    if precision + recall != 0:
        f1_score = 2 * (precision * recall) / (precision + recall)
        return f1_score
    return 0

l1 = [1, 2, 3]
l2 = [1, 2, 3, 4]
r1s = rouge1_similarity(l1, l2)
print(f"rouge 1 similarity between lists: {l1} and {l2} is {r1s:.3f}")

def average_overlap(samples, similarity_fn):
    scores = {}
    for index_candidate, candidate in enumerate(samples):
        overlap = 0
        for index_sample, sample in enumerate(samples):
            if index_candidate == index_sample:
                continue
            sample_overlap = similarity_fn(candidate, sample)
            overlap += sample_overlap
        score = overlap / (len(samples) - 1)
        score = round(score, 3)
        scores[index_candidate] = score
    return scores

l1 = [1, 2, 3]
l2 = [1, 2, 4]
l3 = [1, 2, 4, 5]
avg_ovlp = average_overlap([l1, l2, l3], jaccard_similarity)
print(f"average overlap between lists: {l1}, {l2} and {l3} using Jaccard similarity is:\n\n{avg_ovlp}")

l1 = [1, 2, 3]
l2 = [1, 4]
l3 = [1, 2, 4, 5]
l4 = [5, 6]
avg_ovlp = average_overlap([l1, l2, l3, l4], rouge1_similarity)
print(f"average overlap between lists: {l1}, {l2}, {l3} and {l4} using Rouge1 similarity is:\n\n{avg_ovlp}")

def weighted_avg_overlap(samples, log_probs, similarity_fn):
    scores = {}
    for index_candidate, candidate in enumerate(samples):
        overlap, weight_sum = 0.0, 0.0
        for index_sample, (sample, logp) in enumerate(zip(samples, log_probs)):
            if index_candidate == index_sample:
                continue
            sample_p = float(np.exp(logp))
            weight_sum += sample_p
            sample_overlap = similarity_fn(candidate, sample)
            overlap += sample_p * sample_overlap
        score = overlap / weight_sum
        score = round(score, 3)
        scores[index_candidate] = score
    return scores

l1 = [1, 2, 3]
l2 = [1, 2, 4]
l3 = [1, 2, 4, 5]
log_probs = [0.4, 0.2, 0.5]
w_avg_ovlp = weighted_avg_overlap([l1, l2, l3], log_probs, jaccard_similarity)
print(f"weighted average overlap using Jaccard similarity is:\n\n{w_avg_ovlp}")

# 15. Minimum Bayes Risk Decoding

In [None]:
def mbr_decode(model, text, n_samples=5, temperature=0.6, similarity_fn=jaccard_similarity):
    samples, log_probs = generate_samples(model, text, n_samples=n_samples, temperature=temperature)
    scores = weighted_avg_overlap(samples, log_probs, similarity_fn)
    decoded_translations = [tokens_to_text(s, id_to_word).numpy().decode('utf-8') for s in samples]
    max_score_key = max(scores, key=lambda k: scores[k])
    translation = decoded_translations[max_score_key]
    return translation, decoded_translations

english_sentence = "I love languages"
translation, candidates = mbr_decode(trained_translator, english_sentence, n_samples=10, temperature=0.6)
print("Translation candidates:")
for c in candidates:
    print(c)
print(f"\nSelected translation: {translation}")