In [1]:
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

  from .autonotebook import tqdm as notebook_tqdm


In [1]:
import os
from dotenv import load_dotenv
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.utils import Secret
from haystack.dataclasses import ChatMessage
from haystack.components.generators.utils import print_streaming_chunk

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Set your API key as environment variable before executing this
load_dotenv()
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')

chat_generator = OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"),
  #api_base_url="https://openrouter.ai/api/v1",
  model="gpt-3.5-turbo",
        streaming_callback=print_streaming_chunk)

In [5]:
chat_generator.run(messages=[ChatMessage.from_user("Return this text: 'test'")])

'test'

{'replies': [ChatMessage(content="'test'", role=<ChatRole.ASSISTANT: 'assistant'>, name=None, meta={'model': 'gpt-3.5-turbo-0125', 'index': 0, 'finish_reason': 'stop', 'usage': {}})]}

In [4]:
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder

# Sample documents
documents = [
    Document(content="Coffee shop opens at 9am and closes at 5pm."),
    Document(content="Gym room opens at 6am and closes at 10pm.")
]

# Create the document store
document_store = InMemoryDocumentStore()

# Create a pipeline to turn the texts into embeddings and store them in the document store
indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
    "doc_embedder", SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
)
indexing_pipeline.add_component("doc_writer", DocumentWriter(document_store=document_store))

indexing_pipeline.connect("doc_embedder.documents", "doc_writer.documents")

indexing_pipeline.run({"doc_embedder": {"documents": documents}})

Batches: 100%|██████████| 1/1 [00:12<00:00, 12.48s/it]


{'doc_writer': {'documents_written': 2}}

In [5]:
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

template = """
Answer the questions based on the given context.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""
rag_pipe = Pipeline()
rag_pipe.add_component("embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"))
rag_pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
rag_pipe.add_component("prompt_builder", PromptBuilder(template=template))
# Note to llm: We are using OpenAIGenerator, not the OpenAIChatGenerator, because the latter only accepts List[str] as input and cannot accept prompt_builder's str output
rag_pipe.add_component("llm", OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"),
  #api_base_url="https://openrouter.ai/api/v1",
  model="gpt-3.5-turbo"))

rag_pipe.connect("embedder.embedding", "retriever.query_embedding")
rag_pipe.connect("retriever", "prompt_builder.documents")
rag_pipe.connect("prompt_builder", "llm")

<haystack.core.pipeline.pipeline.Pipeline object at 0x136234440>
🚅 Components
  - embedder: SentenceTransformersTextEmbedder
  - retriever: InMemoryEmbeddingRetriever
  - prompt_builder: PromptBuilder
  - llm: OpenAIGenerator
🛤️ Connections
  - embedder.embedding -> retriever.query_embedding (List[float])
  - retriever.documents -> prompt_builder.documents (List[Document])
  - prompt_builder.prompt -> llm.prompt (str)

In [7]:
query = "When does the coffee shop open?"
rag_pipe.run({"embedder": {"text": query}, "prompt_builder": {"question": query}})

Batches: 100%|██████████| 1/1 [00:05<00:00,  5.95s/it]
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


{'llm': {'replies': ['The coffee shop opens at 9am.'],
  'meta': [{'model': 'gpt-3.5-turbo-0125',
    'index': 0,
    'finish_reason': 'stop',
    'usage': {'completion_tokens': 9,
     'prompt_tokens': 60,
     'total_tokens': 69}}]}}

In [8]:
def rag_pipeline_func(query: str):
    result = rag_pipe.run({"embedder": {"text": query}, "prompt_builder": {"question": query}})

    return {"reply": result["llm"]["replies"][0]}

## At this point, run db_api.py

In [9]:
# Flask's default local URL, change it if necessary
db_base_url = 'http://127.0.0.1:5000'

# Use requests to get the data from the database
import requests
import json

# get_categories is supplied as part of the prompt, it is not used as a tool
def get_categories():
    response = requests.get(f'{db_base_url}/category')
    data = response.json()
    return data

def get_items(ids=None,categories=None):
    params = {
        'id': ids,
        'category': categories,
    }
    response = requests.get(f'{db_base_url}/item', params=params)
    data = response.json()
    return data

def purchase_item(id,quantity):

    headers = {
    'Content-type':'application/json', 
    'Accept':'application/json'
    }

    data = {
        'id': id,
        'quantity': quantity,
    }
    response = requests.post(f'{db_base_url}/item/purchase', json=data, headers=headers)
    return response.json()

In [10]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_items",
            "description": "Get a list of items from the database",
            "parameters": {
                "type": "object",
                "properties": {
                    "ids": {
                        "type": "string",
                        "description": "Comma separated list of item ids to fetch",
                    },
                    "categories": {
                        "type": "string",
                        "description": "Comma separated list of item categories to fetch",
                    },
                },
                "required": [],
            },
        }
    },
    {
        "type": "function",
        "function": {
            "name": "purchase_item",
            "description": "Purchase a particular item",
            "parameters": {
                "type": "object",
                "properties": {
                    "id": {
                        "type": "string",
                        "description": "The given product ID, product name is not accepted here. Please obtain the product ID from the database first.",
                    },
                    "quantity": {
                        "type": "integer",
                        "description": "Number of items to purchase",
                    },
                },
                "required": [],
            },
        }
    },
    {
        "type": "function",
        "function": {
            "name": "rag_pipeline_func",
            "description": "Get information from hotel brochure",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query to use in the search. Infer this from the user's message. It should be a question or a statement",
                    }
                },
                "required": ["query"],
            },
        },
    }
]

In [13]:
get_categories()

['Food and beverages', 'Miscellaneous']

In [14]:
# 1. Initial prompt
context = f"""You are an assistant to tourists visiting a hotel.
You have access to a database of items (which includes {get_categories()}) that tourists can buy, you also have access to the hotel's brochure.
If the tourist's question cannot be answered from the database, you can refer to the brochure.
If the tourist's question cannot be answered from the brochure, you can ask the tourist to ask the hotel staff.
"""
messages = [
    ChatMessage.from_system(context),
    # 2. Sample message from user
    ChatMessage.from_user("Can I buy a coffee?"),
    ]

# 3. Passing the tools list and invoke the chat generator
response = chat_generator.run(messages=messages, generation_kwargs= {"tools": tools})
response

{'replies': [ChatMessage(content='[{"index": 0, "id": "call_872YbWujqlOtOVNd6VkMdCK0", "function": {"arguments": "{\\"categories\\":\\"Food and beverages\\"}", "name": "get_items"}, "type": "function"}]', role=<ChatRole.ASSISTANT: 'assistant'>, name=None, meta={'model': 'gpt-3.5-turbo-0125', 'index': 0, 'finish_reason': 'tool_calls', 'usage': {}})]}

In [15]:
function_call = json.loads(response["replies"][0].content)[0]
function_name = function_call["function"]["name"]
function_args = json.loads(function_call["function"]["arguments"])
print("Function Name:", function_name)
print("Function Arguments:", function_args)# Another question
messages.append(ChatMessage.from_user("Where's the coffee shop?"))

# Invoke the chat generator, and passing the tools list
response = chat_generator.run(messages=messages, generation_kwargs= {"tools": tools})
function_call = json.loads(response["replies"][0].content)[0]
function_name = function_call["function"]["name"]
function_args = json.loads(function_call["function"]["arguments"])
print("Function Name:", function_name)
print("Function Arguments:", function_args)

Function Name: get_items
Function Arguments: {'categories': 'Food and beverages'}


In [18]:
# Another question
messages.append(ChatMessage.from_user("Where's the coffee shop?"))

# Invoke the chat generator, and passing the tools list
response = chat_generator.run(messages=messages, generation_kwargs= {"tools": tools})
function_call = json.loads(response["replies"][0].content)[0]
function_name = function_call["function"]["name"]
function_args = json.loads(function_call["function"]["arguments"])
print("Function Name:", function_name)
print("Function Arguments:", function_args)

Function Name: rag_pipeline_func
Function Arguments: {'query': 'Where is the coffee shop located?'}


In [19]:
## Find the correspoding function and call it with the given arguments
available_functions = {"get_items": get_items, "purchase_item": purchase_item,"rag_pipeline_func": rag_pipeline_func}
function_to_call = available_functions[function_name]
function_response = function_to_call(**function_args)
print("Function Response:", function_response)

Batches: 100%|██████████| 1/1 [00:01<00:00,  1.03s/it]


Function Response: {'reply': 'The coffee shop is located in a building or area separate from the gym room.'}


In [20]:
messages.append(ChatMessage.from_function(content=json.dumps(function_response), name=function_name))
response = chat_generator.run(messages=messages)
response_msg = response["replies"][0]

print(response_msg.content)

The coffee shop is located in a building or area separate from the gym room.The coffee shop is located in a building or area separate from the gym room.


In [21]:
import json
from haystack.dataclasses import ChatMessage, ChatRole

response = None
messages = [
    ChatMessage.from_system(context)
]

while True:
    # if OpenAI response is a tool call
    if response and response["replies"][0].meta["finish_reason"] == "tool_calls":
        function_calls = json.loads(response["replies"][0].content)

        for function_call in function_calls:
            ## Parse function calling information
            function_name = function_call["function"]["name"]
            function_args = json.loads(function_call["function"]["arguments"])

            ## Find the correspoding function and call it with the given arguments
            function_to_call = available_functions[function_name]
            function_response = function_to_call(**function_args)

            ## Append function response to the messages list using `ChatMessage.from_function`
            messages.append(ChatMessage.from_function(content=json.dumps(function_response), name=function_name))

    # Regular Conversation
    else:
        # Append assistant messages to the messages list
        if not messages[-1].is_from(ChatRole.SYSTEM):
            messages.append(response["replies"][0])

        user_input = input("ENTER YOUR MESSAGE 👇 INFO: Type 'exit' or 'quit' to stop\n")
        if user_input.lower() == "exit" or user_input.lower() == "quit":
            break
        else:
            messages.append(ChatMessage.from_user(user_input))

    response = chat_generator.run(messages=messages, generation_kwargs={"tools": tools})

Batches: 100%|██████████| 1/1 [00:03<00:00,  3.13s/it]


The coffee shop opens at 9 am.Please, give me a second to confirm that for you.

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

: 