A scalable, event-driven runtime extension for Microsoft AutoGen that enables autonomous agents to communicate over Apache Kafka. This extension provides distributed agent communication capabilities supporting message-based patterns including pub/sub and RPC-style interactions across multiple programming languages.
- Event-Driven Architecture: Built on Apache Kafka for scalable, distributed agent communication
- Multi-Language Support: Extensible architecture supporting multiple programming languages
- Agent Lifecycle Management: Dynamic registration and management of agent factories and instances
- Multiple Communication Patterns: Support for both pub/sub and RPC-style messaging
- Distributed Memory: Kafka-based memory implementation for sharing state across agent instances
- Streaming Processing: Asynchronous event processing for high-throughput scenarios
- Schema Support: Standardized message serialization with CloudEvents support
- Observability: Integrated tracing and monitoring capabilities
- Fault Tolerance: Robust error handling and recovery mechanisms
The AutoGen Kafka Extension implements a distributed agent runtime that uses Apache Kafka's streaming capabilities to enable:
- Horizontal Scaling: Agents can be distributed across multiple instances and locations
- Loose Coupling: Agents communicate through well-defined message contracts
- Event Sourcing: All interactions are captured as immutable events
- Resilience: Built-in fault tolerance and recovery mechanisms
- Language Agnostic: Core patterns can be implemented across different programming languages
autogen-kafka/
├── .github/ # GitHub workflows and settings
├── python/ # Python implementation
│ ├── packages/
│ │ └── autogen-kafka-extension/ # Core Python extension package
│ │ ├── src/
│ │ │ └── autogen_kafka_extension/
│ │ │ ├── memory/ # Distributed memory implementation
│ │ │ │ ├── kafka_memory.py # Kafka-based memory provider
│ │ │ │ └── memory_config.py # Memory configuration
│ │ │ ├── runtimes/ # Agent runtime implementations
│ │ │ │ ├── worker_runtime.py # Main Kafka worker runtime
│ │ │ │ ├── worker_config.py # Worker configuration classes
│ │ │ │ ├── messaging_client.py # Kafka messaging client
│ │ │ │ └── services/ # Runtime service components
│ │ │ │ ├── agent_manager.py # Agent lifecycle management
│ │ │ │ ├── agent_registry.py # Agent registration management
│ │ │ │ ├── message_processor.py # Message processing logic
│ │ │ │ └── subscription_service.py # Subscription management
│ │ │ ├── shared/ # Shared components and utilities
│ │ │ │ ├── events/ # Event definitions and serialization
│ │ │ │ │ ├── memory_event.py # Memory synchronization events
│ │ │ │ │ ├── request_event.py # Agent request events
│ │ │ │ │ ├── response_event.py # Agent response events
│ │ │ │ │ ├── registration_event.py # Agent registration events
│ │ │ │ │ └── subscription_event.py # Subscription events
│ │ │ │ ├── kafka_config.py # Base Kafka configuration
│ │ │ │ ├── streaming_service.py # Kafka streaming service
│ │ │ │ ├── streaming_worker_base.py # Base streaming worker class
│ │ │ │ └── topic_admin_service.py # Topic administration
│ │ │ └── py.typed # Type hints marker
│ │ ├── tests/ # Package tests
│ │ │ ├── test_kafka_memory.py # Memory implementation tests
│ │ │ ├── test_worker_runtime.py # Runtime tests
│ │ │ └── utils.py # Test utilities
│ │ └── pyproject.toml # Package configuration
│ └── README.md # Python-specific implementation guide
├── dotnet/ # Future C# implementation
├── docs/ # Architecture and design documentation
├── examples/ # Cross-language usage examples
├── docker-compose.yml # Kafka development environment
├── service.yml # Service configuration
├── CHANGELOG.md # Version history
├── LICENSE # Apache 2.0 License
└── README.md # This file
- Python (`python/`): Full-featured implementation with comprehensive agent runtime
  - AutoGen integration via `KafkaWorkerAgentRuntime`
  - Kafka Streams processing with `kstreams`
  - CloudEvents support and OpenTelemetry tracing
  - See Python README for detailed usage
- C# (`dotnet/`): Planned implementation for .NET ecosystems
The extension provides language-specific implementations of agent runtimes that:
- Register and manage agent lifecycles
- Route messages between agents via Kafka topics
- Handle both synchronous (RPC) and asynchronous (pub/sub) communication patterns
- Provide observability and error handling
- Direct Messaging: Point-to-point communication between specific agents
- Topic Broadcasting: Publish-subscribe patterns for event distribution
- Request-Response: RPC-style interactions with response correlation
- Event Streaming: Continuous processing of event streams
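As a rough illustration of how topic broadcasting and request-response correlation fit together, the sketch below uses an in-process stand-in for Kafka topics. `InMemoryBus`, `start_echo_agent`, and `send_request` are hypothetical names invented for this example and are not part of the extension's API; delivery is synchronous only to keep the sketch short.

```python
import uuid
from collections import defaultdict
from typing import Callable, Dict, List, Optional

Handler = Callable[[dict], None]


class InMemoryBus:
    """Hypothetical in-process stand-in for Kafka topics (illustration only)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict) -> None:
        # Topic broadcasting: every handler subscribed to the topic is invoked.
        for handler in list(self._subscribers[topic]):
            handler(message)


def start_echo_agent(bus: InMemoryBus) -> None:
    """Register a toy agent that upper-cases each request payload."""

    def on_request(message: dict) -> None:
        bus.publish(
            "agent.responses",
            {
                # Echo the correlation ID so the caller can match the reply.
                "correlation_id": message["correlation_id"],
                "payload": message["payload"].upper(),
            },
        )

    bus.subscribe("agent.requests", on_request)


def send_request(bus: InMemoryBus, payload: str) -> Optional[str]:
    """RPC-style call: publish a request, then pick out the matching reply."""
    correlation_id = str(uuid.uuid4())
    replies: Dict[str, str] = {}

    def on_response(message: dict) -> None:
        # Ignore replies that belong to other in-flight requests.
        if message["correlation_id"] == correlation_id:
            replies[correlation_id] = message["payload"]

    bus.subscribe("agent.responses", on_response)
    bus.publish("agent.requests", {"correlation_id": correlation_id, "payload": payload})
    # Delivery is synchronous here; a real runtime would await the reply.
    return replies.get(correlation_id)
```

The correlation ID is what turns two one-way topics into an RPC-style exchange: the caller ignores every response that does not carry the ID it generated.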
The extension provides a Kafka-based memory implementation (`KafkaMemory`) that enables:
- Shared State: Memory content synchronized across multiple agent instances
- Session Isolation: Each memory session uses dedicated Kafka topics for isolation
- Persistence: Memory state persisted in Kafka topics for durability
- Event Synchronization: Real-time memory updates broadcast to all instances
- Flexible Backend: Wraps existing memory implementations (e.g., `ListMemory`)
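The idea of synchronizing memory through a per-session log can be sketched without Kafka. In the example below, `SessionLog` stands in for a single session's topic and `ReplicatedMemory` for one agent instance's local view; both names are hypothetical and do not reflect how `KafkaMemory` is actually implemented.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SessionLog:
    """Hypothetical append-only log standing in for one session's topic."""

    events: List[str] = field(default_factory=list)


class ReplicatedMemory:
    """One instance's local view, rebuilt by replaying the shared log."""

    def __init__(self, log: SessionLog) -> None:
        self._log = log
        self._offset = 0  # Position of the next unread event in the log.
        self._items: List[str] = []

    def add(self, content: str) -> None:
        # Writes go to the shared log, never directly to local state.
        self._log.events.append(content)

    def sync(self) -> None:
        # Apply only the events this instance has not seen yet.
        new_events = self._log.events[self._offset:]
        self._items.extend(new_events)
        self._offset += len(new_events)

    def items(self) -> List[str]:
        self.sync()
        return list(self._items)
```

Because every instance derives its state from the same ordered log, two instances attached to one session converge on the same content, while a different session's log stays isolated.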
- Environment-specific configurations for Kafka connectivity
- Topic and partition management
- Consumer group and scaling strategies
- Security and authentication settings
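One way to keep these settings environment-specific is to load them from environment variables into an immutable object. The sketch below is an assumption-laden example: the class name, field names, environment variable names, and defaults are all hypothetical and are not the extension's configuration API. Only the four `security_protocol` values are standard Kafka settings.

```python
from dataclasses import dataclass
from typing import Mapping

# Standard values of Kafka's security.protocol client setting.
VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}


@dataclass(frozen=True)
class KafkaSettingsSketch:
    """Hypothetical settings holder; field names are illustrative only."""

    bootstrap_servers: str
    group_id: str
    num_partitions: int
    replication_factor: int
    security_protocol: str


def load_settings(env: Mapping[str, str]) -> KafkaSettingsSketch:
    """Build settings from a mapping such as os.environ (variable names assumed)."""
    settings = KafkaSettingsSketch(
        bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        group_id=env.get("KAFKA_GROUP_ID", "autogen-agents"),
        num_partitions=int(env.get("KAFKA_NUM_PARTITIONS", "3")),
        replication_factor=int(env.get("KAFKA_REPLICATION_FACTOR", "1")),
        security_protocol=env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
    )
    if settings.num_partitions < 1 or settings.replication_factor < 1:
        raise ValueError("partition count and replication factor must be >= 1")
    if settings.security_protocol not in VALID_PROTOCOLS:
        raise ValueError(f"unknown security protocol: {settings.security_protocol}")
    return settings
```

Calling `load_settings(os.environ)` at startup would then fail fast on a misconfigured deployment instead of surfacing the problem at the first Kafka call.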
- Apache Kafka: Version 2.8+ (local cluster or managed service)
- ZooKeeper: Required only when the Kafka cluster does not run in KRaft mode (typical for older Kafka versions)
- Container Runtime: Docker for local development (optional)
- Python: 3.10+ with AutoGen Core dependencies
- C#: .NET 6+ (planned)
Start a local Kafka cluster for development:
# Using the provided Docker Compose
docker-compose up -d
Alternatively, configure the connection to your existing Kafka infrastructure.
Navigate to the Python implementation:
cd python
Follow the Python README for detailed setup and usage instructions.
Additional language implementations are planned. Check the respective directories when available.
All implementations follow these core patterns:
- Runtime Configuration: Configure Kafka connectivity and topics
- Agent Registration: Register agent factories and instances
- Message Handling: Implement agents that process incoming messages
- Communication: Use direct messaging or topic publishing for agent interaction
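The registration and message-handling steps above can be sketched in a few lines. This is a toy, in-process model: `AgentRegistrySketch`, `register_factory`, `send_message`, and `make_counter_agent` are hypothetical names chosen for the example, not the runtime's real interface, and no Kafka connectivity is involved.

```python
from typing import Callable, Dict, Tuple

Agent = Callable[[str], str]


class AgentRegistrySketch:
    """Hypothetical registry that creates agent instances lazily from factories."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], Agent]] = {}
        self._instances: Dict[Tuple[str, str], Agent] = {}

    def register_factory(self, agent_type: str, factory: Callable[[], Agent]) -> None:
        if agent_type in self._factories:
            raise ValueError(f"agent type already registered: {agent_type}")
        self._factories[agent_type] = factory

    def send_message(self, agent_type: str, key: str, message: str) -> str:
        agent_id = (agent_type, key)
        if agent_id not in self._instances:
            # First message for this (type, key) pair: build the instance now.
            self._instances[agent_id] = self._factories[agent_type]()
        return self._instances[agent_id](message)


def make_counter_agent() -> Agent:
    """Factory for a toy agent that counts the messages it has handled."""
    count = 0

    def handle(message: str) -> str:
        nonlocal count
        count += 1
        return f"{message} (message {count})"

    return handle
```

Registering a factory rather than an instance lets the runtime create one agent per key on demand, so each instance keeps its own state.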
This repository welcomes contributions across all language implementations:
- Architecture: Core patterns and message schemas
- Implementation: Language-specific runtime implementations
- Documentation: Usage guides and architectural decisions
- Testing: Integration and performance testing
- Examples: Cross-language demonstration scenarios
- Fork the repository
- Create a feature branch
- Implement changes with appropriate tests
- Ensure compatibility with existing message formats
- Submit a pull request with clear documentation
Each language implementation includes:
- Unit tests for core functionality
- Integration tests with real Kafka clusters
- Performance benchmarks
- Cross-language compatibility tests
The extension provides comprehensive observability:
- Distributed Tracing: OpenTelemetry integration for message flow tracking
- Metrics: Agent performance and message throughput monitoring
- Logging: Structured logging for debugging and audit trails
- Health Checks: Runtime and dependency health monitoring
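To make the structured-logging point concrete, here is a minimal sketch built on Python's standard `logging` module. The formatter, the `build_logger` helper, and the context field names (`agent_type`, `topic`, `correlation_id`) are assumptions made for this example; the extension's actual log format may differ.

```python
import io
import json
import logging
from typing import TextIO

# Hypothetical context fields worth attaching to each log entry.
CONTEXT_FIELDS = ("agent_type", "topic", "correlation_id")


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed through `extra=` become attributes on the record.
        for name in CONTEXT_FIELDS:
            if hasattr(record, name):
                entry[name] = getattr(record, name)
        return json.dumps(entry)


def build_logger(name: str, stream: TextIO) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # Avoid duplicate output via the root logger.
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    return logger
```

A call such as `logger.info("request handled", extra={"topic": "agent.requests", "correlation_id": "abc"})` then yields a single JSON line that log tooling can filter by topic or correlation ID.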
The extension uses standardized topic naming conventions:
- `agent.requests` - Direct agent messaging
- `agent.responses` - Response correlation
- `agent.subscription` - Agent subscription events
- `agent.registry` - Agent lifecycle events
- `memory.<session_id>` - Distributed memory synchronization (per session)
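Since the per-session memory topic embeds a session ID, it can help to validate the resulting name before creating the topic. The helper below is a hypothetical sketch, not a function shipped by the extension; it relies only on Kafka's documented topic-name rules (ASCII letters, digits, `.`, `_`, and `-`, at most 249 characters).

```python
import re

# Kafka topic names may contain only these characters, up to 249 in total.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
MAX_TOPIC_LENGTH = 249


def memory_topic(session_id: str) -> str:
    """Build the per-session memory topic name (hypothetical helper)."""
    topic = f"memory.{session_id}"
    if not _LEGAL_TOPIC.fullmatch(topic) or len(topic) > MAX_TOPIC_LENGTH:
        raise ValueError(f"session ID yields an illegal topic name: {topic!r}")
    return topic
```

For example, `memory_topic("session-42")` returns `"memory.session-42"`, while a session ID containing a space raises `ValueError`.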
All implementations use CloudEvents-compatible message formats for:
- Cross-language compatibility
- Schema evolution support
- Observability integration
- Standard tooling compatibility
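For readers unfamiliar with CloudEvents, the sketch below builds a JSON envelope carrying the four attributes the CloudEvents 1.0 specification requires (`specversion`, `id`, `source`, `type`) plus a few optional ones. The function names, the event type, and the source path are hypothetical; the extension's own event classes may structure messages differently.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

# Attributes that CloudEvents 1.0 requires on every event.
REQUIRED_ATTRIBUTES = {"specversion", "id", "source", "type"}


def make_cloud_event(event_type: str, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a payload in a CloudEvents 1.0 style envelope (illustrative only)."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        # Optional attributes: RFC 3339 timestamp and payload content type.
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }


def to_json(event: Dict[str, Any]) -> str:
    """Serialize the envelope in the CloudEvents structured JSON style."""
    missing = REQUIRED_ATTRIBUTES - set(event)
    if missing:
        raise ValueError(f"missing required attributes: {sorted(missing)}")
    return json.dumps(event)
```

Because the envelope is plain JSON with well-known attribute names, a consumer written in another language can route on `type` and `source` without knowing anything about the payload.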
- Complete Python implementation with full AutoGen integration
- Kafka-based distributed memory implementation
- Schema registry integration
- Agent state persistence enhancements
- Comprehensive documentation and examples
- C# implementation planning and design
- C# implementation with .NET AutoGen integration
- Cross-language message format standardization
- Advanced observability and monitoring tools
- Performance optimization and benchmarking
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
- Issues: Report bugs and request features via GitHub Issues
- Discussions: Join architectural discussions in GitHub Discussions
- Documentation: Language-specific guides in respective directories
- AutoGen Community: Connect with the broader AutoGen ecosystem
- Kafka Resources: Apache Kafka Documentation
- Microsoft AutoGen - Core agent framework
- Apache Kafka - Distributed streaming platform
- CloudEvents - Event specification standard
- OpenTelemetry - Observability framework
Note: This is an extension for Microsoft AutoGen. Familiarity with core AutoGen concepts is recommended before using this Kafka extension.