# Plan and Execute with Persistence using Pydantic models as State

In this example, I've combined the Plan and Execute cognitive architecture with the state as Pydantic model and persistence examples. This means all nodes receive an instance of the current state as their first argument and it is validated before each node executes. It also shows how new input is received into the state.

## Setup

First we need to install the packages required

In [1]:
%pip install -U langchain langchain_openai tavily-python aiosqlite

Note: you may need to restart the kernel to use updated packages.


In [2]:
from dotenv import load_dotenv

load_dotenv()

True

## Set up the tools

We will first define the tools we want to use.
For this simple example, we will use a built-in search tool via Tavily.
However, it is really easy to create your own tools - see documentation [here](https://python.langchain.com/docs/modules/agents/tools/custom_tools) on how to do that.


In [3]:
from langchain_community.tools.tavily_search import TavilyAnswer

tools = [TavilyAnswer()]

## Set up the model

Now we need to load the chat model we want to use.
Importantly, this should satisfy two criteria:

1. It should work with messages. We will represent all agent state in the form of messages, so it needs to be able to work well with them.
2. It should work with OpenAI function calling. This means it should either be an OpenAI model or a model that exposes a similar interface.

Note: these model requirements are not requirements for using LangGraph - they are just requirements for this one example.


In [4]:
from langchain_openai import ChatOpenAI

# We will set streaming=True so that we can stream tokens
# See the streaming section for more information on this.
model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-turbo")


After we've done this, we should make sure the model knows that it has these tools available to call.
We can do this by converting the LangChain tools into the format for OpenAI function calling, and then bind them to the model class.


In [5]:
from langchain_core.utils.function_calling import convert_to_openai_function

# It is important to convert and bind correctly!
functions = [convert_to_openai_function(t) for t in tools]
model_with_tools = model.bind(functions=functions)

## Define the agent state

The main type of graph in `langgraph` is the `StateGraph`.
This graph is parameterized by a state object that it passes around to each node.
Each node then returns operations to update that state.
These operations can either SET specific attributes on the state (e.g. overwrite the existing values) or ADD to the existing attribute.
Whether to set or add is denoted by annotating the state object you construct the graph with.

For this example, the state will track input, a plan, past steps, and the final response. This State allows the agent to keep track of their plan and the steps they have taken.

In [6]:
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.messages import BaseMessage
from typing import Annotated, TypedDict, List, Tuple, Sequence
import operator

# The overall state
class PlanExecute(TypedDict):
    input: str
    # Holds thread of messages, human and AI responses
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # Couldn't this plan be a Plan object?
    plan: List[str]
    # Holds all past steps, including those from previous runs.
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

# The individual plan
class Plan(BaseModel):
    """Plan to follow in the future"""

    steps: list[str] = Field(description="Steps to follow in the future, should be sorted in order")

# The final response
class Response(BaseModel):
    """Response to user"""

    response: str = Field(description="Response to user")


# Define the Prompt Templates

Several prompt templates are used depending on the step of the graph the agent is in. Different system messages combined with human input (when needed) are used to generate the proper thoughts at each step.

## Input prompt template

In [7]:
from langchain_core.prompts import ChatPromptTemplate

executor_prompt = ChatPromptTemplate.from_messages(
    [
        # The system message could be parameterized as well.
        ("system", "You are a helpful assistant."),
        # This is where thread history is maintained
        ("placeholder"), ("{chat_history}"),
        # This is where user input is inserted
        ("human"),("{input}"),
        # This is where state is kept.
        ("placeholder"), ("{agent_scratchpad}"),
    ]
)

## Initial planning prompt

In [8]:
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_template(
    """For the given objective and associated conversation, come up with a simple \
step by step plan. This plan should involve individual tasks, that if executed \
correctly will yield the correct answer. Do not add any superfluous steps. The result \
of the final step should be the final answer. Make sure that each step has all the \
information needed - do not skip steps.

Conversation so far:
{chat_history}

Current objective:
{objective}"""
)
planner = create_structured_output_runnable(
    Plan, ChatOpenAI(model="gpt-4-turbo", temperature=0), planner_prompt
)


ValueError: A pending deprecation cannot have a scheduled removal

## Replanning prompt

After a task is executed, we want to consider if our plan is on track and we may want to allow a human to insert additional input (if they've enabled the interrupt flag).

In [18]:
from langchain.chains.openai_functions import create_openai_fn_runnable

replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective and associated conversation, come up with a simple step \
by step plan. This plan should involve individual tasks, that if executed correctly \
will yield the correct answer. Do not add any superfluous steps. The result of the \
final step should be the final answer. Make sure that each step has all the information \
needed - do not skip steps.

The conversation so far is this:
{messages}

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the following steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the \
user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan \
that still NEED to be done. Do not return previously done steps as part of the plan."""
)
# OPEN QUESTION: I do not understand why we are using the fn runnable here. I don't understand what it does
# or why we need it.
#
# My current guess is that by passing in the Plan and Response models, it is encorcing to the agent that
# the input and output should be of those types.
replanner = create_openai_fn_runnable(
    [Plan, Response], ChatOpenAI(model="gpt-4-turbo", temperature=0), replanner_prompt
)


ValueError: A pending deprecation cannot have a scheduled removal

# Persistence

To add in persistence, we create a checkpoint. In this case we have to use the Async version of SQLlite to support async calls.

In [None]:
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

memory = AsyncSqliteSaver.from_conn_string(":memory:")


# Build the graph

## Define the nodes

We now need to define a few different nodes in our graph.
In `langgraph`, a node can be either a function or a [runnable](https://python.langchain.com/docs/expression_language/).
There are three main nodes we need for this:

1. Planning step: this is where the agent develops a plan of how to accomplish the goal given to it.
2. Execution step: this is where the agent executes the next step of the plan. This may involve calling a tool, and as such, uses a subgraph. **NOTE**: The subgraph could have been created using a prebuilt sequence (i.e., `create_openai_functions_agent`) but for this example, we will create the subgraph manually.
3. Replanning step: this is where the agent decides if it needs to replan. This is a conditional node that will either go back to the execution step or end the conversation and return the response.

We will also need to define some edges.
Some of these edges may be conditional.
The reason they are conditional is that based on the output of a node, one of several paths may be taken.
The path that is taken is not known until that node is run (the LLM decides).

1. Conditional Edge: after the replanning step, if the agent decides to replan, it should go back to the planning step. If it decides to end the conversation, it should go to the end node.
2. Normal Edge: after the planning and execution steps, it should always go back to the execution and replanning steps, respectively.

Let's define the nodes, as well as a function to decide how what conditional edge to take.

**MODIFICATION**

We define each node to receive the AgentState base model as its first argument.

### Subgraph for Execution

In [None]:
# This mimics the code of the `create_openai_functions_agent`` function in the `langchain.chains.openai_functions` module
from typing import Union
from langchain_core.runnables import RunnablePassthrough
from langchain_core.agents import AgentAction, AgentFinish
from langchain.agents.format_scratchpad.openai_functions import (
    format_to_openai_function_messages,
)
from langchain.agents.output_parsers.openai_functions import (
    OpenAIFunctionsAgentOutputParser,
)
from langgraph.prebuilt.tool_executor import ToolExecutor
from langgraph.graph import StateGraph, END

agent_runnable = (
    RunnablePassthrough.assign(
        agent_scratchpad=lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        )
    )
    | executor_prompt
    | model_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

tool_executor = ToolExecutor(tools)

# State for the sub-graph
class ExecAgentState(TypedDict):
    # The input string
    input: str
    # The list of previous messages in the conversation
    chat_history: Sequence[BaseMessage]
    # The outcome of a given call to the agent
    # Needs `None` as a valid type, since this is what this will start as
    agent_outcome: Union[AgentAction, AgentFinish, None]
    # List of actions and corresponding observations
    # Here we annotate this with `operator.add` to indicate that operations to
    # this state should be ADDED to the existing values (not overwrite it)
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]

def should_continue(data):
    # If the agent outcome is an AgentFinish, then we return `exit` string
    # This will be used when setting up the graph to define the flow
    if isinstance(data["agent_outcome"], AgentFinish):
        return "end"
    # Otherwise, an AgentAction is returned
    # Here we return `continue` string
    # This will be used when setting up the graph to define the flow
    else:
        return "continue"

async def run_agent(data):
    agent_outcome = await agent_runnable.ainvoke(data)
    return {"agent_outcome": agent_outcome}

async def execute_tools(data):
    # Get the most recent agent_outcome - this is the key added in the `agent` above
    agent_action = data["agent_outcome"]
    if not isinstance(agent_action, list):
        agent_action = [agent_action]
    output = await tool_executor.abatch(agent_action, return_exceptions=True)
    return {
        "intermediate_steps": [
            (action, str(out)) for action, out in zip(agent_action, output)
        ]
    }

# Define a new graph
subworkflow = StateGraph(ExecAgentState)

# Define the two nodes we will cycle between
subworkflow.add_node("subagent", run_agent)
subworkflow.add_node("action", execute_tools)

# Set the entrypoint as `agent`
# This means that this node is the first one called
subworkflow.set_entry_point("subagent")

# We now add a conditional edge
subworkflow.add_conditional_edges(
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    "subagent",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
    {
        # If `tools`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END,
    },
)

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
subworkflow.add_edge("action", "subagent")

executor_agent = subworkflow.compile()

In [None]:
# We check the subgraph can be invoked in isolation
config = {"configurable": {"thread_id": "2"}}
inputs = {"input": "What is the capital of France?", "chat_history": []}
await executor_agent.ainvoke(inputs, config=config)

{'input': 'What is the capital of France?',
 'chat_history': [],
 'agent_outcome': AgentFinish(return_values={'output': 'The capital of France is Paris.'}, log='The capital of France is Paris.'),
 'intermediate_steps': []}

### Creating the main nodes

In [None]:
from langchain_core.messages import AIMessage

# Create a plan based on the input, note how input from state is passed to the planner
async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"objective": state["input"], "chat_history": state["messages"]})
    # The pydantic models will convert the output of the planner into a Plan object
    return {"plan": plan.steps, "response": ""}

# Execute a step using the shared state
async def execute_step(state: PlanExecute):
    task = state["plan"][0] # the next step to execute
    agent_response = await executor_agent.ainvoke({"input": task, "chat_history": state["messages"]})
    # Return the state with the current step and the new output added to past steps.
    # Note: chat_history will be updated at the final response point.
    return {"past_steps": (task, agent_response['agent_outcome'].return_values['output'])}

# Replan based on the current state
async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output, Response):
        # Chat history will now be updated with the final response as a list of AIMessages
        return {"response": output.response, "messages": [AIMessage(content=output.response)]}
    else:
        return {"plan": output.steps}

# decide if we should continue or return to the user
def should_end(state: PlanExecute):
    if "response" in state and state["response"]:
        return True
    else:
        return False

### Define the main graph

We can now put it all together and define the graph!

In [None]:
from langgraph.graph import StateGraph, END

# Create a new workflow
workflow = StateGraph(PlanExecute)

# add planning node
workflow.add_node("planner", plan_step)

# add execution node
workflow.add_node("agent", execute_step)

# add replanning node
workflow.add_node("replan", replan_step)

# Set entry
workflow.set_entry_point("planner")

# Create edges
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")
workflow.add_conditional_edges(
    "replan",
    should_end,
    {
        True: END,
        False: "agent",
    }
)
# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile(checkpointer=memory)

## Use it!

We can now use it!
This now exposes the [same interface](https://python.langchain.com/docs/expression_language/) as all other LangChain runnables.

In [9]:
config = {"configurable": {"thread_id": "4","recursion_limit": 50}}
inputs = {
    "input": "What is the hometown of the 2020 US presidential election winner?"
}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)
            if "response" in v:
                print(f"End of conversation, final output: {v['response']}")
        else:
            print(f"End of conversation, final output: {v}")


NameError: name 'app' is not defined

Continue the conversation!

In [16]:
config = {"configurable": {"thread_id": "4","recursion_limit": 50}}
inputs = {
    "input": "What was the hometown of his VP?"
}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)


{'plan': ["Identify Joe Biden's Vice President during his term starting in 2021.", 'Research the hometown of the identified Vice President.', 'Report the hometown of the Vice President.'], 'response': ''}
{'past_steps': ("Identify Joe Biden's Vice President during his term starting in 2021.", "Joe Biden's Vice President during his term starting in 2021 is Kamala Harris.")}
{'plan': ['Research the hometown of Kamala Harris.', 'Report the hometown of Kamala Harris.']}
{'past_steps': ('Research the hometown of Kamala Harris.', "Kamala Harris's hometown is Oakland, California.")}
{'messages': [AIMessage(content="Kamala Harris's hometown is Oakland, California.")], 'response': "Kamala Harris's hometown is Oakland, California."}


# Concluding thoughts

So if you want a StateGraph that include persistence, you need to consider how you define the input or messages component to allow for additional input. Considering the plan-and-execute model, the example implementations generally have the chat history as a placeholder prior to the input. In this case new input would replace the existing input in the state prompt pulled back into the model, so this may work. But this needs to be considered more before I decide for sure that works that way.

Because the actual state object has the keys `input`, `plan`, `past_steps`, and `response`. There is no place to access new human input unless they were appended to `past_steps`. 