In [1]:
import os
from pickle import dump
import pandas as pd
from numpy import array
from numpy.random import shuffle
from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import Input ,LSTM, Embedding, Dense, TimeDistributed

from tensorflow.keras.layers import RepeatVector, Bidirectional, Dropout

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.callbacks import ModelCheckpoint

#from generate_phoneme import get_phoneme_list
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
import numpy as np

from numpy import argmax
from keras.models import load_model
import time
from pickle import load

Using TensorFlow backend.


In [2]:
# Path to the data txt file on disk.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:
class LoadData():
        
    def load_data(self):
        data_path = '/content/drive/My Drive/Projects/NLP-Notebooks/1.4-machine-translation/data/ben-eng/ben.txt'
        file = open(data_path, encoding="utf8")
        lines = file.read().split('\n')
        input_target = list()
        for line in lines:
            try:
                input_text, target_text,_ = line.split('\t')
                input_target.append([input_text, target_text])
            except:
                pass
        return input_target


In [4]:
class PreProcessing():
    def __init__(self,dataset=None):
        self.pairs = None
        self.dataset = dataset
        self.training_data = None
        self.testing_data = None
        
            
    def get_tokenizer(self,lines):
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(lines)
        return tokenizer
        
    def max_length(self,lines):
        return max(len(line.split()) for line in lines)

    def get_length(self):
        correct_data = self.dataset[:, 0]
        faulty_data = self.dataset[:, 1]
        tar_data_length = max(len(line.split()) for line in correct_data)
        src_data_length = max(len(line.split()) for line in faulty_data)
        return tar_data_length,src_data_length
    
    def encode_sequences(self,tokenizer, length, lines):
        X = tokenizer.texts_to_sequences(lines)
        X = pad_sequences(X, maxlen=length, padding='post')
        return X
 
    def encode_output(self,sequences, vocab_size):
        ylist = list()
        for sequence in sequences:
            encoded = to_categorical(sequence, num_classes=vocab_size)
            ylist.append(encoded)
        y = np.array(ylist)
        y = y.reshape(sequences.shape[0], sequences.shape[1], vocab_size)
        return y

    def get_train_test_data(self,data):
        train, test = train_test_split(data, test_size=0.2, random_state = 143)
        return train, test 




In [14]:
class DesignModel():
    
    def __init__(self,X_train,Y_train,X_test,Y_test,epochs,batch_size):
        self.model = None
        self.X_train = X_train
        self.Y_train = Y_train
        self.X_test = X_test
        self.Y_test = Y_test
        self.encoder_model = None
        self.decoder_model = None
        self.epochs = epochs
        self.batch_size = batch_size
    
    def generate_batch(self, X, y, max_length_src, max_length_tar,num_decoder_tokens, batch_size ):
        ''' Generate a batch of data '''
        while True:
            for j in range(0, len(X), batch_size):
                encoder_input_data = np.zeros((batch_size, max_length_src),dtype='float32')
                decoder_input_data = np.zeros((batch_size, max_length_tar),dtype='float32')
                decoder_target_data = np.zeros((batch_size, max_length_tar, num_decoder_tokens),dtype='float32')
                for i, (input_text, target_text) in enumerate(zip(X[j:j+batch_size], y[j:j+batch_size])):
                    for t, item in enumerate(input_text):
                        try:
                            encoder_input_data[i, t] = item
                        except Exception as e:
                            print("Wrong word:",word)
                            print("Exception:",e)
                    for t, item in enumerate(target_text):
                        if t<len(target_text)-1:
                            decoder_input_data[i, t] = item
                        if t>0:
                            decoder_target_data[i, t - 1, item] = 1.
                yield([encoder_input_data, decoder_input_data], decoder_target_data)
                
    def enc_dec_model(self, input_vocab_size, target_vocab_size, src_data_length, tar_data_length, n_units):
        
        current_directory = os.getcwd()
        #data_folder = os.path.join(current_directory,"..","data")
        model_folder = os.path.join(current_directory,"..","models")
        
        #encoder
        encoder_input = Input(shape = (None,))
        encoder_emb =  Embedding(input_vocab_size, n_units, mask_zero = False)(encoder_input)
        encoder_lstm = LSTM(n_units,return_state = True)
        encoder_outputs,encode_h,encoder_c = encoder_lstm(encoder_emb)
        encoder_states = [encode_h,encoder_c]
        
        #decoder
        decoder_input = Input(shape = (None,))
        decoder_emb_layer = Embedding(target_vocab_size+1, n_units, mask_zero = False)
        decoder_emb = decoder_emb_layer(decoder_input)
        decoder_lstm = LSTM(n_units,return_sequences=True,return_state = True)
        decoder_out,decode_h,decoder_c = decoder_lstm(decoder_emb,initial_state = encoder_states)
        decoder_dense = Dense(target_vocab_size,activation="softmax")
        decoder_out = decoder_dense(decoder_out)
        self.model = Model([encoder_input,decoder_input],decoder_out)
        #compile
        self.model.compile(optimizer="rmsprop",loss="categorical_crossentropy",metrics=['accuracy'])

        #fit
        tar_data_length,src_data_length = tar_data_length,src_data_length
        train_gen = self.generate_batch(self.X_train,self.Y_train,src_data_length,tar_data_length,target_vocab_size,self.batch_size)
        test_gen = self.generate_batch(self.X_test,self.Y_test,src_data_length,tar_data_length,target_vocab_size,self.batch_size)
        train_samples_steps = len(self.X_train) / self.batch_size
        val_samples_steps = len(self.X_test) / self.batch_size
        
        
        
        filename = os.path.join('Enc_Dec_base_model.h5')
        checkpoint = ModelCheckpoint(filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
        callbacks=[checkpoint]
        
        self.model.fit(train_gen,
                    steps_per_epoch = train_samples_steps,
                    epochs=self.epochs,
                    validation_data = test_gen,
                    validation_steps = val_samples_steps,callbacks = callbacks)
        
        #eocoder setup
        
        self.encoder_model = Model(encoder_input, encoder_states)
        
        # Decoder setup
        decoder_state_input_h = Input(shape=(n_units,))
        decoder_state_input_c = Input(shape=(n_units,))
        decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
        
        dec_emb2 = decoder_emb_layer(decoder_input)
        
        decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
        decoder_states2 = [state_h2, state_c2]
        decoder_outputs2 = decoder_dense(decoder_outputs2) # A dense softmax layer to generate prob dist. over the target vocabulary
        
        # Final decoder model
        self.decoder_model = Model([decoder_input] + decoder_states_inputs,[decoder_outputs2] + decoder_states2)
    
    def save_model(self,model,model_file):
        with open(model_file+'.json', 'w', encoding='utf8') as f:
            f.write(model.to_json())
        model.save_weights(model_file+'.h5')
        
    def save(self):
        encoder_model_name = 'Encoder_Model'
        self.save_model(self.encoder_model,encoder_model_name)
        
        decoder_model_name = 'Decoder_Model'
        self.save_model(self.decoder_model,decoder_model_name)


In [6]:
class Prediction():
    
    def __init__(self,model_structure,src_data_length,tar_data_length,data_vocab_size):
        self.src_data_length = src_data_length
        self.tar_data_length = tar_data_length
        self.data_vocab_size = data_vocab_size
        cat = self.model_structure["cat"]
        version = self.model_structure["version"]
        current_directory = os.getcwd()
        #data_folder = os.path.join(current_directory,"..","data")
        model_folder = os.path.join(current_directory,"..","models")
        encoder_model_filename = os.path.join(model_folder,cat+'_Encoder_base_model-v'+version+'.h5')
        self.encoder_model = load(open(encoder_model_filename, 'rb'))
        decoder_model_filename = os.path.join(model_folder,cat+'_Decoder_base_model-v'+version+'.h5')
        self.decoder_model = load(open(decoder_model_filename, 'rb'))
    
    def get_tokens(self):
        token_words = get_phoneme_list()
        token_index = dict([(word, i+1) for i, word in enumerate(token_words)])
        
        reverse_token_index = dict((i, word) for word, i in token_index.items())

        return token_index,reverse_token_index
    
    def generate_batch(self, x, y, max_length_src, max_length_tar, num_decoder_tokens, batch_size ):
        token_index,_ = self.get_tokens()
        ''' Generate a batch of data '''
        while True:
            for j in range(0, len(x), batch_size):
                encoder_input_data = np.zeros((batch_size, max_length_src),dtype='float32')
                decoder_input_data = np.zeros((batch_size, max_length_tar),dtype='float32')
                decoder_target_data = np.zeros((batch_size, max_length_tar, num_decoder_tokens),dtype='float32')
                for i, (input_text, target_text) in enumerate(zip(x[j:j+batch_size], y[j:j+batch_size])):
                    for t, word in enumerate(input_text.split()):
                        try:
                            encoder_input_data[i, t] = token_index[word]
                        except Exception as e:
                            print("Wrong word:",word)
                            print("Exception:",e)
                    for t, word in enumerate(target_text.split()):
                        if t<len(target_text.split())-1:
                            decoder_input_data[i, t] = token_index[word]
                        if t>0:
                            decoder_target_data[i, t - 1, token_index[word]] = 1.
                yield([encoder_input_data, decoder_input_data], decoder_target_data)
                
    
    
    def decode_sequence(self,input_seq):
        target_token_index,reverse_target_char_index = self.get_tokens()
        #print("target_token_index:",target_token_index)
        states_value = self.encoder_model.predict(input_seq)
        target_seq = np.zeros((1,1))
        target_seq[0, 0] = target_token_index["start"]
        stop_condition = False
        decoded_phoneme = []
        while not stop_condition:
            output_tokens, h, c = self.decoder_model.predict([target_seq] + states_value)
    
            # Sample a token
            sampled_token_index = np.argmax(output_tokens[0, -1, :])
            sampled_char = reverse_target_char_index[sampled_token_index]
            decoded_phoneme.append(sampled_char)
    

            if len(decoded_phoneme)> 26 or sampled_char=="end":
                del decoded_phoneme[-1]
                stop_condition = True
    
            target_seq = np.zeros((1,1))
            target_seq[0, 0] = sampled_token_index
    
            states_value = [h, c]
    
        return decoded_phoneme
    
    def get_preiction(self,item,actual_result):
        
        phoneme_item = get_phoneme(item)
        actual_phoneme = get_phoneme(actual_result)
        
        for id in range(len(phoneme_item)):
            if phoneme_item[id] == " ":
                phoneme_item[id] = "SP"
                
        for id in range(len(actual_phoneme)):
            if actual_phoneme[id] == " ":
                actual_phoneme[id] = "SP"
                
        phoneme_str = " ".join(phoneme_item)
        actual_str = " ".join(actual_phoneme)
        
        dataset = [[phoneme_str,actual_str]]
        dataset = array(dataset)
        x = dataset[:1,1]
        y = dataset[:1,0]
        test_gen = self.generate_batch(x, y, self.src_data_length, self.tar_data_length, self.data_vocab_size, 1)
        (input_seq, actual_output), _ = next(test_gen)
        enoded_phoneme_data = self.decode_sequence(input_seq)
        
        return enoded_phoneme_data

In [15]:
if __name__ == '__main__':
    
    # model_structure = {"cat" :"area","version" : "1","model_type" : "enc_dec"}
    # epochs = 100
    # batch_size = 35#ld.train_data_length
    
    
    ld = LoadData()
    input_target = ld.load_data()
    print(input_target[:5])
    input_texts = np.array(input_target)[:, 0]
    target_texts = np.array(input_target)[:, 1]
    prp = PreProcessing()
    input_tokenizer = prp.get_tokenizer(input_texts)
    innput_vocab_size = len(input_tokenizer.word_index) + 1
    input_length = prp.max_length(input_texts)
    print(f'Input Vocabulary Size:{innput_vocab_size}')
    print(f'Input Max Length Size:{input_length}')
                
    target_tokenizer = prp.get_tokenizer(target_texts)
    target_vocab_size = len(target_tokenizer.word_index) + 1
    target_length = prp.max_length(target_texts)
    print(f'Target Vocabulary Size: {target_vocab_size}')
    print(f'Target Max Length Size:{target_length}')

    train , test = prp.get_train_test_data(input_target)
    train = np.array(train) 
    test = np.array(test)
    trainX = prp.encode_sequences(input_tokenizer, input_length, train[:, 1])
    trainY = prp.encode_sequences(target_tokenizer, target_length, train[:, 0])
    
    testX = prp.encode_sequences(input_tokenizer, input_length, train[:, 1])
    testY = prp.encode_sequences(target_tokenizer, target_length, train[:, 0])
    
    # #model
    model_obj = DesignModel(trainX,trainY,testX,testY,5,32)
    model_obj.enc_dec_model(innput_vocab_size,target_vocab_size,input_length,target_length,512)
    model_obj.save()
    
    
    # #prediction
    # text_dict = load(open(os.path.join("..","models",cat+"_phoneme_to_text_dict.pkl"),'rb'))
    # pred = Prediction(model_structure,src_data_length,tar_data_length,data_vocab_size)
    # test_file = os.path.join(data_folder,cat+"_test_cases.txt")
    # fread = open(test_file,'r')
    # test_cases_list = fread.read().splitlines()
    # test_list = list()
    # for item in test_cases_list:
    #     test_list.append(item.split(','))
    
    # for each_list in test_list:
    #     actual_result = each_list[0]
    #     for item in each_list[1:]:
    #         start_time = time.time()
    #         print("Input:",item)
    #         print("target:",actual_result)
    #         result = pred.get_preiction(item,actual_result)
    #         print("Predicted Phoneme:",result)
    #         result = " ".join(result)
    #         result_list = result.split(" SP ")
            
    #         pred_text = ""
    #         for item in result_list:
    #             try:
    #                 if pred_text:
    #                     pred_text = pred_text +" "+ text_dict[item]
    #                 else:
    #                     pred_text = text_dict[item]
    #             except:
    #                 pass            
    #         print("Time:",(time.time()-start_time)*1000," ms" )
    #         print("Predicted Text: ",pred_text)
    #         print("Actual Text: ",actual_result)
    #         if pred_text.lower() == actual_result.lower():
    #             print("Pass")
    #         else:
    #             print("Fail")
    #         print("\n")
    
    

[['Go.', 'যাও।'], ['Go.', 'যান।'], ['Go.', 'যা।'], ['Run!', 'পালাও!'], ['Run!', 'পালান!']]
Input Vocabulary Size:1875
Input Max Length Size:19
Target Vocabulary Size: 3312
Target Max Length Size:18
Epoch 1/5
Epoch 00001: val_loss improved from inf to 0.00098, saving model to Enc_Dec_base_model.h5
Epoch 2/5
Epoch 00002: val_loss improved from 0.00098 to 0.00072, saving model to Enc_Dec_base_model.h5
Epoch 3/5
Epoch 00003: val_loss improved from 0.00072 to 0.00061, saving model to Enc_Dec_base_model.h5
Epoch 4/5
Epoch 00004: val_loss improved from 0.00061 to 0.00058, saving model to Enc_Dec_base_model.h5
Epoch 5/5
Epoch 00005: val_loss improved from 0.00058 to 0.00057, saving model to Enc_Dec_base_model.h5
