Skip to content

Commit

Permalink
Fully unpipe the stream on error
Browse files Browse the repository at this point in the history
fixes #23
closes #24
  • Loading branch information
dougwilson committed Jul 20, 2014
1 parent 3695318 commit 197e423
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 28 deletions.
6 changes: 6 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
unreleased
==========

* Fully unpipe the stream on error
- Fixes `Cannot switch to old mode now` error on Node.js 0.10+

1.2.3 / 2014-07-20
==================

Expand Down
96 changes: 70 additions & 26 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,13 @@ module.exports = function (stream, options, done) {
// note: we intentionally leave the stream paused,
// so users should handle the stream themselves.
if (limit !== null && length !== null && length > limit) {
if (typeof stream.pause === 'function')
stream.pause()

var err = makeError('request entity too large', 'entity.too.large')
err.status = err.statusCode = 413
err.length = err.expected = length
err.limit = limit
cleanup()
halt(stream)
process.nextTick(function () {
var err = makeError('request entity too large', 'entity.too.large')
err.status = err.statusCode = 413
err.length = err.expected = length
err.limit = limit
done(err)
})
return defer
Expand All @@ -57,14 +56,13 @@ module.exports = function (stream, options, done) {
// state.decoder: streams2, specifically < 0.10.6
var state = stream._readableState
if (stream._decoder || (state && (state.encoding || state.decoder))) {
if (typeof stream.pause === 'function')
stream.pause()

// developer error
var err = makeError('stream encoding should not be set',
'stream.encoding.set')
err.status = err.statusCode = 500
cleanup()
halt(stream)
process.nextTick(function () {
var err = makeError('stream encoding should not be set',
'stream.encoding.set')
// developer error
err.status = err.statusCode = 500
done(err)
})
return defer
Expand All @@ -76,9 +74,8 @@ module.exports = function (stream, options, done) {
try {
decoder = getDecoder(encoding)
} catch (err) {
if (typeof stream.pause === 'function')
stream.pause()

cleanup()
halt(stream)
process.nextTick(function () {
done(err)
})
Expand Down Expand Up @@ -108,37 +105,36 @@ module.exports = function (stream, options, done) {
: buffer.push(chunk)

if (limit !== null && received > limit) {
if (typeof stream.pause === 'function')
stream.pause()
var err = makeError('request entity too large', 'entity.too.large')
err.status = err.statusCode = 413
err.received = received
err.limit = limit
done(err)
cleanup()
halt(stream)
done(err)
}
}

function onEnd(err) {
if (err) {
if (typeof stream.pause === 'function')
stream.pause()
cleanup()
halt(stream)
done(err)
} else if (length !== null && received !== length) {
err = makeError('request size did not match content length',
'request.size.invalid')
err.status = err.statusCode = 400
err.received = received
err.length = err.expected = length
cleanup()
done(err)
} else {
done(null, decoder
var string = decoder
? buffer + (decoder.end() || '')
: Buffer.concat(buffer)
)
cleanup()
done(null, string)
}

cleanup()
}

function cleanup() {
Expand All @@ -164,6 +160,23 @@ function getDecoder(encoding) {
}
}

/**
* Halt a stream.
*
* @param {Object} stream
* @api private
*/

function halt(stream) {
// unpipe everything from the stream
unpipe(stream)

// pause stream
if (typeof stream.pause === 'function') {
stream.pause()
}
}

// to create serializable errors you must re-set message so
// that it is enumerable and you must re configure the type
// property so that is writable and enumerable
Expand All @@ -178,3 +191,34 @@ function makeError(message, type) {
})
return error
}

/**
* Unpipe everything from a stream.
*
* @param {Object} stream
* @api private
*/

/* istanbul ignore next: implementation differs between versions */
function unpipe(stream) {
if (typeof stream.unpipe === 'function') {
// new-style
stream.unpipe()
return
}

// Node.js 0.8 hack
var listener
var listeners = stream.listeners('close')

for (var i = 0; i < listeners.length; i++) {
listener = listeners[i]

if (listener.name !== 'cleanup' && listener.name !== 'onclose') {
continue
}

// invoke the listener
listener.call(stream)
}
}
37 changes: 35 additions & 2 deletions test/flowing.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var assert = require('assert')
var Readable = require('readable-stream').Readable
var Writable = require('readable-stream').Writable

var getRawBody = require('../')

Expand All @@ -26,6 +27,27 @@ describe('stream flowing', function () {
done()
})
})

it('should halt flowing stream', function (done) {
var stream = createInfiniteStream(true)
var dest = createBlackholeStream()

// pipe the stream
stream.pipe(dest)

getRawBody(stream, {
limit: defaultLimit * 2,
length: defaultLimit
}, function (err, body) {
assert.ok(err)
assert.equal(err.type, 'entity.too.large')
assert.equal(err.message, 'request entity too large')
assert.equal(err.statusCode, 413)
assert.equal(body, undefined)
assert.ok(stream.isPaused)
done()
})
})
})

describe('when stream has encoding set', function (done) {
Expand Down Expand Up @@ -108,7 +130,16 @@ function createChunk() {
}
}

function createInfiniteStream() {
function createBlackholeStream() {
var stream = new Writable()
stream._write = function (chunk, encoding, cb) {
cb()
}

return stream
}

function createInfiniteStream(paused) {
var stream = new Readable()
stream._read = function () {
var rand = 2 + Math.floor(Math.random() * 10)
Expand All @@ -126,7 +157,9 @@ function createInfiniteStream() {
stream.on('resume', function () { this.isPaused = false })

// immediately put the stream in flowing mode
stream.resume()
if (!paused) {
stream.resume()
}

return stream
}

0 comments on commit 197e423

Please sign in to comment.