In [1]:
import re
import sys

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
# Print arrays in full instead of eliding the middle with "...".
# The threshold must be a number: the string 'nan' is rejected by current
# NumPy releases, and sys.maxsize is the documented way to switch
# summarization off.
np.set_printoptions(threshold=sys.maxsize)

  from ._conv import register_converters as _register_converters


In [2]:
"""Set False for testing"""
# Mode switch for the notebook: the training cell only runs when this is True.
training = False

In [3]:
"""Read CMUDict line by line and counted the maximum letters in a word and maximum phonemes in its conversion"""
# Parse the pronunciation dictionary: each line is whitespace-separated, the
# word first and its phonemes after it.
x_temp = []        # words, one string per kept entry
y_temp = []        # phoneme lists (as read from the file), parallel to x_temp
max_count_x = 0    # length of the longest kept word, in characters
max_count_y = 0    # length of the longest kept pronunciation, in phonemes
# `with` closes the file even if a malformed line raises part-way through.
with open("CMUDict", "r") as F:
    for line in F:
        fields = line.split()
        # A blank line splits to an empty list; indexing it would raise.
        if not fields:
            continue
        word = fields[0]
        # Skip entries whose word ends in a "(c)" suffix, e.g. "WORD(1)".
        if len(word) > 3 and word[-3] == "(" and word[-1] == ")":
            continue
        pronunciation = fields[1:]
        max_count_x = max(max_count_x, len(word))
        max_count_y = max(max_count_y, len(pronunciation))
        x_temp.append(word)
        y_temp.append(pronunciation)
# Two extra time steps are reserved for the "start" and "end" markers.
max_count_x += 2
max_count_y += 2

In [4]:
"""Created a list of unique letters and phonemes along with their counts"""
# Collect every distinct character that occurs in the dictionary words.
seen_letters = set()
for word in x_temp:
    for letter in word:
        seen_letters.add(letter)
# NOTE(review): list(set) yields whatever order the interpreter iterates the
# set in, and the one-hot letter indices are positions in this list.  A saved
# checkpoint presumably only matches the ordering it was trained with, so
# confirm before changing how this list is built (e.g. sorting it).
alphabets = list(seen_letters)

# Phoneme symbols without stress digits.
phonemes = ['AA','AE','AH','AO','AW','AY','B','CH','D','DH','EH',
            'ER','EY','F','G','HH','IH','IY','JH','K','L','M','N','NG','OW','OY',
            'P','R','S','SH','T','TH','UH','UW','V','W','Y','Z','ZH']

# Both vocabulary sizes include two extra slots for the "start" and "end"
# markers.  Deriving phoneme_count from the list (39 + 2 = 41) keeps it in
# step with the list, the same way alpha_count follows alphabets.
alpha_count = len(alphabets) + 2
phoneme_count = len(phonemes) + 2


In [5]:
"""Include start and end for both letters and phonemes"""
# Add the sequence markers to both vocabularies: "start" becomes index 0 and
# "end" the last index, which is what the one-hot encoders and the decoder's
# start/end tokens rely on.
# The guards make the cell safe to re-run: inserting the markers a second time
# would shift every symbol index by one.
for vocab in (alphabets, phonemes):
    if not vocab or vocab[0] != "start":
        vocab.insert(0, "start")
    if vocab[-1] != "end":
        vocab.append("end")
print(alphabets)
print(phonemes)

['start', "'", '-', '.', '1', '0', '3', '2', '5', '4', '7', '6', '9', '8', 'A', 'C', 'B', 'E', 'D', 'G', 'F', 'I', 'H', 'K', 'J', 'M', 'L', 'O', 'N', 'Q', 'P', 'S', 'R', 'U', 'T', 'W', 'V', 'Y', 'X', 'Z', '\\', '_', 'end']
['start', 'AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'B', 'CH', 'D', 'DH', 'EH', 'ER', 'EY', 'F', 'G', 'HH', 'IH', 'IY', 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW', 'OY', 'P', 'R', 'S', 'SH', 'T', 'TH', 'UH', 'UW', 'V', 'W', 'Y', 'Z', 'ZH', 'end']


In [6]:
"""converts words to one hot encoding with end padding to make size uniform"""
# One-hot tensor indexed as (word, time step, letter index).
# Step 0 is always "start" (index 0), the letters follow, and every remaining
# step is set to "end" (the last index) so all rows share one length.
x = np.zeros((len(x_temp), max_count_x, alpha_count))
end_slot = alpha_count - 1
for row, word in enumerate(x_temp):
    x[row, 0, 0] = 1
    # Letters occupy steps 1..len(word); anything past the last step is dropped.
    for step, letter in enumerate(word[:max_count_x - 1], 1):
        x[row, step, alphabets.index(letter)] = 1
    x[row, len(word) + 1:, end_slot] = 1

In [7]:
"""converts phonemes to one hot encoding with end padding to make size uniform"""
# One-hot tensor indexed as (word, time step, phoneme index), laid out like
# the word tensor: "start" at step 0, then the phonemes, then "end" padding.
# NOTE(review): the time axis is sized with max_count_x rather than
# max_count_y, so a pronunciation longer than max_count_x - 1 phonemes would
# be cut off here -- confirm that this cannot happen for the dictionary used.
y = np.zeros((len(y_temp), max_count_x, phoneme_count))
stress_digits = re.compile(r'[0-9]+')
end_slot = phoneme_count - 1
for row, pronunciation in enumerate(y_temp):
    y[row, 0, 0] = 1
    for step, symbol in enumerate(pronunciation[:max_count_x - 1], 1):
        # Digits are stripped before the lookup because the phoneme list
        # contains the bare symbols only.
        y[row, step, phonemes.index(stress_digits.sub('', symbol))] = 1
    y[row, len(pronunciation) + 1:, end_slot] = 1

In [8]:
def convert_x(l):
    """Decode one-hot letter rows into their symbols.

    For every row of ``l`` the position of the first entry equal to 1 is
    looked up in ``alphabets``.  A row without such an entry raises
    ValueError.  Returns the decoded symbols as a list, one per row.
    """
    return [alphabets[list(row).index(1)] for row in l]
def convert_y(l):
    """Decode phoneme rows into their symbols.

    Each row of ``l`` is mapped to the entry of ``phonemes`` at the position
    of the row's largest value, so both exact one-hot rows and probability
    rows can be decoded.  Returns the symbols as a list, one per row.
    """
    return [phonemes[np.argmax(row)] for row in l]

In [9]:
# The batch dimension is fixed in the placeholder shapes, so every feed must
# contain exactly batch_size examples.
batch_size = 64
# One-hot words and one-hot phoneme sequences; both use max_count_x time steps.
x_train = tf.placeholder(dtype=tf.float32, shape=[batch_size, max_count_x, alpha_count])
y_train = tf.placeholder(dtype=tf.float32, shape=[batch_size, max_count_x, phoneme_count])

In [10]:
"""Set of hyperparameters"""
num_hidden = 1024          # units in each GRU cell
num_layers = 3             # GRU cells stacked in the encoder and in the decoder
output_keep_prob = 0.95    # output keep probability handed to DropoutWrapper
beam_width = 5             # beams kept by the beam-search decoder

In [11]:
def gru_cell():
    """Build and return a fresh GRU cell with ``num_hidden`` units."""
    cell = tf.nn.rnn_cell.GRUCell(num_units=num_hidden)
    return cell

In [12]:
#Encoder
"""Creates a multi layer Unidirectional RNN encoder with dropout"""
# Encoder: num_layers GRU cells, each wrapped with output dropout, stacked
# into one MultiRNNCell and unrolled over the one-hot word placeholder.
# Everything is created inside the "encoder_gru" variable scope.
with tf.variable_scope("encoder_gru"):
    cells = []
    for i in range(num_layers):
        cell_y = gru_cell()
        # NOTE(review): output_keep_prob is a plain Python constant, so this
        # dropout is part of the graph whenever the encoder runs, including
        # when it feeds the prediction path -- confirm that is intended.
        cell_y = tf.nn.rnn_cell.DropoutWrapper(cell_y, output_keep_prob=output_keep_prob)
        cells.append(cell_y)
    encoder_cell =  tf.contrib.rnn.MultiRNNCell(cells, state_is_tuple=True) 
    # No initial_state is passed to dynamic_rnn; only the dtype is given.
    encoder_val, encoder_state = tf.nn.dynamic_rnn(encoder_cell, x_train, dtype=tf.float32)
# Shown as the cell output: a tuple with one final-state tensor per layer.
encoder_state

(<tf.Tensor 'encoder_gru/rnn/while/Exit_3:0' shape=(64, 1024) dtype=float32>,
 <tf.Tensor 'encoder_gru/rnn/while/Exit_4:0' shape=(64, 1024) dtype=float32>,
 <tf.Tensor 'encoder_gru/rnn/while/Exit_5:0' shape=(64, 1024) dtype=float32>)

In [13]:
"""Creates a multi layer Unidirectional RNN decoder with dropout and initial state from last hidden state of encoder
    along with a Dense layer at the end"""
# Training decoder: a stack of num_layers dropout-wrapped GRU cells that starts
# from the encoder's final state and is fed the reference phonemes (y_train)
# step by step.  A dense layer maps each output to phoneme_count logits.
with tf.variable_scope("decoder_gru"):
    decoder_cells = []
    for i in range(num_layers):
        cell_x = gru_cell()
        cell_x = tf.nn.rnn_cell.DropoutWrapper(cell_x, output_keep_prob=output_keep_prob)
        decoder_cells.append(cell_x)
    decoder_cell = tf.contrib.rnn.MultiRNNCell(decoder_cells)
    # TRAINING DECODER
    p_layer = tf.layers.Dense(phoneme_count, use_bias=True, kernel_initializer=tf.contrib.layers.xavier_initializer(), bias_initializer=tf.contrib.layers.xavier_initializer())
    # sequence_length holds one entry per batch row.  Every target is padded
    # to max_count_x steps, so each row gets that same length; a single-element
    # list would describe a batch of one.
    training_helper = tf.contrib.seq2seq.TrainingHelper(inputs = y_train,sequence_length = [max_count_x] * batch_size,time_major = False)
    # The decoder is built without an output layer: p_layer is applied to the
    # whole output sequence after decoding instead (see below).
    training_decoder = tf.contrib.seq2seq.BasicDecoder(
        cell = decoder_cell,
        helper = training_helper,
        initial_state = encoder_state)
    print(training_decoder)
    training_decoder_output, _, _ = tf.contrib.seq2seq.dynamic_decode(
        decoder = training_decoder)
    print(training_decoder_output)
    training_logits = training_decoder_output.rnn_output
    print(training_logits)
    # Project the raw GRU outputs to phoneme_count logits per time step.
    training_logits = p_layer(training_logits)


<tensorflow.contrib.seq2seq.python.ops.basic_decoder.BasicDecoder object at 0x7f0315bca790>
BasicDecoderOutput(rnn_output=<tf.Tensor 'decoder_gru/decoder/transpose:0' shape=(64, ?, 1024) dtype=float32>, sample_id=<tf.Tensor 'decoder_gru/decoder/transpose_1:0' shape=(64, ?) dtype=int32>)
Tensor("decoder_gru/decoder/transpose:0", shape=(64, ?, 1024), dtype=float32)


In [14]:
# Swap the first two axes of the logits so that time comes before batch.
val = tf.transpose(training_logits, perm=[1, 0, 2])
print(val)

Tensor("transpose:0", shape=(?, 64, 41), dtype=float32)


In [15]:
"""Softmax is applied to the outputs to get phoneme probabilities at each time step
    and a start is added in the start to each output"""
# Flatten to one row of logits per (time step, example) so that softmax
# normalizes over the phoneme axis, then restore the 3-D layout.
val = tf.reshape(val, shape=[-1, phoneme_count])
prediction = tf.reshape(
    tf.nn.softmax(val),
    shape=[-1, batch_size, y_train.get_shape().as_list()[2]])
# One-hot "start" (index 0) for every example in the batch.
temp_arr = np.zeros((batch_size, phoneme_count))
temp_arr[:, 0] = 1
start_step = tf.expand_dims(tf.constant(temp_arr, dtype=tf.float32), axis=0)
# Prepend the start step and drop the last step: the output produced at step t
# is thereby lined up with position t + 1 of the sequence.
prediction = tf.concat([start_step, prediction], axis=0)[:-1]

In [16]:
"""Beam search implemented for predicting phonemes from a word"""
# Inference decoder: beam search over the same decoder_cell and p_layer
# objects that the training decoder uses.
with tf.variable_scope("beam_search"):
    # Repeat each example's encoder state beam_width times (one copy per beam).
    decoder_initial_state = tf.contrib.seq2seq.tile_batch(encoder_state, multiplier=beam_width)
    print(decoder_initial_state)
    # An identity matrix used as the embedding turns each predicted id into its
    # one-hot row, the same representation y_train feeds the training decoder.
    temp = np.eye(phoneme_count)
    decoder_embedding = tf.constant(temp, dtype=tf.float32)
    print(decoder_embedding)
    # PREDICTING_DECODER
    # Token ids follow the one-hot layout of the targets: "start" is index 0
    # and "end" is the last index, so the end token is derived from
    # phoneme_count instead of being hard-coded.
    predicting_decoder = tf.contrib.seq2seq.BeamSearchDecoder(cell = decoder_cell,
        embedding = decoder_embedding,
        start_tokens = tf.fill([batch_size], 0),
        end_token = phoneme_count - 1,
        initial_state = decoder_initial_state,
        output_layer = p_layer,
        beam_width = beam_width)
    # Decoding is capped at twice the padded input length.
    predicting_decoder_output, _, _ = tf.contrib.seq2seq.dynamic_decode(
        decoder = predicting_decoder,
        maximum_iterations = 2*max_count_x)
    print(predicting_decoder_output)
    # Predicted phoneme ids, indexed as (batch, time, beam).
    predicting_logits = predicting_decoder_output.predicted_ids

(<tf.Tensor 'beam_search/tile_batch/Reshape:0' shape=(320, 1024) dtype=float32>, <tf.Tensor 'beam_search/tile_batch/Reshape_1:0' shape=(320, 1024) dtype=float32>, <tf.Tensor 'beam_search/tile_batch/Reshape_2:0' shape=(320, 1024) dtype=float32>)
Tensor("beam_search/Const:0", shape=(41, 41), dtype=float32)
FinalBeamDecoderOutput(predicted_ids=<tf.Tensor 'beam_search/decoder/transpose:0' shape=(64, ?, 5) dtype=int32>, beam_search_decoder_output=BeamSearchDecoderOutput(scores=<tf.Tensor 'beam_search/decoder/transpose_1:0' shape=(64, ?, 5) dtype=float32>, predicted_ids=<tf.Tensor 'beam_search/decoder/transpose_2:0' shape=(64, ?, 5) dtype=int32>, parent_ids=<tf.Tensor 'beam_search/decoder/transpose_3:0' shape=(64, ?, 5) dtype=int32>))


In [17]:
"""Cross entropy of decoded outputs and true phonemes"""
# Put the targets in the same axis order as `prediction` (time before batch).
y_t1 = tf.transpose(y_train, perm=[1, 0, 2])
# Probabilities are clipped away from zero before the log so that it stays
# finite.
log_prediction = tf.log(
    tf.clip_by_value(prediction, clip_value_min=1e-10, clip_value_max=1.0))
# Summed (not averaged) over every time step, example and phoneme.
cross_entropy = -tf.reduce_sum(y_t1 * log_prediction)

In [18]:
"""Learning rate and Annealing Rate is set for the Adam optimizer"""
starter_learning_rate = 0.001
# Step counter; minimize() increments it once per training step because it is
# passed as global_step below.
global_step = tf.Variable(0, trainable=False)
# Staircase schedule: the rate is multiplied by 0.85 every 1000 steps.
learning_rate = tf.train.exponential_decay(
    learning_rate=starter_learning_rate,
    global_step=global_step,
    decay_steps=1000,
    decay_rate=0.85,
    staircase=True)
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
minimize = optimizer.minimize(loss=cross_entropy, global_step=global_step)

In [19]:
"""To display tensorboard"""
# Write the graph definition to the log directory so TensorBoard can show it.
# The graph is taken from the default graph directly; no session needs to be
# opened for that.  The log path no longer carries a trailing space, which
# would have created a directory literally named "train ".
train_writer = tf.summary.FileWriter('./logs/1/train', tf.get_default_graph())
# Flush so the event file is on disk as soon as this cell has run.
train_writer.flush()

In [20]:
"""Session Initialization"""
# Open the session used by every later cell and give all graph variables
# their initial values.
sess = tf.Session()
init_op = tf.global_variables_initializer()
sess.run(init_op)

In [21]:
"""For using previous models, restore, else comment"""
# The saver is also used by the training cell to write new checkpoints.
saver = tf.train.Saver()
# Overwrite the freshly initialized variables with a previously saved model.
checkpoint_path = "./8.2571937673/model.ckpt"
saver.restore(sess, save_path=checkpoint_path)

INFO:tensorflow:Restoring parameters from ./8.2571937673/model.ckpt


In [22]:
# Hold out 10% of the encoded dictionary for evaluation; the fixed
# random_state makes the split repeatable.
train_input, test_input, train_output, test_output = train_test_split(
    x, y, test_size=0.1, random_state=50)

In [23]:
def get_error(test_input, test_output):
    """Return the phoneme error rate, in percent, of the beam-search decoder.

    The inputs are processed in full batches of ``batch_size``; examples left
    over after the last full batch are ignored.  For each example the
    reference phonemes are the class indices of ``test_output`` between the
    leading "start" and the first "end" marker.  They are compared position by
    position with the top beam of the prediction:

    * predicted symbols beyond the reference length are ignored;
    * if the prediction is shorter than the reference, the missing positions
      are filled with 0 (the "start" index) and therefore count as mistakes.

    Args:
        test_input: one-hot words, indexed as (example, time step, letter).
        test_output: one-hot phonemes, indexed as (example, time step, phoneme).

    Returns:
        100 * mistakes / compared positions as a float, or NaN when there is
        nothing to compare (for example fewer than ``batch_size`` examples).
    """
    end_id = phoneme_count - 1   # "end" is the last phoneme index
    test_batches = len(test_input) // batch_size
    # Class index per time step, limited to the examples that fill whole batches.
    reference = np.argmax(test_output, axis=2)[:test_batches * batch_size]
    mistakes = 0
    total = 0
    for k in range(test_batches):
        first = k * batch_size
        batch_in = test_input[first:first + batch_size]
        batch_ref = reference[first:first + batch_size]
        # predicting_logits is indexed (batch, time, beam); keep the top beam.
        batch_pred = sess.run(predicting_logits, {x_train: batch_in})[:, :, 0]

        for ref_row, pred_row in zip(batch_ref, batch_pred):
            # Cut at the first "end"; a reference without one is used in full
            # instead of raising.
            end_positions = np.flatnonzero(ref_row == end_id)
            stop = end_positions[0] if len(end_positions) else len(ref_row)
            out = ref_row[1:stop]
            # Pad the prediction just enough to cover the reference length.
            shortfall = max(0, len(out) - len(pred_row))
            predicted_out = np.pad(pred_row, (0, shortfall), 'constant')[:len(out)]
            mistakes += np.count_nonzero(np.not_equal(out, predicted_out))
            total += len(out)
    if total == 0:
        # Nothing was compared; avoid dividing by zero.
        return float('nan')
    return 100 * float(mistakes) / float(total)

In [24]:
"""For Training only- set number of epochs. Calculates error% at every epoch"""

if training:

    # Only whole batches are used; leftover examples are skipped.
    no_of_batches = len(train_input) // batch_size
    epoch = 60
    cur_error = 0
    min_error = 100   # best (lowest) error % seen so far

    for i in range(epoch):
        ptr = 0
        for j in range(no_of_batches):
            inp, out = train_input[ptr:ptr+batch_size], train_output[ptr:ptr+batch_size]
            ptr += batch_size
            sess.run(minimize, {x_train: inp, y_train: out})
        print("%s %s" % ("Epoch - ", str(i)))

        cur_error = get_error(test_input, test_output)
        print("Error % = " + str(cur_error))
        # A stray `break` used to sit here: it stopped training after the
        # first epoch and made the checkpointing below unreachable.
        # Keep a checkpoint only when the held-out error improves.
        if cur_error < min_error:
            save_path = saver.save(sess, "./saved_model/model.ckpt")
            print("Model saved in path: %s" % save_path)
            min_error = cur_error
        else:
            print("Not Saved")

In [25]:
"""Testing Function to convert word to one hot"""
def convert_to_one_hot(word):
    """One-hot encode a single word the same way the training words are.

    The result has ``max_count_x`` rows and ``alpha_count`` columns: row 0 is
    the "start" marker (column 0), the following rows hold one letter each
    (column = position of the letter in ``alphabets``), and all remaining rows
    are the "end" marker (last column).  Letters that do not fit are dropped;
    a letter missing from ``alphabets`` raises ValueError.
    """
    x_t = np.zeros((max_count_x, alpha_count))
    x_t[0, 0] = 1
    for step, letter in enumerate(word[:max_count_x - 1], 1):
        x_t[step, alphabets.index(letter)] = 1
    x_t[len(word) + 1:, alpha_count - 1] = 1
    return x_t

In [26]:
"""Takes a word and predicts its phonemes"""
def phoneme_predict(word):
    """Return the predicted phoneme symbols for ``word`` as a list.

    The input placeholder has a fixed batch dimension, so the encoded word is
    repeated ``batch_size`` times and only the first row of the result is
    read.  The top beam is decoded symbol by symbol up to, and not including,
    the first id equal to 40 (the "end" marker).
    """
    encoded = convert_to_one_hot(word)
    words_x = np.array([encoded] * batch_size)

    # predicting_logits is indexed (batch, time, beam): first row, top beam.
    top_beam = sess.run(predicting_logits, {x_train: words_x})[0, :, 0]
    symbols = []
    for token in top_beam:
        if token == 40:
            return symbols
        symbols.append(phonemes[token])
    return symbols

In [27]:
"""Testing"""
# Try the model on a single word; the returned phoneme list is displayed as
# the cell output.
phoneme_predict("INDIA")

['IH', 'N', 'D', 'IY', 'AH']

In [28]:
# Phoneme error rate of the current model on the held-out split.
print("Error % = " + str(get_error(test_input, test_output)))

Error % = 8.1586230428
