### Notebook to investigate Plan-and-execute agentic pattern

#### Plan-And-Execute

Based loosely on Wang et al.’s paper on Plan-and-Solve Prompting, and Yohei Nakajima’s BabyAGI project, this simple architecture is emblematic of the planning agent architecture. It consists of two basic components:

1) A planner, which prompts an LLM to generate a multi-step plan to complete a large task.
2) Executor(s), which accept the user query and a step in the plan and invoke 1 or more tools to complete that task.

Once execution is completed, the agent is called again with a re-planning prompt, letting it decide whether to finish with a response or whether to generate a follow-up plan (if the first plan didn’t have the desired effect).
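The planner/executor/re-planner loop described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the implementation used later in this notebook: `plan`, `execute` and `replan` are hypothetical callables standing in for LLM calls, and `max_rounds` is an assumed safety cap. It shows why the planner runs once per round rather than once per tool invocation.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical signatures: each callable stands in for an LLM-backed component
Planner = Callable[[str], List[str]]
Executor = Callable[[str, str], str]
Replanner = Callable[[str, List[Tuple[str, str]]], Tuple[Optional[str], List[str]]]


def plan_and_execute(query: str, plan: Planner, execute: Executor,
                     replan: Replanner, max_rounds: int = 3) -> Optional[str]:
    """Plan once, run every step, then let the re-planner finish or issue a follow-up plan."""
    steps = plan(query)
    history: List[Tuple[str, str]] = []  # (step, result) pairs from all rounds
    for _ in range(max_rounds):
        for step in steps:
            # The executor sees the user query and a single step, not the whole plan
            history.append((step, execute(query, step)))
        answer, steps = replan(query, history)
        if answer is not None:
            return answer
    return None  # no answer after max_rounds rounds


# Toy components so the loop can run without an LLM
def toy_plan(query: str) -> List[str]:
    return ["look up the capital of France"]


def toy_execute(query: str, step: str) -> str:
    return "Paris" if "capital of France" in step else "unknown"


def toy_replan(query: str, history: List[Tuple[str, str]]) -> Tuple[Optional[str], List[str]]:
    # Finish as soon as any step produced a usable result, otherwise plan one more step
    found = [result for _, result in history if result != "unknown"]
    return (found[-1], []) if found else (None, ["try another source"])


print(plan_and_execute("What is the capital of France?", toy_plan, toy_execute, toy_replan))  # Paris
```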

This agent design lets us avoid having to call the large planner LLM for each tool invocation. It is still limited by serial tool calling, and it uses an LLM call for each task because it doesn't support variable assignment.

There are two patterns that extend the plan-and-execute pattern:
- Reasoning Without Observation (ReWOO): can execute multiple tasks without replanning.
- LLM Compiler: improves the speed of task execution.

source: https://blog.langchain.dev/planning-agents/

Cons:

- Single-task agent
- Serial tool calling
- Uses an LLM call for each task

##### ReWOO

1) Planner generates a plan list consisting of interleaving "Plan" (reasoning) and "E#" lines.

Query:  "What are the stats for the quarterbacks of the super bowl contenders this year"

Plan: I need to know the teams playing in the superbowl this year
E1: Search[Who is competing in the superbowl?]
Plan: I need to know the quarterbacks for each team
E2: LLM[Quarterback for the first team of #E1]

The planner can reference previous outputs using syntax like #E2, so later steps can use earlier results without the planner being called again.

2) The worker node loops through each task and assigns the task output to the corresponding variable. It also substitutes those results for the variables in subsequent calls.

3) Finally, the Solver integrates all these outputs into a final answer.
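A minimal sketch of the ReWOO worker step, assuming the plan text uses exactly the "E#: Tool[input]" line shape shown above. `parse_rewoo_plan`, `run_worker` and the toy tools are hypothetical; in a real system each tool would be a search or LLM call, and the resulting evidence would be handed to the Solver.

```python
import re
from typing import Callable, Dict, List, Tuple

# Matches evidence lines such as "E1: Search[Who is competing in the superbowl?]"
STEP_PATTERN = re.compile(r"^(E\d+):\s*(\w+)\[(.*)\]$")
# Matches references to earlier evidence such as "#E1"
REF_PATTERN = re.compile(r"#(E\d+)")


def parse_rewoo_plan(plan_text: str) -> List[Tuple[str, str, str]]:
    """Return (variable, tool_name, tool_input) for each E# line; "Plan:" lines are reasoning only."""
    steps = []
    for line in plan_text.splitlines():
        match = STEP_PATTERN.match(line.strip())
        if match:
            steps.append(match.groups())
    return steps


def run_worker(steps: List[Tuple[str, str, str]], tools: Dict[str, Callable[[str], str]]) -> Dict[str, str]:
    """Run steps in order, replacing #E references with results computed earlier."""
    evidence: Dict[str, str] = {}
    for var, tool_name, tool_input in steps:
        # Unknown references are left untouched rather than raising
        resolved = REF_PATTERN.sub(lambda m: evidence.get(m.group(1), m.group(0)), tool_input)
        evidence[var] = tools[tool_name](resolved)
    return evidence


plan = """Plan: I need to know the teams playing in the superbowl this year
E1: Search[Who is competing in the superbowl?]
Plan: I need to know the quarterbacks for each team
E2: LLM[Quarterback for the first team of #E1]"""

# Toy tools with a fixed or echoed output, so the substitution is visible
toy_tools = {
    "Search": lambda q: "Team A and Team B",
    "LLM": lambda prompt: f"answer to: {prompt}",
}
evidence = run_worker(parse_rewoo_plan(plan), toy_tools)
print(evidence["E2"])  # answer to: Quarterback for the first team of Team A and Team B
```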


Pros:

- Executes the whole task list without re-planning after every step.


##### LLM Compiler

The LLMCompiler has the following main components:

1) Planner: streams a DAG of tasks. Each task contains a tool, arguments, and a list of dependencies.

2) Task Fetching Unit: accepts the stream of tasks, then schedules and executes each task once its dependencies are met. Since many tools involve other calls to search engines or LLMs, the extra parallelism can grant a significant speed boost (the paper claims 3.6x).

3) Joiner: an LLM step that dynamically replans or finishes. Based on the entire graph history (including task execution results), it decides whether to respond with the final answer or to pass the progress back to the (re-)planning agent to continue work.

The key runtime-boosting ideas here are:

- Planner outputs are streamed; the output parser eagerly yields task parameters and their dependencies.

- The task fetching unit receives the parsed task stream and schedules tasks once all their dependencies are satisfied.

- Task arguments can be variables, which are the outputs of previous tasks in the DAG. For instance, the model can call search("${1}") to search for queries generated by the output of task 1. This lets the agent work even faster than the "embarrassingly parallel" tool calling in OpenAI.
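The scheduling idea above can be sketched with the standard library: each task is submitted to a thread pool as soon as it arrives, waits only on the futures of its own dependencies, and then substitutes `$1` / `${1}` references with those outputs. This is a simplified sketch, not the paper's implementation and not the scheduler defined later in this notebook (which polls with `time.sleep` instead of waiting on futures). It assumes tasks arrive dependencies-first with no cycles; `run_dag` and the task dict shape are hypothetical.

```python
import re
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Iterable

# $1 or ${1} -> 1
REF = re.compile(r"\$\{?(\d+)\}?")


def run_dag(tasks: Iterable[Dict[str, Any]]) -> Dict[int, str]:
    """Run tasks concurrently, starting each one once its dependencies have finished.

    Each task is a dict with keys: idx (int), tool (callable taking one str),
    arg (str that may contain $N references) and dependencies (list of int).
    Every referenced $N must also appear in dependencies. `tasks` may be a
    generator, so execution can start while the planner is still streaming.
    """
    results: Dict[int, str] = {}
    futures: Dict[int, Future] = {}

    def run(task: Dict[str, Any]) -> None:
        for dep in task["dependencies"]:
            futures[dep].result()  # block until that dependency has finished
        arg = REF.sub(lambda m: results[int(m.group(1))], task["arg"])
        results[task["idx"]] = task["tool"](arg)

    with ThreadPoolExecutor() as pool:
        for task in tasks:
            futures[task["idx"]] = pool.submit(run, task)
    # Leaving the with-block waits for all tasks; re-raise any task error here
    for future in futures.values():
        future.result()
    return results


def slow_search(query: str) -> str:
    time.sleep(0.2)  # stand-in for network latency
    return f"results for {query}"


plan = [
    {"idx": 1, "tool": slow_search, "arg": "UK PM", "dependencies": []},
    {"idx": 2, "tool": slow_search, "arg": "US president", "dependencies": []},
    {"idx": 3, "tool": str.upper, "arg": "${1} | $2", "dependencies": [1, 2]},
]
start = time.perf_counter()
results = run_dag(plan)
elapsed = time.perf_counter() - start
print(results[3])  # RESULTS FOR UK PM | RESULTS FOR US PRESIDENT
print(f"{elapsed:.2f}s")  # roughly 0.2s because tasks 1 and 2 overlap; run serially it would be about 0.4s
```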

In [None]:
%load_ext dotenv
%dotenv /redbox/.env.test

In [None]:
from redbox.graph.nodes.tools import build_search_wikipedia_tool, build_search_documents_tool
from redbox.models.settings import Settings
from redbox.models.chain import RedboxQuery, RedboxState, AISettings, ChatLLMBackend
from redbox.models.file import ChunkResolution
from langchain_core.messages import AIMessage
from redbox.chains.components import get_chat_llm
from langfuse.callback import CallbackHandler

In [None]:
env = Settings()
ai_claude_setting = AISettings(chat_backend=ChatLLMBackend(name="anthropic.claude-3-sonnet-20240229-v1:0", provider="bedrock"))

### Step 1: Define tools

In [None]:
import numpy as np
import requests
from langchain_core.documents import Document
from langchain_core.tools import Tool, tool
from sklearn.metrics.pairwise import cosine_similarity

from redbox.api.format import format_documents
from redbox.chains.components import get_embeddings
from redbox.models.chain import RedboxState
from redbox.models.file import ChunkCreatorType, ChunkMetadata, ChunkResolution
from redbox.models.settings import get_settings
from redbox.transform import bedrock_tokeniser

In [None]:
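def build_govuk_search_tool(filter=True) -> Tool:
    """Constructs a tool that searches gov.uk and returns the formatted results plus the Documents as an artifact.

    With filter=True, more results are fetched and re-ranked by embedding similarity to the query.
    """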

    tokeniser = bedrock_tokeniser

    def recalculate_similarity(response, query, num_results):
        embedding_model = get_embeddings(get_settings())
        em_query = embedding_model.embed_query(query)
        for r in response.get("results"):
            description = r.get("description")
            em_des = embedding_model.embed_query(description)
            r["similarity"] = cosine_similarity(np.array(em_query).reshape(1, -1), np.array(em_des).reshape(1, -1))[0][
                0
            ]
        response["results"] = sorted(response.get("results"), key=lambda x: x["similarity"], reverse=True)[:num_results]
        return response

    @tool(response_format="content_and_artifact")
    def _search_govuk(query: str) -> tuple[str, list[Document]]:
        """
        Search for documents on gov.uk based on a query string.
        This endpoint is used to search for documents on gov.uk. There are many types of documents on gov.uk.
        Types include:
        - guidance
        - policy
        - legislation
        - news
        - travel advice
        - departmental reports
        - statistics
        - consultations
        - appeals
        """
        tool_govuk_retrieved_results = 10
        tool_govuk_returned_results = 1
        url_base = "https://www.gov.uk"
        required_fields = [
            "format",
            "title",
            "description",
            "indexable_content",
            "link",
        ]
        # ai_settings = state.request.ai_settings
        response = requests.get(
            f"{url_base}/api/search.json",
            params={
                "q": query,
                "count": (
                    tool_govuk_retrieved_results if filter else tool_govuk_returned_results
                    # ai_settings.tool_govuk_retrieved_results if filter else ai_settings.tool_govuk_returned_results
                ),
                "fields": required_fields,
            },
            headers={"Accept": "application/json"},
        )
        response.raise_for_status()
        response = response.json()

        if filter:
            # response = recalculate_similarity(response, query, ai_settings.tool_govuk_returned_results)
            response = recalculate_similarity(response, query, tool_govuk_returned_results)

        mapped_documents = []
        for i, doc in enumerate(response["results"]):
            if any(field not in doc for field in required_fields):
                continue

            mapped_documents.append(
                Document(
                    page_content=doc["indexable_content"],
                    metadata=ChunkMetadata(
                        index=i,
                        uri=f"{url_base}{doc['link']}",
                        token_count=tokeniser(doc["indexable_content"]),
                        creator_type=ChunkCreatorType.gov_uk,
                    ).model_dump(),
                )
            )

        return format_documents(mapped_documents), mapped_documents

    return _search_govuk

In [None]:
# Tools
# Grabbing tools from redbox

search_documents = build_search_documents_tool(
    es_client=env.elasticsearch_client(),
    index_name=env.elastic_chunk_alias,
    embedding_model=env.embedding_backend,
    embedding_field_name=env.embedding_document_field_name,
    chunk_resolution=ChunkResolution.normal,
)
search_wikipedia = build_search_wikipedia_tool()
search_govuk = build_govuk_search_tool()


# tools = [search_documents, search_wikipedia, search_govuk]

tools = [search_wikipedia, search_govuk]


### Step 2: Planner

The planner accepts the input question and generates a task list to execute.

In [None]:
def get_tool_description(tools):
    # Number tools from 1 (i + 1) so the list reads naturally in the prompt;
    # join() is appended as tool number len(tools) + 1 by the prompt itself.
    return "\n".join(
        f"{i + 1}. Name: {tool.name}. Description: {tool.description}."
        for i, tool in enumerate(tools)
    )

In [None]:
REPLAN_PROMPT = (
    ' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
    "(given as Observation) of each plan and a general thought (given as Thought) about the executed results. "
    'You MUST use this information to create the next plan under "Current Plan".\n'
    ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
    " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
    " - You must continue the task index from the end of the previous one. Do not repeat task indices."
)

PLAN_PROMPT = (
    "Given a user query, create a plan to solve it with the utmost parallelizability. Each plan should comprise an"
    " action from the following {num_tools} types:\n"
    "{tool_descriptions}\n"
    "{num_tools}. join(): Collects and combines results from prior actions.\n\n"
    "- An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are "
    "executed.\n"
    "- join should always be the last action in the plan, and will be called in two scenarios:\n"
    "  (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.\n"
    "  (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:\n"
    "- Each action described above contains input/output types and description.\n"
    " - You must strictly adhere to the input and output types for each action.\n"
    "- The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.\n"
    "- Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.\n"
    "- Each action MUST have a unique ID, which is strictly increasing.\n"
    "- Inputs for actions can either be constants or outputs from preceding actions. In the latter case, use the "
    "format $id to denote the ID of the previous action whose output will be the input.\n"
    "- Always call join as the last action in the plan. Say '<END_OF_PLAN>' after you call join\n"
    "- Ensure the plan maximizes parallelizability.\n"
    "- Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.\n"
    "- Never introduce new actions other than the ones provided.\n"
    # Filled in by create_planner: empty for the first plan, REPLAN_PROMPT when re-planning
    "{replan}\n\n"
    "{messages}\n\n"
    "Remember, ONLY respond with the task list in the correct format! E.g.:\n"
    "idx. tool(arg_name=args)"
)
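To make the expected planner output concrete, here is a hypothetical stand-in for the kind of parsing the planner chain needs: it reads "idx. tool(args)" lines, collects `$id` references as dependencies, and stops at `<END_OF_PLAN>`. This is not the `LLMCompilerPlanParser` imported below (which works on the streamed output and binds real tools); `parse_plan` is an illustrative assumption, and treating `join()` as depending on every earlier task is a choice made for this sketch.

```python
import re
from typing import List, TypedDict

# '2. search_govuk(query="$1 announcements")' -> ("2", "search_govuk", 'query="$1 announcements"')
LINE = re.compile(r"^\s*(\d+)\.\s*(\w+)\((.*)\)\s*$")
# $1 or ${1} -> "1"
REF = re.compile(r"\$\{?(\d+)\}?")


class ParsedTask(TypedDict):
    idx: int
    tool: str
    raw_args: str
    dependencies: List[int]


def parse_plan(text: str) -> List[ParsedTask]:
    """Hypothetical parser for 'idx. tool(args)' lines, stopping at <END_OF_PLAN>."""
    tasks: List[ParsedTask] = []
    for line in text.splitlines():
        finished = "<END_OF_PLAN>" in line
        match = LINE.match(line.replace("<END_OF_PLAN>", ""))
        if match:
            idx, tool, raw_args = match.groups()
            if tool == "join":
                # Sketch choice: join waits for every task planned before it
                deps = [task["idx"] for task in tasks]
            else:
                deps = sorted({int(ref) for ref in REF.findall(raw_args)})
            tasks.append({"idx": int(idx), "tool": tool, "raw_args": raw_args, "dependencies": deps})
        if finished:
            break
    return tasks


plan_text = """1. search_wikipedia(query="UK prime minister")
2. search_govuk(query="$1 announcements")
3. join()<END_OF_PLAN>"""
for task in parse_plan(plan_text):
    print(task["idx"], task["tool"], task["dependencies"])
```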

In [None]:
from typing import Sequence
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    BaseMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool
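# Local module (not shown in this notebook) that parses the planner's streamed output into Task dicts
from output_parser import LLMCompilerPlanParser, Task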

In [None]:
def create_planner(llm: BaseChatModel, tools: Sequence[BaseTool], base_prompt: ChatPromptTemplate):
    tool_descriptions = get_tool_description(tools)

    planner_prompt = base_prompt.partial(
        replan="",
        num_tools=len(tools)
        + 1,  # Add one because we're adding the join() tool at the end.
        tool_descriptions=tool_descriptions,
    )

    replanner_prompt = base_prompt.partial(
        replan=REPLAN_PROMPT,
        num_tools=len(tools) + 1,
        tool_descriptions=tool_descriptions,
    )

    def should_replan(state: list):
        # Context is passed as a system message
        return isinstance(state[-1], SystemMessage)

    def wrap_messages(state: list):
        return {"messages": state}

    def wrap_and_get_last_index(state: list):
        next_task = 0
        for message in state[::-1]:
            if isinstance(message, FunctionMessage):
                next_task = message.additional_kwargs["idx"] + 1
                break
        state[-1].content = state[-1].content + f" - Begin counting at: {next_task}"
        return {"messages": state}

    return (
        RunnableBranch(
            (should_replan, wrap_and_get_last_index | replanner_prompt),
            wrap_messages | planner_prompt,
        )
        | llm
        | LLMCompilerPlanParser(tools=tools)
    )


### Step 3: Task Fetching Unit

This component schedules the tasks. It receives a stream of tasks of the following format:

{
    tool: BaseTool,
    dependencies: number[],
}
The basic idea is to begin executing tools as soon as their dependencies are met. This is done through multi-threading. We will combine the task fetching unit and executor below:

In [None]:
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Dict, Iterable, List, Union

from langchain_core.runnables import (
    chain as as_runnable,
)
from typing_extensions import TypedDict


def _get_observations(messages: List[BaseMessage]) -> Dict[int, Any]:
    # Get all previous tool responses
    results = {}
    for message in messages[::-1]:
        if isinstance(message, FunctionMessage):
            results[int(message.additional_kwargs["idx"])] = message.content
    return results


class SchedulerInput(TypedDict):
    messages: List[BaseMessage]
    tasks: Iterable[Task]


def _execute_task(task, observations, config):
    tool_to_use = task["tool"]
    if isinstance(tool_to_use, str):
        return tool_to_use
    args = task["args"]
    try:
        if isinstance(args, str):
            resolved_args = _resolve_arg(args, observations)
        elif isinstance(args, dict):
            resolved_args = {
                key: _resolve_arg(val, observations) for key, val in args.items()
            }
        else:
            # This will likely fail
            resolved_args = args
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}.)"
            f" Args could not be resolved. Error: {repr(e)}"
        )
    try:
        return tool_to_use.invoke(resolved_args, config)
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}."
            + f" Args resolved to {resolved_args}. Error: {repr(e)})"
        )


def _resolve_arg(arg: Union[str, Any], observations: Dict[int, Any]):
    # $1 or ${1} -> 1
    ID_PATTERN = r"\$\{?(\d+)\}?"

    def replace_match(match):
        # If the string is ${123}, match.group(0) is ${123}, and match.group(1) is 123.

        # Return the match group, in this case the index, from the string. This is the index
        # number we get back.
        idx = int(match.group(1))
        return str(observations.get(idx, match.group(0)))

    # For dependencies on other tasks
    if isinstance(arg, str):
        return re.sub(ID_PATTERN, replace_match, arg)
    elif isinstance(arg, list):
        return [_resolve_arg(a, observations) for a in arg]
    else:
        return str(arg)


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        import traceback

        # format_exc() returns the traceback of the exception being handled as a string
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    while True:
        deps = task["dependencies"]
        if deps and (any([dep not in observations for dep in deps])):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    # For streaming, we are making a few simplifying assumptions:
    # 1. The LLM does not create cyclic dependencies
    # 2. That the LLM will not generate tasks with future deps
    # If this ceases to be a good assumption, you can either
    # adjust to do a proper topological sort (not-stream)
    # or use a more complicated data structure
    tasks = scheduler_input["tasks"]
    args_for_tasks = {}
    messages = scheduler_input["messages"]
    # If we are re-planning, we may have calls that depend on previous
    # plans. Start with those.
    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)
    # ^^ We assume each task inserts a different key above to
    # avoid race conditions...
    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            args_for_tasks[task["idx"]] = task["args"]
            if (
                # Depends on other tasks
                deps and (any([dep not in observations for dep in deps]))
            ):
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied
                # can schedule now
                schedule_task.invoke(dict(task=task, observations=observations))
                # futures.append(executor.submit(schedule_task.invoke, dict(task=task, observations=observations)))

        # All tasks have been submitted or enqueued
        # Wait for them to complete
        wait(futures)
    # Convert observations to new tool messages to add to the state
    new_observations = {
        k: (task_names[k], args_for_tasks[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }
    tool_messages = [
        FunctionMessage(
            name=name,
            content=str(obs),
            additional_kwargs={"idx": k, "args": task_args},
            tool_call_id=k,
        )
        for k, (name, task_args, obs) in new_observations.items()
    ]
    return tool_messages

In [None]:
import itertools


@as_runnable
def plan_and_schedule(state):
    messages = state["messages"]
    # This is the primary "agent" in our application
    planner_prompt = ChatPromptTemplate([(PLAN_PROMPT)])
    planner = create_planner(get_chat_llm(ai_claude_setting.chat_backend), tools, planner_prompt)
    tasks = planner.stream(messages)
    # Begin executing the planner immediately
    try:
        tasks = itertools.chain([next(tasks)], tasks)
    except StopIteration:
        # Handle the case where tasks is empty.
        tasks = iter([])
    scheduled_tasks = schedule_tasks.invoke(
        {
            "messages": messages,
            "tasks": tasks,
        }
    )

    # counter tracks planning rounds: 0 on the first call, +1 on each re-plan.
    # should_continue uses it to stop the graph after a fixed number of rounds.
    if isinstance(state.get("counter"), int):
        counter = state["counter"] + 1
    else:
        counter = 0

    return {"messages": scheduled_tasks, "counter": counter}

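### Step 4: Joiner

So now we have the planning and initial execution done. We need a component to process these outputs and either:

- respond with the correct answer, or
- loop with a new plan.

The paper refers to this as the "joiner". It's another LLM call. To improve parsing reliability, we ask the model to answer in the structure of the JoinOutputs Pydantic model, using the format instructions from Redbox's ClaudeParser, and parse the reply with that parser.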
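The joiner's decision has two shapes, a final response or a replan request, and the graph routes on which one comes back. Below is a standard-library sketch of that parse-and-route step, assuming the model replies with JSON shaped like the JoinOutputs model defined below (a "thought" plus an "action" holding either "response" or "feedback"). FinalAnswer, ReplanRequest, parse_decision and next_node are hypothetical names; the notebook itself uses ClaudeParser with Pydantic models instead.

```python
import json
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass
class FinalAnswer:
    response: str


@dataclass
class ReplanRequest:
    feedback: str


Decision = Union[FinalAnswer, ReplanRequest]


def parse_decision(raw: str) -> Tuple[str, Decision]:
    """Parse the joiner's JSON reply into (thought, action)."""
    data = json.loads(raw)
    action = data["action"]
    # Tell the two action shapes apart by their required field
    if "feedback" in action:
        return data["thought"], ReplanRequest(feedback=action["feedback"])
    return data["thought"], FinalAnswer(response=action["response"])


def next_node(action: Decision) -> str:
    """Loop back to planning on a replan request, otherwise finish."""
    return "plan_and_schedule" if isinstance(action, ReplanRequest) else "END"


thought, action = parse_decision(
    '{"thought": "No statistics found yet.", "action": {"feedback": "Search gov.uk for the latest figures."}}'
)
print(next_node(action))  # plan_and_schedule
```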

In [None]:
JOINER_PROMPT = (
    "Solve a question answering task. Here are some guidelines:\n"
    "- In the Assistant Scratchpad, you will be given results of a plan you have executed to answer the user's question.\n"
    "- Thought needs to reason about the question based on the Observations in 1-2 sentences.\n"
    "- Ignore irrelevant action results.\n"
    "- If the required information is present, give a concise but complete and helpful answer to the user's question.\n"
    "- If you are unable to give a satisfactory finishing answer, replan to get the required information. Respond in the "
    "following format:\n"
    "Thought: <reason about the task results and whether you have sufficient information to answer the question>\n"
    "Action: <action to take>\n"
    "Available actions:\n"
    "(1) Finish(the final answer to return to the user): returns the answer and finishes the task.\n"
    "(2) Replan(the reasoning and other information that will help you plan again. Can be a line of any length): "
    "instructs why we must replan\n\n"
    "{messages}\n"
    "Using the above previous actions, decide whether to replan or finish. If all the required information is present, "
    "you may finish. If you have made many attempts to find the information without success, admit so and respond "
    "with whatever information you have gathered so the user can work well with you. "
    "If the data is incomplete or insufficient for a thorough response, your secondary role is to guide the user on "
    "how they can provide additional input or context to improve the outcome."
)

In [None]:

from pydantic import BaseModel, Field
from redbox.chains.parser import ClaudeParser


class FinalResponse(BaseModel):
    """The final response/answer."""

    response: str


class Replan(BaseModel):
    feedback: str = Field(
        description="Analysis of the previous attempts and recommendations on what needs to be fixed."
    )


class JoinOutputs(BaseModel):
    """Decide whether to replan or whether you can return the final response."""

    thought: str = Field(
        description="The chain of thought reasoning for the selected action"
    )
    action: Union[FinalResponse, Replan]

parser = ClaudeParser(pydantic_object=JoinOutputs)
joiner_prompt = ChatPromptTemplate(messages=[(JOINER_PROMPT + "\n\n{format_instructions}\n")],
                                   partial_variables = {"format_instructions": parser.get_format_instructions()})

runnable = joiner_prompt | get_chat_llm(ai_claude_setting.chat_backend) | parser

In [None]:
def _parse_joiner_output(decision: JoinOutputs) -> Dict[str, List[BaseMessage]]:
    response = [AIMessage(content=f"Thought: {decision.thought}")]
    if isinstance(decision.action, Replan):
        return {
            "messages": response
            + [
                SystemMessage(
                    content=f"Context from last attempt: {decision.action.feedback}"
                )
            ]
        }
    else:
        return {"messages": response + [AIMessage(content=decision.action.response)]}


def select_recent_messages(state) -> dict:
    messages = state["messages"]
    selected = []
    for msg in messages[::-1]:
        selected.append(msg)
        if isinstance(msg, HumanMessage):
            break
    # Only "messages" is used by the joiner prompt; the graph's counter is updated in plan_and_schedule
    return {"messages": selected[::-1]}


joiner = select_recent_messages | runnable | _parse_joiner_output

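### Step 5: Create graph

We'll define the agent as a stateful graph, with the main nodes being:

- Plan and execute (plan_and_schedule): the DAG from the first step above.
- Join (join): determine if we should finish or replan.

There is no separate recontextualize node: the joiner's output parser (_parse_joiner_output) updates the graph state by appending either the final answer or a SystemMessage with feedback for the planner, and the should_continue condition decides whether to loop back to plan_and_schedule.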
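The control flow of this graph (plan_and_schedule, then join, then should_continue) can be mimicked in plain Python to see how the counter cap behaves. This sketch does not use LangGraph; run_loop, merge, the tuple messages and the toy nodes are hypothetical, and merge only roughly imitates add_messages (the real reducer also replaces messages that share an id). In this sketch, assuming the first planning call sees no counter in the state, a joiner that always asks to replan stops after five planning rounds (counter 0 to 4).

```python
from typing import Callable, Dict

END = "__end__"  # stand-in for langgraph.graph.END
Node = Callable[[Dict], Dict]


def merge(state: Dict, update: Dict) -> Dict:
    """Overwrite plain keys and append to "messages" (roughly what add_messages does)."""
    merged = {**state, **update}
    merged["messages"] = state["messages"] + update.get("messages", [])
    return merged


def run_loop(plan_node: Node, join_node: Node, route: Callable[[Dict], str],
             state: Dict, max_rounds: int = 5) -> Dict:
    """Alternate plan -> join, then ask route() whether to stop."""
    for _ in range(max_rounds):
        state = merge(state, plan_node(state))
        state = merge(state, join_node(state))
        if route(state) == END:
            return state
    raise RuntimeError("round limit reached before END")


def toy_plan(state: Dict) -> Dict:
    # Same counter rule as plan_and_schedule: 0 on the first call, then +1
    counter = state["counter"] + 1 if isinstance(state.get("counter"), int) else 0
    return {"messages": [("function", f"result {counter}")], "counter": counter}


def always_replan(state: Dict) -> Dict:
    return {"messages": [("system", "need more data")]}


def route(state: Dict) -> str:
    # Mirrors should_continue: hard stop on the counter, finish on an AI message
    if state["counter"] > 3:
        return END
    return END if state["messages"][-1][0] == "ai" else "plan_and_schedule"


final = run_loop(toy_plan, always_replan, route, {"messages": [("human", "question")]})
print(final["counter"])  # 4
```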

In [None]:
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import add_messages
from typing import Annotated



class State(TypedDict):
    messages: Annotated[list, add_messages]
    counter: int

graph_builder = StateGraph(State)

# 1.  Define vertices
# We defined plan_and_schedule above already
# Assign each node to a state variable to update
graph_builder.add_node("plan_and_schedule", plan_and_schedule)
graph_builder.add_node("join", joiner)


## Define edges
graph_builder.add_edge("plan_and_schedule", "join")

### This condition determines looping logic


def should_continue(state):
    messages = state["messages"]
    # plan_and_schedule increments counter once per planning round, so this caps re-planning
    if state['counter'] > 3:
        return END
    if isinstance(messages[-1], AIMessage):
        return END
    return "plan_and_schedule"


graph_builder.add_conditional_edges(
    "join",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)
graph_builder.add_edge(START, "plan_and_schedule")
chain = graph_builder.compile()

### Quick experiments

Scripts to compare the responses and latency of the current Redbox agent (invoked with the @gadget prefix) and the Plan-and-Execute graph defined above.

In [None]:
from redbox.app import Redbox

def get_state(user_uuid, prompts, documents, ai_setting):
    q = RedboxQuery(
        question=f"{prompts[-1]}",
        s3_keys=documents,
        user_uuid=user_uuid,
        chat_history=prompts[:-1],
        ai_settings=ai_setting,
        permitted_s3_keys=documents,
    )

    return RedboxState(
        request=q,
    )

def run_app(app, state) -> RedboxState:
    langfuse_handler = CallbackHandler()
    return app.graph.invoke(state, config={"callbacks": [langfuse_handler]})

app = Redbox(env=env)

In [None]:
from uuid import uuid4


def run_gadget(question):
    state = get_state(uuid4(), prompts=[f"@gadget {question}"], documents=[], ai_setting=ai_claude_setting)
    res = run_app(app, state)
    return res["messages"][-1].content


def run_plan_and_execute(question):
    res = chain.invoke({"messages": [HumanMessage(content=question)]}, {"recursion_limit": 10})
    return res["messages"][-1].content


In [None]:
import pandas as pd

def save(filename, question, response, time_used, method):
    # Append one headerless row per run: question, response, time, method
    row = pd.Series({"question": question, "response": response, "time": time_used, "method": method})
    row.to_frame().T.to_csv(filename, mode="a", index=False, header=False)

def exp(func, question):
    print(f"question: {question}")
    start = time.time()
    response = func(question)
    time_used = time.time() - start
    return response, time_used


def run_experiment(questions, methods, save_path):
    # Map method names to runner functions instead of eval()-ing a function name
    runners = {"redbox": run_gadget, "plan_and_execute": run_plan_and_execute}
    for method in methods:
        if method not in runners:
            raise ValueError(f"Unknown method: {method!r}")
        for question in questions:
            response, time_used = exp(runners[method], question)
            print(response)
            save(save_path, question, response, time_used, method)

In [None]:
questions = ['Who is UK PM?', 
             'What is the oldest parrot alive, and how much longer is that than the average?', 
             'How does the trend in birth affect the current tax system?',
             'Compare the growth in tech businesses between UK and USA',
             'List top 5 areas where AI can be used to save taxpayers money',
             'What is the current state of UK economy? and how can it be improved?']

In [None]:
run_experiment(questions=questions,
               methods=['plan_and_execute'],
               save_path='exp_plan3.csv')

In [None]:
run_experiment(questions=questions,
               methods=['redbox'],
               save_path='exp_plan.csv')
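Because save() appends rows without a header, a small standard-library helper can summarise mean latency per method from the resulting CSV files. summarise_latency is a hypothetical helper and assumes the column order written by save() (question, response, time, method); the rows in the demo are dummy values, not experimental results.

```python
import csv
import os
import tempfile
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def summarise_latency(path: str) -> Dict[str, float]:
    """Mean response time in seconds per method from a headerless results CSV."""
    times: Dict[str, List[float]] = defaultdict(list)
    with open(path, newline="") as f:
        # csv.reader handles quoted responses that contain commas or newlines
        for _question, _response, time_used, method in csv.reader(f):
            times[method].append(float(time_used))
    return {method: mean(values) for method, values in times.items()}


# Demo on a temporary file with dummy rows (not real measurements)
with tempfile.NamedTemporaryFile("w", suffix=".csv", newline="", delete=False) as tmp:
    csv.writer(tmp).writerows([
        ["q1", "a, with comma", 1.0, "redbox"],
        ["q2", "multi\nline", 3.0, "redbox"],
        ["q1", "a", 2.0, "plan_and_execute"],
    ])
summary = summarise_latency(tmp.name)
os.remove(tmp.name)
print(summary)  # {'redbox': 2.0, 'plan_and_execute': 2.0}
```

Applied to the files written above, for example summarise_latency('exp_plan.csv'), it gives one mean latency per method in that file.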