# Plan-and-Execute

This notebook shows how to create a "plan-and-execute" style agent. This is heavily inspired by the [Plan-and-Solve](https://arxiv.org/abs/2305.04091) paper as well as the [Baby-AGI](https://github.com/yoheinakajima/babyagi) project.

The core idea is to first come up with a multi-step plan, and then go through that plan one item at a time.
After accomplishing a particular task, you can then revisit the plan and modify as appropriate.


The general computational graph looks like the following:


![plan-and-execute diagram](./img/plan-and-execute.png)


This compares to a typical [ReAct](https://arxiv.org/abs/2210.03629) style agent where you think one step at a time.
The advantages of this "plan-and-execute" style agent are:

1. Explicit long term planning (which even really strong LLMs can struggle with)
2. Ability to use smaller/weaker models for the execution step, only using larger/better models for the planning step


The following walkthrough demonstrates how to do so in LangGraph. The resulting agent will leave a trace like the following example: ([link](https://smith.langchain.com/public/d46e24d3-dda6-44d5-9550-b618fca4e0d4/r)).

## Setup

First, we need to install the packages required.

Next, we need to set API keys for OpenAI (the LLM we will use) and Tavily (the search tool we will use)

In [None]:
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())


Optionally, we can set API key for LangSmith tracing, which will give us best-in-class observability.

In [None]:
import os
os.environ["LANGCHAIN_PROJECT"] = "Plan-and-execute"

## Define Tools

We will first define the tools we want to use. For this simple example, we will use a built-in search tool via Tavily. However, it is really easy to create your own tools - see documentation [here](https://python.langchain.com/v0.2/docs/how_to/custom_tools) on how to do that.

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults

tools = [TavilySearchResults(max_results=3)]

## Define our Execution Agent

Now we will create the execution agent we want to use to execute tasks. 
Note that for this example, we will be using the same execution agent for each task, but this doesn't HAVE to be the case.

In [None]:
from langchain import hub
from langchain_groq import ChatGroq

from langgraph.prebuilt import create_react_agent

# Get the prompt to use - you can modify this!
prompt = hub.pull("wfh/react-agent-executor")
prompt.pretty_print()

# Choose the LLM that will drive the agent
llm = ChatGroq(model="Llama3-70b-8192", temperature=0.3)
agent_executor = create_react_agent(llm, tools, messages_modifier=prompt)

In [None]:
agent_executor.invoke({"messages": [("user", "who is the winnner of the us open")]})

## Define the State

Let's now start by defining the state the track for this agent.

First, we will need to track the current plan. Let's represent that as a list of strings.

Next, we should track previously executed steps. Let's represent that as a list of tuples (these tuples will contain the step and then the result)

Finally, we need to have some state to represent the final response as well as the original input.

In [None]:
import operator
from typing import Annotated, List, Tuple, TypedDict


class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

In [None]:
# # Creating an instance of PlanExecute
# example_plan_execute: PlanExecute = {
#     "input": "Calculate total sales",
#     "plan": [
#         "Gather daily sales data",
#         "Sum up all daily totals",
#         "Adjust for returns",
#     ],
#     "past_steps": [(100, 200), (150, 250), (175, 225)],
#     "response": "Total sales calculated successfully",
# }

# print(example_plan_execute)

## Planning Step

Let's now think about creating the planning step. This will use function calling to create a plan.

In [None]:
from langchain_core.pydantic_v1 import BaseModel, Field


class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

In [None]:
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""",
        ),
        ("placeholder", "{messages}"),
    ]
)
planner = planner_prompt | llm.with_structured_output(Plan)

In [None]:
planner.invoke(
    {
        "messages": [
            ("user", "what is the hometown of the current Australia open winner?")
        ]
    }
)

## Re-Plan Step

Now, let's create a step that re-does the plan based on the result of the previous step.

In [None]:
from typing import Union


class Response(BaseModel):
    """Response to user."""

    response: str


class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )


replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)


replanner = replanner_prompt | llm.with_structured_output(Act)

## Create the Graph

We can now create the graph!

In [None]:
from typing import Literal


async def execute_step(state: PlanExecute):
    # plan is a list of steps [step1, step2, step3, ...]
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    return {
        "past_steps": (task, agent_response["messages"][-1].content),
    }


async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}


async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}


def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"

In [None]:
from langgraph.graph import StateGraph

workflow = StateGraph(PlanExecute)

# Add the plan node
workflow.add_node("planner", plan_step)

# Add the execution step
workflow.add_node("agent", execute_step)

# Add a replan node
workflow.add_node("replan", replan_step)

workflow.set_entry_point("planner")

# From plan we go to agent
workflow.add_edge("planner", "agent")

# From agent, we replan
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    # Next, we pass in the function that will determine which node is called next.
    should_end,
)

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

In [None]:
from IPython.display import Image, display

display(Image(app.get_graph(xray=True).draw_mermaid_png()))

In [None]:
config = {"recursion_limit": 50}
inputs = {"input": "what is the hometown of the 2024 Australia open winner?"}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)

## Conclusion

Congrats on making a plan-and-execute agent! One known limitations of the above design is that each task is still executed in sequence, meaning embarrassingly parallel operations all add to the total execution time. You could improve on this by having each task represented as a DAG (similar to LLMCompiler), rather than a regular list.

In [None]:
import math


def calculate_new_headings(lat1, lon1, hdg1, lat2, lon2, hdg2, required_separation):
    """
    Calculate the necessary heading adjustments for two aircraft to ensure minimum horizontal separation.

    Args:
    lat1, lon1: Latitude and longitude of aircraft 1
    hdg1: Current heading of aircraft 1 (degrees)
    lat2, lon2: Latitude and longitude of aircraft 2
    hdg2: Current heading of aircraft 2 (degrees)
    required_separation: Required minimum horizontal separation in nautical miles

    Returns:
    Tuple of new headings for aircraft 1 and 2 (hdg1_new, hdg2_new)
    """
    # Convert degrees to radians for computation
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    hdg1, hdg2 = map(math.radians, [hdg1, hdg2])

    # Assume a typical speed and calculate the time to reach 5 nm apart
    speed = 450  # Speed in knots (assumed typical cruise speed)
    time_to_separation = required_separation / speed * 3600  # time in seconds

    # Calculate new positions assuming they maintain their headings
    def new_position(lat, lon, hdg, speed, time):
        # Earth's radius in nautical miles
        R = 3440.065
        # Distance traveled in nautical miles
        d = speed * time / 3600
        # New latitude
        new_lat = math.asin(
            math.sin(lat) * math.cos(d / R)
            + math.cos(lat) * math.sin(d / R) * math.cos(hdg)
        )
        # New longitude
        new_lon = lon + math.atan2(
            math.sin(hdg) * math.sin(d / R) * math.cos(lat),
            math.cos(d / R) - math.sin(lat) * math.sin(new_lat),
        )
        return new_lat, new_lon

    # Calculate potential new positions after the time to separation
    lat1_new, lon1_new = new_position(lat1, lon1, hdg1, speed, time_to_separation)
    lat2_new, lon2_new = new_position(lat2, lon2, hdg2, speed, time_to_separation)

    # Calculate the bearing from new positions to see if they are diverging
    def bearing(lat1, lon1, lat2, lon2):
        y = math.sin(lon2 - lon1) * math.cos(lat2)
        x = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(
            lat2
        ) * math.cos(lon2 - lon1)
        brng = math.atan2(y, x)
        return math.degrees(brng) % 360

    bearing_new = bearing(lat1_new, lon1_new, lat2_new, lon2_new)

    # Calculate the difference in bearing and adjust
    heading_adjustment = (bearing_new - math.degrees(hdg1) + 360) % 360
    hdg1_new = (math.degrees(hdg1) + heading_adjustment) % 360
    hdg2_new = (math.degrees(hdg2) - heading_adjustment) % 360

    return hdg1_new, hdg2_new


# Example usage:
# Aircraft 1 at N52°00'00.0" E003°59'11.0" heading 270 degrees
# Aircraft 2 at N52°00'0.0" E002°30'48.0" heading 090 degrees
# Required separation of 5 nautical miles
lat1, lon1 = 52.0000, 3.986389  # converted E003°59'11.0" to decimal
lat2, lon2 = 52.0000, 2.513333  # converted E002°30'48.0" to decimal
new_headings = calculate_new_headings(lat1, lon1, 270, lat2, lon2, 90, 5)
print("New headings for Aircraft 1 and Aircraft 2:", new_headings)

In [None]:
import math

# Aircraft information
aircraft_ids = ["KL204", "KL420"]
aircraft_info = {
    "KL204": {
        "pos": (52, 3.9833),
        "hdg": 270,
        "alt": 25000,
        "cas": 300,
        "tas": 432,
        "gs": 432,
        "m": 0.717,
    },
    "KL420": {
        "pos": (52, 2.5167),
        "hdg": 90,
        "alt": 25000,
        "cas": 300,
        "tas": 432,
        "gs": 432,
        "m": 0.717,
    },
}

# Conflict information
conflict_info = {
    "KL204": "KL420",
    "TCPA": 224.32,
    "QDR": 270,
    "distance": 53.78,
    "dcpa": 0,
    "tlos": 203.46,
}


# Calculate the new headings
def calculate_new_headings(aircraft_id, conflict_id, tcpa, qdr, distance, dcpa):
    # Calculate the turn radius
    turn_radius = distance / math.tan(math.radians(qdr))

    # Calculate the turn rate
    turn_rate = math.radians(360) / tcpa

    # Calculate the new heading
    if aircraft_id == "KL204":
        new_heading = aircraft_info[aircraft_id]["hdg"] - turn_rate * tcpa
    else:
        new_heading = aircraft_info[aircraft_id]["hdg"] + turn_rate * tcpa

    return new_heading


# Calculate the new headings for the aircraft
new_heading_kl204 = calculate_new_headings(
    "KL204",
    "KL420",
    conflict_info["TCPA"],
    conflict_info["QDR"],
    conflict_info["distance"],
    conflict_info["dcpa"],
)
new_heading_kl420 = calculate_new_headings(
    "KL420",
    "KL204",
    conflict_info["TCPA"],
    conflict_info["QDR"],
    conflict_info["distance"],
    conflict_info["dcpa"],
)

print(f"New heading for KL204: {math.degrees(new_heading_kl204):.2f}°")
print(f"New heading for KL420: {math.degrees(new_heading_kl420):.2f}°")

In [None]:
import numpy as np

# Constants
R = 3440.065  # Earth's radius in nautical miles

# Initial positions in degrees and converted to radians for calculation
lat1, lon1 = 52.0000, 3.9864
lat2, lon2 = 52.0000, 2.5133
lat1_rad, lon1_rad = np.radians(lat1), np.radians(lon1)
lat2_rad, lon2_rad = np.radians(lat2), np.radians(lon2)

# Initial headings
hdg1, hdg2 = 270, 90


# Function to project new position based on heading and distance traveled
def project_position(lat, lon, hdg, dist):
    lat_rad, lon_rad = np.radians(lat), np.radians(lon)
    hdg_rad = np.radians(hdg)
    lat_new = np.arcsin(
        np.sin(lat_rad) * np.cos(dist / R)
        + np.cos(lat_rad) * np.sin(dist / R) * np.cos(hdg_rad)
    )
    lon_new = lon_rad + np.arctan2(
        np.sin(hdg_rad) * np.sin(dist / R) * np.cos(lat_rad),
        np.cos(dist / R) - np.sin(lat_rad) * np.sin(lat_new),
    )
    return np.degrees(lat_new), np.degrees(lon_new)


# Simulate the flight for 200 seconds at current ground speed (432 knots)
# Assuming no change in speed, only direction changes by +/- 10 degrees
hdg1_new = hdg1 - 10  # New heading for KL204
hdg2_new = hdg2 + 10  # New heading for KL420

# Project new positions
lat1_new, lon1_new = project_position(lat1, lon1, hdg1_new, 432 * 200 / 3600)
lat2_new, lon2_new = project_position(lat2, lon2, hdg2_new, 432 * 200 / 3600)


# Calculate new distance
def great_circle_distance(lat1, lon1, lat2, lon2):
    lat1_rad, lon1_rad, lat2_rad, lon2_rad = map(np.radians, [lat1, lon1, lat2, lon2])
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad
    a = (
        np.sin(dlat / 2) ** 2
        + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2
    )
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c


# New separation distance
new_distance = great_circle_distance(lat1_new, lon1_new, lat2_new, lon2_new)

# Result
new_distance

In [None]:
if "no conflicts" in "No conflicts detected.".lower():
    print("No conflicts detected.")
else:
    print("Conflicts detected.")