Skip to content

Commit

Permalink
Support multiple worker functions per file
Browse files Browse the repository at this point in the history
Also fixes some coverage issues
  • Loading branch information
jasnell committed May 15, 2021
1 parent a58f99a commit d7fa24d
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 25 deletions.
87 changes: 79 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const piscina = new Piscina({
});

(async function() {
const result = await piscina.runTask({ a: 4, b: 6 });
const result = await piscina.run({ a: 4, b: 6 });
console.log(result); // Prints 10
})();
```
Expand Down Expand Up @@ -75,7 +75,7 @@ const piscina = new Piscina({
filename: new URL('./worker.mjs', import.meta.url).href
});

const result = await piscina.runTask({ a: 4, b: 6 });
const result = await piscina.run({ a: 4, b: 6 });
console.log(result); // Prints 10
```
Expand All @@ -87,6 +87,43 @@ export default ({ a, b }) => {
};
```
### Exporting multiple worker functions
A single worker file may export multiple named handler functions.
```js
'use strict';

function add({ a, b }) { return a + b; }

function multiply({ a, b }) { return a * b; }

add.add = add;
add.multiply = multiply;

module.exports = add;
```
The export to target can then be specified when the task is submitted:
```js
'use strict';

const Piscina = require('piscina');
const { resolve } = require('path');

const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js')
});

(async function() {
const res = await Promise.all([
piscina.run({ a: 4, b: 6 }, { name: 'add' }),
piscina.run({ a: 4, b: 6 }, { name: 'multiply' })
]);
})();
```
### Cancelable Tasks
Submitted tasks may be canceled using either an `AbortController` or
Expand All @@ -106,7 +143,8 @@ const piscina = new Piscina({
(async function() {
const abortController = new AbortController();
try {
const task = piscina.runTask({ a: 4, b: 6 }, abortController.signal);
const { signal } = abortController;
const task = piscina.run({ a: 4, b: 6 }, { signal });
abortController.abort();
await task;
} catch (err) {
Expand Down Expand Up @@ -138,7 +176,7 @@ const piscina = new Piscina({
(async function() {
const ee = new EventEmitter();
try {
const task = piscina.runTask({ a: 4, b: 6 }, ee);
const task = piscina.run({ a: 4, b: 6 }, { signal: ee });
ee.emit('abort');
await task;
} catch (err) {
Expand Down Expand Up @@ -202,7 +240,7 @@ pool.on('drain', () => {

stream
.on('data', (data) => {
pool.runTask(data);
pool.run(data);
if (pool.queueSize === pool.options.maxQueue) {
console.log('pausing...', counter, pool.queueSize);
stream.pause();
Expand Down Expand Up @@ -238,6 +276,9 @@ This class extends [`EventEmitter`][] from Node.js.
absolute `file://` URL to a file that exports a JavaScript `function` or
`async function` as its default export or `module.exports`. [ES modules][]
are supported.
* `name`: (`string | null`) Provides the name of the default exported worker
function. The default is `'default'`, indicating the default export of the
worker module.
* `minThreads`: (`number`) Sets the minimum number of threads that are always
running for this thread pool. The default is based on the number of
available CPUs.
Expand Down Expand Up @@ -300,8 +341,37 @@ This class extends [`EventEmitter`][] from Node.js.
Use caution when setting resource limits. Setting limits that are too low may
result in the `Piscina` worker threads being unusable.

### Method: `run(task[, options])`

Schedules a task to be run on a Worker thread.

* `task`: Any value. This will be passed to the function that is exported from
`filename`.
* `options`:
* `transferList`: An optional lists of objects that is passed to
[`postMessage()`] when posting `task` to the Worker, which are transferred
rather than cloned.
* `filename`: Optionally overrides the `filename` option passed to the
constructor for this task. If no `filename` was specified to the constructor,
this is mandatory.
* `name`: Optionally overrides the exported worker function used for the task.
* `abortSignal`: An [`AbortSignal`][] instance. If passed, this can be used to
cancel a task. If the task is already running, the corresponding `Worker`
thread will be stopped.
(More generally, any `EventEmitter` or `EventTarget` that emits `'abort'`
events can be passed here.) Abortable tasks cannot share threads regardless
of the `concurrentTasksPerWorker` options.

This returns a `Promise` for the return value of the (async) function call
made to the function exported from `filename`. If the (async) function throws
an error, the returned `Promise` will be rejected with that error.
If the task is aborted, the returned `Promise` is rejected with an error
as well.

### Method: `runTask(task[, transferList][, filename][, abortSignal])`

**Deprecated** -- Use `run(task, options)` instead.

Schedules a task to be run on a Worker thread.

* `task`: Any value. This will be passed to the function that is exported from
Expand Down Expand Up @@ -340,7 +410,8 @@ An `'error'` event is emitted by instances of this class when:
- Unexpected messages are sent from from Worker threads.

All other errors are reported by rejecting the `Promise` returned from
`runTask()`, including rejections reported by the handler function itself.
`run()` or `runTask()`, including rejections reported by the handler function
itself.

### Event: `'drain'`

Expand Down Expand Up @@ -579,8 +650,8 @@ An example of a custom task queue that uses a shuffled priority queue
is available in [`examples/task-queue`](./examples/task-queue/index.js);
The special symbol `Piscina.queueOptionsSymbol` may be set as a property
on tasks submitted to `runTask()` as a way of passing additional options
on to the custom `TaskQueue` implementation. (Note that because the
on tasks submitted to `run()` or `runTask()` as a way of passing additional
options on to the custom `TaskQueue` implementation. (Note that because the
queue options are set as a property on the task, tasks with queue
options cannot be submitted as JavaScript primitives).
Expand Down
2 changes: 1 addition & 1 deletion examples/move/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ const pool = new Piscina({
// The task will transfer an ArrayBuffer
// back to the main thread rather than
// cloning it.
const u8 = await pool.run();
const u8 = await pool.run(Piscina.move(new Uint8Array(2)));
console.log(u8.length);
})();
17 changes: 17 additions & 0 deletions examples/multiple-workers-one-file/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const Piscina = require('../..');
const { resolve } = require('path');

// It is possible for a single Piscina pool to run multiple
// workers at the same time. To do so, pass the worker filename
// to runTask rather than to the Piscina constructor.

const pool = new Piscina({ filename: resolve(__dirname, 'worker.js') });

(async () => {
console.log(await Promise.all([
pool.run({ a: 2, b: 3 }, { name: 'add' }),
pool.run({ a: 2, b: 3 }, { name: 'multiply' })
]));
})();
10 changes: 10 additions & 0 deletions examples/multiple-workers-one-file/index.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Piscina } from 'piscina';

const pool = new Piscina({
filename: new URL('./worker.mjs', import.meta.url).href
});

console.log(await Promise.all([
pool.run({ a: 2, b: 3 }, { name: 'add' }),
pool.run({ a: 2, b: 3 }, { name: 'multiply' })
]));
11 changes: 11 additions & 0 deletions examples/multiple-workers-one-file/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict';

function add ({ a, b }) { return a + b; }
function multiply ({ a, b }) { return a * b; };

add.add = add;
add.multiply = multiply;

// The add function is the default handler, but
// additional named handlers are exported.
module.exports = add;
4 changes: 4 additions & 0 deletions examples/multiple-workers-one-file/worker.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export function add ({ a, b }) { return a + b; }
export function multiply ({ a, b }) { return a * b; };

export default add;
2 changes: 2 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { MessagePort } from 'worker_threads';

export interface StartupMessage {
filename : string | null;
name : string;
port : MessagePort;
sharedBuffer : Int32Array;
useAtomics : boolean;
Expand All @@ -12,6 +13,7 @@ export interface RequestMessage {
taskId : number;
task : any;
filename: string;
name : string;
}

export interface ReadyMessage {
Expand Down
Loading

0 comments on commit d7fa24d

Please sign in to comment.