Skip to content

Commit

Permalink
gh-117531: Unblock getters after non-immediate queue shutdown (#117532)
Browse files Browse the repository at this point in the history
(This is a small tweak of the original gh-104750 which added shutdown.)
  • Loading branch information
EpicWink committed Apr 10, 2024
1 parent dfcae43 commit 6bc0b33
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
6 changes: 4 additions & 2 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ them down.
queue is empty. Set *immediate* to true to make :meth:`~Queue.get` raise
immediately instead.

All blocked callers of :meth:`~Queue.put` will be unblocked. If *immediate*
is true, also unblock callers of :meth:`~Queue.get` and :meth:`~Queue.join`.
All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get` will be
unblocked. If *immediate* is true, a task will be marked as done for each
remaining item in the queue, which may unblock callers of
:meth:`~Queue.join`.

.. versionadded:: 3.13

Expand Down
8 changes: 5 additions & 3 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ def shutdown(self, immediate=False):
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.
All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'.
All blocked callers of put() and get() will be unblocked. If
'immediate', a task is marked as done for each item remaining in
the queue, which may unblock callers of join().
'''
with self.mutex:
self.is_shutdown = True
Expand All @@ -249,9 +250,10 @@ def shutdown(self, immediate=False):
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
self.not_empty.notify_all()
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
# All getters need to re-check queue-empty to raise ShutDown
self.not_empty.notify_all()
self.not_full.notify_all()

# Override these methods to implement other queue organizations
Expand Down
17 changes: 17 additions & 0 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,23 @@ def test_shutdown_get_task_done_join(self):

self.assertEqual(results, [True]*len(thrds))

def test_shutdown_pending_get(self):
def get():
try:
results.append(q.get())
except Exception as e:
results.append(e)

q = self.type2test()
results = []
get_thread = threading.Thread(target=get)
get_thread.start()
q.shutdown(immediate=False)
get_thread.join(timeout=10.0)
self.assertFalse(get_thread.is_alive())
self.assertEqual(len(results), 1)
self.assertIsInstance(results[0], self.queue.ShutDown)


class QueueTest(BaseQueueTestMixin):

Expand Down

0 comments on commit 6bc0b33

Please sign in to comment.