Skip to content

bpo-30966: Add multiprocessing.SimpleQueue.close() #2760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed

bpo-30966: Add multiprocessing.SimpleQueue.close() #2760

wants to merge 4 commits into from

Conversation

vstinner
Copy link
Member

Add a new close() method to multiprocessing.SimpleQueue to explicitly
closes the queue. This is called automatically when the queue is
garbage collected.

@@ -319,6 +319,13 @@ def __init__(self, *, ctx):
else:
self._wlock = ctx.Lock()

def __del__(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need that: the reader and writer already have a __del__.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, maybe I gone too far. This change is part on a larger plan to add ResourceWarning everywhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed del(), sorry this change was not directly related to this PR. It can be added later if needed (for ResourceWarning).

queue = multiprocessing.SimpleQueue()
queue.close()
# closing a queue twice should not fail
queue.close()
Copy link
Contributor

@mlouielu mlouielu Jul 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also need explicit testing _reader and _writer's .closed to be True?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike testing private attributes in unit tests. Should I add a second @cpython_only unit test testing private attributes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added functional tests instead.

queue = multiprocessing.SimpleQueue()
queue.close()
# closing a queue twice should not fail
queue.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if I call get() or put() on a closed queue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if I call get() or put() on a closed queue?

No idea :-) Which behaviour do you expect/want?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, it would raise a ValueError.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let's do that.


def get(self):
self._check_closed()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much overhead adds this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea. Does it matter? Correctness beats performance, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance matters. What bug fixed by adding this call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pitrou asked me to raise a ValueError.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's so much going on here (I/O, pickling) that I don't think a trivial method call is going to matter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition here. AttributeError is raised if the queue is closed after calling _check_closed(). Maybe use locking in close() and guard the check with a lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleQueue is supposed to be thread-safe? (I don't know the answer, it's a real question!) If we start to use the read lock, we also may have to use the write lock??

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise why it use the locks?

---------------

The :class:`multiprocessing.SimpleQueue` class has a new
:meth:`~multiprocessing.SimpleQueue.close` method to explicitly closes the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but seems there are errors in English here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, same typo as below: should be "to explicitly close the..."

@vstinner
Copy link
Member Author

I rewrote close() to close the writer even if closing the reader fails. It also clears _reader and _writer references to help the garbage collector.

I chose to remove the _poll attribute to simplify the implementation. But I don't know the rationale of the _poll() method. Was it added to add a private _poll() method? Or is it to prevent issues with the garbage collector on Python finalization?

SimpleQueue._poll attribute was added by the commit bdb1cf1 and http://bugs.python.org/issue12328:

    Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
    Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
    for polling multiple objects at once.  Patch by sbt.
    
    Complete changelist from sbt's patch:
    
    * Adds a wait(rlist, timeout=None) function for polling multiple
      objects at once.  On Unix this is just a wrapper for
      select(rlist, [], [], timeout=None).
    
    * Removes use of the SentinelReady exception and the sentinels argument
      to certain methods.  concurrent.futures.process has been changed to
      use wait() instead of SentinelReady.
    
    * Fixes bugs concerning PipeConnection.poll() and messages of zero
      length.
    
    * Fixes PipeListener.accept() to call ConnectNamedPipe() with
      overlapped=True.
    
    * Fixes Queue.empty() and SimpleQueue.empty() so that they are
      threadsafe on Windows.
    
    * Now PipeConnection.poll() and wait() will not modify the pipe except
      possibly by consuming a zero length message.  (Previously poll()
      could consume a partial message.)
    
    * All of multiprocesing's pipe related blocking functions/methods are
      now interruptible by SIGINT on Windows.

So @pitrou: Is it safe to remove _poll? Or should I keep it? If we keep it, I would prefer to explain why we need it.

@@ -0,0 +1,2 @@
Add a new close() method to multiprocessing.SimpleQueue to explicitly closes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be "to explicitly close the..."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh right, fixed.

vstinner added 4 commits July 19, 2017 15:01
* Add a new close() method to multiprocessing.SimpleQueue to explicitly
  close the queue.
* Operation on a closed queue raises a ValueError.
* Remove the SimpleQueue._poll attribute: use self._reader.poll()
@pitrou
Copy link
Member

pitrou commented Jul 20, 2017

Ok, I think the current diff has gone too far :-) If raising ValueError cannot be provided without adding a lot of locking and complication in the implementation, then I'll happily lose the ValueError requirement so that the implementation remains simple and lightweight (it's called "SimpleQueue" for a reason :-)).

@vstinner
Copy link
Member Author

Ok, I think the current diff has gone too far :-) If raising ValueError cannot be provided without adding a lot of locking and complication in the implementation, then I'll happily lose the ValueError requirement so that the implementation remains simple and lightweight (it's called "SimpleQueue" for a reason :-)).

The complicated code comes from my will of setting attributes to None. Maybe if we only call .close() without setting attributes to None, it would simplify the code, no? What do you think?

@pitrou
Copy link
Member

pitrou commented Jul 20, 2017

Maybe if we only call .close() without setting attributes to None, it would simplify the code, no? What do you think?

I don't know, I'd like to see the code :-)

@vstinner
Copy link
Member Author

Ok, let's try something completely different: PR #2776 is a simpler queue with closer() ;-)

@vstinner vstinner closed this Aug 10, 2017
@vstinner vstinner deleted the simplequeue branch August 10, 2017 23:39
@vstinner
Copy link
Member Author

Oh, I removed the branch by mistake :-p

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants