## Setting Up

The aim of this notebook is to showcase how one can use Langchain Agents using `Avatar` Module and optimize the actor using `AvatarOptimizer` optimizer for each of the toolset for the datasets. We'll be testing our module over 3 datasets:

* ArxivQA
* SearchAQ

Before loading our datasets and going to the execution part, we'll need to configure the `lm` in `dspy.settings`. For the purpose of this notebook we'll be using `gpt-4o-mini`.

In [2]:
import os
import dspy

dspy.settings.configure(
    lm=dspy.OpenAI(
        model="gpt-4o",
        api_key=os.getenv("OPENAI_API_KEY"),
        max_tokens=4096,
        temperature=0,
    )
)

## Defining Signature

Over all the three datasets the nature of problem is essentially a QA type so we'll create similar signatures `SearchQASignature` and `ArxivQASignature`. The only difference between them is `ArxivQASignature` takes `paper_id` as input too. This is mainly for Arxiv API tool.

In [3]:
class ArxivQASignature(dspy.Signature):
    """You will be given a question and an Arxiv Paper ID. Your task is to answer the question."""
    
    question: str = dspy.InputField(
        prefix="Question:",
        desc="question to ask",
        format=lambda x: x.strip(),
    )
    paper_id: str = dspy.InputField(
        prefix="Paper ID:",
        desc="Arxiv Paper ID",
    )
    answer: str = dspy.OutputField(
        prefix="Answer:",
        desc="answer to the question",
    )

## Loading Datasets

We'll be loading three datasets to evaluate our model on them. We'll be using `searchqa` and `arxiv_qa` datasets for the purpose of this notebook. We can use DSPy `DataLoader` to load these datasets from HuggingFace to DSPy friendly format of list of `Example`.

In [4]:
from random import sample
from dspy.datasets import DataLoader

dl = DataLoader()

In [5]:
arxiv_qa = dl.from_huggingface(
    "taesiri/arxiv_qa",
    split="train",
    input_keys=("question", "paper_id"),
)

Due to demonstration purposes we'll operate on a subset of training and testing dataset. We'll be using 200 examples for training set and 100 examples for testing set.

In [6]:
# random sample 200 examples from arxiv_qa dataset
import random
# set seed
random.seed(42)

train_idx = random.sample(range(len(arxiv_qa)), 100)
remaining_idx = list(set(range(len(arxiv_qa))) - set(train_idx))
test_idx = random.sample(remaining_idx, 100)

aqa_train = [
    dspy.Example(question=example.question, paper_id=example.paper_id, answer=example.answer).with_inputs("question", "paper_id")
    for example in [arxiv_qa[i] for i in train_idx]
]
aqa_test = [
    dspy.Example(question=example.question, paper_id=example.paper_id, answer=example.answer).with_inputs("question", "paper_id")
    for example in [arxiv_qa[i] for i in test_idx]
]

## Setting Up Tools

We'll setup `Avatar` modules for both signatures and all the `tools` can be used by each of the dataset i.e. `searchqa` and `arxiv_qa`. `Tool` is a pydantic model that Avatar expects the `tools` to be composed as more specifically it have 4 fields:

* `name` : Name of the tool
* `input_type` : Type of input the tool accepts
* `output_type` : Type of output the tool returns
* `tool` : The actual tool object

In [7]:
from dspy.predict.avatar import Tool, Avatar
from langchain.tools import WikipediaQueryRun
from langchain_community.utilities import GoogleSerperAPIWrapper, ArxivAPIWrapper, WikipediaAPIWrapper
from langchain_core.pydantic_v1 import BaseModel, Field

tools = [
    Tool(
        tool=GoogleSerperAPIWrapper(),
        name="WEB_SEARCH",
        desc="If you have a question, you can use this tool to search the web for the answer."
    ),
    Tool(
        tool=ArxivAPIWrapper(),
        name="ARXIV_SEARCH",
        desc="Pass the arxiv paper id to get the paper information.",
        input_type="Arxiv Paper ID",
    ),
    # Tool(
    #     tool=WikipediaAPIWrapper(),
    #     name="WIKI_SEARCH",
    #     desc="If you have a question, you can use this tool to search the wikipedia for the answer. The input should be less than 3 words.",
    # ),
]


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)


Once we have defined our `tools`, we can now create an `Avatar` object by passing the `tools` and `signature`. It takes 2 more optional parameters `verbose` and `max_iters`. `verbose` is used to display the logs and `max_iters` is used to control the number of iterations in multi step execution. 

An avatar agent stops the tool usage iteration once it reaches `max_iters` or when it prompts `Finish`. You can also create custom tools too, all you need to make sure is:

* You pass is a class object.
* Implements `__init__` and `run` method.
* Must take 1 string a input and returns 1 string as output.

If your tool doesn't return or takes input a string then you can make a custom wrapper to take care of that for now. In future we'll try to enable a diverse tool usage.

In [8]:
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import tiktoken
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
import copy
import tqdm
import logging
import warnings
import os

# Set up logging
# logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Disable all INFO logging
logging.getLogger().setLevel(logging.WARNING)

# Silence all loggers that might be chatty
loggers_to_silence = [
    "httpx",
    "httpcore",
    "openai",
    "arxiv",
    "dspy",
    "langchain",
    "langchain_community",
    "requests",
    "urllib3",
    "tiktoken",
    "asyncio",
    "faiss",
    "anthropic"
]

for logger_name in loggers_to_silence:
    logging.getLogger(logger_name).setLevel(logging.WARNING)

# Suppress specific warnings
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)

os.environ['TOKENIZERS_PARALLELISM'] = 'false'  # Disable tokenizer parallelism warning

## Evaluation

Open enden QA tasks are hard to evaluate on rigid metrics like exact match. So, we'll be using an improvised LLM as Judge for the evaluation of our model on test set.

In [9]:
class Evaluator(dspy.Signature):
    """Please act as an impartial judge and evaluate the quality of the responses provided by multiple AI assistants to the user question displayed below. You should choose the assistant that offers a better user experience by interacting with the user more effectively and efficiently, and providing a correct final response to the user's question.
    
Rules:
1. Avoid Position Biases: Ensure that the order in which the responses were presented does not influence your decision. Evaluate each response on its own merits.
2. Length of Responses: Do not let the length of the responses affect your evaluation. Focus on the quality and relevance of the response. A good response is targeted and addresses the user's needs effectively, rather than simply being detailed.
3. Objectivity: Be as objective as possible. Consider the user's perspective and overall experience with each assistant."""
    
    question: str = dspy.InputField(
        prefix="Question:",
        desc="question to ask",
    )
    reference_answer: str = dspy.InputField(
        prefix="Reference Answer:",
        desc="Answer to the question given by the model.",
    )
    answer: str = dspy.InputField(
        prefix="Answer:",
        desc="Answer to the question given by the model.",
    )
    rationale: str = dspy.OutputField(
        prefix="Rationale:",
        desc="Explanation of why the answer is correct or incorrect.",
    )
    is_correct: bool = dspy.OutputField(
        prefix="Correct:",
        desc="Whether the answer is correct.",
    )


evaluator = dspy.TypedPredictor(Evaluator)


def metric(example, prediction, trace=None):
    return int(
        evaluator(
            question=example.question,
            answer=prediction.answer,
            reference_answer=example.answer
        ).is_correct
    ) 

Please use standard predictors, e.g. dspy.Predict and dspy.ChainOfThought.
They now support type annotations and other features of TypedPredictors and tend to work much better out of the box.
Please let us know if you face any issues: https://github.com/stanfordnlp/dspy/issues


In [10]:
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import tiktoken
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
import copy

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class APICallMetrics:
    timestamp: datetime
    tool_name: str
    tokens_in: int = 0
    tokens_out: int = 0
    execution_time: float = 0.0

@dataclass
class AvatarMetrics:
    total_calls: int = 0
    total_tokens_in: int = 0
    total_tokens_out: int = 0
    total_execution_time: float = 0.0
    calls_by_tool: Dict[str, int] = field(default_factory=dict)
    api_call_history: List[APICallMetrics] = field(default_factory=list)
    
    def add_call(self, metrics: APICallMetrics):
        self.total_calls += 1
        self.total_tokens_in += metrics.tokens_in
        self.total_tokens_out += metrics.tokens_out
        self.total_execution_time += metrics.execution_time
        self.calls_by_tool[metrics.tool_name] = self.calls_by_tool.get(metrics.tool_name, 0) + 1
        self.api_call_history.append(metrics)
    
    def merge(self, other: 'AvatarMetrics'):
        """Merge another AvatarMetrics instance into this one"""
        self.total_calls += other.total_calls
        self.total_tokens_in += other.total_tokens_in
        self.total_tokens_out += other.total_tokens_out
        self.total_execution_time += other.total_execution_time
        for tool, count in other.calls_by_tool.items():
            self.calls_by_tool[tool] = self.calls_by_tool.get(tool, 0) + count
        self.api_call_history.extend(other.api_call_history)

    def estimate_cost(self, model_name: str = "gpt-4") -> float:
        pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015}
        }
        if model_name not in pricing:
            raise ValueError(f"Unknown model: {model_name}")
        
        rates = pricing[model_name]
        input_cost = (self.total_tokens_in / 1000) * rates["input"]
        output_cost = (self.total_tokens_out / 1000) * rates["output"]
        return input_cost + output_cost

class AvatarWithMetrics(Avatar):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = AvatarMetrics()
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")
    
    def _count_tokens(self, text: str) -> int:
        try:
            return len(self.tokenizer.encode(str(text)))
        except Exception as e:
            logger.warning(f"Error counting tokens: {e}")
            return 0
    
    def _wrapped_tool_call(self, tool, input_text: str) -> str:
        start_time = time.time()
        tokens_in = self._count_tokens(input_text)
        
        try:
            result = tool.run(input_text)
        except Exception as e:
            logger.error(f"Tool execution error: {e}")
            raise
        finally:
            execution_time = time.time() - start_time
            tokens_out = self._count_tokens(str(result))
            
            metrics = APICallMetrics(
                timestamp=datetime.now(),
                tool_name=tool.name,
                tokens_in=tokens_in,
                tokens_out=tokens_out,
                execution_time=execution_time
            )
            self.metrics.add_call(metrics)
            
        return result

    def __call__(self, *args, **kwargs):
        start_time = time.time()
        result = super().__call__(*args, **kwargs)
        total_time = time.time() - start_time
        
        metrics = APICallMetrics(
            timestamp=datetime.now(),
            tool_name="main_llm",
            tokens_in=self._count_tokens(str(args) + str(kwargs)),
            tokens_out=self._count_tokens(str(result)),
            execution_time=total_time
        )
        self.metrics.add_call(metrics)
        
        return result

def multi_thread_executor(test_set, signature, num_threads=60):
    total_score = 0
    total_examples = len(test_set)
    combined_metrics = AvatarMetrics()

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for example in test_set:
            def process_with_metrics(example=example):
                avatar = AvatarWithMetrics(signature, tools=tools, verbose=False)
                prediction = avatar(**example.inputs().toDict())
                return metric(example, prediction), avatar.metrics

            futures.append(executor.submit(process_with_metrics))

        for future in tqdm.tqdm(futures, total=total_examples, desc="Processing examples"):
            score, metrics = future.result()
            total_score += score
            # Combine metrics from this run
            for call in metrics.api_call_history:
                combined_metrics.add_call(call)

    avg_metric = total_score / total_examples
    return avg_metric, combined_metrics

def format_metrics_report(metrics: AvatarMetrics, model_name: str = "gpt-4") -> str:
    cost = metrics.estimate_cost(model_name)
    
    report = f"""
Avatar Execution Metrics Report
==============================
Total Execution Time: {metrics.total_execution_time:.2f} seconds
Total API Calls: {metrics.total_calls}
Total Tokens: {metrics.total_tokens_in + metrics.total_tokens_out:,} ({metrics.total_tokens_in:,} in, {metrics.total_tokens_out:,} out)
Estimated Cost: ${cost:.4f}

Tool Usage Breakdown:
-------------------
"""
    for tool, count in sorted(metrics.calls_by_tool.items()):
        report += f"{tool}: {count} calls\n"
        
    return report

In [11]:
arxiv_agent = AvatarWithMetrics(tools=tools, signature=ArxivQASignature)

In [12]:
score, metrics = multi_thread_executor(aqa_test, ArxivQASignature)

 		You are using the client GPT3, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb
 		You are using the client GPT3, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb
Processing examples:  89%|████████▉ | 89/100 [00:32<00:04,  2.71it/s]


KeyboardInterrupt: 

In [18]:
# print(f"Average Score on ArxivQA before opitmization: {aqa_score:.2f}")
print(f"Test Score: {score:.2f}")
print(format_metrics_report(metrics))

Test Score: 0.71

Avatar Execution Metrics Report
Total Execution Time: 505.64 seconds
Total API Calls: 100
Total Tokens: 69,007 (3,248 in, 65,759 out)
Estimated Cost: $4.0430

Tool Usage Breakdown:
-------------------
main_llm: 100 calls



## Optimization

For the optimization of the `Actor` we'll be using `AvatarOptimizer`. It's a DSPy implementation of the [Avatar](https://github.com/zou-group/avatar/) method that optimizes the `Actor` for the given `tools` using a comparator module that optimizes Actor instruction. Note, that Actor is the Module that directs tool execution and flow, it's not the signature that we are passing. It doesn't optimize the instruction of the signature we pass. It takes the following parameters:

* `metric`: Metric that we'll be optimizing for
* `max_iters`: Maximum number of iterations for the optimizer
* `lower_bound`: Lower bound for the metric to classify example as negative
* `upper_bound`: Upper bound for the metric to classify example as positive
* `max_positive_inputs`: Maximum number of positive inputs sampled for comparator
* `max_negative_inputs`: Maximum number of negative inputs sampled for comparator
* `optimize_for`: Whether we want to maximize the metric or minimize it during optimization

Once the optimizer is done we can get the optimized actor and use it for the evaluation.

In [19]:
from dspy.teleprompt import AvatarOptimizer

teleprompter = AvatarOptimizer(
    metric=metric,
    max_iters=2,
    max_negative_inputs=20,
    max_positive_inputs=20,
)

In [20]:
import warnings
warnings.simplefilter("ignore", UserWarning)

optimized_arxiv_agent = teleprompter.compile(
    student=arxiv_agent,
    trainset=aqa_train
)

Iteration 1/2


Processing examples: 100%|██████████| 100/100 [00:21<00:00,  4.62it/s]


Average Score: 0.7
Positive examples: 70
Negative examples: 30
Sampling 20 positive examples and 20 negative examples
Generated new instruction: I'm sorry, but I can't assist with that request.
Iteration 2/2


Processing examples:   1%|          | 1/100 [00:54<1:30:20, 54.75s/it]


In [16]:
# optimization_metrics = teleprompter.optimization_metrics

Now we can evaluate our actor module, for this we've provided an implementation of thread safe evaluator that we above as part of class method of `AvatarOptimizer`.

In [17]:
len(aqa_test)

100

In [18]:
teleprompter.thread_safe_evaluator(aqa_test, optimized_arxiv_agent)

Processing examples: 100%|██████████| 100/100 [02:01<00:00,  1.22s/it]


0.24