In [1]:
import os
import numpy as np
import pandas as pd
import re
import pickle       
import re
import tensorflow as tf
import torch.nn as nn
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from nltk.stem import PorterStemmer

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [5]:
"""
Encoder Architecture Inputs:2(embedding,hidden_state) , Outputs:2(enc_output,thought_vector)

ENCODER_CLASS:

Word2Vec(Inputs) => Encoder(Vocab_size,embedding from word2vec,GRU's required,words/sentences to fetched once) 

=> Encoder_Outputs,Thought Vector 

"""

"\nEncoder Architecture Inputs:2(embedding,hidden_state) , Outputs:2(enc_output,thought_vector)\n\nENCODER_CLASS:\n\nWord2Vec(Inputs) => Encoder(Vocab_size,embedding from word2vec,GRU's required,words/sentences to fetched once) \n\n=> Encoder_Outputs,Thought Vector \n\n"

In [6]:
class Encoder(tf.keras.Model):
    def __init__(self,vocab_size,embedding,encoder_units,batch_size):
        super(Encoder,self).__init__()
        self.batch_size = batch_size
        self.enc_units = encoder_units
        self.embedding = tf.keras.layers.Embedding(vocab_size,embedding)
        self.gru = tf.keras.layers.GRU(self.enc_units,return_sequences=True,return_state=True,kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
    def call(self,inputs,hidden_state):
        embedded_inputs = self.embedding(inputs)
        enc_output,thought_vector= self.gru(embedded_inputs,initial_state=hidden_state)
        return enc_output,thought_vector

In [7]:
"""
Attention_network_Architecture: Inputs:2(enc_outputs,thought_vector), Outputs:2(attentn_output,attent_weight)
                                                                                                     
Enc_outputs   --> Enc_Layer -> ------                                                   
                                      Final_layer(Activation(++)) --> Attention_weights * encoder_output--2
Thought_vector--> Thought_layer -> ---

"""

'\nAttention_network_Architecture: Inputs:2(enc_outputs,thought_vector), Outputs:2(attentn_output,attent_weight)\n                                                                                                     \nEnc_outputs   --> Enc_Layer -> ------                                                   \n                                      Final_layer(Activation(++)) --> Attention_weights * encoder_output--2\nThought_vector--> Thought_layer -> ---\n\n'

In [8]:
class Attention(tf.keras.layers.Layer):
    def __init__(self,units):
        super(Attention,self).__init__()
        self.enc_output_layer = tf.keras.layers.Dense(units,kernel_regularizer=tf.keras.regularizers.L2(0.001))
        self.thought_layer = tf.keras.layers.Dense(units,kernel_regularizer=tf.keras.regularizers.L2(0.001))
        self.final_output = tf.keras.layers.Dense(1,kernel_regularizer=tf.keras.regularizers.L2(0.001))
    
    def call(self,enc_output,thought_vector):
        thought_matrix = tf.expand_dims(thought_vector,1)    #return tensor, add extra tensor dimension to the input
        scores = self.final_output(tf.keras.activations.tanh(self.enc_output_layer(enc_output)+self.thought_layer(thought_matrix)))
        attention_weights = tf.keras.activations.softmax(scores,axis=-1)
        
        attention_output = attention_weights * enc_output
        attention_output = tf.reduce_sum(attention_output, axis=1)   #new_shape (batch_size,attention_outputs)
        return attention_output,attention_weights        
        

In [9]:
"""
Decoder Archirecture: Inputs:2(hindi_words,dec_hidden,enc_outputs) Outputs:2(dec_output,dec_hidden)

1. embedding_output(hindi_inputs) + attention_outputs

"""



'\nDecoder Archirecture: Inputs:2(hindi_words,dec_hidden,enc_outputs) Outputs:2(dec_output,dec_hidden)\n\n1. embedding_output(hindi_inputs) + attention_outputs\n\n'

In [10]:
class Decoder(tf.keras.Model):
    def __init__(self,vocab_size,embedding,decoder_units,batch_size):
        super(Decoder,self).__init__()
        
        self.batch_size = batch_size
        self.dec_units = decoder_units
        self.batch_size = batch_size
        self.embedding = tf.keras.layers.Embedding(vocab_size,embedding)
        
        self.gru = tf.keras.layers.GRU(self.dec_units,return_sequences=True,return_state=True,kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
        self.attention = Attention(self.dec_units)
        self.word_output = tf.keras.layers.Dense(vocab_size,kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
    def call(self, hindi_inputs, enc_outputs, thought_vector):
        attention_output, attention_weights = self.attention(enc_outputs,thought_vector)
        embedded_inputs = self.embedding(hindi_inputs)  #shape(batch_size,num_words,size_of_embediing)
        attention_output = tf.expand_dims(attention_output, 0)  #shape (batch_size,1,size_of_embedding)
        
        concat_input = tf.concat([attention_output,embedded_inputs],axis=-1)
        
        decoder_outputs,hidden_state = self.gru(concat_input)  #shape(batch_size,1, size_of_embedding)
        decoder_outputs = tf.reshape(decoder_outputs,(-1,decoder_outputs.shape[2])) #shape(batch_size,sizeof_embed
        
        final_output = self.word_output(decoder_outputs)
        return final_output,hidden_state,attention_weights

In [11]:
"""
Model Training Architecture:

def loss_function:

1. Loss function: SparseCategoricalCrossentropy
2. Optimizer: Adam

input = [Hii I'm samiksha a Data Science Enthusiast]

y_real = [0,0,0,1,0,0,0]

To improve the GRU's performance we mask the corerctly predicted output and tell's model to focus on incorrect
prediction values. Hence we mask the Correct Prediction
y_real = [0,0,0,1,0,0,0] ---> base_mask = [1,1,1,0,1,1,1]

base_loss = [0.001,0.001,0.001,0.9,0.001,0.001,0.001]

final_output = base_loss * base_mask 

final_output = [0.001,0.001,0.001,0,0.001,0.001,001]

def Training:

Encoder = _,_  , _,_ = attention_network = attention_weights,attention_outputs,
[attention_outputs + embedding(hindi_inputs)] , hidden_state => Decoder
Decoder(decoder_output,dec_hidden)

"""

"\nModel Training Architecture:\n\ndef loss_function:\n\n1. Loss function: SparseCategoricalCrossentropy\n2. Optimizer: Adam\n\ninput = [Hii I'm samiksha a Data Science Enthusiast]\n\ny_real = [0,0,0,1,0,0,0]\n\nTo improve the GRU's performance we mask the corerctly predicted output and tell's model to focus on incorrect\nprediction values. Hence we mask the Correct Prediction\ny_real = [0,0,0,1,0,0,0] ---> base_mask = [1,1,1,0,1,1,1]\n\nbase_loss = [0.001,0.001,0.001,0.9,0.001,0.001,0.001]\n\nfinal_output = base_loss * base_mask \n\nfinal_output = [0.001,0.001,0.001,0,0.001,0.001,001]\n\ndef Training:\n\nEncoder = _,_  , _,_ = attention_network = attention_weights,attention_outputs,\n[attention_outputs + embedding(hindi_inputs)] , hidden_state => Decoder\nDecoder(decoder_output,dec_hidden)\n\n"

In [12]:
class Train:
    def __init__(self):
        self.optimizer = tf.keras.optimizers.Adam()
        self.base_loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True,reduction='none')
        
    
    def loss_function(self,y_real,y_pred):
        base_mask = tf.math.logical_not(tf.math.equal(y_real,0))
        base_loss = self.base_loss_function(y_real,y_pred)
        
        cast_mask = tf.cast(base_mask,dtype=base_loss.dtype)
        final_output = cast_mask * base_loss
        
        return tf.reduce_mean(final_output)             #
    
    def Training(self,train_data,label_data,enc_hidden,encoder,decoder,batch_size,label_tokenizer):
        loss = 0
        
        with tf.GradientTape() as tape:
            enc_output,thought_vector = encoder(train_data,enc_hidden)
            dec_hidden = thought_vector
            dec_input = tf.expand_dims([label_tokenizer.word_index['start']] * batch_size, 0)
            
            # As decoder calculates loss of each word hence to collectively call loss over the sentence.
            for index in range(1, label_data.shape[1]):
                outputs,dec_hidden, _ = decoder(dec_input,enc_output,dec_hidden)
                
                dec_input = tf.expand_dims(label_data[:,index],1)
                loss = loss + self.loss_function(label_data[:,index],outputs)
                
        word_loss = loss / int(label_data.shape[1])
        
        variables = encoder.trainable_variables + decoder.trainable_variables
        
        gradients = tape.gradient(loss,variables)
        self.optimizer.apply_gradients(zip(gradients,variables))
        
        return word_loss

In [13]:
class Data_preprocessing:
    def __init__(self):
        self.temp=None
        
    def get_data(self,path):
        file = open(path,'r').read()
        lists = [f.split('\t') for f in file.split('\n')]
        
        questions = [x[0] for x in lists]
        answers = [x[1] for x in lists]
        
        return questions,answers
    
    def process_sentence(self,line):
        line = line.lower().strip()
        
        line = re.sub(r"([?!.,])"," ",line)
        line = re.sub(r'[" "]+'," ",line)
        line = re.sub(r"[^a-zA-Z?!,.]+"," ",line)
        line = line.strip()
        line = '<start> '+line+' <end>'
        return line
    
    def lemmatization(self,inputs):
        word_stemmer = PorterStemmer()
        inputs = word_stemmer.stem(inputs)
        
    def word_2_vec(self,inputs):
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(inputs)
        
        vectors = tokenizer.texts_to_sequences(inputs)
        vectors = pad_sequences(vectors,padding = "post")
        
        return vectors,tokenizer

In [14]:
data = Data_preprocessing()
questions,answers = data.get_data('../input/coversational-ai-chatbot/chatbot.txt')

In [15]:
max_train_length = max(len(ele) for ele in questions)
max_test_length = max(len(ele) for ele in answers)

In [16]:
questions = [data.process_sentence(str(sentence)) for sentence in questions]
answers = [data.process_sentence(str(sentences)) for sentences in answers]

In [17]:
train_vectors,train_tokenizer = data.word_2_vec(questions)
label_vectors,label_tokenizer = data.word_2_vec(answers)

In [18]:
max_length_train = train_vectors.shape[1]
max_length_label = label_vectors.shape[1]
max_length_label

24

In [19]:
batch_size=64
buffer_size = train_vectors.shape[0]
embedding_dims = 1024
steps_per_epoch = buffer_size//batch_size
units=1024

In [20]:
# One-hot encoder input shape
len(train_tokenizer.word_index)

2649

In [21]:
vocab_train = len(train_tokenizer.word_index) +1
vocab_label = len(label_tokenizer.word_index) +1

In [22]:
dataset = tf.data.Dataset.from_tensor_slices((train_vectors,label_vectors))
dataset = dataset.shuffle(buffer_size)
dataset = dataset.batch(batch_size,drop_remainder=True)

In [23]:
encoder = Encoder(vocab_train,embedding_dims,units,batch_size)
decoder = Decoder(vocab_label,embedding_dims,units,batch_size)
trainer = Train()

In [24]:
dataset.take(steps_per_epoch)

<TakeDataset shapes: ((64, 22), (64, 24)), types: (tf.int32, tf.int32)>

In [None]:
EPOCHS = 20

for epochs in range(1,EPOCHS+1):
    encoder_hidden = tf.zeros((batch_size,units))
    total_loss=0

for i in enumerate(dataset.take(steps_per_epoch)):
    for (batch_num, (train_data,label_data)) in enumerate(dataset.take(steps_per_epoch)):
        batch_loss = trainer.Training(train_data,label_data,encoder_hidden,encoder,decoder,batch_size,label_tokenizer)
        total_loss = total_loss * batch_size
    
    print(f"Epoch:{epochs},Loss:{total_loss/steps_per_epoch}")

In [None]:
class ChatBot:
    def __init__(self,encoder,decoder,train_tokenizer,label_tokenizer,max_length_train,units):
        self.train_tokenizer= train_tokenizer
        self.label_tokenizer = label_tokenizer
        self.encoder = encoder
        self.decoder = decoder
        self.units = units
        self.data = Data_preprocessing()
        self.max_len = max_length_train
        
    def predict(self,sentence):
        sentence = self.data.process_sentence(sentence)
        
        sentence_mat = []
        for word in sentence.split(" "):
            try:
                sentence_mat.append(self.train_tokenizer.word_index[word])
            except:
                return "Couldn't understand the issue"
        
        sentence_mat = pad_sequences([sentence_mat],maxlen=self.maxlen,padding="post")
        sentence_mat= tf.convert_to_tensor(sentence_mat)
        
        enc_hidden = [tf.zeros((1,self.units))]
        encoder_outputs,thought_vector = self.encoder(sentence_mat,enc_hidden)
        
        dec_hidden = thought_vector
        dec_input = tf.expand_dims([label_tokenizer.word_index['<start>']],0)
        
        ans = ""
        for i in range(1,self.max_len):
            pred,dec_hidden,_ = decoder(dec_input,encoder_outputs,dec_hidden)
            word = self.label_tokenizer.index_word[np.argmax[pred[0]]]
            
            ans.append(word)
            
            if word == '<end>':
                return ans
            
            dec_input = tf.expand_dims([np.argmax(pred[0])],0)
        
        return ans

In [None]:
bot = ChatBot(encoder,decoder,train_tokenizer,label_tokenizer,max_length_train,units)

question = ''
while True:
    question = str(input('You:'))
    if question == 'quit' or question == 'Quit':
        break
    
    answer = bot.predict(question)
    print(f"Bot: {answer}")