Skip to content

Commit

Permalink
feat: add needsDrain property (#368)
Browse files Browse the repository at this point in the history
Co-authored-by: Rafael Gonzaga <rafael.nunu@hotmail.com>
Co-authored-by: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
3 people committed Jul 13, 2023
1 parent a5b4fa5 commit 2d49b63
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
14 changes: 14 additions & 0 deletions README.md
Expand Up @@ -436,6 +436,12 @@ itself.

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.

### Event: `'needsDrain'`

Similar to [`Piscina#needsDrain`](#property-needsdrain-readonly);
this event is triggered once the total capacity of the pool is exceeded
by number of tasks enequeued that are pending of execution.

### Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.
Expand Down Expand Up @@ -503,6 +509,14 @@ An Array of the `Worker` instances used by this pool.

The current number of tasks waiting to be assigned to a Worker thread.

### Property: `needsDrain` (readonly)

Boolean value that specifies whether the capacity of the pool has
been exceeded by the number of tasks submitted.

This property is helpful to make decisions towards creating backpressure
over the number of tasks submitted to the pool.

### Property: `utilization` (readonly)

A point-in-time ratio comparing the approximate total mean run time
Expand Down
23 changes: 21 additions & 2 deletions src/index.ts
Expand Up @@ -466,7 +466,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
} catch (err) {
// This would mostly happen if e.g. message contains unserializable data
// or transferList is invalid.
taskInfo.done(err);
taskInfo.done(<Error>err);
return;
}

Expand Down Expand Up @@ -521,6 +521,7 @@ class ThreadPool {
completed : number = 0;
runTime : Histogram;
waitTime : Histogram;
needsDrain : boolean;
start : number = performance.now();
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
Expand Down Expand Up @@ -557,6 +558,7 @@ class ThreadPool {
this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
}

_ensureMinimumWorkers () : void {
Expand Down Expand Up @@ -782,6 +784,8 @@ class ThreadPool {
} else {
resolve(result);
}

this._maybeDrain();
},
signal,
this.publicInterface.asyncResource.asyncId());
Expand Down Expand Up @@ -827,6 +831,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand Down Expand Up @@ -856,6 +861,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand All @@ -874,9 +880,18 @@ class ThreadPool {
}

_maybeDrain () {
if (this.taskQueue.size === 0 && this.skipQueue.length === 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;

if (totalQueueSize === 0) {
this.needsDrain = false;
this.publicInterface.emit('drain');
}

if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
this.publicInterface.emit('needsDrain');
}
}

async destroy () {
Expand Down Expand Up @@ -1107,6 +1122,10 @@ class Piscina extends EventEmitterAsyncResource {
return performance.now() - this.#pool.start;
}

get needsDrain () : boolean {
return this.#pool.needsDrain;
}

static get isWorkerThread () : boolean {
return commonState.isWorkerThread;
}
Expand Down
33 changes: 31 additions & 2 deletions test/post-task.ts
Expand Up @@ -96,19 +96,48 @@ test('postTask() validates abortSignal', async ({ rejects }) => {
/signal argument must be an object/);
});

test('Piscina emits drain', async ({ ok }) => {
test('Piscina emits drain', async ({ ok, notOk }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
});

let drained = false;
let needsDrain = true;
pool.on('drain', () => {
drained = true;
needsDrain = pool.needsDrain;
});

await Promise.all([pool.runTask('123'), pool.runTask('123')]);
await Promise.all([pool.run('123'), pool.run('123')]);

ok(drained);
notOk(needsDrain);
});

test('Piscina exposes/emits needsDrain to true when capacity is exceeded', async ({ ok }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxQueue: 3,
maxThreads: 1
});

let triggered = false;
let drained = false;
pool.once('drain', () => {
drained = true;
});
pool.once('needsDrain', () => {
triggered = true;
});

pool.run('123');
pool.run('123');
pool.run('123');
pool.run('123');

ok(pool.needsDrain);
ok(triggered);
ok(drained);
});

test('Piscina can use async loaded workers', async ({ equal }) => {
Expand Down

0 comments on commit 2d49b63

Please sign in to comment.