diff --git a/Include/pythread.h b/Include/pythread.h index 0784f6b2e5391fb..50da70681fbb33d 100644 --- a/Include/pythread.h +++ b/Include/pythread.h @@ -20,6 +20,14 @@ PyAPI_FUNC(unsigned long) PyThread_start_new_thread(void (*)(void *), void *); PyAPI_FUNC(void) _Py_NO_RETURN PyThread_exit_thread(void); PyAPI_FUNC(unsigned long) PyThread_get_thread_ident(void); +#if !defined(Py_LIMITED_API) +PyAPI_FUNC(unsigned long) PyThread_start_joinable_thread(void (*func)(void *), + void *arg, + Py_uintptr_t* handle); +PyAPI_FUNC(int) PyThread_join_thread(Py_uintptr_t); +PyAPI_FUNC(int) PyThread_detach_thread(Py_uintptr_t); +#endif + #if (defined(__APPLE__) || defined(__linux__) || defined(_WIN32) \ || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) \ || defined(__DragonFly__) || defined(_AIX)) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0b333ca3b7e9dcd..b1a50b30cb7837c 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2578,7 +2578,7 @@ def test_async(self): self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) def test_async_timeout(self): - res = self.pool.apply_async(sqr, (6, TIMEOUT2 + support.SHORT_TIMEOUT)) + res = self.pool.apply_async(sqr, (6, 5 * TIMEOUT2)) get = TimingWrapper(res.get) self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) @@ -2682,6 +2682,9 @@ def test_make_pool(self): p.join() def test_terminate(self): + if self.TYPE == 'threads': + self.skipTest("Threads cannot be terminated") + # Simulate slow tasks which take "forever" to complete args = [support.LONG_TIMEOUT for i in range(10_000)] result = self.pool.map_async(time.sleep, args, chunksize=1) diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 8656fbdd83e9c70..27cdbfced896778 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -160,6 +160,105 @@ def task(): "Exception ignored in thread started by") self.assertIsNotNone(cm.unraisable.exc_traceback) + def test_join_thread(self): + finished = [] + + def task(): + time.sleep(0.05) + finished.append(None) + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + self.assertEqual(len(finished), 1) + + def test_join_thread_already_exited(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + time.sleep(0.05) + thread.join_thread(ident) + + def test_join_non_joinable(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + ident = thread.start_new_thread(task, ()) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + + def test_join_several_times(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + + def test_join_from_self(self): + errors = [] + lock = thread.allocate_lock() + lock.acquire() + + def task(): + ident = thread.get_ident() + # Wait for start_new_thread() to return so that the joinable threads + # are populated with the ident, otherwise ValueError would be raised + # instead. + lock.acquire() + try: + thread.join_thread(ident) + except Exception as e: + errors.append(e) + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + lock.release() + time.sleep(0.05) + # Can still join after join_thread() failed in other thread + thread.join_thread(ident) + + assert len(errors) == 1 + with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): + raise errors[0] + + def test_detach_then_join(self): + lock = thread.allocate_lock() + lock.acquire() + + def task(): + lock.acquire() + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + # detach_thread() returns even though the thread is blocked on lock + thread.detach_thread(ident) + # join_thread() then cannot be called anymore + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.join_thread(ident) + lock.release() + + def test_join_then_detach(self): + def task(): + pass + + with threading_helper.wait_threads_exit(): + joinable = True + ident = thread.start_new_thread(task, (), {}, joinable) + thread.join_thread(ident) + with self.assertRaisesRegex(ValueError, "not joinable"): + thread.detach_thread(ident) + class Barrier: def __init__(self, num_threads): diff --git a/Lib/threading.py b/Lib/threading.py index 41c3a9ff93856fd..8c23e8f313555a5 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -5,6 +5,7 @@ import _thread import functools import warnings +import _weakref from time import monotonic as _time from _weakrefset import WeakSet @@ -34,6 +35,8 @@ # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread +_join_thread = _thread.join_thread +_detach_thread = _thread.detach_thread _daemon_threads_allowed = _thread.daemon_threads_allowed _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel @@ -924,6 +927,7 @@ class is implemented. if _HAVE_THREAD_NATIVE_ID: self._native_id = None self._tstate_lock = None + self._join_lock = None self._started = Event() self._is_stopped = False self._initialized = True @@ -944,11 +948,14 @@ def _reset_internal_locks(self, is_alive): if self._tstate_lock is not None: self._tstate_lock._at_fork_reinit() self._tstate_lock.acquire() + if self._join_lock is not None: + self._join_lock._at_fork_reinit() else: # The thread isn't alive after fork: it doesn't have a tstate # anymore. self._is_stopped = True self._tstate_lock = None + self._join_lock = None def __repr__(self): assert self._initialized, "Thread.__init__() was not called" @@ -980,15 +987,24 @@ def start(self): if self._started.is_set(): raise RuntimeError("threads can only be started once") + self._join_lock = _allocate_lock() + with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + # Start joinable thread + _start_new_thread(self._bootstrap, (), {}, True) except Exception: with _active_limbo_lock: del _limbo[self] raise - self._started.wait() + self._started.wait() # Will set ident and native_id + + # We need to make sure the OS thread is either explicitly joined or + # detached at some point, otherwise system resources can be leaked. + def _finalizer(wr, _detach_thread=_detach_thread, ident=self._ident): + _detach_thread(ident) + self._non_joined_finalizer = _weakref.ref(self, _finalizer) def run(self): """Method representing the thread's activity. @@ -1144,6 +1160,19 @@ def join(self, timeout=None): # historically .join(timeout=x) for x<0 has acted as if timeout=0 self._wait_for_tstate_lock(timeout=max(timeout, 0)) + if self._is_stopped: + self._join_os_thread() + + def _join_os_thread(self): + join_lock = self._join_lock + if join_lock is not None: + # Calling join() multiple times simultaneously would result in early + # return for one of the callers. + with join_lock: + _join_thread(self._ident) + self._join_lock = None + self._non_joined_finalizer = None + def _wait_for_tstate_lock(self, block=True, timeout=-1): # Issue #18808: wait for the thread state to be gone. # At the end of the thread's life, after all knowledge of the thread @@ -1223,6 +1252,8 @@ def is_alive(self): if self._is_stopped or not self._started.is_set(): return False self._wait_for_tstate_lock(False) + if self._is_stopped: + self._join_os_thread() return not self._is_stopped @property diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index a849a200df625c4..a1317faf7108b3c 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -29,6 +29,7 @@ typedef struct { PyTypeObject *lock_type; PyTypeObject *local_type; PyTypeObject *local_dummy_type; + PyObject *joinable_dict; } thread_module_state; static inline thread_module_state* @@ -39,6 +40,25 @@ get_thread_state(PyObject *module) return (thread_module_state *)state; } +static int get_joinable_thread(thread_module_state* state, PyObject* ident_obj, + unsigned long *ident, Py_uintptr_t *handle) { + PyObject* handle_obj = PyDict_GetItemWithError(state->joinable_dict, ident_obj); + if (handle_obj == NULL) { + if (!PyErr_Occurred()) { + PyErr_SetString(PyExc_ValueError, "the given thread is not joinable"); + } + return -1; + } + *ident = PyLong_AsUnsignedLong(ident_obj); + unsigned long long ull_handle = PyLong_AsUnsignedLongLong(handle_obj); + if ((*ident == (unsigned long) -1 || ull_handle == (unsigned long long) -1) + && PyErr_Occurred()) { + // This should not occur as we control the contents of state->joinable_dict + return -1; + } + *handle = (Py_uintptr_t) ull_handle; + return 0; +} /* Lock objects */ @@ -1110,12 +1130,13 @@ Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); static PyObject * -thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) +thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs) { - PyObject *func, *args, *kwargs = NULL; + PyObject *func, *args, *kwargs = NULL, *joinable = NULL; + thread_module_state *state = get_thread_state(module); - if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, - &func, &args, &kwargs)) + if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 4, + &func, &args, &kwargs, &joinable)) return NULL; if (!PyCallable_Check(func)) { PyErr_SetString(PyExc_TypeError, @@ -1132,6 +1153,11 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) "optional 3rd arg must be a dictionary"); return NULL; } + if (joinable != NULL && !PyBool_Check(joinable)) { + PyErr_SetString(PyExc_TypeError, + "optional 4th arg must be a boolean"); + return NULL; + } if (PySys_Audit("_thread.start_new_thread", "OOO", func, args, kwargs ? kwargs : Py_None) < 0) { @@ -1169,18 +1195,41 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) boot->args = Py_NewRef(args); boot->kwargs = Py_XNewRef(kwargs); - unsigned long ident = PyThread_start_new_thread(thread_run, (void*) boot); + unsigned long ident; + Py_uintptr_t handle = 0; + if (joinable == Py_True) { + ident = PyThread_start_joinable_thread(thread_run, (void*) boot, &handle); + } else { + ident = PyThread_start_new_thread(thread_run, (void*) boot); + } if (ident == PYTHREAD_INVALID_THREAD_ID) { PyErr_SetString(ThreadError, "can't start new thread"); PyThreadState_Clear(boot->tstate); thread_bootstate_free(boot, 1); return NULL; } - return PyLong_FromUnsignedLong(ident); + PyObject* ident_obj = PyLong_FromUnsignedLong(ident); + if (ident_obj == NULL) { + return NULL; + } + if (joinable == Py_True) { + PyObject* handle_obj = PyLong_FromUnsignedLongLong(handle); + if (handle_obj == NULL) { + Py_DECREF(ident_obj); + return NULL; + } + if (PyDict_SetItem(state->joinable_dict, ident_obj, handle_obj)) { + Py_DECREF(handle_obj); + Py_DECREF(ident_obj); + return NULL; + } + Py_DECREF(handle_obj); + } + return ident_obj; } PyDoc_STRVAR(start_new_doc, -"start_new_thread(function, args[, kwargs])\n\ +"start_new_thread(function, args[, kwargs[, joinable]])\n\ (start_new() is an obsolete synonym)\n\ \n\ Start a new thread and return its identifier. The thread will call the\n\ @@ -1204,6 +1253,89 @@ PyDoc_STRVAR(exit_doc, This is synonymous to ``raise SystemExit''. It will cause the current\n\ thread to exit silently unless the exception is caught."); +static PyObject * +thread_PyThread_join_thread(PyObject *module, PyObject *ident_obj) +{ + thread_module_state *state = get_thread_state(module); + + if (!PyLong_Check(ident_obj)) { + PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", + Py_TYPE(ident_obj)->tp_name); + return NULL; + } + // Check if the ident is part of the joinable threads and fetch its handle. + unsigned long ident; + Py_uintptr_t handle; + if (get_joinable_thread(state, ident_obj, &ident, &handle)) { + return NULL; + } + if (ident == PyThread_get_thread_ident()) { + // PyThread_join_thread() would deadlock or error out. + PyErr_SetString(ThreadError, "Cannot join current thread"); + return NULL; + } + // Before actually joining, we must first remove the ident from joinable_dict, + // as joining several times simultaneously or sequentially is undefined behavior. + if (PyDict_DelItem(state->joinable_dict, ident_obj)) { + return NULL; + } + int ret; + Py_BEGIN_ALLOW_THREADS + ret = PyThread_join_thread((Py_uintptr_t) handle); + Py_END_ALLOW_THREADS + if (ret) { + PyErr_SetString(ThreadError, "Failed joining thread"); + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(join_thread_doc, +"join_thread(ident)\n\ +\n\ +Join the thread with the given identifier. The thread must have been started\n\ +with start_new_thread() with the `joinable` argument set to True.\n\ +This function can only be called once per joinable thread, and it cannot be\n\ +called if detach_thread() was previously called on the same thread.\n"); + +static PyObject * +thread_PyThread_detach_thread(PyObject *module, PyObject *ident_obj) +{ + thread_module_state *state = get_thread_state(module); + + if (!PyLong_Check(ident_obj)) { + PyErr_Format(PyExc_TypeError, "thread ident must be an int, not %s", + Py_TYPE(ident_obj)->tp_name); + return NULL; + } + unsigned long ident; + Py_uintptr_t handle; + if (get_joinable_thread(state, ident_obj, &ident, &handle)) { + return NULL; + } + if (PyDict_DelItem(state->joinable_dict, ident_obj)) { + if (PyErr_ExceptionMatches(PyExc_KeyError)) { + PyErr_SetString(PyExc_ValueError, + "the given thread is not joinable and thus cannot be detached"); + } + return NULL; + } + int ret = PyThread_detach_thread(handle); + if (ret) { + PyErr_SetString(ThreadError, "Failed detaching thread"); + return NULL; + } + Py_RETURN_NONE; +} + +PyDoc_STRVAR(detach_thread_doc, +"detach_thread(ident)\n\ +\n\ +Detach the thread with the given identifier. The thread must have been started\n\ +with start_new_thread() with the `joinable` argument set to True.\n\ +This function can only be called once per joinable thread, and it cannot be\n\ +called if join_thread() was previously called on the same thread.\n"); + static PyObject * thread_PyThread_interrupt_main(PyObject *self, PyObject *args) { @@ -1574,6 +1706,10 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, METH_VARARGS, start_new_doc}, + {"join_thread", (PyCFunction)thread_PyThread_join_thread, + METH_O, join_thread_doc}, + {"detach_thread", (PyCFunction)thread_PyThread_detach_thread, + METH_O, detach_thread_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1652,6 +1788,12 @@ thread_module_exec(PyObject *module) return -1; } + // Dict of joinable threads: ident -> handle + state->joinable_dict = PyDict_New(); + if (state->joinable_dict == NULL) { + return -1; + } + // Add module attributes if (PyDict_SetItemString(d, "error", ThreadError) < 0) { return -1; @@ -1690,6 +1832,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); + Py_VISIT(state->joinable_dict); return 0; } @@ -1701,6 +1844,24 @@ thread_module_clear(PyObject *module) Py_CLEAR(state->lock_type); Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); + // To avoid resource leaks, detach all still joinable threads + if (state->joinable_dict != NULL) { + PyObject *key, *value; + Py_ssize_t pos = 0; + + while (PyDict_Next(state->joinable_dict, &pos, &key, &value)) { + if (PyLong_Check(value)) { + unsigned long long handle = PyLong_AsUnsignedLongLong(value); + if (handle == (unsigned long long) -1 && PyErr_Occurred()) { + // Should not happen + PyErr_Clear(); + } else { + PyThread_detach_thread((Py_uintptr_t) handle); + } + } + } + Py_CLEAR(state->joinable_dict); + } return 0; } diff --git a/Python/thread_nt.h b/Python/thread_nt.h index 26f441bd6d3c569..8dbe78e42e3edf4 100644 --- a/Python/thread_nt.h +++ b/Python/thread_nt.h @@ -183,8 +183,7 @@ bootstrap(void *call) } unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) -{ +PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { HANDLE hThread; unsigned threadID; callobj *obj; @@ -207,16 +206,35 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) /* I've seen errno == EAGAIN here, which means "there are * too many threads". */ - int e = errno; - threadID = (unsigned)-1; HeapFree(GetProcessHeap(), 0, obj); + return PYTHREAD_INVALID_THREAD_ID; } - else { - CloseHandle(hThread); - } + *handle = (Py_uintptr_t) hThread; + return threadID; +} + +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) { + Py_uintptr_t handle; + unsigned long threadID = PyThread_start_joinable_thread(func, arg, &handle); + CloseHandle((HANDLE) handle); return threadID; } +int +PyThread_join_thread(Py_uintptr_t handle) { + HANDLE hThread = (HANDLE) handle; + int errored = (WaitForSingleObject(hThread, INFINITE) != WAIT_OBJECT_0); + CloseHandle(hThread); + return errored; +} + +int +PyThread_detach_thread(Py_uintptr_t handle) { + HANDLE hThread = (HANDLE) handle; + return (CloseHandle(hThread) == 0); +} + /* * Return the thread Id instead of a handle. The Id is said to uniquely identify the * thread in the system diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h index 76a1f7763f23b9f..5d08429c36dedcf 100644 --- a/Python/thread_pthread.h +++ b/Python/thread_pthread.h @@ -235,8 +235,8 @@ pythread_wrapper(void *arg) return NULL; } -unsigned long -PyThread_start_new_thread(void (*func)(void *), void *arg) +static int +do_start_joinable_thread(void (*func)(void *), void *arg, pthread_t* out_id) { pthread_t th; int status; @@ -252,7 +252,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED) if (pthread_attr_init(&attrs) != 0) - return PYTHREAD_INVALID_THREAD_ID; + return -1; #endif #if defined(THREAD_STACK_SIZE) PyThreadState *tstate = _PyThreadState_GET(); @@ -261,7 +261,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (tss != 0) { if (pthread_attr_setstacksize(&attrs, tss) != 0) { pthread_attr_destroy(&attrs); - return PYTHREAD_INVALID_THREAD_ID; + return -1; } } #endif @@ -272,7 +272,7 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) pythread_callback *callback = PyMem_RawMalloc(sizeof(pythread_callback)); if (callback == NULL) { - return PYTHREAD_INVALID_THREAD_ID; + return -1; } callback->func = func; @@ -292,11 +292,36 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) if (status != 0) { PyMem_RawFree(callback); + return -1; + } + *out_id = th; + return 0; +} + +unsigned long +PyThread_start_joinable_thread(void (*func)(void *), void *arg, Py_uintptr_t* handle) { + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { return PYTHREAD_INVALID_THREAD_ID; } + assert(th == (pthread_t) (unsigned long) th); + assert(th == (pthread_t) (Py_uintptr_t) th); + *handle = (Py_uintptr_t) th; +#if SIZEOF_PTHREAD_T <= SIZEOF_LONG + return (unsigned long) th; +#else + return (unsigned long) *(unsigned long *) &th; +#endif +} +unsigned long +PyThread_start_new_thread(void (*func)(void *), void *arg) +{ + pthread_t th = (pthread_t) 0; + if (do_start_joinable_thread(func, arg, &th)) { + return PYTHREAD_INVALID_THREAD_ID; + } pthread_detach(th); - #if SIZEOF_PTHREAD_T <= SIZEOF_LONG return (unsigned long) th; #else @@ -304,6 +329,16 @@ PyThread_start_new_thread(void (*func)(void *), void *arg) #endif } +int +PyThread_join_thread(Py_uintptr_t th) { + return pthread_join((pthread_t) th, NULL); +} + +int +PyThread_detach_thread(Py_uintptr_t th) { + return pthread_detach((pthread_t) th); +} + /* XXX This implementation is considered (to quote Tim Peters) "inherently hosed" because: - It does not guarantee the promise that a non-zero integer is returned.