diff --git a/README.md b/README.md index 83dc022..9ce8c98 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina' #### Pool methods - `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead. +- `recycleWorkers()`: Waits for all current tasks to finish and re-creates all workers. Can be used to force isolation imperatively even when `isolateWorkers` is disabled. #### Exports diff --git a/src/index.ts b/src/index.ts index 6a0f62d..631adc5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -446,6 +446,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource { lastSeenResponseCount: number = 0 usedMemory?: number onMessage: ResponseCallback + shouldRecycle?: boolean constructor( worker: Worker, @@ -995,17 +996,27 @@ class ThreadPool { } shouldRecycleWorker(taskInfo?: TaskInfo): boolean { + // Worker could be set to recycle by pool's imperative methods + if (taskInfo?.workerInfo?.shouldRecycle) { + return true + } + // When `isolateWorkers` is enabled, remove the worker after task is finished - const isWorkerIsolated = this.options.isolateWorkers && taskInfo?.workerInfo + if (this.options.isolateWorkers && taskInfo?.workerInfo) { + return true + } // When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit - const isWorkersMemoryLimitReached = + if ( !this.options.isolateWorkers && this.options.maxMemoryLimitBeforeRecycle !== undefined && (taskInfo?.workerInfo?.usedMemory || 0) > this.options.maxMemoryLimitBeforeRecycle + ) { + return true + } - return Boolean(isWorkerIsolated || isWorkersMemoryLimitReached) + return false } pendingCapacity(): number { @@ -1041,6 +1052,33 @@ class ThreadPool { await Promise.all(exitEvents) } + + async recycleWorkers() { + // Worker's are automatically recycled when isolateWorkers is enabled + if (this.options.isolateWorkers) { + return + } + + const exitEvents: Promise[] = [] + + Array.from(this.workers).filter((workerInfo) => { + // Remove idle workers + if (workerInfo.currentUsage() === 0) { + exitEvents.push(once(workerInfo!.worker, 'exit')) + this._removeWorker(workerInfo!) + } + // Mark on-going workers for recycling. + // Note that we don't need to wait for these ones to finish + // as pool.shouldRecycleWorker will do it once task has finished + else { + workerInfo.shouldRecycle = true + } + }) + + await Promise.all(exitEvents) + + this._ensureMinimumWorkers() + } } class Tinypool extends EventEmitterAsyncResource { @@ -1116,6 +1154,10 @@ class Tinypool extends EventEmitterAsyncResource { pool.taskQueue.cancel() } + async recycleWorkers() { + await this.#pool.recycleWorkers() + } + get completed(): number { return this.#pool.completed } diff --git a/test/isolation.test.ts b/test/isolation.test.ts new file mode 100644 index 0000000..d61fac0 --- /dev/null +++ b/test/isolation.test.ts @@ -0,0 +1,81 @@ +import { dirname, resolve } from 'path' +import { Tinypool } from 'tinypool' +import { fileURLToPath } from 'url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +test('idle workers can be recycled', async () => { + const pool = new Tinypool({ + filename: resolve(__dirname, 'fixtures/sleep.js'), + minThreads: 4, + maxThreads: 4, + isolateWorkers: false, + }) + + function getThreadIds() { + return pool.threads.map((thread) => thread!.threadId).sort((a, b) => a - b) + } + + expect(pool.threads).toHaveLength(4) + const initialThreadIds = getThreadIds() + + await Promise.all(times(4)(() => pool.run({}))) + expect(getThreadIds()).toStrictEqual(initialThreadIds) + + await pool.recycleWorkers() + expect(pool.threads).toHaveLength(4) + + const newThreadIds = getThreadIds() + initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id)) + + await Promise.all(times(4)(() => pool.run({}))) + initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id)) + expect(getThreadIds()).toStrictEqual(newThreadIds) +}) + +test('running workers can recycle after task execution finishes', async () => { + const pool = new Tinypool({ + filename: resolve(__dirname, 'fixtures/sleep.js'), + minThreads: 4, + maxThreads: 4, + isolateWorkers: false, + }) + + function getThreadIds() { + return pool.threads.map((thread) => thread!.threadId).sort((a, b) => a - b) + } + + expect(pool.threads).toHaveLength(4) + const initialThreadIds = getThreadIds() + + const tasks = [ + ...times(2)(() => pool.run({ time: 1 })), + ...times(2)(() => pool.run({ time: 2000 })), + ] + + // Wait for first two tasks to finish + await Promise.all(tasks.slice(0, 2)) + + await pool.recycleWorkers() + const threadIds = getThreadIds() + + // Idle workers should have been recycled immediately + expect(threadIds).not.toContain(initialThreadIds[0]) + expect(threadIds).not.toContain(initialThreadIds[1]) + + // Running workers should not have recycled yet + expect(threadIds).toContain(initialThreadIds[2]) + expect(threadIds).toContain(initialThreadIds[3]) + + await Promise.all(tasks) + + // All workers should have recycled now + const newThreadIds = getThreadIds() + initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id)) +}) + +function times(count: number) { + return function run(fn: () => T): T[] { + return Array(count).fill(0).map(fn) + } +}