https://keras.io/examples/lstm_seq2seq/

## Sequence to sequence example in Keras (character-level).

This script demonstrates how to implement a basic character-level sequence-to-sequence model. We apply it to translating short English sentences into short French sentences, character-by-character. Note that it is fairly unusual to do character-level machine translation, as word-level models are more common in this domain.

In [1]:
from keras.models import Model
from keras.layers import Input, LSTM, Dense
from keras.utils import Sequence
import numpy as np
import random
from typing import List, Dict, Tuple

Using TensorFlow backend.


In [2]:
def read_pairs(fn: str)-> List[Tuple]:
    res = []
    with open(fn, 'rt', encoding='utf-8') as f:
        next(f)  # skip header
        for line in f:
            line = line.strip()
            original, compressed = line.split('\t')
            res.append((original,compressed))
    return res

In [3]:
train_samples = read_pairs('sent-comp.train.tsv')
print(f"Total samples: {len(train_samples)}")
for s in train_samples[:3]:
    print(s)

Total samples: 200000
('Serge Ibaka -- the Oklahoma City Thunder forward who was born in the Congo but played in Spain -- has been granted Spanish citizenship and will play for the country in EuroBasket this summer, the event where spots in the 2012 Olympics will be decided.', 'Serge Ibaka has been granted Spanish citizenship and will play in EuroBasket.')
('MILAN -Catania held Roma to a 1-1 draw in Serie A on Wednesday as the teams played out the remaining 25 minutes of a game that was called off last month.', 'Catania held Roma to a 1 1 draw in Serie A.')
('State Street Corporation, a provider of investment servicing, investment management and investment research and trading services, has launched a new investment servicing solution to support small to mid-sized asset managers with their investment operations needs.', 'State Street Corporation, has launched a new investment servicing solution.')


In [4]:
eval_samples = read_pairs('comp-data.eval.tsv')
print(f"Total samples: {len(eval_samples)}")
for s in eval_samples[:3]:
    print(s)

Total samples: 10000
('Five people have been taken to hospital with minor injuries following a crash on the A17 near Sleaford this morning.', 'Five people have been taken to hospital with minor injuries following a crash on the A17 near Sleaford.')
("Several school districts in Hampton Roads are holding classes this Presidents' Day to make up for days missed because of the snow.", "Several school districts are holding classes this Presidents ' Day to make up for days missed.")
('Luis Suarez was spotted in London this afternoon and this has led the Daily Star to link the Liverpool striker to a potential move to Chelsea or Arsenal.', 'Luis Suarez was spotted in London.')


In [5]:
batch_size = 64  # Batch size for training.
epochs = 100  # Number of epochs to train for.
latent_dim = 256  # Latent dimensionality of the encoding space.

In [6]:
# Vectorize the data.
input_texts = []
target_texts = []
input_characters = set()
target_characters = set()
for s in train_samples+eval_samples:
    input_text, target_text = s
    # We use "tab" as the "start sequence" character
    # for the targets, and "\n" as "end sequence" character.
    target_text = '\t' + target_text + '\n'
    input_texts.append(input_text)
    target_texts.append(target_text)
    for char in input_text:
        if char not in input_characters:
            input_characters.add(char)
    for char in target_text:
        if char not in target_characters:
            target_characters.add(char)

input_characters = sorted(list(input_characters))
target_characters = sorted(list(target_characters))
num_encoder_tokens = len(input_characters)
num_decoder_tokens = len(target_characters)
max_encoder_seq_length = max([len(txt) for txt in input_texts])
max_decoder_seq_length = max([len(txt) for txt in target_texts])

print('Number of samples:', len(input_texts))
print('Number of unique input tokens:', num_encoder_tokens)
print('Number of unique output tokens:', num_decoder_tokens)
print('Max sequence length for inputs:', max_encoder_seq_length)
print('Max sequence length for outputs:', max_decoder_seq_length)

Number of samples: 210000
Number of unique input tokens: 250
Number of unique output tokens: 181
Max sequence length for inputs: 4769
Max sequence length for outputs: 245


In [7]:
train_input_texts = input_texts[0:-len(eval_samples)]
train_target_texts = target_texts[0:-len(eval_samples)]
eval_input_texts = input_texts[-len(eval_samples):]
eval_target_texts = target_texts[-len(eval_samples):]

In [8]:
input_token_index = dict(
    [(char, i) for i, char in enumerate(input_characters)])
target_token_index = dict(
    [(char, i) for i, char in enumerate(target_characters)])

In [9]:
class DataGenerator(Sequence):
    'Generates data for Keras'
    def __init__(self, input_texts, target_texts, batch_size=batch_size, shuffle=True):
        'Initialization'
        self.input_texts = input_texts
        self.target_texts = target_texts
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.input_texts) / self.batch_size))

    def __getitem__(self, index):        
        'Generate one batch of data'
        encoder_input_data = np.zeros(
            (self.batch_size, max_encoder_seq_length, num_encoder_tokens),
            dtype='float32')
        decoder_input_data = np.zeros(
            (self.batch_size, max_decoder_seq_length, num_decoder_tokens),
            dtype='float32')
        decoder_target_data = np.zeros(
            (self.batch_size, max_decoder_seq_length, num_decoder_tokens),
            dtype='float32')

        from_ind = index*self.batch_size
        to_ind = from_ind+self.batch_size
        for i, (input_text, target_text) in enumerate(zip(input_texts[from_ind:to_ind], target_texts[from_ind:to_ind])):
            for t, char in enumerate(input_text):
                encoder_input_data[i, t, input_token_index[char]] = 1.
            for t, char in enumerate(target_text):
                # decoder_target_data is ahead of decoder_input_data by one timestep
                decoder_input_data[i, t, target_token_index[char]] = 1.
                if t > 0:
                    # decoder_target_data will be ahead by one timestep
                    # and will not include the start character.
                    decoder_target_data[i, t - 1, target_token_index[char]] = 1.
        
        return [encoder_input_data, decoder_input_data], decoder_target_data

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        if self.shuffle == True:
            c = list(zip(self.input_texts, self.target_texts))
            random.shuffle(c)
            self.input_texts, self.target_texts = zip(*c)


In [None]:
encoder_input_data = np.zeros(
    (len(input_texts), max_encoder_seq_length, num_encoder_tokens),
    dtype='float32')
decoder_input_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens),
    dtype='float32')
decoder_target_data = np.zeros(
    (len(input_texts), max_decoder_seq_length, num_decoder_tokens),
    dtype='float32')

for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.
    for t, char in enumerate(target_text):
        # decoder_target_data is ahead of decoder_input_data by one timestep
        decoder_input_data[i, t, target_token_index[char]] = 1.
        if t > 0:
            # decoder_target_data will be ahead by one timestep
            # and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.

In [10]:
training_generator = DataGenerator(train_input_texts, train_target_texts)
val_generator = DataGenerator(eval_input_texts, eval_target_texts)

In [None]:
# Define an input sequence and process it.
encoder_inputs = Input(shape=(None, num_encoder_tokens))
encoder = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)
# We discard `encoder_outputs` and only keep the states.
encoder_states = [state_h, state_c]

# Set up the decoder, using `encoder_states` as initial state.
decoder_inputs = Input(shape=(None, num_decoder_tokens))
# We set up our decoder to return full output sequences,
# and to return internal states as well. We don't use the
# return states in the training model, but we will use them in inference.
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs,
                                     initial_state=encoder_states)
decoder_dense = Dense(num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model that will turn
# `encoder_input_data` & `decoder_input_data` into `decoder_target_data`
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Run training
model.compile(optimizer='rmsprop', loss='categorical_crossentropy')
model.fit_generator(generator=training_generator,
                    validation_data=val_generator,
                    epochs=5)
# Save model
# model.save('data/s2s.h5')

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use tf.cast instead.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
Epoch 1/5
  49/3125 [..............................] - ETA: 9:16:01 - loss: 0.7670

In [None]:
# Next: inference mode (sampling).
# Here's the drill:
# 1) encode input and retrieve initial decoder state
# 2) run one step of decoder with this initial state
# and a "start of sequence" token as target.
# Output will be the next target token
# 3) Repeat with the current target token and current states

# Define sampling models
encoder_model = Model(encoder_inputs, encoder_states)

decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_outputs, state_h, state_c = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states)

# Reverse-lookup token index to decode sequences back to
# something readable.
reverse_input_char_index = dict(
    (i, char) for char, i in input_token_index.items())
reverse_target_char_index = dict(
    (i, char) for char, i in target_token_index.items())


def decode_sequence(input_seq):
    # Encode the input as state vectors.
    states_value = encoder_model.predict(input_seq)

    # Generate empty target sequence of length 1.
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    # Populate the first character of target sequence with the start character.
    target_seq[0, 0, target_token_index['\t']] = 1.

    # Sampling loop for a batch of sequences
    # (to simplify, here we assume a batch of size 1).
    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value)

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        # Exit condition: either hit max length
        # or find stop character.
        if (sampled_char == '\n' or
           len(decoded_sentence) > max_decoder_seq_length):
            stop_condition = True

        # Update the target sequence (of length 1).
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.

        # Update states
        states_value = [h, c]

    return decoded_sentence

In [None]:
for seq_index in range(100):
    # Take one sequence (part of the training set)
    # for trying out decoding.
    input_seq = encoder_input_data[seq_index: seq_index + 1]
    decoded_sentence = decode_sequence(input_seq)
    print('-')
    print('Input sentence:', input_texts[seq_index])
    print('Decoded sentence:', decoded_sentence)

In [None]:
for seq_index in range(100):
    # Take one sequence (part of the training set)
    # for trying out decoding.
    input_seq = encoder_input_data[-seq_index-2: -seq_index-1]
    decoded_sentence = decode_sequence(input_seq)
    print('-')
    print('Input sentence:', input_texts[-seq_index-2])
    print('Decoded sentence:', decoded_sentence)

In [None]:
model.fit([encoder_input_data, decoder_input_data], decoder_target_data,
          batch_size=batch_size,
          epochs=5,
          validation_split=0.2)

In [None]:
for seq_index in range(100):
    # Take one sequence (part of the training set)
    # for trying out decoding.
    input_seq = encoder_input_data[-seq_index-2: -seq_index-1]
    decoded_sentence = decode_sequence(input_seq)
    print('-')
    print('Input sentence:', input_texts[-seq_index-2])
    print('Decoded sentence:', decoded_sentence)

In [None]:
model.fit([encoder_input_data, decoder_input_data], decoder_target_data,
          batch_size=batch_size,
          epochs=5,
          validation_split=0.2)

In [None]:
for seq_index in range(100):
    # Take one sequence (part of the training set)
    # for trying out decoding.
    input_seq = encoder_input_data[-seq_index-2: -seq_index-1]
    decoded_sentence = decode_sequence(input_seq)
    print('-')
    print('Input sentence:', input_texts[-seq_index-2])
    print('Decoded sentence:', decoded_sentence)