Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement start method on supplies.
Takes a closure and, for each supplied value, schedules the closure to
run on another thread. It then more's a Supply (resulting in us having
a supply of supplies) that will either have a single value more'd and
then be done if the async work completes successfully, or quit if the
work fails. Useful for kicking off work on the thread pool if you do
not want to block up the thread pushing values at you (maybe 'cus you
are reacting to UI events, but have some long-running work to kick
off).
  • Loading branch information
jnthn committed Apr 23, 2014
1 parent 631f02e commit e4508d9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/core/Supply.pm
Expand Up @@ -102,6 +102,7 @@ my role Supply {
method schedule_on(Scheduler $scheduler) {
SupplyOperations.schedule_on(self, $scheduler);
}
method start(&startee) { SupplyOperations.start(self, &startee) }
method merge(*@s) { SupplyOperations.merge(self, @s) }
method zip(*@s,:&with) { SupplyOperations.zip(self, @s, :&with) }

Expand Down
26 changes: 26 additions & 0 deletions src/core/SupplyOperations.pm
Expand Up @@ -377,6 +377,32 @@ my class SupplyOperations is repr('Uninstantiable') {
ScheduleSupply.new(:source($s), :$scheduler)
}

method start(Supply $s, &startee) {
my class StartSupply does Supply does PrivatePublishing {
has $!value;
has &!startee;

submethod BUILD(:$!value, :&!startee) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
Promise.start({ &!startee($!value) }).then({
if .status == Kept {
self!more(.result);
self!done();
}
else {
self!quit(.cause);
}
});
$sub
}
}
self.map($s, -> \value {
StartSupply.new(:value(value), :&startee)
})
}

method merge(*@s) {

@s.shift unless @s[0].DEFINITE; # lose if used as class method
Expand Down

0 comments on commit e4508d9

Please sign in to comment.