Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Supply s/buffering/batch/
  • Loading branch information
lizmat committed Apr 21, 2014
1 parent 3e4d2ca commit 8f4fa15
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/core/Supply.pm
Expand Up @@ -92,8 +92,8 @@ my role Supply {
method rotor( $elems?, $overlap? ) {
SupplyOperations.rotor(self, $elems, $overlap)
}
method buffering( :$elems, :$seconds ) {
SupplyOperations.buffering( self, :$elems, :$seconds)
method batch( :$elems, :$seconds ) {
SupplyOperations.batch( self, :$elems, :$seconds)
}
method merge(*@s) { SupplyOperations.merge(self, @s) }
method zip(*@s,:&with) { SupplyOperations.zip(self, @s, :&with) }
Expand Down
34 changes: 17 additions & 17 deletions src/core/SupplyOperations.pm
Expand Up @@ -257,18 +257,18 @@ my class SupplyOperations is repr('Uninstantiable') {
method tap(|c) {
my $tap = self.Supply::tap(|c);

my @buffered;
my @batched;
sub flush {
self!more([@buffered]);
@buffered.splice( 0, +@buffered - $!overlap );
self!more([@batched]);
@batched.splice( 0, +@batched - $!overlap );
}

$!source.tap( -> \val {
@buffered.push: val;
flush if @buffered.elems == $!elems;
@batched.push: val;
flush if @batched.elems == $!elems;
},
done => {
flush if @buffered;
flush if @batched;
self!done();
},
quit => -> $ex { self!quit($ex) });
Expand All @@ -278,11 +278,11 @@ my class SupplyOperations is repr('Uninstantiable') {
RotorSupply.new(:source($s), :$elems, :$overlap)
}

method buffering(Supply $s, :$elems, :$seconds ) {
method batch(Supply $s, :$elems, :$seconds ) {

return $s if (!$elems or $elems == 1) and !$seconds; # nothing to do

my class BufferingSupply does Supply does PrivatePublishing {
my class BatchSupply does Supply does PrivatePublishing {
has $!source;
has $.elems;
has $.seconds;
Expand All @@ -292,11 +292,11 @@ my class SupplyOperations is repr('Uninstantiable') {
method tap(|c) {
my $tap = self.Supply::tap(|c);

my @buffered;
my @batched;
my $last_time;
sub flush {
self!more([@buffered]);
@buffered = ();
self!more([@batched]);
@batched = ();
}

my &more = do {
Expand All @@ -305,8 +305,8 @@ my class SupplyOperations is repr('Uninstantiable') {

$!elems # and $!seconds
?? -> \val {
@buffered.push: val;
if @buffered.elems == $!elems {
@batched.push: val;
if @batched.elems == $!elems {
flush;
}
else {
Expand All @@ -327,23 +327,23 @@ my class SupplyOperations is repr('Uninstantiable') {
}
else { # just $!elems
-> \val {
@buffered.push: val;
if @buffered.elems == $!elems {
@batched.push: val;
if @batched.elems == $!elems {
flush;
}
}
}
}
$!source.tap( &more,
done => {
flush if @buffered;
flush if @batched;
self!done();
},
quit => -> $ex { self!quit($ex) });
$tap
}
}
BufferingSupply.new(:source($s), :$elems, :$seconds)
BatchSupply.new(:source($s), :$elems, :$seconds)
}

method merge(*@s) {
Expand Down

0 comments on commit 8f4fa15

Please sign in to comment.