Skip to content

Commit

Permalink
feat: correctly handle process.exit calls outside of a task (#361)
Browse files Browse the repository at this point in the history
Co-authored-by: Carlos Fuentes <me@metcoder.dev>
  • Loading branch information
clayjones-at and metcoder95 committed Jun 28, 2023
1 parent c52363c commit 8e6d16e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 30 deletions.
64 changes: 38 additions & 26 deletions src/index.ts
Expand Up @@ -635,33 +635,14 @@ class ThreadPool {
});

worker.on('error', (err : Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {};

// In case of an uncaught exception: Call the callback that was passed to
// `postTask` with the error, or emit an 'error' event if there is none.
const taskInfos = [...workerInfo.taskInfos.values()];
workerInfo.taskInfos.clear();

// Remove the worker from the list and potentially start a new Worker to
// replace the current one.
this._removeWorker(workerInfo);

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers();
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
this.workerFailsDuringBootstrap = true;
}
this._onError(worker, workerInfo, err, false);
});

if (taskInfos.length > 0) {
for (const taskInfo of taskInfos) {
taskInfo.done(err, null);
}
} else {
this.publicInterface.emit('error', err);
}
worker.on('exit', (exitCode : number) => {
const err = new Error(`worker exited with code: ${exitCode}`);
// Only error unfinished tasks on process exit, since there are legitimate
// reasons to exit workers and we want to handle that gracefully when possible.
this._onError(worker, workerInfo, err, true);
});

worker.unref();
Expand All @@ -675,6 +656,37 @@ class ThreadPool {
this.workers.add(workerInfo);
}

_onError (worker: Worker, workerInfo: WorkerInfo, err: Error, onlyErrorUnfinishedTasks: boolean) {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {};

const taskInfos = [...workerInfo.taskInfos.values()];
workerInfo.taskInfos.clear();

// Remove the worker from the list and potentially start a new Worker to
// replace the current one.
this._removeWorker(workerInfo);

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers();
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
this.workerFailsDuringBootstrap = true;
}

if (taskInfos.length > 0) {
// If there are remaining unfinished tasks, call the callback that was
// passed to `postTask` with the error
for (const taskInfo of taskInfos) {
taskInfo.done(err, null);
}
} else if (!onlyErrorUnfinishedTasks) {
// If there are no unfinished tasks, instead emit an 'error' event
this.publicInterface.emit('error', err);
}
}

_processPendingMessages () {
if (this.inProcessPendingMessages || !this.options.useAtomics) {
return;
Expand Down
6 changes: 3 additions & 3 deletions test/task-queue.ts
Expand Up @@ -51,7 +51,7 @@ test('will put items into a task queue until they can run', async ({ equal }) =>

test('will reject items over task queue limit', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 0,
maxThreads: 1,
maxQueue: 2
Expand All @@ -78,7 +78,7 @@ test('will reject items over task queue limit', async ({ equal, rejects }) => {

test('will reject items when task queue is unavailable', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 0,
maxThreads: 1,
maxQueue: 0
Expand All @@ -97,7 +97,7 @@ test('will reject items when task queue is unavailable', async ({ equal, rejects

test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.ts'),
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1,
maxThreads: 1,
maxQueue: 0
Expand Down
42 changes: 41 additions & 1 deletion test/test-uncaught-exception-from-handler.ts
Expand Up @@ -41,6 +41,46 @@ test('uncaught exception in immediate after task yields error event', async ({ e
equal(pool.threads.length, 1);
pool.threads[0].ref();

// This is the main aassertion here.
// This is the main assertion here.
equal((await errorEvent)[0].message, 'not_caught');
});

test('exiting process resets worker', async ({ not, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
minThreads: 1
});
const originalThreadId = pool.threads[0].threadId;
await rejects(pool.runTask('process.exit(1);'), /worker exited with code: 1/);
const newThreadId = pool.threads[0].threadId;
not(originalThreadId, newThreadId);
});

test('exiting process in immediate after task errors next task and resets worker', async ({ equal, not, rejects }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval-async.js'),
minThreads: 1
});

const originalThreadId = pool.threads[0].threadId;
const taskResult = await pool.runTask(`
setTimeout(() => { process.exit(1); }, 50);
42
`);
equal(taskResult, 42);

await rejects(pool.runTask(`
'use strict';
const { promisify } = require('util');
const sleep = promisify(setTimeout);
async function _() {
await sleep(1000);
return 42
}
_();
`), /worker exited with code: 1/);
const secondThreadId = pool.threads[0].threadId;

not(originalThreadId, secondThreadId);
});

0 comments on commit 8e6d16e

Please sign in to comment.