In [None]:
import os 
import numpy as np
import pandas as pd
import re             # regular expression
import pickle         # Serialization and de-serialization of object (saving and loading)

import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

# TF_CPP_MIN_LOG_LEVEL is a severity filter for TensorFlow's C++ logging, not a line count:
# '0' shows everything, '1' hides INFO, '2' also hides WARNING, '3' also hides ERROR.
# NOTE(review): this is set after `import tensorflow` above; TensorFlow may already have
# read the variable at import time, so consider moving this line above the import - confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # hide INFO, WARNING and ERROR log messages

In [None]:
#encoder and decoder architecture
"""
INPUT -> Encoder -> ENC OUTPUTS, THOUGHT VECTOR -> Attention Network -> Attention Weights (x ENC OUTPUTS) -> ATTENTION OUTPUT

ATTENTION OUTPUT, PREV DECODER STATE -> DECODER ->FINAL OUTPUT
"""
# LSTMs take more time to train and are typically used for large datasets,
# whereas GRUs are faster and work well for small datasets.

# Attention layer class (a small neural network built from simple dense layers)
# Attention Network -> Attention Weights (x ENC OUTPUTS) -> ATTENTION OUTPUT
 


In [None]:
class Encoder(tf.keras.Model):
    """Embedding + GRU encoder.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding: Width of each embedding vector.
        encoder_units: Number of GRU units.
        batch_size: Stored on the instance; not otherwise used by this class.
    """

    def __init__(self, vocab_size, embedding, encoder_units, batch_size):
        super().__init__()

        self.batch_size = batch_size
        self.enc_units = encoder_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding)
        # return_sequences/return_state: the GRU yields both the per-step outputs
        # and its final state.
        self.gru = tf.keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
            kernel_regularizer=tf.keras.regularizers.L2(0.001),
        )

    def call(self, inputs, hidden_state):
        """Embed `inputs` and run the GRU starting from `hidden_state`.

        Returns:
            (enc_outputs, thought_vector): the GRU's per-step outputs and its
            final state.
        """
        token_embeddings = self.embedding(inputs)
        sequence_outputs, final_state = self.gru(token_embeddings, initial_state=hidden_state)
        return sequence_outputs, final_state

In [None]:
#the two inputs are enc_outputs and the thought_vector going into the attention layer
class Attention(tf.keras.layers.Layer):
    """Additive attention over the encoder outputs.

    The score of each encoder time step is
    ``final_layer(tanh(enc_output_layer(enc_outputs) + thought_layer(thought_vector)))``;
    the scores are normalised over the time axis and used to form a weighted
    sum of the encoder outputs.

    Args:
        units: Width of the two projection layers.
    """

    def __init__(self, units):
        # The base initializer must run before any sub-layer is assigned as an
        # attribute, otherwise Keras cannot track the layers (and raises).
        super(Attention, self).__init__()
        self.enc_output_layer = tf.keras.layers.Dense(units, kernel_regularizer=tf.keras.regularizers.L2(0.001))
        self.thought_layer = tf.keras.layers.Dense(units, kernel_regularizer=tf.keras.regularizers.L2(0.001))
        # Was misspelled `final_laye`, while `call` reads `final_layer`.
        self.final_layer = tf.keras.layers.Dense(1, kernel_regularizer=tf.keras.regularizers.L2(0.001))

    def call(self, enc_outputs, thought_vector):
        """Compute the attention context and weights.

        Args:
            enc_outputs: Encoder outputs; assumed (batch, time, features) - TODO confirm.
            thought_vector: Query state; assumed (batch, features) - TODO confirm.

        Returns:
            (attention_output, attention_weights): the weighted sum of
            `enc_outputs` over axis 1, and the weights with a trailing axis of size 1.
        """
        # Insert a time axis so the query broadcasts against every encoder step.
        thought_matrix = tf.expand_dims(thought_vector, 1)
        scores = self.final_layer(
            tf.keras.activations.tanh(
                self.enc_output_layer(enc_outputs) + self.thought_layer(thought_matrix)
            )
        )
        # `scores` ends in an axis of size 1 (Dense(1)); softmax over that axis
        # would make every weight exactly 1.0, so normalise over axis 1 instead.
        attention_weights = tf.keras.activations.softmax(scores, axis=1)

        attention_output = attention_weights * enc_outputs          # weights broadcast over the feature axis
        attention_output = tf.reduce_sum(attention_output, axis=1)  # collapse axis 1 -> (batch, features)

        return attention_output, attention_weights
        
    

In [None]:
class Decoder(tf.keras.Model):
    """Attention-based GRU decoder that predicts vocabulary logits.

    Args:
        vocab_size: Size of the output vocabulary (embedding rows and logit width).
        embedding: Width of each embedding vector.
        decoder_units: Number of GRU units; also used as the attention width.
        batch_size: Stored on the instance; not otherwise used by this class.
    """

    def __init__(self, vocab_size, embedding, decoder_units, batch_size):
        super(Decoder, self).__init__()

        self.batch_size = batch_size
        self.dec_units = decoder_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding)
        self.gru = tf.keras.layers.GRU(self.dec_units, return_sequences=True, return_state=True, kernel_regularizer=tf.keras.regularizers.L2(0.001))

        self.attention = Attention(self.dec_units)
        self.word_output = tf.keras.layers.Dense(vocab_size, kernel_regularizer=tf.keras.regularizers.L2(0.001))

    def call(self, inputs, enc_outputs, thought_vector):
        """Run one decoding call.

        Args:
            inputs: Token ids; assumed (batch, 1) since the attention context is
                concatenated along the last axis - TODO confirm.
            enc_outputs: Encoder outputs passed to the attention layer.
            thought_vector: State used as the attention query.

        Returns:
            (final_outputs, hidden_state, attention_weights): logits flattened to
            (-1, vocab_size), the GRU's final state, and the attention weights.
        """
        attention_output, attention_weights = self.attention(enc_outputs, thought_vector)

        embedded_inputs = self.embedding(inputs)
        # Add an axis at position 1 so the context lines up with the embedded inputs.
        # (The original referenced the undefined name `attention_outputs` here.)
        attention_output = tf.expand_dims(attention_output, 1)
        concat_inputs = tf.concat([attention_output, embedded_inputs], axis=-1)

        # NOTE(review): the GRU is called without `initial_state`, so
        # `thought_vector` only reaches it through the attention context.
        decoder_outputs, hidden_state = self.gru(concat_inputs)
        # Merge the leading axes so the dense layer emits one row of logits per step.
        decoder_outputs = tf.reshape(decoder_outputs, (-1, decoder_outputs.shape[2]))

        final_outputs = self.word_output(decoder_outputs)

        return final_outputs, hidden_state, attention_weights

In [None]:
class Train:
    """Custom training loop for the encoder/decoder pair.

    Uses Adam with default settings and sparse categorical cross-entropy on
    logits, computed per element so padded positions can be masked out.
    """

    def __init__(self):
        self.optimizer = tf.keras.optimizers.Adam()
        # reduction='none' keeps one loss value per element so it can be masked.
        self.base_loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

    def loss_function(self, y_real, y_pred):
        """Masked cross-entropy: positions where `y_real == 0` contribute zero.

        The mean is taken over all positions, masked ones included.
        """
        base_mask = tf.math.logical_not(tf.math.equal(y_real, 0))
        base_loss = self.base_loss_function(y_real, y_pred)
        mask = tf.cast(base_mask, dtype=base_loss.dtype)
        final_loss = mask * base_loss
        return tf.reduce_mean(final_loss)

    def train_step(self, train_data, label_data, enc_hidden, encoder, decoder, batch_size, label_tokenizer):
        """Run one teacher-forced optimisation step on a batch.

        Args:
            train_data: Batch of input token ids fed to the encoder.
            label_data: Batch of target token ids; column 0 is skipped as a target.
            enc_hidden: Initial encoder state.
            encoder, decoder: The models to train.
            batch_size: Number of rows in the batch (used to build the first decoder input).
            label_tokenizer: Tokenizer whose `word_index` contains '<start>'.

        Returns:
            The summed per-step data loss divided by the number of label columns.
            Regularization penalties are optimised but excluded from this value.
        """
        data_loss = 0

        with tf.GradientTape() as tape:
            enc_outputs, thought_vector = encoder(train_data, enc_hidden)
            dec_hidden = thought_vector
            dec_input = tf.expand_dims([label_tokenizer.word_index['<start>']] * batch_size, 1)

            for index in range(1, label_data.shape[1]):
                outputs, dec_hidden, _ = decoder(dec_input, enc_outputs, dec_hidden)
                # Teacher forcing: the next input is the ground-truth token.
                dec_input = tf.expand_dims(label_data[:, index], 1)
                data_loss = data_loss + self.loss_function(label_data[:, index], outputs)

            # Layer regularizers (kernel_regularizer=...) only take effect when
            # their penalties are added to the optimised loss; a custom loop
            # has to do that explicitly, inside the tape.
            total_loss = data_loss
            regularization_losses = list(encoder.losses) + list(decoder.losses)
            if regularization_losses:
                total_loss = total_loss + tf.add_n(regularization_losses)

        word_loss = data_loss / int(label_data.shape[1])
        variables = encoder.trainable_variables + decoder.trainable_variables
        gradients = tape.gradient(total_loss, variables)
        self.optimizer.apply_gradients(zip(gradients, variables))

        return word_loss

In [None]:
class Data_Preprocessing:
    """Helpers for loading, cleaning and tokenising question/answer pairs."""

    def __init__(self):
        self.temp = None

    def get_data(self, path, encoding=None):
        """Read tab-separated question/answer pairs from `path`.

        Each line is split on tabs; the first field is the question and the
        second the answer (any further fields are ignored). Lines with fewer
        than two fields - blank lines, a trailing newline, malformed rows - are
        skipped instead of raising IndexError.

        Args:
            path: File to read.
            encoding: Passed to `open`; None keeps the platform default.

        Returns:
            (questions, answers): two lists of equal length.
        """
        questions, answers = [], []
        # `with` guarantees the handle is closed even if parsing fails.
        with open(path, 'r', encoding=encoding) as handle:
            for raw_line in handle:
                fields = raw_line.rstrip('\n').split('\t')
                if len(fields) < 2:
                    continue
                questions.append(fields[0])
                answers.append(fields[1])
        return questions, answers

    def process_sentence(self, line):
        """Normalise a sentence and wrap it in '<start>' / '<end>' markers."""
        line = line.lower().strip()
        # Put spaces around ? ! . , so each becomes its own token.
        line = re.sub(r"([?!.,])", r" \1 ", line)
        # Collapse runs of spaces (the character class also matches double quotes).
        line = re.sub(r'[" "]+', " ", line)
        # Replace every run of anything else (digits, symbols, whitespace) with one space.
        line = re.sub(r"[^a-zA-Z?!.,]+", " ", line)
        line = line.strip()
        line = '<start> ' + line + ' <end>'
        return line

    def word_to_vec(self, inputs):
        """Fit a tokenizer on `inputs` and return (post-padded id matrix, tokenizer)."""
        # filters='' keeps punctuation and the <start>/<end> markers as tokens.
        tokenizer = Tokenizer(filters='')
        tokenizer.fit_on_texts(inputs)
        vectors = tokenizer.texts_to_sequences(inputs)
        vectors = pad_sequences(vectors, padding='post')

        return vectors, tokenizer

In [None]:
# Hyperparameters
batch_size = 64
embedding_dim = 256
units = 1024

# Load the raw pairs, clean every sentence, then tokenise and pad.
data = Data_Preprocessing()
questions, answers = data.get_data('chatbot.txt')
questions = [data.process_sentence(str(raw)) for raw in questions]
answers = [data.process_sentence(str(raw)) for raw in answers]

train_vectors, train_tokenizer = data.word_to_vec(questions)
label_vectors, label_tokenizer = data.word_to_vec(answers)

# Axis 0 of the padded matrices is the example count, axis 1 the padded length.
buffer_size = train_vectors.shape[0]
max_length_train = train_vectors.shape[1]
max_length_label = label_vectors.shape[1]
steps_per_epoch = buffer_size // batch_size

In [12]:
# One extra slot on top of the tokenizer's words; id 0 is treated as padding by the loss mask.
vocab_train = 1 + len(train_tokenizer.word_index)
vocab_label = 1 + len(label_tokenizer.word_index)

In [13]:
# Pair inputs with labels, shuffle across the full buffer, and emit fixed-size batches.
dataset = (
    tf.data.Dataset.from_tensor_slices((train_vectors, label_vectors))
    .shuffle(buffer_size)
    .batch(batch_size, drop_remainder=True)
)

In [14]:
# Build the two models and the training helper.
encoder = Encoder(
    vocab_size=vocab_train,
    embedding=embedding_dim,
    encoder_units=units,
    batch_size=batch_size,
)
decoder = Decoder(
    vocab_size=vocab_label,
    embedding=embedding_dim,
    decoder_units=units,
    batch_size=batch_size,
)
trainer = Train()

In [None]:
EPOCHS = 20

for epoch in range(1, EPOCHS + 1):
    # Every epoch starts from a zero encoder state and a zero running loss.
    enc_hidden = tf.zeros((batch_size, units))
    total_loss = 0

    for train_data, label_data in dataset.take(steps_per_epoch):
        total_loss += trainer.train_step(
            train_data, label_data, enc_hidden, encoder, decoder, batch_size, label_tokenizer
        )

    # Report the mean per-batch loss for the epoch.
    print(f"Epoch: {epoch}, Loss: {total_loss/steps_per_epoch}")

In [None]:
class Chatbot:
    """Greedy-decoding wrapper around a trained encoder/decoder pair.

    Args:
        encoder, decoder: The trained models.
        train_tokenizer: Tokenizer for user input (provides `word_index`).
        label_tokenizer: Tokenizer for replies (provides `word_index` / `index_word`).
        max_length_train: Padded input length; also caps the number of decoding steps.
        units: Width of the zero initial encoder state.
    """

    def __init__(self, encoder, decoder, train_tokenizer, label_tokenizer, max_length_train, units):
        self.train_tokenizer = train_tokenizer
        self.label_tokenizer = label_tokenizer
        self.encoder = encoder
        self.decoder = decoder
        self.units = units
        self.data = Data_Preprocessing()
        self.maxlen = max_length_train

    def clean_answer(self, answer):
        """Join the predicted words, dropping a trailing '<end>' marker if present.

        The marker is removed only when it is actually there, so a reply that hit
        the step limit without producing '<end>' keeps its final word.
        """
        if answer and answer[-1] == '<end>':
            answer = answer[:-1]
        return ' '.join(answer)

    def predict(self, sentence):
        """Return the bot's reply to `sentence`.

        Returns a fixed "re-phrase" message when the cleaned input contains a
        word that is not in the training vocabulary.
        """
        sentence = self.data.process_sentence(sentence)

        sentence_mat = []
        for word in sentence.split(" "):
            # Explicit vocabulary lookup instead of a bare `except:`, so only the
            # "unknown word" case produces the fallback and real errors surface.
            token_id = self.train_tokenizer.word_index.get(word)
            if token_id is None:
                return "Could not understand that, can you re-phrase?"
            sentence_mat.append(token_id)

        sentence_mat = pad_sequences([sentence_mat], maxlen=self.maxlen, padding='post')
        sentence_mat = tf.convert_to_tensor(sentence_mat)
        enc_hidden = [tf.zeros((1, self.units))]
        encoder_outputs, thought_vector = self.encoder(sentence_mat, enc_hidden)
        dec_hidden = thought_vector
        # Use the instance's tokenizer and decoder rather than notebook globals.
        dec_input = tf.expand_dims([self.label_tokenizer.word_index['<start>']], 0)

        answer = []
        for _step in range(1, self.maxlen):
            pred, dec_hidden, _ = self.decoder(dec_input, encoder_outputs, dec_hidden)
            predicted_id = int(np.argmax(pred[0]))
            word = self.label_tokenizer.index_word.get(predicted_id)
            if word is None:
                # The id has no word in the vocabulary (e.g. id 0, which the loss
                # mask treats as padding): stop decoding instead of raising KeyError.
                break
            answer.append(word)

            if word == '<end>':
                break

            # Greedy decoding: feed the chosen token back in as the next input.
            dec_input = tf.expand_dims([predicted_id], 0)

        return self.clean_answer(answer)

In [None]:
# Wire the trained models and tokenizers into the interactive wrapper.
bot = Chatbot(
    encoder=encoder,
    decoder=decoder,
    train_tokenizer=train_tokenizer,
    label_tokenizer=label_tokenizer,
    max_length_train=max_length_train,
    units=units,
)

In [None]:
# Interactive chat: keep answering until the user types 'quit' or 'Quit'.
question = ''
while True:
    question = input('You:')
    if question in ('quit', 'Quit'):
        break

    answer = bot.predict(question)
    print(f'Bot: {answer}')

In [None]:
# Scratch cell: a nested list with a single row, whose first row is then assigned again.
pred = [list(range(1, 6))]
pred[0] = list(range(1, 6))

In [None]:
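# Inference steps for a sentence (an English sentence):
#
# 1. Remove unwanted characters from it.
# 2. Convert it to numeric form (the code above uses padded integer token ids rather than a one-hot encoding).
# 3. Pass it through the whole model and get the predictions.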