<a href="https://colab.research.google.com/github/mhuckvale/pals0039/blob/master/Demo_Seq2Seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Demonstration of Sequence-to-Sequence processing

This demonstration is adapted from [https://keras.io/examples/lstm_seq2seq/](https://keras.io/examples/lstm_seq2seq/)

In [0]:
import numpy as np

%tensorflow_version 2.x
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense
from tensorflow.keras.utils import to_categorical


Synthesize some training data. The task is to take a string of alphabetic characters as input and to output the same string reversed in order and converted to upper case. For example, "abcde" -> "EDCBA".

In [0]:
# input and output vocabularies; code 0 ('#') is the marker symbol, 1..5 are letters
IVOCAB={ 0:'#', 1:'a', 2:'b', 3:'c', 4:'d', 5:'e'}
OVOCAB={ 0:'#', 1:'A', 2:'B', 3:'C', 4:'D', 5:'E'}

# create some random sequences
seqlen=8            # letters per sequence (the '#' marker is added on top)
nseq=10000          # number of sequences to synthesize
SEED=0              # fixed seed so Restart & Run All regenerates the same data
TRAIN_FRACTION=0.9  # share of the sequences used for training

# Draw all letter codes in one vectorized call from a locally seeded generator
# (the global NumPy random state is left untouched). Codes are in
# 1..len(IVOCAB)-1, so the marker code 0 is never drawn.
rng=np.random.default_rng(SEED)
symbols=rng.integers(1,len(IVOCAB),size=(nseq,seqlen)).tolist()

xdata=[]    # encoder input:  the letters followed by '#'
ydata=[]    # decoder output: the letters in reverse order
tdata=[]    # decoder input:  '#' followed by the decoder output shifted right by one
for letters in symbols:
  rev=letters[::-1]
  xdata.append(letters+[0])
  ydata.append(rev)
  tdata.append([0]+rev[:-1])

# split into train and test
ntrain=int(TRAIN_FRACTION*nseq)
xtrain=xdata[:ntrain]
ytrain=ydata[:ntrain]
ttrain=tdata[:ntrain]
xtest=xdata[ntrain:]
ytest=ydata[ntrain:]
ttest=tdata[ntrain:]

def inputstr(seq):
  """Render a sequence of input symbol codes as text by looking each code up in IVOCAB."""
  return ''.join(IVOCAB[code] for code in seq)
def outputstr(seq):
  """Render a sequence of output symbol codes as text by looking each code up in OVOCAB."""
  return ''.join(OVOCAB[code] for code in seq)

def printseq(x,t,y):
  """Print one example as text: encoder input `x`, decoder input `t`, decoder output `y`."""
  enc_in=inputstr(x)
  dec_in=outputstr(t)
  dec_out=outputstr(y)
  print("Encoder-Input",enc_in,"Decoder-Input",dec_in,"Decoder-Output",dec_out)

# Sanity check: show the first five training triples in readable form.
for k in range(5):
  example=(xtrain[k],ttrain[k],ytrain[k])
  printseq(*example)



Build an encoder-decoder pair for training

In [0]:
# Widths of the one-hot input/output vectors and size of the LSTM state.
isize=len(IVOCAB)
latent_dim=100
osize=len(OVOCAB)

# build encoder network
encoder_inputs = Input(shape=(None, isize))
encoder = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)
# We discard `encoder_outputs` and only keep the states.
encoder_states = [state_h, state_c]

# Set up the decoder, using `encoder_states` as initial state.
decoder_inputs = Input(shape=(None, osize))
# We set up our decoder to return full output sequences,
# and to return internal states as well. We don't use the
# return states in the training model, but we will use them in inference.
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs,
                                     initial_state=encoder_states)
# Softmax over the output vocabulary at every decoder time step.
decoder_dense = Dense(osize, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model that will turn
# `encoder_input_data` & `decoder_input_data` into `decoder_target_data`
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Configure the model for training (the actual fit happens in a later cell).
model.compile(optimizer='rmsprop', loss='categorical_crossentropy',
              metrics=['accuracy'])
# summary() prints the table itself and returns None, so it is called directly;
# wrapping it in print() would add a stray "None" line to the output.
model.summary()


Build the training data and train the model

In [0]:
# convert our sequences to one-hot coding
# The vocabulary size is passed explicitly: left to itself, to_categorical
# infers the number of classes from the largest code present in the data, which
# would produce the wrong width if the highest symbol happened not to occur.
encoder_input_data=to_categorical(xtrain,num_classes=isize)
decoder_input_data=to_categorical(ttrain,num_classes=osize)
decoder_target_data=to_categorical(ytrain,num_classes=osize)
print(encoder_input_data.shape)
print(decoder_input_data.shape)
print(decoder_target_data.shape)


In [0]:
# Training hyper-parameters.
epochs = 25      # passes over the training data
batch_size = 64  # sequences per gradient update

# Fit on (encoder input, decoder input) -> decoder target, holding out 5% of
# the training samples for validation.
model.fit(
    x=[encoder_input_data, decoder_input_data],
    y=decoder_target_data,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.05,
)

Build a sequence encoder-decoder for inference, which decodes the output one symbol at a time

In [0]:
# Define sampling models
# Encoder model: maps a one-hot input sequence to the encoder LSTM states [h, c].
encoder_model = Model(encoder_inputs, encoder_states)

# Decoder model: takes the decoder input plus explicit LSTM states and returns
# the softmax output together with the updated states. It reuses the
# `decoder_lstm` and `decoder_dense` layers of the training model.
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
# The inference tensors get their own names so that `decoder_outputs`,
# `state_h` and `state_c` from the training graph are not silently overwritten
# (which would make re-running the training `Model(...)` line build the wrong graph).
decoder_step_outputs, decoder_step_h, decoder_step_c = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [decoder_step_h, decoder_step_c]
decoder_step_probs = decoder_dense(decoder_step_outputs)
decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_step_probs] + decoder_states)

def decode_sequence(input_seq, max_len=None):
  """Greedily decode one one-hot encoded input sequence into an output string.

  The encoder model turns the input into LSTM states; the decoder model is then
  run one step at a time, each step being fed the symbol chosen at the previous
  step (starting from the '#' symbol, code 0).

  Args:
    input_seq: 2-D array holding a single one-hot encoded input sequence
      (time steps x input vocabulary size).
    max_len: maximum number of symbols to emit. Defaults to the global `seqlen`.

  Returns:
    The decoded string. Decoding stops early, without emitting it, if the
    decoder picks the '#' symbol.
  """
  if max_len is None:
    max_len = seqlen

  # reshape input into a batch of size 1
  input_tensor=input_seq.reshape(1,input_seq.shape[0],input_seq.shape[1])
  # Encode the input as state vectors.
  states_value = encoder_model.predict(input_tensor, verbose=0)
  # Target sequence of length 1 holding the start symbol '#'.
  target_seq = np.zeros((1, 1, osize))
  target_seq[0, 0, 0] = 1.

  # Sampling loop (batch of size 1). Bounding the loop by `max_len` avoids
  # running the decoder once more after the last permitted symbol was emitted.
  decoded_chars = []
  for _ in range(max_len):
    output_tokens, h, c = decoder_model.predict([target_seq, *states_value], verbose=0)
    # Greedy choice: most probable symbol at the last time step.
    sampled_token_index = int(np.argmax(output_tokens[0, -1, :]))
    sampled_char = OVOCAB[sampled_token_index]

    # Exit condition: stop character found.
    if sampled_char == '#':
      break
    decoded_chars.append(sampled_char)

    # Feed the chosen symbol back in as the next decoder input.
    target_seq = np.zeros((1, 1, osize))
    target_seq[0, 0, sampled_token_index] = 1.

    # Update states
    states_value = [h, c]

  return ''.join(decoded_chars)

Test the encoder-decoder with some test data

In [0]:

# One-hot encode the held-out inputs with the encoder's vocabulary size given
# explicitly, so the width matches the model even if the highest symbol code
# happens to be absent from the test set.
encoder_input_test=to_categorical(xtest,num_classes=isize)

# Decode the first ten test sequences and compare each with its target string.
for i in range(10):
  inp=inputstr(xtest[i])
  tar=outputstr(ytest[i])
  out=decode_sequence(encoder_input_test[i])
  print("Input",inp,"Target",tar,"Predicted",out,"OK" if out==tar else "Fail")