-
Notifications
You must be signed in to change notification settings - Fork 0
feat(bidirectional_streaming): Add experimental bidirectional streaming MVP POC implementation #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ng MVP POC implementation
to send(), Updated imports
…on 09-29, added a lock for interruption handling
…nitialized, and removed asyncio.sleep() as they were mainly for defensive purposes and following the pattern of nova sonic samples.
…dd user messages to the agent messages
|
|
||
| def __init__( | ||
| self, | ||
| model: BidirectionalModel, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in strands we have Union[Model, str, None] = None in init, we can make it same here for future extensibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can make this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Converts provider-specific events to a common format that can be | ||
| processed uniformly by the event loop. | ||
| """ | ||
| raise NotImplementedError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got smiliar comment just leave blank instead of raise error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can make this change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| await self._handle_response_data(result.value.bytes_.decode("utf-8")) | ||
|
|
||
| except asyncio.TimeoutError: | ||
| await asyncio.sleep(0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Does this mean we do nothing if there is a timeout while processing ? Do we wanna retry here?
| raise | ||
|
|
||
|
|
||
| class NovaSonicBidirectionalModel(BidirectionalModel): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not in same file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can iterate on this in a follow-up
| logger.error("Tool error send failed: %s", str(send_error)) | ||
|
|
||
|
|
||
| def _extract_callable_function(tool_func: any) -> any: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needed this due to tool executor not being implemented as part of the POC and using this method we were directly calling the tool as a function call. This is fixed in the new PR: #5
| # Execute tool function with provided input | ||
| result = actual_func(**tool_use.get("input", {})) | ||
|
|
||
| tool_result = _create_success_result(tool_use["toolUseId"], result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: ToolContext
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming this is part of bar raising the types?
| channels: Literal[1, 2] | ||
|
|
||
|
|
||
| class TextOutputEvent(TypedDict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Add transcription
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am assuming this is part of bar raising the types?
| raise NotImplementedError | ||
|
|
||
| @abc.abstractmethod | ||
| async def send_audio_content(self, audio_input: AudioInputEvent) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will merge these into single send(BidirectionalInputEvent) (or sth similar) right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we can do that as part of issue concerned with bar-raising the model interface.
| tools: list | None = None, | ||
| system_prompt: str | None = None, | ||
| messages: Messages | None = None, | ||
| ): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add **kwargs to our interfaces/methods for future extensibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can iterate on this
mkmeral
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving to merge and iterate faster. We should still iterate over these comments
Description
Pull Request: Bidirectional Streaming Implementation
Overview
This PR introduces bidirectional streaming capabilities to Strands SDK for real-time, interactive conversations between users and AI models through persistent connections. This changes Strands from a request-response pattern to a concurrent, connection-based streaming approach.
Problem Statement
Strands currently uses a sequential request-response architecture that prevents real-time interaction:
Solution
Bidirectional streaming introduces persistent connections with concurrent processing:
Architecture Overview
graph TB subgraph "Current Unidirectional Architecture" A1[Agent] --> B1[Model.stream] B1 --> C1[Sequential Events] C1 --> D1[Tool Execution BLOCKS] D1 --> E1[Response Complete] end subgraph "New Bidirectional Architecture" A2[BidirectionalAgent] --> B2[BidirectionalConnection] B2 --> C2[Model Events Processor] B2 --> D2[Tool Execution Processor] B2 --> E2[Connection Coordinator] C2 --> F2[Event Queue] D2 --> G2[Tool Queue] E2 --> H2[Background Tasks Management] F2 --> I2[Agent.receive] G2 --> J2[Concurrent Tool Execution] endComponent Architecture
1. BidirectionalAgent - User Interface Layer
The BidirectionalAgent provides the user-facing interface for bidirectional streaming conversations. It follows the same patterns as Strands' existing Agent class but is built for persistent connections and real-time interaction.
Like the standard Agent, BidirectionalAgent uses compositional design, delegating to specialized components (ToolRegistry, ToolExecutor) rather than implementing functionality directly. It requires a
BidirectionalModeltype in its constructor, providing compile-time validation that prevents runtime configuration errors.Key differences from the standard Agent:
send_audio(),interrupt(),receive()) for live interactionstart_conversation()parallelsinvoke_async())2. BidirectionalConnection - Concurrent Event Loop Engine
The BidirectionalConnection transforms Strands from sequential event processing to concurrent task coordination. This replaces the existing
event_loop_cycle()pattern with persistent, concurrent processing.Current Event Loop Architecture
The existing event loop processes one conversation turn at a time in a sequential pattern (see Event Loop Cycle documentation).
Each call to
event_loop_cycle()handles one complete conversation turn then terminates. Tool execution blocks the entire conversation flow until completion.New Concurrent Architecture
BidirectionalConnection runs continuously throughout the connection (8-30 minutes) with three concurrent processors working together:
graph TB A[Model Events Processor] --> D[Event Queue] B[Tool Execution Processor] --> E[Tool Queue] C[Connection Coordinator] --> F[Connection State] D --> G[Agent.receive] E --> H[Tool Results] I[Provider Events] --> A J[Tool Requests] --> B K[User Input] --> AThe three processors work concurrently:
Event Loop Design
sequenceDiagram participant User participant Agent as BidirectionalAgent participant Conn as BidirectionalConnection participant ModelSession as BidirectionalModelSession participant ModelEventsTask as _process_model_events participant ToolExecTask as _process_tool_execution participant CycleTask as bidirectional_event_loop_cycle participant Provider as Provider Stream User->>Agent: start_conversation() Agent->>+Conn: start_bidirectional_connection(agent) Conn->>+ModelSession: model.create_bidirectional_connection() ModelSession->>Provider: Initialize provider stream par Background Task Initialization Conn->>ModelEventsTask: asyncio.create_task(_process_model_events) Conn->>ToolExecTask: asyncio.create_task(_process_tool_execution) Conn->>CycleTask: asyncio.create_task(bidirectional_event_loop_cycle) end Conn-->>-Agent: return BidirectionalConnection User->>Agent: send_audio(audio_input) Agent->>ModelSession: send_audio_content(audio_input) ModelSession->>Provider: Send formatted provider event loop Concurrent Processing Provider-->>ModelSession: Raw provider events ModelSession->>ModelSession: Convert to standardized format ModelEventsTask->>ModelSession: receive_events() ModelSession-->>ModelEventsTask: Standardized events alt Tool Use Event ModelEventsTask->>ToolExecTask: tool_queue.put(tool_use) ToolExecTask->>ToolExecTask: Execute tool with Strands infrastructure ToolExecTask->>ModelSession: send_tool_result(result) ModelSession->>Provider: Send formatted tool result else Text/Audio Output ModelEventsTask->>Agent: agent._output_queue.put(event) Agent-->>User: receive() yields event else Interruption Detected ModelEventsTask->>Conn: _handle_interruption() Conn->>ToolExecTask: Cancel pending tool tasks Conn->>Agent: Clear audio output queue end CycleTask->>CycleTask: Supervise background tasks health endEvent Flow and Processing
The sequence diagram shows the actual implementation flow with accurate component interactions:
start_bidirectional_connection()creates a model session and launches three background tasksreceive_events(), tool execution task monitors tool queue, cycle task supervises healthreceive()methodKey implementation detail: Events flow through the BidirectionalModelSession layer which normalizes provider-specific formats before reaching the background processing tasks.
3. Model Interface - Protocol Normalization
The new model interface creates a unified interface across different bidirectional streaming protocols. This design maintains Strands' core philosophy that users should be able to switch between model providers without changing their application code.
Separation from Existing Model Architecture
The existing Model interface handles stateless, discrete operations where each
stream()call is independent. The new BidirectionalModel interfaces manage persistent connections with continuous event streams and multiple concurrent input methods (send_audio_content(),send_text_content(),send_interrupt()). This separation is necessary because bidirectional streaming providers use different protocols compared to traditional request-response models. Each provider implements their own event sequences, connection management, and data formats for real-time streaming.4. Bidirectional Type System
The type system extends Strands' existing
StreamEventtypes to support bidirectional streaming while maintaining full backward compatibility.New event types include:
audioOutputandaudioInputwith standardized format (raw bytes, explicit sample rates)BidirectionalConnectionStartandBidirectionalConnectionEndfor lifecycle managementinterruptionDetectedfor real-time conversation control5. Nova Sonic Model Provider Implementation
Strands follows a model-agnostic philosophy, supporting multiple AI providers through a unified interface. Users can switch between Amazon Bedrock, Anthropic, OpenAI, Ollama, and others without changing their application code. This same philosophy extends to bidirectional streaming.
Nova Sonic is Amazon's bidirectional speech-to-speech streaming model, and serves as the reference implementation for this architecture. Nova Sonic requires event sequencing with hierarchical structures (sessionStart → promptStart → contentStart → input → contentEnd). The implementation handles this complexity internally while presenting a simple
send_text()andsend_audio()interface to users.Implementation Benefits
Architecture Advantages
Maintained Compatibility
Experimental Status
Current State
This implementation is a working proof-of-concept that validates the architectural approach with Nova Sonic integration. The core functionality is operational and demonstrates end-to-end bidirectional streaming capabilities.
API Stability Warning
This feature is experimental and subject to breaking changes:
Testing and Validation
Interactive Test Script
The implementation includes a comprehensive test script at
src/strands/experimental/bidirectional_streaming/tests/test_bidirectional_streaming.pythat demonstrates real-time bidirectional streaming capabilities:# Run the interactive test python src/strands/experimental/bidirectional_streaming/tests/test_bidirectional_streaming.pyRecommended Setup: Use headphones for the best experience to prevent audio feedback between microphone and speakers.
The test script demonstrates:
Related Issues
strands-agents#217
Documentation PR
Type of Change
New feature
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepareChecklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.