Skip to content

Commit

Permalink
feat: support runtime option
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio committed Aug 12, 2023
1 parent 6286b81 commit 7872f5f
Show file tree
Hide file tree
Showing 15 changed files with 768 additions and 208 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
- `isolateWorkers`: Disabled by default. Always starts with a fresh worker when running tasks to isolate the environment.
- `terminateTimeout`: Disabled by default. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
- `maxMemoryLimitBeforeRecycle`: Disabled by default. When defined, the worker's heap memory usage is compared against this value after task has been finished. If the current memory usage exceeds this limit, worker is terminated and a new one is started to take its place. This option is useful when your tasks leak memory and you don't want to enable `isolateWorkers` option.
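- `runtime`: Selects the runtime in which workers are executed: worker threads (`node:worker_threads`) or child processes (`node:child_process`).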
#### Pool methods
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
".ts"
],
"testRegex": "test.(js|ts|tsx)$",
"verbose": true,
"coverageDirectory": "./coverage/",
"collectCoverage": true,
"coverageReporters": [
Expand Down
36 changes: 35 additions & 1 deletion src/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,38 @@
import type { MessagePort } from 'worker_threads'
import type { MessagePort, TransferListItem } from 'worker_threads'

/**
 * Channel for communicating between main thread and workers.
 *
 * The interface declares one hook per direction: `onMessage` hands over a
 * callback that wants to receive messages, `postMessage` is handed a message.
 * Payloads are untyped (`any`).
 */
export interface Channel {
/**
 * Workers subscribing to messages.
 *
 * @param callback - Function to be called with each message.
 */
onMessage(callback: (message: any) => void): void

/**
 * Called with worker's messages.
 *
 * @param message - The message coming from the worker.
 */
postMessage(message: any): void
}

/**
 * Contract of a worker as seen by the pool.
 *
 * NOTE(review): presumably there is one implementation per supported runtime
 * (worker thread / child process); the implementations are not visible here,
 * so confirm the details below against them.
 */
export interface TinyWorker {
/** Name of the runtime this worker belongs to. */
runtime: string
/**
 * Initializes the worker with its start-up options. All options are optional.
 * NOTE(review): the option names mirror Node's `Worker` constructor options;
 * confirm which of them each runtime actually honours.
 */
initialize(options: {
env?: Record<string, string>
argv?: string[]
execArgv?: string[]
resourceLimits?: any
workerData?: TinypoolData
trackUnmanagedFds?: boolean
}): void
/** Terminates the worker; the returned promise settles once that is done. */
terminate(): Promise<any>
/** Sends `message` to the worker, optionally together with a transfer list. */
postMessage(message: any, transferListItem?: TransferListItem[]): void
/** Optional: hands a `Channel` to the worker. Implementations may omit it. */
setChannel?: (channel: Channel) => void
/** Registers `listener` for `event`. */
on(event: string, listener: (...args: any[]) => void): void
/** Registers `listener` for `event`; presumably invoked at most once. */
once(event: string, listener: (...args: any[]) => void): void
/** Emits `event` with the given `data`. */
emit(event: string, ...data: any[]): void
/** Optional `ref` hook. Implementations may omit it. */
ref?: () => void
/** Optional `unref` hook. Implementations may omit it. */
unref?: () => void
/**
 * Numeric identifier of the worker.
 * NOTE(review): confirm what non-thread runtimes report here.
 */
threadId: number
}

/**
 * Marker mixed into messages that belong to tinypool itself.
 *
 * The child-process entry ignores any incoming message without this flag,
 * treating it as the end-user's own communication between main and worker.
 */
export interface TinypoolWorkerMessage {
/** Always `true`; its presence identifies a tinypool-internal message. */
__tinypool_worker_message__: true
}

export interface StartupMessage {
filename: string | null
Expand Down
110 changes: 110 additions & 0 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { stderr, stdout } from 'src/utils'
import {
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
TinypoolWorkerMessage,
} from '../common'
import { getHandler, throwInNextTick } from './utils'

/**
 * Messages this entry point handles, discriminated by `source`:
 * - `'channel'`: a start-up message (`StartupMessage`)
 * - `'port'`: a task request (`RequestMessage`)
 *
 * Both variants carry the `TinypoolWorkerMessage` marker.
 */
type Message =
| (StartupMessage & TinypoolWorkerMessage & { source: 'channel' })
| (RequestMessage & TinypoolWorkerMessage & { source: 'port' })

// Publish the worker state on `process`: this entry runs as a child process,
// has no worker data, and uses the process id as its worker id.
process.__tinypool_state__ = {
isChildProcess: true,
isTinyWorker: true,
workerData: null,
workerId: process.pid,
}

// Entry point for everything the main process sends to this child process.
process.on('message', (message: Message) => {
  // Anything without the tinypool marker was not meant for port or channel;
  // it is most likely the end-user's own main <-> worker communication.
  if (!message.__tinypool_worker_message__) {
    return
  }

  switch (message.source) {
    case 'channel': {
      const { filename, name } = message

      // Pre-load the handler (when a filename was given), then report
      // readiness over the channel.
      const announceReady = async () => {
        if (filename !== null) {
          await getHandler(filename, name)
        }

        const readyMessage: ReadyMessage &
          TinypoolWorkerMessage & { target: 'channel' } = {
          ready: true,
          target: 'channel',
          __tinypool_worker_message__: true,
        }
        process.send!(readyMessage)
      }

      announceReady().catch(throwInNextTick)
      return
    }

    case 'port':
      // A task request: run it and reply; failures are rethrown on the next tick.
      return onMessage(message).catch(throwInNextTick)

    default:
      return
  }
})

/**
 * Runs one task request and sends the outcome to the main process.
 *
 * The handler is looked up with `getHandler(filename, name)` and called with
 * `task`. Its awaited result, or the serialized error when the lookup or the
 * handler fails, is sent via `process.send` together with the request's
 * `taskId` and the current heap usage.
 *
 * @param message - The task request to execute.
 */
async function onMessage(message: RequestMessage) {
  const { taskId, task, filename, name } = message
  let response: ResponseMessage & TinypoolWorkerMessage & { target: 'port' }

  try {
    const handler = await getHandler(filename, name)
    if (handler === null) {
      throw new Error(`No handler function exported from ${filename}`)
    }

    // Heap usage is sampled right after the handler has finished.
    response = {
      target: 'port',
      __tinypool_worker_message__: true,
      taskId,
      result: await handler(task),
      error: null,
      usedMemory: process.memoryUsage().heapUsed,
    }

    // The task may have written output (e.g. via console.log()). Let the
    // streams drain before the result is returned, so that the output is
    // printed even if the process would otherwise be ready to exit.
    const stdoutHasBacklog = (stdout()?.writableLength ?? 0) > 0
    if (stdoutHasBacklog) {
      await new Promise((drained) => process.stdout.write('', drained))
    }
    const stderrHasBacklog = (stderr()?.writableLength ?? 0) > 0
    if (stderrHasBacklog) {
      await new Promise((drained) => process.stderr.write('', drained))
    }
  } catch (failure) {
    response = {
      target: 'port',
      __tinypool_worker_message__: true,
      taskId,
      result: null,
      error: serializeError(failure),
      usedMemory: process.memoryUsage().heapUsed,
    }
  }

  process.send!(response)
}

/**
 * Converts a thrown value into something that can be sent to the main process.
 *
 * @param error - Whatever was thrown.
 * @returns For `Error` instances, a plain object holding the error's own
 * enumerable properties plus `name`, `stack` and `message` (which are usually
 * not enumerable and would otherwise be lost). Any other value is returned as
 * its string form.
 */
function serializeError(error: unknown) {
  if (!(error instanceof Error)) {
    return String(error)
  }

  // Custom fields such as `code` are own enumerable properties.
  const ownFields = { ...error }

  return {
    ...ownFields,
    name: error.name,
    stack: error.stack,
    message: error.message,
  }
}
74 changes: 74 additions & 0 deletions src/entry/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { pathToFileURL } from 'url'

/** Signature of the native dynamic `import()` wrapped as a plain function. */
type ImportESM = (specifier: string) => Promise<any>

// Get `import(x)` as a function that isn't transpiled to `require(x)` by
// TypeScript for dual ESM/CJS support.
// Load this lazily, so that there is no warning about the ESM loader being
// experimental (on Node v12.x) until we actually try to use it.
//
// The variable itself is what may be undefined (until first use), hence the
// union is on the variable and not on the function's return type.
let importESMCached: ImportESM | undefined

/**
 * Returns the cached `import()` wrapper, creating it on first use.
 *
 * @returns A function that dynamically imports `specifier` using the native
 * ESM loader.
 */
function getImportESM(): ImportESM {
  if (importESMCached === undefined) {
    // Built from a string so that the `import()` call is not rewritten at
    // compile time.
    // eslint-disable-next-line no-new-func
    importESMCached = new Function(
      'specifier',
      'return import(specifier)'
    ) as ImportESM
  }
  return importESMCached
}

// Resolved handlers, keyed by `${filename}/${name}`.
const handlerCache: Map<string, Function> = new Map()

/**
 * Look up the handler function that we call when a task is posted.
 * This is either going to be "the" export from a file, or the default export.
 *
 * @param filename - Path of the module that provides the handler.
 * @param name - Name of the export to use when the module (or its default
 * export) is not itself a function.
 * @returns The handler, or `null` when no function could be resolved.
 * Successful lookups are cached per `filename`/`name` pair.
 */
export async function getHandler(
  filename: string,
  name: string
): Promise<Function | null> {
  const cacheKey = `${filename}/${name}`

  let handler = handlerCache.get(cacheKey)
  if (handler !== undefined) {
    return handler
  }

  try {
    // With our current set of TypeScript options, this is transpiled to
    // `require(filename)`.
    const handlerModule = await import(filename)

    // Check if the default export is an object, because dynamic import
    // resolves with `{ default: { default: [Function] } }` for CJS modules.
    handler =
      (typeof handlerModule.default !== 'function' && handlerModule.default) ||
      handlerModule

    if (typeof handler !== 'function') {
      handler = await (handler as any)[name]
    }
  } catch {
    // Deliberately ignored: if this first attempt fails, `handler` is still
    // not a function and the native ESM import below is tried instead.
  }
  if (typeof handler !== 'function') {
    handler = await getImportESM()(pathToFileURL(filename).href)
    if (typeof handler !== 'function') {
      handler = await (handler as any)[name]
    }
  }
  if (typeof handler !== 'function') {
    return null
  }

  // Limit the handler cache size. This should not usually be an issue and is
  // only provided for pathological cases.
  if (handlerCache.size > 1000) {
    // A Map iterates in insertion order, so the first key is the oldest entry.
    const oldestKey = handlerCache.keys().next().value
    if (oldestKey !== undefined) {
      handlerCache.delete(oldestKey)
    }
  }

  handlerCache.set(cacheKey, handler)
  return handler
}

/**
 * Rethrows `error` from a `process.nextTick` callback instead of the current
 * call stack, so the caller itself never sees a synchronous throw.
 *
 * @param error - The error to rethrow.
 */
export function throwInNextTick(error: Error) {
  const rethrow = (): never => {
    throw error
  }
  process.nextTick(rethrow)
}
80 changes: 6 additions & 74 deletions src/worker.ts → src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
receiveMessageOnPort,
workerData as tinypoolData,
} from 'worker_threads'
import { pathToFileURL } from 'url'
import {
ReadyMessage,
RequestMessage,
Expand All @@ -16,92 +15,31 @@ import {
isMovable,
kTransferable,
kValue,
} from './common'
import { stderr, stdout } from './utils'
} from '../common'
import { stderr, stdout } from '../utils'
import { getHandler, throwInNextTick } from './utils'

const [tinypoolPrivateData, workerData] = tinypoolData as TinypoolData

process.__tinypool_state__ = {
isWorkerThread: true,
isTinyWorker: true,
workerData: workerData,
workerId: tinypoolPrivateData.workerId,
}

const handlerCache: Map<string, Function> = new Map()
let useAtomics: boolean = process.env.PISCINA_DISABLE_ATOMICS !== '1'

// Get `import(x)` as a function that isn't transpiled to `require(x)` by
// TypeScript for dual ESM/CJS support.
// Load this lazily, so that there is no warning about the ESM loader being
// experimental (on Node v12.x) until we actually try to use it.
let importESMCached: (specifier: string) => Promise<any> | undefined
function getImportESM() {
if (importESMCached === undefined) {
// eslint-disable-next-line no-new-func
importESMCached = new Function(
'specifier',
'return import(specifier)'
) as typeof importESMCached
}
return importESMCached
}

// Look up the handler function that we call when a task is posted.
// This is either going to be "the" export from a file, or the default export.
async function getHandler(
filename: string,
name: string
): Promise<Function | null> {
let handler = handlerCache.get(`${filename}/${name}`)
if (handler !== undefined) {
return handler
}

try {
// With our current set of TypeScript options, this is transpiled to
// `require(filename)`.
const handlerModule = await import(filename)

// Check if the default export is an object, because dynamic import
// resolves with `{ default: { default: [Function] } }` for CJS modules.
handler =
(typeof handlerModule.default !== 'function' && handlerModule.default) ||
handlerModule

if (typeof handler !== 'function') {
handler = await (handler as any)[name]
}
} catch {}
if (typeof handler !== 'function') {
handler = await getImportESM()(pathToFileURL(filename).href)
if (typeof handler !== 'function') {
handler = await (handler as any)[name]
}
}
if (typeof handler !== 'function') {
return null
}

// Limit the handler cache size. This should not usually be an issue and is
// only provided for pathological cases.
if (handlerCache.size > 1000) {
// @ts-ignore
const [[key]] = handlerCache
handlerCache.delete(key)
}

handlerCache.set(`${filename}/${name}`, handler)
return handler
}

// We should only receive this message once, when the Worker starts. It gives
// us the MessagePort used for receiving tasks, a SharedArrayBuffer for fast
// communication using Atomics, and the name of the default filename for tasks
// (so we can pre-load and cache the handler).
parentPort!.on('message', (message: StartupMessage) => {
useAtomics =
process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics

const { port, sharedBuffer, filename, name } = message

;(async function () {
if (filename !== null) {
await getHandler(filename, name)
Expand Down Expand Up @@ -205,9 +143,3 @@ function onMessage(
atomicsWaitLoop(port, sharedBuffer)
})().catch(throwInNextTick)
}

function throwInNextTick(error: Error) {
process.nextTick(() => {
throw error
})
}
Loading

0 comments on commit 7872f5f

Please sign in to comment.