Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Make sure Supply.batch handles different threads
  • Loading branch information
lizmat committed Apr 23, 2014
1 parent 84fecfa commit 3018590
Showing 1 changed file with 34 additions and 47 deletions.
81 changes: 34 additions & 47 deletions src/core/SupplyOperations.pm
Expand Up @@ -312,69 +312,56 @@ my class SupplyOperations is repr('Uninstantiable') {

return $s if (!$elems or $elems == 1) and !$seconds; # nothing to do

my class BatchSupply does Supply does PrivatePublishing {
has $!source;
has $.elems;
has $.seconds;

submethod BUILD(:$!source, :$!elems, :$!seconds) { }

method tap(|c) {
my $source_tap;
my $tap = self.Supply::tap(|c, closing => { $source_tap.close() });

on -> $res {
$s => do {
my @batched;
my $last_time;
sub flush {
self!more([@batched]);
$res.more([@batched]);
@batched = ();
}

my &more = do {
if $!seconds {
$last_time = time div $!seconds;
{
more => do {
if $seconds {
$last_time = time div $seconds;

$!elems # and $!seconds
?? -> \val {
my $this_time = time div $!seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
@batched.push: val;
$elems # and $seconds
?? -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
@batched.push: val;
}
else {
@batched.push: val;
flush if @batched.elems == $elems;
}
}
else {
!! -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
}
@batched.push: val;
flush if @batched.elems == $!elems;
}
}
!! -> \val {
my $this_time = time div $!seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
}
@batched.push: val;
}
}
else { # just $!elems
-> \val {
@batched.push: val;
if @batched.elems == $!elems {
flush;
}
else { # just $elems
-> \val {
@batched.push: val;
flush if @batched.elems == $elems;
}
}
},
done => {
flush if @batched;
$res.done;
}
}
$source_tap = $!source.tap( &more,
done => {
flush if @batched;
self!done();
},
quit => -> $ex { self!quit($ex) });
$tap
}
}
BatchSupply.new(:source($s), :$elems, :$seconds)
}

method schedule_on(Supply $s, Scheduler $scheduler) {
Expand Down

0 comments on commit 3018590

Please sign in to comment.