Skip to content
56 changes: 55 additions & 1 deletion livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,32 @@ class RealtimeCapabilities:
per_response_tool_choice: bool = False
"""Whether the tool and tool choice can be specified per response"""
supports_say: bool = False
"""Whether the model supports session.say()"""
"""Whether the model supports session.say(). Note: RealtimeModel
implementations typically only support say() when
add_to_chat_ctx=True; for ephemeral say() see ephemeral_say."""
ephemeral_say: bool = False
"""Whether the plugin's say() honors add_to_chat_ctx=False
(renders text to the user without adding it to the agent's
reasoning context).

Plugins must satisfy this only if the underlying substrate
provides an out-of-band response primitive that does not enter
conversation state.

Substrate landscape (verified empirically against the OpenAI Realtime API):
OpenAI Realtime: True — response.create(conversation: "none")
Phonic: False — substrate strictly turn-based;
no isolation primitive
Gemini Live: False — Bidi append-only; no isolation
primitive (47 SDK fields verified)
Ultravox: False — no isolation primitive observed
AWS Nova Sonic: False — no isolation primitive observed

A plugin that sets supports_say=True MAY set
ephemeral_say=False (basic say() supported but no isolation
primitive). A plugin that sets ephemeral_say=True MUST also
set supports_say=True.
"""


class RealtimeError(Exception):
Expand Down Expand Up @@ -280,7 +305,36 @@ def start_user_activity(self) -> None:
def say(
self,
text: str | AsyncIterable[str],
*,
add_to_chat_ctx: bool = True,
) -> asyncio.Future[GenerationCreatedEvent]:
"""Render text to the user channel via the realtime substrate.

Args:
text: The text to render. Either a complete string or
an async iterable of string chunks.
add_to_chat_ctx: If True (default), the rendered text
enters the agent's reasoning context per the
substrate's session model. If False, plugins
declaring ephemeral_say=True render the text
WITHOUT it entering the context. Plugins that
cannot honor False MUST declare
RealtimeCapabilities.ephemeral_say=False so that
AgentActivity.say()'s dispatcher emits a
DeprecationWarning and preserves backward-compat
silent-degrade behavior.

Returns:
A future that resolves to GenerationCreatedEvent when
the underlying substrate confirms the response was
created.

Raises:
NotImplementedError: If the plugin does not implement
say().
RealtimeError: On substrate-specific failures (network,
API errors, provider rejection, client-side timeout).
"""
raise NotImplementedError(
f"{type(self).__name__} does not implement say(). use a TTS model instead"
)
30 changes: 24 additions & 6 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import heapq
import json
import time
import warnings
from collections.abc import AsyncIterable, Coroutine, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -1074,16 +1075,28 @@ def say(
and isinstance(self.llm, llm.RealtimeModel)
and self.llm.capabilities.supports_say
):
if not add_to_chat_ctx:
logger.warning(
"add_to_chat_ctx=False is not supported when say() uses a RealtimeModel; "
"the message will still be added to the chat context"
if not add_to_chat_ctx and not self.llm.capabilities.ephemeral_say:
# Per ADR 0005-style deprecation cycle: emit DeprecationWarning
# in this release; a future release will replace this with
# NotImplementedError. Silent-degrade preserved for the
# deprecation window.
warnings.warn(
f"{type(self.llm).__name__} supports say() but does not "
"support add_to_chat_ctx=False (ephemeral say). The text "
"will be added to chat context anyway in this release. "
"In a future release this will raise NotImplementedError "
"instead of warning. Either remove add_to_chat_ctx=False "
"or use a plugin that declares ephemeral_say=True "
"(currently: OpenAI plugin).",
DeprecationWarning,
stacklevel=2,
)
self._create_speech_task(
self._realtime_reply_task(
speech_handle=handle,
text=text,
model_settings=ModelSettings(),
add_to_chat_ctx=add_to_chat_ctx,
Comment on lines 1093 to +1099
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Deprecation path still passes add_to_chat_ctx=False downstream, breaking "silent-degrade" promise

When ephemeral_say=False and add_to_chat_ctx=False, the deprecation warning at line 1083 tells the user "The text will be added to chat context anyway in this release" (silent-degrade). However, the code never overrides add_to_chat_ctx back to True before passing it to _realtime_reply_task at line 1099. This value flows all the way to the gate at agent_activity.py:3270 (if msg_gen and forwarded_text and add_to_chat_ctx:), which skips the local _upsert_item call.

For plugins like Phonic (which ignore the parameter server-side), this creates a mismatch: the text IS in the server-side conversation state but is NOT in the local agent._chat_ctx. The stated intent of the deprecation path — backward-compatible silent degrade — is violated.

Suggested change
)
self._create_speech_task(
self._realtime_reply_task(
speech_handle=handle,
text=text,
model_settings=ModelSettings(),
add_to_chat_ctx=add_to_chat_ctx,
)
add_to_chat_ctx = True
self._create_speech_task(
self._realtime_reply_task(
speech_handle=handle,
text=text,
model_settings=ModelSettings(),
add_to_chat_ctx=add_to_chat_ctx,
),
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

),
speech_handle=handle,
name="AgentActivity.realtime_say",
Expand Down Expand Up @@ -2812,6 +2825,7 @@ async def _realtime_reply_task(
instructions: str | None = None,
tool_reply: bool = False,
text: str | AsyncIterable[str] | None = None,
add_to_chat_ctx: bool = True,
) -> None:
assert self._rt_session is not None, "rt_session is not available"
# realtime_reply_task is called only when there's text input, native audio input is handled by _realtime_generation_task
Expand All @@ -2828,7 +2842,7 @@ async def _realtime_reply_task(

if text is not None:
try:
generation_ev = await self._rt_session.say(text)
generation_ev = await self._rt_session.say(text, add_to_chat_ctx=add_to_chat_ctx)
except llm.RealtimeError as e:
logger.error("failed to say text: %s", str(e))
return
Expand All @@ -2837,6 +2851,7 @@ async def _realtime_reply_task(
speech_handle=speech_handle,
generation_ev=generation_ev,
model_settings=model_settings,
add_to_chat_ctx=add_to_chat_ctx,
)
return

Expand Down Expand Up @@ -2916,6 +2931,7 @@ async def _realtime_generation_task(
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
add_to_chat_ctx: bool = True,
) -> None:
with tracer.start_as_current_span(
"agent_turn", context=self._session._root_span_context
Expand All @@ -2930,6 +2946,7 @@ async def _realtime_generation_task(
generation_ev=generation_ev,
model_settings=model_settings,
instructions=instructions,
add_to_chat_ctx=add_to_chat_ctx,
)

async def _realtime_generation_task_impl(
Expand All @@ -2939,6 +2956,7 @@ async def _realtime_generation_task_impl(
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
add_to_chat_ctx: bool = True,
) -> None:
current_span = trace.get_current_span(context=speech_handle._agent_turn_context)
current_span.set_attribute(trace_types.ATTR_SPEECH_ID, speech_handle.id)
Expand Down Expand Up @@ -3249,7 +3267,7 @@ def _create_assistant_message(
"`use_tts_aligned_transcript` is enabled but no agent transcript was returned from tts"
)

if msg_gen and forwarded_text:
if msg_gen and forwarded_text and add_to_chat_ctx:
msg = _create_assistant_message(
message_id=msg_gen.message_id,
forwarded_text=forwarded_text,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import time
import weakref
from collections.abc import Iterator
from collections.abc import AsyncIterable, Iterator
from dataclasses import dataclass, replace
from typing import Any, Literal, overload
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
Expand Down Expand Up @@ -423,6 +423,8 @@ def __init__(
mutable_instructions=True,
mutable_tools=True,
per_response_tool_choice=True,
supports_say=True,
ephemeral_say=True,
)
)

Expand Down Expand Up @@ -1535,6 +1537,99 @@ def _on_timeout() -> None:
fut.add_done_callback(lambda _: handle.cancel())
return fut

def say(
self,
text: str | AsyncIterable[str],
*,
add_to_chat_ctx: bool = True,
) -> asyncio.Future[llm.GenerationCreatedEvent]:
"""Render text to user via OpenAI Realtime API.

Uses response.create(conversation: "none") for
add_to_chat_ctx=False with text wrapped in an assistant
message in the input field. The OpenAI Realtime API's
documented behavior is that conversation: "none" responses
do not enter the server-side conversation state.

On client-side 10s timeout, the future is rejected and
popped from _response_created_futures (existing behavior).
The orphan filter in _handle_response_created detects
late-arriving response.created by checking
missing-from-_response_created_futures.
"""
if isinstance(text, str):
return self._say_with_text(text, add_to_chat_ctx=add_to_chat_ctx)

async def _materialize_and_say() -> llm.GenerationCreatedEvent:
full_text = ""
async for chunk in text:
full_text += chunk
return await self._say_with_text(full_text, add_to_chat_ctx=add_to_chat_ctx)

# Wrap async materialization in a Future to match the abstract base contract
outer_fut: asyncio.Future[llm.GenerationCreatedEvent] = asyncio.Future()

async def _runner() -> None:
try:
inner = await _materialize_and_say()
if not outer_fut.done():
outer_fut.set_result(inner)
except BaseException as e:
if not outer_fut.done():
outer_fut.set_exception(e)

asyncio.ensure_future(_runner())
return outer_fut

def _say_with_text(
self, full_text: str, *, add_to_chat_ctx: bool
) -> asyncio.Future[llm.GenerationCreatedEvent]:
event_id = utils.shortuuid("response_create_say_")
fut: asyncio.Future[llm.GenerationCreatedEvent] = asyncio.Future()
self._response_created_futures[event_id] = fut

params = RealtimeResponseCreateParams(
input=[
realtime.RealtimeConversationItemAssistantMessage(
type="message",
role="assistant",
content=[
realtime.realtime_conversation_item_assistant_message.Content(
type="output_text",
text=full_text,
)
],
)
],
metadata={"client_event_id": event_id},
)
if not add_to_chat_ctx:
params.conversation = "none"

self.send_event(
ResponseCreateEvent(type="response.create", event_id=event_id, response=params)
)
Comment on lines +1591 to +1611
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you tested that if openai realtime will "say" this message instead of using it as context?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tested and it seems it's not supported by openai realtime session.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, my sincerest apologies. I wrote an made test validity error when experimenting via OpenAI's SDK. I only confirmed audio was returned, not that the audio was what I expected. Closing this PR.


def _on_timeout() -> None:
self._response_created_futures.pop(event_id, None)
if fut and not fut.done():
fut.set_exception(llm.RealtimeError("say() timed out."))
try:
# response_id intentionally omitted: response.created has not
# arrived yet at timeout, so the server-assigned id is unknown.
# Best-effort cancel. The orphan filter
# (_handle_response_created) sends a second cancel WITH
# response_id when the late response.created arrives, giving
# full coverage. The OpenAI substrate treats duplicate cancels
# as no-ops.
self.send_event(ResponseCancelEvent(type="response.cancel"))
except Exception:
logger.debug("response.cancel failed on say() timeout")

handle = asyncio.get_event_loop().call_later(10.0, _on_timeout)
fut.add_done_callback(lambda _: handle.cancel())
return fut

@property
def has_active_generation(self) -> bool:
return self._current_generation is not None or len(self._response_created_futures) > 0
Expand Down Expand Up @@ -1645,6 +1740,39 @@ def _handle_input_audio_buffer_speech_stopped(
def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
assert event.response.id is not None, "response.id is None"

# Orphan filter: discard a late-arriving response.created when
# the corresponding caller-future has already been popped
# (e.g., by _on_timeout). Reads missing-from-dict pattern using
# the existing _response_created_futures — adds no new state.
# The metadata-presence guard ensures only client-issued
# responses can be filtered; server-VAD-initiated responses
# (no client_event_id in metadata) flow through normally.
client_event_id_for_orphan_check = (
event.response.metadata.get("client_event_id")
if isinstance(event.response.metadata, dict)
else None
)
if (
client_event_id_for_orphan_check is not None
and client_event_id_for_orphan_check not in self._response_created_futures
):
logger.debug(
"discarding orphaned response %s (client_event_id=%s); "
"future was popped before resolution (likely timeout)",
event.response.id,
client_event_id_for_orphan_check,
)
# Defensive cancel WITH response_id to clean up substrate state.
# Distinct from _on_timeout's cancel (which omits response_id
# because the server-assigned id is not yet known at timeout).
self.send_event(
ResponseCancelEvent(
type="response.cancel",
response_id=event.response.id,
)
)
return

self._current_generation = _ResponseGeneration(
message_ch=utils.aio.Chan(),
function_ch=utils.aio.Chan(),
Expand Down Expand Up @@ -1674,7 +1802,11 @@ def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
self.emit("generation_created", generation_ev)

def _handle_response_output_item_added(self, event: ResponseOutputItemAddedEvent) -> None:
assert self._current_generation is not None, "current_generation is None"
if self._current_generation is None:
# Late event for an orphaned (timed-out) response;
# safe to drop. Mirrors the existing graceful handling
# in _handle_response_done.
return
Comment on lines +1805 to +1809
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Orphan filter leaves downstream event handlers unguarded, causing assertion failures on in-flight events

The orphan filter added to _handle_response_created (realtime_model.py:1755-1774) returns early and leaves _current_generation as None. The PR correctly added graceful None checks to _handle_response_output_item_added (line 1805) and _handle_response_content_part_added (line 1835), but the remaining handlers — _handle_response_text_delta (line 1917), _handle_response_text_done (line 1930), _handle_response_audio_transcript_delta (line 1933), _handle_response_audio_delta (line 1945), _handle_response_audio_done (line 1964), and _handle_response_output_item_done (line 1967) — still use assert self._current_generation is not None. Events already in the WebSocket pipeline when the cancel is sent will hit these asserts. While the outer dispatch loop (line 1103) catches Exception and logs it, this results in error log spam for every in-flight event of the orphaned response and is inconsistent with the graceful handling applied to the two other handlers.

Prompt for agents
The orphan filter in _handle_response_created (line 1755-1774) returns early when a late response.created arrives for a timed-out future, leaving _current_generation as None. Two handlers (_handle_response_output_item_added and _handle_response_content_part_added) were updated with graceful None checks, but several other handlers still assert _current_generation is not None: _handle_response_text_delta (line 1917), _handle_response_text_done (line 1930), _handle_response_audio_transcript_delta (line 1933), _handle_response_audio_delta (line 1945), _handle_response_audio_done (line 1964), _handle_response_output_item_done (line 1967). These should all be updated with the same pattern: if self._current_generation is None: return. This ensures consistent graceful handling when in-flight WebSocket events arrive after the orphan filter discards a response.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

assert (item_id := event.item.id) is not None, "item.id is None"
assert (item_type := event.item.type) is not None, "item.type is None"

Expand All @@ -1700,7 +1832,9 @@ def _handle_response_output_item_added(self, event: ResponseOutputItemAddedEvent
self._current_generation.messages[item_id] = item_generation

def _handle_response_content_part_added(self, event: ResponseContentPartAddedEvent) -> None:
assert self._current_generation is not None, "current_generation is None"
if self._current_generation is None:
# Late event for an orphaned (timed-out) response; safe to drop.
return
assert (item_id := event.item_id) is not None, "item_id is None"
assert (item_type := event.part.type) is not None, "part.type is None"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,23 @@ def push_video(self, frame: rtc.VideoFrame) -> None:
def say(
self,
text: str | AsyncIterable[str],
*,
add_to_chat_ctx: bool = True,
) -> asyncio.Future[llm.GenerationCreatedEvent]:
"""Render text via the Phonic Realtime substrate.

Note: Phonic's substrate is strictly turn-based and does
not have an out-of-band response primitive equivalent to
OpenAI's response.create(conversation: "none"). Phonic's
RealtimeCapabilities declares ephemeral_say=False so that
AgentActivity.say()'s capability check emits a
DeprecationWarning BEFORE reaching this method when
add_to_chat_ctx=False is requested. The parameter is
accepted here for signature compatibility with the
abstract base; if Phonic later gains an isolation
primitive, this method should be updated to honor
add_to_chat_ctx=False.
"""
if self._generate_reply_task and not self._generate_reply_task.done():
self._generate_reply_task.cancel()
self._generate_reply_task = asyncio.create_task(self._send_say(text), name="phonic-say")
Expand Down
Loading