diff --git a/docs/src/content/docs/workflows/deployment.md b/docs/src/content/docs/workflows/deployment.md index 1119da4e..f4fd052e 100644 --- a/docs/src/content/docs/workflows/deployment.md +++ b/docs/src/content/docs/workflows/deployment.md @@ -129,3 +129,105 @@ To start a workflow without waiting for it to finish, use the `/run-nowait` endp ``` You can then use the `handler_id` to check for the result or stream events. + +## Using `WorkflowClient` to Interact with Servers + +In order to interact with a deployed `WorkflowServer` programmatically, beyond the raw API calls detailed above, we also provide a `WorkflowClient` class. + +`WorkflowClient` provides methods for listing available workflows and workflow handlers, verifying the health of the server, running a workflow (both synchronously and asynchronously), streaming events and sending events. + +Assuming you are running the server example from above, we can use `WorkflowClient` in the following way: + +```python +from workflows.client import WorkflowClient + +async def main() + client = WorkflowClient(base_url="http://0.0.0.0:8080") + workflows = await client.list_workflows() + print("===== AVAILABLE WORKFLOWS ====") + print(workflows) + await client.is_healthy() # will raise an exception if the server is not healthy + handler = await client.run_workflow_nowait( + "greet", + start_event=StartEvent(name="John"), + context=None, + ) + handler_id = handler.handler_id + print("==== STARTING THE WORKFLOW ===") + print(f"Workflow running with handler ID: {handler_id}") + print("=== STREAMING EVENTS ===") + + async for event in client.get_workflow_events(handler_id=handler_id): + print("Received data:", event) + result = await client.get_result(handler_id) + + print(f"Final result: {result.result} (status: {result.status})") + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) +``` + +You can use the client also to interactively run human-in-the-loop workflows, like this one: + +```python +from workflows import Workflow, step +from workflows.context import Context +from workflows.events import ( + StartEvent, + StopEvent, + InputRequiredEvent, + HumanResponseEvent, +) +from workflows.server import WorkflowServer + +class RequestEvent(InputRequiredEvent): + prompt: str + +class ResponseEvent(HumanResponseEvent): + response: str + +class OutEvent(StopEvent): + output: str + +class HumanInTheLoopWorkflow(Workflow): + @step + async def prompt_human(self, ev: StartEvent, ctx: Context) -> RequestEvent: + return RequestEvent(prompt="What is your name?") + + @step + async def greet_human(self, ev: ResponseEvent) -> OutEvent: + return OutEvent(output=f"Hello, {ev.response}") + +server = WorkflowServer() +server.add_workflow("human", HumanInTheLoopWorkflow(timeout=1000)) +await server.serve("0.0.0.0", "8080") +``` + +You can now run the workflow and, when the human interaction is required, send the human response back: + +```python +from workflow.client import WorkflowClient + +client = WorkflowClient(base_url="http://0.0.0.0:8080") +handler = await client.run_workflow_nowait("human") +handler_id = handler.handler_id +print(handler_id) +async for event in client.get_workflow_events(handler_id=handler_id): + if "RequestEvent" == event.type + print( + "Workflow is requiring human input:", + event.value.get("prompt", ""), + ) + name = input("Reply here: ") + sent_event = await client.send_event( + handler_id=handler_id, + event=ResponseEvent(response=name.capitalize().strip()), + ) + msg = "Event has been sent" if sent_event else "Event failed to send" + print(msg) +result = await client.get_result(handler_id) +print(f"Workflow complete with status: {result.status})") +res = OutEvent.model_validate(result.result) +print("Received final message:", res.output) +```