Skip to content

Commit

Permalink
Merge 9098016 into c27de94
Browse files Browse the repository at this point in the history
  • Loading branch information
10xLaCroixDrinker committed Apr 12, 2024
2 parents c27de94 + 9098016 commit f96b8b7
Show file tree
Hide file tree
Showing 11 changed files with 581 additions and 11 deletions.
29 changes: 27 additions & 2 deletions README.md
Expand Up @@ -8,7 +8,7 @@ Write Pino transports easily.

## Install

```
```sh
npm i pino-abstract-transport
```

Expand Down Expand Up @@ -43,9 +43,12 @@ module.exports = function (opts) {
```

## Typescript usage

Install the type definitions for Node.js. Make sure the major version of the type definitions matches the Node.js version you are using.

#### Node 16
```

```sh
npm i -D @types/node@16
```

Expand Down Expand Up @@ -78,6 +81,8 @@ stream, it emits the following events:

* `parseLine(line)` a function that is used to parse line received from `pino`.

* `expectPinoConfig` a boolean that indicates if the transport expects Pino to add some of its configuration to the stream. Default: `false`.

## Example

### custom parseLine
Expand Down Expand Up @@ -142,6 +147,26 @@ pipeline(process.stdin, buildTransform(), buildDestination(), function (err) {
})
```

### Using pino config

Setting `expectPinoConfig` to `true` will make the transport wait for pino to send its configuration before starting to process logs. It will add `levels`, `messageKey` and `errorKey` to the stream.

When used with an incompatible version of pino, the stream will emit an error immediately.

```js
import build from 'pino-abstract-transport'

export default function (opts) {
return build(async function (source) {
for await (const obj of source) {
console.log(`[${source.levels.labels[obj.level]}]: ${obj[source.messageKey]}`)
}
}, {
expectPinoConfig: true
})
}
```

## License

MIT
30 changes: 30 additions & 0 deletions index.d.ts
Expand Up @@ -40,6 +40,12 @@ type BuildOptions = {
* `metadata` If set to false, do not add metadata properties to the returned stream
*/
metadata?: false;

/**
* `expectPinoConfig` If set to true, the transport will wait for pino to send its
* configuration before starting to process logs.
*/
expectPinoConfig?: boolean;
};

/**
Expand All @@ -50,6 +56,17 @@ type EnablePipelining = BuildOptions & {
enablePipelining: true;
};

/**
 * Creates a split2 instance and returns a promise that resolves to it once
 * pino has sent its configuration. The same instance is also passed to the
 * given function, which is called after the configuration has been received.
 *
 * Selected when `expectPinoConfig: true` is present in the options.
 *
 * @returns {Promise<Transform>} a promise resolving to the split2 instance
 */
declare function build(
  fn: (transform: Transform & build.OnUnknown) => void | Promise<void>,
  opts: BuildOptions & { expectPinoConfig: true }
): Promise<Transform & build.OnUnknown>;

/**
* Create a split2 instance and returns it. This same instance is also passed
* to the given function, which is called synchronously.
Expand All @@ -61,6 +78,19 @@ declare function build(
opts?: BuildOptions
): Transform & build.OnUnknown;

/**
 * Creates a split2 instance and passes it to the given function, which is
 * called after pino has sent its configuration. Then wraps the split2
 * instance and the stream returned by the function into a Duplex, so
 * multiple transports can be concatenated into a pipeline.
 *
 * Selected when both `enablePipelining: true` and `expectPinoConfig: true`
 * are present in the options.
 *
 * @returns {Promise<Transform>} a promise resolving to the wrapped split2 instance
 */
declare function build(
  fn: (transform: Transform & build.OnUnknown) => Transform & build.OnUnknown,
  opts: EnablePipelining & { expectPinoConfig: true }
): Promise<Transform>;

/**
* Creates a split2 instance and passes it to the given function, which is called
* synchronously. Then wraps the split2 instance and the returned stream into a
Expand Down
68 changes: 59 additions & 9 deletions index.js
Expand Up @@ -3,8 +3,22 @@
const metadata = Symbol.for('pino.metadata')
const split = require('split2')
const { Duplex } = require('readable-stream')
const { parentPort, workerData } = require('worker_threads')

function createDeferred () {
let resolve
let reject
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve
reject = _reject
})
promise.resolve = resolve
promise.reject = reject
return promise
}

module.exports = function build (fn, opts = {}) {
const waitForConfig = opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig === true
const parseLines = opts.parse === 'lines'
const parseLine = typeof opts.parseLine === 'function' ? opts.parseLine : JSON.parse
const close = opts.close || defaultClose
Expand Down Expand Up @@ -50,27 +64,63 @@ module.exports = function build (fn, opts = {}) {
}
}

if (opts.expectPinoConfig === true && workerData?.workerData?.pinoWillSendConfig !== true) {
setImmediate(() => {
stream.emit('error', new Error('This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.'))
})
}

if (opts.metadata !== false) {
stream[metadata] = true
stream.lastTime = 0
stream.lastLevel = 0
stream.lastObj = null
}

let res = fn(stream)
if (waitForConfig) {
let pinoConfig = {}
const configReceived = createDeferred()
parentPort.on('message', function handleMessage (message) {
if (message.code === 'PINO_CONFIG') {
pinoConfig = message.config
configReceived.resolve()
parentPort.off('message', handleMessage)
}
})

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
Object.defineProperties(stream, {
levels: {
get () { return pinoConfig.levels }
},
messageKey: {
get () { return pinoConfig.messageKey }
},
errorKey: {
get () { return pinoConfig.errorKey }
}
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
return configReceived.then(finish)
}

return stream
return finish()

function finish () {
let res = fn(stream)

if (res && typeof res.catch === 'function') {
res.catch((err) => {
stream.destroy(err)
})

// set it to null to not retain a reference to the promise
res = null
} else if (opts.enablePipelining && res) {
return Duplex.from({ writable: stream, readable: res })
}

return stream
}
}

function defaultClose (err, cb) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -32,6 +32,7 @@
"snazzy": "^9.0.0",
"standard": "^17.0.0",
"tap": "^16.0.0",
"thread-stream": "^2.4.1",
"tsd": "^0.31.0"
},
"tsd": {
Expand Down
7 changes: 7 additions & 0 deletions test/base.test.js
Expand Up @@ -139,6 +139,13 @@ test('rejecting errors the stream', async ({ same, plan }) => {
same(err.message, 'kaboom')
})

test('emits an error if the transport expects pino to send the config, but pino is not going to', async ({ plan, same }) => {
  plan(1)
  // Without pinoWillSendConfig in workerData the transport can never receive
  // its configuration, so it must surface an error instead of hanging.
  const transport = build(() => {}, { expectPinoConfig: true })
  const [err] = await once(transport, 'error')
  same(err.message, 'This transport is not compatible with the current version of pino. Please upgrade pino to the latest version.')
})

test('set metadata', ({ same, plan, equal }) => {
plan(9)

Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/transport-async-iteration.js
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
async function (source) {
for await (const obj of source) {
port.postMessage({
data: obj,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
}
},
opts
)
}
22 changes: 22 additions & 0 deletions test/fixtures/transport-on-data.js
@@ -0,0 +1,22 @@
'use strict'

const build = require('../..')

module.exports = async function (threadStreamOpts) {
const { port, opts = {} } = threadStreamOpts
return build(
function (source) {
source.on('data', function (line) {
port.postMessage({
data: line,
pinoConfig: {
levels: source.levels,
messageKey: source.messageKey,
errorKey: source.errorKey
}
})
})
},
opts
)
}
24 changes: 24 additions & 0 deletions test/fixtures/transport-transform.js
@@ -0,0 +1,24 @@
'use strict'

const { Transform, pipeline } = require('stream')
const build = require('../..')

module.exports = function (threadStreamOpts) {
const { opts = {} } = threadStreamOpts
return build(function (source) {
const transform = new Transform({
objectMode: true,
autoDestroy: true,
transform (chunk, enc, cb) {
chunk.service = 'from transform'
chunk.level = `${source.levels.labels[chunk.level]}(${chunk.level})`
chunk[source.messageKey] = chunk[source.messageKey].toUpperCase()
cb(null, JSON.stringify(chunk) + '\n')
}
})

pipeline(source, transform, () => {})

return transform
}, { ...opts, enablePipelining: true })
}
15 changes: 15 additions & 0 deletions test/fixtures/worker-pipeline.js
@@ -0,0 +1,15 @@
'use strict'

const { pipeline, PassThrough } = require('stream')

module.exports = async function ({ targets }) {
const streams = await Promise.all(targets.map(async (t) => {
const fn = require(t.target)
const stream = await fn(t.options)
return stream
}))

const stream = new PassThrough()
pipeline(stream, ...streams, () => {})
return stream
}
10 changes: 10 additions & 0 deletions test/types/index.test-d.ts
Expand Up @@ -9,12 +9,22 @@ import { Transform } from "stream";
*/
expectType<Transform>(build((source) => source, { enablePipelining: true }));

/**
 * If expectPinoConfig is set together with enablePipelining, build returns a
 * promise resolving to the wrapped stream.
 */
expectType<(Promise<Transform>)>(build((source) => source, { enablePipelining: true, expectPinoConfig: true }));

/**
 * If enablePipelining is not set, the unknown event can be listened to on
 * the returned stream.
 */
expectType<Transform & OnUnknown>(build((source) => {}));

/**
 * If expectPinoConfig is set, build returns a promise resolving to the
 * split2 stream.
 */
expectType<(Promise<Transform & OnUnknown>)>(build((source) => {}, { expectPinoConfig: true }));

/**
* build also accepts an async function
*/
Expand Down

0 comments on commit f96b8b7

Please sign in to comment.