A sophisticated agent system built on advanced language models with dynamic code generation, self-modification capabilities, and inter-agent communication.
- Distributed Agent Architecture: Multiple autonomous agents running as separate processes
- FastAPI Integration: Each agent exposes a REST API with WebSocket support
- Redis PubSub Communication: Real-time messaging between agents
- Self-Improving Capabilities: Agents can improve their own code and capabilities
- Process Manager: Centralized manager for monitoring and controlling agents
- Fault Tolerance: Automatic restart of crashed agents
- Resource Monitoring: Track memory and CPU usage of agent processes
- Dynamic Code Extension: Add new capabilities without restarting
- Agent Collaboration: Agents can communicate and collaborate on tasks
- Hybrid Search: Combined semantic and keyword search capabilities
- CLI and Web Interfaces: Multiple interaction options
- `agent_process_manager.py`: Main process manager for controlling agent lifecycle
- `self_improving_agent.py`: Self-improving agent with dynamic capability learning
- `start_agent_system.py`: Script to launch the entire agent system
- `agent_config.json`: Configuration for the agent system
- `pubsub_service.py`: Redis PubSub service for real-time communication
- `distributed_services.py`: Base services for distributed agent architecture
- `web_app.py`: Web interface for interacting with agents
- `cli_agent.py`: Command line interface agent
The system follows a distributed architecture with these key components:
- Agent Process Manager: Central process that manages agent lifecycles, monitors health, and provides a management API
- Individual Agents: Each agent runs as a separate process with:
  - FastAPI server for REST API and WebSocket support
  - Redis PubSub for inter-agent messaging
  - Capabilities specific to the agent type
  - Self-improvement mechanisms
- Communication Layer: Redis PubSub for real-time messaging between agents
- Web Interface: Optional web UI for interacting with the agent system
- Self-Improving Agent: Can learn new capabilities and improve its own code
- CLI Agent: Command line interface for user interaction
- Web Agent: Web interface for browser-based interaction
- Custom Agents: Define specialized agents for specific tasks
- Set up your environment:

```bash
pip install -r requirements.txt

# Start Redis server for PubSub communication
redis-server

# Set API keys as needed
export OPENAI_API_KEY=your_api_key
# OR
export ANTHROPIC_API_KEY=your_api_key
```

- Start the agent system:

```bash
# Start with default configuration
python start_agent_system.py

# Start with custom config
python start_agent_system.py --config agent_config.json
```

- Or start components individually:

```bash
# Start process manager
python agent_process_manager.py --mode manager --port 8500

# Start a self-improving agent
python self_improving_agent.py --port 8600 --model gpt-4
```
- `GET /agents`: List all running agents
- `GET /agents/{agent_id}`: Get details about a specific agent
- `POST /agents`: Create a new agent
- `DELETE /agents/{agent_id}`: Stop and remove an agent
- `POST /agents/{agent_id}/restart`: Restart an agent
- `WebSocket /ws`: Real-time updates about agents
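As a sketch of how these manager endpoints might be called from a client (assuming the default port 8500 from the quick-start and JSON responses; the exact response schema is not specified here), using only the standard library:

```python
import json
import urllib.request

MANAGER_URL = "http://127.0.0.1:8500"  # default manager port from the quick-start


def list_agents(base_url=MANAGER_URL):
    """GET /agents -- return the list of running agents as parsed JSON."""
    with urllib.request.urlopen(f"{base_url}/agents") as resp:
        return json.load(resp)


def restart_agent(agent_id, base_url=MANAGER_URL):
    """POST /agents/{agent_id}/restart -- ask the manager to restart one agent."""
    req = urllib.request.Request(
        f"{base_url}/agents/{agent_id}/restart", method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Both helpers raise `urllib.error.URLError` if the manager is not running, so callers should handle connection failures.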
- `GET /health`: Health check endpoint
- `GET /capabilities`: List agent capabilities
- `POST /chat`: Send a chat message
- `POST /improve`: Request capability improvement
- `POST /analyze`: Analyze code for improvements
- `GET /memory`: Query agent's memory
- `POST /upload_improvement`: Upload code to improve the agent
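For example, sending a message to an agent's `/chat` endpoint might look like the following sketch (the port matches the self-improving agent in `agent_config.json`; the `{"message": ...}` payload shape is an assumption, not a documented schema):

```python
import json
import urllib.request

AGENT_URL = "http://127.0.0.1:8600"  # self-improving agent port from agent_config.json


def build_chat_request(message, base_url=AGENT_URL):
    """Build the POST /chat request; the payload field name is an assumption."""
    payload = json.dumps({"message": message}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/chat",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def chat(message, base_url=AGENT_URL):
    """Send a chat message and return the agent's JSON response."""
    with urllib.request.urlopen(build_chat_request(message, base_url)) as resp:
        return json.load(resp)
```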
Agents can communicate with each other using Redis PubSub channels:
- `agent:{agent_id}:commands`: Send commands to a specific agent
- `agent:{agent_id}:responses`: Receive responses from an agent
- `agent_events`: System-wide events (agent started, agent stopped, etc.)
Example communication pattern:
```python
# Agent A sends a chat message to Agent B
await agent_a.publish_event(f"agent:{agent_b_id}:commands", {
    "command": "chat",
    "message_id": message_id,
    "agent_id": agent_a_id,
    "message": "Hello, can you help me with this task?",
    "timestamp": time.time()
})

# Agent B receives the message and responds
await agent_b.publish_event(f"agent:{agent_b_id}:responses", {
    "type": "chat_response",
    "message_id": message_id,
    "agent_id": agent_b_id,
    "receiver": agent_a_id,
    "message": "Yes, I can help with that task.",
    "timestamp": time.time()
})
```
The self-improving agent can learn new capabilities through:
- User requests: Direct API calls to improve specific capabilities
- Self-detection: Automatically detecting improvement opportunities during conversations
- Code analysis: Analyzing existing code for improvement opportunities
- Agent collaboration: Learning from other agents in the system
When an improvement is made:
- The agent generates code for the new capability using an LLM
- The code is validated for syntax and security
- The capability is dynamically added to the agent
- The improvement is reported to the agent process manager
- The new capability becomes immediately available
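The validation step (step 2 above) could be sketched roughly as follows. This is an illustration only: the denylist and function name are hypothetical, and a real security check would need far more than a name-based scan (sandboxing, import restrictions, etc.):

```python
import ast

# Illustrative denylist -- not the project's actual security policy
FORBIDDEN_CALLS = {"eval", "exec", "__import__", "compile"}


def validate_generated_code(source: str) -> tuple[bool, str]:
    """Check that LLM-generated code parses and avoids obviously dangerous calls."""
    try:
        tree = ast.parse(source)
    except SyntaxError as err:
        return False, f"syntax error: {err}"
    for node in ast.walk(tree):
        # Flag direct calls to denylisted builtins by name
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in FORBIDDEN_CALLS:
                return False, f"forbidden call: {node.func.id}"
    return True, "ok"
```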
See `agent_config.json` for an example configuration:
```json
{
  "manager": {
    "host": "0.0.0.0",
    "port": 8500
  },
  "agents": [
    {
      "agent_type": "self_improving",
      "agent_name": "Learning Agent",
      "port": 8600,
      "model": "gpt-4",
      "env_vars": {
        "IMPROVEMENT_LOGGING": "detailed"
      }
    },
    {
      "agent_type": "cli",
      "agent_name": "CLI Helper",
      "port": 8700,
      "model": "gpt-4"
    }
  ]
}
```
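A minimal sketch of reading this configuration (the function name `load_agent_config` is hypothetical, not part of the project's API; the defaults mirror the example above):

```python
import json


def load_agent_config(path="agent_config.json"):
    """Load manager settings and agent definitions from a JSON config file."""
    with open(path) as f:
        config = json.load(f)
    manager = config.get("manager", {})
    # Fall back to the defaults shown in the example configuration
    manager.setdefault("host", "0.0.0.0")
    manager.setdefault("port", 8500)
    agents = config.get("agents", [])
    return manager, agents
```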
To create a new agent type:
- Create a new Python file for your agent (e.g., `my_agent.py`)
- Extend the `AgentServer` class from `agent_process_manager.py`
- Implement custom methods and override `process_command` as needed
- Add the agent to your configuration file
Example:
```python
import time

from agent_process_manager import AgentServer


class MySpecialAgent(AgentServer):
    def __init__(self, agent_id=None, agent_name=None,
                 host="127.0.0.1", port=8600, redis_url=None, model="gpt-4"):
        super().__init__(
            agent_id=agent_id,
            agent_name=agent_name or "My Special Agent",
            agent_type="special",
            host=host,
            port=port,
            redis_url=redis_url,
            model=model
        )
        self.capabilities = ["special_task", "another_capability"]

    async def process_command(self, command, data, sender):
        if command == "special_task":
            # Handle the special task command
            result = await self.do_special_task(data)
            # Send the response back to the sender
            await self.publish_event(f"agent:{self.agent_id}:responses", {
                "type": "special_task_result",
                "agent_id": self.agent_id,
                "receiver": sender,
                "result": result,
                "timestamp": time.time()
            })
        else:
            # Fall back to the parent implementation for unknown commands
            await super().process_command(command, data, sender)

    async def do_special_task(self, data):
        # Implement your special task here
        return {"success": True, "message": "Special task completed"}
```
- Agents run as separate processes with proper isolation
- LLM-generated code is validated before execution
- Resource limits can be applied to prevent runaway processes
- API endpoints should be properly secured in production deployments
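One way to apply a resource limit (third point above) is via the standard library's `resource` module, called early in each agent process. This is a POSIX-only sketch, not the project's built-in mechanism:

```python
import resource


def apply_memory_limit(max_bytes=512 * 1024 * 1024):
    """Cap the address space of the current process (POSIX-only sketch).

    Intended to be called early in an agent process, before its FastAPI
    server starts, so a runaway agent gets a MemoryError instead of
    exhausting the host.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    # Never request more than the existing hard limit
    if hard != resource.RLIM_INFINITY:
        max_bytes = min(max_bytes, hard)
    resource.setrlimit(resource.RLIMIT_AS, (max_bytes, hard))
```

CPU time can be capped the same way with `resource.RLIMIT_CPU`.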