/
Publish.pm
50 lines (44 loc) · 1.46 KB
/
Publish.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Various generators and combinators are provided by Publish.
my class Publish {
method for(*@values, :$scheduler = $*SCHEDULER) {
my class ForSupply does Supply {
has @!values;
has $!scheduler;
submethod BUILD(:@!values, :$!scheduler) {}
method tap(|c) {
my $sub = self.Supply::tap(|c);
$!scheduler.cue(
{
for @!values -> \val {
$sub.next().(val);
}
if $sub.last -> $l { $l() }
},
:catch(-> $ex { if $sub.fail -> $t { $t($ex) } })
);
$sub
}
}
ForSupply.new(:@values, :$scheduler)
}
method interval($interval, $delay = 0, :$scheduler = $*SCHEDULER) {
my class IntervalSupply does Supply {
has $!scheduler;
has $!interval;
has $!delay;
submethod BUILD(:$!scheduler, :$!interval, :$!delay) {}
method tap(|c) {
my $sub = self.Supply::tap(|c);
$!scheduler.cue(
{
state $i = 0;
$sub.next().($i++);
},
:every($!interval), :in($!delay)
);
$sub
}
}
IntervalSupply.new(:$interval, :$delay, :$scheduler)
}
}