Merged
2 changes: 1 addition & 1 deletion README.md
@@ -46,7 +46,7 @@ Restate provides the following capabilities:
- 🌍 **Deploy anywhere** – Whether it's AWS Lambda, CloudRun, Fly.io, Cloudflare, Kubernetes, Deno Deploy,...
- ☁️ **Easy to self-host** – Single-binary self-hosted deployments or connect to [Restate Cloud](https://restate.dev/cloud/).

-<img src="agents/openai-agents-python/img/invocation_ui_agent_sdk.png" alt="OpenAI Agent SDK invocation UI" width="1000px"/>
+<img src="/get-started/openai-agents-python/img/invocation_ui.png" alt="OpenAI Agent SDK invocation UI" width="1000px"/>

Restate can also be used for other use cases, such as:
[workflows](https://docs.restate.dev/use-cases/workflows),
12 changes: 5 additions & 7 deletions advanced/insurance-claims/app/workflow.py
@@ -46,18 +46,16 @@ def parse_claim_data(extra_info: str = "") -> ClaimData:

# Repetitively check for missing fields and request additional information if needed
while True:
-missing_fields = await ctx.run(
-"completeness check", lambda last_claim=claim: check_missing_fields(last_claim))
+missing_fields = await ctx.run("completeness check", check_missing_fields, args=(claim,))
if not missing_fields:
break

id, promise = ctx.awakeable()
-await ctx.run(
-"Request missing info", lambda last_id=id: send_message_to_customer(missing_fields, last_id)
-)
+await ctx.run("Request missing info", send_message_to_customer, args=(missing_fields, id))
extra_info = await promise
-claim = await ctx.run("Extracting", lambda new_info=extra_info: parse_claim_data(new_info), type_hint=ClaimData)
+claim = await ctx.run("Extracting", parse_claim_data, args=(extra_info,))

# Create the claim in the legacy system
-await ctx.run("create", lambda last_claim=claim: create_claim(last_claim))
+await ctx.run("create", lambda: create_claim(claim))
return claim
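The hunks above swap argument-capturing lambdas (`lambda last_claim=claim: ...`) for direct function references with `ctx.run(name, fn, args=...)`. The journaling behavior both forms rely on can be sketched with a toy stand-in (plain Python, invented names — the real Restate SDK API differs):

```python
# Toy stand-in for a journaled `ctx.run`: the first execution runs the function
# and records its result; a replay returns the recorded result instead of
# re-running the (possibly non-deterministic) side effect.
class ToyJournal:
    def __init__(self):
        self._entries = {}

    def run(self, name, fn, args=()):
        if name not in self._entries:  # first execution: do the work
            self._entries[name] = fn(*args)
        return self._entries[name]  # replay: return the recorded result

journal = ToyJournal()
calls = []

def check_missing_fields(claim):
    calls.append(claim)  # side effect we only want to happen once
    return [f for f in ("name", "date") if f not in claim]

first = journal.run("completeness check", check_missing_fields, args=({"name": "Ada"},))
replay = journal.run("completeness check", check_missing_fields, args=({"name": "Ada"},))
assert first == replay == ["date"]
assert len(calls) == 1  # the replay did not re-run the function
```

Passing `fn` and `args` separately, rather than capturing values in a lambda, makes the recorded step explicit about its inputs.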
47 changes: 15 additions & 32 deletions agents/openai-agents-python/README.md
@@ -4,7 +4,7 @@

**The agent composes the workflow on the fly, and Restate persists the execution as it takes place.**

-Combining Restate and an Agent SDK is ideal for turning brittle agent implementations into resilient ones.
+Combining Restate and the OpenAI Agent SDK is ideal for turning brittle agents into resilient ones.

Restate powers your agents with the following features:
- 🛡️ **Automatic retries**: Built-in retry mechanisms for failed operations
@@ -16,48 +16,30 @@ Restate powers your agents with the following features:
- 🙂 **Resilient human-in-the-loop**: Both approaches support human intervention in workflows
- 👬 **Idempotency/deduplication**: Prevents duplicate agent requests

-## Plugging Restate into existing Agent SDKs
-Combine Restate's durability with existing Agent SDKs for rapid development.
-As opposed to the [OpenAI Agents + Restate template](../../get-started/openai-agents-python/README.md), this example shows how to do handoffs and stateful sessions.

-To make the agent resilient, we need to:
-- persist the results of LLM calls in Restate's journal by wrapping them in `ctx.run()`
-- have the context available to us in the tools so that we can use it to persist the intermediate tool execution steps.

-The details of how to do this depend on the Agent SDK you are using.

-⚠ **LIMITATIONS**: You cannot do parallel tool calls or any type of parallel execution if you integrate Restate with an Agent SDK.
-If you execute actions on the context in different tools in parallel, Restate will not be able to deterministically replay them because the order might be different during recovery and will crash.
-We are working on a solution to this, but for now, you can only use Restate with Agent SDKs for sequential tool calls.

-### Restate + OpenAI Agent SDK
-[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](openai_sdk/agent.py)
+## Plugging Restate into the OpenAI Agents Python SDK

Use the OpenAI Agent SDK to implement the agent loop, while Restate handles the persistence and resiliency of the agent's decisions and tool executions.

-The OpenAI Agent SDK lets you wrap the LLM calls into durable actions by implementing a Restate Model Provider ([code](openai_sdk/middleware.py)).
-In order to have access to the Restate context in the tools, we can pass it along in the context that we pass to the tools.

-The example is a customer service agent for an airline that can send invoices and update seat bookings.
-This is [an OpenAI SDK example](https://github.com/openai/openai-agents-python/blob/main/examples/customer_service/main.py) that has been adapted to use Restate for resiliency and workflow guarantees:
+To make the agent resilient, we need to:
+- persist the results of LLM calls in Restate's journal by wrapping them in `ctx.run()`. This is handled by the [`DurableModelCalls` model provider](utils/middleware.py).
+- persist intermediate tool execution steps by wrapping steps in Restate SDK actions. To do this, we pass the Restate context along to the tools.
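A minimal sketch of these two ideas together, using a toy journal in place of the Restate context (illustrative only — `ToyContext` and the flight data are invented here, and the real `ctx.run` and `DurableModelCalls` APIs differ):

```python
# Both the LLM decision and each tool step run through the same journal, so a
# crash mid-run can replay completed steps from the journal instead of redoing them.
class ToyContext:
    def __init__(self):
        self._journal = {}

    def run(self, name, fn, args=()):
        if name not in self._journal:
            self._journal[name] = fn(*args)
        return self._journal[name]

def fake_llm(prompt):
    # stand-in for a model call deciding which tool to invoke
    return "tool_call:update_seat"

def update_seat(ctx, confirmation_number, new_seat):
    # the tool receives the context, so its own steps are journaled too
    flight = ctx.run("Info lookup", lambda c: {"flight": "LH123"}, args=(confirmation_number,))
    return ctx.run("Update seat", lambda: f"{new_seat} on {flight['flight']}")

ctx = ToyContext()
decision = ctx.run("call LLM", fake_llm, args=("change my seat to 10B",))
assert decision == "tool_call:update_seat"
result = update_seat(ctx, "AB4568", "10B")
assert result == "10B on LH123"
```

In the real example, the model provider plays the role of the `"call LLM"` step and the Restate `ObjectContext` replaces the toy journal.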

<img src="img/using_agent_sdk.png" alt="Using Agent SDK" width="650px"/>

<img src="img/invocation_ui_agent_sdk.png" alt="Using Agent SDK - journal" width="1200px"/>
<img src="img/invocation_ui_agent_state.png" alt="Using Agent SDK - state" width="1200px"/>

-### Other Agent SDKs
-Are you using another Agent SDK? We can help you evaluate whether it can be integrated with Restate.
-Join our [Discord](https://discord.gg/skW3AZ6uGd) or [Slack](https://join.slack.com/t/restatecommunity/shared_invite/zt-2v9gl005c-WBpr167o5XJZI1l7HWKImA) to discuss.

-## Running the examples
+⚠ **LIMITATIONS**: You cannot do parallel tool calls or any type of parallel execution if you integrate Restate with an Agent SDK.
+If you execute actions on the context in different tools in parallel, Restate will not be able to deterministically replay them because the order might be different during recovery and will crash.
+We are working on a solution to this, but for now, you can only use Restate with Agent SDKs for sequential tool calls.

### Restate + OpenAI Agent SDK
[<img src="https://raw.githubusercontent.com/restatedev/img/refs/heads/main/show-code.svg">](openai_sdk/agent.py)
+## Running the example

+This example implements an airline customer service agent that can answer questions about your flights, and change your seat.

-The example uses the OpenAI Agent SDK to implement the agent. Although this could be adapted to other agent SDKs.

1. Export your OpenAI or Anthropic API key as an environment variable:
```shell
export OPENAI_API_KEY=your_openai_api_key
@@ -68,7 +50,6 @@ The example uses the OpenAI Agent SDK. Although this coul
```
3. Start the services:
```shell
-cd openai_sdk
uv run .
```
4. Register the services:
@@ -85,22 +66,24 @@ Or with the [client](client/__main__.py):
- **Request**:

```shell
-uv run client "can you send me an invoice for booking AB4568?"
+uv run client peter "can you send me an invoice for booking AB4568?"
```

With `peter` as the conversation ID.

Example response: `I've sent the invoice to your email associated with confirmation number AB4568. If there's anything else you need, feel free to ask!`

- **Or have longer conversations**:

```shell
-uv run client "can you change my seat to 10b?"
+uv run client peter "can you change my seat to 10b?"
```

Example response: `To change your seat to 10B, I'll need your confirmation number. Could you please provide that?`

Respond to the question by sending a new message to the same stateful session:
```shell
-uv run client "5666"
+uv run client peter "5666"
```

Example response: `Your seat has been successfully changed to 10B. If there's anything else you need, feel free to ask!`
84 changes: 28 additions & 56 deletions agents/openai-agents-python/agent.py
@@ -1,10 +1,9 @@
import agents
import restate
-from pydantic import BaseModel, ConfigDict
from agents import Agent, function_tool, handoff, RunContextWrapper, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX

-from utils.middleware import RestateModelProvider
+from utils.middleware import DurableModelCalls
from utils.utils import (
retrieve_flight_info,
send_invoice,
@@ -19,21 +18,11 @@
https://github.com/openai/openai-agents-python/blob/main/examples/customer_service/main.py
"""


-# To have access to the Restate context in the tools, we can pass it along in the context that we pass to the tools
-class ToolContext(BaseModel):
-model_config = ConfigDict(arbitrary_types_allowed=True)
-
-restate_context: restate.ObjectContext
-customer_id: str | None = None


# TOOLS


@function_tool
async def update_seat(
-context: RunContextWrapper[ToolContext],
+wrapper: RunContextWrapper[restate.ObjectContext],
confirmation_number: str,
new_seat: str,
) -> str:
@@ -46,15 +35,15 @@ async def update_seat(
"""

# Do durable steps in your tools by using the Restate context
-ctx = context.context.restate_context
+restate_context = wrapper.context

# 1. Look up the flight using the confirmation number
-flight = await ctx.run(
+flight = await restate_context.run(
"Info lookup", retrieve_flight_info, args=(confirmation_number,)
)

# 2. Update the seat in the booking system
-success = await ctx.run(
+success = await restate_context.run(
"Update seat",
update_seat_in_booking_system,
args=(confirmation_number, new_seat, flight),
@@ -70,24 +59,24 @@ async def update_seat(
description_override="Sends invoices to customers for booked flights.",
)
async def invoice_sending(
-context: RunContextWrapper[ToolContext], confirmation_number: str
+wrapper: RunContextWrapper[restate.ObjectContext], confirmation_number: str
):
# Do durable steps in your tools by using the Restate context
-ctx = context.context.restate_context
+restate_context = wrapper.context

# 1. Look up the flight using the confirmation number
-flight = await ctx.run(
+flight = await restate_context.run(
"Info lookup", retrieve_flight_info, args=(confirmation_number,)
)

# 2. Send the invoice to the customer
-await ctx.run("Send invoice", send_invoice, args=(confirmation_number, flight))
+await restate_context.run("Send invoice", send_invoice, args=(confirmation_number, flight))


### AGENTS

# This is identical to the examples of the OpenAI SDK
-faq_agent = Agent[ToolContext](
+faq_agent = Agent[restate.ObjectContext](
name="Invoice Sending Agent",
handoff_description="A helpful agent that can send invoices.",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
@@ -100,7 +89,7 @@ async def invoice_sending(
tools=[invoice_sending],
)

-seat_booking_agent = Agent[ToolContext](
+seat_booking_agent = Agent[restate.ObjectContext](
name="Seat Booking Agent",
handoff_description="A helpful agent that can update a seat on a flight.",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
@@ -114,8 +103,7 @@ async def invoice_sending(
tools=[update_seat],
)

-triage_agent = Agent[ToolContext](
-model="o3-mini",
+triage_agent = Agent[restate.ObjectContext](
name="Triage Agent",
handoff_description="A triage agent that can delegate a customer's request to the appropriate agent.",
instructions=(
@@ -140,43 +128,27 @@ async def invoice_sending(
# Keyed by conversation id
agent = restate.VirtualObject("Agent")

-# Keys of the K/V state stored in Restate per chat
-INPUT_ITEMS = "input-items"


@agent.handler()
-async def run(ctx: restate.ObjectContext, req: str) -> str:
-"""
-Send a message to the agent.
+async def run(restate_context: restate.ObjectContext, message: str) -> str:
+"""Send a message to the agent."""

-Args:
-req (str): The message to send to the agent.
-
-Returns:
-str: The response from the agent.
-"""
# Use Restate's K/V store to keep track of the conversation history and last agent
-input_items = await ctx.get(INPUT_ITEMS) or []
-input_items.append({"role": "user", "content": req})
-ctx.set(INPUT_ITEMS, input_items)

-last_agent_name = await ctx.get("agent") or triage_agent.name
-last_agent = agent_dict[last_agent_name]

-# Pass the Restate context to the tools
-tool_context = ToolContext(restate_context=ctx, customer_id=ctx.key())
+memory = await restate_context.get("memory") or []
+memory.append({"role": "user", "content": message})
+restate_context.set("memory", memory)

+last_agent_name = await restate_context.get("agent") or triage_agent.name
result = await agents.Runner.run(
-last_agent,
-input=input_items,
-context=tool_context,
-# Use the RestateModelProvider to persist the LLM calls in Restate
-run_config=RunConfig(model_provider=RestateModelProvider(ctx)),
+agent_dict[last_agent_name],
+input=memory,
+# Pass the Restate context to the tools to make tool execution steps durable
+context=restate_context,
+# Choose any model and let Restate persist your calls
+run_config=RunConfig(model="gpt-4o", model_provider=DurableModelCalls(restate_context)),
)

-ctx.set("agent", result.last_agent.name)

-output = str(result.final_output)
-input_items.append({"role": "system", "content": output})
-ctx.set(INPUT_ITEMS, input_items)
-return output
+restate_context.set("agent", result.last_agent.name)
+memory.append({"role": "assistant", "content": result.final_output})
+restate_context.set("memory", memory)
+return result.final_output
5 changes: 3 additions & 2 deletions agents/openai-agents-python/client/__main__.py
@@ -6,10 +6,11 @@
def main():
if len(sys.argv) < 3:
raise ValueError("No input provided")
-data = sys.argv[1]
+key = sys.argv[1]
+data = sys.argv[2]

r = httpx.post(
-"http://localhost:8080/Agent/my-user/run",
+f"http://localhost:8080/Agent/{key}/run",
json=data,
timeout=60,
)
6 changes: 2 additions & 4 deletions agents/openai-agents-python/utils/middleware.py
@@ -37,11 +37,9 @@ def to_input_items(self) -> list[TResponseInputItem]:
return [it.model_dump(exclude_unset=True) for it in self.output] # type: ignore


-class RestateModelProvider(MultiProvider):
+class DurableModelCalls(MultiProvider):
"""
A model provider that wraps the OpenAI SDK's default MultiProvider
to persist LLM calls in the Restate journal.
"""

def __init__(self, ctx: restate.Context):
@@ -90,7 +88,7 @@ async def call_llm() -> RestateModelResponse:
response_id=resp.response_id,
)

-return await self.ctx.run("call LLM", call_llm)
+return await self.ctx.run("call LLM", call_llm, max_attempts=3)
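The `max_attempts=3` added above caps how often the journaled LLM call is retried before failing terminally. A toy version of that retry-then-give-up behavior (plain Python with invented names, not the Restate API):

```python
class TerminalError(Exception):
    """Raised once the retry budget is exhausted."""

def run_with_retries(fn, max_attempts=3):
    # Retry transient failures up to max_attempts times, then surface a
    # terminal error instead of retrying forever.
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise TerminalError(f"failed after {attempt} attempts") from exc

attempts = []

def flaky_llm_call():
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("transient provider error")
    return "ok"

assert run_with_retries(flaky_llm_call, max_attempts=3) == "ok"
assert len(attempts) == 3  # two transient failures, then success
```

Bounding retries matters for LLM calls in particular, since a persistently failing provider request would otherwise block the journaled step indefinitely.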

def stream_response(
self,