From bb5194a501a3917d29a8e16f23f5ef5590b0d531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 15:32:11 +0100 Subject: [PATCH 01/16] better io pipes --- tesseract_core/runtime/logs.py | 66 ++++++++++++++++----- tesseract_core/runtime/mpa.py | 4 +- tesseract_core/sdk/docker_client.py | 4 +- tesseract_core/sdk/logs.py | 67 ++++++++++++++++------ tests/endtoend_tests/test_tesseract_sdk.py | 12 ++-- tests/sdk_tests/test_engine.py | 53 +++++++++++++++-- 6 files changed, 159 insertions(+), 47 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 4376cab2..d6486837 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -3,42 +3,52 @@ import os import threading +import time from typing import Any, Callable # NOTE: This is duplicated in `tesseract_core/sdk/logs.py`. # Make sure to propagate changes to both files. -class LogPipe(threading.Thread): - """Custom IO pipe to support live logging from subprocess.run or OS-level file descriptor. +class TeePipe(threading.Thread): + """Custom I/O pipe to support live logging to multiple sinks. - Runs a thread that logs everything read from the pipe to the given sinks. + Runs a thread that logs everything written to the pipe to the given sinks. Can be used as a context manager for automatic cleanup. 
""" daemon = True def __init__(self, *sinks: Callable) -> None: - """Initialize the LogPipe with the given logging level.""" + """Initialize the TeePipe by creating file descriptors.""" super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen(self._fd_read) + self._pipe_reader = os.fdopen( + self._fd_read, mode="r", closefd=False, buffering=1024 + ) self._captured_lines = [] + self._last_line_time = time.time() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" self.start() return self.fileno() - def __exit__(self, *args: Any) -> None: + def stop(self) -> None: """Close the pipe and join the thread.""" - os.close(self._fd_write) - # Use a timeout so something weird happening in the logging thread doesn't - # cause this to hang indefinitely - self.join(timeout=10) - # Do not close reader before thread is joined since there may be pending data - # This also closes the fd_read pipe + # Wait for ongoing reads to complete + grace = 0.1 + while (time.time() - self._last_line_time) < grace: + time.sleep(grace / 10) + os.close(self._fd_read) + # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong + self.join(timeout=1) self._pipe_reader.close() + os.close(self._fd_write) + + def __exit__(self, *args: Any) -> None: + """Close the pipe and join the thread.""" + self.stop() def fileno(self) -> int: """Return the write file descriptor of the pipe.""" @@ -46,9 +56,35 @@ def fileno(self) -> int: def run(self) -> None: """Run the thread, logging everything.""" - for line in iter(self._pipe_reader.readline, ""): - if line.endswith("\n"): - line = line[:-1] + line_buffer = [] + while True: + try: + data = self._pipe_reader.readline(1024) + except OSError: + # Pipe closed + break + if data == "": + # EOF reached + break + + self._last_line_time = time.time() + if data.endswith("\n"): + data = data[:-1] + flush = True + else: + flush = False + + 
line_buffer.append(data) + if flush: + line = "".join(line_buffer) + line_buffer.clear() + self._captured_lines.append(line) + for sink in self._sinks: + sink(line) + + # Flush incomplete lines at the end of the stream + if line_buffer: + line = "".join(line_buffer) self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tesseract_core/runtime/mpa.py b/tesseract_core/runtime/mpa.py index fe3a0b5f..1eec0ef7 100644 --- a/tesseract_core/runtime/mpa.py +++ b/tesseract_core/runtime/mpa.py @@ -20,7 +20,7 @@ import requests from tesseract_core.runtime.config import get_config -from tesseract_core.runtime.logs import LogPipe +from tesseract_core.runtime.logs import TeePipe class BaseBackend(ABC): @@ -261,7 +261,7 @@ def redirect_stdio(logfile: Union[str, Path]) -> Generator[None, None, None]: # Use `print` instead of `.write` so we get appropriate newlines and flush behavior write_to_stderr = lambda msg: print(msg, file=orig_stderr_file, flush=True) write_to_file = lambda msg: print(msg, file=f, flush=True) - pipe_fd = stack.enter_context(LogPipe(write_to_stderr, write_to_file)) + pipe_fd = stack.enter_context(TeePipe(write_to_stderr, write_to_file)) # Redirect file descriptors at OS level stack.enter_context(redirect_fd(sys.stdout, pipe_fd)) diff --git a/tesseract_core/sdk/docker_client.py b/tesseract_core/sdk/docker_client.py index 5ebfdbe2..2ec0acfc 100644 --- a/tesseract_core/sdk/docker_client.py +++ b/tesseract_core/sdk/docker_client.py @@ -16,7 +16,7 @@ from typing import List as list_ # noqa: UP035 from tesseract_core.sdk.config import get_config -from tesseract_core.sdk.logs import LogPipe +from tesseract_core.sdk.logs import TeePipe logger = logging.getLogger("tesseract") @@ -232,7 +232,7 @@ def buildx( ssh=ssh, ) - out_pipe = LogPipe(logger.debug) + out_pipe = TeePipe(logger.debug) with out_pipe as out_pipe_fd: proc = subprocess.run(build_cmd, stdout=out_pipe_fd, stderr=out_pipe_fd) diff --git a/tesseract_core/sdk/logs.py 
b/tesseract_core/sdk/logs.py index e22c2a43..a2c22654 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -2,10 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 import logging -import logging.handlers import os import sys import threading +import time import warnings from collections.abc import Iterable from types import ModuleType @@ -29,37 +29,46 @@ # NOTE: This is duplicated in `tesseract_core/runtime/logs.py`. # Make sure to propagate changes to both files. -class LogPipe(threading.Thread): - """Custom IO pipe to support live logging from subprocess.run or OS-level file descriptor. +class TeePipe(threading.Thread): + """Custom I/O pipe to support live logging to multiple sinks. - Runs a thread that logs everything read from the pipe to the given sinks. + Runs a thread that logs everything written to the pipe to the given sinks. Can be used as a context manager for automatic cleanup. """ daemon = True def __init__(self, *sinks: Callable) -> None: - """Initialize the LogPipe with the given logging level.""" + """Initialize the TeePipe by creating file descriptors.""" super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen(self._fd_read) + self._pipe_reader = os.fdopen( + self._fd_read, mode="r", closefd=False, buffering=1024 + ) self._captured_lines = [] + self._last_line_time = time.time() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" self.start() return self.fileno() - def __exit__(self, *args: Any) -> None: + def stop(self) -> None: """Close the pipe and join the thread.""" - os.close(self._fd_write) - # Use a timeout so something weird happening in the logging thread doesn't - # cause this to hang indefinitely - self.join(timeout=10) - # Do not close reader before thread is joined since there may be pending data - # This also closes the fd_read pipe + # Wait for ongoing reads to complete + grace = 0.1 + while (time.time() - 
self._last_line_time) < grace: + time.sleep(grace / 10) + os.close(self._fd_read) + # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong + self.join(timeout=1) self._pipe_reader.close() + os.close(self._fd_write) + + def __exit__(self, *args: Any) -> None: + """Close the pipe and join the thread.""" + self.stop() def fileno(self) -> int: """Return the write file descriptor of the pipe.""" @@ -67,9 +76,35 @@ def fileno(self) -> int: def run(self) -> None: """Run the thread, logging everything.""" - for line in iter(self._pipe_reader.readline, ""): - if line.endswith("\n"): - line = line[:-1] + line_buffer = [] + while True: + try: + data = self._pipe_reader.readline(1024) + except OSError: + # Pipe closed + break + if data == "": + # EOF reached + break + + self._last_line_time = time.time() + if data.endswith("\n"): + data = data[:-1] + flush = True + else: + flush = False + + line_buffer.append(data) + if flush: + line = "".join(line_buffer) + line_buffer.clear() + self._captured_lines.append(line) + for sink in self._sinks: + sink(line) + + # Flush incomplete lines at the end of the stream + if line_buffer: + line = "".join(line_buffer) self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tests/endtoend_tests/test_tesseract_sdk.py b/tests/endtoend_tests/test_tesseract_sdk.py index 3a385365..bfe6c643 100644 --- a/tests/endtoend_tests/test_tesseract_sdk.py +++ b/tests/endtoend_tests/test_tesseract_sdk.py @@ -210,11 +210,11 @@ def test_signature_consistency(): ) -def test_logpipe_consistency(): - """Test that the source code of the two duplicate LogPipe implementations is identical.""" - from tesseract_core.runtime.logs import LogPipe as RuntimeLogPipe - from tesseract_core.sdk.logs import LogPipe as SDKLogPipe +def test_teepipe_consistency(): + """Test that the source code of the two duplicate TeePipe implementations is identical.""" + from tesseract_core.runtime.logs import TeePipe as RuntimeTeePipe + 
from tesseract_core.sdk.logs import TeePipe as SDKTeePipe - runtime_source = inspect.getsource(RuntimeLogPipe) - sdk_source = inspect.getsource(SDKLogPipe) + runtime_source = inspect.getsource(RuntimeTeePipe) + sdk_source = inspect.getsource(SDKTeePipe) assert runtime_source == sdk_source diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index 8cdb700f..a4241afd 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -5,6 +5,7 @@ import logging import os import random +import threading import time from pathlib import Path @@ -316,9 +317,9 @@ def raise_docker_error(*args, **kwargs): run_something_with_docker() -def test_logpipe(caplog): +def test_teepipe(caplog): # Verify that logging in a separate thread works as intended - from tesseract_core.sdk.logs import LogPipe + from tesseract_core.sdk.logs import TeePipe logger = logging.getLogger("tesseract") caplog.set_level(logging.INFO, logger="tesseract") @@ -329,20 +330,60 @@ def test_logpipe(caplog): msg = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=msg_length)) logged_lines.append(msg) - logpipe = LogPipe(logger.info) - with logpipe: - fd = os.fdopen(logpipe.fileno(), "w", closefd=False) + teepipe = TeePipe(logger.info) + with teepipe: + fd = os.fdopen(teepipe.fileno(), "w", closefd=False) for line in logged_lines: print(line, file=fd) time.sleep(random.random() / 100) fd.flush() - assert logpipe.captured_lines == logged_lines + assert teepipe.captured_lines == logged_lines assert caplog.record_tuples == [ ("tesseract", logging.INFO, line) for line in logged_lines ] +def test_teepipe_early_exit(): + # Verify that TeePipe can handle early exit without hanging or losing data + from tesseract_core.sdk.logs import TeePipe + + logged_lines = [] + for _ in range(100): + msg_length = 2 ** random.randint(2, 12) + msg = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=msg_length)) + logged_lines.append(msg) + + teepipe = TeePipe() + + teepipe.start() + 
fd = os.fdopen(teepipe.fileno(), "w", closefd=False) + + def _write_to_pipe(): + for line in logged_lines: + print(line, file=fd, flush=True) + time.sleep(random.random() / 100) + + print("end without newline", end="", file=fd, flush=True) + fd.flush() + + writer_thread = threading.Thread(target=_write_to_pipe) + writer_thread.start() + + # Wait for the first data to roll in, i.e., thread is up and running + while not teepipe.captured_lines: + time.sleep(0.01) + + assert len(teepipe.captured_lines) < len(logged_lines) + + # Exit the pipe early before all data is written + # This should block until no more data is incoming + teepipe.stop() + + assert len(teepipe.captured_lines) == len(logged_lines) + 1 + assert teepipe.captured_lines == [*logged_lines, "end without newline"] + + def test_parse_requirements(tmpdir): reqs = """ --extra-index-url https://download.pytorch.org/whl/cpu From 2361d48cde5f59e0bb082ed40958daddf734c5c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 15:45:02 +0100 Subject: [PATCH 02/16] comment for clarity --- tesseract_core/runtime/logs.py | 2 ++ tesseract_core/sdk/logs.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index d6486837..2166da77 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -40,6 +40,8 @@ def stop(self) -> None: grace = 0.1 while (time.time() - self._last_line_time) < grace: time.sleep(grace / 10) + + # This will signal EOF to the reader thread os.close(self._fd_read) # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index a2c22654..5da3aa52 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -60,6 +60,8 @@ def stop(self) -> None: grace = 0.1 while (time.time() - self._last_line_time) < grace: time.sleep(grace / 10) + + # This will signal 
EOF to the reader thread os.close(self._fd_read) # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) From f3de9a96d07178a98f34c55cbb9930a6cead7a1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 16:35:34 +0100 Subject: [PATCH 03/16] hopefully fix hanging tests --- tesseract_core/runtime/logs.py | 8 ++------ tesseract_core/sdk/logs.py | 8 ++------ tests/sdk_tests/test_engine.py | 2 +- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 2166da77..ca72f8d1 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -23,9 +23,6 @@ def __init__(self, *sinks: Callable) -> None: super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen( - self._fd_read, mode="r", closefd=False, buffering=1024 - ) self._captured_lines = [] self._last_line_time = time.time() @@ -42,11 +39,10 @@ def stop(self) -> None: time.sleep(grace / 10) # This will signal EOF to the reader thread + os.close(self._fd_write) os.close(self._fd_read) # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) - self._pipe_reader.close() - os.close(self._fd_write) def __exit__(self, *args: Any) -> None: """Close the pipe and join the thread.""" @@ -61,7 +57,7 @@ def run(self) -> None: line_buffer = [] while True: try: - data = self._pipe_reader.readline(1024) + data = os.read(self._fd_read, 1024).decode() except OSError: # Pipe closed break diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 5da3aa52..eb356744 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -43,9 +43,6 @@ def __init__(self, *sinks: Callable) -> None: super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen( - self._fd_read, mode="r", 
closefd=False, buffering=1024 - ) self._captured_lines = [] self._last_line_time = time.time() @@ -62,11 +59,10 @@ def stop(self) -> None: time.sleep(grace / 10) # This will signal EOF to the reader thread + os.close(self._fd_write) os.close(self._fd_read) # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) - self._pipe_reader.close() - os.close(self._fd_write) def __exit__(self, *args: Any) -> None: """Close the pipe and join the thread.""" @@ -81,7 +77,7 @@ def run(self) -> None: line_buffer = [] while True: try: - data = self._pipe_reader.readline(1024) + data = os.read(self._fd_read, 1024).decode() except OSError: # Pipe closed break diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index a4241afd..8d96a666 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -365,7 +365,6 @@ def _write_to_pipe(): time.sleep(random.random() / 100) print("end without newline", end="", file=fd, flush=True) - fd.flush() writer_thread = threading.Thread(target=_write_to_pipe) writer_thread.start() @@ -374,6 +373,7 @@ def _write_to_pipe(): while not teepipe.captured_lines: time.sleep(0.01) + # Sanity check that not all data has been written yet assert len(teepipe.captured_lines) < len(logged_lines) # Exit the pipe early before all data is written From 88e0b1892f5afa53fb5dbf9c35abd498989f7497 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 21:12:02 +0100 Subject: [PATCH 04/16] back to line-based output --- tesseract_core/runtime/logs.py | 34 +++++++++++++++++++--------------- tesseract_core/sdk/logs.py | 34 +++++++++++++++++++--------------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index ca72f8d1..21db20fb 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -24,7 +24,7 @@ def __init__(self, *sinks: Callable) -> None: 
self._sinks = sinks self._fd_read, self._fd_write = os.pipe() self._captured_lines = [] - self._last_line_time = time.time() + self._last_time = time.time() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -35,7 +35,7 @@ def stop(self) -> None: """Close the pipe and join the thread.""" # Wait for ongoing reads to complete grace = 0.1 - while (time.time() - self._last_line_time) < grace: + while (time.time() - self._last_time) < grace: time.sleep(grace / 10) # This will signal EOF to the reader thread @@ -53,7 +53,7 @@ def fileno(self) -> int: return self._fd_write def run(self) -> None: - """Run the thread, logging everything.""" + """Run the thread, pushing every full line of text to the sinks.""" line_buffer = [] while True: try: @@ -65,24 +65,28 @@ def run(self) -> None: # EOF reached break - self._last_line_time = time.time() + self._last_time = time.time() + + lines = data.splitlines() if data.endswith("\n"): - data = data[:-1] - flush = True - else: - flush = False - - line_buffer.append(data) - if flush: - line = "".join(line_buffer) - line_buffer.clear() + # Treat trailing newline as an empty line + lines.append("") + + # Log complete lines + for i, line in enumerate(lines[:-1]): + if i == 0: + line = "".join([*line_buffer, line]) + line_buffer = [] self._captured_lines.append(line) for sink in self._sinks: sink(line) + # Accumulate incomplete line + line_buffer.append(lines[-1]) + # Flush incomplete lines at the end of the stream - if line_buffer: - line = "".join(line_buffer) + line = "".join(line_buffer) + if line: self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index eb356744..9788e489 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -44,7 +44,7 @@ def __init__(self, *sinks: Callable) -> None: self._sinks = sinks self._fd_read, self._fd_write = os.pipe() self._captured_lines = [] - 
self._last_line_time = time.time() + self._last_time = time.time() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -55,7 +55,7 @@ def stop(self) -> None: """Close the pipe and join the thread.""" # Wait for ongoing reads to complete grace = 0.1 - while (time.time() - self._last_line_time) < grace: + while (time.time() - self._last_time) < grace: time.sleep(grace / 10) # This will signal EOF to the reader thread @@ -73,7 +73,7 @@ def fileno(self) -> int: return self._fd_write def run(self) -> None: - """Run the thread, logging everything.""" + """Run the thread, pushing every full line of text to the sinks.""" line_buffer = [] while True: try: @@ -85,24 +85,28 @@ def run(self) -> None: # EOF reached break - self._last_line_time = time.time() - if data.endswith("\n"): - data = data[:-1] - flush = True - else: - flush = False + self._last_time = time.time() - line_buffer.append(data) - if flush: - line = "".join(line_buffer) - line_buffer.clear() + lines = data.splitlines() + if data.endswith("\n"): + # Treat trailing newline as an empty line + lines.append("") + + # Log complete lines + for i, line in enumerate(lines[:-1]): + if i == 0: + line = "".join([*line_buffer, line]) + line_buffer = [] self._captured_lines.append(line) for sink in self._sinks: sink(line) + # Accumulate incomplete line + line_buffer.append(lines[-1]) + # Flush incomplete lines at the end of the stream - if line_buffer: - line = "".join(line_buffer) + line = "".join(line_buffer) + if line: self._captured_lines.append(line) for sink in self._sinks: sink(line) From 8243922f9ed610c696ee79ab8879c2fa6e44d19d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 21:20:43 +0100 Subject: [PATCH 05/16] improve docstring --- tesseract_core/runtime/logs.py | 13 ++++++++++--- tesseract_core/sdk/logs.py | 13 ++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/tesseract_core/runtime/logs.py 
b/tesseract_core/runtime/logs.py index 21db20fb..47a5bb7f 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -10,10 +10,17 @@ # NOTE: This is duplicated in `tesseract_core/sdk/logs.py`. # Make sure to propagate changes to both files. class TeePipe(threading.Thread): - """Custom I/O pipe to support live logging to multiple sinks. + """Custom I/O construct to support live logging from a single file descriptor to multiple sinks. - Runs a thread that logs everything written to the pipe to the given sinks. - Can be used as a context manager for automatic cleanup. + Runs a thread that records everything written to the file descriptor. Can be used as a + context manager for automatic cleanup. + + Example: + >>> with TeePipe(print, logger.info) as pipe_fd: + ... fd = os.fdopen(pipe_fd, "w") + ... print("Hello, World!", file=fd, flush=True) + Hello, World! + 2025-06-10 12:00:00,000 - INFO - Hello, World! """ daemon = True diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 9788e489..650531b8 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -30,10 +30,17 @@ # NOTE: This is duplicated in `tesseract_core/runtime/logs.py`. # Make sure to propagate changes to both files. class TeePipe(threading.Thread): - """Custom I/O pipe to support live logging to multiple sinks. + """Custom I/O construct to support live logging from a single file descriptor to multiple sinks. - Runs a thread that logs everything written to the pipe to the given sinks. - Can be used as a context manager for automatic cleanup. + Runs a thread that records everything written to the file descriptor. Can be used as a + context manager for automatic cleanup. + + Example: + >>> with TeePipe(print, logger.info) as pipe_fd: + ... fd = os.fdopen(pipe_fd, "w") + ... print("Hello, World!", file=fd, flush=True) + Hello, World! + 2025-06-10 12:00:00,000 - INFO - Hello, World! 
""" daemon = True From 4941bcb3c2b2e711c636ac2d026225699b920a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 12 Nov 2025 21:46:45 +0100 Subject: [PATCH 06/16] try again --- tests/sdk_tests/test_engine.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index 8d96a666..37a5f474 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -334,14 +334,13 @@ def test_teepipe(caplog): with teepipe: fd = os.fdopen(teepipe.fileno(), "w", closefd=False) for line in logged_lines: - print(line, file=fd) + print(line, file=fd, flush=True) time.sleep(random.random() / 100) - fd.flush() - assert teepipe.captured_lines == logged_lines assert caplog.record_tuples == [ ("tesseract", logging.INFO, line) for line in logged_lines ] + assert teepipe.captured_lines == logged_lines def test_teepipe_early_exit(): From f31850141dc5432496bcf459c5e921d1dda593ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 13 Nov 2025 10:49:02 +0100 Subject: [PATCH 07/16] decode as late as possible --- tesseract_core/runtime/logs.py | 14 ++++++++------ tesseract_core/sdk/logs.py | 14 ++++++++------ tests/sdk_tests/test_engine.py | 3 ++- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 47a5bb7f..0c033fa5 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -64,26 +64,27 @@ def run(self) -> None: line_buffer = [] while True: try: - data = os.read(self._fd_read, 1024).decode() + data = os.read(self._fd_read, 1024) except OSError: # Pipe closed break - if data == "": + if data == b"": # EOF reached break self._last_time = time.time() lines = data.splitlines() - if data.endswith("\n"): + if data.endswith(b"\n"): # Treat trailing newline as an empty line - lines.append("") + lines.append(b"") # Log complete lines for i, line in 
enumerate(lines[:-1]): if i == 0: - line = "".join([*line_buffer, line]) + line = b"".join([*line_buffer, line]) line_buffer = [] + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) @@ -92,8 +93,9 @@ def run(self) -> None: line_buffer.append(lines[-1]) # Flush incomplete lines at the end of the stream - line = "".join(line_buffer) + line = b"".join(line_buffer) if line: + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 650531b8..6086de66 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -84,26 +84,27 @@ def run(self) -> None: line_buffer = [] while True: try: - data = os.read(self._fd_read, 1024).decode() + data = os.read(self._fd_read, 1024) except OSError: # Pipe closed break - if data == "": + if data == b"": # EOF reached break self._last_time = time.time() lines = data.splitlines() - if data.endswith("\n"): + if data.endswith(b"\n"): # Treat trailing newline as an empty line - lines.append("") + lines.append(b"") # Log complete lines for i, line in enumerate(lines[:-1]): if i == 0: - line = "".join([*line_buffer, line]) + line = b"".join([*line_buffer, line]) line_buffer = [] + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) @@ -112,8 +113,9 @@ def run(self) -> None: line_buffer.append(lines[-1]) # Flush incomplete lines at the end of the stream - line = "".join(line_buffer) + line = b"".join(line_buffer) if line: + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index 37a5f474..27111742 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -334,8 +334,9 @@ def test_teepipe(caplog): with teepipe: fd = os.fdopen(teepipe.fileno(), 
"w", closefd=False) for line in logged_lines: - print(line, file=fd, flush=True) + print(line, file=fd) time.sleep(random.random() / 100) + fd.close() assert caplog.record_tuples == [ ("tesseract", logging.INFO, line) for line in logged_lines From 2f59cc8a8e2653d71433e7185fae89c795818977 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 13 Nov 2025 10:50:59 +0100 Subject: [PATCH 08/16] empty commit to trigger ci From 1b3e60b6afba6eeb87e659b36c39d6d19db62d56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Mon, 17 Nov 2025 10:59:17 +0100 Subject: [PATCH 09/16] sleep more --- tesseract_core/runtime/logs.py | 5 ++++- tesseract_core/sdk/logs.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 0c033fa5..8753e907 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -40,14 +40,17 @@ def __enter__(self) -> int: def stop(self) -> None: """Close the pipe and join the thread.""" - # Wait for ongoing reads to complete grace = 0.1 + # Initial sleep to allow messages from flush-and-close to arrive + time.sleep(grace / 10) + # Wait for ongoing streams to dry up while (time.time() - self._last_time) < grace: time.sleep(grace / 10) # This will signal EOF to the reader thread os.close(self._fd_write) os.close(self._fd_read) + # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 6086de66..007649f2 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -60,14 +60,17 @@ def __enter__(self) -> int: def stop(self) -> None: """Close the pipe and join the thread.""" - # Wait for ongoing reads to complete grace = 0.1 + # Initial sleep to allow messages from flush-and-close to arrive + time.sleep(grace / 10) + # Wait for ongoing streams to dry up while (time.time() - self._last_time) 
< grace: time.sleep(grace / 10) # This will signal EOF to the reader thread os.close(self._fd_write) os.close(self._fd_read) + # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong self.join(timeout=1) From cccfdd2c0835e530a3fee0592105921e52b45f00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Mon, 17 Nov 2025 11:10:49 +0100 Subject: [PATCH 10/16] try with an event instead --- tesseract_core/runtime/logs.py | 15 ++++++++++----- tesseract_core/sdk/logs.py | 17 +++++++++++------ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 8753e907..37414af3 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -32,6 +32,7 @@ def __init__(self, *sinks: Callable) -> None: self._fd_read, self._fd_write = os.pipe() self._captured_lines = [] self._last_time = time.time() + self._is_blocking = threading.Event() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -41,10 +42,12 @@ def __enter__(self) -> int: def stop(self) -> None: """Close the pipe and join the thread.""" grace = 0.1 - # Initial sleep to allow messages from flush-and-close to arrive - time.sleep(grace / 10) # Wait for ongoing streams to dry up - while (time.time() - self._last_time) < grace: + # We only continue once the reader has spent some time blocked on reading + while True: + self._is_blocking.wait(timeout=1) + if (time.time() - self._last_time) >= grace: + break time.sleep(grace / 10) # This will signal EOF to the reader thread @@ -66,17 +69,19 @@ def run(self) -> None: """Run the thread, pushing every full line of text to the sinks.""" line_buffer = [] while True: + self._last_time = time.time() + self._is_blocking.set() try: data = os.read(self._fd_read, 1024) + self._is_blocking.clear() except OSError: # Pipe closed break + if data == b"": # EOF reached break - self._last_time = time.time() - 
lines = data.splitlines() if data.endswith(b"\n"): # Treat trailing newline as an empty line diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 007649f2..0da8269c 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -52,6 +52,7 @@ def __init__(self, *sinks: Callable) -> None: self._fd_read, self._fd_write = os.pipe() self._captured_lines = [] self._last_time = time.time() + self._is_blocking = threading.Event() def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -60,11 +61,13 @@ def __enter__(self) -> int: def stop(self) -> None: """Close the pipe and join the thread.""" - grace = 0.1 - # Initial sleep to allow messages from flush-and-close to arrive - time.sleep(grace / 10) # Wait for ongoing streams to dry up - while (time.time() - self._last_time) < grace: + # We only continue once the reader has spent some time blocked on reading + grace = 0.1 + while True: + self._is_blocking.wait(timeout=1) + if (time.time() - self._last_time) >= grace: + break time.sleep(grace / 10) # This will signal EOF to the reader thread @@ -86,17 +89,19 @@ def run(self) -> None: """Run the thread, pushing every full line of text to the sinks.""" line_buffer = [] while True: + self._last_time = time.time() + self._is_blocking.set() try: data = os.read(self._fd_read, 1024) + self._is_blocking.clear() except OSError: # Pipe closed break + if data == b"": # EOF reached break - self._last_time = time.time() - lines = data.splitlines() if data.endswith(b"\n"): # Treat trailing newline as an empty line From 92caffe30ee8657a28fe02dc4bde93b109152f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Mon, 17 Nov 2025 12:46:09 +0100 Subject: [PATCH 11/16] try with outrageous grace period --- tesseract_core/runtime/logs.py | 2 +- tesseract_core/sdk/logs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tesseract_core/runtime/logs.py 
b/tesseract_core/runtime/logs.py index 37414af3..e28a7d3c 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -41,9 +41,9 @@ def __enter__(self) -> int: def stop(self) -> None: """Close the pipe and join the thread.""" - grace = 0.1 # Wait for ongoing streams to dry up # We only continue once the reader has spent some time blocked on reading + grace = 10.0 while True: self._is_blocking.wait(timeout=1) if (time.time() - self._last_time) >= grace: diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 0da8269c..e211c6ce 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -63,7 +63,7 @@ def stop(self) -> None: """Close the pipe and join the thread.""" # Wait for ongoing streams to dry up # We only continue once the reader has spent some time blocked on reading - grace = 0.1 + grace = 10.0 while True: self._is_blocking.wait(timeout=1) if (time.time() - self._last_time) >= grace: From 5a38d073179e5dca536d10cfd157c62009edb6b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Mon, 17 Nov 2025 13:36:18 +0100 Subject: [PATCH 12/16] make grace period configurable --- tesseract_core/runtime/logs.py | 6 +++--- tesseract_core/sdk/logs.py | 6 +++--- tests/sdk_tests/test_engine.py | 4 ++++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index e28a7d3c..3ef49167 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -33,6 +33,7 @@ def __init__(self, *sinks: Callable) -> None: self._captured_lines = [] self._last_time = time.time() self._is_blocking = threading.Event() + self._grace_period = 0.1 def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -43,12 +44,11 @@ def stop(self) -> None: """Close the pipe and join the thread.""" # Wait for ongoing streams to dry up # We only continue once the reader has spent some time blocked on 
reading - grace = 10.0 while True: self._is_blocking.wait(timeout=1) - if (time.time() - self._last_time) >= grace: + if (time.time() - self._last_time) >= self._grace_period: break - time.sleep(grace / 10) + time.sleep(self._grace_period / 10) # This will signal EOF to the reader thread os.close(self._fd_write) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index e211c6ce..bd1afa7f 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -53,6 +53,7 @@ def __init__(self, *sinks: Callable) -> None: self._captured_lines = [] self._last_time = time.time() self._is_blocking = threading.Event() + self._grace_period = 0.1 def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -63,12 +64,11 @@ def stop(self) -> None: """Close the pipe and join the thread.""" # Wait for ongoing streams to dry up # We only continue once the reader has spent some time blocked on reading - grace = 10.0 while True: self._is_blocking.wait(timeout=1) - if (time.time() - self._last_time) >= grace: + if (time.time() - self._last_time) >= self._grace_period: break - time.sleep(grace / 10) + time.sleep(self._grace_period / 10) # This will signal EOF to the reader thread os.close(self._fd_write) diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index d482fcd1..5a55db75 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -376,6 +376,8 @@ def test_teepipe(caplog): logged_lines.append(msg) teepipe = TeePipe(logger.info) + # Extend grace period to avoid flakes in tests when runners are slow + teepipe._grace_period = 1 with teepipe: fd = os.fdopen(teepipe.fileno(), "w", closefd=False) for line in logged_lines: @@ -400,6 +402,8 @@ def test_teepipe_early_exit(): logged_lines.append(msg) teepipe = TeePipe() + # Extend grace period to avoid flakes in tests when runners are slow + teepipe._grace_period = 1 teepipe.start() fd = os.fdopen(teepipe.fileno(), "w", 
closefd=False) From 742e68b0cc0399e8087d95926fc76b037531eb99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 19 Nov 2025 11:57:36 +0100 Subject: [PATCH 13/16] make test harder by including whitespace + unicode --- tesseract_core/runtime/logs.py | 5 +--- tesseract_core/sdk/logs.py | 5 +--- tests/sdk_tests/test_engine.py | 42 ++++++++++++++++++++++++++-------- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 3ef49167..c836cc7c 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -82,10 +82,7 @@ def run(self) -> None: # EOF reached break - lines = data.splitlines() - if data.endswith(b"\n"): - # Treat trailing newline as an empty line - lines.append(b"") + lines = data.split(b"\n") # Log complete lines for i, line in enumerate(lines[:-1]): diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index bd1afa7f..9255260f 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -102,10 +102,7 @@ def run(self) -> None: # EOF reached break - lines = data.splitlines() - if data.endswith(b"\n"): - # Treat trailing newline as an empty line - lines.append(b"") + lines = data.split(b"\n") # Log complete lines for i, line in enumerate(lines[:-1]): diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index 5a55db75..fb1902bf 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -5,6 +5,7 @@ import logging import os import random +import string import threading import time from pathlib import Path @@ -371,8 +372,14 @@ def test_teepipe(caplog): logged_lines = [] for _ in range(100): - msg_length = 2 ** random.randint(1, 12) - msg = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=msg_length)) + # Make sure to include a few really long lines without breaks + if random.random() < 0.1: + msg_length = random.randint(1000, 10_000) + alphabet = 
string.ascii_letters + "🤯" + else: + msg_length = 2 ** random.randint(2, 12) + alphabet = string.printable + "🤯" + msg = "".join(random.choices(alphabet, k=msg_length)) logged_lines.append(msg) teepipe = TeePipe(logger.info) @@ -385,10 +392,15 @@ def test_teepipe(caplog): time.sleep(random.random() / 100) fd.close() + expected_lines = [] + for line in logged_lines: + sublines = line.split("\n") + expected_lines.extend(sublines) + + assert teepipe.captured_lines == expected_lines assert caplog.record_tuples == [ - ("tesseract", logging.INFO, line) for line in logged_lines + ("tesseract", logging.INFO, line) for line in expected_lines ] - assert teepipe.captured_lines == logged_lines def test_teepipe_early_exit(): @@ -397,8 +409,14 @@ def test_teepipe_early_exit(): logged_lines = [] for _ in range(100): - msg_length = 2 ** random.randint(2, 12) - msg = "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=msg_length)) + # Make sure to include a few really long lines without breaks + if random.random() < 0.1: + msg_length = random.randint(1000, 10_000) + alphabet = string.ascii_letters + "🤯" + else: + msg_length = 2 ** random.randint(2, 12) + alphabet = string.printable + "🤯" + msg = "".join(random.choices(alphabet, k=msg_length)) logged_lines.append(msg) teepipe = TeePipe() @@ -415,6 +433,12 @@ def _write_to_pipe(): print("end without newline", end="", file=fd, flush=True) + expected_lines = [] + for line in logged_lines: + sublines = line.split("\n") + expected_lines.extend(sublines) + expected_lines.append("end without newline") + writer_thread = threading.Thread(target=_write_to_pipe) writer_thread.start() @@ -423,14 +447,14 @@ def _write_to_pipe(): time.sleep(0.01) # Sanity check that not all data has been written yet - assert len(teepipe.captured_lines) < len(logged_lines) + assert len(teepipe.captured_lines) < len(expected_lines) # Exit the pipe early before all data is written # This should block until no more data is incoming teepipe.stop() - assert 
len(teepipe.captured_lines) == len(logged_lines) + 1 - assert teepipe.captured_lines == [*logged_lines, "end without newline"] + assert len(teepipe.captured_lines) == len(expected_lines) + assert teepipe.captured_lines == expected_lines def test_parse_requirements(tmpdir): From 6fde2b191427af728c143113ef244592106420a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 19 Nov 2025 13:21:54 +0100 Subject: [PATCH 14/16] try without ignoring decode errors --- tesseract_core/runtime/logs.py | 2 +- tesseract_core/sdk/logs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index c836cc7c..0aa445e1 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -89,7 +89,7 @@ def run(self) -> None: if i == 0: line = b"".join([*line_buffer, line]) line_buffer = [] - line = line.decode(errors="ignore") + line = line.decode() # (errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 9255260f..27f74dbd 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -109,7 +109,7 @@ def run(self) -> None: if i == 0: line = b"".join([*line_buffer, line]) line_buffer = [] - line = line.decode(errors="ignore") + line = line.decode() # (errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) From 851ceb0a0d7094bf294cdaffa7666b8e2d0c1262 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 20 Nov 2025 17:02:38 +0100 Subject: [PATCH 15/16] rich bug poor bug --- tesseract_core/runtime/logs.py | 2 +- tesseract_core/sdk/logs.py | 2 +- tests/sdk_tests/test_engine.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 0aa445e1..c836cc7c 100644 --- a/tesseract_core/runtime/logs.py +++ 
b/tesseract_core/runtime/logs.py @@ -89,7 +89,7 @@ def run(self) -> None: if i == 0: line = b"".join([*line_buffer, line]) line_buffer = [] - line = line.decode() # (errors="ignore") + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tesseract_core/sdk/logs.py b/tesseract_core/sdk/logs.py index 27f74dbd..9255260f 100644 --- a/tesseract_core/sdk/logs.py +++ b/tesseract_core/sdk/logs.py @@ -109,7 +109,7 @@ def run(self) -> None: if i == 0: line = b"".join([*line_buffer, line]) line_buffer = [] - line = line.decode() # (errors="ignore") + line = line.decode(errors="ignore") self._captured_lines.append(line) for sink in self._sinks: sink(line) diff --git a/tests/sdk_tests/test_engine.py b/tests/sdk_tests/test_engine.py index fb1902bf..e87acceb 100644 --- a/tests/sdk_tests/test_engine.py +++ b/tests/sdk_tests/test_engine.py @@ -365,7 +365,10 @@ def raise_docker_error(*args, **kwargs): def test_teepipe(caplog): # Verify that logging in a separate thread works as intended - from tesseract_core.sdk.logs import TeePipe + from tesseract_core.sdk.logs import TeePipe, set_logger + + # Disable rich to ensure what we log is what we read + set_logger("info", catch_warnings=True, rich_format=False) logger = logging.getLogger("tesseract") caplog.set_level(logging.INFO, logger="tesseract") From f1f0ce615db25b84103d6fca4b9fd402194d813e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 4 Dec 2025 12:37:52 +0100 Subject: [PATCH 16/16] remove file added by mistake --- shapeopt.md | 99 ----------------------------------------------------- 1 file changed, 99 deletions(-) delete mode 100644 shapeopt.md diff --git a/shapeopt.md b/shapeopt.md deleted file mode 100644 index 3eaee449..00000000 --- a/shapeopt.md +++ /dev/null @@ -1,99 +0,0 @@ -# Parametric Shape Optimization of Rocket Fins with Ansys SpaceClaim and PyAnsys - -Grid fins are lattice-like structures on multi-stage rockets that provide 
steering control across a wide range of speeds. For example, during SpaceX Super Heavy booster re-entry, grid fins experience high dynamic pressure, much higher than during ascent. At this critical flight stage, the fins must maintain structural rigidity under maximum loading to preserve aerodynamic characteristics and enable precise trajectory control back to the landing pad. - -| Grid Fin Example | SpaceX Super Heavy Rocket | -| ------------- | ------------- | -| ![grid](imgs/grid_fin_example.png) | ![star](imgs/superheavy.png) | - -This case study demonstrates a gradient-based optimization workflow combining Ansys tools with Tesseract-driven differentiable programming. The goal is to maximize grid fin stiffness while maintaining a fixed mass constraint of 8 bars. Higher stiffness reduces deformation during Max-Q, keeping lift and drag coefficients consistent for reliable aerodynamic control. - -Each bar is defined by start and end angular positions, giving us 16 design parameters to optimize. Below are two example starting configurations: - -| Grid initial conditions | Random initial conditions | -| ------------- | ------------- | -| ![grid](imgs/grid_surface.png) | ![star](imgs/rnd_surface.png) | - -The simulation uses fixed boundary conditions at the knuckles (where the fin attaches to the rocket) and an out-of-plane load at the fin tip. This load placement approximates the aerodynamic forces while decoupling them from the bar geometry during optimization: - -![BCs](imgs/boundary_conditions.png) - -The simulation uses a linear elastic finite element solver with small deformation assumptions. To maximize stiffness, we minimize compliance, which is the inverse measure of structural rigidity. - -## Workflow - -This workflow demonstrates an end-to-end gradient-based optimization connecting Ansys SpaceClaim for parametric geometry generation with PyAnsys for finite element analysis.
Tesseract acts as glue between components, by packaging each component into a unified interface with built-in differentiation support. By composing Tesseracts with [Tesseract-JAX](https://github.com/pasteurlabs/tesseract-jax), we can leverage automatic differentiation across the entire pipeline: - -![Workflow](imgs/workflow_1.png) - - -The workflow uses three Tesseract components: - -- **Ansys SpaceClaim Tesseract**: Takes design parameters and generates the grid fin geometry through a SpaceClaim script, returning a triangular surface mesh (vertices and faces). Described in more detail in the [Tesseract docs](https://docs.pasteurlabs.ai/projects/tesseract-core/latest/content/examples/ansys_integration/spaceclaim_tess.html). - -- **SDF and Finite Difference Tesseract**: Converts the surface mesh into a signed distance field (SDF) on a regular grid. Additionally computes gradients with respect to design parameters using finite differences. Takes another Tesseract (here: SpaceClaim Tesseract) as input, which makes it a higher-order Tesseract. This Tesseract can work with any mesh generator Tesseract that matches the expected interface. - -- **PyMAPDL Tesseract**: Takes a hex mesh and boundary conditions as inputs, then solves the linear elasticity problem using PyMAPDL. Returns strain energy per cell and total compliance, with full gradient support for optimization via an analytical adjoint. Described in more detail in the [Tesseract docs](https://docs.pasteurlabs.ai/projects/tesseract-core/latest/content/examples/ansys_integration/pymapdl_tess.html). - -Between these Tesseracts, standard Python code handles hex mesh generation, boundary condition setup, and density derivation from the SDF. The hex mesh generation and boundary condition setup operations don't require differentiation since we optimize with respect to field quantities on mesh cells, not the mesh structure itself. 
The density function uses a sigmoid-like mapping and is differentiated with [JAX](https://github.com/google/jax)'s automatic differentiation. - - -## Optimization - -We first compare the two initial configurations. The regular grid (compliance: 61.9) significantly outperforms the random arrangement (compliance: 87.0). The plots below show strain energy and compliance sensitivities with respect to density. Negative sensitivity values indicate where adding material would reduce compliance. Note the tendency to thicken bars along their length, though this isn't achievable under our angular parametrization. - -![Workflow](imgs/sim_comparision.png) - -We run gradient-based optimization using Adam (learning rate: 0.01, 80 iterations) on both initial conditions. - -| Grid IC | Random IC | -| ------------- | ------------- | -| ![grid](imgs/mesh_grid_adam.gif) | ![star](imgs/mesh_rnd_adam.gif) | - -Both runs converge to similar asymmetric solutions. The optimizer concentrates material near the knuckle attachments to maximize local stiffness, consistent with the strain energy distributions showing highest concentrations at the fixed boundaries. The emergence of a grid-like structure from random initial conditions suggests the solver finds a near-optimal topology. - -However, the resulting geometries lack symmetry and would be difficult to manufacture. While increasing the number of optimization iterations might improve symmetry, explicitly enforcing symmetry constraints on the parameters would be more effective. - -The compliance evolution for both initial conditions is shown below: - -![loss](imgs/conv_rnd.png) - -Both configurations converge to similar final compliance values. The optimization reveals three structural behaviors for optimal load paths under these boundary conditions: - -- **Emergent Orthogonality**: Regardless of initialization, the topology settles into roughly equal numbers of lateral and longitudinal members.
The random initialization is particularly revealing, where bars initially spanning 180 degrees reorganize into a nearly orthogonal pattern. - -- **Diagonal Lateral Load Paths**: Lateral bars orient diagonally relative to the opposing knuckle, creating direct load paths that efficiently transfer tip moments to the fixed boundary. - -- **Root Reinforcement**: Longitudinal bars align vertically and cluster near the knuckles. Concentrating material at the fixed boundary stiffens the fin root where strain energy gradients are highest. - -## Results - -The optimized designs achieve higher stiffness at constant mass through non-uniform bar distributions. Lower compliance translates to reduced deformation under load, maintaining consistent aerodynamic coefficients during re-entry. However, the asymmetric topologies present manufacturing challenges and would provide unequal control authority in different flight directions. - -The final step in shape optimization is translating computational insights into manufacturable designs that satisfy practical constraints the optimizer didn't account for. We interpret the three structural behaviors (Emergent Orthogonality, Diagonal Lateral Load Paths, Root Reinforcement) into a symmetric, manufacturable geometry: - -| Final Geometry | Comparison with optimization results | -| ------------- | ------------- | -| ![manual_result](imgs/surface_radial_manual.png) | ![conv_with_manual](imgs/conv_with_manual.png) | - -Running this geometry through the Tesseract pipeline yields a compliance of 49.8, that is, a design that is 24% stiffer than the original grid and 75% stiffer than random bars. While not matching the fully optimized result, this design balances performance with manufacturability and symmetric aerodynamic characteristics. 
- -This demonstrates how gradient-based optimization with the Tesseract ecosystem and Ansys software can guide practical engineering decisions, even when the final design incorporates constraints beyond the optimization problem. - -## Why Tesseract? - -This case study demonstrates several capabilities that make Tesseract practical for simulation-driven design workflows: - -- **Composability**: Each component (geometry generation, meshing, FEM) is independently packaged. You can swap SpaceClaim for another CAD tool or replace PyMAPDL with a different solver without rewriting the pipeline. We've validated this by running the same workflow with [PyVista](https://docs.pyvista.org/) geometry and the [JAX-FEM](https://github.com/deepmodeling/jax-fem) solver. - -![Workflow](imgs/workflow_2.png) - -- **Gradient Support**: Tesseract's differentiation interface connects tools not originally designed for gradient-based optimization. This workflow combines analytic adjoints (PyMAPDL), finite differences (SDF conversion), and automatic differentiation (JAX glue code) into a complete gradient chain for the optimization problem. - -- **Heterogeneous Compute**: Tesseract integrates across different operating systems and environments. This workflow runs PyAnsys tools on Windows (with specific licenses and installation requirements) while executing optimization logic and Tesseract orchestration on Linux. - -- **Dependency Management**: Setting up workflows with multiple commercial and open-source packages typically creates dependency conflicts. Each Tesseract is self-contained with its own environment, isolating Python packages and system requirements (like OpenGL libraries for meshing). This makes workflows reproducible and eliminates version conflicts. - -- **Team Collaboration**: Tesseract uses a contract-first approach where each component's inputs and outputs are defined upfront through schemas. 
Different engineers can develop components independently against these interfaces, reducing integration issues when combining work. - -This approach generalizes beyond structural optimization to virtually any workflow involving simulation tools, multiphysics coupling, design exploration, or inverse problems.