fix: reject in-flight task promises when worker crashes #146
Merged
jerome-benoit merged 12 commits on May 14, 2026
When a worker crashes (onerror event), tasks already dispatched to it had their promises stored in promiseResponseMap but were never settled. The error handler only restarted the worker and redistributed queued tasks; in-flight tasks were silently dropped, causing pool.execute() callers to hang indefinitely.

The fix iterates promiseResponseMap on worker error, finds all entries targeting the crashed worker by stable workerId, and rejects them with a descriptive error before termination.

Changes:
- Replace workerNodeKey (array index) with stable workerId in PromiseResponseWrapper: indices become stale after removeWorkerNode
- Add handleWorkerNodeCrash() to unify crash recovery logic
- Add rejectInFlightTaskPromises() and rejectRemainingQueuedTaskPromises()
- Add updatePromiseResponseWorkerId() for task steal/redistribute tracking
- Resolve workerNodeKey dynamically from message.workerId at response time
- Gate exit handler worker restart on restartWorkerOnError option
- Add crash worker test and regression test for promise rejection

Port of poolifier/poolifier#3211
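The core of the fix described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the wrapper shape is simplified, and the standalone function signature (taking the map and the crashed worker's id as parameters) is an assumption made so the snippet is self-contained.

```typescript
// Hypothetical, simplified shape; the real PromiseResponseWrapper has more fields.
interface PendingPromise<R> {
  resolve: (value: R) => void
  reject: (reason?: unknown) => void
  // Stable identifier of the worker that currently owns the task.
  workerId: number | undefined
}

/**
 * Rejects every pending promise owned by the crashed worker and removes it
 * from the map. Returns the number of promises rejected.
 */
function rejectInFlightTaskPromises<R>(
  promiseResponseMap: Map<string, PendingPromise<R>>,
  crashedWorkerId: number,
  crashError: Error
): number {
  let rejected = 0
  // Deleting the current entry while iterating a Map is safe in JavaScript.
  for (const [taskId, promiseResponse] of promiseResponseMap) {
    if (promiseResponse.workerId === crashedWorkerId) {
      promiseResponse.reject(crashError)
      promiseResponseMap.delete(taskId)
      ++rejected
    }
  }
  return rejected
}
```

Entries owned by other workers are left untouched, so their callers still receive a normal response.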
Pull request overview
This PR ensures pool.execute() callers don’t hang indefinitely when a worker crashes by explicitly rejecting promises for tasks that were in-flight on the crashed worker (and any queued tasks that can’t be redistributed). It also switches crash-time task/promise association from a mutable worker array index to a stable workerId.
Changes:
- Reject in-flight (and remaining unredistributable queued) task promises when a worker hits onerror.
- Track promise ownership via stable workerId (and update it on task steal/redistribution); resolve worker node key dynamically from response workerId.
- Add a regression test and a crash-simulating worker to validate promise rejection behavior.
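To illustrate why a captured array index goes stale while a stable workerId does not, here is a small sketch. The node shape and the standalone lookup function are assumptions for illustration; in the PR the lookup is a pool method.

```typescript
// Hypothetical minimal worker node: only the stable id matters here.
interface SketchWorkerNode {
  id: number
}

const sketchWorkerNodes: SketchWorkerNode[] = [{ id: 10 }, { id: 11 }, { id: 12 }]

// Both values captured at dispatch time for a task sent to worker 12.
const capturedIndex = 2
const capturedWorkerId = 12

// Worker 10 crashes and is removed: the remaining nodes shift down by one.
sketchWorkerNodes.splice(0, 1)

// The captured index now points past the end of the array.
const nodeByStaleIndex = sketchWorkerNodes[capturedIndex] // undefined

// Resolving the index from the stable id at use time still works.
function getWorkerNodeKeyByWorkerId(
  nodes: SketchWorkerNode[],
  workerId: number | undefined
): number {
  // An undefined id must never match an uninitialized node.
  if (workerId == null) return -1
  return nodes.findIndex(node => node.id === workerId)
}

const currentKey = getWorkerNodeKeyByWorkerId(sketchWorkerNodes, capturedWorkerId) // 1
```

A return value of -1 signals that the owning worker no longer exists, which callers have to handle explicitly.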
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| tests/worker-files/thread/crashWorker.mjs | Adds a worker that crashes via an unhandled exception to reproduce the hanging-promise scenario. |
| tests/pools/thread/fixed.test.mjs | Adds a regression test for crash-time promise rejection and adjusts queue assertions. |
| src/utility-types.ts | Updates PromiseResponseWrapper to store workerId instead of workerNodeKey. |
| src/pools/abstract-pool.ts | Implements crash handling that rejects affected promises, redistributes queued work, and updates promise ownership on task moves. |
- Add crashHandled flag to WorkerInfo to prevent double crash handling
between onerror and exit handlers
- Move rejectRemainingQueuedTaskPromises outside started/destroying guard
(queued promises must always be settled regardless of pool state)
- Handle workerId == null case in rejectInFlightTaskPromises (crash before
worker ID assignment)
- Add { cause } to crash Error constructors for error chain traceability
- Update task statistics (executing/failed) on crash rejection
- Add workerNodeKey === -1 guards in updatePromiseResponseWorkerId and
handleWorkerReadyResponse
- Exit handler: detect unexpected exit via crashHandled flag, condition
restart on restartWorkerOnError or normal exit
- Tighten test bounds for stolen/sequentiallyStolen task counts
Aligns with poolifier/poolifier#3211 latest changes
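The crashHandled flag mentioned in this commit can be pictured with the following sketch. It assumes a simplified WorkerInfo shape and uses a hypothetical counter in place of the real recovery work; only the guarding pattern is the point.

```typescript
// Hypothetical, reduced WorkerInfo: the real type has more fields.
interface SketchWorkerInfo {
  id: number
  ready: boolean
  crashHandled: boolean
}

// Hypothetical counter standing in for the real crash recovery work.
const crashStats = { recoveries: 0 }

function handleWorkerNodeCrash(info: SketchWorkerInfo): void {
  // Second caller (onerror or exit, whichever comes last) becomes a no-op.
  if (info.crashHandled) return
  info.crashHandled = true
  ++crashStats.recoveries
}

function onWorkerError(info: SketchWorkerInfo): void {
  handleWorkerNodeCrash(info)
}

function onWorkerExit(info: SketchWorkerInfo): void {
  // An exit from a ready worker that no onerror handled is treated as a crash.
  if (info.ready && !info.crashHandled) {
    handleWorkerNodeCrash(info)
  }
}
```

Whichever handler fires first does the recovery, so the order of the two events does not matter.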
- Guard getWorkerNodeKeyByWorkerId against undefined workerId to prevent erroneous matching of uninitialized worker nodes
- Call updatePromiseResponseWorkerId BEFORE handleTask in redistribute and stealTask for correctness-by-construction (prevents stale workerId if task response arrives synchronously)
- Construct crashError once in handleWorkerNodeCrash and pass as param to rejectInFlightTaskPromises/rejectRemainingQueuedTaskPromises (DRY)
- Add executing--/failed++ stats in rejectInFlightTaskPromises null-path for consistency with the non-null path
- Fix handleWorkerNodeCrash JSDoc to accurately describe behavior
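The ordering point above (update ownership before dispatching) can be shown with a toy model. Everything here is an assumption for illustration: the dispatcher simulates a response that arrives synchronously, and the function signatures are not the pool's real ones.

```typescript
// Hypothetical pending-task record keyed by task id.
interface OwnedTask {
  workerId: number | undefined
}

const pendingTasks = new Map<string, OwnedTask>()
const observedOwners: Array<number | undefined> = []

// Toy dispatcher: the "response" is handled synchronously, so it reads the
// ownership record immediately.
function handleTask(taskId: string): void {
  observedOwners.push(pendingTasks.get(taskId)?.workerId)
}

function updatePromiseResponseWorkerId(taskId: string, newWorkerId: number): void {
  const entry = pendingTasks.get(taskId)
  if (entry != null) entry.workerId = newWorkerId
}

function redistribute(taskId: string, destinationWorkerId: number): void {
  // Ownership is updated first, so even a synchronous response sees the
  // destination worker rather than the crashed source.
  updatePromiseResponseWorkerId(taskId, destinationWorkerId)
  handleTask(taskId)
}
```

With the two calls swapped, the simulated response would observe the old owner, which is the stale workerId case the commit message refers to.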
…exit

Add flushWorkerNodePromises() method as a catch-all to reject any unsettled promises when a worker node exits without crash handling (e.g., unexpected exit without onerror, or termination during pool destroy).

The exit handler now has three distinct blocks:
1. Crash detection: handleWorkerNodeCrash if ready && !crashHandled
2. Promise flush: flushWorkerNodePromises for remaining unsettled promises (guarded by ready && !crashHandled to skip intentional exits)
3. Cleanup: removeWorkerNode + conditional restart

Aligns with poolifier/poolifier#3211 latest changes (flushWorkerNodePromises catch-all pattern)
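A rough sketch of the three-block structure described above follows. The hook interface and the shouldRestart parameter are hypothetical stand-ins for the pool's methods and its restart condition; the guards are written literally as the commit message states them.

```typescript
// Hypothetical shapes for illustration only.
interface ExitingWorkerInfo {
  ready: boolean
  crashHandled: boolean
}

interface ExitHandlerHooks {
  handleWorkerNodeCrash: () => void
  flushWorkerNodePromises: () => void
  removeWorkerNode: () => void
  restartWorker: () => void
}

function handleWorkerExit(
  info: ExitingWorkerInfo,
  shouldRestart: boolean,
  hooks: ExitHandlerHooks
): void {
  // 1. Crash detection: a ready worker exited and no onerror handled it.
  if (info.ready && !info.crashHandled) {
    hooks.handleWorkerNodeCrash()
  }
  // 2. Promise flush: catch-all for promises still unsettled. The guard is
  //    evaluated again here, so a crash hook that sets crashHandled skips it.
  if (info.ready && !info.crashHandled) {
    hooks.flushWorkerNodePromises()
  }
  // 3. Cleanup: always remove the node, restart only when asked to.
  hooks.removeWorkerNode()
  if (shouldRestart) {
    hooks.restartWorker()
  }
}
```

Keeping the blocks separate makes each guard readable on its own, at the cost of repeating the condition.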
Comment on lines +2438 to +2439
    const { resolve, reject } = promiseResponse
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
Comment on lines +2456 to +2458
    if (workerNodeKey !== -1) {
      this.afterTaskExecutionHook(workerNodeKey, message)
    }
Comment on lines +1709 to +1720
    if (
      workerNodeKey !== -1 &&
      !workerNode.info.crashHandled &&
      workerNode.info.ready
    ) {
      this.flushWorkerNodePromises(
        workerNode,
        this.destroying
          ? new Error('Worker node terminated during pool destroy')
          : exitError,
      )
    }
Comment on lines +2028 to +2031
     * Rejects all unsettled promises targeting the given worker node.
     * Used as a catch-all when crash handling was bypassed (e.g., during pool
     * destroy or for non-ready worker exits). Idempotent: already-deleted
     * entries are simply skipped.
Comment on lines +1956 to +1969
    const workerNode = this.workerNodes[workerNodeKey]
    const crashedWorkerId = workerNode.info.id
    if (crashedWorkerId == null) {
      for (const [taskId, promiseResponse] of this.promiseResponseMap) {
        if (promiseResponse.workerId == null) {
          promiseResponse.reject(crashError)
          this.promiseResponseMap.delete(taskId)
          if (workerNode.usage.tasks.executing > 0) {
            --workerNode.usage.tasks.executing
          }
          ++workerNode.usage.tasks.failed
          workerNode.dispatchEvent(new Event('taskFinished'))
        }
      }
Comment on lines +1984 to +1990
    promiseResponse.reject(crashError)
    this.promiseResponseMap.delete(taskId)
    if (workerNode.usage.tasks.executing > 0) {
      --workerNode.usage.tasks.executing
    }
    ++workerNode.usage.tasks.failed
    workerNode.dispatchEvent(new Event('taskFinished'))
Comment on lines +2017 to +2020
    promiseResponse.reject(crashError)
    this.promiseResponseMap.delete(task.taskId)
    ++workerNode.usage.tasks.failed
    workerNode.dispatchEvent(new Event('taskFinished'))
Comment on lines +2045 to +2051
    promiseResponse.reject(error)
    this.promiseResponseMap.delete(taskId)
    if (workerNode.usage.tasks.executing > 0) {
      --workerNode.usage.tasks.executing
    }
    ++workerNode.usage.tasks.failed
    workerNode.dispatchEvent(new Event('taskFinished'))
…mplify null-path

- Extract rejectTaskPromise() helper to eliminate duplicated reject + delete + stats + taskFinished pattern across 3 methods
- Add fallback to promiseResponse.workerId in handleTaskExecutionResponse when message.workerId lookup fails (worker already removed)
- Simplify rejectInFlightTaskPromises null-ID path to early-return
- Keep flush guard as workerNode.info.ready (not this.destroying) since synchronous terminate in web workers causes unhandled rejections during pool.destroy(); destroyWorkerNode already handles graceful shutdown
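The extracted helper could look something like the sketch below. The parameter list is an assumption: the reviewed snippets dispatch a 'taskFinished' event on the worker node, which is modeled here as a plain callback, and the wasExecuting flag stands in for the difference between the in-flight paths (which decrement the executing counter) and the queued path (which does not).

```typescript
// Hypothetical reduced shapes for illustration.
interface SketchTaskUsage {
  executing: number
  failed: number
}

interface RejectableEntry {
  reject: (reason?: unknown) => void
}

/**
 * Rejects one pending promise, removes it from the map, updates statistics
 * and signals completion. Returns false if the entry was already settled.
 */
function rejectTaskPromise(
  promiseResponseMap: Map<string, RejectableEntry>,
  taskId: string,
  usage: SketchTaskUsage,
  error: Error,
  wasExecuting: boolean,
  notifyTaskFinished: () => void
): boolean {
  const promiseResponse = promiseResponseMap.get(taskId)
  // Idempotent: an already-deleted entry is simply skipped.
  if (promiseResponse == null) return false
  promiseResponse.reject(error)
  promiseResponseMap.delete(taskId)
  // Never let the executing counter go negative.
  if (wasExecuting && usage.executing > 0) {
    --usage.executing
  }
  ++usage.failed
  notifyTaskFinished()
  return true
}
```

Centralizing the sequence means a later change to the statistics or event handling only has to be made in one place.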
Comment on lines 1189 to +1193
    )
    this.promiseResponseMap.set(task.taskId!, {
      reject,
      resolve,
      workerNodeKey,
      workerId: this.workerNodes[workerNodeKey].info.id,
The JSDoc was copied from upstream where exitCode !== 0 allows flush during pool destroy. In our web worker adaptation, synchronous terminate causes unhandled rejections if flush runs during destroy, so the guard uses workerNode.info.ready instead. Updated doc to reflect this.
The abort signal handler captured workerNodeKey at task submission time. After steal/redistribute, the stale index dispatches abortTask to the wrong worker node. Resolve the current worker dynamically from the stored workerId (kept up-to-date by updatePromiseResponseWorkerId).
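The abort fix can be sketched as a lookup performed at abort time rather than at submission time. The data shapes and the standalone function are assumptions for illustration; in a real pool this logic would run inside the abort signal listener.

```typescript
// Hypothetical shapes: each node records which task ids it was told to abort.
interface AbortableEntry {
  workerId: number | undefined
}

interface AbortTargetNode {
  id: number
  aborted: string[]
}

/**
 * Sends the abort to whichever worker owns the task right now, using the
 * stored workerId instead of an index captured at submission time.
 */
function abortTask(
  entries: Map<string, AbortableEntry>,
  nodes: AbortTargetNode[],
  taskId: string
): boolean {
  const workerId = entries.get(taskId)?.workerId
  if (workerId == null) return false
  const node = nodes.find(candidate => candidate.id === workerId)
  // The owning worker may already have been removed.
  if (node == null) return false
  node.aborted.push(taskId)
  return true
}
```

Because the owner is resolved when the abort fires, a task that was stolen or redistributed after submission is still aborted on the correct worker.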
Comment on lines +1736 to +1737
    (this.opts.restartWorkerOnError === true ||
      !workerNode.info.crashHandled)
Summary
When a worker crashes, reject all associated task promises (in-flight and queued) so pool.execute() callers never hang indefinitely.

Port of poolifier/poolifier#3211 adapted for the web worker context.

Problem
When a worker crashes (onerror event), tasks already dispatched to it had their promises stored in promiseResponseMap but were never settled. The error handler only:
- restarted the worker
- redistributed queued tasks

In-flight tasks (already sent to the crashed worker) were silently dropped: their pool.execute() promises hung forever.

Changes
Promise settlement on crash
- rejectInFlightTaskPromises(): iterates promiseResponseMap, finds entries targeting the crashed worker by stable workerId, and rejects them
- rejectRemainingQueuedTaskPromises(): rejects queued tasks that could not be redistributed (no eligible destination worker)

Architectural improvement
- Replace workerNodeKey (array index) with stable workerId in PromiseResponseWrapper for crash-time promise matching; array indices become stale after worker removal
- Resolve workerNodeKey dynamically from workerId at task response time
- Add updatePromiseResponseWorkerId() to update promise ownership on task steal/redistribute

Hardening
- Guard against undefined workerId in rejectInFlightTaskPromises to prevent erroneous matching
- Gate worker restart on the restartWorkerOnError option (previously unconditional)
- Guard handleTaskExecutionResponse against removed worker nodes (workerNodeKey === -1)
- Call errorEvent.preventDefault() in onerror to prevent Deno from propagating handled errors

Web worker adaptations (vs upstream Node.js PR)
- No AsyncResource/runInAsyncScope helpers (not available in browser/Deno/Bun)
- ErrorEvent used instead of Error for event dispatch (standard web API)
- Synchronous terminate() instead of async (web worker API)

Validation
- deno check passes (0 type errors)
- deno lint passes (0 lint errors)