From 0158bf4c49f62ee0fa63cccfe0c2f0bc0e6743ca Mon Sep 17 00:00:00 2001 From: Pelle Wessman Date: Sun, 12 May 2024 13:43:06 +0200 Subject: [PATCH] Add `mergeIterables()` (#30) Fixes #18 --- README.md | 24 ++++++++- index.js | 20 ++++++- test/values.spec.js | 125 +++++++++++++++++++++++++++++--------------- 3 files changed, 125 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index add65cf..3b763ce 100644 --- a/README.md +++ b/README.md @@ -69,10 +69,16 @@ for await (const item of mappedIterator) { ## API -### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator` +### bufferedAsyncMap() Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve. +#### Syntax + +`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator` + +#### Arguments + * `input` – either an async iterable, an ordinare iterable or an array * `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer) @@ -81,6 +87,22 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie * `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. 
* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered +### mergeIterables() + +Merges all given (async) iterables in parallel, returning the values as they resolve + +#### Syntax + +`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator` + +#### Arguments + +* `input` – either an async iterable, an ordinary iterable or an array + +#### Options + +* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultaneous items that are processed at once in the buffer. + ## Similar modules * [`hwp`](https://github.com/mcollina/hwp) – similar module by [@mcollina](https://github.com/mcollina) diff --git a/index.js b/index.js index a6b1b83..f05915c 100644 --- a/index.js +++ b/index.js @@ -5,12 +5,30 @@ // TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose // TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values // TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." 
THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js -// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385 import { findLeastTargeted } from './lib/find-least-targeted.js'; import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js'; import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js'; +/** Re-yields every value of `item` through `yield *`; used as the mapping callback of `mergeIterables()`. + * @template T + * @param {AsyncIterable<T> | Iterable<T> | T[]} item + * @returns {AsyncIterable<T>} + */ +async function * yieldIterable (item) { + yield * item; +} + +/** Merges the iterables in `input` by delegating to `bufferedAsyncMap()` with `yieldIterable` as the callback, forwarding `bufferSize`. + * @template T + * @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input + * @param {{ bufferSize?: number|undefined }} [options] + * @returns {AsyncIterable<T>} + */ +export async function * mergeIterables (input, { bufferSize } = {}) { + yield * bufferedAsyncMap(input, yieldIterable, { bufferSize }); +} + /** * @template T * @template R diff --git a/test/values.spec.js b/test/values.spec.js index 91fcec1..e8e37e0 100644 --- a/test/values.spec.js +++ b/test/values.spec.js @@ -6,6 +6,7 @@ import sinonChai from 'sinon-chai'; import { bufferedAsyncMap, + mergeIterables, } from '../index.js'; import { @@ -193,62 +194,60 @@ describe('bufferedAsyncMap() values', () => { sinon.restore(); }); - describe('main', () => { - it('should return all values from the original AsyncIterable when looped over', async () => { - // Create the promise first, then have it be fully executed using clock.runAllAsync() - const promisedResult = (async () => { - /** @type {number[]} */ - const rawResult = []; + it('should return all values from the original AsyncIterable when looped over', async () => { + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {number[]} */ + const rawResult = []; - for await (const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) { - rawResult.push(value); - } + for await 
(const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) { + rawResult.push(value); + } - /** @type {[number[], number]} */ - const result = [rawResult, Date.now()]; + /** @type {[number[], number]} */ + const result = [rawResult, Date.now()]; - return result; - })(); + return result; + })(); - await clock.runAllAsync(); + await clock.runAllAsync(); - const [result, duration] = await promisedResult; + const [result, duration] = await promisedResult; - result.should.deep.equal(expectedResult); - duration.should.equal(6300); - }); + result.should.deep.equal(expectedResult); + duration.should.equal(6300); + }); - it('should return all values from the original AsyncIterable when accessed directly', async () => { - // Create the promise first, then have it be fully executed using clock.runAllAsync() - const promisedResult = (async () => { - const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - const asyncIterator = asyncIterable[Symbol.asyncIterator](); + it('should return all values from the original AsyncIterable when accessed directly', async () => { + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item); + const asyncIterator = asyncIterable[Symbol.asyncIterator](); - /** @type {Promise>[]} */ - const iterations = []; + /** @type {Promise>[]} */ + const iterations = []; - for (let i = 0; i < count; i++) { - iterations.push(asyncIterator.next()); - } + for (let i = 0; i < count; i++) { + iterations.push(asyncIterator.next()); + } - const rawResult = await Promise.all(iterations); + const rawResult = await Promise.all(iterations); - /** @type {[(number|void)[], number]} */ - const result = [ - rawResult.map(item => item.value), - Date.now(), - ]; + /** @type {[(number|void)[], number]} */ + const result = [ + rawResult.map(item => item.value), + Date.now(), + ]; - 
return result; - })(); + return result; + })(); - await clock.runAllAsync(); + await clock.runAllAsync(); - const [result, duration] = await promisedResult; + const [result, duration] = await promisedResult; - result.should.deep.equal(expectedResult); - duration.should.equal(4300); - }); + result.should.deep.equal(expectedResult); + duration.should.equal(4300); }); it('should return all values from the original AsyncIterable when given as an array', async () => { @@ -280,7 +279,7 @@ describe('bufferedAsyncMap() values', () => { duration.should.equal(2000); }); - it('should handle chained async generator values from the original AsyncIterable when looped over', async () => { + it('should handle nested async generator values from the original AsyncIterable when looped over', async () => { // Create the promise first, then have it be fully executed using clock.runAllAsync() const promisedResult = (async () => { /** @type {string[]} */ @@ -890,4 +889,46 @@ describe('bufferedAsyncMap() values', () => { duration.should.equal(111900); }); }); + + describe('mergeIterables', () => { + it('should process iterables in parallel', async () => { + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const rawResult = []; + + for await (const value of mergeIterables([ + yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 2000 : 100, 'first-'), + yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 
2000 : 100, 'second-'), + ])) { + rawResult.push(value); + } + + /** @type {[string[], number]} */ + const result = [rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.deep.equal([ + 'first-0', + 'second-0', + 'second-1', + 'first-1', + 'second-2', + 'first-2', + 'second-3', + 'first-3', + 'second-4', + 'first-4', + 'second-5', + 'first-5', + ]); + duration.should.equal(6300); + }); + }); });