# Introduction  
- deepset is the company that created and maintains Haystack.  
- Haystack is a popular open-source AI framework for building LLM applications such as retrieval-augmented generation (RAG) pipelines, agents, and search systems.  

### Benefits of using a framework like Haystack  
- Speeds up development  
- Improves code maintainability and readability  
- Manages complexity by enabling plug-and-play integration of different models and components  
- Allows applications to be developed at a higher level of abstraction  
- Makes it easy to switch underlying components of a workflow (such as vector databases) without extensive refactoring  
- Provides common features out of the box (such as branching and looping)  
- Provides visualization utilities  

Haystack provides a common interface and simple abstractions that you can extend to fit your needs.  

It is based on two main elements: **Components** and **Pipelines**.  

- Components are connected to build powerful pipelines  
- Both standard and custom components are supported  

### What you will learn  
- The unique building blocks that make up the Haystack framework  
- How to use Haystack to build LLM applications  
- Core abstractions of Haystack: components, pipelines, document stores, etc.  
- How to build a customized RAG pipeline  
- How to create custom components  
- How to implement conditional routing and branching pipelines with a fallback to web search  
- How to build a self-reflecting agent using Haystack’s pipeline looping mechanism  
- How to create a chat agent that uses OpenAI's function-calling capability, enabling Haystack pipelines to be used as tools  

---

# Haystack Building Blocks  

- AI applications are often made up of multiple steps that work together to achieve a goal  
- Each small task (step) in Haystack is handled by a **component**  
- Components are combined with other components to form a **pipeline**
- Pipelines are the entities that enable us to build the desired application  

<center>
  <img src="images/intro.png"/>
</center>  

- A pipeline can access document stores (vector databases) through components  
- A component can accept any number of inputs and produce any number of outputs  
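To make the "named inputs in, dictionary of outputs out" contract concrete, here is a framework-free toy in plain Python. It is not Haystack's implementation. The component classes, `ToyPipeline`, and its `connect()`/`run()` behavior are hypothetical. The toy runs components in the order they were added and skips the type checking, branching, and looping that a real pipeline supports.

```python
from typing import Any, Dict, List, Tuple


class Uppercaser:
    """Toy component with one input (text) and two outputs (text, length)."""

    def run(self, text: str) -> Dict[str, Any]:
        upper = text.upper()
        return {"text": upper, "length": len(upper)}


class Exclaimer:
    """Toy component with one input (text) and one output (text)."""

    def run(self, text: str) -> Dict[str, Any]:
        return {"text": text + "!"}


class ToyPipeline:
    """Hypothetical pipeline that runs components in insertion order."""

    def __init__(self) -> None:
        self.components: Dict[str, Any] = {}
        # Each edge: (sender, output socket, receiver, input socket)
        self.edges: List[Tuple[str, str, str, str]] = []

    def add_component(self, name: str, component: Any) -> None:
        self.components[name] = component

    def connect(self, sender: str, receiver: str) -> None:
        # Sockets are named with the "component.socket" convention
        sender_name, output = sender.split(".")
        receiver_name, input_ = receiver.split(".")
        self.edges.append((sender_name, output, receiver_name, input_))

    def run(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        inputs = {name: dict(data.get(name, {})) for name in self.components}
        results: Dict[str, Dict[str, Any]] = {}
        for name, component in self.components.items():  # dicts keep insertion order
            results[name] = component.run(**inputs[name])
            # Forward this component's outputs to the inputs of connected components
            for sender, output, receiver, input_ in self.edges:
                if sender == name:
                    inputs[receiver][input_] = results[name][output]
        return results


pipe = ToyPipeline()
pipe.add_component("upper", Uppercaser())
pipe.add_component("exclaim", Exclaimer())
pipe.connect("upper.text", "exclaim.text")
result = pipe.run({"upper": {"text": "hello"}})
print(result["exclaim"]["text"])  # HELLO!
```

A real Haystack pipeline determines execution order from the connections rather than from the order in which components were added.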

<center>
  <img src="images/components.png"/>
</center>  

Example: `SentenceTransformersDocumentEmbedder`  
- Expects a list of documents  
- Returns the same documents, each enriched with an embedding  
- Computes the embeddings with a model from the **sentence-transformers** library  
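The contract of a document embedder is "documents in, the same documents with embeddings out." The toy below reproduces only that contract. `Doc`, `toy_embed`, and `ToyDocumentEmbedder` are hypothetical. The "embedding" is a normalized letter-count vector rather than a sentence-transformers model, and `Doc` is a stand-in for Haystack's `Document`.

```python
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Doc:
    """Stand-in for a document: text content, metadata, optional embedding."""
    content: str
    meta: Dict[str, str] = field(default_factory=dict)
    embedding: Optional[List[float]] = None


def toy_embed(text: str) -> List[float]:
    """Hypothetical embedding: letter counts (a-z), scaled to unit length. Not a real model."""
    counts = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            counts[ord(ch) - ord("a")] += 1
    norm = math.sqrt(sum(c * c for c in counts)) or 1.0
    return [c / norm for c in counts]


class ToyDocumentEmbedder:
    """Attaches an embedding to each given document and returns the documents."""

    def run(self, documents: List[Doc]) -> Dict[str, List[Doc]]:
        for doc in documents:
            doc.embedding = toy_embed(doc.content)
        return {"documents": documents}


docs = [Doc("Haystack pipelines"), Doc("Components", meta={"source": "intro"})]
out = ToyDocumentEmbedder().run(documents=docs)["documents"]
print(len(out[0].embedding))  # 26
```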

<center>
  <img src="images/sentence_transformer.png"/>
</center>  

Example of a Document Search Pipeline:

<center>
  <img src="images/example_pipeline.png"/>
</center>  

Haystack offers many ready-made components such as generators, embedders, retrievers, converters, rerankers, routers, and preprocessors. It also supports branching, decision-making components, custom components, looping, and advanced pipelines.  

By combining ready-made and custom components, you can create pipelines for tasks such as the following, among others:  
- Question answering  
- Document search  
- Chat  
- Question generation  
- Output validation  

More on:
- Components: <https://docs.haystack.deepset.ai/docs/components?utm_campaign=developer-relations&utm_source=dlai>
- Pipelines: <https://docs.haystack.deepset.ai/docs/pipelines?utm_campaign=developer-relations&utm_source=dlai>
- Document Stores: <https://docs.haystack.deepset.ai/docs/document-store?utm_campaign=developer-relations&utm_source=dlai>
- InMemoryDocumentStore: <https://docs.haystack.deepset.ai/docs/inmemorydocumentstore?utm_campaign=developer-relations&utm_source=dlai>

#### Document Indexing Pipeline

In [None]:
! pip install -r requirements.txt

In [None]:
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

We first use `OpenAIDocumentEmbedder`, which embeds documents with an OpenAI embedding model.

In [None]:
from haystack.components.embedders import OpenAIDocumentEmbedder
embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
embedder # inspect component

In [None]:
from haystack.dataclasses import Document
documents = [Document(content="Haystack is an open source AI framework to build full AI applications in Python"),
             Document(content="You can build AI Pipelines by combining Components"),]
documents # inspect the documents

In [None]:
embedder.run(documents=documents) # returns the documents with their embeddings, plus metadata about the embedding call

Let's now initialize a document store, then build a pipeline that writes documents into it along with their embeddings. For now, we will use the `InMemoryDocumentStore`.

In [None]:
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

Let's create our first indexing pipeline using the following components:
1. converter `TextFileToDocument`: converts text files into Haystack `Document` objects
2. splitter `DocumentSplitter`: splits the documents into chunks (by default, 200 words each)
3. embedder `OpenAIDocumentEmbedder`: computes an embedding for each chunk with an OpenAI embedding model
4. writer `DocumentWriter`: writes the chunks and their embeddings to the document store

After initializing the components, we add each one to the pipeline under a name, then use those names to connect the components.

In [None]:
from haystack import Pipeline
from haystack.components.converters.txt import TextFileToDocument
from haystack.components.preprocessors.document_splitter import DocumentSplitter
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.writers import DocumentWriter

# Initialize components
converter = TextFileToDocument()
splitter = DocumentSplitter()
embedder = OpenAIDocumentEmbedder()
writer = DocumentWriter(document_store=document_store)

# Create a pipeline and add components to it
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", converter)
indexing_pipeline.add_component("splitter", splitter)
indexing_pipeline.add_component("embedder", embedder)
indexing_pipeline.add_component("writer", writer)

# Connect the components
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.show()

In [None]:
indexing_pipeline.run({"converter": {"sources": ['data/davinci.txt']}})

In [None]:
document_store.filter_documents()[0].content
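The `DocumentSplitter` step can be pictured as word-based chunking. The function below is a simplified sketch, not `DocumentSplitter`'s actual algorithm. `split_by_words`, its `split_length`/`split_overlap` parameters, and its handling of the final short chunk are assumptions for illustration. The 200-word default comes from the component list above.

```python
from typing import List


def split_by_words(text: str, split_length: int = 200, split_overlap: int = 0) -> List[str]:
    """Split text into chunks of at most split_length words.

    Consecutive chunks share split_overlap words; the last chunk may be shorter.
    """
    if split_length <= 0:
        raise ValueError("split_length must be positive")
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be in [0, split_length)")
    words = text.split()
    step = split_length - split_overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + split_length]))
        if start + split_length >= len(words):
            break  # this chunk already reached the end of the text
    return chunks


text = " ".join(f"w{i}" for i in range(450))
chunks = split_by_words(text)
print([len(c.split()) for c in chunks])  # [200, 200, 50]
```

With `split_overlap=50`, each new chunk starts 150 words after the previous one, so neighboring chunks share context at their boundaries.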

#### Document Search Pipeline  

**Note:** When a component has several outputs or inputs that could be connected, name the exact sockets to link, using the `component_name.socket_name` form, so that the right output of one component feeds the right input of the next.  
In this case, we explicitly connect the `query_embedder.embedding` output to the `retriever.query_embedding` input.  

If components are not connected properly, a **PipelineConnectError** will occur. This error usually provides useful logs to help identify and fix the connection issues.
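Why name the sockets? The toy below models a connection as "find every output/input pair with matching types." If more than one pair matches, a bare `connect("query_embedder", "retriever")` is ambiguous, while naming both sockets leaves exactly one candidate. The socket tables, their simplified types, `resolve`, and `ToyConnectError` are hypothetical. Haystack's real socket types and matching rules are richer than this.

```python
from typing import Dict, List, Tuple


class ToyConnectError(Exception):
    """Hypothetical error type standing in for a pipeline connection error."""


# Hypothetical socket declarations (component -> socket -> simplified type)
OUTPUTS: Dict[str, Dict[str, type]] = {"query_embedder": {"embedding": list, "meta": dict}}
INPUTS: Dict[str, Dict[str, type]] = {
    "retriever": {"query_embedding": list, "filters": dict, "top_k": int}
}


def resolve(sender: str, receiver: str) -> Tuple[str, str]:
    """Return the (output, input) socket pair a connection would use, or raise."""
    s_comp, _, s_sock = sender.partition(".")
    r_comp, _, r_sock = receiver.partition(".")
    outs = {s_sock: OUTPUTS[s_comp][s_sock]} if s_sock else OUTPUTS[s_comp]
    ins = {r_sock: INPUTS[r_comp][r_sock]} if r_sock else INPUTS[r_comp]
    # Every (output, input) pair whose types match is a candidate connection
    candidates: List[Tuple[str, str]] = [
        (o, i) for o, o_type in outs.items() for i, i_type in ins.items() if o_type is i_type
    ]
    if not candidates:
        raise ToyConnectError(f"no type-compatible sockets between {sender} and {receiver}")
    if len(candidates) > 1:
        raise ToyConnectError(f"ambiguous connection {sender} -> {receiver}: {candidates}")
    return candidates[0]


print(resolve("query_embedder.embedding", "retriever.query_embedding"))
```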

In [None]:
from haystack.components.embedders import OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever

query_embedder = OpenAITextEmbedder()
retriever = InMemoryEmbeddingRetriever(document_store=document_store)

# Initialize the document search pipeline
document_search = Pipeline()

# Add the components to the pipeline and connect them
document_search.add_component("query_embedder", query_embedder)
document_search.add_component("retriever", retriever)
document_search.connect("query_embedder.embedding", "retriever.query_embedding")
document_search.show()

In [None]:
question = "How old was Davinci when he died?"

results = document_search.run({"query_embedder": {"text": question}})

for i, document in enumerate(results["retriever"]["documents"]):
    print("\n--------------\n")
    print(f"DOCUMENT {i}")
    print(document.content)

In [None]:
question = "How old was Davinci when he died?"

results = document_search.run({"query_embedder": {"text": question},
                               "retriever": {"top_k": 3}}) # specify top_k to retrieve top 3 documents

for i, document in enumerate(results["retriever"]["documents"]):
    print("\n--------------\n")
    print(f"DOCUMENT {i}")
    print(document.content)
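Embedding retrieval ranks stored documents by how similar their embeddings are to the query embedding and keeps the best `top_k`. A minimal sketch with made-up two-dimensional vectors follows. It assumes cosine similarity; Haystack's in-memory store can also be configured to use dot product. `retrieve_top_k` and the toy store are hypothetical.

```python
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve_top_k(
    query_embedding: Sequence[float],
    store: List[Tuple[str, Sequence[float]]],
    top_k: int = 3,
) -> List[Tuple[str, float]]:
    """Score every (content, embedding) pair against the query and keep the best top_k."""
    scored = [(content, cosine(query_embedding, emb)) for content, emb in store]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


# Hypothetical toy store: contents with made-up 2-D embeddings
store = [
    ("about painting", [1.0, 0.0]),
    ("about rivers", [0.0, 1.0]),
    ("about painting rivers", [1.0, 1.0]),
]
for content, score in retrieve_top_k([1.0, 0.1], store, top_k=2):
    print(f"{score:.3f}  {content}")
```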

Summary:
- The code above shows how to run components individually and how to combine them into useful pipelines
- We also built an indexing pipeline and a document search pipeline

---

# Building a Customized RAG  

We will first create a simple QA RAG pipeline and then extend it to include answer referencing.  

Retrieval-Augmented Generation consists of the following main steps:  

0. **Indexing** – Cleaning, splitting, and storing documents in a vector database  
1. **Retrieval** – Receiving a query and retrieving the most relevant documents from the database  
2. **Augmentation** – Combining the retrieved documents with the query in a prompt for the LLM  
3. **Generation** – The LLM generates the final answer based on the prompt  
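The retrieval, augmentation, and generation steps can be traced end to end in a few lines of plain Python. This is a toy, not Haystack code. Indexing is reduced to a list of strings, `retrieve` uses naive word overlap instead of embeddings or BM25, and `echo_llm` is a hypothetical stand-in for a real generator so that the example runs offline.

```python
import re
from typing import Callable, List, Set


def tokens(text: str) -> Set[str]:
    """Lowercased word set used for toy keyword matching."""
    return set(re.findall(r"\w+", text.lower()))


def retrieve(query: str, corpus: List[str], top_k: int = 2) -> List[str]:
    """1. Retrieval (toy): rank documents by the number of words shared with the query."""
    q = tokens(query)
    return sorted(corpus, key=lambda doc: len(q & tokens(doc)), reverse=True)[:top_k]


def augment(query: str, documents: List[str]) -> str:
    """2. Augmentation: place the retrieved documents and the query into one prompt."""
    context = "\n".join(f"- {doc}" for doc in documents)
    return f"Answer the question based on the provided context.\nContext:\n{context}\nQuestion: {query}\n"


def rag(query: str, corpus: List[str], llm: Callable[[str], str], top_k: int = 2) -> str:
    """3. Generation: hand the augmented prompt to a language model."""
    return llm(augment(query, retrieve(query, corpus, top_k)))


def echo_llm(prompt: str) -> str:
    """Hypothetical stand-in for an LLM: returns the prompt it receives."""
    return prompt


corpus = [
    "Haystack builds pipelines",
    "Paris is in France",
    "Pipelines connect components",
]
print(rag("What do pipelines connect?", corpus, echo_llm))
```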

Retrieval can be performed in different ways, such as:  
- Semantic search  
- Keyword-based search  
- Retrieval with reranking  
- Retrieval from external APIs (e.g., web-based or other sources)  

##### **Indexing pipeline:**
<center>
  <img src="images/indexing_pipeline.png"/>
</center>  

##### **Web Indexing pipeline:**
<center>
  <img src="images/web_indexing_pipeline.png"/>
</center>  

##### **Semantic Search pipeline:**
<center>
  <img src="images/ss_rag_pipeline.png"/>
</center>  

##### **Keyword Search pipeline:**
<center>
  <img src="images/key_rag_pipeline.png">
</center>  

##### **Retrieval + Ranking pipeline:**
<center>
  <img src="images/rr_pipeline.png">
</center>  

##### **API Retrieval pipeline:**
<center>
  <img src="images/api_pipeline.png">
</center>  

The code below uses Cohere's document and text embedders:

In [None]:
import os
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

In [None]:
from haystack import Pipeline
from haystack.utils.auth import Secret
from haystack.components.builders import PromptBuilder
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder, CohereTextEmbedder

Instead of loading local files, we will source data directly from web page URLs. Since these pages are short, we can skip the document-splitting step. 
- The `LinkContentFetcher` component fetches the content of the URL  
- The `HTMLToDocument` component converts the content into a format that Haystack can process  

We then use the Cohere `embed-english-v3.0` model for embedding and store the embeddings in an in-memory vector database.  

Finally, we connect the components and run the pipeline.  

In [None]:
document_store = InMemoryDocumentStore()

fetcher = LinkContentFetcher()
converter = HTMLToDocument()
embedder = CohereDocumentEmbedder(model="embed-english-v3.0", api_base_url=os.getenv("CO_API_URL"))
writer = DocumentWriter(document_store=document_store)

indexing = Pipeline()
indexing.add_component("fetcher", fetcher)
indexing.add_component("converter", converter)
indexing.add_component("embedder", embedder)
indexing.add_component("writer", writer)

indexing.connect("fetcher.streams", "converter.sources")
indexing.connect("converter", "embedder")
indexing.connect("embedder", "writer")
indexing.show()

In [None]:
indexing.run(
    {
        "fetcher": {
            "urls": [
                "https://haystack.deepset.ai/integrations/cohere",
                "https://haystack.deepset.ai/integrations/anthropic",
                "https://haystack.deepset.ai/integrations/jina",
                "https://haystack.deepset.ai/integrations/nvidia",
            ]
        }
    }
)

In [None]:
document_store.filter_documents()[0] # this document's metadata includes its source URL

Next, let's start building a RAG pipeline.  

Haystack uses **Jinja** for prompt templating. Jinja is a templating language that lets you insert variables, such as the retrieved documents and the user's query, into a prompt. It also supports control structures and filters such as conditional statements (`if`), loops (`for`), truncation, lowercase conversion, and more.  

In [None]:
prompt = """
Answer the question based on the provided context.
Context:
{% for doc in documents %}
   {{ doc.content }} 
{% endfor %}
Question: {{ query }}
"""

In [None]:
# Build a pipeline
from haystack.components.generators import OpenAIGenerator

query_embedder = CohereTextEmbedder(model="embed-english-v3.0", api_base_url=os.getenv("CO_API_URL"))
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt)
generator = OpenAIGenerator()

rag = Pipeline()
rag.add_component("query_embedder", query_embedder)
rag.add_component("retriever", retriever)
rag.add_component("prompt", prompt_builder)
rag.add_component("generator", generator)

rag.connect("query_embedder.embedding", "retriever.query_embedding")
rag.connect("retriever.documents", "prompt.documents")
rag.connect("prompt", "generator")

rag.show()

In [None]:
question = "How can I use Cohere with Haystack?"

result = rag.run(
    {
        "query_embedder": {"text": question},
        "retriever": {"top_k": 1},
        "prompt": {"query": question},
    }
)

print(result["generator"]["replies"][0])

Next, we can customize the RAG pipeline:  

1. Modify the prompt to answer questions in a specific language (French)  
2. Add the URL in the prompt to generate references  

In [None]:
prompt = """
You will be provided some context, followed by the URL that this context comes from.
Answer the question based on the context, and reference the URL from which your answer is generated.
Your answer should be in {{ language }}.
Context:
{% for doc in documents %}
   {{ doc.content }} 
   URL: {{ doc.meta['url']}}
{% endfor %}
Question: {{ query }}
Answer:
"""
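For intuition, the `{% for %}` loop in this template, including the `doc.meta['url']` lookup, does roughly what the plain-Python function below does. This is only an analogy: `PromptBuilder` renders the template with Jinja, and its whitespace and missing-key handling differ. `render_reference_prompt` and the dictionary-shaped documents are hypothetical, and the page text is placeholder data.

```python
from typing import Any, Dict, List


def render_reference_prompt(query: str, documents: List[Dict[str, Any]], language: str) -> str:
    """Build the same sections as the template: instructions, context with URLs, question."""
    lines = [
        "You will be provided some context, followed by the URL that this context comes from.",
        "Answer the question based on the context, and reference the URL from which your answer is generated.",
        f"Your answer should be in {language}.",
        "Context:",
    ]
    for doc in documents:  # mirrors {% for doc in documents %}
        lines.append(f"   {doc['content']}")
        # A missing URL renders as an empty string (an assumption of this sketch)
        lines.append(f"   URL: {doc.get('meta', {}).get('url', '')}")
    lines += [f"Question: {query}", "Answer:"]
    return "\n".join(lines)


docs = [{"content": "(page text fetched from the URL)",
         "meta": {"url": "https://haystack.deepset.ai/integrations/cohere"}}]
print(render_reference_prompt("How can I use Cohere with Haystack?", docs, "French"))
```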

In [None]:
query_embedder = CohereTextEmbedder(model="embed-english-v3.0", api_base_url=os.getenv("CO_API_URL"))
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt)
generator = OpenAIGenerator(model="gpt-3.5-turbo")

rag = Pipeline()
rag.add_component("query_embedder", query_embedder)
rag.add_component("retriever", retriever)
rag.add_component("prompt", prompt_builder)
rag.add_component("generator", generator)

rag.connect("query_embedder.embedding", "retriever.query_embedding")
rag.connect("retriever.documents", "prompt.documents")
rag.connect("prompt", "generator")
rag.show()

In [None]:
question = "How can I use Cohere with Haystack?"

result = rag.run(
    {
        "query_embedder": {"text": question},
        "retriever": {"top_k": 1},
        "prompt": {"query": question, "language": "French"},
    }
)

print(result["generator"]["replies"][0])

More on:
- LinkContentFetcher: <https://docs.haystack.deepset.ai/docs/linkcontentfetcher?utm_campaign=developer-relations&utm_source=dlai>
- HTMLToDocument: <https://docs.haystack.deepset.ai/docs/htmltodocument?utm_campaign=developer-relations&utm_source=dlai>
- CohereDocumentEmbedder: <https://docs.haystack.deepset.ai/docs/coheredocumentembedder?utm_campaign=developer-relations&utm_source=dlai>
- InMemoryDocumentStore: <https://docs.haystack.deepset.ai/docs/inmemorydocumentstore?utm_campaign=developer-relations&utm_source=dlai>
- Embedders: <https://docs.haystack.deepset.ai/docs/embedders?utm_campaign=developer-relations&utm_source=dlai>
- Generators: <https://docs.haystack.deepset.ai/docs/generators?utm_campaign=developer-relations&utm_source=dlai>
- SentenceTransformersDocumentEmbedder: <https://docs.haystack.deepset.ai/docs/sentencetransformersdocumentembedder?utm_campaign=developer-relations&utm_source=dlai>
- PromptBuilder: <https://docs.haystack.deepset.ai/docs/promptbuilder?utm_campaign=developer-relations&utm_source=dlai>

Summary:
- We combined Haystack components to create a RAG pipeline  
- We customized the behavior of the RAG pipeline by changing its prompt  

---

# Custom Components – News Summarizer  

Let's create a custom component to fetch the top Hacker News posts and then summarize them.  

As a refresher, some ready-made components in Haystack include: Embedders, Retrievers, Generators, Preprocessors, Rankers, Routers, etc.  

Each component can accept a certain number of inputs and produce a certain number of outputs.  

To create a custom component, the following requirements must be met:  

1. Define a class with a `@component` decorator  
2. The class must have a `run()` method with a `@component.output_types` decorator, specifying the component’s output types  
3. The `run()` method should return a dictionary whose keys match the output names declared in `@component.output_types`  
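What the output-type declaration provides can be sketched without Haystack. The `output_types` decorator below is hypothetical and much simpler than Haystack's `@component.output_types`. It only records the declared names and checks, at call time, that `run()` returned a dictionary whose keys were declared and whose values match plain classes (it does not handle generics such as `List[Document]`). A run may return only some of its declared outputs, as the `EntitiesValidator` later in these notes does.

```python
import functools
from typing import Any, Callable, Dict


def output_types(**declared: type) -> Callable:
    """Hypothetical decorator: records a run() method's declared outputs and checks its return value."""

    def decorate(run: Callable[..., Dict[str, Any]]) -> Callable[..., Dict[str, Any]]:
        @functools.wraps(run)
        def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            result = run(*args, **kwargs)
            if not isinstance(result, dict):
                raise TypeError("run() must return a dictionary")
            # A run may emit only some of its declared outputs, but never undeclared ones
            undeclared = set(result) - set(declared)
            if undeclared:
                raise ValueError(f"undeclared outputs: {sorted(undeclared)}")
            for name, value in result.items():
                if not isinstance(value, declared[name]):
                    raise TypeError(f"output {name!r} should be {declared[name].__name__}")
            return result

        wrapper.declared_outputs = declared  # type: ignore[attr-defined]
        return wrapper

    return decorate


class Greeter:
    @output_types(greeting=str)
    def run(self, user_name: str) -> Dict[str, Any]:
        return {"greeting": f"Hello {user_name}"}


print(Greeter().run(user_name="Tuana"))  # {'greeting': 'Hello Tuana'}
```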

Example:

In [None]:
from typing import List, Optional
from haystack import Document, component

@component
class Translator:

    @component.output_types(documents=List[Document])
    def run(self,
            from_lang: str = 'en',
            to_lang: str = 'fr',
            documents: Optional[List[Document]] = None):
        # Use None instead of a mutable [] default, then fall back to an empty list
        translated_docs = []
        for doc in documents or []:
            # Placeholder: translate doc.content from from_lang to to_lang
            # and append the translated Document to translated_docs
            pass
        return {"documents": translated_docs}

In this lab, we build 2 separate pipelines:
1. Dialogue Builder - We use a custom greeter component to start with a greeting for the dialogue
2. Hacker News Summarizer - Provides summaries of top-k trending posts on Hacker News

<center>
  <img src="images/hacker_news.png"/>
</center>  

In [None]:
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

In [None]:
import requests

from typing import List

from haystack import Document, Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument

In [None]:
@component
class Greeter:

    @component.output_types(greeting=str)
    def run(self, user_name: str):
        return {"greeting": f"Hello {user_name}"}

In [None]:
greeter = Greeter()

greeter.run(user_name="Tuana")

In [None]:
greeter = Greeter()
template = """ You will be given the beginning of a dialogue. 
Create a short play script using this as the start of the play.
Start of dialogue: {{ dialogue }}
Full script: 
"""
prompt = PromptBuilder(template=template)
llm = OpenAIGenerator()

dialogue_builder = Pipeline()
dialogue_builder.add_component("greeter", greeter)
dialogue_builder.add_component("prompt", prompt)
dialogue_builder.add_component("llm", llm)

dialogue_builder.connect("greeter.greeting", "prompt.dialogue")
dialogue_builder.connect("prompt", "llm")

dialogue_builder.show()

In [None]:
dialogue = dialogue_builder.run({"greeter": {"user_name": "Tuana"}})
dialogue
print(dialogue["llm"]["replies"][0])

#### Build a Hacker News Summarizer

We will build a custom component called `HackernewsNewestFetcher` and then use it in a summarizer pipeline.

In [None]:
# Fetch the IDs of the current top stories, then fetch the first (top) story
trending_list = requests.get(
    url="https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty"
)
post = requests.get(
    url=f"https://hacker-news.firebaseio.com/v0/item/{trending_list.json()[0]}.json?print=pretty"
)

# View top story post
print(post.json())
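The fetcher below makes one decision per post: if the item has a `url`, fetch and convert the linked page; if it has only `text` (as "Ask HN" posts do), use the text directly. That branching can be isolated and tested offline. The function name and the sample items are made up for illustration. Only the `url` and `text` field names come from the Hacker News API response shown above.

```python
from typing import Any, Dict, List, Optional, Tuple


def plan_article_sources(
    top_ids: List[int], items: Dict[int, Optional[Dict[str, Any]]], top_k: int
) -> List[Tuple[int, str, str]]:
    """Return (item_id, kind, value) for each of the first top_k items that has content."""
    plan = []
    for item_id in top_ids[:top_k]:
        item = items.get(item_id) or {}  # treat a missing or null item as empty
        if "url" in item:
            plan.append((item_id, "url", item["url"]))
        elif "text" in item:
            plan.append((item_id, "text", item["text"]))
        # items with neither field are skipped
    return plan


# Made-up sample data shaped like the API responses above
top_ids = [101, 102, 103, 104]
items = {
    101: {"title": "A link post", "url": "https://example.com/story"},
    102: {"title": "Ask HN: a text post", "text": "What are you working on?"},
    103: None,
    104: {"title": "Beyond top_k", "url": "https://example.com/other"},
}
print(plan_article_sources(top_ids, items, top_k=3))
```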

In [None]:
@component
class HackernewsNewestFetcher:
    
    def __init__(self):
        
        fetcher = LinkContentFetcher()
        converter = HTMLToDocument()

        html_conversion_pipeline = Pipeline()
        html_conversion_pipeline.add_component("fetcher", fetcher)
        html_conversion_pipeline.add_component("converter", converter)

        html_conversion_pipeline.connect("fetcher", "converter")
        self.html_pipeline = html_conversion_pipeline  # inner pipeline: fetch a URL and convert its HTML into a Document
        
    @component.output_types(articles=List[Document])
    def run(self, top_k: int):
        
        articles = []
        
        trending_list = requests.get(
            url="https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty"
        )
        
        # Loop through the top_k posts and fetch their content: the linked URL or the post text
        for post_id in trending_list.json()[0:top_k]:
            # The API may return null for a deleted or missing item, so fall back to {}
            post = requests.get(
                url=f"https://hacker-news.firebaseio.com/v0/item/{post_id}.json?print=pretty"
            ).json() or {}

            if "url" in post:
                try:
                    article = self.html_pipeline.run(
                        {"fetcher": {"urls": [post["url"]]}}
                    )
                    articles.append(article["converter"]["documents"][0])
                except Exception:
                    print(f"Can't download {post['url']}, skipped")

            elif "text" in post:
                try:
                    articles.append(Document(content=post["text"], meta={"title": post["title"]}))
                except KeyError:
                    print(f"Post {post_id} has no title, skipped")
        return {"articles": articles}

In [None]:
fetcher = HackernewsNewestFetcher()
results = fetcher.run(top_k=3)

print(results['articles'])

Now let's create a summarizer pipeline. We first create a prompt template and then connect the components.

In [None]:
prompt_template = """  
You will be provided a few of the top posts in HackerNews.  
For each post, provide a brief summary if possible.
  
Posts:  
{% for article in articles %}
  Post:\n
  {{ article.content}}
{% endfor %}  
"""

In [None]:
prompt_builder = PromptBuilder(template=prompt_template)
fetcher = HackernewsNewestFetcher()
llm = OpenAIGenerator()

summarizer_pipeline = Pipeline()
summarizer_pipeline.add_component("fetcher", fetcher)
summarizer_pipeline.add_component("prompt", prompt_builder)
summarizer_pipeline.add_component("llm", llm)

summarizer_pipeline.connect("fetcher.articles", "prompt.articles")
summarizer_pipeline.connect("prompt", "llm")

summarizer_pipeline.show()

In [None]:
summaries = summarizer_pipeline.run({"fetcher": {"top_k": 3}})
print(summaries)
print(summaries["llm"]["replies"][0])

Finally, let's customize the prompt so that each summary includes the URL of the full post.

In [None]:
prompt_template = """  
You will be provided a few of the top posts in HackerNews, followed by their URL.  
For each post, provide a brief summary followed by the URL the full post can be found at.  
  
Posts:  
{% for article in articles %}  
  {{ article.content }}
  URL: {{ article.meta["url"] }}
{% endfor %}  
"""

prompt_builder = PromptBuilder(template=prompt_template)
fetcher = HackernewsNewestFetcher()
llm = OpenAIGenerator()

summarizer_pipeline = Pipeline()
summarizer_pipeline.add_component("fetcher", fetcher)
summarizer_pipeline.add_component("prompt", prompt_builder)
summarizer_pipeline.add_component("llm", llm)

summarizer_pipeline.connect("fetcher.articles", "prompt.articles")
summarizer_pipeline.connect("prompt", "llm")

summaries = summarizer_pipeline.run({"fetcher": {"top_k": 2}})

print(summaries["llm"]["replies"][0])

Learn more about Haystack integrations:
* [haystack-integrations GitHub repository](https://github.com/deepset-ai/haystack-integrations)
* [haystack.deepset.ai/integrations](https://haystack.deepset.ai/integrations)

---

# Fallbacks with Branching Pipelines  

We will now create a branching pipeline that falls back to web search when necessary. To achieve this, we use a special component called `ConditionalRouter`.  

This component allows us to define custom routes and trigger specific branches of the pipeline.  

<center>
  <img src="images/router.png"/>
</center>  

We will build a pipeline that first evaluates whether the answer can be obtained using a RAG pipeline. If not, the pipeline will fall back to web search.  

<center>
  <img src="images/web_search.png"/>
</center>  

For the web search branch, we will build a RAG pipeline in which the retrieved documents come from URLs.  

These documents will then be passed to a `PromptBuilder` and a `Generator` component.  

Our first branch will be named **answer**, and the fallback branch will be named **go_to_websearch**.  

<center>
  <img src="images/web_search_2.png"/>
</center>  

Since we are working with very short documents, we do not need to add a document-splitting component to our pipeline.  

Instead, we will write documents directly to the document store using `write_documents()`.  

In [None]:
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

In [None]:
from haystack import Pipeline, Document
from haystack.components.routers import ConditionalRouter
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.websearch.serper_dev import SerperDevWebSearch
from haystack.document_stores.in_memory import InMemoryDocumentStore

In [None]:
# Index Documents into InMemoryDocumentStore
documents = [Document(content="Retrievers: Retrieves relevant documents to a user query using keyword search or semantic search."),
             Document(content="Embedders: Creates embeddings for text or documents."),
             Document(content="Generators: Use a number of model providers to generate answers or content based on a prompt"),
             Document(content="File Converters: Converts different file types like TXT, Markdown, PDF, etc. into a Haystack Document type")]

document_store = InMemoryDocumentStore()
document_store.write_documents(documents=documents)

Now let's create our RAG pipeline. Because we have not embedded our documents, we use a keyword-based retriever: the `InMemoryBM25Retriever` component.
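Keyword retrieval of this kind is usually scored with BM25. BM25 rewards documents that contain the query's terms, weights rare terms more heavily, and dampens the effect of repetition and document length. The sketch below implements classic Okapi BM25 with typical parameter values (`k1=1.5`, `b=0.75`) and a naive tokenizer. Haystack's in-memory store offers several BM25 variants, and its default variant, tokenization, and parameters may differ. `tokenize` and `bm25_rank` are hypothetical helpers.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def tokenize(text: str) -> List[str]:
    """Naive tokenizer: lowercase word characters only, no stemming."""
    return re.findall(r"\w+", text.lower())


def bm25_rank(query: str, docs: List[str], k1: float = 1.5, b: float = 0.75) -> List[Tuple[int, float]]:
    """Return (doc_index, score) pairs sorted from best to worst match."""
    if not docs:
        return []
    tokenized = [tokenize(d) for d in docs]
    n_docs = len(tokenized)
    avgdl = sum(len(t) for t in tokenized) / n_docs or 1.0
    # Document frequency: in how many documents each term appears
    df = Counter(term for toks in tokenized for term in set(toks))
    query_terms = set(tokenize(query))
    scores = []
    for idx, toks in enumerate(tokenized):
        tf = Counter(toks)
        score = 0.0
        for term in query_terms:
            if tf[term] == 0:
                continue
            idf = math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1)
            length_norm = k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[term] * (k1 + 1) / (tf[term] + length_norm)
        scores.append((idx, score))
    return sorted(scores, key=lambda pair: pair[1], reverse=True)


docs = [
    "Retrievers: Retrieves relevant documents to a user query using keyword search or semantic search.",
    "Embedders: Creates embeddings for text or documents.",
    "Generators: Use a number of model providers to generate answers or content based on a prompt",
    "File Converters: Converts different file types like TXT, Markdown, PDF, etc. into a Haystack Document type",
]
print(bm25_rank("keyword search", docs)[:2])
```

Because this tokenizer does no stemming, the query term "retriever" would not match "Retrievers" or "Retrieves" in the first document. Tokenization choices matter for keyword search.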

In [None]:
rag_prompt_template = """
Answer the following query given the documents.
If the answer is not contained within the documents, reply with 'no_answer'
Query: {{query}}
Documents:
{% for document in documents %}
  {{document.content}}
{% endfor %}
"""

In [None]:
rag = Pipeline()
rag.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
rag.add_component("prompt_builder", PromptBuilder(template=rag_prompt_template))
rag.add_component("llm", OpenAIGenerator())

rag.connect("retriever.documents", "prompt_builder.documents")
rag.connect("prompt_builder", "llm")
rag.show()

Now let's see how the pipeline reacts when we ask certain questions.

In [None]:
query = "What is a retriever for?"

rag.run({"prompt_builder":{"query": query},
         "retriever": {"query": query}})

In [None]:
query = "What Mistral components are there?"

rag.run({"prompt_builder":{"query": query},
         "retriever": {"query": query}})

Next, let's create a conditional router. This component expects a list of routes; we start with two.

To make the check for the **no_answer** response case-insensitive, we use the Jinja `lower` filter.

In [None]:
# Notice: each route has 4 keys: condition, output, output_name, output_type
# For example, the first condition checks whether 'no_answer' appears in the LLM's reply (case-insensitive). If so, the query is routed to web search.
routes = [
    {
        "condition": "{{'no_answer' in replies[0]|lower}}",
        "output": "{{query}}",
        "output_name": "go_to_websearch",
        "output_type": str,
    },
    {
        "condition": "{{'no_answer' not in replies[0]|lower}}",
        "output": "{{replies[0]}}",
        "output_name": "answer",
        "output_type": str,
    },
]
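The two routes above can be mirrored in plain Python to show the routing logic on its own. This is a hypothetical sketch, not `ConditionalRouter`'s implementation. The conditions are lambdas instead of Jinja expressions. As we understand the component's behavior, routes are checked in order and the first one whose condition is true determines the single output. Raising an error when nothing matches is an assumption of this sketch.

```python
from typing import Any, Callable, Dict, List, Tuple

# (condition, output builder, output name), mirroring condition / output / output_name
Route = Tuple[Callable[[List[str], str], bool], Callable[[List[str], str], Any], str]

ROUTES: List[Route] = [
    (lambda replies, query: "no_answer" in replies[0].lower(),  # like `|lower` in Jinja
     lambda replies, query: query,
     "go_to_websearch"),
    (lambda replies, query: "no_answer" not in replies[0].lower(),
     lambda replies, query: replies[0],
     "answer"),
]


def route(replies: List[str], query: str, routes: List[Route] = ROUTES) -> Dict[str, Any]:
    """Return {output_name: output} for the first route whose condition holds."""
    for condition, build_output, output_name in routes:
        if condition(replies, query):
            return {output_name: build_output(replies, query)}
    raise ValueError("no route matched")


print(route(["Geoff is my friend"], "Who is Geoff?"))  # {'answer': 'Geoff is my friend'}
print(route(["No_answer"], "Who is Geoff?"))           # {'go_to_websearch': 'Who is Geoff?'}
```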

To test the router, we simulate an answer ("Geoff is my friend") for the query "Who is Geoff?". Since this reply does not contain "**no_answer**", the router selects the `answer` route, and the `go_to_websearch` branch is not triggered.

In [None]:
router = ConditionalRouter(routes=routes)
router.run(replies=['Geoff is my friend'], query="Who is Geoff?")

In [None]:
router.run(replies=['No_answer'], query="Who is Geoff?")

Now let's add the router component and create the rest of the pipeline.

In [None]:
rag.add_component("router", ConditionalRouter(routes=routes))
rag.connect("llm.replies", "router.replies")
rag.show()

In [None]:
query = "What Mistral components does Haystack have?"

rag.run({"prompt_builder":{"query": query},
         "retriever": {"query": query},
         "router": {"query": query}})

Now, we develop the web search branch. For this, we use the component `SerperDevWebSearch`.

We then connect all components to create the final RAG pipeline.

In [None]:
from haystack.components.websearch.serper_dev import SerperDevWebSearch

prompt_for_websearch = """
Answer the following query given the documents retrieved from the web.
Your answer should indicate that your answer was generated from websearch.
You can also reference the URLs that the answer was generated from

Query: {{query}}
Documents:
{% for document in documents %}
  {{document.content}}
{% endfor %}
"""

rag_or_websearch = Pipeline()
rag_or_websearch.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
rag_or_websearch.add_component("prompt_builder", PromptBuilder(template=rag_prompt_template))
rag_or_websearch.add_component("llm", OpenAIGenerator())
rag_or_websearch.add_component("router", ConditionalRouter(routes))
rag_or_websearch.add_component("websearch", SerperDevWebSearch())
rag_or_websearch.add_component("prompt_builder_for_websearch", PromptBuilder(template=prompt_for_websearch))
rag_or_websearch.add_component("llm_for_websearch",  OpenAIGenerator())

rag_or_websearch.connect("retriever", "prompt_builder.documents")
rag_or_websearch.connect("prompt_builder", "llm")
rag_or_websearch.connect("llm.replies", "router.replies")
rag_or_websearch.connect("router.go_to_websearch", "websearch.query")
rag_or_websearch.connect("router.go_to_websearch", "prompt_builder_for_websearch.query")
rag_or_websearch.connect("websearch.documents", "prompt_builder_for_websearch.documents")
rag_or_websearch.connect("prompt_builder_for_websearch", "llm_for_websearch")

rag_or_websearch.show()

In [None]:
query= "What is a retriever for?"

rag_or_websearch.run({"prompt_builder":{"query": query},
                      "retriever": {"query": query},
                      "router": {"query": query}})

In [None]:
query = "What Mistral components does Haystack have?"

rag_or_websearch.run({"prompt_builder":{"query": query},
                      "retriever": {"query": query},
                      "router": {"query": query}})

More on ConditionalRouters: <https://docs.haystack.deepset.ai/docs/conditionalrouter?utm_campaign=developer-relations&utm_source=dlai>

---

# Self-Reflecting Agents with Loop  

Let's build our first agentic pipeline: a self-reflecting agent capable of looping.  

In this setup, an LLM will generate a set of named entities and then reflect on its own output to improve the response.  

**Self-reflection consists of the following:**  
- A mechanism for self-assessment and correction  
- Refining outputs to provide more reliable and accurate information  

This allows the LLM to provide feedback to itself for future iterations and to take actions based on predefined criteria.  

<center>
  <img src="images/self_reflection.png"/>
</center>  

In this lab, we will build a simple self-reflecting agent that asks the LLM to extract entities from text.  

The agent will then critique its own results, improving incorrect or incomplete entities in a loop, or return **Done** once the output is satisfactory.  

We will create a custom component called `EntitiesValidator`, which will evaluate the generated answer.  

The extracted data will be returned in JSON format, adhering to a schema. If validation fails, the pipeline will loop back for correction.  
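The loop works as follows. The LLM generates a reply, the loop checks it for a DONE marker, and if the marker is absent the draft goes back for another round. An iteration cap keeps a model that never says DONE from looping forever. `reflect_until_done`, `max_iterations`, and the scripted fake LLM are hypothetical. In the Haystack pipeline, the LLM, the `EntitiesValidator`, and the pipeline's loop handling play these roles.

```python
from typing import Callable, List, Optional


def reflect_until_done(
    text: str,
    llm: Callable[[str, Optional[str]], str],
    max_iterations: int = 5,
) -> str:
    """Call llm(text, previous_draft) until it marks its reply DONE; return the cleaned reply."""
    previous: Optional[str] = None
    for _ in range(max_iterations):
        reply = llm(text, previous)
        if "DONE" in reply:
            # Same rule as the EntitiesValidator: strip the marker, keep the entities
            return reply.replace("DONE", "").strip()
        previous = reply  # not done yet: send this draft back for critique
    raise RuntimeError(f"no DONE reply after {max_iterations} iterations")


def make_scripted_llm(replies: List[str]) -> Callable[[str, Optional[str]], str]:
    """Hypothetical fake LLM that returns canned replies in order, for offline testing."""
    queue = list(replies)

    def fake_llm(text: str, previous: Optional[str]) -> str:
        return queue.pop(0)

    return fake_llm


scripted_llm = make_scripted_llm([
    '{"Person": ["Ada"], "Location": [], "Date": [], "Misc": ["x"]}',
    'DONE {"Person": ["Ada"], "Location": [], "Date": []}',
])
print(reflect_until_done("Ada wrote notes.", scripted_llm))
```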

<center>
  <img src="images/looping.png"/>
</center>  
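The control flow of this loop can be sketched in plain Python, without Haystack. This is a simplified illustration, not how `Pipeline` runs internally: `self_reflect` is a hypothetical helper, `llm` is any function that maps a prompt string to a reply string, and the prompts are shortened versions of the template used in this lesson. The loop stops when the reply contains `DONE` or after `max_loops` rounds.

```python
from typing import Callable, Optional


def self_reflect(text: str, llm: Callable[[str], str], max_loops: int = 10) -> Optional[str]:
    """Hypothetical helper: extract entities, then let the LLM review them until it says DONE."""
    entities_to_validate: Optional[str] = None
    for _ in range(max_loops):
        if entities_to_validate is None:
            # First pass: plain extraction prompt
            prompt = f"Extract entities from the following text\nText: {text}\nEntities:"
        else:
            # Later passes: show the previous answer and ask for a review
            prompt = (
                f"Here was the text you were provided:\n{text}\n"
                f"Here are the entities you previously extracted:\n{entities_to_validate}\n"
                "If you are done say 'DONE' and return your new entities in the next line\n"
                "Entities:"
            )
        reply = llm(prompt)
        if "DONE" in reply:
            # The LLM is satisfied: strip the keyword and leave the loop
            return reply.replace("DONE", "").strip()
        # Otherwise loop back with the new reply as the entities to validate
        entities_to_validate = reply
    return None  # no DONE within max_loops rounds


# Usage with a scripted stand-in for the LLM
scripted_replies = iter(['{"Person": ["Ann", "Ann"]}', 'DONE\n{"Person": ["Ann"]}'])
print(self_reflect("Ann went home.", lambda prompt: next(scripted_replies)))
# prints {"Person": ["Ann"]}
```

Returning `None` when the loop budget runs out is a choice made for this sketch; in the Haystack pipeline the limit is set with `max_loops_allowed`.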


In [None]:
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

In [None]:
from typing import List
from colorama import Fore
from haystack import Pipeline, component
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators.openai import OpenAIGenerator

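Now let's create the `EntitiesValidator` custom component. If the LLM's reply contains `DONE`, the component removes that keyword and returns the reply on its `entities` output. Otherwise, it returns the reply on `entities_to_validate`, which is connected back to the prompt builder so the LLM can review its answer again.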

In [None]:
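@component
class EntitiesValidator:

    @component.output_types(entities_to_validate=str, entities=str)
    def run(self, replies: List[str]):
        # The LLM signals that it is satisfied by including 'DONE' in its reply
        if 'DONE' in replies[0]:
            # Final answer: drop the keyword (and surrounding whitespace) and emit the entities
            return {"entities": replies[0].replace('DONE', '').strip()}
        else:
            # Not done yet: send the reply back to the prompt builder for another review
            print(Fore.RED + "Reflecting on entities\n", replies[0])
            return {"entities_to_validate": replies[0]}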

In [None]:
entities_validator = EntitiesValidator()
entities_validator.run(replies=["{'name': 'Tuana'}"])

In [None]:
entities_validator.run(replies=["DONE {'name': 'Tuana'}"])

Now let's create a prompt template with an if-block, so that a single template handles both cases: extracting entities from the text on the first pass, and validating the previously extracted entities on later passes so the LLM can improve its answer.

In [None]:
template = """
{% if entities_to_validate %}
    Here was the text you were provided:
    {{ text }}
    Here are the entities you previously extracted: 
    {{ entities_to_validate }}
    Are these the correct entities? 
    Things to check for:
    - Entity categories should be exactly "Person", "Location" and "Date"
    - There should be no extra categories
    - There should be no duplicate entities
    - If there are no appropriate entities for a category, the category should have an empty list
    If you are done say 'DONE' and return your new entities in the next line
    If not, simply return the best entities you can come up with.
    Entities:
{% else %}
    Extract entities from the following text
    Text: {{ text }} 
    The entities should be presented as key-value pairs in a JSON object.
    Example: 
    {
        "Person": ["value1", "value2"], 
        "Location": ["value3", "value4"],
        "Date": ["value5", "value6"]
    }
    If there are no possibilities for a particular category, return an empty list for this
    category
    Entities:
{% endif %}
"""
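In this lesson the LLM itself checks its entities against the list in the template. As an optional complement, the same rules can be checked deterministically in Python. The sketch below is a hypothetical helper, `check_entities`, not part of Haystack; it assumes the reply is pure JSON with no surrounding text, which LLM replies do not always guarantee.

```python
import json
from typing import List

# The three categories the prompt asks for
EXPECTED_CATEGORIES = {"Person", "Location", "Date"}


def check_entities(reply: str) -> List[str]:
    """Return a list of problems found in an entity JSON reply (an empty list means valid)."""
    try:
        data = json.loads(reply)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(data, dict):
        return ["top-level value must be a JSON object"]

    problems = []
    keys = set(data)
    missing = EXPECTED_CATEGORIES - keys
    extra = keys - EXPECTED_CATEGORIES
    if missing:
        problems.append(f"missing categories: {sorted(missing)}")
    if extra:
        problems.append(f"extra categories: {sorted(extra)}")

    for category in sorted(EXPECTED_CATEGORIES & keys):
        values = data[category]
        # Each category must be a (possibly empty) list of strings
        if not isinstance(values, list) or not all(isinstance(v, str) for v in values):
            problems.append(f"{category} must be a list of strings")
        elif len(values) != len(set(values)):
            problems.append(f"duplicate entities in {category}")
    return problems
```

A function like this could, for example, be called inside a custom component's `run` method to decide which output to return, instead of relying only on the `DONE` keyword.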

Finally, let's create our self-reflecting agent, allowing at most 10 loop iterations (`max_loops_allowed=10`).

In [None]:
prompt_template = PromptBuilder(template=template)
llm = OpenAIGenerator()
entities_validator = EntitiesValidator()

self_reflecting_agent = Pipeline(max_loops_allowed=10)

self_reflecting_agent.add_component("prompt_builder", prompt_template)
self_reflecting_agent.add_component("entities_validator", entities_validator)
self_reflecting_agent.add_component("llm", llm)

self_reflecting_agent.connect("prompt_builder.prompt", "llm.prompt")
self_reflecting_agent.connect("llm.replies", "entities_validator.replies")
self_reflecting_agent.connect("entities_validator.entities_to_validate", "prompt_builder.entities_to_validate")

self_reflecting_agent.show()

In [None]:
text = """
Istanbul is the largest city in Turkey, straddling the Bosporus Strait, 
the boundary between Europe and Asia. It is considered the country's economic, 
cultural and historic capital. The city has a population of over 15 million residents, 
comprising 19% of the population of Turkey,[4] and is the most populous city in Europe 
and the world's fifteenth-largest city."""

result = self_reflecting_agent.run({"prompt_builder": {"text": text}})
print(Fore.GREEN + result['entities_validator']['entities'])

In [None]:
text = """
Stefano: Hey all, let's start the all hands for June 6th 2024
Geoff: Thanks, I'll kick it off with a request. Could we please add persistent memory to the Chroma document store.
Stefano: Easy enough, I can add that to the feature requests. What else?
Julain: There's a bug, some BM25 algorithms return negative scores and we filter them out from the results by default.
Instead, we should probably check which algorithm is being used and keep results with negative scores accordingly.
Esmail: Before we end this call, we should add a new Generator component for LlamaCpp in the next release.
Tuana: Thanks all, I think we're done here, we can create some issues in GitHub about these."""

result = self_reflecting_agent.run({"prompt_builder": {"text": text}})
print(Fore.GREEN + result['entities_validator']['entities'])

---

# Chat Agent with Function Calling

We now use OpenAI's function-calling capability to create a chat agent that uses Haystack pipelines as tools. Chat-based interaction with the LLM is handled by Chat Generator components such as `OpenAIChatGenerator`.

We work with messages, each of which has a role: user, assistant, system, or function.

Chat Generators can be given a set of tools described as functions. The LLM decides whether to call a function and with which arguments; it returns the function name and arguments, and our code then runs the function.

A tool can be an external API, a simple function, or even a Haystack pipeline (e.g. a RAG pipeline that accesses your data).

In [None]:
import warnings
from helper import load_env

warnings.filterwarnings('ignore')
load_env()

In [None]:
import pprint
import gradio as gr
from typing import List
from haystack import component, Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.generators.chat.openai import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.components.joiners import BranchJoiner
from haystack_experimental.components.tools import OpenAIFunctionCaller

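First, we create a simple RAG pipeline that we will later expose to the LLM as a function. To keep the example small, the documents are passed to the prompt builder directly instead of being fetched by a retriever.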

In [None]:
template = """
Answer the questions based on the given context.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""
rag_pipe = Pipeline()
rag_pipe.add_component("prompt_builder", PromptBuilder(template=template))
rag_pipe.add_component("llm", OpenAIGenerator())

rag_pipe.connect("prompt_builder", "llm")

Let's wrap the RAG pipeline as a function:

In [None]:
def rag_pipeline_func(query: str):
    documents = [
        Document(content="My name is Jean and I live in Paris."),
        Document(content="My name is Mark and I live in Berlin."),
        Document(content="My name is Giorgio and I live in Rome."),
        Document(content="My name is Marta and I live in Madrid."),
        Document(content="My name is Harry and I live in London."),
    ]
    result = rag_pipe.run({"prompt_builder": {"question": query, 
                                              "documents": documents}})
    return {"reply": result["llm"]["replies"][0]}

Additionally, let's create a weather function:

In [None]:
WEATHER_INFO = {
    "Berlin": {"weather": "mostly sunny", "temperature": 7, "unit": "celsius"},
    "Paris": {"weather": "mostly cloudy", "temperature": 8, "unit": "celsius"},
    "Rome": {"weather": "sunny", "temperature": 14, "unit": "celsius"},
    "Madrid": {"weather": "sunny", "temperature": 10, "unit": "celsius"},
    "London": {"weather": "cloudy", "temperature": 9, "unit": "celsius"},
}

def get_current_weather(location: str):
    if location in WEATHER_INFO:
        return WEATHER_INFO[location]
    else:
        return {"weather": "sunny", "temperature": 70, "unit": "fahrenheit"}

Once we have our functions ready, we provide them as **tools** to the generator. 

Each tool has to be described in the format the OpenAI API expects: a name, a description, and the function's parameters written as a JSON Schema object.

In [None]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "rag_pipeline_func",
            "description": "Get information about where people live",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query to use in the search. Infer this from the user's message. It should be a question or a statement",
                    }
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "The city"}
                },
                "required": ["location"],
            },
        },
    },
]
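Writing these dictionaries by hand gets repetitive as the number of tools grows. The sketch below derives the same structure from a Python function's signature using the standard `inspect` module. `function_to_tool` and its type mapping are hypothetical helpers for illustration, not a Haystack or OpenAI API; parameters without a recognized annotation default to `"string"`, and parameters without a default value are marked as required.

```python
import inspect
from typing import Any, Callable, Dict

# Hypothetical mapping from Python type annotations to JSON Schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def function_to_tool(func: Callable[..., Any], description: str,
                     param_descriptions: Dict[str, str]) -> Dict[str, Any]:
    """Build an OpenAI-style tool description from a function's signature."""
    properties: Dict[str, Dict[str, str]] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {
            # Unknown or missing annotations fall back to "string"
            "type": _JSON_TYPES.get(param.annotation, "string"),
            "description": param_descriptions.get(name, ""),
        }
        # Parameters without a default value must be supplied by the LLM
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": description,
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }
```

For example, `function_to_tool(get_current_weather, "Get the current weather", {"location": "The city"})` returns the same dictionary as the second entry in `tools` above.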

Now that we have a list of tools, we can pass them to `OpenAIChatGenerator`. Besides a model name, this component accepts `generation_kwargs`, which are forwarded to the OpenAI API; that is where we supply the tools.

In [None]:
chat_generator = OpenAIChatGenerator(model="gpt-3.5-turbo", generation_kwargs={'tools': tools})
replies = chat_generator.run(messages=[ChatMessage.from_user("Where does Mark live?")])
print(replies['replies'][0])

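Now let's run the requested function with the `OpenAIFunctionCaller` component. It looks up the function the LLM asked for in `available_functions`, calls it with the LLM's arguments, and adds the function's result to the messages it outputs as `function_replies`.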

In [None]:
function_caller = OpenAIFunctionCaller(available_functions={"rag_pipeline_func": rag_pipeline_func, 
                                                            "get_current_weather": get_current_weather})

results = function_caller.run(messages=replies['replies'])
pprint.pprint(results["function_replies"])
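Conceptually, the function caller performs a small dispatch step. Here is a simplified plain-Python sketch of it: `dispatch_tool_calls` is a hypothetical helper, the input assumes tool calls shaped like OpenAI Chat Completions `tool_calls` entries (a `function` object holding a `name` and a JSON-encoded `arguments` string), and the returned dictionaries are a simplified stand-in for Haystack `ChatMessage` objects.

```python
import json
from typing import Any, Callable, Dict, List


def dispatch_tool_calls(tool_calls: List[Dict[str, Any]],
                        available_functions: Dict[str, Callable[..., Any]]) -> List[Dict[str, str]]:
    """Hypothetical helper: run each requested function and wrap its result as a message."""
    results = []
    for call in tool_calls:
        name = call["function"]["name"]
        # The arguments arrive as a JSON-encoded string, so decode them first
        args = json.loads(call["function"]["arguments"])
        func = available_functions.get(name)
        if func is None:
            # Report unknown functions back instead of crashing
            content = json.dumps({"error": f"unknown function {name!r}"})
        else:
            content = json.dumps(func(**args))
        results.append({"role": "function", "name": name, "content": content})
    return results


# Usage with one tool call in the assumed format
example_call = {"id": "call_1", "type": "function",
                "function": {"name": "get_city", "arguments": '{"name": "Mark"}'}}
print(dispatch_tool_calls([example_call], {"get_city": lambda name: {"city": "Berlin"}}))
# prints [{'role': 'function', 'name': 'get_city', 'content': '{"city": "Berlin"}'}]
```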

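Finally, let's create a chat agent that can use these tools. A `BranchJoiner` merges messages from two sources into a single input for the generator: the user's messages and the replies from the function caller. When the LLM requests a function call, the function's result is fed back through the `BranchJoiner` to the generator, which can then write a final answer. A message list stores the whole conversation.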

In [None]:
message_collector = BranchJoiner(List[ChatMessage])
chat_generator = OpenAIChatGenerator(model="gpt-3.5-turbo", generation_kwargs={'tools': tools})
function_caller = OpenAIFunctionCaller(available_functions={"rag_pipeline_func": rag_pipeline_func, 
                                                            "get_current_weather": get_current_weather})

# Create a chat agent that can access tools
chat_agent = Pipeline()
chat_agent.add_component("message_collector", message_collector)
chat_agent.add_component("generator", chat_generator)
chat_agent.add_component("function_caller", function_caller)

# Connect the components
chat_agent.connect("message_collector", "generator.messages")
chat_agent.connect("generator", "function_caller")
chat_agent.connect("function_caller.function_replies", "message_collector") # feed function results back to the generator so it can write a human-readable answer
chat_agent.show()
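The loop this pipeline forms (generator, then function caller, then back through the `BranchJoiner` to the generator) can be sketched in plain Python. Everything here is hypothetical and simplified: `run_agent_turn` is an illustrative helper, messages are plain dictionaries rather than `ChatMessage` objects, `generate` is any function standing in for the chat generator, and tool calls are assumed to follow the OpenAI `tool_calls` shape (a `function` object with a `name` and a JSON-encoded `arguments` string).

```python
import json
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]  # simplified stand-in for a chat message


def run_agent_turn(messages: List[Message],
                   generate: Callable[[List[Message]], Message],
                   available_functions: Dict[str, Callable[..., Any]],
                   max_loops: int = 5) -> Message:
    """Hypothetical helper: alternate between generator and function calls until a plain answer arrives."""
    history = list(messages)  # copy so the caller's list is not modified
    for _ in range(max_loops):
        reply = generate(history)
        history.append(reply)
        tool_calls = reply.get("tool_calls")
        if not tool_calls:
            return reply  # no function requested: this is the final answer
        for call in tool_calls:
            name = call["function"]["name"]
            args = json.loads(call["function"]["arguments"])
            result = available_functions[name](**args)
            # Feed the result back to the generator, as the BranchJoiner does in the pipeline
            history.append({"role": "function", "name": name, "content": json.dumps(result)})
    raise RuntimeError(f"no final answer after {max_loops} loops")


# Usage with a scripted stand-in for the chat generator
def fake_generate(history: List[Message]) -> Message:
    if history[-1]["role"] == "user":
        # First ask for a function call
        return {"role": "assistant", "content": "",
                "tool_calls": [{"function": {"name": "find_city", "arguments": '{"name": "Mark"}'}}]}
    # Then answer using the function result that was appended to the history
    city = json.loads(history[-1]["content"])["city"]
    return {"role": "assistant", "content": f"Mark lives in {city}"}


answer = run_agent_turn([{"role": "user", "content": "Where does Mark live?"}],
                        fake_generate, {"find_city": lambda name: {"city": "Berlin"}})
print(answer["content"])  # prints Mark lives in Berlin
```

The sketch raises an error if no final answer arrives within `max_loops` rounds, a choice made here for illustration.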

In [None]:
# System Message
messages = [
    ChatMessage.from_system(
        """If needed, break down the user's question into simpler questions and follow-up questions that you can use with your tools.
        Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous."""
    )
]

# Start a chat loop
while True:
    user_input = input("INFO: Type 'exit' or 'quit' to stop\n")
    if user_input.lower() in ("exit", "quit"):
        break
    messages.append(ChatMessage.from_user(user_input)) # append to message queue
    response = chat_agent.run({"message_collector": {"value": messages}})
    messages.extend(response['function_caller']['assistant_replies']) # append to message queue
    print(response['function_caller']['assistant_replies'][0].content)

We can also serve the same application as a **Gradio** app. Gradio calls the `chat()` function below for each user message to build a chat interface, and we also provide example questions to ask the chat agent.

In [None]:
messages = [
    ChatMessage.from_system(
        """If needed, break down the user's question into simpler questions and follow-up questions that you can use with your tools.
        Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous."""
    )
]
def chat(message, history): 
    messages.append(ChatMessage.from_user(message))
    response = chat_agent.run({"message_collector": {"value": messages}})
    messages.extend(response['function_caller']['assistant_replies'])
    return response['function_caller']['assistant_replies'][0].content

In [None]:
demo = gr.ChatInterface(
    fn=chat,
    examples=[
        "Can you tell me where Giorgio lives?",
        "What's the weather like in Madrid?",
        "Who lives in London?",
        "What's the weather like where Mark lives?",
    ],
    title="Ask me about weather or where people live!",
)
demo.launch(share=True)

More on:
- OpenAIChatGenerator: <https://docs.haystack.deepset.ai/docs/openaichatgenerator?utm_campaign=developer-relations&utm_source=dlai>
- Gradio: <https://huggingface.co/gradio>

---  