# Llama-Index Workflow Patterns

In [25]:
import asyncio
import nest_asyncio
# Notebook setup: nest_asyncio patches asyncio so that code which drives an
# event loop can run inside the loop Jupyter already has running (see the
# nest_asyncio docs).
nest_asyncio.apply()

In [26]:
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

In [27]:
from random import random
from typing import Any
from workflows import Context, Workflow, step
from workflows.errors import WorkflowRuntimeError
from workflows.events import (
    Event,
    StartEvent,
    StopEvent,
)

class DispatchEvent(Event):
    """Placeholder event a dispatching step returns after it has manually
    sent its fan-out events; it carries no payload."""

class FetchDataEvent(Event):
    """Request that a step collect data for one query."""

    query: str  # the lookup key, possibly a ticker

class ResponseDataEvent(Event):
    """Carries collected data forward to the step that consumes it."""

    data: Any  # whatever the fetching step produced

## First Pattern: Single Data Collection from One Provider

Docs:
- https://docs.llamaindex.ai/en/stable/workflows/

In [28]:
class Pattern1Workflow(Workflow):
    """
    Run a single data collection against a single provider.

    The workflow has exactly one step, which is both its entry and exit.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Stand-in provider: a zero-argument callable returning the data.
        self.provider = random

    @step
    async def fetch_data(self, ev: StartEvent) -> StopEvent:
        """
        Entry and exit step, triggered by StartEvent.

        Calling ``w.run(query="MSFT")`` on an instantiated workflow ``w``
        exposes ``query`` as an attribute of the StartEvent (which can be
        subclassed). Because this step returns a StopEvent directly, it also
        ends the workflow.

        As with functions, a step that grows too large should be split into
        several sequential steps: the event returned by one step becomes the
        ``ev`` parameter of the next ``@step``-decorated method.
        """
        ticker = ev.query  # possibly a ticker
        collected = self.provider()
        # Any post-processing of the provider data would go here.
        return StopEvent(result=ticker + " -> " + str(collected))

try:
    w = Pattern1Workflow()
    result = await w.run(query="AAPL")
    print(result)
except WorkflowRuntimeError as e:
    print(e)

AAPL -> 0.2041220296720596


In [29]:
# Draw Pattern1Workflow's step/event graph into flows/pattern_1.html.
draw_all_possible_flows(Pattern1Workflow, filename="flows/pattern_1.html")

flows/pattern_1.html


## Second Pattern: Collect and Combine Data From Various Providers

Docs:
- https://docs.llamaindex.ai/en/stable/examples/workflow/workflows_cookbook/

In [30]:
class FetchData1Event(FetchDataEvent):
    """Routes a fetch request (inherited ``query``) to the provider1 step."""

class FetchData2Event(FetchDataEvent):
    """Routes a fetch request (inherited ``query``) to the provider2 step."""

class ResponseData1Event(ResponseDataEvent):
    """Carries provider1's collected ``data`` to the combining step."""

class ResponseData2Event(ResponseDataEvent):
    """Carries provider2's collected ``data`` to the combining step."""

class Pattern2Workflow(Workflow):
    """
    Send one query to two providers in parallel and merge their answers.

    ``dispatch`` sends one fetch event per provider, ``fetch_data_1`` and
    ``fetch_data_2`` each call their own provider, and ``combine`` waits
    until both responses have arrived before emitting the StopEvent.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Stand-in providers: zero-argument callables returning the data.
        self.provider1 = random
        self.provider2 = random

    @step
    async def dispatch(
        self, ctx: Context, ev: StartEvent
    ) -> FetchData1Event | FetchData2Event | DispatchEvent:
        """
        Entry step, triggered by StartEvent.

        Calling ``w.run(query="MSFT")`` on an instantiated workflow ``w``
        exposes ``query`` on the StartEvent (which can be subclassed). The
        query is saved in the context store for later steps, then one fetch
        event per provider is sent so both fetch steps can run in parallel.
        """
        ticker = ev.query  # possibly a ticker
        # Setup/prep work would go here.
        await ctx.store.set("query", ticker)
        # Returning an event is sequential; send_event lets us fan out.
        for fetch_event_cls in (FetchData1Event, FetchData2Event):
            ctx.send_event(fetch_event_cls(query=ticker))
        # A step must still return an event; combine() ignores this one.
        return DispatchEvent()

    @step
    async def fetch_data_1(self, ev: FetchData1Event) -> ResponseData1Event:
        """
        Collect data for ``ev.query`` from provider1.
        """
        value = self.provider1()
        labelled = "1. " + ev.query + " -> " + str(value)
        return ResponseData1Event(data=labelled)

    @step
    async def fetch_data_2(self, ev: FetchData2Event) -> ResponseData2Event:
        """
        Collect data for ``ev.query`` from provider2.
        """
        value = self.provider2()
        labelled = "2. " + ev.query + " -> " + str(value)
        return ResponseData2Event(data=labelled)

    @step
    async def combine(
        self, ctx: Context, ev: DispatchEvent | ResponseData1Event | ResponseData2Event
    ) -> StopEvent | None:
        """
        Exit step: merge both provider responses into the final result.

        Returns None (keep waiting) until both ResponseData1Event and
        ResponseData2Event have been buffered by ``ctx.collect_events``.
        """
        if isinstance(ev, DispatchEvent):
            return None
        buffered = ctx.collect_events(
            ev, [ResponseData1Event, ResponseData2Event]
        )
        if not buffered:
            return None
        first, second = buffered
        # The query was shared through the context store by dispatch().
        query = await ctx.store.get("query")
        # StopEvent can be subclassed if a typed result is preferred.
        return StopEvent(result={"query": query, "data": [first.data, second.data]})

try:
    w = Pattern2Workflow()
    result = await w.run(query="AAPL")
    result["data"]
except WorkflowRuntimeError as e:
    print(e)

In [31]:
# Draw Pattern2Workflow's step/event graph into flows/pattern_2.html.
draw_all_possible_flows(Pattern2Workflow, filename="flows/pattern_2.html")

flows/pattern_2.html


## Third Pattern: Iterate to Collect Data from the Same Provider and Combine

Docs:
- https://docs.llamaindex.ai/en/stable/examples/workflow/parallel_execution/

In [32]:
class Pattern3Workflow(Workflow):
    """
    Pattern3Workflow launches parallel data collection using one provider
    and several queries.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # create providers
        self.provider = random

    @step
    async def dispatch(
        self, ctx: Context, ev: StartEvent
    ) -> FetchDataEvent | DispatchEvent:
        """
        Entry step initiated by StartEvent.

        Running ``w.run(queries=["AAPL", "GOOG", "MSFT"])``, where ``w`` is an
        instantiated workflow, makes ``queries`` available on the StartEvent
        (which can be subclassed). The queries are materialized into a list
        once, so any finite iterable works and can be both counted and
        iterated.
        """
        queries = list(ev.queries)  # possibly a list of tickers
        # Do some setup/prep work here
        # ...
        # combine() must know how many ResponseDataEvent to collect
        await ctx.store.set("num_to_collect", len(queries))
        # Manually send events to trigger parallel steps (return is sequential)
        for query in queries:
            ctx.send_event(FetchDataEvent(query=query))
        # We need to return an event anyway; combine() handles it
        return DispatchEvent()

    @step(num_workers=3)  # We can define the number of workers (defaults to 4)
    async def fetch_data(self, ev: FetchDataEvent) -> ResponseDataEvent:
        """
        Step triggered by FetchDataEvent to collect data with provider.
        Runs once per query, up to ``num_workers`` at a time.
        """
        # Collect data from a provider
        d = self.provider()
        # Do some data processing if needed
        data = ev.query + " -> " + str(d)
        # Return an event to allow next step to follow sequentially
        return ResponseDataEvent(data=data)

    @step
    async def combine(
        self, ctx: Context, ev: DispatchEvent | ResponseDataEvent
    ) -> StopEvent | None:
        """
        Exit step where all events (and data) combine.

        Returns None (keep waiting) until ``num_to_collect`` responses have
        been buffered, then returns a StopEvent whose result is the list of
        collected data. With no queries the result is an empty list.
        """
        # dispatch() stored this before sending any event, so it is present
        num_to_collect = await ctx.store.get("num_to_collect")
        if isinstance(ev, DispatchEvent):
            # With zero queries nothing was fanned out, so no
            # ResponseDataEvent will ever arrive; finish now, otherwise the
            # workflow would never reach a StopEvent.
            if num_to_collect == 0:
                return StopEvent(result=[])
            return None
        # Wait for all events to finish (we know how many to collect)
        events = ctx.collect_events(ev, [ResponseDataEvent] * num_to_collect)
        if not events:
            return None
        # Do some further data processing if needed
        result = [response.data for response in events]
        # Return the result in a StopEvent which exits the workflow
        # Note that StopEvent can be subclassed
        return StopEvent(result=result)

try:
    w = Pattern3Workflow()
    result = await w.run(queries=["AAPL", "GOOG", "MSFT"])
    print(result)
except WorkflowRuntimeError as e:
    print(e)

['AAPL -> 0.5534719916765076', 'GOOG -> 0.37682825392136343', 'MSFT -> 0.9931788901406868']


In [33]:
# Draw Pattern3Workflow's step/event graph into flows/pattern_3.html.
draw_all_possible_flows(Pattern3Workflow, filename="flows/pattern_3.html")

flows/pattern_3.html


## Fourth Pattern: Exception Management

In most cases, the flow is useless if one of the steps fails. In such cases, let exceptions bubble up.

In some cases, for example when displaying several tickers, it is worthwhile to catch exceptions within each step and return a mix of data (successful tickers) and exceptions (failed tickers) in the returned event.

In [35]:
# Stubs for @app.lib.exceptions
class ProviderException(Exception):
    """Stand-in for the application's data-provider error."""

class WorkflowException(Exception):
    """Stand-in for the application's workflow-level error."""

class Pattern4Workflow(Workflow):
    """
    Pattern4Workflow always fails, to demonstrate exception management.
    """

    @step
    async def fetch_data(self, ev: StartEvent) -> StopEvent:
        """
        A step that raises an exception.

        Only raise WorkflowException to avoid too-broad-exception warnings.
        Do not catch other exceptions in steps; let them bubble up
        to be handled by the workflow engine so that we only end up
        with WorkflowException or ProviderException in the end.
        These exceptions shall be handled by the global exception handler
        in the UI layer to display a toast message to the user.

        Raises:
            WorkflowException: always, naming the query from the StartEvent.
        """
        # The ``-> StopEvent`` annotation is kept even though this step never
        # returns: steps in this file declare their wiring through return
        # annotations (presumably the workflow needs a StopEvent producer).
        # The former ``return StopEvent`` after the raise was unreachable and
        # returned the class rather than an instance, so it was removed.
        raise WorkflowException(f"Can't deal with {ev.query}")

try:
    w = Pattern4Workflow()
    result = await w.run(query="AAPL")
    print(result)
except ProviderException as e:
    # raise e (let it bubble up)
    print(e)
except WorkflowException as e:
    # raise e (let it bubble up)
    print(e)
except WorkflowRuntimeError as e:
    # raise a WorkflowException instead
    print(e)

Can't deal with AAPL
