diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 289ad1b014c356..35a2fb79c66d58 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -89,6 +89,11 @@ Queue Return the number of items in the queue. + .. method:: close() + + Cancel all currently waiting :meth:`get` or :meth:`put` + coroutines. + .. method:: task_done() Indicate that a formerly enqueued task is complete. diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a87ec8b2158767..74dd98ba1bb8f6 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -215,6 +215,13 @@ async def join(self): if self._unfinished_tasks > 0: await self._finished.wait() + def close(self): + """Cancel all getters and putters currently waiting""" + for fut in self._putters: + fut.cancel() + for fut in self._getters: + fut.cancel() + class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 63a9a5f270cc92..b0d35a94dc9ee5 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -297,6 +297,19 @@ async def consumer(queue): self.loop.run_until_complete(self.loop.create_task(consumer(queue))) self.assertEqual(len(queue._getters), 0) + def test_close_get(self): + queue = asyncio.Queue() + + get1 = self.loop.create_task(queue.get()) + get2 = self.loop.create_task(queue.get()) + test_utils.run_briefly(self.loop) + queue.put_nowait(3) + queue.close() + self.assertEqual(self.loop.run_until_complete(get1), 3) + test_utils.run_briefly(self.loop) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(get2) + class QueuePutTests(_QueueTestBase): @@ -575,6 +588,18 @@ def gen(): with self.assertRaises(asyncio.CancelledError): loop.run_until_complete(put_task) + def test_close_put(self): + queue = asyncio.Queue(1) + queue.put_nowait(1) + + putter = self.loop.create_task(queue.put(1)) + test_utils.run_briefly(self.loop) + queue.close() + test_utils.run_briefly(self.loop) + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(putter) + + class LifoQueueTests(_QueueTestBase): diff --git a/Misc/NEWS.d/next/Library/2019-11-21-11-32-00.bpo-37334.3i2oAq.rst b/Misc/NEWS.d/next/Library/2019-11-21-11-32-00.bpo-37334.3i2oAq.rst new file mode 100644 index 00000000000000..16dab6c3e3c642 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-11-21-11-32-00.bpo-37334.3i2oAq.rst @@ -0,0 +1 @@ +Add a close() method to asyncio queues