Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions livekit-agents/livekit/agents/ipc/proc_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,64 @@ async def aclose(self) -> None:
self._closed = True
await aio.cancel_and_wait(self._main_atask)

async def _acquire_proc(self, job_id: str) -> JobExecutor:
MAX_ACQUIRE_ATTEMPTS = 3

for attempt in range(MAX_ACQUIRE_ATTEMPTS):
if (
self._warmed_proc_queue.empty()
and len(self._spawn_tasks) < self._jobs_waiting_for_process
):
# spawn a new process if there are no idle processes
task = asyncio.create_task(self._proc_spawn_task())
self._spawn_tasks.add(task)
task.add_done_callback(self._spawn_tasks.discard)

if self._warmed_proc_queue.empty():
logger.warning(
"no warmed process available for job, waiting for one to be created",
extra={"job_id": job_id},
)

# race the queue against every in-flight spawn task
while True:
if not self._warmed_proc_queue.empty():
return self._warmed_proc_queue.get_nowait()

spawns = [t for t in self._spawn_tasks if not t.done()]
# retry if all in-flight spawns have completed without producing a proc
if not spawns:
break

get_task = asyncio.ensure_future(self._warmed_proc_queue.get())
try:
await asyncio.wait([get_task, *spawns], return_when=asyncio.FIRST_COMPLETED)
finally:
if not get_task.done():
get_task.cancel()

if get_task.done() and not get_task.cancelled():
return get_task.result()

logger.warning(
"all in-flight spawns failed to initialize, retrying",
extra={"job_id": job_id, "attempt": attempt + 1},
)

logger.error(
"failed to acquire process for job after %d attempts",
MAX_ACQUIRE_ATTEMPTS,
extra={"job_id": job_id},
)
raise RuntimeError(f"no process became available after {MAX_ACQUIRE_ATTEMPTS} attempts")

async def launch_job(self, info: RunningJobInfo) -> None:
MAX_ATTEMPTS = 3
MAX_LAUNCH_ATTEMPTS = 3

for attempt in range(MAX_ATTEMPTS):
for attempt in range(MAX_LAUNCH_ATTEMPTS):
self._jobs_waiting_for_process += 1
try:
if (
self._warmed_proc_queue.empty()
and len(self._spawn_tasks) < self._jobs_waiting_for_process
):
# spawn a new process if there are no idle processes
task = asyncio.create_task(self._proc_spawn_task())
self._spawn_tasks.add(task)
task.add_done_callback(self._spawn_tasks.discard)

if self._warmed_proc_queue.empty():
logger.warning(
"no warmed process available for job, waiting for one to be created",
extra={"job_id": info.job.id},
)

proc = await self._warmed_proc_queue.get()
proc = await self._acquire_proc(info.job.id)
finally:
self._jobs_waiting_for_process -= 1

Expand All @@ -141,10 +177,10 @@ async def launch_job(self, info: RunningJobInfo) -> None:
close_task = asyncio.create_task(proc.aclose())
self._close_tasks.add(close_task)
close_task.add_done_callback(self._close_tasks.discard)
if attempt == MAX_ATTEMPTS - 1:
if attempt == MAX_LAUNCH_ATTEMPTS - 1:
logger.error(
"failed to launch job on process after %d attempts",
MAX_ATTEMPTS,
MAX_LAUNCH_ATTEMPTS,
extra={"job_id": info.job.id},
)
raise
Expand Down
51 changes: 51 additions & 0 deletions tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import ClassVar

import psutil
import pytest

from livekit.agents import JobContext, JobProcess, ipc, job, utils
from livekit.agents.ipc.log_queue import LogQueueHandler, LogQueueListener
Expand Down Expand Up @@ -370,6 +371,56 @@ def _process_closed(proc: ipc.job_proc_executor.ProcJobExecutor):
assert exitcode != 0, "process should have been killed"


async def test_proc_pool_launch_job_raises_when_all_spawns_fail(monkeypatch):
"""When every spawn task fails to initialize, launch_job should raise
instead of hanging on an empty warmed-process queue. Reproduces #5868."""

class FailingProc:
running_job = None

def __init__(self, **kwargs):
pass

async def start(self) -> None:
pass

async def initialize(self) -> None:
raise TimeoutError("init timed out")

async def aclose(self) -> None:
pass

def logging_extra(self) -> dict:
return {}

monkeypatch.setattr(ipc.proc_pool.job_proc_executor, "ProcJobExecutor", FailingProc)

mp_ctx = mp.get_context("spawn")
loop = asyncio.get_running_loop()
pool = ipc.proc_pool.ProcPool(
job_executor_type=job.JobExecutorType.PROCESS,
initialize_process_fnc=_initialize_proc,
job_entrypoint_fnc=_job_entrypoint,
session_end_fnc=None,
num_idle_processes=0,
initialize_timeout=0.1,
close_timeout=20.0,
session_end_timeout=300.0,
inference_executor=None,
memory_warn_mb=0,
memory_limit_mb=0,
http_proxy=None,
mp_ctx=mp_ctx,
loop=loop,
)

with pytest.raises(RuntimeError, match="no process became available"):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pool.start() seems missing

await pool.launch_job(_generate_fake_job())

assert pool._jobs_waiting_for_process == 0
assert len(pool.processes) == 0


def _create_proc(
*,
close_timeout: float,
Expand Down