Skip to content

Commit

Permalink
Flush on exit (#1185)
Browse files Browse the repository at this point in the history
* transport: Always stream.flushSync before stream.end()

Fixes #1183

* transport: support exit immediately

* fixup

* fixup: doc update
  • Loading branch information
mcollina committed Oct 25, 2021
1 parent 4fc35f6 commit 66a392a
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/transports.md
Expand Up @@ -26,6 +26,7 @@ A transport is a module that exports a default function which returns a writable
import { Writable } from 'stream'
export default (options) => {
const myTransportStream = new Writable({
autoDestroy: true, // Needed for Node v12+
write (chunk, enc, cb) {
// apply a transform and send to stdout
console.log(chunk.toString().toUpperCase())
Expand Down
20 changes: 18 additions & 2 deletions lib/transport.js
Expand Up @@ -42,19 +42,35 @@ function buildStream (filename, workerData, workerOpts) {
workerOpts
})

stream.on('ready', function () {
stream.on('ready', onReady)
stream.on('close', function () {
process.removeListener('exit', onExit)
})

process.on('exit', onExit)

function onReady () {
process.removeListener('exit', onExit)
stream.unref()

if (workerOpts.autoEnd !== false) {
setupOnExit(stream)
}
})
}

function onExit () {
if (stream.closed) {
return
}
stream.flushSync()
}

return stream
}

function autoEnd (stream) {
stream.ref()
stream.flushSync()
stream.end()
stream.once('close', function () {
stream.unref()
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/console-transport.js
Expand Up @@ -2,6 +2,7 @@ const { Writable } = require('stream')

module.exports = (options) => {
const myTransportStream = new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
// apply a transform and send to stdout
console.log(chunk.toString().toUpperCase())
Expand Down
10 changes: 10 additions & 0 deletions test/fixtures/transport-exit-immediately.js
@@ -0,0 +1,10 @@
'use strict'

const pino = require('../..')
const transport = pino.transport({
target: 'pino/file'
})
const logger = pino(transport)

logger.info('Hello')
process.exit(0)
12 changes: 12 additions & 0 deletions test/fixtures/transport-exit-on-ready.js
@@ -0,0 +1,12 @@
'use strict'

const pino = require('../..')
const transport = pino.transport({
target: 'pino/file'
})
const logger = pino(transport)

transport.on('ready', function () {
logger.info('Hello')
process.exit(0)
})
10 changes: 0 additions & 10 deletions test/syncfalse.test.js
Expand Up @@ -116,13 +116,3 @@ test('flush does nothing with sync true (default)', async () => {
const instance = require('..')()
instance.flush()
})

test('thread-stream async flush', async () => {
const pino = require('..')
const transport = pino.transport({
target: join(__dirname, 'fixtures', 'console-transport.js')
})
const instance = pino(transport)
instance.info('hello')
instance.flush()
})
24 changes: 24 additions & 0 deletions test/transport/core.test.js
Expand Up @@ -305,6 +305,30 @@ test('stdout in worker', async ({ not }) => {
not(strip(actual).match(/Hello/), null)
})

test('log and exit on ready', async ({ not }) => {
let actual = ''
const child = execa(process.argv[0], [join(__dirname, '..', 'fixtures', 'transport-exit-on-ready.js')])

child.stdout.pipe(writer((s, enc, cb) => {
actual += s
cb()
}))
await once(child, 'close')
not(strip(actual).match(/Hello/), null)
})

test('log and exit before ready', async ({ not }) => {
let actual = ''
const child = execa(process.argv[0], [join(__dirname, '..', 'fixtures', 'transport-exit-immediately.js')])

child.stdout.pipe(writer((s, enc, cb) => {
actual += s
cb()
}))
await once(child, 'close')
not(strip(actual).match(/Hello/), null)
})

test('pino transport options with target', async ({ teardown, same }) => {
const destination = join(
os.tmpdir(),
Expand Down
14 changes: 14 additions & 0 deletions test/transport/syncfalse.test.js
@@ -0,0 +1,14 @@
'use strict'

const pino = require('../..')
const { join } = require('path')
const { test } = require('tap')

test('thread-stream async flush', async () => {
const transport = pino.transport({
target: join(__dirname, '..', 'fixtures', 'console-transport.js')
})
const instance = pino(transport)
instance.info('hello')
instance.flush()
})

0 comments on commit 66a392a

Please sign in to comment.