Skip to content

Commit

Permalink
Implement anyof and allof promise combinators.
Browse files Browse the repository at this point in the history
  • Loading branch information
jnthn committed Aug 7, 2013
1 parent 8ac6bb1 commit 9117969
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions src/vm/jvm/core/Threading.pm
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ my class X::Promise::Code {
has $.attempted;
method message() { "Can not $!attempted a code-based promise" }
}
my class X::Promise::Combinator {
has $.combinator;
method message() { "Can only use $!combinator to combine other Promise objects" }
}
my class Promise {
has $.scheduler;
has $.status;
Expand Down Expand Up @@ -127,6 +131,10 @@ my class Promise {
}
}

method has_result(Promise:D:) {
so $!status == any(Failed, Completed)
}

method then(Promise:D: &code) {
$!then_lock.lock();
if $!status == any(Failed, Completed) {
Expand Down Expand Up @@ -159,6 +167,36 @@ my class Promise {
($seconds * 1000).Int);
$p
}

method anyof(Promise:U: *@promises) {
X::Promise::Combinator.new(combinator => 'anyof').throw
unless @promises >>~~>> Promise;
self!until_n_kept(@promises, 1)
}

method allof(Promise:U: *@promises) {
X::Promise::Combinator.new(combinator => 'allof').throw
unless @promises >>~~>> Promise;
self!until_n_kept(@promises, @promises.elems)
}

my Mu $AtomicInteger;
method !until_n_kept(@promises, Int $n) {
once {
$AtomicInteger := nqp::jvmbootinterop().typeForName('java.util.concurrent.atomic.AtomicInteger');
Nil;
}
my Mu $c := $AtomicInteger.'constructor/new/(I)V'(nqp::decont($n));
my $p = Promise.new;
for @promises {
.then({
if $c.'decrementAndGet'() == 0 {
$p.keep(Nil)
}
})
}
$p
}
}

# The ThreadPoolScheduler is a straightforward scheduler that maintains a
Expand Down

0 comments on commit 9117969

Please sign in to comment.