Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent some potential unhandled exceptions #48

Merged
merged 4 commits into from Sep 11, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 27 additions & 3 deletions index.js
Expand Up @@ -79,16 +79,40 @@ export default async function pMap(
} else {
errors.push(error);
resolvingCount--;
next();

// In that case we can't really continue regardless of stopOnError state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
} catch (error) {
if (!isRejected) {
huntharo marked this conversation as resolved.
Show resolved Hide resolved
isRejected = true;
reject(error);
}
}
}
}
})();
};

for (let index = 0; index < concurrency; index++) {
next();
// Catch errors from the iterable.next() call
// In that case we can't really continue regardless of stopOnError state
// since an iterable is likely to continue throwing after it throws once.
// If we continue calling next() indefinitely we will likely end up
// in an infinite loop of failed iteration.
try {
next();
} catch (error) {
if (!isRejected) {
huntharo marked this conversation as resolved.
Show resolved Hide resolved
isRejected = true;
reject(error);
huntharo marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (isIterableDone) {
if (isIterableDone || isRejected) {
break;
}
}
Expand Down
93 changes: 91 additions & 2 deletions test.js
Expand Up @@ -40,6 +40,36 @@ const mapper = async ([value, ms]) => {
return value;
};

class ThrowingIterator {
constructor(max, throwOnIndex) {
this._max = max;
this._throwOnIndex = throwOnIndex;
this.index = 0;
}

[Symbol.iterator]() {
let index = 0;
const max = this._max;
const throwOnIndex = this._throwOnIndex;
const obj = this;
return {
next() {
try {
if (index === throwOnIndex) {
throw new Error(`throwing on index ${index}`);
}

const item = {value: index, done: index === max};
return item;
} finally {
index++;
obj.index = index;
}
}
};
}
}

test('main', async t => {
const end = timeSpan();
t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]);
Expand Down Expand Up @@ -128,8 +158,67 @@ test('do not run mapping after stop-on-error happened', async t => {
await delay(100);
throw new Error('Oops!');
}
})
},
{concurrency: 1})
);
await delay(500);
t.deepEqual(mappedValues, [1, 3]);
t.deepEqual(mappedValues, [1]);
});

test('catches exception from source iterator - 1st item', async t => {
const input = new ThrowingIterator(100, 0);
const mappedValues = [];
const error = await t.throwsAsync(pMap(
input,
async value => {
mappedValues.push(value);
await delay(100);
return value;
},
{concurrency: 1, stopOnError: true}
));
t.is(error.message, 'throwing on index 0');
t.is(input.index, 1);
await delay(300);
t.deepEqual(mappedValues, []);
});

// The 2nd iterable item throwing is distinct from the 1st when concurrency is 1 because
// it means that the source next() is invoked from next() and not from
// the constructor
test('catches exception from source iterator - 2nd item', async t => {
const input = new ThrowingIterator(100, 1);
const mappedValues = [];
await t.throwsAsync(pMap(
input,
async value => {
mappedValues.push(value);
await delay(100);
return value;
},
{concurrency: 1, stopOnError: true}
));
await delay(300);
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});

// The 2nd iterable item throwing after a 1st item mapper exception, with stopOnError false,
// is distinct from other cases because our next() is called from a catch block
test('catches exception from source iterator - 2nd item after 1st item mapper throw', async t => {
const input = new ThrowingIterator(100, 1);
const mappedValues = [];
const error = await t.throwsAsync(pMap(
input,
async value => {
mappedValues.push(value);
await delay(100);
throw new Error('mapper threw error');
},
{concurrency: 1, stopOnError: false}
));
await delay(300);
t.is(error.message, 'throwing on index 1');
t.is(input.index, 2);
t.deepEqual(mappedValues, [0]);
});