# CONVERSE with REALTIME API 
In this step, we'll transform our single-turn audio generation into a full conversational system. This involves several key challenges: managing bidirectional audio streams for user input and AI output, implementing turn-taking (knowing when the user has finished speaking and when the AI should respond), handling the flow of events from speech detection to response generation, and maintaining conversation context across multiple exchanges. The trickiest part isn't streaming the audio itself; it's orchestrating all these components so the conversation flows naturally. We'll build this step by step, aiming for a system that can handle real-world conversation scenarios.

# Key Challenges in Building Real-time Conversational AI

Below are the key challenges in building real-time conversational AI:

1. **Session and Context Management**
  - Maintaining conversation history
  - Tracking context across exchanges
  - Managing session state

2. **Turn-Taking Mechanics**
  - Detecting end of user speech
  - Managing interruptions
  - Coordinating input/output transitions
  - Timing audio recording and playback

3. **Audio Stream Handling**
  - Coordinating bidirectional audio
  - Managing stream lifecycles
  - Resolving device conflicts
  - Handling format compatibility

4. **Event Flow Control**
  - Input audio streaming
  - Speech detection
  - Text transcription
  - Response generation
  - Output audio streaming

5. **Error Recovery**
  - Connection drops
  - Audio device issues
  - API rate limits
  - Graceful error handling

6. **Resource Management**
  - WebSocket connection handling
  - Audio buffer memory
  - Resource cleanup
  - Connection lifecycle management
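
The event flow in item 4 can be pictured as a dispatch table that routes each server message to a handler by its `type` field. The sketch below is a minimal illustration and not the approach used in the code later in this notebook: `EventRouter` and its methods are hypothetical names, and `response.audio_transcript.delta` is an event name we assume the API emits (the other event names appear in the code below). The messages are simulated, so no connection is needed.

```python
import json
from typing import Callable, Dict, List


class EventRouter:
    """Routes raw JSON events to handlers keyed by the event's "type" field."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[dict], None]] = {}
        self.unhandled: List[str] = []  # event types nobody registered for

    def on(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type] = handler

    def dispatch(self, raw_message: str) -> bool:
        """Decode one message and call its handler. Returns True if handled."""
        event = json.loads(raw_message)
        handler = self._handlers.get(event.get("type"))
        if handler is None:
            self.unhandled.append(event.get("type"))
            return False
        handler(event)
        return True


# Usage with simulated server messages
log = []
router = EventRouter()
router.on("response.audio.delta", lambda e: log.append("audio chunk"))
router.on("response.done", lambda e: log.append("turn finished"))
router.on("error", lambda e: log.append(f"error: {e.get('error')}"))

for message in (
    '{"type": "response.audio.delta", "delta": "AAAA"}',
    '{"type": "response.audio_transcript.delta", "delta": "Hi"}',  # assumed event name
    '{"type": "response.done"}',
):
    router.dispatch(message)

print(log)
print(router.unhandled)
```

Keeping a list of unhandled event types is a cheap way to discover which events the server actually sends before deciding which ones deserve a handler.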


We will build an MVP, so we choose the minimum set of features that gives us a working setup. The plan below is what we will target.

### Minimum Viable Conversational System - Development Plan

#### STEP 1 **Session and Context Management**
#### STEP 2 **Turn Detection, better VAD and Silence Detection**
#### STEP 3 **Interruption Handling**
#### STEP 4 **Session and Context Management**




### STEP 1 **Session and Context Management**
We will implement the following key features:

- Bidirectional audio streams
- Basic turn management
- Audio buffering
- Session handling
- Basic error recovery

Finally, test by running the script: it listens for 2 seconds, sends the audio to the API, then plays the response.
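
To get a feel for what a 2-second capture means on the wire, the arithmetic below estimates the payload size for 24 kHz, mono, 16-bit PCM, which is the format configured in the code that follows. It is a back-of-the-envelope sketch; the helper name `pcm16_payload_size` is ours and not part of any API.

```python
import base64


def pcm16_payload_size(seconds: float, sample_rate: int = 24000, channels: int = 1):
    """Return (raw_bytes, base64_chars) for PCM16 audio of the given length."""
    raw_bytes = int(seconds * sample_rate) * channels * 2  # 2 bytes per int16 sample
    base64_chars = 4 * ((raw_bytes + 2) // 3)              # 4 chars per 3 bytes, rounded up
    return raw_bytes, base64_chars


raw, encoded = pcm16_payload_size(2.0)
print(raw, encoded)  # 96000 128000

# Cross-check the formula against the real encoder using silent audio
assert len(base64.b64encode(bytes(raw))) == encoded
```

So each 2-second turn is sent as a single message of roughly 128,000 base64 characters, plus the JSON wrapper.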

In [None]:
import asyncio
import os
import base64
import json
from dotenv import load_dotenv
import websockets
import numpy as np
import sounddevice as sd

class ConversationSystem:
    def __init__(self):
        load_dotenv()
        self.input_stream = None
        self.output_stream = None
        self.is_speaking = False
        self.input_buffer = []
        self.api_key = os.getenv("AZURE_OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("AZURE_OPENAI_API_KEY not found in environment")
        self.url = (
            f"wss://aoai-ep-swedencentral02.openai.azure.com/openai/realtime?"
            f"api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview&api-key={self.api_key}"
        )

    async def setup_audio(self):
        print("Setting up audio streams...")
        self.output_stream = sd.OutputStream(samplerate=24000, channels=1, dtype=np.int16)
        self.input_stream = sd.InputStream(samplerate=24000, channels=1, dtype=np.int16,
                                         callback=self.audio_callback)
        self.output_stream.start()
        self.input_stream.start()
        print("Audio streams started")
        
    def audio_callback(self, indata, frames, time, status):
        if status:
            print(f"Input stream error: {status}")
        if not self.is_speaking:
            self.input_buffer.extend(indata.tobytes())

    async def start_conversation(self):
        try:
            async with websockets.connect(self.url) as ws:
                print("Connected to WebSocket")
                await self.setup_session(ws)
                
                # Initial greeting
                await self.send_message(ws, "Hello")
                await self.handle_response(ws)

                while True:
                    print("\nListening... (speak for at least 2 seconds)")
                    self.input_buffer.clear()
                    await asyncio.sleep(2)  # Wait for 2 seconds of audio

                    if len(self.input_buffer) > 0:
                        print("Processing your input...")
                        audio_data = bytes(self.input_buffer)
                        self.input_buffer.clear()
                        
                        # Send audio
                        base64_audio = base64.b64encode(audio_data).decode('utf-8')
                        await ws.send(json.dumps({
                            "type": "input_audio_buffer.append",
                            "audio": base64_audio
                        }))
                        await ws.send(json.dumps({
                            "type": "input_audio_buffer.commit"
                        }))
                        
                        # Request response
                        await ws.send(json.dumps({
                            "type": "response.create",
                            "response": {"modalities": ["audio", "text"]}
                        }))
                        
                        await self.handle_response(ws)

        except Exception as e:
            print(f"Error in conversation: {e}")

    async def setup_session(self, ws):
        session_payload = {
            "type": "session.update",
            "session": {
                "voice": "alloy",
                "instructions": "You are a helpful AI assistant. Keep responses brief and engaging.",
                "modalities": ["audio", "text"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 200
                }
            }
        }
        await ws.send(json.dumps(session_payload))
        
        while True:
            response = await ws.recv()
            data = json.loads(response)
            if data.get("type") == "session.created":
                print("Session setup complete")
                break
            elif data.get("type") == "error":
                raise Exception(f"Error creating session: {data}")

    async def send_message(self, ws, text):
        message_payload = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}]
            }
        }
        await ws.send(json.dumps(message_payload))
        
        await ws.send(json.dumps({
            "type": "response.create",
            "response": {"modalities": ["audio", "text"]}
        }))

    async def handle_response(self, ws):
        self.is_speaking = True
        try:
            while True:
                response = await ws.recv()
                data = json.loads(response)
                
                if data["type"] == "response.audio.delta":
                    if "delta" in data:
                        try:
                            audio_data = data["delta"].replace(" ", "").replace("\n", "")
                            padding = -len(audio_data) % 4  # "=" chars needed to reach a multiple of 4
                            if padding:
                                audio_data += "=" * padding
                            
                            audio_bytes = base64.b64decode(audio_data)
                            audio = np.frombuffer(audio_bytes, dtype=np.int16)
                            self.output_stream.write(audio)
                            print(".", end="", flush=True)
                            
                        except Exception as e:
                            print(f"Error processing audio: {e}")
                            
                elif data["type"] == "response.done":
                    break
                    
        finally:
            self.is_speaking = False

async def main():
    system = ConversationSystem()
    await system.setup_audio()
    await system.start_conversation()

if __name__ == "__main__":
    print("Starting real-time conversation system...")
    asyncio.run(main())

Run the above as a Python script outside of the notebook, because `asyncio.run()` clashes with the event loop that Jupyter already runs. \
[Here is a working script](https://github.com/ozgurgulerx/ai-builders/blob/main/part1_build/p1uc1_realtime_api_converse_step1.py)
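
The clash comes from `asyncio.run()` refusing to start when an event loop is already running in the current thread, which is the case inside a Jupyter kernel. The sketch below shows one way to detect that situation; `loop_is_running` and `demo_main` are hypothetical names, with `demo_main` standing in for the `main()` coroutine above.

```python
import asyncio


def loop_is_running() -> bool:
    """True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def demo_main() -> str:
    # Stand-in for the real main() coroutine
    await asyncio.sleep(0)
    return "conversation finished"


if loop_is_running():
    # Typical for a notebook cell: asyncio.run() would raise RuntimeError here
    print("Event loop already running: use `await demo_main()` in the cell instead")
else:
    # Typical for a plain script
    print(asyncio.run(demo_main()))
```

Run as a script, this takes the second branch. In a notebook cell you would instead write `await main()` directly, though blocking audio writes can still make the kernel feel unresponsive.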



Voila - our voice chatbot works!!! \
It is still a bit clumsy, though, both at detecting turn boundaries and at properly listening to and understanding the user. \
We will improve it in the next steps.
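
The decoding step inside `handle_response` can be isolated into a small helper, which makes the padding rule easy to check: base64 text must have a length that is a multiple of 4, so the number of `=` characters to append is `-len(s) % 4`, not `len(s) % 4`. This is a sketch under the assumption that chunks may arrive without padding. `decode_audio_delta` is a hypothetical name, and it uses `struct` from the standard library where the notebook uses `np.frombuffer`.

```python
import base64
import struct


def decode_audio_delta(delta: str) -> tuple:
    """Decode a base64 PCM16 (little-endian) chunk into a tuple of int16 samples."""
    cleaned = "".join(delta.split())          # drop any whitespace or newlines
    cleaned += "=" * (-len(cleaned) % 4)      # pad up to a multiple of 4
    raw = base64.b64decode(cleaned)
    raw = raw[: len(raw) // 2 * 2]            # ignore a trailing odd byte, if any
    return struct.unpack(f"<{len(raw) // 2}h", raw)


# Round-trip check with four known samples
original = (0, 1000, -1000, 32767)
encoded = base64.b64encode(struct.pack("<4h", *original)).decode("ascii")
unpadded = encoded.rstrip("=")                # simulate a chunk with padding stripped

print(len(encoded), len(unpadded))            # 12 11
print(decode_audio_delta(unpadded))           # (0, 1000, -1000, 32767)
```

With 11 characters, `-11 % 4` is 1, so exactly one `=` is restored; `11 % 4` would have added three and produced an invalid string.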

Implementation Status: Phase 1

#### ✓ Completed
- Audio I/O setup (device configuration, streaming)
- Server-side VAD implementation
- Basic audio state management

#### 🔄 Partially Implemented
- Turn detection (needs refinement)
- Basic interrupt handling
- State tracking

#### ❌ Not Implemented
- Conversation history
- Context preservation
- Session state management

#### Next Steps
Focus on context management system and turn detection improvements for Phase 2.

### STEP 2 **Turn Detection, better VAD and Silence Detection**

#### Voice Activity Detection (VAD)
- Configurable VAD threshold
- Proper silence duration tracking
- Minimum speech frames validation

#### Audio Processing
- Block-based processing (200ms blocks)
- Efficient buffer management
- Real-time audio level monitoring

#### State Management
- Clear speech state tracking
- Better interruption handling
- Dynamic silence detection

Expected outcome: More natural conversation flow with reliable speech detection and turn-taking.

In [None]:
import asyncio
import os
import base64
import json
import numpy as np
import sounddevice as sd
import websockets
from dotenv import load_dotenv

class AudioProcessor:
    def __init__(self, sample_rate=24000):
        self.sample_rate = sample_rate
        self.vad_threshold = 0.015
        self.speech_frames = 0
        self.silence_frames = 0
        self.min_speech_duration = int(0.3 * sample_rate)
        self.max_silence_duration = int(0.8 * sample_rate)
        self.buffer = []
        self.is_speaking = False
        self.speech_detected = False

    def process_audio(self, indata):
        if self.is_speaking:
            return

        audio_level = np.abs(indata).mean() / 32768.0
        
        if audio_level > self.vad_threshold:
            self.speech_detected = True
            self.speech_frames += len(indata)
            self.silence_frames = 0
            self.buffer.extend(indata.tobytes())
        elif self.speech_detected:
            self.silence_frames += len(indata)
            if self.silence_frames < self.max_silence_duration:
                self.buffer.extend(indata.tobytes())

    def should_process(self):
        return (self.speech_detected and 
                self.speech_frames >= self.min_speech_duration and 
                self.silence_frames >= self.max_silence_duration)

    def reset(self):
        self.speech_frames = 0
        self.silence_frames = 0
        self.speech_detected = False
        audio_data = bytes(self.buffer)
        self.buffer.clear()
        return audio_data

class ConversationSystem:
    def __init__(self):
        load_dotenv()
        self.api_key = os.getenv("AZURE_OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("AZURE_OPENAI_API_KEY not found")
            
        self.url = (
            "wss://aoai-ep-swedencentral02.openai.azure.com/openai/realtime?"
            f"api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview&"
            f"api-key={self.api_key}"
        )
        
        self.audio_processor = AudioProcessor()
        self.streams = {'input': None, 'output': None}

    def audio_callback(self, indata, frames, time, status):
        if status:
            print(f"Audio error: {status}")
            return
        self.audio_processor.process_audio(indata)

    async def setup_audio(self):
        self.streams['output'] = sd.OutputStream(
            samplerate=24000, channels=1, dtype=np.int16)
        self.streams['input'] = sd.InputStream(
            samplerate=24000, channels=1, dtype=np.int16,
            callback=self.audio_callback, blocksize=4800)
            
        for stream in self.streams.values():
            stream.start()

    async def setup_websocket_session(self, websocket):
        session_config = {
            "type": "session.update",
            "session": {
                "voice": "alloy",
                "instructions": "You are a helpful AI assistant. Keep responses brief.",
                "modalities": ["audio", "text"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.3,
                    "prefix_padding_ms": 150,
                    "silence_duration_ms": 600
                }
            }
        }
        
        await websocket.send(json.dumps(session_config))
        
        while True:
            response = json.loads(await websocket.recv())
            if response["type"] == "session.created":
                break
            if response["type"] == "error":
                raise Exception(f"Session setup failed: {response}")

    async def send_audio(self, websocket, audio_data):
        audio_base64 = base64.b64encode(audio_data).decode('utf-8')
        
        await websocket.send(json.dumps({
            "type": "input_audio_buffer.append",
            "audio": audio_base64
        }))
        await websocket.send(json.dumps({"type": "input_audio_buffer.commit"}))
        await websocket.send(json.dumps({
            "type": "response.create",
            "response": {"modalities": ["audio", "text"]}
        }))

    async def handle_response(self, websocket):
        self.audio_processor.is_speaking = True
        
        try:
            while True:
                response = json.loads(await websocket.recv())
                
                if response["type"] == "response.audio.delta":
                    if "delta" in response:
                        try:
                            audio_data = response["delta"].strip()
                            padding = -len(audio_data) % 4
                            if padding:
                                audio_data += "=" * padding
                            
                            audio = np.frombuffer(
                                base64.b64decode(audio_data), 
                                dtype=np.int16
                            )
                            self.streams['output'].write(audio)
                            
                        except Exception as e:
                            print(f"Audio processing error: {e}")
                            
                elif response["type"] == "response.done":
                    break
                    
        finally:
            self.audio_processor.is_speaking = False

    async def run(self):
        await self.setup_audio()
        
        async with websockets.connect(self.url) as ws:
            await self.setup_websocket_session(ws)
            print("Ready for conversation")
            
            while True:
                if self.audio_processor.should_process():
                    audio_data = self.audio_processor.reset()
                    await self.send_audio(ws, audio_data)
                    await self.handle_response(ws)
                await asyncio.sleep(0.05)

if __name__ == "__main__":
    system = ConversationSystem()
    asyncio.run(system.run())

### Changes & Expected Effects

#### VAD Parameters
- Changed threshold: 500 -> 0.015 (normalized)
- Effect: More sensitive speech detection
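
For reference, the two thresholds can be compared on the same scale. Assuming the old value of 500 was a mean absolute amplitude in raw int16 units (the text does not say so explicitly), dividing by 32768 gives its normalized equivalent. The sample blocks below are synthetic values chosen for illustration, not measurements, and the helper names are ours.

```python
INT16_FULL_SCALE = 32768.0


def normalize_level(raw_level: float) -> float:
    """Map a raw int16 amplitude (0..32768) to the 0..1 range."""
    return raw_level / INT16_FULL_SCALE


def audio_level(samples) -> float:
    """Mean absolute amplitude of int16 samples, normalized to 0..1."""
    if not samples:
        return 0.0
    return sum(abs(s) for s in samples) / len(samples) / INT16_FULL_SCALE


print(f"old threshold, normalized: {normalize_level(500):.5f}")        # 0.01526
print(f"new threshold, raw units:  {0.015 * INT16_FULL_SCALE:.1f}")    # 491.5

quiet_block = [100, -120, 90, -80]        # synthetic background noise
loud_block = [900, -1100, 1000, -950]     # synthetic speech
print(audio_level(quiet_block) > 0.015)   # False
print(audio_level(loud_block) > 0.015)    # True
```

Under that assumption the numeric change is small (500 versus about 492 in raw units), so most of the practical improvement would come from the other changes listed here rather than from the threshold value itself.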

#### Timing
- Sleep interval: 0.1s -> 0.05s
- Effect: Faster response time

#### Buffer Management
- Added proper reset after processing
- Effect: Prevents audio data buildup

#### State Tracking
- Added speech_frames counter
- Min speech duration: 0.3s
- Max silence: 0.8s
- Effect: Better conversation turn detection
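
Because audio arrives in 200 ms blocks, these durations are effectively rounded up to whole blocks. The sketch below works through that arithmetic for the values used in this step; the constant and function names are ours.

```python
import math

SAMPLE_RATE = 24000      # Hz, as configured for the audio streams
BLOCK_SECONDS = 0.2      # 200 ms blocks (blocksize=4800)


def blocks_needed(duration_seconds: float) -> int:
    """Number of whole blocks required to cover a duration."""
    block_samples = round(SAMPLE_RATE * BLOCK_SECONDS)
    duration_samples = round(SAMPLE_RATE * duration_seconds)
    return math.ceil(duration_samples / block_samples)


print(round(SAMPLE_RATE * BLOCK_SECONDS))  # 4800 samples per block
print(blocks_needed(0.3))                  # 2 blocks of speech
print(blocks_needed(0.8))                  # 4 blocks of silence
```

In practice this means the 0.3 s minimum is only satisfied after two loud blocks (0.4 s of audio), and a turn ends no sooner than four quiet blocks (0.8 s) after the last loud one, plus up to one polling interval of the main loop.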

#### Code Structure
- Split into AudioProcessor class
- Effect: Cleaner maintenance, better testing

[Here is a working script](https://github.com/ozgurgulerx/ai-builders/blob/main/part1_build/p1uc1_realtime_api_converse_step2.py) 

Now the bot is more responsive and can detect the user's speech more accurately. \
The conversation has become more fluid and natural. \
Well done! 

One missing piece is interruption handling: the voice bot keeps talking even when we interrupt it. We will address this in the next step.



### STEP 3 **Interruption Handling**

**Initial Challenge: Beyond Basic Interruption Detection**

The Azure OpenAI SDK documentation reveals that effective interruption handling requires more than just detecting and stopping speech. We must properly manage the full conversation context and message flow to create natural interactions.

**Key Components for Successful Implementation**

**1. Conversation Context Management**
The system needs to actively track and manage conversation state through:
- Unique conversation IDs for each interaction session
- Message IDs for individual exchanges
- Proper association between audio inputs and their conversation context

**2. Interruption Audio Processing**
When interruption occurs, the system must:
- Maintain a buffer of the interrupting speech
- Process this speech as a new conversation input
- Ensure no audio information is lost during the transition

**3. Structured Message Flow**
The conversation must follow a specific structure:
- Create a new conversation message before sending audio
- Associate audio data with the correct message ID
- Maintain proper sequencing of conversation turns
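
One way to read the structured message flow above is to wrap the captured audio in an explicit conversation item that carries a client-generated ID. The sketch below only builds the JSON payloads and sends nothing. It assumes the Realtime API accepts `conversation.item.create` with an `input_audio` content part and an optional client-supplied `id`; both should be checked against the API reference for your API version. `build_audio_turn` is a hypothetical helper.

```python
import base64
import json
import uuid


def build_audio_turn(pcm16_audio: bytes) -> list:
    """Return the JSON messages for one user audio turn, in send order."""
    item_id = f"msg_{uuid.uuid4().hex[:24]}"   # client-side ID (assumed to be allowed)
    create_item = {
        "type": "conversation.item.create",
        "item": {
            "id": item_id,
            "type": "message",
            "role": "user",
            "content": [{
                "type": "input_audio",          # assumption: audio content part name
                "audio": base64.b64encode(pcm16_audio).decode("ascii"),
            }],
        },
    }
    request_response = {
        "type": "response.create",
        "response": {"modalities": ["audio", "text"]},
    }
    return [json.dumps(create_item), json.dumps(request_response)]


messages = build_audio_turn(b"\x00\x00" * 2400)   # 100 ms of silence at 24 kHz
for raw in messages:
    print(json.loads(raw)["type"])
```

Keeping the ID on the client side makes it possible to refer to a specific turn later, for example when logging or when deciding which items to keep in context.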

**Implementation Requirements**

**Technical Infrastructure**
- Audio buffer management system
- Continuous audio level monitoring
- Calibrated threshold values
- Performance-optimized processing

**Conversation Flow Control**
- Immediate response cancellation capability
- Context clearing mechanisms
- New conversation message creation
- Proper message sequencing

**User Experience Considerations**
- Quick interruption detection (200-300ms)
- Minimal false positives
- Smooth topic transitions
- Natural conversation rhythm

**Process Flow During Interruption**

1. Initial Detection
  - Monitor audio levels during system speech
  - Compare against interruption threshold
  - Trigger interruption handling when threshold is exceeded

2. Response Management
  - Cancel current audio output
  - Clear ongoing response generation
  - Preserve interrupting audio

3. Context Transition
  - Clear existing conversation context
  - Create new conversation message
  - Associate preserved audio with new message

4. New Input Processing
  - Process interrupted speech as fresh input
  - Generate appropriate response
  - Maintain conversation fluidity
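
The four stages above can be condensed into an ordered list of client events. The sketch below builds that sequence without opening a connection. `response.cancel`, `input_audio_buffer.append`, `input_audio_buffer.commit` and `response.create` are the events used in this notebook's code; `input_audio_buffer.clear` is included on the assumption that the API supports it for discarding stale input. `build_interruption_events` is a hypothetical name.

```python
import base64
import json


def build_interruption_events(interrupt_audio: bytes) -> list:
    """Ordered client events for cancelling a reply and submitting the interruption."""
    events = [
        {"type": "response.cancel"},              # stage 2: stop the reply in progress
        {"type": "input_audio_buffer.clear"},     # stage 3: drop stale input (assumed event)
    ]
    if interrupt_audio:                           # stage 4: submit the preserved speech
        events += [
            {"type": "input_audio_buffer.append",
             "audio": base64.b64encode(interrupt_audio).decode("ascii")},
            {"type": "input_audio_buffer.commit"},
            {"type": "response.create",
             "response": {"modalities": ["audio", "text"]}},
        ]
    return [json.dumps(e) for e in events]


for raw in build_interruption_events(b"\x01\x00" * 4800):
    print(json.loads(raw)["type"])
```

Stage 1 (detection) happens locally in the audio callback, so it does not appear in the event list. If no interrupting audio was captured, only the cancel and clear events are produced.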

#### Success Criteria

A properly implemented system will:
- Respond immediately to interruptions
- Understand and process interrupted speech correctly
- Maintain natural conversation flow
- Handle topic switches seamlessly
- Provide appropriate responses to new questions

In [None]:
import asyncio
import os
import base64
import json
import numpy as np
import sounddevice as sd
import websockets
from dotenv import load_dotenv

class AudioProcessor:
    """Handles real-time audio processing, speech detection, and interruption management.
    This class processes incoming audio frames, detects speech activity, and manages
    both normal conversation flow and interruptions."""
    
    def __init__(self, sample_rate=24000):
        # Audio configuration parameters
        self.sample_rate = sample_rate
        
        # Speech detection thresholds
        # Normal speech needs to be above this level to be detected
        self.vad_threshold = 0.015
        # Interruptions need to exceed this higher threshold
        self.interrupt_threshold = 0.02
        
        # Duration settings (in audio frames)
        # We need at least this many frames of speech to consider it valid
        self.min_speech_duration = int(0.3 * sample_rate)
        # We allow this many frames of silence before ending speech detection
        self.max_silence_duration = int(0.8 * sample_rate)
        
        # Frame counters for speech detection
        self.speech_frames = 0
        self.silence_frames = 0
        
        # Audio buffers
        self.main_buffer = []         # For regular speech
        self.interrupt_buffer = []    # For speech during interruptions
        
        # State tracking
        self.is_speaking = False      # Is the AI currently speaking?
        self.speech_detected = False  # Have we detected user speech?
        self.is_interrupting = False  # Is an interruption in progress?
        self.latest_audio = None      # Most recent audio frame

    def process_audio(self, indata):
        """Process incoming audio frames for both speech detection and interruption handling."""
        # Keep track of the most recent audio for interruption detection
        self.latest_audio = indata
        audio_level = np.abs(indata).mean() / 32768.0  # Normalize to 0-1 range
        
        # If we're in interruption mode, store audio in interrupt buffer
        if self.is_interrupting:
            self.interrupt_buffer.extend(indata.tobytes())
            return
            
        # Don't process for speech detection if AI is speaking
        if self.is_speaking:
            return

        # Speech detection logic
        if audio_level > self.vad_threshold:
            # Speech detected
            self.speech_detected = True
            self.speech_frames += len(indata)
            self.silence_frames = 0
            self.main_buffer.extend(indata.tobytes())
        elif self.speech_detected:
            # Track silence after speech
            self.silence_frames += len(indata)
            if self.silence_frames < self.max_silence_duration:
                self.main_buffer.extend(indata.tobytes())

    def check_interruption(self):
        """Check if current audio level indicates an interruption while AI is speaking."""
        if not self.is_speaking or self.latest_audio is None:
            return False
            
        audio_level = np.abs(self.latest_audio).mean() / 32768.0
        if audio_level > self.interrupt_threshold:
            self.is_interrupting = True
            return True
        return False

    def should_process(self):
        """Determine if we have enough valid speech to process."""
        return (self.speech_detected and 
                self.speech_frames >= self.min_speech_duration and 
                self.silence_frames >= self.max_silence_duration)

    def get_main_audio(self):
        """Get and clear the main speech buffer."""
        self.speech_frames = 0
        self.silence_frames = 0
        self.speech_detected = False
        audio_data = bytes(self.main_buffer)
        self.main_buffer.clear()
        return audio_data

    def get_interrupt_audio(self):
        """Get and clear the interruption buffer."""
        if not self.interrupt_buffer:
            return None
        audio_data = bytes(self.interrupt_buffer)
        self.interrupt_buffer.clear()
        self.is_interrupting = False
        return audio_data

class ConversationSystem:
    """Manages the overall conversation system, handling websocket communication,
    audio streaming, and coordinating between speech detection and AI responses."""
    
    def __init__(self):
        # Load environment variables and set up API connection
        load_dotenv()
        self.api_key = os.getenv("AZURE_OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("AZURE_OPENAI_API_KEY not found")
            
        self.url = (
            "wss://aoai-ep-swedencentral02.openai.azure.com/openai/realtime?"
            f"api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview&"
            f"api-key={self.api_key}"
        )
        
        # Initialize components
        self.audio_processor = AudioProcessor()
        self.streams = {'input': None, 'output': None}

    def audio_callback(self, indata, frames, time, status):
        """Callback function for audio input stream."""
        if status:
            print(f"Audio error: {status}")
            return
        self.audio_processor.process_audio(indata)

    async def setup_audio(self):
        """Initialize audio input and output streams."""
        self.streams['output'] = sd.OutputStream(
            samplerate=24000, channels=1, dtype=np.int16)
        self.streams['input'] = sd.InputStream(
            samplerate=24000, channels=1, dtype=np.int16,
            callback=self.audio_callback, blocksize=4800)
            
        for stream in self.streams.values():
            stream.start()

    async def setup_websocket_session(self, websocket):
        """Configure the conversation session with the API."""
        session_config = {
            "type": "session.update",
            "session": {
                "voice": "alloy",
                "instructions": "You are a helpful AI assistant. Keep responses brief.",
                "modalities": ["audio", "text"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.3,
                    "prefix_padding_ms": 150,
                    "silence_duration_ms": 600
                }
            }
        }
        
        await websocket.send(json.dumps(session_config))
        
        while True:
            response = json.loads(await websocket.recv())
            if response["type"] == "session.created":
                break
            if response["type"] == "error":
                raise Exception(f"Session setup failed: {response}")

    async def send_audio(self, websocket, audio_data):
        """Send audio data to the API."""
        audio_base64 = base64.b64encode(audio_data).decode('utf-8')
        
        # Send audio data
        await websocket.send(json.dumps({
            "type": "input_audio_buffer.append",
            "audio": audio_base64
        }))
        
        # Mark audio input as complete
        await websocket.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))
        
        # Request response
        await websocket.send(json.dumps({
            "type": "response.create",
            "response": {"modalities": ["audio", "text"]}
        }))

    async def handle_response(self, websocket):
        """Process AI responses and handle interruptions."""
        self.audio_processor.is_speaking = True
        
        try:
            while True:
                # Check for interruption
                if self.audio_processor.check_interruption():
                    print("Interrupted!")
                    # Cancel current response
                    await websocket.send(json.dumps({"type": "response.cancel"}))
                    # Wait briefly for any final speech
                    await asyncio.sleep(0.2)
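                    # Submit the interrupting speech (if any) as a new user turn
                    interrupt_audio = self.audio_processor.get_interrupt_audio()
                    if interrupt_audio:
                        await self.send_audio(websocket, interrupt_audio)
                    # Note: we leave the loop here, so the reply to the interrupting
                    # audio is not read until handle_response() is called again
                    break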
                
                # Process normal response
                response = json.loads(await websocket.recv())
                
                if response["type"] == "response.audio.delta":
                    if "delta" in response:
                        try:
                            # Process audio chunk
                            audio_data = response["delta"].strip()
                            padding = -len(audio_data) % 4
                            if padding:
                                audio_data += "=" * padding
                            
                            audio = np.frombuffer(
                                base64.b64decode(audio_data), 
                                dtype=np.int16
                            )
                            self.streams['output'].write(audio)
                            
                        except Exception as e:
                            print(f"Audio processing error: {e}")
                            
                elif response["type"] == "response.done":
                    break
                    
        finally:
            self.audio_processor.is_speaking = False

    async def run(self):
        """Main conversation loop."""
        await self.setup_audio()
        print("Audio setup complete")
        
        async with websockets.connect(self.url) as ws:
            await self.setup_websocket_session(ws)
            print("Ready for conversation")
            
            while True:
                # Process normal speech
                if self.audio_processor.should_process():
                    audio_data = self.audio_processor.get_main_audio()
                    await self.send_audio(ws, audio_data)
                    await self.handle_response(ws)
                    
                await asyncio.sleep(0.05)

if __name__ == "__main__":
    system = ConversationSystem()
    asyncio.run(system.run())

You can use the following [script](https://github.com/ozgurgulerx/ai-builders/blob/main/part1_build/p1uc1_realtime_api_converse_step3_interruption.py) to run the real-time conversation system...

### STEP 4 **Session and Context Management**