Skip to content

Commit

Permalink
Support ordered iteration (#29)
Browse files Browse the repository at this point in the history
* Initial basic ordered support

* Support subiterators

* Remove completed TODO
  • Loading branch information
voxpelli committed May 12, 2024
1 parent f77358b commit 7776805
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 37 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</div>


Buffered processing of async iterables / generators in parallel to achieve comparable performance to `Promise.all()`
Buffered parallel processing of async iterables / generators.

[![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)
[![npm downloads](https://img.shields.io/npm/dm/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable)
Expand All @@ -16,7 +16,6 @@ Buffered processing of async iterables / generators in parallel to achieve compa
[![js-semistandard-style](https://img.shields.io/badge/code%20style-semistandard-brightgreen.svg)](https://github.com/voxpelli/eslint-config)
[![Follow @voxpelli@mastodon.social](https://img.shields.io/mastodon/follow/109247025527949675?domain=https%3A%2F%2Fmastodon.social&style=social)](https://mastodon.social/@voxpelli)

**WORK IN PROGRESS – early prerelease**

## Usage

Expand Down Expand Up @@ -80,6 +79,7 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie
#### Options

* `bufferSize`_optional_ – defaults to `6`, sets the max amount of simultaneous items that are processed at once in the buffer.
* `ordered`_optional_ – defaults to `false`, when `true` the results are yielded in the order of the input rather than in the order they finish processing

## Similar modules

Expand Down
76 changes: 48 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,25 @@
// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose
// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js
// TODO: Have option to persist order? To not use Promise.race()?
// TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385

import { findLeastTargeted } from './lib/find-least-targeted.js';
import { makeIterableAsync } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfSet } from './lib/type-checks.js';
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';

/**
* @template T
* @template R
* @param {AsyncIterable<T> | Iterable<T> | T[]} input
* @param {(item: T) => (Promise<R>|AsyncIterable<R>)} callback
* @param {{ bufferSize?: number|undefined }} [options]
* @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options]
* @returns {AsyncIterableIterator<R> & { return: NonNullable<AsyncIterableIterator<R>["return"]>, throw: NonNullable<AsyncIterableIterator<R>["throw"]> }}
*/
export function bufferedAsyncMap (input, callback, options) {
/** @typedef {Promise<IteratorResult<R|AsyncIterable<R>> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */
const {
bufferSize = 6,
ordered = false,
} = options || {};

/** @type {AsyncIterable<T>} */
Expand All @@ -39,11 +39,11 @@ export function bufferedAsyncMap (input, callback, options) {
/** @type {AsyncIterator<T, unknown>} */
const asyncIterator = asyncIterable[Symbol.asyncIterator]();

/** @type {Set<AsyncIterator<R, unknown>>} */
const subIterators = new Set();
/** @type {AsyncIterator<R, unknown>[]} */
const subIterators = [];

/** @type {Set<BufferPromise>} */
const bufferedPromises = new Set();
/** @type {BufferPromise[]} */
const bufferedPromises = [];

/** @type {WeakMap<BufferPromise, AsyncIterator<T>|AsyncIterator<R>>} */
const promisesToSourceIteratorMap = new WeakMap();
Expand Down Expand Up @@ -76,8 +76,8 @@ export function bufferedAsyncMap (input, callback, options) {
);

// TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10
bufferedPromises.clear();
subIterators.clear();
bufferedPromises.splice(0, bufferedPromises.length);
subIterators.splice(0, subIterators.length);

if (throwAnyError && hasError) {
throw hasError;
Expand All @@ -90,14 +90,20 @@ export function bufferedAsyncMap (input, callback, options) {
const fillQueue = () => {
if (hasError || isDone) return;

// Check which iterator that has the least amount of queued promises right now
const iterator = findLeastTargeted(
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
bufferedPromises,
promisesToSourceIteratorMap
);
/** @type {AsyncIterator<R, unknown>|undefined} */
let currentSubIterator;

const currentSubIterator = isPartOfSet(iterator, subIterators) ? iterator : undefined;
if (ordered) {
currentSubIterator = subIterators[0];
} else {
const iterator = findLeastTargeted(
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
bufferedPromises,
promisesToSourceIteratorMap
);

currentSubIterator = isPartOfArray(iterator, subIterators) ? iterator : undefined;
}

/** @type {BufferPromise} */
const bufferPromise = currentSubIterator
Expand All @@ -110,7 +116,7 @@ export function bufferedAsyncMap (input, callback, options) {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
subIterators.delete(currentSubIterator);
arrayDeleteInPlace(subIterators, currentSubIterator);
}

/** @type {Awaited<BufferPromise>} */
Expand Down Expand Up @@ -174,29 +180,43 @@ export function bufferedAsyncMap (input, callback, options) {
});

promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator);
bufferedPromises.add(bufferPromise);

if (bufferedPromises.size < bufferSize) {
if (ordered && currentSubIterator) {
let i = 0;

while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) {
i += 1;
}

bufferedPromises.splice(i, 0, bufferPromise);
} else {
bufferedPromises.push(bufferPromise);
}

if (bufferedPromises.length < bufferSize) {
fillQueue();
}
};

/** @type {AsyncIterator<R>["next"]} */
const nextValue = async () => {
if (bufferedPromises.size === 0) return markAsEnded(true);
const nextBufferedPromise = bufferedPromises[0];

if (!nextBufferedPromise) return markAsEnded(true);
if (isDone) return { done: true, value: undefined };

/** @type {Awaited<BufferPromise>} */
const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises));
arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise);

// Wait for some of the current promises to be finished
const {
bufferPromise,
done,
err,
fromSubIterator,
isSubIterator,
value,
} = await Promise.race(bufferedPromises);

bufferedPromises.delete(bufferPromise);
} = resolvedPromise;

// We are mandated by the spec to always do this return if the iterator is done
if (isDone) {
Expand All @@ -206,16 +226,16 @@ export function bufferedAsyncMap (input, callback, options) {
hasError = err instanceof Error ? err : new Error('Unknown error');
}

if (fromSubIterator || subIterators.size !== 0) {
if (fromSubIterator || subIterators.length > 0) {
fillQueue();
}

return bufferedPromises.size === 0
return bufferedPromises.length === 0
? markAsEnded(true)
: nextValue();
} else if (isSubIterator && isAsyncIterable(value)) {
// TODO: Handle possible error here? Or too obscure?
subIterators.add(value[Symbol.asyncIterator]());
subIterators.unshift(value[Symbol.asyncIterator]());
fillQueue();
return nextValue();
} else {
Expand Down
4 changes: 2 additions & 2 deletions lib/find-least-targeted.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @template Target
* @template {object} Item
* @param {Set<Item>} items
* @param {Iterable<Item> | Item[]} items
* @param {WeakMap<Item, Target>} itemTargets
* @returns {Map<Target, number>}
*/
Expand All @@ -25,7 +25,7 @@ function countTargets (items, itemTargets) {
* @template Target
* @template {object} Item
* @param {Iterable<Target> | Target[]} targets
* @param {Set<Item>} items
* @param {Iterable<Item> | Item[]} items
* @param {WeakMap<Item, Target>} itemTargets
* @returns {Target|undefined}
*/
Expand Down
14 changes: 14 additions & 0 deletions lib/misc.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,17 @@ export async function * makeIterableAsync (input) {
yield value;
}
}

/**
 * Removes the first occurrence of `value` from `list`, mutating the array
 * itself – the array counterpart of calling `.delete()` on a set.
 *
 * Leaves `list` untouched when `value` is not found. The lookup is done with
 * `indexOf()`, so values are matched by strict equality.
 *
 * @template T
 * @param {T[]} list
 * @param {T} value
 */
export function arrayDeleteInPlace (list, value) {
  const position = list.indexOf(value);

  // Nothing to remove – bail out before touching the array
  if (position === -1) return;

  list.splice(position, 1);
}
8 changes: 4 additions & 4 deletions lib/type-checks.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ export const isIterable = (value) => Boolean(value && typeof value === 'object'
export const isAsyncIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.asyncIterator in value);

/**
* @template SetValue
* @template Values
* @param {unknown} value
* @param {Set<SetValue>} set
* @returns {value is SetValue}
* @param {Values[]} list
* @returns {value is Values}
*/
export const isPartOfSet = (value, set) => {
  // The cast only satisfies the type checker – membership is decided by the set's own .has()
  const candidate = /** @type {SetValue} */ (value);

  return set.has(candidate);
};
export const isPartOfArray = (value, list) => {
  // The cast only satisfies the type checker – membership is decided by the array's own .includes()
  const candidate = /** @type {Values} */ (value);

  return list.includes(candidate);
};
14 changes: 14 additions & 0 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ export async function * yieldValuesOverTimeWithPrefix (count, wait, prefix) {
await promisableTimeout(waitCallback(i));
}
}

/**
 * Test helper that, for every index from `0` up to (but not including)
 * `count`, delegates to the async generator returned by `nested(index)` and
 * then awaits `promisableTimeout()` before continuing – including after the
 * final round.
 *
 * @param {number} count - how many times `nested` gets invoked
 * @param {number|((i: number) => number)} wait - value handed to `promisableTimeout()` after each round, either fixed or derived from the index
 * @param {(i: number) => AsyncGenerator<string>} nested - factory for the generator to delegate to in each round
 * @returns {AsyncIterable<string>}
 */
export async function * nestedYieldValuesOverTime (count, wait, nested) {
  // Normalize a fixed wait into the callback form so the loop only deals with one shape
  const resolveWait = typeof wait !== 'number' ? wait : () => wait;

  let round = 0;

  while (round < count) {
    yield * nested(round);
    await promisableTimeout(resolveWait(round));
    round += 1;
  }
}

0 comments on commit 7776805

Please sign in to comment.