Skip to content
21 changes: 13 additions & 8 deletions src/strands/experimental/bidirectional_streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
# Main components - Primary user interface
from .agent.agent import BidirectionalAgent

# Advanced interfaces (for custom implementations)
from .models.bidirectional_model import BidirectionalModel, BidirectionalModelSession
# Model interface (for custom implementations)
from .models.bidirectional_model import BidirectionalModel

# Model providers - What users need to create models
from .models.novasonic import NovaSonicBidirectionalModel
from .models.openai import OpenAIRealtimeBidirectionalModel
from .models.gemini_live import GeminiLiveModel
from .models.novasonic import NovaSonicModel
from .models.openai import OpenAIRealtimeModel

# Event types - For type hints and event handling
from .types.bidirectional_streaming import (
AudioInputEvent,
AudioOutputEvent,
BidirectionalStreamEvent,
ImageInputEvent,
InterruptionDetectedEvent,
TextInputEvent,
TextOutputEvent,
UsageMetricsEvent,
VoiceActivityEvent,
Expand All @@ -26,12 +29,15 @@
"BidirectionalAgent",

# Model providers
"NovaSonicBidirectionalModel",
"OpenAIRealtimeBidirectionalModel",
"GeminiLiveModel",
"NovaSonicModel",
"OpenAIRealtimeModel",

# Event types
"AudioInputEvent",
"AudioOutputEvent",
"AudioOutputEvent",
"ImageInputEvent",
"TextInputEvent",
"TextOutputEvent",
"InterruptionDetectedEvent",
"BidirectionalStreamEvent",
Expand All @@ -40,5 +46,4 @@

# Model interface
"BidirectionalModel",
"BidirectionalModelSession",
]
16 changes: 10 additions & 6 deletions src/strands/experimental/bidirectional_streaming/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,15 @@ async def send(self, input_data: str | AudioInputEvent | ImageInputEvent) -> Non
self.messages.append({"role": "user", "content": input_data})

logger.debug("Text sent: %d characters", len(input_data))
await self._session.model_session.send_text_content(input_data)
# Create TextInputEvent for send()
text_event = {"text": input_data, "role": "user"}
await self._session.model.send(text_event)
elif isinstance(input_data, dict) and "audioData" in input_data:
# Handle audio input
await self._session.model_session.send_audio_content(input_data)
# Handle audio input - already in AudioInputEvent format
await self._session.model.send(input_data)
elif isinstance(input_data, dict) and "imageData" in input_data:
# Handle image input (ImageInputEvent)
await self._session.model_session.send_image_content(input_data)
# Handle image input - already in ImageInputEvent format
await self._session.model.send(input_data)
else:
raise ValueError(
"Input must be either a string (text), AudioInputEvent "
Expand Down Expand Up @@ -419,7 +421,9 @@ async def interrupt(self) -> None:
ValueError: If no active session.
"""
self._validate_active_session()
await self._session.model_session.send_interrupt()
# Interruption is now handled internally by models through audio/event processing
# No explicit interrupt method needed in unified interface
logger.debug("Interrupt requested - handled by model's audio processing")

async def end(self) -> None:
"""End the conversation session and cleanup all resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ....types._events import ToolResultEvent, ToolStreamEvent
from ....types.content import Message
from ....types.tools import ToolResult, ToolUse
from ..models.bidirectional_model import BidirectionalModelSession
from ..models.bidirectional_model import BidirectionalModel
Comment thread
mkmeral marked this conversation as resolved.

logger = logging.getLogger(__name__)

Expand All @@ -37,14 +37,14 @@ class BidirectionalConnection:
handling while providing a simple interface for agent interactions.
"""

def __init__(self, model_session: BidirectionalModelSession, agent: "BidirectionalAgent") -> None:
"""Initialize session with model session and agent reference.
def __init__(self, model: BidirectionalModel, agent: "BidirectionalAgent") -> None:
"""Initialize connection with model and agent reference.

Args:
model_session: Provider-specific bidirectional model session.
model: Bidirectional model instance.
agent: BidirectionalAgent instance for tool registry access.
"""
self.model_session = model_session
self.model = model
self.agent = agent
self.active = True

Expand Down Expand Up @@ -76,15 +76,15 @@ async def start_bidirectional_connection(agent: "BidirectionalAgent") -> Bidirec
Returns:
BidirectionalConnection: Active session with background tasks running.
"""
logger.debug("Starting bidirectional session - initializing model session")
logger.debug("Starting bidirectional session - initializing model connection")

# Create provider-specific session
model_session = await agent.model.create_bidirectional_connection(
# Connect to model
await agent.model.connect(
system_prompt=agent.system_prompt, tools=agent.tool_registry.get_all_tool_specs(), messages=agent.messages
)

# Create session wrapper for background processing
session = BidirectionalConnection(model_session=model_session, agent=agent)
# Create connection wrapper for background processing
session = BidirectionalConnection(model=agent.model, agent=agent)

# Start concurrent background processors IMMEDIATELY after session creation
# This is critical - Nova Sonic needs response processing during initialization
Expand Down Expand Up @@ -135,9 +135,9 @@ async def stop_bidirectional_connection(session: BidirectionalConnection) -> Non
if all_tasks:
await asyncio.gather(*all_tasks, return_exceptions=True)

# Close model session
await session.model_session.close()
logger.debug("Session closed")
# Close model connection
await session.model.close()
logger.debug("Connection closed")


async def bidirectional_event_loop_cycle(session: BidirectionalConnection) -> None:
Expand Down Expand Up @@ -253,11 +253,11 @@ async def _process_model_events(session: BidirectionalConnection) -> None:
events to standardized formats, and manages interruption detection.

Args:
session: BidirectionalConnection containing model session.
session: BidirectionalConnection containing model.
"""
logger.debug("Model events processor started")
try:
async for provider_event in session.model_session.receive_events():
async for provider_event in session.model.receive():
if not session.active:
break

Expand Down Expand Up @@ -434,8 +434,8 @@ async def _execute_tool_with_strands(session: BidirectionalConnection, tool_use:
tool_result = tool_event.tool_result
tool_use_id = tool_result.get("toolUseId")

# Send result through provider-specific session
await session.model_session.send_tool_result(tool_use_id, tool_result)
# Send result through send() method
await session.model.send(tool_result)
logger.debug("Tool result sent: %s", tool_use_id)

# Handle streaming events if needed later
Expand Down Expand Up @@ -471,10 +471,10 @@ async def _execute_tool_with_strands(session: BidirectionalConnection, tool_use:
"content": [{"text": f"Error: {str(e)}"}]
}
try:
await session.model_session.send_tool_result(tool_id, error_result)
await session.model.send(error_result)
logger.debug("Error result sent: %s", tool_id)
except Exception:
logger.error("Failed to send error result: %s", tool_id)
pass # Session might be closed
except Exception as send_error:
logger.error("Failed to send error result: %s - %s", tool_id, str(send_error))
raise # Propagate exception since this is experimental code


Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
"""Bidirectional model interfaces and implementations."""

from .bidirectional_model import BidirectionalModel, BidirectionalModelSession
from .gemini_live import GeminiLiveBidirectionalModel, GeminiLiveSession
from .novasonic import NovaSonicBidirectionalModel, NovaSonicSession
from .openai import OpenAIRealtimeBidirectionalModel, OpenAIRealtimeSession
from .bidirectional_model import BidirectionalModel
from .gemini_live import GeminiLiveModel
from .novasonic import NovaSonicModel
from .openai import OpenAIRealtimeModel

__all__ = [
"BidirectionalModel",
"BidirectionalModelSession",
"GeminiLiveBidirectionalModel",
"GeminiLiveSession",
"NovaSonicBidirectionalModel",
"NovaSonicSession",
"OpenAIRealtimeBidirectionalModel",
"OpenAIRealtimeSession",
"GeminiLiveModel",
"NovaSonicModel",
"OpenAIRealtimeModel",
]
Loading
Loading