Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Make sure Supply.squish handles different threads
  • Loading branch information
lizmat committed Apr 23, 2014
1 parent cf82ef2 commit 9047e09
Showing 1 changed file with 17 additions and 33 deletions.
50 changes: 17 additions & 33 deletions src/core/SupplyOperations.pm
Expand Up @@ -237,42 +237,26 @@ my class SupplyOperations is repr('Uninstantiable') {

method squish(Supply $a, :&as, :&with is copy) {
&with //= &[===];
my class SquishSupply does Supply does PrivatePublishing {
has $!source;
has &!as;
has &!with;

submethod BUILD(:$!source, :&!as, :&!with) { }

method tap(|c) {
my $source_tap;
my $sub = self.Supply::tap(|c, closing => { $source_tap.close() });
my &more = do {
my Mu $last = @secret;
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
unless &!with($target,$last) {
$last = $target;
self!more(val);
}
on -> $res {
$a => do {
my Mu $last = @secret;
my Mu $target;
&as
?? -> \val {
$target = &as(val);
unless &with($target,$last) {
$last = $target;
$res.more(val);
}
!! -> \val {
unless &!with(val,$last) {
$last = val;
self!more(val);
}
};
};
$source_tap = $!source.tap( &more,
done => { self!done(); },
quit => -> $ex { self!quit($ex) }
);
$sub
}
!! -> \val {
unless &with(val,$last) {
$last = val;
$res.more(val);
}
};
}
}
SquishSupply.new(:source($a), :&as, :&with);
}

method map(Supply $a, &mapper) {
Expand Down

0 comments on commit 9047e09

Please sign in to comment.