<a href="https://colab.research.google.com/github/synabreu/AIHUB/blob/master/seq2seq_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Sequence-to-Sequence learning at character-level using Keras

This script demonstrates how to implement a basic character-level sequence-to-sequence model. I apply it to translating short English sentences into Korean sentences, character by character.
Note that it is fairly unusual to do character-level machine translation, as word-level models are more common in this domain.

In [0]:
from __future__ import print_function

In [0]:
from keras.models import Model
from keras.layers import Input, LSTM, Dense
import numpy as np
import pandas as pd

Hyperparameters

In [0]:
# Maximum number of sentence pairs read from the dataset.
num_samples = 10000

# Dimensionality of the LSTM state shared by the encoder and decoder
# (the original note gives 256 as the default).
latent_dim = 1024

# Mini-batch size passed to model.fit. Author's note: typically 64 or 128
# (the original note said "129", presumably a typo -- confirm).
batch_size = 64

# Number of passes over the training data.
# Author's tuning notes (translated): with 100 epochs use batch size 64; at 200
# epochs "domain unmatched" was observed; with 500-1000 epochs use 128.
epochs = 300

In [0]:
# Mount Google Drive so that the dataset stored there can be read from this runtime.
# NOTE: `google.colab` is imported here, so this cell only runs inside Google Colab.
from google.colab import drive
drive.mount('/content/drive')

# Location of the tab-separated English/Korean text file on the mounted drive.
# NOTE(review): the path is relative, so it resolves against the current working
# directory -- presumably /content on Colab, matching the mount point above; confirm.
# Alternative dataset: 'drive/My Drive/Colab Notebooks/kor-utf8_2.txt'
data_path = 'drive/My Drive/Colab Notebooks/KOR_190920_UTF8.txt'  


In [0]:
# Preview the dataset to confirm that it was read correctly.
# `display` and `HTML` are imported from the public IPython.display module;
# importing them from IPython.core.display is deprecated.
from IPython.display import display, HTML

# Widen the notebook container so long sentence pairs are not clipped.
display(HTML("<style>.container { width:100% !important; }</style>"))

# Two tab-separated columns: English source sentence and Korean target sentence.
lines = pd.read_table(data_path, names=['eng', 'kor'])

# Keep the same number of rows that the training cells will use
# (previously a hard-coded 10000 that could drift from `num_samples`).
lines = lines[:num_samples]

# A bare `lines.shape` in the middle of a cell is never rendered, so show it explicitly.
display(lines.shape)

# Random preview; `min` avoids a ValueError when fewer than 10 rows were loaded.
lines.sample(min(10, len(lines)))

In [0]:
# Accumulators filled while the dataset is parsed:
#   *_texts      -- sentences, in file order
#   *_characters -- distinct characters seen on each side (the vocabularies)
input_texts, target_texts = [], []
input_characters, target_characters = set(), set()

In [0]:
# Read the whole corpus. Each usable line has the form "<english>\t<korean>",
# optionally followed by further tab-separated columns, which are ignored.
# (Change the encoding to 'UTF16' if the file was exported that way.)
with open(data_path, encoding='UTF8') as f:
    lines = f.read().split('\n')

# The final element of the split is never read: it is the empty string when the
# file ends with a newline.
for line in lines[: min(num_samples, len(lines) - 1)]:
    fields = line.split('\t')
    if len(fields) < 2:
        # Blank or malformed line: skip it instead of failing with a ValueError
        # on tuple unpacking.
        continue
    input_text, target_text = fields[0], fields[1]

    # "\t" marks the start of a target sequence and "\n" marks its end.
    target_text = '\t' + target_text + '\n'

    input_texts.append(input_text)
    target_texts.append(target_text)

    # set.update adds every character of the string; duplicates are ignored.
    input_characters.update(input_text)
    target_characters.update(target_text)


In [0]:
# Freeze both vocabularies into sorted lists so that every character receives a
# stable, reproducible index. The target side was previously left as a set, whose
# iteration order for strings can differ between Python processes; token ids would
# then change after a kernel restart and no longer match a saved model.
input_characters = sorted(input_characters)
target_characters = sorted(target_characters)

# Vocabulary sizes = width of the one-hot vectors on each side.
num_encoder_tokens = len(input_characters)
num_decoder_tokens = len(target_characters)

# Longest sentence on each side = number of timesteps in the one-hot tensors.
max_encoder_seq_length = max(len(txt) for txt in input_texts)
max_decoder_seq_length = max(len(txt) for txt in target_texts)


In [0]:
# Report the dataset statistics, one "label value" pair per line.
for label, value in (
    ('Number of Samples:', len(input_texts)),
    ('Number of Unique Input Tokens:', num_encoder_tokens),
    ('Number of Unique Output Tokens:', num_decoder_tokens),
    ('Max Sequence Length for Inputs:', max_encoder_seq_length),
    ('Max Sequence Length of Outputs:', max_decoder_seq_length),
):
    print(label, value)

In [0]:
# Character -> integer id lookup tables; the id is the character's position in
# its vocabulary and is used as the one-hot column index.
input_token_index = {char: idx for idx, char in enumerate(input_characters)}
target_token_index = {char: idx for idx, char in enumerate(target_characters)}

In [0]:
# Zero-filled one-hot tensors laid out as (sample, timestep, token id).
# The two decoder tensors share one shape; they are filled separately later.
encoder_input_data = np.zeros(
    shape=(len(input_texts), max_encoder_seq_length, num_encoder_tokens),
    dtype=np.float32,
)
decoder_input_data = np.zeros(
    shape=(len(input_texts), max_decoder_seq_length, num_decoder_tokens),
    dtype=np.float32,
)
decoder_target_data = np.zeros(
    shape=(len(input_texts), max_decoder_seq_length, num_decoder_tokens),
    dtype=np.float32,
)

In [0]:
# One-hot encode every sentence pair into the pre-allocated tensors.
for i, (input_text, target_text) in enumerate(zip(input_texts, target_texts)):

    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.
    # Pad the remaining timesteps with the space character. The padding start is
    # computed with len() rather than the leftover loop variable `t`, which is
    # unbound (first sample) or stale (later samples) when `input_text` is empty.
    encoder_input_data[i, len(input_text):, input_token_index[' ']] = 1.

    for t, char in enumerate(target_text):
        decoder_input_data[i, t, target_token_index[char]] = 1.
        if t > 0:
            # decoder_target_data is ahead of decoder_input_data by one timestep
            # and therefore does not contain the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.

    # Pad both decoder tensors with the space character; the target tensor is one
    # step shorter, so its padding starts one timestep earlier.
    decoder_input_data[i, len(target_text):, target_token_index[' ']] = 1.
    decoder_target_data[i, max(len(target_text) - 1, 0):, target_token_index[' ']] = 1.

In [0]:
# Encoder: takes a sequence of one-hot characters (timestep dimension left as None)
# and runs it through an LSTM that also returns its final states.
encoder_inputs = Input(shape=(None, num_encoder_tokens))
encoder = LSTM(units=latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)

# `encoder_outputs` is not used further; only the final hidden and cell states
# are kept, to serve as the decoder's initial state.
encoder_states = [state_h, state_c]

In [0]:
# Decoder input: a sequence of one-hot target characters (timestep dimension left
# as None). The decoder LSTM that consumes it is initialised with `encoder_states`.
decoder_inputs = Input(shape=(None, num_decoder_tokens))

In [0]:
# The decoder LSTM returns the full output sequence plus its internal states.
# The states are ignored in the training graph but are needed at inference time,
# which is why the layer objects are kept in `decoder_lstm` / `decoder_dense`.
decoder_lstm = LSTM(units=latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)

# Per-timestep softmax over the target vocabulary.
decoder_dense = Dense(units=num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

In [0]:
# Training model: maps (encoder input, decoder input) to the decoder's
# per-timestep character distribution, i.e. it learns to turn
# `encoder_input_data` + `decoder_input_data` into `decoder_target_data`.
model = Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)

In [0]:
# Configure training: RMSprop optimizer, categorical cross-entropy over the
# one-hot targets, and accuracy reported as a metric.
model.compile(
    optimizer='rmsprop',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [0]:
# Train the model; 20% of the samples are held out for validation.
model.fit(
    x=[encoder_input_data, decoder_input_data],
    y=decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
)

In [0]:
# Evaluation is currently disabled.
# NOTE(review): the disabled call below passes the symbolic tensors
# `encoder_inputs` / `encoder_states` to model.evaluate; it presumably should
# receive the data arrays used for training instead -- confirm before re-enabling.
# scores = model.evaluate(encoder_inputs, encoder_states, verbose=0)
# print("%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))


# Save the trained model to a .h5 file on the mounted drive
# (relative path, resolved against the current working directory).
save_path = 'drive/My Drive/Colab Notebooks/s2s_0505_01.h5'  
model.save(save_path)

# Summarize the model architecture.
model.summary()

# Inference mode (sampling)

1) Encode the input and retrieve the initial decoder state.

2) Run one step of the decoder with this initial state and a "start of sequence" token as target.

3) Output will be the next target token.

4) Repeat with the current target token and current states.

In [0]:
# Sampling (inference) models, built from the layers already used for training.

# Encoder model: input sequence -> [hidden state, cell state].
encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)

# At inference time the decoder states are supplied explicitly on every step,
# so they become model inputs.
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# Re-apply the decoder LSTM and Dense layers, this time keeping the returned
# states so they can be fed back in on the next step.
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)

# Decoder model: (previous character, states) -> (character distribution, new states).
decoder_model = Model(
    inputs=[decoder_inputs] + decoder_states_inputs,
    outputs=[decoder_outputs] + decoder_states,
)

In [0]:
# Integer id -> character tables, the inverse of the token index dicts; used to
# turn predicted ids back into readable text.
reverse_input_char_index = {idx: char for char, idx in input_token_index.items()}
reverse_target_char_index = {idx: char for char, idx in target_token_index.items()}

In [0]:
def decode_sequence(input_seq):
    """Greedily decode one encoded input sentence into target text.

    The input is passed through `encoder_model` to obtain the initial decoder
    states. `decoder_model` is then run one character at a time, starting from
    the tab start character and always feeding back the most probable character.

    Args:
        input_seq: one-hot encoded input passed to `encoder_model.predict`;
            the loop assumes a batch of exactly one sequence.

    Returns:
        The decoded string. Decoding stops once the newline stop character has
        been produced (it is included in the result) or once the result is
        longer than `max_decoder_seq_length`.
    """
    # Encoder pass: state vectors that seed the decoder.
    states_value = encoder_model.predict(input_seq)

    # Length-1 target sequence holding the one-hot start character.
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    target_seq[0, 0, target_token_index['\t']] = 1.

    decoded_sentence = ''
    while True:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        # Greedy choice: the most probable token at the last timestep.
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        # Stop on the end-of-sequence character or when the length limit is exceeded.
        if sampled_char == '\n' or len(decoded_sentence) > max_decoder_seq_length:
            break

        # Feed the sampled character and the new states into the next step.
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.
        states_value = [h, c]

    return decoded_sentence


In [0]:
# Try out decoding on the first sentences of the training set. The count is
# capped at the number of loaded samples so that a small dataset does not
# trigger an IndexError (or a prediction on an empty batch).
for seq_index in range(min(100, len(input_texts))):
    # Slice (rather than index) to keep the leading batch dimension of size 1.
    input_seq = encoder_input_data[seq_index: seq_index + 1]
    decoded_sentence = decode_sequence(input_seq)

    print('-')
    print('Input sentence:', input_texts[seq_index])
    print('Decoded sentence:', decoded_sentence)