/
Awaiter.pm6
115 lines (105 loc) · 3.92 KB
/
Awaiter.pm6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Interface for an awaiting strategy. Both methods are stubs (their bodies
# are `...`), so any class composing this role has to provide them.
my role Awaiter {
# Await a single defined Awaitable.
method await(Awaitable:D $a) { ... }
# Await the items of a defined Iterable. The blocking implementation in this
# file requires every item to be a defined Awaitable.
method await-all(Iterable:D $i) { ... }
}
# Awaiter that waits by blocking the calling thread until the awaited
# outcome(s) have been delivered.
my class Awaiter::Blocking does Awaiter {
    # Await one Awaitable. Evaluates to its result on success; on failure the
    # reported cause is rethrown.
    method await(Awaitable:D $a) {
        my $await-handle := $a.get-await-handle;
        if !$await-handle.already {
            # Outcome not known yet: subscribe, then park on a semaphore that
            # the subscription callback releases once it has stored the
            # outcome.
            my $done = Semaphore.new(0);
            my $ok;
            my $payload;
            $await-handle.subscribe-awaiter(-> \succeeded, \outcome {
                $ok      := succeeded;
                $payload := outcome;
                $done.release;
            });
            $done.acquire;
            if $ok { $payload } else { $payload.rethrow }
        }
        elsif $await-handle.success {
            $await-handle.result
        }
        else {
            $await-handle.cause.rethrow
        }
    }

    # Await every item of the given Iterable and produce a List of results in
    # the same order as the items. Dies if an item is not a defined Awaitable,
    # and rethrows the first failure that is observed.
    method await-all(Iterable:D \i) {
        # First pass: store results that are available right away; for the
        # rest, remember the handle together with the position its result
        # has to be written to.
        my \collected = nqp::list();
        my \pending   = nqp::list();
        my \slots     = nqp::list_i();
        my int $slot  = 0;
        my $has-slip  = False;
        for i -> $awaitable {
            die "Can only specify Awaitable objects to await (got a $awaitable.^name())"
                unless nqp::istype($awaitable, Awaitable);
            die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())"
                unless nqp::isconcrete($awaitable);

            my $handle := $awaitable.get-await-handle;
            if !$handle.already {
                nqp::push(pending, $handle);
                nqp::push_i(slots, $slot);
            }
            elsif $handle.success {
                my \ready = $handle.result;
                nqp::bindpos(collected, $slot, ready);
                $has-slip = True if nqp::istype(ready, Slip);
            }
            else {
                $handle.cause.rethrow
            }
            ++$slot;
        }

        # Second pass, only when something is still outstanding: block using
        # a lock plus condition variable. All writes to the shared state made
        # by the callbacks happen while holding the lock.
        my int $pending-count = nqp::elems(pending);
        if $pending-count {
            my $failure  = Mu;
            my $lock     = Lock.new;
            my $all-done = $lock.condition();
            my int $outstanding = $pending-count;
            loop (my int $n = 0; $n < $pending-count; ++$n) {
                my $handle := nqp::atpos(pending, $n);
                my int $target = nqp::atpos_i(slots, $n);
                $handle.subscribe-awaiter(-> \succeeded, \outcome {
                    $lock.protect: {
                        if succeeded && $outstanding {
                            nqp::bindpos(collected, $target, outcome);
                            $has-slip = True if nqp::istype(outcome, Slip);
                            --$outstanding;
                            $all-done.signal if $outstanding == 0;
                        }
                        elsif !nqp::isconcrete($failure) {
                            # Only the first failure is kept; zeroing the
                            # counter releases the waiter immediately.
                            $failure := outcome;
                            $outstanding = 0;
                            $all-done.signal;
                        }
                    }
                });
            }

            # Wait until nothing is outstanding. The counter is re-checked
            # under the lock on every pass to cope with spurious wakeups.
            loop {
                $lock.protect: {
                    last unless $outstanding;
                    $all-done.wait;
                }
            }

            # Propagate a recorded failure, if there was one.
            if nqp::isconcrete($failure) {
                $failure.rethrow
            }
        }

        # Wrap the low-level list in a List. When any result was a Slip, map
        # through an identity block so that Slip results flatten into the
        # final List.
        my \assembled = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', collected);
        if $has-slip {
            assembled.map(-> \val { val }).List
        }
        else {
            assembled
        }
    }
}
# Bind the blocking awaiter type object into the PROCESS package under the
# name $AWAITER. NOTE(review): presumably this is the process-wide default
# that lookups of the dynamic variable $*AWAITER fall back to; confirm
# against the callers of $*AWAITER.
PROCESS::<$AWAITER> := Awaiter::Blocking;
# vim: ft=perl6 expandtab sw=4