## Lesson 2: Building a Workflow

Lesson objective: Use event-based Workflows to define the flow of information around a system

In this lab, you'll learn the core Workflow concepts: steps, custom events, loops, branching, concurrent execution, and streaming.


In [1]:
import asyncio
import os
import random

from dotenv import load_dotenv

In [2]:
# Load variables from a local .env file (if present) into os.environ.
load_dotenv()

# Fail fast with an actionable message; a bare KeyError would only show the key name.
if "OPENAI_API_KEY" not in os.environ:
    raise KeyError(
        "OPENAI_API_KEY is not set; define it in your environment or in a .env file."
    )
api_key = os.environ["OPENAI_API_KEY"]

### Creating a Workflow


In [3]:
from llama_index.core.workflow import Context, StartEvent, StopEvent, Workflow, step

In [4]:
class MyWorkflow(Workflow):
    """Smallest possible workflow: one step that goes from StartEvent to StopEvent."""

    @step  # registers the coroutine below as a workflow step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        """Pause briefly, greet, then end the run with a fixed result."""
        pause_seconds = 0.3
        await asyncio.sleep(pause_seconds)
        print("aloha!")
        final_event = StopEvent(result="Hello, world!")
        return final_event

In [5]:
# instantiate the workflow
basic_workflow = MyWorkflow(timeout=10, verbose=False)

# run the workflow
result = await basic_workflow.run()

print(result)

aloha!
Hello, world!


### Visualizing a workflow


In [6]:
from llama_index.utils.workflow import draw_all_possible_flows

In [7]:
# Every drawing cell writes into ./workflows/. The HTML writer presumably does not
# create missing folders, so ensure the directory exists for a fresh checkout.
os.makedirs("workflows", exist_ok=True)

draw_all_possible_flows(basic_workflow, filename="./workflows/basic_workflow.html")

./workflows/basic_workflow.html


### Creating Custom Events


In [8]:
from llama_index.core.workflow import Event

In [9]:
class FirstEvent(Event):
    """Event emitted by the first step; carries a status message."""

    first_output: str


class SecondEvent(Event):
    """Event emitted by the second step; carries a status message."""

    second_output: str

In [10]:
class MyWorkflow(Workflow):
    """Linear three-step pipeline chained purely by event types.

    StartEvent -> step_one -> FirstEvent -> step_two -> SecondEvent
    -> step_three -> StopEvent. Each step runs when an event matching its
    ``ev`` annotation is emitted, and emits the event type it returns.
    """

    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        """Print the input given to ``run()`` and start the chain."""
        # `first_input` is the keyword argument passed to `workflow.run(...)`.
        incoming = ev.first_input
        print(incoming)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        """Echo step one's status and hand off to step three."""
        status = ev.first_output
        print(status)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        """Echo step two's status and finish the workflow."""
        status = ev.second_output
        print(status)
        return StopEvent(result="Workflow complete.")

In [11]:
workflow = MyWorkflow(timeout=10, verbose=False)

result = await workflow.run(first_input="Start the workflow.")

print(result)

Start the workflow.
First step complete.
Second step complete.
Workflow complete.


In [12]:
# Render the linear three-step graph built above to an HTML file.
WORKFLOW_FILE = "workflows/custom_events.html"
draw_all_possible_flows(workflow, filename=WORKFLOW_FILE)

workflows/custom_events.html


### Creating Loops


In [13]:
class LoopEvent(Event):
    """Event that sends control back to step_one for another attempt."""

    loop_output: str

In [14]:
class MyWorkflow(Workflow):
    """Three-step pipeline whose first step may loop back to itself.

    step_one accepts either the initial StartEvent or its own LoopEvent. On a
    simulated failure (a 50/50 coin flip) it emits LoopEvent and runs again;
    otherwise it emits FirstEvent and the pipeline continues linearly.
    """

    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        """Flip a coin: retry via LoopEvent, or move on via FirstEvent."""
        failed = random.randint(0, 1) == 0
        if not failed:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")
        print("Bad thing happened")
        return LoopEvent(loop_output="Back to step one.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        """Echo step one's status and hand off to step three."""
        status = ev.first_output
        print(status)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        """Echo step two's status and finish the workflow."""
        status = ev.second_output
        print(status)
        return StopEvent(result="Workflow complete.")

In [15]:
loop_workflow = MyWorkflow(timeout=10, verbose=False)

result = await loop_workflow.run(first_input="Start the workflow.")

print(result)

Bad thing happened
Bad thing happened
Good thing happened
First step complete.
Second step complete.
Workflow complete.


In [16]:
# Render the looping graph (step_one both consumes and emits LoopEvent).
WORKFLOW_FILE = "workflows/loop_events.html"
draw_all_possible_flows(loop_workflow, filename=WORKFLOW_FILE)

workflows/loop_events.html


### Branching


In [17]:
class BranchA1Event(Event):
    """First hop of branch A."""

    payload: str


class BranchA2Event(Event):
    """Second hop of branch A."""

    payload: str


class BranchB1Event(Event):
    """First hop of branch B."""

    payload: str


class BranchB2Event(Event):
    """Second hop of branch B."""

    payload: str

In [18]:
class BranchWorkflow(Workflow):
    """Workflow that takes one of two independent two-step branches at random.

    start -> (step_a1 -> step_a2) or (step_b1 -> step_b2) -> StopEvent;
    only one branch runs per invocation.
    """

    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        """Flip a coin and emit the first event of the chosen branch."""
        take_a = random.randint(0, 1) == 0
        branch = "A" if take_a else "B"
        print(f"Go to branch {branch}")
        event_type = BranchA1Event if take_a else BranchB1Event
        return event_type(payload=f"Branch {branch}")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        """Branch A, hop 1: echo the payload and forward it to step_a2."""
        text = ev.payload
        print(text)
        return BranchA2Event(payload=text)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        """Branch B, hop 1: echo the payload and forward it to step_b2."""
        text = ev.payload
        print(text)
        return BranchB2Event(payload=text)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        """Branch A, hop 2: echo the payload and finish."""
        text = ev.payload
        print(text)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        """Branch B, hop 2: echo the payload and finish."""
        text = ev.payload
        print(text)
        return StopEvent(result="Branch B complete.")

In [19]:
WORKFLOW_FILE = "workflows/branching.html"

# Draw from a workflow instance, consistent with the other visualization cells
# in this notebook (the original passed the BranchWorkflow class itself).
branch_workflow = BranchWorkflow(timeout=10, verbose=False)
draw_all_possible_flows(branch_workflow, filename=WORKFLOW_FILE)

workflows/branching.html


### Concurrent Execution


In [20]:
class StepTwoEvent(Event):
    """One query to be handled by a `step_two` worker."""

    query: str


class ParallelFlow(Workflow):
    """Fan three queries out to concurrent workers; the first StopEvent ends the run."""

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        """Dispatch one StepTwoEvent per query via the context."""
        # Events are sent through ctx.send_event rather than returned,
        # so this step itself returns None.
        for label in ("Query 1", "Query 2", "Query 3"):
            ctx.send_event(StepTwoEvent(query=label))

    @step(num_workers=4)  # allow up to 4 concurrent invocations of this step
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        """Simulate a slow query with a random 1-5 s sleep, then stop with its name."""
        print("Running slow query ", ev.query)
        delay_seconds = random.randint(1, 5)
        await asyncio.sleep(delay_seconds)
        print(ev.query, "completed")
        return StopEvent(result=ev.query)

In [21]:
parallel_workflow = ParallelFlow(timeout=10, verbose=False)

result = await parallel_workflow.run()

print(result)

Running slow query  Query 1
Running slow query  Query 2
Running slow query  Query 3
Query 3 completed
Query 3


#### Collecting events


In [22]:
class StepThreeEvent(Event):
    """Result of one `step_two` query, gathered by `step_three`."""

    result: str


class ConcurrentFlow(Workflow):
    """Fan queries out to concurrent workers, then gather every result before stopping."""

    # Single source of truth for the fan-out: `start` dispatches one event per
    # entry and `step_three` waits for exactly that many results, so the two
    # counts cannot drift apart when queries are added or removed.
    QUERIES = ("Query 1", "Query 2", "Query 3")

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        """Dispatch one StepTwoEvent per query; returns None (events go via ctx)."""
        for query in self.QUERIES:
            ctx.send_event(StepTwoEvent(query=query))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StepThreeEvent:
        """Simulate a query with a random 1-5 s sleep and report its name."""
        print("Running query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=ev.query)

    @step
    async def step_three(self, ctx: Context, ev: StepThreeEvent) -> StopEvent | None:
        """Buffer results until one has arrived for every query, then stop."""
        # collect_events returns None until all expected events have arrived,
        # then returns the full list of them.
        result = ctx.collect_events(ev, [StepThreeEvent] * len(self.QUERIES))

        if result is None:
            print("Not all events received yet.")
            return None

        # do something with all results together
        print(result)
        return StopEvent(result="Done")

In [23]:
w = ConcurrentFlow(timeout=10, verbose=False)

result = await w.run(message="Start the workflow.")

print(result)

Running query  Query 1
Running query  Query 2
Running query  Query 3
Not all events received yet.
Not all events received yet.
[StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 3'), StepThreeEvent(result='Query 1')]
Done


_Note:_ This flow control lets you perform map-reduce style tasks.
To implement a map-reduce pattern, you would split your task into as many events as necessary (one per sub-task),
and use `Context` to store that number with `await ctx.set("num_events", some_number)`.
Then in `step_three` you would read the stored number back with `await ctx.get("num_events")` and pass that many event types to `ctx.collect_events`.
That way you don't need to know in advance exactly how many concurrent steps you're running. You'll do exactly this in a later lesson.


#### Collecting different event types


In [24]:
class StepAEvent(Event):
    """Work item routed to step_a."""

    query: str


class StepACompleteEvent(Event):
    """Result produced by step_a."""

    result: str


class StepBEvent(Event):
    """Work item routed to step_b."""

    query: str


class StepBCompleteEvent(Event):
    """Result produced by step_b."""

    result: str


class StepCEvent(Event):
    """Work item routed to step_c."""

    query: str


class StepCCompleteEvent(Event):
    """Result produced by step_c."""

    result: str

In [25]:
class ConcurrentFlow(Workflow):
    """Fan out to three different steps, then gather their distinct completion events."""

    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        """Send one work item of each type; each type is handled by its own step."""
        outgoing = (
            StepAEvent(query="Query 1"),
            StepBEvent(query="Query 2"),
            StepCEvent(query="Query 3"),
        )
        for item in outgoing:
            ctx.send_event(item)

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        """Handle an A item by echoing its query as the result."""
        print("Doing something A-ish")
        done = StepACompleteEvent(result=ev.query)
        return done

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        """Handle a B item by echoing its query as the result."""
        print("Doing something B-ish")
        done = StepBCompleteEvent(result=ev.query)
        return done

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        """Handle a C item by echoing its query as the result."""
        print("Doing something C-ish")
        done = StepCCompleteEvent(result=ev.query)
        return done

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent | None:
        """Wait for one completion event of each type, then stop."""
        print("Received event ", ev.result)

        # The list fixes how many events to wait for and the order of the
        # returned list (here C, A, B), regardless of arrival order.
        expected_types = [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent]
        events = ctx.collect_events(ev, expected_types)

        if events is not None:
            print("All events received: ", events)
            return StopEvent(result="Done")

        print("Not all events received yet.")
        return None

In [26]:
w = ConcurrentFlow(timeout=10, verbose=False)

result = await w.run(message="Start the workflow.")

print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 1
Not all events received yet.
Received event  Query 2
Not all events received yet.
Received event  Query 3
All events received:  [StepCCompleteEvent(result='Query 3'), StepACompleteEvent(result='Query 1'), StepBCompleteEvent(result='Query 2')]
Done


In [27]:
# `w` is the different-event-types ConcurrentFlow instance from the previous cell.
WORKFLOW_FILE = "workflows/concurrent_different_events.html"
draw_all_possible_flows(w, filename=WORKFLOW_FILE)

workflows/concurrent_different_events.html


### Streaming

In practical use, agents can take a long time to run. It's a poor user experience to have the user execute a workflow and then wait a long time to see whether it works; it's better to give them some indication that things are happening in real time, even if the process is not complete.

To do this, Workflows allow streaming events back to the user. Here you'll use `Context.write_event_to_stream` to emit these events.


In [28]:
from llama_index.llms.openai import OpenAI

In [29]:
class FirstEvent(Event):
    """Event emitted by the first step; carries a status message."""

    first_output: str


class SecondEvent(Event):
    """Event emitted by the second step; carries a status message and the LLM text."""

    second_output: str
    response: str


class TextEvent(Event):
    """One streamed chunk of LLM output, forwarded to the caller."""

    delta: str


class ProgressEvent(Event):
    """Human-readable progress message streamed to the caller."""

    msg: str

The specific events you'll send back are the "delta" responses from the LLM. When you ask an LLM for a streaming response, as you're doing here, it sends back each chunk of its response as soon as it becomes available; the new text in each chunk is exposed as its `delta`. You're going to wrap each delta in a `TextEvent` and write it to the Workflow's event stream.


In [30]:
# LLM used by the streaming workflow below. It authenticates with the key loaded
# from the environment and sends requests to the endpoint given in `api_base`.
llm = OpenAI(
    api_base="https://models.inference.ai.azure.com/",
    api_key=api_key,
    model="gpt-4.1-mini",
    temperature=0.5,
)

In [31]:
class MyWorkflow(Workflow):
    """Three-step workflow that streams progress messages and LLM output to the caller.

    Streamed events are consumed by iterating ``handler.stream_events()``.
    Relies on the module-level ``llm`` defined in an earlier cell.
    """

    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        """Announce progress and start the chain."""
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening\n"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        """Stream an LLM completion chunk by chunk, then pass the full text on."""
        generator = await llm.astream_complete(
            "Please give me the first 50 words of Moby Dick, a book in the public domain."
        )

        # Initialised before the loop so an empty stream yields "" instead of an
        # UnboundLocalError from reading `response` after the loop.
        full_text = ""
        async for response in generator:
            # TextEvent.delta is declared `str`; guard against a chunk whose delta
            # is presumably None (no new text).
            ctx.write_event_to_stream(TextEvent(delta=response.delta or ""))
            # str() of the final chunk is treated as the full response text
            # (presumably cumulative -- confirm against CompletionResponse docs).
            full_text = str(response)

        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=full_text,
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        """Announce the final step and finish the workflow."""
        ctx.write_event_to_stream(ProgressEvent(msg="\nStep three is happening"))
        return StopEvent(result="Workflow complete.")

In [32]:
workflow = MyWorkflow(timeout=30, verbose=False)
handler = workflow.run(first_input="Start the workflow.")

async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print(ev.msg, flush=True)
    elif isinstance(ev, TextEvent):
        print(ev.delta, end="", flush=True)

final_result = await handler

print("\nFinal result = ", final_result)

Step one is happening

Certainly! Here are the first 50 words of *Moby-Dick* by Herman Melville:

"Call me Ishmael. Some years ago—never mind how long precisely—having little or no money in my purse, and nothing particular to interest me on shore, I thought I would sail about a little and see the watery part of the world. It is a way I have of..."
Step three is happening

Final result =  Workflow complete.


[Guide to LlamaIndex's Workflows](https://docs.llamaindex.ai/en/stable/module_guides/workflow/)
