Threadz will no longer be maintained. Please use Nanolith instead for multi-threading in Node.js. It is stable and maintained regularly.
Threadz now supports ESModules!
A feature rich and scalable general-purpose multi-threading library that makes it easy to utilize all of a given machine's resources in Node.js.
- New BackgroundThreadzWorker API.
- Various bug fixes.
- Minor improvements to the SharedMemory API.
- Support for both ESModules and CommonJS.
- Minor bug fixes & edge case handling.
- waitForStart method on ThreadzWorker.
- Features
- Installing
- Example
- declare
- ThreadzAPI
- merge
- Interact API
- Communicate API
- ThreadzWorker
- BackgroundThreadzWorker
- ThreadzPool
- workerTools
- SharedMemory
- Full TypeScript support + in-editor documentation with JSDoc.
- Create modular workers without having to create specific worker files.
- Run operations with true concurrency by using workers in your code as if they were just regular functions that return promises.
- Automatically manage worker concurrency.
- Intuitively share memory between workers.
- Prioritize certain workers over others.
- Receive descriptive error messages that tell you exactly what you've done wrong.
Threadz is currently only available on NPM.
npm i threadz
declarations.ts:
import { declare } from 'threadz';
export default declare({
bigLoop: {
worker: (num: number) => {
// Normally, a for loop is blocking code
for (const _ of Array(900000000).keys()) {
}
return num + 100;
},
},
});
index.ts:
import api from './declarations';
// Run the for loop on a separate thread
const data = await api.workers.bigLoop(5);
console.log(data);
console.log('this will run before the data is logged');
(declarations: Declarations) => ThreadzAPI
Regardless of how you plan on using Threadz, you always start with the declare() function. Pass in a map of declarations in which each key is the name under which you'd like that function to be recognized.
// declarations.ts
import { declare } from 'threadz';
export default declare({
// The name of the worker
helloWorld: {
// The function to be run when the worker is called.
worker: () => console.log('hello world'),
// Whether or not to push the worker to the front of
// the ThreadzPool queue when it is added.
// Defaults to "false".
priority: true,
// Default options for the "Worker" class from the
// "worker_threads" module.
options: {
resourceLimits: {
maxOldGenerationSizeMb: 0.5,
},
},
},
logMessage: {
// The "worker" property is the only one required.
worker: (msg: string) => console.log(msg),
},
returnSum: {
worker: (num1: number, num2: number) => num1 + num2
}
});
Declaration functions can also be asynchronous and return promises, which will be awaited within the worker. The function under the worker property doesn't have to be defined right within the declare() function. It can be imported from elsewhere.
To get a full list of the configurations supported in the options property, refer to the Node.js documentation.
Note: Threadz will do its best to detect the path of the declaration file, but sometimes it might fail. In rare cases like these, you can manually pass a fileLocation string within the second parameter.
Note: The return value of the declare() function MUST be the default export of the file.
Note: Unfortunately, declaration functions cannot accept generic types.
The declare() function returns an instance of ThreadzAPI, off of which your workers can be called. During initialization, declaration functions are mapped into the threadzAPI.workers property, where they can be referenced by the names used in the original declarations. Using the declarations in the above section, this code would be valid:
// index.ts
import api from './declarations';
// Each of these operations is happening on a separate thread
await api.workers.logMessage('threadz is awesome');
await api.workers.helloWorld();
// The return value of a worker function is a promise of
// the return value of the original declaration function
// it corresponds to.
const data = await api.workers.returnSum(4, 5)
console.log(data) // -> 9
location: string
- The file location at which the declarations live.
- Tells Threadz how to dynamically import your declarations.
declarations: Declarations
- The original declarations used to initialize ThreadzAPI.
declarationsCount: number
- The number of declarations on the ThreadzAPI instance.
workers: MappedWorkers
- Declarations mapped into "worker functions" which handle the passing of data, the creation of Worker instances, the management of worker concurrency with ThreadzPool, and more.
threadzPool: ThreadzWorkerPool
- The global ThreadzWorkerPool instance being used to manage all workers.
interactWith(): (workerName: string) => Interact
- Pass in the name of a worker on the ThreadzAPI instance to create an interaction session for that worker with the Interact API.
createBackgroundWorker(): ({ options?: WorkerOptions }) => BackgroundThreadzWorker
on(): (callback: Function) => void
- Supports the workerQueued and workerDone events.
(instances: ThreadzAPI[]) => Declarations
The merge function accepts an array of ThreadzAPI instances returned by the declare function and returns a new declarations object containing all declarations from each instance provided in the array.
Note: merge does not return a ThreadzAPI instance like declare! It returns a set of declarations to be re-declared.
Example:
declarations.ts
import { declare, merge } from 'threadz';
const math = declare({
add5: {
worker: (num1: number) => num1 + 5,
},
});
const logging = declare({
helloWorld: {
worker: () => 'hello world',
},
});
export default declare(merge([math, logging]));
index.ts
import api from './declarations';
await api.workers.add5(5);
await api.workers.helloWorld();
Note: If you are only using the declare function to create a ThreadzAPI instance with the sole intention of passing it to the merge function later to be re-declared, it does not have to be the default export of the file it is in.
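To make the shape of that operation concrete, here is a minimal sketch of what merging could look like. It is not Threadz's implementation: `Declaration`, `ApiLike`, and `mergeSketch` are hypothetical names, and the only thing taken from this document is that a ThreadzAPI instance exposes its original `declarations`.

```typescript
// Hypothetical, simplified shapes. Threadz's real types are richer than this.
type Declaration = { worker: (...args: any[]) => unknown; priority?: boolean };
type Declarations = Record<string, Declaration>;
type ApiLike = { declarations: Declarations };

// Collect the declarations of every instance into one plain object.
// Assumption of this sketch: when two instances share a name, the later one wins.
function mergeSketch(instances: ApiLike[]): Declarations {
    const merged: Declarations = {};
    for (const instance of instances) {
        Object.assign(merged, instance.declarations);
    }
    return merged;
}

const math: ApiLike = {
    declarations: { add5: { worker: (num1: number) => num1 + 5 } },
};
const logging: ApiLike = {
    declarations: { helloWorld: { worker: () => 'hello world' } },
};

const merged = mergeSketch([math, logging]);
console.log(Object.keys(merged)); // [ 'add5', 'helloWorld' ]
```

The result is a plain declarations object rather than an API instance, which is why it has to be passed through declare() again before any worker can be called.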
Directly calling workers on threadzAPI.workers lets you pass arguments to a function, run it on a separate thread, then receive its return value back on the main thread. For any workflow more complex than this, the Interact API must be used.
Initialize an interaction session with the Interact.with() static method, passing in a worker function from a ThreadzAPI instance. An Interact instance tied to that worker function will be returned.
import { Interact } from 'threadz';
import api from './declarations';
// Initialize the interact session, specifying which worker
// to run the interaction with.
const interact = Interact.with(api.workers.returnSum);
// Pass in arguments to the worker and mark it as a priority.
interact.args(4, 5).isPriority();
// Run callbacks when certain worker events have occurred.
interact.onStart(() => console.log('Worker started'));
interact.onSuccess((result) => console.log(result));
// Queue the worker into the ThreadzPool and run it.
const worker = interact.go();
// Wait for the worker to finish running
await worker.waitFor();
An instance of the Interact API has many methods that make it easy to interact with a worker, all of which can be chained.
Pass arguments into the worker.
This means that it will be pushed to the front of the ThreadzPool queue instead of the back. Overrides the priority option set in the original declarations.
Treat the worker as normal. You only need to use this method if you set priority to true in the original declaration.
(options: WorkerOptions) => Interact
Set the options for the worker's run. Overrides any options defined within the original declaration.
(callback: (options: WorkerOptions) => WorkerOptions) => Interact
Set the options for the worker's run with a callback. Overrides any options defined within the original declaration.
This function is documented and readily available; however, it is recommended to use the Communicate API instead.
(port: MessagePort) => Interact
Add a message port to the worker to be accessed by workerTools.sendCommunication and workerTools.onCommunication.
Example:
import { MessageChannel } from 'worker_threads';
import api from './declarations';
const { port1, port2 } = new MessageChannel();
const worker = api.interactWith('test').addMessagePort(port2).go();
port1.on('message', (data) => console.log(data));
await worker.waitFor();
() => ThreadzWorker
Create the worker and queue it up in the ThreadzPool to be run. Returns a ThreadzWorker instance.
onMessage()
- Pass a function to run when a message is received from the worker.
onFailure()
- Pass a function to run when the worker fails and throws an error.
onSuccess()
- Pass a function to run when the worker succeeds and potentially returns a value.
onStart()
- Pass a function to run when the worker starts running.
- This functionality might be useful when dealing with large queues of workers.
onAbort()
- Pass a function to run whenever the worker is aborted.
- A worker can only be aborted with the workerTools.abort() and workerTools.abortOnTimeout() functions.
The Communicate class is a Threadz-specific wrapper for the MessageChannel class from the worker_threads module in Node.js. Pass in Interact instances, and it automatically creates a MessageChannel instance and adds the ports to the Interact instances with interact.addMessagePort().
import { Communicate } from 'threadz';
import api from './declarations';
const add5 = api.interactWith('add5').args(10);
const helloWorld = api.interactWith('helloWorld');
const communicate = Communicate.between([add5, helloWorld]);
add5.go();
helloWorld.go();
communicate.on('message', (data) => console.log(data));
communicate.closePorts();
In the between static method, you can either pass in a tuple of two Interact instances, or an object with port1 and port2 keys, each containing an array of Interact instances. The instances under each key will automatically be passed the corresponding port.
// Instead of this
const { port1, port2 } = new MessageChannel();
add5.addMessagePort(port1);
helloWorld.addMessagePort(port2);
// Communicate allows you to do this instead
const communicate = Communicate.between([add5, helloWorld]);
This becomes more useful when trying to create communications between a larger number of workers.
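For readers unfamiliar with the underlying primitive, the following sketch shows how the two ports of a plain Node.js MessageChannel relate to each other. It uses only the `worker_threads` module and none of the Threadz API; the message payload is made up for illustration, and both ports stay on one thread so the behavior is easy to observe.

```typescript
import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';

// A MessageChannel is a pair of linked ports: whatever is posted
// on one port is delivered to the other.
const { port1, port2 } = new MessageChannel();

// Post a (hypothetical) payload on the first port.
port1.postMessage({ from: 'add5', value: 15 });

// receiveMessageOnPort synchronously pulls one pending message off a port,
// or returns undefined when nothing is waiting.
const received = receiveMessageOnPort(port2);
console.log(received?.message); // { from: 'add5', value: 15 }

// Close both ends once the conversation is over.
port1.close();
port2.close();
```

In real use each port would be handed to a different worker, which is the wiring that Communicate.between() is described as automating.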
When a ThreadzWorker instance is returned by the Interact API, it represents a worker that has been queued into the ThreadzWorkerPool to be executed.
import { Interact } from 'threadz';
import api from './declarations';
const interact = Interact.with(api.workers.returnSum).args(4, 5);
const worker = interact.go();
worker.sendMessage('foo');
Note: Other than with the Interact API, you should not be directly working with ThreadzWorker instances.
Since the ThreadzWorker API is not meant to be interacted with extensively, a limited number of methods are available.
A boolean indicating whether or not the worker is running yet.
(data: AcceptableDataType | SharedMemoryTransferObject, transferList?: TransferListItem[]) => void
Send a message to the worker while it is running by passing in a basic data type or a SharedMemoryTransferObject.
(priority: boolean | 0 | 1) => void
Sets the priority of the worker based on a boolean or number value. Has no effect if the worker is already running.
() => Promise<void>
Allows you to wait for the worker to start before trying to interact with it while it's running.
() => Promise
Returns a promise which resolves/rejects once the worker has succeeded, thrown an error, or aborted.
The data the promise resolves with is the return value of the original declaration function.
() => Promise<void>
Returns a promise that will always resolve when the worker finishes, regardless of whether it succeeded, errored out, or was aborted.
(callback: Function) => void
Supports the message, error, aborted, success, and started events.
Refer to the Node.js documentation for more details.
This class cannot be directly interacted with, but can be instantiated via the createBackgroundWorker function on a ThreadzAPI instance. Background workers work differently from regular workers, as they begin running when you call the start() function, and only finish when you call end().
Normal ThreadzWorkers spin up a Worker for a single function call, then finish once the function has returned (or resolved with) some value. BackgroundThreadzWorkers allow you to spin up only one worker, then call any of your declared worker functions without needing to spin up a new Worker each time.
// declarations.ts
import { declare, workerTools } from 'threadz';
export default declare({
add: {
worker: (num1: number, num2: number) => num1 + num2,
},
subtract: {
worker: (num1: number, num2: number) => num1 - num2,
},
});
// index.ts
import api from './declarations.js';
const backgroundWorker = api.createBackgroundWorker();
// Spins up a single worker
await backgroundWorker.start();
const result = await backgroundWorker.call('add', 1, 2);
// The "subtract" worker function is being run on the same thread
// as the call for "add"
const result2 = await backgroundWorker.call('subtract', 5, 3);
// Ends the worker's process
backgroundWorker.end();
console.log(result, result2);
Though the output is the same, the functionality above is very different from this:
// index.ts
import api from './declarations.js';
// Spins up a new Worker, runs the add function.
// The worker's process is ended once the add function returns.
const result = await api.workers.add(1, 2);
// Spins up a new Worker, runs the subtract function.
// The worker's process is ended once the subtract function returns.
const result2 = await api.workers.subtract(5, 3);
console.log(result, result2);
(port?: MessagePort) => Promise<void>
Starts the background worker. You can optionally pass a MessagePort object to enable the usage of methods such as workerTools.onCommunication within the worker.
(name: string, ...args: unknown[]) => Promise<unknown>
Calls the declaration function, applying the arguments passed to it. Returns a promise of the declaration function's return type.
() => void
Ends the worker's process.
The ThreadzWorkerPool (importable under the name ThreadzPool) is a single global object which implements a queuing system to manage workers and maintain a maximum concurrency.
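As a rough mental model of such a queuing system, the sketch below combines a concurrency cap with the front-of-queue behavior described for priority workers. It is an illustration under stated assumptions, not the library's code: `SketchPool`, `Job`, and every method name are hypothetical, and no real threads are started.

```typescript
// Hypothetical job record: a name plus a flag for front-of-queue placement.
type Job = { name: string; priority: boolean };

class SketchPool {
    private queue: Job[] = [];
    private active = 0;
    private maxConcurrency: number;

    constructor(maxConcurrency: number) {
        this.maxConcurrency = maxConcurrency;
    }

    // Priority jobs jump to the front of the queue; all others go to the back.
    enqueue(job: Job): void {
        if (job.priority) this.queue.unshift(job);
        else this.queue.push(job);
    }

    // Start as many queued jobs as the cap allows and return their names.
    drain(): string[] {
        const started: string[] = [];
        while (this.active < this.maxConcurrency && this.queue.length > 0) {
            const job = this.queue.shift()!;
            this.active++;
            started.push(job.name);
        }
        return started;
    }

    // Mark one running job as finished, freeing a slot.
    finish(): void {
        if (this.active > 0) this.active--;
    }

    get queueLength(): number {
        return this.queue.length;
    }

    get isMaxedOut(): boolean {
        return this.active >= this.maxConcurrency;
    }

    // Nothing running and nothing waiting.
    get dormant(): boolean {
        return this.active === 0 && this.queue.length === 0;
    }
}

const pool = new SketchPool(2);
pool.enqueue({ name: 'a', priority: false });
pool.enqueue({ name: 'b', priority: false });
pool.enqueue({ name: 'c', priority: true });

const firstBatch = pool.drain();
console.log(firstBatch); // [ 'c', 'a' ]
```

With a cap of two, the priority job `c` starts ahead of `a`, and `b` stays queued until a slot is freed.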
The methods and properties on ThreadzWorkerPool make it easy to modify its maximum concurrency and get updates on its status.
Get the current length of the queue.
Whether or not the max number of possible workers is running right now.
The number of workers which are currently running.
The maximum number of workers that ThreadzWorkerPool will allow to run concurrently.
(value: MaxConcurrencyOptions | number) => void
Set the maximum concurrency of the ThreadzPool by specifying either a number of workers to run at once or a MaxConcurrencyOptions (importable enum) value, which dynamically calculates the max concurrency based on the resources the machine has.
MaxConcurrencyOptions:
- 1/4
- 1/2
- 3/4
- 100%
- 200%
- 400%
Note: It is recommended to use MaxConcurrencyOptions instead of a hard-coded number.
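One plausible reading of these options is that each one scales the machine's CPU core count. The sketch below works that out under that assumption; the README does not state the actual formula, so `multipliers` and `resolveMaxConcurrency` are hypothetical and only the option labels come from the list above.

```typescript
import { cpus } from 'node:os';

// Hypothetical multipliers keyed by the option labels listed above.
// Assumption: each option is a fraction or multiple of the core count.
const multipliers = {
    '1/4': 0.25,
    '1/2': 0.5,
    '3/4': 0.75,
    '100%': 1,
    '200%': 2,
    '400%': 4,
} as const;

type OptionLabel = keyof typeof multipliers;

// Turn an option label (or an explicit number) into a worker count,
// never going below one worker.
function resolveMaxConcurrency(
    value: OptionLabel | number,
    cores: number = cpus().length,
): number {
    const raw = typeof value === 'number' ? value : cores * multipliers[value];
    return Math.max(1, Math.floor(raw));
}

console.log(resolveMaxConcurrency('1/2', 8)); // 4
console.log(resolveMaxConcurrency('400%', 8)); // 32
console.log(resolveMaxConcurrency('1/4', 2)); // 1
```

A relative option adapts to whichever machine the code runs on, while a fixed number tuned for one machine can oversubscribe a smaller one or underuse a larger one.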
Retrieve the name, location, and arguments of the next worker in the queue to be run.
If true, the ThreadzWorkerPool is not currently running any workers and the queue is empty.
(callback: Function) => void
Supports the dormant event.
The workerTools object is a set of tools intended for use exclusively within workers. It can be used to send and receive messages to and from the main thread, as well as communicate with other workers running on different threads.
import { declare, workerTools } from 'threadz';
export default declare({
myWorker: {
worker: () => {
workerTools.onParentMessage((data) => {
console.log(data);
});
workerTools.sendMessageToParent('hey!');
workerTools.abort();
},
},
});
There are currently 7 tools in the workerTools toolbox.
(data: AcceptableDataType | SharedMemoryTransferObject, transferList?: TransferListItem[]) => void
Send a message to be consumed back on the main thread.
(callback: Function) => Function
Pass a function to run any time a message is received from the parent thread. The data is passed in as the first parameter.
Returns a function that stops listening on the parent port when called.
A function which takes in an assertion callback. The assertion callback takes in the received data and returns a boolean value. If the assertion returns true, the promise will resolve.
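The assertion-callback pattern described here can be sketched with a plain EventEmitter standing in for the parent port. This is an illustration only: `waitForMessageSketch` is a hypothetical helper, the `'message'` event name is an assumption, and nothing below is the Threadz implementation.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical helper: resolves with the first message that passes the assertion
// and stops listening as soon as that happens.
function waitForMessageSketch<T>(
    source: EventEmitter,
    assertion: (data: T) => boolean,
): Promise<T> {
    return new Promise((resolve) => {
        const listener = (data: T) => {
            // Messages that fail the assertion are ignored.
            if (!assertion(data)) return;
            source.off('message', listener);
            resolve(data);
        };
        source.on('message', listener);
    });
}

// A plain emitter stands in for the port the worker would listen on.
const port = new EventEmitter();
const ready = waitForMessageSketch<string>(port, (data) => data === 'ready');

port.emit('message', 'not yet'); // fails the assertion, listener stays attached
const listenersWhileWaiting = port.listenerCount('message');

port.emit('message', 'ready'); // passes the assertion, the promise resolves
const listenersAfterMatch = port.listenerCount('message');

ready.then((data) => console.log(data)); // logs "ready"
```

Inside a worker, awaiting such a promise lets the function pause until the other side sends the specific message it is waiting for.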
(data: AcceptableDataType | SharedMemoryTransferObject, transferList?: TransferListItem[]) => void
If you have passed a message port to the worker (using the Interact API), send messages to the port with this function.
(callback: Function) => Function
If you have passed a message port to the worker (using the Interact API), listen for messages on the port with this function by passing a callback which takes in the received data.
Returns a function that stops listening on the port when called.
A function which takes in an assertion callback. The assertion callback takes in the received data and returns a boolean value. If the assertion returns true, the promise will resolve.
Grab the unique ID of the thread currently being used.
(message?: string) => never
Immediately terminate the worker and return out with an aborted status.
({ seconds, message }: { seconds: number, message: string }) => never
Prevent workers from hanging or running too long by aborting out after a certain amount of time has passed.
Sharing memory between multiple threads is simple with the Threadz SharedMemory API. Use the static SharedMemory.from() method to create a shared state which is retained on all threads.
import { SharedMemory } from 'threadz';
const mem = SharedMemory.from<Record<string, string>>({ foo: 'bar' });
console.log(mem.get()); // -> { foo: 'bar' };
mem.setWith((prev) => {
return {
...prev,
fizz: 'buzz',
};
});
console.log(mem.get());
There are a few different properties and methods on a SharedMemory instance to help you manipulate the data stored within its state, as well as send it to other threads.
The byte length of the stored Uint8Array.
() => SharedMemoryTransferObject
This is one of the most important methods on the SharedMemory API. Instances cannot be directly sent to workers via parameters or messages, so they must be converted into SharedMemoryTransferObjects using the sharedMemory.transfer() function.
Example:
declarations.ts:
import { declare, SharedMemory } from 'threadz';
import type { SharedMemoryTransferObject } from 'threadz';
export default declare({
myWorker: {
// The memory data will come in as a SharedMemoryTransferObject
worker: (transfer: SharedMemoryTransferObject<string>) => {
// We can use the SharedMemory.from() function on a
// SharedMemoryTransferObject and continue using the
// SharedMemory API to manipulate the data.
const mem = SharedMemory.from(transfer);
console.log(mem.get());
},
},
});
index.ts:
import { SharedMemory } from 'threadz';
import api from './declarations';
const mem = SharedMemory.from('hey');
// The SharedMemory instance must be converted into a
// SharedMemoryTransferObject when passed into a worker
await api.workers.myWorker(mem.transfer());
(microtask?: boolean) => unknown
Get the current state.
Pass in true to run the operation as a microtask (returns a promise).
(microtask?: boolean) => void
Entirely reset the memory space (this does not delete the memory space).
Pass in true to run the operation as a microtask (returns a promise).
(data: AcceptableDataType) => void
Set a new value for the current memory space.
Pass in true to run the operation as a microtask (returns a promise).
(callback: (data: AcceptableDataType) => AcceptableDataType) => void
Set a new state with a callback function taking in the previous data and returning the new data to be written to memory.
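The mention of a stored Uint8Array suggests that state is serialized into a shared byte buffer. The sketch below shows one way get, set, and set-with-callback semantics could sit on top of a SharedArrayBuffer. It is an assumption-laden illustration: `createShared`, `write`, `read`, the JSON encoding, and the four-byte length header are all hypothetical, and no locking is done, so concurrent writers would need extra coordination.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Assumed layout: the first 4 bytes hold the payload length,
// the remaining bytes hold the state as UTF-8 encoded JSON.
const HEADER_BYTES = 4;

function createShared(capacity: number): SharedArrayBuffer {
    return new SharedArrayBuffer(HEADER_BYTES + capacity);
}

// Serialize a value and store it in the shared buffer.
function write(buffer: SharedArrayBuffer, value: unknown): void {
    const payload = encoder.encode(JSON.stringify(value));
    if (payload.byteLength > buffer.byteLength - HEADER_BYTES) {
        throw new RangeError('value does not fit in the shared buffer');
    }
    new Uint8Array(buffer, HEADER_BYTES).set(payload);
    Atomics.store(new Int32Array(buffer, 0, 1), 0, payload.byteLength);
}

// Read the stored bytes back and parse them into a value.
function read<T>(buffer: SharedArrayBuffer): T {
    const length = Atomics.load(new Int32Array(buffer, 0, 1), 0);
    // Copy out of shared memory before decoding.
    const copy = new Uint8Array(buffer, HEADER_BYTES, length).slice();
    return JSON.parse(decoder.decode(copy)) as T;
}

const buffer = createShared(1024);
write(buffer, { foo: 'bar' });

// Equivalent of a set-with-callback: read the previous state, derive the next.
const previous = read<Record<string, string>>(buffer);
write(buffer, { ...previous, fizz: 'buzz' });

const state = read<Record<string, string>>(buffer);
console.log(state); // { foo: 'bar', fizz: 'buzz' }
```

Because a SharedArrayBuffer is shared between threads rather than copied when passed to a worker, each thread reading it sees the same bytes, which fits the description of state that is retained on all threads.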
MIT