Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement Supply.uniq(:expires) for throttling
Only for the non :with case so far
  • Loading branch information
lizmat committed Apr 23, 2014
1 parent 61af179 commit 48937da
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 34 deletions.
4 changes: 3 additions & 1 deletion src/core/Supply.pm
Expand Up @@ -91,7 +91,9 @@ my role Supply {
method do(&side_effect) { SupplyOperations.do(self, &side_effect) }
method grep(&filter) { SupplyOperations.grep(self, &filter) }
method map(&mapper) { SupplyOperations.map(self, &mapper) }
method uniq(:&as,:&with) { SupplyOperations.uniq(self, :&as, :&with) }
method uniq( :&as, :&with, :$expires) {
SupplyOperations.uniq( self, :&as, :&with, :$expires)
}
method squish(:&as,:&with) { SupplyOperations.squish(self, :&as, :&with) }
method rotor( $elems?, $overlap? ) {
SupplyOperations.rotor(self, $elems, $overlap)
Expand Down
118 changes: 85 additions & 33 deletions src/core/SupplyOperations.pm
Expand Up @@ -128,54 +128,106 @@ my class SupplyOperations is repr('Uninstantiable') {
GrepSupply.new(:source($a), :&filter)
}

method uniq(Supply $a, :&as, :&with) {
method uniq(Supply $a, :&as, :&with, :$expires) {
my class UniqSupply does Supply does PrivatePublishing {
has $!source;
has &!as;
has &!with;
has $!expires;

submethod BUILD(:$!source, :&!as, :&!with) { }
submethod BUILD(:$!source, :&!as, :&!with, :$expires) { }

method tap(|c) {
my $source_tap;
my $sub = self.Supply::tap(|c, closing => { $source_tap.close() });
my &more = do {
if &!with and &!with !=== &[===] {
my @seen; # should be Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
if @seen.first({ &!with($target,$_) } ) =:= Nil {
@seen.push($target);
self!more(val);
if $!expires {
if &!with and &!with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
if @seen.first({ &!with($target,$_) } ) =:= Nil {
@seen.push($target);
self!more(val);
}
}
}
!! -> \val {
if @seen.first({ &!with(val,$_) } ) =:= Nil {
@seen.push(val);
self!more(val);
!! -> \val {
if @seen.first({ &!with(val,$_) } ) =:= Nil {
@seen.push(val);
self!more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
my $now := now;
$target = nqp::unbox_s(&!as(val).WHICH);
if nqp::existskey($seen, $target) {
self!more(val) # expired
if nqp::atkey($seen,$target) > $now;
}
else {
self!more(val);
}
nqp::bindkey( $seen, $target, $now+$expires);
}
};
!! -> \val {
my $now := now;
$target = nqp::unbox_s(val.WHICH);
if nqp::existskey($seen, $target) {
self!more(val) # expired
if nqp::atkey($seen,$target) > $now;
}
else {
self!more(val);
}
nqp::bindkey( $seen, $target, $now+$expires);
};
}
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
$target = nqp::unbox_s(&!as(val).WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
else { # !$!expires
if &!with and &!with !=== &[===] {
my @seen; # really Mu, but doesn't work in settings
my Mu $target;
&as
?? -> \val {
$target = &!as(val);
if @seen.first({ &!with($target,$_) } ) =:= Nil {
@seen.push($target);
self!more(val);
}
}
}
!! -> \val {
$target = nqp::unbox_s(val.WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
!! -> \val {
if @seen.first({ &!with(val,$_) } ) =:= Nil {
@seen.push(val);
self!more(val);
}
};
}
else {
my $seen := nqp::hash();
my str $target;
&as
?? -> \val {
$target = nqp::unbox_s(&!as(val).WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
}
}
};
!! -> \val {
$target = nqp::unbox_s(val.WHICH);
unless nqp::existskey($seen, $target) {
nqp::bindkey($seen, $target, 1);
self!more(val);
}
};
}
}
};
$source_tap = $!source.tap( &more,
Expand Down

0 comments on commit 48937da

Please sign in to comment.