diff --git a/src/functions.php b/src/functions.php index 6594de13..4e9dbc45 100644 --- a/src/functions.php +++ b/src/functions.php @@ -189,6 +189,39 @@ function reduce(array $promisesOrValues, callable $reduceFunc, $initialValue = n }, $cancellationQueue); } +function throttle(array $promisesOrValues, $concurrency = 10) +{ + $numberOfPromises = count($promisesOrValues); + $retVals = []; + + // Modify all promises to also store their resolved value + $promisesOrValues = array_map(function ($promiseOrValue) use (&$retVals) { + return resolve($promiseOrValue)->then(function ($r) use (&$retVals) { + $retVals[] = $r; + return $r; + }); + }, $promisesOrValues); + + // Build up the promise-chain + $promisesOrValues = array_map(function ($promiseOrValue, $promiseIndex) use ($promisesOrValues, $numberOfPromises, $concurrency) { + $nextIndex = $promiseIndex + $concurrency; + + if ($nextIndex >= $numberOfPromises) { + return resolve($promiseOrValue); + } + + return resolve($promiseOrValue)->then(function ($r) use ($promisesOrValues, $nextIndex) { + return resolve($promisesOrValues[$nextIndex]); + }); + }, $promisesOrValues, array_keys($promisesOrValues)); + + // Start with the initial set + $firstPromises = array_splice($promisesOrValues, 0, $concurrency); + return all($firstPromises)->then(function () use (&$retVals) { + return $retVals; + }); +} + /** * @internal */ diff --git a/tests/FunctionThrottleTest.php b/tests/FunctionThrottleTest.php new file mode 100644 index 00000000..d327fb6a --- /dev/null +++ b/tests/FunctionThrottleTest.php @@ -0,0 +1,46 @@ +createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($this->identicalTo([1, 2, 3])); + + throttle([resolve(1), resolve(2), resolve(3)], 10) + ->then($mock); + } + + /** @test */ + public function shouldResolvePromisesArrayWithMultipleOfConcurrency() + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($this->identicalTo([1, 2, 3, 4, 5, 6])); + + throttle([resolve(1), resolve(2), resolve(3), resolve(4), resolve(5), resolve(6)], 2) + ->then($mock); + } + + /** @test */ + public function shouldResolvePromisesArrayWithMoreThenConcurrency() + { + $mock = $this->createCallableMock(); + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($this->identicalTo([1, 2, 3, 4, 5, 6, 7])); + + throttle([resolve(1), resolve(2), resolve(3), resolve(4), resolve(5), resolve(6), resolve(7)], 2) + ->then($mock); + } +}