-
Notifications
You must be signed in to change notification settings - Fork 316
/
Copy pathsingle_consumer_event.cpp
82 lines (58 loc) · 2.38 KB
/
single_consumer_event.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <userver/engine/single_consumer_event.hpp>
#include <engine/impl/wait_list_light.hpp>
#include <engine/task/task_context.hpp>
#include <userver/logging/log.hpp>
#include <userver/utils/assert.hpp>
USERVER_NAMESPACE_BEGIN
namespace engine {
class SingleConsumerEvent::EventWaitStrategy final : public impl::WaitStrategy {
public:
EventWaitStrategy(SingleConsumerEvent& event, impl::TaskContext& current) : event_(event), current_(current) {}
impl::EarlyWakeup SetupWakeups() override {
return impl::EarlyWakeup{event_.waiters_->GetSignalOrAppend(¤t_)};
}
void DisableWakeups() noexcept override { event_.waiters_->Remove(current_); }
private:
SingleConsumerEvent& event_;
impl::TaskContext& current_;
};
SingleConsumerEvent::SingleConsumerEvent() noexcept = default;
SingleConsumerEvent::SingleConsumerEvent(NoAutoReset) noexcept : is_auto_reset_(false) {}
SingleConsumerEvent::~SingleConsumerEvent() = default;
bool SingleConsumerEvent::IsAutoReset() const noexcept { return is_auto_reset_; }
bool SingleConsumerEvent::WaitForEvent() { return WaitForEventUntil(Deadline{}); }
bool SingleConsumerEvent::WaitForEventUntil(Deadline deadline) {
if (GetIsSignaled()) {
return true; // optimistic path
}
impl::TaskContext& current = current_task::GetCurrentTaskContext();
LOG_TRACE() << "WaitForEventUntil()";
EventWaitStrategy wait_manager{*this, current};
while (true) {
if (GetIsSignaled()) {
LOG_TRACE() << "success";
return true;
}
LOG_TRACE() << "iteration()";
const auto wakeup_source = current.Sleep(wait_manager, deadline);
if (!impl::HasWaitSucceeded(wakeup_source)) {
LOG_TRACE() << "failure";
return false;
}
}
}
void SingleConsumerEvent::Reset() noexcept { waiters_->GetAndResetSignal(); }
void SingleConsumerEvent::Send() { waiters_->SetSignalAndWakeupOne(); }
bool SingleConsumerEvent::IsReady() const noexcept { return waiters_->IsSignaled(); }
bool SingleConsumerEvent::GetIsSignaled() noexcept {
if (is_auto_reset_) {
return waiters_->GetAndResetSignal();
} else {
return waiters_->IsSignaled();
}
}
void SingleConsumerEvent::CheckIsAutoResetForWaitPredicate() {
UINVARIANT(IsAutoReset(), "Wait with predicate requires auto-reset functionality");
}
} // namespace engine
USERVER_NAMESPACE_END