diff --git a/CMakeLists.txt b/CMakeLists.txt index a2a9ce7..0fbfea0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,7 @@ libhal_add_tests(async_context clock_adapter run_until_done async_stacking - context_swapping + cross_context_await MODULES tests/util.cppm diff --git a/benchmarks/armv8-Macos-clang-20.txt b/benchmarks/armv8-Macos-clang-20.txt new file mode 100644 index 0000000..7c63fb8 --- /dev/null +++ b/benchmarks/armv8-Macos-clang-20.txt @@ -0,0 +1,23 @@ +Unable to determine clock rate from sysctl: hw.cpufrequency: No such file or directory +This does not affect benchmark measurements, only the metadata output. +***WARNING*** Failed to set thread affinity. Estimated CPU frequency may be incorrect. +2026-04-19T20:39:17-07:00 +Running ./build/armv8-Macos-clang-20/Release/benchmark +Run on (10 X 24 MHz CPU s) +CPU Caches: + L1 Data 64 KiB + L1 Instruction 128 KiB + L2 Unified 4096 KiB (x10) +Load Average: 2.28, 2.58, 2.87 +---------------------------------------------------------------------------------- +Benchmark Time CPU Iterations +---------------------------------------------------------------------------------- +bm_function_pointer_call 2.18 ns 2.18 ns 321788778 +bm_virtual_call 2.19 ns 2.19 ns 320281115 +bm_virtual_call_variant 3.11 ns 3.10 ns 225398553 +bm_future_sync_return 4.11 ns 4.07 ns 172938999 +bm_future_coroutine 26.5 ns 26.5 ns 26412802 +bm_future_sync_await 19.8 ns 19.7 ns 35554653 +bm_future_mixed 11.0 ns 11.0 ns 63749374 +bm_future_void_coroutine 28.1 ns 28.1 ns 24933659 +bm_future_void_coroutine_context_resume 26.9 ns 26.9 ns 26055431 diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index 2c8900e..f61d69d 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -295,17 +295,17 @@ __attribute__((noinline)) async::future coro_level3(async::context&, int x) co_return x * 2; } -__attribute__((noinline)) async::future coro_level2(async::context& ctx, +__attribute__((noinline)) async::future coro_level2(async::context& p_ctx, int x) { - int val = co_await coro_level3(ctx, x); + int val = co_await coro_level3(p_ctx, x); co_return val + 1; } -__attribute__((noinline)) async::future coro_level1(async::context& ctx, +__attribute__((noinline)) async::future coro_level1(async::context& p_ctx, int x) { - int val = co_await coro_level2(ctx, x); + int val = co_await coro_level2(p_ctx, x); co_return val + 1; } @@ -316,7 +316,8 @@ static void bm_future_coroutine(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -357,7 +358,8 @@ static void bm_future_sync_await(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = sync_in_coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -398,7 +400,8 @@ static void bm_future_mixed(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = mixed_coro_level1(ctx, input); - int result = sync_wait(f); + ctx.sync_wait([](auto...) {}); + int result = f.value(); benchmark::DoNotOptimize(result); } } @@ -440,7 +443,7 @@ static void bm_future_void_coroutine(benchmark::State& state) int output = 0; for (auto _ : state) { auto f = void_coro_level1(ctx, output, input); - sync_wait(f); + ctx.sync_wait([](auto...) {}); benchmark::DoNotOptimize(f); benchmark::DoNotOptimize(output); } diff --git a/modules/coroutine.cppm b/modules/coroutine.cppm index 478df31..a453f6a 100644 --- a/modules/coroutine.cppm +++ b/modules/coroutine.cppm @@ -240,54 +240,18 @@ class promise_base; * for schedulers to efficiently track which contexts become ready for execution * without polling. * - * The `set_listener()` method is called from within `context::unblock()`, which - * may be invoked from an ISR, a driver completion handler, or another thread. - * Implementations MUST be ISR-safe and noexcept. Avoid any operations that - * could block, allocate memory, or acquire non-ISR-safe locks within - * `set_listener()`. + * The implementation of `on_unblock()` may be called an interrupt service + * routine thus it must be noexcept and interrupt service routine safe. Avoid + * any operations that could block, allocate memory, or acquire non-ISR-safe + * locks within `on_unblock()`. * - * Typical usage is through `context_handle`, which automatically registers and - * deregisters the listener on construction and destruction respectively. - * Direct registration is possible via `context::set_listener()` but requires - * manual lifetime management — the listener MUST outlive the context it is - * registered with. - * - * Example implementation: - * @code - * class my_scheduler : public async::context_listener { - * private: - * void set_listener(async::context& p_context) noexcept override { - * m_ready_queue.push(&p_context); - * } - * // ... - * }; - * @endcode + * `on_sync_block()` communicates to the scheduler that one context is blocked + * by another context, allowing the scheduler to decide how it wants to schedule + * the context. */ export struct context_listener { public: - template - static auto from(Callable&& p_unblock_handler) - { - struct lambda_context_listener : public context_listener - { - Callable handler; - - lambda_context_listener(Callable&& p_handler) - : handler(std::move(p_handler)) - { - } - - private: - void on_unblock(async::context& p_context) noexcept override - { - handler(p_context); - } - }; - - return lambda_context_listener{ std::forward(p_unblock_handler) }; - } - virtual ~context_listener() = default; private: @@ -298,7 +262,7 @@ private: * * This method is invoked by `context::unblock()` immediately after the * context's state is set to `blocked_by::nothing`. It signals to the - * implementing scheduler that the context is now ready to be resumed. + * scheduler that the context is now ready to be resumed. * * @param p_context The context that has just been unblocked. The context's * state will be `blocked_by::nothing` at the time of this call. The @@ -636,18 +600,18 @@ public: */ void resume() { - if (state() == blocked_by::nothing) [[likely]] { + if (m_awaited_context == nullptr and + get_original().m_state == blocked_by::nothing) [[likely]] { m_active_handle.resume(); } else if (m_awaited_context != nullptr) { // This context is awaiting another context, check if its done if (m_awaited_context->done()) { m_awaited_context = nullptr; - unblock_without_notification(); m_active_handle.resume(); } else { // If the context is not done, resume the awaited context m_awaited_context->resume(); - // INFO: The call above can be recursive if the awaited context is also + // NOTE: The call above can be recursive if the awaited context is also // awaiting another context. This can occur all the way down until the // final leaf context is resumed. We expect such cases to be rare. } @@ -794,6 +758,9 @@ private: friend class promise_base; friend class proxy_context; + template + friend class future; + /** * @brief Check if this is a proxy context * @@ -892,11 +859,13 @@ private: std::coroutine_handle<> m_active_handle = noop_sentinel; // word 1 stack_word* m_stack_pointer = nullptr; // word 2 std::span m_stack{}; // word 3-4 - context* m_original = nullptr; // word 5 - context_listener* m_listener = nullptr; // word 6 - sleep_duration m_sleep_time = sleep_duration::zero(); // word 7 - context* m_awaited_context = nullptr; // word 8 - blocked_by m_state = blocked_by::nothing; // word 9: pad 3 + context_listener* m_listener = nullptr; // word 5 + context* m_original = nullptr; // word 6 + context* m_awaited_context = nullptr; // word 7 + context* m_awaiting_caller = nullptr; // word 8 + // ---- Members below are below word length --- + sleep_duration m_sleep_time = sleep_duration::zero(); // 4B (uint32_t) + blocked_by m_state = blocked_by::nothing; // 1B (uint8_t) }; /** @@ -1810,9 +1779,18 @@ public: [[maybe_unused]] std::coroutine_handle> p_calling_coroutine) noexcept { - // This will not throw because the discriminate check was performed in - // `await_ready()`. - return std::get(m_operation.m_state); + auto handle = std::get(m_operation.m_state); + auto& calling_ctx = p_calling_coroutine.promise().get_context(); + auto& awaited_ctx = full_handle_type::from_address(handle.address()) + .promise() + .get_context(); + + if (&calling_ctx != &awaited_ctx) [[unlikely]] { + calling_ctx.m_awaited_context = &awaited_ctx; + awaited_ctx.m_awaiting_caller = &calling_ctx; + } + + return handle; } [[nodiscard]] constexpr monostate_or&& await_resume() const @@ -1961,6 +1939,14 @@ constexpr future promise::get_return_object() noexcept * * This method cancels all pending operations on the context. * + * A context awaiting this context will be disconnected from this context. + * Meaning, if the context awaiting this context, when resumed will resume where + * it left off. If a future with this context was awaited and was completed with + * a value, then resuming the awaiting context will operate as normal. If the + * future was cancelled before completing with a value or error, then resuming + * the context and exiting the awaitable will result in the + * `async::future::cancelled` exception type being throw. + * * @note This method is called internally by the context destructor to ensure * proper cleanup of all pending asynchronous operations. */ @@ -1971,5 +1957,10 @@ void context::cancel() .promise() .cancel(); } + + if (m_awaiting_caller != nullptr) { + m_awaiting_caller->m_awaited_context = nullptr; + m_awaiting_caller = nullptr; + } } } // namespace async::inline v0 diff --git a/modules/sync.cppm b/modules/sync.cppm index 274f28d..ce2dee4 100644 --- a/modules/sync.cppm +++ b/modules/sync.cppm @@ -124,12 +124,6 @@ public: class guard { public: - guard(mutex* p_access, context* p_context) - : m_access(p_access) - , m_context(p_context) - { - } - ~guard() { release(); @@ -159,6 +153,14 @@ public: } private: + friend class mutex; + + guard(mutex* p_access, context* p_context) + : m_access(p_access) + , m_context(p_context) + { + } + mutex* m_access; context* m_context; diff --git a/tests/context_listener.test.cpp b/tests/context_listener.test.cpp index 2231d77..f086b5d 100644 --- a/tests/context_listener.test.cpp +++ b/tests/context_listener.test.cpp @@ -6,126 +6,161 @@ import async_context; import test_utils; -void on_upload_test() +void context_listener_test() { using namespace boost::ut; using namespace std::chrono_literals; - "on_upload() via lambda"_test = []() { - // Setup - async::inplace_context<1024> ctx; - bool unblock_called = false; + // Setup + struct listener : public async::context_listener + { async::context const* unblocked_context = nullptr; - auto upload_handler = - async::context_listener::from([&](async::context const& p_context) { - unblock_called = true; - unblocked_context = &p_context; - }); - ctx.set_listener(&upload_handler); - - unsigned step = 0; - auto co = [&step](async::context&) -> async::future { - step = 1; - co_await 10ms; - }; - - // Exercise 1 - auto future = co(ctx); - - // Verify 1 - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % not future.has_value()); - expect(that % 0 == step); - - // Exercise 2 - future.resume(); - - // Verify 2 - expect(that % unblock_called == false); - expect(that % unblocked_context == nullptr); - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % 10ms == ctx.sleep_time()); - expect(that % not future.has_value()); - expect(that % 1 == step); - - // Exercise 3 - ctx.unblock(); - future.resume(); - - // Verify 3 - expect(that % unblock_called == true); - expect(that % unblocked_context == &ctx); - expect(that % unblock_called == true); - expect(that % 0 == ctx.memory_used()); - expect(that % future.done()); - expect(that % 1 == step); - - ctx.clear_listener(); - }; + async::context const* sync_blocker = nullptr; + async::context* sync_blocked = nullptr; + + void on_unblock(async::context& p_context) noexcept override + { + unblocked_context = &p_context; + } + + void on_sync_block(async::context& p_blocked, + async::context const& p_blocker) noexcept override + { + sync_blocker = &p_blocker; + sync_blocked = &p_blocked; + } - "on_upload() via inheritance"_test = []() { - // Setup - async::inplace_context<1024> ctx; - struct un_blocker : public async::context_listener + void reset() { - bool unblock_called = false; - async::context const* unblocked_context = nullptr; - - void on_unblock(async::context& p_context) noexcept override - { - unblock_called = true; - unblocked_context = &p_context; - } - }; - - un_blocker ub; - ctx.set_listener(&ub); - - unsigned step = 0; - auto co = [&step](async::context&) -> async::future { - step = 1; - co_await 10ms; - }; - - // Exercise 1 - auto future = co(ctx); - - // Verify 1 - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % not future.has_value()); - expect(that % 0 == step); - - // Exercise 2 - future.resume(); - - // Verify 2 - expect(that % ub.unblock_called == false); - expect(that % ub.unblocked_context == nullptr); - expect(that % 0 < ctx.memory_used()); - expect(that % not future.done()); - expect(that % 10ms == ctx.sleep_time()); - expect(that % not future.has_value()); - expect(that % 1 == step); - - // Exercise 3 - ctx.unblock(); - future.resume(); - - // Verify 3 - expect(that % ub.unblock_called == true); - expect(that % ub.unblocked_context == &ctx); - expect(that % 0 == ctx.memory_used()); - expect(that % future.done()); - expect(that % 1 == step); - - ctx.clear_listener(); + unblocked_context = nullptr; + sync_blocker = nullptr; + sync_blocked = nullptr; + } }; + + listener listener_obj; + async::inplace_context<1024> ctx1; + async::inplace_context<1024> ctx2; + ctx1.set_listener(&listener_obj); + ctx2.set_listener(&listener_obj); + + async::mutex mutex; + + auto coro = [&](async::context& p_context) -> async::future { + auto lock = co_await mutex.lock(p_context); + co_await 1ms; + co_return; + }; + + // Exercise 1 + auto future1 = coro(ctx1); + auto future2 = coro(ctx2); + + // Verify 1 + expect(that % not future1.done()); + expect(that % not future2.done()); + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 2 + future1.resume(); // should acquire resource and get blocked by time. + future2.resume(); // should block by sync + + // Verify 2 + expect(that % async::blocked_by::time == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % 1ms == ctx1.sleep_time()); + expect(that % &ctx1 == mutex.owner()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % &ctx2 == listener_obj.sync_blocked); + expect(that % &ctx1 == listener_obj.sync_blocker); + + // Exercise 3 + listener_obj.reset(); + ctx1.unblock(); + + // Verify 3 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % &ctx1 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 4 + listener_obj.reset(); + ctx2.unblock(); + + // Verify 4 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise + listener_obj.reset(); + future2.resume(); + + // Verify 4: ctx2 is re-blocked by sync because ctx1 still has the lock + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % &ctx1 == mutex.owner()); + expect(that % nullptr == listener_obj.unblocked_context); + expect(that % &ctx2 == listener_obj.sync_blocked); + expect(that % &ctx1 == listener_obj.sync_blocker); + + // Exercise 5 + listener_obj.reset(); + ctx1.unblock(); // unblock the time based wait + future1.resume(); // finishes and releases lock + + // Verify 5 + expect(that % future1.done()); + expect(that % async::blocked_by::sync == ctx2.state()); + expect(that % nullptr == mutex.owner()); + expect(that % &ctx1 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 6 + listener_obj.reset(); + ctx2.unblock(); + future2.resume(); // acquires lock blocks by time + + // Verify 6 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::time == ctx2.state()); + expect(that % 1ms == ctx2.sleep_time()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % &ctx2 == mutex.owner()); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + // Exercise 7 + listener_obj.reset(); + ctx2.unblock(); + future2.resume(); // finishes and releases lock + + // Verify 7 + expect(that % async::blocked_by::nothing == ctx1.state()); + expect(that % async::blocked_by::nothing == ctx2.state()); + expect(that % future1.done()); + expect(that % future2.done()); + expect(that % nullptr == mutex.owner()); + expect(that % &ctx2 == listener_obj.unblocked_context); + expect(that % nullptr == listener_obj.sync_blocked); + expect(that % nullptr == listener_obj.sync_blocker); + + ctx1.clear_listener(); + ctx2.clear_listener(); }; int main() { - on_upload_test(); + context_listener_test(); } diff --git a/tests/context_swapping.test.cpp b/tests/context_swapping.test.cpp deleted file mode 100644 index 278eef0..0000000 --- a/tests/context_swapping.test.cpp +++ /dev/null @@ -1,82 +0,0 @@ -#include - -#include - -import async_context; -import test_utils; - -void context_swapping() -{ - using namespace boost::ut; - - // Context swapping: one routine runs on an outer context and produces a - // future. A second routine on a separate context co_awaits that future using - // a locally nested context. Awaiting a future whose context was allocated - // inside a coroutine frame is a contract violation and will abort. - - "co_await future from another context"_test = []() { - // Setup - async::inplace_context<1024> ctx; - - unsigned step = 0; - - // routine_a produces a future that routine will await - auto routine_a = [&step](async::context& p_ctx) -> async::future { - step = 1; - co_await std::suspend_always{}; - step = 2; - co_await std::suspend_always{}; - step = 3; - co_return 42; - }; - - // routine captures routine_a and co_awaits it using a nested context - // allocated inside the coroutine frame — this is a contract violation - auto routine = [&step, &routine_a](async::context&) -> async::future { - step = 10; - co_await std::suspend_always{}; - - async::inplace_context<128> nested_ctx; - // This is a bug and a contract violation, this will either call the - // contract violator OR call std::terminate - auto result = co_await routine_a(nested_ctx); - // These should never be reached - step = 11; - co_return result; - }; - - // Exercise: start routine on the outer context - auto future = routine(ctx); - - // Verify: routine has not run yet - expect(that % not future.done()); - expect(that % 0 == step); - // ctx holds routine's frame - expect(that % 0 < ctx.memory_used()); - - // Start routine — it runs to step=10 then suspends at the first - // co_await suspend_always - future.resume(); - expect(that % 10 == step); - expect(that % not future.done()); - - // Resume routine — it allocates nested_ctx and starts routine_a, - // which runs to step=1 then suspends - future.resume(); - expect(that % 1 == step); - - // NOTE: aborts() forks a child process, so a std::set_terminate handler - // set here cannot be observed from the parent — terminate_was_called would - // always be false regardless. The aborts() check is the only observable - // assertion available across the process boundary. - expect(aborts([&future]() { future.resume(); })); - expect(that % 1 == step); - expect(that % not future.done()); - // ctx holds routine's suspended frame while the nested context does work - }; -} - -int main() -{ - context_swapping(); -} diff --git a/tests/cross_context_await.test.cpp b/tests/cross_context_await.test.cpp new file mode 100644 index 0000000..1bf62e8 --- /dev/null +++ b/tests/cross_context_await.test.cpp @@ -0,0 +1,237 @@ +#include +#include +#include + +#include + +import async_context; +import test_utils; + +void cross_context_await_test() +{ + using namespace boost::ut; + using namespace std::chrono_literals; + + // ------------------------------------------------------------------------- + // Normal completion: producer suspends once, then completes. + // Verifies value propagation and memory cleanup. + // + // Resume sequence: + // fa.resume() -> consumer step=10, starts producer on ctx_b -> step=1, + // producer suspends. ctx_a is now cross-awaiting ctx_b. + // ctx_b.resume() -> producer step=2, co_return 42. ctx_b done. + // fa.resume() -> ctx_b done, consumer picks up 42, step=11, completes. + // ------------------------------------------------------------------------- + "cross-context co_await: normal completion propagates value"_test = []() { + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + unsigned step = 0; + + auto producer = [&step](async::context&) -> async::future { + step = 1; + co_await std::suspend_always{}; + step = 2; + co_return 42; + }; + + auto consumer = + [&step, &producer, &ctx_b](async::context&) -> async::future { + step = 10; + int result = co_await producer(ctx_b); + step = 11; + co_return result; + }; + + auto fa = consumer(ctx_a); + + expect(that % 0 == step); + expect(that % not fa.done()); + expect(that % 0 < ctx_a.memory_used()); + + fa.resume(); + expect(that % 1 == step); + expect(that % not fa.done()); + expect(that % not ctx_b.done()); + expect(that % 0 < ctx_b.memory_used()); + + ctx_b.resume(); + expect(that % 2 == step); + expect(that % ctx_b.done()); + expect(that % 0 == ctx_b.memory_used()); + + fa.resume(); + expect(that % 11 == step); + expect(that % fa.done()); + expect(that % fa.has_value()); + expect(that % 42 == fa.value()); + expect(that % 0 == ctx_a.memory_used()); + }; + + // ------------------------------------------------------------------------- + // State delegation: while ctx_a is cross-awaiting ctx_b, + // ctx_a.state() and ctx_a.sleep_time() must delegate to ctx_b. + // ------------------------------------------------------------------------- + "cross-context co_await: state() and sleep_time() delegate to awaited context"_test = + []() { + using namespace std::chrono_literals; + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + + auto producer = [](async::context&) -> async::future { + co_await 50ms; + co_return; + }; + + auto consumer = [&producer, + &ctx_b](async::context&) -> async::future { + co_await producer(ctx_b); + co_return; + }; + + auto fa = consumer(ctx_a); + + // consumer starts producer on ctx_b, producer blocks by time + fa.resume(); + + expect(that % async::blocked_by::time == ctx_b.state()); + // ctx_a must delegate to ctx_b while cross-awaiting + expect(that % async::blocked_by::time == ctx_a.state()); + expect(that % ctx_b.sleep_time() == ctx_a.sleep_time()); + }; + + // ------------------------------------------------------------------------- + // Back-link cleared after normal completion. + // + // After the cross-context await resolves, await_resume must clear both + // m_awaited_context and m_awaiting_caller. If it does not, ctx_a.state() + // will delegate to the finished ctx_b (returning nothing) instead of + // reporting its own blocked_by::time — this test catches that failure. + // + // Producer completes immediately (no intermediate suspension), so: + // fa.resume() #1 -> consumer step=1, producer runs to completion, + // symmetric transfer to noop, resume returns. + // Consumer is still suspended at co_await. + // fa.resume() #2 -> consumer sees ctx_b done, await_resume clears links, + // consumer step=2, blocks by time on ctx_a. + // ------------------------------------------------------------------------- + "cross-context co_await: back-links cleared after normal completion"_test = + []() { + using namespace std::chrono_literals; + async::inplace_context<512> ctx_a; + async::inplace_context<512> ctx_b; + unsigned step = 0; + + auto producer = [](async::context&) -> async::future { + co_return; // completes on first resume, no intermediate suspension + }; + + auto consumer = + [&step, &producer, &ctx_b](async::context&) -> async::future { + step = 1; + co_await producer(ctx_b); + step = 2; + co_await 50ms; // must block ctx_a by time, not delegate to finished + // ctx_b + step = 3; + co_return; + }; + + auto fa = consumer(ctx_a); + + // producer completes immediately; symmetric transfer to noop returns + // control consumer is still suspended at co_await — needs a second resume + fa.resume(); + expect(that % 1 == step); + expect(that % ctx_b.done()); + expect(that % 0 == ctx_b.memory_used()); + expect(that % not fa.done()); + expect(that % async::blocked_by::nothing == ctx_a.state()); + + // consumer resumes past co_await, step=2, blocks by time + fa.resume(); + expect(that % 2 == step); + // if m_awaited_context was not cleared, this returns blocked_by::nothing + // (delegating to the finished ctx_b) — the test catches that failure + expect(that % async::blocked_by::time == ctx_a.state()); + expect(that % 50ms == ctx_a.sleep_time()); + expect(that % not fa.done()); + + // consumer resumes past co_await, step=2, blocks by time + ctx_a.unblock(); + fa.resume(); + expect(that % 3 == step); + expect(that % fa.done()); + }; + + // ------------------------------------------------------------------------- + // Safety: awaited context destroyed while caller is suspended. + // + // ctx_b is destroyed while ctx_a is cross-awaiting it. ctx_b's destructor + // must null out ctx_a.m_awaited_context via the m_awaiting_caller back-link. + // After destruction, ctx_a.state() must reflect its own state (nothing), + // and resuming ctx_a must deliver future::cancelled to the consumer's + // try/catch block. + // ------------------------------------------------------------------------- + "cross-context co_await: awaited context destroyed clears caller pointer"_test = + []() { + async::inplace_context<512> ctx_a; + unsigned step = 0; + bool cancelled_caught = false; + + // held in optional so we can destroy it independently of ctx_a + std::optional> ctx_b; + ctx_b.emplace(); + + auto producer = [&step](async::context&) -> async::future { + step = 1; + co_await 50ms; + step = 2; // never reached + co_return 42; + }; + + auto consumer = [&](async::context&) -> async::future { + step = 10; + try { + [[maybe_unused]] int result = co_await producer(*ctx_b); + step = 11; // never reached + } catch (async::future::cancelled const&) { + cancelled_caught = true; + step = 99; + } + co_return; + }; + + auto fa = consumer(ctx_a); + + // consumer step=10, producer on ctx_b step=1, ctx_a cross-awaiting ctx_b + fa.resume(); + expect(that % 1 == step); + expect(that % not fa.done()); + expect(that % not ctx_b->done()); + // ctx_a delegates state through to ctx_b context + expect(that % async::blocked_by::time == ctx_a.state()); + + // Destroy ctx_b while ctx_a is suspended awaiting it. + // Expected: ctx_b.~inplace_context() calls cancel(), which sets the + // producer future to cancelled_state, then the destructor nulls + // ctx_a.m_awaited_context via m_awaiting_caller. + ctx_b.reset(); + + // ctx_a must no longer delegate state through the destroyed context + expect(that % async::blocked_by::nothing == ctx_a.state()); + + // Resuming ctx_a: await_resume sees cancelled_state, throws + // future::cancelled, caught inside consumer, step=99 + fa.resume(); + expect(that % 99 == step); + expect(that % async::blocked_by::nothing == ctx_a.state()); + expect(that % true == cancelled_caught); + expect(that % fa.done()); + expect(that % 0 == ctx_a.memory_used()); + }; +} + +int main() +{ + cross_context_await_test(); +} diff --git a/tests/run_until_done.test.cpp b/tests/run_until_done.test.cpp index ec9f653..59d5beb 100644 --- a/tests/run_until_done.test.cpp +++ b/tests/run_until_done.test.cpp @@ -108,6 +108,29 @@ void run_sleep_task_test() std::vector unblocked_contexts; std::mutex unblocked_mutex; + struct listener : public async::context_listener + { + std::vector& unblocked_list; + std::mutex& unblocked_mutex; + + listener(std::vector& p_unblocked_list, + std::mutex& p_unblocked_mutex) + : unblocked_list(p_unblocked_list) + , unblocked_mutex(p_unblocked_mutex) + { + } + + void on_unblock(async::context& p_context) noexcept override + { + if (p_context.state() == async::blocked_by::signal) { + std::lock_guard lock(unblocked_mutex); + unblocked_list.push_back(&p_context); + } + } + }; + + listener listen{ unblocked_contexts, unblocked_mutex }; + // Exercise async::run_until_done( clk, @@ -118,12 +141,7 @@ void run_sleep_task_test() std::println("Sleeping for {}", p_wake_time - clk.now()); std::this_thread::sleep_until(p_wake_time); }, - async::context_listener::from([&](async::context& p_ctx) noexcept { - if (p_ctx.state() == async::blocked_by::signal) { - std::lock_guard lock(unblocked_mutex); - unblocked_contexts.push_back(&p_ctx); - } - }), + std::move(listen), // List of tasks ctx0, ctx1,