Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions livekit-agents/livekit/agents/ipc/log_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,17 @@ def stop(self) -> None:
if self._thread is None:
return

self._duplex.close()
self._thread.join()
# join the thread first so it can drain all remaining log records
# from the socket buffer before we close the duplex. The sending end
# must already be closed (child process exited) so recv_bytes() will
# see EOF after the buffer is consumed and the thread will exit.
self._thread.join(timeout=2)
if self._thread.is_alive():
# fallback: force-close the duplex to unblock the thread
self._duplex.close()
self._thread.join()
else:
self._duplex.close()
self._thread = None

def handle(self, record: logging.LogRecord) -> None:
Expand Down
52 changes: 52 additions & 0 deletions tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import ctypes
import io
import logging
import multiprocessing as mp
import socket
import time
Expand All @@ -15,6 +16,8 @@
import psutil

from livekit.agents import JobContext, JobProcess, ipc, job, utils
from livekit.agents.ipc.log_queue import LogQueueHandler, LogQueueListener
from livekit.agents.utils.aio import duplex_unix
from livekit.protocol import agent


Expand Down Expand Up @@ -410,3 +413,52 @@ async def test_job_graceful_shutdown():
assert proc.exitcode == 0, "process should have exited cleanly"
assert not proc.killed
assert start_args.shutdown_counter.value == 1


def test_log_queue_drains_before_stop():
    """Regression test for a tail-loss race in LogQueueListener.stop().

    An earlier implementation closed the duplex socket *before* joining the
    monitor thread, so any records still buffered in the socket were dropped.
    Here the sender finishes and closes its end first; every record must still
    reach the listener once stop() returns.
    """
    NUM_LOGS = 200
    received: list[str] = []

    listener_sock, sender_sock = socket.socketpair()
    listener_dup = duplex_unix._Duplex.open(listener_sock)
    sender_dup = duplex_unix._Duplex.open(sender_sock)

    # Listener (parent) side: capture each message, deliberately slowly, so a
    # backlog accumulates in the socket buffer before stop() is invoked.
    class _SlowCaptureListener(LogQueueListener):
        def handle(self, record: logging.LogRecord) -> None:
            received.append(record.getMessage())
            time.sleep(0.001)

    listener = _SlowCaptureListener(listener_dup, lambda r: None)
    listener.start()

    # Sender (child) side: a dedicated logger wired to a LogQueueHandler.
    handler = LogQueueHandler(sender_dup)
    test_logger = logging.getLogger("test_log_queue_drain")
    test_logger.addHandler(handler)
    test_logger.setLevel(logging.DEBUG)
    test_logger.propagate = False

    for n in range(NUM_LOGS):
        test_logger.info("msg %d", n)

    # Mimic the child process shutting down: close the handler, then wait for
    # its forwarding thread (which closes the sender duplex on exit).
    handler.close()
    handler.thread.join()

    # Mimic supervised_proc._sync_run after proc.join() returns: stop the
    # listener. All buffered records must have been delivered by now.
    listener.stop()

    test_logger.removeHandler(handler)

    assert len(received) == NUM_LOGS, (
        f"Expected {NUM_LOGS} records, got {len(received)}. "
        f"Lost {NUM_LOGS - len(received)} tail log records."
    )