diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 2d931040e57d15..7f7ee81403a759 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -1110,7 +1110,7 @@ def _unregister_eager_task(task): from _asyncio import (_register_task, _register_eager_task, _unregister_task, _unregister_eager_task, _enter_task, _leave_task, _swap_current_task, - _scheduled_tasks, _eager_tasks, _current_tasks, + _scheduled_tasks, _eager_tasks, current_task, all_tasks) except ImportError: pass diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index ff24fd76a61733..761c53a5e45fdd 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -139,10 +139,6 @@ typedef struct { PyObject *asyncio_mod; PyObject *context_kwname; - /* Dictionary containing tasks that are currently active in - all running event loops. {EventLoop: Task} */ - PyObject *current_tasks; - /* WeakSet containing scheduled 3rd party tasks which don't inherit from native asyncio.Task */ PyObject *non_asyncio_tasks; @@ -2061,8 +2057,6 @@ static int task_call_step_soon(asyncio_state *state, TaskObj *, PyObject *); static PyObject * task_wakeup(TaskObj *, PyObject *); static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *); static int task_eager_start(asyncio_state *state, TaskObj *task); -static inline void clear_ts_asyncio_running_task(PyObject *loop); -static inline void set_ts_asyncio_running_task(PyObject *loop, PyObject *task); /* ----- Task._step wrapper */ @@ -2235,159 +2229,71 @@ unregister_eager_task(asyncio_state *state, PyObject *task) } static int -enter_task(asyncio_state *state, PyObject *loop, PyObject *task) +enter_task(PyObject *loop, PyObject *task) { - PyObject *item; - int res = PyDict_SetDefaultRef(state->current_tasks, loop, task, &item); - if (res < 0) { + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); + + if (ts->asyncio_running_loop != loop) { + PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop); return -1; } - else if (res == 1) { + + if (ts->asyncio_running_task != NULL) { PyErr_Format( PyExc_RuntimeError, "Cannot enter into task %R while another " \ "task %R is being executed.", - task, item, NULL); - Py_DECREF(item); + task, ts->asyncio_running_task, NULL); return -1; } - assert(task == item); - Py_CLEAR(item); - set_ts_asyncio_running_task(loop, task); + ts->asyncio_running_task = Py_NewRef(task); return 0; } static int -err_leave_task(PyObject *item, PyObject *task) +leave_task(PyObject *loop, PyObject *task) { - PyErr_Format( - PyExc_RuntimeError, - "Leaving task %R does not match the current task %R.", - task, item); - return -1; -} - -static int -leave_task_predicate(PyObject *item, void *task) -{ - if (item != task) { - return err_leave_task(item, (PyObject *)task); - } - return 1; -} + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); -static int -leave_task(asyncio_state *state, PyObject *loop, PyObject *task) -{ - int res = _PyDict_DelItemIf(state->current_tasks, loop, - leave_task_predicate, task); - if (res == 0) { - // task was not found - return err_leave_task(Py_None, task); + if (ts->asyncio_running_loop != loop) { + PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop); + return -1; } - clear_ts_asyncio_running_task(loop); - return res; -} -static PyObject * -swap_current_task_lock_held(PyDictObject *current_tasks, PyObject *loop, - Py_hash_t hash, PyObject *task) -{ - PyObject *prev_task; - if (_PyDict_GetItemRef_KnownHash_LockHeld(current_tasks, loop, hash, &prev_task) < 0) { - return NULL; - } - if (_PyDict_SetItem_KnownHash_LockHeld(current_tasks, loop, task, hash) < 0) { - Py_XDECREF(prev_task); - return NULL; - } - if (prev_task == NULL) { - Py_RETURN_NONE; + if (ts->asyncio_running_task != task) { + PyErr_Format( + PyExc_RuntimeError, + "Invalid attempt to leave task %R while " \ + "task %R is entered.", + task, ts->asyncio_running_task ? ts->asyncio_running_task : Py_None, NULL); + return -1; } - return prev_task; + Py_CLEAR(ts->asyncio_running_task); + return 0; } static PyObject * -swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task) +swap_current_task(PyObject *loop, PyObject *task) { - PyObject *prev_task; - - clear_ts_asyncio_running_task(loop); - if (task == Py_None) { - if (PyDict_Pop(state->current_tasks, loop, &prev_task) < 0) { - return NULL; - } - if (prev_task == NULL) { - Py_RETURN_NONE; - } - return prev_task; - } + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); - Py_hash_t hash = PyObject_Hash(loop); - if (hash == -1) { + if (ts->asyncio_running_loop != loop) { + PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop); return NULL; } - PyDictObject *current_tasks = (PyDictObject *)state->current_tasks; - Py_BEGIN_CRITICAL_SECTION(current_tasks); - prev_task = swap_current_task_lock_held(current_tasks, loop, hash, task); - Py_END_CRITICAL_SECTION(); - set_ts_asyncio_running_task(loop, task); - return prev_task; -} - -static inline void -set_ts_asyncio_running_task(PyObject *loop, PyObject *task) -{ - // We want to enable debuggers and profilers to be able to quickly - // introspect the asyncio running state from another process. - // When we do that, we need to essentially traverse the address space - // of a Python process and understand what every Python thread in it is - // currently doing, mainly: - // - // * current frame - // * current asyncio task - // - // A naive solution would be to require profilers and debuggers to - // find the current task in the "_asynciomodule" module state, but - // unfortunately that would require a lot of complicated remote - // memory reads and logic, as Python's dict is a notoriously complex - // and ever-changing data structure. - // - // So the easier solution is to put a strong reference to the currently - // running `asyncio.Task` on the current thread state (the current loop - // is also stored there.) - _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); - if (ts->asyncio_running_loop == loop) { - // Protect from a situation when someone calls this method - // from another thread. This shouldn't ever happen though, - // as `enter_task` and `leave_task` can either be called by: - // - // - `asyncio.Task` itself, in `Task.__step()`. That method - // can only be called by the event loop itself. - // - // - third-party Task "from scratch" implementations, that - // our `capture_call_graph` API doesn't support anyway. - // - // That said, we still want to make sure we don't end up in - // a broken state, so we check that we're in the correct thread - // by comparing the *loop* argument to the event loop running - // in the current thread. If they match we know we're in the - // right thread, as asyncio event loops don't change threads. - assert(ts->asyncio_running_task == NULL); + /* transfer ownership to avoid redundant ref counting */ + PyObject *prev_task = ts->asyncio_running_task; + if (task != Py_None) { ts->asyncio_running_task = Py_NewRef(task); + } else { + ts->asyncio_running_task = NULL; } -} - -static inline void -clear_ts_asyncio_running_task(PyObject *loop) -{ - // See comment in set_ts_asyncio_running_task() for details. - _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); - if (ts->asyncio_running_loop == NULL || ts->asyncio_running_loop == loop) { - Py_CLEAR(ts->asyncio_running_task); + if (prev_task == NULL) { + Py_RETURN_NONE; } + return prev_task; } /* ----- Task */ @@ -3539,7 +3445,7 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc) { PyObject *res; - if (enter_task(state, task->task_loop, (PyObject*)task) < 0) { + if (enter_task(task->task_loop, (PyObject*)task) < 0) { return NULL; } @@ -3547,12 +3453,12 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc) if (res == NULL) { PyObject *exc = PyErr_GetRaisedException(); - leave_task(state, task->task_loop, (PyObject*)task); + leave_task(task->task_loop, (PyObject*)task); _PyErr_ChainExceptions1(exc); return NULL; } else { - if (leave_task(state, task->task_loop, (PyObject*)task) < 0) { + if (leave_task(task->task_loop, (PyObject*)task) < 0) { Py_DECREF(res); return NULL; } @@ -3566,7 +3472,7 @@ static int task_eager_start(asyncio_state *state, TaskObj *task) { assert(task != NULL); - PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task); + PyObject *prevtask = swap_current_task(task->task_loop, (PyObject *)task); if (prevtask == NULL) { return -1; } @@ -3595,7 +3501,7 @@ task_eager_start(asyncio_state *state, TaskObj *task) Py_DECREF(stepres); } - PyObject *curtask = swap_current_task(state, task->task_loop, prevtask); + PyObject *curtask = swap_current_task(task->task_loop, prevtask); Py_DECREF(prevtask); if (curtask == NULL) { retval = -1; @@ -3907,8 +3813,7 @@ static PyObject * _asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/ { - asyncio_state *state = get_asyncio_state(module); - if (enter_task(state, loop, task) < 0) { + if (enter_task(loop, task) < 0) { return NULL; } Py_RETURN_NONE; @@ -3932,8 +3837,7 @@ static PyObject * _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ { - asyncio_state *state = get_asyncio_state(module); - if (leave_task(state, loop, task) < 0) { + if (leave_task(loop, task) < 0) { return NULL; } Py_RETURN_NONE; @@ -3957,7 +3861,7 @@ _asyncio__swap_current_task_impl(PyObject *module, PyObject *loop, PyObject *task) /*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/ { - return swap_current_task(get_asyncio_state(module), loop, task); + return swap_current_task(loop, task); } @@ -3974,9 +3878,6 @@ static PyObject * _asyncio_current_task_impl(PyObject *module, PyObject *loop) /*[clinic end generated code: output=fe15ac331a7f981a input=58910f61a5627112]*/ { - PyObject *ret; - asyncio_state *state = get_asyncio_state(module); - if (loop == Py_None) { loop = _asyncio_get_running_loop_impl(module); if (loop == NULL) { @@ -3986,11 +3887,36 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop) Py_INCREF(loop); } - int rc = PyDict_GetItemRef(state->current_tasks, loop, &ret); - Py_DECREF(loop); - if (rc == 0) { + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); + // Fast path for the current running loop of current thread + // no locking or stop the world pause is required + if (ts->asyncio_running_loop == loop) { + if (ts->asyncio_running_task != NULL) { + Py_DECREF(loop); + return Py_NewRef(ts->asyncio_running_task); + } + Py_DECREF(loop); Py_RETURN_NONE; } + + PyObject *ret = Py_None; + // Stop the world and traverse the per-thread current tasks + // and return the task if the loop matches + PyInterpreterState *interp = ts->base.interp; + _PyEval_StopTheWorld(interp); + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { + ts = (_PyThreadStateImpl *)p; + if (ts->asyncio_running_loop == loop) { + if (ts->asyncio_running_task != NULL) { + ret = Py_NewRef(ts->asyncio_running_task); + } + goto exit; + } + } +exit: + _Py_FOR_EACH_TSTATE_END(interp); + _PyEval_StartTheWorld(interp); + Py_DECREF(loop); return ret; } @@ -4258,7 +4184,6 @@ module_traverse(PyObject *mod, visitproc visit, void *arg) Py_VISIT(state->non_asyncio_tasks); Py_VISIT(state->eager_tasks); - Py_VISIT(state->current_tasks); Py_VISIT(state->iscoroutine_typecache); Py_VISIT(state->context_kwname); @@ -4289,7 +4214,6 @@ module_clear(PyObject *mod) Py_CLEAR(state->non_asyncio_tasks); Py_CLEAR(state->eager_tasks); - Py_CLEAR(state->current_tasks); Py_CLEAR(state->iscoroutine_typecache); Py_CLEAR(state->context_kwname); @@ -4319,11 +4243,6 @@ module_init(asyncio_state *state) goto fail; } - state->current_tasks = PyDict_New(); - if (state->current_tasks == NULL) { - goto fail; - } - state->iscoroutine_typecache = PySet_New(NULL); if (state->iscoroutine_typecache == NULL) { goto fail; @@ -4456,11 +4375,6 @@ module_exec(PyObject *mod) return -1; } - if (PyModule_AddObjectRef(mod, "_current_tasks", state->current_tasks) < 0) { - return -1; - } - - return 0; }