Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Refactor batch to use whenever.
  • Loading branch information
jnthn committed Nov 25, 2015
1 parent 461ceab commit cf00a6d
Showing 1 changed file with 55 additions and 57 deletions.
112 changes: 55 additions & 57 deletions src/core/Supply.pm
Expand Up @@ -768,63 +768,61 @@ my class Supply {
# }
# }
# }
#
# method batch(Supply:D $self: :$elems, :$seconds ) {
#
# return $self if (!$elems or $elems == 1) and !$seconds; # nothing to do
#
# on -> $res {
# $self => do {
# my @batched;
# my $last_time;
# sub flush {
# $res.emit(([@batched],));
# @batched = ();
# }
#
# {
# emit => do {
# if $seconds {
# $last_time = time div $seconds;
#
# $elems # and $seconds
# ?? -> \val {
# my $this_time = time div $seconds;
# if $this_time != $last_time {
# flush if @batched;
# $last_time = $this_time;
# @batched.append: val;
# }
# else {
# @batched.append: val;
# flush if @batched.elems == $elems;
# }
# }
# !! -> \val {
# my $this_time = time div $seconds;
# if $this_time != $last_time {
# flush if @batched;
# $last_time = $this_time;
# }
# @batched.append: val;
# }
# }
# else { # just $elems
# -> \val {
# @batched.append: val;
# flush if @batched.elems == $elems;
# }
# }
# },
# done => {
# flush if @batched;
# $res.done;
# }
# }
# }
# }
# }
#

method batch(Supply:D $self: :$elems, :$seconds ) {
return self if (!$elems or $elems == 1) and !$seconds; # nothing to do
supply {
my @batched;
my $last_time;
sub flush {
emit([@batched]);
@batched = ();
}
sub final-flush {
flush if @batched;
done;
}

if $seconds {
$last_time = time div $seconds;

if $elems { # and $seconds
whenever self -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
@batched.append: val;
}
else {
@batched.append: val;
flush if @batched.elems == $elems;
}
LAST final-flush;
}
}
else {
whenever self -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
}
@batched.append: val;
LAST final-flush;
}
}
}
else { # just $elems
whenever self -> \val {
@batched.append: val;
flush if @batched.elems == $elems;
LAST final-flush;
}
}
}
}

# method lines(Supply:D $self: :$chomp = True ) {
#
# on -> $res {
Expand Down

0 comments on commit cf00a6d

Please sign in to comment.