Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,18 @@ LLM_MODEL=gpt-4o-mini
LLM_API_KEY=
LLM_HOST=
LLM_MODELS_PATH=

LANGSMITH_API_KEY=

#------------------------
# SERVICES
#------------------------
ETH_RPC=https://eth-pokt.nodies.app
ETHER_SCAN_API_KEY=
GOLDRUSH_API_KEY=
MORALIS_API_KEY=
EXA_API_KEY=
PERPLEXITY_API_KEY=
TAVILY_API_KEY=

ETH_RPC=


#------------------------
# Agents
#------------------------
AGENTS=ETHER_SCAN,GOLDRUSH,MORALIS
ARBITRUM_ONE_RPC=https://endpoints.omniatech.io/v1/arbitrum/one/public
ARBI_SCAN_API_KEY=
BASE_RPC=https://base.llamarpc.com
BASE_SCAN_API_KEY=
POLYGON_RPC=https://polygon-pokt.nodies.app
POLYGON_SCAN_API_KEY=
FANTOM_RPC=https://1rpc.io/ftm
FTM_SCAN_API_KEY=
14 changes: 13 additions & 1 deletion .github/workflows/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ jobs:
-e MORALIS_API_KEY=${{ secrets.MORALIS_API_KEY }} \
-e ETHER_SCAN_API_KEY=${{ secrets.ETHER_SCAN_API_KEY }} \
-e GOLDRUSH_API_KEY=${{ secrets.GOLDRUSH_API_KEY }} \
-e ETH_RPC=${{ secrets.ETH_RPC }} \
-e LLM_PROVIDER=${{ secrets.LLM_PROVIDER }} \
-e LLM_MODEL=${{ secrets.LLM_MODEL }} \
-e LLM_API_KEY=${{ secrets.LLM_API_KEY }} \
Expand All @@ -83,6 +82,19 @@ jobs:
-e LOG_FILE_PATH=${{ secrets.LOG_FILE_PATH }} \
-e LOG_BACKUP_COUNT=${{ secrets.LOG_BACKUP_COUNT }} \
-e LOG_MAX_BYTES=${{ secrets.LOG_MAX_BYTES }} \

### services
-e ETH_RPC=${{ secrets.ETH_RPC }} \
-e ETHER_SCAN_API_KEY=${{ secrets.ETHER_SCAN_API_KEY }} \
-e ARBITRUM_ONE_RPC=${{ secrets.ARBITRUM_ONE_RPC }} \
-e ARBI_SCAN_API_KEY=${{ secrets.ARBI_SCAN_API_KEY }} \
-e BASE_RPC=${{ secrets.BASE_RPC }} \
-e BASE_SCAN_API_KEY=${{ secrets.BASE_SCAN_API_KEY }} \
-e POLYGON_RPC=${{ secrets.POLYGON_RPC }} \
-e POLYGON_SCAN_API_KEY=${{ secrets.POLYGON_SCAN_API_KEY }} \
-e FANTOM_RPC=${{ secrets.FANTOM_RPC }} \
-e FTM_SCAN_API_KEY=${{ secrets.FTM_SCAN_API_KEY }} \

patterntechnology/pattern-core-api:latest

# Wait a few seconds to give the container time to start
Expand Down
6 changes: 3 additions & 3 deletions src/agent/routers/agent_router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import APIRouter

from src.agent.services.agent_service import RouterAgentService
from src.agent.services.agent_service import AgentService
from src.db.sql_alchemy import Database

router = APIRouter(prefix="/agent")
Expand All @@ -15,5 +15,5 @@ def get_db():
db.close()


def get_agent_service() -> RouterAgentService:
return RouterAgentService()
def get_agent_service() -> AgentService:
return AgentService()
146 changes: 99 additions & 47 deletions src/agent/services/agent_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
import asyncio
from typing import Dict, Any, Optional, AsyncGenerator
from typing import Dict, Any, AsyncGenerator

from datetime import datetime
from langchain.agents import AgentExecutor
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.runnables.history import RunnableWithMessageHistory
Expand All @@ -16,6 +17,7 @@ class StreamingCallbackHandler(BaseCallbackHandler):
A callback handler that collects tokens and intermediate events in an asyncio queue.
Uses a newline-delimited JSON (NDJSON) protocol for reliable streaming.
Each event is a complete JSON object with a newline terminator.
Captures detailed information about tool execution including inputs and outputs.
"""

def __init__(self):
Expand All @@ -42,31 +44,103 @@ def on_agent_action(self, action, **kwargs) -> None:
action: The action being performed by the agent.
**kwargs: Additional keyword arguments.
"""
event = {
"type": "agent_start",
"timestamp": str(datetime.now())
}
self.queue.put_nowait(json.dumps(event) + "\n")

def on_agent_finish(self, action, **kwargs) -> None:
"""
Handle agent finish events.

Args:
action: The action being performed by the agent.
**kwargs: Additional keyword arguments.
"""
event = {
"type": "agent_finish",
"timestamp": str(datetime.now())
}
self.queue.put_nowait(json.dumps(event) + "\n")

def on_tool_start(self, serialized, input_str, **kwargs) -> None:
"""
Handle tool start events.

Args:
serialized: The serialized input to the tool.
input_str: The string representation of the input.
**kwargs: Additional keyword arguments.
"""
event = {
"type": "tool_start",
"tool": getattr(action, "tool", None),
"tool_input": getattr(action, "tool_input", {})
"tool_name": serialized["name"],
"params": input_str,
"timestamp": str(datetime.now())
}
# Use NDJSON format
self.queue.put_nowait(json.dumps(event) + "\n")

def on_tool_end(self, output, **kwargs) -> None:
"""
Handle tool completion events.

Args:
output: The output produced by the tool.
**kwargs: Additional keyword arguments.
"""
# Extract information about the completed tool
observation = kwargs.get("observation", output)
tool_name = kwargs.get("name", None)

# Create a detailed event for tool completion
event = {
"type": "tool_end",
"tool_name": tool_name,
"output": observation,
"timestamp": str(datetime.now())
}
# Use NDJSON format
self.queue.put_nowait(json.dumps(event) + "\n")

def on_tool_error(self, error, **kwargs) -> None:
"""
Handle tool error events.

class RouterAgentService:
Args:
error: The error that occurred during tool execution.
**kwargs: Additional keyword arguments.
"""
# Extract information about the tool that caused the error
tool_name = kwargs.get("name", None)

# Create a detailed event for tool error
event = {
"type": "tool_error",
"tool_name": tool_name,
"error": str(error),
"timestamp": str(datetime.now())
}
# Use NDJSON format
self.queue.put_nowait(json.dumps(event) + "\n")


class AgentService:
"""
RouterAgentService is responsible for routing the input message to the appropriate agent
and returning the response.
AgentService is responsible for doing the job
"""

def __init__(self, sub_agents, memory=None, streaming: bool = True):
def __init__(self, tools, memory=None, streaming: bool = True):
"""
Initialize the RouterAgentService.
Initialize the AgentService.

Args:
sub_agents: The sub-agents to use for routing.
tools: The tools to use for agent.
memory: The memory to use for storing conversation history.
streaming (bool): Whether to enable streaming responses.
"""
self.sub_agents = sub_agents
self.tools = tools
self.memory = memory
self.streaming = streaming
self.streaming_handler = None
Expand All @@ -88,22 +162,32 @@ def __init__(self, sub_agents, memory=None, streaming: bool = True):
stream=streaming,
callbacks=[self.streaming_handler] if self.streaming else None)

self.prompt = init_prompt(self.llm, AgentType.ROUTER_AGENT)
self.prompt = init_prompt(self.llm, AgentType.PATTERN_CORE_AGENT)

self.agent = init_agent(self.llm, self.sub_agents, self.prompt)
self.agent = init_agent(self.llm, self.tools, self.prompt)

if streaming:
# Wrap each tool with the callback handler to ensure tool events are captured
wrapped_tools = []
for tool in self.tools:
# Create a copy of the tool with callbacks attached
tool_with_callbacks = tool.copy()
tool_with_callbacks.callbacks = [self.streaming_handler]
wrapped_tools.append(tool_with_callbacks)

# Make sure the streaming handler is registered for all events, including tool completion
self.agent_executor = AgentExecutor(
agent=self.agent,
tools=self.sub_agents,
tools=wrapped_tools, # Use the wrapped tools with callbacks
return_intermediate_steps=True,
verbose=True,
callbacks=[self.streaming_handler]
callbacks=[self.streaming_handler],
handle_tool_error=True # Ensure tool errors are also captured
)
else:
self.agent_executor = AgentExecutor(
agent=self.agent,
tools=self.sub_agents,
tools=self.tools,
return_intermediate_steps=True,
verbose=True
)
Expand Down Expand Up @@ -263,38 +347,6 @@ async def stream(self, message: str) -> AsyncGenerator[str, None]:
}
yield json.dumps(error_event) + "\n"

# Send a completion event to signal the end of streaming
try:
completion_event = {
"type": "completion",
"data": "Stream completed"
}
yield json.dumps(completion_event) + "\n"

# Wait for the task to complete and get the result
await task
except asyncio.CancelledError:
# Handle task cancellation gracefully
error_event = {
"type": "info",
"data": "Task was cancelled"
}
yield json.dumps(error_event) + "\n"
except ConnectionError as e:
# Handle connection errors specifically
error_event = {
"type": "error",
"data": f"Connection error: {str(e)}"
}
yield json.dumps(error_event) + "\n"
except Exception as e:
# Handle any errors during task execution
error_event = {
"type": "error",
"data": f"Task execution error: {str(e)}"
}
yield json.dumps(error_event) + "\n"

def ask(self, message: str) -> Dict[str, Any]:
"""
Sends a message to the agent and returns the response.
Expand Down
72 changes: 0 additions & 72 deletions src/agentflow/agents/ether_scan_agent.py

This file was deleted.

Loading