In [None]:
!pip install -U llama-index-core llama-index-llms-openai llama-index-utils-workflow python-dotenv

In [None]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)
import random
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.utils.workflow import draw_most_recent_execution
from llama_index.llms.openai import OpenAI

In [None]:
from dotenv import load_dotenv
load_dotenv()

In [None]:
from llama_index.llms.openai import OpenAI

class OpenAIGenerator(Workflow):
    @step()
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))

w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)

In [None]:
class FailedEvent(Event):
    error: str

class QueryEvent(Event):
    query: str

class LoopExampleFlow(Workflow):

    @step()
    async def answer_query(self, ev: StartEvent | QueryEvent ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if (random_number == 0):
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step()
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if (random_number == 0):
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")

In [None]:
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)

In [None]:
class GlobalExampleFlow(Workflow):

    @step(pass_context=True)
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        ctx.data["some_database"] = ["value1","value2","value3"]

        return QueryEvent(query=ev.query)

    @step(pass_context=True)
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = ctx.data["some_database"]

        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)

In [None]:
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)

In [None]:
class WaitExampleFlow(Workflow):

    @step(pass_context=True)
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if (hasattr(ev,"data")):
            ctx.data["some_database"] = ev.data

        return StopEvent(result=None)

    @step(pass_context=True)
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if (hasattr(ev,"query")):
            # do we have any data?
            if ("some_database" in ctx.data):
                data = ctx.data["some_database"]
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's non data yet
                return None
        else:
            # this isn't a query
            return None

In [None]:
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if (result is None):
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)

In [None]:
class InputEvent(Event):
    input: str

class SetupEvent(Event):
    error: bool

class QueryEvent(Event):
    query: str

class CollectExampleFlow(Workflow):

    @step(pass_context=True)
    async def setup(self,ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up
        if ("setup" not in ctx.data):
            ctx.data["setup"] = True
            print("I got set up")
            return SetupEvent(error=False)

    @step()
    async def collect_input(self, ev: StartEvent ) -> InputEvent:
        if (hasattr(ev, 'input')):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step()
    async def parse_query(self, ev: StartEvent) -> QueryEvent:
        if (hasattr(ev, 'query')):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step(pass_context=True)
    async def run_query(self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent) -> StopEvent:
        ready = ctx.collect_events(ev,[QueryEvent, InputEvent, SetupEvent])
        if (ready is None):
            print("Not enough events yet")
            return StopEvent()

        # run the query
        print("Now I have all the events")
        print(ready)
        return StopEvent(result=f"Ran query {ready[0].query} on input {ready[1].input}")

In [None]:
c = CollectExampleFlow()
result = await c.run()
print("---")
result = await c.run(input="Here's some input")
print("---")
result = await c.run(query="Here's my question")
print(result)

In [None]:
class FailureEvent(Event):
    error: str

class FailureExampleFlow(Workflow):

    @step()
    async def try_to_do_thing(self, ev: StartEvent) -> StopEvent | FailureEvent:
        random_number = random.randint(0, 10)
        if (random_number < 5):
            print("There was a failure")
            return FailureEvent(error=f"This is the bad place: {random_number}")
        else:
            print("There was success")
            return StopEvent(result="Success")

    @step(pass_context=True)
    async def monitor_failures(self, ctx: Context, ev: FailureEvent ) -> StopEvent | None:
        failed = ctx.collect_events(ev, [FailureEvent, FailureEvent, FailureEvent])
        if failed is not None:
            print(failed)
            print("The simulation has failed!")
            return StopEvent(result="Failure")

        return StopEvent()

In [None]:
f = FailureExampleFlow(verbose=False)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()
print("====", result)
result = await f.run()