# NOTE: This class is duplicated in `tesseract_core/runtime/logs.py` and
# `tesseract_core/sdk/logs.py`. Make sure to propagate changes to both files.
class TeePipe(threading.Thread):
    """Custom I/O construct to support live logging from a single file descriptor to multiple sinks.

    Creates an OS-level pipe and runs a reader thread that splits everything written to
    the pipe's write end into lines. Every line is recorded and passed to each sink.
    Can be used as a context manager for automatic cleanup.

    Example:
        >>> with TeePipe(print, logger.info) as pipe_fd:
        ...     # closefd=False: the TeePipe owns the descriptor and closes it itself
        ...     fd = os.fdopen(pipe_fd, "w", closefd=False)
        ...     print("Hello, World!", file=fd, flush=True)
        Hello, World!
        2025-06-10 12:00:00,000 - INFO - Hello, World!
    """

    daemon = True

    # Number of consecutive 1-second waits during which the reader may be busy
    # (i.e. not blocked on the pipe, typically inside a slow sink) before
    # `stop()` gives up waiting for it.
    _max_busy_waits = 10

    def __init__(self, *sinks: Callable[[str], Any]) -> None:
        """Initialize the TeePipe by creating file descriptors.

        Args:
            *sinks: Callables that are invoked with each decoded line
                (without the trailing newline).
        """
        super().__init__()
        self._sinks = sinks
        self._fd_read, self._fd_write = os.pipe()
        self._captured_lines = []
        # Monotonic timestamp of the moment the reader last went back to waiting
        # for data. Monotonic so wall-clock adjustments cannot confuse `stop()`.
        self._last_time = time.monotonic()
        # Set while the reader is (about to be) blocked in `os.read`
        self._is_blocking = threading.Event()
        # Minimum time the reader must have been idle before `stop()` proceeds
        self._grace_period = 0.1
        self._stopped = False

    def __enter__(self) -> int:
        """Start the thread and return the write file descriptor of the pipe."""
        self.start()
        return self.fileno()

    def stop(self) -> None:
        """Wait for the stream to dry up, close the pipe, and join the thread.

        Safe to call more than once and on a pipe whose thread was never started.
        """
        if self._stopped:
            # The descriptors are already closed. Closing them again would raise,
            # or worse, close an unrelated descriptor that reused the number.
            return
        self._stopped = True

        # Wait for ongoing streams to dry up.
        # We only continue once the reader has spent some time blocked on reading.
        busy_waits = 0
        while self.is_alive():
            if self._is_blocking.wait(timeout=1):
                busy_waits = 0
                if (time.monotonic() - self._last_time) >= self._grace_period:
                    break
            else:
                # Reader is not waiting for data, e.g. it is inside a slow sink.
                # That is not the same as being idle, so keep waiting, but give
                # up eventually so a stuck sink cannot hang us forever.
                busy_waits += 1
                if busy_waits >= self._max_busy_waits:
                    break
            time.sleep(self._grace_period / 10)

        # This will signal EOF to the reader thread
        os.close(self._fd_write)

        # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong
        if self.is_alive():
            self.join(timeout=1)

        # Close the read end only after the reader had a chance to drain the pipe.
        # Closing it earlier would drop unread data and allow the descriptor
        # number to be reused while the thread may still read from it.
        os.close(self._fd_read)

    def __exit__(self, *args: Any) -> None:
        """Close the pipe and join the thread."""
        self.stop()

    def fileno(self) -> int:
        """Return the write file descriptor of the pipe."""
        return self._fd_write

    def _emit(self, raw_line: bytes) -> None:
        """Decode one complete line, record it, and pass it to every sink."""
        line = raw_line.decode(errors="ignore")
        self._captured_lines.append(line)
        for sink in self._sinks:
            sink(line)

    def run(self) -> None:
        """Run the thread, pushing every full line of text to the sinks."""
        # Raw chunks of the current line that has not seen its newline yet.
        # Kept as bytes so multi-byte characters split across reads decode correctly.
        pending = []
        while True:
            self._last_time = time.monotonic()
            self._is_blocking.set()
            try:
                data = os.read(self._fd_read, 1024)
            except OSError:
                # Pipe closed
                break
            self._is_blocking.clear()

            if not data:
                # EOF reached
                break

            # Everything before the first newline completes the pending line.
            # Every further newline terminates a line of its own.
            head, *rest = data.split(b"\n")
            pending.append(head)
            for chunk in rest:
                self._emit(b"".join(pending))
                pending = [chunk]

        # Flush incomplete lines at the end of the stream
        tail = b"".join(pending)
        if tail:
            self._emit(tail)

    # NOTE(review): this accessor lies outside the reviewed hunk. The tests read
    # `captured_lines`, so it presumably already exists in this form - confirm
    # against the full file before merging.
    @property
    def captured_lines(self) -> list:
        """Return all lines that have been captured so far."""
        return self._captured_lines
def _random_log_messages(count: int = 100) -> list[str]:
    """Generate random messages to push through a TeePipe.

    Roughly one in ten messages is a really long line without breaks; the
    others are drawn from all printable characters and may therefore contain
    embedded newlines.
    """
    messages = []
    for _ in range(count):
        # Make sure to include a few really long lines without breaks
        if random.random() < 0.1:
            msg_length = random.randint(1000, 10_000)
            alphabet = string.ascii_letters + "🤯"
        else:
            msg_length = 2 ** random.randint(2, 12)
            alphabet = string.printable + "🤯"
        messages.append("".join(random.choices(alphabet, k=msg_length)))
    return messages


def _split_into_lines(messages: list[str]) -> list[str]:
    """Return the lines a TeePipe is expected to capture for the given messages."""
    expected_lines = []
    for msg in messages:
        # Messages may contain newlines, each of which starts a new captured line
        expected_lines.extend(msg.split("\n"))
    return expected_lines


def test_teepipe(caplog):
    # Verify that logging in a separate thread works as intended
    from tesseract_core.sdk.logs import TeePipe, set_logger

    # Disable rich to ensure what we log is what we read
    set_logger("info", catch_warnings=True, rich_format=False)

    logger = logging.getLogger("tesseract")
    caplog.set_level(logging.INFO, logger="tesseract")

    logged_lines = _random_log_messages()

    teepipe = TeePipe(logger.info)
    # Extend grace period to avoid flakes in tests when runners are slow
    teepipe._grace_period = 1
    with teepipe:
        fd = os.fdopen(teepipe.fileno(), "w", closefd=False)
        for line in logged_lines:
            print(line, file=fd)
            time.sleep(random.random() / 100)
        fd.close()

    expected_lines = _split_into_lines(logged_lines)

    assert teepipe.captured_lines == expected_lines
    assert caplog.record_tuples == [
        ("tesseract", logging.INFO, line) for line in expected_lines
    ]


def test_teepipe_early_exit():
    # Verify that TeePipe can handle early exit without hanging or losing data
    from tesseract_core.sdk.logs import TeePipe

    logged_lines = _random_log_messages()

    teepipe = TeePipe()
    # Extend grace period to avoid flakes in tests when runners are slow
    teepipe._grace_period = 1

    teepipe.start()
    fd = os.fdopen(teepipe.fileno(), "w", closefd=False)

    def _write_to_pipe():
        for line in logged_lines:
            print(line, file=fd, flush=True)
            time.sleep(random.random() / 100)

        print("end without newline", end="", file=fd, flush=True)

    expected_lines = _split_into_lines(logged_lines)
    expected_lines.append("end without newline")

    writer_thread = threading.Thread(target=_write_to_pipe)
    writer_thread.start()

    # Wait for the first data to roll in, i.e., thread is up and running.
    # Bounded, so a broken reader fails the test instead of hanging the suite.
    deadline = time.monotonic() + 10
    while not teepipe.captured_lines:
        assert time.monotonic() < deadline, "TeePipe did not capture any data"
        time.sleep(0.01)

    # Sanity check that not all data has been written yet
    assert len(teepipe.captured_lines) < len(expected_lines)

    # Exit the pipe early before all data is written
    # This should block until no more data is incoming
    teepipe.stop()

    # The writer must be done by now; do not leak the thread into other tests
    writer_thread.join(timeout=5)
    assert not writer_thread.is_alive()
    # Only releases the wrapper; the descriptor itself is owned by the TeePipe
    fd.close()

    assert len(teepipe.captured_lines) == len(expected_lines)
    assert teepipe.captured_lines == expected_lines