Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@
# VertexAI models
"gemini-live-2.5-flash-native-audio", # GA https://docs.cloud.google.com/vertex-ai/generative-ai/docs/models/gemini/2-5-flash-live-api#live-2.5-flash
# Gemini API models
"gemini-3.1-flash-live-preview",
"gemini-3.1-flash-live-preview", # https://ai.google.dev/gemini-api/docs/models/gemini-3.1-flash-live-preview
"gemini-2.5-flash-native-audio-preview-12-2025", # https://ai.google.dev/gemini-api/docs/models#gemini-2.5-flash-live
"gemini-2.5-flash-native-audio-preview-09-2025", # https://ai.google.dev/gemini-api/docs/models#gemini-2.5-flash-live
]

# Models that do not accept send_client_content once the first model turn
# has occurred; mid-session text must go through send_realtime_input instead.
# See: https://ai.google.dev/gemini-api/docs/models/gemini-3.1-flash-live-preview#migrating
RESTRICTED_CLIENT_CONTENT_MODELS: frozenset[str] = frozenset(
    (
        "gemini-3.1-flash-live-preview",
    )
)

Voice = Literal[
"Achernar",
"Achird",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
NotGivenOr,
)
from livekit.agents.utils import audio as audio_utils, images, is_given
from livekit.plugins.google.realtime.api_proto import ClientEvents, LiveAPIModels, Voice
from livekit.plugins.google.realtime.api_proto import (
RESTRICTED_CLIENT_CONTENT_MODELS,
ClientEvents,
LiveAPIModels,
Voice,
)

from ..log import logger
from ..utils import create_tools_config, get_tool_results_for_realtime
Expand Down Expand Up @@ -150,6 +155,7 @@ class _RealtimeOptions:
tool_response_scheduling: NotGivenOr[types.FunctionResponseScheduling] = NOT_GIVEN
thinking_config: NotGivenOr[types.ThinkingConfig] = NOT_GIVEN
session_resumption: NotGivenOr[types.SessionResumptionConfig] = NOT_GIVEN
reconnect_on_update: bool = False
credentials: google.auth.credentials.Credentials | None = None


Expand Down Expand Up @@ -218,6 +224,7 @@ def __init__(
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
http_options: NotGivenOr[types.HttpOptions] = NOT_GIVEN,
thinking_config: NotGivenOr[types.ThinkingConfig] = NOT_GIVEN,
reconnect_on_update: bool = False,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a user specified option? when do you want this to be True unless the model requires it?

credentials: google.auth.credentials.Credentials | None = None,
) -> None:
"""
Expand Down Expand Up @@ -257,6 +264,10 @@ def __init__(
tool_response_scheduling (FunctionResponseScheduling, optional): The scheduling for tool response. Default scheduling is WHEN_IDLE.
session_resumption (SessionResumptionConfig, optional): The configuration for session resumption. Defaults to None.
thinking_config (ThinkingConfig, optional): Native audio thinking configuration.
reconnect_on_update (bool, optional): For models that restrict mid-session
send_client_content (e.g. gemini-3.1-flash-live-preview), reconnect the session
when update_instructions() or update_chat_ctx() is called. Without this,
updates are silently dropped. Defaults to False.
conn_options (APIConnectOptions, optional): The configuration for the API connection. Defaults to DEFAULT_API_CONNECT_OPTIONS.

Raises:
Expand Down Expand Up @@ -363,6 +374,7 @@ def __init__(
http_options=http_options,
thinking_config=thinking_config,
session_resumption=session_resumption,
reconnect_on_update=reconnect_on_update,
credentials=credentials,
)

Expand Down Expand Up @@ -701,12 +713,19 @@ def generate_reply(
self._in_user_activity = False

# Gemini requires the last message to end with user's turn
# so we need to add a placeholder user turn in order to trigger a new generation
turns = []
if is_given(instructions):
turns.append(types.Content(parts=[types.Part(text=instructions)], role="model"))
turns.append(types.Content(parts=[types.Part(text=".")], role="user"))
self._send_client_event(types.LiveClientContent(turns=turns, turn_complete=True))
# so we need to add a placeholder user turn in order to trigger a new generation.
# Restricted client_content models (e.g. gemini-3.1-flash-live-preview) reject send_client_content
# after the first model turn and require send_realtime_input for text input.
# See: https://ai.google.dev/gemini-api/docs/models/gemini-3.1-flash-live-preview#migrating
if self._opts.model in RESTRICTED_CLIENT_CONTENT_MODELS:
prompt = instructions if is_given(instructions) else "."
self._send_client_event(types.LiveClientRealtimeInput(text=prompt))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't going to work. realtime input is coming from the end user, but generate_reply instructions need to be coming from the model itself

else:
turns = []
if is_given(instructions):
turns.append(types.Content(parts=[types.Part(text=instructions)], role="model"))
turns.append(types.Content(parts=[types.Part(text=".")], role="user"))
self._send_client_event(types.LiveClientContent(turns=turns, turn_complete=True))

def _on_timeout() -> None:
if not fut.done():
Expand Down Expand Up @@ -811,6 +830,10 @@ async def _main_task(self) -> None:
f"update_instructions() to set system-level context instead."
)

# For restricted client_content models (e.g. gemini-3.1-flash-live-preview),
# history_config.initial_history_in_client_content=True is set in
# _build_connect_config, which allows send_client_content for initial
# history seeding before the first model turn.
turns_dict, _ = self._chat_ctx.copy(
exclude_function_call=True,
exclude_handoff=True,
Expand Down Expand Up @@ -898,10 +921,27 @@ async def _send_task(self, session: AsyncSession) -> None:
):
break
if isinstance(msg, types.LiveClientContent):
await session.send_client_content(
turns=msg.turns, # type: ignore
turn_complete=msg.turn_complete if msg.turn_complete is not None else True,
)
# Restricted client_content models reject send_client_content after the first model turn.
# With reconnect_on_update=True, trigger a reconnect so updates take
# effect via system_instruction/history in the new session setup.
# Otherwise, silently drop to prevent 1007 errors.
if self._opts.model in RESTRICTED_CLIENT_CONTENT_MODELS:
if self._opts.reconnect_on_update:
logger.debug(
"restricted model: reconnecting to apply LiveClientContent update"
)
self._mark_restart_needed()
else:
logger.debug(
"restricted model: dropping LiveClientContent (not supported mid-session)"
)
else:
await session.send_client_content(
turns=msg.turns, # type: ignore
turn_complete=msg.turn_complete
if msg.turn_complete is not None
else True,
)
elif isinstance(msg, types.LiveClientToolResponse) and msg.function_responses:
await session.send_tool_response(function_responses=msg.function_responses)
elif isinstance(msg, types.LiveClientRealtimeInput):
Expand Down Expand Up @@ -1069,6 +1109,11 @@ def _build_connect_config(self) -> types.LiveConnectConfig:
if is_given(self._opts.context_window_compression):
conf.context_window_compression = self._opts.context_window_compression

# Restricted client_content models require history_config to allow send_client_content
# for initial context history seeding on connect.
if self._opts.model in RESTRICTED_CLIENT_CONTENT_MODELS:
conf.history_config = types.HistoryConfig(initial_history_in_client_content=True)

return conf

def _start_new_generation(self) -> None:
Expand Down Expand Up @@ -1121,6 +1166,20 @@ def _start_new_generation(self) -> None:
self.emit("generation_created", generation_event)

def _handle_server_content(self, server_content: types.LiveServerContent) -> None:
# Restricted client_content models (e.g. gemini-3.1-flash-live-preview) may send an empty
# server_content with only usage_metadata before the first model_turn.
# Skip these to avoid dropping the first generation's audio.
has_content = (
server_content.model_turn
or server_content.output_transcription
or server_content.input_transcription
or server_content.turn_complete is not None
or server_content.generation_complete is not None
or server_content.interrupted is not None
)
if not has_content:
return

current_gen = self._current_generation
if not current_gen:
logger.warning("received server content but no active generation.")
Expand Down
Loading