Skip to content

fix(avatar): preserve audio wrappers across avatar hot-swaps#5863

Open
longcw wants to merge 11 commits into
mainfrom
longc/preserve-audio-wrappers-on-avatar-swap
Open

fix(avatar): preserve audio wrappers across avatar hot-swaps#5863
longcw wants to merge 11 commits into
mainfrom
longc/preserve-audio-wrappers-on-avatar-swap

Conversation

@longcw
Copy link
Copy Markdown
Contributor

@longcw longcw commented May 27, 2026

Summary

Avatar plugins set session.output.audio = DataStreamAudioOutput(...) on every start. On the first start this works because AgentSession.start() wraps the sink with the TranscriptSynchronizer / RecorderAudioOutput chain afterwards; on a mid-session rebind (avatar switch) the raw assignment blows the chain away, silently breaking transcription sync and recording.

Fix it by auto-inserting an _AudioSinkProxy at the bottom of any wrapper chain. Wrappers cache the proxy, the proxy holds the swappable leaf β€” so hot-swaps preserve the wrappers above. New AgentOutput.swap_audio_endpoint(sink) walks the chain to the proxy and swaps its downstream in place, leaving the wrappers attached; full replacement stays as output.audio = sink.

Plugin migration

All 13 avatar plugins migrated to swap_audio_endpoint(...): anam, avatario, avatartalk, bey, bithuman, did, keyframe, lemonslice, liveavatar, runway, simli, tavus, trugen.

Example

examples/avatar_agents/audio_wave now demonstrates hot-swapping through a swap_avatar RPC method: it tears down the current avatar (removing it from the room) and launches a fresh one under the same identity, while the audio wrappers and the listeners attached to session.output.audio survive the swap.

Related

Clean avatar-worker shutdown on swap depends on livekit/python-sdks#699.

longcw added 6 commits May 27, 2026 14:56
AvatarSession.start() rebinds session.output.audio to a fresh
DataStreamAudioOutput. On the first call the wrapper chain (Recorder,
TranscriptSynchronizer) wraps it correctly, but a re-bind during a
mid-session avatar switch overwrites the synchronizer-wrapped output
with a raw sink, breaking audio/transcription sync and recording.

Introduce _AudioSinkProxy, a transparent proxy auto-inserted at the
bottom of any wrapper chain. Wrappers cache the proxy (not the leaf),
so the leaf can be hot-swapped via the proxy without invalidating
upstream references. When the proxy has no inner sink, flush()
synthesizes a playback_finished so upstream wrappers don't hang.

Add AgentOutput.set_audio_sink(sink, *, preserve_wrappers=False).
With preserve_wrappers=True, walks the chain to find the proxy and
swaps its downstream; otherwise behaves as the existing audio setter.
Avatar plugins migrate to this API; AvatarSession.aclose() detaches
the sink so the chain stays intact across aclose -> restart.

Drops the "may be replaced by the avatar" warning in AvatarSession.start
since the proxy makes mid-session rebinding correct by construction.
…ers=True)

Route every avatar plugin's audio sink binding through the new
AgentOutput.set_audio_sink API so mid-session hot-swaps (e.g. avatar
switches) preserve the TranscriptSynchronizer / RecorderAudioOutput
wrapper chain.

Plugins migrated: anam, avatario, avatartalk, bey, bithuman, did,
keyframe, liveavatar, runway, simli, tavus, trugen.
Covers:
- auto-wrap inserts the proxy between a wrapper and a bare leaf
- auto-wrap skipped when the downstream is already a proxy or a non-leaf
- set_audio_sink default replaces the chain
- set_audio_sink with preserve_wrappers swaps the proxy's inner in place
- preserve_wrappers fallback when no proxy exists in the chain
- proxy rejects a wrapper chain as inner (set_next_in_chain assert)
- detached proxy synthesizes playback_finished on flush
- swap routes new-leaf playback events to upstream listeners
- swap disconnects the old leaf from the chain
- on_attached/on_detached propagate to current inner and across swaps
Drop the leaf-only assertion in _AudioSinkProxy.set_next_in_chain β€” the
base AudioOutput machinery cascades capture/flush and bubbles playback
events through any chain, so the proxy can hold either a leaf or a
wrapper chain without breaking the contract upstream.
The base class doesn't track which sink the avatar set, so nulling
session.output.audio unconditionally could clobber a sink owned by
someone else. The wrapper chain stays intact across hot-swaps anyway
because the proxy preserves the wrappers regardless of what's in its
downstream slot, so leaving the sink in place until it's replaced or
the session tears down is fine.
@chenghao-mou chenghao-mou requested a review from a team May 27, 2026 07:27
devin-ai-integration[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

else:
self._audio_sink.on_detached()

def set_audio_sink(self, sink: AudioOutput | None, *, preserve_wrappers: bool = False) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to avoid this method?

Ideally the audio.setter alone is sufficient

    @audio.setter
    def audio(self, sink: AudioOutput | None) -> None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can make preserve_wrappers default for setter but then there is no way to fully replace it.

@longcw longcw requested a review from theomonnom June 3, 2026 00:54
longcw added 3 commits June 3, 2026 21:36
Drop the preserve_wrappers flag: the wrapper-preserving leaf swap is now its
own method, and full replacement stays as output.audio = sink.
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 12 additional findings in Devin Review.

Open in Devin Review

Comment on lines +353 to +359
def flush(self) -> None:
super().flush()
if self.next_in_chain:
self.next_in_chain.flush()
else:
# no real sink; synthesize a playback_finished
self.on_playback_finished(playback_position=0.0, interrupted=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟑 Re-entrant on_playback_finished during flush() causes double segment rotation and skips end_audio_input()

When _AudioSinkProxy has no downstream (e.g. after swap_audio_endpoint(None)), its flush() at io.py:358-359 synchronously calls self.on_playback_finished(...) which emits playback_finished. This event propagates up to _SyncedAudioOutput.on_playback_finished (synchronizer.py:633-643) which calls self._synchronizer.rotate_segment() and resets self._pushed_duration = 0.0. Control then returns to _SyncedAudioOutput.flush() (synchronizer.py:598-606) where the if not self._pushed_duration: check at line 601 is now unexpectedly true (reset by the re-entrant on_playback_finished), triggering a second rotate_segment() and returning before self._synchronizer._impl.end_audio_input() at line 606 is ever called.

Trace of the re-entrant call
  1. _SyncedAudioOutput.flush() β†’ self.next_in_chain.flush() (the proxy)
  2. _AudioSinkProxy.flush() β†’ no downstream β†’ self.on_playback_finished(0.0, interrupted=True) (synchronous)
  3. Proxy emits β†’ _SyncedAudioOutput._forward_next_playback_finished β†’ _SyncedAudioOutput.on_playback_finished β†’ rotate_segment(), _pushed_duration = 0.0
  4. Returns to _SyncedAudioOutput.flush() line 601: _pushed_duration is now 0 β†’ second rotate_segment(), end_audio_input() never reached
Prompt for agents
The issue is that _AudioSinkProxy.flush() with no downstream synchronously calls self.on_playback_finished(), which re-enters _SyncedAudioOutput.on_playback_finished (via event forwarding) and mutates _pushed_duration to 0.0. When control returns to _SyncedAudioOutput.flush(), the _pushed_duration check wrongly triggers a second rotate_segment() and skips end_audio_input().

Possible fixes:
1. In _AudioSinkProxy.flush(), schedule the synthesized playback_finished asynchronously (e.g. via asyncio.get_event_loop().call_soon) so it doesn't re-enter during flush().
2. In _SyncedAudioOutput.flush(), save _pushed_duration before calling self.next_in_chain.flush() and use the saved value for the subsequent check.
3. In _AudioSinkProxy.flush(), set a flag and let the caller handle the no-sink case rather than emitting playback_finished inline.

Option 2 is the simplest local fix β€” save `had_audio = bool(self._pushed_duration)` before calling `self.next_in_chain.flush()` and check `had_audio` instead of `self._pushed_duration` afterward in synchronizer.py _SyncedAudioOutput.flush().
Open in Devin Review

Was this helpful? React with πŸ‘ or πŸ‘Ž to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants