Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add flush callback #182

Merged
merged 5 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ For `sync:true` this is not relevant because the `'ready'` event will be fired w
Writes the string to the file.
It will return false to signal the producer to slow down.

### SonicBoom#flush()
### SonicBoom#flush([cb])

Writes the current buffer to the file if a write was not in progress.
Do nothing if `minLength` is zero or if it is already writing.

call the callback when the flush operation is completed. when failed the callback is called with an error.

### SonicBoom#reopen([file])

Reopen the file in place, useful for log rotation.
Expand Down
69 changes: 63 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,50 @@ function writeBuffer (data) {
return this._len < this._hwm
}

function flush () {
function callFlushCallbackOnDrain (cb) {
const onDrain = () => {
// only if _fsync is false to avoid double fsync
if (!this._fsync) {
fs.fsync(this.fd, cb)
} else {
cb()
}
this.off('error', onError)
}
const onError = (err) => {
cb(err)
this.off('drain', onDrain)
}

this.once('drain', onDrain)
this.once('error', onError)
}
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

function flush (cb) {
if (cb != null && typeof cb !== 'function') {
throw new Error('flush cb must be a function')
}

if (this.destroyed) {
throw new Error('SonicBoom destroyed')
const error = new Error('SonicBoom destroyed')
if (cb) {
cb(error)
return
}

throw error
rluvaton marked this conversation as resolved.
Show resolved Hide resolved
}

if (this._writing || this.minLength <= 0) {
if (this.minLength <= 0) {
cb?.()
return
}

if (cb) {
callFlushCallbackOnDrain.call(this, cb)
}

if (this._writing) {
return
}

Expand All @@ -352,12 +390,31 @@ function flush () {
this._actualWrite()
}

function flushBuffer () {
function flushBuffer (cb) {
if (cb != null && typeof cb !== 'function') {
throw new Error('flush cb must be a function')
}

if (this.destroyed) {
throw new Error('SonicBoom destroyed')
const error = new Error('SonicBoom destroyed')
if (cb) {
cb(error)
return
}

throw error
}

if (this.minLength <= 0) {
cb?.()
return
}

if (this._writing || this.minLength <= 0) {
if (cb) {
callFlushCallbackOnDrain.call(this, cb)
}

if (this._writing) {
return
}

Expand Down
273 changes: 273 additions & 0 deletions test/flush.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const fs = require('fs')
const path = require('path')
const SonicBoom = require('../')
const { file, runTests } = require('./helper')
const proxyquire = require('proxyquire')

runTests(buildTests)

Expand Down Expand Up @@ -100,4 +101,276 @@ function buildTests (test, sync) {
t.pass('drain emitted')
})
})

test('call flush cb after flushed', (t) => {
t.plan(4)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })

stream.on('ready', () => {
t.pass('ready emitted')
})

t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})

test('only call fsyncSync and not fsync when fsync: true', (t) => {
t.plan(6)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync,
fsync: true,
minLength: 4096
})

stream.on('ready', () => {
t.pass('ready emitted')
})

fakeFs.fsync = function (fd, cb) {
t.fail('fake fs.fsync called while should not')
cb()
}
fakeFs.fsyncSync = function (fd) {
t.pass('fake fsyncSync called')
}

function successOnAsyncOrSyncFn (isSync, originalFn) {
return function (...args) {
t.pass(`fake fs.${originalFn.name} called`)
fakeFs[originalFn.name] = originalFn
return fakeFs[originalFn.name](...args)
}
}

if (sync) {
fakeFs.writeSync = successOnAsyncOrSyncFn(true, fs.writeSync)
} else {
fakeFs.write = successOnAsyncOrSyncFn(false, fs.write)
}

t.ok(stream.write('hello world\n'))
stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')

process.nextTick(() => {
// to make sure fsync is not called as well
t.pass('nextTick after flush called')
})
})
})

test('call flush cb with error when fsync failed', (t) => {
t.plan(5)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync,
minLength: 4096
})

stream.on('ready', () => {
t.pass('ready emitted')
})

const err = new Error('other')
err.code = 'other'

function onFsyncOnFsyncSync (isSync, originalFn) {
return function (...args) {
Error.captureStackTrace(err)
t.pass(`fake fs.${originalFn.name} called`)
fakeFs[originalFn.name] = originalFn
const cb = args[args.length - 1]

cb(err)
}
}

// only one is called depends on sync
fakeFs.fsync = onFsyncOnFsyncSync(false, fs.fsync)

function successOnAsyncOrSyncFn (isSync, originalFn) {
return function (...args) {
t.pass(`fake fs.${originalFn.name} called`)
fakeFs[originalFn.name] = originalFn
return fakeFs[originalFn.name](...args)
}
}

if (sync) {
fakeFs.writeSync = successOnAsyncOrSyncFn(true, fs.writeSync)
} else {
fakeFs.write = successOnAsyncOrSyncFn(false, fs.write)
}

t.ok(stream.write('hello world\n'))
stream.flush((err) => {
if (err) t.equal(err.code, 'other')
else t.fail('flush cb called without an error')
})
})

test('call flush cb even when have no data', (t) => {
t.plan(2)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })

stream.on('ready', () => {
t.pass('ready emitted')

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})
})

test('call flush cb even when minLength is 0', (t) => {
t.plan(1)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 0, sync })

stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})
})

test('call flush cb with an error when trying to flush destroyed stream', (t) => {
t.plan(1)

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({ fd, minLength: 4096, sync })
stream.destroy()

stream.flush((err) => {
if (err) t.pass(err)
else t.fail('flush cb called without an error')
})
})

test('call flush cb with an error when failed to flush', (t) => {
t.plan(5)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync,
minLength: 4096
})

stream.on('ready', () => {
t.pass('ready emitted')
})

const err = new Error('other')
err.code = 'other'

function onWriteOrWriteSync (isSync, originalFn) {
return function (...args) {
Error.captureStackTrace(err)
t.pass(`fake fs.${originalFn.name} called`)
fakeFs[originalFn.name] = originalFn

if (isSync) throw err
const cb = args[args.length - 1]

cb(err)
}
}

// only one is called depends on sync
fakeFs.write = onWriteOrWriteSync(false, fs.write)
fakeFs.writeSync = onWriteOrWriteSync(true, fs.writeSync)

t.ok(stream.write('hello world\n'))
stream.flush((err) => {
if (err) t.equal(err.code, 'other')
else t.fail('flush cb called without an error')
})

stream.end()

stream.on('close', () => {
t.pass('close emitted')
})
})

test('call flush cb when finish writing when currently in the middle', (t) => {
t.plan(4)

const fakeFs = Object.create(fs)
const SonicBoom = proxyquire('../', {
fs: fakeFs
})

const dest = file()
const fd = fs.openSync(dest, 'w')
const stream = new SonicBoom({
fd,
sync,

// to trigger write without calling flush
minLength: 1
})

stream.on('ready', () => {
t.pass('ready emitted')
})

function onWriteOrWriteSync (originalFn) {
return function (...args) {
stream.flush((err) => {
if (err) t.fail(err)
else t.pass('flush cb called')
})

t.pass(`fake fs.${originalFn.name} called`)
fakeFs[originalFn.name] = originalFn
return originalFn(...args)
}
}

// only one is called depends on sync
fakeFs.write = onWriteOrWriteSync(fs.write)
fakeFs.writeSync = onWriteOrWriteSync(fs.writeSync)

t.ok(stream.write('hello world\n'))
})
}
2 changes: 1 addition & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class SonicBoom extends EventEmitter {
* Writes the current buffer to the file if a write was not in progress.
* Do nothing if minLength is zero or if it is already writing.
*/
flush(): void;
flush(cb?: (err?: Error) => unknown): void;
rluvaton marked this conversation as resolved.
Show resolved Hide resolved

/**
* Reopen the file in place, useful for log rotation.
Expand Down
Loading