Skip to content

Commit

Permalink
Start to add guts of new hyper/race implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jnthn committed Oct 16, 2017
1 parent 2352efe commit d43b373
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/core/Rakudo/Internals/HyperIteratorBatcher.pm
@@ -0,0 +1,42 @@
# Batches values sourced from an iterator, producing a work batch from them.
my role Rakudo::Internals::HyperIteratorBatcher does Rakudo::Internals::HyperBatcher {
my constant NO_LOOKAHEAD = Mu.CREATE;
has Iterator $!iterator;
has $!lookahead;

submethod BUILD(Iterator :$iterator!) {
$!iterator := $iterator;
$!lookahead := NO_LOOKAHEAD;
}

method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) {
my IterationBuffer $items .= new;
my Bool $first;
my Bool $last;
if $!lookahead =:= NO_LOOKAHEAD {
$first = True;
if $!iterator.push-exactly($items, $batch-size) =:= IterationEnd {
$last = True;
}
else {
$!lookahead := $!iterator.pull-one;
$last = True if $!lookahead =:= IterationEnd;
}
}
else {
$first = False;
$items.push($!lookahead);
if $!iterator.push-exactly($items, $batch-size - 1) =:= IterationEnd {
$last = True;
}
else {
$!lookahead := $!iterator.pull-one;
$last = True if $!lookahead =:= IterationEnd;
}
}
my $sequence-number = self.next-sequence-number();
return Rakudo::Internals::HyperWorkBatch.new(:$sequence-number, :$items, :$first, :$last);
}
}

# vim: ft=perl6 expandtab sw=4
57 changes: 57 additions & 0 deletions src/core/Rakudo/Internals/HyperToIterator.pm
@@ -0,0 +1,57 @@
my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner does Iterator {
has Channel $.batches .= new;

has int $!last-target = -1;
has int $!next-to-send = 0;
has @!held-back;
method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
if $batch.last {
$!last-target = $batch.sequence-number;
}
self!handle-batch($batch);
if $!last-target >= 0 && $!next-to-send > $!last-target {
$!batches.close;
}
}
method !handle-batch($batch) {
my int $seq = $batch.sequence-number;
if $seq == $!next-to-send {
$!batches.send($batch);
$!next-to-send++;
if @!held-back {
@!held-back.=sort(*.sequence-number);
while @!held-back && @!held-back[0].sequence-number == $!next-to-send {
$!batches.send(@!held-back.shift);
$!next-to-send++;
}
}
}
else {
@!held-back.push($batch);
}
}

method consume-error(Exception $e --> Nil) {
note $e;
$!batches.fail($e);
}

my constant EMPTY_BUFFER = IterationBuffer.CREATE;
has IterationBuffer $!current-items = EMPTY_BUFFER;
method pull-one() {
until nqp::elems(nqp::decont($!current-items)) { # Handles empty batches
my $batch = $!batches.receive;
self.batch-used();
$!current-items = $batch.items;
CATCH {
when X::Channel::ReceiveOnClosed {
return IterationEnd;
}
# Throw other errors onwards
}
}
nqp::shift(nqp::decont($!current-items))
}
}

# vim: ft=perl6 expandtab sw=4
44 changes: 44 additions & 0 deletions src/core/Rakudo/Internals/HyperWorkBatch.pm
@@ -0,0 +1,44 @@
# A batch of work sent to a worker in a hyper or race operation. It is an
# Iterable, and iterates to the items in the batch. This is so that it can be
# easily processed in terms of (non-hyper) Iterable implementations.
my class Rakudo::Internals::HyperWorkBatch does Iterable {
# The items in the batch.
has IterationBuffer $.items;

# Sequence number of the batch, starting from zero.
has int $.sequence-number;

# Is this the first batch that was produced at the last fork point or the last batch that the
# fork point will produce?
has Bool $.first;
has Bool $.last;

# Iterator for a HyperWorkBatch;
my class HyperWorkBatchIterator does Iterator {
has $!items;
has int $!i;
has int $!n;

submethod BUILD(:$items --> Nil) {
$!items := nqp::decont($items);
$!i = 0;
$!n = nqp::elems($!items);
}

method pull-one() {
$!i < $!n
?? nqp::atpos($!items, $!i++)
!! IterationEnd
}
}

method iterator(--> Iterator) {
HyperWorkBatchIterator.new(:$!items)
}

method replace-with(IterationBuffer $ib --> Nil) {
$!items := $ib;
}
}

# vim: ft=perl6 expandtab sw=4
55 changes: 55 additions & 0 deletions src/core/Rakudo/Internals/HyperWorkStage.pm
@@ -0,0 +1,55 @@
# Work stages are individual steps in a hyper/race pipeline. They are chained
# in a linked list by the source attribute. Roles for different kinds of stages
# follow.
my role Rakudo::Internals::HyperWorkStage {
has Rakudo::Internals::HyperWorkStage $.source;
}

# A HyperBatcher stage produces batches of work to do. It will typically be
# created with an Iterable of some kind, and divide up the work into batches
# of the appropriate size. Such a stage always lives at the start of a piece
# of parallel processing pipeline.
my role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
has $!sequence = 0;

method next-sequence-number() {
$!sequence++
}

method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
}

# A HyperProcessor performs some operation in a work batch, updating it to
# reflect the results of the operation.
my role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}

# A HyperRebatcher is given batches, and may produce zero or more batches as a
# result. The produced batches will be passed on to the next pipeline stages.
# This is intended only for steps that need to look across multiple batches,
# but that work in a "streaming" way rather than being a full bottleneck in
# the pipeline. A HyperRebatcher should produce one output batch for each
# input batch it gets (though may produce no batches on one call, and two on
# the next, for example).
my role Rakudo::Internals::HyperRebatcher does Rakudo::Internals::HyperWorkStage {
method rebatch(Rakudo::Internals::HyperWorkBatch $batch --> List) { ... }
}

# Comes at the end of a pipeline, or a stage in a multi-stage pipeline (that
# is, one with a step in it where all results are needed). The batch-used
# method should be called whenever a batch passed to consume-batch has been
# used. This allows for backpressure control: a sequential iterator at the
# end of a parallel pipeline can choose to call batch-used only at the point
# when the downstream iterator has actually eaten all the values in a batch.
my role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
has $!batch-used-channel;
method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
method consume-error(Exception \e) { ... }
method batch-used(--> Nil) {
$!batch-used-channel.send(True);
}
method SET-BATCH-USED-CHANNEL($!batch-used-channel) {}
}

# vim: ft=perl6 expandtab sw=4
40 changes: 40 additions & 0 deletions src/core/Rakudo/Internals/RaceToIterator.pm
@@ -0,0 +1,40 @@
my class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
has Channel $.batches .= new;

has int $!last-target = -1;
has int $!batches-seen = 0;
method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
$!batches.send($batch);
$!batches-seen++;
if $batch.last {
$!last-target = $batch.sequence-number;
}
if $!last-target >= 0 && $!batches-seen == $!last-target + 1 {
$!batches.close;
}
}

method consume-error(Exception $e --> Nil) {

This comment has been minimized.

Copy link
@MasterDuke17

MasterDuke17 Oct 16, 2017

Contributor

consume-error for Rakudo::Internals::HyperJoiner has a non-sigiled parameter e, is the difference deliberate?

note $e;
$!batches.fail($e);
}

my constant EMPTY_BUFFER = IterationBuffer.CREATE;
has IterationBuffer $!current-items = EMPTY_BUFFER;
method pull-one() {
until nqp::elems(nqp::decont($!current-items)) { # Handles empty batches
my $batch = $!batches.receive;
self.batch-used();
$!current-items = $batch.items;
CATCH {
when X::Channel::ReceiveOnClosed {
return IterationEnd;
}
# Throw other errors onwards
}
}
nqp::shift(nqp::decont($!current-items))
}
}

# vim: ft=perl6 expandtab sw=4
2 changes: 2 additions & 0 deletions src/core/stubs.pm
Expand Up @@ -8,9 +8,11 @@ my class X::AdHoc { ... }
my class FatRat { ... }
my class Pair { ... }
my class Promise { ... }
my class Channel { ... }
my class X::OutOfRange { ... }
my class X::Dynamic::NotFound { ... }
my class X::SecurityPolicy::Eval { ... }
my class X::Channel::ReceiveOnClosed { ... }

my role QuantHash { ... }
my role Setty { ... }
Expand Down
5 changes: 5 additions & 0 deletions tools/build/jvm_core_sources
Expand Up @@ -56,6 +56,11 @@ src/core/IterationBuffer.pm
src/core/HyperConfiguration.pm
src/core/Sequence.pm
src/core/Seq.pm
src/core/Rakudo/Internals/HyperWorkBatch.pm
src/core/Rakudo/Internals/HyperWorkStage.pm
src/core/Rakudo/Internals/HyperIteratorBatcher.pm
src/core/Rakudo/Internals/HyperToIterator.pm
src/core/Rakudo/Internals/RaceToIterator.pm
src/core/HyperSeq.pm
src/core/Nil.pm
src/core/Range.pm
Expand Down
5 changes: 5 additions & 0 deletions tools/build/moar_core_sources
Expand Up @@ -58,6 +58,11 @@ src/core/IterationBuffer.pm
src/core/HyperConfiguration.pm
src/core/Sequence.pm
src/core/Seq.pm
src/core/Rakudo/Internals/HyperWorkBatch.pm
src/core/Rakudo/Internals/HyperWorkStage.pm
src/core/Rakudo/Internals/HyperIteratorBatcher.pm
src/core/Rakudo/Internals/HyperToIterator.pm
src/core/Rakudo/Internals/RaceToIterator.pm
src/core/HyperSeq.pm
src/core/Nil.pm
src/core/Range.pm
Expand Down

0 comments on commit d43b373

Please sign in to comment.