-
-
Notifications
You must be signed in to change notification settings - Fork 31.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a BrokenPipeError when a multiprocessing.Queue is garbage collected #91185
Comments
A
|
I have attached the following patch: pass a reference to the reader end of the queue pipe to the queue thread so that the reader end is not garbage collected and closed before the queue thread has sent all the buffered data to the writer end. |
I forgot to include the output of the above program:
|
I am seeing this same issue and it's causing my unit tests to print a bunch of spurious BrokenPipeError chatter. It's subject to race conditions, and the sample code in #91185 (comment) only manifests the issue a fraction of the time on my development machine. If I want to reliably reproduce the BrokenPipeError, this def main():
q = multiprocessing.Queue()
q.put(0)
#time.sleep(0.001) # Issue goes away when this line is uncommented
q.close()
q.join_thread() If I uncomment 1ms delay, the BrokenPipe error goes away. |
The PR from @maggyero didn't fix this issue for me with my modified I believe the root cause of this bug is that I wouldn't consider this a real fix since the reader is surely being closed explicitly for a reason (and because this is ugly code), but as a proof of concept, this change eliminates the BrokenPipeError reliably for me: diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ff3747ec49..59cda8af52 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -140,7 +140,7 @@ def put_nowait(self, obj):
def close(self):
self._closed = True
try:
- self._reader.close()
+ pass
finally:
close = self._close
if close: |
Some further evidence: putting this code before while q._buffer:
pass [edit: mostly stops. There's an additional race condition that happens very rarely as well] |
Is this a dupe of #80025? |
Thanks for the code sample @malsyned. I have updated the PR and it fixes your issue too. The problem in your case was that From the Linux manual page of pipe:
|
Hi @JelleZijlstra. Could you close this issue now that it has been fixed in PR #31913? (I cannot do it from my account.) |
@JelleZijlstra Thanks! |
I am hitting a similar issue in Python 3.8 when using mp.Queues from asyncio.run-in_executor. |
@gatopeich Could you open a separate issue with a minimal working example to reproduce the bug? |
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields:
bugs.python.org fields:
The text was updated successfully, but these errors were encountered: