Skip to content

Commit

Permalink
Implement ThreadPoolScheduler non-blocking await.
Browse files Browse the repository at this point in the history
Now, if we do an `await` in code being run in the thread pool, and the
awaited value(s) are not yet available, we take a continuation. This
frees up the thread to process more work from the thread pool. When
all of the awaited results are available, or the production of one of
them produces an exception, then the resumption of the continuation
will be scheduled in the thread pool.

This behavior will only happen if you say `use v6.d.PREVIEW`.
  • Loading branch information
jnthn committed Jan 25, 2017
1 parent 0bb2dbc commit 94bfd71
Showing 1 changed file with 124 additions and 2 deletions.
126 changes: 124 additions & 2 deletions src/core/ThreadPoolScheduler.pm
Expand Up @@ -3,6 +3,127 @@
# using them. # using them.


my class ThreadPoolScheduler does Scheduler { my class ThreadPoolScheduler does Scheduler {
constant THREAD_POOL_PROMPT = Mu.new;

class ThreadPoolAwaiter does Awaiter {
has $!queue;

submethod BUILD(:$queue!) {
$!queue := nqp::decont($queue);
}

method await(Awaitable:D $a) {
my $handle := $a.get-await-handle;
if $handle.already {
$handle.success
?? $handle.result
!! $handle.cause.rethrow
}
else {
my $success;
my $result;
nqp::continuationcontrol(0, THREAD_POOL_PROMPT, -> Mu \c {
$handle.subscribe-awaiter(-> \success, \result {
$success := success;
$result := result;
nqp::push($!queue, { nqp::continuationinvoke(c, nqp::null()) });
Nil
});
});
$success
?? $result
!! $result.rethrow
}
}

method await-all(Iterable:D \i) {
# Collect results that are already available, and handles where the
# results are not yet available together with the matching insertion
# indices.
my \results = nqp::list();
my \handles = nqp::list();
my \indices = nqp::list_i();
my int $insert = 0;
for i -> $awaitable {
unless nqp::istype($awaitable, Awaitable) {
die "Can only specify Awaitable objects to await (got a $awaitable.^name())";
}
unless nqp::isconcrete($awaitable) {
die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())";
}

my $handle := $awaitable.get-await-handle;
if $handle.already {
$handle.success
?? nqp::bindpos(results, $insert, $handle.result)
!! $handle.cause.rethrow
}
else {
nqp::push(handles, $handle);
nqp::push_i(indices, $insert);
}

$insert++;
}

# See if we have anything that we really need to suspend for. If
# so, we need to take great care that the continuation taking is
# complete before we try to resume it (completions can happen on
# different threads, and so concurrent with us subscribing, not
# to mention concurrent with each other wanting to resume). We
# use a lock to take care of this, holding the lock until the
# continuation has been taken.
my int $num-handles = nqp::elems(handles);
if $num-handles {
my $continuation;
my $exception;
my $l = Lock.new;
$l.lock;
{
my int $remaining = $num-handles;
loop (my int $i = 0; $i < $num-handles; $i++) {
my $handle := nqp::atpos(handles, $i);
my int $insert = nqp::atpos_i(indices, $i);
$handle.subscribe-awaiter(-> \success, \result {
my int $resume;
$l.protect: {
if success {
nqp::bindpos(results, $insert, result);
--$remaining;
$resume = 1 unless $remaining;
}
elsif !nqp::isconcrete($exception) {
$exception := result;
$remaining = 0;
$resume = 1;
}
}
if $resume {
nqp::push($!queue, {
nqp::continuationinvoke($continuation, nqp::null())
});
}
});
}
CATCH {
# Unlock if we fail here, and let the exception
# propagate outwards.
$l.unlock();
}
}
nqp::continuationcontrol(0, THREAD_POOL_PROMPT, -> Mu \c {
$continuation := c;
$l.unlock;
});

# If we got an exception, throw it.
$exception.rethrow if nqp::isconcrete($exception);
}

nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', results);
}
}

# A concurrent work queue that blocks any worker threads that poll it # A concurrent work queue that blocks any worker threads that poll it
# when empty until some work arrives. # when empty until some work arrives.
my class Queue is repr('ConcBlockingQueue') { } my class Queue is repr('ConcBlockingQueue') { }
Expand Down Expand Up @@ -38,10 +159,11 @@ my class ThreadPoolScheduler does Scheduler {
$!started_any = 1; $!started_any = 1;
$!counts_lock.protect: { $!threads_started = $!threads_started + 1 }; $!counts_lock.protect: { $!threads_started = $!threads_started + 1 };
Thread.start(:app_lifetime, { Thread.start(:app_lifetime, {
my $*AWAITER := ThreadPoolAwaiter.new(:$!queue);
loop { loop {
my Mu $task := nqp::shift($!queue); my Mu $task := nqp::shift($!queue);
$!counts_lock.protect: { $!loads = $!loads + 1 }; $!counts_lock.protect: { $!loads = $!loads + 1 };
try { nqp::continuationreset(THREAD_POOL_PROMPT, {
if nqp::islist($task) { if nqp::islist($task) {
my Mu $code := nqp::shift($task); my Mu $code := nqp::shift($task);
my \args = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', $task); my \args = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', $task);
Expand All @@ -61,7 +183,7 @@ my class ThreadPoolScheduler does Scheduler {
self.handle_uncaught($_) self.handle_uncaught($_)
} }
} }
} });
$!counts_lock.protect: { $!loads = $!loads - 1 }; $!counts_lock.protect: { $!loads = $!loads - 1 };
} }
}); });
Expand Down

0 comments on commit 94bfd71

Please sign in to comment.