feat: add task function worker node affinity#126
Conversation
Port task function worker node affinity from poolifier/poolifier PR #2269. Allows restricting task execution to specific worker nodes by their indices via a workerNodeKeys property on task function objects.
There was a problem hiding this comment.
Pull request overview
This PR adds per-task-function worker node affinity to poolifier, allowing task functions to restrict execution to specific worker node indices via workerNodeKeys, including dynamic worker provisioning in dynamic pools when affinity targets exceed current pool size.
Changes:
- Add
workerNodeKeysto task function properties/types and propagate it through worker registration and pool execution. - Update all worker-choice strategies to accept an affinity set and filter eligible workers accordingly.
- Add validation + extensive tests + documentation updates for worker node affinity behavior.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs | Updates worker task function objects to include workerNodeKeys for affinity scenarios. |
| tests/worker-files/thread/testMultipleTaskFunctionsWorker.mjs | Converts simple task functions to objects to test affinity + priority together. |
| tests/pools/utils.test.mjs | Adds unit tests for checkValidWorkerNodeKeys() validation and error messages. |
| tests/pools/selection-strategies/worker-choice-strategies-context.test.mjs | Verifies workerNodeKeysSet is passed through strategy execution and retry behavior remains correct. |
| tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.mjs | Adds affinity-related choose() tests across multiple strategies. |
| tests/pools/abstract-pool.test.mjs | Adds integration coverage for registering affinity task functions, enforcing validation, and dynamic worker creation for affinity. |
| src/worker/utils.ts | Extends task function object validation to validate workerNodeKeys. |
| src/worker/task-functions.ts | Adds workerNodeKeys to TaskFunctionObject with documentation describing affinity. |
| src/worker/abstract-worker.ts | Propagates workerNodeKeys from task function operation messages to worker-side task function registration. |
| src/utils.ts | Includes workerNodeKeys in buildTaskFunctionProperties() serialization path. |
| src/utility-types.ts | Extends TaskFunctionProperties to include workerNodeKeys. |
| src/pools/utils.ts | Introduces checkValidWorkerNodeKeys() for validation (type, emptiness, duplicates, range vs max size). |
| src/pools/selection-strategies/worker-choice-strategies-context.ts | Plumbs workerNodeKeysSet through execute() into choose() across retries. |
| src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts | Adds affinity filtering and handling for empty/single/multi-key sets. |
| src/pools/selection-strategies/selection-strategies-types.ts | Updates IWorkerChoiceStrategy.choose() signature to accept an optional affinity set. |
| src/pools/selection-strategies/round-robin-worker-choice-strategy.ts | Adds affinity-aware round robin selection behavior. |
| src/pools/selection-strategies/least-used-worker-choice-strategy.ts | Filters eligible workers by affinity set and supports empty/single-key optimization. |
| src/pools/selection-strategies/least-elu-worker-choice-strategy.ts | Filters eligible workers by affinity set. |
| src/pools/selection-strategies/least-busy-worker-choice-strategy.ts | Filters eligible workers by affinity set and supports empty/single-key optimization. |
| src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts | Filters eligible workers by affinity set and adds empty/single-key fast paths. |
| src/pools/selection-strategies/fair-share-worker-choice-strategy.ts | Filters eligible workers by affinity set and supports empty/single-key optimization. |
| src/pools/selection-strategies/abstract-worker-choice-strategy.ts | Adds shared helpers for affinity-aware selection (eligibility, single-key selection, RR next key helper). |
| src/pools/abstract-pool.ts | Validates affinity on add, derives affinity set for execution, provisions dynamic workers up to affinity targets, and passes affinity into strategy selection. |
| docs/api.md | Documents workerNodeKeys in APIs and explains worker node affinity rules/behavior. |
| README.md | Adds worker node affinity to the feature list. |
| * @remarks `null` is not accepted here. Use `null` only via | ||
| * {@link TaskFunctionProperties.workerNodeKeys} to clear affinity at runtime. |
There was a problem hiding this comment.
The JSDoc for workerNodeKeys says null can be used via TaskFunctionProperties.workerNodeKeys to clear affinity at runtime, but TaskFunctionProperties.workerNodeKeys is currently typed as number[] | undefined (no null) and the message/build helpers only propagate the field when it is non-null. Either remove the null-clearing remark or implement/reflect nullable support end-to-end (type + serialization + worker handling).
| * @remarks `null` is not accepted here. Use `null` only via | |
| * {@link TaskFunctionProperties.workerNodeKeys} to clear affinity at runtime. |
| /** | ||
| * Task function worker node keys on which to run. | ||
| */ | ||
| readonly workerNodeKeys?: number[] |
There was a problem hiding this comment.
PR description mentions TaskFunctionProperties.workerNodeKeys is nullable to allow clearing affinity at runtime, but the exported TaskFunctionProperties type only allows number[] | undefined. If runtime clearing is a supported feature, this should be number[] | null | undefined (and the rest of the pipeline should preserve null); otherwise the PR description/docs should be adjusted to avoid advertising an unsupported API.
| const workerNodeKeysSet = this.getTaskFunctionWorkerNodeKeysSet(name) | ||
| if (workerNodeKeysSet != null) { | ||
| const maxPoolSize = this.maximumNumberOfWorkers ?? | ||
| this.minimumNumberOfWorkers | ||
| const targetSize = max(...workerNodeKeysSet) + 1 | ||
| while ( | ||
| this.started && | ||
| !this.destroying && | ||
| this.workerNodes.length < targetSize && | ||
| this.workerNodes.length < maxPoolSize | ||
| ) { |
There was a problem hiding this comment.
chooseWorkerNode() assumes workerNodeKeysSet contains only valid indices. Runtime-registered task functions are validated, but task functions coming from worker startup messages are not range-validated against the pool’s max size (worker-side validation can’t know it), so an out-of-range affinity (e.g. [999]) would cause repeated choose() retries and then a generic failure. Consider validating/clamping the affinity set against maxPoolSize here (or when ingesting taskFunctionsProperties) and throwing a clear RangeError when keys are outside 0..maxPoolSize-1.
… and align workerNodeKeys JSDoc with upstream
Summary
workerNodeKeysproperty on task function objectsChanges
Core (
src/pools/):workerNodeKeysvalidation,workerNodeKeysSetconstruction and passing through strategy context, dynamic worker provisioning for affinity keysStrategies (
src/pools/selection-strategies/): All 7 strategies updated to acceptworkerNodeKeysSetparameter and filter eligible workers accordingly, with early-return optimizations for empty/single-key setsWorker (
src/worker/):workerNodeKeyspropagation in task function registration, listing, and CRUD operationsTypes:
TaskFunctionProperties.workerNodeKeys(nullable for runtime clearing),TaskFunctionObject.workerNodeKeys,MessageValue.taskFunctionPropertiesTests: 25 files changed, comprehensive coverage for affinity execution, dynamic worker creation, strategy filtering, and validation