Skip to content

Commit

Permalink
Make sure Supply.uniq handles different threads
Browse files Browse the repository at this point in the history
  • Loading branch information
lizmat committed Apr 23, 2014
1 parent 34cc2d2 commit cf82ef2
Showing 1 changed file with 94 additions and 111 deletions.
205 changes: 94 additions & 111 deletions src/core/SupplyOperations.pm
Original file line number Diff line number Diff line change
Expand Up @@ -129,127 +129,110 @@ my class SupplyOperations is repr('Uninstantiable') {
}

method uniq(Supply $a, :&as, :&with, :$expires) {
my class UniqSupply does Supply does PrivatePublishing {
has $!source;
has &!as;
has &!with;
has $!expires;

submethod BUILD(:$!source, :&!as, :&!with, :$!expires) { }

method tap(|c) {
my $source_tap;
my $sub = self.Supply::tap(|c, closing => { $source_tap.close() });
my &more = do {
if $!expires {
if &!with and &!with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
my $now := now;
$target = &!as(val);
my $index =
@seen.first-index({&!with($target,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
self!more(val);
}
}
else {
@seen.push: ($target, $now+$expires);
self!more(val);
on -> $res {
$a => do {
if $expires {
if &with and &with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
my $now := now;
$target = &as(val);
my $index =
@seen.first-index({&with($target,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
$res.more(val);
}
}
!! -> \val {
my $now := now;
my $index =
@seen.first-index({&!with(val,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
self!more(val);
}
}
else {
@seen.push: ($target, $now+$expires);
self!more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
my $now := now;
$target = nqp::unbox_s(&!as(val).WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
self!more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
else {
@seen.push: ($target, $now+$expires);
$res.more(val);
}
!! -> \val {
my $now := now;
$target = nqp::unbox_s(val.WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
self!more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
!! -> \val {
my $now := now;
my $index =
@seen.first-index({&with(val,$_[0])});
if $index.defined {
if $now > @seen[$index][1] { # expired
@seen[$index][1] = $now+$expires;
$res.more(val);
}
};
}
}
else {
@seen.push: ($target, $now+$expires);
$res.more(val);
}
};
}
else { # !$!expires
if &!with and &!with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
if @seen.first({ &!with($target,$_) } ) =:= Nil {
@seen.push($target);
self!more(val);
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
my $now := now;
$target = nqp::unbox_s(&as(val).WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
$res.more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
!! -> \val {
if @seen.first({ &!with(val,$_) } ) =:= Nil {
@seen.push(val);
self!more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
$target = nqp::unbox_s(&!as(val).WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
}
}
!! -> \val {
my $now := now;
$target = nqp::unbox_s(val.WHICH);
if !nqp::existskey($seen,$target) ||
$now > nqp::atkey($seen,$target) { #expired
$res.more(val);
nqp::bindkey($seen,$target,$now+$expires);
}
!! -> \val {
$target = nqp::unbox_s(val.WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
}
};
}
};
}
};
$source_tap = $!source.tap( &more,
done => { self!done(); },
quit => -> $ex { self!quit($ex) }
);
$sub
}
else { # !$!expires
if &with and &with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &as(val);
if @seen.first({ &with($target,$_) } ) =:= Nil {
@seen.push($target);
$res.more(val);
}
}
!! -> \val {
if @seen.first({ &with(val,$_) } ) =:= Nil {
@seen.push(val);
$res.more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
$target = nqp::unbox_s(&as(val).WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
$res.more(val);
}
}
!! -> \val {
$target = nqp::unbox_s(val.WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
$res.more(val);
}
};
}
}
}
}
UniqSupply.new(:source($a), :&as, :&with, :$expires);
}

method squish(Supply $a, :&as, :&with is copy) {
Expand Down

0 comments on commit cf82ef2

Please sign in to comment.