# Phonics-level recurrent sequence-to-sequence translation model

**Author:** Tsz Lung<br>
**Date created:** 2021/09/29<br>
**Last modified:** 2021/09/29<br>
**Description:** phonics-level recurrent sequence-to-sequence translation model


## Introduction

This example demonstrates how to implement a basic phonics-level
recurrent sequence-to-sequence translation model. We apply it to translating
the phonemes of a single word spoken with an accent into the standard
phonemes of that word, phoneme by phoneme. Note that phoneme-level
machine translation is fairly unusual, as word-level
models are more common in this domain.

**Summary of the algorithm**

- We start with input sequences from a domain (e.g. the phonemes of
    a word spoken with an accent) and corresponding target sequences
    from another domain (e.g. the standard phonemes of that word).
- An encoder LSTM turns input sequences into two state vectors
    (we keep the final LSTM states and discard the outputs).
- A decoder LSTM is trained to turn the target sequences into
    the same sequence but offset by one timestep in the future,
    a training process called "teacher forcing" in this context.
    It uses as initial state the state vectors from the encoder.
    Effectively, the decoder learns to generate `targets[t+1...]`
    given `targets[...t]`, conditioned on the input sequence.
- In inference mode, when we want to decode unknown input sequences, we:
    - Encode the input sequence into state vectors
    - Start with a target sequence of size 1
        (just the start-of-sequence token)
    - Feed the state vectors and the one-token target sequence
        to the decoder to produce predictions for the next phoneme
    - Sample the next phoneme using these predictions
        (we simply use argmax).
    - Append the sampled phoneme to the target sequence
    - Repeat until we generate the end-of-sequence token or we
        hit the sequence length limit.
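
The one-timestep offset used in teacher forcing can be sketched with plain lists. This is an illustration only: `target_tokens` is a hypothetical example, with `'\t'` and `'\n'` standing in for the start and end markers, and the slicing shows the idea rather than the exact tensors built later in this example.

```python
# Hypothetical target pronunciation wrapped in start ('\t') and end ('\n') markers.
target_tokens = ['\t', 'R', 'EH', 'D', '\n']

# The decoder reads every token except the last one...
decoder_input = target_tokens[:-1]
# ...and at each step is trained to predict the token that comes next.
decoder_target = target_tokens[1:]

for step, (seen, expected) in enumerate(zip(decoder_input, decoder_target)):
    print(f"step {step}: input {seen!r} -> target {expected!r}")
```

Because the decoder always receives the true previous token during training, an early mistake cannot derail the rest of the sequence.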


## Setup


In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import re
import json
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical


## Load phonics dictionary

In [3]:

with open('../data/cmusphinxdict.json') as f:
  phoneme_dictionary = json.load(f)

#cmu phoneme list
cmu_phoneme = [
            'SPACE',
            'AA',
            'AE',
            'AH',
            'AO',
            'AW',
            'AY',
            'B',
            'CH',
            'D',
            'DH',
            'EH',
            'ER',
            'EY',
            'F',
            'G',
            'HH',
            'IH',
            'IY',
            'JH',
            'K',
            'L',
            'M',
            'N',
            'NG',
            'OW',
            'OY',
            'P',
            'R',
            'S',
            'SH',
            'T',
            'TH',
            'UH',
            'UW',
            'V',
            'W',
            'Y',
            'Z',
            'ZH',
        ] 


def word_to_phonics(input_word):
    # Return every pronunciation of `input_word` as a list of phonemes.
    # Words missing from the dictionary give an empty list.
    return [
        pronunciation.split(' ')
        for pronunciation in phoneme_dictionary.get(input_word, [])
    ]

def get_phoneme_index(input_phoneme):
    # Position of the phoneme in `cmu_phoneme`. Unknown phonemes map to
    # len(cmu_phoneme), one past the last valid index.
    for phoneme_index, phoneme in enumerate(cmu_phoneme):
        if phoneme.upper() == input_phoneme:
            return phoneme_index

    return len(cmu_phoneme)

In [4]:
phoneme_dictionary


{'A': ['AH', 'EY'],
 "A'S": ['EY Z'],
 'A.': ['EY'],
 "A.'S": ['EY Z'],
 'A.S': ['EY Z'],
 'A42128': ['EY F AO R T UW W AH N T UW EY T'],
 'AAA': ['T R IH P AH L EY'],
 'AABERG': ['AA B ER G'],
 'AACHEN': ['AA K AH N'],
 'AACHENER': ['AA K AH N ER'],
 'AAKER': ['AA K ER'],
 'AALSETH': ['AA L S EH TH'],
 'AAMODT': ['AA M AH T'],
 'AANCOR': ['AA N K AO R'],
 'AARDEMA': ['AA R D EH M AH'],
 'AARDVARK': ['AA R D V AA R K'],
 'AARON': ['EH R AH N'],
 "AARON'S": ['EH R AH N Z'],
 'AARONS': ['EH R AH N Z'],
 'AARONSON': ['EH R AH N S AH N', 'AA R AH N S AH N'],
 "AARONSON'S": ['EH R AH N S AH N Z', 'AA R AH N S AH N Z'],
 'AARTI': ['AA R T IY'],
 'AASE': ['AA S'],
 'AASEN': ['AA S AH N'],
 'AB': ['AE B', 'EY B IY'],
 'ABABA': ['AH B AA B AH', 'AA B AH B AH'],
 'ABACHA': ['AE B AH K AH'],
 'ABACK': ['AH B AE K'],
 'ABACO': ['AE B AH K OW'],
 'ABACUS': ['AE B AH K AH S'],
 'ABAD': ['AH B AA D'],
 'ABADAKA': ['AH B AE D AH K AH'],
 'ABADI': ['AH B AE D IY'],
 'ABADIE': ['AH B AE D IY'],
 'ABAIR'
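
The dictionary maps each word to a list of pronunciations, and each pronunciation is a space-separated string of phonemes. The sketch below shows a lookup on a toy excerpt copied from the output above; `toy_dictionary` and `pronunciations` are illustrative names, not part of the notebook.

```python
# Toy excerpt with the same shape as the dictionary printed above:
# word -> list of pronunciations, each a space-separated phoneme string.
toy_dictionary = {
    'AB': ['AE B', 'EY B IY'],
    'AARON': ['EH R AH N'],
}


def pronunciations(word, dictionary):
    """Return each pronunciation of `word` as a list of phonemes ([] if unknown)."""
    return [entry.split(' ') for entry in dictionary.get(word.upper(), [])]


ab = pronunciations('ab', toy_dictionary)
missing = pronunciations('zzz', toy_dictionary)
print(ab)
print(missing)
```

A word with several pronunciations yields several phoneme lists, which matters later when input and target words are paired.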

## Prepare the data


In [18]:
# Read the training data. Fields on each line are separated by "; " or ", ".
# The first field is the target word; every remaining non-empty field is an
# input word that should be translated to that target.
input_data_list = []
output_data_list = []
with open('wordfun_phonics.txt') as f:
    lines = f.readlines()

for line in lines:
    output_word, *input_words = re.split('; |, |\n', line)
    for word in input_words:
        if word != '':
            input_data_list.append(word)
            output_data_list.append(output_word)

print(f'size of training data {len(input_data_list)}')

size of training data 739
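
To make the parsing step concrete, here is a small sketch applied to a single line. The sample line is hypothetical (the contents of `wordfun_phonics.txt` are not shown in this example), and `parse_line` is an illustrative helper that mirrors the split pattern used above.

```python
import re


def parse_line(line):
    """Split one line into (input_word, target_word) pairs."""
    # The first field is the target; the remaining non-empty fields are inputs.
    target_word, *input_words = re.split('; |, |\n', line)
    return [(word, target_word) for word in input_words if word != '']


# Hypothetical line, used only to illustrate the format the regex expects.
pairs = parse_line('red; glad, read\n')
print(pairs)
```

Each input word becomes its own training pair, so a line with two input words contributes two samples.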


In [19]:
## Convert word list to phonemic list

input_data_phonics_list = []
output_data_phoics_list = []

input_phonemes = set()
target_phonemes = set() 

input_data_index = 0

# label_encoder = LabelEncoder()
# y_encoded_0 = label_encoder.fit_transform(output_data_list)
# y_encoded = to_categorical(y_encoded_0)

# total_output_classes = len(label_encoder.classes_)
# print(f'total_output_classes {total_output_classes}')
# print(f'total y_encoded {len(y_encoded)}')

# #generate dummy data by duplicate current data
# for class_label in label_encoder.classes_:
#     for index in range(50):
#         input_data_list.append(class_label)
#         output_data_list.append(class_label)

# y_encoded_0 = label_encoder.fit_transform(output_data_list)
# y_encoded = to_categorical(y_encoded_0)

# print(f'size of training data  with dummy data {len(input_data_list)}')
# print(f'total y_encoded {len(y_encoded)}')

for input_data_index, input_data in enumerate(input_data_list):
    target_word = output_data_list[input_data_index].upper()

    # An input entry may contain several words; handle each one separately.
    for single_input_word in input_data.upper().split(' '):
        # Pair every pronunciation of the input word with every
        # pronunciation of the target word. Words that are missing from
        # the dictionary produce no pairs.
        for input_word_phonemes in word_to_phonics(single_input_word):
            for target_word_phonemes in word_to_phonics(target_word):
                # '\t' marks the start of the target sequence, '\n' its end.
                target_sequence = ['\t'] + target_word_phonemes + ['\n']

                input_data_phonics_list.append(list(input_word_phonemes))
                output_data_phoics_list.append(target_sequence)

                input_phonemes.update(input_word_phonemes)
                target_phonemes.update(target_sequence)


# print(input_phonemes)
# print(target_phonemes)

input_phonemes = sorted(list(input_phonemes))
target_phonemes = sorted(list(target_phonemes))
num_encoder_tokens = len(input_phonemes)
num_decoder_tokens = len(target_phonemes)
max_encoder_seq_length = max([len(txt) for txt in input_data_phonics_list])
max_decoder_seq_length = max([len(txt) for txt in output_data_phoics_list])



print(input_phonemes)
print(target_phonemes)

print("Number of samples:", len(input_data_list))
print(f'size of training data {len(input_data_phonics_list)}')
print("Number of unique input tokens:", num_encoder_tokens)
print("Number of unique output tokens:", num_decoder_tokens)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)

input_token_index = dict([(char, i) for i, char in enumerate(input_phonemes)])
target_token_index = dict([(char, i) for i, char in enumerate(target_phonemes)])

encoder_input_data = np.zeros(
    (len(input_data_phonics_list), max_encoder_seq_length, num_encoder_tokens), dtype="float32"
)
decoder_input_data = np.zeros(
    (len(input_data_phonics_list), max_decoder_seq_length, num_decoder_tokens), dtype="float32"
)
decoder_target_data = np.zeros(
    (len(input_data_phonics_list), max_decoder_seq_length, num_decoder_tokens), dtype="float32"
)

for i, (input_text, target_text) in enumerate(zip(input_data_phonics_list, output_data_phoics_list)):
    for t, char in enumerate(input_text):
        encoder_input_data[i, t, input_token_index[char]] = 1.0
    for t, char in enumerate(target_text):
        # decoder_target_data is ahead of decoder_input_data by one timestep
        decoder_input_data[i, t, target_token_index[char]] = 1.0
        if t > 0:
            # decoder_target_data will be ahead by one timestep
            # and will not include the start character.
            decoder_target_data[i, t - 1, target_token_index[char]] = 1.0
    # No padding token is used: timesteps past the end of a sequence
    # stay all-zero in all three arrays.

['AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'B', 'CH', 'D', 'DH', 'EH', 'ER', 'EY', 'F', 'G', 'HH', 'IH', 'IY', 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW', 'OY', 'P', 'R', 'S', 'SH', 'T', 'TH', 'UH', 'UW', 'V', 'W', 'Y', 'Z']
['\t', '\n', 'AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'B', 'CH', 'D', 'DH', 'EH', 'ER', 'EY', 'F', 'G', 'HH', 'IH', 'IY', 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW', 'OY', 'P', 'R', 'S', 'SH', 'T', 'TH', 'UH', 'UW', 'V', 'W', 'Y', 'Z']
Number of samples: 739
size of training data 986
Number of unique input tokens: 38
Number of unique output tokens: 40
Max sequence length for inputs: 10
Max sequence length for outputs: 12
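
The output reports 739 word pairs but 986 training sequences. One reason the counts differ is that every pronunciation of an input word is paired with every pronunciation of its target word, while words missing from the dictionary contribute nothing. The sketch below illustrates that pairing on toy entries copied from the dictionary excerpt; `sequence_pairs` is an illustrative helper, and the word pairings are arbitrary.

```python
from itertools import product

# Toy pronunciations taken from the dictionary excerpt shown earlier.
toy_dictionary = {
    'AB': ['AE B', 'EY B IY'],
    'AARONSON': ['EH R AH N S AH N', 'AA R AH N S AH N'],
    'AARON': ['EH R AH N'],
}


def sequence_pairs(input_word, target_word, dictionary):
    """Pair each input pronunciation with each target pronunciation."""
    inputs = [p.split(' ') for p in dictionary.get(input_word, [])]
    # Targets are wrapped in the start ('\t') and end ('\n') markers.
    targets = [['\t'] + p.split(' ') + ['\n'] for p in dictionary.get(target_word, [])]
    return list(product(inputs, targets))


two_by_two = sequence_pairs('AB', 'AARONSON', toy_dictionary)
one_by_one = sequence_pairs('AARON', 'AARON', toy_dictionary)
unknown = sequence_pairs('ZZZ', 'AARON', toy_dictionary)
print(len(two_by_two), len(one_by_one), len(unknown))
```

A single word pair can therefore expand into several training sequences, or into none at all.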


## Configuration


In [14]:
batch_size = 64  # Batch size for training.
epochs = 500  # Number of epochs to train for.
latent_dim = 256  # Latent dimensionality of the encoding space.
#num_samples = 1000  # Number of samples to train on.
# Path to the data txt file on disk.
# data_path = "yue.txt"


## Build the model


In [8]:
# Define an input sequence and process it.
encoder_inputs = keras.Input(shape=(None, num_encoder_tokens))
encoder = keras.layers.LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)

# We discard `encoder_outputs` and only keep the states.
encoder_states = [state_h, state_c]

# Set up the decoder, using `encoder_states` as initial state.
decoder_inputs = keras.Input(shape=(None, num_decoder_tokens))

# We set up our decoder to return full output sequences,
# and to return internal states as well. We don't use the
# return states in the training model, but we will use them in inference.
decoder_lstm = keras.layers.LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
decoder_dense = keras.layers.Dense(num_decoder_tokens, activation="softmax")
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model that will turn
# `encoder_input_data` & `decoder_input_data` into `decoder_target_data`
model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
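
As a rough size check, the number of trainable weights can be worked out by hand. The sketch below assumes the standard LSTM layout (four gates, each with an input kernel, a recurrent kernel and a bias) and uses the vocabulary sizes printed earlier (38 input tokens, 40 output tokens) together with `latent_dim = 256`. The helper names are illustrative.

```python
def lstm_params(input_dim, units):
    # Four gates, each with an input kernel, a recurrent kernel and a bias.
    return 4 * (units * (input_dim + units) + units)


def dense_params(input_dim, units):
    # One weight per input/output pair plus one bias per output unit.
    return input_dim * units + units


latent_dim = 256
num_encoder_tokens = 38
num_decoder_tokens = 40

encoder_count = lstm_params(num_encoder_tokens, latent_dim)
decoder_count = lstm_params(num_decoder_tokens, latent_dim)
dense_count = dense_params(latent_dim, num_decoder_tokens)
total_count = encoder_count + decoder_count + dense_count
print(encoder_count, decoder_count, dense_count, total_count)
```

Under these assumptions the model has a little over 600,000 weights, which is large compared with fewer than 1,000 training sequences. `model.summary()` reports the actual counts.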




## Train the model


In [9]:
model.compile(
    optimizer="rmsprop", loss="categorical_crossentropy", metrics=["accuracy"]
)
model.fit(
    [encoder_input_data, decoder_input_data],
    decoder_target_data,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2,
)
# Save model
# model.save("phonics2phonics")




Epoch 1/500


KeyboardInterrupt: 
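
For a sense of how much work one epoch involves, the number of batches can be estimated from the settings above. This is a sketch under an assumption: the training share of the data is rounded down and the remainder is held out for validation.

```python
import math

num_sequences = 986      # "size of training data" printed during data preparation
validation_split = 0.2
batch_size = 64

# Assumption: the training share is rounded down; the rest is validation data.
num_train = math.floor(num_sequences * (1 - validation_split))
num_val = num_sequences - num_train
steps_per_epoch = math.ceil(num_train / batch_size)
print(num_train, num_val, steps_per_epoch)
```

With these numbers an epoch is only about a dozen batches, so 500 epochs amounts to a few thousand gradient updates.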

## Save the model

In [None]:

model.save("phonics2phonics")

## Run inference (sampling)

1. Encode the input and retrieve the initial decoder state.
2. Run one step of the decoder with this initial state
   and a "start of sequence" token as target.
   The output will be the next target token.
3. Repeat with the current target token and current states.
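
The control flow of these steps can be sketched without a model. In the toy below, `next_token` is a hypothetical stand-in for one decoder step: it looks at the previous token only, whereas the real decoder also carries its LSTM states from step to step. The length limit is counted in tokens.

```python
def greedy_decode(next_token, start_token="\t", stop_token="\n", max_tokens=12):
    """Repeatedly ask `next_token` for the most likely successor of the last token."""
    decoded = []
    current = start_token
    while len(decoded) < max_tokens:
        current = next_token(current)
        if current == stop_token:
            break
        decoded.append(current)
    return decoded


# Hypothetical stand-in for the decoder: a fixed lookup table instead of an LSTM.
toy_transitions = {"\t": "R", "R": "EH", "EH": "D", "D": "\n"}
result = greedy_decode(toy_transitions.__getitem__)
print(result)

# A table that never emits the stop token is cut off by the length limit.
looping = greedy_decode({"\t": "AH", "AH": "AH"}.__getitem__, max_tokens=3)
print(looping)
```

Both exit conditions matter: the stop token ends well-formed outputs, and the limit guards against a decoder that never produces it.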


In [12]:
# Define sampling models
# Restore the model and construct the encoder and decoder.
model = keras.models.load_model("phonics2phonics")

encoder_inputs = model.input[0]  # input_1
encoder_outputs, state_h_enc, state_c_enc = model.layers[2].output  # lstm_1
encoder_states = [state_h_enc, state_c_enc]
encoder_model = keras.Model(encoder_inputs, encoder_states)

decoder_inputs = model.input[1]  # input_2
decoder_state_input_h = keras.Input(shape=(latent_dim,))
decoder_state_input_c = keras.Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_lstm = model.layers[3]
decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs
)
decoder_states = [state_h_dec, state_c_dec]
decoder_dense = model.layers[4]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = keras.Model(
    [decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states
)

# Reverse-lookup token index to decode sequences back to
# something readable.
reverse_input_char_index = dict((i, char) for char, i in input_token_index.items())
reverse_target_char_index = dict((i, char) for char, i in target_token_index.items())


def decode_sequence(input_seq):
    # Encode the input as state vectors.
    states_value = encoder_model.predict(input_seq)

    # Generate empty target sequence of length 1.
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    # Populate the first character of target sequence with the start character.
    target_seq[0, 0, target_token_index["\t"]] = 1.0

    # Sampling loop for a batch of sequences
    # (to simplify, here we assume a batch of size 1).
    stop_condition = False
    decoded_phonemes = []
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_phonemes.append(sampled_char)

        # Exit condition: either hit max length
        # or find stop character. The length is counted in phonemes,
        # not in characters of the joined string.
        if sampled_char == "\n" or len(decoded_phonemes) > max_decoder_seq_length:
            stop_condition = True

        # Update the target sequence (of length 1).
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.0

        # Update states
        states_value = [h, c]
    # Join with a space after every token, e.g. "R EH D \n ".
    return "".join(phoneme + " " for phoneme in decoded_phonemes)
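
`decode_sequence` expects a one-hot batch of shape `(1, max_encoder_seq_length, num_encoder_tokens)`. The helper below is a minimal sketch of building such a batch from a list of phonemes. `one_hot_encode` and `toy_index` are illustrative names, and the explicit errors for unknown phonemes or overlong sequences are an addition that the notebook code does not perform.

```python
import numpy as np


def one_hot_encode(phonemes, token_index, max_length):
    """Return a (1, max_length, vocab_size) one-hot array for one phoneme sequence."""
    if len(phonemes) > max_length:
        raise ValueError(f"sequence has {len(phonemes)} phonemes, limit is {max_length}")
    encoded = np.zeros((1, max_length, len(token_index)), dtype="float32")
    for t, phoneme in enumerate(phonemes):
        if phoneme not in token_index:
            raise ValueError(f"unknown phoneme: {phoneme!r}")
        encoded[0, t, token_index[phoneme]] = 1.0
    return encoded


# Hypothetical three-phoneme vocabulary, only for illustration.
toy_index = {"D": 0, "EH": 1, "R": 2}
batch = one_hot_encode(["R", "EH", "D"], toy_index, max_length=5)
print(batch.shape)
```

Timesteps after the last phoneme stay all-zero, matching how the training arrays are filled.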



You can now decode the pronunciations of words you type in:


In [None]:
while True:
    input_data = input("Enter your word: ")

    # Look up every dictionary pronunciation of the word. Nothing is
    # printed for words that are missing from the dictionary.
    for phoneme_blending in word_to_phonics(input_data.upper()):
        # One-hot encode the pronunciation as a batch of size 1.
        encoder_input_data_1 = np.zeros(
            (1, max_encoder_seq_length, num_encoder_tokens), dtype="float32"
        )

        input_word_phoneme = ''
        for t, char in enumerate(phoneme_blending):
            input_word_phoneme += char + ' '
            encoder_input_data_1[0, t, input_token_index[char]] = 1.0

        decoded_sentence = decode_sequence(encoder_input_data_1)
        print("Input word phonics blending:", input_word_phoneme)
        print("Decoded word phonics blending:", decoded_sentence)


In [13]:
for seq_index in range(100):
    # Take one sequence (part of the training set)
    # for trying out decoding.
    # print(encoder_input_data[seq_index : seq_index + 1])
    input_seq = encoder_input_data[seq_index : seq_index + 1]
#     print(input_seq)
    decoded_sentence = decode_sequence(input_seq)
    print("-")
    print("Input sentence:", input_data_phonics_list[seq_index])
    print("Decoded sentence:", decoded_sentence)


-
Input sentence: ['R', 'EH', 'D']
Decoded sentence: R EH D 
 
-
Input sentence: ['G', 'L', 'AE', 'D']
Decoded sentence: R EH D 
 
-
Input sentence: ['R', 'EH', 'D']
Decoded sentence: R EH D 
 
-
Input sentence: ['R', 'IY', 'D']
Decoded sentence: R AE B AH T 
 
-
Input sentence: ['R', 'AE', 'T']
Decoded sentence: R EH D 
 
-
Input sentence: ['G', 'R', 'IY', 'N']
Decoded sentence: G R IY N 
 
-
Input sentence: ['K', 'L', 'IY', 'N']
Decoded sentence: G R IY N 
 
-
Input sentence: ['G', 'R', 'EY', 'T']
Decoded sentence: G R IY N 
 
-
Input sentence: ['B', 'L', 'UW']
Decoded sentence: B L UW 
 
-
Input sentence: ['HH', 'AH', 'L', 'OW']
Decoded sentence: HH EH R 
 
-
Input sentence: ['HH', 'EH', 'L', 'OW']
Decoded sentence: HH EH R 
 
-
Input sentence: ['W', 'AY', 'T']
Decoded sentence: HH W AY T 
 
-
Input sentence: ['W', 'AY', 'T']
Decoded sentence: HH W AY T 
 
-
Input sentence: ['HH', 'W', 'AY', 'T']
Decoded sentence: HH W AY T 
 
-
Input sentence: ['HH', 'W', 'AY', 'T']
Decoded sentenc