Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-40089: Add _at_fork_reinit() method to locks #19195

Merged
merged 2 commits into from Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions Include/pythread.h
Expand Up @@ -36,6 +36,15 @@ PyAPI_FUNC(int) PyThread_acquire_lock(PyThread_type_lock, int);
#define WAIT_LOCK 1
#define NOWAIT_LOCK 0

#ifndef Py_LIMITED_API
#ifdef HAVE_FORK
/* Private function to reinitialize a lock at fork in the child process.
Reset the lock to the unlocked state.
Return 0 on success, return -1 on error. */
PyAPI_FUNC(int) _PyThread_at_fork_reinit(PyThread_type_lock *lock);
#endif /* HAVE_FORK */
#endif /* !Py_LIMITED_API */

/* PY_TIMEOUT_T is the integral type used to specify timeouts when waiting
on a lock (see PyThread_acquire_lock_timed() below).
PY_TIMEOUT_MAX is the highest usable value (in microseconds) of that
Expand Down
30 changes: 28 additions & 2 deletions Lib/test/lock_tests.py
Expand Up @@ -2,6 +2,7 @@
Various tests for synchronization primitives.
"""

import os
import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
Expand All @@ -12,6 +13,11 @@
from test import support


requires_fork = unittest.skipUnless(hasattr(os, 'fork'),
"platform doesn't support fork "
"(no _at_fork_reinit method)")


def _wait():
# A crude wait/yield function not relying on synchronization primitives.
time.sleep(0.01)
Expand Down Expand Up @@ -265,6 +271,25 @@ def test_state_after_timeout(self):
self.assertFalse(lock.locked())
self.assertTrue(lock.acquire(blocking=False))

@requires_fork
def test_at_fork_reinit(self):
def use_lock(lock):
# make sure that the lock still works normally
# after _at_fork_reinit()
lock.acquire()
lock.release()

# unlocked
lock = self.locktype()
lock._at_fork_reinit()
use_lock(lock)

# locked: _at_fork_reinit() resets the lock to the unlocked state
lock2 = self.locktype()
lock2.acquire()
lock2._at_fork_reinit()
use_lock(lock2)
vstinner marked this conversation as resolved.
Show resolved Hide resolved


class RLockTests(BaseLockTests):
"""
Expand Down Expand Up @@ -417,12 +442,13 @@ def f():
b.wait_for_finished()
self.assertEqual(results, [True] * N)

def test_reset_internal_locks(self):
@requires_fork
def test_at_fork_reinit(self):
# ensure that condition is still using a Lock after reset
evt = self.eventtype()
with evt._cond:
self.assertFalse(evt._cond.acquire(False))
evt._reset_internal_locks()
evt._at_fork_reinit()
with evt._cond:
self.assertFalse(evt._cond.acquire(False))

Expand Down
20 changes: 15 additions & 5 deletions Lib/threading.py
Expand Up @@ -123,6 +123,11 @@ def __repr__(self):
hex(id(self))
)

def _at_fork_reinit(self):
self._block._at_fork_reinit()
self._owner = None
self._count = 0

def acquire(self, blocking=True, timeout=-1):
"""Acquire a lock, blocking or non-blocking.

Expand Down Expand Up @@ -245,6 +250,10 @@ def __init__(self, lock=None):
pass
self._waiters = _deque()

def _at_fork_reinit(self):
self._lock._at_fork_reinit()
self._waiters.clear()

def __enter__(self):
return self._lock.__enter__()

Expand Down Expand Up @@ -514,9 +523,9 @@ def __init__(self):
self._cond = Condition(Lock())
self._flag = False

def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def _at_fork_reinit(self):
# Private method called by Thread._reset_internal_locks()
self._cond._at_fork_reinit()

def is_set(self):
"""Return true if and only if the internal flag is true."""
Expand Down Expand Up @@ -816,9 +825,10 @@ class is implemented.
def _reset_internal_locks(self, is_alive):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
self._started._reset_internal_locks()
self._started._at_fork_reinit()
if is_alive:
self._set_tstate_lock()
self._tstate_lock._at_fork_reinit()
self._tstate_lock.acquire()
else:
# The thread isn't alive after fork: it doesn't have a tstate
# anymore.
Expand Down
@@ -0,0 +1,6 @@
Add a private ``_at_fork_reinit()`` method to :class:`_thread.Lock`,
:class:`_thread.RLock`, :class:`threading.RLock` and
:class:`threading.Condition` classes: reinitialize the lock at fork in the
child process, reset the lock to the unlocked state.
Rename also the private ``_reset_internal_locks()`` method of
:class:`threading.Event` to ``_at_fork_reinit()``.
67 changes: 53 additions & 14 deletions Modules/_threadmodule.c
Expand Up @@ -213,6 +213,22 @@ lock_repr(lockobject *self)
self->locked ? "locked" : "unlocked", Py_TYPE(self)->tp_name, self);
}

#ifdef HAVE_FORK
static PyObject *
lock__at_fork_reinit(lockobject *self, PyObject *Py_UNUSED(args))
{
if (_PyThread_at_fork_reinit(&self->lock_lock) < 0) {
PyErr_SetString(ThreadError, "failed to reinitialize lock at fork");
return NULL;
}

self->locked = 0;

Py_RETURN_NONE;
}
#endif /* HAVE_FORK */


static PyMethodDef lock_methods[] = {
{"acquire_lock", (PyCFunction)(void(*)(void))lock_PyThread_acquire_lock,
METH_VARARGS | METH_KEYWORDS, acquire_doc},
Expand All @@ -230,6 +246,10 @@ static PyMethodDef lock_methods[] = {
METH_VARARGS | METH_KEYWORDS, acquire_doc},
{"__exit__", (PyCFunction)lock_PyThread_release_lock,
METH_VARARGS, release_doc},
#ifdef HAVE_FORK
{"_at_fork_reinit", (PyCFunction)lock__at_fork_reinit,
METH_NOARGS, NULL},
#endif
{NULL, NULL} /* sentinel */
};

Expand Down Expand Up @@ -446,22 +466,20 @@ For internal use by `threading.Condition`.");
static PyObject *
rlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
rlockobject *self;

self = (rlockobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->in_weakreflist = NULL;
self->rlock_owner = 0;
self->rlock_count = 0;

self->rlock_lock = PyThread_allocate_lock();
if (self->rlock_lock == NULL) {
Py_DECREF(self);
PyErr_SetString(ThreadError, "can't allocate lock");
return NULL;
}
rlockobject *self = (rlockobject *) type->tp_alloc(type, 0);
if (self == NULL) {
return NULL;
}
self->in_weakreflist = NULL;
self->rlock_owner = 0;
self->rlock_count = 0;

self->rlock_lock = PyThread_allocate_lock();
if (self->rlock_lock == NULL) {
Py_DECREF(self);
PyErr_SetString(ThreadError, "can't allocate lock");
return NULL;
}
return (PyObject *) self;
}

Expand All @@ -475,6 +493,23 @@ rlock_repr(rlockobject *self)
}


#ifdef HAVE_FORK
static PyObject *
rlock__at_fork_reinit(rlockobject *self, PyObject *Py_UNUSED(args))
{
if (_PyThread_at_fork_reinit(&self->rlock_lock) < 0) {
PyErr_SetString(ThreadError, "failed to reinitialize lock at fork");
return NULL;
}

self->rlock_owner = 0;
self->rlock_count = 0;

Py_RETURN_NONE;
}
#endif /* HAVE_FORK */


static PyMethodDef rlock_methods[] = {
{"acquire", (PyCFunction)(void(*)(void))rlock_acquire,
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
Expand All @@ -490,6 +525,10 @@ static PyMethodDef rlock_methods[] = {
METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
{"__exit__", (PyCFunction)rlock_release,
METH_VARARGS, rlock_release_doc},
#ifdef HAVE_FORK
{"_at_fork_reinit", (PyCFunction)rlock__at_fork_reinit,
METH_NOARGS, NULL},
#endif
{NULL, NULL} /* sentinel */
};

Expand Down
3 changes: 2 additions & 1 deletion Modules/posixmodule.c
Expand Up @@ -491,7 +491,8 @@ register_at_forker(PyObject **lst, PyObject *func)
}
return PyList_Append(*lst, func);
}
#endif
#endif /* HAVE_FORK */


/* Legacy wrapper */
void
Expand Down
20 changes: 20 additions & 0 deletions Python/thread_pthread.h
Expand Up @@ -693,6 +693,26 @@ PyThread_release_lock(PyThread_type_lock lock)

#endif /* USE_SEMAPHORES */

int
_PyThread_at_fork_reinit(PyThread_type_lock *lock)
{
PyThread_type_lock new_lock = PyThread_allocate_lock();
if (new_lock == NULL) {
return -1;
}

/* bpo-6721, bpo-40089: The old lock can be in an inconsistent state.
fork() can be called in the middle of an operation on the lock done by
another thread. So don't call PyThread_free_lock(*lock).

Leak memory on purpose. Don't release the memory either since the
address of a mutex is relevant. Putting two mutexes at the same address
can lead to problems. */

*lock = new_lock;
return 0;
}

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
Expand Down