diff --git a/examples/reproducer/run.sh b/examples/reproducer/run.sh
new file mode 100644
index 00000000..1b4d0d88
--- /dev/null
+++ b/examples/reproducer/run.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e
+
+tesseract --loglevel debug build .
+
+docker system prune --force
+
+tesseract run reproducer apply '{"inputs":{}}' --output-path outputs
diff --git a/examples/reproducer/tesseract_api.py b/examples/reproducer/tesseract_api.py
new file mode 100644
index 00000000..ef4b37bb
--- /dev/null
+++ b/examples/reproducer/tesseract_api.py
@@ -0,0 +1,39 @@
+from concurrent.futures import ProcessPoolExecutor
+
+import numpy as np
+from pydantic import BaseModel
+
+
+class InputSchema(BaseModel):
+    pass
+
+
+class OutputSchema(BaseModel):
+    pass
+
+
+# FIXME: if pool.submit uses a function defined in tesseract_api.py
+# we get a pickling error: module tesseract_api not found.
+def preprocess_fn(data_id: int):
+    print(data_id, "processing")
+    return data_id
+
+
+def apply(inputs):
+    data_ids = list(range(10))
+
+    pool = ProcessPoolExecutor()
+    futures = []
+
+    for idx in data_ids:
+        # this causes the pickling error
+        # x = pool.submit(preprocess_fn, idx)
+        x = pool.submit(np.identity, idx)
+        futures.append(x)
+        print(idx, "submitted")
+
+    for f in futures:
+        res = f.result()
+        print(res, "done")
+
+    return OutputSchema()
diff --git a/examples/reproducer/tesseract_config.yaml b/examples/reproducer/tesseract_config.yaml
new file mode 100644
index 00000000..4b3e1797
--- /dev/null
+++ b/examples/reproducer/tesseract_config.yaml
@@ -0,0 +1,2 @@
+name: "reproducer"
+version: "0.0.1"
diff --git a/tesseract_core/runtime/logs.py b/tesseract_core/runtime/logs.py
index 4376cab2..6124a272 100644
--- a/tesseract_core/runtime/logs.py
+++ b/tesseract_core/runtime/logs.py
@@ -22,8 +22,9 @@ def __init__(self, *sinks: Callable) -> None:
         super().__init__()
         self._sinks = sinks
         self._fd_read, self._fd_write = os.pipe()
-        self._pipe_reader = os.fdopen(self._fd_read)
+        self._pipe_reader = os.fdopen(self._fd_read, closefd=False)
         self._captured_lines = []
+        self._lock = threading.Lock()
 
     def __enter__(self) -> int:
         """Start the thread and return the write file descriptor of the pipe."""
@@ -32,13 +33,19 @@ def __exit__(self, *args: Any) -> None:
         """Close the pipe and join the thread."""
-        os.close(self._fd_write)
-        # Use a timeout so something weird happening in the logging thread doesn't
-        # cause this to hang indefinitely
-        self.join(timeout=10)
-        # Do not close reader before thread is joined since there may be pending data
-        # This also closes the fd_read pipe
-        self._pipe_reader.close()
+        with self._lock:
+            os.close(self._fd_write)
+
+        # Use a timeout so something weird happening in the logging thread doesn't
+        # cause this to hang indefinitely
+        #
+        # FIXME: this always times out in the multiprocessing case?
+        self.join(timeout=1)
+
+        # Do not close reader before thread is joined since there may be pending data
+        # This also closes the fd_read pipe
+        os.close(self._fd_read)
+        self._pipe_reader.close()
 
     def fileno(self) -> int:
         """Return the write file descriptor of the pipe."""
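
Background on the FIXME in tesseract_api.py: Python pickles a function by reference, storing only its module name and qualified name, so a ProcessPoolExecutor worker must be able to import the defining module before it can unpickle the callable. The sketch below illustrates that generic mechanism only; it is not part of the diff, the file name is made up, and it does not claim anything about how the Tesseract runtime actually loads tesseract_api.py.

# sketch_pickle_by_reference.py -- illustrative sketch, not part of the diff.
# Shows that pickling a function stores (module, qualname) rather than code,
# so unpickling inside a worker fails if that module is not importable there.
import pickle
import pickletools


def preprocess_fn(data_id: int) -> int:
    return data_id


if __name__ == "__main__":
    payload = pickle.dumps(preprocess_fn)
    # The payload references "__main__" and "preprocess_fn" by name only.
    pickletools.dis(payload)
    # A ProcessPoolExecutor worker unpickles submitted callables by importing
    # the module named in the payload. If tesseract_api is loaded under a
    # different name, or is not importable from the worker process, that
    # import fails with "module tesseract_api not found", as the FIXME notes.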
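
One plausible reading of the new FIXME in __exit__ in logs.py (a hypothesis, not something the diff confirms): ProcessPoolExecutor workers created via fork inherit a copy of the pipe's write end, so closing self._fd_write in the parent does not produce EOF on the read end until every worker has exited, and join(timeout=...) expires while the reader thread is still blocked. The standalone sketch below demonstrates that generic os.pipe()/fork behaviour on POSIX; it is not Tesseract code, and the fork start method is an assumption.

# sketch_fd_inheritance.py -- illustrative sketch, not part of the diff.
# A forked child keeps its inherited copy of the pipe's write end open, so
# the parent's reader thread sees no EOF until the child exits.
import multiprocessing as mp
import os
import threading
import time


def reader(fd_read: int) -> None:
    with os.fdopen(fd_read) as pipe:
        for line in pipe:  # blocks until EOF on the read end
            print("captured:", line.rstrip())
    print("reader saw EOF")


def child_work() -> None:
    time.sleep(3)  # the child's inherited copy of fd_write stays open here


if __name__ == "__main__":
    fd_read, fd_write = os.pipe()
    thread = threading.Thread(target=reader, args=(fd_read,))
    thread.start()

    proc = mp.get_context("fork").Process(target=child_work)
    proc.start()  # the child inherits fd_write across fork

    os.write(fd_write, b"hello\n")
    os.close(fd_write)  # the parent's copy is closed, the child's is not
    thread.join(timeout=1)
    print("reader still alive after join:", thread.is_alive())  # True
    proc.join()
    thread.join()  # EOF arrives only once the child has exited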