Description
Feature Type
Would make my life easier
Feature Description
In my app, for every discrete LLM response, a background async task needs to be kicked off for that specific response as soon as any audio from that response is played back to the user. If a single llm_node run generates only one discrete LLM response, this is trivial to implement by adding a callback on the agent_state_changed event that fires when the agent state changes to speaking, like the following:
@session.on("agent_state_changed")
def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
    if ev.new_state == "speaking":
        def callback() -> None:
            start_background_task(...)

        loop = asyncio.get_running_loop()
        # It's delayed by 80 ms because I want the background task to fire
        # as soon as the audio is actually played back, but the
        # agent_state_changed event is emitted slightly before playback starts.
        loop.call_later(0.080, callback)
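(Here start_background_task is a placeholder for my own helper, not a LiveKit API; a minimal sketch of what it might look like, assuming plain asyncio, is:)

import asyncio

_pending: set[asyncio.Task] = set()

def start_background_task(coro) -> asyncio.Task:
    # Hypothetical fire-and-forget helper: keep a strong reference to the
    # task so it isn't garbage-collected before it finishes.
    task = asyncio.get_running_loop().create_task(coro)
    _pending.add(task)
    task.add_done_callback(_pending.discard)
    return task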
The challenge comes when a single llm_node run produces two or more separate LLM responses, like the following:
async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings,
) -> AsyncIterable[llm.ChatChunk | str]:
    # In this case, start_background_task needs to be called twice:
    # once for the first response and once for the second.
    async for response_1_chunk in generate_llm_response():
        yield response_1_chunk
    async for response_2_chunk in generate_llm_response():
        yield response_2_chunk
Here, reusing the agent_state_changed event solution doesn't work, because the agent state never resets (e.g., back to thinking); it remains speaking the whole time. A quick sketch to illustrate (reusing the same handler pattern as above):
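speaking_transitions = 0

@session.on("agent_state_changed")
def _count_speaking(ev: AgentStateChangedEvent) -> None:
    global speaking_transitions
    if ev.new_state == "speaking":
        speaking_transitions += 1

# After the two-response llm_node run above completes,
# speaking_transitions is 1, not 2: the state never left "speaking"
# between the two responses, so the event fired only once.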
To resolve this, the easiest way I can think of is to let me manually set the agent's state back to thinking in between the two responses. That way, I can reuse the agent_state_changed event solution:
async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings,
) -> AsyncIterable[llm.ChatChunk | str]:
    async for response_1_chunk in generate_llm_response():
        yield response_1_chunk
    # I'm thinking this should be a queue mechanism rather than a direct
    # setter, since the agent may not have finished speaking everything
    # from the first response by the time this line runs.
    self.queue_agent_state_change("thinking")
    async for response_2_chunk in generate_llm_response():
        yield response_2_chunk
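With an API like this (queue_agent_state_change is the proposed method, not something that exists today), the original handler would simply fire once per response:

@session.on("agent_state_changed")
def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
    # With the queued state change above, thinking -> speaking now
    # happens once per response instead of once per llm_node run.
    if ev.new_state == "speaking":
        loop = asyncio.get_running_loop()
        loop.call_later(0.080, lambda: start_background_task(...))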
Workarounds / Alternatives
There are some workarounds, all of them quite hacky and non-deterministic. The main challenge is that LiveKit does not expose any way for me to know when the audio has actually started playing back to the user. The closest thing to it is the agent_state_changed event for when the agent state changes to speaking, but it still fires before actual audio playback (in my experiments, about 80 ms early).
Other than the agent_state_changed solution, a much hackier option is to yield some sentinel text:
BOUNDARY_MARKER = "--some dummy text--"

async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings,
) -> AsyncIterable[llm.ChatChunk | str]:
    async for response_1_chunk in generate_llm_response():
        yield response_1_chunk
    yield BOUNDARY_MARKER
    async for response_2_chunk in generate_llm_response():
        yield response_2_chunk

async def tts_node(self, text, model_settings):
    async def new_iter():
        async for chunk_text in text:
            if chunk_text == BOUNDARY_MARKER:
                def callback() -> None:
                    start_background_task(...)

                # Instead of 80 ms, I have to increase the delay even more.
                # Furthermore, the actual time difference here has much more
                # variance, because an AI call (to the TTS model) is involved.
                asyncio.get_running_loop().call_later(0.200, callback)
            else:
                yield chunk_text

    async for frame in Agent.default.tts_node(self, new_iter(), model_settings):
        yield frame
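One more caveat with this workaround: the chunk_text == BOUNDARY_MARKER comparison assumes the marker arrives as its own chunk. That holds here because llm_node yields it directly, but if anything upstream merges or splits chunks, a substring check is needed instead (a minimal sketch, under the same assumptions as above):

async def new_iter():
    async for chunk_text in text:
        # Handle the marker even if it's been merged into a larger chunk.
        while BOUNDARY_MARKER in chunk_text:
            before, _, chunk_text = chunk_text.partition(BOUNDARY_MARKER)
            if before:
                yield before
            asyncio.get_running_loop().call_later(
                0.200, lambda: start_background_task(...)
            )
        if chunk_text:
            yield chunk_text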
Additional Context
No response