Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement unchanged on supplies.
Waits for the value to have been stable for a certain amount of time
before passing it along. If it changes within that time, then the
timer is reset. Useful for any situation where you want the data to
stabalize before doing further work.
  • Loading branch information
jnthn committed Apr 23, 2014
1 parent e4508d9 commit 64f1110
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/core/Supply.pm
Expand Up @@ -103,6 +103,9 @@ my role Supply {
SupplyOperations.schedule_on(self, $scheduler);
}
method start(&startee) { SupplyOperations.start(self, &startee) }
method unchanged($time, :$scheduler = $*SCHEDULER) {
SupplyOperations.unchanged(self, $time, :$scheduler)
}
method merge(*@s) { SupplyOperations.merge(self, @s) }
method zip(*@s,:&with) { SupplyOperations.zip(self, @s, :&with) }

Expand Down
39 changes: 39 additions & 0 deletions src/core/SupplyOperations.pm
Expand Up @@ -403,6 +403,45 @@ my class SupplyOperations is repr('Uninstantiable') {
})
}

method unchanged(Supply $s, $time, :$scheduler = $*SCHEDULER) {
my class UnchangedSupply does Supply does PrivatePublishing {
has $!source;
has $!time;
has $!scheduler;
has $!lock;
has $!last_cancellation;

submethod BUILD(:$!source, :$!time, :$!scheduler) {
$!lock = Lock.new;
}

method tap(|c) {
my $source_tap;
my $sub = self.Supply::tap(|c, closing => { $source_tap.close() });
$source_tap = $!source.tap(
-> \val {
$!lock.protect({
if $!last_cancellation {
$!last_cancellation.cancel;
}
$!last_cancellation = $!scheduler.cue(
:in($time),
{
$!lock.protect({
$!last_cancellation = Nil;
});
self!more(val);
});
});
},
done => { self!done(); },
quit => -> $ex { self!quit($ex) });
$sub
}
}
UnchangedSupply.new(:source($s), :$time, :$scheduler);
}

method merge(*@s) {

@s.shift unless @s[0].DEFINITE; # lose if used as class method
Expand Down

0 comments on commit 64f1110

Please sign in to comment.