In [1]:
# --- Data -------------------------------------------------------------------
# Upper bound on how many lines of the corpus file are turned into pairs.
NUM_DATA_TO_LOAD = 7000

# --- Model size -------------------------------------------------------------
EMBEDDING_DIM = 256   # width of the encoder / decoder embedding layers
UNITS = 1024          # GRU state size shared by encoder and decoder

# --- Training schedule ------------------------------------------------------
BATCH_SIZE = 64
EPOCHS = 150

# --- Sequence length limits -------------------------------------------------
# NOTE(review): not referenced by the training cells shown here; presumably
# meant for Infer(max_length_input=..., max_length_target=...) - confirm.
MAX_INPUT_LANG_LEN = 20
MAX_TARGET_LANG_LEN = 13

# When True, run() restores the latest checkpoint found in
# ./training_checkpoints before training starts.
RESTORE_SAVED_CHECKPOINT = True

In [2]:
import re
import os
import json
import unicodedata
import tensorflow as tf
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.preprocessing import text

def unicode_to_ascii(s):
  """Remove combining marks (accents) from ``s``.

  The string is NFD-normalised so that accented characters split into a base
  character followed by combining marks, and every code point whose Unicode
  category is ``'Mn'`` is dropped. Characters that have no decomposition are
  returned unchanged, so the result is not guaranteed to be pure ASCII.
  """
  decomposed = unicodedata.normalize('NFD', s)
  kept = []
  for ch in decomposed:
    if unicodedata.category(ch) == 'Mn':
      continue
    kept.append(ch)
  return ''.join(kept)

def clean_seq(w):
  """Normalise one raw sentence into space-separated tokens.

  Steps, in order:
    1. lowercase, trim, and strip combining marks via ``unicode_to_ascii``;
    2. surround each of ``? . ! , ।`` with spaces so it becomes its own token;
    3. collapse runs of spaces / double quotes into one space;
    4. replace every run of characters other than ``a-z``, ``A-Z`` and the
       punctuation above with a single space;
    5. trim the result.

  NOTE(review): step 4 also removes letters that have no NFD decomposition
  (for example ``ø`` and ``æ``), splitting the surrounding word in two.
  Confirm this is acceptable for the corpus being loaded.
  """
  cleaned = unicode_to_ascii(w.lower().strip())

  substitutions = (
      (r"([?.!,।])", r" \1 "),
      (r'[" "]+', " "),
      (r"[^a-zA-Z।?.!,]+", " "),
  )
  for pattern, replacement in substitutions:
    cleaned = re.sub(pattern, replacement, cleaned)

  return cleaned.strip()

def add_start_and_end_token_to_seq(sentence):
    """Wrap ``sentence`` in ``<start>`` / ``<end>`` marker tokens.

    The markers tell the model where decoding begins and where it should
    stop predicting. They are separated from the sentence by single spaces.
    """
    return ' '.join(('<start>', sentence, '<end>'))

def texts_to_sequences(texts, tokenizer):
    """Convert ``texts`` to id sequences and pad them to a common length.

    Args:
        texts: iterable of already-cleaned sentences.
        tokenizer: fitted tokenizer exposing ``texts_to_sequences``.

    Returns:
        The result of ``pad_sequences(..., padding='post')``, i.e. padding
        is appended after each sequence. No ``maxlen`` is passed.
    """
    id_sequences = tokenizer.texts_to_sequences(texts)
    return sequence.pad_sequences(id_sequences, padding='post')

def get_lang_tokenize(texts):
    """Build a Keras ``Tokenizer`` and fit it on ``texts``.

    The tokenizer is created with ``filters=''`` (an empty filter set) and
    returned after ``fit_on_texts(texts)`` has been called on it.
    """
    fitted_tokenizer = text.Tokenizer(filters='')
    fitted_tokenizer.fit_on_texts(texts)
    return fitted_tokenizer

def save_tokenizer(tokenizer, save_at, file_name):
    """Write ``tokenizer.to_json()`` to ``save_at/file_name`` as UTF-8.

    The value returned by ``to_json()`` is itself passed through the JSON
    encoder before being written, which is the format ``load_tokenizer``
    reads back with ``json.load``.

    Args:
        tokenizer: object exposing ``to_json()``.
        save_at: directory in which the file is created.
        file_name: name of the file inside ``save_at``.
    """
    destination = os.path.join(save_at, file_name)
    with open(destination, 'w', encoding='utf8') as out_file:
        json.dump(tokenizer.to_json(), out_file, indent=4, ensure_ascii=False)
    print("Tokenizer write at:", destination)

def load_tokenizer(path_to_tokenizer_file):
    """Rebuild a Keras tokenizer from a file written by ``save_tokenizer``.

    The file is parsed with ``json.load`` and the decoded value is handed to
    ``tokenizer_from_json``.

    Args:
        path_to_tokenizer_file: path of the UTF-8 JSON file to read.

    Returns:
        The tokenizer produced by ``tokenizer_from_json``.
    """
    print("Loading:", path_to_tokenizer_file)
    with open(path_to_tokenizer_file, 'r', encoding='utf8') as fp:
        serialized = json.load(fp)
    return tf.keras.preprocessing.text.tokenizer_from_json(serialized)
 
def show_index_to_word_maping(inp_lang_tok, tensor):
    """Print the word behind every non-zero id in ``tensor``.

    A 45-character dashed rule is printed first, then one line per id that
    is not 0, looked up in ``inp_lang_tok.index_word``.
    """
    print('-' * 45)
    for token_id in (value for value in tensor if value != 0):
        word = inp_lang_tok.index_word[token_id]
        print("{:<6} map with {}".format(token_id, word))

In [3]:
import io

class TatoebaDataset():
    """Loader for a tab-separated sentence-pair file.

    Each line is expected to hold at least two tab-separated columns. The
    first column becomes the *target* language and the second the *input*
    language (see ``load_data``). Any further columns are ignored.

    Args:
        path_to_file: path of the UTF-8 text file to read.
        num_data_to_load: maximum number of lines converted into pairs.
    """

    def __init__(self, path_to_file, num_data_to_load):
        self.path_to_file = path_to_file
        self.num_data_to_load = num_data_to_load

    def read_data(self):
        """Return the file content as a list of lines.

        Leading/trailing whitespace of the whole file is stripped before
        splitting on ``'\\n'``.
        """
        # The context manager closes the handle as soon as the text is read
        # instead of leaving it to the garbage collector.
        with open(self.path_to_file, encoding='UTF-8') as corpus_file:
            return corpus_file.read().strip().split('\n')

    def make_sequence_pair(self, lines):
        """Clean the first ``num_data_to_load`` lines into sentence pairs.

        Returns:
            A list of ``[first_column, second_column]`` pairs, each cleaned
            with ``clean_seq`` and wrapped in start/end tokens.

        Raises:
            ValueError: if a line has fewer than two tab-separated columns.
        """
        seq_pairs = []
        selected = lines[:self.num_data_to_load]
        for line_no, line in enumerate(selected, start=1):
            fields = line.split('\t')
            if len(fields) < 2:
                raise ValueError(
                    "Line {} of {!r} has {} tab-separated column(s); "
                    "expected at least 2".format(
                        line_no, self.path_to_file, len(fields)))
            # Only the two sentence columns are used; a trailing
            # attribution column may or may not be present.
            pair = [
                add_start_and_end_token_to_seq(clean_seq(seq))
                for seq in fields[:2]
            ]
            seq_pairs.append(pair)
        return seq_pairs

    def create_dataset(self):
        """Read the file and return ``zip(*pairs)``.

        The iterator yields the tuple of first-column sentences, then the
        tuple of second-column sentences.
        """
        lines = self.read_data()
        word_pairs = self.make_sequence_pair(lines)
        return zip(*word_pairs)

    def load_data(self):
        """Tokenise both languages and return padded tensors + tokenizers.

        Returns:
            ``((input_tensor, target_tensor),
               (inp_lang_tokenizer, targ_lang_tokenizer))`` where the target
            side comes from the file's first column and the input side from
            its second column.
        """
        # creating cleaned input, output pairs
        targ_lang_text, inp_lang_text = self.create_dataset()

        targ_lang_tokenizer = get_lang_tokenize(targ_lang_text)
        inp_lang_tokenizer = get_lang_tokenize(inp_lang_text)

        target_tensor = texts_to_sequences(targ_lang_text, targ_lang_tokenizer)
        input_tensor = texts_to_sequences(inp_lang_text, inp_lang_tokenizer)

        tensor_pair = (input_tensor, target_tensor)
        tokenizer_pair = (inp_lang_tokenizer, targ_lang_tokenizer)

        return tensor_pair, tokenizer_pair

In [4]:
import tensorflow as tf

class Encoder(tf.keras.Model):
    """Embedding layer followed by a single GRU.

    Args:
        vocab_size: size of the input vocabulary (embedding rows).
        embedding_dim: width of the embedding vectors.
        enc_units: number of GRU units.
        batch_sz: batch size, used only by ``initialize_hidden_state``.
    """

    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # The GRU returns the per-step outputs as well as its final state.
        self.gru = tf.keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def call(self, x, hidden):
        """Embed ``x`` and run the GRU starting from ``hidden``.

        Returns:
            ``(output, state)`` - the GRU output for every time step and the
            final GRU state.
        """
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded, initial_state=hidden)
        return sequence_output, final_state

    def initialize_hidden_state(self):
        """Return an all-zero state of shape ``(batch_sz, enc_units)``."""
        state_shape = (self.batch_sz, self.enc_units)
        return tf.zeros(state_shape)

class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: ``V(tanh(W1(query) + W2(values)))``.

    Args:
        units: width of the two projection layers ``W1`` and ``W2``.
    """

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, dec_hidden, enc_output):
        """Score every encoder step against the decoder state.

        Expected shapes (as used by the Decoder in this notebook):
            dec_hidden: (batch_size, hidden_size)
            enc_output: (batch_size, max_len, hidden_size)

        Returns:
            ``(context_vector, attention_weights)`` with shapes
            (batch_size, hidden_size) and (batch_size, max_len, 1).
        """
        # Insert a time axis so the query broadcasts against every encoder
        # step when the two projections are added:
        # (batch_size, hidden_size) -> (batch_size, 1, hidden_size)
        query = tf.expand_dims(dec_hidden, 1)

        projected_query = self.W1(query)
        projected_values = self.W2(enc_output)

        # (batch_size, max_len, units) -> (batch_size, max_len, 1)
        score = self.V(tf.nn.tanh(projected_query + projected_values))

        # Normalise over the time axis.
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the encoder outputs over time:
        # (batch_size, max_len, hidden_size) -> (batch_size, hidden_size)
        context_vector = tf.reduce_sum(attention_weights * enc_output, axis=1)

        return context_vector, attention_weights

class Decoder(tf.keras.Model):
    """Single-step GRU decoder with Bahdanau attention.

    Args:
        vocab_size: size of the target vocabulary (also the logits width).
        embedding_dim: width of the embedding vectors.
        dec_units: number of GRU units; also the attention projection width.
        batch_sz: batch size (stored, not used by ``call``).

    NOTE: ``call`` seeds the GRU with ``dec_hidden``, so ``dec_hidden`` must
    have ``dec_units`` features. Checkpoints trained before this change keep
    loading (no variable changes), but they were trained with a GRU that
    restarted from zeros at every step, so they should be fine-tuned or
    retrained.
    """

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.dec_units,
                                        return_sequences=True,
                                        return_state=True,
                                        recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)

        # used for attention
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, dec_input, dec_hidden, enc_output):
        """Decode one time step.

        Args:
            dec_input: token ids for this step, shape (batch_size, 1).
            dec_hidden: previous decoder state, shape (batch_size, dec_units).
            enc_output: encoder outputs,
                shape (batch_size, max_length, hidden_size).

        Returns:
            ``(logits, state, attention_weights)`` where ``logits`` has shape
            (batch_size, vocab) and ``state`` is the new decoder state.
        """
        # enc_output shape == (batch_size, max_length, hidden_size)
        context_vector, attention_weights = self.attention(dec_hidden, enc_output)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(dec_input)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # Pass the concatenated vector to the GRU, starting from the previous
        # decoder state. Without initial_state the GRU would restart from
        # zeros on every step and dec_hidden would only influence attention.
        output, state = self.gru(x, initial_state=dec_hidden)

        # output shape == (batch_size * 1, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))

        # output shape == (batch_size, vocab)
        x = self.fc(output)

        return x, state, attention_weights


In [5]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing import sequence

from matplotlib import ticker
from matplotlib import pyplot as plt
from matplotlib import font_manager as fm

# Path of the TrueType font file that plot_attention loads through
# matplotlib's FontProperties(fname=...) for the tick labels.
FONT_NAME = "assets/banglafonts/Siyamrupali.ttf"

class Infer():
    """Greedy translation with a trained encoder/decoder pair.

    Args:
        input_language_tokenizer: fitted tokenizer of the source language
            (``word_index`` is read).
        target_language_tokenizer: fitted tokenizer of the target language
            (``word_index`` and ``index_word`` are read).
        max_length_input: length the encoded input is padded/truncated to.
        max_length_target: maximum number of decoding steps.
        encoder: callable ``encoder(tensor, initial_hidden)`` returning
            ``(outputs, state)``.
        decoder: callable ``decoder(dec_input, dec_hidden, enc_output)``
            returning ``(logits, state, attention_weights)``.
        units: encoder state size, used for the zero initial state.
    """

    def __init__(self, input_language_tokenizer, target_language_tokenizer,
                max_length_input, max_length_target, encoder, decoder, units):
        self.input_language_tokenizer = input_language_tokenizer
        self.target_language_tokenizer = target_language_tokenizer
        self.max_length_input = max_length_input
        self.max_length_target = max_length_target
        self.encoder = encoder
        self.decoder = decoder
        self.units = units

    def preprocess(self, sentence):
        """Clean ``sentence`` and encode it as a padded (1, max_length_input) tensor.

        Raises:
            KeyError: if the cleaned sentence contains words that are not in
                the input vocabulary; the message lists all of them.
        """
        # clean the sentence and add the start/end markers
        sentence = clean_seq(sentence)
        sentence = add_start_and_end_token_to_seq(sentence)

        # split() without an argument never produces empty tokens, so a
        # sentence that cleans down to nothing still encodes as
        # "<start> <end>" instead of failing on the '' token.
        tokens = sentence.split()

        word_index = self.input_language_tokenizer.word_index
        unknown = [token for token in tokens if token not in word_index]
        if unknown:
            raise KeyError(
                "Words not in the input vocabulary: {}".format(
                    ', '.join(unknown)))

        inputs = [word_index[token] for token in tokens]
        inputs = sequence.pad_sequences(
            [inputs], maxlen=self.max_length_input, padding='post')
        tensor = tf.convert_to_tensor(inputs)

        return tensor

    def predict(self, sentence):
        """Translate ``sentence`` and return the predicted words.

        Returns:
            The predicted words, each followed by a single space (the
            ``<end>`` token is included when it was predicted).
        """
        result, _, _ = self.predict_with_attention_weights(sentence)
        return result

    def predict_with_attention_weights(self, sentence):
        """Translate ``sentence`` and also return the attention matrix.

        Returns:
            ``(result, sentence, attention_plot)`` where ``result`` is the
            predicted text, ``sentence`` is the argument unchanged and
            ``attention_plot`` is a
            ``(max_length_target, max_length_input)`` array whose row ``t``
            holds the attention weights of decoding step ``t`` (rows of
            steps that were never reached stay zero).
        """
        tensor = self.preprocess(sentence)

        # init encoder
        encoder_initial_hidden = [tf.zeros((1, self.units))]
        encoder_out, encoder_hidden = self.encoder(tensor, encoder_initial_hidden)

        # init decoder
        decoder_hidden = encoder_hidden
        decoder_input = tf.expand_dims(
            [self.target_language_tokenizer.word_index['<start>']], 0)
        index_word = self.target_language_tokenizer.index_word

        predicted_words = []
        attention_plot = np.zeros((self.max_length_target, self.max_length_input))
        for t in range(self.max_length_target):
            predictions, decoder_hidden, attention_weights = \
                self.decoder(decoder_input, decoder_hidden, encoder_out)

            # storing the attention weights to plot later on
            attention_weights = tf.reshape(attention_weights, (-1, ))
            attention_plot[t] = attention_weights.numpy()

            predicted_id = tf.argmax(predictions[0]).numpy()
            word = index_word.get(predicted_id)
            if word is None:
                # The model picked an id with no vocabulary entry (e.g. the
                # 0 padding id); stop decoding instead of raising KeyError.
                break

            predicted_words.append(word)
            if word == '<end>':
                break

            # the predicted ID is fed back into the model instead of using
            # the teacher forcing that is applied at training time
            decoder_input = tf.expand_dims([predicted_id], 0)

        result = ''.join(word + ' ' for word in predicted_words)
        return result, sentence, attention_plot

# function for plotting the attention weights
def plot_attention(attention, sentence, predicted_sentence):
    """Show an attention matrix as a heat map with word tick labels.

    Args:
        attention: 2-D array; rows are decoding steps, columns are input
            positions.
        sentence: input tokens labelling the columns. A list of words, or a
            plain string that is split on whitespace.
        predicted_sentence: output tokens labelling the rows. A list of
            words, or a plain string that is split on whitespace.
    """
    # Accept raw strings as well as token lists.
    if isinstance(sentence, str):
        sentence = sentence.split()
    if isinstance(predicted_sentence, str):
        predicted_sentence = predicted_sentence.split()
    x_labels = list(sentence)
    y_labels = list(predicted_sentence)

    prop = fm.FontProperties(fname=FONT_NAME)
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    # Pin one tick to each labelled cell (cell centres sit on integer
    # coordinates) before assigning the labels, so label i always lands on
    # column / row i regardless of how the default locator places ticks.
    ax.set_xticks(list(range(len(x_labels))))
    ax.set_xticklabels(x_labels, rotation=90, fontproperties=prop)
    ax.set_yticks(list(range(len(y_labels))))
    ax.set_yticklabels(y_labels, fontproperties=prop)

    # Global matplotlib setting, kept where it was (after the axes are built).
    plt.rcParams.update({'font.size': 14})

    plt.show()

In [None]:
import os
import time
import tensorflow as tf
from tqdm import tqdm
from sklearn.model_selection import train_test_split


# Sparse categorical cross-entropy computed on raw logits. reduction='none'
# keeps one loss value per example so loss_function can mask padding
# positions before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    reduction='none',
    from_logits=True,
)

def loss_function(real, pred):
  """Cross-entropy for one decoding step with padding positions zeroed.

  Args:
    real: target token ids; id 0 is treated as padding.
    pred: logits passed to ``loss_object`` together with ``real``.

  Returns:
    The mean of the per-example losses after positions where ``real == 0``
    have been multiplied by zero.

  NOTE(review): the mean divides by every position, masked ones included.
  """
  per_example_loss = loss_object(real, pred)

  is_real_token = tf.math.not_equal(real, 0)
  weights = tf.cast(is_real_token, dtype=per_example_loss.dtype)

  return tf.reduce_mean(per_example_loss * weights)

@tf.function
def train_step(inp, targ, targ_lang_tokenizer,
            enc_hidden, encoder, decoder, optimizer):
    """Run one teacher-forced optimisation step on a batch.

    Args:
        inp: batch of input id sequences.
        targ: batch of target id sequences; column 0 is expected to hold the
            ``<start>`` token.
        targ_lang_tokenizer: target tokenizer (``word_index['<start>']`` is read).
        enc_hidden: initial encoder state.
        encoder: encoder model.
        decoder: decoder model.
        optimizer: optimizer that applies the gradients.

    Returns:
        The summed step loss divided by the target sequence length.
    """
    start_id = targ_lang_tokenizer.word_index['<start>']
    loss = 0
    with tf.GradientTape() as tape:
        enc_output, enc_hidden = encoder(inp, enc_hidden)
        dec_hidden = enc_hidden
        # Size the <start> column from the batch actually received instead
        # of the module-level BATCH_SIZE, so the step does not silently
        # depend on a global matching the dataset's batch size.
        dec_input = tf.fill([tf.shape(targ)[0], 1], start_id)
        # Teacher forcing - feeding the target as the next input
        for t in range(1, targ.shape[1]):
            # passing enc_output to the decoder
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            loss += loss_function(targ[:, t], predictions)
            # using teacher forcing
            dec_input = tf.expand_dims(targ[:, t], 1)

    batch_loss = (loss / int(targ.shape[1]))
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss

def run():
    """Train the encoder/decoder on the sentence-pair corpus.

    Loads and tokenises the data, saves both tokenizers as JSON in the
    current directory, optionally restores the latest checkpoint, then
    trains for ``EPOCHS`` epochs, saving a checkpoint every second epoch.

    Raises:
        ValueError: if the training split holds fewer than ``BATCH_SIZE``
            examples, i.e. no full batch can be formed.
    """
    text_data = TatoebaDataset('./nob-eng/nob.txt', NUM_DATA_TO_LOAD)

    # retrieve data and tokenizers
    tensors, tokenizer = text_data.load_data()
    input_tensor, target_tensor = tensors
    inp_lang_tokenizer, targ_lang_tokenizer = tokenizer

    # save tokenizers for later use
    save_tokenizer(
        tokenizer=inp_lang_tokenizer,
        save_at='./',
        file_name='input_language_tokenizer.json')
    save_tokenizer(
        tokenizer=targ_lang_tokenizer,
        save_at='./',
        file_name='target_language_tokenizer.json')

    # Creating training and validation sets using an 80-20 split.
    # The validation part is not used by this function yet.
    input_train, _, target_train, _ = \
        train_test_split(input_tensor, target_tensor, test_size=0.2)

    # set training params
    buffer_size = len(input_train)
    steps_per_epoch = len(input_train) // BATCH_SIZE
    if steps_per_epoch == 0:
        # Otherwise every epoch would run zero steps and the epoch-loss
        # average below would divide by zero.
        raise ValueError(
            "Training split has {} example(s), fewer than BATCH_SIZE={}"
            .format(len(input_train), BATCH_SIZE))
    vocab_inp_size = len(inp_lang_tokenizer.word_index) + 1
    vocab_tar_size = len(targ_lang_tokenizer.word_index) + 1

    # convert data to tf.data format
    dataset = tf.data.Dataset.from_tensor_slices((input_train, target_train))
    dataset = dataset.shuffle(buffer_size)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

    # init optimizer
    optimizer = tf.keras.optimizers.Adam()

    # init encoder & decoder
    encoder = Encoder(
        vocab_inp_size, EMBEDDING_DIM, UNITS, BATCH_SIZE)
    decoder = Decoder(
        vocab_tar_size, EMBEDDING_DIM, UNITS, BATCH_SIZE)

    # init checkpoint
    checkpoint_dir = './training_checkpoints'
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                    encoder=encoder,
                                    decoder=decoder)

    if RESTORE_SAVED_CHECKPOINT:
        checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

    for epoch in range(EPOCHS):
        # 1-based so the last epoch reads "EPOCHS / EPOCHS"
        print("Epoch {} / {}".format(epoch + 1, EPOCHS))
        pbar = tqdm(
            dataset.take(steps_per_epoch), total=steps_per_epoch, ascii=True)

        total_loss = 0
        enc_hidden = encoder.initialize_hidden_state()

        for step, data in enumerate(pbar):
            inp, targ = data
            batch_loss = train_step(
                inp, targ, targ_lang_tokenizer,
                enc_hidden, encoder, decoder, optimizer)

            total_loss += batch_loss

            # current step first, total second
            pbar.set_description(
                "Step - {} / {} - batch loss - {:.4f} - "
                    .format(step + 1, steps_per_epoch, batch_loss.numpy()))

        # saving (checkpoint) the model every 2 epochs
        if (epoch + 1) % 2 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
        print('Epoch loss - {:.4f}'.format(total_loss / steps_per_epoch))

# Start training only when this code runs as the main module (which includes
# executing the cell in a notebook), not when it is imported.
if __name__ == '__main__':
    run()
