From 7280a8ef65dbf361eb8c6ade6c80a152028c8a29 Mon Sep 17 00:00:00 2001 From: Niklas Heim Date: Thu, 28 Aug 2025 23:22:54 +0200 Subject: [PATCH 1/5] reproduce logpipe deadlock --- examples/reproducer/run.sh | 8 ++++ examples/reproducer/tesseract_api.py | 47 +++++++++++++++++++++++ examples/reproducer/tesseract_config.yaml | 2 + tesseract_core/runtime/logs.py | 21 ++++++++-- 4 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 examples/reproducer/run.sh create mode 100644 examples/reproducer/tesseract_api.py create mode 100644 examples/reproducer/tesseract_config.yaml diff --git a/examples/reproducer/run.sh b/examples/reproducer/run.sh new file mode 100644 index 00000000..1b4d0d88 --- /dev/null +++ b/examples/reproducer/run.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +tesseract --loglevel debug build . + +docker system prune --force + +tesseract run reproducer apply '{"inputs":{}}' --output-path outputs diff --git a/examples/reproducer/tesseract_api.py b/examples/reproducer/tesseract_api.py new file mode 100644 index 00000000..dc42c4f0 --- /dev/null +++ b/examples/reproducer/tesseract_api.py @@ -0,0 +1,47 @@ +from concurrent.futures import ProcessPoolExecutor +import numpy as np +from pydantic import BaseModel + +import typer + + +class InputSchema(BaseModel): + pass + + +class OutputSchema(BaseModel): + pass + + +# FIXME: if pool.submit uses a function defined in tesseract_api.py +# we get a pickling error: module tesseract_api not found. +def preprocess_fn(data_id: int): + print(data_id, "processing") + return data_id + + +def apply(inputs): + + data_ids = list(range(10)) + + pool = ProcessPoolExecutor() + futures = [] + + for idx in data_ids: + x = pool.submit(np.identity, idx) + futures.append(x) + print(idx, "submitted") + + for f in futures: + res = f.result() + print(res, "done") + + return OutputSchema() + + +app = typer.Typer() +app.command("apply", short_help="test")(apply) + +if __name__ == "__main__": + # apply(InputSchema()) + app() diff --git a/examples/reproducer/tesseract_config.yaml b/examples/reproducer/tesseract_config.yaml new file mode 100644 index 00000000..4b3e1797 --- /dev/null +++ b/examples/reproducer/tesseract_config.yaml @@ -0,0 +1,2 @@ +name: "reproducer" +version: "0.0.1" diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 4376cab2..5d08e548 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -22,8 +22,11 @@ def __init__(self, *sinks: Callable) -> None: super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen(self._fd_read) + self._pipe_reader = os.fdopen(self._fd_read, closefd=False) self._captured_lines = [] + self._lock = threading.Lock() + self._closed = False + print("opening!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -35,10 +38,21 @@ def __exit__(self, *args: Any) -> None: os.close(self._fd_write) # Use a timeout so something weird happening in the logging thread doesn't # cause this to hang indefinitely - self.join(timeout=10) + self.join(timeout=1) + # if self.is_alive(): + # raise ValueError("still alive") # Do not close reader before thread is joined since there may be pending data # This also closes the fd_read pipe - self._pipe_reader.close() + self.close_pipe() + + def close_pipe(self) -> None: + print("attempting close of log pipe") + with self._lock: + if not self._closed: + print("closing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + os.close(self._fd_read) + self._pipe_reader.close() + self._closed = True def fileno(self) -> int: """Return the write file descriptor of the pipe.""" @@ -52,6 +66,7 @@ def run(self) -> None: self._captured_lines.append(line) for sink in self._sinks: sink(line) + self.close_pipe() @property def captured_lines(self) -> list[str]: From d5083a826d91bd0cad2236ceb2748f730f943087 Mon Sep 17 00:00:00 2001 From: Niklas Heim Date: Thu, 28 Aug 2025 23:27:52 +0200 Subject: [PATCH 2/5] clean up --- tesseract_core/runtime/logs.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index 5d08e548..fcb2eed3 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -22,11 +22,11 @@ def __init__(self, *sinks: Callable) -> None: super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen(self._fd_read, closefd=False) + self._pipe_reader = os.fdopen(self._fd_read) self._captured_lines = [] + self._lock = threading.Lock() self._closed = False - print("opening!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe.""" @@ -39,19 +39,14 @@ def __exit__(self, *args: Any) -> None: # Use a timeout so something weird happening in the logging thread doesn't # cause this to hang indefinitely self.join(timeout=1) - # if self.is_alive(): - # raise ValueError("still alive") # Do not close reader before thread is joined since there may be pending data # This also closes the fd_read pipe self.close_pipe() def close_pipe(self) -> None: - print("attempting close of log pipe") with self._lock: if not self._closed: - print("closing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") os.close(self._fd_read) - self._pipe_reader.close() self._closed = True def fileno(self) -> int: From 47688aedeae63ef1f00bc2b8bd7e94b9238484b0 Mon Sep 17 00:00:00 2001 From: Niklas Heim Date: Thu, 28 Aug 2025 23:52:05 +0200 Subject: [PATCH 3/5] make it work with np.identity --- examples/reproducer/tesseract_api.py | 1 + tesseract_core/runtime/logs.py | 27 +++++++++++++-------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/reproducer/tesseract_api.py b/examples/reproducer/tesseract_api.py index dc42c4f0..a602e7ff 100644 --- a/examples/reproducer/tesseract_api.py +++ b/examples/reproducer/tesseract_api.py @@ -28,6 +28,7 @@ def apply(inputs): futures = [] for idx in data_ids: + # x = pool.submit(preprocess_fn, idx) x = pool.submit(np.identity, idx) futures.append(x) print(idx, "submitted") diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index fcb2eed3..a152c049 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -22,7 +22,7 @@ def __init__(self, *sinks: Callable) -> None: super().__init__() self._sinks = sinks self._fd_read, self._fd_write = os.pipe() - self._pipe_reader = os.fdopen(self._fd_read) + self._pipe_reader = os.fdopen(self._fd_read, closefd=False) self._captured_lines = [] self._lock = threading.Lock() @@ -35,19 +35,19 @@ def __enter__(self) -> int: def __exit__(self, *args: Any) -> None: """Close the pipe and join the thread.""" - os.close(self._fd_write) - # Use a timeout so something weird happening in the logging thread doesn't - # cause this to hang indefinitely - self.join(timeout=1) - # Do not close reader before thread is joined since there may be pending data - # This also closes the fd_read pipe - self.close_pipe() - - def close_pipe(self) -> None: with self._lock: - if not self._closed: - os.close(self._fd_read) - self._closed = True + os.close(self._fd_write) + + # Use a timeout so something weird happening in the logging thread doesn't + # cause this to hang indefinitely + # + # FIXME: this always times out in the multiprocessing case? + self.join(timeout=1) + + # Do not close reader before thread is joined since there may be pending data + # This also closes the fd_read pipe + os.close(self._fd_read) + self._pipe_reader.close() def fileno(self) -> int: """Return the write file descriptor of the pipe.""" @@ -61,7 +61,6 @@ def run(self) -> None: self._captured_lines.append(line) for sink in self._sinks: sink(line) - self.close_pipe() @property def captured_lines(self) -> list[str]: From 26902f69f231f378ecf57c3caa69be86618505bd Mon Sep 17 00:00:00 2001 From: Niklas Heim Date: Thu, 28 Aug 2025 23:58:27 +0200 Subject: [PATCH 4/5] lint --- examples/reproducer/tesseract_api.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/examples/reproducer/tesseract_api.py b/examples/reproducer/tesseract_api.py index a602e7ff..ef4b37bb 100644 --- a/examples/reproducer/tesseract_api.py +++ b/examples/reproducer/tesseract_api.py @@ -1,9 +1,8 @@ from concurrent.futures import ProcessPoolExecutor + import numpy as np from pydantic import BaseModel -import typer - class InputSchema(BaseModel): pass @@ -21,13 +20,13 @@ def preprocess_fn(data_id: int): def apply(inputs): - data_ids = list(range(10)) pool = ProcessPoolExecutor() futures = [] for idx in data_ids: + # this causes the pickling error # x = pool.submit(preprocess_fn, idx) x = pool.submit(np.identity, idx) futures.append(x) @@ -38,11 +37,3 @@ def apply(inputs): print(res, "done") return OutputSchema() - - -app = typer.Typer() -app.command("apply", short_help="test")(apply) - -if __name__ == "__main__": - # apply(InputSchema()) - app() From f7838ce506eda6c19c1d42e05849022300cd815a Mon Sep 17 00:00:00 2001 From: Niklas Heim Date: Thu, 28 Aug 2025 23:59:59 +0200 Subject: [PATCH 5/5] clean up --- tesseract_core/runtime/logs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py index a152c049..6124a272 100644 --- a/tesseract_core/runtime/logs.py +++ b/tesseract_core/runtime/logs.py @@ -24,9 +24,7 @@ def __init__(self, *sinks: Callable) -> None: self._fd_read, self._fd_write = os.pipe() self._pipe_reader = os.fdopen(self._fd_read, closefd=False) self._captured_lines = [] - self._lock = threading.Lock() - self._closed = False def __enter__(self) -> int: """Start the thread and return the write file descriptor of the pipe."""