From 5433300c340baf57e0650ddbef09b5f14240463f Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 13:32:26 -0600 Subject: [PATCH 01/10] Add static assert macro to the asserts header. --- include/qt_asserts.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/include/qt_asserts.h b/include/qt_asserts.h index 197125a76..9df776117 100644 --- a/include/qt_asserts.h +++ b/include/qt_asserts.h @@ -6,6 +6,12 @@ #include /* for assert() */ +#if __STDC_VERSION__ < 202311L +#define qt_static_assert _Static_assert +#else +#define qt_static_assert static_assert +#endif + #ifdef qassert #undef qassert #endif From cdcfb425a4d8a259bc3c7069ed9ccb1c93a88c5c Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 13:40:21 -0600 Subject: [PATCH 02/10] Add a header for macros related to hinting default branches. --- include/qt_branching.h | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 include/qt_branching.h diff --git a/include/qt_branching.h b/include/qt_branching.h new file mode 100644 index 000000000..898045285 --- /dev/null +++ b/include/qt_branching.h @@ -0,0 +1,7 @@ +#ifndef QT_BRANCHING_H +#define QT_BRANCHING_H + +#define likely(x) (__builtin_expect(!!(x), 1)) +#define unlikely(x) (__builtin_expect(!!(x), 0)) + +#endif From 5b36d5582ad3e0399ef8f711ca06a2506aa3d391 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 13:43:02 -0600 Subject: [PATCH 03/10] Add header for miscellaneous arithmetic macros. --- include/qt_arithmetic.h | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 include/qt_arithmetic.h diff --git a/include/qt_arithmetic.h b/include/qt_arithmetic.h new file mode 100644 index 000000000..02208611d --- /dev/null +++ b/include/qt_arithmetic.h @@ -0,0 +1,8 @@ +#ifndef QT_ARITHMETIC_H +#define QT_ARITHMETIC_H + +// Miscellaneous arithmetic utility macros. +#define QTHREAD_MAX(a, b) (((a) > (b)) ? (a) : (b)) +#define QTHREAD_MIN(a, b) (((a) < (b)) ? 
(a) : (b)) + +#endif From 7f0e82d021ba4b00d3bfbb10468e95d256747bc9 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Thu, 27 Mar 2025 13:21:14 -0600 Subject: [PATCH 04/10] Install the linux headers in the MUSL builds. --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 761301d96..adb8304db 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -229,7 +229,7 @@ jobs: export REPO_HTTPS=`echo "$CIRCLE_REPOSITORY_URL" | sed "s|git@github.com:|https://github.com/|g"` git clone -b "$CIRCLE_BRANCH" "$REPO_HTTPS" . --depth=1 - run: | - apk add --no-cache --no-progress bash make musl-dev hwloc-dev cmake gcc g++ + apk add --no-cache --no-progress bash make musl-dev hwloc-dev cmake gcc g++ linux-headers if [ "<< parameters.compiler >>" == "clang" ]; then apk add clang; fi - run: | if [ "<< parameters.compiler >>" == "clang" ]; then export CC=clang && export CXX=clang++; fi From 1c1ee5868ea880b1689b473e72693097159446b6 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Thu, 27 Mar 2025 14:11:15 -0600 Subject: [PATCH 05/10] Add a memory sanitizer suppression macro. --- include/qt_macros.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/include/qt_macros.h b/include/qt_macros.h index 107008edc..2e1732d5e 100644 --- a/include/qt_macros.h +++ b/include/qt_macros.h @@ -23,6 +23,11 @@ #define QTHREAD_MSAN #endif #endif +#ifdef QTHREAD_MSAN +#define QTHREAD_SUPPRESS_MSAN __attribute__((no_sanitize("memory"))) +#else +#define QTHREAD_SUPPRESS_MSAN +#endif #define TLS_DECL(type, name) thread_local type name #define TLS_DECL_INIT(type, name) thread_local type name = 0 From efe2a043d9576a118c5c7d6ee70be4dd535a550d Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 16:06:29 -0600 Subject: [PATCH 06/10] Add a single unified OS detection header. 
--- include/qt_os.h | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 include/qt_os.h diff --git a/include/qt_os.h b/include/qt_os.h new file mode 100644 index 000000000..0fe0ea267 --- /dev/null +++ b/include/qt_os.h @@ -0,0 +1,49 @@ +#ifndef QT_OS_H +#define QT_OS_H + +// Not all Operating systems listed here are actually supported. + +#if defined(__linux__) || defined(QTHREADS_LINUX) +#ifndef QTHREADS_LINUX +#define QTHREADS_LINUX +#endif + +#elif defined(__APPLE__) || defined(QTHREADS_APPLE) +#ifndef QTHREADS_APPLE +#define QTHREADS_APPLE +#endif + +#elif defined(_WIN32) || defined(QTHREADS_WINDOWS) +#ifndef QTHREADS_WINDOWS +#define QTHREADS_WINDOWS +#endif + +#elif defined(__FreeBSD__) || defined(QTHREADS_FREEBSD) +#ifndef QTHREADS_FREEBSD +#define QTHREADS_FREEBSD +#endif + +#elif defined(__NetBSD__) || defined(QTHREADS_NETBSD) +#ifndef QTHREADS_NETBSD +#define QTHREADS_NETBSD +#endif + +#elif defined(__OpenBSD__) || defined(QTHREADS_OPENBSD) +#ifndef QTHREADS_OPENBSD +#define QTHREADS_OPENBSD +#endif + +#elif defined(__DragonFly__) || defined(QTHREADS_DRAGONFLYBSD) +#ifndef QTHREADS_DRAGONFLYBSD +#define QTHREADS_DRAGONFLYBSD +#endif + +#elif defined(__sun) +#error "Sun OS not currently supported." + +#else +#error "Unrecognized OS" + +#endif + +#endif From 7b34dcb54b2f6ee8da011ab98d7e75280754c900 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Fri, 28 Mar 2025 10:11:36 -0600 Subject: [PATCH 07/10] Link c11 threading library on BSD operating systems. --- src/CMakeLists.txt | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f5e8b1ca3..97c5917a3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,7 +76,20 @@ target_include_directories(qthread PRIVATE "../include" ) set_target_properties(qthread PROPERTIES C_VISIBILITY_PRESET hidden) + +# Link pthreads. 
target_link_libraries(qthread PUBLIC Threads::Threads) + +# The c11 threading library has to be linked +# via a separate library on BSD OSs. +if ("${CMAKE_SYSTEM_NAME}" STREQUAL "FreeBSD" OR + "${CMAKE_SYSTEM_NAME}" STREQUAL "kFreeBSD" OR + "${CMAKE_SYSTEM_NAME}" STREQUAL "OpenBSD" OR + "${CMAKE_SYSTEM_NAME}" STREQUAL "NetBSD" OR + "${CMAKE_SYSTEM_NAME}" STREQUAL "DragonFly") + target_link_libraries(qthread PUBLIC "stdthreads") +endif() + if ("${QTHREADS_TOPOLOGY}" STREQUAL "hwloc" OR "${QTHREADS_TOPOLOGY}" STREQUAL "binders") find_package(hwloc REQUIRED) target_include_directories(qthread PRIVATE "${hwloc_INCLUDE_DIR}") From aa3d8a89df9dd11bf92aefc67051bb77175eadb3 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Wed, 26 Mar 2025 15:44:25 -0600 Subject: [PATCH 08/10] Add a cross-platform futex-like API based on wrappers of the corresponding OS APIs. --- include/qt_atomic_wait.h | 282 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 include/qt_atomic_wait.h diff --git a/include/qt_atomic_wait.h b/include/qt_atomic_wait.h new file mode 100644 index 000000000..5c8460ee9 --- /dev/null +++ b/include/qt_atomic_wait.h @@ -0,0 +1,282 @@ +#ifndef QT_ATOMIC_WAIT_H +#define QT_ATOMIC_WAIT_H + +#include +#include +#include + +#include "qt_asserts.h" +#include "qt_os.h" + +// This header defines a cross-platform futex-like API. +// It's somewhat like atomic_wait in c++20, however these constructs are +// guaranteed to use the appropriate OS APIs directly. It's also somewhat like +// Rust's https://github.com/m-ou-se/atomic-wait, except this is just a bunch of +// C macros and it supports a few more OSs. + +// Linux only has 32-bit futexes so that's the only size that's possible to +// standardize. 
+#define qt_atomic_wait_t _Atomic uint32_t +#define qt_atomic_wait_empty 0u +#define qt_atomic_wait_full UINT32_MAX +#define qt_atomic_wait_set_empty(a) \ + atomic_store_explicit((a), 0u, memory_order_relaxed) +#define qt_atomic_wait_set_full(a) \ + atomic_store_explicit((a), UINT32_MAX, memory_order_relaxed) +#define qt_atomic_wait_load(a) atomic_load_explicit((a), memory_order_relaxed) +#define qt_atomic_wait_store(a, v) \ + atomic_store_explicit((a), v, memory_order_relaxed) + +// Futex-like atomic wait functionality that's guaranteed to use +// the appropriate OS thread pausing functionality (e.g. futex). +// Due to constraints between the various operating systems, +// only 32-bit integers are supported. + +#ifdef QTHREADS_LINUX + +// Use Linux futexes + +#include +#include +#include +#include + +#ifndef NDEBUG +#define qt_wait_on_address(a, expected) \ + do { \ + long status = syscall(SYS_futex, \ + (a), \ + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, \ + (expected), \ + NULL, \ + NULL, \ + 0u); \ + /* EAGAIN means the value already changed so no sleep was necessary. */ \ + assert(!status || (status == -1 && errno == EAGAIN)); \ + } while (0) +#else +#define qt_wait_on_address(a, expected) \ + do { \ + syscall(SYS_futex, \ + (a), \ + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, \ + (expected), \ + NULL, \ + NULL, \ + 0u); \ + } while (0) +#endif + +#define qt_wake_all(a) \ + do { \ + syscall(SYS_futex, \ + (a), \ + FUTEX_WAKE | FUTEX_PRIVATE_FLAG, \ + UINT32_MAX, \ + NULL, \ + NULL, \ + 0u); \ + } while (0) + +#define qt_wake_one(a) \ + do { \ + syscall( \ + SYS_futex, (a), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1u, NULL, NULL, 0u); \ + } while (0) + +#elif defined(QTHREADS_APPLE) +// use __ulock_wait and __ulock_wake +// NOTE! This isn't technically a stable API even though they export the +// symbols, so watch for changes in OSX releases later. libc++ relies on them +// though so changes seem very unlikely. 
+ +// Can't directly include apple's sys/ulock.h but we only need these symbols. +// They are exported from libSystem though, which also provides their libc. +// Given that, presumably we're already linking to them. +// See +// https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/sys/ulock.h#L64-L68 +extern int +__ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout); +extern int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value); + +// Corresponding operation codes: +// See +// https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/sys/ulock.h#L72-L137 +#define UL_COMPARE_AND_WAIT 1 +#define ULF_WAKE_ALL 0x00000100 +#define ULF_NO_ERRNO 0x01000000 + +// Note: no need to check for the case where a wait operation +// wakes because the flag was already changed. +// In that case the return value ends up being the same. +// See +// https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/osfmk/kern/waitq.c#L2819-L2820 +// See also +// https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/kern/sys_ulock.c#L424 +#define qt_wait_on_address(a, expected) \ + do { \ + qassert( \ + __ulock_wait(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, (a), (expected), 0ull), \ + 0); \ + } while (0) + +#define qt_wake_all(a) \ + do { \ + qassert(__ulock_wake(UL_COMPARE_AND_WAIT | ULF_WAKE_ALL | ULF_NO_ERRNO, \ + (a), \ + 0ull) >= 0, \ + 1); \ + } while (0) + +#define qt_wake_one(a) \ + do { __ulock_wake(UL_COMPARE_AND_WAIT | ULF_NO_ERRNO, (a), 0ull); } while (0) + +#elif defined(QTHREADS_FREEBSD) +// use _umtx_op +#include +#include + +#define qt_wait_on_address(a, expected) \ + do { \ + qassert(_umtx_op((a), UMTX_OP_WAIT_UINT_PRIVATE, (expected), NULL, NULL), \ + 0); \ + } while (0) + +#define qt_wake_all(a) \ + do { \ + /* Docs say use INT32_MAX to specify all. 
*/ \ + qassert( \ + _umtx_op( \ + (a), UMTX_OP_WAKE_PRIVATE, (unsigned long)INT32_MAX, NULL, NULL), \ + 0); \ + } while (0) + +#define qt_wake_one(a) \ + do { \ + qassert(_umtx_op((a), UMTX_OP_WAKE_PRIVATE, 1ul, NULL, NULL), 0); \ + } while (0) + +#elif defined(QTHREADS_OPENBSD) +// use futex syscall wrapper they provide: https://man.openbsd.org/futex +#include +#include +#include + +#ifndef NDEBUG +#define qt_wait_on_address(a, expected) \ + do { \ + int status = \ + futex((a), FUTEX_WAIT | FUTEX_PRIVATE_FLAG, (expected), NULL, NULL); \ + assert(!status || (status == -1 && errno == EAGAIN)); +} +while (0) +#else +#define qt_wait_on_address(a, expected) \ + do { \ + futex((a), FUTEX_WAIT | FUTEX_PRIVATE_FLAG, (expected), NULL, NULL); \ + } while (0) +#endif + +#define qt_wake_all(a) \ + do { \ + /* For whatever reason they used a signed integer for the val parameter so \ + * use INT32_MAX.*/ \ + futex((a), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT32_MAX, NULL, NULL); \ + } while (0) + +#define qt_wake_one(a) \ + do { futex((a), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1, NULL, NULL); } while (0) + +#elif defined(QTHREADS_NETBSD) +// use SYS___futex syscall +#include +#include +#include +#include + +#ifndef NDEBUG +#define qt_wait_on_address(a, expected) \ + do { \ + int status = syscall(SYS___futex, \ + (a), \ + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, \ + (expected), \ + NULL, \ + NULL); \ + assert(!status || status == -1 && errno == EAGAIN); \ + } while (0) +#else +#define qt_wait_on_address(a, expected) \ + do { \ + syscall(SYS___futex, \ + (a), \ + FUTEX_WAIT | FUTEX_PRIVATE_FLAG, \ + (expected), \ + NULL, \ + NULL); \ + while (0) +#endif + +#define qt_wake_all(a) \ + do { \ + syscall(SYS___futex, \ + (a), \ + FUTEX_WAKE | FUTEX_PRIVATE_FLAG, \ + UINT32_MAX, \ + NULL, \ + NULL); \ + } while (0) + +#define qt_wake_one(a) \ + do { \ + syscall( \ + SYS___futex, (a), FUTEX_WAKE | FUTEX_PRIVATE_FLAG, 1u, NULL, NULL); \ + } while (0) + +#elif defined(QTHREADS_DRAGONFLYBSD) +// use 
umtx_sleep and umtx_wakeup +#include +#include +#ifndef NDEBUG +#define qt_wait_on_address(a, expected) \ + do { \ + int status = umtx_sleep((a), (expected), 0); \ + assert(!status || (status == -1 && errno == EBUSY)); \ + } while (0) +#else +#define qt_wait_on_address(a, expected) \ + do { umtx_sleep((a), (expected), 0); } while (0) +#endif + +#define qt_wake_all(a) \ + do { \ + /* The interface uses signed integers, so use INT32_MAX here */ \ + qassert(umtx_wakeup((a), INT32_MAX), 0); \ + } while (0) + +#define qt_wake_one(a) \ + do { qassert(umtx_wakeup((a), 1), 0); } while (0) + +#elif defined(QTHREADS_WINDOWS) +// use WaitOnAddress/WakeByAddressSingle/WakeByAddressAll +#include +#define qt_wait_on_address(a, expected) \ + do { qassert(WaitOnAddress((a), (expected), 4, INFINITE), TRUE); } while (0) + +#define qt_wake_all(a) \ + do { WakeByAddressAll(a); } while (0) + +#define qt_wake_one(a) \ + do { WakeByAddressSingle(a); } while (0) + +#elif defined(__sun) +// Solaris supposedly provides something futex-like via "user-level adaptive +// spin mutexes". +// TODO: implement that +#error "futex equivalent not implemented for solaris." + +#else +#error "no known futex equivalent for current OS" +#endif + +#endif From d6ce3dcd0692f49c42380816308666a3b74a5809 Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 14:21:59 -0600 Subject: [PATCH 09/10] Add very preliminary threadpool implementation. 
--- include/qt_threadpool.h | 23 +++ src/CMakeLists.txt | 1 + src/threadpool.c | 406 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 430 insertions(+) create mode 100644 include/qt_threadpool.h create mode 100644 src/threadpool.c diff --git a/include/qt_threadpool.h b/include/qt_threadpool.h new file mode 100644 index 000000000..514853c19 --- /dev/null +++ b/include/qt_threadpool.h @@ -0,0 +1,23 @@ +#ifndef QT_THREADPOOL_H +#define QT_THREADPOOL_H + +#include + +#include + +typedef int (*qt_threadpool_func_type)(void *); + +typedef enum { + POOL_INIT_SUCCESS, + POOL_INIT_ALREADY_INITIALIZED, + POOL_INIT_NO_THREADS_SPECIFIED, + POOL_INIT_OUT_OF_MEMORY, + // TODO: better granularity when forwarding errors from thread creation. + POOL_INIT_ERROR +} hw_pool_init_status; + +hw_pool_init_status hw_pool_init(uint32_t num_threads); +void hw_pool_destroy(); +void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg); + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 97c5917a3..26f803256 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -35,6 +35,7 @@ set(QTHREADS_SOURCES alloc/${QTHREADS_ALLOC}.c affinity/common.c affinity/${QTHREADS_TOPOLOGY}.c + threadpool.c touch.c tls.c teams.c diff --git a/src/threadpool.c b/src/threadpool.c new file mode 100644 index 000000000..0af68d70d --- /dev/null +++ b/src/threadpool.c @@ -0,0 +1,406 @@ +#include +#include +#include +#include +#include +#include + +#if defined(__STDC_NO_THREADS__) +#define QPOOL_USE_PTHREADS +#endif + +#ifdef QPOOL_USE_PTHREADS +#include +#else +#include +#endif + +#include + +#include "qt_arithmetic.h" +#include "qt_asserts.h" +#include "qt_atomic_wait.h" +#include "qt_branching.h" +#include "qt_macros.h" +#include "qt_threadpool.h" +#include "qt_visibility.h" + +// 3d-bit thread indices are good enough. +// At least for the forseeable future we never +// need/want billions of hardware threads in shared-memory. 
+_Thread_local uint32_t context_index; + +// TODO: try other ways to pause/resume +// TODO: sched_yield implementation +// TODO: nanosleep implementation +// TODO: nanosleep with non-constant backoff schemes since parallel sections +// often happen in rapid succession. + +// Synchronization mechanism for sleeping/waking worker threads +#define QPOOL_USE_FUTEX +// Synchronization mechanism for sleeping/waking the main thread +#define QPOOL_MAIN_USE_FUTEX + +// Placeholder +#define DEFAULT_CACHE_LINE_SIZE 64 + +unsigned int get_cache_line_size() { return DEFAULT_CACHE_LINE_SIZE; } + +// Reserved pointer value to signal pool shutdown. +#define POOL_END_SIGNAL ((void *)1u) + +typedef struct { + qt_threadpool_func_type func; + void *arg; +} pooled_thread_work; + +static int run_work(pooled_thread_work w) { return w.func(w.arg); } + +typedef struct { + // allocated array of thread headers (pooled_thread_control) + // padded dynamically so each has its own cache line. + void *_Atomic threads; + _Atomic uint32_t num_threads; + // alignment constraints usually mean this space is available anyway even + // though we don't currently use it. + _Atomic uint32_t reserved; +#ifdef QPOOL_USE_FUTEX + // Note: we generally want num_active_threads and main_resume_flag on the + // same cache line. Currently this usually just falls out naturally from the + // alignment constraints here anyway. + _Atomic uint32_t num_active_threads; + qt_atomic_wait_t main_resume_flag; +#else +#error "no alternatives to futexes yet." +#endif +} pool_header; + +typedef struct { + // 16 byte aligned to allow loading it in one atomic instruction + // on architectures where that makes sense (most of them). + // Specifically this works on x86 with AVX (see https://rigtorp.se/isatomic/ + // for details). 
+ // alignas(16) pooled_thread_work work; + alignas(16) _Atomic qt_threadpool_func_type func; + void *_Atomic arg; +#ifdef QPOOL_USE_FUTEX + _Atomic uint32_t waiting_flag; +#endif + _Atomic uint32_t index; + pool_header *_Atomic pool; +#ifdef QPOOL_USE_PTHREADS + pthread_t thread; +#else + thrd_t thread; +#endif +} pooled_thread_control; + +// alignment spec really only necessary if we're going to do mixed-size atomic +// loads/stores on this. +alignas(64) static pool_header hw_pool; +// TODO: how exactly do we nest these things without requiring additional +// syscalls? Would it be better to offer different specializations of the base +// hardware thread or potentially incur additional overheads from nesting? Are +// there overheads from just nesting? + +// Needed for the logic around cache lines later. +// It definitely won't fit in 32 bytes, but should easily fit +// in 64 which is also the most common cache line size at the moment. +// The main thing that could throw this off is if +// pthread_t is absolutely massive for some reason. +#ifdef QPOOL_USE_PTHREADS +qt_static_assert(sizeof(pooled_thread_control) <= 64u, + "pthread_t is too large."); +#else +qt_static_assert(sizeof(pooled_thread_control) <= 64u, "thrd_t is too large."); +#endif + +static pooled_thread_work load_work(pooled_thread_control *control) { + pooled_thread_work work; + // Zero out the loaded work object in debug mode. +#ifndef NDEBUG + work.func = + atomic_exchange_explicit(&control->func, NULL, memory_order_relaxed); +#else + work.func = atomic_load_explicit(&control->func, memory_order_relaxed); +#endif + // TODO: is this branch helpful? Seems like this thread will already own the + // cache line anyway. On some extremely old/small architectures it might be + // beneficial to branch like this and exit the outer loop in + // pooled_thread_func immediately but that wreaks havoc on the code structure + // and likely doesn't matter on most modern architectures. 
if + // unlikely(work.func == POOL_END_SIGNAL) return ret; +#ifndef NDEBUG + work.arg = + atomic_exchange_explicit(&control->arg, NULL, memory_order_relaxed); +#else + work.arg = atomic_load_explicit(&control->arg, memory_order_relaxed); +#endif + return work; +} + +static void store_work(pooled_thread_control *control, + qt_threadpool_func_type func, + void *arg) { + atomic_store_explicit(&control->func, func, memory_order_relaxed); + atomic_store_explicit(&control->arg, arg, memory_order_relaxed); +} + +static void init_thread_control(pooled_thread_control *control, + uint32_t index, + pool_header *pool) { + store_work(control, NULL, NULL); + qt_atomic_wait_set_empty(&control->waiting_flag); + atomic_store_explicit(&control->index, index, memory_order_relaxed); + atomic_store_explicit(&control->pool, pool, memory_order_relaxed); +} + +static void launch_work_on_thread(pooled_thread_control *control, + qt_threadpool_func_type func, + void *arg) { + // assign the work + control->func = func; + control->arg = arg; + // TODO: check that the actual value of the futex for the threads is managed + // right. + qt_atomic_wait_set_full(&control->waiting_flag); + qt_wake_one(&control->waiting_flag); +} + +static pooled_thread_work worker_wait_for_work(pooled_thread_control *control) { +#ifdef QPOOL_USE_FUTEX + for (;;) { + qt_wait_on_address(&control->waiting_flag, qt_atomic_wait_empty); + if likely (qt_atomic_wait_load(&control->waiting_flag) != + qt_atomic_wait_empty) { + atomic_store_explicit(&control->waiting_flag, 0u, memory_order_relaxed); + return load_work(control); + } + // TODO: pause instruction here? May not be necessary since we're already + // using a futex or equivalent. + } +#else +#error "No other synchronization available." + // TODO: sched_yield implementation + // TODO: nanosleep implementation + // TODO: nanosleep with non-constant backoff schemes since parallel sections + // often happen in rapid succession. 
+#endif +} + +static void notify_worker_of_termination(pooled_thread_control *control) { + atomic_store_explicit(&control->func, POOL_END_SIGNAL, memory_order_relaxed); +#ifndef NDEBUG + atomic_store_explicit(&control->arg, NULL, memory_order_relaxed); +#endif + qt_atomic_wait_set_full(&control->waiting_flag); + qt_wake_one(&control->waiting_flag); +} + +static void init_main_sync(pool_header *pool) { + qt_atomic_wait_set_full(&pool->main_resume_flag); +} + +static void notify_main_of_completion(pool_header *pool) { +#ifdef QPOOL_MAIN_USE_FUTEX + // TODO: is it worthwhile to do something more sophisticated to count down the + // number of threads remaining? + if (!(atomic_fetch_sub_explicit( + &pool->num_active_threads, 1u, memory_order_relaxed) - + 1u)) { + assert(qt_atomic_wait_load(&pool->main_resume_flag) == qt_atomic_wait_full); + qt_atomic_wait_set_empty(&pool->main_resume_flag); + qt_wake_one(&pool->main_resume_flag); + } +#else +#error "No other synchronization available." +#endif +} + +static void suspend_main_while_working(pool_header *pool) { + do { + qt_wait_on_address(&pool->main_resume_flag, qt_atomic_wait_full); + } while (qt_atomic_wait_load(&pool->main_resume_flag) != + qt_atomic_wait_empty); // TODO: pause instruction on spurious wake? +} + +#ifdef QPOOL_USE_PTHREADS +static void *pooled_thread_func(void *void_arg) { +#else +static int pooled_thread_func(void *void_arg) { +#endif + pooled_thread_control *control = (pooled_thread_control *)void_arg; + context_index = atomic_load_explicit(&control->index, memory_order_relaxed); + pooled_thread_work current_work; + for (;;) { + pooled_thread_work work = worker_wait_for_work(control); + if unlikely (work.func == POOL_END_SIGNAL) break; + run_work(work); + notify_main_of_completion(control->pool); + } + // Return values are not consistently sized/typed across runtimes, so we don't + // currently use them. Technically there's 32 bits of info that could be used + // here. 
Errors should be forwarded to the main thread when a block of pooled + // work ends though, so most errors won't need to go through here. +#ifdef QPOOL_USE_PTHREADS + return NULL; +#else + return 0; +#endif +} + +API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { + if unlikely (!num_threads) return POOL_INIT_NO_THREADS_SPECIFIED; + uint32_t old = 0u; + assert(num_threads < UINT32_MAX); + if unlikely (!atomic_compare_exchange_strong_explicit(&hw_pool.num_threads, + &old, + num_threads, + memory_order_acquire, + memory_order_relaxed)) + return POOL_INIT_ALREADY_INITIALIZED; + size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); + size_t size = (size_t)num_threads * alignment; + char *buffer = aligned_alloc(alignment, size); + hw_pool_init_status retval; + if unlikely (!buffer) { + retval = POOL_INIT_OUT_OF_MEMORY; + goto release_pool; + } + hw_pool.threads = buffer; + int status; +#ifdef QPOOL_USE_PTHREADS + pthread_attr_t attr; + status = pthread_attr_init(&attr); + if unlikely (status) { + retval = POOL_INIT_ERROR; + goto release_buffer; + } +#endif + uint32_t i = 0u; + while (i < num_threads) { + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + // Initialize the thread control struct in two 128b atomic writes. + // TODO: It's possible to just do this in a single 256b atomic write on most + // x86 platforms. That may also require increasing the alignment constraints + // for the control_slice. + // TODO: also ifdef in an implementation for platforms that can't do + // lock-free 128b writes or that don't handle mixed-size atomic writes. + // TODO: making some kind of ifunc to handle this initialization is probably + // actually the right way to do it because it's hard to know enough about + // the CPU at compile-time. 
+ init_thread_control(thread_control, i, &hw_pool); + int status; +#ifdef QPOOL_USE_PTHREADS + status = pthread_create( + &thread_control->thread, &attr, pooled_thread_func, thread_control); + if unlikely (status) goto cleanup_threads; +#else + status = + thrd_create(&thread_control->thread, pooled_thread_func, thread_control); + if unlikely (status != thrd_success) goto cleanup_threads; +#endif + ++i; + } +#ifdef QPOOL_USE_PTHREADS + pthread_attr_destroy(&attr); +#endif + return POOL_INIT_SUCCESS; +cleanup_threads: + if (i) { + uint32_t j = --i; + while (i) { + // TODO: fix deinit to match new layout and interrupt mechanism. + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + atomic_store_explicit( + &thread_control->func, POOL_END_SIGNAL, memory_order_release); +#ifndef NDEBUG + atomic_store_explicit(&thread_control->arg, NULL, memory_order_relaxed); +#endif + --i; + } + i = j; + while (i) { + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + // TODO: crash informatively if join fails. +#ifdef QPOOL_USE_PTHREADS + pthread_join(thread_control->thread, NULL); +#else + thrd_join(thread_control->thread, NULL); +#endif + --i; + } + } +#ifdef QPOOL_USE_PTHREADS + // No specific label needed for this one since failing immediately after + // setting up attr is the zero case for the loops. 
+ pthread_attr_destroy(&attr); +#endif +release_buffer: + atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed); + free(buffer); +release_pool: + atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); + return retval; +} + +API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { + uint32_t num_threads = + atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed); + char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed); + size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); + uint32_t i = num_threads; + while (i) { + --i; + // TODO: fix deinit to match new layout and interrupt mechanism. + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + notify_worker_of_termination(thread_control); + } + i = num_threads; + while (i) { + --i; + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + // TODO: crash informatively if join fails somehow. 
+#ifdef QPOOL_USE_PTHREADS + pthread_join(thread_control->thread, NULL); +#else + thrd_join(thread_control->thread, NULL); +#endif + } + + atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed); + free(buffer); + atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); +} + +API_FUNC void +pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { + uint32_t num_threads = + atomic_load_explicit(&pool->num_threads, memory_order_relaxed); + assert(num_threads); + assert(num_threads < UINT32_MAX); + char *buffer = + (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed); + atomic_store_explicit( + &pool->num_active_threads, num_threads, memory_order_relaxed); + init_main_sync(pool); + size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); + for (uint32_t i = 0u; + i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed); + i++) { + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + launch_work_on_thread(thread_control, func, arg); + } + suspend_main_while_working(pool); +} + +API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) { + pool_run_on_all(&hw_pool, func, arg); +} + From 3b63359824fb1b243f8f2609424db471003fcd6f Mon Sep 17 00:00:00 2001 From: Ian Henriksen Date: Tue, 25 Mar 2025 14:23:23 -0600 Subject: [PATCH 10/10] Add test for internal threadpool and a new testing directory for internal APIs. 
--- test/CMakeLists.txt | 1 + test/internal/CMakeLists.txt | 4 ++++ test/internal/threadpool.c | 18 ++++++++++++++++++ 3 files changed, 23 insertions(+) create mode 100644 test/internal/CMakeLists.txt create mode 100644 test/internal/threadpool.c diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 40c77ed2e..1803cf642 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,5 +29,6 @@ if (${QTHREADS_BUILD_TESTS}) add_subdirectory(basics) add_subdirectory(features) + add_subdirectory(internal) add_subdirectory(stress) endif() diff --git a/test/internal/CMakeLists.txt b/test/internal/CMakeLists.txt new file mode 100644 index 000000000..56fa1dbed --- /dev/null +++ b/test/internal/CMakeLists.txt @@ -0,0 +1,4 @@ +# tests for internal APIs can access internal headers. +include_directories("../../include") + +qthreads_test(threadpool) diff --git a/test/internal/threadpool.c b/test/internal/threadpool.c new file mode 100644 index 000000000..4c9ac7e14 --- /dev/null +++ b/test/internal/threadpool.c @@ -0,0 +1,18 @@ +#include "argparsing.h" +#include "qt_threadpool.h" + +static int on_thread_test(void *arg) { + printf("hello from thread\n"); + return 0; +} + +int main() { + hw_pool_init(2); + hw_pool_destroy(); + hw_pool_init(2); + hw_pool_run_on_all(&on_thread_test, NULL); + hw_pool_destroy(); + printf("exited successfully\n"); + fflush(stdout); + return 0; +}