Skip to content

Commit

Permalink
feat: use worker_threads by default for performance (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
JounQin committed Jul 9, 2021
1 parent 4a4287a commit 6577e86
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 73 deletions.
5 changes: 5 additions & 0 deletions .changeset/slow-lamps-shave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"synckit": minor
---

feat: use worker_threads by default for performance
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ jobs:

- name: Build, Lint and test
run: |
yarn build
yarn lint
yarn test
yarn build
yarn test-worker
env:
EFF_NO_LINK_RULES: true
PARSER_NO_WATCH: true
Expand Down
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@

Perform async work synchronously in Node.js using a separate process with first-class TypeScript support

## TOC <!-- omit in toc -->

- [Usage](#usage)
- [Install](#install)
- [API](#api)
- [TypeScript](#typescript)
- [Changelog](#changelog)
- [License](#license)

## Usage

### Install

```sh
# yarn
yarn add synckit
Expand All @@ -30,6 +41,8 @@ npm i synckit

### API

`worker_threads` is used by default for performance, if you have any problem with it, you can set env `SYNCKIT_WORKER_THREADS=0` to disable it and fallback to previously `child_process` solution, and please raise an issue here so that we can improve it.

```js
// runner.js
import { createSyncFn } from 'synckit'
Expand All @@ -47,11 +60,23 @@ import { runAsWorker } from 'synckit'

runAsWorker(async (...args) => {
// do expensive work
// but you must make sure the `result` is serializable by `JSON.stringify`
return result
})
```

You must make sure:

1. if `worker_threads` is enabled (by default), the `result` is serialized by [`Structured Clone Algorithm`](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
2. if `child_process` is used, the `result` is serialized by `JSON.stringify`

### TypeScript

If you want to use `ts-node` for worker file (a `.ts` file), it is supported out of box!

If you want to use a custom tsconfig as project instead of default `tsconfig.json`, use `TS_NODE_PROJECT` env. Please view [ts-node](https://github.com/TypeStrong/ts-node#tsconfig) for more details.

If you want to integrate with [tsconfig-paths](https://www.npmjs.com/package/tsconfig-paths), please view [ts-node](https://github.com/TypeStrong/ts-node#paths-and-baseurl) for more details.

## Changelog

Detailed changes for each release are documented in [CHANGELOG.md](./CHANGELOG.md).
Expand Down
9 changes: 7 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"author": "JounQin <admin@1stg.me>",
"license": "MIT",
"engines": {
"node": ">=4.0"
"node": ">=8.10"
},
"main": "lib",
"module": "lib/es2015",
Expand All @@ -33,8 +33,10 @@
"lint:tsc": "tsc --noEmit",
"prepare": "simple-git-hooks && yarn-deduplicate --strategy fewer || exit 0",
"prerelease": "npm run build",
"pretest": "yarn build",
"release": "clean-publish && changeset publish",
"test": "jest",
"test-worker": "ts-node test/test-worker-ts && node test/test-worker",
"typecov": "type-coverage"
},
"dependencies": {
Expand Down Expand Up @@ -62,7 +64,10 @@
"extends": "@1stg"
},
"eslintConfig": {
"extends": "@1stg"
"extends": "@1stg",
"rules": {
"unicorn/require-post-message-target-origin": 0
}
},
"eslintIgnore": [
"coverage",
Expand Down
185 changes: 147 additions & 38 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@ import { execSync } from 'child_process'
import { tmpdir as _tmpdir } from 'os'
import path from 'path'
import fs from 'fs'
import {
MessageChannel,
parentPort,
receiveMessageOnPort,
Worker,
workerData,
} from 'worker_threads'

import { v4 as uuid } from 'uuid'

import { AnyAsyncFn, AnyFn, Syncify } from './types'
import {
AnyAsyncFn,
AnyFn,
DataMessage,
MainToWorkerMessage,
Syncify,
WorkerData,
WorkerToMainMessage,
} from './types'

export * from './types'

Expand All @@ -14,33 +29,17 @@ export * from './types'
*/
export const tmpdir = fs.realpathSync(_tmpdir())

let tsconfigPathsAvailable: boolean

const TSCONFIG_PATH = process.env.TSCONFIG_PATH || 'tsconfig.json'

const isTsconfigPathsAvailable = () => {
if (typeof tsconfigPathsAvailable === 'boolean') {
return tsconfigPathsAvailable
}
try {
tsconfigPathsAvailable = !!require.resolve('tsconfig-paths')
} catch {
/**
* `require.resolve` can not be mocked to fail
* @link https://github.com/facebook/jest/issues/9543
*/
/* istanbul ignore next */
tsconfigPathsAvailable = false
}
return tsconfigPathsAvailable
}
export const useWorkerThreads = !['0', 'false'].includes(
process.env.SYNCKIT_WORKER_THREADS!,
)

const syncFnCache = new Map<string, AnyFn>()

export function createSyncFn<T extends AnyAsyncFn>(
workerPath: string,
bufferSize?: number,
): Syncify<T>
export function createSyncFn<R>(workerPath: string) {
export function createSyncFn<T>(workerPath: string, bufferSize?: number) {
if (!path.isAbsolute(workerPath)) {
throw new Error('`workerPath` must be absolute')
}
Expand All @@ -57,40 +56,150 @@ export function createSyncFn<R>(workerPath: string) {
resolvedWorkerPath = require.resolve(workerPath)
}

const executor = resolvedWorkerPath.endsWith('.ts')
? 'ts-node -P ' +
TSCONFIG_PATH +
(isTsconfigPathsAvailable()
? ' -r tsconfig-paths/register'
: /* istanbul ignore next */ '')
: 'node'
const syncFn = (useWorkerThreads ? startWorkerThread : startChildProcess)<T>(
resolvedWorkerPath,
bufferSize,
)

const syncFn = (...args: unknown[]): R => {
syncFnCache.set(workerPath, syncFn)

return syncFn
}

function startChildProcess<T>(workerPath: string) {
const executor = workerPath.endsWith('.ts') ? 'ts-node' : 'node'

return (...args: unknown[]): T => {
const filename = path.resolve(tmpdir, `synckit-${uuid()}.json`)

fs.writeFileSync(filename, JSON.stringify(args))

const command = `${executor} ${resolvedWorkerPath} ${filename}`
const command = `${executor} ${workerPath} ${filename}`

try {
execSync(command, {
stdio: 'inherit',
})
const result = fs.readFileSync(filename, 'utf8')
return JSON.parse(result) as R
const { result, error } = JSON.parse(
fs.readFileSync(filename, 'utf8'),
) as DataMessage<T>

if (error) {
throw typeof error === 'object' && error && 'message' in error
? // eslint-disable-next-line unicorn/error-message
Object.assign(new Error(), error)
: error
}

return result!
} finally {
fs.unlinkSync(filename)
}
}
}

syncFnCache.set(workerPath, syncFn)
function startWorkerThread<T>(workerPath: string, bufferSize = 1024) {
const { port1: mainPort, port2: workerPort } = new MessageChannel()

const isTs = workerPath.endsWith('.ts')

const worker = new Worker(
isTs
? `require('ts-node/register');require(require('worker_threads').workerData.workerPath)`
: workerPath,
{
eval: isTs,
workerData: { workerPath, workerPort },
transferList: [workerPort],
execArgv: [],
},
)

let nextID = 0

const syncFn = (...args: unknown[]): T => {
const id = nextID++

const sharedBuffer = new SharedArrayBuffer(bufferSize)
const sharedBufferView = new Int32Array(sharedBuffer)

const msg: MainToWorkerMessage = { sharedBuffer, id, args }
worker.postMessage(msg)

const status = Atomics.wait(sharedBufferView, 0, 0)

/* istanbul ignore if */
if (status !== 'ok' && status !== 'not-equal') {
throw new Error('Internal error: Atomics.wait() failed: ' + status)
}

const {
id: id2,
result,
error,
properties,
} = receiveMessageOnPort(mainPort)!.message as WorkerToMainMessage<T>

/* istanbul ignore if */
if (id !== id2) {
throw new Error(`Internal error: Expected id ${id} but got id ${id2}`)
}

if (error) {
// MessagePort doesn't copy the properties of Error objects. We still want
// error objects to have extra properties such as "warnings" so implement the
// property copying manually.
throw Object.assign(error, properties)
}

return result!
}

worker.unref()

return syncFn
}

export const runAsWorker = async <T extends AnyAsyncFn>(fn: T) => {
const filename = process.argv[2]
const content = fs.readFileSync(filename, 'utf-8')
const options = JSON.parse(content) as Parameters<T>
fs.writeFileSync(filename, JSON.stringify(await fn(...options)))
if (!workerData) {
const filename = process.argv[2]
const content = fs.readFileSync(filename, 'utf8')
const args = JSON.parse(content) as Parameters<T>
let msg: DataMessage<T>
try {
msg = { result: (await fn(...args)) as T }
} catch (err) {
msg = {
error:
err instanceof Error
? { name: err.name, message: err.message, stack: err.stack }
: err,
}
}
fs.writeFileSync(filename, JSON.stringify(msg))
return
}

/* istanbul ignore next */
const { workerPort } = workerData as WorkerData
/* istanbul ignore next */
parentPort!.on(
'message',
({ sharedBuffer, id, args }: MainToWorkerMessage) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
;(async () => {
const sharedBufferView = new Int32Array(sharedBuffer)
let msg: WorkerToMainMessage<T>
try {
msg = { id, result: (await fn(...args)) as T }
} catch (err) {
const error = err as Error
msg = { id, error, properties: { ...error } }
}
workerPort.postMessage(msg)
Atomics.add(sharedBufferView, 0, 1)
Atomics.notify(sharedBufferView, 0, Number.POSITIVE_INFINITY)
})()
},
)
}
22 changes: 22 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { MessagePort } from 'worker_threads'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type AnyFn<T = any, R extends any[] = any[]> = (...args: R) => T

Expand All @@ -15,3 +17,23 @@ export type Syncify<T extends AnyFn<AnyPromise>> = T extends (
export type PromiseType<T extends AnyPromise> = T extends Promise<infer R>
? R
: never

export interface MainToWorkerMessage {
sharedBuffer: SharedArrayBuffer
id: number
args: unknown[]
}

export interface WorkerData {
workerPort: MessagePort
}

export interface DataMessage<T = unknown> {
result?: T
error?: unknown
}

export interface WorkerToMainMessage<T = unknown> extends DataMessage<T> {
id: number
properties?: object
}

0 comments on commit 6577e86

Please sign in to comment.