In [8]:

import os
import numpy as np
import pandas as pd
import re
import pickle

import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [9]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding, encoder_units, batch_size):
        super(Encoder, self).__init__()
        
        self.batch_size = batch_size
        self.enc_units = encoder_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding)
        self.gru = tf.keras.layers.GRU(self.enc_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform', kernel_regularizer=tf.keras.regularizers.L2(0.001))
    
    def call(self, inputs, hidden_state):
        embedded_inputs = self.embedding(inputs)
        enc_outputs, thought_vector = self.gru(embedded_inputs, initial_state=hidden_state)
        return enc_outputs, thought_vector

In [10]:
class Attention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(Attention, self).__init__()
        
        self.enc_output_layer = tf.keras.layers.Dense(units, kernel_regularizer=tf.keras.regularizers.L2(0.001))
        self.thought_layer    = tf.keras.layers.Dense(units, kernel_regularizer=tf.keras.regularizers.L2(0.001))
        self.final_layer      = tf.keras.layers.Dense(1    , kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
    def call(self, enc_outputs, thought_vector):
        thought_matrix = tf.expand_dims(thought_vector, 1)
        
        scores = self.final_layer(tf.keras.activations.tanh(self.enc_output_layer(enc_outputs) + self.thought_layer(thought_matrix)))
        attention_weights = tf.keras.activations.softmax(scores, axis=-1)
        
        attention_output = attention_weights * enc_outputs 
        attention_output = tf.reduce_sum(attention_output, axis=1) 
        
        return attention_output, attention_weights

In [11]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding, decoder_units, batch_size):
        super(Decoder, self).__init__()
        
        self.batch_size = batch_size
        self.dec_units = decoder_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding)
        self.gru = tf.keras.layers.GRU(self.dec_units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform', kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
        self.attention = Attention(self.dec_units)
        self.word_output = tf.keras.layers.Dense(vocab_size, kernel_regularizer=tf.keras.regularizers.L2(0.001))
        
    def call(self, inputs, enc_outputs, thought_vector):
        attention_output, attention_weights = self.attention(enc_outputs, thought_vector)
        
        
        embedded_inputs = self.embedding(inputs) 
        attention_output = tf.expand_dims(attention_output, 1) 
        concat_inputs = tf.concat([attention_output, embedded_inputs], axis=-1)
        
        decoder_outputs, hidden_state = self.gru(concat_inputs)
        decoder_outputs = tf.reshape(decoder_outputs, (-1, decoder_outputs.shape[2])) 
        
        final_outputs = self.word_output(decoder_outputs)
        return final_outputs, hidden_state, attention_weights

In [12]:
class Train:
    def __init__(self):
        self.optimizer = tf.keras.optimizers.Adam()
        self.base_loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
        
    def loss_function(self, y_real, y_pred):
        base_mask = tf.math.logical_not(tf.math.equal(y_real, 0))
        base_loss = self.base_loss_function(y_real, y_pred)
        
        mask = tf.cast(base_mask, dtype=base_loss.dtype)
        final_loss = mask * base_loss
        
        return tf.reduce_mean(final_loss)
    
    def train_step(self, train_data, label_data, enc_hidden, encoder, decoder, batch_size, label_tokenizer):
        loss = 0
        
        with tf.GradientTape() as tape:
            enc_outputs, thought_vector = encoder(train_data, enc_hidden)
            dec_hidden = thought_vector
            dec_input = tf.expand_dims([label_tokenizer.word_index['<start>']] * batch_size, 1)
            
            for index in range(1, label_data.shape[1]):
                outputs, dec_hidden, _ = decoder(dec_input, enc_outputs, dec_hidden)
                
                dec_input = tf.expand_dims(label_data[:, index], 1)
                loss = loss + self.loss_function(label_data[:, index], outputs)
        
        word_loss = loss / int(label_data.shape[1])
        
        variables = encoder.trainable_variables + decoder.trainable_variables
        gradients = tape.gradient(loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))
        
        return word_loss

In [13]:
class Data_Preprocessing:
    def __init__(self):
        self.temp = None
    
    def get_data(self, path):
        file = open(path, 'r').read()
        lists = [f.split('\t') for f in file.split('\n')]
        
        questions = [x[0] for x in lists]
        answers = [x[1] for x in lists]
        
        return questions, answers
    
    def process_sentence(self, line):
        line = line.lower().strip()
        
        line = re.sub(r"([?!.,])", r" \1 ", line)
        line = re.sub(r'[" "]+', " ", line)
        line = re.sub(r"[^a-zA-Z?!.,]+", " ", line)
        line = line.strip()
        
        line = '<start> ' + line + ' <end>'
        return line
    
    def word_to_vec(self, inputs):
        tokenizer = Tokenizer(filters='')
        tokenizer.fit_on_texts(inputs)
        
        vectors = tokenizer.texts_to_sequences(inputs)
        vectors = pad_sequences(vectors, padding='post')
        
        return vectors, tokenizer

In [14]:
data = Data_Preprocessing()

questions, answers = data.get_data('/content/chatbot.txt')

questions = [data.process_sentence(str(sentence)) for sentence in questions]
answers = [data.process_sentence(str(sentence)) for sentence in answers]

train_vectors, train_tokenizer = data.word_to_vec(questions)
label_vectors, label_tokenizer = data.word_to_vec(answers)

max_length_train = train_vectors.shape[1]
max_length_label = label_vectors.shape[1]

batch_size = 64
buffer_size = train_vectors.shape[0]
embedding_dim = 256
steps_per_epoch = buffer_size//batch_size
units = 1024

In [15]:
vocab_train = len(train_tokenizer.word_index) + 1
vocab_label = len(label_tokenizer.word_index) + 1

In [16]:
dataset = tf.data.Dataset.from_tensor_slices((train_vectors, label_vectors))
dataset = dataset.shuffle(buffer_size)
dataset = dataset.batch(batch_size, drop_remainder=True)

In [17]:
encoder = Encoder(vocab_train, embedding_dim, units, batch_size)
decoder = Decoder(vocab_label, embedding_dim, units, batch_size)
trainer = Train()

In [23]:
EPOCHS = 30

for epoch in range(1, EPOCHS + 1):
    enc_hidden = tf.zeros((batch_size, units))
    total_loss = 0
    
    for (batch_num, (train_data, label_data)) in enumerate(dataset.take(steps_per_epoch)):
        batch_loss = trainer.train_step(train_data, label_data, enc_hidden, encoder, decoder, batch_size, label_tokenizer)
        total_loss = total_loss + batch_loss
        
    print(f"Epoch: {epoch}, Loss: {total_loss/steps_per_epoch}")

Epoch: 1, Loss: 1.5623279809951782
Epoch: 2, Loss: 1.4190069437026978
Epoch: 3, Loss: 1.3079081773757935
Epoch: 4, Loss: 1.1954843997955322
Epoch: 5, Loss: 1.0742905139923096
Epoch: 6, Loss: 0.9466956853866577
Epoch: 7, Loss: 0.8213114142417908
Epoch: 8, Loss: 0.7093750238418579
Epoch: 9, Loss: 0.6212669610977173
Epoch: 10, Loss: 0.5410820245742798
Epoch: 11, Loss: 0.47152963280677795
Epoch: 12, Loss: 0.41393670439720154
Epoch: 13, Loss: 0.35476791858673096
Epoch: 14, Loss: 0.3045884668827057
Epoch: 15, Loss: 0.25934770703315735
Epoch: 16, Loss: 0.22261252999305725
Epoch: 17, Loss: 0.18853700160980225
Epoch: 18, Loss: 0.1634351760149002
Epoch: 19, Loss: 0.14451169967651367
Epoch: 20, Loss: 0.1290172040462494
Epoch: 21, Loss: 0.1166825219988823
Epoch: 22, Loss: 0.10803203284740448
Epoch: 23, Loss: 0.10099603235721588
Epoch: 24, Loss: 0.09551913291215897
Epoch: 25, Loss: 0.08982820808887482
Epoch: 26, Loss: 0.08712898939847946
Epoch: 27, Loss: 0.08356238901615143
Epoch: 28, Loss: 0.08219

In [24]:
import pickle

PKL_Filename ="botencoder_pickle.pkl"
with open(PKL_Filename, 'wb') as file:
  pickle.dump(encoder, file)




INFO:tensorflow:Assets written to: ram://4ec67983-7bb4-42dc-bfe3-803967ce50cf/assets


INFO:tensorflow:Assets written to: ram://4ec67983-7bb4-42dc-bfe3-803967ce50cf/assets


In [25]:
import pickle

PKL_Filename ="botdecoder_pickle.pkl"
with open(PKL_Filename, 'wb') as file:
  pickle.dump(decoder, file)



INFO:tensorflow:Assets written to: ram://b479c6da-06b1-474a-8784-1f1fc055906f/assets


INFO:tensorflow:Assets written to: ram://b479c6da-06b1-474a-8784-1f1fc055906f/assets


In [26]:
class Chatbot:
    def __init__(self, encoder, decoder, train_tokenizer, label_tokenizer, max_length_train, units):
        self.train_tokenizer = train_tokenizer
        self.label_tokenizer = label_tokenizer
        self.encoder = encoder
        self.decoder = decoder
        self.units = units
        self.data = Data_Preprocessing()
        self.maxlen = max_length_train
    
    def clean_answer(self, answer):
        answer = answer[:-1]
        answer = ' '.join(answer)
        return answer
    
    def predict(self, sentence):
        sentence = self.data.process_sentence(sentence)
        
        sentence_mat = []
        for word in sentence.split(" "):
            try:
                sentence_mat.append(self.train_tokenizer.word_index[word])
            except:
                return "I Could not understand you, can you repeat again"
        
        sentence_mat = pad_sequences([sentence_mat], maxlen=self.maxlen, padding='post')
        sentence_mat = tf.convert_to_tensor(sentence_mat)
        
        enc_hidden = [tf.zeros((1, self.units))]
        encoder_outputs, thought_vector = self.encoder(sentence_mat, enc_hidden)
        
        dec_hidden = thought_vector
        dec_input = tf.expand_dims([label_tokenizer.word_index['<start>']], 0)
        
        answer = []
        for i in range(1, self.maxlen):
            pred, dec_hidden, _ = decoder(dec_input, encoder_outputs, dec_hidden)
            
            word = self.label_tokenizer.index_word[np.argmax(pred[0])]
            answer.append(word)
            
            if word == '<end>':
                return self.clean_answer(answer)
            
            dec_input = tf.expand_dims([np.argmax(pred[0])], 0)
        
        return self.clean_answer(answer)

In [27]:
bot = Chatbot(encoder, decoder, train_tokenizer, label_tokenizer, max_length_train, units)

In [30]:
question = ''
while True:
    question = str(input('You:'))
    if question == 'quit' or question == 'Quit':
        break
        
    answer = bot.predict(question)
    print(f'Bot: {answer}')

You:hello
Bot: hello
You:good Morning
Bot: morning
You:good evening
Bot: good evening
You:bye
Bot: bye
You:do you like to chat
Bot: I Could not understand you, can you repeat again
You:i love you
Bot: i love you .
You:welcome
Bot: what do you re done .
You:what did you do?
Bot: i watered all the plants .
You:where is your house
Bot: it s in a house .
You:bye bye robot
Bot: I Could not understand you, can you repeat again
You:quit
