In [58]:
# Mount Google Drive (Colab only); the dataset used below is read from /content/drive.
from google.colab import drive # Link your Drive if you are a Colab user
drive.mount('/content/drive') # Models in this HW take a long time to train, so make sure to save them here

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [59]:
import pathlib
import random
import pandas as pd
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras

from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
from sklearn.model_selection import train_test_split

In [60]:
# The data was cleaned beforehand in a separate Jupyter notebook.
# `usecols` loads only the two text columns, which drops the stray
# "Unnamed: 0" index column that was written into the CSV while keeping the
# default 0..n-1 row index that later cells rely on.
df = pd.read_csv(
    '/content/drive/MyDrive/Kinya_English_sentences.csv',
    usecols=["Kinyarwanda", "English"],
)
df.head()

Unnamed: 0,Kinyarwanda,English
0,Ababyeyi babo bamaze gupfa basogokuru barabare...,After their parents died their grandparents ra...
1,Waba uzi umubare wabantu bitabiriye kubyina ic...,Do you know the number of people who attended ...
2,Jya kuryama Ugomba kubyuka kare ejo mugitondo.,Go to bed You have to wake up early tomorrow m...
3,Nzasimbuza igihe cyatakaye nkora uko nshoboye.,I will replace the lost time by doing my best.
4,Ngomba kumanika nonaha. Umuntu ategereje gukor...,I have to hang up now. Someone is waiting to u...


In [61]:
# Pull each language column out of the frame as a plain Python list.
Kinyarwanda = df["Kinyarwanda"].tolist()
English = df["English"].tolist()

In [62]:
# Report how many sentences were loaded for each language.
kinyarwanda_count = len(Kinyarwanda)
english_count = len(English)
print(f'Kinyarwanda length {kinyarwanda_count}')
print(f'English length {english_count}')

Kinyarwanda length 25013
English length 25013


In [72]:
# Build (Kinyarwanda, English) pairs. Every English target is wrapped in the
# "[start]" / "[end]" marker tokens.
# The two columns are walked positionally in lock-step instead of doing a
# label-based `df[col][i]` lookup per row, which is slower and would break if
# the frame's index were not exactly 0..n-1.
text_pairs = [
    (kiny, "[start] " + eng + " [end]")
    for kiny, eng in zip(df["Kinyarwanda"], df["English"])
]

text_pairs[:4]

[('Ababyeyi babo bamaze gupfa basogokuru barabareze. ',
  '[start] After their parents died their grandparents raised them. [end]'),
 ('Waba uzi umubare wabantu bitabiriye kubyina icyumweru gishize? ',
  '[start] Do you know the number of people who attended the dance last week? [end]'),
 ('Jya kuryama Ugomba kubyuka kare ejo mugitondo. ',
  '[start] Go to bed You have to wake up early tomorrow morning. [end]'),
 ('Nzasimbuza igihe cyatakaye nkora uko nshoboye. ',
  '[start] I will replace the lost time by doing my best. [end]')]

In [73]:
# Sanity check: print five pairs drawn at random (with replacement).
for _ in range(5):
    sample_pair = random.choice(text_pairs)
    print(sample_pair)

('Mwebwe murashaka kwinezeza? ', '[start] Do you want to have fun? [end]')
('Hari umuntu ushinzwe gusukura inzu', '[start] There is someone responsible for cleaning the house [end]')
('Mfite ibintu byinshi ngomba gukora', '[start] I have many things I have to do [end]')
('william azagera i Boston ejo nyuma ya saa sita', '[start] william will arrive in Boston tomorrow afternoon [end]')
('Nzi neza ko ntigeze ntekereza ko  azakora ibintu nkibyo. ', '[start] I�m sure I never thought he would do such a thing. [end]')


In [74]:
# Shuffle the pairs in place, then split them: 15% validation, 15% test and
# the remainder (about 70%) for training.
# A dedicated, seeded RNG makes the split identical on every
# "Restart & Run All", so the test set never leaks into a later training run.
# It also leaves the global `random` state untouched.
# Note: re-running only this cell shuffles the already-shuffled list again.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

25013 total pairs
17511 training pairs
3751 validation pairs
3751 test pairs


In [75]:
# Split the training pairs into parallel source / target sentence lists.
train_Kiny_texts = [kiny_sentence for kiny_sentence, _ in train_pairs]
train_Eng_texts = [eng_sentence for _, eng_sentence in train_pairs]

train_Kiny_texts[:3]

['Muganga yakubwiye kuguma mu buriri kugeza umuriro wawe ugabanutse sibyo? ',
 'Umuhungu ntiyitaye ku nama za se',
 'Yamujyanye mu gihugu cyabo Abalayiki ']

In [76]:
# First three English training targets, each wrapped in [start] / [end].
train_Eng_texts[:3]

['[start] Did the doctor tell you to stay in bed until your fever subsides right? [end]',
 "[start] The boy did not pay attention to his father's advice [end]",
 '[start] He took him to their own land [end]']

VECTORIZING THE TEXT DATA

In [77]:
# Characters removed from the English text during standardization. Square
# brackets are excluded so the "[start]" / "[end]" markers stay intact.
strip_chars = "".join(
    ch for ch in string.punctuation + "¿" if ch not in "[]"
)

# Vocabulary cap, tokens per sequence, and examples per batch.
vocab_size, sequence_length, batch_size = 5000, 15, 8


def custom_standardization(input_string):
    """Lowercase ``input_string`` and delete every character in ``strip_chars``.

    Used as the ``standardize`` callable of the English vectorizer.

    Args:
        input_string: String tensor to normalize.

    Returns:
        The lowercased string tensor with the ``strip_chars`` characters removed.
    """
    # Character class built from the (regex-escaped) characters to strip.
    strip_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")


# Settings shared by both vectorizers.
shared_vectorizer_kwargs = dict(max_tokens=vocab_size, output_mode="int")

# Source side: no `standardize` argument is passed, so the layer's default applies.
Kiny_vectorization = TextVectorization(
    output_sequence_length=sequence_length,
    **shared_vectorizer_kwargs,
)
# Target side: one extra token, because the sequence is later split into
# decoder inputs (all but the last token) and targets (all but the first).
Eng_vectorization = TextVectorization(
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
    **shared_vectorizer_kwargs,
)

# Learn both vocabularies from the training split only.
train_Kiny_texts = [kiny_sentence for kiny_sentence, _ in train_pairs]
train_Eng_texts = [eng_sentence for _, eng_sentence in train_pairs]
Kiny_vectorization.adapt(train_Kiny_texts)
Eng_vectorization.adapt(train_Eng_texts)

In [78]:
def format_dataset(kiny, eng):
    """Vectorize one batch of sentence pairs into model inputs and targets.

    Args:
        kiny: Batch of Kinyarwanda sentences (string tensor).
        eng: Batch of English sentences (string tensor).

    Returns:
        A ``(inputs, targets)`` tuple. ``inputs`` is a dict with keys
        ``"encoder_inputs"`` (vectorized source) and ``"decoder_inputs"``
        (vectorized target without its last token). ``targets`` is the
        vectorized target without its first token, i.e. the decoder inputs
        shifted one step ahead.
    """
    kiny = Kiny_vectorization(kiny)
    eng = Eng_vectorization(eng)
    # The original had this return statement twice; the second was unreachable.
    return ({"encoder_inputs": kiny, "decoder_inputs": eng[:, :-1],}, eng[:, 1:])


def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        pairs: Sequence of ``(kinyarwanda_sentence, english_sentence)`` tuples.

    Returns:
        A dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``.

    Raises:
        ValueError: If ``pairs`` is empty.
    """
    pairs = list(pairs)
    if not pairs:
        raise ValueError("make_dataset() needs at least one sentence pair")
    Kiny_texts, Eng_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(Kiny_texts), list(Eng_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache first so vectorization runs only once, then shuffle. A cache placed
    # after the shuffle replays the first epoch's order on every later epoch.
    # Batching happens before the shuffle, so whole batches are reordered.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the training and validation pipelines from their respective splits.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [79]:
# Sanity check: show the tensor shapes of a single training batch.
for inputs, targets in train_ds.take(1):
    for input_name in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{input_name}"].shape: {inputs[input_name].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (8, 15)
inputs["decoder_inputs"].shape: (8, 15)
targets.shape: (8, 15)


BUILDING THE MODEL

In [80]:
class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward projection.

    Each of the two sub-blocks is wrapped in a residual connection followed by
    layer normalization.

    Args:
        embed_dim: Width of the incoming embeddings. Also used as the attention
            ``key_dim`` and as the output width of the feed-forward projection,
            so the projection output can be added back to its input.
        dense_dim: Width of the hidden ReLU layer of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward projection to ``inputs``.

        Args:
            inputs: Embedded sequence; assumed to be (batch, seq, embed_dim).
            mask: Optional padding mask; assumed to be (batch, seq) with truthy
                entries for real tokens. When omitted, attention is unmasked.

        Returns:
            The normalized block output.
        """
        # Default to "no mask": previously `padding_mask` was left unbound when
        # the layer was called without a mask, which raised at the attention call.
        padding_mask = None
        if mask is not None:
            # Two singleton axes are inserted so the per-token mask can
            # broadcast against the attention scores.
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super(TransformerEncoder, self).get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class PositionalEmbedding(layers.Layer):
    """Token embedding plus a learned position embedding.

    Args:
        sequence_length: Number of rows in the position-embedding table, i.e.
            the number of distinct positions that can be embedded.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Output width of both embedding tables.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed token ids and add the embedding of each token's position.

        Args:
            inputs: Integer token ids; the last axis is the sequence axis.

        Returns:
            Sum of the token embeddings and the position embeddings.
        """
        length = tf.shape(inputs)[-1]
        # Positions 0..length-1; the same position vectors are added to every row.
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super(PositionalEmbedding, self).get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


class TransformerDecoder(layers.Layer):
    """Transformer decoder block.

    Runs causal self-attention over the target sequence, then cross-attention
    over the encoder outputs, then a feed-forward projection. Each sub-block
    is wrapped in a residual connection followed by layer normalization.

    Args:
        embed_dim: Width of the incoming embeddings. Also used as the attention
            ``key_dim`` and as the output width of the feed-forward projection.
        latent_dim: Width of the hidden ReLU layer of the feed-forward projection.
        num_heads: Number of attention heads in each attention layer.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode one batch of target embeddings against the encoder outputs.

        Args:
            inputs: Embedded target sequence; assumed to be
                (batch, target_seq, embed_dim).
            encoder_outputs: Encoder output sequence that cross-attention
                attends over.
            mask: Optional padding mask for ``inputs``; assumed to be
                (batch, target_seq) with truthy entries for real tokens.

        Returns:
            The normalized block output.
        """
        causal_mask = self.get_causal_attention_mask(inputs)

        # Self-attention is always causal. When a padding mask for the target
        # sequence is available, padded key positions are hidden as well.
        # Previously the combined mask was computed but never used here, and
        # `padding_mask` was left unbound when no mask was supplied.
        self_attention_mask = causal_mask
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=self_attention_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention: every target position may look at every source
        # position. The target-side causal/padding mask must not be applied
        # here: its key axis indexes target positions, not encoder positions,
        # so it hid later source tokens from earlier target tokens.
        # NOTE(review): the encoder's own padding mask is not reachable through
        # this signature, so padded source positions are not masked. TODO:
        # confirm whether it should be passed in explicitly.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular (batch, seq, seq) int32 mask for ``inputs``.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` (position ``i`` may attend to
        position ``j``) and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples [batch_size, 1, 1]: repeat the single mask per example.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be saved and re-created."""
        config = super(TransformerDecoder, self).get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

Next, we assemble the end-to-end model.

In [81]:
# Model hyper-parameters: embedding width, feed-forward hidden width, attention heads.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: source token ids -> positional embedding -> one encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder: target token ids plus the encoded source sequence -> one decoder
# block -> dropout -> softmax over the vocabulary at every position.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder's outputs into the decoder sub-model.
# `decoder_outputs` is rebound here to the output of that combined graph.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

Training our model

In [None]:
epochs = 30  # This should be at least 30 for convergence

# Print the architecture, then compile and train with per-epoch validation.
transformer.summary()
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(train_ds, validation_data=val_ds, epochs=epochs)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_4 (Positi  (None, None, 256)   1283840     ['encoder_inputs[0][0]']         
 onalEmbedding)                                                                                   
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder_2 (Transfo  (None, None, 256)   3155456     ['positional_embedding_

DECODING TEST SENTENCE

In [84]:
# Map each English token id back to its vocabulary string.
Eng_vocab = Eng_vectorization.get_vocabulary()
Eng_index_lookup = dict(enumerate(Eng_vocab))
# Upper bound on the number of decoding steps per sentence.
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate one Kinyarwanda sentence into English.

    Starting from ``"[start]"``, the model is run repeatedly. At step ``i`` the
    most probable token at position ``i`` is appended to the partial
    translation, until ``"[end]"`` is produced or the step limit is reached.

    Args:
        input_sentence: The Kinyarwanda sentence to translate (a string).

    Returns:
        The decoded English string, beginning with ``"[start]"`` and ending
        with ``"[end]"`` if the model emitted it within the step limit.
    """
    tokenized_input_sentence = Kiny_vectorization([input_sentence])
    # The decoder input is cut to `sequence_length` tokens, so predictions
    # exist only for positions 0..sequence_length-1. Capping the loop there
    # stops `predictions[0, i, :]` going out of range when
    # `max_decoded_sentence_length` is larger and "[end]" is not emitted in time.
    max_steps = min(max_decoded_sentence_length, sequence_length)
    decoded_sentence = "[start]"
    for i in range(max_steps):
        tokenized_target_sentence = Eng_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        # Greedy choice: highest-scoring vocabulary entry at the current position.
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = Eng_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence


In [85]:
# Source sentences of the held-out test split.
test_Kiny_texts = [kiny_sentence for kiny_sentence, _ in test_pairs]
# Decode five random test sentences. Nothing is printed in this cell and
# `translated` is overwritten on every pass.
for _ in range(5):
    input_sentence = random.choice(test_Kiny_texts)
    translated = decode_sequence(input_sentence)

In [86]:
# Translate three randomly chosen test sentences and print each source
# sentence next to the model's output.
for _ in range(3):
    input_sentence = random.choice(test_Kiny_texts)
    print(f'input Kinya sentence: {input_sentence}')
    translated = decode_sequence(input_sentence)
    print(f'output translated in english: {translated}')

input Kinya sentence: Yagenze vuba uko ashoboye kugira ngo amufate. 
output translated in english: [start] [UNK] is you [UNK] to be will be [end]
input Kinya sentence: Kuki umuntu yakwifuza kuba inshuti? 
output translated in english: [start] why you you you will be will do [end]
input Kinya sentence: Sinshaka guta aka gato. 
output translated in english: [start] i if you know to do do do you [end]


In [45]:
# Peek at the first three source sentences of the test split.
# NOTE(review): this cell's execution count (45) is lower than the cells above
# it, so its saved output likely comes from an earlier run. Re-run to refresh.
test_Kiny_texts[:3]

['iyo ukoresheje ibibabi byinshi wumva ',
 'Nageze i Butara gusura marume',
 'Ukanibagirana  cyane ndetse nkutarahageze']