Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Clean-up and re-working of Promise.
The outside API remains largely the same, except the constructor is
now only taking :$scheduler. The main change is that Promise is no
longer tied to running code at all. Rather, that's just what the
run factory method and then methods set up. Promise.sleep(...) is
also mostly delegated to the Scheduler. This makes Promise more
focused around its synchronization primitive nature, and puts the
other Promise factories on a more even footing.
  • Loading branch information
jnthn committed Oct 30, 2013
1 parent 91a1b0c commit 0358bae
Showing 1 changed file with 85 additions and 70 deletions.
155 changes: 85 additions & 70 deletions src/vm/jvm/core/Threading.pm
Expand Up @@ -208,52 +208,67 @@ my class ThreadPoolScheduler does Scheduler {
# This thread pool scheduler will be the default one.
$PROCESS::SCHEDULER = ThreadPoolScheduler.new();

# A promise represents a piece of asynchronous work, which may be in progress,
# completed or even yet to start. Typically, a promise is created using the
# C<async> function.
my enum PromiseStatus (:Planned(0), :Running(1), :Kept(2), :Broken(3));
my class X::Promise::Code is Exception {
has $.attempted;
method message() { "Can not $!attempted a code-based promise" }
}
# A promise is a synchronization mechanism for a piece of work that will
# produce a single result (keeping the promise) or fail (breaking the
# promise).
my enum PromiseStatus (:Planned(0), :Kept(1), :Broken(2));
my class X::Promise::Combinator is Exception {
has $.combinator;
method message() { "Can only use $!combinator to combine other Promise objects" }
}
my class X::Promise::CauseOnlyValidOnBroken is Exception {
method message() { "Can only call cause on a broken promise" }
}
my class X::Promise::KeeperTaken is Exception {
method message() { "Access denied to keep/break this Promise; the keeper was already taken" }
}
my class Promise {
has $.scheduler;
has $.status;
has &!code;
has $!result;
has @!thens;
has int $!keeper_taken;
has Mu $!ready_semaphore;
has Mu $!then_lock;
has Mu $!lock;
has @!thens;

submethod BUILD(:$!scheduler = $*SCHEDULER, :&!code, :$scheduled = True) {
submethod BUILD(:$!scheduler = $*SCHEDULER) {
my $interop := nqp::jvmbootinterop();
my \Semaphore := $interop.typeForName('java.util.concurrent.Semaphore');
my \ReentrantLock := $interop.typeForName('java.util.concurrent.locks.ReentrantLock');
$!status = Planned;
$!ready_semaphore := Semaphore.'constructor/new/(I)V'(-1);
$!then_lock := ReentrantLock.'constructor/new/()V'();
self!schedule() if &!code && $scheduled;
$!lock := ReentrantLock.'constructor/new/()V'();
$!status = Planned;
}

# A Promise::Keeper is used to enable the right to keep/break a promise
# to be restricted to a given "owner". Taking the keeper for a Promise
# prevents anybody else from getting hold of it.
class Keeper { ... }
trusts Keeper;
class Keeper {
has $.promise;
method keep(\result) {
$!promise!Promise::keep(result)
}
method break(\exception) {
$!promise!Promise::break(exception)
}
}

method !schedule() {
$!scheduler.schedule_with_catch(
{
$!status = Running;
self!keep(&!code());
},
-> $ex { self!break($ex) })
method keeper() {
$!lock.lock();
if $!keeper_taken {
$!lock.unlock();
X::Promise::KeeperTaken.new.throw
}
my $k := nqp::create(Keeper);
nqp::bindattr($k, Keeper, '$!promise', self);
$!keeper_taken = 1;
$!lock.unlock();
$k
}

method keep(Promise:D: $result) {
X::Promise::Code.new(attempted => 'keep').throw if &!code;
self!keep($result)
self.keeper.keep($result)
}

method !keep($!result) {
Expand All @@ -264,8 +279,7 @@ my class Promise {
}

method break(Promise:D: $result) {
X::Promise::Code.new(attempted => 'break').throw if &!code;
self!break($result ~~ Exception ?? $result !! X::AdHoc.new(payload => $result))
self.keeper.break($result ~~ Exception ?? $result !! X::AdHoc.new(payload => $result))
}

method !break($!result) {
Expand All @@ -274,25 +288,19 @@ my class Promise {
self!schedule_thens();
}

method !schedule_then($fulfilled) {
my $orig_code = &!code;
&!code = { $orig_code($fulfilled) }
self!schedule();
}

method !schedule_thens() {
$!then_lock.lock();
$!lock.lock();
while @!thens {
@!thens.shift()!schedule_then(self)
$!scheduler.schedule_with_catch(@!thens.shift, @!thens.shift)
}
$!then_lock.unlock();
$!lock.unlock();
}

method result(Promise:D:) {
# One important missing optimization here is that if the promise is
# not yet started, then the work can be done immediately by the
# thing that is blocking on it.
if $!status == none(Broken, Kept) {
if $!status == Planned {
$!ready_semaphore.'method/acquire/()V'();
}
if $!status == Kept {
Expand All @@ -316,35 +324,39 @@ my class Promise {
}

method then(Promise:D: &code) {
$!then_lock.lock();
$!lock.lock();
if $!status == any(Broken, Kept) {
# Already have the result, schedule immediately.
$!then_lock.unlock();
Promise.new(:$!scheduler, :code({ code(self) }))
# Already have the result, run immediately.
$!lock.unlock();
Promise.run(:$!scheduler, :code({ code(self) }))
}
else {
# Create a (currently unscheduled) promise and add it to
# the list.
my $then_promise = Promise.new(:$!scheduler, :code(&code), :!scheduled);
@!thens.push($then_promise);
$!then_lock.unlock();
# Create a Promise, and push 2 entries to @!thens: something that
# runs the then code, and something that handles its exceptions.
# They will be sent to the scheduler when this promise is kept or
# broken.
my $then_promise = Promise.new(:$!scheduler);
my $k = $then_promise.keeper;
@!thens.push({ $k.keep(code(self)) });
@!thens.push(-> $ex { $k.break($ex) });
$!lock.unlock();
$then_promise
}
}

my Mu $timer;
method sleep(Promise:U: $seconds) {
once {
my Mu \Timer := nqp::jvmbootinterop().typeForName('java.util.Timer');
$timer := Timer.'constructor/new/(Z)V'(True);
Nil;
}
my $p = Promise.new;
$timer.'method/schedule/(Ljava/util/TimerTask;J)V'(
nqp::jvmbootinterop().proxy(
'java.util.TimerTask',
nqp::hash('run', -> { $p.keep(True) })),
($seconds * 1000).Int);
method run(Promise:U: &code, :$scheduler = $*SCHEDULER) {
my $p = Promise.new(:$scheduler);
my $k = $p.keeper;
$scheduler.schedule_with_catch(
{ $k.keep(code()) },
-> $ex { $k.break($ex) });
$p
}

method sleep(Promise:U: $seconds, :$scheduler = $*SCHEDULER) {
my $p = Promise.new(:$scheduler);
my $k = $p.keeper;
$scheduler.schedule_in({ $k.keep(True) }, $seconds);
$p
}

Expand All @@ -368,22 +380,31 @@ my class Promise {
}
my Mu $c := $AtomicInteger.'constructor/new/(I)V'(nqp::decont($n));
my $p = Promise.new;
for @promises {
.then({
my $k = $p.keeper;
for @promises -> $cand {
$cand.then({
if .status == Kept {
if $c.'decrementAndGet'() == 0 {
$p.keep(Nil)
$k.keep(True)
}
}
else {
$p.break(.cause)
if $c.'getAndAdd'(-($n + 1)) > 0 {
$k.break(.cause)
}
}
})
}
$p
}
}

# Schedules a piece of asynchronous work using the current scheduler, and
# returns a Promise that represents it.
sub async(&code) {
Promise.run(&code);
}

# A channel provides a thread-safe way to send a series of values from some
# producer(s) to some consumer(s).
my class X::Channel::SendOnCompleted is Exception {
Expand Down Expand Up @@ -616,12 +637,6 @@ my class IO::Async::File {
}
}

# Schedules a piece of asynchronous work using the current scheduler, and
# returns a promise that represents it.
sub async(&code) {
Promise.new(:scheduler($*SCHEDULER), :&code);
}

# Waits for a promise to be kept or a channel to be able to receive a value
# and, once it can, unwraps or returns the result. This should be made more
# efficient by using continuations to suspend any task running in the thread
Expand Down

0 comments on commit 0358bae

Please sign in to comment.