In [1]:
import os
from getpass import getpass

if not (openai_api_key := os.getenv("OPENAI_API_KEY")):
    openai_api_key = getpass("🔑 Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = openai_api_key

import pandas as pd

# Display the complete contents of dataframe cells.
pd.set_option("display.max_colwidth", None)

In [1]:
from tqdm.auto import tqdm
from typing import List
from pydantic import BaseModel
from llama_index.llms.openai import OpenAI
from llama_index.core.tools import FunctionTool
from llama_index.core.agent import ReActAgent
import wikipedia
from pydantic import BaseModel
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from typing import Dict
from typing import Any, List, Union

from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser
from llama_index.core.agent.react.types import (
    ActionReasoningStep,
    ObservationReasoningStep,
)
from llama_index.core.llms.llm import LLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI

from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event

from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HTTPSpanExporter,
)
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
import pandas as pd


### Build a testset

In [79]:


class EvalGenPrompt(BaseModel):
    prompt: str

def generate_eval_prompt(prompt: str) -> EvalGenPrompt:
    """Generates a concise, markdown-formatted evaluation dataset generation prompt."""
    return EvalGenPrompt(prompt=prompt)

tool = FunctionTool.from_defaults(fn=generate_eval_prompt)

# Now use the LLM to call this function
llm = OpenAI(model="gpt-4o", strict=True)

In [56]:
examples = """
'Who is the president?'
'Who scored the most goals in the European Champion's League in 2020?'
'articles similar to the article on 'Philosophy'?'
"the school of athens"
"css flex"
"""

user_unstructured_prompt = f"""
Your task is to clean up the following unstructured prompt into a well structured, markdown formatted prompt 
that I can pass to another LLM for synthetic data generation. The prompt should be concise 
but clear and unambiguous. 
It should include the following sections: Task, Context, Instructions, and Example Queries.
It should respond by calling the tool assigned to it.
The task is to generate a dataset of queries that can be used to evaluate the 
performance of a wikipedia-based agentic search engine.
The context is that the user queries are generally of three types: 1) direct query (eg: Who is the 
president?), 2) return articles similar to a given article (eg: articles similar to 'Philosophy'), 
3) return a summary of a given subect (eg: the school of athens). The queries generated
should cover each of these types and be similar in theme to the user query examples provided below but 
they should be worded differently and may include typos, capitalization differences, etc. They should be
designed to sample a realistic distribution of queries that a user could answer.
It should also generate a few out of distribution queries that are not of these types or difficult 
to answer in some way. 

user query examples:
{examples}
"""

In [57]:
prompt = user_unstructured_prompt

response = llm.predict_and_call(
    [tool],
    prompt
)

In [58]:
for s in response.sources:
    tool_output = s.raw_output
    if isinstance(tool_output, EvalGenPrompt):
        print(tool_output.prompt)

        eval_gen_prompt = tool_output.prompt


## Task
Generate a dataset of queries to evaluate the performance of a Wikipedia-based agentic search engine.

## Context
User queries are generally of three types:
1. Direct query (e.g., Who is the president?)
2. Return articles similar to a given article (e.g., articles similar to 'Philosophy')
3. Return a summary of a given subject (e.g., the school of Athens)

The queries generated should cover each of these types and be similar in theme to the user query examples provided below but should be worded differently and may include typos, capitalization differences, etc. They should be designed to sample a realistic distribution of queries that a user could ask. Additionally, generate a few out-of-distribution queries that are not of these types or are difficult to answer in some way.

## Instructions
1. Generate queries that fit into the three specified types.
2. Ensure the queries are similar in theme to the provided examples but are worded differently.
3. Include variations such as t

In [62]:
n_queries_to_generate = 20

eval_gen_prompt += f"\n\n Number of queries to generate: {n_queries_to_generate}."

In [63]:

print(eval_gen_prompt)

## Task
Generate a dataset of queries to evaluate the performance of a Wikipedia-based agentic search engine.

## Context
User queries are generally of three types:
1. Direct query (e.g., Who is the president?)
2. Return articles similar to a given article (e.g., articles similar to 'Philosophy')
3. Return a summary of a given subject (e.g., the school of Athens)

The queries generated should cover each of these types and be similar in theme to the user query examples provided below but should be worded differently and may include typos, capitalization differences, etc. They should be designed to sample a realistic distribution of queries that a user could ask. Additionally, generate a few out-of-distribution queries that are not of these types or are difficult to answer in some way.

## Instructions
1. Generate queries that fit into the three specified types.
2. Ensure the queries are similar in theme to the provided examples but are worded differently.
3. Include variations such as t

In [64]:
class EvalDataset(BaseModel):
    query: str

def generate_eval_dataset(query: str) -> EvalDataset:
    """Generates an evaluation dataset with a single query."""
    return EvalDataset(query=query)

tool = FunctionTool.from_defaults(fn=generate_eval_dataset)

In [72]:
llm = OpenAI(model="gpt-4o", strict=True)
prompt = eval_gen_prompt
# query = "Give me articles similar to 'Philosophy'"
# Call predict_and_call with both tools
response = llm.predict_and_call(
    [tool],
    prompt,
    allow_parallel_tool_calls=True,
)

In [73]:
eval_queries = []
for s in response.sources:
    tool_output = s.raw_output
    if isinstance(tool_output, list) and all(isinstance(item, EvalDataset) for item in tool_output):
        for result in tool_output:
            print(f"Query: {result.query}")
            eval_queries.append(result.query)

    if isinstance(tool_output, EvalDataset):
        print(tool_output.query)

        eval_queries.append(tool_output.query)

Who is the current Prime Minister?
Who won the Nobel Prize in Literature in 2021?
articles like the one on 'Metaphysics'
the theory of relativity
articles similar to 'Quantum Mechanics'
Who is the CEO of Tesla?
articles related to 'Artificial Intelligence'
the history of the Roman Empire
Who discovered penicillin?
articles like 'Renaissance Art'
the life of Albert Einstein
Who is the author of '1984'?
articles similar to 'Existentialism'
the causes of World War II
Who painted the Mona Lisa?
articles like 'Modern Architecture'
the principles of democracy
Who is the founder of Microsoft?
articles related to 'Climate Change'
the impact of the Industrial Revolution


In [74]:


test_df = pd.DataFrame(eval_queries, columns=["query"])
test_df.head()

Unnamed: 0,query
0,Who is the current Prime Minister?
1,Who won the Nobel Prize in Literature in 2021?
2,articles like the one on 'Metaphysics'
3,the theory of relativity
4,articles similar to 'Quantum Mechanics'


In [83]:
test_examples = [
"Who is the president?",
"Who scored the most goals in the European Champion's League in 2020?",
"articles similar to the article on 'Philosophy'?",
"the school of athens",
"css flex"
]

test_examples_df = pd.DataFrame(test_examples, columns=["query"])

# Concatenate the new DataFrame with the existing test_df
test_df = pd.concat([test_examples_df, test_df], ignore_index=True)
test_df.head()

# Write the DataFrame to a CSV file
test_df.to_csv("../data/evaluation/test_queries.csv", index=True)


In [2]:
# Load the DataFrame from the CSV file
test_df = pd.read_csv("../data/evaluation/test_queries.csv", index_col=0)

# Display the first few rows of the loaded DataFrame
test_df.head()

Unnamed: 0,query
0,Who is the president?
1,Who scored the most goals in the European Cham...
2,articles similar to the article on 'Philosophy'?
3,the school of athens
4,css flex


### Link to Phoenix Arize (Traceability)

In [3]:

# Add Phoenix API Key for tracing
# PHOENIX_API_KEY = "<YOUR-PHOENIX-API-KEY>"
# os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={PHOENIX_API_KEY}"

# Add Phoenix
span_phoenix_processor = SimpleSpanProcessor(
    # HTTPSpanExporter(endpoint="https://app.phoenix.arize.com/v1/traces")
    HTTPSpanExporter(endpoint="http://0.0.0.0:6006/v1/traces")
)

# Add them to the tracer
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(span_processor=span_phoenix_processor)

# Instrument the application
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)

### Define ReAct Agent

In [4]:


class PrepEvent(Event):
    pass


class InputEvent(Event):
    input: list[ChatMessage]


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput

In [10]:


class ReActAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: Union[LLM, None] = None,
        tools: Union[list[BaseTool], None] = None,
        extra_context: Union[str, None] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()

        self.memory = ChatMemoryBuffer.from_defaults(llm=llm)
        self.formatter = ReActChatFormatter(context=extra_context or "")
        self.output_parser = ReActOutputParser()
        self.sources = []

    @step
    async def new_user_msg(self, ctx: Context, ev: StartEvent) -> PrepEvent:
        # clear sources
        self.sources = []

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # clear current reasoning
        await ctx.set("current_reasoning", [])

        return PrepEvent()

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: PrepEvent
    ) -> InputEvent:
        # get chat history
        chat_history = self.memory.get()
        current_reasoning = await ctx.get("current_reasoning", default=[])
        llm_input = self.formatter.format(
            self.tools, chat_history, current_reasoning=current_reasoning
        )
        return InputEvent(input=llm_input)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> Union[ToolCallEvent, StopEvent, PrepEvent]:
        chat_history = ev.input

        response = await self.llm.achat(chat_history)

        try:
            reasoning_step = self.output_parser.parse(response.message.content)
            (await ctx.get("current_reasoning", default=[])).append(
                reasoning_step
            )
            if reasoning_step.is_done:
                self.memory.put(
                    ChatMessage(
                        role="assistant", content=reasoning_step.response
                    )
                )
                return StopEvent(
                    result={
                        "response": reasoning_step.response,
                        "sources": [*self.sources],
                        "reasoning": await ctx.get(
                            "current_reasoning", default=[]
                        ),
                    }
                )
            elif isinstance(reasoning_step, ActionReasoningStep):
                tool_name = reasoning_step.action
                tool_args = reasoning_step.action_input
                return ToolCallEvent(
                    tool_calls=[
                        ToolSelection(
                            tool_id="fake",
                            tool_name=tool_name,
                            tool_kwargs=tool_args,
                        )
                    ]
                )
        except Exception as e:
            (await ctx.get("current_reasoning", default=[])).append(
                ObservationReasoningStep(
                    observation=f"There was an error in parsing my reasoning: {e}"
                )
            )

        # if no tool calls or final response, iterate again
        return PrepEvent()

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> PrepEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            if not tool:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Tool {tool_call.tool_name} does not exist"
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(observation=tool_output.content)
                )
            except Exception as e:
                (await ctx.get("current_reasoning", default=[])).append(
                    ObservationReasoningStep(
                        observation=f"Error calling tool {tool.metadata.get_name()}: {e}"
                    )
                )

        # prep the next iteraiton
        return PrepEvent()

In [7]:

class WikiSearchResult(BaseModel):
    title: str
    url: str

class WikiArticle(BaseModel):
    title: str
    content: str
    url: str
    
def wikipedia_similar_articles(query: str) -> list[Dict[str,str]]:
    """Search Wikipedia for articles similar to the given query and return titles and URLs."""
    search_results = wikipedia.search(query, results=5)
    result_list = []
    for result in search_results:
        try:
            page = wikipedia.page(result)
            result_list.append(WikiSearchResult(title=page.title, url=page.url))
        except wikipedia.exceptions.DisambiguationError as e:
            # Handle disambiguation pages by logging or ignoring
            print(f"Disambiguation page: {e.options}")
        except wikipedia.exceptions.PageError:
            print(f"PageError: {result}")
    return result_list



def wikipedia_full_article(query: str) -> Dict[str,str]:
    """Fetch the full Wikipedia article for the given query."""
    try:
        page = wikipedia.page(query)
        return WikiArticle(title=page.title, content=page.content, url=page.url)
    except wikipedia.exceptions.DisambiguationError as e:
        # Handle disambiguation pages
        print(f"Disambiguation page: {e.options}")
    except wikipedia.exceptions.PageError:
        print(f"PageError: {query}")
    return None


# Wrap these functions in a tool
similar_articles_tool = FunctionTool.from_defaults(fn=wikipedia_similar_articles)
full_article_tool = FunctionTool.from_defaults(fn=wikipedia_full_article)



In [34]:
llm = OpenAI(model="gpt-4o")
agent = ReActAgent(
    llm=llm, tools=[similar_articles_tool, full_article_tool], timeout=120, verbose=True
)


### Evaluate the agent

In [47]:
from datasets import Dataset
from typing import List, Union
# for normal python
# import asyncio
# for jupyter notebook
import nest_asyncio
nest_asyncio.apply()
agent.memory.reset()
from phoenix.trace import using_project

def get_context(response) -> List[str]:
    content = []  
    for source in response['sources']:
        tool_output = source.raw_output
        if isinstance(tool_output, list) and all(isinstance(item, WikiSearchResult) for item in tool_output):
            for result in tool_output:
                content.append(f"Title: {result.title}, URL: {result.url}")
        elif isinstance(tool_output, WikiArticle):
            content.append(f"Title: {tool_output.title}, Content: {tool_output.content}, URL: {tool_output.url}")
    return content

async def generate_response(agent, question) -> Dict[str, Union[str, List[str]]]:
    response = await agent.run(input=question)
    return {
        "answer": response['response'],
        "contexts": get_context(response),
    }

async def generate_ragas_dataset(agent, test_df):
    test_questions = test_df["query"].values
    responses = []
    
    for question in tqdm(test_questions):
        response = await generate_response(agent, question)  # Process each question serially
        responses.append(response)
    
    dataset_dict = {
        "question": test_questions,
        "answer": [response["answer"] for response in responses],
        "contexts": [response["contexts"] for response in responses],  # Store contexts separately for each question
    }
    ds = Dataset.from_dict(dataset_dict)
    return ds

# limit to first 5 rows for testing
with using_project("llama-index"):
    ragas_eval_dataset = await generate_ragas_dataset(agent, test_df.head(5))

ragas_evals_df = ragas_eval_dataset.to_pandas()
ragas_evals_df.head()

  0%|          | 0/5 [00:00<?, ?it/s]

Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
Running step new_user_msg
Step new_user_msg produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event PrepEvent
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step hand

Unnamed: 0,question,answer,contexts
0,Who is the president?,The current president of the United States is ...,"[Title: President of the United States, Conten..."
1,Who scored the most goals in the European Cham...,The top goalscorer in the UEFA Champions Leagu...,[Title: List of UEFA Champions League top scor...
2,articles similar to the article on 'Philosophy'?,Here are some articles similar to the article ...,"[Title: Philosophy, URL: https://en.wikipedia...."
3,the school of athens,"""The School of Athens"" is a fresco by the Ital...","[Title: The School of Athens, Content: The Sch..."
4,css flex,CSS Flexbox (Flexible Box Layout) is a layout ...,[]


In [49]:
from phoenix.session.evaluation import get_qa_with_reference
import phoenix as px

client = span_phoenix_processor.client
# dataset containing span data for evaluation with Ragas
spans_dataframe = get_qa_with_reference(client, project_name="llama-index")
print(spans_dataframe.head(2))

AttributeError: 'SimpleSpanProcessor' object has no attribute 'client'

In [45]:
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness,
)

evaluation_result = evaluate(
    dataset=ragas_eval_dataset,
    metrics=[faithfulness, answer_relevancy],
)
eval_scores_df = pd.DataFrame(evaluation_result.scores)

Evaluating:   0%|          | 0/10 [00:00<?, ?it/s]

No statements were generated from the answer.


In [46]:
print(eval_scores_df)

   faithfulness  answer_relevancy
0           1.0          0.886055
1           0.0          0.936000
2           NaN          0.979653
3           1.0          0.861893
4           1.0          0.847856


In [None]:
from phoenix.trace.dsl.helpers import SpanQuery

client = px.Client()