Add when_any() for waiting for at least one task to complete #11
A somewhat cumbersome way of implementing the `when_any()` pattern, such that the first operation to complete is a "winner" and the others are "losers" and should be cancelled:

```cpp
task<> alternative1(cancellation_token ct);
task<> alternative2(cancellation_token ct);

task<> cancel_on_success(task<> t, cancellation_source cs)
{
    co_await t;
    cs.request_cancellation();
}

task<> do_1_or_2(cancellation_token ct)
{
    cancellation_source source;
    cancellation_registration cancellationForwarder{ ct, [&source] { source.request_cancellation(); } };
    co_await when_all(
        cancel_on_success(alternative1(source.token()), source),
        cancel_on_success(alternative2(source.token()), source));
}
```

This will cancel both sub-tasks if the `cancellation_token` passed in has cancellation requested.
The other main use-case is processing each task as it completes. Something like this (not thoroughly thought out):

```cpp
task<std::string> get_record(int id);

task<> example()
{
    std::vector<task<std::string>> tasks;
    for (int i = 0; i < 100; ++i) tasks.push_back(get_record(i));

    while (!tasks.empty())
    {
        std::size_t readyIndex = co_await when_any(tasks);
        std::cout << co_await tasks[readyIndex] << std::endl;
        tasks.erase(tasks.begin() + readyIndex);
    }
}
```

However, I feel that something like this could be handled just as well (and possibly more efficiently) using:

```cpp
task<std::string> get_record(int id);

task<> example()
{
    async_mutex mutex;

    auto handleRecord = [&](int id) -> task<>
    {
        auto result = co_await get_record(id);
        {
            auto lock = co_await mutex.scoped_lock_async();
            std::cout << result << std::endl;
        }
    };

    std::vector<task<>> tasks;
    for (int i = 0; i < 100; ++i) tasks.push_back(handleRecord(i));
    co_await when_all(std::move(tasks));
}
```
If you just want to check whether a task completed within a certain time, then you can just query the time both before and after the task completes and check the total time taken. If you want to cancel the operation after a certain timeout has elapsed, then you can use:

```cpp
cppcoro::task<int> foo(cppcoro::cancellation_token ct);

cppcoro::task<int> foo_with_timeout(
    cppcoro::io_service& ioSvc,
    std::chrono::milliseconds timeout)
{
    cppcoro::cancellation_source src;
    auto [result, unused] = co_await cppcoro::when_all(
        [&]() -> cppcoro::task<int> {
            auto cancelOnExit = cppcoro::on_scope_exit([&] { src.request_cancellation(); });
            co_return co_await foo(src.token());
        }(),
        [&]() -> cppcoro::task<void> {
            auto cancelOnExit = cppcoro::on_scope_exit([&] { src.request_cancellation(); });
            co_await ioSvc.schedule_after(timeout);
        }());
    co_return result;
}
```

This will then need the operation to wait for the timer to be cancelled before it can return a result, however. You may be able to use the …
The difficult part of designing `when_any()` will be how to handle cancellation of the `co_await` operations of the other tasks.

Currently, the `task<T>` and `shared_task<T>` types don't allow the caller to cancel the `co_await` operation once it has been awaited. We need to wait for the task to complete before the awaiting coroutine returns.

If the tasks themselves are cancellable, we could hook something up using `cancellation_token`s. eg. If we pass the same `cancellation_token` into each task, then concurrently await all of the tasks, and when any task completes, call `request_cancellation()` on the `cancellation_source` to request that the other tasks cancel promptly. Then we could just use `when_all()` to wait for all of the tasks.

To do this more generally, we'd need to be able to cancel the await operation on a task without necessarily cancelling the task itself. This would require a different data-structure in the promise object for keeping track of awaiters, to allow unsubscribing an awaiter from that list in a lock-free way. Maybe consider a similar data-structure to that used by `cancellation_registration`?