-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
pool.ts
370 lines (361 loc) · 10.8 KB
/
pool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
import type { ClusterSettings } from 'node:cluster'
import type { EventEmitterAsyncResource } from 'node:events'
import type { TransferListItem, WorkerOptions } from 'node:worker_threads'
import type { TaskFunction } from '../worker/task-functions.js'
import type {
WorkerChoiceStrategy,
WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
import type {
ErrorHandler,
ExitHandler,
IWorker,
IWorkerNode,
MessageHandler,
OnlineHandler,
WorkerType
} from './worker.js'
/**
 * Frozen lookup object enumerating the supported pool types.
 * Each key maps to the string literal of the same name.
 */
export const PoolTypes: Readonly<{
  fixed: 'fixed'
  dynamic: 'dynamic'
}> = Object.freeze({
  /** Identifier of the fixed pool type. */
  fixed: 'fixed',
  /** Identifier of the dynamic pool type. */
  dynamic: 'dynamic'
} as const)
/**
* Pool type: union of the key names of the `PoolTypes` object.
*/
export type PoolType = keyof typeof PoolTypes
/**
 * Frozen lookup object enumerating the events a pool can emit.
 * Each key maps to the string literal of the same name.
 */
export const PoolEvents: Readonly<{
  ready: 'ready'
  busy: 'busy'
  full: 'full'
  empty: 'empty'
  destroy: 'destroy'
  error: 'error'
  taskError: 'taskError'
  backPressure: 'backPressure'
}> = Object.freeze({
  /** Workers created in the pool have reached the minimum size expected and are ready. */
  ready: 'ready',
  /** Workers have reached the maximum size expected and are executing concurrently their tasks quota. */
  busy: 'busy',
  /** The pool is dynamic and the number of workers created has reached the maximum size expected. */
  full: 'full',
  /** The pool is dynamic with a zero minimum size and the number of workers has reached that minimum. */
  empty: 'empty',
  /** The pool is destroyed. */
  destroy: 'destroy',
  /** An uncaught error occurred. */
  error: 'error',
  /** An error occurred while executing a task. */
  taskError: 'taskError',
  /** All worker nodes have back pressure. */
  backPressure: 'backPressure'
} as const)
/**
* Pool event: union of the key names of the `PoolEvents` object.
*/
export type PoolEvent = keyof typeof PoolEvents
/**
 * Read-only description of a pool.
 * Optional members may be absent from a given pool information object.
 */
export interface PoolInfo {
  readonly version: string
  /** Pool type. */
  readonly type: PoolType
  /** Worker type. */
  readonly worker: WorkerType
  readonly started: boolean
  readonly ready: boolean
  /** Worker choice strategy. */
  readonly strategy: WorkerChoiceStrategy
  readonly strategyRetries: number
  readonly minSize: number
  readonly maxSize: number
  /** Utilization of the pool. */
  readonly utilization?: number
  /** Total number of worker nodes in the pool. */
  readonly workerNodes: number
  /** Number of stealing worker nodes in the pool. */
  readonly stealingWorkerNodes?: number
  /** Number of idle worker nodes in the pool. */
  readonly idleWorkerNodes: number
  /** Number of busy worker nodes in the pool. */
  readonly busyWorkerNodes: number
  readonly executedTasks: number
  readonly executingTasks: number
  readonly queuedTasks?: number
  readonly maxQueuedTasks?: number
  readonly backPressure?: boolean
  readonly stolenTasks?: number
  readonly failedTasks: number
  /** Run time figures: minimum and maximum are always set, average and median are optional. */
  readonly runTime?: {
    readonly minimum: number
    readonly maximum: number
    readonly average?: number
    readonly median?: number
  }
  /** Wait time figures: minimum and maximum are always set, average and median are optional. */
  readonly waitTime?: {
    readonly minimum: number
    readonly maximum: number
    readonly average?: number
    readonly median?: number
  }
}
/**
 * Options of the tasks queue attached to each worker node.
 */
export interface TasksQueueOptions {
  /**
   * Maximum size of a worker node tasks queue, at which the worker node is flagged as back pressured.
   *
   * @defaultValue (pool maximum size)^2
   */
  readonly size?: number
  /**
   * Maximum number of tasks a worker node can execute concurrently.
   *
   * @defaultValue 1
   */
  readonly concurrency?: number
  /**
   * Whether task stealing on idle is enabled.
   *
   * @defaultValue true
   */
  readonly taskStealing?: boolean
  /**
   * Whether tasks stealing under back pressure is enabled.
   *
   * @defaultValue true
   */
  readonly tasksStealingOnBackPressure?: boolean
  /**
   * Timeout in milliseconds for queued tasks to finish at worker node termination.
   *
   * @defaultValue 2000
   */
  readonly tasksFinishedTimeout?: number
}
/**
 * Configuration options accepted by a poolifier pool.
 *
 * @typeParam Worker - Type of worker.
 */
export interface PoolOptions<Worker extends IWorker> {
  /**
   * Handler listening for the online event on each worker.
   *
   * @defaultValue `() => {}`
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Handler listening for the message event on each worker.
   *
   * @defaultValue `() => {}`
   */
  messageHandler?: MessageHandler<Worker>
  /**
   * Handler listening for the error event on each worker.
   *
   * @defaultValue `() => {}`
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Handler listening for the exit event on each worker.
   *
   * @defaultValue `() => {}`
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * Whether the minimum number of workers is started at pool initialization.
   *
   * @defaultValue true
   */
  startWorkers?: boolean
  /**
   * Worker choice strategy used by the pool.
   *
   * @defaultValue WorkerChoiceStrategies.ROUND_ROBIN
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Options of the worker choice strategy.
   */
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  /**
   * Whether a worker is restarted on error.
   */
  restartWorkerOnError?: boolean
  /**
   * Whether pool events, integrated with async resource emission, are enabled.
   *
   * @defaultValue true
   */
  enableEvents?: boolean
  /**
   * Whether the pool worker node tasks queue is enabled.
   *
   * @defaultValue false
   */
  enableTasksQueue?: boolean
  /**
   * Options of the pool worker node tasks queue.
   */
  tasksQueueOptions?: TasksQueueOptions
  /**
   * Options passed to the worker.
   *
   * @see https://nodejs.org/api/worker_threads.html#new-workerfilename-options
   */
  workerOptions?: WorkerOptions
  /**
   * Key/value pairs added to the worker process environment.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
   */
  env?: Record<string, unknown>
  /**
   * Settings of the cluster.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
   */
  settings?: ClusterSettings
}
/**
 * Contract that every poolifier pool fulfils.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export interface IPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> {
  /**
   * Information about this pool.
   */
  readonly info: PoolInfo
  /**
   * Worker nodes of this pool.
   *
   * @internal
   */
  readonly workerNodes: Array<IWorkerNode<Worker, Data>>
  /**
   * Event emitter of this pool, integrated with async resource.
   * The async tracking tooling identifier is `poolifier:<PoolType>-<WorkerType>-pool`.
   *
   * Events that can currently be listened to:
   *
   * - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers set to zero, this event is emitted when at least one dynamic worker is ready.
   * - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
   * - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
   * - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
   * - `'destroy'`: Emitted when the pool is destroyed.
   * - `'error'`: Emitted when an uncaught error occurs.
   * - `'taskError'`: Emitted when an error occurs while executing a task.
   * - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
   */
  readonly emitter?: EventEmitterAsyncResource
  /**
   * Executes the function specified in the worker constructor, passing it the task input data.
   *
   * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data.
   * @param name - The optional name of the task function to execute. The default task function is executed when it is not specified.
   * @param transferList - An optional array of transferable objects whose ownership is transferred to the chosen pool's worker_threads worker. They should not be used in the main thread afterwards.
   * @returns Promise that will be fulfilled when the task is completed.
   */
  readonly execute: (
    data?: Data,
    name?: string,
    transferList?: TransferListItem[]
  ) => Promise<Response>
  /**
   * Starts the minimum number of workers in this pool.
   */
  readonly start: () => void
  /**
   * Terminates all workers in this pool.
   */
  readonly destroy: () => Promise<void>
  /**
   * Tells whether the specified task function exists in this pool.
   *
   * @param name - The name of the task function.
   * @returns `true` if the task function exists, `false` otherwise.
   */
  readonly hasTaskFunction: (name: string) => boolean
  /**
   * Adds a task function to this pool.
   * An existing task function with the same name is overwritten.
   *
   * @param name - The name of the task function.
   * @param fn - The task function.
   * @returns `true` if the task function was added, `false` otherwise.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or is an empty string.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
   */
  readonly addTaskFunction: (
    name: string,
    fn: TaskFunction<Data, Response>
  ) => Promise<boolean>
  /**
   * Removes a task function from this pool.
   *
   * @param name - The name of the task function.
   * @returns `true` if the task function was removed, `false` otherwise.
   */
  readonly removeTaskFunction: (name: string) => Promise<boolean>
  /**
   * Lists the names of the task functions available in this pool.
   *
   * @returns The names of the task functions available in this pool.
   */
  readonly listTaskFunctionNames: () => string[]
  /**
   * Sets the default task function in this pool.
   *
   * @param name - The name of the task function.
   * @returns `true` if the default task function was set, `false` otherwise.
   */
  readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
  /**
   * Sets the worker choice strategy in this pool.
   *
   * @param workerChoiceStrategy - The worker choice strategy.
   * @param workerChoiceStrategyOptions - The worker choice strategy options.
   */
  readonly setWorkerChoiceStrategy: (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ) => void
  /**
   * Sets the worker choice strategy options in this pool.
   *
   * @param workerChoiceStrategyOptions - The worker choice strategy options.
   */
  readonly setWorkerChoiceStrategyOptions: (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ) => void
  /**
   * Enables or disables the worker node tasks queue in this pool.
   *
   * @param enable - Whether to enable or disable the worker node tasks queue.
   * @param tasksQueueOptions - The worker node tasks queue options.
   */
  readonly enableTasksQueue: (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ) => void
  /**
   * Sets the worker node tasks queue options in this pool.
   *
   * @param tasksQueueOptions - The worker node tasks queue options.
   */
  readonly setTasksQueueOptions: (tasksQueueOptions: TasksQueueOptions) => void
}