/
CoreJob.cpp
143 lines (120 loc) · 3.97 KB
/
CoreJob.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright (C) 2012-2016 Leap Motion, Inc. All rights reserved.
#include "stdafx.h"
#include "CoreJob.h"
#include "CoreContext.h"
#include FUTURE_HEADER
using namespace autowiring;
// Arm doesn't have std::future, but does have std::chrono. We need to convert from std::chrono
// to autoboost::chrono when passing arguments to "std::future"(alias to autoboost::future) on arm.
#if __ANDROID__ && !GCC_CHECK(4, 9)
autoboost::chrono::nanoseconds NanosecondsForFutureWait(const std::chrono::nanoseconds& time) {
return autoboost::chrono::nanoseconds(time.count());
}
#else
std::chrono::nanoseconds NanosecondsForFutureWait(const std::chrono::nanoseconds& time) {
return time;
}
#endif
CoreJob::CoreJob(const char* name) :
ContextMember(name)
{}
CoreJob::~CoreJob(void)
{}
void CoreJob::OnPended(std::unique_lock<std::mutex>&& lk){
if(!m_curEventInTeardown) {
// Something is already outstanding, it will handle dispatching for us.
return;
}
if(!m_running) {
// Nothing to do, we aren't running yet--just hold on to this entry until we are
// ready to initiate it.
return;
}
// Increment outstanding count because we now have an entry out in a thread pool
auto outstanding = GetOutstanding();
if(!outstanding) {
// We're currently signalled to stop, we must empty the queue and then
// return here--we can't accept dispatch delivery on a stopped queue.
for (auto cur = m_pHead; cur;) {
auto next = cur->m_pFlink;
delete cur;
cur = next;
}
} else {
// Need to ask the thread pool to handle our events again:
m_curEventInTeardown = false;
std::future<void>* future = static_cast<std::future<void>*>(std::atomic_exchange<void*>(&m_curEvent, nullptr));
if (future) {
delete future;
}
m_curEvent = new std::future<void>(
std::async(
std::launch::async,
[this, outstanding] () mutable {
this->DispatchAllAndClearCurrent();
outstanding.reset();
}
));
}
}
void CoreJob::DispatchAllAndClearCurrent(void) {
CurrentContextPusher pshr(GetContext());
for(;;) {
// Trivially run down the queue as long as we're in the pool:
this->DispatchAllEvents();
// Check the size of the queue. Could be that someone added something
// between when we finished looping, and when we obtained the lock, and
// we don't want to exit our pool if that has happened.
std::lock_guard<std::mutex> lk(m_dispatchLock);
if(AreAnyDispatchersReady())
continue;
// Indicate that we're tearing down and will be done very soon. This is
// a signal to consumers that a call to m_curEvent.wait() will be nearly
// non-blocking.
m_curEventInTeardown = true;
break;
}
m_queueUpdated.notify_all();
}
bool CoreJob::OnStart(void) {
std::shared_ptr<CoreContext> context = m_context.lock();
if(!context) {
return false;
}
m_running = true;
std::unique_lock<std::mutex> lk(m_dispatchLock);
if(m_pHead)
// Simulate a pending event, because we need to set up our async:
OnPended(std::move(lk));
return true;
}
void CoreJob::Abort(void) {
DispatchQueue::Abort();
m_running = false;
}
void CoreJob::OnStop(bool graceful) {
if(graceful) {
// Pend a call which will invoke Abort once the dispatch queue is done:
DispatchQueue::Pend(
[this] {this->Abort();}
);
} else {
// Abort the dispatch queue so anyone waiting will wake up
Abort();
}
}
void CoreJob::DoAdditionalWait(void) {
std::future<void>* future = static_cast<std::future<void>*>(std::atomic_exchange<void*>(&m_curEvent, nullptr));
if (future) {
future->wait();
delete future;
}
}
bool CoreJob::DoAdditionalWait(std::chrono::nanoseconds timeout) {
std::future<void>* future = static_cast<std::future<void>*>(std::atomic_exchange<void*>(&m_curEvent, nullptr));
if (!future)
return true;
const auto status = future->wait_for(NanosecondsForFutureWait(timeout));
delete future;
return status == std::future_status::ready;
}