<a href="https://colab.research.google.com/github/riccardo1980/colab_bench/blob/master/seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%tensorflow_version 2.x

In [2]:
import tensorflow
print(tensorflow.__version__)

2.3.0


In [3]:
import tensorflow as tf

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from sklearn.model_selection import train_test_split

import unicodedata
import re
import numpy as np
import os
import io
import time

# Dataset download
Resources: http://www.manythings.org/anki/

In [4]:
# Download the file
# get_file fetches the archive from `origin`; extract=True asks it to unpack
# the archive as well.
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

# The sentence-pair text file is expected at spa-eng/spa.txt, in the same
# directory as the downloaded zip.
path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


# Preprocessing functions

In [5]:
def unicode_to_ascii(s: str) -> str:
  """
    Strip combining accents from a unicode string

    The input is decomposed with Normalization Form D (NFD), so that an
    accented letter becomes a base letter followed by its combining marks;
    every character in the non-spacing mark category (Mn) is then dropped,
    see https://www.fileformat.info/info/unicode/category/Mn/list.htm

    Characters without such a decomposition (e.g. "¿") are kept as they are,
    so the result is not guaranteed to be pure ascii.

    :param s: unicode string
    :return: the string without non-spacing marks
  """
  decomposed = unicodedata.normalize('NFD', s)
  kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
  return ''.join(kept)


def preprocess_sentence(w: str) -> str:
  """
    Clean a single sentence and wrap it in start/end tokens

    1. lowercases, trims and strips accents (via unicode_to_ascii)
    2. surrounds each of "?", ".", "!", ",", "¿" with spaces
    3. replaces every run of characters other than a-z, A-Z,
       "?", ".", "!", ",", "¿" with a single space
    4. removes leading/trailing blanks
    5. adds start/end tokens

    :param w: unicode string
    :return: cleaned string of the form "<start> ... <end>"
  """
  cleaned = unicode_to_ascii(w.lower().strip())

  # isolate punctuation so that it becomes a token of its own
  # eg: "he is a boy." => "he is a boy ."
  # Reference:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
  cleaned = re.sub(r"([?.!,¿])", r" \1 ", cleaned)

  # collapse runs of blanks (the character class also matches double quotes)
  cleaned = re.sub(r'[" "]+', " ", cleaned)

  # anything that is not a letter or one of the kept punctuation marks
  # (digits included) becomes a single space
  cleaned = re.sub(r"[^a-zA-Z?.!,¿]+", " ", cleaned)

  # the start/end tokens tell the model when to start and stop predicting
  return ' '.join(['<start>', cleaned.strip(), '<end>'])

## Test preprocessing functions

In [6]:
# Smoke-test the cleaning on one English and one Spanish sentence
en_sentence = u"May I borrow this book?"
sp_sentence = u"¿Puedo tomar prestado este libro?"
print(preprocess_sentence(en_sentence))
# the Spanish result is printed as utf-8 bytes, so "¿" appears as \xc2\xbf
print(preprocess_sentence(sp_sentence).encode('utf-8'))

<start> may i borrow this book ? <end>
b'<start> \xc2\xbf puedo tomar prestado este libro ? <end>'


# Dataset functions

In [7]:
from typing import Tuple, List

def create_dataset(path: str, num_examples: int) -> Tuple[Tuple[str, ...], ...]:
  """
    Create pairs of sentences

    1. Read the file, one example per line, columns separated by tab
    2. Clean every column of every line with preprocess_sentence
    3. Return the sentences grouped by column (i.e. by language)

    :param path: path to input file
    :param num_examples: maximum number of examples (lines) to use;
      None means all of them
    :return: tuple with one entry per column of the input file, each entry
      being the tuple of cleaned sentences found in that column
  """
  # the context manager closes the file handle as soon as the text is read
  with io.open(path, encoding='UTF-8') as corpus:
    lines = corpus.read().strip().split('\n')

  # split lines, preprocess phrases, get one list of sentences per line
  sentence_pairs = [[preprocess_sentence(w) for w in l.split('\t')]
                    for l in lines[:num_examples]]

  # transpose: from one entry per line to one entry per language;
  # materialized so that the result can be indexed and iterated more than once
  return tuple(zip(*sentence_pairs))


def tokenize(lang: List[str]) -> Tuple[np.ndarray, tf.keras.preprocessing.text.Tokenizer]:
  """
    Fit a tokenizer on a list of sentences and vectorize them

    From words (string) to symbols (integer)

    :param lang: list of sentences of same language
    :return: a tuple of:
      a tensor of size [NUMBER_OF_SENTENCES, SENTENCE_SIZE] containing the
      vectorizations of the sentences, padded at the end
      the fitted tokenizer
  """
  # filters='' : no characters are filtered out by the tokenizer itself
  lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')

  # learn the word -> integer mapping from the given sentences
  lang_tokenizer.fit_on_texts(lang)

  # sentences become integer sequences of different lengths ...
  sequences = lang_tokenizer.texts_to_sequences(lang)

  # ... which are padded at the end (padding='post') into one array
  padded = tf.keras.preprocessing.sequence.pad_sequences(sequences, padding='post')

  return padded, lang_tokenizer

def load_dataset(path, num_examples=None) -> Tuple[np.ndarray, np.ndarray, tf.keras.preprocessing.text.Tokenizer, tf.keras.preprocessing.text.Tokenizer]:
  """
    Load dataset, with preprocessing and tokenization

    The first column of the file is used as the target language and the
    second column as the input language.

    :param path: path to input file
    :param num_examples: maximum number of examples, None for all
    :return: a tuple of (input tensor, target tensor,
      input language tokenizer, target language tokenizer)
  """
  # cleaned sentences, grouped by column of the input file
  target_sentences, source_sentences = create_dataset(path, num_examples)

  # one independent tokenizer per language
  source_ids, source_tokenizer = tokenize(source_sentences)
  target_ids, target_tokenizer = tokenize(target_sentences)

  return source_ids, target_ids, source_tokenizer, target_tokenizer

## Test dataset functions

In [8]:
# num_examples=None: every line of the file is used.
# The first group returned is used as the target language and the second as
# the input language; the last pair of the file is printed as a check.
target_sentences, input_sentences = create_dataset(path_to_file, None)
print(target_sentences[-1])
print(input_sentences[-1])

<start> if you want to sound like a native speaker , you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo . <end>
<start> si quieres sonar como un hablante nativo , debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un musico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado . <end>


In [9]:
# Limit the dataset to the first 30000 lines of the file.
# NOTE(review): the same assignment and the same load_dataset call are
# repeated in the "Configuration" / "Dataset creation" cells further down.
num_examples = 30000
input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer = load_dataset(path_to_file, num_examples)

In [10]:
target_sentences[0]

'<start> go . <end>'

In [11]:
targ_lang_tokenizer.texts_to_sequences([['go']])

[[36]]

In [12]:
targ_lang_tokenizer.sequences_to_texts(
    [np.arange(20)]
)

['<start> <end> . i tom you ? is a it s t the he to we me m this']

In [13]:
# Second dimension of each padded tensor = padded sentence length
max_length_targ, max_length_inp = target_tensor.shape[1], input_tensor.shape[1]
print((max_length_targ, max_length_inp))

(11, 16)


# Configuration

In [14]:
# maximum number of lines read from the dataset file
num_examples = 30000

# number of examples per training batch
BATCH_SIZE = 64
# size of the dense word embeddings (passed to Encoder and Decoder)
embedding_dim = 256
# number of GRU units (passed to Encoder and Decoder)
units = 1024


# Dataset creation

In [15]:
# Load, clean and tokenize the first num_examples sentence pairs
input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer = load_dataset(path_to_file, num_examples)
# Hold out 20% of the pairs for validation.
# NOTE(review): no random_state is passed, so the split presumably differs
# from run to run — confirm whether reproducibility matters here.
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)

In [16]:
# shuffle buffer as large as the whole training set
BUFFER_SIZE = len(input_tensor_train)

dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
# drop_remainder=True: every batch has exactly BATCH_SIZE examples, which the
# encoder's initial hidden state and train_step (both sized with BATCH_SIZE) rely on
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

In [17]:
# vocabulary sizes handed to the Encoder/Decoder, whose docstrings define
# vocab_size as "maximum integer index + 1"
# (presumably index 0 is left free for padding — TODO confirm)
vocab_inp_size = len(inp_lang_tokenizer.word_index)+1
vocab_tar_size = len(targ_lang_tokenizer.word_index)+1

# number of full batches in the training set
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
# padded sentence lengths of the target and input tensors
max_length_targ, max_length_inp = target_tensor.shape[1], input_tensor.shape[1]

# Encoder

In [18]:
class Encoder(tf.keras.Model):
  """
    Recurrent encoder: a token embedding followed by a single GRU layer
  """

  def __init__(self, vocab_size: int,
               embedding_dim: int, units: int, batch_size: int):
    """
      Build the encoder layers

      :param vocab_size: size of the vocabulary, i.e. maximum integer index + 1.
      :param embedding_dim: dimension of the dense embedding
      :param units: dimensionality of the GRU output space.
      :param batch_size: batch size, used to size the initial hidden state

    """
    super().__init__()
    self.batch_size = batch_size
    self.units = units

    # from symbols to dense vectors
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

    # recurrent layer, configured to hand back both the per-timestep
    # outputs and the final state
    self.gru = tf.keras.layers.GRU(units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

  def call(self, x: tf.Tensor, hidden: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """
      Forward pass

      :param x: input tensor of shape (batch_size, max_input_length)
      :param hidden: hidden state initialization tensor of shape (batch_size, units)

      :return:
        output tensor of shape (batch_size, sequence_length, units)
        state tensor of shape (batch_size, units)
    """
    embedded = self.embedding(x)
    sequence_output, final_state = self.gru(embedded, initial_state=hidden)
    return sequence_output, final_state

  def initialize_hidden_state(self) -> tf.Tensor:
    """
      Initialize hidden state

      :return: an all-zero tensor of shape (batch_size, units)
    """
    state_shape = (self.batch_size, self.units)
    return tf.zeros(state_shape)

## Test encoder

In [19]:
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

In [20]:
# Shape check: run the (untrained) encoder on one batch of the training set
sample_hidden = encoder.initialize_hidden_state()

# sample input
example_input_batch, example_target_batch = next(iter(dataset))
print((example_input_batch.shape, example_target_batch.shape))

# sample_hidden is rebound here: from the zero initial state to the
# state returned by the encoder
sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

(TensorShape([64, 16]), TensorShape([64, 11]))
Encoder output shape: (batch size, sequence length, units) (64, 16, 1024)
Encoder Hidden state shape: (batch size, units) (64, 1024)


In [21]:
type(units)

int

In [22]:
output_np = sample_output.numpy()
state_np = sample_hidden.numpy()

In [23]:
# Sanity check: after moving the time axis first, the last timestep of the
# encoder output is compared with the returned state; a norm of 0.0 means
# the two are identical.
np.linalg.norm(output_np.transpose([1,0,2])[-1] - state_np)

0.0

# Decoder

In [24]:
class Decoder(tf.keras.Model):
  """
    Recurrent decoder: token embedding, a single GRU layer and a dense
    projection onto the vocabulary
  """

  def __init__(self, vocab_size: int,
               embedding_dim: int, units: int, batch_size: int):
    """
      Build the decoder layers

      :param vocab_size: size of the vocabulary, i.e. maximum integer index + 1.
      :param embedding_dim: dimension of the dense embedding
      :param units: dimensionality of the GRU output space.
      :param batch_size: batch size (stored on the instance)

    """
    super().__init__()
    self.batch_size = batch_size
    self.units = units

    # from symbols to dense vectors
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)

    # recurrent layer, configured to hand back both the per-timestep
    # outputs and the final state
    self.gru = tf.keras.layers.GRU(units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')

    # one output per vocabulary entry, no activation
    self.fc = tf.keras.layers.Dense(vocab_size)

  def call(self, x: tf.Tensor, hidden: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    """
      Forward pass

      :param x: input tensor of token ids, shape (batch_size, sequence_length);
        the callers in this notebook pass a single timestep, (batch_size, 1)
      :param hidden: hidden state initialization tensor of shape (batch_size, units)

      :return:
        tensor of shape (batch_size * sequence_length, vocab_size) holding
        the output of the dense layer for every timestep
        state tensor of shape (batch_size, units)
    """
    embedded = self.embedding(x)
    gru_output, state = self.gru(embedded, initial_state=hidden)

    # merge the batch and time axes: one row per timestep,
    # shape == (batch_size * sequence_length, units)
    flat_output = tf.reshape(gru_output, (-1, gru_output.shape[2]))

    # shape == (batch_size * sequence_length, vocab_size)
    logits = self.fc(flat_output)

    return logits, state

In [25]:
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)

# Shape check only: one decoding step starting from the encoder state.
# NOTE(review): tf.random.uniform presumably yields floats in [0, 1) by
# default rather than integer token ids, so the input is only a placeholder
# of the right shape — confirm.
sample_decoder_output, _ = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                      sample_hidden)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))

Decoder output shape: (batch_size, vocab size) (64, 4935)


# Optimizer and loss function

In [26]:
# Adam with its default settings
optimizer = tf.keras.optimizers.Adam()
# from_logits=True: the decoder's final Dense layer has no activation.
# reduction='none': keep one loss value per example, so that loss_function
# can mask individual entries before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
  """
    Cross-entropy loss that ignores entries whose label is 0

    The per-example losses from loss_object are zeroed wherever `real` is 0
    and then averaged over all entries (the zeroed ones still count in the
    denominator).

    :param real: tensor of integer labels
    :param pred: tensor of predictions, passed to loss_object
    :return: scalar tensor with the mean of the masked losses
  """
  per_example = loss_object(real, pred)

  # 1 where the label is non-zero, 0 elsewhere, in the dtype of the losses
  keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)

  return tf.reduce_mean(per_example * keep)

In [27]:
# Checkpoints are written under ./training_checkpoints with the file prefix "ckpt"
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
# Track the optimizer together with both models, so that all three are
# saved and restored as one unit
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

In [28]:
@tf.function
def train_step(inp, targ, enc_hidden):
  """
    Run one teacher-forced training step on a batch and update the weights

    Relies on the notebook-level `encoder`, `decoder`, `optimizer`,
    `loss_function`, `targ_lang_tokenizer` and `BATCH_SIZE`.

    :param inp: batch of input sequences, fed to the encoder
    :param targ: batch of target sequences; column 0 is never used as a label,
      presumably because it holds the <start> token (TODO confirm)
    :param enc_hidden: initial hidden state passed to the encoder
    :return: the summed per-timestep loss divided by targ.shape[1]
  """
  loss = 0

  # everything inside the tape is recorded for the gradient computation below
  with tf.GradientTape() as tape:
    enc_output, enc_hidden = encoder(inp, enc_hidden)

    # the decoder starts from the final state of the encoder
    dec_hidden = enc_hidden

    # first decoder input: one <start> id per example, shape (BATCH_SIZE, 1)
    dec_input = tf.expand_dims([targ_lang_tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

    # Teacher forcing - feeding the target as the next input
    for t in range(1, targ.shape[1]):
      # enc_output is not passed to the decoder: only the hidden state
      # connects encoder and decoder
      predictions, dec_hidden = decoder(dec_input, dec_hidden)

      # predictions made at this step are scored against target column t
      loss += loss_function(targ[:, t], predictions)

      # using teacher forcing: the true token at t, not the predicted one,
      # is the next decoder input
      dec_input = tf.expand_dims(targ[:, t], 1)

  # value reported to the caller; note that the loop above runs
  # targ.shape[1] - 1 times while the divisor is targ.shape[1]
  batch_loss = (loss / int(targ.shape[1]))

  variables = encoder.trainable_variables + decoder.trainable_variables

  # gradients are taken of the summed `loss`, not of the normalized batch_loss
  gradients = tape.gradient(loss, variables)

  optimizer.apply_gradients(zip(gradients, variables))

  return batch_loss

In [29]:
# number of passes over the training set
EPOCHS = 10

for epoch in range(EPOCHS):
  start = time.time()

  # all-zero encoder state, created once per epoch and passed unchanged
  # to every train_step call of that epoch
  enc_hidden = encoder.initialize_hidden_state()
  total_loss = 0

  for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
    batch_loss = train_step(inp, targ, enc_hidden)
    total_loss += batch_loss

    # progress report every 100 batches
    if batch % 100 == 0:
      print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                                   batch,
                                                   batch_loss.numpy()))
  # saving (checkpoint) the model every 2 epochs
  if (epoch + 1) % 2 == 0:
    checkpoint.save(file_prefix = checkpoint_prefix)

  # mean of the per-batch losses over the epoch
  print('Epoch {} Loss {:.4f}'.format(epoch + 1,
                                      total_loss / steps_per_epoch))
  print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 4.6742
Epoch 1 Batch 100 Loss 2.0431
Epoch 1 Batch 200 Loss 1.8460
Epoch 1 Batch 300 Loss 1.6160
Epoch 1 Loss 1.9543
Time taken for 1 epoch 30.616573810577393 sec

Epoch 2 Batch 0 Loss 1.5295
Epoch 2 Batch 100 Loss 1.3625
Epoch 2 Batch 200 Loss 1.3609
Epoch 2 Batch 300 Loss 1.3084
Epoch 2 Loss 1.3455
Time taken for 1 epoch 21.003697156906128 sec

Epoch 3 Batch 0 Loss 1.0701
Epoch 3 Batch 100 Loss 0.9792
Epoch 3 Batch 200 Loss 0.9908
Epoch 3 Batch 300 Loss 1.0387
Epoch 3 Loss 1.0093
Time taken for 1 epoch 20.7595374584198 sec

Epoch 4 Batch 0 Loss 0.7245
Epoch 4 Batch 100 Loss 0.7645
Epoch 4 Batch 200 Loss 0.7615
Epoch 4 Batch 300 Loss 0.7333
Epoch 4 Loss 0.7598
Time taken for 1 epoch 21.409523725509644 sec

Epoch 5 Batch 0 Loss 0.5377
Epoch 5 Batch 100 Loss 0.5584
Epoch 5 Batch 200 Loss 0.5793
Epoch 5 Batch 300 Loss 0.5630
Epoch 5 Loss 0.5618
Time taken for 1 epoch 21.072704553604126 sec

Epoch 6 Batch 0 Loss 0.4124
Epoch 6 Batch 100 Loss 0.4024
Epoch 6 Batch 200 L

# Translate

In [124]:
def evaluate(sentence):
  """
    Translate one sentence by greedy decoding

    The sentence is cleaned with preprocess_sentence, mapped to ids with
    inp_lang_tokenizer, padded to max_length_inp and encoded; the decoder is
    then run one token at a time, always feeding back its most likely token,
    until it emits '<end>' or max_length_targ tokens have been produced.

    Every word of the cleaned sentence is looked up in
    inp_lang_tokenizer.word_index, so a word missing from it makes the
    lookup fail.

    :param sentence: raw input sentence
    :return: a tuple of:
      the predicted words joined by blanks ('<end>' included when it was
      produced; with a trailing blank when it was not)
      the cleaned input sentence
  """
  sentence = preprocess_sentence(sentence)

  # words -> ids, then a batch of one sequence padded at the end
  token_ids = [inp_lang_tokenizer.word_index[token] for token in sentence.split(' ')]
  padded_ids = tf.keras.preprocessing.sequence.pad_sequences([token_ids],
                                                             maxlen=max_length_inp,
                                                             padding='post')
  encoder_input = tf.convert_to_tensor(padded_ids)

  # encode starting from an all-zero state for a batch of one
  initial_state = [tf.zeros((1, units))]
  enc_out, state = encoder(encoder_input, initial_state)

  # decoding starts from the encoder state and the <start> token
  step_input = tf.expand_dims([targ_lang_tokenizer.word_index['<start>']], 0)

  pieces = []
  for step in range(max_length_targ):
    logits, state = decoder(step_input, state)

    # greedy choice: the highest-scoring vocabulary entry
    next_id = tf.argmax(logits[0]).numpy()
    word = targ_lang_tokenizer.index_word[next_id]
    pieces.append(word + ' ')

    if word == '<end>':
      return ''.join(pieces).strip(), sentence

    # the predicted ID is fed back into the model
    step_input = tf.expand_dims([next_id], 0)

  return ''.join(pieces), sentence

In [125]:
def translate(sentence):
  """
    Translate a sentence and print the result

    :param sentence: raw input sentence
    :return: None; the cleaned input and the predicted translation are printed
  """
  translation, cleaned = evaluate(sentence)

  print('Input: %s' % (cleaned))
  print('Predicted translation: {}'.format(translation))

# Restore checkpoint

In [126]:
# Load the most recent checkpoint found in checkpoint_dir into the
# optimizer, encoder and decoder tracked by `checkpoint`
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7ff7005c1518>

In [127]:
translate(u'Buenos días.')

Input: <start> buenos dias . <end>
Predicted translation: good morning . <end>


In [128]:
translate(u'hace mucho frio aqui.')

Input: <start> hace mucho frio aqui . <end>
Predicted translation: it s very busy . <end>


In [129]:
translate(u'esta es mi vida.')

Input: <start> esta es mi vida . <end>
Predicted translation: this is my life . <end>


In [130]:
translate(u'¿todavia estan en casa?')

Input: <start> ¿ todavia estan en casa ? <end>
Predicted translation: are you still at home ? <end>


In [131]:
translate(u'trata de averiguarlo.')

Input: <start> trata de averiguarlo . <end>
Predicted translation: try to figure it out . <end>


In [132]:
type(target_tensor_train)

numpy.ndarray

In [134]:
# Validation pairs as a shuffled dataset.
# NOTE(review): BUFFER_SIZE was computed from the training set length, not
# from the validation set.
testset = tf.data.Dataset.from_tensor_slices((input_tensor_val , target_tensor_val)).shuffle(BUFFER_SIZE)
# one sentence pair per batch, since evaluate() handles a single sentence
testset = testset.batch(1, drop_remainder=True)

In [136]:
 # Compare prediction and reference on 10 validation pairs
 for (batch, (inp, targ)) in enumerate(testset.take(10)):
   # ids -> text; [1:-1] drops the first and last word of the decoded text,
   # presumably the <start> and <end> tokens (TODO confirm)
   inp_sentence = ' '.join(inp_lang_tokenizer.sequences_to_texts(inp.numpy())[0].split(' ')[1:-1])
   targ_sentence = ' '.join(targ_lang_tokenizer.sequences_to_texts(targ.numpy())[0].split(' ')[1:-1])

   # evaluate() cleans the sentence again and adds the start/end tokens itself
   predicted_sentence, _ = evaluate(inp_sentence)
   # drop the last blank-separated piece of the prediction: '<end>' when the
   # decoder produced it, otherwise the empty piece after the trailing blank
   predicted_sentence = ' '.join(predicted_sentence.split(' ')[:-1])
   print('\ninput: {}'.format(inp_sentence))
   print('target: {}'.format(targ_sentence))
   print('predicted: {}'.format(predicted_sentence))


input: ¿ puedes ir con nosotras ?
target: can you go with us ?
predicted: can you go with us ?

input: nadie lo quiere .
target: nobody wants it .
predicted: nobody likes her .

input: me he mudado .
target: i moved .
predicted: i ve been shot .

input: queremos negociar .
target: we want to negotiate .
predicted: we want justice .

input: estoy hastiada de pescado .
target: i m sick of fish .
predicted: i m sure of you .

input: odio los mosquitos .
target: i hate mosquitoes .
predicted: i hate raccoons .

input: cuenta con eso .
target: count on it .
predicted: i ll take it .

input: reemplazadlo .
target: replace it .
predicted: make it short .

input: tom ama su empleo .
target: tom loves his job .
predicted: tom loves his work .

input: yo no le temo a la muerte .
target: i don t fear death .
predicted: i fear no music .
