# Creating Effective User Agents with Azure OpenAI’s Real-Time API 
Developing a responsive, intelligent user agent using Azure OpenAI’s real-time capabilities involves careful attention to voice interactions, context retention, performance, and robust design. 

Key challenges include:

## 1. Voice Interruption Handling

Our goal is to handle user barge-ins gracefully, so the AI stops talking and immediately refocuses on the new user input. By default, the Azure OpenAI Real-Time API offers built-in Voice Activity Detection (VAD), which you can tune for your environment. A higher VAD threshold requires louder audio to trigger, so raise it in noisy settings to avoid false triggers, and lower it if the agent misses quieter speakers. For extreme conditions or custom scenarios, you can disable built-in VAD and implement your own, manually stopping TTS playback whenever you detect a genuine interruption.
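As a rough sketch of what this tuning can look like, the helper below builds a `session.update` payload using the same `turn_detection` fields that the workshop script sends later in this notebook. The function name `build_vad_session_update` and the three presets are illustrative assumptions, not recommended values; measure against audio from your own environment.

```python
import json

# Illustrative presets only (assumed values, not official recommendations).
VAD_PRESETS = {
    "quiet": {"threshold": 0.4, "prefix_padding_ms": 300, "silence_duration_ms": 300},
    "default": {"threshold": 0.5, "prefix_padding_ms": 300, "silence_duration_ms": 200},
    "noisy": {"threshold": 0.7, "prefix_padding_ms": 300, "silence_duration_ms": 500},
}

def build_vad_session_update(environment="default"):
    """Return a session.update event (as a dict) carrying server-side VAD settings."""
    if environment not in VAD_PRESETS:
        raise ValueError(f"unknown environment: {environment!r}")
    turn_detection = {"type": "server_vad", **VAD_PRESETS[environment]}
    return {"type": "session.update", "session": {"turn_detection": turn_detection}}

payload = build_vad_session_update("noisy")
print(json.dumps(payload, indent=2))
```

The resulting dict can be serialized with `json.dumps` and sent over the open WebSocket in the same way as the session payload in the workshop script.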

Once an interruption occurs, be sure to truncate the unfinished response on the server side, so the AI doesn’t keep referencing text the user never heard. This keeps the conversation strictly aligned with what was actually said. A helpful touch is to acknowledge the interruption with a brief apology or confirmation—users will find the experience more natural. If barge-ins happen a lot, add a small “debounce” window so quick background sounds don’t falsely trigger interruptions.
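One way to sketch the truncation step, assuming the Realtime API's `conversation.item.truncate` client event: the client tracks how many PCM bytes it has actually played and converts that into milliseconds. The helper names and the `item_abc123` id below are hypothetical; in practice the item id comes from the server events for the assistant's response, and the byte counter is something your playback code has to maintain.

```python
SAMPLE_RATE = 24000    # Hz, matching the pcm16 audio used in this workshop
BYTES_PER_SAMPLE = 2   # 16-bit mono PCM

def played_ms(bytes_played):
    """Convert a count of played PCM bytes into whole milliseconds of audio."""
    samples = bytes_played // BYTES_PER_SAMPLE
    return samples * 1000 // SAMPLE_RATE

def build_truncate_event(item_id, bytes_played, content_index=0):
    """Build an event that trims the assistant item to what the user actually heard."""
    return {
        "type": "conversation.item.truncate",
        "item_id": item_id,
        "content_index": content_index,
        "audio_end_ms": played_ms(bytes_played),
    }

# Hypothetical item id; 36,000 bytes of 24 kHz pcm16 is 0.75 s of audio.
event = build_truncate_event("item_abc123", bytes_played=36000)
print(event)
```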

### Automatic VAD (Voice Activity Detection)
- Use built-in VAD from Azure OpenAI Real-Time to detect interruptions.
- Adjust VAD sensitivity (threshold, padding) to reduce false positives in noisy settings.
- Truncation for Synchronization: When the user interrupts mid-response, call the API to truncate the remainder of the AI’s reply. This ensures the agent doesn’t assume it said something the user never heard, keeping the conversation “in sync.”

### Optimization Tips
There is usually more optimization to do, depending on your environment:
- Fine-Tune VAD: Adjust thresholds to match your environment (e.g. set higher for busy call centers).
- Detect Early: Implement a short “debounce” so minor noises don’t trigger interruption, but user speech does.
- Acknowledge Interruptions: Provide a quick apology or relevant acknowledgment to make it feel more natural.
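The debounce idea in the tips above can be sketched as a small counter: an interruption only counts once loud audio has persisted for a minimum duration. The class name `InterruptDebouncer`, the 150 ms window, and the 0.02 level threshold are assumptions chosen for illustration.

```python
class InterruptDebouncer:
    """Report an interruption only after sustained loud audio."""

    def __init__(self, threshold=0.02, min_duration_ms=150, sample_rate=24000):
        self.threshold = threshold
        self.min_samples = sample_rate * min_duration_ms // 1000
        self.loud_samples = 0

    def update(self, level, num_samples):
        """Feed one audio frame; return True once speech has lasted long enough."""
        if level > self.threshold:
            self.loud_samples += num_samples
        else:
            self.loud_samples = 0  # any quiet frame restarts the window
        return self.loud_samples >= self.min_samples

debouncer = InterruptDebouncer()
# Three consecutive 50 ms frames (1,200 samples each) of loud audio
results = [debouncer.update(level=0.05, num_samples=1200) for _ in range(3)]
print(results)
```

A short click or door slam covers only one or two frames and is discarded, while continuous speech crosses the window on the third frame.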


## 2. Context Management

We want our AI to feel like it genuinely remembers what’s been said, even across multiple turns. A good approach is to maintain a rolling conversation history by storing recent messages—both user and assistant—and bundling them into each API call. If the session drags on, you can summarize older content to keep token usage manageable.

At the start of any session, it helps to present a system prompt describing the assistant’s role, tone, and any user-specific details. This keeps the AI focused and aligned with your goals. For extended or returning conversations, stash key data points in an external store—like a database or vector index—to fetch them on demand, especially if the user references something from a previous day.
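To make the external-store idea concrete, here is a minimal sketch that persists per-user facts in SQLite (from the Python standard library) and folds them into the system prompt at session start. The `FactStore` class, its table layout, and `build_instructions` are hypothetical names for illustration; a production setup might use a managed database or a vector index instead.

```python
import sqlite3

class FactStore:
    """Tiny key/value memory per user, backed by SQLite."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS facts ("
            "user_id TEXT, key TEXT, value TEXT, PRIMARY KEY (user_id, key))"
        )

    def remember(self, user_id, key, value):
        # INSERT OR REPLACE overwrites an older value for the same user and key
        self.db.execute("INSERT OR REPLACE INTO facts VALUES (?, ?, ?)", (user_id, key, value))
        self.db.commit()

    def recall(self, user_id):
        rows = self.db.execute(
            "SELECT key, value FROM facts WHERE user_id = ? ORDER BY key", (user_id,)
        )
        return dict(rows.fetchall())

def build_instructions(base_prompt, facts):
    """Append stored user details to the base system prompt."""
    if not facts:
        return base_prompt
    known = "; ".join(f"{key} = {value}" for key, value in facts.items())
    return f"{base_prompt}\nKnown about this user: {known}"

store = FactStore()
store.remember("user-1", "name", "Alice")
store.remember("user-1", "city", "Oslo")
instructions = build_instructions("Keep responses brief.", store.recall("user-1"))
print(instructions)
```

The returned string could then be used as the `instructions` value in the session configuration.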

To optimize context usage, consider summarizing older exchanges so the assistant doesn’t forget important details. Always label who is speaking (e.g. “User,” “Assistant”) so the model can track conversation flow. And if the user abruptly pivots to a completely new topic, feel free to reset the context or at least partially clear it to avoid confusion.
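The rolling-history approach described in this section might look like the sketch below. It keeps the most recent turns verbatim with speaker labels, folds older turns into a short digest, and offers a reset for topic changes. The `RollingContext` class is hypothetical, and the digest here is just a truncated copy of the old text; a real system would more likely ask the model to write the summary.

```python
from collections import deque

class RollingContext:
    """Keeps the last few turns verbatim and a crude digest of older ones."""

    def __init__(self, max_turns=6):
        self.max_turns = max_turns
        self.turns = deque()
        self.older_notes = []

    def add(self, role, text):
        self.turns.append((role, text))
        while len(self.turns) > self.max_turns:
            old_role, old_text = self.turns.popleft()
            # Placeholder for a real summary (for example, one produced by the model)
            self.older_notes.append(f"{old_role} said: {old_text[:60]}")

    def reset(self):
        """Clear everything, e.g. when the user pivots to a new topic."""
        self.turns.clear()
        self.older_notes.clear()

    def render(self):
        lines = []
        if self.older_notes:
            lines.append("Earlier in this conversation: " + "; ".join(self.older_notes))
        lines.extend(f"{role}: {text}" for role, text in self.turns)
        return "\n".join(lines)

ctx = RollingContext(max_turns=2)
ctx.add("User", "My order number is 1234.")
ctx.add("Assistant", "Thanks, I found it.")
ctx.add("User", "When will it ship?")
print(ctx.render())
```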

## 3. Latency Optimization

We want the conversation to feel immediate and responsive, so adopting a streaming architecture is key. Let Azure OpenAI process user audio in real time and start sending back partial responses as soon as they’re generated—no waiting for a full transcription. To keep it efficient, maintain a persistent WebSocket so you don’t have to reconnect on every turn, and consider pooling connections if you need to handle many sessions in parallel.

Whenever possible, go for an end-to-end pipeline by using Real-Time audio input/output directly, avoiding separate STT/TTS services that can add latency. Hosting your service in the same Azure region as your OpenAI resource also helps keep roundtrip times low.

For further optimization, reduce the number of layers in your architecture—fewer services in between means quicker responses—and send smaller audio chunks more frequently to maintain that “streaming” feel. Finally, don’t forget to load test your setup under realistic conditions so you can spot bottlenecks and fine-tune performance before going live.
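As an illustration of sending smaller audio chunks, the sketch below splits raw pcm16 audio into roughly 100 ms pieces and wraps each one in an `input_audio_buffer.append` event, the same event type the workshop script uses. The helper names and the 100 ms default are assumptions; the best chunk size depends on your network and should be confirmed by load testing.

```python
import base64

def chunk_pcm(pcm_bytes, chunk_ms=100, sample_rate=24000, bytes_per_sample=2):
    """Yield consecutive slices of raw PCM, each about chunk_ms long."""
    chunk_size = sample_rate * chunk_ms // 1000 * bytes_per_sample
    for start in range(0, len(pcm_bytes), chunk_size):
        yield pcm_bytes[start:start + chunk_size]

def build_append_events(pcm_bytes, chunk_ms=100):
    """Wrap each chunk in an input_audio_buffer.append event (Base64 payload)."""
    return [
        {
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(chunk).decode("ascii"),
        }
        for chunk in chunk_pcm(pcm_bytes, chunk_ms)
    ]

one_second_of_silence = bytes(48000)  # 24,000 samples x 2 bytes each
events = build_append_events(one_second_of_silence)
print(len(events), "events")
```

Each event would be sent as soon as its chunk is captured, rather than after a full utterance has been buffered.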

## 4. Speaker Identification & Differentiation

Although we will not use diarization in this workshop, it is an important concept to mention when building voice UIs.
When multiple people are talking, it’s easy for your AI to lose track of who said what. Speaker diarization—labeling each voice as “Speaker 1,” “Speaker 2,” and so on—keeps everything straight, especially if you feed these labels back into the conversation context. If you need real identities, you can enroll voice profiles (until that feature is retired), but simple on-the-fly diarization often works fine.

In your prompts, clearly denote each speaker (like “Alice: … Bob: …”) so the model can address or reference them accurately. For instance, if “Alice” asked a question, the AI knows to respond directly to her. If your application can eventually match “Speaker 1” to Alice, rename labels for clarity. And if folks talk over each other, it’s wise to pause or ask for clarification. Finally, consider multi-user memory—persist speaker-specific context or preferences so the AI can recall each person’s details even across multiple sessions.
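The labeling convention can be sketched as a small formatter. It assumes some upstream diarization step has already produced `(speaker_id, text)` pairs; nothing here performs diarization, and the function name and the `known_names` mapping are hypothetical.

```python
def format_transcript(segments, known_names=None):
    """Render diarized segments as 'Label: text' lines for the model's context.

    segments: iterable of (speaker_id, text) pairs from a diarization step.
    known_names: optional mapping from speaker ids to real names.
    """
    known_names = known_names or {}
    lines = []
    for speaker_id, text in segments:
        label = known_names.get(speaker_id, speaker_id)
        lines.append(f"{label}: {text.strip()}")
    return "\n".join(lines)

segments = [
    ("Speaker 1", "Can we move the meeting to Friday?"),
    ("Speaker 2", " Friday works for me. "),
]
# Once the application learns that Speaker 1 is Alice, the label is swapped in.
transcript = format_transcript(segments, known_names={"Speaker 1": "Alice"})
print(transcript)
```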




# TWO-WAY AUDIO CONVERSATION v1.0

Now review & run the script part1_realtime_api_advanced_converse_step1.py in the repo. 

This code sets up a two-way audio conversation with Azure OpenAI’s Real-Time API. You speak into your microphone; the audio is sent to Azure in near-real-time. The API processes that audio and returns both text and audio replies, which the script then plays back through your speakers.

In essence: It’s a basic, end-to-end voice assistant loop—your microphone feeds Azure GPT-4o Real-Time; the model replies with generated speech you can hear.

This is what our code does:
- **Audio Setup**: It sets up an input stream for your microphone and an output stream for your speakers (both at 24 kHz). Any audio captured gets temporarily buffered, unless the script is busy playing back the AI’s voice, in which case it ignores incoming sound to avoid echo/feedback.
- **Session Configuration**: Upon connecting, it sends a “session.update” message specifying things like the voice to use (e.g. “alloy”), VAD thresholds, and the instruction text (“You are a helpful AI assistant…”).
- **Conversation Loop**: First it sends a text greeting (“Hello”) to get a response from the AI. Then it repeatedly waits two seconds, grabs whatever audio you spoke into the mic, and sends that audio (Base64-encoded) to the Real-Time API. It requests a reply in both text and audio form, then streams back the AI’s spoken response in real time.
- **Playback & Barge-In**: For each chunk of audio data received from the API, it decodes and plays it over your speakers. There’s a simple voice activity detection (VAD) setup, but it doesn’t attempt to handle mid-response interruptions; it only detects silence to know when you’re done speaking.
- **Ending Conditions**: The loop continues indefinitely unless something goes wrong (like a network error) or you terminate the script. Each full cycle of sending audio and receiving AI output counts as a single turn in the conversation.

Overall, it’s a straightforward real-time audio pipeline: (1) capture mic audio → (2) send to Azure → (3) get response audio → (4) play it back. It’s a practical demo of how to create a voice-first GPT-style assistant with Azure’s Real-Time API.

In [None]:
import asyncio
import os
import base64
import json
from dotenv import load_dotenv
import websockets
import numpy as np
import sounddevice as sd

class ConversationSystem:
    def __init__(self):
        # Load environment variables (including AZURE_OPENAI_API_KEY)
        load_dotenv()
        
        # Initialize audio streams and flags
        self.input_stream = None
        self.output_stream = None
        self.is_speaking = False
        self.input_buffer = []
        
        # Retrieve your Azure OpenAI key from environment
        self.api_key = os.getenv("AZURE_OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("AZURE_OPENAI_API_KEY not found in environment")
        
        # Construct the WebSocket URL for Azure OpenAI Real-Time
        self.url = (
            f"wss://aoai-ep-swedencentral02.openai.azure.com/openai/realtime?"
            f"api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview&api-key={self.api_key}"
        )

    async def setup_audio(self):
        """
        Sets up audio input (microphone) and output (speakers) streams at 24kHz,
        using the 'sounddevice' library. 
        """
        print("Setting up audio streams...")
        
        # Output stream: for playing back AI responses
        self.output_stream = sd.OutputStream(samplerate=24000, channels=1, dtype=np.int16)
        
        # Input stream: captures user speech via microphone
        self.input_stream = sd.InputStream(
            samplerate=24000, 
            channels=1, 
            dtype=np.int16,
            callback=self.audio_callback
        )
        
        # Start both streams
        self.output_stream.start()
        self.input_stream.start()
        print("Audio streams started")
        
    def audio_callback(self, indata, frames, time, status):
        """
        Called whenever there's new audio data from the mic. 
        If we're not currently playing back the AI's response, 
        we buffer this audio for sending to Azure.
        """
        if status:
            print(f"Input stream error: {status}")
        # Only record mic input if we're not playing AI audio
        if not self.is_speaking:
            self.input_buffer.extend(indata.tobytes())

    async def start_conversation(self):
        """
        Main loop to manage WebSocket connection and audio round-trips:
        - Connect to Azure Real-Time
        - Send an initial text message
        - Continuously record mic audio (2-second chunks) and send to Azure
        - Request the AI's response and play it back
        """
        try:
            async with websockets.connect(self.url) as ws:
                print("Connected to WebSocket")
                
                # Configure session with voice, instructions, VAD, etc.
                await self.setup_session(ws)
                
                # Send a simple greeting as initial text
                await self.send_message(ws, "Hello")
                await self.handle_response(ws)

                # Repeatedly capture short audio segments, then request a reply
                while True:
                    print("\nListening... (speak for at least 2 seconds)")
                    self.input_buffer.clear()
                    
                    # Wait 2 seconds to accumulate mic input
                    await asyncio.sleep(2)

                    # If there's any recorded audio, send it to the Real-Time API
                    if len(self.input_buffer) > 0:
                        print("Processing your input...")
                        audio_data = bytes(self.input_buffer)
                        self.input_buffer.clear()
                        
                        # Convert audio to Base64
                        base64_audio = base64.b64encode(audio_data).decode('utf-8')
                        
                        # Append new audio data to the server-side audio buffer
                        await ws.send(json.dumps({
                            "type": "input_audio_buffer.append",
                            "audio": base64_audio
                        }))
                        
                        # Signal that we've finished sending this chunk of audio
                        await ws.send(json.dumps({
                            "type": "input_audio_buffer.commit"
                        }))
                        
                        # Request both audio and text back from Azure
                        await ws.send(json.dumps({
                            "type": "response.create",
                            "response": {"modalities": ["audio", "text"]}
                        }))
                        
                        # Handle AI's response (streaming audio)
                        await self.handle_response(ws)

        except Exception as e:
            print(f"Error in conversation: {e}")

    async def setup_session(self, ws):
        """
        Sends a session update payload to configure voice, instructions, 
        and VAD settings, then waits for 'session.updated' to confirm.
        ('session.created' is sent by the server when the connection opens,
        so it does not confirm that the update was applied.)
        """
        session_payload = {
            "type": "session.update",
            "session": {
                "voice": "alloy",
                "instructions": "You are a helpful AI assistant. Keep responses brief and engaging.",
                "modalities": ["audio", "text"],
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 200
                }
            }
        }
        await ws.send(json.dumps(session_payload))
        
        # Wait for confirmation or error; other events (such as the initial
        # 'session.created') are skipped
        while True:
            response = await ws.recv()
            data = json.loads(response)
            if data.get("type") == "session.updated":
                print("Session setup complete")
                break
            elif data.get("type") == "error":
                raise Exception(f"Error configuring session: {data.get('error')}")

    async def send_message(self, ws, text):
        """
        Sends a user message in text form, then requests an audio+text response.
        """
        message_payload = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}]
            }
        }
        # Post the user message
        await ws.send(json.dumps(message_payload))
        
        # Ask Azure to generate a response in audio + text
        await ws.send(json.dumps({
            "type": "response.create",
            "response": {"modalities": ["audio", "text"]}
        }))

    async def handle_response(self, ws):
        """
        Reads streamed audio chunks from Azure (Base64-encoded deltas),
        decodes them, and plays them out in real time.
        """
        self.is_speaking = True
        try:
            while True:
                response = await ws.recv()
                data = json.loads(response)
                
                # Each 'response.audio.delta' event has a piece of audio data
                if data["type"] == "response.audio.delta":
                    if "delta" in data:
                        try:
                            # Clean up the Base64 string
                            audio_data = data["delta"].replace(" ", "").replace("\n", "")
                            # Pad to a multiple of 4 characters (Base64 requirement)
                            padding = -len(audio_data) % 4
                            if padding:
                                audio_data += "=" * padding
                            
                            # Decode to raw PCM bytes, convert to np array, then play
                            audio_bytes = base64.b64decode(audio_data)
                            audio = np.frombuffer(audio_bytes, dtype=np.int16)
                            self.output_stream.write(audio)
                            
                            # Print a dot to indicate streaming progress
                            print(".", end="", flush=True)
                            
                        except Exception as e:
                            print(f"Error processing audio: {e}")
                            
                # 'response.done' means the assistant finished speaking this turn
                elif data["type"] == "response.done":
                    break
                    
        finally:
            # Allow mic recording again
            self.is_speaking = False

async def main():
    # Create our conversation system instance
    system = ConversationSystem()
    
    # Set up mic and speaker streams
    await system.setup_audio()
    
    # Start the real-time audio conversation
    await system.start_conversation()

if __name__ == "__main__":
    print("Starting real-time conversation system...")
    # Run the main function in an asyncio event loop
    asyncio.run(main())


Further iterations of the code improve voice activity detection, handle interruptions, and manage context more effectively:
- part1_realtime_api_advanced_converse_step2_better_vad.py
- part1_realtime_api_advanced_converse_step3_interruption_handling.py
- part1_realtime_api_advanced_converse_step4_context_management.py




## Step 1 → Step 2: Adding Client-Side Voice Activity Detection (VAD)

Key Improvement: Introduced VAD for real-time speech detection instead of waiting for fixed intervals.
New Code Snippet in Step 2:

In [None]:
class AudioProcessor:
    def __init__(self, sample_rate=24000):
        self.sample_rate = sample_rate
        self.vad_threshold = 0.015  # Detects normal speech
        self.speech_frames = 0
        self.silence_frames = 0
        self.min_speech_duration = int(0.3 * sample_rate)
        self.max_silence_duration = int(0.8 * sample_rate)
        self.main_buffer = []
        self.speech_detected = False

    def process_audio(self, indata):
        """Basic VAD: Only buffers speech detected above the threshold"""
        audio_level = np.abs(indata).mean() / 32768.0  # Normalize audio levels

        if audio_level > self.vad_threshold:
            self.speech_detected = True
            self.speech_frames += len(indata)
            self.silence_frames = 0
            self.main_buffer.extend(indata.tobytes())
        elif self.speech_detected:
            self.silence_frames += len(indata)
            if self.silence_frames < self.max_silence_duration:
                self.main_buffer.extend(indata.tobytes())


Why is this an Improvement?

✅ No more fixed 2-second waits (Step 1 was just waiting and then processing). \
✅ Real-time detection of speech (only records meaningful user input). \
✅ Ignores background noise, reducing unnecessary API calls. 
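The Step 2 snippet defines `min_speech_duration` and `max_silence_duration` but does not show how they decide that the user has finished a turn. One plausible check, written here as a standalone function with the hypothetical name `utterance_complete`, is to require enough speech followed by enough trailing silence. This is a sketch of the idea, not necessarily what the repository script does.

```python
SAMPLE_RATE = 24000

def utterance_complete(speech_frames, silence_frames,
                       min_speech=int(0.3 * SAMPLE_RATE),
                       max_silence=int(0.8 * SAMPLE_RATE)):
    """Return True when enough speech was heard and the trailing pause is long enough.

    All arguments are sample counts, matching the counters in AudioProcessor.
    """
    heard_enough = speech_frames >= min_speech
    paused_enough = silence_frames >= max_silence
    return heard_enough and paused_enough

# 1 s of speech followed by 1 s of silence: the turn is over
print(utterance_complete(speech_frames=24000, silence_frames=24000))
# A 0.1 s noise burst followed by silence: too short to count as a turn
print(utterance_complete(speech_frames=2400, silence_frames=24000))
```

The minimum-speech condition is what filters out brief noises that happen to cross the level threshold.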

## Step 2 → Step 3: Adding Barge-In Detection (Interruptions)

Key Improvement: Detects when a user speaks while the AI is responding and buffers that speech separately, so it is ready to send once the AI's output stops.
New Code Snippet in Step 3:


In [None]:
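class AudioProcessor:
    def __init__(self, sample_rate=24000):
        # Attributes carried over from Step 2 (used by process_audio below)
        self.sample_rate = sample_rate
        self.vad_threshold = 0.015
        self.silence_frames = 0
        self.max_silence_duration = int(0.8 * sample_rate)
        self.main_buffer = []
        self.speech_detected = False

        # New in Step 3: a higher threshold than normal VAD, so only clear
        # speech counts as an interruption
        self.interrupt_threshold = 0.02
        self.interrupt_buffer = []
        self.is_speaking = False  # True while AI audio is playing
        self.is_interrupting = False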
    def process_audio(self, indata):
        """Detects both normal speech and interruptions"""
        audio_level = np.abs(indata).mean() / 32768.0

        # If AI is speaking and user interrupts, switch to interruption buffer
        if self.is_speaking and audio_level > self.interrupt_threshold:
            self.is_interrupting = True
            self.interrupt_buffer.extend(indata.tobytes())
            return

        # If AI is not speaking, process normally
        if not self.is_speaking:
            if audio_level > self.vad_threshold:
                self.speech_detected = True
                self.silence_frames = 0  # Speech resumed, so restart the silence count
                self.main_buffer.extend(indata.tobytes())
            elif self.speech_detected:
                self.silence_frames += len(indata)
                if self.silence_frames < self.max_silence_duration:
                    self.main_buffer.extend(indata.tobytes())

    def check_interruption(self):
        """Returns True if the user interrupted"""
        return self.is_interrupting

    def get_interrupt_audio(self):
        """Retrieves buffered interruption speech"""
        if not self.interrupt_buffer:
            return None
        audio_data = bytes(self.interrupt_buffer)
        self.interrupt_buffer.clear()
        self.is_interrupting = False
        return audio_data


Why is this an Improvement? \
✅ Detects user interruptions dynamically. \
✅ Buffers user input while the AI is talking. \
✅ Keeps the interruption audio ready to send as soon as the AI’s response is stopped (the cancellation itself arrives in Step 4).

## Step 3 → Step 4: Cancelling AI Response in Real-Time

Key Improvement: Now, if the user interrupts mid-response, the system sends a cancellation request (response.cancel) to Azure OpenAI.



In [None]:
async def handle_response(self, websocket):
    """Handles AI responses and supports real-time interruption."""
    self.audio_processor.is_speaking = True
    try:
        while True:
            # Check if user interrupted mid-response
            if self.audio_processor.check_interruption():
                interrupt_audio = self.audio_processor.get_interrupt_audio()
                if interrupt_audio:
                    print("Interrupted!")
                    
                    # Cancel the AI’s current response
                    await websocket.send(json.dumps({"type": "response.cancel"}))
                    
                    # Immediately process the new user input
                    await self.send_audio(websocket, interrupt_audio)
                    break
            
            response = json.loads(await websocket.recv())

            if response["type"] == "response.audio.delta":
                if "delta" in response:
                    try:
                        audio_data = response["delta"].strip()
                        padding = -len(audio_data) % 4
                        if padding:
                            audio_data += "=" * padding
                        
                        audio = np.frombuffer(base64.b64decode(audio_data), dtype=np.int16)
                        self.streams['output'].write(audio)

                    except Exception as e:
                        print(f"Audio processing error: {e}")

            elif response["type"] == "response.done":
                break
    finally:
        self.audio_processor.is_speaking = False


Why is this an Improvement? \
✅ No more waiting for AI to finish → Interruptions stop AI instantly. \
✅ Users don’t have to listen to an entire AI response before speaking. \
✅ Feels more like Alexa, Google Assistant, or Siri’s barge-in support. 
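One limitation of the loop above is that the interruption check only runs between incoming messages, because `await websocket.recv()` blocks until the next event arrives. A possible refinement, sketched below, is to wait with a short timeout so the check runs at a steady interval. The helper name `recv_or_interrupt` and the 50 ms poll interval are assumptions; the demo uses an `asyncio.Queue` as a stand-in for the WebSocket so it runs without a connection.

```python
import asyncio

async def recv_or_interrupt(recv, is_interrupted, poll_interval=0.05):
    """Wait for the next message, waking up regularly to check for barge-in.

    recv: zero-argument callable returning an awaitable (e.g. websocket.recv).
    is_interrupted: zero-argument callable returning True on barge-in.
    Returns the message, or None if an interruption was detected first.
    """
    while True:
        if is_interrupted():
            return None
        try:
            return await asyncio.wait_for(recv(), timeout=poll_interval)
        except asyncio.TimeoutError:
            continue  # nothing arrived yet; loop around and re-check

async def demo():
    queue = asyncio.Queue()          # stand-in for the WebSocket
    state = {"interrupted": False}
    await queue.put("hello")
    first = await recv_or_interrupt(queue.get, lambda: state["interrupted"], 0.01)

    async def trip():
        await asyncio.sleep(0.03)
        state["interrupted"] = True  # simulate the user barging in

    tripper = asyncio.create_task(trip())
    second = await recv_or_interrupt(queue.get, lambda: state["interrupted"], 0.01)
    await tripper
    return first, second
```

In `handle_response`, a `None` result would be the cue to send `response.cancel` and forward the buffered interruption audio.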

The difference between Step 2 → 3 and Step 3 → 4 is that Step 3 adds real-time interruption detection while the AI is speaking but does not yet cancel the AI’s response; Step 4 adds the cancellation.
These steps are included to prompt discussion of the design choices that need to be made when creating a voice UI.
Our intention here is not to chase perfection but to show an MVP that can be iterated upon.


| Feature                 | **Step 2** (VAD)       | **Step 3** (Barge-In Detection) | **Step 4** (AI Cancellation)  |
|-------------------------|----------------------|--------------------------------|--------------------------------|
| **Speech Detection**    | ✅ Uses VAD         | ✅ Uses VAD                   | ✅ Uses VAD                    |
| **Interrupt Handling**  | ❌ No detection     | ✅ Detects interruptions but **AI keeps talking** | ✅ Detects interruptions and **AI stops talking** immediately |
| **AI Response Handling** | ❌ Always waits for AI to finish | ❌ Always waits for AI to finish | ✅ **Cancels AI immediately** when interrupted |
| **Natural Conversation** | ❌ Feels robotic (no barge-in support) | ❌ User can talk over AI, but AI doesn’t stop | ✅ **User can talk over AI, and AI stops instantly** |
