Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: correctly handle process.exit calls outside of a task #361

Merged
merged 7 commits into from
Jun 28, 2023
64 changes: 38 additions & 26 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -635,33 +635,14 @@ class ThreadPool {
});

worker.on('error', (err : Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {};

// In case of an uncaught exception: Call the callback that was passed to
// `postTask` with the error, or emit an 'error' event if there is none.
const taskInfos = [...workerInfo.taskInfos.values()];
workerInfo.taskInfos.clear();

// Remove the worker from the list and potentially start a new Worker to
// replace the current one.
this._removeWorker(workerInfo);

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers();
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
this.workerFailsDuringBootstrap = true;
}
this._onError(worker, workerInfo, err, false);
});

if (taskInfos.length > 0) {
for (const taskInfo of taskInfos) {
taskInfo.done(err, null);
}
} else {
this.publicInterface.emit('error', err);
}
worker.on('exit', (exitCode : number) => {
const err = new Error(`worker exited with code: ${exitCode}`);
// Only error unfinished tasks on process exit, since there are legitimate
// reasons to exit workers and we want to handle that gracefully when possible.
this._onError(worker, workerInfo, err, true);
});

worker.unref();
Expand All @@ -675,6 +656,37 @@ class ThreadPool {
this.workers.add(workerInfo);
}

_onError (worker: Worker, workerInfo: WorkerInfo, err: Error, onlyErrorUnfinishedTasks: boolean) {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {};

const taskInfos = [...workerInfo.taskInfos.values()];
workerInfo.taskInfos.clear();

// Remove the worker from the list and potentially start a new Worker to
// replace the current one.
this._removeWorker(workerInfo);

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers();
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
this.workerFailsDuringBootstrap = true;
}

if (taskInfos.length > 0) {
// If there are remaining unfinished tasks, call the callback that was
// passed to `postTask` with the error
for (const taskInfo of taskInfos) {
taskInfo.done(err, null);
}
} else if (!onlyErrorUnfinishedTasks) {
// If there are no unfinished tasks, instead emit an 'error' event
this.publicInterface.emit('error', err);
}
}

_processPendingMessages () {
if (this.inProcessPendingMessages || !this.options.useAtomics) {
return;
Expand Down
6 changes: 3 additions & 3 deletions test/task-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ test('will put items into a task queue until they can run', async ({ equal }) =>

test('will reject items over task queue limit', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 0,
maxThreads: 1,
maxQueue: 2
Expand All @@ -78,7 +78,7 @@ test('will reject items over task queue limit', async ({ equal, rejects }) => {

test('will reject items when task queue is unavailable', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 0,
maxThreads: 1,
maxQueue: 0
Expand All @@ -97,7 +97,7 @@ test('will reject items when task queue is unavailable', async ({ equal, rejects

test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
maxQueue: 0
Expand Down
42 changes: 41 additions & 1 deletion test/test-uncaught-exception-from-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,46 @@ test('uncaught exception in immediate after task yields error event', async ({ e
equal(pool.threads.length, 1);
pool.threads[0].ref();

// This is the main aassertion here.
// This is the main assertion here.
equal((await errorEvent)[0].message, 'not_caught');
});

test('exiting process resets worker', async ({ not, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1
});
const originalThreadId = pool.threads[0].threadId;
await rejects(pool.runTask('process.exit(1);'), /worker exited with code: 1/);
const newThreadId = pool.threads[0].threadId;
not(originalThreadId, newThreadId);
});

test('exiting process in immediate after task errors next task and resets worker', async ({ equal, not, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval-async.js'),
minThreads: 1
});

const originalThreadId = pool.threads[0].threadId;
const taskResult = await pool.runTask(`
setTimeout(() => { process.exit(1); }, 50);
42
`);
equal(taskResult, 42);

await rejects(pool.runTask(`
'use strict';

const { promisify } = require('util');
const sleep = promisify(setTimeout);
async function _() {
await sleep(1000);
return 42
}
_();
`), /worker exited with code: 1/);
const secondThreadId = pool.threads[0].threadId;

not(originalThreadId, secondThreadId);
});