In [3]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition

from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (presumably where the OpenAI API key used by ChatOpenAI lives — confirm).
load_dotenv()


def search_tool(messages: list):
    """Search for information about Ray Serve."""
    # The docstring above is kept verbatim: it is what the LLM sees as the
    # tool description once this function is bound as a tool.
    # No real search happens; the argument is only echoed into a canned answer.
    # The surrounding whitespace reproduces the original multi-line template.
    leading = "\n        "
    trailing = "\n    "
    answer = (
        f"Search result for {messages}: Ray Serve is a scalable and versatile "
        "library for serving machine learning models."
    )
    return leading + answer + trailing


class Agent:
    """Minimal LangGraph agent that lets an OpenAI chat model call `search_tool`.

    The compiled graph alternates between an "assistant" node (the LLM) and a
    "tools" node until the LLM replies without requesting a tool call.
    """

    def __init__(self):
        # A single list feeds both the LLM binding and the ToolNode so the two
        # can never drift apart.
        self.tools = [search_tool]
        self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
        self.llm_with_tools = self.llm.bind_tools(self.tools)
        self.sys_msg = SystemMessage(content="You are a helpful assistant.")
        self.graph = self.build_graph()

    def assistant(self, state: MessagesState):
        """Graph node: call the tool-enabled LLM on the system prompt plus history.

        Args:
            state: Graph state; only its "messages" entry is read.

        Returns:
            A state update containing the single new LLM message.
        """
        return {"messages": [self.llm_with_tools.invoke([self.sys_msg] + state["messages"])]}

    def build_graph(self):
        """Assemble and compile the assistant/tools loop.

        Returns:
            The compiled graph produced by `StateGraph.compile()`.
        """
        builder = StateGraph(MessagesState)
        builder.add_node("assistant", self.assistant)
        builder.add_node("tools", ToolNode(self.tools))
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
            # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
            tools_condition,
        )
        # Tool results are fed back to the assistant. No unconditional
        # assistant -> END edge is added: tools_condition already owns the
        # decision to finish, matching the other agents in this notebook.
        builder.add_edge("tools", "assistant")
        return builder.compile()

    def handle_query(self, input: str):
        """Run one user query through the graph and return the final answer text.

        Every message of the resulting conversation is pretty-printed as a
        side effect.

        Args:
            input: The user's question.

        Returns:
            The `content` of the last message in the final graph state.
        """
        final_state = self.graph.invoke({"messages": [HumanMessage(content=input)]})
        for m in final_state['messages']:
            m.pretty_print()
        return final_state['messages'][-1].content


In [None]:
# Smoke test: build the agent and push a single question through the graph.
Agent().handle_query("What is a cat?")

In [3]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition

from dotenv import load_dotenv

# Load variables from a local .env file into the process environment
# (presumably where the OpenAI API key used by ChatOpenAI lives — confirm).
load_dotenv()


def multiply(a: int, b: int) -> int:
    """Multiply a and b.

    Args:
        a: first int
        b: second int
    """
    # Docstring kept verbatim: this function is collected into `tools`,
    # presumably so the text can serve as the LLM-facing tool description.
    product = a * b
    return product

def add(a: int, b: int) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
    """
    # Docstring kept verbatim: this function is collected into `tools`,
    # presumably so the text can serve as the LLM-facing tool description.
    total = a + b
    return total

def divide(a: int, b: int) -> float:
    """Divide a and b.

    Args:
        a: first int
        b: second int
    """
    # Docstring kept verbatim: this function is collected into `tools`,
    # presumably so the text can serve as the LLM-facing tool description.
    # True division: a zero divisor propagates Python's ZeroDivisionError.
    quotient = a / b
    return quotient

# Function-based toolset.
# NOTE(review): a later cell rebinds `tools` to the class-based calculator, so
# which list CalculatorAgent picks up depends on cell execution order.
tools = [add, multiply, divide]


In [31]:
from typing import Type

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field


# Argument schema for the calculator tool: two integer operands plus the
# name of the operation to apply to them.
class CalculatorInput(BaseModel):
    a: int = Field(description="first number")
    b: int = Field(description="second number")
    # Plain string; the description lists add, multiply and divide as the options.
    operation: str = Field(description="add, multiply or divide operation to perform")


# Note: It's important that every field has type hints. BaseTool is a
# Pydantic class and not having type hints can lead to unexpected behavior.
class CustomCalculatorTool(BaseTool):
    # LangChain tool exposing add / multiply / divide behind a single
    # "operation" selector; its arguments are described by CalculatorInput.
    name: str = "Calculator"
    description: str = "useful for when you need to answer questions about math"
    args_schema: Type[BaseModel] = CalculatorInput
    return_direct: bool = True

    def _run(self, a: int, b: int, operation: str) -> float:
        """Apply `operation` to `a` and `b`.

        Args:
            a: first operand.
            b: second operand.
            operation: "add", "multiply" or "divide" (matched case-insensitively).

        Returns:
            The numeric result: an int for add/multiply on ints, a float for divide.

        Raises:
            ValueError: if `operation` is not one of the supported names.
            ZeroDivisionError: if `operation` is "divide" and `b` is zero.
        """
        operation = operation.lower()
        if operation == "add":
            return a + b
        elif operation == "multiply":
            return a * b
        elif operation == "divide":
            return a / b
        else:
            raise ValueError("Invalid operation")

# Instantiate the class-based tool and make it the active toolset; this
# rebinds the module-level `tools` name that CalculatorAgent reads.
calculator = CustomCalculatorTool()
tools = [calculator]

In [32]:
class CalculatorAgent:
    """LangGraph agent that answers arithmetic questions via the module-level `tools`."""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
        # `tools` is looked up at module level when the agent is constructed.
        self.llm_with_tools = self.llm.bind_tools(tools)
        self.sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
        self.graph = self.build_graph()

    def assistant(self, state: MessagesState):
        """Graph node: ask the tool-enabled LLM for the next message."""
        conversation = [self.sys_msg] + state["messages"]
        reply = self.llm_with_tools.invoke(conversation)
        return {"messages": [reply]}

    def build_graph(self):
        """Wire up the assistant <-> tools loop and return the compiled graph."""
        workflow = StateGraph(MessagesState)
        workflow.add_node("assistant", self.assistant)
        workflow.add_node("tools", ToolNode(tools))
        workflow.add_edge(START, "assistant")
        # tools_condition sends the flow to "tools" when the assistant's latest
        # message is a tool call, and to END otherwise.
        workflow.add_conditional_edges("assistant", tools_condition)
        # Tool output always goes back to the assistant for another turn.
        workflow.add_edge("tools", "assistant")
        return workflow.compile()

    def handle_query(self, input: str):
        """Run `input` through the graph, print the transcript, return the last message text."""
        final_state = self.graph.invoke({"messages": [HumanMessage(content=input)]})
        transcript = final_state['messages']
        for message in transcript:
            message.pretty_print()
        return transcript[-1].content


In [None]:
# Smoke test: a three-step arithmetic request that should exercise the tool loop.
CalculatorAgent().handle_query("Add 3 and 4. Multiply the output by 2. Divide the output by 5")

In [None]:
class ToolDeployment:
    """Stand-in tool that labels whatever it receives with its own name."""

    def __init__(self, tool_name):
        # Name used to label every result produced by `execute`.
        self.tool_name = tool_name

    def execute(self, input_data: str) -> str:
        """Return a canned string naming this tool and echoing `input_data`."""
        # No real processing happens; the tool is only simulated.
        label = self.tool_name
        return f"Tool {label} processed: {input_data}"

In [1]:
import os

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle
from starlette.requests import Request

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage,SystemMessage
from langgraph.graph import MessagesState, StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition

# Start or connect to Ray (the recorded output below shows it attaching to an
# already-running local cluster).
ray.init()

2025-02-07 09:55:43,211	INFO worker.py:1654 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-02-07 09:55:43,282	INFO worker.py:1832 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.11
Ray version:,2.41.0
Dashboard:,http://127.0.0.1:8265


[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m 
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m Search for information about Ray Serve.
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m 
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m I'm unable to browse the internet. If you have any specific questions about Ray Serve or need information on how to use it, feel free to ask!


[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m INFO 2025-02-07 09:55:52,142 agent_CalculatorAgent rtl87u2f 25da8ca5-cc67-49a9-a423-527feb08a6cd -- POST /agent 200 1449.3ms
[36m(ServeReplica:calculator_tool:CustomCalculatorTool pid=2524)[0m INFO 2025-02-07 09:56:47,931 calculator_tool_CustomCalculatorTool yodoqt8c f550697a-7f5a-420e-9797-2e2e0da1e199 -- CALL /agent OK 14.5ms
[36m(ServeReplica:calculator_tool:CustomCalculatorTool pid=2524)[0m INFO 2025-02-07 09:56:47,958 calculator_tool_CustomCalculatorTool yodoqt8c f550697a-7f5a-420e-9797-2e2e0da1e199 -- CALL /agent OK 37.8ms
[36m(ServeReplica:calculator_tool:CustomCalculatorTool pid=2524)[0m INFO 2025-02-07 09:56:47,959 calculator_tool_CustomCalculatorTool yodoqt8c f550697a-7f5a-420e-9797-2e2e0da1e199 -- CALL /agent OK 37.6ms


[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m 
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m Add 3 and 4. Multiply the output by 2. Divide the output by 5
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m Tool Calls:
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m   calculator_tool (call_qwztuVd23FKoX29ef2TzMlVE)
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m  Call ID: call_qwztuVd23FKoX29ef2TzMlVE
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m   Args:
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m     a: 3
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m     b: 4
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m     operation: add
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m   calculator_tool (call_o8Iankm9aSPGgBjgTIW8dGCD)
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m  Call ID: call_o8Iankm9aSPGgBjgTIW8dGCD
[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m   Args:
[36m(ServeReplica:agent:Calcula

[36m(ServeReplica:agent:CalculatorAgent pid=2531)[0m INFO 2025-02-07 09:56:48,892 agent_CalculatorAgent rtl87u2f f550697a-7f5a-420e-9797-2e2e0da1e199 -- POST /agent 200 2249.0ms


In [2]:
from typing import Type

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field


# Argument schema for the calculator tool: two integer operands plus the
# name of the operation to apply to them.
class CalculatorInput(BaseModel):
    a: int = Field(description="first number")
    b: int = Field(description="second number")
    # Plain string; the description lists add, multiply and divide as the options.
    operation: str = Field(description="add, multiply or divide operation to perform")


# Note: It's important that every field has type hints. BaseTool is a
# Pydantic class and not having type hints can lead to unexpected behavior.
@serve.deployment
class CustomCalculatorTool(BaseTool):
    # LangChain tool, wrapped as a Ray Serve deployment, exposing
    # add / multiply / divide behind a single "operation" selector; its
    # arguments are described by CalculatorInput.
    name: str = "Calculator"
    description: str = "useful for when you need to answer questions about math"
    args_schema: Type[BaseModel] = CalculatorInput
    return_direct: bool = True

    def _run(self, a: int, b: int, operation: str) -> float:
        """Apply `operation` to `a` and `b`.

        Args:
            a: first operand.
            b: second operand.
            operation: "add", "multiply" or "divide" (matched case-insensitively).

        Returns:
            The numeric result: an int for add/multiply on ints, a float for divide.

        Raises:
            ValueError: if `operation` is not one of the supported names.
            ZeroDivisionError: if `operation` is "divide" and `b` is zero.
        """
        operation = operation.lower()
        if operation == "add":
            return a + b
        elif operation == "multiply":
            return a * b
        elif operation == "divide":
            return a / b
        else:
            raise ValueError("Invalid operation")


# Build and deploy the calculator tool as its own Serve application.
# CustomCalculatorTool declares no `tool_name` field, so no constructor
# arguments are passed to bind().
calculator_app = CustomCalculatorTool.bind()
# The handle returned by serve.run is what the agent deployment uses to call the tool.
calculator_handle = serve.run(calculator_app, name="calculator_tool", route_prefix="/calculator_tool")

# calculator = CustomCalculatorTool()
# tools = [calculator]

INFO 2025-02-07 09:55:43,359 serve 2601 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
INFO 2025-02-07 09:55:45,490 serve 2601 -- Application 'calculator_tool' is ready at http://0.0.0.0:8000/calculator_tool.
INFO 2025-02-07 09:55:45,491 serve 2601 -- Deployed app 'calculator_tool' successfully.


[36m(ServeController pid=2526)[0m INFO 2025-02-07 09:55:43,471 controller 2526 -- Deploying new version of Deployment(name='CustomCalculatorTool', app='calculator_tool') (initial target replicas: 1).
[36m(ServeController pid=2526)[0m INFO 2025-02-07 09:55:43,574 controller 2526 -- Adding 1 replica to Deployment(name='CustomCalculatorTool', app='calculator_tool').
[36m(ProxyActor pid=2529)[0m INFO 2025-02-07 09:55:43,475 proxy 127.0.0.1 -- Got updated endpoints: {Deployment(name='CustomCalculatorTool', app='calculator_tool'): EndpointInfo(route='/calculator_tool', app_is_cross_language=False)}.
[36m(ServeController pid=2526)[0m INFO 2025-02-07 09:55:47,450 controller 2526 -- Deploying new version of Deployment(name='CalculatorAgent', app='agent') (initial target replicas: 1).
[36m(ProxyActor pid=2529)[0m INFO 2025-02-07 09:55:47,453 proxy 127.0.0.1 -- Got updated endpoints: {Deployment(name='CustomCalculatorTool', app='calculator_tool'): EndpointInfo(route='/calculator_tool', 

In [3]:
@serve.deployment
class CalculatorAgent:
    """Ray Serve deployment wrapping a LangGraph arithmetic agent.

    The LLM is given a single tool, `calculator_tool`, which forwards the
    computation to another Serve deployment through `tool_handle`.
    """

    def __init__(self, tool_handle):
        # Handle to the calculator deployment; used by `calculator_tool`.
        self.tool_handle = tool_handle
        # One list feeds both the LLM binding and the ToolNode.
        self.tools = [self.calculator_tool]

        self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
        self.llm_with_tools = self.llm.bind_tools(self.tools)

        self.sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
        self.graph = self.build_graph()

    def calculator_tool(self, a: int, b: int, operation: str):
        """
        Perform a calculation using the specified operation.

        Args:
            a (int): The first operand.
            b (int): The second operand.
            operation (str): The operation to perform. Supported operations are 'add', 'multiply', and 'divide'.

        Returns:
            Future: A future object representing the result of the calculation.
        """
        # The docstring above is the description the LLM sees; it lists only the
        # operations CustomCalculatorTool._run actually implements.
        # NOTE(review): whatever `.remote()` returns is handed back unresolved —
        # confirm the ToolNode ends up with the computed number rather than a
        # string form of the pending response object.
        return self.tool_handle._run.remote(a, b, operation)

    def assistant(self, state: MessagesState):
        """Graph node: call the tool-enabled LLM on the system prompt plus history."""
        return {"messages": [self.llm_with_tools.invoke([self.sys_msg] + state["messages"])]}

    def build_graph(self):
        """Assemble and compile the assistant <-> tools loop."""
        builder = StateGraph(MessagesState)
        builder.add_node("assistant", self.assistant)
        builder.add_node("tools", ToolNode(self.tools))
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
            # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        return builder.compile()

    def handle_query(self, input: str):
        """Run one query through the graph and return the final answer text.

        Every message of the resulting conversation is pretty-printed as a
        side effect (it shows up in the replica's log output).

        Args:
            input: The user's request.

        Returns:
            The `content` of the last message in the final graph state.
        """
        final_state = self.graph.invoke({"messages": [HumanMessage(content=input)]})
        for m in final_state['messages']:
            m.pretty_print()
        return final_state['messages'][-1].content

    async def __call__(self, request: Request):
        """HTTP entry point: expects a JSON body with an 'input_data' string."""
        data = await request.json()
        return self.handle_query(data['input_data'])


In [4]:
# Deploy the agent
# Bind the agent to the calculator's deployment handle and serve it at /agent.
agent_deployment = CalculatorAgent.bind(tool_handle=calculator_handle)
serve.run(agent_deployment, name="agent", route_prefix="/agent")

INFO 2025-02-07 09:55:47,356 serve 2601 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
INFO 2025-02-07 09:55:50,516 serve 2601 -- Application 'agent' is ready at http://0.0.0.0:8000/agent.
INFO 2025-02-07 09:55:50,517 serve 2601 -- Deployed app 'agent' successfully.


DeploymentHandle(deployment='CalculatorAgent')

In [6]:
import requests

# Define the base URL for Ray Serve
base_url = "http://localhost:8000"

# Test the agent deployment
# requests has no default timeout; without one a stuck replica would hang this
# cell forever. The limit is generous because the agent makes several LLM calls.
response = requests.post(
    f"{base_url}/agent",
    json={"input_data": "Add 3 and 4. Multiply the output by 2. Divide the output by 5"},
    timeout=120,
)
print("Agent Response:", response.text)

Agent Response: The result of adding 3 and 4 is 7.  
The result of multiplying 7 by 2 is 14.  
The result of dividing 14 by 5 is 2.8.


In [3]:
@serve.deployment
class ToolDeployment:
    """Serve deployment for a stand-in tool that labels its input with its own name."""

    def __init__(self, tool_name):
        # Name used to label every result produced by `execute`.
        self.tool_name = tool_name

    def execute(self, input_data: str) -> str:
        """Return a canned string naming this tool and echoing `input_data`."""
        # No real processing happens; the tool is only simulated.
        label = self.tool_name
        return f"Tool {label} processed: {input_data}"

In [None]:
# Bind the simulated tool under the name "SearchTool" and serve it as its own
# application; serve.run's return value is kept as `tool_handle`.
tool_deployment = ToolDeployment.bind(tool_name="SearchTool")
tool_handle = serve.run(tool_deployment, name="search_tool", route_prefix="/search_tool")

In [None]:
# Define the agent deployment
@serve.deployment
class AgentDeployment:
    """Serve deployment running a LangGraph agent whose only tool is `search_tool`."""

    def __init__(self, tool_handle):
        # Used by `search_tool` to reach the tool deployment.
        self.tool_handle = tool_handle
        self.llm = ChatOpenAI(model="gpt-3.5-turbo-0125")
        self.llm_with_tools = self.llm.bind_tools([self.search_tool])
        self.graph = self.build_graph()

    def search_tool(self, messages: list):
        """Search for information about Ray Serve."""
        # Docstring kept verbatim: it is the tool description given to the LLM.
        # NOTE(review): the value returned by `.remote()` is passed back without
        # being resolved here — confirm that is what the ToolNode should receive.
        pending = self.tool_handle.execute.remote(messages)
        return pending

    def assistant(self, state: MessagesState):
        """Graph node: ask the tool-enabled LLM for the next message (no system prompt)."""
        reply = self.llm_with_tools.invoke(state["messages"])
        return {"messages": [reply]}

    def build_graph(self):
        """Wire up the graph and return it compiled."""
        workflow = StateGraph(MessagesState)
        workflow.add_node("assistant", self.assistant)
        workflow.add_node("tools", ToolNode([self.search_tool]))
        workflow.add_edge(START, "assistant")
        # tools_condition sends the flow to "tools" when the assistant's latest
        # message is a tool call, and to END otherwise.
        workflow.add_conditional_edges("assistant", tools_condition)
        # NOTE(review): tool output goes straight to END here, so the assistant
        # never gets a turn after a tool call — confirm this is intended.
        workflow.add_edge("tools", END)
        return workflow.compile()

    async def __call__(self, http_request: Request) -> str:
        """HTTP entry point: read 'input_data' from the JSON body and run the graph.

        Prints every message of the final state and returns the whole message
        list embedded in a string.
        """
        payload: dict = await http_request.json()
        question = HumanMessage(content=payload.get('input_data', ''))
        final_state = self.graph.invoke({"messages": [question]})
        transcript = final_state['messages']
        for message in transcript:
            message.pretty_print()
        return f"Agent processed: {transcript}"
        

In [None]:
# Deploy the agent
# NOTE(review): this passes `tool_deployment` (the object returned by
# ToolDeployment.bind) rather than the `tool_handle` returned by serve.run,
# unlike the CalculatorAgent cell, which passes the handle — confirm which is
# intended. It also reuses the app name "agent" and route "/agent" used by the
# CalculatorAgent deployment.
agent_deployment = AgentDeployment.bind(tool_handle=tool_deployment)
serve.run(agent_deployment, name="agent", route_prefix="/agent")

In [None]:
import requests

# Define the base URL for Ray Serve
base_url = "http://localhost:8000"

# Test the agent deployment
# requests has no default timeout; without one a stuck replica would hang this
# cell forever. The limit is generous because the agent calls an LLM.
response = requests.post(
    f"{base_url}/agent",
    json={"input_data": "Add 3 and 4. Multiply the output by 2. Divide the output by 5"},
    timeout=120,
)
print("Agent Response:", response.text)