From 43e3125e0ac9db40a5bb828a44858b1772bbbc7e Mon Sep 17 00:00:00 2001 From: Rachit Mehta Date: Wed, 26 Nov 2025 22:23:26 -0500 Subject: [PATCH] remove scripts directory before merging with sdk-python/main --- scripts/bidi/test_bidi.py | 34 --- scripts/bidi/test_bidi_novasonic.py | 246 -------------------- scripts/bidi/test_bidi_openai.py | 308 ------------------------ scripts/bidi/test_gemini_live.py | 349 ---------------------------- 4 files changed, 937 deletions(-) delete mode 100644 scripts/bidi/test_bidi.py delete mode 100644 scripts/bidi/test_bidi_novasonic.py delete mode 100644 scripts/bidi/test_bidi_openai.py delete mode 100644 scripts/bidi/test_gemini_live.py diff --git a/scripts/bidi/test_bidi.py b/scripts/bidi/test_bidi.py deleted file mode 100644 index 2beb3ddd7..000000000 --- a/scripts/bidi/test_bidi.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Test BidirectionalAgent with simple developer experience.""" - -import asyncio - -from strands_tools import calculator - -from strands.experimental.bidi.agent.agent import BidiAgent -from strands.experimental.bidi.io import BidiAudioIO, BidiTextIO -from strands.experimental.bidi.models.novasonic import BidiNovaSonicModel - - -async def main(): - """Test the BidirectionalAgent API.""" - # Nova Sonic model - audio_io = BidiAudioIO() - text_io = BidiTextIO() - model = BidiNovaSonicModel(region="us-east-1") - agent = BidiAgent(model=model, tools=[calculator]) - - print("New BidiAgent Experience") - print("Try asking: 'What is 25 times 8?' or 'Calculate the square root of 144'") - await agent.run(inputs=[audio_io.input()], outputs=[audio_io.output(), text_io.output()]) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\nā¹ļø Conversation ended by user") - except Exception as e: - print(f"āŒ Error: {e}") - import traceback - - traceback.print_exc() diff --git a/scripts/bidi/test_bidi_novasonic.py b/scripts/bidi/test_bidi_novasonic.py deleted file mode 100644 index baa39226f..000000000 --- a/scripts/bidi/test_bidi_novasonic.py +++ /dev/null @@ -1,246 +0,0 @@ -"""Test suite for bidirectional streaming with real-time audio interaction. - -Tests the complete bidirectional streaming system including audio input/output, -interruption handling, and concurrent tool execution using Nova Sonic. -""" - -import asyncio -import base64 -import os -import time - -import pyaudio -from strands_tools import calculator - -from strands.experimental.bidi.agent.agent import BidiAgent -from strands.experimental.bidi.models.novasonic import BidiNovaSonicModel - - -def test_direct_tools(): - """Test direct tool calling.""" - print("Testing direct tool calling...") - - # Check AWS credentials - if not all([os.getenv("AWS_ACCESS_KEY_ID"), os.getenv("AWS_SECRET_ACCESS_KEY")]): - print("AWS credentials not set - skipping test") - return - - try: - model = BidiNovaSonicModel() - agent = BidiAgent(model=model, tools=[calculator]) - - # Test calculator - result = agent.tool.calculator(expression="2 * 3") - content = result.get("content", [{}])[0].get("text", "") - print(f"Result: {content}") - print("Test completed") - - except Exception as e: - print(f"Test failed: {e}") - - -async def play(context): - """Play audio output with responsive interruption support.""" - audio = pyaudio.PyAudio() - speaker = audio.open( - channels=1, - format=pyaudio.paInt16, - output=True, - rate=16000, - frames_per_buffer=1024, - ) - - try: - while context["active"]: - try: - # Check for interruption first - if context.get("interrupted", False): - # Clear entire audio queue immediately - while not context["audio_out"].empty(): - try: - context["audio_out"].get_nowait() - except asyncio.QueueEmpty: - break - - context["interrupted"] = False - await asyncio.sleep(0.05) - continue - - # Get next audio data - audio_data = await asyncio.wait_for(context["audio_out"].get(), timeout=0.1) - - if audio_data and context["active"]: - chunk_size = 1024 - for i in range(0, len(audio_data), chunk_size): - # Check for interruption before each chunk - if context.get("interrupted", False) or not context["active"]: - break - - end = min(i + chunk_size, len(audio_data)) - chunk = audio_data[i:end] - speaker.write(chunk) - await asyncio.sleep(0.001) - - except asyncio.TimeoutError: - continue # No audio available - except asyncio.QueueEmpty: - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - - except asyncio.CancelledError: - pass - finally: - speaker.close() - audio.terminate() - - -async def record(context): - """Record audio input from microphone.""" - audio = pyaudio.PyAudio() - microphone = audio.open( - channels=1, - format=pyaudio.paInt16, - frames_per_buffer=1024, - input=True, - rate=16000, - ) - - try: - while context["active"]: - try: - audio_bytes = microphone.read(1024, exception_on_overflow=False) - context["audio_in"].put_nowait(audio_bytes) - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - except asyncio.CancelledError: - pass - finally: - microphone.close() - audio.terminate() - - -async def receive(agent, context): - """Receive and process events from agent.""" - try: - async for event in agent.receive(): - event_type = event.get("type", "unknown") - - # Handle audio stream events (bidi_audio_stream) - if event_type == "bidi_audio_stream": - if not context.get("interrupted", False): - # Decode base64 audio string to bytes for playback - audio_b64 = event["audio"] - audio_data = base64.b64decode(audio_b64) - context["audio_out"].put_nowait(audio_data) - - # Handle interruption events (bidi_interruption) - elif event_type == "bidi_interruption": - context["interrupted"] = True - - # Handle transcript events (bidi_transcript_stream) - elif event_type == "bidi_transcript_stream": - text_content = event.get("text", "") - role = event.get("role", "unknown") - - # Log transcript output - if role == "user": - print(f"User: {text_content}") - elif role == "assistant": - print(f"Assistant: {text_content}") - - # Handle response complete events (bidi_response_complete) - elif event_type == "bidi_response_complete": - # Reset interrupted state since the turn is complete - context["interrupted"] = False - - # Handle tool use events (tool_use_stream) - elif event_type == "tool_use_stream": - tool_use = event.get("current_tool_use", {}) - tool_name = tool_use.get("name", "unknown") - tool_input = tool_use.get("input", {}) - print(f"šŸ”§ Tool called: {tool_name} with input: {tool_input}") - - # Handle tool result events (tool_result) - elif event_type == "tool_result": - tool_result = event.get("tool_result", {}) - tool_name = tool_result.get("name", "unknown") - result_content = tool_result.get("content", []) - result_text = "" - for block in result_content: - if isinstance(block, dict) and block.get("type") == "text": - result_text = block.get("text", "") - break - print(f"āœ… Tool result from {tool_name}: {result_text}") - - except asyncio.CancelledError: - pass - - -async def send(agent, context): - """Send audio input to agent.""" - try: - while time.time() - context["start_time"] < context["duration"]: - try: - audio_bytes = context["audio_in"].get_nowait() - # Create audio event using TypedEvent - from strands.experimental.bidi.types.events import BidiAudioInputEvent - - audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") - audio_event = BidiAudioInputEvent(audio=audio_b64, format="pcm", sample_rate=16000, channels=1) - await agent.send(audio_event) - except asyncio.QueueEmpty: - await asyncio.sleep(0.01) # Restored to working timing - except asyncio.CancelledError: - break - - context["active"] = False - except asyncio.CancelledError: - pass - - -async def main(duration=180): - """Main function for bidirectional streaming test.""" - print("Starting bidirectional streaming test...") - print("Audio optimizations: 1024-byte buffers, balanced smooth playback + responsive interruption") - - # Initialize model and agent - model = BidiNovaSonicModel(region="us-east-1") - agent = BidiAgent(model=model, tools=[calculator], system_prompt="You are a helpful assistant.") - - await agent.start() - - # Create shared context for all tasks - context = { - "active": True, - "audio_in": asyncio.Queue(), - "audio_out": asyncio.Queue(), - "connection": agent._loop, - "duration": duration, - "start_time": time.time(), - "interrupted": False, - } - - print("Speak into microphone. Press Ctrl+C to exit.") - - try: - # Run all tasks concurrently - await asyncio.gather( - play(context), record(context), receive(agent, context), send(agent, context), return_exceptions=True - ) - except KeyboardInterrupt: - print("\nInterrupted by user") - except asyncio.CancelledError: - print("\nTest cancelled") - finally: - print("Cleaning up...") - context["active"] = False - await agent.stop() - - -if __name__ == "__main__": - # Test direct tool calling first - test_direct_tools() - - asyncio.run(main()) diff --git a/scripts/bidi/test_bidi_openai.py b/scripts/bidi/test_bidi_openai.py deleted file mode 100644 index 50d2d2f55..000000000 --- a/scripts/bidi/test_bidi_openai.py +++ /dev/null @@ -1,308 +0,0 @@ -#!/usr/bin/env python3 -"""Test OpenAI Realtime API speech-to-speech interaction.""" - -import asyncio -import base64 -import os -import time - -import pyaudio -from strands_tools import calculator - -from strands.experimental.bidi.agent.agent import BidiAgent -from strands.experimental.bidi.models.openai import BidiOpenAIRealtimeModel - - -async def play(context): - """Handle audio playback with interruption support.""" - audio = pyaudio.PyAudio() - - try: - speaker = audio.open( - format=pyaudio.paInt16, - channels=1, - rate=24000, # OpenAI Realtime uses 24kHz - output=True, - frames_per_buffer=1024, - ) - - while context["active"]: - try: - # Check for interruption - if context.get("interrupted", False): - # Clear audio queue on interruption - while not context["audio_out"].empty(): - try: - context["audio_out"].get_nowait() - except asyncio.QueueEmpty: - break - - context["interrupted"] = False - await asyncio.sleep(0.05) - continue - - # Get audio data with timeout - try: - audio_data = await asyncio.wait_for(context["audio_out"].get(), timeout=0.1) - - if audio_data and context["active"]: - # Play in chunks to allow interruption - chunk_size = 1024 - for i in range(0, len(audio_data), chunk_size): - if context.get("interrupted", False) or not context["active"]: - break - - chunk = audio_data[i : i + chunk_size] - speaker.write(chunk) - await asyncio.sleep(0.001) # Brief pause for responsiveness - - except asyncio.TimeoutError: - continue - - except asyncio.CancelledError: - break - - except asyncio.CancelledError: - pass - except Exception as e: - print(f"Audio playback error: {e}") - finally: - try: - speaker.close() - except Exception: - pass - audio.terminate() - - -async def record(context): - """Handle microphone recording.""" - audio = pyaudio.PyAudio() - - try: - microphone = audio.open( - format=pyaudio.paInt16, - channels=1, - rate=24000, # Match OpenAI's expected input rate - input=True, - frames_per_buffer=1024, - ) - - while context["active"]: - try: - audio_bytes = microphone.read(1024, exception_on_overflow=False) - await context["audio_in"].put(audio_bytes) - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - - except asyncio.CancelledError: - pass - except Exception as e: - print(f"Microphone recording error: {e}") - finally: - try: - microphone.close() - except Exception: - pass - audio.terminate() - - -async def receive(agent, context): - """Handle events from the agent.""" - try: - async for event in agent.receive(): - if not context["active"]: - break - - # Get event type - event_type = event.get("type", "unknown") - - # Handle audio stream events (bidi_audio_stream) - if event_type == "bidi_audio_stream": - # Decode base64 audio string to bytes for playback - audio_b64 = event["audio"] - audio_data = base64.b64decode(audio_b64) - - if not context.get("interrupted", False): - await context["audio_out"].put(audio_data) - - # Handle transcript events (bidi_transcript_stream) - elif event_type == "bidi_transcript_stream": - source = event.get("role", "assistant") - text = event.get("text", "").strip() - - if text: - if source == "user": - print(f"šŸŽ¤ User: {text}") - elif source == "assistant": - print(f"šŸ”Š Assistant: {text}") - - # Handle interruption events (bidi_interruption) - elif event_type == "bidi_interruption": - context["interrupted"] = True - print("āš ļø Interruption detected") - - # Handle connection start events (bidi_connection_start) - elif event_type == "bidi_connection_start": - print(f"āœ“ Session started: {event.get('model', 'unknown')}") - - # Handle connection close events (bidi_connection_close) - elif event_type == "bidi_connection_close": - print(f"āœ“ Session ended: {event.get('reason', 'unknown')}") - context["active"] = False - break - - # Handle response complete events (bidi_response_complete) - elif event_type == "bidi_response_complete": - # Reset interrupted state since the turn is complete - context["interrupted"] = False - - # Handle tool use events (tool_use_stream) - elif event_type == "tool_use_stream": - tool_use = event.get("current_tool_use", {}) - tool_name = tool_use.get("name", "unknown") - tool_input = tool_use.get("input", {}) - print(f"šŸ”§ Tool called: {tool_name} with input: {tool_input}") - - # Handle tool result events (tool_result) - elif event_type == "tool_result": - tool_result = event.get("tool_result", {}) - tool_name = tool_result.get("name", "unknown") - result_content = tool_result.get("content", []) - result_text = "" - for block in result_content: - if isinstance(block, dict) and block.get("type") == "text": - result_text = block.get("text", "") - break - print(f"āœ… Tool result from {tool_name}: {result_text}") - - except asyncio.CancelledError: - pass - except Exception as e: - print(f"Receive handler error: {e}") - finally: - pass - - -async def send(agent, context): - """Send audio from microphone to agent.""" - try: - while context["active"]: - try: - audio_bytes = await asyncio.wait_for(context["audio_in"].get(), timeout=0.1) - - # Create audio event using TypedEvent - # Encode audio bytes to base64 string for JSON serializability - from strands.experimental.bidi.types.events import BidiAudioInputEvent - - audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") - audio_event = BidiAudioInputEvent(audio=audio_b64, format="pcm", sample_rate=24000, channels=1) - - await agent.send(audio_event) - - except asyncio.TimeoutError: - continue - except asyncio.CancelledError: - break - - except asyncio.CancelledError: - pass - except Exception as e: - print(f"Send handler error: {e}") - finally: - pass - - -async def main(): - """Main test function for OpenAI voice chat.""" - print("Starting OpenAI Realtime API test...") - - # Check API key - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - print("OPENAI_API_KEY environment variable not set") - return False - - # Check audio system - try: - audio = pyaudio.PyAudio() - audio.terminate() - except Exception as e: - print(f"Audio system error: {e}") - return False - - # Create OpenAI model - model = BidiOpenAIRealtimeModel( - model_id="gpt-4o-realtime-preview", - api_key=api_key, - session={ - "output_modalities": ["audio"], - "audio": { - "input": { - "format": {"type": "audio/pcm", "rate": 24000}, - "turn_detection": {"type": "server_vad", "threshold": 0.5, "silence_duration_ms": 700}, - }, - "output": {"format": {"type": "audio/pcm", "rate": 24000}, "voice": "alloy"}, - }, - }, - ) - - # Create agent - agent = BidiAgent( - model=model, - tools=[calculator], - system_prompt=( - "You are a helpful voice assistant. " - "Keep your responses brief and natural. " - "Say hello when you first connect." - ), - ) - - # Start the session - await agent.start() - - # Create shared context - context = { - "active": True, - "audio_in": asyncio.Queue(), - "audio_out": asyncio.Queue(), - "interrupted": False, - "start_time": time.time(), - } - - print("Speak into your microphone. Press Ctrl+C to stop.") - - try: - # Run all tasks concurrently - await asyncio.gather( - play(context), record(context), receive(agent, context), send(agent, context), return_exceptions=True - ) - - except KeyboardInterrupt: - print("\nInterrupted by user") - except asyncio.CancelledError: - print("\nTest cancelled") - except Exception as e: - print(f"\nError during voice chat: {e}") - finally: - print("Cleaning up...") - context["active"] = False - - try: - await agent.stop() - except Exception as e: - print(f"Cleanup error: {e}") - - return True - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\nTest interrupted by user") - except Exception as e: - print(f"Test error: {e}") - import traceback - - traceback.print_exc() diff --git a/scripts/bidi/test_gemini_live.py b/scripts/bidi/test_gemini_live.py deleted file mode 100644 index 656ca6dcd..000000000 --- a/scripts/bidi/test_gemini_live.py +++ /dev/null @@ -1,349 +0,0 @@ -"""Test suite for Gemini Live bidirectional streaming with camera support. - -Tests the Gemini Live API with real-time audio and video interaction including: -- Audio input/output streaming -- Camera frame capture and transmission -- Interruption handling -- Concurrent tool execution -- Transcript events - -Requirements: -- pip install opencv-python pillow pyaudio google-genai -- Camera access permissions -- GOOGLE_AI_API_KEY environment variable -""" - -import asyncio -import base64 -import io -import logging -import os -import time - -try: - import cv2 - import PIL.Image - - CAMERA_AVAILABLE = True -except ImportError as e: - print(f"Camera dependencies not available: {e}") - print("Install with: pip install opencv-python pillow") - CAMERA_AVAILABLE = False - -import pyaudio -from strands_tools import calculator - -from strands.experimental.bidi.agent.agent import BidiAgent -from strands.experimental.bidi.models.gemini_live import BidiGeminiLiveModel - -# Configure logging - debug only for Gemini Live, info for everything else -logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") -gemini_logger = logging.getLogger("strands.experimental.bidirectional_streaming.models.gemini_live") -gemini_logger.setLevel(logging.WARNING) -logger = logging.getLogger(__name__) - - -async def play(context): - """Play audio output with responsive interruption support.""" - audio = pyaudio.PyAudio() - speaker = audio.open( - channels=1, - format=pyaudio.paInt16, - output=True, - rate=24000, - frames_per_buffer=1024, - ) - - try: - while context["active"]: - try: - # Check for interruption first - if context.get("interrupted", False): - # Clear entire audio queue immediately - while not context["audio_out"].empty(): - try: - context["audio_out"].get_nowait() - except asyncio.QueueEmpty: - break - - context["interrupted"] = False - await asyncio.sleep(0.05) - continue - - # Get next audio data - audio_data = await asyncio.wait_for(context["audio_out"].get(), timeout=0.1) - - if audio_data and context["active"]: - chunk_size = 1024 - for i in range(0, len(audio_data), chunk_size): - # Check for interruption before each chunk - if context.get("interrupted", False) or not context["active"]: - break - - end = min(i + chunk_size, len(audio_data)) - chunk = audio_data[i:end] - speaker.write(chunk) - await asyncio.sleep(0.001) - - except asyncio.TimeoutError: - continue # No audio available - except asyncio.QueueEmpty: - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - - except asyncio.CancelledError: - pass - finally: - speaker.close() - audio.terminate() - - -async def record(context): - """Record audio input from microphone.""" - audio = pyaudio.PyAudio() - - # List all available audio devices - print("Available audio devices:") - for i in range(audio.get_device_count()): - device_info = audio.get_device_info_by_index(i) - if device_info["maxInputChannels"] > 0: # Only show input devices - print(f" Device {i}: {device_info['name']} (inputs: {device_info['maxInputChannels']})") - - # Get default input device info - default_device = audio.get_default_input_device_info() - print(f"\nUsing default input device: {default_device['name']} (Device {default_device['index']})") - - microphone = audio.open( - channels=1, - format=pyaudio.paInt16, - frames_per_buffer=1024, - input=True, - rate=16000, - ) - - try: - while context["active"]: - try: - audio_bytes = microphone.read(1024, exception_on_overflow=False) - context["audio_in"].put_nowait(audio_bytes) - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - except asyncio.CancelledError: - pass - finally: - microphone.close() - audio.terminate() - - -async def receive(agent, context): - """Receive and process events from agent.""" - try: - async for event in agent.receive(): - event_type = event.get("type", "unknown") - - # Handle audio stream events (bidi_audio_stream) - if event_type == "bidi_audio_stream": - if not context.get("interrupted", False): - # Decode base64 audio string to bytes for playback - audio_b64 = event["audio"] - audio_data = base64.b64decode(audio_b64) - context["audio_out"].put_nowait(audio_data) - - # Handle interruption events (bidi_interruption) - elif event_type == "bidi_interruption": - context["interrupted"] = True - print("āš ļø Interruption detected") - - # Handle transcript events (bidi_transcript_stream) - elif event_type == "bidi_transcript_stream": - transcript_text = event.get("text", "") - transcript_role = event.get("role", "unknown") - - # Print transcripts with special formatting - if transcript_role == "user": - print(f"šŸŽ¤ User: {transcript_text}") - elif transcript_role == "assistant": - print(f"šŸ”Š Assistant: {transcript_text}") - - # Handle response complete events (bidi_response_complete) - elif event_type == "bidi_response_complete": - # Reset interrupted state since the response is complete - context["interrupted"] = False - - # Handle tool use events (tool_use_stream) - elif event_type == "tool_use_stream": - tool_use = event.get("current_tool_use", {}) - tool_name = tool_use.get("name", "unknown") - tool_input = tool_use.get("input", {}) - print(f"šŸ”§ Tool called: {tool_name} with input: {tool_input}") - - # Handle tool result events (tool_result) - elif event_type == "tool_result": - tool_result = event.get("tool_result", {}) - tool_name = tool_result.get("name", "unknown") - result_content = tool_result.get("content", []) - # Extract text from content blocks - result_text = "" - for block in result_content: - if isinstance(block, dict) and block.get("type") == "text": - result_text = block.get("text", "") - break - print(f"āœ… Tool result from {tool_name}: {result_text}") - - except asyncio.CancelledError: - pass - - -def _get_frame(cap): - """Capture and process a frame from camera.""" - if not CAMERA_AVAILABLE: - return None - - # Read the frame - ret, frame = cap.read() - # Check if the frame was read successfully - if not ret: - return None - # Convert BGR to RGB color space - # OpenCV captures in BGR but PIL expects RGB format - # This prevents the blue tint in the video feed - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - img = PIL.Image.fromarray(frame_rgb) - img.thumbnail([1024, 1024]) - - image_io = io.BytesIO() - img.save(image_io, format="jpeg") - image_io.seek(0) - - mime_type = "image/jpeg" - image_bytes = image_io.read() - return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()} - - -async def get_frames(context): - """Capture frames from camera and send to agent.""" - if not CAMERA_AVAILABLE: - print("Camera not available - skipping video capture") - return - - # This takes about a second, and will block the whole program - # causing the audio pipeline to overflow if you don't to_thread it. - cap = await asyncio.to_thread(cv2.VideoCapture, 0) # 0 represents the default camera - - print("Camera initialized. Starting video capture...") - - try: - while context["active"] and time.time() - context["start_time"] < context["duration"]: - frame = await asyncio.to_thread(_get_frame, cap) - if frame is None: - break - - # Send frame to agent as image input - try: - from strands.experimental.bidi.types.events import BidiImageInputEvent - - image_event = BidiImageInputEvent( - image=frame["data"], # Already base64 encoded - mime_type=frame["mime_type"], - ) - await context["agent"].send(image_event) - print("šŸ“ø Frame sent to model") - except Exception as e: - logger.error("error=<%s> | error sending frame", e) - - # Wait 1 second between frames (1 FPS) - await asyncio.sleep(1.0) - - except asyncio.CancelledError: - pass - finally: - # Release the VideoCapture object - cap.release() - - -async def send(agent, context): - """Send audio input to agent.""" - try: - while time.time() - context["start_time"] < context["duration"]: - try: - audio_bytes = context["audio_in"].get_nowait() - # Create audio event using TypedEvent - from strands.experimental.bidi.types.events import BidiAudioInputEvent - - audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") - audio_event = BidiAudioInputEvent(audio=audio_b64, format="pcm", sample_rate=16000, channels=1) - await agent.send(audio_event) - except asyncio.QueueEmpty: - await asyncio.sleep(0.01) - except asyncio.CancelledError: - break - - context["active"] = False - except asyncio.CancelledError: - pass - - -async def main(duration=180): - """Main function for Gemini Live bidirectional streaming test with camera support.""" - print("Starting Gemini Live bidirectional streaming test with camera...") - print("Audio optimizations: 1024-byte buffers, balanced smooth playback + responsive interruption") - print("Video: Camera frames sent at 1 FPS to model") - - # Get API key from environment variable - api_key = os.getenv("GOOGLE_AI_API_KEY") - - if not api_key: - print("ERROR: GOOGLE_AI_API_KEY environment variable not set") - print("Please set it with: export GOOGLE_AI_API_KEY=your_api_key") - return - - # Initialize Gemini Live model with proper configuration - logger.info("Initializing Gemini Live model with API key") - - # Use default model and config (includes transcription enabled by default) - model = BidiGeminiLiveModel(api_key=api_key) - logger.info("Gemini Live model initialized successfully") - print("Using Gemini Live model with default config (audio output + transcription enabled)") - - agent = BidiAgent(model=model, tools=[calculator], system_prompt="You are a helpful assistant.") - - await agent.start() - - # Create shared context for all tasks - context = { - "active": True, - "audio_in": asyncio.Queue(), - "audio_out": asyncio.Queue(), - "connection": agent._loop, - "duration": duration, - "start_time": time.time(), - "interrupted": False, - "agent": agent, # Add agent reference for camera task - } - - print("Speak into microphone and show things to camera. Press Ctrl+C to exit.") - - try: - # Run all tasks concurrently including camera - await asyncio.gather( - play(context), - record(context), - receive(agent, context), - send(agent, context), - get_frames(context), # Add camera task - return_exceptions=True, - ) - except KeyboardInterrupt: - print("\nInterrupted by user") - except asyncio.CancelledError: - print("\nTest cancelled") - finally: - print("Cleaning up...") - context["active"] = False - await agent.stop() - - -if __name__ == "__main__": - asyncio.run(main())