<a href="https://colab.research.google.com/github/gitHubAndyLee2020/Conversational_Chatbot_Project/blob/main/conversational_chatbot_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
import numpy as np
import os
import string
import random

In [46]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
  i = tf.range(n_dest)[:, None]
  j = tf.range(n_src)
  m = i >= j - n_src + n_dest
  mask = tf.cast(m, dtype)
  mask = tf.reshape(mask, [1, n_dest, n_src])
  mult = tf.concat(
      [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
  )
  return tf.tile(mask, mult)

class TransformerBlock(layers.Layer):
  def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
    super().__init__()
    self.att = layers.MultiHeadAttention(num_heads, embed_dim)
    self.ffn = keras.Sequential(
        [layers.Dense(ff_dim, activation='relu'), layers.Dense(embed_dim),]
    )
    self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
    self.dropout1 = layers.Dropout(rate)
    self.dropout2 = layers.Dropout(rate)

  def call(self, inputs):
    input_shape = tf.shape(inputs)
    batch_size = input_shape[0]
    seq_len = input_shape[1]
    causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
    attention_output = self.att(inputs, inputs, attention_mask=causal_mask)
    attention_output = self.dropout1(attention_output)
    out1 = self.layernorm1(inputs + attention_output)
    ffn_output = self.ffn(out1)
    ffn_output = self.dropout2(ffn_output)
    return self.layernorm2(out1 + ffn_output)

In [47]:
class TokenAndPositionEmbedding(layers.Layer):
  def __init__(self, maxlen, vocab_size, embed_dim):
    super().__init__()
    self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
    self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

  def call(self, x):
    maxlen = tf.shape(x)[-1]
    positions = tf.range(start=0, limit=maxlen, delta=1)
    positions = self.pos_emb(positions)
    x = self.token_emb(x)
    return x + positions

In [48]:
vocab_size = 2532
maxlen = 80
embed_dim = 256
num_heads = 2
feed_forward_dim = 256

def create_model():
  inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)
  embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
  x = embedding_layer(inputs)
  transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim)
  x = transformer_block(x)
  outputs = layers.Dense(vocab_size)(x)
  model = keras.Model(inputs=inputs, outputs=[outputs, x])
  loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  model.compile(
      "adam", loss=[loss_fn, None],
  )
  return model

- Dataset Source: https://www.kaggle.com/datasets/grafstor/simple-dialogs-for-chatbot

In [1]:
from google.colab import files

uploaded = files.upload()

for filename in uploaded.keys():
  print(f"User uploaded file '{filename}' with length {len(uploaded[filename])} bytes")

Saving dialogs.txt to dialogs.txt
User uploaded file 'dialogs.txt' with length 243904 bytes


In [96]:
def print_text_ds(text_ds):
  output_list = []

  for x_batch, y_batch in text_ds.take(1):
      for i in range(min(10, x_batch.shape[0])):
          x_text = ' '.join([vocab[idx] for idx in x_batch[i].numpy()])
          y_text = ' '.join([vocab[idx] for idx in y_batch[i].numpy()])

          combined_text = 'x = ' + x_text + '\ny = ' + y_text
          output_list.append(combined_text)

  output_string = '\n'.join(output_list)

  print(output_string)

In [97]:
QUESTION_SEQ = "questionseq"
ANSWER_SEQ = "answerseq"
END_SEQ = "endseq"

batch_size = 128

text_ds = tf.data.TextLineDataset(["dialogs.txt"])

def insert_start_end_tokens(input_string):
    splits = tf.strings.split(input_string, "\t")
    question = tf.strings.join([f"{QUESTION_SEQ} ", splits[0]])
    answer = tf.strings.join([f"{ANSWER_SEQ} ", splits[1], f" {END_SEQ}"])
    return tf.strings.join([question, " ", answer])

text_ds = text_ds.map(insert_start_end_tokens)
text_ds = text_ds.shuffle(buffer_size=256)
text_ds = text_ds.batch(batch_size)

def custom_standardization(input_string):
    lowercased = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercased, f"([{string.punctuation}])", r" \1")

vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen + 1,
)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()

print(f"length of vocab: {len(vocab)}")

def prepare_lm_inputs_labels(text):
    text = tf.expand_dims(text, -1)
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-1] # omit last token
    y = tokenized_sentences[:, 1:] # omit first token (shift by 1)
    return x, y

text_ds = text_ds.map(prepare_lm_inputs_labels)
text_ds = text_ds.prefetch(tf.data.AUTOTUNE)

print_text_ds(text_ds)

length of vocab: 2531
x = questionseq it would be nice if the weather would never change . answerseq that would be great , then we could plan things sooner . endseq                                                      
y = it would be nice if the weather would never change . answerseq that would be great , then we could plan things sooner . endseq                                                       
x = questionseq because you can see the stars perfectly . answerseq i really hope it rains today . endseq                                                              
y = because you can see the stars perfectly . answerseq i really hope it rains today . endseq                                                               
x = questionseq i had a stomachache . answerseq did it get any better ? endseq                                                                  
y = i had a stomachache . answerseq did it get any better ? endseq                                                           

In [69]:
class TextGenerator(keras.callbacks.Callback):
  def __init__(
      self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
  ):
    self.max_tokens = max_tokens
    self.start_tokens = start_tokens
    self.index_to_word = index_to_word
    self.print_every = print_every
    self.k = top_k

  def sample_from(self, logits):
    logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
    indices = np.asarray(indices).astype("int32")
    preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
    preds = np.asarray(preds).astype("float32")
    return np.random.choice(indices, p=preds)

  def detokenize(self, number):
    return self.index_to_word[number]

  def on_epoch_end(self, epoch, logs=None):
    start_tokens = [_ for _ in self.start_tokens]
    if (epoch + 1) % self.print_every != 0:
      return
    num_tokens_generated = 0
    tokens_generated = []
    while num_tokens_generated <= self.max_tokens:
      pad_len = maxlen - len(start_tokens)
      sample_index = len(start_tokens) - 1
      if pad_len < 0:
        x = start_tokens[:maxlen]
        sample_index = maxlen - 1
      elif pad_len > 0:
        x = start_tokens + [0] * pad_len
      else:
        x = start_tokens
      x = np.array([x])
      y, _ = self.model.predict(x)
      sample_token = self.sample_from(y[0][sample_index])
      tokens_generated.append(sample_token)
      start_tokens.append(sample_token)
      num_tokens_generated = len(tokens_generated)
    txt = " ".join(
        [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
    )
    print(f"generated text:\n{txt}\n")

word_to_index = {}
for index, word in enumerate(vocab):
  word_to_index[word] = index

start_prompt = "questionseq I've actually been pretty good. You? answerseq"
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [157]:
model = create_model()

model.fit(text_ds, verbose=2, epochs=5, callbacks=[text_gen_callback])

Epoch 1/5
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq answerseq it answerseq endseq answerseq endseq                                   

30/30 - 14s - loss: 2.1400 - dense_36_loss: 2.1400 - 14s/epoch - 455ms/step
Epoch 2/5
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq that ? answerseq i think i can 't . answerseq i 'm . endseq                           

30/30 - 11s - loss: 1.0636 - dense_36_loss: 1.0636 - 11s/epoch - 355ms/step
Epoch 3/5
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq well , and then . endseq                                   

30/30 - 11s - loss: 0.9087 - dense_36_loss: 0.9087 - 11s/epoch - 351ms/step
Epoch 4/5
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq well , i 'm going to the best movie with the bathroom . endseq                           

30/30 - 10s - loss: 0.8210 - dense_36_loss: 0.8210 - 10s/epoch - 346ms/step
Epoch 5/5
generated text:

<keras.callbacks.History at 0x788a907b8a90>

In [158]:
CURRENT_MODEL_PATH = "chatbot_v1_epochs_5.h5"
NEXT_MODEL_PATH = "chatbot_v1_epochs_x.h5"

In [159]:
model.save(CURRENT_MODEL_PATH)

In [160]:
from google.colab import files

def download_file(file_name):
    # Download the file to the local computer
    files.download(file_name)

# Download the file
download_file(CURRENT_MODEL_PATH)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [161]:
from tensorflow.keras.models import load_model

custom_objects = {
    "TransformerBlock": TransformerBlock,
    "TokenAndPositionEmbedding": TokenAndPositionEmbedding
}

loaded_model = load_model(CURRENT_MODEL_PATH, custom_objects=custom_objects)

In [135]:
loaded_model.fit(text_ds, verbose=2, epochs=10, callbacks=[text_gen_callback])
loaded_model.save(NEXT_MODEL_PATH)

Epoch 1/30
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq is a . endseq                                     

30/30 - 12s - loss: 0.3988 - dense_8_loss: 0.3988 - 12s/epoch - 416ms/step
Epoch 2/30
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq is . endseq                                      

30/30 - 11s - loss: 0.3785 - dense_8_loss: 0.3785 - 11s/epoch - 363ms/step
Epoch 3/30
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq is one a big is a ? endseq                                 

30/30 - 11s - loss: 0.3607 - dense_8_loss: 0.3607 - 11s/epoch - 361ms/step
Epoch 4/30
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq a is a is a is a good table . endseq                              

30/30 - 11s - loss: 0.3438 - dense_8_loss: 0.3438 - 11s/epoch - 359ms/step
Epoch 5/30
generated text:
questionseq [UNK] actually been pretty [UNK] [UNK] answerseq a is a a a is a week . endse

In [166]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import re

class TextGenerationModule(tf.Module):
    def __init__(self, model, vocab, word_to_index, maxlen=80, top_k=10):
        self.model = model
        self.vocab = vocab
        self.word_to_index = word_to_index
        self.maxlen = maxlen
        self.top_k = top_k

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.top_k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def __call__(self, start_prompt, num_tokens_to_generate=40):
        start_tokens = [self.word_to_index.get(word, 1) for word in start_prompt.split()]
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= num_tokens_to_generate:
            pad_len = self.maxlen - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:self.maxlen]
                sample_index = self.maxlen - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y, _ = self.model.predict(x)
            logits = y[0][sample_index]
            sample_token = self.sample_from(logits)

            # Check for TERMINATING_TOKEN
            if self.vocab[sample_token] == END_SEQ:
                break

            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)


        generated_text = " ".join([self.vocab[token] for token in tokens_generated])
        generated_text = re.sub(rf"\s([{string.punctuation}])", r"\1", generated_text)
        return generated_text

Generated text: i think that's too, but i guess.


In [None]:
# Create an instance of the TextGeneratorWrapper class
text_gen_module = TextGenerationModule(loaded_model, vocab, word_to_index)

# Test the instance with a start prompt
start_prompt = "questionseq Hi there. answerseq"
generated_text = text_gen_module(start_prompt)
print(f"Generated text: {generated_text}")

In [172]:
import pandas as pd

model_paths = [
    'chatbot_v1_epochs_5.h5',
    'chatbot_v1_epochs_10.h5',
    'chatbot_v1_epochs_20.h5',
    'chatbot_v1_epochs_50.h5',
    'chatbot_v1_epochs_100.h5',
]

start_prompts = [
    'hi, how are you doing?',
    'what school do you go to?',
    'isn\'t it a nice day?',
    'i really hope it doesn\'t get cold.',
    'maybe we can go see a movie or something.',
    'how about a movie?',
    'what does she look like, though?',
    'i hope you feel better.',
    'well, congratulations.',
    'really, where did you get it?',
]

start_prompts_formatted = [f"{QUESTION_SEQ} start_prompt {ANSWER_SEQ}" for start_prompt in start_prompts]

models_generated_texts = {}

for model_path in model_paths:
  loaded_model = load_model(model_path, custom_objects=custom_objects)
  text_gen_module = TextGenerationModule(loaded_model, vocab, word_to_index)
  models_generated_texts[model_path] = []
  for start_prompt in start_prompts_formatted:
    generated_text = text_gen_module(start_prompt)
    models_generated_texts[model_path].append(generated_text)

models_generated_texts = pd.DataFrame(models_generated_texts)
models_generated_texts.index = start_prompts



In [173]:
models_generated_texts

Unnamed: 0,chatbot_v1_epochs_5.h5,chatbot_v1_epochs_10.h5,chatbot_v1_epochs_20.h5,chatbot_v1_epochs_50.h5,chatbot_v1_epochs_100.h5
"hi, how are you doing?",what? answerseq so i'm going to eat a great pr...,you sure?,is a nice restaurant?,a tv show you bought a new answerseq that new ...,is a tv show?
what school do you go to?,we get the weather going to get it will be nice.,it's a long time we have to see the world record!,is a nice restaurant.,a tv show respect to god winter comes now.,is it a tv show follows another show.
isn't it a nice day?,he said i have to the car. answerseq i have yo...,what do it's the weather going to be?,is that?,a tv show will power you do your money.,is it a a a tv show you a new record?
i really hope it doesn't get cold.,"it is. you want to buy it's a good, but the we...",you're a good deal.,is a party?,is a tv show you my new answerseq i bought it ...,is it a tv show weather a couple of them.
maybe we can go see a movie or something.,it was a new york.,what did you have you going to?,is a nice. the car is that?,a tv show you a movie.,is a tv show follows a travel show follows ano...
how about a movie?,what do? answerseq i hope it was a new mattres...,it sure do?,is a good deal?,a tv is a tv show you time?,is that a tv show.
"what does she look like, though?",they can't want to be there are friendly than ...,"no, that sounds like that's a while is.",you is a big tree.,a tv news is this time we need to buy a new ma...,is a tv tv show weather a cold sometimes.
i hope you feel better.,"i have to buy a lot. answerseq yes, and a good...",he can't we need to the street.,that's a nice.,a tv show respect to god and others.,is a tv show?
"well, congratulations.",i will be there a lot of course. answerseq tha...,what you do you get the tv?,is the one?,a tv show respect to prepare.,is a tv show it a tv show you time on my new car.
"really, where did you get it?",i like it's too.,what you know?,is a nice restaurant?,a tv yesterday a dog or two cars a restaurant.,is it a a tv show you a new car?


> Evaluation

- chatbot_v1_epochs_10.h5 have the best rate of producing a soundly answer to the user's questions.