Skip to content

Commit

Permalink
cleanup dead code and format to xo's liking
Browse files Browse the repository at this point in the history
  • Loading branch information
tgfisher4 committed May 23, 2024
1 parent 890bab7 commit 96ec791
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 69 deletions.
88 changes: 22 additions & 66 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {inspect} from 'node:util';
export default async function pMap(
iterable,
mapper,
Expand Down Expand Up @@ -202,56 +201,21 @@ export function pMapIterable(
let runningMappersCount = 0;
let isDone = false;
let inputIndex = 0;
let outputIndex = 0; // only used when `preserveOrder: false`
let outputIndex = 0; // Only used when `preserveOrder: false`

const nextPromise = preserveOrder
// Treat `promises` as a queue
? async function nextPromiseInOrder() {
// TODO: consider/test ending in pMapSkip: I think it shouldn't be a problem since we know promises.length > 0 and the pMapSkip will over-iterate to {done: true}
// may be undefined bc of pMapSkips
? () => {
// May be undefined bc of `pMapSkip`s
while (promisesIndexFromInputIndex[outputIndex] === undefined) {
outputIndex += 1;
// if (outputIndex >= inputIndex) {
// isDone = true;
// return {result: {done: true}}
// }
}

return promises[promisesIndexFromInputIndex[outputIndex++]];
// strat 1: resolve the pMapSkip promise to the next inputIndex if available, and only mapNext if not available
// strat 2: do a last check before returning
/*
const resolved = await promises[promisesIndexFromInputIndex[outputIndex++]];
// If we are waiting on an outputIndex that maps to pMapSkip, it will get re-resolved underneath to the next inputIndex available
// This checks we aren't returning the re-resolved
return resolved.inputIndex === outputIndex ?
if(resultInputIndex != outputIndex){
return nextPromiseInOrder();
}
*/
}
// Treat `promises` as a pool (order doesn't matter)
: () => Promise.race(promises);


const popNextPromise = /*preserveOrder ? (async () => {
const {result, inputIndex: resultInputIndex} = await promises[promisesIndexFromInputIndex[outputIndex++]];
promises.shift();
inputIndexFromPromisesIndex.shift();
promisesIndexFromInputIndex // decr by 1
return result;
}) : (*/async () => {

// console.log({promises: inspect(promises, {depth: 10}), promisesIndexFromInputIndex, inputIndexFromPromisesIndex, outputIndex})
const resolved = await nextPromise();
// console.log({promises: inspect(promises, {depth: 10}), promisesIndexFromInputIndex, inputIndexFromPromisesIndex, outputIndex, resolved})

const {result, inputIndex: resultInputIndex} = resolved;
popPromise(resultInputIndex);

return result;
};

function popPromise(inputIndex) {
// Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array
const tail = promises.pop();
Expand All @@ -263,40 +227,31 @@ export function pMapIterable(
promises[promisesIndex] = tail;
inputIndexFromPromisesIndex[promisesIndex] = tailInputIndex;
promisesIndexFromInputIndex[tailInputIndex] = promisesIndex;
// promises[promisesIndex] = tail.then(resultWithIndex => ({result: resultWithIndex.result, promisesIndex}));
}
}

async function mapNext(promisesIndex) {
let next;
// try {
next = iterator.next();
// } catch (error) {
// isDone = true;
// return {result: {error}, inputIndex: myInputIndex};
// }
let next = iterator.next();

const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others
runningMappersCount++;
promisesIndexFromInputIndex[myInputIndex] = promisesIndex;
inputIndexFromPromisesIndex[promisesIndex] = myInputIndex;

let didSpawn = false;
if (isPromiseLike(next)) {
// Optimization: if our concurrency is bounded, and we need to `await` the next `iterator` element,
// we first eagerly spawn the maximum number of `mapNext` promises, so that these promises can begin `await`ing
// we first eagerly spawn more `mapNext` promises, so that these promises can begin `await`ing
// their respective `iterator` elements (if needed) and `mapper` results in parallel.
// This may entail memory usage closer to the max than necessary, but this space was already allocated to `pMapIterable` via
// This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via
// `options.concurrency` and `options.backpressure`.
// This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing,
// which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`.
// This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing
// (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`.
// However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common
// `async` operations like disk reads, network requests, etc.
// Overall, this can reduce the total time taken to process all elements.
if (concurrency !== Number.POSITIVE_INFINITY) {
// Spawn if still below concurrency and backpressure limit
trySpawn();
didSpawn = true;
}

try {
Expand All @@ -314,10 +269,8 @@ export function pMapIterable(
return {result: {done: true}, inputIndex: myInputIndex};
}

// if (!didSpawn) {
// Spawn if still below concurrency and backpressure limit
trySpawn();
// }
// Spawn if still below concurrency and backpressure limit
trySpawn();

let returnValue;
try {
Expand All @@ -337,15 +290,17 @@ export function pMapIterable(
runningMappersCount--;

if (returnValue === pMapSkip) {
// If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed
if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) {
popPromise(myInputIndex);
// In case we are already being awaited, resolve promise to next inputIndex
return promises[promisesIndexFromInputIndex[myInputIndex + 1]];
}
else {
delete promisesIndexFromInputIndex[myInputIndex];
return mapNext(promisesIndex);
}

// Otherwise, start mapping the next input element
delete promisesIndexFromInputIndex[myInputIndex];
// Not necessary to `delete inputIndexFromPromisesIndex[promisesIndex]` since `inputIndexFromPromisesIndex[promisesIndex]` is only used
// when this promise resolves, but by that point this recursive `mapNext(promisesIndex)` call will have necessarily overwritten it.
return mapNext(promisesIndex);
}

// Spawn if still below backpressure limit and just dropped below concurrency limit
Expand All @@ -371,7 +326,8 @@ export function pMapIterable(
trySpawn();

while (promises.length > 0) {
const {error, done, value} = await popNextPromise(); // eslint-disable-line no-await-in-loop
const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop
popPromise(inputIndex);

if (error) {
throw error;
Expand All @@ -391,8 +347,8 @@ export function pMapIterable(
};
}

function isPromiseLike(p){
return typeof p === 'object' && p != null && 'then' in p && typeof p.then === 'function'
function isPromiseLike(p) {
return typeof p === 'object' && p !== null && 'then' in p && typeof p.then === 'function';
}

export const pMapSkip = Symbol('skip');
6 changes: 3 additions & 3 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ test('pMapIterable - pMapSkip', async t => {
6,
7,
8,
pMapSkip
pMapSkip,
], async value => value)), [1, 2, 3, 4, 5, 6, 7, 8]);
});

Expand All @@ -669,7 +669,7 @@ test('pMapIterable - pMapSkip - concurrency 2', async t => {
6,
7,
8,
pMapSkip
pMapSkip,
], async value => value, {concurrency: 2})), [1, 2, 3, 4, 5, 6, 7, 8]);
});

Expand All @@ -687,7 +687,7 @@ test('pMapIterable - pMapSkip - concurrency 2 - preserveOrder: false', async t =
6,
7,
8,
pMapSkip
pMapSkip,
], async value => value, {concurrency: 2, preserveOrder: false}));
const resultSet = new Set(result);
t.assert(resultSet.has(1));
Expand Down

0 comments on commit 96ec791

Please sign in to comment.