Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Include/internal/pycore_ceval.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ extern void _PyEval_SignalReceived(void);
#define _Py_PENDING_MAINTHREADONLY 1
#define _Py_PENDING_RAWFREE 2

typedef int _Py_add_pending_call_result;
#define _Py_ADD_PENDING_SUCCESS 0
#define _Py_ADD_PENDING_FULL -1

// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyInterpreterState *interp,
_Py_pending_call_func func,
void *arg,
Expand Down
35 changes: 11 additions & 24 deletions Include/internal/pycore_ceval_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,38 @@ extern "C" {

typedef int (*_Py_pending_call_func)(void *);

struct _pending_call {
typedef struct _pending_call {
_Py_pending_call_func func;
void *arg;
int flags;
};

#define PENDINGCALLSARRAYSIZE 300
struct _pending_call *next;
} _pending_call;

#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
/* For interpreter-level pending calls, we want to avoid spending too
much time on pending calls in any one thread, so we apply a limit. */
#if MAXPENDINGCALLS > 100
# define MAXPENDINGCALLSLOOP 100
#else
# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
#endif
#define MAXPENDINGCALLSLOOP 100

/* We keep the number small to preserve as much compatibility
as possible with earlier versions. */
#define MAXPENDINGCALLS_MAIN 32
Comment on lines -34 to -36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point is still valid, so we should probably leave nearly all of the "main" pending calls stuff alone.

/* For the main thread, we want to make sure all pending calls are
run at once, for the sake of prompt signal handling. This is
unlikely to cause any problems since there should be very few
pending calls for the main thread. */
#define MAXPENDINGCALLSLOOP_MAIN 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably leave this alone, but I'm not opposed to this change.

#define MAXPENDINGCALLSLOOP_MAIN INT32_MAX

struct _pending_calls {
PyThreadState *handling_thread;
PyMutex mutex;
/* Request for running pending calls. */
/* The number of pending calls. */
int32_t npending;
/* The maximum allowed number of pending calls.
If the queue fills up to this point then _PyEval_AddPendingCall()
will return _Py_ADD_PENDING_FULL. */
int32_t max;
Comment on lines -48 to -51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should leave this, for the sake of the "main" pending calls stuff.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be worth leaving it for a while, so we can more easily deal with unexpected consequences of switching to an unbounded linked list.

/* We don't want a flood of pending calls to interrupt any one thread
for too long, so we keep a limit on the number handled per pass.
A value of 0 means there is no limit (other than the maximum
size of the list of pending calls). */
int32_t maxloop;
struct _pending_call calls[PENDINGCALLSARRAYSIZE];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should leave this statically allocated array for the sake of the "main" pending calls. We could go back to the old limit of MAXPENDINGCALLS_MAIN though.

Of course, we'd still use this for per-interpreter pending calls, since it's already there.

int first;
int next;
_pending_call *head;

#define _Py_PENDING_CALLS_FREELIST_SIZE 100
_pending_call *freelist;
size_t freelist_num;
};


Expand Down Expand Up @@ -97,8 +86,6 @@ struct _ceval_runtime_state {
/* Pending calls to be made only on the main thread. */
// The signal machinery falls back on this
// so it must be especially stable and efficient.
// For example, we use a preallocated array
// for the list of pending calls.
Comment on lines -100 to -101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point is still valid.

struct _pending_calls pending_mainthread;
PyMutex sys_trace_profile_mutex;
};
Expand Down
2 changes: 0 additions & 2 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ extern PyTypeObject _PyExc_MemoryError;
.parser = _parser_runtime_state_INIT, \
.ceval = { \
.pending_mainthread = { \
.max = MAXPENDINGCALLS_MAIN, \
.maxloop = MAXPENDINGCALLSLOOP_MAIN, \
}, \
.perf = _PyEval_RUNTIME_PERF_INIT, \
Expand Down Expand Up @@ -223,7 +222,6 @@ extern PyTypeObject _PyExc_MemoryError;
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
.pending = { \
.max = MAXPENDINGCALLS, \
.maxloop = MAXPENDINGCALLSLOOP, \
}, \
}, \
Expand Down
59 changes: 6 additions & 53 deletions Lib/test/test_capi/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,27 +1557,20 @@ def callback():

for i in range(n):
time.sleep(random.random()*0.02) #0.01 secs on average
#try submitting callback until successful.
#rely on regular interrupt to flush queue if we are
#unsuccessful.
while True:
if _testcapi._pending_threadfunc(callback):
break
self.assertTrue(_testcapi._pending_threadfunc(callback))

def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
def pendingcalls_submit(self, l, n, *, main=True):
def callback():
#this function can be interrupted by thread switching so let's
#use an atomic operation
l.append(None)

if main:
return _testcapi._pending_threadfunc(callback, n,
blocking=False,
ensure_added=ensure)
blocking=False)
else:
return _testinternalcapi.pending_threadfunc(callback, n,
blocking=False,
ensure_added=ensure)
blocking=False)

def pendingcalls_wait(self, l, numadded, context = None):
#now, stick around until l[0] has grown to 10
Expand Down Expand Up @@ -1635,51 +1628,11 @@ def test_main_pendingcalls_non_threaded(self):
#again, just using the main thread, likely they will all be dispatched at
#once. It is ok to ask for too many, because we loop until we find a slot.
#the loop can be interrupted to dispatch.
#there are only 32 dispatch slots, so we go for twice that!
l = []
n = 64
self.main_pendingcalls_submit(l, n)
self.pendingcalls_wait(l, n)

def test_max_pending(self):
with self.subTest('main-only'):
maxpending = 32

l = []
added = self.pendingcalls_submit(l, 1, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, 1)

l = []
added = self.pendingcalls_submit(l, maxpending, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

l = []
added = self.pendingcalls_submit(l, maxpending+1, main=True)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

with self.subTest('not main-only'):
# Per-interpreter pending calls has a much higher limit
# on how many may be pending at a time.
maxpending = 300

l = []
added = self.pendingcalls_submit(l, 1, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, 1)

l = []
added = self.pendingcalls_submit(l, maxpending, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

l = []
added = self.pendingcalls_submit(l, maxpending+1, main=False)
self.pendingcalls_wait(l, added)
self.assertEqual(added, maxpending)

class PendingTask(types.SimpleNamespace):

_add_pending = _testinternalcapi.pending_threadfunc
Expand Down Expand Up @@ -1713,10 +1666,10 @@ def callback():
# the eval breaker, so we take a naive approach to
# make sure.
if threading.get_ident() not in worker_tids:
self._add_pending(callback, ensure_added=True)
self._add_pending(callback)
return
self.run()
self._add_pending(callback, ensure_added=True)
self._add_pending(callback)

def create_thread(self, worker_tids):
return threading.Thread(
Expand Down
20 changes: 6 additions & 14 deletions Modules/_testcapimodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,14 +837,13 @@ static PyObject *
pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
{
static char *kwlist[] = {"callback", "num",
"blocking", "ensure_added", NULL};
"blocking", NULL};
PyObject *callable;
unsigned int num = 1;
int blocking = 0;
int ensure_added = 0;
if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
"O|I$pp:_pending_threadfunc", kwlist,
&callable, &num, &blocking, &ensure_added))
"O|I$p:_pending_threadfunc", kwlist,
&callable, &num, &blocking))
{
return NULL;
}
Expand All @@ -861,16 +860,9 @@ pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)

unsigned int num_added = 0;
for (; num_added < num; num_added++) {
if (ensure_added) {
int r;
do {
r = Py_AddPendingCall(&_pending_callback, callable);
} while (r < 0);
}
else {
if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
break;
}
if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
// out of memory and freelist is empty
break;
}
}

Expand Down
42 changes: 15 additions & 27 deletions Modules/_testinternalcapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1072,18 +1072,17 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
PyObject *callable;
unsigned int num = 1;
int blocking = 0;
int ensure_added = 0;
static char *kwlist[] = {"callback", "num",
"blocking", "ensure_added", NULL};
"blocking", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
"O|I$pp:pending_threadfunc", kwlist,
&callable, &num, &blocking, &ensure_added))
"O|I$p:pending_threadfunc", kwlist,
&callable, &num, &blocking))
{
return NULL;
}
PyInterpreterState *interp = _PyInterpreterState_GET();

/* create the reference for the callbackwhile we hold the lock */
/* create the reference for the callback while we hold the lock */
for (unsigned int i = 0; i < num; i++) {
Py_INCREF(callable);
}
Expand All @@ -1095,18 +1094,9 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)

unsigned int num_added = 0;
for (; num_added < num; num_added++) {
if (ensure_added) {
_Py_add_pending_call_result r;
do {
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
assert(r == _Py_ADD_PENDING_SUCCESS
|| r == _Py_ADD_PENDING_FULL);
} while (r == _Py_ADD_PENDING_FULL);
}
else {
if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
break;
}
if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
// out of memory and freelist is empty
break;
}
}

Expand Down Expand Up @@ -1162,16 +1152,14 @@ pending_identify(PyObject *self, PyObject *args)
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* It gets released in _pending_identify_callback(). */

_Py_add_pending_call_result r;
do {
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp,
&_pending_identify_callback, (void *)mutex,
0);
Py_END_ALLOW_THREADS
assert(r == _Py_ADD_PENDING_SUCCESS
|| r == _Py_ADD_PENDING_FULL);
} while (r == _Py_ADD_PENDING_FULL);

Py_BEGIN_ALLOW_THREADS
int r = _PyEval_AddPendingCall(interp,
&_pending_identify_callback, (void *)mutex,
0);
(void)r;
assert(r == 0);
Py_END_ALLOW_THREADS

/* Wait for the pending call to complete. */
PyThread_acquire_lock(mutex, WAIT_LOCK);
Expand Down
Loading
Loading