# AC Recommendation assistant

### Problem Statement
We have a dataset of Air Conditioners with all their specifications. The goal is to build a chatbot that helps the user to buy the right product based on their requirements. The chatbot is expected to recommend suitable options and guide the user in making the purchse.

## 1. Environment Setup and loading dataset

#### Installing the relevant packages such as openai, tenacity, gdown, dotenv

In [None]:
!pip install -U -q openai tenacity

#### **Loading** the **environment variables** here with the use of dotenv. It basically loads the variables from **.env file**. So, please create a .env file with **OPENAI_API_KEY=sk_** before executing the following line. Replace sk_ with your secret key.

In [None]:
!pip install dotenv

In [None]:
from dotenv import load_dotenv 

load_dotenv()

##### **Dataset gets downloaded** from my google drive since this is just a single ipynb file. Sample dataset is faster in execution, so it is set by default. Please change it to complete dataset where you could expect some latency in getting the response.
complete dataset - "ac_data.csv" 

sample dataset - "ac_data_only_top_30.csv"

In [None]:
!pip install gdown

In [None]:
import gdown

# Sample or test dataset: totally 30 
file_id = "1rgfu_8CHtMNB4304cu3yZQyVBq0qIWmw"
url = f"https://drive.google.com/uc?id={file_id}"
filename_samples = "ac_data_only_top_30.csv"
gdown.download(url, filename_samples, quiet=False)

"""
# Complete dataset: totally 100 
file_id = "16eprE-VtKJ5V1WwTW7ylCk7rCaaGfLB4"
url = f"https://drive.google.com/uc?id={file_id}"
filename_complete = "ac_data.csv"
gdown.download(url, filename_complete, quiet=False)
"""


#### load the dataset into a dataframe **df**

In [None]:
!pip install pandas

In [None]:
import pandas as pd
from IPython.display import display

df = pd.read_csv(filename_samples)
display(df.head(5))

## 2. Utilities and helper functions

This section contains the utility functions that could be used through out the project

In [None]:
class ModerationException(Exception):
    """This is a custom exception class to handle moderation related exceptions"""

    def __init__(self, message):
        self.message = message

    def __str__(self):
        return str(self.message)


In [None]:
import ast
from pathlib import Path


def load_system_message(template_path: str, **params):
    """
    Reads the markdown template and renders it with named placeholders.

    :param template_path str: Path of the MD file
    :param **params str: Positional parameters that can be embedded into the system message
    """
    path = Path(template_path)
    system_message = path.read_text(encoding="utf-8")
    try:
        return system_message.format(**params)
    except KeyError as key_error:
        missing = key_error.args
        raise KeyError(f"Missing template parameter: '{missing}' for {path}")


def load_system_message_without_params(template_path: str):
    """
    Reads the markdown template and renders it without any paramete.

    :param template_path str: Path of the MD file
    """
    path = Path(template_path)
    return path.read_text("utf-8")


def load_dictionary_from_md(template_path: str):
    """
    Reads the markdown template and renders the dictionary.

    :param template_path str: Path of the MD file
    """
    with open(template_path) as data:
        return ast.literal_eval(data.read())


## 3. Model layer

#### Moderation check API interface and implementation

In [None]:
from abc import ABC, abstractmethod

class Moderation(ABC):
    @abstractmethod
    def get_response(self, input_message) -> bool:
        """
        Invokes OpenAI moderation check API and returns either the message is flagged or not.

        :param str input_message: Input message under check.
        :return bool: returns True if there is a sensitive/violent content, otherwise False
        """
        pass


In [None]:
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_random_exponential

@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
class ModerationModel(Moderation):
    def __init__(self):
        self.__client = OpenAI()

    def get_response(self, input_message) -> bool:
        response = self.__client.moderations.create(input=input_message)
        return response.results[0].flagged


#### Chat completion API interface and implementation

In [None]:
from abc import ABC, abstractmethod
from typing import Dict, List


class ChatModel(ABC):
    @abstractmethod
    def get_session_response(
        self, json_format: bool = False, tools: Dict = None, tool_choice=None
    ) -> List[str]:
        """
        Return the response using the accumulated messages of a session and it returns the LLM's response.

        :param bool json_format: True if response_format is json object False otherwise.
        :param Dict tools: function calling dictionary to set the list of tools. None by default.
        :param Dict tool_choice: function calling dictionary to enforce a particular tool/function. None by default.
        :return: list of responses
        """
        pass

    @abstractmethod
    def preview_response(
        self, messages: List[Dict], json_format: bool = False
    ) -> List[str | Dict]:
        """
        Stateless, one-time request from the given messages.
        This method does not read or modify the internal session.

        :param List[Dict] messages: List of messages to be passed to the model.
        :param bool json_format: True if response_format is json object False otherwise.
        :return: list of responses
        """
        pass

    @abstractmethod
    def update_parameters(
        self,
        max_tokens: int = 100,
        model: str = "gpt-3.5-turbo",
        no_of_choices: int = 1,
        temperature: float = 0,
        seed=None,
    ) -> None:
        """
        To update the model parameters.

        :param int max_tokens: Updates the maximum number of output tokens to produce.
        :param str model: Updates the model
        :param int no_of_choices: Toatl number of answers required from the model
        :param float temperature: This parameter decides how random the output should be generated by the model.
        :param int seed: To get the most deterministic output from an LLM on different API calls to introduce consistency.
        """
        pass

    @abstractmethod
    def add_message(
        self,
        role: str,
        content: str = None,
        name: str = None,
        tool_calls: List[Dict] = None,
        tool_call_id: str = None,
    ) -> None:
        """
        To add a message in messages history.

        :param str role: The roles of the message such as system, user, and assistant
        :param str content: The system message or user message to be sent
        :param str name: str or None. An example can be set i.e. example_user informs the model that this is an example.
        :param List[Dict] tool_calls: To add the tool calls response
        :param str tool_call_id: To include the respective tool id for the LLM to identify the exact tool in messages.
        """
        pass

    @abstractmethod
    def clear_messages(self) -> None:
        """
        To clear the messages history since the chat can go on forever with the user.
        """
        pass


In [None]:
import json
from typing import Dict, List, final

from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_random_exponential


@final
class OpenAIChatModel(ChatModel):
    def __init__(
        self,
        model: str,
        max_tokens: int,
        temperature: float,
        no_of_choices: int,
        seed: int,
    ):
        self.__client = OpenAI()
        self.__model = model
        self.__temperature = temperature
        self.__no_of_choices = no_of_choices
        self.__max_tokens = max_tokens
        self.__seed = seed
        self.__messages = []

    @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
    def get_session_response(
        self, json_format=False, json_schema=None, tools=None, tool_choice: Dict = None
    ) -> List[str]:
        if len(self.__messages) == 0:
            raise Exception("Please add a message to start with the conversation!")

        params = {
            "model": self.__model,
            "n": self.__no_of_choices,
            "max_tokens": self.__max_tokens,
            "temperature": self.__temperature,
            "messages": self.__messages,
            "seed": self.__seed,
        }
        if json_format:
            params["response_format"] = {"type": "json_object"}
        elif json_schema is not None:
            params["response_format"] = json_schema
        elif tools is not None:
            params["tools"] = [tools]
            params["tool_choice"] = tool_choice

        response = self.__client.chat.completions.create(**params)

        if json_format == True:
            return json.loads(response.choices[0].message.content)
        else:
            return self.__parse_response(response=response)

    @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
    def preview_response(
        self, messages: List[Dict], json_format=False, json_schema=None
    ) -> List[str | Dict]:
        if len(messages) == 0:
            raise Exception("Please add a message to start with the conversation!")

        params = {
            "model": self.__model,
            "n": self.__no_of_choices,
            "max_tokens": self.__max_tokens,
            "temperature": self.__temperature,
            "messages": messages,
            "seed": self.__seed,
        }

        if json_format:
            params["response_format"] = {"type": "json_object"}
        elif json_schema is not None:
            params["response_format"] = json_schema

        response = self.__client.chat.completions.create(**params)

        if json_format == True:
            return json.loads(response.choices[0].message.content)
        else:
            return [choice.message.content for choice in response.choices]

    def update_parameters(
        self,
        max_tokens: int = 100,
        model: str = "gpt-3.5-turbo",
        no_of_choices: int = 1,
        temperature: float = 0,
        seed=None,
    ) -> None:
        self.__max_tokens = max_tokens
        self.__model = model
        self.__no_of_choices = no_of_choices
        self.__temperature = temperature
        self.__seed = seed

    def add_message(
        self,
        role: str,
        content: str = None,
        name: str = None,
        tool_calls: List[Dict] = None,
        tool_call_id: str = None,
    ) -> None:
        message = {"role": role, "content": content}
        if name is not None:
            message["name"] = name
            message["role"] = "system"
        elif tool_call_id is not None:
            message["tool_call_id"] = tool_call_id
        elif tool_calls is not None:
            message["tool_calls"] = tool_calls

        self.__messages.append(message)

    def clear_messages(self):
        self.__messages = []

    def __parse_response(self, response) -> List:
        """
        This method extracts the necessary information from a normal or tools response for further processing.

        :param List[Dict] response: Dictionary that can contain a normal or tools response.
        :return List[Dict|str]: Parses into List[Dict] for tools response, and List[str] for normal response.
        """
        results = []
        for choice in response.choices:
            if choice.message.tool_calls:
                for tool_call in choice.message.tool_calls:
                    args = json.loads(tool_call.function.arguments)
                    results.append(
                        {
                            "id": tool_call.id,
                            "content": args,
                            "tool_calls": response.choices[0].message.tool_calls,
                        }
                    )
            elif choice.message.content:
                results.append(choice.message.content)

        return results


## 4. Stages implementation

#### Compare two feature dictionaries in the product extraction layer

In [None]:
from typing import Dict


def compare_products(user_requirements: Dict, from_dataset: Dict) -> int:
    """
    This method compares the dataset row and the user's requirements dictionary
    by mapping categorical values, then computes the final correspondence score.

    :param Dict user_requirements: Dictionary that contains the actual user requirements
    :param Dict from_dataset: Extracted featues dictionary from the actual dataset (one sample/row)
    :return int: Returns the final score after the comparison
    """
    mapping = {"essential": 0, "standard": 1, "premium": 2}

    score = 0

    for key, value in from_dataset.items():
        user_value = mapping.get(user_requirements[key], -1)
        dataset_value = mapping.get(value, -1)

        if (dataset_value != -1 and user_value != -1) and dataset_value >= user_value:
            score += 1

    return score


#### Stage 0 Implementation - initialize conversation

In [None]:
from dataclasses import dataclass
from typing import List

@dataclass
class StageZeroResult:
    response: List[str]


class IntializeConversation:
    def __init__(self, chat_model: ChatModel, system_message: str):
        self.__chat_model = chat_model
        self.system_message = system_message

    def run(self) -> StageZeroResult:
        """
        This method generates the first welcome message to the user by using an LLM API call and initiates the conversation.

        :return StageZeroResult: Returns the LLM's response with the welcome message.
        """
        response = self.__chat_model.get_session_response()

        return StageZeroResult(response=response)

    def add_message(self, role, content):
        """
        To add a message in messages history.

        :param str role: The roles of the message such as system, user, or assistant
        :param str content: The system message or user message to be sent
        :param name
        """
        self.__chat_model.add_message(role=role, content=content)

    def clear_messages(self):
        """
        To clear the messages history of this particular stage since the chat can go on forever with the user.
        """
        self.__chat_model.clear_messages()


#### Stage 1 implementation - Intent clarification, extraction and confirmation

In [None]:
import json
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class StageOneResult:
    intent_confirmation: str  # Yes or No response
    response: str
    user_requirements: Dict


class IntentClarityAndConfirmation:
    def __init__(
        self,
        chat_model: ChatModel,
        classify_values_system_message: str,
        intent_confirmation_system_message: str,
        extract_dict_system_message: str,
        function_tool: Dict,
        function_tool_choice: Dict,
    ):
        self.__chat_model = chat_model
        self.__classify_values_system_message = classify_values_system_message
        self.__intent_confirmation_system_message = intent_confirmation_system_message
        self.__extract_dict_system_message = extract_dict_system_message
        self.__function_tool = function_tool
        self.__function_tool_choice = function_tool_choice
        self.__initial_requirements = {
            "price": "-",
            "energy efficiency": "-",
            "cooling capacity": "-",
            "comfort": "-",
            "ac type": "-",
            "smart features": "-",
            "portability": "-",
        }
        self.__user_requirements_dictionary = self.__initial_requirements.copy()

    def run(self) -> StageOneResult:
        """
        This method runs the intent clarification and confirmation layer and results the user requirements dictionary.

        1. Invoking the function calling API all to extract requirements from users input
        2. Handling the tools response by extracting and adding the tool messages to model's conversation history
        3. Adding extracted user requirements dictionary in the conversation
        4. Convert the user requirements i.e. 1.5 tons, fixed unit etc. to its respective values such as essential, standard, or premium
        5. Intent confirmation layer, outputs "Yes" if all requirements are collected otherwise "No" and clarification continues
        6. If intent is confirmed extacts the final user requirements dictionary from the string.
        7. Returns the stage one result for both intent confirmation "Yes" and for "No"

        :param StageOneResult: Returns the stage one result with the user requirements dictionary upon intent confirmation.
        """
        # 1). Invoking the function calling API all to extract requirements from users input
        tool_response = self.__chat_model.get_session_response(
            tools=self.__function_tool, tool_choice=self.__function_tool_choice
        )

        # 2). Handling the tools response by extracting and adding the tool messages to model's conversation history
        self.__handle_tool_response(tool_response)

        # 3). Adding extracted user requirements dictionary in the conversation
        self.__chat_model.add_message(
            role="assistant",
            content=self.__classify_values_system_message.format(
                json.dumps(self.__user_requirements_dictionary)
            ),
        )

        # 4). Convert the user requirements i.e. 1.5 tons, fixed unit etc. to its respective values such as essential, standard, or premium
        response = self.__chat_model.get_session_response()

        # 5). Intent confirmation layer, outputs "Yes" if all requirements are collected otherwise "No" and clarification continues
        intent_confirmation = self.__intent_confirmation(response[0])

        # 6). If intent is confirmed extacts the final user requirements dictionary from the string.
        user_requirement_dict = {}
        if intent_confirmation[0].lower() == "yes":
            user_requirement_dict = self.__extract_dict_from_string(response[0])
            self.__user_requirements_dictionary = self.__initial_requirements.copy()

        # 7). Returns the stage one result for both intent confirmation "Yes" and for "No"
        return StageOneResult(
            intent_confirmation=intent_confirmation[0],
            response=response[0],
            user_requirements=user_requirement_dict,
        )

    def __intent_confirmation(self, model_response: str) -> List[str]:
        """
        One time OpenAI API call to get intent confirmation with the following steps.
        1. System prompt message and user mesasge is added to messages
        2. One time API call to get the model ersponse

        :param str model_response: Response for the API call with function tools to extract features from user input
        :return List[str]: Returns a single choice response with "Yes" or "No" based on collected features so far
        """
        # 1). System prompt message is added to messages
        messages = [
            {
                "role": "system",
                "content": self.__intent_confirmation_system_message,
            },
            {"role": "user", "content": f"Here is the input: {model_response}"},
        ]

        # 2). One time API call to get the model ersponse
        completion_response = self.__chat_model.preview_response(messages=messages)

        return completion_response

    def __extract_dict_from_string(self, response: str):
        """
        Extracts the features dictionary from string upon intent confirmation

        1. System prompt message and user mesasge is added to messages
        2. Returns one time API call to get the features dictionary
        """
        # 1). System prompt message and user mesasge is added to messages
        messages = [
            {"role": "system", "content": self.__extract_dict_system_message},
            {"role": "user", "content": f"Here is the input: {response}"},
        ]

        # 2). Returns one time API call to get the features dictionary
        response_dict = self.__chat_model.preview_response(
            messages=messages, json_format=True
        )

        return response_dict

    def __handle_tool_response(self, tool_response):
        """
        This method handles the function tool response to extract features and add it to conversation history.

        1. Access tool_calls, directly with tool_response[0].tool_calls[0] here, but can be accessed like a dictionary too
        2. Access function, directly with tool_call.function here, but can be accessed like a dictionary too
        3. Access arguments, directly with .function.arguments here, but can be access as dictionary too
        4. Store all the values to local dictionary after parsing
        5. Appending tools message from last api call before appending the tool response itself, otherwise LLM wouldn't understand the tool execution

        :param str model_response: Response for the API call with function tools to extract features from user input
        """
        # 1). Access tool_calls, directly with tool_response[0].tool_calls[0] here, but can be accessed like a dictionary too
        tool_call = (
            tool_response[0]["tool_calls"][0]
            if isinstance(tool_response[0], dict)
            else tool_response[0].tool_calls[0]
        )
        # 2). Access function, It can be accessed directly with tool_call.function here, but can be accessed like a Dict too
        func = (
            tool_call["function"] if isinstance(tool_call, dict) else tool_call.function
        )
        # 3). Access arguments, It can be accessed directly with .function.arguments here, but can be access as Dict too
        arguments_json = func["arguments"] if isinstance(func, dict) else func.arguments
        # Parse only if it's a string
        if isinstance(arguments_json, str):
            arguments_json = json.loads(arguments_json)

        # 4). Store all the values to local dictionary after parsing
        for key, value in arguments_json.items():
            self.__user_requirements_dictionary[key] = value

        # 5). Appending tools message from last api call before appending the tool response itself, otherwise LLM wouldn't understand the tool execution
        self.__chat_model.add_message(
            role="assistant", tool_calls=tool_response[0]["tool_calls"]
        )
        self.__chat_model.add_message(
            role="tool",
            tool_call_id=tool_response[0]["id"],
            content=json.dumps(tool_response[0]["content"]),
        )

    def add_message(self, role, content):
        """
        To add a message in messages history.

        :param str role: The roles of the message such as system, user, or assistant
        :param str content: The system message or user message to be sent
        """
        self.__chat_model.add_message(role=role, content=content)

    def clear_messages(self):
        """
        To clear the messages history of this particular stage since the chat can go on forever with the user.
        """
        self.__chat_model.clear_messages()


#### Stage 2 implementation - product extraction

In [None]:
from dataclasses import dataclass
from typing import Dict

import pandas as pd

@dataclass
class StageTwoResult:
    recommendations: str


class ProductExtractionAndMapping:
    def __init__(self, chat_model: ChatModel, system_message: str):
        self.__chat_model = chat_model
        self.__system_message = system_message
        self.__df = pd.read_csv(filename_samples)
        self.__VALIDATION_SCORE = 3

    def run(self, user_requirement: Dict) -> StageTwoResult:
        """
        This method runs the product extraction and mapping layer with the following steps
        1. Extracting price that matches user's budget
        2. Extracting features dictionary from product description via mapping layer and stores it in a new column
        3. Computing scores by comparing user requirement and dataset row dictionaries and stores it in scores column
        4. Sorts and Filters the products based on top scores
        5. Sorts top products based on price, then drops unwanted columns for the next conversation

        :param Dict user_requirement: User's requirement dictionary from the previous stage
        :param StageTwoResult: Returns product recommendations in a json string format
        """
        # 1). Extract products within user's expected price range
        filtered_df = self.__df[
            self.__df["price"]
            <= int(round(float(user_requirement["price"].strip().replace(",", ""))))
        ]

        # 2). Features extracted via mapping layer and stored in a separate column as a new feature
        filtered_df["ac_features"] = filtered_df["description"].apply(
            lambda description: self.__product_map_layer(description=description)
        )

        # 3). Calling compare products method for the scores
        filtered_df["scores"] = filtered_df["ac_features"].apply(
            lambda dataset: compare_products(
                user_requirements=user_requirement, from_dataset=dataset
            )
        )

        # 4). Sorts and filters the top products after validation
        top_products = filtered_df.sort_values("scores", ascending=False).head(3)
        top_products = top_products[top_products["scores"] > self.__VALIDATION_SCORE]

        # 5). Sorts top products based on price, then drops unwanted columns for the next conversation
        top_products.sort_values("price", ascending=False, inplace=True)
        top_products.drop(columns=["scores", "ac_features"], axis=1, inplace=True)

        return StageTwoResult(recommendations=top_products.to_json(orient="records"))

    def __product_map_layer(self, description) -> Dict:
        """
        Private method receives the product description, where an LLM extracts all the primary features from it,
        then classifies the values into one of the following categories such as essential, standard, or premium.

        :param str description: Product description that contains all the specifications
        :return Dict: Returns the extracted featues as a dictionary
        """
        messages = [
            {"role": "system", "content": self.__system_message.format(description)},
            {
                "role": "user",
                "content": f"Follow the instructions and output the dictionary in json format for the following AC {description}",
            },
        ]
        response = self.__chat_model.preview_response(
            messages=messages, json_format=True
        )

        return response

    def add_message(self, role, content):
        """
        To add a message in messages history.

        :param str role: The roles of the message such as system, user, or assistant
        :param str content: The system message or user message to be sent
        """
        self.__chat_model.add_message(role=role, content=content)

    def clear_messages(self):
        """
        To clear the messages history of this particular stage since the chat can go on forever with the user.
        """
        self.__chat_model.clear_messages()


#### Stage 3 implementation - product recommendations

In [None]:
from dataclasses import dataclass
from typing import List

@dataclass
class StageThreeResult:
    response: List[str]


class ProductRecommendations:
    def __init__(self, chat_model: ChatModel, system_message: str):
        self.__chat_model = chat_model
        self.system_message = system_message

    def run(self) -> StageThreeResult:
        """
        This method actually runs the final stage of the pipeling

        :return StageThreeResult: Data class that contains the recommendations
        """
        recommendation = self.__chat_model.get_session_response()

        return StageThreeResult(response=recommendation)

    def continue_run(self) -> StageThreeResult:
        """
        This method responds to user query through an LLM API call.

        :return StageThreeResult: Data class that contains the response
        """
        response = self.__chat_model.get_session_response()

        return StageThreeResult(response=response)

    def add_message(self, role, content):
        """
        To add a message in messages history.

        :param str role: The roles of the message such as system, user, or assistant
        :param str content: The system message or user message to be sent
        """
        self.__chat_model.add_message(role, content)

    def clear_messages(self):
        """
        To clear the messages history of this particular stage since the chat can go on forever with the user.
        """
        self.__chat_model.clear_messages()


## 5. Pipeline Orchestration

In [None]:
from typing import Dict


class Pipeline:
    """
    Orchestrates the workflow of the project by running different stages.

    This pipeline is responsible for the following
    1. Adding user and assistant messages to chat model's message history.
    2. Moderation checks and throws an exception in case of failure
    3. Internal processing happens on individual stages where each stage is responsible for updating the message history i.e.
        * In stage 1, Function calling tool response.
        * In stage 2, One time responses such as extacting features dictionary from description
    4. System messages are kept in individual stages rather than putting all in one place
    """

    def __init__(
        self,
        moderation_client: Moderation,
        stage0: IntializeConversation,
        stage1: IntentClarityAndConfirmation,
        stage2: ProductExtractionAndMapping,
        stage3: ProductRecommendations,
    ):
        self.__moderation_client = moderation_client
        self.__stage0 = stage0
        self.__stage1 = stage1
        self.__stage2 = stage2
        self.__stage3 = stage3
        self.__ERROR_MESSAGE = "This conversation ends now since your input has some sensitive content. Please start the new conversation to continue!"

    def run_stage0(self) -> StageZeroResult:
        """
        Initializes the conversation with the user by invoking stage 0, where a system message is added.

        :return StageZeroResult: Returns stage 0 result.
        """
        # 1). Adds system message to model's message history
        self.__stage0.add_message(role="system", content=self.__stage0.system_message)

        # 2). Runs stage zero
        response = self.__stage0.run()

        return response

    def run_stage1(self, user_input: str) -> StageOneResult:
        """
        Runs stage 1 for intent clarification and confirmation, and outputs user requirements dictionary. The following steps are executed

        1. Moderation check on user input
        2. Adds user input to the message history
        3. Runs stage 1
        4. Executing the moderation check on LLM's output
        5. Add LLM's response to message history if some requirements are still missing

        :param str user_input: User's input message
        :return StageOneResult: Returns stage 1 starts running clarification loop until user requirements dictionary is extracted.
        :raises ModerationException: Throws the custom exception
        """
        # 1). Moderation check on user input
        self.__moderation_check(user_input)

        # 2). Adds user input to message history
        self.__stage1.add_message("user", user_input)

        # 3). Runs stage 1
        stage1_response = self.__stage1.run()

        # 4). Moderation check on LLM's response
        self.__moderation_check(stage1_response.response)

        # 5). If intent clarification is not complete, add the assistant message to the message history
        if stage1_response.intent_confirmation.strip() == "No":
            self.__stage1.add_message("assistant", stage1_response.response)

        return stage1_response

    def run_stage2(self, user_requirement: Dict) -> StageTwoResult:
        """
        Runs stage 2 for intent clarification and confirmation, and outputs user requirements dictionary. The following steps are executed

        :param Dict user_requirement: Collected user requirement's dictionary
        :return StageTwoResult: Returns stage 2 result with the product recommendations.
        :raises ModerationException: Throws the custom exception
        """
        # 1). Runs stage 2
        response = self.__stage2.run(user_requirement=user_requirement)

        # 2). Moderation check on the LLM's response
        self.__moderation_check(response.recommendations)

        return response

    def run_stage3(self, recommendations) -> StageThreeResult:
        """
        Runs stage 3 by showing the recommendations to the user. The following steps are executed

        1. Appending user requirements dictionary to system message, then add it to message history
        2. Start running stage 2 by generating recommendations that can be displayed to the user
        3. Executing the moderation check on LLM's output
        4. Add recommendations and assistant's response in previous step to message history

        :param Dict: User's input message
        :return StageThreeResult: Returns stage 1 result with the user requirements dictionary.
        :raises ModerationException: Throws the custom exception
        """
        # 1). Appending user requirements to system message and add it to message history
        conversation_recommendation = self.__stage3.system_message.format(
            recommendations
        )
        self.__stage3.add_message("system", conversation_recommendation)

        # 2). Run stage 3
        recommendation = self.__stage3.run()

        # 3). Moderation check on LLM's response
        self.__moderation_check(recommendation.response)

        # 4). Add recommendations and assistant's response in previous step to message history
        self.__stage3.add_message("user", "This is my user profile" + recommendations)
        self.__stage3.add_message("assistant", "\n".join(recommendation.response))

        return recommendation

    def continue_stage3(self, user_input: str) -> StageThreeResult:
        """
        Continues to run stage 3 by keeping the user engaged with doubt resolution session.

        1. Adds user message to message history of the model
        2. Moderation check on user input
        3. Continues to run stage 3
        4. Moderation check on LLM's response
        5. Add's LLM response to model's message history

        :param Dict: User's input message
        :return StageOneResult: Returns stage 1 result with the user requirements dictionary.
        :raises ModerationException: Throws the custom exception
        """
        # 1). Adds user message to message history
        self.__stage3.add_message("user", user_input)

        # 2). Moderation check on user input
        self.__moderation_check(user_input)

        # 3). Continue running stage 3
        response = self.__stage3.continue_run()

        # 4). Moderation check on LLM' output
        response_str = "\n".join(response.response)
        self.__moderation_check(response_str)

        # 5). Add assistant message to message history
        self.__stage3.add_message("assistant", "\n".join(response_str))

        return response

    def __moderation_check(self, data):
        """
        Runs the moderation check and returns the flag

        :param str data: Data on which the moderation check
        :raises ModerationException: Throws the custom exception
        """
        flagged = self.__moderation_client.get_response(data)

        if flagged:
            raise ModerationException(self.__ERROR_MESSAGE)

    def clear_messages(self):
        """
        Clears the chat message history on all stages.
        """
        self.__stage0.clear_messages()
        self.__stage1.clear_messages()
        self.__stage2.clear_messages()
        self.__stage3.clear_messages()

## 6. Prompts & function tools

Prompts are separated since it gets injected via dependency injection framework to the modules for better testing.

### Stage 0 Intent clarification layer

#### Intent clarification system message

In [None]:
STAGE0_SYSTEM_MESSAGE = """
Act as a smart Air Conditioner recommendation assistant specializing in suggesting right air conditioners to the users based on their requirements. Please strictly stick to the AC assistant context and reply insufficient knowledge in case of any other context.

Your objective is to collect the primary feature values from the tool output and fill values for all of the following keys: ["price", "cooling capacity", "energy efficiency", "comfort", "portability", "ac type", "smart features"].

####
Here you can find the instructions on how to collect and fill the values based on user requirements:
- Infer the requirements from tools response, strictly no extra text
- Give a brief and interesting summary about the filled requirements before asking the follow-up questions and please avoid repetitions
- Except price that can contain a numerical value, other keys are supposed to be filled with the above mentioned values in\
    {{"price": "-", "cooling capacity": "-", "energy efficiency": "-", "comfort": "-", "portability": "-", "ac type": "-", "smart features": "-"}} based on user"s input
- Never show the dictionary with key-value pairs inferred in your response to the user at any cost
- Strictly do not fill any value by yourself, unless user is unsure or asking you to decide on the answer to your question.
- Do not append any prefix i.e. assistant: or user: in the response.
####

####
Here are the chain of thoughts to fill the values in json:
**Thought-1** You have to ask questions one by one nd understand the user's requirements, strictly analyze the tool output to match one or more keys
Be intelligent enough to smartly match context of the keys to the specifications mentioned by the user which got extracted through tool response
otherwise ask relevant questions in random order to fill in the missing values of ["price", "cooling capacity", "energy efficiency", "comfort", "portability", "ac type", "smart features"]
Remember the above mentioned instructions when asking the questions
Please move forward to the next thought only if you have collected the necessary values
**Thought-2** Now you have to fill rest of the keys which are not collected in the previous step
Ask clarifying questions to fill all the remaining keys
Fill the rest of the keys before going to the next thought
**Thought-3** Please ensure the updated values for all the keys are correct
If you are not sure about any of the values, ask clarifying questions till you fill all the values correct
####

***Strictly welcome the user only ONCE without questions and wait till the interaction.***
"""

### Stage 1 Intent clarification, extraction and confirmation layer

##### Intent confirmation system message

In [None]:
STAGE1_SYSTEM_MESSAGE = """
You have a special skill in for details in the given data and you are expected to return a single word as output.

Your task is find whether values for all the keys are filled or not: {{'price', 'cooling capacity', 'energy efficiency', 'comfort', 'portability', 'ac type', 'smart features'}}.
Except price which contains a numerical value, all the other values are supposed to have one of the allowed values such as: ['essential', 'standard', 'premium'].

Here are the examples of sample input and the corresponding output you are supposed to return

####
Example1:
***input***: 
- price: 50000 rupees
- cooling capacity: standard
- energy efficiency: premium
- comfort: premium
- portability: essential
- ac type: standard
- smart features: premium
***output***: 'Yes'
####

####
please keep the following instructions in mind before the return
- Analyze the given input for all keys, if all values are filled without an empty value '-' then strictly return 'Yes'
- Don't introduce any space surrounding the single word response and no additional text in the response
- Don't add your thoughts or reasoning in the response
- Strictly return only a single word either "Yes" or "No"
####

*** Please think about your answer carefully before returning the single word response.***
"""


##### Classify the user requirements into categories

In [None]:
STAGE1_CLASSIFY_VALUES_SYSTEM_MESSAGE="""
You are an AC shopping assistant that classifies user preferences into categories.

####
***Required keys***: 
- price
- cooling capacity
- energy efficiency
- comfort
- portability
- ac type
- smart features
####

####
***Instructions***:
1. If the input dictionary is missing any of the above keys, do not classify yet.
2. Only when **all keys are present**, strictly classify the complete preferences into one of three categories except price can contain numerical value: 
  ["essential", "standard", "premium"]. Strictly return the classified results.
3. Output must be a JSON dictionary with the same keys, but each value replaced with one of the categories.
####

####
***Example input***:
{{
  "price": "65000 rupees",
  "cooling capacity": "2 tons",
  "energy efficiency": "high efficiency",
  "comfort": "less noisy",
  "portability": "fixed",
  "ac type": "split",
  "smart reatures": "wifi control"
}}

***Example output:***
{{
  "price": "65000",
  "cooling capacity": "premium",
  "energy efficiency": "premium",
  "comfort": "standard",
  "portability": "essential",
  "ac type": "standard",
  "smart features": "premium"
}}
####

Here is your input: {0}
"""

##### Extract dictionary from string that contains user dictionary

In [None]:
EXTRACT_DICT_SYS_MSG="""
You are a specialist in extracting a dictionary of key value pairs in JSON format from the given string as per the requirements.

####
Here are the instructions to extract the key value pairs from the dictionary
- You are intelligent enought to extract the keys and values specified in the previous step
- Extract the keys: ['price', 'cooling capacity', 'energy efficiency', 'comfort', 'portability', 'ac type', 'smart_features']. and their respective values that are supposed to be one of the three values such as: ['essential', 'standard', 'premium'] except for price from the given input below
- Strictly do not extract any other keys or values that are not specified above
####

####
Here are the examples of input and output pairs for the task in hand
**Example 1**
input:
Your requirements are converted into the following 
- price: 45000 rupees
- cooling capacity: standard
- energy efficiency: premium
- comfort: premium
- portability: essential
- ac type: standard
- smart features: premium
output: {{"price": "50000", "cooling capacity": "standard", "energy efficiency": "premium", "comfort": "premium", "portability": "essential", "ac type": "standard", "smart features": "premium" }}
**Example 2**
input: I think i captured all the requirements based on user's preferences\n
{{"price": "50000 rupees", "cooling capacity": "standard", "energy efficiency": "premium", "comfort": "premium", "portability": "essential", "ac type": "standard", "smart features": "premium" }}
output: {{"price": "50000", "cooling capacity": "standard", "energy efficiency": "premium", "comfort": "premium", "portability": "essential", "ac type": "standard", "smart features": "premium" }}
**Example 3**
input: Your requirements are converted into the following\n
price - 25000 rupees
cooling capacity - standard
energy efficiency - premium
comfort - essential
portability - essential
ac type - premium
smart features - premium
output: {{"price": "50000", "cooling capacity": "standard", "energy efficiency": "premium", "comfort": "essential", "portability": "essential", "ac type": "premium", "smart features": "premium" }}
####

Strictly output the dictionary in JSON format by extracting the values: ['essential', 'standard', 'premium'] of all the keys ['price', 'cooling capacity', 'energy efficiency', 'comfort', 'portability', 'ac type', 'smart_features'].
"""


##### Function tool dictionary to extract feature specification from user input

In [None]:
STAGE1_FUNCTION_TOOL={
    "type": "function",
    "function": {
        "name": "extract_features",
        "description": "From user's input, extract only the features that are explicitly mentioned or updated and strictly do not return key with an empty value which is '-'",
        "parameters": {
            "type": "object",
            "properties": {
                "price": {
                    "type": "string",
                    "description": "The standard price, including currency e.g., '45000 INR', '35000 rupees', '25,000', '40000'", 
                },
                "cooling capacity": {
                    "type": "string",
                    "description": "Two possible inputs here 1. AC's cooling capacity as provided e.g., '1.5 tons capacity', '1.5', '1.5 ton', 2. if room size is given, please convert it to tons e.g., '550 square feet' is approximately equal to '1 ton'", 
                },
                "energy efficiency": {
                    "type": "string",
                    "description": "Energy efficiency requirements such as 'standard', 'high efficiency', 'inverter'", 
                },
                "portability": {
                    "type": "string",
                    "description": "Portability factor such as 'portable', 'easy to move', 'fixed', 'for my own house'", 
                },
                "ac type": {
                    "type": "string",
                    "description": "Type like 'split', 'window', 'central', 'portable ac'.", 
                },
                "smart features": {
                    "type": "string",
                    "description": "Smart features like 'WiFi', 'voice assistant', 'app remote'", 
                },
            },
            "additionalProperties": False,
        },
    }
}

##### Extract features tool choice

In [None]:
STAGE1_FUNCTION_TOOL_CHOICE={
    "type": "function", 
    "function": {"name": "extract_features"}
}

### Stage 2 - Product extraction layer

In [None]:
STAGE2_SYSTEM_MESSAGE="""
You are an expert air conditioner data classifier specializing in extracting primary key features and classify it based on the requirements.

####
To analyze the air conditioner features, please follow the chain of thoughts like mentioned below:
Thought 1: Be smart in extracting product's primary features from the description: {0}
Thought 2: Fill the extracted features into values: {{"cooling capacity": "Cooling capacity depends on room size","energy efficiency": "How efficient in saving energy.", "comfort": "Lowest noise is the premium feature", "portability": "Portable or fixed.", "ac type": "Type of AC","smart features": "Smart features inclusion"}}.
Thought 3: Classify each value {{"cooling capacity": "Cooling capacity depends on room size","energy efficiency": "How efficient in saving energy.", "comfort": "Lowest noise is the premium feature", "portability": "Portable or fixed.", "ac type": "Type of AC","smart features": "Smart features inclusion"}} into: ["essential", "standard", "premium"] based on the following instructions:
####

####
**cooling capacity**
- essential: << if the value is in between 0.8 ton to 1.0 ton >>
- standard: << if the value is in between 1.2 to 1.5 tons >>
- premium: << if the value is in between 1.8 to 3 tons >>

**energy efficiency**
- essential: << Allows to have 3 star / fixed-speed >> 
- standard: << Prefers to have inverter with 4 star rating >>
- premium: << Requires inverter with 5 star with high ISEER >>

**comfort**
- essential: << Any indoor noise level is acceptable; no noise constraint applied. This tier prioritizes availability over comfort. >> 
- standard: << Quieter home ranges while excluding the louder end of window/portable units. >>
- premium: << Prefer indoor less than or equal to 38dB; this expects quiet indoor experiences and favours split or quietest window units  >>

**portability**
- essential: << Any fixed or movable type is allowed >> 
- standard: << Allow Split if installation feasible, else Window which are semi portable >>
- premium: << Prioritize easily portable units with no outdoor unit Portable or Window (no outdoor unit). If portability is high priority, prefer portable, if noise matters too prefer window. >>

**ac type**
- essential: << any type allowed >>
- standard: << allow Split if installation feasible, else Window >>
- premium: << prioritize no-outdoor-unit designs Portable or Window (no outdoor unit) enables easy relocation; choose window over portable when quiet operation is priority.>>

**smart features**
- essential: << Any type allowed here >>
- standard: << Basic remote control feature is included >>
- premium: << Supports WiFi, voice and app control. >>

####

####
Input descriptions will be provided like below and strictly output JSON format.
**input 1**: Panasonic 1.5 T Wi-Fi Inverter Split is energy-efficient and quiet (~38 dB), with smart voice/app control and 7-in-1 convertible modes—perfect for ~17 m² rooms.
**output 1**: {{"cooling capacity": "standard", "energy efficiency": "premium", "comfort": "premium", "portability": "premium", "ac type": "premium", "smart features": "premium"}}
**input 2**: Blue Star PC12DB is a compact 1 T portable AC with castor wheels, hydrophilic gold fins, and antibacterial silver coating—excellent for renters.
**output 2**: {{"cooling capacity": "essential", "energy efficiency": "essential", "comfort": "essential", "portability": "premium", "ac type": "standard", "smart features": "essential"}}
####

### Strictly do not add any other text in values of JSON dictionary other than: ["essential", "standard", "premium"]. ###
"""

### Stage 3 Product recommendations layer

In [None]:
STAGE3_SYSTEM_MESSAGE="""
Act as an Air Conditioner product specialist and resolve user queries exclusively on the recommended products catalog: {0}

Start with a brief summary of each product in the following format in a plain text without any separators like '***' or '<>':
1. ***AC brand name***: ***Major specifications of the air conditioner***, ***Price in Rs***
2. ***AC brand name***: ***Major specifications of the air conditioner***, ***Price in Rs***

####
Follow the steps below internally when answering the user query: 
- Step 1: Very carefully compare the user input to catalog  given above for relevance
- Step 2: If relevant, answer the query by using the product knowledge; otherwise, return insufficient knowledge on the topic answer
####

***Strictly display the product recommendations once, then wait for the user's input.***
"""

## 7. Base class - Application 

In [None]:
class Application:
    """Base class where pipeline gets injected and accessed in main file."""

    def __init__(self, pipeline):
        self.pipeline = pipeline

## 8. Dependency injection module

##### Stage Container module where all the stages factory methods are created 

In [None]:
from dependency_injector import providers

class StagesContainer:
    """Invoking Factory to create instances of the stages, separate instances created and chat models are injected here"""

    def __init__(
        self, moderation_model, shared_chat_model, stage2_chat_model, stage3_chat_model
    ):
        self.stage0 = providers.Factory(
            IntializeConversation,
            chat_model=shared_chat_model,
            system_message=STAGE0_SYSTEM_MESSAGE,
        )

        self.stage1 = providers.Factory(
            IntentClarityAndConfirmation,
            chat_model=shared_chat_model,
            classify_values_system_message=STAGE1_CLASSIFY_VALUES_SYSTEM_MESSAGE,
            intent_confirmation_system_message=STAGE1_SYSTEM_MESSAGE,
            extract_dict_system_message=EXTRACT_DICT_SYS_MSG,
            function_tool=STAGE1_FUNCTION_TOOL,
            function_tool_choice=STAGE1_FUNCTION_TOOL_CHOICE,
        )

        self.stage2 = providers.Factory(
            ProductExtractionAndMapping,
            chat_model=stage2_chat_model,
            system_message=STAGE2_SYSTEM_MESSAGE,
        )
        
        self.stage3 = providers.Factory(
            ProductRecommendations,
            chat_model=stage3_chat_model,
            system_message=STAGE3_SYSTEM_MESSAGE,
        )

        self.pipeline = providers.Factory(
            Pipeline,
            moderation_model,
            self.stage0,
            self.stage1,
            self.stage2,
            self.stage3,
        )


##### Model Container module that creates instances related to LLM

In [None]:
from dependency_injector import providers

class ModelContainer:
    """Invoking Factory to create instances of the ChatModel, Moderation classes"""

    def __init__(self, config):
        __DEFAULT_MODEL = "gpt-4o-mini"
        __MAX_TOKENS = 500
        __TEMPERATURE = 0
        __NO_OF_CHOICES = 1
        __SEED = 7248

        common_kwargs = {
            "model": config.model() or __DEFAULT_MODEL,
            "max_tokens": config.max_tokens() or __MAX_TOKENS,
            "temperature": config.temperature() or __TEMPERATURE,
            "no_of_choices": config.no_of_choices() or __NO_OF_CHOICES,
            "seed": config.no_of_choices() or __SEED,
        }

        # Shared chat model is used on both stage 0 (initialize conversation) and stage 1
        self.shared_chat_model = providers.Singleton(OpenAIChatModel, **common_kwargs)
        # Independent chat model is used on both stage 2 and stage 3
        self.stage2_chat_model = providers.Factory(OpenAIChatModel, **common_kwargs)
        self.stage3_chat_model = providers.Factory(OpenAIChatModel, **common_kwargs)
        self.moderation_model = providers.Factory(ModerationModel)


##### Main container module that creates instances for the pipeline and injects it into application

In [None]:
from dependency_injector import containers, providers

class Container(containers.DeclarativeContainer):
    """
    Main DI container that is responsible for connecting both model and stages here.

    Application class gets created created and the pipeline is injected to access it in the main file.
    """

    config = providers.Configuration()

    model_container = ModelContainer(config=config)
    stages_container = StagesContainer(
        moderation_model=model_container.moderation_model,
        shared_chat_model=model_container.shared_chat_model,
        stage2_chat_model=model_container.stage2_chat_model,
        stage3_chat_model=model_container.stage3_chat_model,
    )
    pipeline = providers.Factory(
        Pipeline,
        moderation_client=model_container.moderation_model,
        stage0=stages_container.stage0,
        stage1=stages_container.stage1,
        stage2=stages_container.stage2,
        stage3=stages_container.stage3,
    )

    application = providers.Factory(Application, pipeline)

## 9. Front-end setup (Flask + static & templates download)

Flask setup and downloading the relevant html and css files into local folder for further processing.

In [None]:
# Install flask for web application 
!pip install flask

##### Downloads the **css** files into **static/css/styles.css** path, and **send.png** file to **static** folder

In [None]:
!pip install gdown

In [None]:
import os
import gdown
# make static/css/ folder
os.makedirs("static/css/", exist_ok=True)

# png file download
file_id_png = "1cMT0qnOiNg86fntMSovMfXzwJ3J_Kw0o"
url_png = f"https://drive.google.com/uc?id={file_id_png}"
png_file_path = os.path.join("static", "send.png")

gdown.download(url_png, png_file_path, quiet=False)

In [None]:
import os
import gdown

# css file download
file_id_css = "1KRDhLYCVEmgLt_NIlZsOMQ31FatqqJlZ"
url_css = f"https://drive.google.com/uc?id={file_id_css}"
css_file_path = os.path.join("static", "css", "styles.css")
gdown.download(url_css, css_file_path, quiet=False)


##### Downloads the **html** files into **templates/index_chat.html** directory

In [None]:
import os

os.makedirs("templates", exist_ok=True)
file_id_html = "1JUggl7J4O0rz-JSFFp_KGswkp0L0ZJRj"
url_html = f"https://drive.google.com/uc?id={file_id_html}"
html_file_path = os.path.join("templates", "index_chat.html")

gdown.download(url_html, html_file_path, quiet=False)

## 10. Main application (Entry-point)

In [None]:
import json

from flask import (Flask, Response, redirect, render_template, request,
                   stream_with_context, url_for)

app = Flask(__name__)

container = Container()
application = container.application()


def initialize_conversation() -> str:
    # Run Initialize conversation
    initial_conversation = application.pipeline.run_stage0()
    return initial_conversation.response


conversation = []
top_products = None
conversation.append({"assistant": initialize_conversation()[0]})


@app.route("/", endpoint="home_route_endpoint")
def home_route():
    global conversation, top_products
    return render_template("index_chat.html", conversations=conversation)


@app.route("/end_conv", methods=["POST"], endpoint="end_route_endpoint")
def end_conv_route():
    global conversation, top_products
    conversation = []
    top_products = None
    application.pipeline.clear_messages()
    conversation.append({"assistant": initialize_conversation()[0]})
    return redirect(url_for("home_route_endpoint"))


@app.route("/chat_stream", methods=["POST"])
def chat_stream_route():
    global conversation, top_products
    user_input = request.form["user_input_message"]
    conversation.append({"user": user_input})

    @stream_with_context
    def generate():
        global top_products

        def send(role, text, event="message"):
            payload = json.dumps({"role": role, "text": text})
            return f"event:{event}\ndata:{payload}\n\n"

        yield send("user", user_input)

        try:
            if top_products is None:
                # Run Stage 1
                stage1 = application.pipeline.run_stage1(user_input=user_input)

                if stage1.intent_confirmation.strip().lower() == "yes":
                    stage2 = application.pipeline.run_stage2(
                        user_requirement=stage1.user_requirements
                    )
                    conversation.append({"assistant": stage1.response})

                    # Handle empty recommendataions
                    if not stage2.recommendations:
                        yield send(
                            "assistant",
                            "Sorry we do not have AC's that match your requirements. Connecting you to a human expert.",
                        )
                        yield "event:end\ndata:{}\n\n"
                        return

                    # Run Stage 3 with an initial conversation
                    stage3_response = application.pipeline.run_stage3(
                        recommendations=stage2.recommendations
                    )

                    conversation.append(
                        {"assistant": "\n".join(stage3_response.response)}
                    )
                    top_products = stage2.recommendations
                    yield send("assistant", "\n".join(stage3_response.response))
                else:
                    conversation.append({"assistant": stage1.response})
                    yield send("assistant", stage1.response)
            else:
                # Continue running Stage 3 to resume helping the customer
                stage3_continue_response = application.pipeline.continue_stage3(
                    user_input
                )
                response = stage3_continue_response.response
                yield send("assistant", response)
                conversation.append({"assistant": response})

        except ModerationException as te:
            yield send("assistant", te.message)
            yield "event:block\ndata:{}\n\n"
            return
        except Exception as e:
            yield send("assistant", str(e))
            yield "event:block\ndata:{}\n\n"
            return

        # To finish the stream
        yield "event:end\ndata:{}\n\n"

    return Response(generate(), mimetype="text/event-stream")


def main():
    app.run(debug=True, threaded=True, use_reloader=False)


if __name__ == "__main__":
    main()


## 11. Unit testing

### Compare products unit testing 

In [None]:
import pytest


def test_no_requirements_met():
    user = {
        "cooling capacity": "standard",
        "energy efficiency": "standard",
        "comfort": "premium",
        "portability": "premium",
        "ac type": "premium",
        "smart features": "premium",
    }
    dataset = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "essential",
        "portability": "essential",
        "ac type": "standard",
        "smart features": "essential",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 0

def test_all_requirements_met():
    user = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "essential",
        "portability": "essential",
        "ac type": "standard",
        "smart features": "essential",
    }
    dataset = {
        "cooling capacity": "standard",
        "energy efficiency": "standard",
        "comfort": "premium",
        "portability": "essential",
        "ac type": "standard",
        "smart features": "premium",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 6

def test_four_requirements_met():
    user = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "premium",
        "portability": "essential",
        "ac type": "premium",
        "smart features": "essential",
    }
    dataset = {
        "cooling capacity": "standard",
        "energy efficiency": "standard",
        "comfort": "essential",
        "portability": "essential",
        "ac type": "standard",
        "smart features": "premium",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 4

def test_missing_keys_in_dataset():
    user = {"cooling capacity": "standard", "energy efficiency": "standard"}
    dataset = {"cooling capacity": "standard"}

    assert compare_products(user_requirements=user, from_dataset=dataset) == 1

def test_missing_dataset():
    user = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "premium",
        "portability": "essential",
        "ac type": "premium",
        "smart features": "essential",
    }
    dataset = {}

    assert compare_products(user_requirements=user, from_dataset=dataset) == 0


def test_missing_both_user_requirement_and_in_dataset():
    user = {}
    dataset = {}

    assert compare_products(user_requirements=user, from_dataset=dataset) == 0


def test_missing_values_in_user():
    user = {
        "cooling capacity": "",
        "energy efficiency": "",
        "comfort": "",
        "portability": "essential",
        "ac type": "premium",
        "smart features": "essential",
    }
    dataset = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "premium",
        "portability": "standard",
        "ac type": "standard",
        "smart features": "essential",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 5


def test_missing_values_in_dataset():
    user = {
        "cooling capacity": "essential",
        "energy efficiency": "essential",
        "comfort": "premium",
        "portability": "standard",
        "ac type": "standard",
        "smart features": "essential",
    }
    dataset = {
        "cooling capacity": "",
        "energy efficiency": "",
        "comfort": "",
        "portability": "essential",
        "ac type": "premium",
        "smart features": "essential",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 2


def test_unknown_values_in_dataset():
    user = {
        "cooling capacity": "standard",
        "energy efficiency": "standard",
        "portability": "unknown",
    }
    dataset = {
        "cooling capacity": "premium",
        "energy efficiency": "unknown",
        "portability": "unknown",
    }

    assert compare_products(user_requirements=user, from_dataset=dataset) == 1

### Stage 0 Initialize conversation test 

In [None]:
from unittest.mock import create_autospec


def test_stage_0_calls_initialize_conversation_with_expected_messages():
    chat_model = create_autospec(ChatModel, instance=True)
    system_message = """
    Act as a smart Air Conditioner recommendation assistant specializing in suggesting right air conditioners to the users based on their requirements. Please strictly stick to the AC assistant context and reply insufficient knowledge in case of any other context.
    """
    FAKE_RESPONSE = "Welcome! I am a bot assistant to help you on selecting an ac!"
    response = StageZeroResult(FAKE_RESPONSE)
    chat_model.get_session_response.return_value = FAKE_RESPONSE

    stage_0 = IntializeConversation(
        chat_model=chat_model, system_message=system_message
    )

    actual = stage_0.run()

    assert actual == response

### Stage 1 Intent Clarification and Confirmation test 

In [None]:
from unittest.mock import MagicMock

import pytest

@pytest.fixture
def mock_chat_model():
    return MagicMock()


@pytest.fixture
def intent_clarity_and_confirmation(mock_chat_model):
    return IntentClarityAndConfirmation(
        chat_model=mock_chat_model,
        classify_values_system_message="System message",
        intent_confirmation_system_message="System: {model_response}",
        extract_dict_system_message="Extract dict system message",
        function_tool={"tool1": "desc"},
        function_tool_choice={"tool1": "testing"},
    )


def test_stage1_intent_confirmation_produces_user_requirements(
    intent_clarity_and_confirmation, mock_chat_model
):
    tool_response = [
        {
            "tool_calls": [{"type": "test_tool", "function": {"arguments": {}}}],
            "id": "tool_id_1",
            "content": {"feature": "value"},
        }
    ]

    main_response = [""]
    intent_confirmation = ["Yes"]
    extracted_dict = {"feature": "value"}

    mock_chat_model.get_session_response.side_effect = [tool_response, main_response]
    intent_clarity_and_confirmation._IntentClarityAndConfirmation__intent_confirmation = MagicMock(
        return_value=intent_confirmation
    )
    intent_clarity_and_confirmation._IntentClarityAndConfirmation__extract_dict_from_string = MagicMock(
        return_value=extracted_dict
    )

    result = intent_clarity_and_confirmation.run()

    assert isinstance(result, StageOneResult)
    assert result.intent_confirmation == "Yes"
    assert result.response == ""
    assert result.user_requirements == extracted_dict

def test_stage1_with_no_intent_confirmation(
    intent_clarity_and_confirmation, mock_chat_model
):
    tool_response = [
        {
            "tool_calls": [{"type": "test_tool", "function": {"arguments": {}}}],
            "id": "tool_id_1",
            "content": {"feature": "value"},
        }
    ]

    main_response = [""]
    intent_confirmation = ["No"]
    extracted_dict = {}

    mock_chat_model.get_session_response.side_effect = [tool_response, main_response]
    intent_clarity_and_confirmation._IntentClarityAndConfirmation__intent_confirmation = MagicMock(
        return_value=intent_confirmation
    )
    intent_clarity_and_confirmation._IntentClarityAndConfirmation__extract_dict_from_string = MagicMock(
        return_value=extracted_dict
    )

    result = intent_clarity_and_confirmation.run()

    assert isinstance(result, StageOneResult)
    assert result.intent_confirmation == "No"
    assert result.response == ""
    assert result.user_requirements == extracted_dict

### Stage 2 Product extraction

In [None]:
from unittest.mock import create_autospec


def test_stage_2_calls_product_extraction_with_expected_messages():
    chat_model = create_autospec(ChatModel, instance=True)
    system_message = "Extract products and dictionary from map"
    FAKE_RESPONSE = {
        "cooling capacity": "standard",
        "energy efficiency": "premium",
        "comfort": "essential",
        "portability": "essential",
        "ac type": "standard",
        "smart features": "premium",
    }
    DATABASE_RETRIEVAL = '[{"brand":"Lloyd","model_name":"1.2 Ton 4 Star Dual Inverter Split AC","room_size":14,"capacity_ton":1.2,"energy_efficiency":4,"inverter":"Yes","operating_range":"18\\u201345 \\u00b0C","installation_required":"Yes","delivery_duration":"3\\u20135 days","ac_type":"Split","smart_features":"4\\u2011Way Swing","noise_levels":"41 dB","stars":3.9,"price":44990,"warranty":"2 Years Product, 10 Years Compressor","portability":"Fixed","description":"Price \\u20b944990: Lloyd 1.2 Ton 4 Star Dual Inverter Split AC offers a cooling capacity of 1.2 Ton for ~14 m\\u00b2 rooms. Energy efficiency is 4-Star with inverter compression, and portability is \'Fixed\' for the Split form factor. Comfort is moderately quiet at around 41 dB. Smart features: 4\\u2011Way Swing. Ideal for Indian summers with an operating range of 18\\u201345 \\u00b0C."},{"brand":"Samsung","model_name":"1.0 Ton 3 Star Inverter Split AC","room_size":12,"capacity_ton":1.0,"energy_efficiency":3,"inverter":"Yes","operating_range":"16\\u201352 \\u00b0C","installation_required":"Yes","delivery_duration":"3\\u20135 days","ac_type":"Split","smart_features":"Wi\\u2011Fi + Voice","noise_levels":"41 dB","stars":4.4,"price":43300,"warranty":"2 Years Product, 10 Years Compressor","portability":"Fixed","description":"Price \\u20b943300: Samsung 1.0 Ton 3 Star Inverter Split AC offers a cooling capacity of 1.0 Ton for ~12 m\\u00b2 rooms. Energy efficiency is 3-Star with inverter compression, and portability is \'Fixed\' for the Split form factor. Comfort is moderately quiet at around 41 dB. Smart features: Wi\\u2011Fi + Voice. Ideal for Indian summers with an operating range of 16\\u201352 \\u00b0C."},{"brand":"Panasonic","model_name":"1.5 Ton 3 Star Window AC","room_size":18,"capacity_ton":1.5,"energy_efficiency":3,"inverter":"No","operating_range":"18\\u201350 \\u00b0C","installation_required":"Yes","delivery_duration":"4\\u20136 days","ac_type":"Window","smart_features":"Self Clean","noise_levels":"52 dB","stars":4.3,"price":34460,"warranty":"1 Year Product, 5 Years Compressor","portability":"Semi-portable","description":"Price \\u20b934460: Panasonic 1.5 Ton 3 Star Window AC offers a cooling capacity of 1.5 Ton for ~18 m\\u00b2 rooms. Energy efficiency is 3-Star, and portability is \'Semi-portable\' for the Window form factor. Comfort is moderately quiet at around 52 dB. Smart features: Self Clean. Ideal for Indian summers with an operating range of 18\\u201350 \\u00b0C."}]'
    fake_response = StageTwoResult(recommendations=DATABASE_RETRIEVAL)

    chat_model.preview_response.return_value = FAKE_RESPONSE
    stage_2 = ProductExtractionAndMapping(
        chat_model=chat_model, system_message=system_message
    )

    actual = stage_2.run(
        user_requirement={
            "price": "45000",
            "cooling capacity": "standard",
            "energy efficiency": "premium",
            "comfort": "essential",
            "portability": "essential",
            "ac type": "standard",
            "smart features": "premium",
        }
    )
    assert actual == fake_response

### Stage 3 Product recommendations test

In [None]:
from unittest.mock import create_autospec


def test_stage_3_calls_intent_confirmation_with_expected_messages():
    chat_model = create_autospec(ChatModel, instance=True)
    system_message = "Display recommendatation and chat with user."
    FAKE_RESPONSE = "lloyd model response"
    fake_response = StageThreeResult(FAKE_RESPONSE)

    chat_model.get_session_response.return_value = FAKE_RESPONSE
    stage_3 = ProductRecommendations(
        chat_model=chat_model, system_message=system_message
    )

    actual = stage_3.run()
    assert actual == fake_response

##### Pytest can be run with the following command

In [None]:
!python -m pytest