Skip to content

Commit

Permalink
Add new hyper/race pipeline builder
Browse files Browse the repository at this point in the history
This is the part that kicks off parallel workers. It does so using
non-blocking constructs, so as not to exhaust the thread pool and to
play nice with other tasks ongoing in the pool. This also means we'll
have nice behavior if a parallel worker does a `react` or `await` (so,
non-blocking under 6.d.PREVIEW). Support for rebatchers still to come.
  • Loading branch information
jnthn committed Oct 16, 2017
1 parent d43b373 commit dfa230f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
117 changes: 117 additions & 0 deletions src/core/Rakudo/Internals/HyperPipeline.pm
@@ -0,0 +1,117 @@
# Takes a linked list of pipeline stages and assembles them into a pipeline.
# Given a pipeline must end with a HyperJoiner, it expects to be passed
# something of this type.
my class Rakudo::Internals::HyperPipeline {
method start(Rakudo::Internals::HyperJoiner $stage, HyperConfiguration $config) {
# Create channel that the last non-join operation in the pipeline will
# put its results into, and start a worker to handle the channel.
my $cur-dest-channel = Channel.new;
self!join-worker($stage, $cur-dest-channel);

# Create a channel that will signal we're ready for more batches,
# and set join stage to send on it when batch-used is called.
my $ready-channel = Channel.new;
$stage.SET-BATCH-USED-CHANNEL($ready-channel);

# Go through the rest of the stages.
my $cur-stage = $stage.source;
my @processors;
while $cur-stage {
my $next-stage = $cur-stage.source;
given $cur-stage {
when Rakudo::Internals::HyperProcessor {
# Unshift them so a sequence will be in application order.
unshift @processors, $_;
}
when Rakudo::Internals::HyperBatcher {
if $next-stage {
die "A HyperBatcher may only be at the pipeline start";
}
$cur-dest-channel = self!maybe-processor-workers:
[@processors], $cur-dest-channel, $config.degree;
@processors = ();
self!batch-worker($cur-stage, $cur-dest-channel, $ready-channel,
$config.batch);
}
default {
die "Unrecognized hyper pipeline stage " ~ .^name();
}
}
$cur-stage = $next-stage;
}

# Set off $degree batches.
$ready-channel.send(True) for ^$config.degree;
}

method !batch-worker(Rakudo::Internals::HyperBatcher $stage, Channel $dest-channel,
Channel $ready-channel, int $size) {
start {
loop {
$*AWAITER.await($ready-channel);
my $batch := $stage.produce-batch($size);
$dest-channel.send($batch);
last if $batch.last;
CATCH {
default {
.note;
$dest-channel.fail($_);
}
}
}
}
}

method !maybe-processor-workers(@processors, Channel $dest-channel, Int $degree) {
return $dest-channel unless @processors;
my $source-channel := Channel.new;
for ^$degree {
start {
loop {
my $batch := $*AWAITER.await($source-channel);
for @processors {
.process-batch($batch);
}
$dest-channel.send($batch);
}
CATCH {
when X::Channel::ReceiveOnClosed {
$dest-channel.close;
}
default {
.note;
$dest-channel.fail($_);
}
}
}
}
return $source-channel;
}

method !join-worker(Rakudo::Internals::HyperJoiner $stage, Channel $source) {
start {
loop {
$stage.consume-batch($*AWAITER.await($source));
}
CATCH {
when X::Channel::ReceiveOnClosed {
# We got everything; quietly exit the start block.
}
default {
$stage.consume-error($_);
CATCH {
default {
# Error handling code blew up; let the scheduler's
# error handler do it, which will typically bring
# the program down. Should never get here unless
# we've some bug in a joiner implementation.
$*SCHEDULER.handle_uncaught($_);
}
}
}
}
}
}
}

# vim: ft=perl6 expandtab sw=4
1 change: 1 addition & 0 deletions tools/build/jvm_core_sources
Expand Up @@ -58,6 +58,7 @@ src/core/Sequence.pm
src/core/Seq.pm
src/core/Rakudo/Internals/HyperWorkBatch.pm
src/core/Rakudo/Internals/HyperWorkStage.pm
src/core/Rakudo/Internals/HyperPipeline.pm
src/core/Rakudo/Internals/HyperIteratorBatcher.pm
src/core/Rakudo/Internals/HyperToIterator.pm
src/core/Rakudo/Internals/RaceToIterator.pm
Expand Down
1 change: 1 addition & 0 deletions tools/build/moar_core_sources
Expand Up @@ -60,6 +60,7 @@ src/core/Sequence.pm
src/core/Seq.pm
src/core/Rakudo/Internals/HyperWorkBatch.pm
src/core/Rakudo/Internals/HyperWorkStage.pm
src/core/Rakudo/Internals/HyperPipeline.pm
src/core/Rakudo/Internals/HyperIteratorBatcher.pm
src/core/Rakudo/Internals/HyperToIterator.pm
src/core/Rakudo/Internals/RaceToIterator.pm
Expand Down

0 comments on commit dfa230f

Please sign in to comment.