diff --git a/index.js b/index.js index 4f88a07..ae134a0 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,7 @@ const EOF = Symbol('EOF') const MAYBE_EMIT_END = Symbol('maybeEmitEnd') const EMITTED_END = Symbol('emittedEnd') const EMITTING_END = Symbol('emittingEnd') +const EMITTED_ERROR = Symbol('emittedError') const CLOSED = Symbol('closed') const READ = Symbol('read') const FLUSH = Symbol('flush') @@ -70,6 +71,7 @@ module.exports = class Minipass extends Stream { this[EMITTED_END] = false this[EMITTING_END] = false this[CLOSED] = false + this[EMITTED_ERROR] = null this.writable = true this.readable = true this[BUFFERLENGTH] = 0 @@ -343,6 +345,8 @@ module.exports = class Minipass extends Stream { else if (isEndish(ev) && this[EMITTED_END]) { super.emit(ev) this.removeAllListeners(ev) + } else if (ev === 'error' && this[EMITTED_ERROR]) { + fn.call(this, this[EMITTED_ERROR]) } } } @@ -404,6 +408,8 @@ module.exports = class Minipass extends Stream { // don't emit close before 'end' and 'finish' if (!this[EMITTED_END] && !this[DESTROYED]) return + } else if (ev === 'error') { + this[EMITTED_ERROR] = data } // TODO: replace with a spread operator when Node v4 support drops @@ -456,8 +462,8 @@ module.exports = class Minipass extends Stream { promise () { return new Promise((resolve, reject) => { this.on(DESTROYED, () => reject(new Error('stream destroyed'))) - this.on('end', () => resolve()) this.on('error', er => reject(er)) + this.on('end', () => resolve()) }) } diff --git a/test/error-before-promise.js b/test/error-before-promise.js new file mode 100644 index 0000000..800e12b --- /dev/null +++ b/test/error-before-promise.js @@ -0,0 +1,63 @@ +const MP = require('../') +const t = require('tap') + +t.test('emit an error before calling stream.promise()', t => { + const mp = new MP() + const poop = new Error('poop') + mp.once('error', er => t.equal(er, poop)) + mp.emit('error', poop) + mp.end() + return t.rejects(mp.promise(), poop) +}) + +t.test('end properly when emitting error event', t => { + // this simulates a case where the 'emit' method is overridden + // in a child class to do special behavior on the 'end' event, + // which can cause an error to be emitted, in which case we + // still need to try to re-emit end if the error is handled. + // See http://npm.im/minipass-flush + const mp = new MP() + const poop = new Error('poop') + mp.on('error', er => t.equal(er, poop, 'catch all')) + let mpEnded = false + mp.on('end', () => { + mpEnded = true + t.pass('emitted mp end event') + if (pipelineEnded) + t.end() + }) + const { emit } = mp + let flushed = false + let flushing = false + mp.emit = (ev, ...data) => { + if (ev !== 'end' || flushed) + return emit.call(mp, ev, ...data) + + if (flushing) + return + + if (ev === 'end') { + flushing = true + Promise.resolve().then(() => { + flushed = true + mp.emit('error', poop) + }) + } else { + return emit.call(mp, ev, ...data) + } + } + const src = new MP() + const dest = new MP() + let pipelineEnded = false + mp.pipe(dest) + .on('data', c => { + t.equal(c.toString(), 'ok') + }) + .on('end', () => { + pipelineEnded = true + t.pass('pipeline ended') + if (mpEnded) + t.end() + }) + mp.end() +})