From 94e1696d04c65e19ea52e5c8664079c9d9aa0e54 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 16 Jan 2018 00:27:16 +0100 Subject: [PATCH] bpo-14976: Reentrant simple queue (#3346) Add a queue.SimpleQueue class, an unbounded FIFO queue with a reentrant C implementation of put(). --- Doc/library/queue.rst | 73 +++- Lib/queue.py | 86 +++- Lib/test/test_queue.py | 240 ++++++++++- .../2017-09-07-19-12-47.bpo-14976.dx0Zxb.rst | 2 + Modules/_queuemodule.c | 400 ++++++++++++++++++ Modules/clinic/_queuemodule.c.h | 218 ++++++++++ PCbuild/_queue.vcxproj | 77 ++++ PCbuild/_queue.vcxproj.filters | 16 + PCbuild/pcbuild.proj | 2 +- PCbuild/pcbuild.sln | 18 + PCbuild/pythoncore.vcxproj.filters | 3 + setup.py | 2 + 12 files changed, 1125 insertions(+), 12 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2017-09-07-19-12-47.bpo-14976.dx0Zxb.rst create mode 100644 Modules/_queuemodule.c create mode 100644 Modules/clinic/_queuemodule.c.h create mode 100644 PCbuild/_queue.vcxproj create mode 100644 PCbuild/_queue.vcxproj.filters diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index f9a43bbac3bcc6..1520faa9b83ff0 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -23,8 +23,14 @@ the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the :mod:`heapq` module) and the lowest valued entry is retrieved first. -Internally, the module uses locks to temporarily block competing threads; -however, it is not designed to handle reentrancy within a thread. +Internally, those three types of queues use locks to temporarily block +competing threads; however, they are not designed to handle reentrancy +within a thread. + +In addition, the module implements a "simple" +:abbr:`FIFO (first-in, first-out)` queue type where +specific implementations can provide additional guarantees +in exchange for the smaller functionality. The :mod:`queue` module defines the following classes and exceptions: @@ -67,6 +73,14 @@ The :mod:`queue` module defines the following classes and exceptions: priority: int item: Any=field(compare=False) +.. class:: SimpleQueue() + + Constructor for an unbounded :abbr:`FIFO (first-in, first-out)` queue. + Simple queues lack advanced functionality such as task tracking. + + .. versionadded:: 3.7 + + .. exception:: Empty Exception raised when non-blocking :meth:`~Queue.get` (or @@ -201,6 +215,60 @@ Example of how to wait for enqueued tasks to be completed:: t.join() +SimpleQueue Objects +------------------- + +:class:`SimpleQueue` objects provide the public methods described below. + +.. method:: SimpleQueue.qsize() + + Return the approximate size of the queue. Note, qsize() > 0 doesn't + guarantee that a subsequent get() will not block. + + +.. method:: SimpleQueue.empty() + + Return ``True`` if the queue is empty, ``False`` otherwise. If empty() + returns ``False`` it doesn't guarantee that a subsequent call to get() + will not block. + + +.. method:: SimpleQueue.put(item, block=True, timeout=None) + + Put *item* into the queue. The method never blocks and always succeeds + (except for potential low-level errors such as failure to allocate memory). + The optional args *block* and *timeout* are ignored and only provided + for compatibility with :meth:`Queue.put`. + + .. impl-detail:: + This method has a C implementation which is reentrant. That is, a + ``put()`` or ``get()`` call can be interrupted by another ``put()`` + call in the same thread without deadlocking or corrupting internal + state inside the queue. This makes it appropriate for use in + destructors such as ``__del__`` methods or :mod:`weakref` callbacks. + + +.. method:: SimpleQueue.put_nowait(item) + + Equivalent to ``put(item)``, provided for compatibility with + :meth:`Queue.put_nowait`. + + +.. method:: SimpleQueue.get(block=True, timeout=None) + + Remove and return an item from the queue. If optional args *block* is true and + *timeout* is ``None`` (the default), block if necessary until an item is available. + If *timeout* is a positive number, it blocks at most *timeout* seconds and + raises the :exc:`Empty` exception if no item was available within that time. + Otherwise (*block* is false), return an item if one is immediately available, + else raise the :exc:`Empty` exception (*timeout* is ignored in that case). + + +.. method:: SimpleQueue.get_nowait() + + Equivalent to ``get(False)``. + + .. seealso:: Class :class:`multiprocessing.Queue` @@ -210,4 +278,3 @@ Example of how to wait for enqueued tasks to be completed:: :class:`collections.deque` is an alternative implementation of unbounded queues with fast atomic :meth:`~collections.deque.append` and :meth:`~collections.deque.popleft` operations that do not require locking. - diff --git a/Lib/queue.py b/Lib/queue.py index c803b96deb085d..ef07957781a474 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -4,17 +4,26 @@ from collections import deque from heapq import heappush, heappop from time import monotonic as time +try: + from _queue import SimpleQueue +except ImportError: + SimpleQueue = None -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] -class Empty(Exception): - 'Exception raised by Queue.get(block=0)/get_nowait().' - pass + +try: + from _queue import Empty +except AttributeError: + class Empty(Exception): + 'Exception raised by Queue.get(block=0)/get_nowait().' + pass class Full(Exception): 'Exception raised by Queue.put(block=0)/put_nowait().' pass + class Queue: '''Create a queue object with a given maximum size. @@ -241,3 +250,72 @@ def _put(self, item): def _get(self): return self.queue.pop() + + +class _PySimpleQueue: + '''Simple, unbounded FIFO queue. + + This pure Python implementation is not reentrant. + ''' + # Note: while this pure Python version provides fairness + # (by using a threading.Semaphore which is itself fair, being based + # on threading.Condition), fairness is not part of the API contract. + # This allows the C version to use a different implementation. + + def __init__(self): + self._queue = deque() + self._count = threading.Semaphore(0) + + def put(self, item, block=True, timeout=None): + '''Put the item on the queue. + + The optional 'block' and 'timeout' arguments are ignored, as this method + never blocks. They are provided for compatibility with the Queue class. + ''' + self._queue.append(item) + self._count.release() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + if not self._count.acquire(block, timeout): + raise Empty + return self._queue.popleft() + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + + This is exactly equivalent to `put(item)` and is only provided + for compatibility with the Queue class. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!).''' + return len(self._queue) == 0 + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + return len(self._queue) + + +if SimpleQueue is None: + SimpleQueue = _PySimpleQueue diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 6ee906c4d2665a..1a8d5f8856c5e8 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,12 +1,22 @@ # Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. +import collections +import itertools import queue +import random +import sys import threading import time import unittest +import weakref from test import support +try: + import _queue +except ImportError: + _queue = None + QUEUE_SIZE = 5 def qfull(q): @@ -84,7 +94,7 @@ def setUp(self): self.cum = 0 self.cumlock = threading.Lock() - def simple_queue_test(self, q): + def basic_queue_test(self, q): if q.qsize(): raise RuntimeError("Call this function with an empty queue") self.assertTrue(q.empty()) @@ -192,12 +202,12 @@ def test_queue_join(self): else: self.fail("Did not detect task count going negative") - def test_simple_queue(self): + def test_basic(self): # Do it a couple of times on the same queue. # Done twice to make sure works with same instance reused. q = self.type2test(QUEUE_SIZE) - self.simple_queue_test(q) - self.simple_queue_test(q) + self.basic_queue_test(q) + self.basic_queue_test(q) def test_negative_timeout_raises_exception(self): q = self.type2test(QUEUE_SIZE) @@ -353,5 +363,227 @@ def test_failing_queue(self): self.failing_queue_test(q) +class BaseSimpleQueueTest: + + def setUp(self): + self.q = self.type2test() + + def feed(self, q, seq, rnd): + while True: + try: + val = seq.pop() + except IndexError: + return + q.put(val) + if rnd.random() > 0.5: + time.sleep(rnd.random() * 1e-3) + + def consume(self, q, results, sentinel): + while True: + val = q.get() + if val == sentinel: + return + results.append(val) + + def consume_nonblock(self, q, results, sentinel): + while True: + while True: + try: + val = q.get(block=False) + except queue.Empty: + time.sleep(1e-5) + else: + break + if val == sentinel: + return + results.append(val) + + def consume_timeout(self, q, results, sentinel): + while True: + while True: + try: + val = q.get(timeout=1e-5) + except queue.Empty: + pass + else: + break + if val == sentinel: + return + results.append(val) + + def run_threads(self, n_feeders, n_consumers, q, inputs, + feed_func, consume_func): + results = [] + sentinel = None + seq = inputs + [sentinel] * n_consumers + seq.reverse() + rnd = random.Random(42) + + exceptions = [] + def log_exceptions(f): + def wrapper(*args, **kwargs): + try: + f(*args, **kwargs) + except BaseException as e: + exceptions.append(e) + return wrapper + + feeders = [threading.Thread(target=log_exceptions(feed_func), + args=(q, seq, rnd)) + for i in range(n_feeders)] + consumers = [threading.Thread(target=log_exceptions(consume_func), + args=(q, results, sentinel)) + for i in range(n_consumers)] + + with support.start_threads(feeders + consumers): + pass + + self.assertFalse(exceptions) + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + return results + + def test_basic(self): + # Basic tests for get(), put() etc. + q = self.q + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + q.put(1) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 1) + q.put(2) + q.put_nowait(3) + q.put(4) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 4) + + self.assertEqual(q.get(), 1) + self.assertEqual(q.qsize(), 3) + + self.assertEqual(q.get_nowait(), 2) + self.assertEqual(q.qsize(), 2) + + self.assertEqual(q.get(block=False), 3) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 1) + + self.assertEqual(q.get(timeout=0.1), 4) + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + with self.assertRaises(queue.Empty): + q.get(block=False) + with self.assertRaises(queue.Empty): + q.get(timeout=1e-3) + with self.assertRaises(queue.Empty): + q.get_nowait() + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + def test_negative_timeout_raises_exception(self): + q = self.q + q.put(1) + with self.assertRaises(ValueError): + q.get(timeout=-1) + + def test_order(self): + # Test a pair of concurrent put() and get() + q = self.q + inputs = list(range(100)) + results = self.run_threads(1, 1, q, inputs, self.feed, self.consume) + + # One producer, one consumer => results appended in well-defined order + self.assertEqual(results, inputs) + + def test_many_threads(self): + # Test multiple concurrent put() and get() + N = 50 + q = self.q + inputs = list(range(10000)) + results = self.run_threads(N, N, q, inputs, self.feed, self.consume) + + # Multiple consumers without synchronization append the + # results in random order + self.assertEqual(sorted(results), inputs) + + def test_many_threads_nonblock(self): + # Test multiple concurrent put() and get(block=False) + N = 50 + q = self.q + inputs = list(range(10000)) + results = self.run_threads(N, N, q, inputs, + self.feed, self.consume_nonblock) + + self.assertEqual(sorted(results), inputs) + + def test_many_threads_timeout(self): + # Test multiple concurrent put() and get(timeout=...) + N = 50 + q = self.q + inputs = list(range(1000)) + results = self.run_threads(N, N, q, inputs, + self.feed, self.consume_timeout) + + self.assertEqual(sorted(results), inputs) + + def test_references(self): + # The queue should lose references to each item as soon as + # it leaves the queue. + class C: + pass + + N = 20 + q = self.q + for i in range(N): + q.put(C()) + for i in range(N): + wr = weakref.ref(q.get()) + self.assertIsNone(wr()) + + +class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): + type2test = queue._PySimpleQueue + + +@unittest.skipIf(_queue is None, "No _queue module found") +class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): + + def setUp(self): + self.type2test = _queue.SimpleQueue + super().setUp() + + def test_is_default(self): + self.assertIs(self.type2test, queue.SimpleQueue) + + def test_reentrancy(self): + # bpo-14976: put() may be called reentrantly in an asynchronous + # callback. + q = self.q + gen = itertools.count() + N = 10000 + results = [] + + # This test exploits the fact that __del__ in a reference cycle + # can be called any time the GC may run. + + class Circular(object): + def __init__(self): + self.circular = self + + def __del__(self): + q.put(next(gen)) + + while True: + o = Circular() + q.put(next(gen)) + del o + results.append(q.get()) + if results[-1] >= N: + break + + self.assertEqual(results, list(range(N + 1))) + + if __name__ == "__main__": unittest.main() diff --git a/Misc/NEWS.d/next/Library/2017-09-07-19-12-47.bpo-14976.dx0Zxb.rst b/Misc/NEWS.d/next/Library/2017-09-07-19-12-47.bpo-14976.dx0Zxb.rst new file mode 100644 index 00000000000000..a4087551322317 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-09-07-19-12-47.bpo-14976.dx0Zxb.rst @@ -0,0 +1,2 @@ +Add a queue.SimpleQueue class, an unbounded FIFO queue with a reentrant C +implementation of put(). diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c new file mode 100644 index 00000000000000..8715337fb51179 --- /dev/null +++ b/Modules/_queuemodule.c @@ -0,0 +1,400 @@ +#include "Python.h" +#include "structmember.h" /* offsetof */ +#include "pythread.h" + +/*[clinic input] +module _queue +class _queue.SimpleQueue "simplequeueobject *" "&PySimpleQueueType" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=cf49af81bcbbbea6]*/ + +extern PyTypeObject PySimpleQueueType; /* forward decl */ + +static PyObject *EmptyError; + + +typedef struct { + PyObject_HEAD + PyThread_type_lock lock; + int locked; + PyObject *lst; + Py_ssize_t lst_pos; + PyObject *weakreflist; +} simplequeueobject; + + +static void +simplequeue_dealloc(simplequeueobject *self) +{ + _PyObject_GC_UNTRACK(self); + if (self->lock != NULL) { + /* Unlock the lock so it's safe to free it */ + if (self->locked > 0) + PyThread_release_lock(self->lock); + PyThread_free_lock(self->lock); + } + Py_XDECREF(self->lst); + if (self->weakreflist != NULL) + PyObject_ClearWeakRefs((PyObject *) self); + Py_TYPE(self)->tp_free(self); +} + +static int +simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg) +{ + Py_VISIT(self->lst); + return 0; +} + +/*[clinic input] +@classmethod +_queue.SimpleQueue.__new__ as simplequeue_new + +Simple, unbounded, reentrant FIFO queue. +[clinic start generated code]*/ + +static PyObject * +simplequeue_new_impl(PyTypeObject *type) +/*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/ +{ + simplequeueobject *self; + + self = (simplequeueobject *) type->tp_alloc(type, 0); + if (self != NULL) { + self->weakreflist = NULL; + self->lst = PyList_New(0); + self->lock = PyThread_allocate_lock(); + self->lst_pos = 0; + if (self->lock == NULL) { + Py_DECREF(self); + PyErr_SetString(PyExc_MemoryError, "can't allocate lock"); + return NULL; + } + if (self->lst == NULL) { + Py_DECREF(self); + return NULL; + } + } + + return (PyObject *) self; +} + +/*[clinic input] +_queue.SimpleQueue.put + item: object + block: bool = True + timeout: object = None + +Put the item on the queue. + +The optional 'block' and 'timeout' arguments are ignored, as this method +never blocks. They are provided for compatibility with the Queue class. + +[clinic start generated code]*/ + +static PyObject * +_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, + int block, PyObject *timeout) +/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/ +{ + /* BEGIN GIL-protected critical section */ + if (PyList_Append(self->lst, item) < 0) + return NULL; + if (self->locked) { + /* A get() may be waiting, wake it up */ + self->locked = 0; + PyThread_release_lock(self->lock); + } + /* END GIL-protected critical section */ + Py_RETURN_NONE; +} + +/*[clinic input] +_queue.SimpleQueue.put_nowait + item: object + +Put an item into the queue without blocking. + +This is exactly equivalent to `put(item)` and is only provided +for compatibility with the Queue class. + +[clinic start generated code]*/ + +static PyObject * +_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item) +/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/ +{ + return _queue_SimpleQueue_put_impl(self, item, 0, Py_None); +} + +static PyObject * +simplequeue_pop_item(simplequeueobject *self) +{ + Py_ssize_t count, n; + PyObject *item; + + n = PyList_GET_SIZE(self->lst); + assert(self->lst_pos < n); + + item = PyList_GET_ITEM(self->lst, self->lst_pos); + Py_INCREF(Py_None); + PyList_SET_ITEM(self->lst, self->lst_pos, Py_None); + self->lst_pos += 1; + count = n - self->lst_pos; + if (self->lst_pos > count) { + /* The list is more than 50% empty, reclaim space at the beginning */ + if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) { + /* Undo pop */ + self->lst_pos -= 1; + PyList_SET_ITEM(self->lst, self->lst_pos, item); + return NULL; + } + self->lst_pos = 0; + } + return item; +} + +/*[clinic input] +_queue.SimpleQueue.get + block: bool = True + timeout: object = None + +Remove and return an item from the queue. + +If optional args 'block' is true and 'timeout' is None (the default), +block if necessary until an item is available. If 'timeout' is +a non-negative number, it blocks at most 'timeout' seconds and raises +the Empty exception if no item was available within that time. +Otherwise ('block' is false), return an item if one is immediately +available, else raise the Empty exception ('timeout' is ignored +in that case). + +[clinic start generated code]*/ + +static PyObject * +_queue_SimpleQueue_get_impl(simplequeueobject *self, int block, + PyObject *timeout) +/*[clinic end generated code: output=ec82a7157dcccd1a input=4bf691f9f01fa297]*/ +{ + _PyTime_t endtime = 0; + _PyTime_t timeout_val; + PyObject *item; + PyLockStatus r; + PY_TIMEOUT_T microseconds; + + if (block == 0) { + /* Non-blocking */ + microseconds = 0; + } + else if (timeout != Py_None) { + /* With timeout */ + if (_PyTime_FromSecondsObject(&timeout_val, + timeout, _PyTime_ROUND_CEILING) < 0) + return NULL; + if (timeout_val < 0) { + PyErr_SetString(PyExc_ValueError, + "'timeout' must be a non-negative number"); + return NULL; + } + microseconds = _PyTime_AsMicroseconds(timeout_val, + _PyTime_ROUND_CEILING); + if (microseconds >= PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "timeout value is too large"); + return NULL; + } + endtime = _PyTime_GetMonotonicClock() + timeout_val; + } + else { + /* Infinitely blocking */ + microseconds = -1; + } + + /* put() signals the queue to be non-empty by releasing the lock. + * So we simply try to acquire the lock in a loop, until the condition + * (queue non-empty) becomes true. + */ + while (self->lst_pos == PyList_GET_SIZE(self->lst)) { + /* First a simple non-blocking try without releasing the GIL */ + r = PyThread_acquire_lock_timed(self->lock, 0, 0); + if (r == PY_LOCK_FAILURE && microseconds != 0) { + Py_BEGIN_ALLOW_THREADS + r = PyThread_acquire_lock_timed(self->lock, microseconds, 1); + Py_END_ALLOW_THREADS + } + if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) { + return NULL; + } + if (r == PY_LOCK_FAILURE) { + /* Timed out */ + PyErr_SetNone(EmptyError); + return NULL; + } + self->locked = 1; + /* Adjust timeout for next iteration (if any) */ + if (endtime > 0) { + timeout_val = endtime - _PyTime_GetMonotonicClock(); + microseconds = _PyTime_AsMicroseconds(timeout_val, _PyTime_ROUND_CEILING); + } + } + /* BEGIN GIL-protected critical section */ + assert(self->lst_pos < PyList_GET_SIZE(self->lst)); + item = simplequeue_pop_item(self); + if (self->locked) { + PyThread_release_lock(self->lock); + self->locked = 0; + } + /* END GIL-protected critical section */ + + return item; +} + +/*[clinic input] +_queue.SimpleQueue.get_nowait + +Remove and return an item from the queue without blocking. + +Only get an item if one is immediately available. Otherwise +raise the Empty exception. +[clinic start generated code]*/ + +static PyObject * +_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self) +/*[clinic end generated code: output=a89731a75dbe4937 input=6fe5102db540a1b9]*/ +{ + return _queue_SimpleQueue_get_impl(self, 0, Py_None); +} + +/*[clinic input] +_queue.SimpleQueue.empty -> bool + +Return True if the queue is empty, False otherwise (not reliable!). +[clinic start generated code]*/ + +static int +_queue_SimpleQueue_empty_impl(simplequeueobject *self) +/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/ +{ + return self->lst_pos == PyList_GET_SIZE(self->lst); +} + +/*[clinic input] +_queue.SimpleQueue.qsize -> Py_ssize_t + +Return the approximate size of the queue (not reliable!). +[clinic start generated code]*/ + +static Py_ssize_t +_queue_SimpleQueue_qsize_impl(simplequeueobject *self) +/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/ +{ + return PyList_GET_SIZE(self->lst) - self->lst_pos; +} + + +#include "clinic/_queuemodule.c.h" + + +static PyMethodDef simplequeue_methods[] = { + _QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF + _QUEUE_SIMPLEQUEUE_GET_METHODDEF + _QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF + _QUEUE_SIMPLEQUEUE_PUT_METHODDEF + _QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF + _QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF + {NULL, NULL} /* sentinel */ +}; + + +PyTypeObject PySimpleQueueType = { + PyVarObject_HEAD_INIT(NULL, 0) + "_queue.SimpleQueue", /*tp_name*/ + sizeof(simplequeueobject), /*tp_size*/ + 0, /*tp_itemsize*/ + /* methods */ + (destructor)simplequeue_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_reserved*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE + | Py_TPFLAGS_HAVE_GC, /* tp_flags */ + simplequeue_new__doc__, /*tp_doc*/ + (traverseproc)simplequeue_traverse, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + offsetof(simplequeueobject, weakreflist), /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + simplequeue_methods, /*tp_methods*/ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + simplequeue_new /* tp_new */ +}; + + +/* Initialization function */ + +PyDoc_STRVAR(queue_module_doc, +"C implementation of the Python queue module.\n\ +This module is an implementation detail, please do not use it directly."); + +static struct PyModuleDef queuemodule = { + PyModuleDef_HEAD_INIT, + "_queue", + queue_module_doc, + -1, + NULL, + NULL, + NULL, + NULL, + NULL +}; + + +PyMODINIT_FUNC +PyInit__queue(void) +{ + PyObject *m; + + /* Create the module */ + m = PyModule_Create(&queuemodule); + if (m == NULL) + return NULL; + + EmptyError = PyErr_NewExceptionWithDoc( + "_queue.Empty", + "Exception raised by Queue.get(block=0)/get_nowait().", + NULL, NULL); + if (EmptyError == NULL) + return NULL; + + Py_INCREF(EmptyError); + if (PyModule_AddObject(m, "Empty", EmptyError) < 0) + return NULL; + + if (PyType_Ready(&PySimpleQueueType) < 0) + return NULL; + Py_INCREF(&PySimpleQueueType); + if (PyModule_AddObject(m, "SimpleQueue", (PyObject *)&PySimpleQueueType) < 0) + return NULL; + + return m; +} diff --git a/Modules/clinic/_queuemodule.c.h b/Modules/clinic/_queuemodule.c.h new file mode 100644 index 00000000000000..97247fd8a129e6 --- /dev/null +++ b/Modules/clinic/_queuemodule.c.h @@ -0,0 +1,218 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +PyDoc_STRVAR(simplequeue_new__doc__, +"SimpleQueue()\n" +"--\n" +"\n" +"Simple, unbounded, reentrant FIFO queue."); + +static PyObject * +simplequeue_new_impl(PyTypeObject *type); + +static PyObject * +simplequeue_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + PyObject *return_value = NULL; + + if ((type == &PySimpleQueueType) && + !_PyArg_NoPositional("SimpleQueue", args)) { + goto exit; + } + if ((type == &PySimpleQueueType) && + !_PyArg_NoKeywords("SimpleQueue", kwargs)) { + goto exit; + } + return_value = simplequeue_new_impl(type); + +exit: + return return_value; +} + +PyDoc_STRVAR(_queue_SimpleQueue_put__doc__, +"put($self, /, item, block=True, timeout=None)\n" +"--\n" +"\n" +"Put the item on the queue.\n" +"\n" +"The optional \'block\' and \'timeout\' arguments are ignored, as this method\n" +"never blocks. They are provided for compatibility with the Queue class."); + +#define _QUEUE_SIMPLEQUEUE_PUT_METHODDEF \ + {"put", (PyCFunction)_queue_SimpleQueue_put, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_put__doc__}, + +static PyObject * +_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item, + int block, PyObject *timeout); + +static PyObject * +_queue_SimpleQueue_put(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"item", "block", "timeout", NULL}; + static _PyArg_Parser _parser = {"O|pO:put", _keywords, 0}; + PyObject *item; + int block = 1; + PyObject *timeout = Py_None; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &item, &block, &timeout)) { + goto exit; + } + return_value = _queue_SimpleQueue_put_impl(self, item, block, timeout); + +exit: + return return_value; +} + +PyDoc_STRVAR(_queue_SimpleQueue_put_nowait__doc__, +"put_nowait($self, /, item)\n" +"--\n" +"\n" +"Put an item into the queue without blocking.\n" +"\n" +"This is exactly equivalent to `put(item)` and is only provided\n" +"for compatibility with the Queue class."); + +#define _QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF \ + {"put_nowait", (PyCFunction)_queue_SimpleQueue_put_nowait, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_put_nowait__doc__}, + +static PyObject * +_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item); + +static PyObject * +_queue_SimpleQueue_put_nowait(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"item", NULL}; + static _PyArg_Parser _parser = {"O:put_nowait", _keywords, 0}; + PyObject *item; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &item)) { + goto exit; + } + return_value = _queue_SimpleQueue_put_nowait_impl(self, item); + +exit: + return return_value; +} + +PyDoc_STRVAR(_queue_SimpleQueue_get__doc__, +"get($self, /, block=True, timeout=None)\n" +"--\n" +"\n" +"Remove and return an item from the queue.\n" +"\n" +"If optional args \'block\' is true and \'timeout\' is None (the default),\n" +"block if necessary until an item is available. If \'timeout\' is\n" +"a non-negative number, it blocks at most \'timeout\' seconds and raises\n" +"the Empty exception if no item was available within that time.\n" +"Otherwise (\'block\' is false), return an item if one is immediately\n" +"available, else raise the Empty exception (\'timeout\' is ignored\n" +"in that case)."); + +#define _QUEUE_SIMPLEQUEUE_GET_METHODDEF \ + {"get", (PyCFunction)_queue_SimpleQueue_get, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_get__doc__}, + +static PyObject * +_queue_SimpleQueue_get_impl(simplequeueobject *self, int block, + PyObject *timeout); + +static PyObject * +_queue_SimpleQueue_get(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"block", "timeout", NULL}; + static _PyArg_Parser _parser = {"|pO:get", _keywords, 0}; + int block = 1; + PyObject *timeout = Py_None; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &block, &timeout)) { + goto exit; + } + return_value = _queue_SimpleQueue_get_impl(self, block, timeout); + +exit: + return return_value; +} + +PyDoc_STRVAR(_queue_SimpleQueue_get_nowait__doc__, +"get_nowait($self, /)\n" +"--\n" +"\n" +"Remove and return an item from the queue without blocking.\n" +"\n" +"Only get an item if one is immediately available. Otherwise\n" +"raise the Empty exception."); + +#define _QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF \ + {"get_nowait", (PyCFunction)_queue_SimpleQueue_get_nowait, METH_NOARGS, _queue_SimpleQueue_get_nowait__doc__}, + +static PyObject * +_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self); + +static PyObject * +_queue_SimpleQueue_get_nowait(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) +{ + return _queue_SimpleQueue_get_nowait_impl(self); +} + +PyDoc_STRVAR(_queue_SimpleQueue_empty__doc__, +"empty($self, /)\n" +"--\n" +"\n" +"Return True if the queue is empty, False otherwise (not reliable!)."); + +#define _QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF \ + {"empty", (PyCFunction)_queue_SimpleQueue_empty, METH_NOARGS, _queue_SimpleQueue_empty__doc__}, + +static int +_queue_SimpleQueue_empty_impl(simplequeueobject *self); + +static PyObject * +_queue_SimpleQueue_empty(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) +{ + PyObject *return_value = NULL; + int _return_value; + + _return_value = _queue_SimpleQueue_empty_impl(self); + if ((_return_value == -1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyBool_FromLong((long)_return_value); + +exit: + return return_value; +} + +PyDoc_STRVAR(_queue_SimpleQueue_qsize__doc__, +"qsize($self, /)\n" +"--\n" +"\n" +"Return the approximate size of the queue (not reliable!)."); + +#define _QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF \ + {"qsize", (PyCFunction)_queue_SimpleQueue_qsize, METH_NOARGS, _queue_SimpleQueue_qsize__doc__}, + +static Py_ssize_t +_queue_SimpleQueue_qsize_impl(simplequeueobject *self); + +static PyObject * +_queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored)) +{ + PyObject *return_value = NULL; + Py_ssize_t _return_value; + + _return_value = _queue_SimpleQueue_qsize_impl(self); + if ((_return_value == -1) && PyErr_Occurred()) { + goto exit; + } + return_value = PyLong_FromSsize_t(_return_value); + +exit: + return return_value; +} +/*[clinic end generated code: output=8badc3bb85263689 input=a9049054013a1b77]*/ diff --git a/PCbuild/_queue.vcxproj b/PCbuild/_queue.vcxproj new file mode 100644 index 00000000000000..81df2dfe514e1e --- /dev/null +++ b/PCbuild/_queue.vcxproj @@ -0,0 +1,77 @@ + + + + + Debug + Win32 + + + Debug + x64 + + + PGInstrument + Win32 + + + PGInstrument + x64 + + + PGUpdate + Win32 + + + PGUpdate + x64 + + + Release + Win32 + + + Release + x64 + + + + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687} + _queue + Win32Proj + + + + + DynamicLibrary + NotSet + + + + .pyd + + + + + + + + + + <_ProjectFileVersion>10.0.30319.1 + + + + + + + + + + {cf7ac3d1-e2df-41d2-bea6-1e2556cdea26} + false + + + + + + diff --git a/PCbuild/_queue.vcxproj.filters b/PCbuild/_queue.vcxproj.filters new file mode 100644 index 00000000000000..88b80826adfeec --- /dev/null +++ b/PCbuild/_queue.vcxproj.filters @@ -0,0 +1,16 @@ + + + + + + + + {c56a5dd3-7838-48e9-a781-855d8be7370f} + + + + + Source Files + + + \ No newline at end of file diff --git a/PCbuild/pcbuild.proj b/PCbuild/pcbuild.proj index 22cf7b54189257..05e885717ff765 100644 --- a/PCbuild/pcbuild.proj +++ b/PCbuild/pcbuild.proj @@ -49,7 +49,7 @@ - + diff --git a/PCbuild/pcbuild.sln b/PCbuild/pcbuild.sln index 82cfaf249d739a..5ff86a49c7c9ab 100644 --- a/PCbuild/pcbuild.sln +++ b/PCbuild/pcbuild.sln @@ -89,6 +89,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "_testconsole", "_testconsol EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "_asyncio", "_asyncio.vcxproj", "{384C224A-7474-476E-A01B-750EA7DE918C}" EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "_queue", "_queue.vcxproj", "{78D80A15-BD8C-44E2-B49E-1F05B0A0A687}" +EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "liblzma", "liblzma.vcxproj", "{12728250-16EC-4DC6-94D7-E21DD88947F8}" EndProject Global @@ -659,6 +661,22 @@ Global {384C224A-7474-476E-A01B-750EA7DE918C}.Release|Win32.Build.0 = Release|Win32 {384C224A-7474-476E-A01B-750EA7DE918C}.Release|x64.ActiveCfg = Release|x64 {384C224A-7474-476E-A01B-750EA7DE918C}.Release|x64.Build.0 = Release|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Debug|Win32.ActiveCfg = Debug|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Debug|Win32.Build.0 = Debug|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Debug|x64.ActiveCfg = Debug|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Debug|x64.Build.0 = Debug|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGInstrument|Win32.ActiveCfg = PGInstrument|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGInstrument|Win32.Build.0 = PGInstrument|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGInstrument|x64.ActiveCfg = PGInstrument|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGInstrument|x64.Build.0 = PGInstrument|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGUpdate|Win32.ActiveCfg = PGUpdate|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGUpdate|Win32.Build.0 = PGUpdate|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGUpdate|x64.ActiveCfg = PGUpdate|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.PGUpdate|x64.Build.0 = PGUpdate|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Release|Win32.ActiveCfg = Release|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Release|Win32.Build.0 = Release|Win32 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Release|x64.ActiveCfg = Release|x64 + {78D80A15-BD8C-44E2-B49E-1F05B0A0A687}.Release|x64.Build.0 = Release|x64 {12728250-16EC-4DC6-94D7-E21DD88947F8}.Debug|Win32.ActiveCfg = Debug|Win32 {12728250-16EC-4DC6-94D7-E21DD88947F8}.Debug|Win32.Build.0 = Debug|Win32 {12728250-16EC-4DC6-94D7-E21DD88947F8}.Debug|x64.ActiveCfg = Debug|x64 diff --git a/PCbuild/pythoncore.vcxproj.filters b/PCbuild/pythoncore.vcxproj.filters index 42b2fc3ea7c7f5..13600cb5c9fe9b 100644 --- a/PCbuild/pythoncore.vcxproj.filters +++ b/PCbuild/pythoncore.vcxproj.filters @@ -506,6 +506,9 @@ Modules + + Modules + Modules diff --git a/setup.py b/setup.py index 250425ebd3148e..8e98fdcb7ef3ca 100644 --- a/setup.py +++ b/setup.py @@ -699,6 +699,8 @@ def detect_modules(self): exts.append( Extension('_opcode', ['_opcode.c']) ) # asyncio speedups exts.append( Extension("_asyncio", ["_asynciomodule.c"]) ) + # _queue module + exts.append( Extension("_queue", ["_queuemodule.c"]) ) # Modules with some UNIX dependencies -- on by default: # (If you have a really backward UNIX, select and socket may not be