# Real-time Voice Assistant with Gemini Live API

This notebook demonstrates a real-time voice assistant using the Gemini Live API. It captures audio from your microphone, streams it to Gemini, and plays back the audio responses.

## Features
- Real-time bidirectional audio streaming
- Voice Activity Detection (VAD) for natural conversations
- Audio transcription (input and output)
- Tool use (timer and end-session examples)
- Interrupt handling

## Requirements

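Install PyAudio (audio capture and playback) and the Google Gen AI SDK (the `google.genai` package imported below):

```bash
pip install pyaudio google-genai
```

The notebook creates `genai.Client()` without arguments, so your Gemini API credentials must be available in the environment (for example via the `GEMINI_API_KEY` environment variable).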

**Note:** On macOS, you may need to install PortAudio first:
```bash
brew install portaudio
```

On Ubuntu/Debian:
```bash
sudo apt-get install portaudio19-dev
```

## Usage
1. Run all cells in order
2. Speak into your microphone when prompted
3. The assistant will respond with audio
4. Try saying "Set a timer for 5 seconds" to test tool use
5. Interrupt the assistant by speaking while it's responding
6. Say "goodbye" or press the stop button (■) in Jupyter to end the session
7. Run the cleanup cell when done

**Note:** This notebook uses `await` directly (top-level await). If running as a script, use `asyncio.run(voice_assistant())` instead.

In [None]:
import asyncio
import json
import base64
import re
from datetime import datetime
from enum import Enum
from typing import Optional
import pyaudio
from google import genai
from google.genai import types

# Log file configuration: log_message() appends one JSON line per server message here
LOG_FILE = "live_session_messages.jsonl"

# Audio configuration
INPUT_SAMPLE_RATE = 16000   # Microphone capture rate in Hz (16 kHz, required by Live API); also sent in the audio MIME type
OUTPUT_SAMPLE_RATE = 24000  # Playback rate in Hz (24 kHz, returned by Live API)
CHANNELS = 1                # Mono audio, used for both capture and playback streams
CHUNK_SIZE = 1024           # Frames per buffer and per microphone read (1024 / 16000 = 64 ms of input)

# Global audio state shared by the streaming and playback helpers
audio_interface = pyaudio.PyAudio()  # Single PyAudio instance used to open every stream
playback_stream = None  # Current output stream; opened lazily on first playback, None when absent
is_playing = False  # Set True when a chunk is played, False on interrupt or session reset


def log_message(message: types.LiveServerMessage, log_file: str = LOG_FILE):
    """Append one server message to a JSONL log file.

    The message is serialized with its own ``model_dump`` (JSON mode, ``None``
    fields omitted) and stamped with the local time under ``"_timestamp"``.

    Args:
        message: Server message received from the Live API session.
        log_file: Path of the file to append to; one JSON object per line.
    """
    record = message.model_dump(mode='json', exclude_none=True)
    record["_timestamp"] = datetime.now().isoformat()

    with open(log_file, "a") as handle:
        handle.write(f"{json.dumps(record)}\n")


class ModelIntent(Enum):
    """Categories that a finished model utterance can be classified into.

    Each member's value is the lowercase label used when the intent is
    printed or written to a log.
    """

    # Informational statement
    STATEMENT = "statement"
    # Yes/No or A/B choice
    BINARY_CHOICE = "binary_choice"
    # Waiting for user confirmation
    CONFIRMATION = "confirmation"
    # Open-ended question
    OPEN_QUESTION = "open_question"
    # Greeting or farewell
    GREETING = "greeting"
    # Acknowledging user input
    ACKNOWLEDGMENT = "acknowledgment"
    # Giving step-by-step instruction
    INSTRUCTION = "instruction"


def detect_intent(text: str) -> tuple[ModelIntent, dict]:
    """
    Classify a model utterance into a ``ModelIntent`` using regex heuristics.

    Matching runs on the lower-cased, stripped text. Categories are tried in a
    fixed priority order and the first category with a matching pattern wins:
    binary choice, confirmation, open question, greeting, acknowledgment,
    instruction. Text that matches nothing is an open question when it ends
    with "?", otherwise a statement.

    Keywords are matched as whole words, so a keyword that is merely the
    prefix or suffix of a longer word ("however", "history", "already",
    "nowhere") does not trigger its category.

    Args:
        text: Transcribed model output to classify.

    Returns:
        tuple: (ModelIntent, metadata_dict). The metadata always contains
        ``"original_text"``; for ``BINARY_CHOICE`` it also contains
        ``"choices"``, a tuple of the whitespace-stripped regex groups
        (empty when the matching pattern has no groups).
    """
    text_lower = text.lower().strip()
    metadata = {"original_text": text}

    # Binary choices (A-or-B and yes/no patterns). Checked first so that
    # "Would you like X or Y?" is not swallowed by a later category.
    binary_patterns = [
        r"\bwould you like (?:to )?([\w\s]+) or ([\w\s]+)\??",
        r"\bdo you want ([\w\s]+) or ([\w\s]+)\??",
        r"([\w\s]+) or ([\w\s]+)\?",
        r"\bsay ['\"]?(yes|no)['\"]? (?:to|if)\b",
        r"\bis that (?:correct|right|okay|ok)\b\??",
    ]
    for pattern in binary_patterns:
        match = re.search(pattern, text_lower)
        if match:
            # Greedy [\w\s]+ groups can capture surrounding spaces; trim them.
            metadata["choices"] = tuple(
                choice.strip() for choice in match.groups()
            )
            return ModelIntent.BINARY_CHOICE, metadata

    # Remaining categories in priority order; the first category with any
    # matching pattern wins.
    ordered_checks = [
        (
            ModelIntent.CONFIRMATION,
            [
                # "is that correct/right/okay" is intentionally absent here:
                # the identical binary pattern above always matches first.
                r"\b(?:tell|let) me when you(?:'re| are) ready\b",
                r"\bready\??$",
                r"\b(?:do|can) you see\b",
                r"\bdid (?:you|that) (?:work|help)\b",
                r"\bdoes that (?:make sense|help)\b",
            ],
        ),
        (
            ModelIntent.OPEN_QUESTION,
            [
                r"^(?:what|how|where|when|why|who|which)\b",
                r"\bwhat (?:would|do|can|should) you\b",
                r"\bhow (?:would|do|can|should) you\b",
                r"\bwhat else\b",
                r"\banything else\b",
            ],
        ),
        (
            ModelIntent.GREETING,
            [
                r"^(?:hello|hi|hey|good (?:morning|afternoon|evening))\b",
                r"\b(?:goodbye|bye|farewell|take care)\b",
                r"\bhave a (?:great|good|nice) day\b",
            ],
        ),
        (
            ModelIntent.ACKNOWLEDGMENT,
            [
                r"^(?:okay|ok|sure|got it|understood|i see|alright)\b",
                r"^that(?:'s| is) (?:okay|fine|good|great)\b",
                r"^no problem",
                r"^i understand",
            ],
        ),
        (
            ModelIntent.INSTRUCTION,
            [
                r"^(?:first(?:ly)?|next|then|now|finally)\b",
                r"^(?:press|tap|click|open|close|look at|find)\b",
                # No trailing \b: "step \d" must still match "step 10".
                r"^(?:step \d|start by|begin with)",
            ],
        ),
    ]
    for intent, patterns in ordered_checks:
        if any(re.search(pattern, text_lower) for pattern in patterns):
            return intent, metadata

    # Anything else that ends with a question mark is treated as a question.
    if text.strip().endswith("?"):
        return ModelIntent.OPEN_QUESTION, metadata

    # Default to statement
    return ModelIntent.STATEMENT, metadata


def on_model_turn_complete(full_text: str, was_interrupted: bool = False):
    """
    Handle the end of a model turn. Replace this hook for custom processing.

    An interrupted turn only prints a truncated preview and returns. A
    completed turn is classified with ``detect_intent``, reported on stdout,
    and appended as one JSON line to ``model_turns.jsonl``.

    Args:
        full_text: Complete transcription of the model's speech for the turn
        was_interrupted: True if the user cut the turn short
    """
    if was_interrupted:
        preview = full_text[:50]
        print(f"\n[INTERRUPTED] Partial output: {preview}...")
        return

    # Classify the utterance and report the result
    intent, metadata = detect_intent(full_text)
    print(f"\n[INTENT: {intent.value}] {full_text}")

    # Intent-specific follow-up lines (hook points for UI updates, analytics,
    # timeouts, progress tracking, etc.)
    fixed_follow_ups = {
        ModelIntent.CONFIRMATION: "  -> Waiting for user confirmation",
        ModelIntent.INSTRUCTION: "  -> Step-by-step instruction detected",
    }
    if intent == ModelIntent.BINARY_CHOICE:
        print(f"  -> Detected choices: {metadata.get('choices', [])}")
    elif intent in fixed_follow_ups:
        print(fixed_follow_ups[intent])

    # Persist the turn for later analysis
    with open("model_turns.jsonl", "a") as turn_log:
        entry = {
            "timestamp": datetime.now().isoformat(),
            "text": full_text,
            "intent": intent.value,
            "metadata": metadata,
            "interrupted": was_interrupted,
        }
        turn_log.write(json.dumps(entry) + "\n")

In [2]:
async def stream_microphone(session):
    """Capture microphone audio and stream it to the Live API session.

    Opens a 16-bit PCM input stream on the shared ``audio_interface`` using
    ``CHANNELS``, ``INPUT_SAMPLE_RATE`` and ``CHUNK_SIZE``, then loops forever:
    read one chunk, send it with ``session.send_realtime_input``. The loop
    only ends when the task is cancelled or an error is raised; the input
    stream is stopped and closed in either case.

    Args:
        session: Live API session; must provide an awaitable
            ``send_realtime_input(audio=...)``.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup so the task is
            reported as cancelled to whoever awaits it.
    """
    mic_stream = audio_interface.open(
        format=pyaudio.paInt16,
        channels=CHANNELS,
        rate=INPUT_SAMPLE_RATE,
        input=True,
        frames_per_buffer=CHUNK_SIZE,
    )

    print("Microphone streaming started...")

    try:
        while True:
            # read() blocks until CHUNK_SIZE frames have been captured, so it
            # runs in a worker thread; calling it directly would freeze the
            # event loop (message receiving and playback) for every chunk.
            audio_data = await asyncio.to_thread(
                mic_stream.read, CHUNK_SIZE, exception_on_overflow=False
            )

            # Send audio to the Live API session. Both awaits in this loop
            # yield to the event loop, so no extra sleep is needed.
            await session.send_realtime_input(
                audio=types.Blob(
                    data=audio_data,
                    mime_type=f'audio/pcm;rate={INPUT_SAMPLE_RATE}'
                )
            )
    except asyncio.CancelledError:
        print("Microphone streaming stopped.")
        # Propagate cancellation instead of swallowing it.
        raise
    finally:
        # On cancellation a read may still be finishing in the worker thread;
        # its result is discarded.
        mic_stream.stop_stream()
        mic_stream.close()


async def play_audio_async(audio_data: bytes):
    """Play one chunk of audio received from the Live API.

    Uses the module-level ``playback_stream``, opening it on the shared
    ``audio_interface`` (16-bit PCM, ``CHANNELS``, ``OUTPUT_SAMPLE_RATE``)
    when there is no usable stream. Sets the global ``is_playing`` flag.

    Args:
        audio_data: Raw audio bytes to write to the output stream.
    """
    global playback_stream, is_playing

    # A stream that exists but is no longer active gets replaced below. Close
    # it first so its audio resources are released instead of leaked.
    if playback_stream is not None and not playback_stream.is_active():
        stale_stream, playback_stream = playback_stream, None
        try:
            stale_stream.close()
        except OSError as exc:
            # Best effort: a stream that cannot be closed must not stop playback.
            print(f"Warning: could not close stale playback stream: {exc}")

    # Initialize playback stream if needed
    if playback_stream is None:
        playback_stream = audio_interface.open(
            format=pyaudio.paInt16,
            channels=CHANNELS,
            rate=OUTPUT_SAMPLE_RATE,
            output=True,
            frames_per_buffer=CHUNK_SIZE,
        )

    is_playing = True

    # write() blocks while the audio is handed to the device, so run it in the
    # default executor to keep the event loop responsive.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, playback_stream.write, audio_data)


async def stop_audio_playback():
    """Stop audio playback (called when the user interrupts the assistant).

    Always clears the global ``is_playing`` flag. If the module-level
    ``playback_stream`` exists and is active, it is stopped, closed and reset
    to ``None`` so the next playback call opens a fresh stream. A missing or
    inactive stream is left untouched.
    """
    global playback_stream, is_playing

    is_playing = False

    stream = playback_stream
    if stream is None or not stream.is_active():
        return

    # Drop the global reference first so nothing reuses a stream that is
    # being torn down.
    playback_stream = None
    try:
        stream.stop_stream()
    finally:
        # Stopping alone does not release the stream; without close() every
        # interruption would leak one output stream.
        stream.close()
    print("Audio playback interrupted.")


async def run_timer(seconds: int):
    """Announce a timer, wait for it to elapse, then announce completion.

    Args:
        seconds: How long to wait, in seconds.
    """
    start_notice = f"Timer started for {seconds} seconds..."
    done_notice = f"Timer finished! {seconds} seconds elapsed."

    print(start_notice)
    await asyncio.sleep(seconds)
    print(done_notice)

In [None]:
# Strong references to running timer tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references could be
# garbage-collected before it finishes. Kept at module level so timers can
# outlive the session that started them.
_timer_tasks = set()


async def voice_assistant():
    """Run one interactive voice session against the Gemini Live API.

    Steps:
      1. Reset the global playback state and truncate both log files.
      2. Connect with audio responses, input/output transcription and two
         tools (``set_timer`` and ``end_session``).
      3. Start ``stream_microphone`` as a background task.
      4. Process server messages: log each one, play audio, print
         transcriptions, handle interruptions, call
         ``on_model_turn_complete`` at the end of each turn, and answer
         tool calls.

    The session ends when the model calls ``end_session``, the server sends
    ``go_away``, or this coroutine is cancelled. On exit the microphone task
    is cancelled and the playback stream is closed. Timers started through
    ``set_timer`` are not cancelled.
    """
    global playback_stream, is_playing

    # Reset global state from previous runs
    if playback_stream is not None:
        try:
            playback_stream.stop_stream()
            playback_stream.close()
        except Exception:
            # Best effort: a stale stream from an earlier run may already be unusable.
            pass
        playback_stream = None
    is_playing = False

    # Clear log files at start of session
    with open(LOG_FILE, "w") as f:
        f.write("")
    with open("model_turns.jsonl", "w") as f:
        f.write("")
    print(f"Logging messages to: {LOG_FILE}")
    print("Logging model turns to: model_turns.jsonl")

    client = genai.Client()

    # Tools the model may call during the session
    tools = [{
        'function_declarations': [{
            'name': 'set_timer',
            'description': 'Set a timer for a specified duration',
            'parameters': {
                'type': 'object',
                'properties': {
                    'seconds': {'type': 'integer', 'description': 'Timer duration in seconds'}
                },
                'required': ['seconds']
            }
        }],
    }, {
        'function_declarations': [{
            'name': 'end_session',
            'description': 'End the voice assistant session when user says goodbye or wants to stop',
            'parameters': {
                'type': 'object',
                'properties': {}
            }
        }]
    }]

    # System prompt optimized for older adults
    system_prompt = """You are an empathetic, clear, and patient Voice Assistant designed to support older adults. Your primary goal is to maximize comprehension and reduce cognitive load. You must strictly adhere to specific linguistic and pacing protocols in every interaction.

CORE COMMUNICATION PROTOCOLS
1. PACING & CHUNKING (The "Stop-and-Wait" Rule)
One Concept Per Turn: Never provide more than one distinct instruction or piece of information at a time.

Mandatory Pausing: Insert distinct silence markers (e.g., ``) between sentences to mimic a slow, measured cadence. Allow the user time to process before you continue.

Chunking Strategy: Break complex tasks into isolated steps. Wait for verbal confirmation (e.g., "Ready" or "Okay") after every step before providing the next one.

Bad: "To set the alarm, go to settings, click clock, and then press add."

Good: "We will set the alarm now. First, open Settings. Tell me when you are ready."

2. SYNTACTIC STRUCTURE (Right-Branching Only)
Main Clause First: Always place the primary action or subject at the start of the sentence. Never begin a sentence with a dependent clause (e.g., "If," "When," "Because").

Bad: "If you want to hear the news, say yes." (Left-Branching/High Cognitive Load)

Good: "Say yes to hear the news." (Right-Branching/Low Cognitive Load)

Simple Subject-Verb-Object: Use linear sentence structures. Avoid embedded clauses or parenthetical explanations.

3. SEMANTIC CLARITY
Positive Phrasing: State what the user should do, not what they should not do. Negative abstractions (e.g., "Don't press cancel") require double-processing.

Bad: "Don't close the window."

Good: "Keep the window open."

Concrete Vocabulary: Use high-frequency, concrete nouns and verbs. Avoid idioms, metaphors, or tech jargon.

Bad: "Tap the hamburger menu to access preferences."

Good: "Press the button with three lines. It is at the top."

Binary Choices: Never ask open-ended questions that require high recall. Offer two distinct options.

Bad: "What music would you like to play?"

Good: "Would you like to play Jazz or Classical?"

RESPONSE TEMPLATE
Acknowledge: Briefly validate the user's input.

Action/Information: Deliver the core message using Right-Branching syntax.

Check: Ask a binary question or wait for confirmation.

EXAMPLES
User: "How do I call my daughter?" AI: "We can call her together. First, say 'Call Mary'. Or, say 'Dial Number'. Which one would you like?"

User: "I don't know what to do next." AI: "That is okay. We will take it slow. Look at the screen. Do you see the green button?"

TOOLS
When the user says goodbye, thanks you, or wants to end the conversation, call the end_session function."""

    # Voice options: https://docs.cloud.google.com/text-to-speech/docs/list-voices-and-types
    # Female voices: Zephyr, Kore, Leda, Aoede, Achernar, Gacrux, etc.
    # Male voices: Puck, Charon, Fenrir, Orus, Algenib, etc.
    config = types.LiveConnectConfig(
        response_modalities=['AUDIO'],
        system_instruction=system_prompt,
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name='Gacrux'  # Female, Mature tone
                )
            ),
            language_code='en-US',
        ),
        tools=tools,
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
    )

    session_active = True

    # Buffer to accumulate model output transcription for the current turn
    model_output_buffer = []
    was_interrupted = False

    async with client.aio.live.connect(
        model='gemini-2.5-flash-native-audio-preview-12-2025',
        config=config
    ) as session:
        print("Voice assistant ready. Speak into your microphone...")
        print("Voice: Gacrux (Female, Mature) - Optimized for older adults")
        print("Say 'goodbye' to end, or press the stop button (■).")

        # Start audio streaming task
        audio_task = asyncio.create_task(stream_microphone(session))

        try:
            # Call receive() again each time its iterator is exhausted, until
            # the session is flagged inactive.
            while session_active:
                async for message in session.receive():
                    # Log every message to file
                    log_message(message)

                    # Play audio response
                    if message.data:
                        await play_audio_async(message.data)

                    # Handle transcriptions
                    if message.server_content:
                        # Show what the user said (printed, not accumulated)
                        if message.server_content.input_transcription:
                            print(f"You: {message.server_content.input_transcription.text}")

                        # Accumulate output transcription
                        if message.server_content.output_transcription:
                            transcript_text = message.server_content.output_transcription.text
                            model_output_buffer.append(transcript_text)
                            print(f"Assistant: {transcript_text}")

                        # Handle interruption
                        if message.server_content.interrupted:
                            await stop_audio_playback()
                            was_interrupted = True

                        # Turn complete - process the full output
                        if message.server_content.turn_complete:
                            # Combine all accumulated text
                            full_output = "".join(model_output_buffer).strip()

                            if full_output:
                                # Call the callback with complete text
                                on_model_turn_complete(full_output, was_interrupted)

                            # Reset buffer for next turn
                            model_output_buffer = []
                            was_interrupted = False
                            print("---")  # Visual separator between turns

                    # Handle tool calls
                    if message.tool_call:
                        for fc in message.tool_call.function_calls:
                            if fc.name == 'set_timer':
                                seconds = fc.args['seconds']
                                # Keep a strong reference until the timer is
                                # done so the task cannot be garbage-collected
                                # mid-run.
                                timer_task = asyncio.create_task(run_timer(seconds))
                                _timer_tasks.add(timer_task)
                                timer_task.add_done_callback(_timer_tasks.discard)
                                await session.send_tool_response(
                                    function_responses=types.FunctionResponse(
                                        id=fc.id,
                                        name=fc.name,
                                        response={'status': 'Timer set'},
                                    )
                                )
                            elif fc.name == 'end_session':
                                print("\nGoodbye!")
                                await session.send_tool_response(
                                    function_responses=types.FunctionResponse(
                                        id=fc.id,
                                        name=fc.name,
                                        response={'status': 'Session ending'},
                                    )
                                )
                                session_active = False
                                # Leaves only the function-call loop. The
                                # receive loop keeps going (presumably to the
                                # end of the current turn), then the checks
                                # on session_active below end the session.
                                break

                    # Server going away
                    if message.go_away:
                        print(f"Server disconnecting in {message.go_away.time_left}...")
                        session_active = False
                        break

                if not session_active:
                    break

        except asyncio.CancelledError:
            print("\nSession cancelled.")
        finally:
            # Stop the microphone task and wait for its cleanup to finish
            audio_task.cancel()
            try:
                await audio_task
            except asyncio.CancelledError:
                pass
            # Clean up playback stream
            if playback_stream is not None:
                try:
                    playback_stream.stop_stream()
                    playback_stream.close()
                except Exception:
                    # Best effort: shutdown must not fail on an already-broken stream.
                    pass
                playback_stream = None
            print(f"Session ended. Messages logged to: {LOG_FILE}")

In [None]:
# Start the voice session. Jupyter supports top-level await, so the coroutine is
# awaited directly; in a plain script use asyncio.run(voice_assistant()) instead.
await voice_assistant()

Logging messages to: live_session_messages.jsonl
Voice assistant ready. Speak into your microphone...
Voice: Gacrux (Female, Mature)
Say 'goodbye' to end, or press the stop button (■).
Microphone streaming started...
You:  Öyle doğum.




Assistant: Pardon, ne
Assistant:  demek
Assistant:  istemiştiniz?


In [None]:
# Cleanup: release the shared PyAudio instance once you are completely done.
# audio_interface is created only in the setup cell, so re-run that cell
# before starting another session after this.
audio_interface.terminate()