Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 69 additions & 17 deletions tesseract_core/runtime/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,104 @@

import os
import threading
import time
from typing import Any, Callable


# NOTE: This is duplicated in `tesseract_core/sdk/logs.py`.
# Make sure to propagate changes to both files.
class TeePipe(threading.Thread):
    """Custom I/O construct to support live logging from a single file descriptor to multiple sinks.

    Runs a daemon thread that reads everything written to the write end of an OS pipe,
    splits it into lines, and passes every line (without its trailing newline) to each
    sink. Can be used as a context manager for automatic cleanup.

    Example:
        >>> with TeePipe(print, logger.info) as pipe_fd:
        ...     # closefd=False: the TeePipe owns the descriptor and closes it itself
        ...     fd = os.fdopen(pipe_fd, "w", closefd=False)
        ...     print("Hello, World!", file=fd, flush=True)
        Hello, World!
        2025-06-10 12:00:00,000 - INFO - Hello, World!
    """

    daemon = True

    def __init__(self, *sinks: Callable) -> None:
        """Initialize the TeePipe by creating file descriptors.

        Args:
            *sinks: Callables that are invoked with each decoded line of text.
        """
        super().__init__()
        self._sinks = sinks
        self._fd_read, self._fd_write = os.pipe()
        # Every line that was pushed to the sinks, in order
        self._captured_lines = []
        # Monotonic timestamp of the last time the reader went back to waiting for data
        self._last_time = time.monotonic()
        # Set while the reader thread is (about to be) blocked in os.read
        self._is_blocking = threading.Event()
        # How long the reader must have been idle before stop() closes the pipe (seconds)
        self._grace_period = 0.1
        self._stopped = False

    def __enter__(self) -> int:
        """Start the thread and return the write file descriptor of the pipe."""
        self.start()
        return self.fileno()

    def stop(self) -> None:
        """Close the pipe and join the thread.

        Calling this more than once is a no-op after the first call.
        """
        if self._stopped:
            return
        self._stopped = True

        # Wait for ongoing streams to dry up
        # We only continue once the reader has spent some time blocked on reading
        while True:
            self._is_blocking.wait(timeout=1)
            if (time.monotonic() - self._last_time) >= self._grace_period:
                break
            time.sleep(self._grace_period / 10)

        try:
            # This will signal EOF to the reader thread
            os.close(self._fd_write)
            # Join before closing the read end, so data still sitting in the pipe is
            # drained and the thread never reads from a closed (possibly reused) fd.
            # Use timeout and daemon=True to avoid hanging indefinitely if something
            # goes wrong (e.g. another process still holds the write end open).
            if self.is_alive():
                self.join(timeout=1)
        finally:
            os.close(self._fd_read)

    def __exit__(self, *args: Any) -> None:
        """Close the pipe and join the thread."""
        self.stop()

    def fileno(self) -> int:
        """Return the write file descriptor of the pipe."""
        return self._fd_write

    def _emit(self, raw_line: bytes) -> None:
        """Decode one line, record it, and push it to all sinks."""
        line = raw_line.decode(errors="ignore")
        self._captured_lines.append(line)
        for sink in self._sinks:
            sink(line)

    def run(self) -> None:
        """Run the thread, pushing every full line of text to the sinks."""
        # Chunks of the current line that has not seen its newline yet
        line_buffer = []
        while True:
            self._last_time = time.monotonic()
            self._is_blocking.set()
            try:
                data = os.read(self._fd_read, 1024)
            except OSError:
                # Pipe closed
                break
            self._is_blocking.clear()

            if not data:
                # EOF reached
                break

            *complete, remainder = data.split(b"\n")
            if complete:
                # The first newline terminates whatever partial line was buffered
                complete[0] = b"".join([*line_buffer, complete[0]])
                line_buffer = []
                for raw_line in complete:
                    self._emit(raw_line)

            # Accumulate incomplete line
            line_buffer.append(remainder)

        # Flush incomplete line at the end of the stream
        tail = b"".join(line_buffer)
        if tail:
            self._emit(tail)
Expand Down
4 changes: 2 additions & 2 deletions tesseract_core/runtime/mpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import requests

from tesseract_core.runtime.config import get_config
from tesseract_core.runtime.logs import LogPipe
from tesseract_core.runtime.logs import TeePipe


class BaseBackend(ABC):
Expand Down Expand Up @@ -261,7 +261,7 @@ def redirect_stdio(logfile: Union[str, Path]) -> Generator[None, None, None]:
# Use `print` instead of `.write` so we get appropriate newlines and flush behavior
write_to_stderr = lambda msg: print(msg, file=orig_stderr_file, flush=True)
write_to_file = lambda msg: print(msg, file=f, flush=True)
pipe_fd = stack.enter_context(LogPipe(write_to_stderr, write_to_file))
pipe_fd = stack.enter_context(TeePipe(write_to_stderr, write_to_file))

# Redirect file descriptors at OS level
stack.enter_context(redirect_fd(sys.stdout, pipe_fd))
Expand Down
4 changes: 2 additions & 2 deletions tesseract_core/sdk/docker_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import List as list_ # noqa: UP035

from tesseract_core.sdk.config import get_config
from tesseract_core.sdk.logs import LogPipe
from tesseract_core.sdk.logs import TeePipe

logger = logging.getLogger("tesseract")

Expand Down Expand Up @@ -232,7 +232,7 @@ def buildx(
ssh=ssh,
)

out_pipe = LogPipe(logger.debug)
out_pipe = TeePipe(logger.debug)

with out_pipe as out_pipe_fd:
proc = subprocess.run(build_cmd, stdout=out_pipe_fd, stderr=out_pipe_fd)
Expand Down
89 changes: 70 additions & 19 deletions tesseract_core/sdk/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# SPDX-License-Identifier: Apache-2.0

import logging
import logging.handlers
import os
import sys
import threading
import time
import warnings
from collections.abc import Iterable
from types import ModuleType
Expand All @@ -29,47 +29,98 @@

# NOTE: This is duplicated in `tesseract_core/runtime/logs.py`.
# Make sure to propagate changes to both files.
class TeePipe(threading.Thread):
    """Custom I/O construct to support live logging from a single file descriptor to multiple sinks.

    Runs a daemon thread that reads everything written to the write end of an OS pipe,
    splits it into lines, and passes every line (without its trailing newline) to each
    sink. Can be used as a context manager for automatic cleanup.

    Example:
        >>> with TeePipe(print, logger.info) as pipe_fd:
        ...     # closefd=False: the TeePipe owns the descriptor and closes it itself
        ...     fd = os.fdopen(pipe_fd, "w", closefd=False)
        ...     print("Hello, World!", file=fd, flush=True)
        Hello, World!
        2025-06-10 12:00:00,000 - INFO - Hello, World!
    """

    daemon = True

    def __init__(self, *sinks: Callable) -> None:
        """Initialize the TeePipe by creating file descriptors.

        Args:
            *sinks: Callables that are invoked with each decoded line of text.
        """
        super().__init__()
        self._sinks = sinks
        self._fd_read, self._fd_write = os.pipe()
        # Every line that was pushed to the sinks, in order
        self._captured_lines = []
        # Monotonic timestamp of the last time the reader went back to waiting for data
        self._last_time = time.monotonic()
        # Set while the reader thread is (about to be) blocked in os.read
        self._is_blocking = threading.Event()
        # How long the reader must have been idle before stop() closes the pipe (seconds)
        self._grace_period = 0.1
        self._stopped = False

    def __enter__(self) -> int:
        """Start the thread and return the write file descriptor of the pipe."""
        self.start()
        return self.fileno()

    def stop(self) -> None:
        """Close the pipe and join the thread.

        Calling this more than once is a no-op after the first call.
        """
        if self._stopped:
            return
        self._stopped = True

        # Wait for ongoing streams to dry up
        # We only continue once the reader has spent some time blocked on reading
        while True:
            self._is_blocking.wait(timeout=1)
            if (time.monotonic() - self._last_time) >= self._grace_period:
                break
            time.sleep(self._grace_period / 10)

        try:
            # This will signal EOF to the reader thread
            os.close(self._fd_write)
            # Join before closing the read end, so data still sitting in the pipe is
            # drained and the thread never reads from a closed (possibly reused) fd.
            # Use timeout and daemon=True to avoid hanging indefinitely if something
            # goes wrong (e.g. another process still holds the write end open).
            if self.is_alive():
                self.join(timeout=1)
        finally:
            os.close(self._fd_read)

    def __exit__(self, *args: Any) -> None:
        """Close the pipe and join the thread."""
        self.stop()

    def fileno(self) -> int:
        """Return the write file descriptor of the pipe."""
        return self._fd_write

    def _emit(self, raw_line: bytes) -> None:
        """Decode one line, record it, and push it to all sinks."""
        line = raw_line.decode(errors="ignore")
        self._captured_lines.append(line)
        for sink in self._sinks:
            sink(line)

    def run(self) -> None:
        """Run the thread, pushing every full line of text to the sinks."""
        # Chunks of the current line that has not seen its newline yet
        line_buffer = []
        while True:
            self._last_time = time.monotonic()
            self._is_blocking.set()
            try:
                data = os.read(self._fd_read, 1024)
            except OSError:
                # Pipe closed
                break
            self._is_blocking.clear()

            if not data:
                # EOF reached
                break

            *complete, remainder = data.split(b"\n")
            if complete:
                # The first newline terminates whatever partial line was buffered
                complete[0] = b"".join([*line_buffer, complete[0]])
                line_buffer = []
                for raw_line in complete:
                    self._emit(raw_line)

            # Accumulate incomplete line
            line_buffer.append(remainder)

        # Flush incomplete line at the end of the stream
        tail = b"".join(line_buffer)
        if tail:
            self._emit(tail)
Expand Down
12 changes: 6 additions & 6 deletions tests/endtoend_tests/test_tesseract_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ def test_signature_consistency():
)


def test_teepipe_consistency():
    """Test that the source code of the two duplicate TeePipe implementations is identical."""
    # Imported inside the test so a broken import only fails this test
    from tesseract_core.runtime.logs import TeePipe as RuntimeTeePipe
    from tesseract_core.sdk.logs import TeePipe as SDKTeePipe

    runtime_source = inspect.getsource(RuntimeTeePipe)
    sdk_source = inspect.getsource(SDKTeePipe)
    assert runtime_source == sdk_source, (
        "TeePipe in tesseract_core/runtime/logs.py and tesseract_core/sdk/logs.py "
        "have diverged; propagate changes to both files"
    )
Loading
Loading