-
Notifications
You must be signed in to change notification settings - Fork 311
/
Copy pathwait_all_checked.cpp
72 lines (58 loc) · 2.28 KB
/
wait_all_checked.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <userver/engine/wait_all_checked.hpp>
#include <algorithm>
#include <engine/impl/wait_any_utils.hpp>
#include <engine/task/task_context.hpp>
#include <userver/engine/exception.hpp>
USERVER_NAMESPACE_BEGIN
namespace engine::impl {
/// Suspends the current task until every non-null entry of `targets` reports
/// ready, the deadline timer fires, or a cancellation request wakes the task.
///
/// Each target observed as ready has RethrowErrorResult() called on it
/// (presumably propagating that target's stored exception, if any - confirm
/// against ContextAccessor) and its slot in `targets` is then overwritten with
/// nullptr. The span therefore acts as an in/out parameter: when the function
/// returns kTimeout or kCancelled, the entries that are still non-null are the
/// ones not yet seen as ready.
///
/// @param targets  accessors to wait on; null entries are skipped; entries are
///                 nulled out as they complete
/// @param deadline passed through to TaskContext::Sleep on every wait round
/// @returns FutureStatus::kReady once all entries have completed,
///          FutureStatus::kTimeout on a deadline-timer wakeup,
///          FutureStatus::kCancelled on a cancel-request wakeup
FutureStatus DoWaitAllChecked(utils::span<ContextAccessor*> targets, Deadline deadline) {
    // The diagnostic names this entry point (not WaitAny*), so a failed
    // assertion points at the right call site.
    UASSERT_MSG(AreUniqueValues(targets), "Same tasks/futures were detected in WaitAllChecked call");
    auto& current = current_task::GetCurrentTaskContext();

    WaitAnyWaitStrategy wait_strategy{targets, current};
    while (true) {
        bool all_completed = true;
        // Iterate by reference: completed slots are cleared in the caller's span.
        for (auto& target : targets) {
            if (!target) continue;  // already handled in an earlier round (or null on entry)

            const bool is_ready = target->IsReady();
            if (is_ready) {
                target->RethrowErrorResult();
                target = nullptr;
            }

            all_completed &= is_ready;
        }
        if (all_completed) break;

        auto sleep_status = current.Sleep(wait_strategy, deadline);

        // Notify every still-pending target that the wait round is over before
        // acting on the wakeup reason, so early returns below do not skip it.
        for (const auto& target : targets) {
            if (target) target->AfterWait();
        }

        switch (sleep_status) {
            case TaskContext::WakeupSource::kWaitList:
                // Woken through the wait list: re-scan the targets.
                break;
            case TaskContext::WakeupSource::kDeadlineTimer:
                return FutureStatus::kTimeout;
            case TaskContext::WakeupSource::kCancelRequest:
                return FutureStatus::kCancelled;
            case TaskContext::WakeupSource::kNone:
                UASSERT_MSG(false, "Unexpected WakeupSource::kNone");
                break;
            case TaskContext::WakeupSource::kBootstrap:
                UASSERT_MSG(false, "Unexpected WakeupSource::kBootstrap");
                break;
        }
    }

    // Reaching this point means every slot was cleared by the scan above.
    UASSERT(std::all_of(targets.begin(), targets.end(), [](auto* target) { return !target; }));
    return FutureStatus::kReady;
}
/// Converts the status of a WaitAll performed without a deadline into its
/// caller-visible effect.
///
/// - kReady: returns normally.
/// - kCancelled: throws WaitInterruptedException built from the current
///   task's cancellation reason.
/// - kTimeout: not expected without a deadline, so it trips an assertion and
///   otherwise returns normally.
///
/// @param status result previously produced by the wait routine
/// @throws WaitInterruptedException if `status` is FutureStatus::kCancelled
void HandleWaitAllStatus(FutureStatus status) {
    if (status == FutureStatus::kReady) {
        return;
    }

    if (status == FutureStatus::kCancelled) {
        auto& context = current_task::GetCurrentTaskContext();
        throw WaitInterruptedException(context.CancellationReason());
    }

    if (status == FutureStatus::kTimeout) {
        // No deadline was supplied by the caller, so a timeout indicates a logic error.
        UASSERT_MSG(false, "Got timeout on a WaitAll without Deadline");
    }
}
} // namespace engine::impl
USERVER_NAMESPACE_END