diff --git a/index.js b/index.js index 36b1e43..c1fe9a4 100644 --- a/index.js +++ b/index.js @@ -20,23 +20,35 @@ function openFile (file, sonic) { sonic._opening = true sonic._writing = true sonic._asyncDrainScheduled = false - sonic.file = file // NOTE: 'error' and 'ready' events emitted below only relevant when sonic.sync===false // for sync mode, there is no way to add a listener that will receive these function fileOpened (err, fd) { if (err) { - sonic.emit('error', err) + sonic._reopening = false + sonic._writing = false + sonic._opening = false + + if (sonic.sync) { + process.nextTick(() => sonic.emit('error', err)) + } else { + sonic.emit('error', err) + } return } sonic.fd = fd + sonic.file = file sonic._reopening = false sonic._opening = false sonic._writing = false - sonic.emit('ready') + if (sonic.sync) { + process.nextTick(() => sonic.emit('ready')) + } else { + sonic.emit('ready') + } if (sonic._reopening) { return @@ -50,9 +62,13 @@ function openFile (file, sonic) { } if (sonic.sync) { - const fd = fs.openSync(file, 'a') - fileOpened(null, fd) - process.nextTick(() => sonic.emit('ready')) + try { + const fd = fs.openSync(file, 'a') + fileOpened(null, fd) + } catch (err) { + fileOpened(err) + throw err + } } else { fs.open(file, 'a', fileOpened) } @@ -110,6 +126,11 @@ function SonicBoom (opts) { }, BUSY_WRITE_TIMEOUT) } } else { + // The error maybe recoverable later, so just put data back to this._buf + this._buf = this._writingBuf + this._buf + this._writingBuf = '' + this._writing = false + this.emit('error', err) } return @@ -233,9 +254,14 @@ SonicBoom.prototype.reopen = function (file) { return } - fs.close(this.fd, (err) => { - if (err) { - return this.emit('error', err) + const fd = this.fd + this.once('ready', () => { + if (fd !== this.fd) { + fs.close(fd, (err) => { + if (err) { + return this.emit('error', err) + } + }) } }) diff --git a/package.json b/package.json index 1be6a36..c9e169b 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ "fastbench": "^1.0.1", "husky": "^4.3.0", "proxyquire": "^2.1.0", - "standard": "^14.0.0", + "standard": "^16.0.3", "tap": "^14.10.8" }, "dependencies": { diff --git a/test.js b/test.js index 95fb97d..f16b8ae 100644 --- a/test.js +++ b/test.js @@ -21,7 +21,9 @@ function file () { tearDown(() => { files.forEach((file) => { try { - fs.unlinkSync(file) + if (fs.existsSync(file)) { + fs.unlinkSync(file) + } } catch (e) { console.log(e) } @@ -483,6 +485,124 @@ function buildTests (test, sync) { t.is(code, 0) }) }) + + test('write later on recoverable error', (t) => { + t.plan(8) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('.', { + fs: fakeFs + }) + + const dest = file() + const fd = fs.openSync(dest, 'w') + const stream = new SonicBoom({ fd, minLength: 0, sync }) + + stream.on('ready', () => { + t.pass('ready emitted') + }) + stream.on('error', () => { + t.pass('error emitted') + }) + + if (sync) { + fakeFs.writeSync = function (fd, buf, enc) { + t.pass('fake fs.writeSync called') + throw new Error('recoverable error') + } + } else { + fakeFs.write = function (fd, buf, enc, cb) { + t.pass('fake fs.write called') + setTimeout(() => cb(new Error('recoverable error')), 0) + } + } + + t.ok(stream.write('hello world\n')) + + setTimeout(() => { + if (sync) { + fakeFs.writeSync = fs.writeSync + } else { + fakeFs.write = fs.write + } + + t.ok(stream.write('something else\n')) + + stream.end() + stream.on('finish', () => { + fs.readFile(dest, 'utf8', (err, data) => { + t.error(err) + t.equal(data, 'hello world\nsomething else\n') + }) + }) + stream.on('close', () => { + t.pass('close emitted') + }) + }, 0) + }) + + test('reopen throws an error', (t) => { + t.plan(sync ? 10 : 9) + + const fakeFs = Object.create(fs) + const SonicBoom = proxyquire('.', { + fs: fakeFs + }) + + const dest = file() + const stream = new SonicBoom({ dest, sync }) + + t.ok(stream.write('hello world\n')) + t.ok(stream.write('something else\n')) + + const after = dest + '-moved' + + stream.on('error', () => { + t.pass('error emitted') + }) + + stream.once('drain', () => { + t.pass('drain emitted') + + fs.renameSync(dest, after) + if (sync) { + fakeFs.openSync = function (file, flags) { + t.pass('fake fs.openSync called') + throw new Error('open error') + } + } else { + fakeFs.open = function (file, flags, cb) { + t.pass('fake fs.open called') + setTimeout(() => cb(new Error('open error')), 0) + } + } + + if (sync) { + try { + stream.reopen() + } catch (err) { + t.pass('reopen throwed') + } + } else { + stream.reopen() + } + + setTimeout(() => { + t.ok(stream.write('after reopen\n')) + + stream.end() + stream.on('finish', () => { + fs.readFile(after, 'utf8', (err, data) => { + t.error(err) + t.equal(data, 'hello world\nsomething else\nafter reopen\n') + }) + }) + stream.on('close', () => { + t.pass('close emitted') + }) + }, 0) + }) + }) } test('retry on EAGAIN', (t) => {