### Imports

In [136]:
from llama_index.core.llms import ChatMessage, ChatResponse
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event

from typing import Any, List, Optional

from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step

from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI

### Setup

In [137]:
class InputEvent(Event):
    """Event carrying the message list that `handle_llm_input` sends to the LLM."""

    # Passed as `chat_history` to the LLM call in `handle_llm_input`.
    user_input: List[ChatMessage | ChatResponse]
    
class ToolCallEvent(Event):
    """Event emitted by `handle_llm_input` when the LLM response requests tools."""

    # The message list that was sent to the LLM for this turn.
    user_input: List[ChatMessage | ChatResponse]
    # Tool invocations extracted from the LLM response.
    tool_calls: List[ToolSelection]

class FunctionOutputEvent(Event):
    """Event wrapping a single tool output.

    NOTE(review): none of the workflow steps visible in this notebook produce
    or consume this event -- confirm whether it is still needed.
    """

    output: ToolOutput

In [138]:
class FuncationCallingAgent(Workflow):
    """Single-pass function-calling workflow.

    Flow of events:

    1. ``prepare_chat_history`` turns the ``query`` on the ``StartEvent`` into
       a user ``ChatMessage`` wrapped in an ``InputEvent``.
    2. ``handle_llm_input`` sends the messages plus the registered tools to the
       LLM. If the response contains no tool calls the workflow stops with the
       LLM response; otherwise a ``ToolCallEvent`` is emitted.
    3. ``handle_tool_calls`` runs every requested tool and stops the workflow
       with the tool outputs joined into one string. The outputs are *not*
       sent back to the LLM for a follow-up turn.
    """

    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        **kwargs: Any,
    ) -> None:
        """Create the agent.

        Args:
            *args: Forwarded to ``Workflow.__init__``.
            llm: LLM used for tool selection; defaults to ``OpenAI()``.
            tools: Tools the LLM may call; defaults to an empty list.
            **kwargs: Forwarded to ``Workflow.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()
        assert (
            self.llm.metadata.is_function_calling_model
        ), "llm must support function calling"

    @step
    async def prepare_chat_history(self, ev: StartEvent) -> InputEvent:
        """Wrap the ``query`` from the start event in a user chat message."""
        query = ev.get("query")
        if not query:
            # NOTE(review): returning None emits no event, so the run
            # presumably idles until the workflow timeout -- confirm that
            # this is the intended handling of a missing/empty query.
            return None
        user_msg = ChatMessage(role="user", content=query)
        return InputEvent(user_input=[user_msg])

    @step
    async def handle_llm_input(
        self, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        """Ask the LLM to answer directly or to pick tools to call.

        Returns:
            ``StopEvent`` holding the LLM response when no tool call was
            requested, otherwise a ``ToolCallEvent`` with the requested calls.
        """
        user_input = ev.user_input

        response = await self.llm.achat_with_tools(
            self.tools, chat_history=user_input
        )

        tool_calls = self.llm.get_tool_calls_from_response(
            response, error_on_no_tool_call=False
        )

        if not tool_calls:
            return StopEvent(result=response)
        return ToolCallEvent(user_input=user_input, tool_calls=tool_calls)

    @step
    async def handle_tool_calls(self, ev: ToolCallEvent) -> StopEvent:
        """Run every requested tool and stop with the combined outputs.

        A request for an unknown tool, or an exception raised by a tool, is
        reported as text in the result instead of aborting the workflow.

        Returns:
            ``StopEvent`` whose result is the tool message contents joined
            with single spaces, in the order the calls were requested.
        """
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        tool_msgs = []

        # call tools -- safely!
        for tool_call in ev.tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)
            additional_kwargs = {
                "tool_call_id": tool_call.tool_id,
                # `tool` is None when the LLM asked for a tool that is not
                # registered; fall back to the requested name instead of
                # dereferencing None.
                "name": (
                    tool.metadata.get_name()
                    if tool is not None
                    else tool_call.tool_name
                ),
            }

            if tool is None:
                content = f"Tool {tool_call.tool_name} does not exist"
            else:
                try:
                    content = tool(**tool_call.tool_kwargs).content
                except Exception as e:
                    # Deliberately broad: a failing tool must not take down
                    # the workflow; the error text becomes the tool output.
                    content = f"Encountered error in tool call: {e}"

            tool_msgs.append(
                ChatMessage(
                    role="tool",
                    content=content,
                    additional_kwargs=additional_kwargs,
                )
            )

        tool_msgs_str = " ".join([msg.content for msg in tool_msgs])
        return StopEvent(result=tool_msgs_str)

### Making Tools

In [None]:
def find_medisave_bonus(
    birth_year: int,
    num_property_own: Optional[int] = None,
    av_of_residence: Optional[int] = None,
) -> str:
    """Function to determine the medisave bonus for a person"""
    # NOTE: the docstring above is kept verbatim on purpose -- this function is
    # wrapped with FunctionTool.from_defaults, which presumably surfaces the
    # docstring to the LLM as the tool description.
    #
    # Rules implemented here:
    #   * born 1959 or earlier: flat $750, no means test;
    #   * born in one of the tiers below: the lower amount if the person owns
    #     more than one property OR the annual value (AV) of the residence is
    #     above 25000; the higher amount only when both figures are supplied
    #     and within limits; otherwise a message asking for the missing data;
    #   * born after 2003: not eligible.
    majulah_label = "MediSave Bonus under the Majulah Package"
    one_time_label = "one-time MediSave Bonus"

    if birth_year <= 1959:
        return f"You will get $750 worth of {majulah_label}."

    # (latest birth year in tier, lower amount, higher amount, bonus wording)
    tiers = (
        (1973, "$750", "$1,500", majulah_label),
        (1983, "$200", "$300", one_time_label),
        (2003, "$100", "$200", one_time_label),
    )
    property_limit = 1
    av_limit = 25000

    for latest_year, lower, higher, label in tiers:
        if birth_year > latest_year:
            continue

        # Either known fact alone is enough to settle on the lower amount.
        if num_property_own is not None and num_property_own > property_limit:
            return f"You will get {lower} worth of {label}."
        if av_of_residence is not None and av_of_residence > av_limit:
            return f"You will get {lower} worth of {label}."

        # The higher amount needs both figures, each within its limit.
        if (
            num_property_own is not None
            and num_property_own <= property_limit
            and av_of_residence is not None
            and av_of_residence <= av_limit
        ):
            return f"You will get {higher} worth of {label}."

        # Not enough information to decide between the two amounts.
        return (
            f"For your birth year of {birth_year}, you will get between "
            f"{lower} to {higher} worth of {label}. For a more exact value, "
            "please provide the number of properties you own and the annual "
            "value of your residence."
        )

    return "You are not eligible for any MediSave Bonus."

# Expose the bonus calculator to the LLM as a callable tool.
tools = [FunctionTool.from_defaults(find_medisave_bonus)]

# verbose=True makes the workflow log each step as it runs (see the
# "Running step ..." lines in the cell output).
agent = FuncationCallingAgent(
    llm=OpenAI(model="gpt-4o-mini", temperature=0), tools=tools, timeout=120, verbose=True
)

# Top-level `await` works here because this is a notebook cell.
# A birth year of 2010 is later than 2003, so find_medisave_bonus reports no
# eligibility; the printed result is the tool's own output string.
ret = await agent.run(query="how much medisave bonus can I get? I am born in 2010 and own 1 property worth 25001 in annual value.")
print(ret)

Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event StopEvent
You are not eligible for any MediSave Bonus.
