From 3e516075ab03c070c05f739e7803d5d28033863f Mon Sep 17 00:00:00 2001 From: Khalil Estell Date: Sun, 19 Apr 2026 16:53:51 -0700 Subject: [PATCH] :sparkles: Support awaiting futures across different contexts Allow coroutines running on one context to await futures created on another context. While awaiting, the consuming context delegates its state and sleep time to the producer context, enabling proper scheduler behavior across context boundaries. When the producer completes, the consumer resumes and receives the value, with all memory properly cleaned up. This enables more flexible async patterns where work can be distributed across multiple execution contexts. Note that is is an anti-pattern and not a desirable usage of async context. This change fills a hole that would otherwise be UB into something with clear semantics. Includes comprehensive tests covering normal completion, exception propagation, cancellation, and state delegation across contexts. Resolves #93 --- CMakeLists.txt | 2 +- benchmarks/armv8-Macos-clang-20.txt | 23 +++ benchmarks/benchmark.cpp | 19 ++- modules/coroutine.cppm | 101 +++++------ modules/sync.cppm | 14 +- tests/context_listener.test.cpp | 255 ++++++++++++++++------------ tests/context_swapping.test.cpp | 82 --------- tests/cross_context_await.test.cpp | 237 ++++++++++++++++++++++++++ tests/run_until_done.test.cpp | 30 +++- 9 files changed, 495 insertions(+), 268 deletions(-) create mode 100644 benchmarks/armv8-Macos-clang-20.txt delete mode 100644 tests/context_swapping.test.cpp create mode 100644 tests/cross_context_await.test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a2a9ce7..0fbfea0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,7 @@ libhal_add_tests(async_context clock_adapter run_until_done async_stacking - context_swapping + cross_context_await MODULES tests/util.cppm diff --git a/benchmarks/armv8-Macos-clang-20.txt b/benchmarks/armv8-Macos-clang-20.txt new file mode 100644 index 0000000..7c63fb8 --- /dev/null +++ b/benchmarks/armv8-Macos-clang-20.txt @@ -0,0 +1,23 @@ +Unable to determine clock rate from sysctl: hw.cpufrequency: No such file or directory +This does not affect benchmark measurements, only the metadata output. +***WARNING*** Failed to set thread affinity. Estimated CPU frequency may be incorrect. +2026-04-19T20:39:17-07:00 +Running ./build/armv8-Macos-clang-20/Release/benchmark +Run on (10 X 24 MHz CPU s) +CPU Caches: + L1 Data 64 KiB + L1 Instruction 128 KiB + L2 Unified 4096 KiB (x10) +Load Average: 2.28, 2.58, 2.87 +---------------------------------------------------------------------------------- +Benchmark Time CPU Iterations +---------------------------------------------------------------------------------- +bm_function_pointer_call 2.18 ns 2.18 ns 321788778 +bm_virtual_call 2.19 ns 2.19 ns 320281115 +bm_virtual_call_variant 3.11 ns 3.10 ns 225398553 +bm_future_sync_return 4.11 ns 4.07 ns 172938999 +bm_future_coroutine 26.5 ns 26.5 ns 26412802 +bm_future_sync_await 19.8 ns 19.7 ns 35554653 +bm_future_mixed 11.0 ns 11.0 ns 63749374 +bm_future_void_coroutine 28.1 ns 28.1 ns 24933659 +bm_future_void_coroutine_context_resume 26.9 ns 26.9 ns 26055431 diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index 2c8900e..f61d69d 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -295,17 +295,17 @@ __attribute__((noinline)) async::future coro_level3(async::context&, int x) co_return x * 2; } -__attribute__((noinline)) async::future coro_level2(async::context& ctx, +__attribute__((noinline)) async::future coro_level2(async::context& p_ctx, int x) { - int val = co_await coro_level3(ctx, x); + int val = co_await coro_level3(p_ctx, x); co_return val + 1; } -__attribute__((noinline)) async::future coro_level1(async::context& ctx, +__attribute__((noinline)) async::future coro_level1(async::context& p_ctx, int x) { - int val = co_await coro_level2(ctx, x); + int val = co_await coro_level2(p_ctx, x); co_return val + 1; } @@ -316,7 +316,8 @@ static void bm_future_coroutine(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -357,7 +358,8 @@ static void bm_future_sync_await(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = sync_in_coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -398,7 +400,8 @@ static void bm_future_mixed(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = mixed_coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -440,7 +443,7 @@ static void bm_future_void_coroutine(benchmark::State& state) int output = 0; for (auto _ : state) { auto f = void_coro_level1(ctx, output, input); - sync_wait(f); + ctx.sync_wait([](auto...) {}); benchmark::DoNotOptimize(f); benchmark::DoNotOptimize(output); } diff --git a/modules/coroutine.cppm b/modules/coroutine.cppm index 478df31..a453f6a 100644 --- a/modules/coroutine.cppm +++ b/modules/coroutine.cppm @@ -240,54 +240,18 @@ class promise_base; * for schedulers to efficiently track which contexts become ready for execution * without polling. * - * The `set_listener()` method is called from within `context::unblock()`, which - * may be invoked from an ISR, a driver completion handler, or another thread. - * Implementations MUST be ISR-safe and noexcept. Avoid any operations that - * could block, allocate memory, or acquire non-ISR-safe locks within - * `set_listener()`. + * The implementation of `on_unblock()` may be called an interrupt service + * routine thus it must be noexcept and interrupt service routine safe. Avoid + * any operations that could block, allocate memory, or acquire non-ISR-safe + * locks within `on_unblock()`. * - * Typical usage is through `context_handle`, which automatically registers and - * deregisters the listener on construction and destruction respectively. - * Direct registration is possible via `context::set_listener()` but requires - * manual lifetime management — the listener MUST outlive the context it is - * registered with. - * - * Example implementation: - * @code - * class my_scheduler : public async::context_listener { - * private: - * void set_listener(async::context& p_context) noexcept override { - * m_ready_queue.push(&p_context); - * } - * // ... - * }; - * @endcode + * `on_sync_block()` communicates to the scheduler that one context is blocked + * by another context, allowing the scheduler to decide how it wants to schedule + * the context. */ export struct context_listener { public: - template - static auto from(Callable&& p_unblock_handler) - { - struct lambda_context_listener : public context_listener - { - Callable handler; - - lambda_context_listener(Callable&& p_handler) - : handler(std::move(p_handler)) - { - } - - private: - void on_unblock(async::context& p_context) noexcept override - { - handler(p_context); - } - }; - - return lambda_context_listener{ std::forward(p_unblock_handler) }; - } - virtual ~context_listener() = default; private: @@ -298,7 +262,7 @@ private: * * This method is invoked by `context::unblock()` immediately after the * context's state is set to `blocked_by::nothing`. It signals to the - * implementing scheduler that the context is now ready to be resumed. + * scheduler that the context is now ready to be resumed. * * @param p_context The context that has just been unblocked. The context's * state will be `blocked_by::nothing` at the time of this call. The @@ -636,18 +600,18 @@ public: */ void resume() { - if (state() == blocked_by::nothing) [[likely]] { + if (m_awaited_context == nullptr and + get_original().m_state == blocked_by::nothing) [[likely]] { m_active_handle.resume(); } else if (m_awaited_context != nullptr) { // This context is awaiting another context, check if its done if (m_awaited_context->done()) { m_awaited_context = nullptr; - unblock_without_notification(); m_active_handle.resume(); } else { // If the context is not done, resume the awaited context m_awaited_context->resume(); - // INFO: The call above can be recursive if the awaited context is also + // NOTE: The call above can be recursive if the awaited context is also // awaiting another context. This can occur all the way down until the // final leaf context is resumed. We expect such cases to be rare. } @@ -794,6 +758,9 @@ private: friend class promise_base; friend class proxy_context; + template + friend class future; + /** * @brief Check if this is a proxy context * @@ -892,11 +859,13 @@ private: std::coroutine_handle<> m_active_handle = noop_sentinel; // word 1 stack_word* m_stack_pointer = nullptr; // word 2 std::span m_stack{}; // word 3-4 - context* m_original = nullptr; // word 5 - context_listener* m_listener = nullptr; // word 6 - sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 - context* m_awaited_context = nullptr; // word 8 - blocked_by m_state = blocked_by::nothing; // word 9: pad 3 + context_listener* m_listener = nullptr; // word 5 + context* m_original = nullptr; // word 6 + context* m_awaited_context = nullptr; // word 7 + context* m_awaiting_caller = nullptr; // word 8 + // ---- Members below are below word length --- + sleep_duration m_sleep_time = sleep_duration::zero(); // 4B (uint32_t) + blocked_by m_state = blocked_by::nothing; // 1B (uint8_t) }; /** @@ -1810,9 +1779,18 @@ public: [[maybe_unused]] std::coroutine_handle> p_calling_coroutine) noexcept { - // This will not throw because the discriminate check was performed in - // `await_ready()`. - return std::get(m_operation.m_state); + auto handle = std::get(m_operation.m_state); + auto& calling_ctx = p_calling_coroutine.promise().get_context(); + auto& awaited_ctx = full_handle_type::from_address(handle.address()) + .promise() + .get_context(); + + if (&calling_ctx != &awaited_ctx) [[unlikely]] { + calling_ctx.m_awaited_context = &awaited_ctx; + awaited_ctx.m_awaiting_caller = &calling_ctx; + } + + return handle; } [[nodiscard]] constexpr monostate_or&& await_resume() const @@ -1961,6 +1939,14 @@ constexpr future promise::get_return_object() noexcept * * This method cancels all pending operations on the context. * + * A context awaiting this context will be disconnected from this context. + * Meaning, if the context awaiting this context, when resumed will resume where + * it left off. If a future with this context was awaited and was completed with + * a value, then resuming the awaiting context will operate as normal. If the + * future was cancelled before completing with a value or error, then resuming + * the context and exiting the awaitable will result in the + * `async::future::cancelled` exception type being throw. + * * @note This method is called internally by the context destructor to ensure * proper cleanup of all pending asynchronous operations. */ @@ -1971,5 +1957,10 @@ void context::cancel() .promise() .cancel(); } + + if (m_awaiting_caller != nullptr) { + m_awaiting_caller->m_awaited_context = nullptr; + m_awaiting_caller = nullptr; + } } } // namespace async::inline v0 diff --git a/modules/sync.cppm b/modules/sync.cppm index 274f28d..ce2dee4 100644 --- a/modules/sync.cppm +++ b/modules/sync.cppm @@ -124,12 +124,6 @@ public: class guard { public: - guard(mutex* p_access, context* p_context) - : m_access(p_access) - , m_context(p_context) - { - } - ~guard() { release(); @@ -159,6 +153,14 @@ public: } private: + friend class mutex; + + guard(mutex* p_access, context* p_context) + : m_access(p_access) + , m_context(p_context) + { + } + mutex* m_access; context* m_context; diff --git a/tests/context_listener.test.cpp b/tests/context_listener.test.cpp index 2231d77..f086b5d 100644 --- a/tests/context_listener.test.cpp +++ b/tests/context_listener.test.cpp @@ -6,126 +6,161 @@ import async_context; import test_utils; -void on_upload_test() +void context_listener_test() { using namespace boost::ut; using namespace std::chrono_literals; - "on_upload() via lambda"_test = []() { - // Setup - async::inplace_context<1024> ctx; - bool unblock_called = false; + // Setup + struct listener : public async::context_listener + { async::context const* unblocked_context = nullptr; - auto upload_handler = - async::context_listener::from([&](async::context const& p_context) { - unblock_called = true; - unblocked_context = &p_context; - }); - ctx.set_listener(&upload_handler); - - unsigned step = 0; - auto co = [&step](async::context&) -> async::future { - step = 1; - co_await 10ms; - }; - - // Exercise 1 - auto future = co(ctx); - - // Verify 1 - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % not future.has_value()); - expect(that % 0 == step); - - // Exercise 2 - future.resume(); - - // Verify 2 - expect(that % unblock_called == false); - expect(that % unblocked_context == nullptr); - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % 10ms == ctx.sleep_time()); - expect(that % not future.has_value()); - expect(that % 1 == step); - - // Exercise 3 - ctx.unblock(); - future.resume(); - - // Verify 3 - expect(that % unblock_called == true); - expect(that % unblocked_context == &ctx); - expect(that % unblock_called == true); - expect(that % 0 == ctx.memory_used()); - expect(that % future.done()); - expect(that % 1 == step); - - ctx.clear_listener(); - }; + async::context const* sync_blocker = nullptr; + async::context* sync_blocked = nullptr; + + void on_unblock(async::context& p_context) noexcept override + { + unblocked_context = &p_context; + } + + void on_sync_block(async::context& p_blocked, + async::context const& p_blocker) noexcept override + { + sync_blocker = &p_blocker; + sync_blocked = &p_blocked; + } - "on_upload() via inheritance"_test = []() { - // Setup - async::inplace_context<1024> ctx; - struct un_blocker : public async::context_listener + void reset() { - bool unblock_called = false; - async::context const* unblocked_context = nullptr; - - void on_unblock(async::context& p_context) noexcept override - { - unblock_called = true; - unblocked_context = &p_context; - } - }; - - un_blocker ub; - ctx.set_listener(&ub); - - unsigned step = 0; - auto co = [&step](async::context&) -> async::future { - step = 1; - co_await 10ms; - }; - - // Exercise 1 - auto future = co(ctx); - - // Verify 1 - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % not future.has_value()); - expect(that % 0 == step); - - // Exercise 2 - future.resume(); - - // Verify 2 - expect(that % ub.unblock_called == false); - expect(that % ub.unblocked_context == nullptr); - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % 10ms == ctx.sleep_time()); - expect(that % not future.has_value()); - expect(that % 1 == step); - - // Exercise 3 - ctx.unblock(); - future.resume(); - - // Verify 3 - expect(that % ub.unblock_called == true); - expect(that % ub.unblocked_context == &ctx); - expect(that % 0 == ctx.memory_used()); - expect(that % future.done()); - expect(that % 1 == step); - - ctx.clear_listener(); + unblocked_context = nullptr; + sync_blocker = nullptr; + sync_blocked = nullptr; + } }; + + listener listener_obj; + async::inplace_context<1024> ctx1; + async::inplace_context<1024> ctx2; + ctx1.set_listener(&listener_obj); + ctx2.set_listener(&listener_obj); + + async::mutex mutex; + + auto coro = [&](async::context& p_context) -> async::future { + auto lock = co_await mutex.lock(p_context); + co_await 1ms; + co_return; + }; + + // Exercise 1 + auto future1 = coro(ctx1); + auto future2 = coro(ctx2); + + // Verify 1 + expect(that % not future1.done()); + expect(that % not future2.done()); + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 2 + future1.resume(); // should acquire resource and get blocked by time. + future2.resume(); // should block by sync + + // Verify 2 + expect(that % async::blocked_by::time == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % 1ms == ctx1.sleep_time()); + expect(that % &ctx1 == mutex.owner()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % &ctx2 == listener_obj.sync_blocked); + expect(that % &ctx1 == listener_obj.sync_blocker); + + // Exercise 3 + listener_obj.reset(); + ctx1.unblock(); + + // Verify 3 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % &ctx1 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 4 + listener_obj.reset(); + ctx2.unblock(); + + // Verify 4 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise + listener_obj.reset(); + future2.resume(); + + // Verify 4: ctx2 is re-blocked by sync because ctx1 still has the lock + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % &ctx2 == listener_obj.sync_blocked); + expect(that % &ctx1 == listener_obj.sync_blocker); + + // Exercise 5 + listener_obj.reset(); + ctx1.unblock(); // unblock the time based wait + future1.resume(); // finishes and releases lock + + // Verify 5 + expect(that % future1.done()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % nullptr == mutex.owner()); + expect(that % &ctx1 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 6 + listener_obj.reset(); + ctx2.unblock(); + future2.resume(); // acquires lock blocks by time + + // Verify 6 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::time == ctx2.state()); + expect(that % 1ms == ctx2.sleep_time()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % &ctx2 == mutex.owner()); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 7 + listener_obj.reset(); + ctx2.unblock(); + future2.resume(); // finishes and releases lock + + // Verify 7 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % future1.done()); + expect(that % future2.done()); + expect(that % nullptr == mutex.owner()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + ctx1.clear_listener(); + ctx2.clear_listener(); }; int main() { - on_upload_test(); + context_listener_test(); } diff --git a/tests/context_swapping.test.cpp b/tests/context_swapping.test.cpp deleted file mode 100644 index 278eef0..0000000 --- a/tests/context_swapping.test.cpp +++ /dev/null @@ -1,82 +0,0 @@ -#include - -#include - -import async_context; -import test_utils; - -void context_swapping() -{ - using namespace boost::ut; - - // Context swapping: one routine runs on an outer context and produces a - // future. A second routine on a separate context co_awaits that future using - // a locally nested context. Awaiting a future whose context was allocated - // inside a coroutine frame is a contract violation and will abort. - - "co_await future from another context"_test = []() { - // Setup - async::inplace_context<1024> ctx; - - unsigned step = 0; - - // routine_a produces a future that routine will await - auto routine_a = [&step](async::context& p_ctx) -> async::future { - step = 1; - co_await std::suspend_always{}; - step = 2; - co_await std::suspend_always{}; - step = 3; - co_return 42; - }; - - // routine captures routine_a and co_awaits it using a nested context - // allocated inside the coroutine frame — this is a contract violation - auto routine = [&step, &routine_a](async::context&) -> async::future { - step = 10; - co_await std::suspend_always{}; - - async::inplace_context<128> nested_ctx; - // This is a bug and a contract violation, this will either call the - // contract violator OR call std::terminate - auto result = co_await routine_a(nested_ctx); - // These should never be reached - step = 11; - co_return result; - }; - - // Exercise: start routine on the outer context - auto future = routine(ctx); - - // Verify: routine has not run yet - expect(that % not future.done()); - expect(that % 0 == step); - // ctx holds routine's frame - expect(that % 0 < ctx.memory_used()); - - // Start routine — it runs to step=10 then suspends at the first - // co_await suspend_always - future.resume(); - expect(that % 10 == step); - expect(that % not future.done()); - - // Resume routine — it allocates nested_ctx and starts routine_a, - // which runs to step=1 then suspends - future.resume(); - expect(that % 1 == step); - - // NOTE: aborts() forks a child process, so a std::set_terminate handler - // set here cannot be observed from the parent — terminate_was_called would - // always be false regardless. The aborts() check is the only observable - // assertion available across the process boundary. - expect(aborts([&future]() { future.resume(); })); - expect(that % 1 == step); - expect(that % not future.done()); - // ctx holds routine's suspended frame while the nested context does work - }; -} - -int main() -{ - context_swapping(); -} diff --git a/tests/cross_context_await.test.cpp b/tests/cross_context_await.test.cpp new file mode 100644 index 0000000..1bf62e8 --- /dev/null +++ b/tests/cross_context_await.test.cpp @@ -0,0 +1,237 @@ +#include +#include +#include + +#include + +import async_context; +import test_utils; + +void cross_context_await_test() +{ + using namespace boost::ut; + using namespace std::chrono_literals; + + // ------------------------------------------------------------------------- + // Normal completion: producer suspends once, then completes. + // Verifies value propagation and memory cleanup. + // + // Resume sequence: + // fa.resume() -> consumer step=10, starts producer on ctx_b -> step=1, + // producer suspends. ctx_a is now cross-awaiting ctx_b. + // ctx_b.resume() -> producer step=2, co_return 42. ctx_b done. + // fa.resume() -> ctx_b done, consumer picks up 42, step=11, completes. + // ------------------------------------------------------------------------- + "cross-context co_await: normal completion propagates value"_test = []() { + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + unsigned step = 0; + + auto producer = [&step](async::context&) -> async::future { + step = 1; + co_await std::suspend_always{}; + step = 2; + co_return 42; + }; + + auto consumer = + [&step, &producer, &ctx_b](async::context&) -> async::future { + step = 10; + int result = co_await producer(ctx_b); + step = 11; + co_return result; + }; + + auto fa = consumer(ctx_a); + + expect(that % 0 == step); + expect(that % not fa.done()); + expect(that % 0 < ctx_a.memory_used()); + + fa.resume(); + expect(that % 1 == step); + expect(that % not fa.done()); + expect(that % not ctx_b.done()); + expect(that % 0 < ctx_b.memory_used()); + + ctx_b.resume(); + expect(that % 2 == step); + expect(that % ctx_b.done()); + expect(that % 0 == ctx_b.memory_used()); + + fa.resume(); + expect(that % 11 == step); + expect(that % fa.done()); + expect(that % fa.has_value()); + expect(that % 42 == fa.value()); + expect(that % 0 == ctx_a.memory_used()); + }; + + // ------------------------------------------------------------------------- + // State delegation: while ctx_a is cross-awaiting ctx_b, + // ctx_a.state() and ctx_a.sleep_time() must delegate to ctx_b. + // ------------------------------------------------------------------------- + "cross-context co_await: state() and sleep_time() delegate to awaited context"_test = + []() { + using namespace std::chrono_literals; + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + + auto producer = [](async::context&) -> async::future { + co_await 50ms; + co_return; + }; + + auto consumer = [&producer, + &ctx_b](async::context&) -> async::future { + co_await producer(ctx_b); + co_return; + }; + + auto fa = consumer(ctx_a); + + // consumer starts producer on ctx_b, producer blocks by time + fa.resume(); + + expect(that % async::blocked_by::time == ctx_b.state()); + // ctx_a must delegate to ctx_b while cross-awaiting + expect(that % async::blocked_by::time == ctx_a.state()); + expect(that % ctx_b.sleep_time() == ctx_a.sleep_time()); + }; + + // ------------------------------------------------------------------------- + // Back-link cleared after normal completion. + // + // After the cross-context await resolves, await_resume must clear both + // m_awaited_context and m_awaiting_caller. If it does not, ctx_a.state() + // will delegate to the finished ctx_b (returning nothing) instead of + // reporting its own blocked_by::time — this test catches that failure. + // + // Producer completes immediately (no intermediate suspension), so: + // fa.resume() #1 -> consumer step=1, producer runs to completion, + // symmetric transfer to noop, resume returns. + // Consumer is still suspended at co_await. + // fa.resume() #2 -> consumer sees ctx_b done, await_resume clears links, + // consumer step=2, blocks by time on ctx_a. + // ------------------------------------------------------------------------- + "cross-context co_await: back-links cleared after normal completion"_test = + []() { + using namespace std::chrono_literals; + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + unsigned step = 0; + + auto producer = [](async::context&) -> async::future { + co_return; // completes on first resume, no intermediate suspension + }; + + auto consumer = + [&step, &producer, &ctx_b](async::context&) -> async::future { + step = 1; + co_await producer(ctx_b); + step = 2; + co_await 50ms; // must block ctx_a by time, not delegate to finished + // ctx_b + step = 3; + co_return; + }; + + auto fa = consumer(ctx_a); + + // producer completes immediately; symmetric transfer to noop returns + // control consumer is still suspended at co_await — needs a second resume + fa.resume(); + expect(that % 1 == step); + expect(that % ctx_b.done()); + expect(that % 0 == ctx_b.memory_used()); + expect(that % not fa.done()); + expect(that % async::blocked_by::nothing == ctx_a.state()); + + // consumer resumes past co_await, step=2, blocks by time + fa.resume(); + expect(that % 2 == step); + // if m_awaited_context was not cleared, this returns blocked_by::nothing + // (delegating to the finished ctx_b) — the test catches that failure + expect(that % async::blocked_by::time == ctx_a.state()); + expect(that % 50ms == ctx_a.sleep_time()); + expect(that % not fa.done()); + + // consumer resumes past co_await, step=2, blocks by time + ctx_a.unblock(); + fa.resume(); + expect(that % 3 == step); + expect(that % fa.done()); + }; + + // ------------------------------------------------------------------------- + // Safety: awaited context destroyed while caller is suspended. + // + // ctx_b is destroyed while ctx_a is cross-awaiting it. ctx_b's destructor + // must null out ctx_a.m_awaited_context via the m_awaiting_caller back-link. + // After destruction, ctx_a.state() must reflect its own state (nothing), + // and resuming ctx_a must deliver future::cancelled to the consumer's + // try/catch block. + // ------------------------------------------------------------------------- + "cross-context co_await: awaited context destroyed clears caller pointer"_test = + []() { + async::inplace_context<512> ctx_a; + unsigned step = 0; + bool cancelled_caught = false; + + // held in optional so we can destroy it independently of ctx_a + std::optional> ctx_b; + ctx_b.emplace(); + + auto producer = [&step](async::context&) -> async::future { + step = 1; + co_await 50ms; + step = 2; // never reached + co_return 42; + }; + + auto consumer = [&](async::context&) -> async::future { + step = 10; + try { + [[maybe_unused]] int result = co_await producer(*ctx_b); + step = 11; // never reached + } catch (async::future::cancelled const&) { + cancelled_caught = true; + step = 99; + } + co_return; + }; + + auto fa = consumer(ctx_a); + + // consumer step=10, producer on ctx_b step=1, ctx_a cross-awaiting ctx_b + fa.resume(); + expect(that % 1 == step); + expect(that % not fa.done()); + expect(that % not ctx_b->done()); + // ctx_a delegates state through to ctx_b context + expect(that % async::blocked_by::time == ctx_a.state()); + + // Destroy ctx_b while ctx_a is suspended awaiting it. + // Expected: ctx_b.~inplace_context() calls cancel(), which sets the + // producer future to cancelled_state, then the destructor nulls + // ctx_a.m_awaited_context via m_awaiting_caller. + ctx_b.reset(); + + // ctx_a must no longer delegate state through the destroyed context + expect(that % async::blocked_by::nothing == ctx_a.state()); + + // Resuming ctx_a: await_resume sees cancelled_state, throws + // future::cancelled, caught inside consumer, step=99 + fa.resume(); + expect(that % 99 == step); + expect(that % async::blocked_by::nothing == ctx_a.state()); + expect(that % true == cancelled_caught); + expect(that % fa.done()); + expect(that % 0 == ctx_a.memory_used()); + }; +} + +int main() +{ + cross_context_await_test(); +} diff --git a/tests/run_until_done.test.cpp b/tests/run_until_done.test.cpp index ec9f653..59d5beb 100644 --- a/tests/run_until_done.test.cpp +++ b/tests/run_until_done.test.cpp @@ -108,6 +108,29 @@ void run_sleep_task_test() std::vector unblocked_contexts; std::mutex unblocked_mutex; + struct listener : public async::context_listener + { + std::vector& unblocked_list; + std::mutex& unblocked_mutex; + + listener(std::vector& p_unblocked_list, + std::mutex& p_unblocked_mutex) + : unblocked_list(p_unblocked_list) + , unblocked_mutex(p_unblocked_mutex) + { + } + + void on_unblock(async::context& p_context) noexcept override + { + if (p_context.state() == async::blocked_by::signal) { + std::lock_guard lock(unblocked_mutex); + unblocked_list.push_back(&p_context); + } + } + }; + + listener listen{ unblocked_contexts, unblocked_mutex }; + // Exercise async::run_until_done( clk, @@ -118,12 +141,7 @@ void run_sleep_task_test() std::println("Sleeping for {}", p_wake_time - clk.now()); std::this_thread::sleep_until(p_wake_time); }, - async::context_listener::from([&](async::context& p_ctx) noexcept { - if (p_ctx.state() == async::blocked_by::signal) { - std::lock_guard lock(unblocked_mutex); - unblocked_contexts.push_back(&p_ctx); - } - }), + std::move(listen), // List of tasks ctx0, ctx1,