feat(realtime): support multi-message generation per response #1555
🦋 Changeset detected. Latest commit: 896d71c. The changes in this PR will be included in the next version bump. This PR includes changesets to release 33 packages.
@tinalenguyen Stating the obvious: what is required to push this through?
    this.agentSession._conversationItemAdded(message);

    // TODO(brian): add tracing span
    if (realtimeModel.capabilities.midSessionChatCtxUpdate) {
This doesn't check whether any messages were skipped, so updateChatCtx is called every time the agent speech is interrupted. Should we check whether any were skipped before this?
Updated to only call updateChatCtx on interruption when at least one processed message was skipped.
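A minimal sketch of that guard, for illustration only. The `ProcessedMessage` type, the `outcome` field, and the `shouldSyncChatCtx` helper are hypothetical names, not the identifiers used in this PR; the sketch assumes each processed message records whether it was played in full, played partially, or skipped.

```typescript
// Hypothetical per-message outcome; the real code may track this differently.
type PlaybackOutcome = 'full' | 'partial' | 'skipped';

interface ProcessedMessage {
  messageId: string;
  outcome: PlaybackOutcome;
}

// Returns true only when the speech was interrupted AND at least one
// processed message never reached playback, so the remote chat context
// actually differs from what the user heard.
function shouldSyncChatCtx(interrupted: boolean, processed: ProcessedMessage[]): boolean {
  return interrupted && processed.some((m) => m.outcome === 'skipped');
}
```

With this shape, an interruption that only truncates the message being played does not trigger a chat context update.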
    } else if (interrupted && output.synchronizedTranscript !== undefined) {
      forwardedText = output.synchronizedTranscript;
    }
Is this branch reachable? synchronizedTranscript is set only if audioOut is valid.
Removed the unreachable fallback branch; interrupted audio still uses synchronizedTranscript when available and falls back to an empty string otherwise.
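The resulting selection can be sketched roughly as follows. `ForwardedOutput` and `resolveForwardedText` are hypothetical names, and the uninterrupted branch (forwarding the full text) is an assumption; only the interrupted behavior is taken from the reply above.

```typescript
// Hypothetical output shape; only synchronizedTranscript appears in the diff.
interface ForwardedOutput {
  text: string; // full generated text (assumed field)
  synchronizedTranscript?: string; // set only when audio output was valid
}

function resolveForwardedText(output: ForwardedOutput, interrupted: boolean): string {
  if (interrupted) {
    // Use what was actually spoken, or nothing if no transcript was synchronized.
    return output.synchronizedTranscript ?? '';
  }
  // Assumption: an uninterrupted message forwards its full text.
  return output.text;
}
```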
    }
    await waitFor(forwardTasks);
    } catch (error) {
      this.logger.error(error, 'error reading messages from the realtime API');
Should we raise this error? What was the original behavior on failure?
Kept the existing TS behavior here: the previous readMessages path caught and logged errors from reading the realtime message stream rather than rethrowing. This port preserves that behavior.
    textOut = _textOut;
    forwardTasks.push(forwardTask);
    output.audioOut = audioOut;
    audioOut.firstFrameFut.await
Should we clean up firstFrameFut the way the Python implementation does?
No extra cleanup is needed in the JS path: performAudioForwarding resolves firstFrameFut on playback start and rejects it in its finally block if playback never starts, and each await already has a catch to avoid an unhandled rejection.
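The lifecycle described above can be illustrated with a small stand-in. `FirstFrameFuture` and `forwardAudio` are hypothetical simplifications, not the project's future type or the actual `performAudioForwarding`; the sketch only shows the resolve-on-first-frame and reject-in-`finally` pattern.

```typescript
// Hypothetical stand-in for the future type; settles at most once.
class FirstFrameFuture {
  readonly await: Promise<void>;
  private settled = false;
  private resolveFn!: () => void;
  private rejectFn!: (reason: Error) => void;

  constructor() {
    this.await = new Promise<void>((resolve, reject) => {
      this.resolveFn = resolve;
      this.rejectFn = reject;
    });
  }

  resolve(): void {
    if (!this.settled) {
      this.settled = true;
      this.resolveFn();
    }
  }

  reject(reason: Error): void {
    if (!this.settled) {
      this.settled = true;
      this.rejectFn(reason);
    }
  }
}

// Simplified forwarding loop: resolves the future on the first frame and
// rejects it in finally if no frame was ever played.
async function forwardAudio(frames: Iterable<Uint8Array>, fut: FirstFrameFuture): Promise<number> {
  let bytes = 0;
  try {
    for (const frame of frames) {
      fut.resolve(); // no-op after the first frame
      bytes += frame.length;
    }
  } finally {
    fut.reject(new Error('playback never started')); // no-op if already resolved
  }
  return bytes;
}
```

A caller would attach a handler such as `fut.await.then(onFirstFrame).catch(() => {})`, so the rejection is always observed and no separate cleanup step is required.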


Summary
- Each MessageGeneration from generation_ev.message_stream is handled serially via perform_audio_forwarding + perform_text_forwarding + wait_for_playout. Only one flush is in flight at a time.
- Per-message handling depends on the playback_finished event:
  - full → emit ChatMessage(interrupted=False) with the msg's message_id
  - partial → emit ChatMessage(interrupted=True) and call _rt_session.truncate(...) with this msg's local playback_position (not a cumulative offset)
  - skipped → drop locally and call update_chat_ctx(...) so the realtime server removes never-played items from its history
- _on_first_frame now early-returns once started_speaking_at is set, so per-msg first-frame callbacks don't re-fire _update_agent_state("speaking") for each message.

Alternative considered
#5690 makes multi-message work by flushing per message. That needs the synchronizer to keep pending/finalizing impls alive and serialize concurrent flushes in room_io/_output.py. Our AudioOutput assumes there is only one speech at a time, so serializing per message at the wait_for_playout boundary (this PR) avoids both changes.

close livekit/agents#5690, livekit/agents#5684
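
For illustration, the full / partial / skipped handling from the Summary can be sketched in TypeScript as below. All names here (`PlaybackFinished`, `Outcome`, `classifyMessage`) are hypothetical, and the rule used to tell a skipped message from a partial one (no event, or an interruption before any audio played) is an assumption rather than the exact condition in this PR.

```typescript
// Hypothetical playback result for one message; field names are assumptions.
interface PlaybackFinished {
  playbackPositionMs: number; // local to this message, not cumulative
  interrupted: boolean;
}

type Outcome =
  | { kind: 'full'; messageId: string; interrupted: false }
  | { kind: 'partial'; messageId: string; interrupted: true; truncateAtMs: number }
  | { kind: 'skipped'; messageId: string };

function classifyMessage(messageId: string, ev: PlaybackFinished | undefined): Outcome {
  // Assumption: no event, or an interruption before any audio played, means skipped.
  if (ev === undefined || (ev.interrupted && ev.playbackPositionMs <= 0)) {
    return { kind: 'skipped', messageId };
  }
  if (ev.interrupted) {
    // Truncate at this message's own playback position.
    return { kind: 'partial', messageId, interrupted: true, truncateAtMs: ev.playbackPositionMs };
  }
  return { kind: 'full', messageId, interrupted: false };
}
```

A caller would then emit the chat message for `full` and `partial`, request truncation for `partial`, and schedule a chat context update when any message is `skipped`.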