Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Make sure Supply.rotor handles different threads
  • Loading branch information
lizmat committed Apr 23, 2014
1 parent 270088d commit 84fecfa
Showing 1 changed file with 14 additions and 24 deletions.
38 changes: 14 additions & 24 deletions src/core/SupplyOperations.pm
Expand Up @@ -286,36 +286,26 @@ my class SupplyOperations is repr('Uninstantiable') {
$overlap //= 1;
return $s if $elems == 1 and $overlap == 0; # nothing to do

my class RotorSupply does Supply does PrivatePublishing {
has $!source;
has $.elems;
has $.overlap;

submethod BUILD(:$!source, :$!elems, :$!overlap) { }

method tap(|c) {
my $source_tap;
my $tap = self.Supply::tap(|c, closing => { $source_tap.close() });

on -> $res {
$s => do {
my @batched;
sub flush {
self!more([@batched]);
@batched.splice( 0, +@batched - $!overlap );
$res.more( [@batched] );
@batched.splice( 0, +@batched - $overlap );
}

$source_tap = $!source.tap( -> \val {
@batched.push: val;
flush if @batched.elems == $!elems;
},
done => {
flush if @batched;
self!done();
},
quit => -> $ex { self!quit($ex) });
$tap
{
more => -> \val {
@batched.push: val;
flush if @batched.elems == $elems;
},
done => {
flush if @batched;
$res.done;
}
}
}
}
RotorSupply.new(:source($s), :$elems, :$overlap)
}

method batch(Supply $s, :$elems, :$seconds ) {
Expand Down

0 comments on commit 84fecfa

Please sign in to comment.