Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions docs/src/content/docs/workflows/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,105 @@ To start a workflow without waiting for it to finish, use the `/run-nowait` endp
```

You can then use the `handler_id` to check for the result or stream events.

## Using `WorkflowClient` to Interact with Servers

In order to interact with a deployed `WorkflowServer` programmatically, beyond the raw API calls detailed above, we also provide a `WorkflowClient` class.

`WorkflowClient` provides methods for listing available workflows and workflow handlers, verifying the health of the server, running a workflow (both synchronously and asynchronously), streaming events and sending events.

Assuming you are running the server example from above, we can use `WorkflowClient` in the following way:

```python
from workflows.client import WorkflowClient

async def main()
client = WorkflowClient(base_url="http://0.0.0.0:8080")
workflows = await client.list_workflows()
print("===== AVAILABLE WORKFLOWS ====")
print(workflows)
await client.is_healthy() # will raise an exception if the server is not healthy
handler = await client.run_workflow_nowait(
"greet",
start_event=StartEvent(name="John"),
context=None,
)
handler_id = handler.handler_id
print("==== STARTING THE WORKFLOW ===")
print(f"Workflow running with handler ID: {handler_id}")
print("=== STREAMING EVENTS ===")

async for event in client.get_workflow_events(handler_id=handler_id):
print("Received data:", event)
result = await client.get_result(handler_id)

print(f"Final result: {result.result} (status: {result.status})")

if __name__ == "__main__":
import asyncio
asyncio.run(main())
```

You can use the client also to interactively run human-in-the-loop workflows, like this one:

```python
from workflows import Workflow, step
from workflows.context import Context
from workflows.events import (
StartEvent,
StopEvent,
InputRequiredEvent,
HumanResponseEvent,
)
from workflows.server import WorkflowServer

class RequestEvent(InputRequiredEvent):
prompt: str

class ResponseEvent(HumanResponseEvent):
response: str

class OutEvent(StopEvent):
output: str

class HumanInTheLoopWorkflow(Workflow):
@step
async def prompt_human(self, ev: StartEvent, ctx: Context) -> RequestEvent:
return RequestEvent(prompt="What is your name?")

@step
async def greet_human(self, ev: ResponseEvent) -> OutEvent:
return OutEvent(output=f"Hello, {ev.response}")

server = WorkflowServer()
server.add_workflow("human", HumanInTheLoopWorkflow(timeout=1000))
await server.serve("0.0.0.0", "8080")
```

You can now run the workflow and, when the human interaction is required, send the human response back:

```python
from workflow.client import WorkflowClient

client = WorkflowClient(base_url="http://0.0.0.0:8080")
handler = await client.run_workflow_nowait("human")
handler_id = handler.handler_id
print(handler_id)
async for event in client.get_workflow_events(handler_id=handler_id):
if "RequestEvent" == event.type
print(
"Workflow is requiring human input:",
event.value.get("prompt", ""),
)
name = input("Reply here: ")
sent_event = await client.send_event(
handler_id=handler_id,
event=ResponseEvent(response=name.capitalize().strip()),
)
msg = "Event has been sent" if sent_event else "Event failed to send"
print(msg)
result = await client.get_result(handler_id)
print(f"Workflow complete with status: {result.status})")
res = OutEvent.model_validate(result.result)
print("Received final message:", res.output)
```
Loading