Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add needsDrain property #368

Merged
merged 7 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ itself.

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.

### Event: `'needsDrain'`

Similar to [`Piscina#needsDrain`](#property-needsdrain-readonly);
this event is triggered once the total capacity of the pool is exceeded
by number of tasks enequeued that are pending of execution.

### Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.
Expand Down Expand Up @@ -503,6 +509,14 @@ An Array of the `Worker` instances used by this pool.

The current number of tasks waiting to be assigned to a Worker thread.

### Property: `needsDrain` (readonly)
RafaelGSS marked this conversation as resolved.
Show resolved Hide resolved

Boolean value that specifies whether the capacity of the pool has
been exceeded by the number of tasks submitted.

This property is helpful to make decisions towards creating backpressure
over the number of tasks submitted to the pool.

### Property: `utilization` (readonly)

A point-in-time ratio comparing the approximate total mean run time
Expand Down
23 changes: 21 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
} catch (err) {
// This would mostly happen if e.g. message contains unserializable data
// or transferList is invalid.
taskInfo.done(err);
taskInfo.done(<Error>err);
return;
}

Expand Down Expand Up @@ -521,6 +521,7 @@ class ThreadPool {
completed : number = 0;
runTime : Histogram;
waitTime : Histogram;
needsDrain : boolean;
start : number = performance.now();
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
Expand Down Expand Up @@ -557,6 +558,7 @@ class ThreadPool {
this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
}

_ensureMinimumWorkers () : void {
Expand Down Expand Up @@ -782,6 +784,8 @@ class ThreadPool {
} else {
resolve(result);
}

this._maybeDrain();
},
signal,
this.publicInterface.asyncResource.asyncId());
Expand Down Expand Up @@ -827,6 +831,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand Down Expand Up @@ -856,6 +861,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand All @@ -874,9 +880,18 @@ class ThreadPool {
}

_maybeDrain () {
if (this.taskQueue.size === 0 && this.skipQueue.length === 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
const totalQueueSize = this.taskQueue.size + this.skipQueue.length;

if (totalQueueSize === 0) {
this.needsDrain = false;
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
this.publicInterface.emit('drain');
}

if (totalQueueSize >= totalCapacity) {
this.needsDrain = true;
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
this.publicInterface.emit('needsDrain');
}
}

async destroy () {
Expand Down Expand Up @@ -1107,6 +1122,10 @@ class Piscina extends EventEmitterAsyncResource {
return performance.now() - this.#pool.start;
}

get needsDrain () : boolean {
return this.#pool.needsDrain;
}

static get isWorkerThread () : boolean {
return commonState.isWorkerThread;
}
Expand Down
33 changes: 31 additions & 2 deletions test/post-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,48 @@ test('postTask() validates abortSignal', async ({ rejects }) => {
/signal argument must be an object/);
});

test('Piscina emits drain', async ({ ok }) => {
test('Piscina emits drain', async ({ ok, notOk }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
});

let drained = false;
let needsDrain = true;
pool.on('drain', () => {
drained = true;
needsDrain = pool.needsDrain;
});

await Promise.all([pool.runTask('123'), pool.runTask('123')]);
await Promise.all([pool.run('123'), pool.run('123')]);

ok(drained);
notOk(needsDrain);
});

test('Piscina exposes/emits needsDrain to true when capacity is exceeded', async ({ ok }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js'),
maxQueue: 3,
maxThreads: 1
});

let triggered = false;
let drained = false;
pool.once('drain', () => {
drained = true;
});
pool.once('needsDrain', () => {
triggered = true;
});

pool.run('123');
RafaelGSS marked this conversation as resolved.
Show resolved Hide resolved
pool.run('123');
pool.run('123');
pool.run('123');

ok(pool.needsDrain);
ok(triggered);
ok(drained);
});

test('Piscina can use async loaded workers', async ({ equal }) => {
Expand Down