Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@
StartJobRequest,
)

# Defensive timeout for AgentSession.aclose() during job shutdown. Hardcoded for now
# as a guardrail against close paths that hang indefinitely. If aclose() does not
# return in this window, the rest of the shutdown sequence (on_session_end,
# ShuttingDown ack, room.disconnect, shutdown callbacks) runs anyway so user-
# registered shutdown callbacks are not silently dropped.
_SESSION_ACLOSE_TIMEOUT = 60.0


@dataclass
class ProcStartArgs:
Expand Down Expand Up @@ -371,7 +378,14 @@ def _on_entry_done(t: asyncio.Task[Any]) -> None:
pass

if session := self._job_ctx._primary_agent_session:
await session.aclose()
try:
await asyncio.wait_for(session.aclose(), timeout=_SESSION_ACLOSE_TIMEOUT)
except asyncio.TimeoutError:
logger.error(
"AgentSession.aclose() timed out after %.1fs; "
"proceeding with shutdown so registered callbacks still run.",
_SESSION_ACLOSE_TIMEOUT,
)

if self._session_end_fnc:
try:
Expand Down
62 changes: 62 additions & 0 deletions tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,42 @@ async def _job_shutdown() -> None:
start_args.update_ev.notify()


async def _job_entrypoint_session_aclose_hangs(job_ctx: JobContext) -> None:
"""Plant a fake AgentSession whose aclose() hangs forever; the
_SESSION_ACLOSE_TIMEOUT guardrail must fire so shutdown callbacks still run."""
from livekit.agents.ipc import job_proc_lazy_main

# Shrink the hardcoded guardrail so the test doesn't wait the full 60s.
job_proc_lazy_main._SESSION_ACLOSE_TIMEOUT = 2.0

start_args: _StartArgs = job_ctx.proc.user_arguments

async def _job_shutdown() -> None:
with start_args.shutdown_counter.get_lock():
start_args.shutdown_counter.value += 1

job_ctx.add_shutdown_callback(_job_shutdown)

class _HangingSession:
async def aclose(self) -> None:
await asyncio.Event().wait() # never resolves

job_ctx._primary_agent_session = _HangingSession() # type: ignore[assignment]

# _on_session_end touches session.history and make_session_report, which
# would crash on our minimal fake. Bypass it so the assertion is purely
# about the aclose-timeout → shutdown-callback path.
async def _noop_session_end() -> None:
return None

job_ctx._on_session_end = _noop_session_end # type: ignore[method-assign]

with start_args.entrypoint_counter.get_lock():
start_args.entrypoint_counter.value += 1

job_ctx.shutdown("trigger hang in session.aclose()")


async def _job_entrypoint_raises_after_shutdown(job_ctx: JobContext) -> None:
"""Reproduces the room-disconnect race: _shutdown_fut is set before the
entrypoint task unwinds, then the entrypoint raises while _run_job_task
Expand Down Expand Up @@ -430,6 +466,32 @@ async def test_job_slow_shutdown():
assert proc.killed


async def test_shutdown_callback_runs_when_session_aclose_hangs():
"""Regression test: when AgentSession.aclose() blocks indefinitely during
job shutdown, the _SESSION_ACLOSE_TIMEOUT guardrail must fire and
user-registered shutdown callbacks must still run. The entrypoint shrinks
the hardcoded constant to 2s so the test stays fast."""
mp_ctx = mp.get_context("spawn")
proc, start_args = _create_proc(
close_timeout=20.0,
mp_ctx=mp_ctx,
job_entrypoint_fnc=_job_entrypoint_session_aclose_hangs,
)
await proc.start()
await proc.initialize()

fake_job = _generate_fake_job()
await proc.launch_job(fake_job)
await _poll_until(lambda: start_args.entrypoint_counter.value >= 1)
await proc.aclose()

assert proc.exitcode == 0, "process should have exited cleanly"
assert not proc.killed
assert start_args.shutdown_counter.value == 1, (
"shutdown callback must run even when session.aclose() hangs"
)


async def test_shutdown_callback_runs_when_entrypoint_raises():
"""Regression test: when the entrypoint raises after _shutdown_fut is
already set (as happens on room disconnect mid-wait_for_participant),
Expand Down
Loading