Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement migrate on supplies.
Works on a supply of supplies. Taps the first one that arrives, and
passes on its values. When another one arrives, closes the tap on the
first and starts passing on values from the second, and so forth. Very
useful for preventing race conditions when you have supplies that
represent the latest work needed, and want to ignore previous work
set in motion (e.g. auto-search text box that does some work to make
suggestions).
  • Loading branch information
jnthn committed Apr 23, 2014
1 parent 8983aa3 commit 61af179
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/core/Supply.pm
Expand Up @@ -106,6 +106,7 @@ my role Supply {
method unchanged($time, :$scheduler = $*SCHEDULER) {
SupplyOperations.unchanged(self, $time, :$scheduler)
}
method migrate() { SupplyOperations.migrate(self) }
method merge(*@s) { SupplyOperations.merge(self, @s) }
method zip(*@s,:&with) { SupplyOperations.zip(self, @s, :&with) }

Expand Down
30 changes: 30 additions & 0 deletions src/core/SupplyOperations.pm
Expand Up @@ -442,6 +442,36 @@ my class SupplyOperations is repr('Uninstantiable') {
UnchangedSupply.new(:source($s), :$time, :$scheduler);
}

method migrate(Supply $s) {
my class MigrateSupply does Supply does PrivatePublishing {
has $!source;
has $!current;
has $!lock;

submethod BUILD(:$!source) {
$!lock = Lock.new;
}

method tap(|c) {
my $source_tap;
my $sub = self.Supply::tap(|c, closing => { $source_tap.close() });
$source_tap = $!source.tap(
-> \inner_supply {
$!lock.protect({
$!current.close() if $!current;
$!current = inner_supply.tap(-> \val {
self!more(val);
});
});
},
done => { self!done(); },
quit => -> $ex { self!quit($ex) });
$sub
}
}
MigrateSupply.new(:source($s))
}

method merge(*@s) {

@s.shift unless @s[0].DEFINITE; # lose if used as class method
Expand Down

0 comments on commit 61af179

Please sign in to comment.