In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers,callbacks
from tensorflow.keras.layers import TextVectorization
import random
import string
import re

# 1. Preparing data

### download and unzip dataset from http://www.manythings.org/anki/pes-eng.zip

## Cleaning data

In [None]:
text_file = "pes.txt"
# Rows are tab-separated: English sentence, Persian sentence, then a column that is ignored.
with open(text_file, "r", encoding="utf8") as f:
    lines = f.readlines()

text_pairs = []
for line in lines:
    # Drop the trailing newline so it can never end up inside a sentence.
    fields = line.rstrip("\n").split("\t")
    if len(fields) < 2:
        # Blank or malformed row: there is no sentence pair to keep.
        continue
    english, persian = fields[0], fields[1]
    # The decoder needs explicit start/end markers around every target sentence.
    text_pairs.append((english, "[start] " + persian + " [end]"))

## Split the dataset into train, validation and test sets

In [None]:
# Shuffle in place, then carve the list into 70% train / 15% validation / 15% test.
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
# The training size is whatever remains after reserving two equally sized
# held-out splits; it must be derived from the number of sentence pairs.
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

### Add Persian punctuation to the characters that will be stripped

In [None]:
# Characters removed during standardization: ASCII punctuation plus the two
# extra punctuation marks used in the Persian text.
strip_chars = "".join((string.punctuation, "؟", "،"))

### Since the [start] and [end] markers are necessary in our dataset, we must preserve them, so we remove "[" and "]" from the punctuation characters to strip

In [None]:
# Take the square brackets out of the strip set so that the "[start]" and
# "[end]" markers are left intact by the standardization step.
strip_chars = strip_chars.replace("[", "").replace("]", "")

## 1.3 Preparing text vectorization: one layer for English and one for Persian

In [None]:
def custome_standardization(input_string):
    """Lowercase a string tensor and delete every character in ``strip_chars``.

    Args:
        input_string: String tensor handed over by the TextVectorization layer.

    Returns:
        The lowercased tensor with all ``strip_chars`` characters removed.
    """
    # Build a regex character class out of the (escaped) characters to drop.
    strip_pattern = f"[{re.escape(strip_chars)}]"
    return tf.strings.regex_replace(tf.strings.lower(input_string), strip_pattern, "")

In [None]:
# Vocabulary cap and output length shared by the two vectorizers.
vocab_size = 15000
sequence_lenght = 20

# English side: relies on the layer's default standardization.
source_vectorization = TextVectorization(
    output_sequence_length=sequence_lenght,
    output_mode="int",
    max_tokens=vocab_size,
)

# Persian side: custom standardization keeps the "[start]" / "[end]" markers,
# and one extra position is produced because the sequence is later split into
# a decoder input (all but the last token) and a target (all but the first).
target_vectorization = TextVectorization(
    standardize=custome_standardization,
    output_sequence_length=sequence_lenght + 1,
    output_mode="int",
    max_tokens=vocab_size,
)

# Learn both vocabularies from the training split only.
train_english_text = [english for english, _ in train_pairs]
train_persian_text = [persian for _, persian in train_pairs]
source_vectorization.adapt(train_english_text)
target_vectorization.adapt(train_persian_text)

## Preparing datasets for the translation task

In [None]:
batch_size = 64


def format_dataset(eng, per):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: Batch of English strings.
        per: Batch of Persian strings.

    Returns:
        A ``(inputs, targets)`` tuple. ``inputs`` is a dict with the English
        token ids under "english" and the Persian token ids without their last
        position under "persian"; ``targets`` is the Persian token ids without
        their first position, i.e. the same sequence shifted one step ahead.
    """
    eng_tokens = source_vectorization(eng)
    per_tokens = target_vectorization(per)
    model_inputs = {
        "english": eng_tokens,
        "persian": per_tokens[:, :-1],
    }
    next_token_targets = per_tokens[:, 1:]
    return model_inputs, next_token_targets

def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data`` pipeline from sentence pairs.

    Args:
        pairs: Non-empty sequence of ``(english, persian)`` string tuples.

    Returns:
        A dataset yielding the ``(inputs, targets)`` batches produced by
        ``format_dataset``.
    """
    eng_texts, per_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(per_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=8)
    # Order matters: cache the vectorized batches first so tokenization runs
    # only once, shuffle after the cache so every epoch gets a fresh batch
    # order (shuffling before the cache would freeze the first epoch's order),
    # and prefetch last so it overlaps with the training step.
    return dataset.cache().shuffle(1402).prefetch(16)

# Build the training and validation pipelines from their respective splits.
train_ds, val_ds = make_dataset(train_pairs), make_dataset(val_pairs)

In [None]:
# Bare expression: the notebook renders train_ds's repr as this cell's output.
train_ds

In [None]:
# Pull a single batch and report the shape of each tensor fed to the model.
for inputs, targets in train_ds.take(1):
    english_shape = inputs["english"].shape
    persian_shape = inputs["persian"].shape
    targets_shape = targets.shape
    print(f"inputs['english'].shape: {english_shape}")
    print(f"inputs['persian'].shape: {persian_shape}")
    print(f"targets.shape: {targets_shape}")
          

# 2. Sequence to Sequence RNN

In [None]:
embed_dim = 256
latent_dim = 1024

# Encoder: embed the English token ids and summarize the whole sentence into a
# single vector with a bidirectional GRU (the two directions are summed).
source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(layers.GRU(latent_dim), merge_mode="sum")(x)

# Decoder: a GRU over the Persian tokens seen so far, started from the
# encoder's summary vector, predicting the next token at every position.
past_target = keras.Input(shape=(None,), dtype="int64", name="persian")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
# The softmax width must cover every id target_vectorization can emit, and
# that layer is capped at vocab_size tokens.
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)
seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Show the summary of the RNN encoder-decoder model built above.
seq2seq_rnn.summary()

In [None]:
# save_best_only=True: "seq2seq.keras" is only overwritten when the monitored
# metric improves, so the file always holds the best model seen so far.
callback = [callbacks.ModelCheckpoint("seq2seq.keras", save_best_only=True)]
seq2seq_rnn.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    callbacks=callback,
)

In [None]:
# Replace the in-memory model with the one stored in "seq2seq.keras", the file
# written by the ModelCheckpoint callback during training.
seq2seq_rnn = keras.models.load_model("seq2seq.keras")

## 2.1 Create the decoding function

In [None]:
# Map token ids back to Persian tokens for decoding.
persian_vocab = target_vectorization.get_vocabulary()
persian_index_lookup = dict(zip(range(len(persian_vocab)), persian_vocab))
max_decoded_sentence_lenght = 20


def decode_sentence(input_sentence):
    """Greedily translate one English sentence with the RNN model.

    Starting from "[start]", the model is run on the partial translation, the
    most probable token at the current position is appended, and the loop
    stops at "[end]" or after ``max_decoded_sentence_lenght`` tokens.

    Args:
        input_sentence: English sentence as a plain string.

    Returns:
        The decoded string, including the leading "[start]" marker and, when
        it was generated, the trailing "[end]" marker.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_lenght):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence]
        )
        sampled_token_index = int(np.argmax(next_token_predictions[0, i, :]))
        # The argmax can land on an id that has no entry in the lookup table.
        # Fall back to a placeholder derived from the lookup itself rather
        # than comparing against a hard-coded vocabulary size.
        sampled_token = persian_index_lookup.get(sampled_token_index, "[UNK]")
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence
        

### Test a sentence

In [None]:
# Try the RNN decoder on one English sentence; the returned string is shown as
# the cell output.
input_text="hello , this is me you are looking for"
decode_sentence(input_text)

# 3. Sequence to Sequence Transformer

In [None]:
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block: self-attention, then a dense projection.

    Both sub-layers are followed by a residual connection and layer
    normalization.

    Args:
        embed_dim: Width of the incoming token vectors; also used as the
            attention ``key_dim`` and as the output width of the projection.
        dense_dim: Width of the hidden ReLU layer of the projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

    def call(self, inputs, mask=None):
        """Apply the block to ``inputs``, optionally restricted by ``mask``."""
        if mask is not None:
            # Insert an axis so the per-token mask broadcasts over query positions.
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        # Keys must match the __init__ parameter names exactly, otherwise the
        # layer cannot be rebuilt from its config.
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config
    
    
class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Runs causal self-attention over the target sequence, attention over the
    encoder outputs, and a dense projection. Each sub-layer is followed by a
    residual connection and layer normalization.

    Args:
        embed_dim: Width of the incoming token vectors; also used as the
            attention ``key_dim`` and as the output width of the projection.
        dense_dim: Width of the hidden ReLU layer of the projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization()
        self.layernorm2 = layers.LayerNormalization()
        self.layernorm3 = layers.LayerNormalization()
        # Keras looks for `supports_masking`; with it set, the incoming mask
        # is passed on to the layers that follow this one.
        self.supports_masking = True

    def get_cadual_attention_mask(self, inputs):
        """Build a lower-triangular (causal) mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when position ``i`` may attend to position
        ``j`` (``j <= i``) and 0 otherwise.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        # Both ranges come from the actual input length, so the mask is valid
        # for any sequence length and not only for the training length.
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, sequence_length, sequence_length))
        # Repeat the single mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Apply the block to the target ``inputs`` given ``encoder_outputs``."""
        cadual_mask = self.get_cadual_attention_mask(inputs)
        # Stays None when no mask is supplied, so the second attention call
        # then runs unmasked instead of hitting an unbound name.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, cadual_mask)

        attention_output_1 = self.attention1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=cadual_mask,
        )
        normalization_output_1 = self.layernorm1(inputs + attention_output_1)

        # The query is the residual + normalized output of the first sub-layer,
        # the same tensor that feeds the residual connection below.
        # NOTE(review): padding_mask is built from the *target* mask and has
        # shape (batch, target_len, target_len); it only lines up with the
        # encoder outputs when source and target lengths are equal - confirm
        # this is the intended mask for the encoder-decoder attention.
        attention_output_2 = self.attention2(
            query=normalization_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )

        dense_proj_input = self.layernorm2(normalization_output_1 + attention_output_2)
        dense_proj_output = self.dense_proj(dense_proj_input)
        return self.layernorm3(dense_proj_input + dense_proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        # Keys must match the __init__ parameter names exactly.
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config
        

In [None]:
class PositionalEmbedding(layers.Layer):
    """Token embedding plus a learned embedding of each token's position.

    Args:
        sequence_length: Number of distinct positions that can be embedded.
        input_dim: Number of distinct token ids (vocabulary size).
        output_dim: Width of the embedding vectors.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Embed the token ids and add the embedding of their positions."""
        seq_len = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        return self.token_embeddings(inputs) + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Mark every non-zero token id as a real (unmasked) position."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments merged into the base config."""
        base_config = super().get_config()
        return {
            **base_config,
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

## 3.1 Create Transformer

In [None]:
# Hyperparameters of the Transformer model.
embed_dim = 256
dense_dim = 1024
num_heads = 8

# Encoder branch: English token ids -> positional embedding -> encoder block.
encoded_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(
    sequence_length=sequence_lenght,
    input_dim=vocab_size,
    output_dim=embed_dim,
)(encoded_inputs)
encoded_outputs = TransformerEncoder(
    embed_dim=embed_dim,
    dense_dim=dense_dim,
    num_heads=num_heads,
)(x)

# Decoder branch: Persian token ids -> positional embedding -> decoder block
# (which also reads the encoder outputs) -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="persian")
x = PositionalEmbedding(
    sequence_length=sequence_lenght,
    input_dim=vocab_size,
    output_dim=embed_dim,
)(decoder_inputs)
x = TransformerDecoder(
    embed_dim=embed_dim,
    dense_dim=dense_dim,
    num_heads=num_heads,
)(x, encoded_outputs)
x = layers.Dropout(rate=0.4)(x)
decoder_outputs = layers.Dense(units=vocab_size, activation="softmax")(x)
transformer = keras.Model(inputs=[encoded_inputs, decoder_inputs], outputs=decoder_outputs)

transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Train the Transformer for 10 epochs with val_ds as validation data. No
# checkpoint callback is passed, so the model keeps its final-epoch weights.
transformer.fit(train_ds,validation_data=val_ds,epochs=10)

In [None]:
# Map token ids back to Persian tokens for decoding.
persian_vocab = target_vectorization.get_vocabulary()
persian_index_lookup = dict(zip(range(len(persian_vocab)), persian_vocab))
max_decoded_sentence_lenght = 20


def decode_sentence(input_sentence):
    """Greedily translate one English sentence with the Transformer model.

    Starting from "[start]", the model is run on the partial translation, the
    most probable token at the current position is appended, and the loop
    stops at "[end]" or after ``max_decoded_sentence_lenght`` tokens.

    Args:
        input_sentence: English sentence as a plain string.

    Returns:
        The decoded string, including the leading "[start]" marker and, when
        it was generated, the trailing "[end]" marker.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_lenght):
        # Drop the last position so the decoder input has the same length as
        # the "persian" input used during training.
        tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
        next_token_predictions = transformer.predict(
            [tokenized_input_sentence, tokenized_target_sentence]
        )
        sampled_token_index = int(np.argmax(next_token_predictions[0, i, :]))
        # The softmax is vocab_size wide, which can exceed the number of
        # learned tokens; use a placeholder for ids with no vocabulary entry
        # instead of appending the string "None".
        sampled_token = persian_index_lookup.get(sampled_token_index, "[UNK]")
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence

### Test a sentence

In [None]:
# Try the Transformer decoder on one English sentence; the returned string is
# shown as the cell output.
input_text="Good morning!"
decode_sentence(input_text)