/
HyperToIterator.pm6
115 lines (105 loc) · 3.57 KB
/
HyperToIterator.pm6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Forward declaration (stub): lets the code below refer to Backtrace before
# its full definition has been compiled. The real class body is not in this
# file (presumably it lives elsewhere in the core setting -- confirm).
my class Backtrace { ... }
# Role mixed into an exception that surfaced from a parallel (hyper or race)
# iteration. It remembers where that iteration was started, so the report can
# show both the starting point and the place where the worker actually died.
my role X::HyperRace::Died {
    # Location at which the parallel iteration was initiated.
    has $.start-backtrace;

    # Render the exception as: a header, the start location, then the
    # underlying exception's own gist indented by four spaces.
    multi method gist(::?CLASS:D:) {
        # Stringifying the stored location is wrapped in `try`; if that
        # yields an undefined value the placeholder text is used instead.
        my $origin = (try $!start-backtrace ~ "\n") // '<unknown location>';

        # Defer to the next gist candidate for the "Died at" part.
        my $cause = callsame().indent(4);

        "A worker in a parallel iteration (hyper or race) initiated here:\n"
            ~ $origin
            ~ "Died at:\n"
            ~ $cause
    }
}
# Collects the work batches of a parallel (hyper or race) iteration and exposes
# their items through the Iterator API.
#
# Batches are handed in via consume-batch, possibly out of sequence.  They are
# parked in $!waiting until every earlier batch has arrived, and are then sent,
# in sequence order, over a Channel to the consuming side (pull-one,
# skip-at-least, push-all).  An error handed in via consume-error fails that
# Channel, so the consumer meets it on a later receive.
my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    has int $!seen-last;     # 1 once a batch with a true .last was consumed
    has int $!offset;        # sequence number that maps to index 0 of $!waiting
    has $!batches;           # Channel carrying batches in sequence order
    has $!waiting;           # nqp list of batches not yet sent, by seq-nr - $!offset
    has $!current-items;     # items of the batch currently being drained

    # Shared empty buffer: the initial value of $!current-items and the
    # "replace with nothing" argument of nqp::splice in skip-at-least.
    my constant EMPTY_BUFFER = nqp::create(IterationBuffer);

    # Set up the channel, the re-ordering list and an empty current batch.
    submethod TWEAK() {
        $!batches       := Channel.new;
        $!waiting       := nqp::list;
        $!current-items := EMPTY_BUFFER;
    }

    # Accept one finished batch.  The batch is stored at the slot given by its
    # sequence number; every batch that is now contiguous from the front of
    # $!waiting is sent on the channel.  The channel is closed once the batch
    # flagged as last has been seen and nothing is left waiting.
    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        nqp::stmts(
          nqp::bindpos(                   # store the batch at its place
            $!waiting,
            nqp::sub_i($batch.sequence-number,$!offset),
            $batch
          ),
          nqp::until(                     # feed valid batches in order
            nqp::isnull(nqp::atpos($!waiting,0)),
            nqp::stmts(
              $!batches.send(nqp::shift($!waiting)),
              ($!offset = nqp::add_i($!offset,1))
            )
          ),
          nqp::if(                        # set flag we've seen last one
            $batch.last,
            ($!seen-last = 1)
          ),
          nqp::if(                        # close channel if we're done
            $!seen-last && nqp::not_i(nqp::elems($!waiting)),
            $!batches.close
          )
        )
    }

    # Accept an error: fail the channel with it so that a later receive on the
    # consuming side throws it.
    method consume-error(Exception $e --> Nil) {
        $!batches.fail($e);
    }

    # Return the next item, or IterationEnd once the channel has been closed
    # and drained.  Any other exception raised while receiving is rethrown,
    # tagged with X::HyperRace::Died.
    method pull-one() is raw {
        until nqp::elems($!current-items) { # handles empty batches
            $!current-items := $!batches.receive.items;
            # batch-used is not defined in this class (presumably supplied by
            # the HyperJoiner role -- confirm); called once per received batch.
            self.batch-used();
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                default {
                    # An exception that already carries the role must still
                    # propagate.  Merely falling out of this `default` block
                    # would mark it as handled, i.e. swallow it and make the
                    # loop retry the receive on the failed channel.
                    .rethrow if nqp::istype($_, X::HyperRace::Died);
                    # Kept inline (not in a helper) so that the frame offset
                    # given to Backtrace.new stays as in the original code.
                    ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow;
                }
            }
        }
        nqp::shift($!current-items)
    }

    # Skip $skipping items.  Returns 1 if that many items could be skipped and
    # 0 if the channel was closed before enough items were available.  Any
    # other exception is rethrown, tagged with X::HyperRace::Died.
    method skip-at-least(int $skipping) {
        my int $toskip = $skipping;

        while $toskip {
            # Enough items in the current batch: drop them and report success.
            if nqp::isge_i(nqp::elems($!current-items),$toskip) {
                nqp::splice($!current-items,EMPTY_BUFFER,0,$toskip);
                return 1;
            }

            # Otherwise the whole current batch is skipped; fetch the next one.
            $toskip = nqp::sub_i($toskip,nqp::elems($!current-items));
            $!current-items := $!batches.receive.items;
            self.batch-used();
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    return 0;
                }
                default {
                    # Same handling as in pull-one: never swallow an exception
                    # that is already tagged.
                    .rethrow if nqp::istype($_, X::HyperRace::Died);
                    ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow;
                }
            }
        }

        # Only reached when nothing had to be skipped, which always succeeds.
        1
    }

    # Append all remaining items to target and return IterationEnd once the
    # channel has been closed and drained.  Any other exception is rethrown,
    # tagged with X::HyperRace::Died.
    method push-all(\target) {
        loop {
            # First flush what is left of the current batch, then fetch more.
            target.append($!current-items);
            $!current-items := $!batches.receive.items;
            self.batch-used();
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                default {
                    # Same handling as in pull-one.  Swallowing here would
                    # also re-append the current batch on every retry.
                    .rethrow if nqp::istype($_, X::HyperRace::Died);
                    ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow;
                }
            }
        }
    }
}
# vim: ft=perl6 expandtab sw=4