/
HyperWorkBuffer.pm
59 lines (52 loc) · 1.72 KB
/
HyperWorkBuffer.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# A HyperWorkBuffer represents a chunk of work to be processed as part of a
# parallelized operation (either thanks to hyper or race). It carries a
# sequence number, an input buffer (items to process), and an output buffer
# (results of processing them).
my class HyperWorkBuffer {
    # Position of this chunk in the overall sequence; writable by callers.
    has int $.sequence-number is rw;

    # Items waiting to be processed (an IterationBuffer once constructed).
    has $.input;

    # Results of processing (an IterationBuffer once constructed).
    has $.output;

    # Builds a work buffer with a fresh, empty IterationBuffer bound to each
    # of $!input and $!output. Takes no arguments.
    method new() {
        nqp::p6bindattrinvres(
            nqp::p6bindattrinvres(
                nqp::create(self),
                HyperWorkBuffer, '$!input', nqp::create(IterationBuffer)
            ),
            HyperWorkBuffer, '$!output', nqp::create(IterationBuffer)
        )
    }

    # Truncates both the input and the output buffer to zero elements. The
    # buffer objects themselves are kept and reused.
    method clear(--> Nil) {
        nqp::setelems($!output, 0);
        nqp::setelems($!input, 0);
    }

    # Exchanges the two buffers and then empties the new output buffer.
    # (This is used between pipelined stages: what one stage produced as
    # output becomes the input of the next stage.)
    method swap(--> Nil) {
        my $consumed := $!input;
        $!input  := $!output;
        $!output := $consumed;

        # The former input is the output now; drop its stale items.
        nqp::setelems($consumed, 0);
    }

    # Returns a new Iterator that walks the current input buffer from its
    # first element, yielding IterationEnd once every element was pulled.
    method input-iterator() {
        class :: does Iterator {
            # The buffer being walked.
            has $!buffer;

            # Index of the next element to hand out.
            has int $!i;

            # Creates an iterator over the given buffer.
            method new(\buffer) {
                my \iter = nqp::create(self);
                nqp::p6bindattrinvres(iter, self, '$!buffer', buffer)
            }

            # Yields the element at the current index and advances past it,
            # or yields IterationEnd when the index has reached the number
            # of elements in the buffer.
            method pull-one() {
                my int $pos = $!i;
                nqp::if(
                    nqp::islt_i($pos, nqp::elems($!buffer)),
                    nqp::stmts(
                        ($!i = nqp::add_i($pos, 1)),
                        nqp::atpos($!buffer, $pos)
                    ),
                    IterationEnd
                )
            }
        }.new($!input)
    }
}
# vim: ft=perl6 expandtab sw=4