# libraries and corpus

In [1]:
import numpy as np
import pickle
import re
from copy import copy
from bisect import bisect_left

# tensorflow
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import LSTM, Input, Dense, Embedding
from tensorflow.keras.utils import to_categorical, plot_model

In [2]:
!wget http://www.manythings.org/anki/pes-eng.zip
!unzip pes-eng.zip

--2021-04-10 05:51:14--  http://www.manythings.org/anki/pes-eng.zip
Resolving www.manythings.org (www.manythings.org)... 104.21.55.222, 172.67.173.198, 2606:4700:3031::6815:37de, ...
Connecting to www.manythings.org (www.manythings.org)|104.21.55.222|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 112927 (110K) [application/zip]
Saving to: ‘pes-eng.zip’


2021-04-10 05:51:15 (236 KB/s) - ‘pes-eng.zip’ saved [112927/112927]

Archive:  pes-eng.zip
  inflating: _about.txt              
  inflating: pes.txt                 


In [3]:
# Path of the tab-separated corpus extracted from pes-eng.zip.
file_path = 'pes.txt'
# Open via file_path (instead of repeating the literal) so the path is changed in one place.
# The handle is consumed line by line by the pre-processing cell.
text_file = open(file_path, 'r', encoding='utf-8')

# text pre-processing

In [4]:
# Sentence-ending / bracketing signs that are normalised to a single '.'
eng_signs = '?!;()'
# eng_signs = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n'
fa_signs = '،؟!؛()٪'

# A number, optionally followed by ':<number>' (clock time such as 10:30).
# The previous pattern '[0-9]+(|:)[0-9]+' required at least two digits, so
# single-digit numbers were never replaced by the tag.
# NOTE(review): [0-9] only matches ASCII digits; Persian digits are left untouched — confirm intent.
number_pattern = re.compile('[0-9]+(?::[0-9]+)?')

# Parallel lists: eng_list[i] is the English source of the Persian sentence fa_list[i].
eng_list = []
fa_list = []
for line in text_file:
  # remove the trailing attribution part of the line
  line = re.sub('CC-BY(.*)','',line)
  # change text to lower case
  line = line.lower()
  # split english and persian sentence
  line_split = line.strip().split('\t')
  # skip blank or malformed lines instead of failing with IndexError
  if len(line_split) < 2:
    continue
  eng = line_split[0]
  fa = line_split[1]

  # replace english signs with single point sign
  for sign in eng_signs:
    eng = eng.replace(sign, '.')

  # replace persian signs with single point sign
  for sign in fa_signs:
    fa = fa.replace(sign, '.')

  # replace numbers and clock times with a tag
  eng = number_pattern.sub('<number>', eng)
  fa = number_pattern.sub('<عدد>', fa)

  # replace other signs
  eng = eng.replace(':', ' ')
  fa = fa.replace(':', ' ')
  eng = eng.replace(',', '')
  eng = eng.replace('"',' ')
  # NOTE(review): '،' is already turned into '.' by the fa_signs loop above, so this is a
  # no-op and Persian commas currently split a sentence into several "translations".
  fa = fa.replace('،', ' ')
  fa = fa.replace('«', ' ')
  fa = fa.replace('»', ' ')
  fa = fa.replace('"', ' ')

  # replace half-space (ZWNJ) and non-breaking space with a normal space
  fa = fa.replace('\u200c',' ')
  fa = fa.replace('\xa0',' ')

  # collapse duplicate whitespaces
  fa = re.sub(' +', ' ', fa)
  eng = re.sub(' +', ' ', eng)

  # keep only the first english sentence
  eng = eng.strip().split('.')[0].strip()
  # a line whose english part starts with a sign would yield an empty source sentence
  if eng == '':
    continue

  # every '.'-separated persian fragment is stored as a translation of the english sentence
  for tr in fa.strip().split('.'):
    # strip before testing so whitespace-only fragments are not stored as empty translations
    tr = tr.strip()
    if tr != '':
      eng_list.append(eng)
      fa_list.append(tr)

# the corpus has been read completely; release the file handle
text_file.close()

# encoder input

In [5]:
# Source-side (English) tokenizer; oov_token is the entry used for unknown words.
# NOTE(review): Keras documents oov_token as a string; an int is used here — confirm intent.
enc_tokenizer = Tokenizer(oov_token=1)
enc_tokenizer.fit_on_texts(eng_list)

# Vocabulary size: one extra slot because index zero is reserved for padding.
enc_vocab_size = 1 + len(enc_tokenizer.word_index)

# Every English sentence as a list of word indices.
eng_seq_list = enc_tokenizer.texts_to_sequences(eng_list)

# Length of the longest encoded English sentence.
max_enc_len = max(map(len, eng_seq_list))

# Pad all sequences to max_enc_len (pad_sequences pads at the front by default).
padded_eng_seq = pad_sequences(eng_seq_list, maxlen=max_enc_len)

# Encoder input as a numpy array, one row per sentence.
encoder_input_data = np.array(padded_eng_seq)

# decoder input & output

In [6]:
# Target-side (Persian) tokenizer; oov_token is the entry used for unknown words.
# The extra 'sos eos' text registers the start/end-of-sentence markers in the vocabulary.
dec_tokenizer = Tokenizer(oov_token=1)
dec_tokenizer.fit_on_texts(fa_list+['sos eos'])

# Vocabulary size: one extra slot because index zero is reserved for padding.
dec_vocab_size = 1 + len(dec_tokenizer.word_index)

# Decoder inputs start with 'sos'; decoder targets are the same sentences ending with 'eos'.
# (Reversing the sentences is not required.)
dec_input_fa_list = ['sos ' + sentence for sentence in fa_list]
dec_output_fa_list = [sentence + ' eos' for sentence in fa_list]

dec_input_seq_list = dec_tokenizer.texts_to_sequences(dec_input_fa_list)
dec_output_seq_list = dec_tokenizer.texts_to_sequences(dec_output_fa_list)

# Longest decoder sequence (input and target sequences have the same lengths).
max_dec_len = max(map(len, dec_input_seq_list))

# Pad at the end so the real tokens stay aligned between decoder input and target.
padded_dec_input_seq = pad_sequences(dec_input_seq_list,
                                     maxlen=max_dec_len,
                                     padding='post')
padded_dec_output_seq = pad_sequences(dec_output_seq_list,
                                      maxlen=max_dec_len,
                                      padding='post')

# Decoder input / target as numpy arrays, one row per sentence.
decoder_input_data = np.array(padded_dec_input_seq)
decoder_output_data = np.array(padded_dec_output_seq)

# trainable model

In [7]:
# Hyper-parameters of the network, named once instead of repeating the literal 300.
EMBEDDING_DIM = 300  # size of the word vectors of both embedding layers
LATENT_DIM = 300     # number of units (and state size) of both LSTM layers

# encoder model
encoder_input_layer = Input(shape=(max_enc_len,), name='enc_input')
encoder_embedding_layer = Embedding(input_dim=enc_vocab_size,
                                    output_dim=EMBEDDING_DIM,
                                    mask_zero=True,  # index zero is padding
                                    name='enc_embedding')(encoder_input_layer)
encoder_outputs, state_h, state_c = LSTM(LATENT_DIM, return_state=True, name='enc_lstm')(encoder_embedding_layer)
# only the final LSTM states are handed over to the decoder
encoder_states = [state_h, state_c]

# decoder model
# the embedding, LSTM and dense layers are kept in variables because
# make_inference_models() reuses their trained weights
decoder_input_layer = Input(shape=(max_dec_len,), name='dec_input')
decoder_embedding = Embedding(input_dim=dec_vocab_size,
                              output_dim=EMBEDDING_DIM,
                              mask_zero=True,
                              name='dec_embedding')
decoder_embedding_layer = decoder_embedding(decoder_input_layer)
decoder_lstm = LSTM(LATENT_DIM, return_sequences=True, return_state=True, name='dec_lstm')
# the decoder LSTM starts from the final states of the encoder LSTM
decoder_lstm_outputs, _, _ = decoder_lstm(decoder_embedding_layer, initial_state=encoder_states)
# for every decoder timestep a softmax over the target vocabulary
decoder_dense = Dense(dec_vocab_size, activation='softmax', name='dec_output')
decoder_outputs = decoder_dense(decoder_lstm_outputs)

# training model: inputs are the english sequence and the 'sos'-prefixed persian sequence,
# output has shape max_dec_len * dec_vocab_size per sample
# (encoder and decoder are connected through the encoder states fed to the decoder LSTM)
model = Model([encoder_input_layer, decoder_input_layer], decoder_outputs)

In [None]:
# Draw the layer graph of the training model, including the tensor shapes of every layer.
plot_model(model, show_shapes=True)

# compile and train model

In [8]:
# sparse_categorical_crossentropy consumes the integer word indices directly. It computes the
# same loss as categorical_crossentropy on one-hot targets, but avoids materialising a dense
# (samples * max_dec_len * dec_vocab_size) one-hot tensor and does not depend on
# to_categorical inferring the number of classes from the data.
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy')
# X: encoder input and decoder input ('sos' + sentence)
# y: decoder_output_data (sentence + 'eos') as integer indices of shape (samples, max_dec_len)
# the History object is kept so the loss curves can be inspected after training
history = model.fit([encoder_input_data, decoder_input_data],
                    decoder_output_data,
                    batch_size=32,
                    epochs=40,
                    validation_split=0.2)


Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


<tensorflow.python.keras.callbacks.History at 0x7fc42c144b90>

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# model.save('/content/drive/My Drive/models/my_model.h5')

# create inference models (models for prediction purposes)

In [9]:
# Here we separate the encoder model and the decoder model for prediction purposes.
# Why not reuse the training model?
# 1 - the training model has two inputs: the encoder input and the target translation;
#     at prediction time we do not have the target sentence
# 2 - in each prediction step we predict only one word of the sentence (step-by-step prediction)
# 3 - each step reuses the previous prediction, i.e. predict the first word and get the new
#     lstm states, then feed that word and those states back in to predict the next word, and so on


def make_inference_models():
  '''
  builds the two models used for step-by-step translation from the trained layers
  returns:
    encoder_model: maps an english input sequence to the final encoder lstm states [h, c]
    decoder_model: maps [single word index, state_h, state_c] to
                   [softmax over the target vocabulary, new state_h, new state_c]
  '''
  # encoder: same trained embedding + lstm graph, but the output is the final lstm states
  encoder_model = Model(encoder_input_layer, encoder_states)

  # decoder takes a single word per call (one word is predicted at a time);
  # the embedding layer is the trained one
  decoder_input_single = Input(shape=(1,))
  decoder_input_single_x = decoder_embedding(decoder_input_single)

  # inputs through which the lstm states are fed in (encoder states for the first
  # step, the previously returned decoder states afterwards);
  # the size is read from the trained layer so it cannot drift from the training model
  state_size = decoder_lstm.units
  dec_state_h = Input(shape=(state_size,))
  dec_state_c = Input(shape=(state_size,))
  # initial states for decoder lstm
  dec_states_input = [dec_state_h, dec_state_c]

  # trained decoder lstm: returns its output and the new states, which are used
  # as the initial states of the next step
  dec_lstm_outputs, new_state_h, new_state_c = decoder_lstm(decoder_input_single_x,
                                                            initial_state=dec_states_input)

  # states produced by the decoder lstm
  dec_states = [new_state_h, new_state_c]

  # trained softmax dense layer of the decoder
  dec_outputs = decoder_dense(dec_lstm_outputs)

  # inputs : [single word] + incoming states ("+" concatenates the lists)
  # outputs: [softmax output] + new states, to be used for the next prediction
  decoder_model = Model([decoder_input_single] + dec_states_input,
                        [dec_outputs] + dec_states)

  return encoder_model, decoder_model

# seq2token converter for inputs

In [10]:
# function to convert an input sentence to a padded sequence of word indices
def str_to_tokens( sentence : str ):
    '''
    encodes one english sentence for the encoder model
    params:
      sentence: raw english text
    returns:
      array of shape (1, max_enc_len), padded at the front like the training data
    '''
    # texts_to_sequences applies the tokenizer's own lower-casing and filtering and maps
    # unknown words to the oov entry; a direct word_index[word] lookup raised KeyError
    # for every unseen or punctuated word
    tokens_list = enc_tokenizer.texts_to_sequences( [sentence] )[0]
    return pad_sequences( [tokens_list] , maxlen=max_enc_len , padding='pre')

# english-to-persian translate method definition

## greedy

## beam search

In [21]:
# get encoder and decoder inference models (built from the trained layers);
# beam_translate below reads enc_model and dec_model as globals
enc_model , dec_model = make_inference_models()

def decode_translate(seq):
  '''
  converts a sequence of decoder word indices to text
  params:
    seq: iterable of word indices
  returns:
    the words joined by spaces, with a leading space (e.g. ' w1 w2 eos')
  '''
  words = []
  for x in seq:
    # indices without a word (e.g. the padding index 0) are skipped instead of raising KeyError
    word = dec_tokenizer.index_word.get(int(x))
    if word is not None:
      words.append(f' {word}')
  return ''.join(words)

def beam_translate(sen, b=3):
  '''
  translates english to persian with beam search and prints the results
  params:
    sen: english sentence
    b: width of search (b == 1 is equal to greedy search)
  hypotheses are ranked by the mean log-probability of their words
  '''
  if b < 1:
    raise ValueError('b must be a positive integer')

  # initial constants
  sos_index = dec_tokenizer.word_index['sos']
  eos_index = dec_tokenizer.word_index['eos']

  # result holders
  final_results = []
  final_probs = []

  # final encoder lstm states [h, c] of the input sentence
  states_values = list(enc_model.predict(str_to_tokens(sen), verbose=0))

  # every live hypothesis is a tuple:
  #   (word indices starting with 'sos', log-prob of each predicted word, score, [h, c])
  # the search starts from the single hypothesis 'sos' with the encoder states
  beams = [([sos_index], [], 0.0, states_values)]

  # number of hypotheses still to be completed; the beam shrinks as hypotheses finish,
  # so exactly b translations are produced
  alive_seqs = b

  while alive_seqs > 0 and beams:
    candidates = []
    for seq, probs, _, states in beams:
      # the last word of the hypothesis is the decoder input for the next prediction
      target_seq = np.zeros((1, 1))
      target_seq[0, 0] = seq[-1]
      dec_outputs, h, c = dec_model.predict([target_seq] + states, verbose=0)
      step_probs = dec_outputs[0, -1, :]

      # the best alive_seqs continuations of this hypothesis (more can never be selected)
      for index in np.argsort(step_probs)[-alive_seqs:][::-1]:
        new_probs = probs + [np.log(step_probs[index])]
        # length-normalised score: mean log-probability
        score = np.sum(new_probs) / len(new_probs)
        candidates.append((seq + [index], new_probs, score, [h, c]))

    # keep the best alive_seqs candidates over all hypotheses, best first
    best = np.argsort([cand[2] for cand in candidates])[-alive_seqs:][::-1]

    # split the selection into finished and still-alive hypotheses;
    # a new list is built instead of deleting entries while iterating,
    # which skipped elements in the previous implementation
    beams = []
    for pos in best:
      seq, _, score, _ = candidates[pos]
      generated = seq[1:]  # without the leading 'sos'
      if seq[-1] == eos_index or len(generated) > max_dec_len:
        final_results.append(decode_translate(generated))
        final_probs.append(score)
        alive_seqs -= 1
      else:
        beams.append(candidates[pos])

  # print translations, best score first
  print(f'\n top {b} translations')
  sorted_args = np.argsort(final_probs)[::-1]
  for i in sorted_args:
    print(final_results[i], ' - sum-logs=', round(final_probs[i],3))

In [22]:
# demo: print the beam-search translations (beam width 3) of one english sentence
beam_translate('i thought i could trust you', b=3)


 top 3 translations
 فکر کردم می توانم بهت اعتماد کنم eos  - sum-logs= -0.173
 فکر کردم می خواستی بدانی eos  - sum-logs= -0.784
 فکر کردم می توانم بهت اعتماد کردم eos  - sum-logs= -0.807


# old greedy method

In [11]:
# get encoder and decoder inference models (built from the trained layers);
# translate below reads enc_model and dec_model as globals
# NOTE(review): the beam-search cell makes the same call; one construction is enough
enc_model , dec_model = make_inference_models()

def translate(sen):
  '''
  translates english to persian with greedy search and prints the result
  params:
    sen: english sentence
  '''
  eos_index = dec_tokenizer.word_index['eos']

  # final encoder lstm states [h, c] of the input sentence
  states_values = list(enc_model.predict(str_to_tokens(sen), verbose=0))

  # single-word decoder input; the first input is 'sos'
  target_seq = np.zeros((1, 1))
  target_seq[0, 0] = dec_tokenizer.word_index['sos']

  decoded_translation = ''
  # at most max_dec_len + 1 words are generated; bounding the number of steps (instead of
  # the number of printed words) guarantees termination even if the model keeps
  # predicting an index that has no word, e.g. the padding index 0
  for _ in range(max_dec_len + 1):
    # dec_outputs is the softmax over the target vocabulary for the single input word,
    # h and c are the new decoder lstm states
    dec_outputs, h, c = dec_model.predict([target_seq] + states_values, verbose=0)
    # most probable word
    sampled_word_index = int(np.argmax(dec_outputs[0, -1, :]))
    # direct index -> word lookup instead of scanning the whole vocabulary
    sampled_word = dec_tokenizer.index_word.get(sampled_word_index)
    if sampled_word is not None:
      decoded_translation += ' {}'.format(sampled_word)

    # stop once eos has been generated
    if sampled_word_index == eos_index:
      break

    # the new prediction and the new states are the decoder inputs of the next step
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = sampled_word_index
    states_values = [h, c]

  print( decoded_translation )

In [12]:
# demo: print the greedy translation of one english sentence
translate('i thought i could trust you')

 فکر کردم می توانم بهت اعتماد کنم eos
