feat: support runtime option
AriPerkkio committed Aug 13, 2023
1 parent 6286b81 commit d1b8c9b
Showing 15 changed files with 886 additions and 207 deletions.
137 changes: 132 additions & 5 deletions README.md
@@ -8,6 +8,7 @@ Tinypool is a fork of piscina. What we try to achieve in this library, is to eli
- ✅ Minimal
- ✅ No dependencies
- ✅ Physical cores instead of Logical cores with [physical-cpu-count](https://www.npmjs.com/package/physical-cpu-count)
- ✅ Supports `worker_threads` and `child_process`
- ❌ No utilization
- ❌ No NAPI

@@ -17,27 +18,150 @@ _In case you need more tiny libraries like tinypool or tinyspy, please consider

## Example

### Using `node:worker_threads`

#### Basic usage

```js
// main.mjs
import Tinypool from 'tinypool'

const pool = new Tinypool({
  filename: new URL('./worker.mjs', import.meta.url).href,
})

const result = await pool.run({ a: 4, b: 6 })
console.log(result) // Prints 10
```

```js
// worker.mjs
export default ({ a, b }) => {
  return a + b
}
```

#### Main thread <-> worker thread communication

<details>
<summary>See code</summary>

```js
// main.mjs
import Tinypool from 'tinypool'
import { MessageChannel } from 'node:worker_threads'

const pool = new Tinypool({
  filename: new URL('./worker.mjs', import.meta.url).href,
})
const { port1, port2 } = new MessageChannel()
const promise = pool.run({ port: port1 }, { transferList: [port1] })

port2.on('message', (message) => console.log('Main thread received:', message))
setTimeout(() => port2.postMessage('Hello from main thread!'), 1000)

await promise

port1.close()
port2.close()
```
```js
// worker.mjs
export default ({ port }) => {
  return new Promise((resolve) => {
    port.on('message', (message) => {
      console.log('Worker received:', message)

      port.postMessage('Hello from worker thread!')
      resolve()
    })
  })
}
```

</details>

### Using `node:child_process`

#### Basic usage

<details>
<summary>See code</summary>

```js
// main.mjs
import Tinypool from 'tinypool'

const pool = new Tinypool({
  runtime: 'child_process',
  filename: new URL('./worker.mjs', import.meta.url).href,
})
const result = await pool.run({ a: 4, b: 6 })
console.log(result) // Prints 10
```
```js
// worker.mjs
export default ({ a, b }) => {
  return a + b
}
```

</details>

#### Main process <-> worker process communication

<details>
<summary>See code</summary>

```js
// main.mjs
import Tinypool from 'tinypool'

const pool = new Tinypool({
  runtime: 'child_process',
  filename: new URL('./worker.mjs', import.meta.url).href,
})

const messages = []
const listeners = []
const channel = {
  onMessage: (listener) => listeners.push(listener),
  postMessage: (message) => messages.push(message),
}

const promise = pool.run({}, { channel })

// Send message to worker
setTimeout(
  () => listeners.forEach((listener) => listener('Hello from main process')),
  1000
)

// Wait for task to finish
await promise

console.log(messages)
// [{ received: 'Hello from main process', response: 'Hello from worker' }]
```
```js
// worker.mjs
export default async function run() {
  return new Promise((resolve) => {
    process.on('message', (message) => {
      // Ignore Tinypool's internal messages
      if (message?.__tinypool_worker_message__) return

      process.send({ received: message, response: 'Hello from worker' })
      resolve()
    })
  })
}
```
</details>
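
The `channel` object in the example above only needs an `onMessage` and a `postMessage` method. As a minimal sketch, assuming a hypothetical `createChannel()` helper (it is not part of Tinypool), the bookkeeping could be wrapped like this:

```js
// Hypothetical helper, not part of Tinypool: builds an object with the
// `onMessage` / `postMessage` shape used by the `channel` option above.
function createChannel() {
  const listeners = []
  const received = []

  return {
    // Registers a listener that should receive messages sent to the worker
    onMessage: (listener) => listeners.push(listener),
    // Called with each message the worker sends back; stores it
    postMessage: (message) => received.push(message),
    // Convenience for the main process: forwards a message to every listener
    send: (message) => listeners.forEach((listener) => listener(message)),
    // Messages collected so far, in arrival order
    received,
  }
}
```

Under these assumptions the object would be passed as `pool.run({}, { channel })`, and `channel.send('Hello from main process')` would replace the manual `listeners.forEach(...)` call.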

## API

We have a similar API to Piscina, so for more information, you can read Piscina's detailed [documentation](https://github.com/piscinajs/piscina#piscina---the-nodejs-worker-pool) and apply the same techniques here.
@@ -49,11 +173,14 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
- `isolateWorkers`: Disabled by default. Always starts with a fresh worker when running tasks to isolate the environment.
- `terminateTimeout`: Disabled by default. If terminating a worker takes longer than `terminateTimeout` milliseconds, an error is raised.
- `maxMemoryLimitBeforeRecycle`: Disabled by default. When defined, the worker's heap memory usage is compared against this value after a task has finished. If the current memory usage exceeds this limit, the worker is terminated and a new one is started to take its place. This option is useful when your tasks leak memory and you don't want to enable the `isolateWorkers` option.
- `runtime`: Selects the worker runtime. Default value is `worker_threads`.
  - `worker_threads`: Runs workers in [`node:worker_threads`](https://nodejs.org/api/worker_threads.html). For `main thread <-> worker thread` communication you can use [`MessagePort`](https://nodejs.org/api/worker_threads.html#class-messageport) in the `pool.run()` method's [`transferList` option](https://nodejs.org/api/worker_threads.html#portpostmessagevalue-transferlist). See [example](#main-thread---worker-thread-communication).
  - `child_process`: Runs workers in [`node:child_process`](https://nodejs.org/api/child_process.html). For `main process <-> worker process` communication you can use `TinypoolChannel` in the `pool.run()` method's `channel` option. To filter out Tinypool's internal messages, see `TinypoolWorkerMessage`. See [example](#main-process---worker-process-communication).
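
The `maxMemoryLimitBeforeRecycle` option above boils down to one comparison after each task. The sketch below only illustrates that decision; `shouldRecycleWorker` is a hypothetical name and this is not Tinypool's internal code.

```js
// Hypothetical illustration of the recycle decision, not Tinypool's code.
// `usedMemory`: heap usage in bytes reported once a task has finished.
// `limit`: the `maxMemoryLimitBeforeRecycle` value, or undefined when disabled.
function shouldRecycleWorker(usedMemory, limit) {
  // The option is disabled by default, so nothing is recycled without a limit
  if (limit === undefined) return false

  // Only usage that exceeds the limit triggers a replacement worker
  return usedMemory > limit
}
```

A worker sitting exactly at the limit is kept, because the option is documented as acting only when usage exceeds the limit.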

#### Pool methods

- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with ongoing tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects, you may want to use the [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option instead to forcefully terminate all tasks, including the ongoing ones.
- `recycleWorkers(options)`: Waits for all current tasks to finish and re-creates all workers. Can be used to force isolation imperatively even when `isolateWorkers` is disabled. Accepts a `{ runtime }` option as its argument.

#### Exports
1 change: 1 addition & 0 deletions package.json
@@ -73,6 +73,7 @@
".ts"
],
"testRegex": "test.(js|ts|tsx)$",
"verbose": true,
"coverageDirectory": "./coverage/",
"collectCoverage": true,
"coverageReporters": [
44 changes: 43 additions & 1 deletion src/common.ts
@@ -1,4 +1,46 @@
import type { MessagePort, TransferListItem } from 'worker_threads'

/** Channel for communicating between main thread and workers */
export interface TinypoolChannel {
  /** Workers subscribing to messages */
  onMessage(callback: (message: any) => void): void

  /** Called with worker's messages */
  postMessage(message: any): void
}

export interface TinypoolWorker {
  runtime: string
  initialize(options: {
    env?: Record<string, string>
    argv?: string[]
    execArgv?: string[]
    resourceLimits?: any
    workerData?: TinypoolData
    trackUnmanagedFds?: boolean
  }): void
  terminate(): Promise<any>
  postMessage(message: any, transferListItem?: TransferListItem[]): void
  setChannel?: (channel: TinypoolChannel) => void
  on(event: string, listener: (...args: any[]) => void): void
  once(event: string, listener: (...args: any[]) => void): void
  emit(event: string, ...data: any[]): void
  ref?: () => void
  unref?: () => void
  threadId: number
}

/**
 * Tinypool's internal messaging between main thread and workers.
 * - Users can check the `__tinypool_worker_message__` property to identify
 *   these messages and ignore them.
 */
export interface TinypoolWorkerMessage<
  T extends 'port' | 'pool' = 'port' | 'pool'
> {
  __tinypool_worker_message__: true
  source: T
}
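
As the doc comment on `TinypoolWorkerMessage` says, user code can tell Tinypool's internal messages apart by the `__tinypool_worker_message__` marker. A minimal sketch of such a check, assuming a hypothetical helper named `isTinypoolWorkerMessage` (Tinypool does not export one):

```js
// Hypothetical helper: returns true only for messages that carry the
// `__tinypool_worker_message__: true` marker from the interface above.
function isTinypoolWorkerMessage(message) {
  // Guard against null/undefined first, then compare the marker strictly
  return Boolean(message && message.__tinypool_worker_message__ === true)
}
```

In a `child_process` worker, a `process.on('message', ...)` listener could return early when this check passes and treat everything else as the user's own traffic.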

export interface StartupMessage {
filename: string | null
110 changes: 110 additions & 0 deletions src/entry/process.ts
@@ -0,0 +1,110 @@
import { stderr, stdout } from 'src/utils'
import {
  ReadyMessage,
  RequestMessage,
  ResponseMessage,
  StartupMessage,
  TinypoolWorkerMessage,
} from '../common'
import { getHandler, throwInNextTick } from './utils'

type IncomingMessage =
  | (StartupMessage & TinypoolWorkerMessage<'pool'>)
  | (RequestMessage & TinypoolWorkerMessage<'port'>)

type OutgoingMessage =
  | (ReadyMessage & TinypoolWorkerMessage<'pool'>)
  | (ResponseMessage & TinypoolWorkerMessage<'port'>)

process.__tinypool_state__ = {
  isChildProcess: true,
  isTinypoolWorker: true,
  workerData: null,
  workerId: process.pid,
}

process.on('message', (message: IncomingMessage) => {
  // Message was not for port or pool.
  // It is likely the end user's own communication between main and worker.
  if (!message || !message.__tinypool_worker_message__) return

  if (message.source === 'pool') {
    const { filename, name } = message

    ;(async function () {
      if (filename !== null) {
        await getHandler(filename, name)
      }

      process.send!(<OutgoingMessage>{
        ready: true,
        source: 'pool',
        __tinypool_worker_message__: true,
      })
    })().catch(throwInNextTick)

    return
  }

  if (message.source === 'port') {
    return onMessage(message).catch(throwInNextTick)
  }

  throw new Error(`Unexpected TinypoolWorkerMessage ${JSON.stringify(message)}`)
})

async function onMessage(message: IncomingMessage & { source: 'port' }) {
  const { taskId, task, filename, name } = message
  let response: OutgoingMessage & Pick<typeof message, 'source'>

  try {
    const handler = await getHandler(filename, name)
    if (handler === null) {
      throw new Error(`No handler function exported from ${filename}`)
    }
    const result = await handler(task)
    response = {
      source: 'port',
      __tinypool_worker_message__: true,
      taskId,
      result,
      error: null,
      usedMemory: process.memoryUsage().heapUsed,
    }

    // If the task used e.g. console.log(), wait for the stream to drain
    // before potentially entering the `Atomics.wait()` loop, and before
    // returning the result so that messages will always be printed even
    // if the process would otherwise be ready to exit.
    if (stdout()?.writableLength! > 0) {
      await new Promise((resolve) => process.stdout.write('', resolve))
    }
    if (stderr()?.writableLength! > 0) {
      await new Promise((resolve) => process.stderr.write('', resolve))
    }
  } catch (error) {
    response = {
      source: 'port',
      __tinypool_worker_message__: true,
      taskId,
      result: null,
      error: serializeError(error),
      usedMemory: process.memoryUsage().heapUsed,
    }
  }

  process.send!(response)
}
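
The drain step in `onMessage` writes an empty string with a callback, so the promise settles only after output queued earlier has been handled. A minimal sketch of the same idiom as a standalone function, assuming a hypothetical `flush` helper that accepts any object with `writableLength` and `write(chunk, callback)`:

```js
// Hypothetical helper illustrating the drain idiom used in onMessage().
// Resolves immediately when nothing is buffered; otherwise queues an empty
// write whose callback fires after the previously buffered chunks.
function flush(stream) {
  if (!stream || !(stream.writableLength > 0)) return Promise.resolve()

  return new Promise((resolve) => stream.write('', resolve))
}
```

With Node.js streams this could be called as `await flush(process.stdout)` before sending a result back to the parent process.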

function serializeError(error: unknown) {
  if (error instanceof Error) {
    return {
      ...error,
      name: error.name,
      stack: error.stack,
      message: error.message,
    }
  }

  return String(error)
}
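
`serializeError` lists `name`, `stack` and `message` explicitly because object spread copies only own enumerable properties, and those three are not enumerable on `Error` instances. The sketch below demonstrates the difference; the `code` property is a made-up custom field used only for illustration.

```js
const error = new Error('boom')
error.code = 'E_TASK' // hypothetical custom, enumerable property

// Spread alone keeps the custom field but loses the standard ones
const spreadOnly = { ...error }

// Same shape as serializeError() above: the standard fields are copied by hand
const serialized = {
  ...error,
  name: error.name,
  stack: error.stack,
  message: error.message,
}

// The plain object keeps its fields when serialized for the parent process
const roundTripped = JSON.parse(JSON.stringify(serialized))
```

Here `spreadOnly.message` is `undefined`, while `roundTripped.message` is `'boom'` and `roundTripped.code` is `'E_TASK'`.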