Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement Supply.squish
Although this does not seem to work without :with parameter: somehow the
default &[===] does *not* get set.  Giving that up for now.
  • Loading branch information
lizmat committed Apr 18, 2014
1 parent 647a777 commit 22fbedc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/core/Supply.pm
Expand Up @@ -81,14 +81,15 @@ my role Supply {
}, *;
}

method for(|c) { SupplyOperations.for(|c) }
method interval(|c) { SupplyOperations.interval(|c) }
method do(&side_effect) { SupplyOperations.do(self, &side_effect) }
method grep(&filter) { SupplyOperations.grep(self, &filter) }
method map(&mapper) { SupplyOperations.map(self, &mapper) }
method uniq(:&as,:&with) { SupplyOperations.uniq(self, :&as, :&with) }
method merge($s) { SupplyOperations.merge(self, $s) }
method zip($s, *@with) { SupplyOperations.zip(self, $s, |@with) }
method for(|c) { SupplyOperations.for(|c) }
method interval(|c) { SupplyOperations.interval(|c) }
method do(&side_effect) { SupplyOperations.do(self, &side_effect) }
method grep(&filter) { SupplyOperations.grep(self, &filter) }
method map(&mapper) { SupplyOperations.map(self, &mapper) }
method uniq(:&as,:&with) { SupplyOperations.uniq(self, :&as, :&with) }
method squish(:&as,:&with) { SupplyOperations.squish(self, :&as, :&with) }
method merge($s) { SupplyOperations.merge(self, $s) }
method zip($s, *@with) { SupplyOperations.zip(self, $s, |@with) }
}

# The on meta-combinator provides a mechanism for implementing thread-safe
Expand Down
40 changes: 40 additions & 0 deletions src/core/SupplyOperations.pm
Expand Up @@ -3,6 +3,8 @@
# be declared outside of Supply.

my class SupplyOperations is repr('Uninstantiable') {
my @secret;

# Private versions of the methods to relay events to subscribers, used in
# implementing various operations.
my role PrivatePublishing {
Expand Down Expand Up @@ -160,6 +162,44 @@ my class SupplyOperations is repr('Uninstantiable') {
}
UniqSupply.new(:source($a), :&as, :&with);
}

method squish(Supply $a, :&as, :&with = &[===]) {
my class SquishSupply does Supply does PrivatePublishing {
has $!source;
has &!as;
has &!with;

submethod BUILD(:$!source, :&!as, :&!with) { }

method tap(|c) {
my $sub = self.Supply::tap(|c);
my &more = do {
my Mu $last = @secret;
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
unless &!with($target,$last) {
$last = $target;
self!more(val);
}
}
!! -> \val {
unless &!with(val,$last) {
$last = val;
self!more(val);
}
};
};
$!source.tap( &more,
done => { self!done(); },
quit => -> $ex { self!quit($ex) }
);
$sub
}
}
SquishSupply.new(:source($a), :&as, :&with);
}

method map(Supply $a, &mapper) {
my class MapSupply does Supply does PrivatePublishing {
Expand Down

0 comments on commit 22fbedc

Please sign in to comment.