Skip to content

Commit

Permalink
Add mergeIterables() (#30)
Browse files Browse the repository at this point in the history
Fixes #18
  • Loading branch information
voxpelli committed May 12, 2024
1 parent 5c458e0 commit 0158bf4
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 44 deletions.
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ for await (const item of mappedIterator) {

## API

### `bufferedAsyncMap(input, callback[, { bufferSize=6 }]) => AsyncIterableIterator`
### bufferedAsyncMap()

Iterates and applies the `callback` to up to `bufferSize` items from `input` yielding values as they resolve.

#### Syntax

`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator`

#### Arguments

* `input` – either an async iterable, an ordinare iterable or an array
* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer)

Expand All @@ -81,6 +87,22 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie
* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.
* `ordered`_optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered

### mergeIterables()

Merges all given (async) iterables in parallel, returning the values as they resolve

#### Syntax

`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator`

#### Arguments

* `input` – either an async iterable, an ordinare iterable or an array

#### Options

* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer.

## Similar modules

* [`hwp`](https://github.com/mcollina/hwp) – similar module by [@mcollina](https://github.com/mcollina)
Expand Down
20 changes: 19 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,30 @@
// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose
// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385

import { findLeastTargeted } from './lib/find-least-targeted.js';
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';

/**
* @template T
* @param {AsyncIterable<T> | Iterable<T> | T[]} item
* @returns {AsyncIterable<T>}
*/
async function * yieldIterable (item) {
yield * item;
}

/**
* @template T
* @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input
* @param {{ bufferSize?: number|undefined }} [options]
* @returns {AsyncIterable<T>}
*/
export async function * mergeIterables (input, { bufferSize } = {}) {
yield * bufferedAsyncMap(input, yieldIterable, { bufferSize });
}

/**
* @template T
* @template R
Expand Down
125 changes: 83 additions & 42 deletions test/values.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import sinonChai from 'sinon-chai';

import {
bufferedAsyncMap,
mergeIterables,
} from '../index.js';

import {
Expand Down Expand Up @@ -193,62 +194,60 @@ describe('bufferedAsyncMap() values', () => {
sinon.restore();
});

describe('main', () => {
it('should return all values from the original AsyncIterable when looped over', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
/** @type {number[]} */
const rawResult = [];
it('should return all values from the original AsyncIterable when looped over', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
/** @type {number[]} */
const rawResult = [];

for await (const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) {
rawResult.push(value);
}
for await (const value of bufferedAsyncMap(baseAsyncIterable, async (item) => item)) {
rawResult.push(value);
}

/** @type {[number[], number]} */
const result = [rawResult, Date.now()];
/** @type {[number[], number]} */
const result = [rawResult, Date.now()];

return result;
})();
return result;
})();

await clock.runAllAsync();
await clock.runAllAsync();

const [result, duration] = await promisedResult;
const [result, duration] = await promisedResult;

result.should.deep.equal(expectedResult);
duration.should.equal(6300);
});
result.should.deep.equal(expectedResult);
duration.should.equal(6300);
});

it('should return all values from the original AsyncIterable when accessed directly', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item);
const asyncIterator = asyncIterable[Symbol.asyncIterator]();
it('should return all values from the original AsyncIterable when accessed directly', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
const asyncIterable = bufferedAsyncMap(baseAsyncIterable, async (item) => item);
const asyncIterator = asyncIterable[Symbol.asyncIterator]();

/** @type {Promise<IteratorResult<number, void>>[]} */
const iterations = [];
/** @type {Promise<IteratorResult<number, void>>[]} */
const iterations = [];

for (let i = 0; i < count; i++) {
iterations.push(asyncIterator.next());
}
for (let i = 0; i < count; i++) {
iterations.push(asyncIterator.next());
}

const rawResult = await Promise.all(iterations);
const rawResult = await Promise.all(iterations);

/** @type {[(number|void)[], number]} */
const result = [
rawResult.map(item => item.value),
Date.now(),
];
/** @type {[(number|void)[], number]} */
const result = [
rawResult.map(item => item.value),
Date.now(),
];

return result;
})();
return result;
})();

await clock.runAllAsync();
await clock.runAllAsync();

const [result, duration] = await promisedResult;
const [result, duration] = await promisedResult;

result.should.deep.equal(expectedResult);
duration.should.equal(4300);
});
result.should.deep.equal(expectedResult);
duration.should.equal(4300);
});

it('should return all values from the original AsyncIterable when given as an array', async () => {
Expand Down Expand Up @@ -280,7 +279,7 @@ describe('bufferedAsyncMap() values', () => {
duration.should.equal(2000);
});

it('should handle chained async generator values from the original AsyncIterable when looped over', async () => {
it('should handle nested async generator values from the original AsyncIterable when looped over', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
/** @type {string[]} */
Expand Down Expand Up @@ -890,4 +889,46 @@ describe('bufferedAsyncMap() values', () => {
duration.should.equal(111900);
});
});

describe('mergeIterables', () => {
it('should process iterables in parallel', async () => {
// Create the promise first, then have it be fully executed using clock.runAllAsync()
const promisedResult = (async () => {
/** @type {string[]} */
const rawResult = [];

for await (const value of mergeIterables([
yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 2000 : 100, 'first-'),
yieldValuesOverTimeWithPrefix(6, (i) => i % 2 === 1 ? 2000 : 100, 'second-'),
])) {
rawResult.push(value);
}

/** @type {[string[], number]} */
const result = [rawResult, Date.now()];

return result;
})();

await clock.runAllAsync();

const [result, duration] = await promisedResult;

result.should.deep.equal([
'first-0',
'second-0',
'second-1',
'first-1',
'second-2',
'first-2',
'second-3',
'first-3',
'second-4',
'first-4',
'second-5',
'first-5',
]);
duration.should.equal(6300);
});
});
});

0 comments on commit 0158bf4

Please sign in to comment.