In [1]:
!pip install -qU \
    datasets==2.19.1 \
    llama-index-core==0.11.9 \
    llama-index-llms-openai \
    llama-index-utils-workflow==0.2.1 \
    serpapi==0.1.5 \
    google-search-results==2.4.2 \
    semantic-router[pinecone]==0.0.65

# Knowledge Base Setup

We'll be running our agent against a knowledge base — which requires a vector index to be built. In this notebook the index is a collection in a locally running Qdrant instance.

You can, if needed, skip this step and replace the RAG search tools (`rag_search` and `rag_search_filter`) with placeholders if you only want to quickly test the structure of the Llama Index Workflow.

If you want full functionality here, you do need to run this section — but we'll make it quick.

## Download a Dataset

The first thing we need for an agent using RAG is somewhere we want to pull knowledge from. We will use v2 of the AI ArXiv dataset, available on Hugging Face Datasets at `jamescalam/ai-arxiv2-semantic-chunks`.

Note: we're using the prechunked dataset. For the raw version see `jamescalam/ai-arxiv2`.

In [1]:
import asyncio
from typing import Any

import numpy as np
import pandas as pd

In [2]:
from datasets import load_dataset

# Load the "train" split of the pre-chunked AI ArXiv v2 corpus from the
# Hugging Face Hub; the bare `dataset` on the last line displays its summary.
dataset = load_dataset(
    path="jamescalam/ai-arxiv2-semantic-chunks",
    split="train",
)
dataset

Dataset({
    features: ['id', 'title', 'content', 'prechunk_id', 'postchunk_id', 'arxiv_id', 'references'],
    num_rows: 209760
})

In [3]:
# Peek at the first record to see which fields each chunk carries.
dataset[0]

{'id': '2401.04088#0',
 'title': 'Mixtral of Experts',
 'content': '4 2 0 2 n a J 8 ] G L . s c [ 1 v 8 8 0 4 0 . 1 0 4 2 : v i X r a # Mixtral of Experts Albert Q. Jiang, Alexandre Sablayrolles, Antoine Roux, Arthur Mensch, Blanche Savary, Chris Bamford, Devendra Singh Chaplot, Diego de las Casas, Emma Bou Hanna, Florian Bressand, Gianna Lengyel, Guillaume Bour, Guillaume Lample, LÃ©lio Renard Lavaud, Lucile Saulnier, Marie-Anne Lachaux, Pierre Stock, Sandeep Subramanian, Sophia Yang, Szymon Antoniak, Teven Le Scao, ThÃ©ophile Gervet, Thibaut Lavril, Thomas Wang, TimothÃ©e Lacroix, William El Sayed Abstract We introduce Mixtral 8x7B, a Sparse Mixture of Experts (SMoE) language model. Mixtral has the same architecture as Mistral 7B, with the difference that each layer is composed of 8 feedforward blocks (i.e. experts). For every token, at each layer, a router network selects two experts to process the current state and combine their outputs. Even though each token only sees two experts

## Construct Knowledge Base

Initialize the `HuggingFaceEncoder`:

In [4]:
from semantic_router.encoders import HuggingFaceEncoder

# Embedding model used for both the corpus chunks and the search queries.
# An earlier revision of this cell used
# OpenAIEncoder(name="text-embedding-3-small") together with OPENAI_API_KEY;
# it was replaced by this Hugging Face model.
encoder = HuggingFaceEncoder(name="BAAI/bge-small-en")


Initialize our connection to Qdrant:

In [5]:
import os
from getpass import getpass
from qdrant_client import QdrantClient
from qdrant_client.http import models
import time
from tqdm.auto import tqdm

In [6]:
# Initialize connection to Qdrant.
# The endpoint can be overridden through the QDRANT_URL environment variable;
# when it is unset we fall back to the original local address.
qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
client = QdrantClient(qdrant_url)

In [7]:
# Encode a throwaway sentence and measure the resulting vector: its length is
# the dimensionality the collection must be created with.
probe_vector = encoder(["some random text"])[0]
dims = len(probe_vector)

# Name of the Qdrant collection that stores the knowledge base.
index_name = "gpt-4o-research-agent"

In [8]:
# Display the embedding dimensionality measured above.
dims

384

In [9]:
# Check if collection already exists
existing_collections = client.get_collections().collections
existing_collection_names = [collection.name for collection in existing_collections]

In [10]:
if index_name not in existing_collection_names:
    # The collection does not exist yet: create it with the encoder's vector
    # size and dot-product similarity.
    client.create_collection(
        collection_name=index_name,
        vectors_config=models.VectorParams(size=dims, distance=models.Distance.DOT),
    )
    # Wait for the collection to report GREEN, but give up after a bounded
    # time instead of looping forever if it never becomes ready.
    max_wait_seconds = 60
    deadline = time.monotonic() + max_wait_seconds
    while True:
        collection_info = client.get_collection(index_name)
        if collection_info.status == models.CollectionStatus.GREEN:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Collection {index_name!r} was not ready after "
                f"{max_wait_seconds} seconds (status: {collection_info.status})"
            )
        time.sleep(1)

In [11]:
# View collection stats
collection_info = client.get_collection(index_name)
print(collection_info)

status=<CollectionStatus.GREEN: 'green'> optimizer_status=<OptimizersStatusOneOf.OK: 'ok'> vectors_count=None indexed_vectors_count=0 points_count=1408 segments_count=8 config=CollectionConfig(params=CollectionParams(vectors=VectorParams(size=384, distance=<Distance.DOT: 'Dot'>, hnsw_config=None, quantization_config=None, on_disk=None, datatype=None, multivector_config=None), shard_number=1, sharding_method=None, replication_factor=1, write_consistency_factor=1, read_fan_out_factor=None, on_disk_payload=True, sparse_vectors=None), hnsw_config=HnswConfig(m=16, ef_construct=100, full_scan_threshold=10000, max_indexing_threads=0, on_disk=False, payload_m=None), optimizer_config=OptimizersConfig(deleted_threshold=0.2, vacuum_min_vector_number=1000, default_segment_number=0, max_segment_size=None, memmap_threshold=None, indexing_threshold=20000, flush_interval_sec=5, max_optimization_threads=None), wal_config=WalConfig(wal_capacity_mb=32, wal_segments_ahead=0), quantization_config=None) pay

Populate the knowledge base:

In [12]:
# Work on a pandas DataFrame regardless of the form `dataset` currently has.
data = dataset if isinstance(dataset, pd.DataFrame) else dataset.to_pandas()

# Only the first 10,000 chunks are indexed.
data = data[:10000]

batch_size = 128

for start in tqdm(range(0, len(data), batch_size)):
    # Slice out the current batch; the last one may be shorter.
    stop = min(start + batch_size, len(data))
    batch = data.iloc[start:stop]
    rows = [row for _, row in batch.iterrows()]

    # Text that gets embedded: the paper title followed by the chunk content.
    chunks = [f'{row["title"]}: {row["content"]}' for row in rows]

    # One embedding per chunk.
    embeds = encoder(chunks)
    assert len(embeds) == (stop - start)

    # Build one Qdrant point per row; the DataFrame index is the point id.
    points = []
    for point_id, embed, row in zip(batch.index, embeds, rows):
        # Convert numpy arrays to plain lists where necessary.
        vector = embed.tolist() if isinstance(embed, np.ndarray) else embed
        references = row["references"]
        if isinstance(references, np.ndarray):
            references = references.tolist()
        points.append(
            models.PointStruct(
                id=int(point_id),
                vector=vector,
                payload={
                    "title": row["title"],
                    "content": row["content"],
                    "arxiv_id": row["arxiv_id"],
                    "references": references,
                },
            )
        )

    # Upsert the batch into the collection.
    client.upsert(collection_name=index_name, points=points)

  0%|          | 0/79 [00:00<?, ?it/s]

In [13]:
# After upserting all data, you can get collection info again to see the changes
final_collection_info = client.get_collection(index_name)
print("Final collection info:", final_collection_info)

Final collection info: status=<CollectionStatus.GREEN: 'green'> optimizer_status=<OptimizersStatusOneOf.OK: 'ok'> vectors_count=None indexed_vectors_count=0 points_count=10000 segments_count=8 config=CollectionConfig(params=CollectionParams(vectors=VectorParams(size=384, distance=<Distance.DOT: 'Dot'>, hnsw_config=None, quantization_config=None, on_disk=None, datatype=None, multivector_config=None), shard_number=1, sharding_method=None, replication_factor=1, write_consistency_factor=1, read_fan_out_factor=None, on_disk_payload=True, sparse_vectors=None), hnsw_config=HnswConfig(m=16, ef_construct=100, full_scan_threshold=10000, max_indexing_threads=0, on_disk=False, payload_m=None), optimizer_config=OptimizersConfig(deleted_threshold=0.2, vacuum_min_vector_number=1000, default_segment_number=0, max_segment_size=None, memmap_threshold=None, indexing_threshold=20000, flush_interval_sec=5, max_optimization_threads=None), wal_config=WalConfig(wal_capacity_mb=32, wal_segments_ahead=0), quant

# Agent Components

## Tools

We define the separate tool functions. When integrated into the workflow, all of these will be executed by the same `run_tool` step of the `ResearchAgent` class, which we will define later.

For now, let's define the functions that our agent will have access to.

In [14]:
import requests
import re

# Regex that captures the abstract from the HTML of an arXiv "abs" page. The
# abstract sits in a <blockquote class="abstract ..."> element, right after a
# <span class="descriptor">Abstract:</span> label. The closing </blockquote>
# anchor matters: without a trailing anchor the lazy group would always match
# the empty string.
abstract_pattern = re.compile(
    r'<blockquote class="abstract[^"]*">\s*'
    r'<span class="descriptor">Abstract:</span>\s*(.*?)\s*</blockquote>',
    re.DOTALL
)

async def fetch_arxiv(arxiv_id: str):
    """Gets the abstract from an ArXiv paper given the arxiv ID. Useful for
    finding high-level context about a specific paper."""
    # NOTE: the docstring is kept verbatim because FunctionTool.from_defaults
    # uses it as the tool description shown to the LLM.
    #
    # Returns the abstract text. When the page cannot be fetched or holds no
    # abstract, a short explanatory string is returned instead so the agent
    # can read the failure and carry on with another tool.
    print(">>> fetch_arxiv")
    # `requests` is blocking, so run it in a worker thread to keep the event
    # loop free for the other agents; the timeout stops a stalled connection
    # from hanging the workflow.
    res = await asyncio.to_thread(
        requests.get,
        f"https://export.arxiv.org/abs/{arxiv_id}",
        timeout=30,
    )
    if res.status_code != 200:
        return f"Could not fetch ArXiv paper {arxiv_id} (HTTP {res.status_code})."
    # search html for abstract
    re_match = abstract_pattern.search(res.text)
    if re_match is None:
        return f"No abstract found for ArXiv ID {arxiv_id}."
    # return abstract text
    return re_match.group(1)

## Web Search

The web search tool will provide the agent with access to web search. It will be instructed to use this for more general knowledge queries.

In [15]:
from serpapi import GoogleSearch
from getpass import getpass
import os

# Take the key from the SERPAPI_KEY environment variable and prompt for it
# only when that variable is unset or empty.
serpapi_key = os.getenv("SERPAPI_KEY") or getpass("SerpAPI key: ")

# Parameters shared by every search request; the query is added per call.
serpapi_params = {"engine": "google", "api_key": serpapi_key}

async def web_search(query: str):
    """Finds general knowledge information using Google search. Can also be used
    to augment more 'general' knowledge to a previous specialist query."""
    # NOTE: the docstring is kept verbatim because FunctionTool.from_defaults
    # uses it as the tool description shown to the LLM.
    #
    # Returns one block per result (title, snippet and link on separate
    # lines), with blocks separated by "---". A short message is returned
    # when the response carries no organic results.
    print(">>> web_search")
    search = GoogleSearch({
        **serpapi_params,
        "q": query,
        "num": 5
    })
    # The SerpAPI client call is blocking; run it in a worker thread so other
    # agents sharing the event loop are not stalled.
    response = await asyncio.to_thread(search.get_dict)
    # The response has no "organic_results" key when the search fails or
    # finds nothing, so do not index it directly.
    results = response.get("organic_results") or []
    if not results:
        return "No web results found."
    # Individual results may lack a field (commonly the snippet); skip any
    # missing field instead of raising KeyError.
    contexts = "\n---\n".join(
        "\n".join(x[field] for field in ("title", "snippet", "link") if x.get(field))
        for x in results
    )
    return contexts

## Rag Search

We provide two RAG-focused tools for our agent. The `rag_search` allows the agent to perform a simple RAG search for some information across all indexed research papers. The `rag_search_filter` also searches, but within a specific paper which is filtered for via the `arxiv_id` parameter.

We also define the `format_rag_contexts` function to handle the transformation of our vector search results into a readable plaintext format.

In [16]:
def format_rag_contexts(matches: list):
    """Render retrieved matches as a plaintext block for the LLM.

    Each match contributes its title, content, ArXiv ID and references;
    matches are separated by a "---" line. An empty list yields "".
    """
    contexts = []
    for x in matches:
        # Qdrant hits expose the stored fields on `.payload`; mapping-style
        # matches carrying a "metadata" key are still accepted.
        fields = x.payload if hasattr(x, "payload") else x["metadata"]
        text = (
            f"Title: {fields['title']}\n"
            f"Content: {fields['content']}\n"
            f"ArXiv ID: {fields['arxiv_id']}\n"
            f"Related Papers: {fields['references']}\n"
        )
        contexts.append(text)
    context_str = "\n---\n".join(contexts)
    return context_str

async def _search_knowledge_base(query: str, top_k: int, query_filter=None):
    # Embeds `query` and returns the `top_k` closest points of the Qdrant
    # collection `index_name`, optionally restricted by `query_filter`.
    #
    # The encoder is invoked through its synchronous call, the same way the
    # indexing cells use it, inside a worker thread so the event loop is not
    # blocked. The Qdrant request is moved to a thread for the same reason.
    xq = await asyncio.to_thread(encoder, [query])
    response = await asyncio.to_thread(
        client.query_points,
        collection_name=index_name,
        query=xq[0],  # the encoder returns one vector per input text
        query_filter=query_filter,
        limit=top_k,
        with_payload=True,
    )
    return response.points

async def rag_search_filter(query: str, arxiv_id: str):
    """Finds information from our ArXiv database using a natural language query
    and a specific ArXiv ID. Allows us to learn more details about a specific paper."""
    # NOTE: the docstring is kept verbatim because FunctionTool.from_defaults
    # uses it as the tool description shown to the LLM.
    print(">>> rag_search_filter")
    # Keep only chunks whose payload field "arxiv_id" equals the requested id.
    only_this_paper = models.Filter(
        must=[
            models.FieldCondition(
                key="arxiv_id",
                match=models.MatchValue(value=arxiv_id),
            )
        ]
    )
    matches = await _search_knowledge_base(query, top_k=6, query_filter=only_this_paper)
    context_str = format_rag_contexts(matches)
    return context_str

async def rag_search(query: str):
    """Finds specialist information on AI using a natural language query."""
    # NOTE: the docstring is kept verbatim because FunctionTool.from_defaults
    # uses it as the tool description shown to the LLM.
    print(">>> rag_search")
    matches = await _search_knowledge_base(query, top_k=2)
    context_str = format_rag_contexts(matches)
    return context_str

In [17]:
async def final_answer(
    introduction: str,
    research_steps: str,
    main_body: str,
    conclusion: str,
    sources: str
):
    """Returns a natural language response to the user in the form of a research
    report. There are several sections to this report, those are:
    - `introduction`: a short paragraph introducing the user's question and the
    topic we are researching.
    - `research_steps`: a few bullet points explaining the steps that were taken
    to research your report.
    - `main_body`: this is where the bulk of high quality and concise
    information that answers the user's question belongs. It is 3-4 paragraphs
    long in length.
    - `conclusion`: this is a short single paragraph conclusion providing a
    concise but sophisticated view on what was found.
    - `sources`: a bulletpoint list provided detailed sources for all information
    referenced during the research process
    """
    # The docstring above is reproduced verbatim on purpose: it is what the
    # LLM reads as this tool's description.

    def as_bullets(value):
        # Exact `list` instances become a "- item" bullet string; any other
        # value is passed through unchanged.
        if type(value) is list:
            return "\n".join([f"- {item}" for item in value])
        return value

    print(">>> final_answer")
    research_steps = as_bullets(research_steps)
    sources = as_bullets(sources)
    # The function always returns an empty string; the report fields are
    # taken from the tool-call arguments elsewhere.
    return ""

## Oracle LLM

Our prompt for the Oracle emphasizes its decision-making ability within the `system_prompt`. The chat history and the user's `input` are not part of this string; the workflow adds them later as separate chat messages.

In [18]:
# System message for the oracle LLM. ResearchAgent wraps this text in a
# SYSTEM ChatMessage and puts it at the start of the chat memory for each run.
system_prompt = """You are the oracle, the great AI decision maker.
Given the user's query you must decide what to do with it based on the
list of tools provided to you.

If you see that a tool has been used (in the scratchpad) with a particular
query, do NOT use that same tool with the same query again. Also, do NOT use
any tool more than twice (ie, if the tool appears in the scratchpad twice, do
not use it again).

You should aim to collect information from a diverse range of sources before
providing the answer to the user. Once you have collected plenty of information
to answer the user's question (stored in the scratchpad) use the final_answer
tool."""

The oracle agent will be provided the tools we previously built.

In [20]:
from llama_index.core.tools import FunctionTool

# Wrap every tool coroutine in a FunctionTool, keeping the original order.
tools = [
    FunctionTool.from_defaults(fn=tool_fn)
    for tool_fn in (
        fetch_arxiv,
        web_search,
        rag_search_filter,
        rag_search,
        final_answer,
    )
]

### Install the Llama Index Ollama integration

- %pip install llama-index-llms-ollama


In [21]:
import os
from llama_index.llms.ollama import Ollama


# LLM served by Ollama that acts as the agent's oracle; each request may take
# up to 120 seconds before timing out.
# NOTE(review): the workflow calls `achat_with_tools`, so the chosen model must
# support tool calling in Ollama - confirm that "llama3:latest" does, or switch
# to a tool-capable model.
llm = Ollama(model="llama3:latest", request_timeout=120.0)

## Events

We need to create a few events for our workflow. Llama-index comes with a few predefined event types, two of which we will use (`StartEvent` and `StopEvent`). However, we need to define a few additional custom event types - these are:

* `InputEvent` to handle new messages and prepare chat history.

* `ToolCallEvent` to trigger tool calls.

In [23]:
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event


class InputEvent(Event):
    """Event carrying the chat history that the oracle LLM should answer next."""

    # Messages read from the agent's chat memory, in conversation order.
    input: list[ChatMessage]


class ToolCallEvent(Event):
    """Event asking the workflow to execute one tool call chosen by the oracle."""

    # Identifier of the tool call, echoed back in the tool message.
    id: str
    # Name of the tool to run; looked up in the agent's tool registry.
    name: str
    # Keyword arguments to pass to the tool.
    params: dict

Now we build the workflow. Workflows consist of a single `Workflow` class with multiple `steps`. Each step is like a compute/execution step in our agentic flow.

We control which step is triggered by using different `Event` types. Each step consumes a different type of event, like `InputEvent` or `ToolCallEvent`. Additionally, our workflow begins and ends with the `StartEvent` and `StopEvent` events respectively.

In [27]:
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import MessageRole

class ResearchAgent(Workflow):
    """Tool-using research agent implemented as a Llama Index workflow.

    The flow is a loop between three steps:

    1. ``prepare_chat_history`` resets the memory and stores the system prompt
       and the user's question.
    2. ``handle_llm_input`` asks the oracle LLM which tool to call next.
    3. ``run_tool`` executes that tool and feeds its output back to step 2.

    The loop ends when the oracle selects the ``final_answer`` tool; the
    arguments of that call are returned as ``{"response": {...}}``.

    Each instance keeps a single chat memory, so use one instance per
    concurrent run.
    """

    def __init__(
        self,
        *args: Any,
        oracle: Ollama,
        tools: list[FunctionTool],
        timeout: int = 20,
    ):
        """Create the agent.

        Args:
            *args: positional arguments forwarded to ``Workflow.__init__``.
            oracle: LLM that decides which tool to call next.
            tools: tools the oracle may choose from.
            timeout: maximum duration of one run, in seconds.
        """
        super().__init__(*args)
        # Override the timeout set by the base class.
        self._timeout = timeout
        self.oracle = oracle
        self.tools = tools
        # name -> tool lookup used by `run_tool`
        self.get_tool = {tool.metadata.get_name(): tool for tool in self.tools}
        # initialize chat history/memory with system prompt
        self.sys_msg = ChatMessage(
            role=MessageRole.SYSTEM,
            content=system_prompt
        )
        # Build the memory from the injected oracle rather than from the
        # notebook-level `llm`, so the agent does not depend on a global.
        self.memory = ChatMemoryBuffer.from_defaults(llm=self.oracle)

    @step
    async def prepare_chat_history(self, ev: StartEvent) -> InputEvent:
        """Start a run: reset the memory and store system prompt + question."""
        # clear memory
        self.memory = ChatMemoryBuffer.from_defaults(llm=self.oracle)
        self.memory.put(message=self.sys_msg)
        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(message=user_msg)
        # get chat history
        chat_history = self.memory.get()
        # return input event
        return InputEvent(input=chat_history)

    @step
    async def handle_llm_input(self, ev: InputEvent) -> ToolCallEvent | StopEvent:
        """Ask the oracle for the next tool call, or finish the run."""
        chat_history = ev.input
        # get oracle response
        response = await self.oracle.achat_with_tools(
            tools=self.tools,
            chat_history=chat_history,
            tool_choice="required",
        )
        # add response to chat history / memory
        self.memory.put(message=response.message)
        # get tool calls; do not raise when the model replied without one
        tool_calls = self.oracle.get_tool_calls_from_response(
            response,
            error_on_no_tool_call=False,
        )
        if not tool_calls:
            # The model answered in plain text instead of calling a tool.
            # Finish with that text in the same shape as a `final_answer`
            # call rather than failing on `tool_calls[-1]`.
            return StopEvent(result={"response": {
                "introduction": "",
                "research_steps": "",
                "main_body": response.message.content or "",
                "conclusion": "",
                "sources": "",
            }})
        # only the last requested tool call is acted upon
        tool_call = tool_calls[-1]
        # if final_answer tool used we return to the user with the StopEvent
        if tool_call.tool_name == "final_answer":
            return StopEvent(result={"response": tool_call.tool_kwargs})
        # otherwise hand the call over to `run_tool`
        return ToolCallEvent(
            id=tool_call.tool_id,
            name=tool_call.tool_name,
            params=tool_call.tool_kwargs,
        )

    @step
    async def run_tool(self, ev: ToolCallEvent) -> InputEvent:
        """Execute the requested tool and return the updated chat history."""
        tool_name = ev.name
        additional_kwargs = {
            "tool_call_id": ev.id,
            "name": tool_name
        }
        # get chosen tool
        tool = self.get_tool.get(tool_name)
        if not tool:
            # Tell the oracle about the unknown tool so it can pick another.
            tool_msg = ChatMessage(
                role="tool",
                content=f"Tool {tool_name} not found",
                additional_kwargs=additional_kwargs
            )
        else:
            # now call tool
            tool_output = await tool.acall(**ev.params)
            tool_msg = ChatMessage(
                role="tool",
                content=tool_output.content,
                additional_kwargs=additional_kwargs
            )
        self.memory.put(message=tool_msg)
        chat_history = self.memory.get()
        return InputEvent(input=chat_history)

In [25]:
from llama_index.core.workflow import draw_all_possible_flows

# Render the workflow's possible event transitions to an HTML file.
draw_all_possible_flows(ResearchAgent, filename="research_agent.html")

  draw_all_possible_flows(ResearchAgent, filename="research_agent.html")


research_agent.html


Initialize the workflow:

In [30]:
# Build the agent with the Ollama oracle and the tool list defined earlier;
# a run is allowed up to 130 seconds.
agent = ResearchAgent(
    oracle=llm,
    tools=tools,
    timeout=130,
)

In [None]:
# Run the workflow once; the result is the dict passed to StopEvent, whose
# "response" entry holds the report fields.
res = await agent.run(input="tell me about AI")
res["response"]

---

Let's test with async. To avoid overwriting state with asynchronous runs taking place we will initialize three new agent instances.

In [48]:
# Three independent agents (each with its own chat memory), all sharing the
# same oracle and tools and limited to 30 seconds per run.
agent1, agent2, agent3 = (
    ResearchAgent(oracle=llm, tools=tools, timeout=30) for _ in range(3)
)

In [49]:
import asyncio

calls = [
    agent.run(input="tell me about AI"),
    agent1.run(input="tell me about AI"),
    agent2.run(input="what is RAG?"),
    agent3.run(input="what is the latest LLM from OpenAI?")
]

outputs = await asyncio.gather(*calls)

>>> web_search
>>> web_search
>>> web_search
>>> web_search
>>> rag_search
>>> rag_search
>>> rag_search
>>> fetch_arxiv
>>> fetch_arxiv
>>> fetch_arxiv
>>> rag_search
>>> fetch_arxiv
>>> rag_search_filter
>>> rag_search_filter
>>> rag_search_filter


In [50]:
# Result of the first call in `calls`: `agent` answering "tell me about AI".
outputs[0]

{'response': {'introduction': 'Artificial Intelligence (AI) continues to evolve rapidly, with 2023 witnessing significant advancements and trends that are shaping the future of technology. This report delves into the latest AI research trends of 2023, focusing on the rise of generative AI, the development of autonomous cognitive entities, and other key areas.',
  'research_steps': '- Conducted a web search to identify the latest AI research trends in 2023.\n- Utilized RAG search to gather specialist information on AI trends.\n- Retrieved abstracts and detailed information from specific ArXiv papers related to generative AI and autonomous cognitive entities.\n- Analyzed and synthesized the collected information to provide a comprehensive overview.',
  'main_body': 'In 2023, the field of AI has been marked by several groundbreaking trends, with generative AI taking center stage. Generative AI, which involves creating new content from existing data, has seen explosive growth. Tools like C

In [51]:
# Result of the second call in `calls`: `agent1` answering "tell me about AI".
outputs[1]

{'response': {'introduction': 'Artificial Intelligence (AI) is a transformative field of computer science focused on creating systems capable of performing tasks that typically require human intelligence. This includes activities such as learning, reasoning, problem-solving, perception, and language understanding.',
  'research_steps': '1. Conducted a web search to gather general information about AI.\n2. Performed a specialized search in AI literature to obtain detailed insights.\n3. Retrieved and reviewed abstracts and content from relevant ArXiv papers.\n4. Synthesized information from multiple sources to provide a comprehensive overview.',
  'main_body': "Artificial Intelligence (AI) is a broad and dynamic field within computer science that aims to create machines capable of intelligent behavior. The concept of AI dates back to ancient philosophical discussions about the nature of intelligence and the possibility of non-human entities possessing it. In the 1950s, Alan Turing formal

In [52]:
# Result of the third call in `calls`: `agent2` answering "what is RAG?".
outputs[2]

{'response': {'introduction': 'Retrieval-Augmented Generation (RAG) is an advanced AI framework that enhances the capabilities of generative models by integrating external knowledge sources. This approach aims to improve the accuracy and reliability of AI-generated content by leveraging both the inherent knowledge of the language model and additional information retrieved from external databases.',
  'research_steps': '- Conducted a web search to gather general information about RAG.\n- Performed a specialized RAG search to obtain detailed insights.\n- Retrieved and analyzed an ArXiv paper (ID: 2308.03983) to understand the technical aspects of RAG.\n- Filtered the ArXiv paper for specific details about RAG.',
  'main_body': 'Retrieval-Augmented Generation (RAG) is a technique designed to enhance the performance of generative AI models by incorporating external knowledge sources. Traditional generative models rely solely on their pre-trained knowledge, which can sometimes lead to inacc

In [53]:
# Result of the fourth call in `calls`: `agent3` answering
# "what is the latest LLM from OpenAI?".
outputs[3]

{'response': {'introduction': "OpenAI has been at the forefront of developing advanced large language models (LLMs) that push the boundaries of artificial intelligence. The latest model from OpenAI, as of 2023, is known as 'o1'.",
  'research_steps': "1. Conducted a web search to find the latest information on OpenAI's LLMs.\n2. Reviewed multiple sources to confirm the details about the latest model.\n3. Compiled the information into a concise report.",
  'main_body': "OpenAI's latest large language model, introduced in 2023, is called 'o1'. This model represents a significant advancement in the field of AI, particularly in its ability to perform complex reasoning tasks. The 'o1' model has been trained using reinforcement learning techniques, which enable it to think before responding, thereby improving its inferencing capabilities. This makes 'o1' particularly adept at handling complex, multi-step tasks that require a higher level of cognitive processing compared to its predecessors.\

---