From 87ed0706f66396487f9355555cf82a051b7698bc Mon Sep 17 00:00:00 2001 From: yasinfakhar Date: Wed, 9 Apr 2025 09:51:21 +0330 Subject: [PATCH 1/3] feat: Add tool selector and refactor agent service for dynamic tool selection --- src/agent/routers/agent_router.py | 6 +- src/agent/services/agent_service.py | 90 ++++++++-- src/agentflow/tool_hub/hub.py | 53 ++++++ src/agentflow/utils/enum.py | 3 + src/agentflow/utils/shared_tools.py | 2 + src/agentflow/utils/tool_selector.py | 162 ++++++++++++++++++ .../services/conversation_service.py | 13 +- 7 files changed, 306 insertions(+), 23 deletions(-) create mode 100644 src/agentflow/tool_hub/hub.py create mode 100644 src/agentflow/utils/tool_selector.py diff --git a/src/agent/routers/agent_router.py b/src/agent/routers/agent_router.py index 956ccab..18f77c4 100644 --- a/src/agent/routers/agent_router.py +++ b/src/agent/routers/agent_router.py @@ -1,6 +1,6 @@ from fastapi import APIRouter -from src.agent.services.agent_service import RouterAgentService +from src.agent.services.agent_service import AgentService from src.db.sql_alchemy import Database router = APIRouter(prefix="/agent") @@ -15,5 +15,5 @@ def get_db(): db.close() -def get_agent_service() -> RouterAgentService: - return RouterAgentService() +def get_agent_service() -> AgentService: + return AgentService() diff --git a/src/agent/services/agent_service.py b/src/agent/services/agent_service.py index 2e67d54..1e77a9b 100644 --- a/src/agent/services/agent_service.py +++ b/src/agent/services/agent_service.py @@ -1,7 +1,8 @@ import json import asyncio -from typing import Dict, Any, Optional, AsyncGenerator +from typing import Dict, Any, AsyncGenerator +from datetime import datetime from langchain.agents import AgentExecutor from langchain.callbacks.base import BaseCallbackHandler from langchain_core.runnables.history import RunnableWithMessageHistory @@ -16,6 +17,7 @@ class StreamingCallbackHandler(BaseCallbackHandler): A callback handler that collects tokens and intermediate events in an asyncio queue. Uses a newline-delimited JSON (NDJSON) protocol for reliable streaming. Each event is a complete JSON object with a newline terminator. + Captures detailed information about tool execution including inputs and outputs. """ def __init__(self): @@ -42,31 +44,79 @@ def on_agent_action(self, action, **kwargs) -> None: action: The action being performed by the agent. **kwargs: Additional keyword arguments. """ + # Extract more detailed information about the tool being called + tool_name = getattr(action, "tool", None) + tool_input = getattr(action, "tool_input", {}) + + # Create a more detailed event for tool start event = { "type": "tool_start", - "tool": getattr(action, "tool", None), - "tool_input": getattr(action, "tool_input", {}) + "tool": tool_name, + "tool_input": tool_input, + "timestamp": str(datetime.now()) + } + # Use NDJSON format + self.queue.put_nowait(json.dumps(event) + "\n") + + def on_tool_end(self, output, **kwargs) -> None: + """ + Handle tool completion events. + + Args: + output: The output produced by the tool. + **kwargs: Additional keyword arguments. + """ + # Extract information about the completed tool + observation = kwargs.get("observation", output) + tool_name = kwargs.get("name", None) + + # Create a detailed event for tool completion + event = { + "type": "tool_end", + "tool": tool_name, + "output": observation, + "timestamp": str(datetime.now()) + } + # Use NDJSON format + self.queue.put_nowait(json.dumps(event) + "\n") + + def on_tool_error(self, error, **kwargs) -> None: + """ + Handle tool error events. + + Args: + error: The error that occurred during tool execution. + **kwargs: Additional keyword arguments. + """ + # Extract information about the tool that caused the error + tool_name = kwargs.get("name", None) + + # Create a detailed event for tool error + event = { + "type": "tool_error", + "tool": tool_name, + "error": str(error), + "timestamp": str(datetime.now()) } # Use NDJSON format self.queue.put_nowait(json.dumps(event) + "\n") -class RouterAgentService: +class AgentService: """ - RouterAgentService is responsible for routing the input message to the appropriate agent - and returning the response. + AgentService is responsible for doing the job """ - def __init__(self, sub_agents, memory=None, streaming: bool = True): + def __init__(self, tools, memory=None, streaming: bool = True): """ - Initialize the RouterAgentService. + Initialize the AgentService. Args: - sub_agents: The sub-agents to use for routing. + tools: The tools to use for agent. memory: The memory to use for storing conversation history. streaming (bool): Whether to enable streaming responses. """ - self.sub_agents = sub_agents + self.tools = tools self.memory = memory self.streaming = streaming self.streaming_handler = None @@ -88,22 +138,32 @@ def __init__(self, sub_agents, memory=None, streaming: bool = True): stream=streaming, callbacks=[self.streaming_handler] if self.streaming else None) - self.prompt = init_prompt(self.llm, AgentType.ROUTER_AGENT) + self.prompt = init_prompt(self.llm, AgentType.PATTERN_CORE_AGENT) - self.agent = init_agent(self.llm, self.sub_agents, self.prompt) + self.agent = init_agent(self.llm, self.tools, self.prompt) if streaming: + # Wrap each tool with the callback handler to ensure tool events are captured + wrapped_tools = [] + for tool in self.tools: + # Create a copy of the tool with callbacks attached + tool_with_callbacks = tool.copy() + tool_with_callbacks.callbacks = [self.streaming_handler] + wrapped_tools.append(tool_with_callbacks) + + # Make sure the streaming handler is registered for all events, including tool completion self.agent_executor = AgentExecutor( agent=self.agent, - tools=self.sub_agents, + tools=wrapped_tools, # Use the wrapped tools with callbacks return_intermediate_steps=True, verbose=True, - callbacks=[self.streaming_handler] + callbacks=[self.streaming_handler], + handle_tool_error=True # Ensure tool errors are also captured ) else: self.agent_executor = AgentExecutor( agent=self.agent, - tools=self.sub_agents, + tools=self.tools, return_intermediate_steps=True, verbose=True ) diff --git a/src/agentflow/tool_hub/hub.py b/src/agentflow/tool_hub/hub.py new file mode 100644 index 0000000..6be92f7 --- /dev/null +++ b/src/agentflow/tool_hub/hub.py @@ -0,0 +1,53 @@ +from typing import List, Any + +from src.agentflow.utils.tools_index import get_all_tools +from src.agentflow.utils.tool_selector import ToolSelector + + +class ToolRegistery: + """ + A hub for managing and retrieving tools from various providers. + Provides functionality to get all tools or select appropriate tools for a query. + """ + + providers = [ + "ether_scan", + "moralis", + ] + + _tool_selector = None + + @classmethod + def _get_tool_selector(cls): + """Get or initialize the tool selector.""" + if cls._tool_selector is None: + cls._tool_selector = ToolSelector() + return cls._tool_selector + + @classmethod + def get_tools(cls): + """Get all available tools from all providers.""" + tools = [] + for provider in cls.providers: + tools.extend(get_all_tools(f"{provider}_tools")) + return tools + + @classmethod + def select_tools_for_query(cls, query: str) -> List[Any]: + """ + Select appropriate tools for a given user query. + + Args: + query (str): The user's query + + Returns: + List[Any]: List of selected tool function references + """ + # Get all available tools + all_tools = cls.get_tools() + + # Use the tool selector to select appropriate tools + selector = cls._get_tool_selector() + selected_tools = selector.select_tools(query, all_tools) + + return selected_tools diff --git a/src/agentflow/utils/enum.py b/src/agentflow/utils/enum.py index 4362fea..f3322f6 100644 --- a/src/agentflow/utils/enum.py +++ b/src/agentflow/utils/enum.py @@ -12,6 +12,7 @@ class AgentType(Enum): ROUTER_AGENT = "router_agent" BLOCKCHAIN_AGENT = "blockchain_agent" REACT_AGENT = "react_agent" + PATTERN_CORE_AGENT = "pattern_core_agent" class Prompt(): @@ -21,7 +22,9 @@ class Prompt(): BLOCKCHAIN_AGENT: Prompt for the blockchain agent. ROUTER_AGENT: Prompt for the router agent. REACT_AGENT: Prompt for the react agent. + PATTERN_CORE_AGENT: Prompt for the pattern core agent. """ BLOCKCHAIN_AGENT = hub.pull("pattern-agent/eth-agent") ROUTER_AGENT = hub.pull("pattern-agent/pattern-agent") REACT_AGENT = hub.pull("hwchase17/react") + PATTERN_CORE_AGENT = hub.pull("pattern-agent/pattern-core-agent") diff --git a/src/agentflow/utils/shared_tools.py b/src/agentflow/utils/shared_tools.py index 149a744..b07db8e 100644 --- a/src/agentflow/utils/shared_tools.py +++ b/src/agentflow/utils/shared_tools.py @@ -295,5 +295,7 @@ def init_prompt(llm: Any, agent_type: AgentType): else: if agent_type == AgentType.ROUTER_AGENT: return Prompt.ROUTER_AGENT + elif agent_type == AgentType.PATTERN_CORE_AGENT: + return Prompt.PATTERN_CORE_AGENT else: return Prompt.BLOCKCHAIN_AGENT diff --git a/src/agentflow/utils/tool_selector.py b/src/agentflow/utils/tool_selector.py new file mode 100644 index 0000000..2ba699f --- /dev/null +++ b/src/agentflow/utils/tool_selector.py @@ -0,0 +1,162 @@ +# Standard library imports first +import json +from typing import List, Dict, Any, Optional, Callable + +# Project imports +from src.util.configuration import Config +from src.agentflow.utils.shared_tools import init_llm + + +class ToolSelector: + """ + A module that uses an LLM to select appropriate tools based on a user query. + It analyzes the query and tool descriptions to determine which tools are most relevant. + """ + + def __init__(self): + """Initialize the ToolSelector with an LLM.""" + config = Config.get_config() + self.llm = init_llm( + service=config["llm"]["provider"], + model_name=config["llm"]["model"], + api_key=config["llm"]["api_key"], + stream=False, + callbacks=None + ) + + def get_tool_descriptions(self, tools: List[Any]) -> List[Dict[str, str]]: + """ + Extract name and description from each tool. + + Args: + tools (List[Any]): List of tool function references + + Returns: + List[Dict[str, str]]: List of dictionaries with tool name and description + """ + tool_descriptions = [] + + for tool in tools: + # Extract name and docstring + name = tool.name + description = tool.description or "No description available" + + tool_descriptions.append({ + "name": name, + "description": description + }) + + return tool_descriptions + + def select_tools(self, query: str, available_tools: List[Any]) -> List[Any]: + """ + Select appropriate tools for a given user query. + + Args: + query (str): The user's query + available_tools (List[Any]): List of all available tool function references + + Returns: + List[Any]: List of selected tool function references + """ + # Get tool descriptions + tool_descriptions = self.get_tool_descriptions(available_tools) + + # Create a prompt for the LLM + prompt = self._create_tool_selection_prompt(query, tool_descriptions) + + # Get LLM response + messages = [ + ("system", "You are a tool selection assistant. Your job is to analyze a user query and select the most appropriate tools to answer it."), + ("human", prompt) + ] + + response = self.llm.invoke(messages) + + # Parse the response to get selected tool names + try: + selected_tool_names = self._parse_tool_selection_response( + response.content) + + # Filter the original tools list to return only selected tools + selected_tools = [ + tool for tool in available_tools + if tool.name in selected_tool_names + ] + + return selected_tools + except Exception as e: + print(f"Error parsing tool selection response: {e}") + # Return all tools as fallback if parsing fails + return available_tools + + def _create_tool_selection_prompt(self, query: str, tool_descriptions: List[Dict[str, str]]) -> str: + """ + Create a prompt for the LLM to select appropriate tools. + + Args: + query (str): The user's query + tool_descriptions (List[Dict[str, str]]): List of tool descriptions + + Returns: + str: The prompt for the LLM + """ + tools_json = json.dumps(tool_descriptions, indent=2) + + prompt = f""" +User Query: {query} + +Available Tools: +{tools_json} + +Instructions: +- Analyze the user query carefully +- Review each available tool and its description +- Select the tools that are directly relevant to answering the query +- If you are not sure if a tool is needed or not select it +- Return your answer as a JSON array of tool names, like this: ["tool_name1", "tool_name2"] +- If no tools are relevant, return an empty array: [] +- Do not include any explanation or additional text, just the JSON array +- Break down the user task into smaller subtasks if needed + +Selected Tools: +""" + return prompt + + def _parse_tool_selection_response(self, response: str) -> List[str]: + """ + Parse the LLM response to extract selected tool names. + + Args: + response (str): The LLM's response + + Returns: + List[str]: List of selected tool names + """ + # Clean up the response to extract just the JSON part + response = response.strip() + + # Handle potential formatting issues + if not response.startswith('['): + # Try to find the JSON array in the response + start_idx = response.find('[') + end_idx = response.rfind(']') + + if start_idx != -1 and end_idx != -1: + response = response[start_idx:end_idx+1] + else: + # If no JSON array is found, return an empty list + return [] + + try: + # Parse the JSON array + selected_tools = json.loads(response) + + # Ensure it's a list of strings + if isinstance(selected_tools, list) and all(isinstance(item, str) for item in selected_tools): + return selected_tools + else: + return [] + except json.JSONDecodeError: + # If parsing fails, return an empty list + return [] diff --git a/src/conversation/services/conversation_service.py b/src/conversation/services/conversation_service.py index 265cb16..fb8f3cb 100644 --- a/src/conversation/services/conversation_service.py +++ b/src/conversation/services/conversation_service.py @@ -12,11 +12,13 @@ from src.user.services.user_service import UserService from src.agent.services.memory_service import MemoryService from src.project.services.project_service import ProjectService -from src.agent.services.agent_service import RouterAgentService +from src.agent.services.agent_service import AgentService from src.project.repositories.project_repository import ProjectRepository from src.query_usage.services.query_usage_service import QueryUsageService from src.conversation.repositories.conversation_repository import ConversationRepository +from src.agentflow.tool_hub.hub import ToolRegistery + class ConversationService: """ @@ -205,14 +207,15 @@ async def send_message( if conversation.project_id != project_id: raise NotFoundError("Project not found or is not owned by user") - config = Config.get_config() + # Select appropriate tools for the user's message using our new tool selector + selected_tools = ToolRegistery.select_tools_for_query(message) - sub_agents = AgentHub().get_agents(config["agents"]) + print(str([selected_tool.name for selected_tool in selected_tools])) memory = self.memory_service.get_memory(conversation_id) - agent = RouterAgentService( - sub_agents=sub_agents, memory=memory, streaming=stream) + agent = AgentService( + tools=selected_tools, memory=memory, streaming=stream) if stream: try: From 85bad0f9da16fa15fdb7d29754861f1cd7bf8b95 Mon Sep 17 00:00:00 2001 From: yasinfakhar Date: Thu, 10 Apr 2025 13:49:40 +0330 Subject: [PATCH 2/3] fix: Refactor tools directory structure and consolidate chain scan providers --- .env.sample | 24 +- .github/workflows/dev.yml | 14 +- src/agent/services/agent_service.py | 72 +++--- src/agentflow/agents/ether_scan_agent.py | 72 ------ src/agentflow/agents/goldrush_agent.py | 64 ----- src/agentflow/agents/hub.py | 35 --- src/agentflow/agents/moralis_agent.py | 65 ----- ...ther_scan_tools.py => chain_scan_tools.py} | 232 +++++++++++++++--- src/agentflow/providers/moralis_tools.py | 6 +- src/agentflow/{tool_hub => tool}/hub.py | 6 +- .../{utils => tool}/tool_selector.py | 32 +-- src/agentflow/{utils => tool}/tools_index.py | 0 .../services/conversation_service.py | 50 +++- 13 files changed, 316 insertions(+), 356 deletions(-) delete mode 100644 src/agentflow/agents/ether_scan_agent.py delete mode 100644 src/agentflow/agents/goldrush_agent.py delete mode 100644 src/agentflow/agents/hub.py delete mode 100644 src/agentflow/agents/moralis_agent.py rename src/agentflow/providers/{ether_scan_tools.py => chain_scan_tools.py} (67%) rename src/agentflow/{tool_hub => tool}/hub.py (90%) rename src/agentflow/{utils => tool}/tool_selector.py (76%) rename src/agentflow/{utils => tool}/tools_index.py (100%) diff --git a/.env.sample b/.env.sample index 80a63ef..a738680 100644 --- a/.env.sample +++ b/.env.sample @@ -43,22 +43,18 @@ LLM_MODEL=gpt-4o-mini LLM_API_KEY= LLM_HOST= LLM_MODELS_PATH= - +LANGSMITH_API_KEY= #------------------------ # SERVICES #------------------------ +ETH_RPC=https://eth-pokt.nodies.app ETHER_SCAN_API_KEY= -GOLDRUSH_API_KEY= -MORALIS_API_KEY= -EXA_API_KEY= -PERPLEXITY_API_KEY= -TAVILY_API_KEY= - -ETH_RPC= - - -#------------------------ -# Agents -#------------------------ -AGENTS=ETHER_SCAN,GOLDRUSH,MORALIS \ No newline at end of file +ARBITRUM_ONE_RPC=https://endpoints.omniatech.io/v1/arbitrum/one/public +ARBI_SCAN_API_KEY= +BASE_RPC=https://base.llamarpc.com +BASE_SCAN_API_KEY= +POLYGON_RPC=https://polygon-pokt.nodies.app +POLYGON_SCAN_API_KEY= +FANTOM_RPC=https://1rpc.io/ftm +FTM_SCAN_API_KEY= \ No newline at end of file diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index ef3c7b7..f006162 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -73,7 +73,6 @@ jobs: -e MORALIS_API_KEY=${{ secrets.MORALIS_API_KEY }} \ -e ETHER_SCAN_API_KEY=${{ secrets.ETHER_SCAN_API_KEY }} \ -e GOLDRUSH_API_KEY=${{ secrets.GOLDRUSH_API_KEY }} \ - -e ETH_RPC=${{ secrets.ETH_RPC }} \ -e LLM_PROVIDER=${{ secrets.LLM_PROVIDER }} \ -e LLM_MODEL=${{ secrets.LLM_MODEL }} \ -e LLM_API_KEY=${{ secrets.LLM_API_KEY }} \ @@ -83,6 +82,19 @@ jobs: -e LOG_FILE_PATH=${{ secrets.LOG_FILE_PATH }} \ -e LOG_BACKUP_COUNT=${{ secrets.LOG_BACKUP_COUNT }} \ -e LOG_MAX_BYTES=${{ secrets.LOG_MAX_BYTES }} \ + + ### services + -e ETH_RPC=${{ secrets.ETH_RPC }} \ + -e ETHER_SCAN_API_KEY=${{ secrets.ETHER_SCAN_API_KEY }} \ + -e ARBITRUM_ONE_RPC=${{ secrets.ARBITRUM_ONE_RPC }} \ + -e ARBI_SCAN_API_KEY=${{ secrets.ARBI_SCAN_API_KEY }} \ + -e BASE_RPC=${{ secrets.BASE_RPC }} \ + -e BASE_SCAN_API_KEY=${{ secrets.BASE_SCAN_API_KEY }} \ + -e POLYGON_RPC=${{ secrets.POLYGON_RPC }} \ + -e POLYGON_SCAN_API_KEY=${{ secrets.POLYGON_SCAN_API_KEY }} \ + -e FANTOM_RPC=${{ secrets.FANTOM_RPC }} \ + -e FTM_SCAN_API_KEY=${{ secrets.FTM_SCAN_API_KEY }} \ + patterntechnology/pattern-core-api:latest # Wait a few seconds to give the container time to start diff --git a/src/agent/services/agent_service.py b/src/agent/services/agent_service.py index 1e77a9b..8d18370 100644 --- a/src/agent/services/agent_service.py +++ b/src/agent/services/agent_service.py @@ -44,15 +44,39 @@ def on_agent_action(self, action, **kwargs) -> None: action: The action being performed by the agent. **kwargs: Additional keyword arguments. """ - # Extract more detailed information about the tool being called - tool_name = getattr(action, "tool", None) - tool_input = getattr(action, "tool_input", {}) + event = { + "type": "agent_start", + "timestamp": str(datetime.now()) + } + self.queue.put_nowait(json.dumps(event) + "\n") + + def on_agent_finish(self, action, **kwargs) -> None: + """ + Handle agent finish events. - # Create a more detailed event for tool start + Args: + action: The action being performed by the agent. + **kwargs: Additional keyword arguments. + """ + event = { + "type": "agent_finish", + "timestamp": str(datetime.now()) + } + self.queue.put_nowait(json.dumps(event) + "\n") + + def on_tool_start(self, serialized, input_str, **kwargs) -> None: + """ + Handle tool start events. + + Args: + serialized: The serialized input to the tool. + input_str: The string representation of the input. + **kwargs: Additional keyword arguments. + """ event = { "type": "tool_start", - "tool": tool_name, - "tool_input": tool_input, + "tool_name": serialized["name"], + "params": input_str, "timestamp": str(datetime.now()) } # Use NDJSON format @@ -73,7 +97,7 @@ def on_tool_end(self, output, **kwargs) -> None: # Create a detailed event for tool completion event = { "type": "tool_end", - "tool": tool_name, + "tool_name": tool_name, "output": observation, "timestamp": str(datetime.now()) } @@ -94,7 +118,7 @@ def on_tool_error(self, error, **kwargs) -> None: # Create a detailed event for tool error event = { "type": "tool_error", - "tool": tool_name, + "tool_name": tool_name, "error": str(error), "timestamp": str(datetime.now()) } @@ -323,38 +347,6 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]: } yield json.dumps(error_event) + "\n" - # Send a completion event to signal the end of streaming - try: - completion_event = { - "type": "completion", - "data": "Stream completed" - } - yield json.dumps(completion_event) + "\n" - - # Wait for the task to complete and get the result - await task - except asyncio.CancelledError: - # Handle task cancellation gracefully - error_event = { - "type": "info", - "data": "Task was cancelled" - } - yield json.dumps(error_event) + "\n" - except ConnectionError as e: - # Handle connection errors specifically - error_event = { - "type": "error", - "data": f"Connection error: {str(e)}" - } - yield json.dumps(error_event) + "\n" - except Exception as e: - # Handle any errors during task execution - error_event = { - "type": "error", - "data": f"Task execution error: {str(e)}" - } - yield json.dumps(error_event) + "\n" - def ask(self, message: str) -> Dict[str, Any]: """ Sends a message to the agent and returns the response. diff --git a/src/agentflow/agents/ether_scan_agent.py b/src/agentflow/agents/ether_scan_agent.py deleted file mode 100644 index c7d56cb..0000000 --- a/src/agentflow/agents/ether_scan_agent.py +++ /dev/null @@ -1,72 +0,0 @@ -import os - -from langchain.tools import tool -from langchain.agents import AgentExecutor -from langchain_community.callbacks.manager import get_openai_callback - -from src.util.configuration import Config -from src.agentflow.utils.enum import AgentType -from src.agentflow.utils.tools_index import get_all_tools -from src.agentflow.utils.shared_tools import handle_exceptions -from src.agentflow.utils.shared_tools import init_llm, init_agent, init_prompt - - -@tool -@handle_exceptions -def etherscan_agent(query: str): - """ - An agent for handling Ethereum blockchain-related queries and tasks. - This agent can perform the following tasks: - - - Get the current Unix timestamp - - Convert a natural language date string into a Unix timestamp - - Retrieve the source code of a smart contract - - Retrieve the ABI of a smart contract - - Retrieve the ABI of a specific event from a smart contract - - Fetch events for a given smart contract event within a block range - - Retrieve the latest Ethereum block number and hash - - Convert a Unix timestamp to the nearest Ethereum block number - - Decode the input data of an Ethereum transaction - - Args: - query (str): query about Ethereum blockchain tasks. - - Returns: - str: Response containing the requested Ethereum blockchain information - """ - config = Config.get_config() - - llm = init_llm(service=config["llm"]["provider"], - model_name=config["llm"]["model"], - api_key=config["llm"]["api_key"], - stream=False) - - tools = get_all_tools(tools_path="ether_scan_tools") - - prompt = init_prompt(llm, AgentType.BLOCKCHAIN_AGENT) - - agent = init_agent(llm, tools, prompt) - - agent_executor = AgentExecutor( - agent=agent, - tools=tools, - return_intermediate_steps=True, - verbose=True, - stream_runnable=False) - - response = agent_executor.invoke({"input": query}) - - try: - agent_steps = [] - for step in response["intermediate_steps"]: - agent_steps.append({ - "function_name": step[0].tool, - "function_args": step[0].tool_input, - "function_output": step[-1] - }) - if agent_steps: - return {"agent_steps": agent_steps} - else: - return {"agent_answer": response["output"]} - except: - return "no tools called inside agent" diff --git a/src/agentflow/agents/goldrush_agent.py b/src/agentflow/agents/goldrush_agent.py deleted file mode 100644 index f658a06..0000000 --- a/src/agentflow/agents/goldrush_agent.py +++ /dev/null @@ -1,64 +0,0 @@ -from langchain.tools import tool -from langchain.agents import AgentExecutor - -from src.util.configuration import Config -from src.agentflow.utils.enum import AgentType -from src.agentflow.utils.tools_index import get_all_tools -from src.agentflow.utils.shared_tools import handle_exceptions -from src.agentflow.utils.shared_tools import init_llm, init_agent, init_prompt - -@tool -@handle_exceptions -def goldrush_agent(query: str): - """ - An agent for handling Ethereum blockchain-related queries and tasks. - This agent can perform the following tasks: - - - Get activity across all chains for address - - fetch the native, fungible (ERC20), and non-fungible (ERC721 & ERC1155) tokens held by an address - - Fetch transactions for a given wallet address (paginated) - - Fetch a summary of transactions (earliest and latest) for a given wallet address. - - Fetch a single transaction including its decoded event logs - - Fetch list of approvals across all token contracts categorized by spenders for a wallet’s assets - - Args: - query (str): query about Ethereum blockchain tasks. - - Returns: - str: Response containing the requested Ethereum blockchain information - """ - config = Config.get_config() - - llm = init_llm(service=config["llm"]["provider"], - model_name=config["llm"]["model"], - api_key=config["llm"]["api_key"], - stream=False) - - tools = get_all_tools(tools_path="goldrush_tools") - - prompt = init_prompt(llm, AgentType.BLOCKCHAIN_AGENT) - - agent = init_agent(llm, tools, prompt) - - agent_executor = AgentExecutor( - agent=agent, - tools=tools, - return_intermediate_steps=True, - verbose=True) - - response = agent_executor.invoke({"input": query}) - - try: - agent_steps = [] - for step in response["intermediate_steps"]: - agent_steps.append({ - "function_name": step[0].tool, - "function_args": step[0].tool_input, - "function_output": step[-1] - }) - if agent_steps: - return {"agent_steps": agent_steps} - else: - return {"agent_answer": response["output"]} - except: - return "no tools called inside agent" diff --git a/src/agentflow/agents/hub.py b/src/agentflow/agents/hub.py deleted file mode 100644 index 73ffb9c..0000000 --- a/src/agentflow/agents/hub.py +++ /dev/null @@ -1,35 +0,0 @@ -from src.agentflow.agents import ether_scan_agent, goldrush_agent, moralis_agent -from typing import Any, List - - -class AgentHub: - """ - A hub for managing and retrieving different agents. - - Attributes: - agents (dict): A dictionary mapping agent names to their respective agent instances. - - Methods: - get_agents(agent_names: list[str]) -> list: - Static method to retrieve a list of agents based on the provided agent names. - """ - - def __init__(self): - self.agents = { - "ETHER_SCAN": ether_scan_agent.etherscan_agent, - "GOLDRUSH": goldrush_agent.goldrush_agent, - "MORALIS": moralis_agent.moralis_agent - } - - def get_agents(self, agent_names: List[str]) -> List[Any]: - """ - Retrieve a list of agent instances based on the provided agent names. - - Args: - agent_names (list[str]): A list of agent names to retrieve. - - Returns: - list: A list of agent instances corresponding to the provided agent names. - If an agent name is not found, it is ignored. - """ - return [self.agents[agent_name] for agent_name in agent_names if agent_name in self.agents.keys()] diff --git a/src/agentflow/agents/moralis_agent.py b/src/agentflow/agents/moralis_agent.py deleted file mode 100644 index ad79d75..0000000 --- a/src/agentflow/agents/moralis_agent.py +++ /dev/null @@ -1,65 +0,0 @@ -from langchain.tools import tool -from langchain.agents import AgentExecutor - -from src.util.configuration import Config -from src.agentflow.utils.enum import AgentType -from src.agentflow.utils.tools_index import get_all_tools -from src.agentflow.utils.shared_tools import handle_exceptions -from src.agentflow.utils.shared_tools import init_llm, init_agent, init_prompt - - -@tool -@handle_exceptions -def moralis_agent(query: str): - """ - An agent for handling Ethereum blockchain-related queries and tasks. - This agent can perform the following tasks: - - - Get active chains for a wallet address across all chains - - Get token balances for a specific wallet address and their token prices in USD. (paginated) - - Get the stats for a wallet address. - - Retrieve the full transaction history of a specified wallet address, including sends, receives, token and NFT transfers and contract interactions. - - Get the contents of a transaction by the given transaction hash. - - Get ERC20 approvals for one or many wallet addresses and/or contract addresses, ordered by block number in descending order. - - Args: - query (str): query about Ethereum blockchain tasks. - - Returns: - str: Response containing the requested Ethereum blockchain information - """ - config = Config.get_config() - - llm = init_llm(service=config["llm"]["provider"], - model_name=config["llm"]["model"], - api_key=config["llm"]["api_key"], - stream=False) - - tools = get_all_tools(tools_path="moralis_tools") - - prompt = init_prompt(llm, AgentType.BLOCKCHAIN_AGENT) - - agent = init_agent(llm, tools, prompt) - - agent_executor = AgentExecutor( - agent=agent, - tools=tools, - return_intermediate_steps=True, - verbose=True) - - response = agent_executor.invoke({"input": query}) - - try: - agent_steps = [] - for step in response["intermediate_steps"]: - agent_steps.append({ - "function_name": step[0].tool, - "function_args": step[0].tool_input, - "function_output": step[-1] - }) - if agent_steps: - return {"agent_steps": agent_steps} - else: - return {"agent_answer": response["output"]} - except: - return "no tools called inside agent" diff --git a/src/agentflow/providers/ether_scan_tools.py b/src/agentflow/providers/chain_scan_tools.py similarity index 67% rename from src/agentflow/providers/ether_scan_tools.py rename to src/agentflow/providers/chain_scan_tools.py index 1107540..ffc68c2 100644 --- a/src/agentflow/providers/ether_scan_tools.py +++ b/src/agentflow/providers/chain_scan_tools.py @@ -15,25 +15,51 @@ _config = Config.get_config() _ether_scan_config = Config.get_service_config(_config, "ETHER_SCAN") -_ETHERSCAN_URL = "https://api.etherscan.io/v2/api" -_ETH_RPC = os.environ["ETH_RPC"] + +def _get_chain_config(chain_id: str) -> Dict: + _config = {} + if chain_id == "1": + _config["RPC"] = os.environ["ETH_RPC"] + _config["URL"] = "https://api.etherscan.io/v2/api" + _config["API_KEY"] = os.environ["ETHER_SCAN_API_KEY"] + elif chain_id == "42161": + _config["RPC"] = os.environ["ARBITRUM_ONE_RPC"] + _config["URL"] = "https://api.arbiscan.io/api" + _config["API_KEY"] = os.environ["ARBI_SCAN_API_KEY"] + elif chain_id == "8453": + _config["RPC"] = os.environ["BASE_RPC"] + _config["URL"] = "https://api.basescan.org/api" + _config["API_KEY"] = os.environ["BASE_SCAN_API_KEY"] + elif chain_id == "137": + _config["RPC"] = os.environ["POLYGON_RPC"] + _config["URL"] = "https://api.polygonscan.com/api" + _config["API_KEY"] = os.environ["POLYGON_SCAN_API_KEY"] + elif chain_id == "250": + _config["RPC"] = os.environ["FANTOM_RPC"] + _config["URL"] = "https://api.ftmscan.com/api" + _config["API_KEY"] = os.environ["FTM_SCAN_API_KEY"] + else: + raise ValueError(f"Invalid chain ID: {chain_id}") + + return _config @handle_exceptions -def fetch_contract_abi(contract_address: str, api_key: str) -> Dict: +def fetch_contract_abi(contract_address: str, chain_id: str, api_key: str) -> Dict: """ Retrieve the ABI of a smart contract from the Etherscan API. Args: contract_address (str): The contract address. + chain_id (str): The chain ID can be 1, 42161, 8453 api_key (str): The decrypted Etherscan API key. Returns: Dict: A dictionary representing the contract ABI. """ - url = _ETHERSCAN_URL + url = _get_chain_config(chain_id)["URL"] params = { - "chainid": "1", + "chainid": chain_id, "module": "contract", "action": "getabi", "address": contract_address, @@ -49,20 +75,21 @@ def fetch_contract_abi(contract_address: str, api_key: str) -> Dict: @handle_exceptions -def fetch_contract_source_code(contract_address: str, api_key: str) -> str: +def fetch_contract_source_code(contract_address: str, chain_id: str, api_key: str) -> str: """ Retrieve the source code of a smart contract from the Etherscan API. Args: contract_address (str): The contract address. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 api_key (str): The decrypted Etherscan API key. Returns: str: The contract source code. """ - url = _ETHERSCAN_URL + url = _get_chain_config(chain_id)["URL"] params = { - "chainid": "1", + "chainid": chain_id, "module": "contract", "action": "getsourcecode", "address": contract_address, @@ -91,20 +118,21 @@ def get_event_abi(abi: List[Dict], event_name: str) -> Optional[Dict]: @handle_exceptions -def timestamp_to_block_number(timestamp: int, api_key: str) -> int: +def timestamp_to_block_number(timestamp: int, chain_id: str, api_key: str) -> int: """ Convert a given Unix timestamp to the nearest Ethereum block number. Args: timestamp (int): Unix timestamp. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 api_key (str): The decrypted Etherscan API key. Returns: int: The closest block number. """ - url = _ETHERSCAN_URL + url = _get_chain_config(chain_id)["URL"] params = { - "chainid": "1", + "chainid": chain_id, "module": "block", "action": "getblocknobytime", "timestamp": timestamp, @@ -151,19 +179,20 @@ def convert_to_timestamp(date_str: str) -> int: @tool @handle_exceptions -def get_contract_source_code(contract_address: str) -> str: +def get_contract_source_code(contract_address: str, chain_id: str) -> str: """ Retrieve the source code of a smart contract. Args: contract_address (str): The contract address. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: str: The contract source code. """ - api_key = _ether_scan_config["api_key"] + api_key = _get_chain_config(chain_id)["API_KEY"] - response = fetch_contract_source_code(contract_address, api_key) + response = fetch_contract_source_code(contract_address, chain_id, api_key) final_output = {"proxy": [], "implementation": []} @@ -171,7 +200,8 @@ def get_contract_source_code(contract_address: str) -> str: while int(response.get('Proxy', 0)): final_output["proxy"].append(response["SourceCode"]) current_address = response["Implementation"] - response = fetch_contract_source_code(current_address, api_key) + response = fetch_contract_source_code( + current_address, chain_id, api_key) final_output["implementation"].append(response["SourceCode"]) @@ -180,28 +210,30 @@ def get_contract_source_code(contract_address: str) -> str: @tool @handle_exceptions -def get_contract_abi(contract_address: str) -> Dict: +def get_contract_abi(contract_address: str, chain_id: str) -> Dict: """ Retrieve the ABI of a smart contract. Args: contract_address (str): The contract address. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: Dict: The contract ABI. """ - api_key = _ether_scan_config["api_key"] - return fetch_contract_abi(contract_address, api_key) + api_key = _get_chain_config(chain_id)["API_KEY"] + return fetch_contract_abi(contract_address, chain_id, api_key) @tool @handle_exceptions -def get_abi_of_event(contract_address: str, event_name: str) -> Dict: +def get_abi_of_event(contract_address: str, chain_id: str, event_name: str) -> Dict: """ Retrieve the ABI of a specific event from a smart contract. Args: contract_address (str): The smart contract address. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 event_name (str): The name of the event. Returns: @@ -210,8 +242,8 @@ def get_abi_of_event(contract_address: str, event_name: str) -> Dict: Raises: Exception: If the API key is not found or the event is not in the contract ABI. """ - api_key = _ether_scan_config["api_key"] - abi = fetch_contract_abi(contract_address, api_key) + api_key = _get_chain_config(chain_id)["API_KEY"] + abi = fetch_contract_abi(contract_address, chain_id, api_key) event_abi = get_event_abi(abi, event_name) if event_abi is None: raise Exception(f"Event '{event_name}' not found in the ABI.") @@ -222,6 +254,7 @@ def get_abi_of_event(contract_address: str, event_name: str) -> Dict: @handle_exceptions def get_contract_events( contract_address: str, + chain_id: str, event_name: str, from_block: Optional[int] = None, to_block: Optional[int] = None @@ -231,6 +264,7 @@ def get_contract_events( Args: contract_address (str): The smart contract address. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 event_name (str): The name of the event to fetch. from_block (Optional[int]): The starting block (default: current block - 10). to_block (Optional[int]): The ending block (default: current block). @@ -241,10 +275,10 @@ def get_contract_events( Raises: Exception: If the event is not found in the contract's ABI. """ - api_key = _ether_scan_config["api_key"] - abi = fetch_contract_abi(contract_address, api_key) + api_key = _get_chain_config(chain_id)["API_KEY"] + abi = fetch_contract_abi(contract_address, chain_id, api_key) - web3 = Web3(Web3.HTTPProvider(_ETH_RPC)) + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) contract = web3.eth.contract(address=contract_address, abi=abi) # Resolve the actual event name case-insensitively @@ -270,55 +304,64 @@ def get_contract_events( @tool @handle_exceptions -def get_latest_eth_block_number() -> int: +def get_latest_chain_block_number(chain_id: str) -> int: """ - Retrieve the latest Ethereum block number. + Retrieve the latest chain block number. + + Args: + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: int: The current block number on the Ethereum mainnet. """ - web3 = Web3(Web3.HTTPProvider(_ETH_RPC)) + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) return web3.eth.block_number @tool @handle_exceptions -def convert_timestamp_to_block_number(timestamp: int) -> int: +def convert_timestamp_to_block_number(timestamp: int, chain_id: str) -> int: """ Convert a Unix timestamp to the nearest Ethereum block number. Args: timestamp (int): The Unix timestamp. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: int: The block number closest to the provided timestamp. """ - api_key = _ether_scan_config["api_key"] - return timestamp_to_block_number(timestamp, api_key) + api_key = _get_chain_config(chain_id)["API_KEY"] + return timestamp_to_block_number(timestamp, chain_id, api_key) @tool @handle_exceptions -def get_latest_eth_block_hash() -> str: +def get_latest_eth_block_hash(chain_id: str) -> str: """ - Retrieve the hash of the latest Ethereum block. + Retrieve the hash of the latest chain block. + + Args: + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: str: The hash of the latest block on the Ethereum mainnet. """ - web3 = Web3(Web3.HTTPProvider(_ETH_RPC)) + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) latest_block = web3.eth.get_block('latest') return web3.to_hex(latest_block.hash) @tool @handle_exceptions -def get_block_transactions(block_number: int, output_include: List[str]) -> List[Dict[str, Any]]: +def get_block_transactions(block_number: int, chain_id: str, output_include: List[str]) -> List[Dict[str, Any]]: """ Retrieve all transactions in a specific Ethereum block. Args: block_number (int): The block number to retrieve transactions from. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 + output_include (List[str]): List of fields to include in the output. Returns: List[dict[str, Any]]: @@ -329,7 +372,7 @@ def get_block_transactions(block_number: int, output_include: List[str]) -> List - blockHash, blockNumber, from, gas, gasPrice, maxPriorityFeePerGas, maxFeePerGas, hash, input, nonce, to, transactionIndex, value, type, accessList, chainId, v, yParity, r, s """ - web3 = Web3(Web3.HTTPProvider(_ETH_RPC)) + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) # Validate block number latest_block = web3.eth.block_number @@ -367,13 +410,14 @@ def get_block_transactions(block_number: int, output_include: List[str]) -> List @tool @handle_exceptions -def decode_transaction_input(transaction_input: str, contract_address: str) -> Dict[str, Any]: +def decode_transaction_input(transaction_input: str, contract_address: str, chain_id: str) -> Dict[str, Any]: """ Decode the input data of an Ethereum transaction using the ABI of the contract. Args: transaction_input (str): The input data of the transaction (hex string starting with '0x') contract_address (str): The address of the contract that was called in the transaction + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 Returns: Dict[str, Any]: A dictionary containing the decoded transaction input with the following fields: @@ -402,10 +446,10 @@ def decode_transaction_input(transaction_input: str, contract_address: str) -> D try: # Get the contract ABI - abi = get_contract_abi(contract_address) + abi = get_contract_abi(contract_address, chain_id) # Initialize Web3 - web3 = Web3(Web3.HTTPProvider(_ETH_RPC)) + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) contract = web3.eth.contract(address=contract_address, abi=abi) # Try direct decoding first using web3.py's built-in functionality @@ -491,7 +535,7 @@ def decode_transaction_input(transaction_input: str, contract_address: str) -> D # Try with implementation ABI implementation_abi = get_contract_abi( - implementation_address) + implementation_address, chain_id) implementation_contract = web3.eth.contract( address=contract_address, abi=implementation_abi @@ -585,3 +629,113 @@ def decode_transaction_input(transaction_input: str, contract_address: str) -> D "raw_input": transaction_input, "function_selector": function_selector } + + +@tool +@handle_exceptions +def call_contract_function(contract_address: str, chain_id: str, function_name: str, function_params: Optional[List[Any]] = None) -> Dict[str, Any]: + """ + Call a read-only (view/pure) function of a smart contract and return its result to get data from contract + + Args: + contract_address (str): The address of the smart contract. + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 + function_name (str): The name of the function to call. + function_params (Optional[List[Any]]): List of parameters to pass to the function. Default is None (no parameters). + + Returns: + Dict[str, Any]: A dictionary containing the following fields: + - success: Boolean indicating if the call was successful + - result: The result of the function call if successful + - error: Error message if unsuccessful + - result_type: The data type of the result + + Raises: + Exception: If the contract ABI cannot be retrieved or the function call fails + """ + # Initialize parameters if None + if function_params is None: + function_params = [] + + api_key = _get_chain_config(chain_id)["API_KEY"] + web3 = Web3(Web3.HTTPProvider(_get_chain_config(chain_id)["RPC"])) + + try: + # Get the contract ABI + contract_address = Web3.to_checksum_address(contract_address) + abi = fetch_contract_abi(contract_address, chain_id, api_key) + contract = web3.eth.contract(address=contract_address, abi=abi) + + # Find the function in the ABI + function_entries = [f for f in abi if f.get( + 'type') == 'function' and f.get('name') == function_name] + if not function_entries: + available_functions = [f['name'] + for f in abi if f.get('type') == 'function'] + raise Exception( + f"Function '{function_name}' not found in contract ABI. Available functions: {available_functions}") + + # Get the function object + function_obj = getattr(contract.functions, function_name) + + # Check if the function is read-only + function_entry = function_entries[0] + if function_entry.get('stateMutability') not in ['view', 'pure', 'constant']: + raise Exception( + f"Function '{function_name}' is not a read-only function and might modify state or require a transaction.") + + # Call the function with provided parameters + result = function_obj(*function_params).call() + + # Process the result + result_type = "unknown" + processed_result = result + + # Determine result type and format accordingly + if isinstance(result, (int, float, bool, str)): + result_type = type(result).__name__ + elif isinstance(result, bytes): + processed_result = web3.to_hex(result) + result_type = "bytes (hex)" + elif isinstance(result, tuple): + # Handle named tuples (common in Solidity returns) + if hasattr(result, '_asdict'): + processed_result = dict(result._asdict()) + result_type = "struct" + else: + processed_result = list(result) + result_type = "tuple" + + # Convert any bytes in the result to hex + if isinstance(processed_result, dict): + for key, value in processed_result.items(): + if isinstance(value, bytes): + processed_result[key] = web3.to_hex(value) + # Large ints might be wei values + elif isinstance(value, int) and value > 10**10: + processed_result[f"{key}_eth"] = web3.from_wei( + value, 'ether') + elif isinstance(processed_result, list): + processed_result = [web3.to_hex(v) if isinstance( + v, bytes) else v for v in processed_result] + elif isinstance(result, list): + processed_result = result + result_type = "array" + # Convert any bytes in the list to hex + for i, item in enumerate(processed_result): + if isinstance(item, bytes): + processed_result[i] = web3.to_hex(item) + + return { + "success": True, + "result": processed_result, + "result_type": result_type + } + + except Exception as e: + return { + "success": False, + "error": str(e), + "result": None, + "result_type": None + } diff --git a/src/agentflow/providers/moralis_tools.py b/src/agentflow/providers/moralis_tools.py index a99ca39..811b63e 100644 --- a/src/agentflow/providers/moralis_tools.py +++ b/src/agentflow/providers/moralis_tools.py @@ -51,7 +51,7 @@ def get_wallet_active_chains(wallet_address: str, output_include: list[str]) -> @tool @handle_exceptions -def get_wallet_token_balances(wallet_address: str, output_include: list[str], cursor: str = None) -> dict: +def get_wallet_token_balances(wallet_address: str, output_include: list[str], cursor: str = "") -> dict: """ Get token balances for a specific wallet address and their token prices in USD. (paginated) apply decimal conversion for balance @@ -128,7 +128,7 @@ def get_wallet_stats(wallet_address: str, output_include: list[str]) -> dict: @tool @handle_exceptions -def get_wallet_history(wallet_address: str, output_include: list[str], cursor: str = None) -> dict: +def get_wallet_history(wallet_address: str, output_include: list[str], cursor: str = "") -> dict: """ Retrieve the full transaction history of a specified wallet address, including sends, receives, token and NFT transfers and contract interactions. (paginated & in descending order) @@ -211,7 +211,7 @@ def get_transaction_detail(transaction_hash: str, output_include: list[str]) -> @tool @handle_exceptions -def get_token_approvals(wallet_address: str, output_include: list[str], cursor: str = None) -> dict: +def get_token_approvals(wallet_address: str, output_include: list[str], cursor: str = "") -> dict: """ Get ERC20 approvals for one or many wallet addresses and/or contract addresses, ordered by block number in descending order. diff --git a/src/agentflow/tool_hub/hub.py b/src/agentflow/tool/hub.py similarity index 90% rename from src/agentflow/tool_hub/hub.py rename to src/agentflow/tool/hub.py index 6be92f7..c4169e2 100644 --- a/src/agentflow/tool_hub/hub.py +++ b/src/agentflow/tool/hub.py @@ -1,7 +1,7 @@ from typing import List, Any -from src.agentflow.utils.tools_index import get_all_tools -from src.agentflow.utils.tool_selector import ToolSelector +from src.agentflow.tool.tools_index import get_all_tools +from src.agentflow.tool.tool_selector import ToolSelector class ToolRegistery: @@ -11,7 +11,7 @@ class ToolRegistery: """ providers = [ - "ether_scan", + "chain_scan", "moralis", ] diff --git a/src/agentflow/utils/tool_selector.py b/src/agentflow/tool/tool_selector.py similarity index 76% rename from src/agentflow/utils/tool_selector.py rename to src/agentflow/tool/tool_selector.py index 2ba699f..d36ae1f 100644 --- a/src/agentflow/utils/tool_selector.py +++ b/src/agentflow/tool/tool_selector.py @@ -48,12 +48,12 @@ def get_tool_descriptions(self, tools: List[Any]) -> List[Dict[str, str]]: return tool_descriptions - def select_tools(self, query: str, available_tools: List[Any]) -> List[Any]: + def select_tools(self, query: List[str], available_tools: List[Any]) -> List[Any]: """ Select appropriate tools for a given user query. Args: - query (str): The user's query + query (List[str]): List of user queries available_tools (List[Any]): List of all available tool function references Returns: @@ -67,7 +67,7 @@ def select_tools(self, query: str, available_tools: List[Any]) -> List[Any]: # Get LLM response messages = [ - ("system", "You are a tool selection assistant. Your job is to analyze a user query and select the most appropriate tools to answer it."), + ("system", "You are a tool selection assistant. Your job is to analyze a user chat conversation and select the most appropriate tools to answer user need."), ("human", prompt) ] @@ -90,34 +90,38 @@ def select_tools(self, query: str, available_tools: List[Any]) -> List[Any]: # Return all tools as fallback if parsing fails return available_tools - def _create_tool_selection_prompt(self, query: str, tool_descriptions: List[Dict[str, str]]) -> str: + def _create_tool_selection_prompt(self, query: List[str], tool_descriptions: List[Dict[str, str]]) -> str: """ Create a prompt for the LLM to select appropriate tools. Args: - query (str): The user's query + query (List[str]): List of user queries tool_descriptions (List[Dict[str, str]]): List of tool descriptions Returns: str: The prompt for the LLM """ + current_query = query[-1] + previous_queries = query[:-1] + tools_json = json.dumps(tool_descriptions, indent=2) prompt = f""" -User Query: {query} +previous user queries: {str(previous_queries)} +current user query : {str(current_query)} Available Tools: {tools_json} Instructions: -- Analyze the user query carefully -- Review each available tool and its description -- Select the tools that are directly relevant to answering the query -- If you are not sure if a tool is needed or not select it -- Return your answer as a JSON array of tool names, like this: ["tool_name1", "tool_name2"] -- If no tools are relevant, return an empty array: [] -- Do not include any explanation or additional text, just the JSON array -- Break down the user task into smaller subtasks if needed +1. Review the current user query alongside previous user queries to fully understand the context and user needs. +2. Carefully examine each available tool and its description. +3. Identify and select only those tools that are directly applicable to addressing the current query or are necessary based on the context provided by previous queries. +4. If uncertainty exists about the necessity of a tool, include it. +5. For complex queries, consider decomposing the task into smaller subtasks and select tools accordingly. +6. Return your response strictly as a JSON array of tool names, formatted like this: ["tool_name1", "tool_name2"]. +7. If no tools are relevant, return an empty array: []. +8. Do not include any explanation, commentary, or additional text—only the JSON array of selected tool names. Selected Tools: """ diff --git a/src/agentflow/utils/tools_index.py b/src/agentflow/tool/tools_index.py similarity index 100% rename from src/agentflow/utils/tools_index.py rename to src/agentflow/tool/tools_index.py diff --git a/src/conversation/services/conversation_service.py b/src/conversation/services/conversation_service.py index fb8f3cb..805ae5c 100644 --- a/src/conversation/services/conversation_service.py +++ b/src/conversation/services/conversation_service.py @@ -1,24 +1,26 @@ +import json + from uuid import UUID from typing import List +from datetime import datetime from sqlalchemy.orm import Session from fastapi import HTTPException, status from langchain_core.messages.human import HumanMessage from src.util.configuration import Config -from src.agentflow.agents.hub import AgentHub from src.util.execptions import NotFoundError from src.db.models import Conversation, QueryUsage +from src.agentflow.tool.hub import ToolRegistery from src.agentflow.utils.shared_tools import init_llm from src.user.services.user_service import UserService +from src.agent.services.agent_service import AgentService from src.agent.services.memory_service import MemoryService from src.project.services.project_service import ProjectService -from src.agent.services.agent_service import AgentService from src.project.repositories.project_repository import ProjectRepository from src.query_usage.services.query_usage_service import QueryUsageService +from src.agentflow.providers.chain_scan_tools import get_current_timestamp from src.conversation.repositories.conversation_repository import ConversationRepository -from src.agentflow.tool_hub.hub import ToolRegistery - class ConversationService: """ @@ -207,10 +209,30 @@ async def send_message( if conversation.project_id != project_id: raise NotFoundError("Project not found or is not owned by user") + all_user_messages = self.get_history( + db_session, user_id, conversation_id) + all_user_messages.append(message) + + tool_selection_start_event = { + "type": "tool_selection_start", + "timestamp": str(datetime.now()) + } + yield json.dumps(tool_selection_start_event) + "\n" + # Select appropriate tools for the user's message using our new tool selector - selected_tools = ToolRegistery.select_tools_for_query(message) + selected_tools = ToolRegistery.select_tools_for_query( + all_user_messages) + + # langchain raise exception if the tools list is empty + if len(selected_tools) == 0: + selected_tools = [get_current_timestamp] - print(str([selected_tool.name for selected_tool in selected_tools])) + tool_selection_end_event = { + "type": "tool_selection_end", + "selected_tools": [selected_tool.name for selected_tool in selected_tools], + "timestamp": str(datetime.now()) + } + yield json.dumps(tool_selection_end_event) + "\n" memory = self.memory_service.get_memory(conversation_id) @@ -283,6 +305,22 @@ def get_history( generated_id += 1 return history + def get_user_messages(self, conversation_id: UUID): + """ + Retrieves only the user (human) messages from a conversation. + + Args: + conversation_id (UUID): ID of the conversation to get messages from + + Returns: + list: List of HumanMessage objects from the conversation + """ + memory = self.memory_service.get_memory(conversation_id) + # filter user messages + user_messages = [ + message.content for message in memory.messages if isinstance(message, HumanMessage)] + return user_messages + def rename_title(self, db_session: Session, conversation_id: UUID, user_id: UUID, message: str) -> str: """ Renames a conversation title using the LLM to generate a title. From e47a26a319bad410dad9d2158ef8591b27dece0e Mon Sep 17 00:00:00 2001 From: yasinfakhar Date: Thu, 10 Apr 2025 15:04:18 +0330 Subject: [PATCH 3/3] feat: Add Polygon and Fantom chain IDs to supported networks in chain scan tools --- src/agentflow/providers/chain_scan_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/agentflow/providers/chain_scan_tools.py b/src/agentflow/providers/chain_scan_tools.py index ffc68c2..62c0aba 100644 --- a/src/agentflow/providers/chain_scan_tools.py +++ b/src/agentflow/providers/chain_scan_tools.py @@ -51,7 +51,7 @@ def fetch_contract_abi(contract_address: str, chain_id: str, api_key: str) -> Di Args: contract_address (str): The contract address. - chain_id (str): The chain ID can be 1, 42161, 8453 + chain_id (str): The chain ID can be 1, 42161, 8453, 137, 250 api_key (str): The decrypted Etherscan API key. Returns: