Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add 'then' method to Promise.
Attaches a piece of work to do when the promise is fulfilled or
broken. Returns a new Promise that represents the completion of
this extra work, allowing chaining.
  • Loading branch information
jnthn committed Jul 12, 2013
1 parent 00458e9 commit 6f93cae
Showing 1 changed file with 47 additions and 5 deletions.
52 changes: 47 additions & 5 deletions src/vm/jvm/core/Threading.pm
Expand Up @@ -43,33 +43,58 @@ my class Thread {
# C<async> function.
my enum PromiseStatus (:Planned(0), :Running(1), :Completed(2), :Failed(3));
my class Promise {
# Things we will use from the JVM.
my $interop := nqp::jvmbootinterop();
my \Semaphore := $interop.typeForName('java.util.concurrent.Semaphore');
# Things we will use from the JVM.
my $interop := nqp::jvmbootinterop();
my \Semaphore := $interop.typeForName('java.util.concurrent.Semaphore');
my \ReentrantLock := $interop.typeForName('java.util.concurrent.locks.ReentrantLock');

has $.scheduler;
has $.status;
has &!code;
has $!result;
has @!thens;
has Mu $!ready_semaphore;
has Mu $!then_lock;

submethod BUILD(:$!scheduler!, :&code!) {
submethod BUILD(:$!scheduler!, :&!code!, :$unscheduled = False) {
$!status = Planned;
$!ready_semaphore := Semaphore.'constructor/new/(I)V'(-1);
$!then_lock := ReentrantLock.'constructor/new/()V'();
self!schedule() unless $unscheduled;
}

method !schedule() {
$!scheduler.schedule({
$!status = Running;
$!result = code();
$!result = &!code();
$!status = Completed;
$!ready_semaphore.'method/release/(I)V'(32768);
self!schedule_thens();
CATCH {
default {
$!result = $_;
$!status = Failed;
$!ready_semaphore.'method/release/(I)V'(32768);
self!schedule_thens();
}
}
})
}

method !schedule_then($fulfilled) {
my $orig_code = &!code;
&!code = { $orig_code($fulfilled) }
self!schedule();
}

method !schedule_thens() {
$!then_lock.lock();
while @!thens {
@!thens.shift()!schedule_then(self)
}
$!then_lock.unlock();
}

method result() {
# One important missing optimization here is that if the promise is
# not yet started, then the work can be done immediately by the
Expand All @@ -84,6 +109,23 @@ my class Promise {
$!result.rethrow
}
}

method then(&code) {
$!then_lock.lock();
if $!status == any(Failed, Completed) {
# Already have the result, schedule immediately.
$!then_lock.unlock();
Promise.new(:$!scheduler, :code({ code(self) }))
}
else {
# Create a (currently unscheduled) promise and add it to
# the list.
my $then_promise = Promise.new(:$!scheduler, :code(&code), :unscheduled);
@!thens.push($then_promise);
$!then_lock.unlock();
$then_promise
}
}
}

# The ThreadPoolScheduler is a straightforward scheduler that maintains a
Expand Down

0 comments on commit 6f93cae

Please sign in to comment.