Skip to content

Commit

Permalink
Migrate methods from SupplyOperations to Supply
Browse files Browse the repository at this point in the history
Many of these are not creating inner classes that "does Supply", so they can
live inside the Supply role proper, thereby eliminating a layer of indirection.
  • Loading branch information
lizmat committed Apr 28, 2014
1 parent 1717d57 commit 12338e6
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 268 deletions.
268 changes: 255 additions & 13 deletions src/core/Supply.pm
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,8 @@ my role Supply {
method for(|c) { SupplyOperations.for(|c) }
method interval(|c) { SupplyOperations.interval(|c) }
method flat() { SupplyOperations.flat(self) }
method do(&side_effect) { SupplyOperations.do(self, &side_effect) }
method grep(&filter) { SupplyOperations.grep(self, &filter) }
method map(&mapper) { SupplyOperations.map(self, &mapper) }
method uniq( :&as, :&with, :$expires) {
SupplyOperations.uniq( self, :&as, :&with, :$expires)
}
method squish(:&as,:&with) { SupplyOperations.squish(self, :&as, :&with) }
method rotor( $elems?, $overlap? ) {
SupplyOperations.rotor(self, $elems, $overlap)
}
method batch( :$elems, :$seconds ) {
SupplyOperations.batch( self, :$elems, :$seconds)
}
method schedule_on(Scheduler $scheduler) {
SupplyOperations.schedule_on(self, $scheduler);
}
Expand All @@ -141,12 +130,265 @@ my role Supply {
SupplyOperations.delay(self, $time, :$scheduler)
}
method migrate() { SupplyOperations.migrate(self) }
method merge(*@s) { SupplyOperations.merge(self, @s) }
method zip(*@s,:&with) { SupplyOperations.zip(self, @s, :&with) }

method act(&actor) {
self.do(&actor).tap(|%_)
}

method do(Supply $self: &side_effect) {
on -> $res {
$self => -> \val { side_effect(val); $res.more(val) }
}
}

method uniq(Supply $self: :&as, :&with, :$expires) {
on -> $res {
$self => do {
if $expires {
if &with and &with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
my $now := now;
$target = &as(val);
my $index =
@seen.first-index({&with($target,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
$res.more(val);
}
}
else {
@seen.push: [$target, $now+$expires];
$res.more(val);
}
}
!! -> \val {
my $now := now;
my $index =
@seen.first-index({&with(val,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
$res.more(val);
}
}
else {
@seen.push: [val, $now+$expires];
$res.more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
my $now := now;
$target = nqp::unbox_s(&as(val).WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
$res.more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
}
!! -> \val {
my $now := now;
$target = nqp::unbox_s(val.WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
$res.more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
};
}
}
else { # !$!expires
if &with and &with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &as(val);
if @seen.first({ &with($target,$_) } ) =:= Nil {
@seen.push($target);
$res.more(val);
}
}
!! -> \val {
if @seen.first({ &with(val,$_) } ) =:= Nil {
@seen.push(val);
$res.more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
$target = nqp::unbox_s(&as(val).WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
$res.more(val);
}
}
!! -> \val {
$target = nqp::unbox_s(val.WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
$res.more(val);
}
};
}
}
}
}
}

method squish(Supply $self: :&as, :&with is copy) {
&with //= &[===];
on -> $res {
my @secret;
$self => do {
my Mu $last = @secret;
my Mu $target;
&as
?? -> \val {
$target = &as(val);
unless &with($target,$last) {
$last = $target;
$res.more(val);
}
}
!! -> \val {
unless &with(val,$last) {
$last = val;
$res.more(val);
}
};
}
}
}

method rotor(Supply $self: $elems? is copy, $overlap? is copy ) {

$elems //= 2;
$overlap //= 1;
return $self if $elems == 1 and $overlap == 0; # nothing to do

on -> $res {
$self => do {
my @batched;
sub flush {
$res.more( [@batched] );
@batched.splice( 0, +@batched - $overlap );
}

{
more => -> \val {
@batched.push: val;
flush if @batched.elems == $elems;
},
done => {
flush if @batched;
$res.done;
}
}
}
}
}

method batch(Supply $self: :$elems, :$seconds ) {

return $self if (!$elems or $elems == 1) and !$seconds; # nothing to do

on -> $res {
$self => do {
my @batched;
my $last_time;
sub flush {
$res.more([@batched]);
@batched = ();
}

{
more => do {
if $seconds {
$last_time = time div $seconds;

$elems # and $seconds
?? -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
@batched.push: val;
}
else {
@batched.push: val;
flush if @batched.elems == $elems;
}
}
!! -> \val {
my $this_time = time div $seconds;
if $this_time != $last_time {
flush if @batched;
$last_time = $this_time;
}
@batched.push: val;
}
}
else { # just $elems
-> \val {
@batched.push: val;
flush if @batched.elems == $elems;
}
}
},
done => {
flush if @batched;
$res.done;
}
}
}
}
}

method merge(*@s) {

@s.unshift(self) if self.DEFINITE; # add if instance method
return Supply unless +@s; # nothing to be done
return @s[0] if +@s == 1; # nothing to be done

my $dones = 0;
on -> $res {
@s => {
more => -> \val { $res.more(val) },
done => { $res.done() if ++$dones == +@s }
},
}
}

method zip(*@s, :&with is copy) {

@s.unshift(self) if self.DEFINITE; # add if instance method
return Supply unless +@s; # nothing to be done
return @s[0] if +@s == 1; # nothing to be done

my &infix:<op> = &with // &[,]; # hack for [[&with]] parse failure
my @values = ( [] xx +@s );
on -> $res {
@s => -> $val, $index {
@values[$index].push($val);
if all(@values) {
$res.more( [op] @values>>.shift );
}
}
}
}
}

# The on meta-combinator provides a mechanism for implementing thread-safe
Expand Down
Loading

0 comments on commit 12338e6

Please sign in to comment.