Commit 01b821c

Authored by mehtarac, Murat Kaan Meral, and pgrayy
Bidirectional Streaming Agent (#1276)
Introduce bidirectional streaming capabilities to Strands SDK, enabling real-time voice and audio conversations with AI models through persistent streaming connections. Bidirectional streaming moves beyond traditional request-response patterns by maintaining long-running conversations where users can interrupt, provide continuous input, and receive real-time audio responses. This implementation is marked as experimental as we refine the API based on user feedback and evolving model capabilities.

---------

Co-authored-by: Murat Kaan Meral <murmeral@amazon.nl>
Co-authored-by: Patrick Gray <pgrayy@amazon.com>
1 parent f8c3008 commit 01b821c

70 files changed, +9715 -82 lines changed

.github/workflows/test-lint.yml

Lines changed: 20 additions & 0 deletions
@@ -59,6 +59,20 @@ jobs:
         uses: actions/setup-python@v6
         with:
           python-version: ${{ matrix.python-version }}
+      - name: Install system audio dependencies (Linux)
+        if: matrix.os-name == 'linux'
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y portaudio19-dev libasound2-dev
+      - name: Install system audio dependencies (macOS)
+        if: matrix.os-name == 'macOS'
+        run: |
+          brew install portaudio
+      - name: Install system audio dependencies (Windows)
+        if: matrix.os-name == 'windows'
+        run: |
+          # Windows typically has audio libraries available by default
+          echo "Windows audio dependencies handled by PyAudio wheels"
       - name: Install dependencies
         run: |
           pip install --no-cache-dir hatch
@@ -89,6 +103,11 @@ jobs:
           python-version: '3.10'
           cache: 'pip'
 
+      - name: Install system audio dependencies (Linux)
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y portaudio19-dev libasound2-dev
+
       - name: Install dependencies
         run: |
           pip install --no-cache-dir hatch
@@ -97,3 +116,4 @@ jobs:
         id: lint
         run: hatch fmt --linter --check
         continue-on-error: false
+

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -13,3 +13,4 @@ dist
 repl_state
 .kiro
 uv.lock
+.audio_cache

README.md

Lines changed: 68 additions & 0 deletions
@@ -197,6 +197,74 @@ agent("What is the square root of 1764")
 
 It's also available on GitHub via [strands-agents/tools](https://github.com/strands-agents/tools).
 
+### Bidirectional Streaming
+
+> **⚠️ Experimental Feature**: Bidirectional streaming is currently in experimental status. APIs may change in future releases as we refine the feature based on user feedback and evolving model capabilities.
+
+Build real-time voice and audio conversations with persistent streaming connections. Unlike traditional request-response patterns, bidirectional streaming maintains long-running conversations where users can interrupt, provide continuous input, and receive real-time audio responses. Get started with your first BidiAgent by following the [Quickstart](https://strandsagents.com/latest/documentation/docs/user-guide/concepts/experimental/bidirectional-streaming/quickstart) guide.
+
+**Supported Model Providers:**
+- Amazon Nova Sonic (`amazon.nova-sonic-v1:0`)
+- Google Gemini Live (`gemini-2.5-flash-native-audio-preview-09-2025`)
+- OpenAI Realtime API (`gpt-realtime`)
+
+**Quick Example:**
+
+```python
+import asyncio
+from strands.experimental.bidi import BidiAgent
+from strands.experimental.bidi.models import BidiNovaSonicModel
+from strands.experimental.bidi.io import BidiAudioIO, BidiTextIO
+from strands.experimental.bidi.tools import stop_conversation
+from strands_tools import calculator
+
+async def main():
+    # Create bidirectional agent with audio model
+    model = BidiNovaSonicModel()
+    agent = BidiAgent(model=model, tools=[calculator, stop_conversation])
+
+    # Setup audio and text I/O
+    audio_io = BidiAudioIO()
+    text_io = BidiTextIO()
+
+    # Run with real-time audio streaming
+    # Say "stop conversation" to gracefully end the conversation
+    await agent.run(
+        inputs=[audio_io.input()],
+        outputs=[audio_io.output(), text_io.output()]
+    )
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+**Configuration Options:**
+
+```python
+# Configure audio settings
+model = BidiNovaSonicModel(
+    provider_config={
+        "audio": {
+            "input_rate": 16000,
+            "output_rate": 16000,
+            "voice": "matthew"
+        },
+        "inference": {
+            "max_tokens": 2048,
+            "temperature": 0.7
+        }
+    }
+)
+
+# Configure I/O devices
+audio_io = BidiAudioIO(
+    input_device_index=0,  # Specific microphone
+    output_device_index=1,  # Specific speaker
+    input_buffer_size=10,
+    output_buffer_size=10
+)
+```
+
 ## Documentation
 
 For detailed guidance & examples, explore our documentation:

pyproject.toml

Lines changed: 66 additions & 2 deletions
@@ -69,7 +69,18 @@ a2a = [
     "fastapi>=0.115.12,<1.0.0",
     "starlette>=0.46.2,<1.0.0",
 ]
+
+bidi = [
+    "aws_sdk_bedrock_runtime; python_version>='3.12'",
+    "prompt_toolkit>=3.0.0,<4.0.0",
+    "pyaudio>=0.2.13,<1.0.0",
+    "smithy-aws-core>=0.0.1; python_version>='3.12'",
+]
+bidi-gemini = ["google-genai>=1.32.0,<2.0.0"]
+bidi-openai = ["websockets>=15.0.0,<16.0.0"]
+
 all = ["strands-agents[a2a,anthropic,docs,gemini,litellm,llamaapi,mistral,ollama,openai,writer,sagemaker,otel]"]
+bidi-all = ["strands-agents[a2a,bidi,bidi-gemini,bidi-openai,docs,otel]"]
 
 dev = [
     "commitizen>=4.4.0,<5.0.0",
@@ -104,9 +115,10 @@ features = ["all"]
 dependencies = [
     "mypy>=1.15.0,<2.0.0",
     "ruff>=0.13.0,<0.14.0",
-    # Include required pacakge dependencies for mypy
+    # Include required package dependencies for mypy
     "strands-agents @ {root:uri}",
 ]
+python = "3.10"
 
 # Define static-analysis scripts so we can include mypy as part of the linting check
 [tool.hatch.envs.hatch-static-analysis.scripts]
@@ -118,7 +130,7 @@ format-fix = [
 ]
 lint-check = [
     "ruff check",
-    "mypy -p src"
+    "mypy ./src"
 ]
 lint-fix = [
     "ruff check --fix"
@@ -192,11 +204,16 @@ warn_no_return = true
 warn_unreachable = true
 follow_untyped_imports = true
 ignore_missing_imports = false
+exclude = ["src/strands/experimental/bidi"]
 
+[[tool.mypy.overrides]]
+module = ["strands.experimental.bidi.*"]
+follow_imports = "skip"
 
 [tool.ruff]
 line-length = 120
 include = ["examples/**/*.py", "src/**/*.py", "tests/**/*.py", "tests_integ/**/*.py"]
+exclude = ["src/strands/experimental/bidi/**/*.py", "tests/strands/experimental/bidi/**/*.py", "tests_integ/bidi/**/*.py"]
 
 [tool.ruff.lint]
 select = [
@@ -219,6 +236,7 @@ convention = "google"
 [tool.pytest.ini_options]
 testpaths = ["tests"]
 asyncio_default_fixture_loop_scope = "function"
+addopts = "--ignore=tests/strands/experimental/bidi --ignore=tests_integ/bidi"
 
 
 [tool.coverage.run]
@@ -227,6 +245,7 @@ source = ["src"]
 context = "thread"
 parallel = true
 concurrency = ["thread", "multiprocessing"]
+omit = ["src/strands/experimental/bidi/*"]
 
 [tool.coverage.report]
 show_missing = true
@@ -256,3 +275,48 @@ style = [
     ["text", ""],
     ["disabled", "fg:#858585 italic"]
 ]
+
+# =========================
+# Bidi development configs
+# =========================
+
+[tool.hatch.envs.bidi]
+dev-mode = true
+features = ["dev", "bidi-all"]
+installer = "uv"
+
+[tool.hatch.envs.bidi.scripts]
+prepare = [
+    "hatch run bidi-lint:format-fix",
+    "hatch run bidi-lint:quality-fix",
+    "hatch run bidi-lint:type-check",
+    "hatch run bidi-test:test-cov",
+]
+
+[tool.hatch.envs.bidi-lint]
+template = "bidi"
+
+[tool.hatch.envs.bidi-lint.scripts]
+format-check = "format-fix --check"
+format-fix = "ruff format {args} --target-version py312 ./src/strands/experimental/bidi/**/*.py"
+quality-check = "ruff check {args} --target-version py312 ./src/strands/experimental/bidi/**/*.py"
+quality-fix = "quality-check --fix"
+type-check = "mypy {args} --python-version 3.12 ./src/strands/experimental/bidi/**/*.py"
+
+[tool.hatch.envs.bidi-test]
+template = "bidi"
+
+[tool.hatch.envs.bidi-test.scripts]
+test = "pytest {args} tests/strands/experimental/bidi"
+test-cov = """
+test \
+    --cov=strands.experimental.bidi \
+    --cov-config= \
+    --cov-branch \
+    --cov-report=term-missing \
+    --cov-report=xml:build/coverage/bidi-coverage.xml \
+    --cov-report=html:build/coverage/bidi-html
+"""
+
+[[tool.hatch.envs.bidi-test.matrix]]
+python = ["3.13", "3.12"]
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+"""Bidirectional streaming package."""
+
+import sys
+
+if sys.version_info < (3, 12):
+    raise ImportError("bidi only supported for >= Python 3.12")
+
+# Main components - Primary user interface
+# Re-export standard agent events for tool handling
+from ...types._events import (
+    ToolResultEvent,
+    ToolStreamEvent,
+    ToolUseStreamEvent,
+)
+from .agent.agent import BidiAgent
+
+# IO channels - Hardware abstraction
+from .io.audio import BidiAudioIO
+
+# Model interface (for custom implementations)
+from .models.model import BidiModel
+from .models.nova_sonic import BidiNovaSonicModel
+
+# Built-in tools
+from .tools import stop_conversation
+
+# Event types - For type hints and event handling
+from .types.events import (
+    BidiAudioInputEvent,
+    BidiAudioStreamEvent,
+    BidiConnectionCloseEvent,
+    BidiConnectionStartEvent,
+    BidiErrorEvent,
+    BidiImageInputEvent,
+    BidiInputEvent,
+    BidiInterruptionEvent,
+    BidiOutputEvent,
+    BidiResponseCompleteEvent,
+    BidiResponseStartEvent,
+    BidiTextInputEvent,
+    BidiTranscriptStreamEvent,
+    BidiUsageEvent,
+    ModalityUsage,
+)
+
+__all__ = [
+    # Main interface
+    "BidiAgent",
+    # IO channels
+    "BidiAudioIO",
+    # Model providers
+    "BidiNovaSonicModel",
+    # Built-in tools
+    "stop_conversation",
+    # Input Event types
+    "BidiTextInputEvent",
+    "BidiAudioInputEvent",
+    "BidiImageInputEvent",
+    "BidiInputEvent",
+    # Output Event types
+    "BidiConnectionStartEvent",
+    "BidiConnectionCloseEvent",
+    "BidiResponseStartEvent",
+    "BidiResponseCompleteEvent",
+    "BidiAudioStreamEvent",
+    "BidiTranscriptStreamEvent",
+    "BidiInterruptionEvent",
+    "BidiUsageEvent",
+    "ModalityUsage",
+    "BidiErrorEvent",
+    "BidiOutputEvent",
+    # Tool Event types (reused from standard agent)
+    "ToolUseStreamEvent",
+    "ToolResultEvent",
+    "ToolStreamEvent",
+    # Model interface
+    "BidiModel",
+]
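Because the package raises `ImportError` on interpreters older than Python 3.12, calling code that must also run on older versions may want to probe for availability before using it. The following is a minimal sketch of that pattern; `load_bidi_agent` is a hypothetical helper introduced here for illustration and is not part of this commit.

```python
import sys


def load_bidi_agent():
    """Return the BidiAgent class, or None when it cannot be imported.

    Hypothetical helper: not part of the commit, shown only to illustrate
    how a caller might react to the version guard.
    """
    if sys.version_info < (3, 12):
        # Mirrors the guard in the package: bidi requires Python 3.12+.
        return None
    try:
        from strands.experimental.bidi import BidiAgent
    except ImportError:
        # The package or one of its optional dependencies is missing.
        return None
    return BidiAgent


agent_cls = load_bidi_agent()
print("bidi available" if agent_cls is not None else "bidi unavailable")
```

Returning `None` instead of re-raising keeps the decision about a fallback (for example, a text-only agent) with the caller.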
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Utilities for async operations."""
2+
3+
from typing import Awaitable, Callable
4+
5+
from ._task_pool import _TaskPool
6+
7+
__all__ = ["_TaskPool"]
8+
9+
10+
async def stop_all(*funcs: Callable[..., Awaitable[None]]) -> None:
11+
"""Call all stops in sequence and aggregate errors.
12+
13+
A failure in one stop call will not block subsequent stop calls.
14+
15+
Args:
16+
funcs: Stop functions to call in sequence.
17+
18+
Raises:
19+
ExceptionGroup: If any stop function raises an exception.
20+
"""
21+
exceptions = []
22+
for func in funcs:
23+
try:
24+
await func()
25+
except Exception as exception:
26+
exceptions.append(exception)
27+
28+
if exceptions:
29+
raise ExceptionGroup("failed stop sequence", exceptions)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Manage pool of active async tasks.
2+
3+
This is particularly useful for cancelling multiple tasks at once.
4+
"""
5+
6+
import asyncio
7+
from typing import Any, Coroutine
8+
9+
10+
class _TaskPool:
11+
"""Manage pool of active async tasks."""
12+
13+
def __init__(self) -> None:
14+
"""Setup task container."""
15+
self._tasks: set[asyncio.Task] = set()
16+
17+
def __len__(self) -> int:
18+
"""Number of active tasks."""
19+
return len(self._tasks)
20+
21+
def create(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
22+
"""Create async task.
23+
24+
Adds a clean up callback to run after task completes.
25+
26+
Returns:
27+
The created task.
28+
"""
29+
task = asyncio.create_task(coro)
30+
task.add_done_callback(lambda task: self._tasks.remove(task))
31+
32+
self._tasks.add(task)
33+
return task
34+
35+
async def cancel(self) -> None:
36+
"""Cancel all active tasks in pool."""
37+
for task in self._tasks:
38+
task.cancel()
39+
40+
try:
41+
await asyncio.gather(*self._tasks)
42+
except asyncio.CancelledError:
43+
pass
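A brief usage sketch may help show how the pool tracks and cancels tasks. The class body below is copied from the diff (docstrings trimmed) so the example runs on its own; the `worker` coroutine and the `main` driver are hypothetical and exist only for this illustration.

```python
import asyncio
from typing import Any, Coroutine


class _TaskPool:
    """Copy of the class from the diff, repeated to keep the sketch self-contained."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def __len__(self) -> int:
        return len(self._tasks)

    def create(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Each task removes itself from the pool once it finishes or is cancelled.
        task.add_done_callback(lambda task: self._tasks.remove(task))
        self._tasks.add(task)
        return task

    async def cancel(self) -> None:
        for task in self._tasks:
            task.cancel()
        try:
            await asyncio.gather(*self._tasks)
        except asyncio.CancelledError:
            pass


async def worker() -> None:
    # Hypothetical long-running job that only ends when cancelled.
    await asyncio.sleep(3600)


async def main() -> tuple[int, int]:
    pool = _TaskPool()
    pool.create(worker())
    pool.create(worker())
    before = len(pool)
    await asyncio.sleep(0)  # Yield once so both workers start running.
    await pool.cancel()
    await asyncio.sleep(0)  # Yield once more so any pending done callbacks run.
    return before, len(pool)


before, after = asyncio.run(main())
print(before, after)
```

Keeping strong references in a set also prevents tasks from being garbage-collected mid-flight, which is a documented pitfall of bare `asyncio.create_task` calls.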
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Bidirectional agent for real-time streaming conversations."""
2+
3+
from .agent import BidiAgent
4+
5+
__all__ = ["BidiAgent"]
