diff --git a/src/index.ts b/src/index.ts index 4afd1c81..4a8c98ed 100644 --- a/src/index.ts +++ b/src/index.ts @@ -635,33 +635,14 @@ class ThreadPool { }); worker.on('error', (err : Error) => { - // Work around the bug in https://github.com/nodejs/node/pull/33394 - worker.ref = () => {}; - - // In case of an uncaught exception: Call the callback that was passed to - // `postTask` with the error, or emit an 'error' event if there is none. - const taskInfos = [...workerInfo.taskInfos.values()]; - workerInfo.taskInfos.clear(); - - // Remove the worker from the list and potentially start a new Worker to - // replace the current one. - this._removeWorker(workerInfo); - - if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) { - this._ensureMinimumWorkers(); - } else { - // Do not start new workers over and over if they already fail during - // bootstrap, there's no point. - this.workerFailsDuringBootstrap = true; - } + this._onError(worker, workerInfo, err, false); + }); - if (taskInfos.length > 0) { - for (const taskInfo of taskInfos) { - taskInfo.done(err, null); - } - } else { - this.publicInterface.emit('error', err); - } + worker.on('exit', (exitCode : number) => { + const err = new Error(`worker exited with code: ${exitCode}`); + // Only error unfinished tasks on process exit, since there are legitimate + // reasons to exit workers and we want to handle that gracefully when possible. + this._onError(worker, workerInfo, err, true); }); worker.unref(); @@ -675,6 +656,37 @@ class ThreadPool { this.workers.add(workerInfo); } + _onError (worker: Worker, workerInfo: WorkerInfo, err: Error, onlyErrorUnfinishedTasks: boolean) { + // Work around the bug in https://github.com/nodejs/node/pull/33394 + worker.ref = () => {}; + + const taskInfos = [...workerInfo.taskInfos.values()]; + workerInfo.taskInfos.clear(); + + // Remove the worker from the list and potentially start a new Worker to + // replace the current one. + this._removeWorker(workerInfo); + + if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) { + this._ensureMinimumWorkers(); + } else { + // Do not start new workers over and over if they already fail during + // bootstrap, there's no point. + this.workerFailsDuringBootstrap = true; + } + + if (taskInfos.length > 0) { + // If there are remaining unfinished tasks, call the callback that was + // passed to `postTask` with the error + for (const taskInfo of taskInfos) { + taskInfo.done(err, null); + } + } else if (!onlyErrorUnfinishedTasks) { + // If there are no unfinished tasks, instead emit an 'error' event + this.publicInterface.emit('error', err); + } + } + _processPendingMessages () { if (this.inProcessPendingMessages || !this.options.useAtomics) { return; diff --git a/test/task-queue.ts b/test/task-queue.ts index 46b10040..5bf53785 100644 --- a/test/task-queue.ts +++ b/test/task-queue.ts @@ -51,7 +51,7 @@ test('will put items into a task queue until they can run', async ({ equal }) => test('will reject items over task queue limit', async ({ equal, rejects }) => { const pool = new Piscina({ - filename: resolve(__dirname, 'fixtures/eval.ts'), + filename: resolve(__dirname, 'fixtures/eval.js'), minThreads: 0, maxThreads: 1, maxQueue: 2 @@ -78,7 +78,7 @@ test('will reject items over task queue limit', async ({ equal, rejects }) => { test('will reject items when task queue is unavailable', async ({ equal, rejects }) => { const pool = new Piscina({ - filename: resolve(__dirname, 'fixtures/eval.ts'), + filename: resolve(__dirname, 'fixtures/eval.js'), minThreads: 0, maxThreads: 1, maxQueue: 0 @@ -97,7 +97,7 @@ test('will reject items when task queue is unavailable', async ({ equal, rejects test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => { const pool = new Piscina({ - filename: resolve(__dirname, 'fixtures/eval.ts'), + filename: resolve(__dirname, 'fixtures/eval.js'), minThreads: 1, maxThreads: 1, maxQueue: 0 diff --git a/test/test-uncaught-exception-from-handler.ts b/test/test-uncaught-exception-from-handler.ts index 67994846..5ba93008 100644 --- a/test/test-uncaught-exception-from-handler.ts +++ b/test/test-uncaught-exception-from-handler.ts @@ -41,6 +41,46 @@ test('uncaught exception in immediate after task yields error event', async ({ e equal(pool.threads.length, 1); pool.threads[0].ref(); - // This is the main aassertion here. + // This is the main assertion here. equal((await errorEvent)[0].message, 'not_caught'); }); + +test('exiting process resets worker', async ({ not, rejects }) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/eval.js'), + minThreads: 1 + }); + const originalThreadId = pool.threads[0].threadId; + await rejects(pool.runTask('process.exit(1);'), /worker exited with code: 1/); + const newThreadId = pool.threads[0].threadId; + not(originalThreadId, newThreadId); +}); + +test('exiting process in immediate after task errors next task and resets worker', async ({ equal, not, rejects }) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/eval-async.js'), + minThreads: 1 + }); + + const originalThreadId = pool.threads[0].threadId; + const taskResult = await pool.runTask(` + setTimeout(() => { process.exit(1); }, 50); + 42 + `); + equal(taskResult, 42); + + await rejects(pool.runTask(` + 'use strict'; + + const { promisify } = require('util'); + const sleep = promisify(setTimeout); + async function _() { + await sleep(1000); + return 42 + } + _(); + `), /worker exited with code: 1/); + const secondThreadId = pool.threads[0].threadId; + + not(originalThreadId, secondThreadId); +});