In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.models import Sequential, load_model
from keras.layers.experimental.preprocessing import TextVectorization

**Downloading the data**

[Anki](https://www.manythings.org/anki/)

In [None]:
# Download the Anki English-Persian archive with keras.utils.get_file and
# ask Keras to extract it.
anki_archive = keras.utils.get_file(
    origin="https://dl.dropboxusercontent.com/s/b7b1xr2tyexhn89/pes-eng.zip",
    fname="pes-eng.zip",
    extract=True,
)
# The sentence pairs are read from pes.txt in the same directory as the archive.
text_file1 = pathlib.Path(anki_archive).parent / "pes.txt"

In [None]:
# Parse the Anki file into (english, persian) pairs.
# Each usable line is tab-separated: column 0 is the English sentence and
# column 1 the Persian one; any further columns are ignored.
# The file is opened explicitly as UTF-8 so the Persian text decodes the same
# way regardless of the platform's default encoding.
with open(text_file1, encoding="utf-8") as f:
    lines = [line for line in f.read().split("\n") if line.strip()]

text_persian = []
for line in lines:
    fields = line.split("\t")
    if len(fields) < 2:
        # No translation column on this line: skip it instead of crashing.
        continue
    eng, pes = fields[0], fields[1]
    # Wrap the target in explicit boundary markers for the decoder.
    pes = "[start] " + pes + " [end]"
    text_persian.append((eng, pes))

**[MIZAN: A Large Persian-English Parallel Corpus](https://github.com/omidkashefi/Mizan/)**



In [None]:
# Download the MIZAN parallel corpus archive with keras.utils.get_file and
# ask Keras to extract it.
text_file2 = keras.utils.get_file(
    origin="https://github.com/omidkashefi/Mizan/blob/master/mizan.zip?raw=true",
    fname="mizan.zip",
    extract=True,
)
# Both language files are read from the "mizan" folder beside the archive.
mizan_dir = pathlib.Path(text_file2).parent / "mizan"
text_file2_en = mizan_dir / "mizan_en.txt"
text_file2_pes = mizan_dir / "mizan_fa.txt"

Downloading data from https://github.com/omidkashefi/Mizan/blob/master/mizan.zip?raw=true


In [None]:
# Read the two MIZAN files (one sentence per line) and append the pairs to
# `text_persian`, pairing line i of the English file with line i of the
# Persian file.
# NOTE: this cell appends to the existing list, so re-running it without
# re-running the Anki parsing cell adds the MIZAN pairs a second time.
with open(text_file2_en, encoding="utf-8") as f:
    lines_en = f.read().split("\n")[:-1]

with open(text_file2_pes, encoding="utf-8") as f:
    lines_pes = f.read().split("\n")[:-1]

for eng, pes in zip(lines_en, lines_pes):
    # Same boundary markers as the Anki pairs; without them these targets
    # would carry no "[start]" / "[end]" tokens for the decoder.
    pes = "[start] " + pes + " [end]"
    text_persian.append((eng, pes))

In [None]:
# Spot-check five randomly drawn (english, persian) pairs.
for _ in range(5):
    sample_pair = random.choice(text_persian)
    print(sample_pair)

('He came to see me.', '[start] وی آمد تا من را ببیند. [end]')
("I don't even know him.", '[start] من حتی او را نمی شناسم. [end]')
('Where can we find the truth?', '[start] حقیقت را در کجا می توانیم بیابیم؟ [end]')
('Why do you suspect me?', '[start] چرا به من مظنون هستید؟ [end]')
('Tom wants to change the world.', '[start] تام می خواهد جهان را تغییر دهد. [end]')


Now, let's split the sentence pairs into a training set, a validation set,
and a test set.

In [None]:
# Shuffle the pairs in place, then slice them: 15% for validation, the same
# number for test, and everything that remains for training.
random.shuffle(text_persian)
total_pairs = len(text_persian)
num_val_samples = int(0.15 * total_pairs)
num_train_samples = total_pairs - 2 * num_val_samples
val_end = num_train_samples + num_val_samples

train_persian = text_persian[:num_train_samples]
val_persian = text_persian[num_train_samples:val_end]
test_persian = text_persian[val_end:]

for label, subset in (
    ("total", text_persian),
    ("training", train_persian),
    ("validation", val_persian),
    ("test", test_persian),
):
    print(f"{len(subset)} {label} persian")

2521 total persian
1765 training persian
378 validation persian
378 test persian


**Preprocessing And Vectorizing the text data**


In [None]:
# Characters deleted by the Persian standardization step: ASCII punctuation
# plus the Persian question mark, minus the square brackets so that the
# "[start]" / "[end]" markers survive.
strip_chars = "".join(
    ch for ch in string.punctuation + "؟" if ch not in ("[", "]")
)

vocab_size = 129500  # max_tokens of both vectorizers and size of the output layer
sequence_length = 20  # tokens per vectorized sentence
batch_size = 64


def custom_standardization(input_string):
    """Lowercase the input and delete every character listed in ``strip_chars``.

    Passed as the ``standardize`` callable of the Persian vectorization layer.
    """
    strip_pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")

# TextVectorization turns raw strings into fixed-length integer token sequences.
# English side: no custom standardization callable is supplied.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# Persian side: one extra position, because format_dataset later slices the
# sequence into a decoder input and a target that are offset by one token.
pes_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

# Both vocabularies are learned from the training split only.
train_eng_texts = [eng for eng, _ in train_persian]
train_pes_texts = [pes for _, pes in train_persian]
eng_vectorization.adapt(train_eng_texts)
pes_vectorization.adapt(train_pes_texts)

def format_dataset(eng, pes):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Returns a ``(inputs, targets)`` tuple. ``inputs`` is a dict holding the
    English tokens under "encoder_inputs" and the Persian tokens without the
    last position under "decoder_inputs". ``targets`` is the Persian tokens
    without the first position, i.e. the decoder input shifted by one.
    """
    source_tokens = eng_vectorization(eng)
    target_tokens = pes_vectorization(pes)
    model_inputs = {
        "encoder_inputs": source_tokens,
        "decoder_inputs": target_tokens[:, :-1],
    }
    return model_inputs, target_tokens[:, 1:]


def make_dataset(persian):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        persian: sequence of ``(english, persian)`` string pairs.

    Returns:
        A dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``.
    """
    eng_texts, pes_texts = zip(*persian)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(pes_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache the deterministic, vectorized batches first and shuffle afterwards:
    # caching after shuffle() would store one fixed order and replay it on
    # every later epoch. prefetch() goes last so it overlaps with consumption.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the batched, vectorized pipelines for the training and validation splits.
train_ds = make_dataset(train_persian)
val_ds = make_dataset(val_persian)

**sequence shapes**

In [None]:
# Inspect the tensor shapes of a single batch.
for inputs, targets in train_ds.take(1):
    encoder_batch = inputs["encoder_inputs"]
    decoder_batch = inputs["decoder_inputs"]
    print(f'inputs["encoder_inputs"].shape: {encoder_batch.shape}')
    print(f'inputs["decoder_inputs"].shape: {decoder_batch.shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


**Building the model**

In [None]:

class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block: self-attention plus a feed-forward net.

    Each sub-layer is followed by a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used
            as ``key_dim`` of the attention layer.
        dense_dim: width of the hidden layer of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on ``inputs``; ``mask`` is an optional padding mask."""
        # Default to "no mask" so the layer also works when no mask is passed;
        # previously `padding_mask` was unbound in that case.
        padding_mask = None
        if mask is not None:
            # Insert two singleton axes so the per-token mask broadcasts
            # against the attention scores.
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(TransformerEncoder, self).get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

# To make the model aware of word order
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions in the position table.
        vocab_size: number of rows in the token embedding table.
        embed_dim: size of each embedding vector.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        # The position embeddings broadcast over the leading axis of the tokens.
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(PositionalEmbedding, self).get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention over the target sequence, then attention
    over the encoder outputs, then a feed-forward projection. Each sub-layer
    is followed by a residual connection and layer normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used
            as ``key_dim`` of both attention layers.
        latent_dim: width of the hidden layer of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block on target ``inputs`` attending to ``encoder_outputs``."""
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask" so the layer also works when no mask is passed;
        # previously `padding_mask` was unbound in that case.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            # Element-wise minimum keeps a position only where both the
            # padding mask and the causal mask allow it.
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    # A key detail that makes this possible is causal masking
    def get_causal_attention_mask(self, inputs):
        """Build a (batch, length, length) int32 lower-triangular mask.

        Entry ``[b, i, j]`` is 1 when ``i >= j``, so position ``i`` may only
        attend to positions up to and including itself.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super(TransformerDecoder, self).get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


**Assemble Model**

In [None]:
# Model hyperparameters shared by the encoder and the decoder.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# --------- > encoder
# Token ids -> token + position embeddings -> one encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)

# Model groups layers into an object with training and inference features.
encoder = keras.Model(encoder_inputs, encoder_outputs)

# --------- > decoder
# The decoder is built as a standalone model with two inputs: the target token
# ids and an encoded source sequence whose last axis has size embed_dim.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
# Softmax over the vocabulary at every target position.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder outputs into the decoder model.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

**Training our model**

In [None]:
epochs = 100

# Print the architecture, then compile and train with the validation split
# evaluated alongside training.
transformer.summary()
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

transformer.fit(train_ds, validation_data=val_ds, epochs=epochs)

Model: "transformer"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_inputs (InputLayer)     [(None, None)]       0                                            
__________________________________________________________________________________________________
positional_embedding (Positiona (None, None, 256)    33157120    encoder_inputs[0][0]             
__________________________________________________________________________________________________
decoder_inputs (InputLayer)     [(None, None)]       0                                            
__________________________________________________________________________________________________
transformer_encoder (Transforme (None, None, 256)    3155456     positional_embedding[0][0]       
________________________________________________________________________________________

<tensorflow.python.keras.callbacks.History at 0x7f2c1a34cf10>

**Decoding test**

In [None]:
# Map each Persian token index back to its vocabulary string.
pes_vocab = pes_vectorization.get_vocabulary()
pes_index_lookup = dict(enumerate(pes_vocab))
max_decoded_sentence_length = 20


# prediction
def decode_sequence(input_sentence):
    """Translate one English sentence by greedy decoding.

    Starts from "[start]" and, at each step, re-vectorizes the sentence
    decoded so far, runs the transformer, and appends the highest-scoring
    token at the current position. Stops when "[end]" is produced or after
    ``max_decoded_sentence_length`` steps.

    Returns the decoded string, including the "[start]" marker.
    """
    source_tokens = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        target_tokens = pes_vectorization([decoded_sentence])[:, :-1]
        step_scores = transformer([source_tokens, target_tokens])[0, step, :]

        next_word = pes_index_lookup[np.argmax(step_scores)]
        decoded_sentence = " ".join((decoded_sentence, next_word))

        if next_word == "[end]":
            break
    return decoded_sentence


# Translate 30 randomly chosen English sentences from the test split and
# print each source sentence followed by its translation and a blank line.
test_eng_texts = [eng for eng, _ in test_persian]
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)

    print(input_sentence, translated, "", sep="\n")

Tom politely pretended not to notice that Mary had been crying.
[start] تام کند به قطار آسیب زدند [end]

I prefer working to doing nothing.
[start] قهوه را ترجیح می‌دهم که کار نکن [end]

Are you fond of music?
[start] امشب وقتت آزاد است [end]

The food is getting cold.
[start] غذا حاضر است [end]

I never hurt Tom.
[start] من هیچ گاه به تام اعتمادی نکردم [end]

Lead is easily bent.
[start] نبضت عادی است [end]

Listen.
[start] احتیاط [end]

Who?
[start] هدف،آتش [end]

I got over it. You should, too.
[start] من اتاق را با خواهرم شریک هستم [end]

We have a wide choice of books.
[start] ما بیش از 40 سال است که انجام دهی [end]

I've never seen anything like this before.
[start] من هیچگاه دوباره شاد نخواهم شد [end]

Don't shout.
[start] ادامه بده ادامه دادن [end]

This bus will take you to the museum.
[start] این اتوبوس به دست بعدی می شوم [end]

She asked for my help.
[start] او از روز اتاق را انجام دهم [end]

There's nothing worth watching on TV today.
[start] همیشه هیچ چیزی برای خوردن کردن 