Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Most of the rest of the on -> whenever switch.
  • Loading branch information
jnthn committed Nov 25, 2015
1 parent fa096b0 commit bae2138
Showing 1 changed file with 153 additions and 169 deletions.
322 changes: 153 additions & 169 deletions src/core/Supply.pm
Expand Up @@ -805,7 +805,7 @@ my class Supply {
@batched.append: val;
flush if @batched.elems == $elems;
}
LAST final-flush;
LAST { final-flush; }
}
}
else {
Expand All @@ -816,15 +816,15 @@ my class Supply {
$last_time = $this_time;
}
@batched.append: val;
LAST final-flush;
LAST { final-flush; }
}
}
}
else { # just $elems
whenever self -> \val {
@batched.append: val;
flush if @batched.elems == $elems;
LAST final-flush;
LAST { final-flush; }
}
}
}
Expand Down Expand Up @@ -957,172 +957,156 @@ my class Supply {
}
}

# method last(Supply:D $self: Int $number = 1) { # should be Natural
# on -> $res {
# $self => do {
# my @seen;
# {
# emit => $number == 1
# ?? -> \val { @seen[0] = val }
# !! -> \val {
# @seen.shift if +@seen == $number;
# @seen.append: val;
# },
# done => {
# $res.emit($_) for @seen;
# $res.done;
# }
# }
# }
# }
# }
#
# method min(Supply:D $self: &by = &infix:<cmp>) {
# my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
# on -> $res {
# $self => do {
# my $min;
# {
# emit => -> \val {
# if val.defined and !$min.defined || cmp(val,$min) < 0 {
# $res.emit( $min = val );
# }
# },
# done => { $res.done }
# }
# }
# }
# }
#
# method max(Supply:D $self: &by = &infix:<cmp>) {
# my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
# on -> $res {
# $self => do {
# my $max;
# {
# emit => -> \val {
# if val.defined and !$max.defined || cmp(val,$max) > 0 {
# $res.emit( $max = val );
# }
# },
# done => { $res.done }
# }
# }
# }
# }
#
# method minmax(Supply:D $self: &by = &infix:<cmp>) {
# my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
# on -> $res {
# $self => do {
# my $min;
# my $max;
# {
# emit => -> \val {
# if val.defined {
# if !$min.defined {
# $res.emit( (Range.new($min = val, $max = val),) );
# }
# elsif cmp(val,$min) < 0 {
# $res.emit( (Range.new( $min = val, $max ),) );
# }
# elsif cmp(val,$max) > 0 {
# $res.emit( (Range.new( $min, $max = val ),) );
# }
# }
# },
# done => { $res.done }
# }
# }
# }
# }
#
# method grab(Supply:D $self: &when_done) {
# on -> $res {
# $self => do {
# my @seen;
# {
# emit => -> \val { @seen.append: val },
# done => {
# $res.emit($_) for when_done(@seen);
# $res.done;
# }
# }
# }
# }
# }
#
# method reverse(Supply:D:) { self.grab( {.reverse} ) }
# method sort(Supply:D: &by = &infix:<cmp>) { self.grab( {.sort(&by)} ) }
#
# method zip(**@s, :&with) {
# @s.unshift(self) if self.DEFINITE; # add if instance method
# return Supply unless +@s; # nothing to be done
#
# X::Supply::Combinator.new(
# combinator => 'zip'
# ).throw if Rakudo::Internals.NOT_ALL_DEFINED_TYPE(@s,Supply);
#
# return @s[0] if +@s == 1; # nothing to be done
#
# my @values = [] xx +@s;
# on -> $res {
# @s => &with
# ?? -> $val, $index {
# @values[$index].push($val);
# $res.emit( [[&with]] @values.map(*.shift) ) if all(@values);
# }
# !! -> $val, $index {
# @values[$index].push($val);
# $res.emit( $(@values.map(*.shift).list) ) if all(@values);
# }
# }
# }
#
# method zip-latest(**@s, :&with, :$initial ) {
# @s.unshift(self) if self.DEFINITE; # add if instance method
# return Supply unless +@s; # nothing to do.
#
# X::Supply::Combinator.new(
# combinator => 'zip-latest'
# ).throw if Rakudo::Internals.NOT_ALL_DEFINED_TYPE(@s,Supply);
#
# return @s[0] if +@s == 1; # nothing to do.
#
# my @values;
#
# my $uninitialised = +@s; # how many supplies have yet to emit until we
# # can start emitting, too?
#
# if $initial {
# @values = @$initial;
# $uninitialised = 0 max $uninitialised - @$initial;
# }
#
# my $dones = 0;
#
# on -> $res {
# @s => do {
# {
# emit => &with
# ?? -> $val, $index {
# --$uninitialised
# if $uninitialised > 0 && not @values.EXISTS-POS($index);
# @values[$index] = $val;
# $res.emit( [[&with]] @values ) unless $uninitialised;
# }
# !! -> $val, $index {
# --$uninitialised
# if $uninitialised > 0 && not @values.EXISTS-POS($index);
# @values[$index] = $val;
# $res.emit( @values.List.item ) unless $uninitialised;
# },
# done => { $res.done() if ++$dones == +@s }
# }
# }
# }
# }
#
method last(Supply:D $self: Int $number = 1) { # should be Natural
supply {
my @seen;
if $number == 1 {
whenever self -> \val {
@seen[0] := val;
LAST { emit($_) for @seen; }
}
}
else {
whenever self -> \val {
@seen.shift if +@seen == $number;
@seen.push: val;
LAST { emit($_) for @seen; }
}
}
}
}

method min(Supply:D $self: &by = &infix:<cmp>) {
my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
supply {
my $min;
whenever self -> \val {
if val.defined and !$min.defined || cmp(val,$min) < 0 {
emit( $min := val );
}
}
}
}

method max(Supply:D $self: &by = &infix:<cmp>) {
my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
supply {
my $max;
whenever self -> \val {
if val.defined and !$max.defined || cmp(val,$max) > 0 {
emit( $max = val );
}
}
}
}

method minmax(Supply:D $self: &by = &infix:<cmp>) {
my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
supply {
my $min;
my $max;
whenever self -> \val {
if val.defined {
if !$min.defined {
emit( Range.new($min = val, $max = val) );
}
elsif cmp(val,$min) < 0 {
emit( Range.new( $min = val, $max ) );
}
elsif cmp(val,$max) > 0 {
emit( Range.new( $min, $max = val ) );
}
}
}
}
}

method grab(Supply:D $self: &when_done) {
supply {
my @seen;
whenever self -> \val {
@seen.push: val;
LAST {
emit($_) for when_done(@seen);
}
}
}
}

method reverse(Supply:D:) { self.grab( {.reverse} ) }
method sort(Supply:D: &by = &infix:<cmp>) { self.grab( {.sort(&by)} ) }

method zip(**@s, :&with) {
@s.unshift(self) if self.DEFINITE; # add if instance method
return supply { } unless +@s; # nothing to be done

X::Supply::Combinator.new(
combinator => 'zip'
).throw if Rakudo::Internals.NOT_ALL_DEFINED_TYPE(@s,Supply);

return @s[0] if +@s == 1; # nothing to be done

supply {
my @values = [] xx +@s;
for @s.kv -> $index, $supply {
if &with {
whenever self -> \val {
@values[$index].push(val);
emit( [[&with]] @values.map(*.shift) ) if all(@values);
}
}
else {
whenever self -> \val {
@values[$index].push(val);
emit( $(@values.map(*.shift).list) ) if all(@values);
}
}
}
}
}

method zip-latest(**@s, :&with, :$initial ) {
@s.unshift(self) if self.DEFINITE; # add if instance method
return supply { } unless +@s; # nothing to do.

X::Supply::Combinator.new(
combinator => 'zip-latest'
).throw if Rakudo::Internals.NOT_ALL_DEFINED_TYPE(@s,Supply);

return @s[0] if +@s == 1; # nothing to do.

supply {
my @values;

my $uninitialised = +@s; # how many supplies have yet to emit until we
# can start emitting, too?

if $initial {
@values = @$initial;
$uninitialised = 0 max $uninitialised - @$initial;
}

for @s.kv -> $index, $supply {
if &with {
whenever self -> \val {
--$uninitialised
if $uninitialised > 0 && not @values.EXISTS-POS($index);
@values[$index] = val;
emit( [[&with]] @values ) unless $uninitialised;
}
}
else {
whenever self -> \val {
--$uninitialised
if $uninitialised > 0 && not @values.EXISTS-POS($index);
@values[$index] = val;
emit( @values.List.item ) unless $uninitialised;
}
}
}
}
}

# proto method throttle(|) { * }
# multi method throttle(Supply:D $self:
# Int() $elems,
Expand Down

0 comments on commit bae2138

Please sign in to comment.