Skip to content

Commit

Permalink
feat: add needsDrain event
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed Jul 3, 2023
1 parent 811c203 commit b2952f3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ itself.

A `'drain'` event is emitted whenever the `queueSize` reaches `0`.

### Event: `'needsDrain'`

Similar to [`Piscina#needsDrain`](#property-needsdrain-readonly);
this event is triggered once the total capacity of the pool is exceeded
by number of tasks enequeued that are pending of execution.

### Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.
Expand Down
12 changes: 9 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
} catch (err) {
// This would mostly happen if e.g. message contains unserializable data
// or transferList is invalid.
taskInfo.done(err);
taskInfo.done(<Error>err);
return;
}

Expand Down Expand Up @@ -521,8 +521,8 @@ class ThreadPool {
completed : number = 0;
runTime : Histogram;
waitTime : Histogram;
needsDrain : boolean;
start : number = performance.now();
needsDrain : boolean = false;
inProcessPendingMessages : boolean = false;
startingUp : boolean = false;
workerFailsDuringBootstrap : boolean = false;
Expand Down Expand Up @@ -558,6 +558,7 @@ class ThreadPool {
this.startingUp = true;
this._ensureMinimumWorkers();
this.startingUp = false;
this.needsDrain = false;
}

_ensureMinimumWorkers () : void {
Expand Down Expand Up @@ -783,6 +784,8 @@ class ThreadPool {
} else {
resolve(result);
}

this._maybeDrain();
},
signal,
this.publicInterface.asyncResource.asyncId());
Expand Down Expand Up @@ -828,6 +831,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand Down Expand Up @@ -857,6 +861,7 @@ class ThreadPool {
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

Expand All @@ -883,7 +888,8 @@ class ThreadPool {
this.needsDrain = false;
}

if (totalQueueSize > totalCapacity) {
if (totalQueueSize >= totalCapacity) {
this.publicInterface.emit('needsDrain');
this.needsDrain = true;
}
}
Expand Down
23 changes: 15 additions & 8 deletions test/post-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,35 @@ test('Piscina emits drain', async ({ ok, notOk }) => {
needsDrain = pool.needsDrain;
});

await Promise.all([pool.runTask('123'), pool.runTask('123')]);
await Promise.all([pool.run('123'), pool.run('123')]);

ok(drained);
notOk(needsDrain);
});

test('Piscina exposes needsDrain to true when capacity is exceeded', async ({ ok }) => {
test('Piscina exposes/emits needsDrain to true when capacity is exceeded', { only: true }, async ({ ok }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/eval.js')
filename: resolve(__dirname, 'fixtures/eval.js'),
maxQueue: 3,
maxThreads: 1
});

let triggered = false;
let drained = false;
pool.on('drain', () => {
pool.once('drain', () => {
drained = true;
});
pool.once('needsDrain', () => {
triggered = true;
});

const promises = Promise.all([pool.runTask('123'), pool.runTask('123'), pool.runTask('123')]);
pool.run('123');
pool.run('123');
pool.run('123');
pool.run('123');

ok(pool.needsDrain);

await promises;

ok(triggered);
ok(drained);
});

Expand Down

0 comments on commit b2952f3

Please sign in to comment.