# https://github.com/Azure-Samples/aoai-realtime-audio-sdk

In [1]:
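# %pip install python-dotenv   # provides `from dotenv import load_dotenv`
# %pip install pyaudio
# %pip install gradio
# git clone https://github.com/Azure-Samples/aoai-realtime-audio-sdk.git
# Then copy the `rtclient` package from aoai-realtime-audio-sdk/python/ into this notebook's directory.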


In [2]:

import gradio as gr
import os
import requests
import asyncio
import pyaudio
import numpy as np
from player import Player
from recorder import Recorder
from assistants import AssistantService
from dotenv import load_dotenv
import json

from rtclient import RealtimeException, RTClient, RTInputAudioItem, RTResponse
from rtclient.models import InputAudioTranscription, InputTextContentPart, NoTurnDetection, ServerVAD, UserMessageItem
from azure.core.credentials import AzureKeyCredential
#from rtclient import RTClient


import rtclient.low_level_client as llc


# Read settings from a .env file; override=True is passed so .env values take
# precedence over variables already present in the environment.
load_dotenv(override=True)

# Shared session state. The client and audio handles stay None until the
# functions below create them.
realtime_client = audio_recorder = audio_player = None
assistant_service = AssistantService()

# Microphone audio is only forwarded while this flag is True.
recording_active = False

In [4]:
def create_rt_client():
    """Build an ``RTClient`` from the Azure OpenAI settings in the environment.

    Reads ``AZURE_OPENAI_API_KEY``, ``AZURE_OPENAI_ENDPOINT`` and
    ``AZURE_OPENAI_DEPLOYMENT`` and passes them to ``RTClient``.

    Returns:
        A new, not-yet-connected ``RTClient`` instance.

    Raises:
        ValueError: if any of the three variables is unset or empty. Failing
            here names the missing setting instead of passing ``None`` into
            the credential/client constructors.
    """
    setting_names = (
        "AZURE_OPENAI_API_KEY",
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_DEPLOYMENT",
    )
    settings = {name: os.getenv(name) for name in setting_names}

    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    return RTClient(
        url=settings["AZURE_OPENAI_ENDPOINT"],
        key_credential=AzureKeyCredential(settings["AZURE_OPENAI_API_KEY"]),
        azure_deployment=settings["AZURE_OPENAI_DEPLOYMENT"],
    )


In [23]:





async def process_audio_recording_buffer(base64_audio):
    """Forward one recorded audio chunk to the realtime stream.

    Does nothing unless the module-level ``recording_active`` flag is set.
    Otherwise sends an ``input_audio_buffer.append`` event carrying
    ``base64_audio`` through ``realtime_streaming.send``.

    Args:
        base64_audio: the chunk to forward; placed unchanged in the event's
            ``audio`` field (presumably base64-encoded audio, per the name).
    """
    # NOTE(review): `realtime_streaming` is not assigned anywhere in the
    # visible notebook; it must exist as a global before recording starts.
    if not recording_active:
        return

    append_event = {
        "type": "input_audio_buffer.append",
        "audio": base64_audio,
    }
    await realtime_streaming.send(append_event)


async def reset_audio(start_recording, language=None, temperature=None, voice=None):
    """Tear down the current recorder/player and create fresh ones.

    Clears ``recording_active``, stops any existing recorder, clears any
    existing player, then builds a new ``Recorder`` (feeding
    ``process_audio_recording_buffer``) and a new ``Player`` initialised
    with 24000. When ``start_recording`` is truthy, the recorder is started
    on the stream from ``get_audio_stream()`` and ``recording_active`` is set.

    Args:
        start_recording: whether to begin recording after the reset.
        language, temperature, voice: accepted for call compatibility but not
            used by this function. They default to ``None`` so that
            ``reset_audio(False)`` is a valid call.
    """
    global recording_active, audio_recorder, audio_player
    import inspect

    recording_active = False
    if audio_recorder:
        audio_recorder.stop()
    if audio_player:
        audio_player.clear()

    audio_recorder = Recorder(process_audio_recording_buffer)
    audio_player = Player()
    audio_player.init(24000)

    if start_recording:
        # get_audio_stream() is a plain function in this notebook, so awaiting
        # its result unconditionally fails. Await only when the provider
        # actually returns an awaitable.
        stream = get_audio_stream()
        if inspect.isawaitable(stream):
            stream = await stream
        await audio_recorder.start(stream)
        recording_active = True


def is_azure_openai():
    """Report whether Azure OpenAI is in use; currently hard-coded off.

    Flip the flag below to True if using Azure OpenAI.
    """
    using_azure_openai = False
    return using_azure_openai


def get_audio_stream():
    """Mock audio-stream provider: there is no real stream, so return None."""
    mock_stream = None
    return mock_stream





def stop_audio():
    """Stop recording, clear playback and close the realtime stream.

    Clears the module-level ``recording_active`` flag, stops the recorder,
    clears the player and closes ``realtime_streaming``. Handles that have
    not been created yet are skipped.

    Returns:
        The status string "Audio and streaming stopped.".
    """
    # Without this declaration the assignment below only binds a local and
    # the module-level flag stays set.
    global recording_active
    recording_active = False

    if audio_recorder is not None:
        audio_recorder.stop()
    if audio_player is not None:
        audio_player.clear()

    # `realtime_streaming` is never assigned in the visible notebook, so look
    # it up defensively instead of raising NameError.
    # NOTE(review): close() is called synchronously as before; confirm whether
    # the stream's close() is a coroutine that needs awaiting.
    stream = globals().get("realtime_streaming")
    if stream is not None:
        stream.close()

    return "Audio and streaming stopped."









In [27]:



async def handle_text_input(user_input):  
    global realtime_client
    await realtime_client.configure(modalities={"text"}, turn_detection=NoTurnDetection())
    await realtime_client.send_item(
        item=UserMessageItem(
            content=[InputTextContentPart(text=user_input)]
        )
    )
    response = await realtime_client.generate_response()

    item = await anext(response)
    assert item.type == "message"
    
    async for part in item:
        text = ""
        assert part.type == "text"
        async for chunk in part.text_chunks():
            assert chunk is not None
            text += chunk
        assert part.text == text



In [28]:
def main(language, temperature, voice):
    """Gradio "start" handler: start the realtime session and report status.

    Args:
        language: language passed on to the session setup.
        temperature: sampling temperature passed on to the session setup.
        voice: voice name passed on to the session setup.

    Returns:
        The status string produced by ``start_realtime`` (success or error),
        so a failed start is visible instead of always reporting success.
    """
    # The connection settings were previously referenced through undefined
    # names; read them from the same environment variables the client
    # factory uses.
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT")

    # NOTE(review): asyncio.run() closes its event loop on return; confirm the
    # connected client remains usable from Gradio's own loop afterwards.
    return asyncio.run(
        start_realtime(endpoint, api_key, deployment, language, temperature, voice)
    )





async def start_realtime(endpoint, api_key, deployment, language, temperature, voice):
    """Create, connect and configure the shared realtime client.

    Stores the new client in the module-level ``realtime_client``, sets the
    assistant language, and configures the session with server VAD,
    ``whisper-1`` input transcription, and the assistant's tools and
    instructions.

    Args:
        endpoint, api_key, deployment: accepted for call compatibility but not
            used here; ``create_rt_client()`` takes no arguments.
        language: assigned to ``assistant_service.language``.
        temperature: forwarded to the session configuration.
        voice: forwarded to the session configuration.

    Returns:
        "Session started successfully!" on success, otherwise an
        "Error: Unable to start session. Details: ..." string. Exceptions are
        reported through the return value rather than raised.
    """
    global realtime_client

    try:
        realtime_client = create_rt_client()
        await realtime_client.connect()

        assistant_service.language = language
        assistant = assistant_service.create_generic_assistant_config_message()

        # The config message is a dict shaped like
        # {"type": "session.update", "session": {"instructions": ..., "tools": ...}},
        # so it must be read by key, not by position.
        session_config = assistant["session"]

        await realtime_client.configure(
            turn_detection=ServerVAD(),
            input_audio_transcription=InputAudioTranscription(model="whisper-1"),
            tools=session_config["tools"],
            voice=voice,
            instructions=session_config["instructions"],
            temperature=temperature,
        )

        return "Session started successfully!"
    except Exception as error:
        return f"Error: Unable to start session. Details: {error}"


async def handle_realtime_messages(voice=None, temperature=None):
    """Consume messages from ``realtime_streaming`` and dispatch on their type.

    Each message is decoded from JSON and handled according to its ``type``
    field: transcripts are appended to the text block, audio deltas are
    decoded and played, function calls are answered through
    ``assistant_service``, and errors are printed. When the message stream
    ends, audio is reset without restarting recording.

    Args:
        voice: optional voice to set on ``session.update`` tool responses.
            Left untouched when ``None``.
        temperature: optional temperature to set on ``session.update`` tool
            responses. Left untouched when ``None``.

    NOTE(review): ``realtime_streaming``, ``append_message_id`` and
    ``append_to_text_block`` are not defined anywhere in the visible
    notebook; they must be provided before this coroutine is run.
    """
    global realtime_streaming
    # base64 is not imported at module level in this notebook.
    import base64

    async for message in realtime_streaming.messages():
        message = json.loads(message)  # Assuming message is received in JSON format

        # Handle different message types, similar to the original JS code
        if message['type'] == 'session.created':
            print(f"Session Created: {json.dumps(message, indent=2)}")
            # Handle session state or UI changes here

        elif message['type'] == 'conversation.item.created':
            # Handle user messages being created
            if message['item']['type'] == "message" and message['item']['role'] == "user":
                append_message_id(message['item']['id'])  # Custom function to append the message ID

        elif message['type'] == 'response.content_part.added':
            # Append the assistant's response to the UI
            append_to_text_block("Assistant: ")

        elif message['type'] == 'response.audio_transcript.delta':
            # Append transcription data
            append_to_text_block(message['delta'])

        elif message['type'] == 'response.audio.delta':
            # Decode the base64 payload into 16-bit samples and hand them to the player
            binary_data = base64.b64decode(message['delta'])
            pcm_data = np.frombuffer(binary_data, dtype=np.int16)
            audio_player.play(pcm_data)

        elif message['type'] == 'input_audio_buffer.speech_started':
            # Handle audio recording started
            append_to_text_block("")  # Start a new line for the next audio input
            audio_player.clear()

        elif message['type'] == 'conversation.item.input_audio_transcription.completed':
            # Append the completed transcription
            append_to_text_block(f"User (Speech): {message['transcript']} >> {message['item_id']}")

        elif message['type'] == 'response.done':
            # Handle when the assistant's response is fully processed
            for output in message['response']['output']:
                if output['type'] == 'function_call':
                    response = await assistant_service.get_tool_response(output['name'], output['arguments'], output['call_id'])
                    if response['type'] == 'session.update':
                        # voice/temperature used to be undefined names here;
                        # apply them only when the caller supplied them.
                        if voice is not None:
                            response['session']['voice'] = voice
                        if temperature is not None:
                            response['session']['temperature'] = temperature
                    await realtime_streaming.send(json.dumps(response))
                    await realtime_streaming.send(json.dumps({"type": "response.create"}))
                elif output['type'] == 'message':
                    append_message_id(output['id'])

        elif message['type'] == 'error':
            # Handle any errors received from the real-time streaming
            print(f"Error: {json.dumps(message, indent=2)}")

    # Reset audio once the session ends. All four arguments are passed
    # explicitly because reset_audio declares four parameters.
    await reset_audio(False, None, None, None)

In [29]:
# Gradio front end: session settings, start/stop buttons, a box for typed
# messages, and a single output box that every handler writes to.
with gr.Blocks() as gradio_interface:
    gr.Markdown("# Real-Time Assistant Interaction")

    # Session settings, laid out side by side.
    with gr.Row():
        language_input = gr.Textbox(value="English", label="Language")
        temperature_input = gr.Slider(
            minimum=0.0,
            maximum=1.0,
            value=0.7,
            step=0.1,
            label="Temperature",
        )
        voice_input = gr.Dropdown(
            choices=["alloy", "echo", "shimmer"],
            value="alloy",
            label="Voice",
        )

    start_button = gr.Button("Start Real-Time Session")
    stop_button = gr.Button("Stop Audio and Streaming")

    user_input = gr.Textbox(label="Your Input")
    send_button = gr.Button("Send Message")

    output = gr.Textbox(label="Output")

    # Wire each button to its handler; results all land in `output`.
    session_settings = [language_input, temperature_input, voice_input]
    start_button.click(fn=main, inputs=session_settings, outputs=output)
    stop_button.click(fn=stop_audio, outputs=output)
    send_button.click(fn=handle_text_input, inputs=user_input, outputs=output)

gradio_interface.launch()

* Running on local URL:  http://127.0.0.1:7866

To create a public link, set `share=True` in `launch()`.




event_id=None type='session.update' session=SessionUpdateParams(model=None, modalities={'text'}, voice=None, instructions=None, input_audio_format=None, output_audio_format=None, input_audio_transcription=None, turn_detection=NoTurnDetection(type='none'), tools=None, tool_choice=None, temperature=None, max_response_output_tokens=None)
{"type":"session.update","session":{"modalities":["text"],"turn_detection":{"type":"none"}}}


Traceback (most recent call last):
  File "C:\Users\shagrawal\Miniconda3\envs\rtep\Lib\site-packages\gradio\queueing.py", line 622, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\shagrawal\Miniconda3\envs\rtep\Lib\site-packages\gradio\route_utils.py", line 323, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\shagrawal\Miniconda3\envs\rtep\Lib\site-packages\gradio\blocks.py", line 2014, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\shagrawal\Miniconda3\envs\rtep\Lib\site-packages\gradio\blocks.py", line 1565, in call_function
    prediction = await fn(*processed_input)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\shagrawal\Miniconda3\envs\rtep\Lib\site-packages\gradio\utils.py", line 813, in async_wrapper
    response = await f(*args,

In [20]:
realtime_client

<rtclient.RTClient at 0x1c53b156950>

In [None]:

    
# Simulating initialization of the real-time stream client
realtime_client = create_rt_client()
# Code to create rt client

await realtime_client.connect()

assistant_service.language = "English"
assistant = assistant_service.create_generic_assistant_config_message()
print(assistant)
await realtime_client.configure(
           turn_detection=ServerVAD(),
           input_audio_transcription=InputAudioTranscription(model="whisper-1"),
           tools=assistant["session"]["tools"],
           voice="alloy",
           instructions=assistant["session"]["instructions"],
           temperature=0.7
        )


await realtime_client.configure(modalities={"text"}, turn_detection=NoTurnDetection())

user_input = input("Enter something: ")
while user_input != "stop":
    await realtime_client.send_item(
            item=UserMessageItem(
                content=[InputTextContentPart(text=user_input)]
            )
        )
    response = await realtime_client.generate_response()
    try:
        item = await anext(response)
        assert item.type == "message"
        
        async for part in item:
            text = ""
            assert part.type == "text"
            async for chunk in part.text_chunks():
                assert chunk is not None
                text += chunk
            assert part.text == text
    except Exception as e:
        print(e)
    
    print(text)
    user_input = input("Enter something: ")
    

{'type': 'session.update', 'session': {'turn_detection': {'type': 'server_vad'}, 'instructions': '\n        ##Role\n        You are an expert, well-training agent for support center.\n        You are a native speaker of English without any accents.\n        Use function calling to switch to specialized assistant.\n        ', 'tools': [{'type': 'function', 'name': 'get_weather', 'parameters': {'type': 'object', 'properties': {'location': {'type': 'string', 'description': 'location for the weather'}}}, 'description': 'get the weather of the location'}, {'type': 'function', 'name': 'Assistant_MobileAssistant', 'parameters': {'type': 'object', 'properties': {}}, 'description': 'Help user to answer mobile related questions, such as billing, contract, etc.'}, {'type': 'function', 'name': 'Assistant_ShopAssistant', 'parameters': {'type': 'object', 'properties': {}}, 'description': 'Help user to answer shop-related questions, such as shop location, available time, etc.'}, {'type': 'function', 

Enter something:  hi


event_id=None type='conversation.item.create' previous_item_id=None item=UserMessageItem(type='message', role='user', id='item-GxasQWWu8dYhbHJOcOm0fbeGmAN', content=[InputTextContentPart(type='input_text', text='hi')], status=None)
{"type":"conversation.item.create","previous_item_id":null,"item":{"type":"message","role":"user","id":"item-GxasQWWu8dYhbHJOcOm0fbeGmAN","content":[{"type":"input_text","text":"hi"}]}}
event_id=None type='response.create' response=None
{"type":"response.create"}
Hello! How can I help you today?


Enter something:  how are you


event_id=None type='conversation.item.create' previous_item_id=None item=UserMessageItem(type='message', role='user', id='item-qQvNBKFh0EUn-hN9BXPaoYo6xTA', content=[InputTextContentPart(type='input_text', text='how are you')], status=None)
{"type":"conversation.item.create","previous_item_id":null,"item":{"type":"message","role":"user","id":"item-qQvNBKFh0EUn-hN9BXPaoYo6xTA","content":[{"type":"input_text","text":"how are you"}]}}
event_id=None type='response.create' response=None
{"type":"response.create"}

Hello! How can I help you today?


Enter something:  How is weather in gurgaon


event_id=None type='conversation.item.create' previous_item_id=None item=UserMessageItem(type='message', role='user', id='item-5UWUWBfHdRL_mqy6HcGXKvfBavw', content=[InputTextContentPart(type='input_text', text='How is weather in gurgaon')], status=None)
{"type":"conversation.item.create","previous_item_id":null,"item":{"type":"message","role":"user","id":"item-5UWUWBfHdRL_mqy6HcGXKvfBavw","content":[{"type":"input_text","text":"How is weather in gurgaon"}]}}
event_id=None type='response.create' response=None
{"type":"response.create"}
I'm doing well, thank you! How about you?


Enter something:  what is weather in gurgaon


event_id=None type='conversation.item.create' previous_item_id=None item=UserMessageItem(type='message', role='user', id='item-zM7MeQ2gEmqXCceBuh7bn1TerGO', content=[InputTextContentPart(type='input_text', text='what is weather in gurgaon')], status=None)
{"type":"conversation.item.create","previous_item_id":null,"item":{"type":"message","role":"user","id":"item-zM7MeQ2gEmqXCceBuh7bn1TerGO","content":[{"type":"input_text","text":"what is weather in gurgaon"}]}}
event_id=None type='response.create' response=None
{"type":"response.create"}

I'm doing well, thank you! How about you?
