In [1]:
"""
  AUTHOR:            Dixith, Mourya, Bhanu, Navya, Krishna
  FILENAME:          rl_chatbot_final
  SPECIFICATION:     This script builds Deep Q-Network(DQN) class and trains DQN model with the dataset and responds to user queries regarding cardio vascular disease.
  FOR:               CS 5392 Reinforcement Learning Section 001
"""
import numpy as np
import tensorflow as tf
import random
import json
import pickle
import warnings
import nltk
import string
from nltk.stem.lancaster import LancasterStemmer
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tensorflow.keras.callbacks import EarlyStopping

In [2]:
# Ignore warnings in the code execution
warnings.filterwarnings("ignore")

# Initialize a LancasterStemmer object for word stemming
stemmer = LancasterStemmer()

# Create a deque with a maximum length of 5 to store conversation history
conversation_history = deque(maxlen=5)

# Set a similarity threshold value of 0.7 for matching text similarity
similarity_threshold = 0.7

In [3]:
# Load intents file
with open("intents.json") as json_data:
    intents = json.load(json_data)

In [4]:
# Process intents
words = []          # List of all unique words in the dataset
classes = []        # List of all unique classes (intents) in the dataset
documents = []      # List of tuples containing tokenized patterns and their corresponding classes
ignore_words = ['?']    # List of words to ignore during processing

# Process intents
for intent in intents['intents']:
    for pattern in intent['patterns']:
        w = nltk.word_tokenize(pattern)
        words.extend(w)
        documents.append((w, intent['tag']))
        if intent['tag'] not in classes:
            classes.append(intent['tag'])


In [5]:
# Preprocess words, remove duplicates, and sort
words = [stemmer.stem(w.lower()) for w in words if w not in ignore_words]
words = sorted(list(set(words)))
classes = sorted(list(set(classes)))

In [6]:
# Convert documents to training data
training = []   # List of training data containing input-output pairs
output_empty = [0] * len(classes)
for doc in documents:
    bag = []   # Bag-of-words representation for each pattern
    pattern_words = doc[0]
    pattern_words = [stemmer.stem(word.lower()) for word in pattern_words]
    for w in words:
        bag.append(1) if w in pattern_words else bag.append(0)
    output_row = list(output_empty)
    output_row[classes.index(doc[1])] = 1
    training.append([bag, output_row])

In [7]:
# Shuffle and convert to NumPy arrays
random.shuffle(training)
training = [(np.array(x), np.array(y)) for x, y in training]
train_x = np.array([x for x, y in training])
train_y = np.array([y for x, y in training])

In [8]:
# DQN class
class DQN:
    """
      NAME:             DQN
      PURPOSE:          The DQN class implements a Deep Q-Network for reinforcement learning. It contains methods for building and training the network, as well as managing the agent's memory and exploration-exploitation trade-off.
      INVARIANTS:       The DQN class requires state_size and action_size as input parameters, representing the dimensions of the state space and action space, respectively. The class manages a memory buffer with a fixed maximum length, and the exploration rate (epsilon) is decayed over time according to the specified decay rate.

    """
    def __init__(self, state_size, action_size):
        """
          NAME: init
          PARAMETERS: state_size, action_size
          PURPOSE: Initializes the DQN class with given state and action sizes, and sets up memory and other necessary parameters
          PRECONDITION: Both state_size and action_size should be integers, representing the dimensions of the state space and action space respectively
          POSTCONDITION: Initializes the DQN object with specified parameters, memory, and a model built using the _build_model method
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """
          NAME:             _build_model
          PARAMETERS:       None
          PURPOSE:          Builds the neural network model for the DQN class
          PRECONDITION:     DQN class instance should be created
          POSTCONDITION:    Returns a compiled neural network model
        """
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(48, activation='relu')) # Added hidden layer
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """
          NAME:             remember
          PARAMETERS:       state, action, reward, next_state, done
          PURPOSE:          Stores experience in the DQN's memory for future training
          PRECONDITION:     DQN class instance should be created
          POSTCONDITION:    Experience is added to the DQN's memory
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """
          NAME:             act
          PARAMETERS:       state
          PURPOSE:          Selects an action based on the current state using the DQN model
          PRECONDITION:     DQN class instance should be created, and the model should be trained or loaded
          POSTCONDITION:    Returns an action (integer) based on the current state
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """
          NAME:             replay
          PARAMETERS:       batch_size
          PURPOSE:          Trains the DQN model using a random sample of experiences from memory
          PRECONDITION:     DQN class instance should be created, and the memory should have enough experiences
          POSTCONDITION:    DQN model is updated with new knowledge from the experiences
        """
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """
          NAME:             load
          PARAMETERS:       name
          PURPOSE:          Loads the DQN model's weights from a file
          PRECONDITION:     DQN class instance should be created, and the file should exist
          POSTCONDITION:    DQN model's weights are updated with the loaded weights
        """
        self.model.load_weights(name)

    def save(self, name):
        """
          NAME:             save
          PARAMETERS:       name
          PURPOSE:          Saves the DQN model's weights to a file
          PRECONDITION:     DQN class instance should be created, and the model should be trained or loaded
          POSTCONDITION:    DQN model's weights are saved to a file
        """
        self.model.save_weights(name)

In [9]:
# Helper functions for processing user input and generating a response
def clean_up_sentence(sentence):
    """
      NAME:             clean_up_sentence
      PARAMETERS:       sentence
      PURPOSE:          Tokenizes and stems the input sentence
      PRECONDITION:     'nltk' and 'LancasterStemmer' should be imported, and the input sentence should be a string
      POSTCONDITION:    Returns a list of stemmed tokens from the input sentence
    """
    sentence_words = nltk.word_tokenize(sentence)
    sentence_words = [stemmer.stem(word.lower()) for word in sentence_words]
    return sentence_words

def bow(sentence, words, show_details=False):
    """
      NAME:             bow
      PARAMETERS:       sentence, words, show_details (default: False)
      PURPOSE:          Converts the input sentence into a bag-of-words representation
      PRECONDITION:     'clean_up_sentence' function should be defined, and the input sentence should be a string
      POSTCONDITION:    Returns a NumPy array representing the bag-of-words of the input sentence
    """
    sentence_words = clean_up_sentence(sentence)
    bag = [0]*len(words)
    for s in sentence_words:
        for i,w in enumerate(words):
            if w == s:
                bag[i] = 1
                if show_details:
                    print("found in bag: %s" % w)
    return np.array(bag)

def user_input_to_state(user_input, words):
    """
      NAME:             user_input_to_state
      PARAMETERS:       user_input, words
      PURPOSE:          Converts user input into a state for the DQN model
      PRECONDITION:     'bow' function should be defined, and the user_input should be a string
      POSTCONDITION:    Returns a NumPy array representing the state for the DQN model
    """
    return bow(user_input, words).reshape(1, -1)

In [10]:
# Initialize DQN
state_size = len(train_x[0])          # Size of the state/input vector
action_size = len(train_y[0])         # Size of the action/output vector
dqn = DQN(state_size, action_size)    # Initialize the DQN (Deep Q-Network) with specified sizes

# Set up early stopping
patience = 3                          # Number of epochs to wait for improvement before early stopping
min_loss_diff = 0.001                  # Minimum difference in loss considered as improvement
loss_increase_count = 0                # Counter to track the number of consecutive epochs with increased loss

prev_epoch_avg_loss_diff = None        # Difference in average loss compared to the previous epoch

In [11]:
# Train DQN with existing training data
for epoch in range(10):
    epoch_losses = [] # List that stores the loss differences for each training iteration within an epoch
    for i in range(len(train_x)):
        state = train_x[i].reshape(1, -1)
        action = np.argmax(train_y[i])
        next_state = train_x[(i + 1) % len(train_x)].reshape(1, -1)
        reward = 1
        done = False
        dqn.remember(state, action, reward, next_state, done)
        dqn.replay(min(32, len(dqn.memory)))
        
        # Get the loss before training
        initial_loss = dqn.model.evaluate(state, train_y[i].reshape(1, -1), verbose=0)
        
        # Train and get the new loss
        dqn.model.fit(state, train_y[i].reshape(1, -1), epochs=1, verbose=0)
        final_loss = dqn.model.evaluate(state, train_y[i].reshape(1, -1), verbose=0)
        
        # Calculate the loss difference
        loss_diff = initial_loss - final_loss
        epoch_losses.append(loss_diff)
    
    epoch_avg_loss_diff = np.mean(epoch_losses)
    
    if prev_epoch_avg_loss_diff is not None:
        if prev_epoch_avg_loss_diff - epoch_avg_loss_diff < min_loss_diff:
            loss_increase_count += 1
        else:
            loss_increase_count = 0
            
    if loss_increase_count >= patience:
        print("Early stopping triggered.")
        break
    
    prev_epoch_avg_loss_diff = epoch_avg_loss_diff
    
    print(f"Epoch {epoch}: Mean Loss Difference = {epoch_avg_loss_diff}")
    print(f"------------------Epoch-{epoch} Done----------------------")

# Save DQN weights
dqn.save('dqn_weights.h5')

# Load DQN weights
dqn.load('dqn_weights.h5')



2023-04-27 23:15:08.673126: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


Epoch 0: Mean Loss Difference = 0.23749440529480062
------------------Epoch-0 Done----------------------
Epoch 1: Mean Loss Difference = 0.6874629115236217
------------------Epoch-1 Done----------------------
Epoch 2: Mean Loss Difference = 0.5922053600179737
------------------Epoch-2 Done----------------------
Epoch 3: Mean Loss Difference = 0.63704815815235
------------------Epoch-3 Done----------------------
Epoch 4: Mean Loss Difference = 0.6301699095758898
------------------Epoch-4 Done----------------------
Epoch 5: Mean Loss Difference = 0.5509356835792805
------------------Epoch-5 Done----------------------
Epoch 6: Mean Loss Difference = 0.6117632101322042
------------------Epoch-6 Done----------------------
Epoch 7: Mean Loss Difference = 0.6039747164167207
------------------Epoch-7 Done----------------------
Epoch 8: Mean Loss Difference = 0.5956561976465685
------------------Epoch-8 Done----------------------


KeyboardInterrupt: 

In [12]:
# Save DQN weights
dqn.save('dqn_weights.h5')

# Load DQN weights
dqn.load('dqn_weights.h5')

def get_intents(filename):
    """
      NAME: get_intents
      PARAMETERS: filename
      PURPOSE: Loads intents from a JSON file
      PRECONDITION: The JSON file should have a valid format with a key named 'intents' containing a list of intent dictionaries
      POSTCONDITION: Returns a list of intent dictionaries

    """
    with open(filename, 'r') as file:
        data = json.load(file)
    return data["intents"]

def analyze_text(text):
    """
      NAME: analyze_text
      PARAMETERS: text
      PURPOSE: Preprocesses the input text by converting to lowercase, tokenizing, and removing punctuation
      PRECONDITION: Input 'text' should be a string
      POSTCONDITION: Returns a preprocessed version of the input text as a string

    """
    text = text.lower()
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in string.punctuation]
    text = " ".join(tokens)
    return text

def create_tfidf_vectorizer(intents):
    """
      NAME: create_tfidf_vectorizer
      PARAMETERS: intents
      PURPOSE: Creates a TF-IDF vectorizer based on the input intents
      PRECONDITION: Input 'intents' should be a list of intent dictionaries, each containing a key named 'patterns' with a list of strings
      POSTCONDITION: Returns a fitted TfidfVectorizer object
    
    """
    patterns = []
    for intent in intents:
        for pattern in intent["patterns"]:
            patterns.append(analyze_text(pattern))
    vectorizer = TfidfVectorizer().fit(patterns)
    return vectorizer

In [13]:
def match_intent(user_message, intents, vectorizer):
    """
      NAME:             match_intent
      PARAMETERS:       user_message, intents, vectorizer
      PURPOSE:          Matches user input to an intent based on cosine similarity and provides a confidence score
      PRECONDITION:     'analyze_text' function should be defined, and the user_message should be a string
      POSTCONDITION:    Returns the matched intent, matched pattern, and confidence score, or None, None, 0 if no match is found
    
    """
    user_message = analyze_text(user_message)
    user_vector = vectorizer.transform([user_message])
    max_similarity = -1
    matched_intent = None
    matched_pattern = None
    
    # To iterate over each intent and its associated patterns in the intents list and calculates the cosine similarity between the vectorized user input (user_vector) and the vectorized pattern (pattern_vector)
    for intent in intents:
        for pattern in intent["patterns"]:
            pattern_vector = vectorizer.transform([analyze_text(pattern)])
            similarity = cosine_similarity(user_vector, pattern_vector)
            
            if similarity > max_similarity:
                max_similarity = similarity
                matched_intent = intent
                matched_pattern = pattern
    
    if max_similarity > 0.5:
        return matched_intent, matched_pattern, max_similarity
    else:
        return None, None, 0

In [14]:
def respond(self, user_input):
    """
      NAME:             respond
      PARAMETERS:       user_input
      PURPOSE:          Analyzes the user input, identifies the best matching intent, and generates an appropriate response
      PRECONDITION:     The 'preprocess' and 'calculate_similarity' functions should be defined.
      POSTCONDITION:    Returns a response based on the user input and matching intent, or a fallback response if no match is found.
    
    """
    processed_input = preprocess(user_input)
    max_similarity = 0
    best_intent = None
    
    # Calculate the similarity between processed input and intent patterns by iterating over each intent
    for intent in self.intents:
        similarity = calculate_similarity(processed_input, intent.patterns)
        if similarity > max_similarity:           
            max_similarity = similarity
            best_intent = intent

    if max_similarity > similarity_threshold:     # Check if the maximum similarity is above the threshold
        response = random.choice(best_intent.responses)
    else:
        response = self.fallback_response()        # If no intent matches above the threshold, use the fallback response

    return response

In [15]:
def fallback_response(self):
    """
      NAME: fallback_response
      PARAMETERS: self
      PURPOSE: Selects and returns a random fallback response from a list of predefined options
      PRECONDITION: The 'random' module should be imported.
      POSTCONDITION: Returns a randomly selected fallback response from the predefined list.
    
    """
    fallback_responses = [
        "I'm not sure I understand. Can you please rephrase the question?",
        "I'm sorry, but I cannot help with that. Would you like to speak with a human agent?",
        "I didn't quite get that. Can you please provide more information?"
    ]
    return random.choice(fallback_responses)

In [16]:
def get_response(intent):
    """
      NAME: get_response
      PARAMETERS: intent
      PURPOSE: Retrieves a random response from the given intent
      PRECONDITION: Input 'intent' should be a valid dictionary containing 'responses' key with a list of strings
      POSTCONDITION: Returns a random response string from the input intent
    
    """
    return random.choice(intent["responses"])

In [17]:
class ConversationHistory:
    """
      NAME: ConversationHistory
      PURPOSE: Represents the conversation history with a maximum length
      INVARIANTS: The ConversationHistory class requires the 'deque' module to be imported.
                  The class has a single attribute, 'history', which is a deque object representing the conversation history.
                  The 'history' deque has a maximum length specified during initialization.
    """
    def __init__(self, maxlen=5):
        """
          NAME: ConversationHistory
          PARAMETERS: maxlen
          PURPOSE: Initializes an instance of the ConversationHistory class with a maximum length for the history
          PRECONDITION: 'deque' module should be imported
          POSTCONDITION: Creates a ConversationHistory object with an empty history deque of the specified maximum length
        """
        self.history = deque(maxlen=maxlen)

    def add_message(self, message):
        """
          NAME: add_message
          PARAMETERS: message
          PURPOSE: Adds a new message to the conversation history
          PRECONDITION: NONE
          POSTCONDITION: Appends the provided message to the conversation history deque
        """
        self.history.append(message)
    
    # Retrieves the combined conversation history as a single string
    def get_combined_history(self):
        """
          NAME: get_combined_history
          PARAMETERS: None
          PURPOSE: Retrieves the combined conversation history as a single string
          PRECONDITION: None
          POSTCONDITION: Returns a string containing the combined conversation history
        """
        return " ".join(self.history)

    # Clears the conversation history
    def clear(self):
        """
          NAME: clear
          PARAMETERSL None
          PURPOSE: Clears the conversation history
          PRECONDITION: None
          POSTCONDITION: Clears the conversation history deque, making it empty
        """
        self.history.clear()

In [18]:
# Update user_input_to_state function to include conversation history
def user_input_to_state(user_input, words, conversation_history):
    """
      NAME: user_input_to_state
      PARAMETERS: user_input, words, conversation_history
      PURPOSE: Converts user input into a state for the DQN model
      PRECONDITION: 'bow' function should be defined, and the user_input should be a string
      POSTCONDITION: Returns a NumPy array representing the state for the DQN model
    """
    conversation_history.add_message(user_input)
    combined_input = conversation_history.get_combined_history()
    return bow(combined_input, words).reshape(1, -1)

In [19]:
class User:
    """
      NAME: User
      PURPOSE: Represents a user with a name and preferences
      INVARIANTS: The User class requires the 'name' parameter to be a string.
                  The class has two attributes: 'name', representing the user's name as a string, and 'preferences', representing the user's preferences as a dictionary.
                  The 'preferences' dictionary is initially empty.
    """
    def __init__(self, name):
        """
          NAME: User
          PARAMETERS: name
          PURPOSE: Initializes a User object with a given name and an empty dictionary for preferences
          PRECONDITION: The 'name' parameter should be a string
          POSTCONDITION: A User object is created with the specified name and an empty preferences dictionary
        """
        self.name = name
        self.preferences = {}

    def add_preference(self, key, value):
        """
          NAME: add_preference
          PARAMETERS: key, value
          PURPOSE: Adds a preference to the user's preferences dictionary
          PRECONDITION: 'key' and 'value' parameters should be valid data types
          POSTCONDITION: The preference is added to the user's preferences dictionary
        """
        self.preferences[key] = value

    def get_preference(self, key):
        """
          NAME: get_preference
          PARAMETERS: key
          PURPOSE: Retrieves the value of a preference from the user's preferences dictionary
          PRECONDITION: 'key' parameter should be a valid key present in the user's preferences dictionary
          POSTCONDITION: Returns the value associated with the provided key, or None if the key is not found
        """
        return self.preferences.get(key, None)

In [21]:
def calculate_bmr(weight, height, age, gender):
    """
    Calculates the Basal Metabolic Rate (BMR) of a person.

    Args:
        weight (float): Weight of the person in kilograms.
        height (float): Height of the person in centimeters.
        age (int): Age of the person in years.
        gender (str): Gender of the person ('male' or 'female').

    Returns:
        float: The calculated BMR value.
    """
    if gender.lower() == 'male':
        bmr = 10 * weight + 6.25 * height - 5 * age + 5
    elif gender.lower() == 'female':
        bmr = 10 * weight + 6.25 * height - 5 * age - 161
    else:
        raise ValueError("Invalid gender. Please provide 'male' or 'female'.")

    return bmr

In [23]:
def chatbot():
    """
      NAME: chatbot
      PARAMETERS: None
      PURPOSE: Implements a chatbot for cardiovascular health using intents and user information
      PRECONDITION: The intents JSON file path should be specified correctly and accessible.
                  The necessary functions and classes, such as 'get_intents', 'create_tfidf_vectorizer', 'User', 'match_intent', and 'get_response' should be defined and accessible.
                  The 'random' module should be imported.
      POSTCONDITION: Implements an interactive chatbot that responds to user queries based on intents and provides personalized responses using user information.
    """
    
    intents = get_intents("intents.json")
    vectorizer = create_tfidf_vectorizer(intents)
    print("Welcome to the Cardiovascular Health Chatbot! Type 'quit' to exit.")
    
    # Get user-specific information
    user_name = input("Please enter your name: ")
    user = User(user_name)
    
    # Gather user preferences
    age_preference = input("Please enter your age: ")
    user.add_preference("age", age_preference)

    while True:
        user_message = input("You: ")
        if user_message.lower() == "quit":
            break
        
        conversation_history.append(user_message)
        matched_intent, _, confidence = match_intent(user_message, intents, vectorizer)
        if matched_intent and confidence > 0.5:
            response = get_response(matched_intent)

            # Personalize response using user information
            response = response.replace("{name}", user.name)

            conversation_history.append(response)
            print("Chatbot: ", response)
        else:
            response = "I'm sorry, I don't understand your question. Please try again."
            conversation_history.append(response)
            print("Chatbot: ", response)

if __name__ == "__main__":
    chatbot()

FileNotFoundError: [Errno 2] No such file or directory: 'intents.json'