Skip to content

Commit

Permalink
gh-110481: Implement inter-thread queue for biased reference counting (
Browse files Browse the repository at this point in the history
…#114824)

Biased reference counting maintains two refcount fields in each object:
`ob_ref_local` and `ob_ref_shared`. The true refcount is the sum of these two
fields. In some cases, when refcounting operations are split across threads,
the ob_ref_shared field can be negative (although the total refcount must be
at least zero). In this case, the thread that decremented the refcount
requests that the owning thread give up ownership and merge the refcount
fields.
  • Loading branch information
colesbury committed Feb 9, 2024
1 parent a225520 commit a3af3cb
Show file tree
Hide file tree
Showing 21 changed files with 418 additions and 11 deletions.
74 changes: 74 additions & 0 deletions Include/internal/pycore_brc.h
@@ -0,0 +1,74 @@
#ifndef Py_INTERNAL_BRC_H
#define Py_INTERNAL_BRC_H

#include <stdint.h>
#include "pycore_llist.h" // struct llist_node
#include "pycore_lock.h" // PyMutex
#include "pycore_object_stack.h" // _PyObjectStack

#ifdef __cplusplus
extern "C" {
#endif

#ifndef Py_BUILD_CORE
# error "this header requires Py_BUILD_CORE define"
#endif

#ifdef Py_GIL_DISABLED

// Prime number to avoid correlations with memory addresses.
#define _Py_BRC_NUM_BUCKETS 257

// Hash table bucket: one slot of `_brc_state.table`. It groups the thread
// states whose thread-id hashes to this slot.
struct _brc_bucket {
// Mutex protects both the bucket and thread state queues in this bucket.
// (The per-thread `objects_to_merge` stack is documented as being protected
// by this bucket mutex.)
PyMutex mutex;

// Linked list of _PyThreadStateImpl objects hashed to this bucket.
// Each thread contributes its `_brc_thread_state.bucket_node` as the link.
struct llist_node root;
};

// Per-interpreter biased reference counting state.
// Embedded in the interpreter state (`struct _is`) as the `brc` member when
// Py_GIL_DISABLED is defined.
struct _brc_state {
// Hash table of thread states by thread-id. Thread states within a bucket
// are chained using a doubly-linked list.
// The table has a fixed size of _Py_BRC_NUM_BUCKETS entries.
struct _brc_bucket table[_Py_BRC_NUM_BUCKETS];
};

// Per-thread biased reference counting state.
// Embedded in `_PyThreadStateImpl` as the `brc` member when Py_GIL_DISABLED
// is defined.
struct _brc_thread_state {
// Linked-list of thread states per hash bucket
// (this node is the thread's link in `_brc_bucket.root`).
struct llist_node bucket_node;

// Thread-id as determined by _PyThread_Id()
// (the key the per-interpreter hash table is indexed by).
uintptr_t tid;

// Objects with refcounts to be merged (protected by bucket mutex)
_PyObjectStack objects_to_merge;

// Local stack of objects to be merged (not accessed by other threads)
// NOTE(review): presumably filled from `objects_to_merge` before merging so
// the bucket mutex need not be held during the merge -- confirm in brc.c.
_PyObjectStack local_objects_to_merge;
};

// Initialize/finalize the per-thread biased reference counting state
void _Py_brc_init_thread(PyThreadState *tstate);
void _Py_brc_remove_thread(PyThreadState *tstate);

// Initialize per-interpreter state
void _Py_brc_init_state(PyInterpreterState *interp);

void _Py_brc_after_fork(PyInterpreterState *interp);

// Enqueues an object to be merged by its owning thread (tid). This
// steals a reference to the object.
void _Py_brc_queue_object(PyObject *ob);

// Merge the refcounts of queued objects for the current thread.
void _Py_brc_merge_refcounts(PyThreadState *tstate);

#endif /* Py_GIL_DISABLED */

#ifdef __cplusplus
}
#endif
#endif /* !Py_INTERNAL_BRC_H */
1 change: 1 addition & 0 deletions Include/internal/pycore_ceval.h
Expand Up @@ -206,6 +206,7 @@ void _PyEval_FrameClearAndPop(PyThreadState *tstate, _PyInterpreterFrame *frame)
#define _PY_ASYNC_EXCEPTION_BIT 3
#define _PY_GC_SCHEDULED_BIT 4
#define _PY_EVAL_PLEASE_STOP_BIT 5
#define _PY_EVAL_EXPLICIT_MERGE_BIT 6

/* Reserve a few bits for future use */
#define _PY_EVAL_EVENTS_BITS 8
Expand Down
1 change: 1 addition & 0 deletions Include/internal/pycore_interp.h
Expand Up @@ -201,6 +201,7 @@ struct _is {

#if defined(Py_GIL_DISABLED)
struct _mimalloc_interp_state mimalloc;
struct _brc_state brc; // biased reference counting state
#endif

// Per-interpreter state for the obmalloc allocator. For the main
Expand Down
6 changes: 6 additions & 0 deletions Include/internal/pycore_object_stack.h
@@ -1,6 +1,8 @@
#ifndef Py_INTERNAL_OBJECT_STACK_H
#define Py_INTERNAL_OBJECT_STACK_H

#include "pycore_freelist.h" // _PyFreeListState

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -74,6 +76,10 @@ _PyObjectStack_Pop(_PyObjectStack *stack)
return obj;
}

// Merge src into dst, leaving src empty
extern void
_PyObjectStack_Merge(_PyObjectStack *dst, _PyObjectStack *src);

// Remove all items from the stack
extern void
_PyObjectStack_Clear(_PyObjectStack *stack);
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_tstate.h
Expand Up @@ -10,6 +10,7 @@ extern "C" {

#include "pycore_freelist.h" // struct _Py_freelist_state
#include "pycore_mimalloc.h" // struct _mimalloc_thread_state
#include "pycore_brc.h" // struct _brc_thread_state


// Every PyThreadState is actually allocated as a _PyThreadStateImpl. The
Expand All @@ -22,6 +23,7 @@ typedef struct _PyThreadStateImpl {
#ifdef Py_GIL_DISABLED
struct _mimalloc_thread_state mimalloc;
struct _Py_freelist_state freelist_state;
struct _brc_thread_state brc;
#endif

} _PyThreadStateImpl;
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_code.py
Expand Up @@ -865,6 +865,7 @@ def __init__(self, f, test):
self.test = test
def run(self):
del self.f
gc_collect()
self.test.assertEqual(LAST_FREED, 500)

SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(500))
Expand Down
17 changes: 15 additions & 2 deletions Lib/test/test_concurrent_futures/executor.py
@@ -1,8 +1,10 @@
import threading
import time
import unittest
import weakref
from concurrent import futures
from test import support
from test.support import Py_GIL_DISABLED


def mul(x, y):
Expand Down Expand Up @@ -83,10 +85,21 @@ def test_no_stale_references(self):
my_object_collected = threading.Event()
my_object_callback = weakref.ref(
my_object, lambda obj: my_object_collected.set())
# Deliberately discarding the future.
self.executor.submit(my_object.my_method)
fut = self.executor.submit(my_object.my_method)
del my_object

if Py_GIL_DISABLED:
# Due to biased reference counting, my_object might only be
# deallocated while the thread that created it runs -- if the
# thread is paused waiting on an event, it may not merge the
# refcount of the queued object. For that reason, we wait for the
# task to finish (so that it's no longer referenced) and force a
# GC to ensure that it is collected.
fut.result() # Wait for the task to finish.
support.gc_collect()
else:
del fut # Deliberately discard the future.

collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(collected,
"Stale reference not collected within timeout.")
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Expand Up @@ -98,6 +98,7 @@ def test_ressources_gced_in_workers(self):

# explicitly destroy the object to ensure that EventfulGCObj.__del__()
# is called while manager is still running.
support.gc_collect()
obj = None
support.gc_collect()

Expand Down
2 changes: 2 additions & 0 deletions Makefile.pre.in
Expand Up @@ -405,6 +405,7 @@ PYTHON_OBJS= \
Python/ast_opt.o \
Python/ast_unparse.o \
Python/bltinmodule.o \
Python/brc.o \
Python/ceval.o \
Python/codecs.o \
Python/compile.o \
Expand Down Expand Up @@ -1081,6 +1082,7 @@ PYTHON_HEADERS= \
$(srcdir)/Include/internal/pycore_atexit.h \
$(srcdir)/Include/internal/pycore_bitutils.h \
$(srcdir)/Include/internal/pycore_blocks_output_buffer.h \
$(srcdir)/Include/internal/pycore_brc.h \
$(srcdir)/Include/internal/pycore_bytes_methods.h \
$(srcdir)/Include/internal/pycore_bytesobject.h \
$(srcdir)/Include/internal/pycore_call.h \
Expand Down
4 changes: 4 additions & 0 deletions Modules/posixmodule.c
Expand Up @@ -637,6 +637,10 @@ PyOS_AfterFork_Child(void)
tstate->native_thread_id = PyThread_get_thread_native_id();
#endif

#ifdef Py_GIL_DISABLED
_Py_brc_after_fork(tstate->interp);
#endif

status = _PyEval_ReInitThreads(tstate);
if (_PyStatus_EXCEPTION(status)) {
goto fatal_error;
Expand Down
16 changes: 15 additions & 1 deletion Objects/dictobject.c
Expand Up @@ -5989,6 +5989,18 @@ _PyObject_MakeDictFromInstanceAttributes(PyObject *obj, PyDictValues *values)
return make_dict_from_instance_attributes(interp, keys, values);
}

// Report whether `op` is referenced exactly once.
//
// Default build: true when Py_REFCNT(op) is 1.
//
// Free-threaded build (Py_GIL_DISABLED): the refcount is split into a local
// and a shared field, so all three conditions must hold, checked in order:
//   1. the calling thread owns `op`,
//   2. the local field is exactly 1,
//   3. the shared field (read with a relaxed atomic load) is exactly 0.
static bool
has_unique_reference(PyObject *op)
{
#ifndef Py_GIL_DISABLED
    return Py_REFCNT(op) == 1;
#else
    // Only the owning thread may read ob_ref_local without synchronization,
    // so ownership is tested before touching it.
    if (!_Py_IsOwnedByCurrentThread(op)) {
        return false;
    }
    if (op->ob_ref_local != 1) {
        return false;
    }
    // Other threads may update the shared field concurrently; read it
    // atomically.
    return _Py_atomic_load_ssize_relaxed(&op->ob_ref_shared) == 0;
#endif
}

// Return true if the dict was dematerialized, false otherwise.
bool
_PyObject_MakeInstanceAttributesFromDict(PyObject *obj, PyDictOrValues *dorv)
Expand All @@ -6005,7 +6017,9 @@ _PyObject_MakeInstanceAttributesFromDict(PyObject *obj, PyDictOrValues *dorv)
return false;
}
assert(_PyType_HasFeature(Py_TYPE(obj), Py_TPFLAGS_HEAPTYPE));
if (dict->ma_keys != CACHED_KEYS(Py_TYPE(obj)) || Py_REFCNT(dict) != 1) {
if (dict->ma_keys != CACHED_KEYS(Py_TYPE(obj)) ||
!has_unique_reference((PyObject *)dict))
{
return false;
}
assert(dict->ma_values);
Expand Down
8 changes: 2 additions & 6 deletions Objects/object.c
Expand Up @@ -2,6 +2,7 @@
/* Generic object operations; and implementation of None */

#include "Python.h"
#include "pycore_brc.h" // _Py_brc_queue_object()
#include "pycore_call.h" // _PyObject_CallNoArgs()
#include "pycore_ceval.h" // _Py_EnterRecursiveCallTstate()
#include "pycore_context.h" // _PyContextTokenMissing_Type
Expand Down Expand Up @@ -344,15 +345,10 @@ _Py_DecRefSharedDebug(PyObject *o, const char *filename, int lineno)
&shared, new_shared));

if (should_queue) {
// TODO: the inter-thread queue is not yet implemented. For now,
// we just merge the refcount here.
#ifdef Py_REF_DEBUG
_Py_IncRefTotal(_PyInterpreterState_GET());
#endif
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(o, -1);
if (refcount == 0) {
_Py_Dealloc(o);
}
_Py_brc_queue_object(o);
}
else if (new_shared == _Py_REF_MERGED) {
// refcount is zero AND merged
Expand Down
1 change: 1 addition & 0 deletions PCbuild/_freeze_module.vcxproj
Expand Up @@ -191,6 +191,7 @@
<ClCompile Include="..\Python\ast_opt.c" />
<ClCompile Include="..\Python\ast_unparse.c" />
<ClCompile Include="..\Python\bltinmodule.c" />
<ClCompile Include="..\Python\brc.c" />
<ClCompile Include="..\Python\bootstrap_hash.c" />
<ClCompile Include="..\Python\ceval.c" />
<ClCompile Include="..\Python\codecs.c" />
Expand Down
3 changes: 3 additions & 0 deletions PCbuild/_freeze_module.vcxproj.filters
Expand Up @@ -46,6 +46,9 @@
<ClCompile Include="..\Python\bltinmodule.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\Python\brc.c">
<Filter>Python</Filter>
</ClCompile>
<ClCompile Include="..\Objects\boolobject.c">
<Filter>Source Files</Filter>
</ClCompile>
Expand Down
2 changes: 2 additions & 0 deletions PCbuild/pythoncore.vcxproj
Expand Up @@ -206,6 +206,7 @@
<ClInclude Include="..\Include\internal\pycore_ast_state.h" />
<ClInclude Include="..\Include\internal\pycore_atexit.h" />
<ClInclude Include="..\Include\internal\pycore_bitutils.h" />
<ClInclude Include="..\Include\internal\pycore_brc.h" />
<ClInclude Include="..\Include\internal\pycore_bytes_methods.h" />
<ClInclude Include="..\Include\internal\pycore_bytesobject.h" />
<ClInclude Include="..\Include\internal\pycore_call.h" />
Expand Down Expand Up @@ -553,6 +554,7 @@
<ClCompile Include="..\Python\ast_unparse.c" />
<ClCompile Include="..\Python\bltinmodule.c" />
<ClCompile Include="..\Python\bootstrap_hash.c" />
<ClCompile Include="..\Python\brc.c" />
<ClCompile Include="..\Python\ceval.c" />
<ClCompile Include="..\Python\codecs.c" />
<ClCompile Include="..\Python\compile.c" />
Expand Down
6 changes: 6 additions & 0 deletions PCbuild/pythoncore.vcxproj.filters
Expand Up @@ -546,6 +546,9 @@
<ClInclude Include="..\Include\internal\pycore_bitutils.h">
<Filter>Include\internal</Filter>
</ClInclude>
<ClInclude Include="..\Include\internal\pycore_brc.h">
<Filter>Include\internal</Filter>
</ClInclude>
<ClInclude Include="..\Include\internal\pycore_bytes_methods.h">
<Filter>Include\internal</Filter>
</ClInclude>
Expand Down Expand Up @@ -1253,6 +1256,9 @@
<ClCompile Include="..\Python\bltinmodule.c">
<Filter>Python</Filter>
</ClCompile>
<ClCompile Include="..\Python\brc.c">
<Filter>Python</Filter>
</ClCompile>
<ClCompile Include="..\Python\ceval.c">
<Filter>Python</Filter>
</ClCompile>
Expand Down

0 comments on commit a3af3cb

Please sign in to comment.