-
Notifications
You must be signed in to change notification settings - Fork 4
fix: Multiprocessing deadlock #357
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7280a8e
d5083a8
47688ae
3cca8d5
26902f6
f7838ce
364edca
16e507c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| #!/bin/bash | ||
| set -e | ||
|
|
||
| tesseract --loglevel debug build . | ||
|
|
||
| docker system prune --force | ||
|
|
||
| tesseract run reproducer apply '{"inputs":{}}' --output-path outputs | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| from concurrent.futures import ProcessPoolExecutor | ||
|
|
||
| import numpy as np | ||
| from pydantic import BaseModel | ||
|
|
||
|
|
||
| class InputSchema(BaseModel): | ||
| pass | ||
|
|
||
|
|
||
| class OutputSchema(BaseModel): | ||
| pass | ||
|
|
||
|
|
||
| # FIXME: if pool.submit uses a function defined in tesseract_api.py | ||
| # we get a pickling error: module tesseract_api not found. | ||
| def preprocess_fn(data_id: int): | ||
| print(data_id, "processing") | ||
| return data_id | ||
|
|
||
|
|
||
| def apply(inputs): | ||
| data_ids = list(range(10)) | ||
|
|
||
| pool = ProcessPoolExecutor() | ||
| futures = [] | ||
|
|
||
| for idx in data_ids: | ||
| # this causes the pickling error | ||
| # x = pool.submit(preprocess_fn, idx) | ||
| x = pool.submit(np.identity, idx) | ||
| futures.append(x) | ||
| print(idx, "submitted") | ||
|
|
||
| for f in futures: | ||
| res = f.result() | ||
| print(res, "done") | ||
|
|
||
| return OutputSchema() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| name: "reproducer" | ||
| version: "0.0.1" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,9 @@ def __init__(self, *sinks: Callable) -> None: | |
| super().__init__() | ||
| self._sinks = sinks | ||
| self._fd_read, self._fd_write = os.pipe() | ||
| self._pipe_reader = os.fdopen(self._fd_read) | ||
| self._pipe_reader = os.fdopen(self._fd_read, closefd=False) | ||
| self._captured_lines = [] | ||
| self._lock = threading.Lock() | ||
|
|
||
| def __enter__(self) -> int: | ||
| """Start the thread and return the write file descriptor of the pipe.""" | ||
|
|
@@ -32,13 +33,19 @@ def __enter__(self) -> int: | |
|
|
||
| def __exit__(self, *args: Any) -> None: | ||
| """Close the pipe and join the thread.""" | ||
| os.close(self._fd_write) | ||
| # Use a timeout so something weird happening in the logging thread doesn't | ||
| # cause this to hang indefinitely | ||
| self.join(timeout=10) | ||
| # Do not close reader before thread is joined since there may be pending data | ||
| # This also closes the fd_read pipe | ||
| self._pipe_reader.close() | ||
| with self._lock: | ||
| os.close(self._fd_write) | ||
|
|
||
| # Use a timeout so something weird happening in the logging thread doesn't | ||
| # cause this to hang indefinitely | ||
| # | ||
| # FIXME: this always times out in the multiprocessing case? | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tested this with [inline snippet not captured]. Should we add a warning here so that we can revise the timeout later if we frequently run into an issue with it, or set it to something even longer? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need the FIXME? It works for me. |
||
| self.join(timeout=1) | ||
|
|
||
| # Do not close reader before thread is joined since there may be pending data | ||
| # This also closes the fd_read pipe | ||
| os.close(self._fd_read) | ||
| self._pipe_reader.close() | ||
|
|
||
| def fileno(self) -> int: | ||
| """Return the write file descriptor of the pipe.""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're going to commit this example, let's name it something more specific, like `process-pool-executor-reproducer`.