Feature request: AgentSession.update_stt / update_llm / update_tts / update_vad #5735

@MSameerAbbas

The gap

AgentSession exposes its core resources as read-only @property getters over private fields:

session.stt -> session._stt
session.llm -> session._llm
session.tts -> session._tts
session.vad -> session._vad

There is no public setter and no update_* resource method. To swap any of these mid-session (most commonly to change a parameter the plugin's own update_options(...) doesn't expose, e.g. enabling translation on Soniox, switching the VAD model, or swapping a RealtimeModel), the only reachable path is:

session._stt = new_stt                              # private mutation
session.update_agent(session.current_agent)         # full activity rebuild

This shape is wrong on three counts:

  1. It mutates a leading-underscore private attribute. The framework offers no contract about that field.
  2. It piggybacks on update_agent for invalidation. update_agent is heavyweight: it tears down and rebuilds endpointing, MCP toolset wiring, on_exit/on_enter hooks, in-flight tasks, and the rest of the activity boot. To swap one parameter on one resource, that is far more work than required, and it creates many opportunities to break unrelated state.
  3. For STT specifically, it is silently incorrect: the reuse-handoff check inside AgentActivity._detach_reusable_resources (renamed from _detach_stt_pipeline_if_reusable in 1.5.9, but the comparison is unchanged) resolves both sides of its identity check through the same _session._stt field, so the comparison is X is X after the mutation. This is the bug at BUG_REPORT_stt_pipeline_reuse. In livekit-agents==1.5.9:

# livekit/agents/voice/agent_activity.py, line 603
async def _detach_reusable_resources(self, new_activity: AgentActivity) -> _ReusableResources:
    """Detach reusable resources for handoff to *new_activity*."""
    resources = _ReusableResources()

    try:
        # stt pipeline
        if (
            self._audio_recognition
            and self.stt is not None
            and type(self.agent).stt_node is type(new_activity.agent).stt_node
            and self.stt is new_activity.stt        # <-- same property indirection
        ):

The same method now also handles RealtimeSession reuse (lines 618-664), keyed on self.llm is new_activity.llm with the same property-over-mutable-field shape. The capability checks layered on top (chat_ctx equivalence, instructions equivalence, tools equivalence) catch most current misuses, but the identity comparison itself is structurally identical to the STT case and would fail the same way under a _session._llm = new mutation. update_llm covers it for free.
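The identity problem in point 3 can be reproduced without the framework. The sketch below uses stand-in classes (Session and Activity here are hypothetical, not the livekit types, and the agent-level override is ignored) to show why a check that reads both sides through the same mutable field cannot detect the swap:

```python
class Session:
    """Stand-in for AgentSession: one private field behind a read-only property."""

    def __init__(self, stt):
        self._stt = stt

    @property
    def stt(self):
        return self._stt


class Activity:
    """Stand-in for AgentActivity: resolves stt through the session on every read."""

    def __init__(self, session):
        self._session = session

    @property
    def stt(self):
        return self._session.stt


old_stt, new_stt = object(), object()
session = Session(old_stt)
running = Activity(session)       # its pipeline was built against old_stt

session._stt = new_stt            # the private-mutation workaround
replacement = Activity(session)   # what the same-agent rebuild would construct

# Both sides read the same field, so the reuse check passes...
reuse_check_passes = running.stt is replacement.stt
# ...even though the instance the pipeline was built with is not the new one.
pipeline_matches_new = old_stt is new_stt
print(reuse_check_passes, pipeline_matches_new)
```

The check says "same STT, reuse the pipeline" while the pipeline is still bound to the old instance.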

Proposed API

Four parallel methods on AgentSession. All sync, returning asyncio.Task[None], mirroring update_agent's spawn-a-task semantics so they remain callable from sync contexts (tool implementations, event handlers) while letting interested callers await completion.

class AgentSession:
    def update_stt(self, stt: stt.STT | None) -> asyncio.Task[None]: ...
    def update_llm(self, llm: llm.LLM | llm.RealtimeModel | None) -> asyncio.Task[None]: ...
    def update_tts(self, tts: tts.TTS | None) -> asyncio.Task[None]: ...
    def update_vad(self, vad: vad.VAD | None) -> asyncio.Task[None]: ...

Caller idioms:

session.update_stt(new)              # fire and forget
await session.update_stt(new)        # know when the swap is fully applied
task = session.update_stt(new); ...  # keep handle for later cancel/await

This is a strict superset of update_agent's current contract: that method spawns a task internally and stashes it in a private slot (_update_activity_atask), forcing callers to use wait_for_inactive() to find out when the transition is done. Returning the task here removes the need for a parallel wait_for_* companion per resource.

Cancel-and-supersede. Each resource gets its own task slot (_update_stt_atask, etc.). Calling update_stt(b) while update_stt(a) is in flight cancels a's task; last call wins. This matches _update_activity_task's existing handling (it accepts old_task and supersedes it).
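A minimal sketch of the cancel-and-supersede rule, assuming a single task slot and a sleep standing in for the real invalidation work. Swapper and its fields are hypothetical names for illustration only. It waits on the superseded task with asyncio.wait, which does not re-raise that task's outcome and does not swallow a cancellation aimed at the waiting task:

```python
import asyncio


class Swapper:
    """Hypothetical holder of one resource and one task slot."""

    def __init__(self):
        self.value = "initial"
        self.applied = []      # every value that was actually applied
        self._task = None      # the per-resource task slot

    def update(self, new) -> "asyncio.Task[None]":
        old_task = self._task

        async def _swap():
            if old_task is not None:
                old_task.cancel()
                # Wait until the superseded task has finished unwinding.
                await asyncio.wait([old_task])
            await asyncio.sleep(0.01)   # stands in for invalidation work
            self.value = new
            self.applied.append(new)

        self._task = asyncio.create_task(_swap())
        return self._task


async def main():
    s = Swapper()
    a = s.update("a")
    b = s.update("b")
    c = s.update("c")
    await c
    return s.value, s.applied, a.cancelled(), b.cancelled()


value, applied, a_cancelled, b_cancelled = asyncio.run(main())
print(value, applied, a_cancelled, b_cancelled)
```

With three rapid calls only the last value is applied, and both earlier tasks end up cancelled.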

Failure handling. Spawned tasks are wrapped in @utils.log_exceptions(logger=logger) so unawaited swaps that raise are logged, never silently dropped — same pattern _update_activity_task uses.

Calling before start(). Allowed; just updates the field with no activity touch (returns a task that completes immediately).

Calling after the session is closing/closed. The returned task raises RuntimeError("session is closing/closed") on await; the sync call itself does not raise.

None accepted. Symmetric with construction. update_tts(None) removes a previously set TTS.
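The lifecycle rules above (before start(), after close, None accepted) could look roughly like this. This is a sketch with a stand-in Session class. The _started/_closed flags and the invalidations counter are assumptions made for illustration, not the framework's fields:

```python
import asyncio


class Session:
    """Stand-in session with just enough state to show the contract."""

    def __init__(self, started, closed):
        self._started = started
        self._closed = closed
        self._tts = "old-tts"
        self.invalidations = 0

    def update_tts(self, tts) -> "asyncio.Task[None]":
        async def _swap():
            if self._closed:
                raise RuntimeError("session is closing/closed")
            self._tts = tts
            if self._started:
                self.invalidations += 1   # stands in for touching the activity

        # The sync call only spawns the task; any error surfaces on await.
        return asyncio.create_task(_swap())


async def main():
    not_started = Session(started=False, closed=False)
    await not_started.update_tts(None)   # allowed: field update only

    closed = Session(started=True, closed=True)
    task = closed.update_tts("new-tts")  # does not raise here
    try:
        await task
        error = None
    except RuntimeError as exc:
        error = str(exc)
    return not_started._tts, not_started.invalidations, error, closed._tts


result = asyncio.run(main())
print(result)
```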

Per-resource semantics

All four share a fixed sequence (see "Implementation sketch" below) and differ only in the invalidation hook.

update_stt

  • Effective at: the next utterance.
  • Invalidation: delegate to the existing AudioRecognition.update_stt at audio_recognition.py:559, which spawns a new consumer task that takes over from the prior pipeline cleanly. Passes captured_stt=new for the defense-in-depth snapshot.
  • Override interaction: if agent.stt is set, activity.stt resolves to the agent override. The session-level swap still updates session._stt (the fallback) but emits a WARNING log: "current agent overrides STT; new value will only apply when an agent without an STT override is active."
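The override interaction can be sketched as follows. This is a simplification: None stands in for "no override set" on the agent, and Agent, Session, effective_stt and set_stt are hypothetical stand-ins rather than the framework's classes or methods:

```python
import logging

logger = logging.getLogger("swap-sketch")


class Agent:
    def __init__(self, stt=None):
        self.stt = stt   # None means "no agent-level override" in this sketch


class Session:
    def __init__(self, stt, agent):
        self._stt = stt
        self._agent = agent

    def effective_stt(self):
        """What the activity would resolve: agent override first, then session."""
        return self._agent.stt if self._agent.stt is not None else self._stt

    def set_stt(self, new) -> bool:
        """Update the session fallback; return True if it takes effect now."""
        overridden = self._agent.stt is not None
        if overridden:
            logger.warning(
                "current agent overrides STT; new value will only apply "
                "when an agent without an STT override is active."
            )
        self._stt = new
        return not overridden


session = Session("session-stt", Agent(stt="agent-stt"))
took_effect = session.set_stt("new-session-stt")
while_overridden = session.effective_stt()

session._agent = Agent()             # an agent without an STT override
after_handoff = session.effective_stt()
print(took_effect, while_overridden, after_handoff)
```

The fallback is updated either way. It just stays invisible until an agent without the override becomes active.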

update_tts

  • Effective at: the next say() / generate_reply() synthesis. In-flight synthesis continues against the old TTS.
  • Invalidation: none required at the pipeline level — tts_node re-resolves activity.tts on each call. Swapping the field is sufficient.
  • Override interaction: same WARNING rule.

update_llm

Two branches based on the type of the new and old instance:

  • Classical LLM ↔ classical LLM: identical to TTS — per-generation resolution, no in-flight invalidation.
  • RealtimeModel involved on either side: invalidation is mandatory. Delegate to the to-be-extracted AgentActivity._reconcile_realtime_session(old_llm, new_llm) (see Helpers below) which owns the full rt_session lifecycle: deregister the seven handlers (generation_created, input_speech_started, input_speech_stopped, input_audio_transcription_completed, metrics_collected, remote_item_added, error), await _rt_session.aclose(), and — if the new LLM is a RealtimeModel — open a fresh _rt_session, push instructions/chat_ctx/tools, and re-register the handlers.
  • Cross-resource invariants the existing _start_session enforces are re-validated after the swap (RealtimeModel + audio output without TTS, turn_detection="realtime_llm" requiring a RealtimeModel, etc.). Violations log warnings — same severity as the original boot.
  • Effective at: classical → next generation. Realtime → immediate (cancels any in-flight realtime generation).
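The branch selection for update_llm reduces to a type check on both sides. A sketch, with empty stand-in classes in place of llm.LLM and llm.RealtimeModel, and a hypothetical llm_swap_plan helper that only names which path would run:

```python
class LLM:
    """Stand-in for a classical LLM."""


class RealtimeModel:
    """Stand-in for a realtime model."""


def llm_swap_plan(old, new) -> str:
    """Return which invalidation path the swap would need."""
    if isinstance(old, RealtimeModel) or isinstance(new, RealtimeModel):
        # Realtime on either side: the rt_session must be torn down
        # and/or booted.
        return "reconcile_realtime_session"
    # Classical on both sides (or None): per-generation resolution,
    # so the field swap alone is enough.
    return "field_swap_only"


plans = {
    "llm->llm": llm_swap_plan(LLM(), LLM()),
    "llm->realtime": llm_swap_plan(LLM(), RealtimeModel()),
    "realtime->llm": llm_swap_plan(RealtimeModel(), LLM()),
    "realtime->none": llm_swap_plan(RealtimeModel(), None),
    "none->llm": llm_swap_plan(None, LLM()),
}
print(plans)
```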

update_vad

  • Effective at: the next AudioRecognition cycle that consumes VAD output.
  • Invalidation: delegate to the existing AudioRecognition.update_vad(new_vad) at audio_recognition.py:590, which already swaps self._vad, re-wires the _vad_ch/_vad_atask pair if running, and updates _interruption_enabled accordingly.
  • Override interaction: same WARNING rule.

Shared rule across all four: handler migration

Each _invalidate_* hook below performs two type-local steps for the resource it owns:

  1. Detach metrics_collected (and error, where the type emits it — STT/LLM/TTS do; VAD only emits metrics_collected) from the old instance, mirroring what _close_session does today.
  2. Attach those same handlers to the new instance, mirroring what _start_session does today.

Failing to do (1) leaks handlers indefinitely. Failing to do (2) silently drops metrics/errors from the new instance. The shared sequencer is pure orchestration (lock, idempotence, override warning, field set, dispatch to hook). The field is set before the hook runs, so each hook sees self._<resource> is new while still receiving the old reference as a parameter for the detach step.
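The two migration steps can be illustrated with a toy emitter. Emitter and migrate_handlers below are hypothetical stand-ins for illustration. The real resources have their own on/off implementation:

```python
from collections import defaultdict


class Emitter:
    """Toy event emitter with on/off/emit."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, cb):
        self._handlers[event].append(cb)

    def off(self, event, cb):
        if cb in self._handlers[event]:
            self._handlers[event].remove(cb)

    def emit(self, event, payload):
        for cb in list(self._handlers[event]):
            cb(payload)

    def listener_count(self, event):
        return len(self._handlers[event])


def migrate_handlers(old, new, handlers):
    """Step 1: detach from the old instance. Step 2: attach to the new one."""
    for event, cb in handlers.items():
        if old is not None:
            old.off(event, cb)
        if new is not None:
            new.on(event, cb)


seen = []


def on_metrics(payload):
    seen.append(payload)


handlers = {"metrics_collected": on_metrics}
old, new = Emitter(), Emitter()
old.on("metrics_collected", on_metrics)   # what the original boot did

migrate_handlers(old, new, handlers)
old.emit("metrics_collected", "from-old")   # no longer delivered
new.emit("metrics_collected", "from-new")   # delivered
print(seen, old.listener_count("metrics_collected"))
```

Skipping the detach would leave the count on the old instance at 1 (the leak). Skipping the attach would leave seen empty (the silent drop).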

Implementation sketch

Each update_* is a Template Method around a fixed sequence with one varying step (the invalidation hook). Four near-identical bodies collapse to one base + four hooks. In 1.5.9 the framework already ships the in-place hot-swap primitives the invalidation hooks need: AudioRecognition.update_stt(stt_node, pipeline=...) and AudioRecognition.update_vad(vad) at audio_recognition.py:559-606. update_stt already spawns a new consumer task that takes over from the old pipeline cleanly (clear_user_turn() uses this pattern internally for buffer reset at lines 670-671: self.update_stt(None); self.update_stt(stt)). The session-level methods become thin delegates over those primitives.

# inside AgentSession

def update_stt(self, stt: stt.STT | None) -> asyncio.Task[None]:
    return self._spawn_swap("_stt", stt, self._invalidate_stt)

def update_tts(self, tts: tts.TTS | None) -> asyncio.Task[None]:
    return self._spawn_swap("_tts", tts, self._invalidate_tts)

def update_llm(self, llm: llm.LLM | llm.RealtimeModel | None) -> asyncio.Task[None]:
    return self._spawn_swap("_llm", llm, self._invalidate_llm)

def update_vad(self, vad: vad.VAD | None) -> asyncio.Task[None]:
    return self._spawn_swap("_vad", vad, self._invalidate_vad)


def _spawn_swap(
    self,
    field_name: str,
    new_instance: Any | None,
    invalidate: Callable[[Any | None, Any | None], Awaitable[None]],
) -> asyncio.Task[None]:
    slot = f"_update{field_name}_atask"      # e.g. "_update_stt_atask"
    old_task: asyncio.Task[None] | None = getattr(self, slot, None)

    @utils.log_exceptions(logger=logger)
    async def _swap() -> None:
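        if old_task is not None:
            old_task.cancel()
            # asyncio.wait() does not re-raise the old task's outcome and lets a
            # cancellation aimed at *this* task propagate; suppressing
            # BaseException here would swallow it and break last-call-wins.
            await asyncio.wait([old_task])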

        if self._closing or self._closed:
            raise RuntimeError("session is closing/closed")

        async with self._activity_lock:
            old = getattr(self, field_name)
            if old is new_instance:
                return  # idempotent on identity

            if self._activity_uses_override(field_name):
                logger.warning(
                    "session.%s: current agent overrides this resource; new value will "
                    "only apply when an agent without that override is active.",
                    field_name.lstrip("_"),
                )
                setattr(self, field_name, new_instance)
                return  # invalidation is meaningless: activity reads the override,
                        # not the session-level slot. Field is updated for the
                        # next override-less agent; no pipeline rebuild needed.

            setattr(self, field_name, new_instance)
            await invalidate(old, new_instance)

    task = asyncio.create_task(_swap(), name=f"_update{field_name}_task")
    setattr(self, slot, task)
    return task


# Per-resource invalidate hooks. The handler dance is co-located with the type
# it knows about, so the shared sequencer above is pure orchestration.

async def _invalidate_stt(self, old, new):
    if isinstance(old, stt_module.STT):
        old.off("metrics_collected", self._on_metrics_collected)
        old.off("error", self._on_error)
    if isinstance(new, stt_module.STT):
        new.on("metrics_collected", self._on_metrics_collected)
        new.on("error", self._on_error)
    if not (self._started and self._activity is not None):
        return
    ar = self._activity._audio_recognition
    if ar is not None:
        ar.update_stt(
            self._activity.agent.stt_node if new is not None else None,
            captured_stt=new,                          # defense-in-depth snapshot
        )

async def _invalidate_vad(self, old, new):
    if isinstance(old, vad_module.VAD):
        old.off("metrics_collected", self._on_metrics_collected)
    if isinstance(new, vad_module.VAD):
        new.on("metrics_collected", self._on_metrics_collected)
    if not (self._started and self._activity is not None):
        return
    ar = self._activity._audio_recognition
    if ar is not None:
        ar.update_vad(new)

async def _invalidate_tts(self, old, new):
    if isinstance(old, tts_module.TTS):
        old.off("metrics_collected", self._on_metrics_collected)
        old.off("error", self._on_error)
    if isinstance(new, tts_module.TTS):
        new.on("metrics_collected", self._on_metrics_collected)
        new.on("error", self._on_error)
    # TTS resolves per-utterance via tts_node; no further invalidation.

async def _invalidate_llm(self, old, new):
    # Classical LLM: handler dance only. RealtimeModel: rt_session lifecycle
    # handled by _reconcile_realtime_session below.
    if isinstance(old, llm_module.LLM):
        old.off("metrics_collected", self._on_metrics_collected)
        old.off("error", self._on_error)
    if isinstance(new, llm_module.LLM):
        new.on("metrics_collected", self._on_metrics_collected)
        new.on("error", self._on_error)
    if self._started and self._activity is not None:
        await self._activity._reconcile_realtime_session(old_llm=old, new_llm=new)

The only new plumbing on the framework side is the captured_stt snapshot:

  • _STTPipeline.__init__ gains an optional captured_stt: stt.STT | None = None parameter, stored on the instance.
  • AudioRecognition.update_stt gains a passthrough captured_stt kwarg used when it constructs a new _STTPipeline.
  • AgentActivity._start_session snapshots onto the freshly-constructed pipeline after self._audio_recognition.start(...) returns: pipeline._captured_stt = self.stt when both are present. One line.
  • AgentActivity._detach_reusable_resources compares against pipeline.captured_stt instead of the property re-read at line 613.

Two existing signatures touched (_STTPipeline.__init__ and AudioRecognition.update_stt), plus one one-line comparison change in _detach_reusable_resources, plus one one-line snapshot assignment in _start_session. AudioRecognition.start's signature is unchanged — the snapshot is set from above, by the activity that knows its own STT, after the recognition layer has handed back the constructed pipeline.

This matters for defense in depth: covering both boot and swap paths means every pipeline carries an honest snapshot, so the reuse check is correct regardless of whether the user reached the bug via the proposed seam or via the existing _session._stt = ...; update_agent(self.current_agent) workaround. Covering only the swap path would leave the workaround users unprotected — which is the population most exposed to the bug today.
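A sketch of why the snapshot makes the reuse check honest on both paths. Session and Pipeline here are stand-ins (Pipeline plays the role of _STTPipeline with the proposed captured_stt field), not the framework's classes:

```python
class Session:
    def __init__(self, stt):
        self._stt = stt

    @property
    def stt(self):
        return self._stt


class Pipeline:
    """Stand-in pipeline that remembers which STT it was built with."""

    def __init__(self, captured_stt=None):
        self.captured_stt = captured_stt


old_stt, new_stt = object(), object()
session = Session(old_stt)
pipeline = Pipeline(captured_stt=session.stt)   # snapshot taken at boot

session._stt = new_stt                          # the workaround mutation

# Today's shape: both sides re-read the same field, so this is always True.
property_check = session.stt is session.stt
# Proposed shape: compare what the pipeline was built with against what
# the new activity resolves to.
snapshot_check = pipeline.captured_stt is session.stt
print(property_check, snapshot_check)
```

The snapshot comparison comes out False after the mutation, so the stale pipeline would not be reused.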

The hot-swap path itself — AudioRecognition.update_stt spawning a new consumer with old_pipeline reference — is already correct in 1.5.9.

Lines of new code: small. Lines of duplicated code: zero. Each public method names what it does; per-resource invalidate hooks supply only what genuinely varies; the framework picks up one snapshot field on _STTPipeline and the matching comparison swap in _detach_reusable_resources.

Helpers required

  • AgentSession._activity_uses_override(field_name) -> bool — checks self._agent for the corresponding override slot.
  • AgentActivity._reconcile_realtime_session(old_llm, new_llm) — the LLM swap's invalidation hook. Encapsulates the realtime teardown / boot cycle. In 1.5.9 the seven-handler set/unset is duplicated across three sites: _start_session registers (rt branch), _close_session deregisters, and the new _detach_reusable_resources realtime-reuse branch (agent_activity.py:648-659) deregisters again for handoff. Extracting _reconcile_realtime_session is a precondition for update_llm and removes that three-way duplication.

AudioRecognition.update_stt and AudioRecognition.update_vad already exist in 1.5.9 (audio_recognition.py:559, 590); the only delta they need is a passthrough captured_stt kwarg on update_stt for the defense-in-depth snapshot. No new helper required for stt_node access — self._activity.agent.stt_node is already the bound callable the invalidate hook uses.

Locking

_spawn_swap's inner work holds _activity_lock for the field-mutation + invalidation block. This serializes against _update_activity (which also holds it). Concurrent update_stt(a) and update_stt(b) are serialized; the second call cancels the first task before acquiring the lock.
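The serialization claim can be checked in isolation with a plain asyncio.Lock. In this sketch the two coroutines only borrow the names of the real operations, and the sleep stands in for any await inside the critical section:

```python
import asyncio


async def main():
    lock = asyncio.Lock()   # plays the role of _activity_lock
    log = []

    async def critical(name):
        async with lock:
            log.append(f"{name}:start")
            await asyncio.sleep(0)      # yield to the loop mid-section
            log.append(f"{name}:end")

    # Both start concurrently; the lock keeps their bodies from interleaving.
    await asyncio.gather(critical("update_activity"), critical("update_stt"))
    return log


log = asyncio.run(main())
print(log)
```

Without the lock, the yield in the middle would let the second body start before the first one finished.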

update_options does not touch the same fields and remains independent.

Workarounds / Alternatives

Three patterns exist today; each has costs that motivate this request.

1. Mutate _session._stt (or _llm/_tts/_vad) and call update_agent(self.current_agent)

session._stt = new_stt
session.update_agent(session.current_agent)

  • Reaches into a private attribute. _session._stt is leading-underscore; the framework offers no stability contract.
  • Heavyweight. update_agent rebuilds the entire AgentActivity — endpointing, MCP wiring, on_exit/on_enter, in-flight tasks — to swap one resource.
  • Silently incorrect for STT. The pipeline-reuse check is self.stt is new_activity.stt where both sides resolve through the same field; after the mutation the comparison is X is X and the running pipeline (bound to the old STT instance) is reused. The new STT is built but never connected. Detailed in BUG_REPORT_stt_pipeline_reuse. Verified still present in 1.5.9 at agent_activity.py:613; the rename to _detach_reusable_resources did not change the comparison.

This is the path most users currently take, and the reason the bug exists in the wild.

2. Plugin-level update_options(...)

Some plugins expose this — e.g. livekit-plugins-openai STT has update_options(language=...) that signals reconnect on the active stream. When available, this is the cheapest path and should remain the default recommendation.

But:

  • It is plugin-specific. Soniox does not expose update_options for its translation field; OpenAI exposes only language; etc. Coverage is sparse.
  • It only covers fields the plugin chose to make mutable. Constructor-time params (model id, audio format, region) are still locked.

The proposed seam complements update_options rather than replacing it — when update_options covers your need, prefer it; this exists for the cases it doesn't.

3. Construct a fresh Agent with the new resource and call update_agent(new_agent)

This sidesteps the bug because new_activity.stt resolves to new_agent._stt (a distinct reference) and the reuse check correctly fails.

But:

  • The agent typically carries non-trivial state — chat_ctx for conversation history, in-flight background tasks, references to per-session services, mutable domain state on subclasses.
  • Migrating all of that on every "swap one provider parameter" is heavyweight, error-prone, and forces hard decisions: cancel in-flight tasks? let them complete on the old agent? hand off chat_ctx deep-copy or by reference?
  • The framework already documents the same-agent restart pattern (_update_activity has an explicit branch annotated # allow updating the same agent that is running), which exists precisely to avoid these costs. So users reach for workaround 1 instead and hit the bug.
