# Workflow

In [4]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

from llama_index.llms.openai import OpenAI

In [5]:
class JokeEvent(Event):
    """Event carrying the generated joke text from ``generate_joke`` to ``critique_joke``."""

    joke: str

In [3]:
class JokeFlow(Workflow):
    """Two-step workflow: ask the LLM for a joke, then ask it to critique that joke."""

    # LLM client shared by both steps; created when the class body executes.
    llm = OpenAI()

    @step()
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        """Request a joke about ``ev.topic`` and wrap the completion text in a JokeEvent."""
        completion = await self.llm.acomplete(
            f"Write your best joke about {ev.topic}."
        )
        return JokeEvent(joke=str(completion))

    @step()
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        """Request a critique of ``ev.joke`` and finish the workflow with the critique text."""
        critique_request = f"Give a thorough analysis and critique of the following joke: {ev.joke}"
        completion = await self.llm.acomplete(critique_request)
        return StopEvent(result=str(completion))


# Build the workflow with timeout=60 and verbose logging disabled, then run it once.
# The `topic` keyword is what generate_joke reads back as ev.topic.
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
# The run result is the value critique_joke placed in its StopEvent (the critique text).
print(str(result))

Analysis:
This joke plays on the pun of "fish and ships" sounding like "fish and chips," a popular dish at seafood restaurants. The joke also incorporates the pirate theme by mentioning a pirate going to a seafood restaurant, which adds an element of humor and surprise.

Critique:
Overall, this joke is light-hearted and playful, making it suitable for a general audience. The use of wordplay is clever and adds an element of wit to the punchline. However, the joke may be considered somewhat predictable as the punchline is somewhat obvious once the setup is given. Additionally, the humor may not appeal to everyone as it relies on a specific pun that may not resonate with all listeners. Overall, while the joke is amusing and well-crafted, it may not be considered particularly original or groundbreaking.


In [4]:
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Draw every possible path through JokeFlow (works from the class, no run required)
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")

# Draw the path taken by one concrete execution: run a fresh instance first
# NOTE(review): this instance is built with default arguments, unlike the
# timeout=60 used in the earlier cell — confirm the default timeout is long
# enough for the two LLM calls.
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")

joke_flow_all.html
joke_flow_recent.html


In [5]:
from typing import Union
from llama_index.core.workflow import step, Context, Event, Workflow, StartEvent, StopEvent

class MyEvent(Event):
    """Payload-free event sent twice by ``dispatch_step`` and handled by ``handle_my_event``."""

    pass

class MyEventResult(Event):
    """Result event returned by ``handle_my_event`` and collected by ``gather``."""

    result: str

class GatherEvent(Event):
    """Payload-free event returned by ``dispatch_step``; it is one of the inputs ``gather`` accepts."""

    pass

class MyWorkflow(Workflow):
    """Fan-out / fan-in example.

    ``dispatch_step`` sends two ``MyEvent`` instances, ``handle_my_event`` turns
    each into a ``MyEventResult``, and ``gather`` ends the workflow once
    ``collect_events`` hands back the collected results.
    """

    @step()
    async def dispatch_step(self, ev: StartEvent) -> Union[MyEvent, GatherEvent]:
        """Send two MyEvent instances manually, then return a GatherEvent."""
        # Fan out: each send_event call queues one MyEvent
        for _ in range(2):
            self.send_event(MyEvent())

        # The returned GatherEvent is routed to the gather step
        return GatherEvent()

    @step()
    async def handle_my_event(self, ev: MyEvent) -> MyEventResult:
        """Answer a MyEvent with a MyEventResult holding the fixed string "result"."""
        return MyEventResult(result="result")

    @step(pass_context=True)
    async def gather(self, ctx: Context, ev: Union[GatherEvent, MyEventResult]) -> Union[StopEvent, None]:
        """Finish the workflow once two MyEventResult events have been collected.

        Returns None while ``collect_events`` yields nothing, so the workflow
        keeps running; otherwise returns a StopEvent whose result is whatever
        ``collect_events`` returned.
        """
        collected = ctx.collect_events(ev, [MyEventResult, MyEventResult])
        if collected:
            # Everything expected has arrived: stop with the gathered events
            return StopEvent(result=collected)

        # Nothing to report yet
        return None

# Running the workflow in an async environment
async def main():
    """Create a MyWorkflow, run it once, and print the value the run returns."""
    # run() takes its inputs as keyword arguments
    outcome = await MyWorkflow().run(topic="example_topic")
    print(outcome)

# Top-level `await` works in interactive environments such as Jupyter Notebook;
# in a plain script, drive the coroutine with asyncio.run(main()) instead.
await main()


[MyEventResult(result='result'), MyEventResult(result='result')]


In [6]:
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Assumes MyWorkflow is already defined in an earlier cell
# Draw all possible flows
draw_all_possible_flows(MyWorkflow, filename="my_flow_all.html")

# Draw an execution: run a fresh instance, then render the path it took.
# `topic` is passed to run(), but none of MyWorkflow's steps read it.
w = MyWorkflow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="my_flow_recent.html")


my_flow_all.html
my_flow_recent.html


# Reflection Workflow for Structured Outputs

In [6]:
from llama_index.core.workflow import Event


class ExtractionDone(Event):
    """Event emitted by the ``extract`` step and consumed by ``validate``."""

    # Raw LLM completion text (str() of the completion); expected to be JSON.
    output: str
    # The passage the extraction was run on, carried along for possible retries.
    passage: str


class ValidationErrorEvent(Event):
    """Event emitted by ``validate`` when parsing fails; it sends the workflow back to ``extract``."""

    # str() of the exception raised while parsing the output.
    error: str
    # The LLM output that failed to parse; it is quoted back in the reflection prompt.
    wrong_output: str
    # The original passage, so that extract can rebuild the extraction prompt.
    passage: str

In [7]:
from pydantic import BaseModel


# Target schema for one car: brand, model and an integer power figure.
# NOTE(review): documented with comments instead of a docstring so the JSON
# schema generated from CarCollection (which nests this model) is not altered,
# in case docstrings are reflected in the schema — confirm.
class Car(BaseModel):
    brand: str
    model: str
    power: int


# Top-level schema the LLM is asked to fill: a list of Car entries.
# Its schema_json() output is what extract() inserts into EXTRACTION_PROMPT.
# NOTE(review): documented with comments instead of a docstring so that the
# generated schema text stays unchanged — confirm.
class CarCollection(BaseModel):
    cars: list[Car]

In [37]:
import json

from llama_index.core.workflow import (
    Workflow,
    StartEvent,
    StopEvent,
    Context,
    step,
)
from llama_index.llms.ollama import Ollama
import nest_asyncio

# NOTE(review): presumably applied so asyncio calls can nest inside the
# notebook's already-running event loop — confirm it is still required.
nest_asyncio.apply()

# Template for the extraction request. ReflectionWorkflow.extract fills
# {passage} with the input text and {schema} with CarCollection.schema_json().
EXTRACTION_PROMPT = """
Context information is below:
---------------------
{passage}
---------------------

Given the context information and not prior knowledge, create a JSON object from the information in the context.
The JSON object must follow the JSON schema:
{schema}

"""

# Template appended to the extraction prompt on a retry. {wrong_answer} is the
# previous LLM output and {error} is the parse error message it produced.
REFLECTION_PROMPT = """
You already created this output previously:
---------------------
{wrong_answer}
---------------------

This caused the JSON decode error: {error}

Try again, the response must contain only valid JSON code. Do not add any sentence before or after the JSON object.
Do not repeat the schema.
"""


class ReflectionWorkflow(Workflow):
    """Extract-and-validate loop for structured output.

    ``extract`` asks the LLM for JSON matching the CarCollection schema and
    ``validate`` checks that the answer parses as JSON. A parse failure sends
    the workflow back to ``extract`` with the error attached, until
    ``max_retries`` extraction attempts have been made.
    """

    # Maximum number of extraction attempts counted in ctx.data["retries"].
    max_retries: int = 3

    @step(pass_context=True)
    async def extract(
        self, ctx: Context, ev: StartEvent | ValidationErrorEvent
    ) -> StopEvent | ExtractionDone:
        """Ask the LLM to turn the passage into JSON.

        Returns a StopEvent when the retry budget is used up or when the start
        event carries no passage; otherwise returns an ExtractionDone holding
        the raw LLM text and the passage.
        """
        attempts_so_far = ctx.data.get("retries", 0)
        if attempts_so_far >= self.max_retries:
            return StopEvent(result="Max retries reached")
        # Count this attempt before doing any work
        ctx.data["retries"] = attempts_so_far + 1

        if isinstance(ev, StartEvent):
            # First attempt: the passage comes from the run() keyword arguments
            passage = ev.get("passage")
            if not passage:
                return StopEvent(result="Please provide some text in input")
            reflection_suffix = ""
        elif isinstance(ev, ValidationErrorEvent):
            # Retry: reuse the passage and tell the LLM what went wrong last time
            passage = ev.passage
            reflection_suffix = REFLECTION_PROMPT.format(
                wrong_answer=ev.wrong_output, error=ev.error
            )

        llm = Ollama(model="llama3.1", request_timeout=120)
        # An empty suffix leaves the extraction prompt unchanged
        prompt = (
            EXTRACTION_PROMPT.format(
                passage=passage, schema=CarCollection.schema_json()
            )
            + reflection_suffix
        )

        completion = await llm.acomplete(prompt)
        return ExtractionDone(output=str(completion), passage=passage)

    @step()
    async def validate(
        self, ev: ExtractionDone
    ) -> StopEvent | ValidationErrorEvent:
        """Check that the extracted output parses as JSON.

        Returns a StopEvent with the raw output on success, or a
        ValidationErrorEvent carrying the error text, the rejected output and
        the passage when parsing raises.
        """
        try:
            json.loads(ev.output)
        except Exception as parse_error:
            print("Validation failed, retrying...")
            return ValidationErrorEvent(
                error=str(parse_error),
                wrong_output=ev.output,
                passage=ev.passage,
            )
        else:
            return StopEvent(result=ev.output)

In [39]:
# verbose=True makes the workflow log each step as it runs (see the output below).
w = ReflectionWorkflow(timeout=120, verbose=True)

# Run the workflow; the `passage` keyword is read by the extract step via ev.get("passage").
# `ret` is either the raw LLM output or one of the workflow's plain-text stop messages.
ret = await w.run(
    passage="I own two cars: a Fiat Panda with 45Hp and a Honda Civic with 30Hp."
)

Running step extract
Step extract produced event ExtractionDone
Running step validate
Here is the JSON object based on the provided context information:

```json
{
  "$defs": {
    "Car": {
      "properties": {
        "brand": {"title": "Brand", "type": "string"},
        "model": {"title": "Model", "type": "string"},
        "power": {"title": "Power", "type": "integer"}
      },
      "required": ["brand", "model", "power"],
      "title": "Car",
      "type": "object"
    }
  },
  "properties": {
    "cars": {
      "items": {
        "$ref": "#/$defs/Car"
      },
      "title": "Cars",
      "type": "array"
    }
  },
  "required": ["cars"],
  "title": "CarCollection",
  "type": "object",
  "cars": [
    {
      "brand": "Fiat",
      "model": "Panda",
      "power": 45
    },
    {
      "brand": "Honda",
      "model": "Civic",
      "power": 30
    }
  ]
}
```

This JSON object follows the provided schema and includes the two cars mentioned in the context, with their respecti

In [29]:
import json

# `ret` is the result of the ReflectionWorkflow run above. It is normally the
# raw JSON text produced by the LLM, but the workflow can also stop with a
# plain-text message ("Max retries reached", "Please provide some text in input").
raw_json = ret

# Parse the JSON string into a Python dictionary. A non-JSON result is reported
# with the offending text so the failure is easy to diagnose.
try:
    parsed_json = json.loads(raw_json)
except json.JSONDecodeError as exc:
    raise ValueError(
        f"Workflow result is not valid JSON and cannot be saved: {raw_json!r}"
    ) from exc

# Define the filename where you want to save the JSON data
filename = 'output.json'

# Dump the formatted JSON into a file; the encoding is explicit so the result
# does not depend on the platform's default.
with open(filename, 'w', encoding='utf-8') as json_file:
    json.dump(parsed_json, json_file, indent=4)

print(f"JSON data has been written to {filename}")


JSON data has been written to output.json


# Workflow for a Function Calling Agent

In [2]:
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event


class InputEvent(Event):
    """Event carrying the chat history to ``handle_llm_input``.

    Emitted by ``prepare_chat_history`` and by ``handle_tool_calls``.
    """

    input: list[ChatMessage]


class ToolCallEvent(Event):
    """Event carrying the tool calls requested by the LLM to ``handle_tool_calls``."""

    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    """Event wrapping a single tool output.

    NOTE(review): not referenced by any step in the visible part of this
    notebook — confirm whether it is still needed.
    """

    output: ToolOutput

In [18]:
from typing import Any, List

from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step


class FuncationCallingAgent(Workflow):
    """Function-calling agent expressed as a workflow.

    Steps:
      1. ``prepare_chat_history`` stores the user message in memory and emits
         the chat history as an ``InputEvent``.
      2. ``handle_llm_input`` sends the history and the tools to the LLM. It
         emits a ``ToolCallEvent`` when the LLM requests tools, otherwise a
         ``StopEvent`` with the final response and the collected sources.
      3. ``handle_tool_calls`` runs the requested tools, stores their outputs
         in memory and loops back to step 2 with a new ``InputEvent``.

    The class name keeps its original spelling because existing cells
    instantiate it under this name.
    """

    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        **kwargs: Any,
    ) -> None:
        """Set up the agent.

        Args:
            *args: Forwarded to ``Workflow.__init__``.
            llm: Function-calling LLM to use; defaults to ``OpenAI()``.
            tools: Tools the LLM may call; defaults to no tools.
            **kwargs: Forwarded to ``Workflow.__init__`` (e.g. timeout, verbose).

        Raises:
            AssertionError: If the LLM's metadata does not report
                function-calling support.
        """
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()
        assert (
            self.llm.metadata.is_function_calling_model
        ), "FuncationCallingAgent requires a function-calling LLM"

        # Build the memory from the resolved LLM, not from the raw `llm`
        # argument, which is None whenever the default LLM is used.
        self.memory = ChatMemoryBuffer.from_defaults(llm=self.llm)
        self.sources = []

    @step()
    async def prepare_chat_history(self, ev: StartEvent) -> InputEvent:
        """Record the user input in memory and emit the current chat history."""
        # clear sources left over from a previous run
        self.sources = []

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        self.memory.put(user_msg)

        # get chat history
        chat_history = self.memory.get()
        return InputEvent(input=chat_history)

    @step()
    async def handle_llm_input(
        self, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        """Query the LLM with the chat history and route on its answer.

        Returns a ToolCallEvent when the response contains tool calls;
        otherwise a StopEvent whose result is a dict with the keys
        "response" and "sources".
        """
        chat_history = ev.input

        response = await self.llm.achat_with_tools(
            self.tools, chat_history=chat_history
        )
        self.memory.put(response.message)

        tool_calls = self.llm.get_tool_calls_from_response(
            response, error_on_no_tool_call=False
        )

        if not tool_calls:
            # Copy the sources so later runs cannot mutate the returned list
            return StopEvent(
                result={"response": response, "sources": [*self.sources]}
            )
        else:
            return ToolCallEvent(tool_calls=tool_calls)

    @step()
    async def handle_tool_calls(self, ev: ToolCallEvent) -> InputEvent:
        """Run the requested tools and feed their outputs back to the LLM.

        Unknown tool names and exceptions raised by a tool are not propagated;
        each is reported to the LLM as a "tool" chat message so it can react.
        """
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        tool_msgs = []

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            additional_kwargs = {
                "tool_call_id": tool_call.tool_id,
                # Taken from the request rather than from `tool`, which is
                # None for an unknown tool. For a known tool the two names
                # are identical because the lookup key is the tool's own name.
                "name": tool_call.tool_name,
            }
            if tool is None:
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=f"Tool {tool_call.tool_name} does not exist",
                        additional_kwargs=additional_kwargs,
                    )
                )
                continue

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                self.sources.append(tool_output)
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=tool_output.content,
                        additional_kwargs=additional_kwargs,
                    )
                )
            except Exception as e:
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=f"Encountered error in tool call: {e}",
                        additional_kwargs=additional_kwargs,
                    )
                )

        for msg in tool_msgs:
            self.memory.put(msg)

        chat_history = self.memory.get()
        return InputEvent(input=chat_history)

In [19]:
# Instantiate the agent with its defaults (OpenAI() as the LLM and no tools).
# NOTE(review): `workflow` is not used again in the visible cells.
workflow = FuncationCallingAgent()

In [24]:
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI


def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    # NOTE(review): docstring kept verbatim — FunctionTool.from_defaults
    # presumably uses it as the tool description; confirm before rewording.
    total = x + y
    return total


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    # NOTE(review): docstring kept verbatim — FunctionTool.from_defaults
    # presumably uses it as the tool description; confirm before rewording.
    product = x * y
    return product


# Wrap the two plain functions as tools the agent can offer to the LLM.
tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]

# Agent backed by gpt-4o with both tools; verbose=True logs each step as it runs.
agent = FuncationCallingAgent(
    llm=OpenAI(model="gpt-4o"), tools=tools, timeout=120, verbose=True
)

# The `input` keyword is read by prepare_chat_history as ev.input.
# NOTE(review): the trailing "!" in the question could be read as a factorial —
# confirm that "?" was not intended.
ret = await agent.run(input="What is (4 x 5) + 10!")

Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
SOURCE [ToolOutput(content='20', tool_name='multiply', raw_input={'args': (), 'kwargs': {'x': 4, 'y': 5}}, raw_output=20, is_error=False)]
Step handle_tool_calls produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
SOURCE [ToolOutput(content='20', tool_name='multiply', raw_input={'args': (), 'kwargs': {'x': 4, 'y': 5}}, raw_output=20, is_error=False), ToolOutput(content='30', tool_name='add', raw_input={'args': (), 'kwargs': {'x': 20, 'y': 10}}, raw_output=30, is_error=False)]
Step handle_tool_calls produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent


In [25]:
# Display the agent result: the dict with "response" and "sources" keys built in handle_llm_input.
ret

{'response': ChatResponse(message=ChatMessage(role=<MessageRole.ASSISTANT: 'assistant'>, content='The result of \\((4 \\times 5) + 10\\) is \\(30\\).', additional_kwargs={}), raw=ChatCompletion(id='chatcmpl-9vsTzdexzzq2O0nFYeE1Qub87hYKu', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='The result of \\((4 \\times 5) + 10\\) is \\(30\\).', refusal=None, role='assistant', function_call=None, tool_calls=None))], created=1723581155, model='gpt-4o-2024-05-13', object='chat.completion', service_tier=None, system_fingerprint='fp_3aa7262c27', usage=CompletionUsage(completion_tokens=22, prompt_tokens=169, total_tokens=191)), delta=None, logprobs=None, additional_kwargs={}),
 'sources': [ToolOutput(content='20', tool_name='multiply', raw_input={'args': (), 'kwargs': {'x': 4, 'y': 5}}, raw_output=20, is_error=False),
  ToolOutput(content='30', tool_name='add', raw_input={'args': (), 'kwargs': {'x': 20, 'y': 10}}, raw_output=30, is_error=False)]