Skip to content

Commit

Permalink
refactor: Update AsyncMap operation - major improvements - thanks @ke…
Browse files Browse the repository at this point in the history
…lunik.

Signed-off-by: Pol Dellaiera <pol.dellaiera@protonmail.com>
  • Loading branch information
drupol committed Nov 7, 2020
1 parent cc378b4 commit 1fbf5e9
Showing 1 changed file with 14 additions and 43 deletions.
57 changes: 14 additions & 43 deletions src/Operation/AsyncMap.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@

namespace loophp\collection\Operation;

use Amp\Emitter;
use Amp\Promise;
use Amp\Sync\LocalSemaphore;
use Closure;
use Exception;
use Generator;
use Iterator;

use function Amp\Iterator\fromIterable;
use function Amp\ParallelFunctions\parallel;
use function Amp\Promise\all;
use function Amp\Promise\wait;
use function Amp\Sync\ConcurrentIterator\map;
use function function_exists;

// phpcs:disable
Expand All @@ -34,13 +34,13 @@
final class AsyncMap extends AbstractOperation
{
/**
* @psalm-return Closure(callable(T, TKey): T): Closure(Iterator<TKey, T>): Generator<TKey, T>
* @psalm-return Closure(callable(T, TKey): T ...): Closure(Iterator<TKey, T>): Generator<TKey, T>
*/
public function __invoke(): Closure
{
return
/**
* @psalm-param callable(T, TKey): T $callback
* @psalm-param callable(T, TKey): T ...$callbacks
*
* @psalm-return Closure(Iterator<TKey, T>): Generator<TKey, T>
*/
Expand Down Expand Up @@ -68,51 +68,22 @@ static function (Iterator $iterator) use ($callbacks): Generator {
*/
static fn ($carry, callable $callback) => $callback($carry, $key);

$callback = static fn ($value, $key) => array_reduce($callbacks, $callbackFactory($key), $value);

$emitter = new Emitter();
$iter = $emitter->iterate();
$callback = parallel($callback);

/** @psalm-var callable(Iterator<TKey, T>): Generator<TKey, T> $map */
$map = Map::of()(
$callback =
/**
* @param mixed $value
* @psalm-param T $value
* @psalm-param array{0: TKey, 1:T} $value
*
* @param mixed $key
* @psalm-param TKey $key
* @psalm-return array{0: TKey, 1: T}
*/
static function ($value, $key) use ($callback, $emitter): Promise {
$promise = $callback($value, $key);

$promise->onResolve(
/**
* @param mixed $error
* @psalm-param null|\Throwable $error
*
* @param mixed $value
* @psalm-param T $value
*/
static function ($error, $value) use ($key, $emitter): ?Promise {
if (null !== $error) {
return $emitter->fail($error);
}

return $emitter->emit([$key, $value]);
}
);
static function (array $value) use ($callbacks, $callbackFactory): array {
[$key, $value] = $value;

return $promise;
}
);
return [$key, array_reduce($callbacks, $callbackFactory($key), $value)];
};

all(iterator_to_array($map($iterator)))
->onResolve(
static fn ($error) => !$error && $emitter->complete()
);
$iter = map(fromIterable(Pack::of()($iterator)), new LocalSemaphore(32), parallel($callback));

while (wait($iter->advance())) {
/** @psalm-var array{0: TKey, 1: T} $item */
$item = $iter->getCurrent();

yield $item[0] => $item[1];
Expand Down

0 comments on commit 1fbf5e9

Please sign in to comment.