Skip to content

Commit

Permalink
[3.12] gh-105716: Support Background Threads in Subinterpreters Consi…
Browse files Browse the repository at this point in the history
…stently (gh-109921) (gh-110707)

The existence of background threads running on a subinterpreter was preventing interpreters from getting properly destroyed, as well as impacting the ability to run the interpreter again. It also affected how we wait for non-daemon threads to finish.

We add PyInterpreterState.threads.main, with some internal C-API functions.

(cherry-picked from commit 1dd9dee)
  • Loading branch information
ericsnowcurrently committed Nov 28, 2023
1 parent 82ae5a6 commit 0122b4d
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 150 deletions.
228 changes: 123 additions & 105 deletions Doc/data/python3.12.abi

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Include/internal/pycore_interp.h
Expand Up @@ -194,6 +194,8 @@ struct _is {
struct _Py_interp_cached_objects cached_objects;
struct _Py_interp_static_objects static_objects;

/* The thread currently executing in the __main__ module, if any. */
PyThreadState *threads_main;
/* The ID of the OS thread in which we are finalizing.
We use _Py_atomic_address instead of adding a new _Py_atomic_ulong. */
_Py_atomic_address _finalizing_id;
Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_pystate.h
Expand Up @@ -44,6 +44,11 @@ _Py_IsMainInterpreterFinalizing(PyInterpreterState *interp)
interp == &_PyRuntime._main_interpreter);
}

// Export for _xxsubinterpreters module.
PyAPI_FUNC(int) _PyInterpreterState_SetRunningMain(PyInterpreterState *);
PyAPI_FUNC(void) _PyInterpreterState_SetNotRunningMain(PyInterpreterState *);
PyAPI_FUNC(int) _PyInterpreterState_IsRunningMain(PyInterpreterState *);


static inline const PyConfig *
_Py_GetMainConfig(void)
Expand Down
97 changes: 97 additions & 0 deletions Lib/test/test_interpreters.py
Expand Up @@ -261,6 +261,16 @@ def test_subinterpreter(self):
self.assertTrue(interp.is_running())
self.assertFalse(interp.is_running())

def test_finished(self):
r, w = os.pipe()
interp = interpreters.create()
interp.run(f"""if True:
import os
os.write({w}, b'x')
""")
self.assertFalse(interp.is_running())
self.assertEqual(os.read(r, 1), b'x')

def test_from_subinterpreter(self):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
Expand Down Expand Up @@ -288,6 +298,31 @@ def test_bad_id(self):
with self.assertRaises(ValueError):
interp.is_running()

def test_with_only_background_threads(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()

DONE = b'D'
FINISHED = b'F'

interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
def task():
v = os.read({r_thread}, 1)
assert v == {DONE!r}
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
""")
self.assertFalse(interp.is_running())

os.write(w_thread, DONE)
interp.run('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)


class TestInterpreterClose(TestBase):

Expand Down Expand Up @@ -389,6 +424,37 @@ def test_still_running(self):
interp.close()
self.assertTrue(interp.is_running())

def test_subthreads_still_running(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()

FINISHED = b'F'

interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
import time
done = False
def notify_fini():
global done
done = True
t.join()
threading._register_atexit(notify_fini)
def task():
while not done:
time.sleep(0.1)
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
""")
interp.close()

self.assertEqual(os.read(r_interp, 1), FINISHED)


class TestInterpreterRun(TestBase):

Expand Down Expand Up @@ -465,6 +531,37 @@ def test_bytes_for_script(self):
with self.assertRaises(TypeError):
interp.run(b'print("spam")')

def test_with_background_threads_still_running(self):
r_interp, w_interp = os.pipe()
r_thread, w_thread = os.pipe()

RAN = b'R'
DONE = b'D'
FINISHED = b'F'

interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
def task():
v = os.read({r_thread}, 1)
assert v == {DONE!r}
os.write({w_interp}, {FINISHED!r})
t = threading.Thread(target=task)
t.start()
os.write({w_interp}, {RAN!r})
""")
interp.run(f"""if True:
os.write({w_interp}, {RAN!r})
""")

os.write(w_thread, DONE)
interp.run('t.join()')
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)

# test_xxsubinterpreters covers the remaining Interpreter.run() behavior.


Expand Down
49 changes: 49 additions & 0 deletions Lib/test/test_threading.py
Expand Up @@ -26,6 +26,11 @@
from test import lock_tests
from test import support

try:
from test.support import interpreters
except ModuleNotFoundError:
interpreters = None

threading_helper.requires_working_threading(module=True)

# Between fork() and exec(), only async-safe functions are allowed (issues
Expand All @@ -45,6 +50,12 @@ def skip_unless_reliable_fork(test):
return test


def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(interpreters is None,
'subinterpreters required')(meth)


def restore_default_excepthook(testcase):
testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
threading.excepthook = threading.__excepthook__
Expand Down Expand Up @@ -1296,6 +1307,44 @@ def f():
# The thread was joined properly.
self.assertEqual(os.read(r, 1), b"x")

@requires_subinterpreters
def test_threads_join_with_no_main(self):
r_interp, w_interp = self.pipe()

INTERP = b'I'
FINI = b'F'
DONE = b'D'

interp = interpreters.create()
interp.run(f"""if True:
import os
import threading
import time
done = False
def notify_fini():
global done
done = True
os.write({w_interp}, {FINI!r})
t.join()
threading._register_atexit(notify_fini)
def task():
while not done:
time.sleep(0.1)
os.write({w_interp}, {DONE!r})
t = threading.Thread(target=task)
t.start()
os.write({w_interp}, {INTERP!r})
""")
interp.close()

self.assertEqual(os.read(r_interp, 1), INTERP)
self.assertEqual(os.read(r_interp, 1), FINI)
self.assertEqual(os.read(r_interp, 1), DONE)

@cpython_only
def test_daemon_threads_fatal_error(self):
subinterp_code = f"""if 1:
Expand Down
4 changes: 3 additions & 1 deletion Lib/threading.py
Expand Up @@ -37,6 +37,7 @@
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
_is_main_interpreter = _thread._is_main_interpreter
try:
get_native_id = _thread.get_native_id
_HAVE_THREAD_NATIVE_ID = True
Expand Down Expand Up @@ -1566,7 +1567,7 @@ def _shutdown():
# the main thread's tstate_lock - that won't happen until the interpreter
# is nearly dead. So we release it here. Note that just calling _stop()
# isn't enough: other threads may already be waiting on _tstate_lock.
if _main_thread._is_stopped:
if _main_thread._is_stopped and _is_main_interpreter():
# _shutdown() was already called
return

Expand Down Expand Up @@ -1619,6 +1620,7 @@ def main_thread():
In normal conditions, the main thread is the thread from which the
Python interpreter was started.
"""
# XXX Figure this out for subinterpreters. (See gh-75698.)
return _main_thread

# get thread-local implementation, either from the thread
Expand Down
@@ -0,0 +1,3 @@
Subinterpreters now correctly handle the case where they have threads
running in the background. Before, such threads would interfere with
cleaning up and destroying them, as well as prevent running another script.
16 changes: 15 additions & 1 deletion Modules/_threadmodule.c
Expand Up @@ -1604,6 +1604,18 @@ PyDoc_STRVAR(excepthook_doc,
\n\
Handle uncaught Thread.run() exception.");

static PyObject *
thread__is_main_interpreter(PyObject *module, PyObject *Py_UNUSED(ignored))
{
PyInterpreterState *interp = _PyInterpreterState_GET();
return PyBool_FromLong(_Py_IsMainInterpreter(interp));
}

PyDoc_STRVAR(thread__is_main_interpreter_doc,
"_is_main_interpreter()\n\
\n\
Return True if the current interpreter is the main Python interpreter.");

static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
Expand Down Expand Up @@ -1633,8 +1645,10 @@ static PyMethodDef thread_methods[] = {
METH_VARARGS, stack_size_doc},
{"_set_sentinel", thread__set_sentinel,
METH_NOARGS, _set_sentinel_doc},
{"_excepthook", thread_excepthook,
{"_excepthook", thread_excepthook,
METH_O, excepthook_doc},
{"_is_main_interpreter", thread__is_main_interpreter,
METH_NOARGS, thread__is_main_interpreter_doc},
{NULL, NULL} /* sentinel */
};

Expand Down

0 comments on commit 0122b4d

Please sign in to comment.