In [1]:
import tensorflow as tf 
import tensorflow.keras as keras
from tensorflow.keras.layers import GRU, LSTM, Dense, Input, Lambda
import os, io
import shutil 
import tqdm 
import math
import pathlib
import numpy as np 
import matplotlib.pyplot as plt
from typing import *
import unicodedata
import re

In [2]:
# Location of the zipped Spanish-English sentence-pair corpus that the next cell downloads.
SPANISH_DATASET_URL = 'http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip'

In [3]:
# Download the Spanish-English archive and extract it (extract=True).
# `path_to_zip` is the path returned by get_file; the next cell derives the
# location of the extracted text file from its parent directory.
path_to_zip = tf.keras.utils.get_file(
      'spa-eng.zip',
      origin=SPANISH_DATASET_URL,
      extract=True
    )

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [4]:
# The sentence pairs are expected at <download dir>/spa-eng/spa.txt after extraction.
path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"
# NOTE(review): assumes the archive unpacks into a `spa-eng/` folder next to the zip — confirm for your Keras version.


In [5]:
#1. convert unicode files to ascii
def unicode_to_ascii(s):
    """
        Strip accents from a string by dropping its combining marks.
        Params:
          s(dtype: str): input string.
        Return(dtype: str)
          returns `s` in NFD-normalized form with every character of unicode
          category 'Mn' (non-spacing mark) removed. Characters that have no
          decomposition (e.g. '¿') are kept unchanged.
    """
    decomposed = unicodedata.normalize('NFD', s)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)

def preprocess_sentence(w):
    """
        Normalise a raw sentence and wrap it in start/end tokens.

        Steps, in order: lowercase and strip the input, remove accents via
        `unicode_to_ascii`, put spaces around the punctuation marks . , ! ? ¿,
        collapse repeated whitespace, replace every run of characters other
        than a-z, A-Z and those punctuation marks with a single space, strip
        again, and finally add the '<start>' / '<end>' tokens.

        Params:
          w(dtype: str): string, what needed to be preprocessed.
        Return(dtype: str)
          returns the preprocessed string, e.g.
          '¿Todavía están en casa?' -> '<start> ¿ todavia estan en casa ? <end>'
    """
    w = unicode_to_ascii(w.lower().strip())

    # Surround each punctuation mark with spaces so it becomes its own token.
    w = re.sub(r"([.,!?¿])", r" \1 ", w)
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    w = re.sub(r'\s{2,}', ' ', w)

    # Anything that is not a letter or one of the kept punctuation marks
    # (digits, quotes, whitespace runs, ...) collapses to one space.
    w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)
    w = w.strip()

    return '<start> ' + w + ' <end>'

In [6]:
def read_data(path: str) -> Tuple:
    """
        Read a tab-separated sentence-pair file and split it into context and
        target arrays, preprocessing every sentence with `preprocess_sentence`.

        Each line is expected to hold exactly two tab-separated fields: the
        first is treated as the target sentence, the second as the context
        sentence. A line with a different number of fields makes the
        unpacking below raise ValueError.

        Params:
          path(type: str): Input path of the text data (read as UTF-8).

        Return(type: (np.ndarray, np.ndarray))
          returns the array of context sentences and the array of target
          sentences, in that order.
    """
    # Context manager so the file handle is closed even if reading fails
    # (the handle used to be left open).
    with io.open(path, encoding='UTF-8') as source:
        lines = source.read().strip().split('\n')
    word_pairs = [[preprocess_sentence(w) for w in l.split('\t')] for l in lines]

    context = np.array([context for target, context in word_pairs])
    target = np.array([target for target, context in word_pairs])

    return context, target

In [7]:
# Load and preprocess every sentence pair; read_data returns (context, target) in that order.
context_text, target_text = read_data(path_to_file)
# Number of sentence pairs loaded.
context_text.shape

(118964,)

In [8]:
context_text[1]

'<start> vete . <end>'

In [9]:
target_text[1]

'<start> go . <end>'

In [10]:
def get_vectorized_value(text): 
    """
        Fit a Keras tokenizer on `text` and convert the text into padded
        integer sequences.
        Params:
          text(type; np.ndarray): array of context or target sentences.
        Return(type: (padded sequences, tokenizer))
          returns the integer sequences produced by the tokenizer, padded at
          the end (padding='post') to a common length, and the fitted
          tokenizer itself.
    """
    # filters='' so that the tokenizer does not remove any characters
    # (the '<start>' / '<end>' markers must survive).
    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    tokenizer.fit_on_texts(text)

    sequences = tokenizer.texts_to_sequences(text)
    padded = tf.keras.preprocessing.sequence.pad_sequences(sequences, padding='post')

    return padded, tokenizer

In [11]:
# Each language gets its own tokenizer / vocabulary; the padded lengths of the
# two tensors are independent of each other.
context_tensor, context_tokenizer = get_vectorized_value(context_text)
target_tensor, target_tokenizer = get_vectorized_value(target_text)
target_tensor.shape

(118964, 51)

In [12]:
def split_dataset(context_data: np.array, target_data: np.array,
                  is_test: bool, train_split: float, val_split: float,
                  test_split=0.0) -> Tuple: 
    """
        Randomly split aligned context/target arrays into train, validation
        and (optionally) test subsets. Rows are assigned without overlap, and
        the same row indices are used for both arrays so pairs stay aligned.

        Params:
          context_data(type: np.ndarray): context samples, indexed on axis 0.
          target_data(type: np.ndarray): target samples, indexed on axis 0.
          is_test(dtype: Bool): whether a test split is requested.
          train_split(dtype; float): fraction of rows used for training.
          val_split(dtype; float): fraction of rows used for validation.
          test_split(dtype; float): fraction of rows used for testing;
            must be 0 when is_test is False and > 0 when is_test is True.

        Return(type: Tuple)
          ((train_X, train_y), (val_X, val_y)) when test_split is 0, otherwise
          ((train_X, train_y), (val_X, val_y), (test_X, test_y)). The test
          subset receives every row not used for train/val, in ascending
          index order.

        Raises:
          AssertionError: if the split arguments are inconsistent.
    """
    # A test fraction is only allowed together with is_test=True. The previous
    # check (`is_test and test_split > 0`) rejected every call with
    # is_test=False, which made the two-way split impossible.
    assert is_test or test_split == 0, "You cannot create a testing split, by specifying is_test to False"
    assert not is_test or test_split > 0, "is_test is True, but test_split is not greater than 0"
    assert train_split <= 1.0, 'Train Split value should be float, and should be lesser than 1.0'
    assert val_split <= 1.0, 'val Split value should be float, and should be lesser than 1.0'
    assert test_split <= 1.0, 'test Split value should be float, and should be lesser than 1.0'
    # Tolerant comparison: e.g. 0.7 + 0.1 + 0.2 is not exactly 1.0 in floating point.
    assert math.isclose(train_split + test_split + val_split, 1.0), "Sum of train, test and val split, does't add up to 1.0"

    len_data = context_data.shape[0]
    n_train = int(train_split * len_data)
    n_val = int(val_split * len_data)

    # One permutation sliced into disjoint parts replaces the repeated
    # `i not in array` membership scans, which were quadratic in len_data.
    shuffled = np.random.permutation(len_data)
    train_inds = shuffled[:n_train]
    val_inds = shuffled[n_train:n_train + n_val]

    train = (context_data[train_inds], target_data[train_inds])
    val = (context_data[val_inds], target_data[val_inds])

    if test_split:
        # Everything left over goes to the test set (ascending order, as before).
        test_inds = np.sort(shuffled[n_train + n_val:])
        return train, val, (context_data[test_inds], target_data[test_inds])

    return train, val

In [13]:
# 80% train / 10% validation / 10% test.
# NOTE(review): no random seed is set anywhere above, so the split differs between runs.
train_data, val_data, test_data = split_dataset(context_tensor, target_tensor, True, 0.8, 0.1, 0.1)

In [14]:
train_data[0].shape

(95171, 53)

In [15]:
def create_tensorflow_dataset(data: tf.Tensor, batch_size: int) -> tf.data.Dataset: 
    """
        Wrap in-memory arrays in a shuffled, batched and prefetched
        tf.data.Dataset.
        Params:
          data(type: Tuple): the data to slice, passed straight to
            `from_tensor_slices`; callers in this notebook pass a
            (context, target) tuple.
          batch_size(dtype: int): number of samples per batch.
        Return(type: tf.data.Dataset)
          returns a dataset that shuffles with a buffer of 1024 elements,
          yields batches of exactly `batch_size` samples (a final smaller
          batch is dropped), and prefetches with AUTOTUNE.
    """ 
    dataset = tf.data.Dataset.from_tensor_slices(data)
    dataset = dataset.shuffle(1024)
    # drop_remainder=True keeps the batch dimension fixed at batch_size.
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(tf.data.experimental.AUTOTUNE)

In [16]:
# Batch size 64 must match the batch size passed to Encoder / training_seq2seq
# further down, because the initial hidden state is built for that batch size.
train_ds = create_tensorflow_dataset(train_data, 64)
val_ds = create_tensorflow_dataset(val_data, 64)
test_ds = create_tensorflow_dataset(test_data, 64)

In [54]:
train_ds

<PrefetchDataset element_spec=(TensorSpec(shape=(64, 53), dtype=tf.int32, name=None), TensorSpec(shape=(64, 51), dtype=tf.int32, name=None))>

In [29]:
# Pull one example batch; its shapes are used below to derive the sequence lengths.
eg_in, eg_de = next(iter(train_ds))

In [30]:
# Vocabulary sizes: +1 because index 0 is not present in word_index
# (0 is the value used for padding by pad_sequences).
INPUT_VOCAB_SIZE = len(context_tokenizer.word_index) + 1
TARGET_VOCAB_SIZE = len(target_tokenizer.word_index) + 1
# Embedding width and GRU state size shared by encoder and decoder.
EMB_DIMS = 256 
HIDDEN_DIMS = 1024
# Padded sequence lengths, read from the example batch (axis 1).
INPUT_SEQ_SIZE = eg_in.shape[1] 
TARGET_SEQ_SIZE = eg_de.shape[1]

In [31]:
class Encoder(tf.keras.Model):
    """
        Encoder of the seq2seq model: an embedding layer followed by a GRU.
        Methods:
          __init__: constructor.
          call: embeds the input ids and runs them through the GRU.
          initialize_hidden_state: builds the all-zero initial GRU state.
        Params:
          vocab_size(dtype: int): size of the input vocabulary (embedding rows).
          embedding_dim(dtype: int): width of the embedding vectors.
          hidden_units(dtype: int): number of units in the GRU layer.
          bt_size(dtype: int): batch size, used for the initial hidden state.
    """
    def __init__(self, vocab_size: int, embedding_dim: int, hidden_units: int, bt_size: int):
        super(Encoder, self).__init__()
        self.bt_size = bt_size
        self.hidden_units = hidden_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, name="embedding_layr")
        # The layer is a GRU; the name string "lstm_layer" is kept unchanged
        # because layer names are runtime identifiers (presumably part of the
        # variable names of already-saved models — confirm before renaming).
        self.rnn = tf.keras.layers.GRU(self.hidden_units,
                                       return_sequences=True, 
                                       return_state=True, 
                                       recurrent_initializer='glorot_uniform',
                                       name="lstm_layer"
                                      )

    def call(self, x, hidden):
        """
            Params:
              x: batch of token-id sequences.
              hidden: initial GRU state.
            Return: (output, state) — the per-timestep GRU outputs
              (return_sequences=True) and the final state (return_state=True).
        """
        x = self.embedding(x)
        output, state = self.rnn(x, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        """
            Return a zero tensor of shape (bt_size, hidden_units).
        """
        # Sized from the configured GRU width; this used to be a literal 1024,
        # which only worked when hidden_units happened to be 1024.
        return tf.zeros((self.bt_size, self.hidden_units))

In [18]:
class DotProductAttention(tf.keras.layers.Layer):
    """
        Custom keras layer implementing (unscaled) dot-product attention.
        Methods:
            call(scope: public): scores every entry of `values` against
                                 `query` and returns the weighted sum.
    """
    def call(self, query, values):
        """
            Params:
                query  : tensor that gets an extra axis inserted at position 1
                         so it broadcasts against `values`.
                         (assumes (batch, units) — TODO confirm)
                values : tensor scored against the query.
                         (assumes (batch, time, units) — TODO confirm)
            Return:
                (context_vector, attention_weights): `values` weighted by the
                softmax (over axis 1) of the dot products and summed over
                axis 1, plus the weights themselves (trailing axis of size 1).
        """
        expanded_query = tf.expand_dims(query, 1)

        # Dot product along axis 2, then restore that axis with size 1.
        score = tf.expand_dims(tf.reduce_sum(expanded_query * values, axis=2), 2)

        attention_weights = tf.nn.softmax(score, axis=1)

        context_vector = tf.reduce_sum(attention_weights * values, axis=1)

        return context_vector, attention_weights

In [19]:

class BahdanauAttention(tf.keras.layers.Layer):
    """
        Custom keras layer implementing Bahdanau-style (additive) attention.
        Methods:
            __init__(scope: public): creates the three dense projections.
            call(scope: public): scores `values` against `query` and returns
                                 the weighted sum.
        Params:
            units(dtype: int): output width of the W1 / W2 projections.
    """
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        """
            Params:
                query  : tensor that gets an extra axis inserted at position 1
                         before projection. (assumes (batch, units) — TODO confirm)
                values : tensor that is scored and then weighted.
                         (assumes (batch, time, units) — TODO confirm)
            Return:
                (context_vector, attention_weights): the softmax-weighted sum
                of `values` over axis 1, and the weights themselves.
        """
        query_with_time_axis = tf.expand_dims(query, 1)

        # score = V(tanh(W1(values) + W2(query)))
        projected = self.W1(values) + self.W2(query_with_time_axis)
        score = self.V(tf.nn.tanh(projected))

        attention_weights = tf.nn.softmax(score, axis=1)

        weighted_values = attention_weights * values
        return tf.reduce_sum(weighted_values, axis=1), attention_weights
     

In [20]:

class DecoderWithAttention(tf.keras.Model):
    """
        Decoder of the seq2seq model: embedding -> (optional attention) ->
        GRU -> dense projection onto the vocabulary.
        Methods:
            __init__ : constructor.
            call     : runs one decoding call and returns the vocabulary scores.
        Params:
            vocab_size(dtype: int)               : size of the target vocabulary.
            embedding_dim(dtype: int)            : width of the embedding vectors.
            dec_units(dtype: int)                : number of units in the GRU layer.
            batch_sz(dtype: int)                 : Batch Size (stored, not used in call).
            attention_layer(type: keras.Layer)   : attention layer called as
                                                   attention(hidden, enc_output);
                                                   None disables attention.
    """
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz, attention_layer = None):
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc = tf.keras.layers.Dense(vocab_size)

        self.attention = attention_layer

    def call(self, x, hidden, enc_output):
        """
            Params:
                x          : batch of decoder input token ids.
                hidden     : GRU initial state, also used as the attention query.
                enc_output : encoder outputs the attention layer attends over.
            Return:
                (scores, state, attention_weights): the dense-layer output for
                the flattened GRU outputs, the new GRU state, and the attention
                weights (None when no attention layer is set).
        """
        embedded = self.embedding(x)
        attention_weights = None

        if self.attention:
            context_vector, attention_weights = self.attention(hidden, enc_output)
            # Prepend the context vector to the embedding along the feature axis.
            embedded = tf.concat([tf.expand_dims(context_vector, 1), embedded], axis=-1)

        rnn_output, state = self.gru(embedded, initial_state = hidden)

        # Merge the leading axes so the dense layer sees one row per position.
        flat_output = tf.reshape(rnn_output, (-1, rnn_output.shape[2]))

        return self.fc(flat_output), state, attention_weights


In [24]:
def train_step(inp, targ, encoder,
                       decoder, input_tok, 
                       target_tok, batch_size, optimizer):
    """
        Run one teacher-forced training step on a single batch and apply the
        resulting gradients to the encoder and decoder variables.
        Params:
            inp(type: tf.Tensor)                : input batch for the encoder.
            targ(type: tf.Tensor)               : target batch; column t is the
                                                  label for decoding step t.
            encoder(keras.Model)                : keras model for the encoder part in seq2seq.
            decoder(keras.Model)                : keras model for the decoder part in seq2seq.
            input_tok                           : input tokenizer (not used in this function).
            target_tok                          : target tokenizer; its word_index supplies
                                                  the id of "<start>".
            batch_size(dtype: int)              : Batch size.
            optimizer(type: keras.Optimizer)    : optimizer, that is used to update the parameter.
        Return
            returns the loss summed over the decoding steps divided by
            targ.shape[1]. Gradients are taken from the summed loss.
    """
    start_id = target_tok.word_index["<start>"]
    initial_state = encoder.initialize_hidden_state()
    summed_loss = 0

    with tf.GradientTape() as tape: 
        enc_output, dec_hidden = encoder(inp, initial_state)
        dec_input = tf.expand_dims([start_id] * batch_size, 1)

        # Teacher forcing: the ground-truth token of step t is the decoder
        # input for step t + 1.
        for step in range(1, targ.shape[1]):
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            summed_loss += loss_function(targ[:, step], predictions)
            dec_input = tf.expand_dims(targ[:, step], 1)

        batch_loss = (summed_loss / int(targ.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(summed_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss


def test_step(inp, targ, encoder,
                      decoder, input_tok, target_tok, batch_size):
    """
        Evaluate one batch with teacher forcing and return its loss. Unlike
        train_step, no gradients are computed and no parameters are updated.
        Params:
            inp(type: tf.Tensor)    : input batch for the encoder.
            targ(type: tf.Tensor)   : target batch; column t is the label for
                                      decoding step t.
            encoder(keras.Model)    : keras model for the encoder part in seq2seq.
            decoder(keras.Model)    : keras model for the decoder part in seq2seq.
            input_tok               : input tokenizer (not used in this function).
            target_tok              : target tokenizer; its word_index supplies
                                      the id of "<start>".
            batch_size(dtype: int)  : Batch size.
        Return
            returns the loss summed over the decoding steps divided by
            targ.shape[1].
    """
    # The docstring above used to open one column deeper than the body, which
    # Python rejects as inconsistent indentation.
    enc_hidden = encoder.initialize_hidden_state()
    loss = 0
    enc_output, enc_hidden = encoder(inp, enc_hidden)
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([target_tok.word_index["<start>"]] * batch_size, 1)

    # Teacher forcing: feed the ground-truth token of step t as the input of
    # step t + 1.
    for t in range(1, targ.shape[1]):
        predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
        loss += loss_function(targ[:, t], predictions)
        dec_input = tf.expand_dims(targ[:, t], 1)

    loss = loss / int(targ.shape[1])
    return loss

In [25]:
# Per-example cross-entropy on raw logits; reduction is done in loss_function.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """
        Cross-entropy loss that ignores positions whose label is 0.
        Params:
            real(type: tf.Tensor) : ground truth token ids.
            pred(type: tf.Tensor) : predicted logits from the model.
        Return
            returns the mean of the per-example losses after the entries whose
            label equals 0 have been zeroed out. The mean is taken over all
            entries, zeroed ones included.
    """
    per_example = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)

In [26]:
import time
def training_seq2seq(encoder, decoder, train_dataset,
                        val_dataset, optimizer,
                        input_tok, target_tok,
                        epochs, batch_size):
    """
        Manual training loop for the seq2seq model: every epoch runs
        train_step over all training batches and test_step over all
        validation batches, prints the average losses and records them.
        Params:
            encoder(keras.Model)                  : keras model for the encoder part in seq2seq.
            decoder(keras.Model)                  : keras model for the decoder part in seq2seq.
            train_dataset(type: tf.data.Dataset)  : Training dataset yielding (inp, targ) batches.
            val_dataset(type: tf.data.Dataset)    : Validation dataset yielding (inp, targ) batches.
            optimizer(tf.keras.Optimizer)         : Optimizer, used by the train_step function.
            input_tok                             : input tokenizer, forwarded to the step functions.
            target_tok                            : target tokenizer, forwarded to the step functions.
            epochs(type: int)                     : Number of epochs.
            batch_size(dtype: int)                : Batch size.
        Return(keras.Model, keras.model, List, List)
            returns the trained encoder, decoder and the per-epoch lists of
            average training and validation loss.
    """
    training_loss = []
    validation_loss = []

    for epoch in range(epochs):
        total_loss = 0
        total_loss_val = 0

        for inp, targ in tqdm.tqdm(train_dataset):
            batch_loss = train_step(inp, targ,
                                    encoder, decoder, input_tok,
                                    target_tok, batch_size, optimizer)
            total_loss += batch_loss.numpy()

        for inp, targ in val_dataset:
            batch_loss = test_step(inp, targ,
                                   encoder, decoder, input_tok,
                                   target_tok, batch_size)
            total_loss_val += batch_loss.numpy()

        # Average over the number of batches in each dataset.
        avg_train_loss = total_loss/float(len(train_dataset))
        avg_val_loss = total_loss_val/float(len(val_dataset))
        print(f"Epoch {epoch} train_loss: {avg_train_loss} val_loss: {avg_val_loss}")
        training_loss.append(avg_train_loss)
        # Record the validation average here; the training average used to be
        # appended a second time, so the returned validation history was wrong.
        validation_loss.append(avg_val_loss)

    return encoder, decoder, training_loss, validation_loss

In [137]:
# Dot-product attention: build a fresh encoder/decoder pair, train for 3 epochs
# with batch size 64, then save both trained models.
optimizer = tf.keras.optimizers.Adam()
attention = DotProductAttention()
encoder = Encoder(INPUT_VOCAB_SIZE, EMB_DIMS, HIDDEN_DIMS, 64)
decoder = DecoderWithAttention(TARGET_VOCAB_SIZE, EMB_DIMS, HIDDEN_DIMS, 64, attention)
encoder_t, decoder_t, training_loss, validation_loss = training_seq2seq(encoder, decoder, 
                            train_ds, val_ds, optimizer, context_tokenizer, target_tokenizer, 3, 64)

encoder_t.save("seq_seq_gru_attention_encoder")
decoder_t.save("seq_seq_gru-attention_decoder")

100%|██████████| 1487/1487 [27:39<00:00,  1.12s/it]


Epoch 0 train_loss: 0.7499041346471103 val_loss: 0.6028971610842524


100%|██████████| 1487/1487 [27:32<00:00,  1.11s/it]


Epoch 1 train_loss: 0.49198000923690977 val_loss: 0.3939233282127896


100%|██████████| 1487/1487 [27:49<00:00,  1.12s/it]


Epoch 2 train_loss: 0.30766756600113465 val_loss: 0.29631480703482754


In [33]:
# Bahdanau (additive) attention with 256 projection units: build a fresh
# encoder/decoder pair, train for 3 epochs with batch size 64, then save both.
# Note: this rebinds encoder_t / decoder_t / training_loss / validation_loss
# from the dot-product run.
optimizer = tf.keras.optimizers.Adam()
attention = BahdanauAttention(256)
encoder = Encoder(INPUT_VOCAB_SIZE, EMB_DIMS, HIDDEN_DIMS, 64)
decoder = DecoderWithAttention(TARGET_VOCAB_SIZE, EMB_DIMS, HIDDEN_DIMS, 64, attention)
encoder_t, decoder_t, training_loss, validation_loss = training_seq2seq(encoder, decoder, 
                            train_ds, val_ds, optimizer, context_tokenizer, target_tokenizer, 3, 64)

encoder_t.save("seq_seq_gru_attention_bahdanadu_encoder")
decoder_t.save("seq_seq_gru-attention_bahdanadu_decoder")

100%|██████████| 1487/1487 [32:39<00:00,  1.32s/it]


Epoch 0 train_loss: 0.6406364146702708 val_loss: 0.3980408955264736


100%|██████████| 1487/1487 [33:21<00:00,  1.35s/it]


Epoch 1 train_loss: 0.29456486556461053 val_loss: 0.2674178850811881


100%|██████████| 1487/1487 [32:56<00:00,  1.33s/it]


Epoch 2 train_loss: 0.1738644652895344 val_loss: 0.24044849051011574


In [56]:
def translate(sentence, encoder, decoder):
    """
        Greedy inference with the trained model: encode the preprocessed
        sentence, then decode one token at a time, always feeding back the
        highest-scoring token, until '<end>' is produced or TARGET_SEQ_SIZE
        tokens have been generated.

        Relies on the notebook globals context_tokenizer, target_tokenizer,
        INPUT_SEQ_SIZE, TARGET_SEQ_SIZE and HIDDEN_DIMS.

        Params:
            sentence(dtype: str)   : sentence, that needed to be translated.
            encoder(keras.Model)   : Keras model of the encoder trained.
            decoder(keras.Model)   : Keras model of the decoder trained.

        Returns(dtype: str, str)
            returns the translated text (tokens separated and followed by a
            space, without the '<end>' token) and the preprocessed input
            sentence.

        Raises:
            KeyError: if a token of the preprocessed sentence is missing from
            context_tokenizer.word_index.
    """
    sentence = preprocess_sentence(sentence)

    inputs = [context_tokenizer.word_index[i] for i in sentence.split(' ')]
    # Pad to the context length used in training. The previous literal 51 was
    # the target length, not the context length.
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
                                                         maxlen=INPUT_SEQ_SIZE,
                                                         padding='post')
    inputs = tf.convert_to_tensor(inputs)

    # Batch of one; state width follows the encoder instead of a literal 1024.
    hidden = [tf.zeros((1, getattr(encoder, 'hidden_units', HIDDEN_DIMS)))]
    enc_out, enc_hidden = encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([target_tokenizer.word_index['<start>']], 0)

    result = ''
    for _ in range(TARGET_SEQ_SIZE):
        predictions, dec_hidden, _weights = decoder(dec_input, dec_hidden, enc_out)

        predicted_id = tf.argmax(predictions[0]).numpy()
        word = target_tokenizer.index_word[predicted_id]

        if word == '<end>':
            break

        result += word + ' '
        # Feed the prediction back in as the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence
     

In [85]:
# Example: translate with the most recently trained models (encoder_t / decoder_t
# were last bound by the Bahdanau training cell in file order).
translate(u'¿todavia estan en casa?', encoder_t, decoder_t)

('are you still at home ? ', '<start> ¿ todavia estan en casa ? <end>')