Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Properly handle closing a tap in many built-ins.
This was a leftover NYI from when supplies were first added. Now you
can close a tap right at the end of a chain and it cascades all the
way up to the top. For example, if there's a Supply.interval at the
end of the chain, it will stop the underlying timer from producing
values.
  • Loading branch information
jnthn committed Apr 21, 2014
1 parent 544ed97 commit 2528d63
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
8 changes: 6 additions & 2 deletions src/core/Supply.pm
Expand Up @@ -9,6 +9,7 @@ my role Supply {
has &.more;
has &.done;
has &.quit;
has &.on_close;
has $.supply;
method close() {
$!supply.close(self)
Expand All @@ -18,8 +19,8 @@ my role Supply {
has @!tappers;
has $!tappers_lock = Lock.new;

method tap(&more, :&done, :&quit = {.die}) {
my $sub = Tap.new(:&more, :&done, :&quit, :supply(self));
method tap(&more, :&done, :&quit = {.die}, :&on_close) {
my $sub = Tap.new(:&more, :&done, :&quit, :&on_close, :supply(self));
$!tappers_lock.protect({
@!tappers.push($sub);
});
Expand All @@ -30,6 +31,9 @@ my role Supply {
$!tappers_lock.protect({
@!tappers .= grep(* !=== $t);
});
if $t.on_close -> &on_close {
on_close();
}
}

method tappers() {
Expand Down
48 changes: 29 additions & 19 deletions src/core/SupplyOperations.pm
Expand Up @@ -38,15 +38,17 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:@!values, :$!scheduler) {}

method tap(|c) {
my $sub = self.Supply::tap(|c);
my $closed = False;
my $sub = self.Supply::tap(|c, on_close => { $closed = True });
$!scheduler.cue(
{
for @!values -> \val {
last if $closed;
$sub.more().(val);
}
if $sub.done -> $l { $l() }
if !$closed && $sub.done -> $l { $l() }
},
:catch(-> $ex { if $sub.quit -> $t { $t($ex) } })
:catch(-> $ex { if !$closed && $sub.quit -> $t { $t($ex) } })
);
$sub
}
Expand All @@ -63,8 +65,9 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!scheduler, :$!interval, :$!delay) {}

method tap(|c) {
my $sub = self.Supply::tap(|c);
$!scheduler.cue(
my $cancellation;
my $sub = self.Supply::tap(|c, on_close => { $cancellation.cancel() });
$cancellation = $!scheduler.cue(
{
state $i = 0;
$sub.more().($i++);
Expand All @@ -84,8 +87,9 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
$!source.tap( -> \val {
my $source_tap;
my $sub = self.Supply::tap(|c, on_close => { $source_tap.close() });
$source_tap = $!source.tap( -> \val {
self!more(val.flat)
},
done => { self!done(); },
Expand All @@ -110,8 +114,9 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :&!filter) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
$!source.tap( -> \val {
my $source_tap;
my $sub = self.Supply::tap(|c, on_close => { $source_tap.close() });
$source_tap = $!source.tap( -> \val {
if (&!filter(val)) { self!more(val) }
},
done => { self!done(); },
Expand All @@ -132,7 +137,8 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :&!as, :&!with) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
my $source_tap;
my $sub = self.Supply::tap(|c, on_close => { $source_tap.close() });
my &more = do {
if &!with and &!with !=== &[===] {
my @seen; # should be Mu, but doesn't work in settings
Expand Down Expand Up @@ -172,7 +178,7 @@ my class SupplyOperations is repr('Uninstantiable') {
};
}
};
$!source.tap( &more,
$source_tap = $!source.tap( &more,
done => { self!done(); },
quit => -> $ex { self!quit($ex) }
);
Expand All @@ -192,7 +198,8 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :&!as, :&!with) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
my $source_tap;
my $sub = self.Supply::tap(|c, on_close => { $source_tap.close() });
my &more = do {
my Mu $last = @secret;
my Mu $target;
Expand All @@ -211,7 +218,7 @@ my class SupplyOperations is repr('Uninstantiable') {
}
};
};
$!source.tap( &more,
$source_tap = $!source.tap( &more,
done => { self!done(); },
quit => -> $ex { self!quit($ex) }
);
Expand All @@ -229,8 +236,9 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :&!mapper) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
$!source.tap( -> \val {
my $source_tap;
my $sub = self.Supply::tap(|c, on_close => { $source_tap.close() });
$source_tap = $!source.tap( -> \val {
self!more(&!mapper(val))
},
done => { self!done(); },
Expand All @@ -255,15 +263,16 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :$!elems, :$!overlap) { }

method tap(|c) {
my $tap = self.Supply::tap(|c);
my $source_tap;
my $tap = self.Supply::tap(|c, on_close => { $source_tap.close() });

my @batched;
sub flush {
self!more([@batched]);
@batched.splice( 0, +@batched - $!overlap );
}

$!source.tap( -> \val {
$source_tap = $!source.tap( -> \val {
@batched.push: val;
flush if @batched.elems == $!elems;
},
Expand All @@ -290,7 +299,8 @@ my class SupplyOperations is repr('Uninstantiable') {
submethod BUILD(:$!source, :$!elems, :$!seconds) { }

method tap(|c) {
my $tap = self.Supply::tap(|c);
my $source_tap;
my $tap = self.Supply::tap(|c, on_close => { $source_tap.close() });

my @batched;
my $last_time;
Expand Down Expand Up @@ -334,7 +344,7 @@ my class SupplyOperations is repr('Uninstantiable') {
}
}
}
$!source.tap( &more,
$source_tap = $!source.tap( &more,
done => {
flush if @batched;
self!done();
Expand Down

0 comments on commit 2528d63

Please sign in to comment.