// rendez.cc
#include "ten/task/rendez.hh"
#include "proc.hh"
#include <mutex>
namespace ten {
void rendez::sleep(std::unique_lock<qutex> &lk) {
DCHECK(lk.owns_lock()) << "must own lock before calling rendez::sleep";
task *t = this_task();
{
std::lock_guard<std::timed_mutex> ll(_m);
DCHECK(std::find(_waiting.begin(), _waiting.end(), t) == _waiting.end())
<< "BUG: " << t << " already waiting on rendez " << this;
DVLOG(5) << "RENDEZ[" << this << "] PUSH BACK: " << t;
_waiting.push_back(t);
}
// must hold the lock until we're in the waiting list
// otherwise another thread might modify the condition and
// call wakeup() and waiting would be empty so we'd sleep forever
lk.unlock();
try
{
{
task::cancellation_point cancellable;
t->swap();
}
lk.lock();
} catch (...) {
{
std::lock_guard<std::timed_mutex> ll(_m);
auto i = std::find(_waiting.begin(), _waiting.end(), t);
if (i != _waiting.end()) {
_waiting.erase(i);
}
}
lk.lock();
throw;
}
}
void rendez::wakeup() {
task *t = nullptr;
{
std::lock_guard<std::timed_mutex> lk(_m);
if (!_waiting.empty()) {
t = _waiting.front();
_waiting.pop_front();
}
}
DVLOG(5) << "RENDEZ[" << this << "] " << this_task() << " wakeup: " << t;
if (t) t->ready();
}
void rendez::wakeupall() {
tasklist waiting;
{
std::lock_guard<std::timed_mutex> lk(_m);
std::swap(waiting, _waiting);
}
for (auto t : waiting) {
DVLOG(5) << "RENDEZ[" << this << "] " << this_task() << " wakeupall: " << t;
t->ready();
}
}
// Disabled (compiled out by `#if 0`): a timed variant of rendez::sleep().
// NOTE(review): this code does not match the live implementation in this file.
// It uses `waiting` rather than `_waiting`, obtains the current task through
// `this_proc()->ctask` rather than `this_task()`, uses unqualified
// `unique_lock`/`find`, and never takes `_m`. It would need to be ported
// before it could be re-enabled.
#if 0
bool rendez::sleep_for(unique_lock<qutex> &lk, unsigned int ms) {
task *t = this_proc()->ctask;
// register as a waiter unless this task is already in the list
if (find(waiting.begin(), waiting.end(), t) == waiting.end()) {
DVLOG(5) << "RENDEZ SLEEP PUSH BACK: " << t;
waiting.push_back(t);
}
lk.unlock();
// arm a timeout of `ms` for this task before switching out
// (presumably milliseconds, judging by the parameter name; confirm against the scheduler)
this_proc()->sched().add_timeout(t, ms);
t->swap();
lk.lock();
// the timeout is removed once the task is running again
this_proc()->sched().del_timeout(t);
// if we're not in the waiting list then we were signaled to wakeup
return find(waiting.begin(), waiting.end(), t) == waiting.end();
}
#endif
// Destroying a rendez that still has waiters is a bug; debug builds check that
// the wait list is empty (under _m) and report the remaining tasks otherwise.
rendez::~rendez() {
    std::lock_guard<std::timed_mutex> guard(_m);
    // Make the global-namespace operator<< overloads visible here; presumably
    // this is what allows `_waiting` to be streamed into the message below.
    using ::operator<<;
    DCHECK(_waiting.empty()) << "BUG: still waiting: " << _waiting;
}
} // namespace