Skip to content

Commit

Permalink
re-emit 'error' event if missed and new listener added
Browse files Browse the repository at this point in the history
This ensures that a previously-errored stream will fail its .promise()
methods, rather than ending as if nothing is wrong, if the promise rejects
or resolves before the data is awaited.
  • Loading branch information
isaacs committed Sep 14, 2021
1 parent 63ccac2 commit 6df7a1c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
8 changes: 7 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
const EMITTING_END = Symbol('emittingEnd')
const EMITTED_ERROR = Symbol('emittedError')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
Expand Down Expand Up @@ -70,6 +71,7 @@ module.exports = class Minipass extends Stream {
this[EMITTED_END] = false
this[EMITTING_END] = false
this[CLOSED] = false
this[EMITTED_ERROR] = null
this.writable = true
this.readable = true
this[BUFFERLENGTH] = 0
Expand Down Expand Up @@ -343,6 +345,8 @@ module.exports = class Minipass extends Stream {
else if (isEndish(ev) && this[EMITTED_END]) {
super.emit(ev)
this.removeAllListeners(ev)
} else if (ev === 'error' && this[EMITTED_ERROR]) {
fn.call(this, this[EMITTED_ERROR])
}
}
}
Expand Down Expand Up @@ -404,6 +408,8 @@ module.exports = class Minipass extends Stream {
// don't emit close before 'end' and 'finish'
if (!this[EMITTED_END] && !this[DESTROYED])
return
} else if (ev === 'error') {
this[EMITTED_ERROR] = data
}

// TODO: replace with a spread operator when Node v4 support drops
Expand Down Expand Up @@ -456,8 +462,8 @@ module.exports = class Minipass extends Stream {
promise () {
return new Promise((resolve, reject) => {
this.on(DESTROYED, () => reject(new Error('stream destroyed')))
this.on('end', () => resolve())
this.on('error', er => reject(er))
this.on('end', () => resolve())
})
}

Expand Down
63 changes: 63 additions & 0 deletions test/error-before-promise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const MP = require('../')
const t = require('tap')

t.test('emit an error before calling stream.promise()', t => {
const mp = new MP()
const poop = new Error('poop')
mp.once('error', er => t.equal(er, poop))
mp.emit('error', poop)
mp.end()
return t.rejects(mp.promise(), poop)
})

t.test('end properly when emitting error event', t => {
// this simulates a case where the 'emit' method is overridden
// in a child class to do special behavior on the 'end' event,
// which can cause an error to be emitted, in which case we
// still need to try to re-emit end if the error is handled.
// See http://npm.im/minipass-flush
const mp = new MP()
const poop = new Error('poop')
mp.on('error', er => t.equal(er, poop, 'catch all'))
let mpEnded = false
mp.on('end', () => {
mpEnded = true
t.pass('emitted mp end event')
if (pipelineEnded)
t.end()
})
const { emit } = mp
let flushed = false
let flushing = false
mp.emit = (ev, ...data) => {
if (ev !== 'end' || flushed)
return emit.call(mp, ev, ...data)

if (flushing)
return

if (ev === 'end') {
flushing = true
Promise.resolve().then(() => {
flushed = true
mp.emit('error', poop)
})
} else {
return emit.call(mp, ev, ...data)
}
}
const src = new MP()
const dest = new MP()
let pipelineEnded = false
mp.pipe(dest)
.on('data', c => {
t.equal(c.toString(), 'ok')
})
.on('end', () => {
pipelineEnded = true
t.pass('pipeline ended')
if (mpEnded)
t.end()
})
mp.end()
})

0 comments on commit 6df7a1c

Please sign in to comment.