Example demonstrating how to use the WorkflowServer with event streaming.

This example shows how to:
1. Create workflows that emit streaming events
2. Set up the server with event streaming support
3. Register workflows
4. Run the server
5. Make HTTP requests to execute workflows
6. Stream real-time events from running workflows using the /events endpoint

In [1]:
%pip install llama-index-workflows[server]

## Define a workflow and setup the server

In [2]:
%%writefile server.py
import asyncio

from workflows import Workflow, step
from workflows.context import Context
from workflows.events import Event, StartEvent, StopEvent
from workflows.server import WorkflowServer


class StreamEvent(Event):
    sequence: int


# Define a simple workflow
class GreetingWorkflow(Workflow):
    @step
    async def greet(self, ctx: Context, ev: StartEvent) -> StopEvent:
        for i in range(3):
            ctx.write_event_to_stream(StreamEvent(sequence=i))
            await asyncio.sleep(0.3)

        name = getattr(ev, "name", "World")
        return StopEvent(result=f"Hello, {name}!")


class ProgressEvent(Event):
    step: str
    progress: int
    message: str


class MathWorkflow(Workflow):
    @step
    async def calculate(self, ev: StartEvent) -> StopEvent:
        a = getattr(ev, "a", 0)
        b = getattr(ev, "b", 0)
        operation = getattr(ev, "operation", "add")

        if operation == "add":
            result = a + b
        elif operation == "multiply":
            result = a * b
        elif operation == "subtract":
            result = a - b
        elif operation == "divide":
            result = a / b if b != 0 else None
        else:
            result = None

        return StopEvent(
            result={"a": a, "b": b, "operation": operation, "result": result}
        )


class ProcessingWorkflow(Workflow):
    """Example workflow that demonstrates event streaming with progress updates."""

    @step
    async def process(self, ctx: Context, ev: StartEvent) -> StopEvent:
        items = getattr(ev, "items", ["item1", "item2", "item3", "item4", "item5"])

        ctx.write_event_to_stream(
            ProgressEvent(
                step="start",
                progress=0,
                message=f"Starting processing of {len(items)} items",
            )
        )

        results = []
        for i, item in enumerate(items):
            # Simulate processing time
            await asyncio.sleep(0.5)

            # Emit progress event
            progress = int((i + 1) / len(items) * 100)
            ctx.write_event_to_stream(
                ProgressEvent(
                    step="processing",
                    progress=progress,
                    message=f"Processed {item} ({i + 1}/{len(items)})",
                )
            )

            results.append(f"processed_{item}")

        ctx.write_event_to_stream(
            ProgressEvent(
                step="complete",
                progress=100,
                message="Processing completed successfully",
            )
        )

        return StopEvent(result={"processed_items": results, "total": len(results)})


async def main():
  server = WorkflowServer()

  # Register workflows
  server.add_workflow("greeting", GreetingWorkflow())
  server.add_workflow("math", MathWorkflow())
  server.add_workflow("processing", ProcessingWorkflow())

  await server.serve(host="0.0.0.0", port=8000)


if __name__ == "__main__":
    asyncio.run(main())

Writing server.py


## Run the server in background

In [3]:
!nohup python server.py &

nohup: appending output to 'nohup.out'


## Interact with the server

In [4]:
# Hit the health endpoint to see the server is up and running
!curl http://localhost:8000/health

{"status":"healthy"}

In [5]:
# List available workflows
!curl http://localhost:8000/workflows

{"workflows":["greeting","math","processing"]}

In [6]:
# Run greeting workflow
!curl -X POST http://localhost:8000/workflows/greeting/run \
  -H "Content-Type: application/json" \
  -d '{"kwargs": {"name": "Alice"}}'

{"result":"Hello, Alice!"}

In [7]:
# Run math workflow
!curl -X POST http://localhost:8000/workflows/math/run \
  -H "Content-Type: application/json" \
  -d '{"kwargs": {"a": 10, "b": 5, "operation": "multiply"}}'

{"result":{"a":10,"b":5,"operation":"multiply","result":50}}

In [36]:
%%bash
# Run workflow with nowait
handler_id=$(curl -sX POST http://localhost:8000/workflows/math/run-nowait \
  -H "Content-Type: application/json" \
  -d '{"kwargs": {"a": 100, "b": 25, "operation": "divide"}}' | jq -r ".handler_id" )
printf "Got handler id: ${handler_id}\n\n"

# Wait for the workflow to run in background
sleep 1

# Fetch the result asynchronously
curl -s http://localhost:8000/results/${handler_id}

Got handler id: vuyyRzWikr
{"result":{"a":100,"b":25,"operation":"divide","result":4.0}}

In [40]:
%%bash
# Stream events from workflow

# 1. Run workflow with nowait
handler_id=$(curl -sX POST http://localhost:8000/workflows/greeting/run-nowait \
  -H "Content-Type: application/json" \
  -d '{"kwargs": {"name": "Async User"}}' | jq -r ".handler_id" )
printf "Got handler id: ${handler_id}\n\n"

# Wait for the workflow to run in background
sleep 1

printf "Streaming events...\n"
# 2. Stream events using Server-Sent Events using SSE format
curl -s http://localhost:8000/events/$handler_id?sse=true


printf "\nFinal result:\n"
# 3. Get the final result after events complete
curl -s http://localhost:8000/results/$handler_id

Got handler id: nPBSVhI8DO

Streaming events...
event: __main__.StreamEvent
data: {"sequence": 0}
event: __main__.StreamEvent
data: {"sequence": 1}
event: __main__.StreamEvent
data: {"sequence": 2}
event: workflows.events.StopEvent
data: {}

Final result:
{"result":"Hello, Async User!"}