Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement io_service::process_events_until_complete #69

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,9 @@ namespace cppcoro
std::uint64_t process_one_event();
std::uint64_t process_one_pending_event();

template<typename TASK>
decltype(auto) process_events_until_complete(TASK&& task);

// Request that all threads processing events exit their event loops.
void stop() noexcept;

Expand Down
4 changes: 2 additions & 2 deletions include/cppcoro/async_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ namespace cppcoro
// State transition diagram
// VNRCA - value_not_ready_consumer_active
// VNRCS - value_not_ready_consumer_suspended
// VRPA - value_ready_consumer_active
// VRPS - value_ready_consumer_suspended
// VRPA - value_ready_producer_active
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :)

// VRPS - value_ready_producer_suspended
//
// A +--- VNRCA --[C]--> VNRCS yield_value()
// | | | A | A | .
Expand Down
27 changes: 27 additions & 0 deletions include/cppcoro/io_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ namespace cppcoro
const std::chrono::duration<REP, PERIOD>& delay,
cancellation_token cancellationToken = {}) noexcept;

/// Process events until the task completes.
///
/// \return
/// Result of the co_await task.
template<typename TASK>
decltype(auto) process_events_until_complete(TASK&& task);

/// Process events until the io_service is stopped.
///
/// \return
Expand Down Expand Up @@ -178,6 +185,26 @@ namespace cppcoro

};

template<typename TASK>
decltype(auto) io_service::process_events_until_complete(TASK&& task)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation should work just fine for the cases where there is only a single thread processing events, but if there are multiple threads processing events then calling stop() when the task completes can potentially lead those other threads to exit their event loops too (whether or not they do actually exit is racy since the waiting thread calls reset() without necessarily waiting for all other threads to exit their loops). Ideally only the current thread would exit the event loop in this case.

To implement support for this behaviour would require integration into the core event loop within io_service to allow it to check the status of the task/awaitable after processing each IOCP event. It would also likely need to use a short timeout on the call to GetQueuedCompletionStatus() to allow it to poll for the task/awaitable potentially completing on another thread.

It will also need to change a little once the generic_ops branch is merged as the [shared_]task<T>::get_starter() methods and cppcoro::detail::continuation type have both been removed in that work. I'll update the implementation as appropriate when merging this work into the generic_ops branch.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, I'm happy with the implementation as it is for now.

I'll think further about whether it makes sense to limit its use to the single-threaded event processing case or to use a more complicated implementation that allows for boost-blocking use-cases with multiple I/O threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to make it work with multiple threads. I think it may be possible to do it without polling. FWIW I think that this function is semantically similar to sync_wait, so if one is not limited to a single thread, another shouldn't be as well.

{
if (!task.is_ready())
{
auto callback = [](void* io) noexcept
{
static_cast<io_service*>(io)->stop();
};

auto starter = task.get_starter();
starter.start(cppcoro::detail::continuation{ callback, this });

process_events();
reset();
}

return std::forward<TASK>(task).operator co_await().await_resume();
}

class io_service::schedule_operation
{
public:
Expand Down
36 changes: 36 additions & 0 deletions test/io_service_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,40 @@ TEST_CASE_FIXTURE(io_service_fixture_with_threads<1>, "Many concurrent timers")
<< "ms");
}

TEST_CASE("io_service::process_events_until_complete(task<T>)")
{
cppcoro::io_service ioService;

auto makeTask = [](cppcoro::io_service& io) -> cppcoro::task<std::string>
{
co_await io.schedule();
co_return "foo";
};

auto task = makeTask(ioService);

CHECK(ioService.process_events_until_complete(task) == "foo");
CHECK(task.is_ready());
CHECK(ioService.process_events_until_complete(task) == "foo");
CHECK(ioService.process_events_until_complete(makeTask(ioService)) == "foo");
}

TEST_CASE("io_service::process_events_until_complete(shared_task<T>)")
{
cppcoro::io_service ioService;

auto makeTask = [](cppcoro::io_service& io) -> cppcoro::shared_task<std::string>
{
co_await io.schedule();
co_return "foo";
};

auto task = makeTask(ioService);

CHECK(ioService.process_events_until_complete(task) == "foo");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be able to add an extra test in here for passing the already-completed task into process_event_until_complete() to test the code-path in the implementation where task.is_ready() returns true?

CHECK(task.is_ready());
CHECK(ioService.process_events_until_complete(task) == "foo");
CHECK(ioService.process_events_until_complete(makeTask(ioService)) == "foo");
}

TEST_SUITE_END();