# Conversational Chatbot with a sequence-to-sequence Transformer

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

## Preprocessing

In [4]:
def preprocessing(sentence):
  # strip removes space from begining and end of a string
  senetence = sentence.lower().strip()
  # creating a space between a word and the punctuation following it
  # eg: "he is a boy." => "he is a boy ."
  sentence = re.sub(r'([!?.,])', r" \1",sentence)
  # 'he is a boy?   how' -> he is a boy ? how
  sentence = re.sub(r'[" "]+', " ", sentence)
  # removing contractions
  sentence = re.sub(r"i'm", "i am", sentence)
  sentence = re.sub(r"he's", "he is", sentence)
  sentence = re.sub(r"she's", "she is", sentence)
  sentence = re.sub(r"it's", "it is", sentence)
  sentence = re.sub(r"that's", "that is", sentence)
  sentence = re.sub(r"what's", "what is", sentence)
  sentence = re.sub(r"where's", "where is", sentence)
  sentence = re.sub(r"how's", "how is", sentence)
  sentence = re.sub(r"\'ll", " will", sentence)
  sentence = re.sub(r"\'ve", " have", sentence)
  sentence = re.sub(r"\'re", " are", sentence)
  sentence = re.sub(r"\'d", " would", sentence)
  sentence = re.sub(r"\'re", " are", sentence)
  sentence = re.sub(r"won't", "will not", sentence)
  sentence = re.sub(r"can't", "cannot", sentence)
  sentence = re.sub(r"n't", " not", sentence)
  sentence = re.sub(r"n'", "ng", sentence)
  sentence = re.sub(r"'bout", "about", sentence)
  # replacing everything with space except (a-z, A-Z, ".", "?", "!", ",")
  sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
  sentence = sentence.strip()
  return senetence


In [5]:
def reading_data(path):
  all_sentences_of_text = []
  with open(path,'r') as file:
    for line in file:
      line = line.rstrip()
      result = line.split('__eou__')
      for i in range(0, len(result) - 2):
        sent_1 = preprocessing(result[i])
        sent_2 = preprocessing(result[i+1])
        all_sentences_of_text.append(sent_1)
        all_sentences_of_text.append("[start] " + sent_2 + " [end]")
  return all_sentences_of_text

In [6]:
train_text = reading_data ('/content/drive/MyDrive/dialogues_train.txt')
test_text = reading_data ('/content/drive/MyDrive/dialogues_test.txt')
valid_text = reading_data( '/content/drive/MyDrive/dialogues_validation.txt')

In [7]:
dataset = train_text + test_text + valid_text

In [8]:
def creating_pairs(pair_name):
  index = len(pair_name)
  pair_input = [pair_name[x] for x in range(0,index) if x%2 ==0]
  pair_output = [pair_name[x] for x in range(0,index) if x% 2 ==1]
  pairs = [(input,output) for input,output in zip(pair_input, pair_output)]
  return pairs

In [9]:
train_pairs = creating_pairs(train_text)
test_pairs = creating_pairs(test_text)
val_pairs = creating_pairs(valid_text)

In [10]:
index = len(dataset)
inp_texts = [dataset[i] for i in range(0,index) if i % 2 == 0]
out_texts = [dataset[i] for i in range(0,index) if i % 2 == 1]

## Vectorizing the text data

In [11]:
# define how many words we have in corpus (we can create int for each unique of them)
vocab_size = 15000
# define maxium output length
sequence_length = 20
batch_size = 64

In [12]:
strip_chars = string.punctuation
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
strip_chars = strip_chars.replace("?", "")
strip_chars = strip_chars.replace("!", "")
strip_chars = strip_chars.replace(".", "")
strip_chars = strip_chars.replace(",", "")

In [18]:
# replace strip_chars  with ''
def custom_standardization(input_string):
  lowercase = tf.strings.lower(input_string)
  return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

In [19]:
inp_vectorization = TextVectorization(max_tokens = vocab_size, output_mode = 'int', output_sequence_length = sequence_length, standardize=custom_standardization)

In [20]:
out_vectorization = TextVectorization(max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length+1, standardize=custom_standardization)

In [21]:
inp_vectorization.adapt(inp_texts)
out_vectorization.adapt(out_texts)

In [22]:
def format_dataset(inp, out):
    inp = inp_vectorization(inp)
    out = out_vectorization(out)
    # decoder input is encoder output without [end]
    # decoder output is without [start]
    return ({"encoder_inputs": inp, "decoder_inputs": out[:, :-1],}, out[:, 1:])

In [23]:
def make_dataset(pairs):
    inp_texts, out_texts = zip(*pairs)
    inp_texts = list(inp_texts)
    out_texts = list(out_texts)
    dataset = tf.data.Dataset.from_tensor_slices((inp_texts, out_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()

In [24]:
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
test_ds = make_dataset(test_pairs)

In [25]:
train_ds

<CacheDataset element_spec=({'encoder_inputs': TensorSpec(shape=(None, 20), dtype=tf.int64, name=None), 'decoder_inputs': TensorSpec(shape=(None, 20), dtype=tf.int64, name=None)}, TensorSpec(shape=(None, 20), dtype=tf.int64, name=None))>

In [26]:
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Building model

![](https://drive.google.com/uc?export=view&id=1_BX9YvY-E_7ttPOiY2wIOG6KW4FkmdAm)

In [27]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        # first attention in encoder
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        # the first block in encoder
        proj_input = self.layernorm_1(inputs + attention_output)
        # second block in encoder (above picture)
        proj_output = self.dense_proj(proj_input)

        return self.layernorm_2(proj_input + proj_output)

In [28]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)


In [29]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
      # only word and previous word has affects (we delete the effect of next words) (below picture)
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

![](https://drive.google.com/uc?export=view&id=158scEbQNyJg1TxMhrDCWAN925wRCHjfe)

after building the model we give the input output to transformer.

In [30]:
embed_dim = 256
latent_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
# using encoder model
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
# using decoder model
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

decoder_outputs = decoder([decoder_inputs, encoder_outputs])

transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs, name="transformer")

## Training the model

In [31]:
epochs = 30 

transformer.summary()
transformer.compile(
    "adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

<keras.callbacks.History at 0x7fbd95609950>

## Testing model

In [32]:
out_vocab = out_vectorization.get_vocabulary()
out_index_lookup = dict(zip(range(len(out_vocab)), out_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  # tokenizing input sentence
    tokenized_input_sentence = inp_vectorization([input_sentence])
  # first decoder input is <start>
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        # tokenizing target sentence 
        tokenized_target_sentence = out_vectorization([decoded_sentence])[:, :-1]
        # make prediction using input and targater sentecence
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        # token to word
        sampled_token = out_index_lookup[sampled_token_index]
        # updating decoder inputs
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
          break
    return decoded_sentence

In [33]:
input_sentences = ['bob ! i hear your team won the match .',
                   'how do you do ?',
                   'do you think you are introverted or extroverted ?',
                   'yes , you are right . after all , the quality of your air conditioners is good . the only problem is price .',
                   'what ? he cannot do this to you .',
                   'thank you , lisa .',
                   'oh , she can make her own decisions .',
                   'but if i do not pass , will you call me ?',
                   'what happened , john ?',
                   'nice to meet you , mr . wilson .',
                   'hello , is sue there ?',
                   'hi ! i am happy you could make it .',
                   'what foods do you eat now ?',
                   'ok . is the plane on schedule ?']

In [34]:
for input_sentence in input_sentences:
    translated = decode_sequence(input_sentence)
    print(input_sentence)
    print(translated)
    print('***************')


bob ! i hear your team won the match .
[start] yes , but they are so nice . [end]
***************
how do you do ?
[start] i ’ m a little nervous . i have never heard of it . [end]
***************
do you think you are introverted or extroverted ?
[start] i think i am extroverted , i like saying most girls will start by animals and chinese stories . [end]
***************
yes , you are right . after all , the quality of your air conditioners is good . the only problem is price .
[start] yes , we have a lot of things . its very common as the international . [end]
***************
what ? he cannot do this to you .
[start] it ’ s not that bad . the kitchen can be worn on your left hand . [end]
***************
thank you , lisa .
[start] i am glad you are enjoying yourself . [end]
***************
oh , she can make her own decisions .
[start] but you know that he is not a good teacher . [end]
***************
but if i do not pass , will you call me ?
[start] i can ’ t . [end]
***************
what