Smarter pause/resume mid-stream

commit b7198932e467482c680bad4bc9b44d8b2848ed5a (1 parent: f076e41)
@isaacs authored
Showing with 48 additions and 29 deletions.
  1. +42 −27 block-stream.js
  2. +6 −2 test/pause-resume.js
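In short: _emitChunk() now checks the paused state before and during its emit loops instead of only at write() time, so pausing from inside a "data" handler actually stops emission mid-stream; resume() re-enters _emitChunk() to flush whatever accumulated, and the queued-up "drain" is emitted once the buffer empties. The stream-emitted "pause"/"resume" events are dropped, as is the optional cb argument to write(), end(), and flush().

A minimal sketch of the contract this tightens, assuming block-stream's documented usage (require("block-stream"), new BlockStream(size)); the sizes and handlers here are illustrative, not from this commit:

    var BlockStream = require("block-stream")

    var b = new BlockStream(512)

    b.on("data", function (c) {
      // c is always exactly 512 bytes, and never arrives while paused
      b.pause()                          // simulate a slow consumer
      process.nextTick(function () {
        b.resume()                       // re-enters _emitChunk(), then "drain" fires
      })
    })

    b.on("drain", function () {
      // the refused write below asked for this; safe to write again
    })

    b.write(new Buffer(1024))            // emits one block, then the handler pauses us
    var ok = b.write(new Buffer(512))    // false: paused with a full block buffered
    b.end()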
block-stream.js (+42 −27)
@@ -27,18 +27,18 @@ function BlockStream (size, opt) {
 inherits(BlockStream, Stream)
 
-BlockStream.prototype.write = function (c, cb) {
-  debug("write", c)
+BlockStream.prototype.write = function (c) {
+  // debug("  BS write", c)
   if (this._ended) throw new Error("BlockStream: write after end")
-  if (cb) process.nextTick(cb)
   if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
   if (c.length) {
     this._buffer.push(c)
     this._bufferLength += c.length
   }
-  debug("pushed onto buffer", this._bufferLength)
+  // debug("pushed onto buffer", this._bufferLength)
   if (this._bufferLength >= this._chunkSize) {
     if (this._paused) {
+      // debug("  BS paused, return false, need drain")
       this._needDrain = true
       return false
     }
@@ -48,58 +48,64 @@ BlockStream.prototype.write = function (c, cb) {
 }
 
 BlockStream.prototype.pause = function () {
-  this.emit("pause")
+  // debug("  BS pausing")
   this._paused = true
 }
 
 BlockStream.prototype.resume = function () {
+  // debug("  BS resume")
   this._paused = false
-  this.emit("resume")
   return this._emitChunk()
 }
 
-BlockStream.prototype.end = function (chunk, cb) {
-  debug("end", chunk)
+BlockStream.prototype.end = function (chunk) {
+  // debug("end", chunk)
-  if (typeof chunk === "function") cb = chunk, chunk = null
+  if (typeof chunk === "function") chunk = null
   if (chunk) this.write(chunk)
   this._ended = true
-  this.flush(cb)
+  this.flush()
 }
 
-BlockStream.prototype.flush = function (cb) {
-  if (cb) process.nextTick(cb)
+BlockStream.prototype.flush = function () {
   this._emitChunk(true)
 }
 
 BlockStream.prototype._emitChunk = function (flush) {
-  debug("emitChunk", flush)
-  if (this._emitting) return
-  this._emitting = true
+  // debug("emitChunk flush=%j emitting=%j paused=%j", flush, this._emitting, this._paused)
   // emit a <chunkSize> chunk
   if (flush && this._zeroes) {
+    // debug("  BS push zeroes", this._bufferLength)
     // push a chunk of zeroes
     var padBytes = (this._bufferLength % this._chunkSize)
     if (padBytes !== 0) padBytes = this._chunkSize - padBytes
     if (padBytes > 0) {
-      debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
+      // debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
       this._buffer.push(this._zeroes.slice(0, padBytes))
       this._bufferLength += padBytes
-      debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
+      // debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
     }
   }
+  if (this._emitting || this._paused) return
+  this._emitting = true
+
+  // debug("  BS entering loops")
   var bufferIndex = 0
-  while (this._bufferLength >= this._chunkSize) {
+  while (this._bufferLength >= this._chunkSize &&
+         (flush || !this._paused)) {
+    // debug("  BS data emission loop", this._bufferLength)
+
     var out
       , outOffset = 0
       , outHas = this._chunkSize
-    while (outHas > 0) {
-      debug("data emit loop")
+
+    while (outHas > 0 && (flush || !this._paused) ) {
+      // debug("  BS data inner emit loop", this._bufferLength)
       var cur = this._buffer[bufferIndex]
         , curHas = cur.length - this._offset
-      debug("cur=", cur)
-      debug("curHas=%j", curHas)
+      // debug("cur=", cur)
+      // debug("curHas=%j", curHas)
       // If it's not big enough to fill the whole thing, then we'll need
       // to copy multiple buffers into one. However, if it is big enough,
       // then just slice out the part we want, to save unnecessary copying.
@@ -134,13 +140,21 @@ BlockStream.prototype._emitChunk = function (flush) {
     this._bufferLength -= this._chunkSize
     assert(out.length === this._chunkSize)
-    debug("emitting data", out)
+    // debug("emitting data", out)
+    // debug("  BS emitting, paused=%j", this._paused, this._bufferLength)
     this.emit("data", out)
     out = null
   }
+  // debug("  BS out of loops", this._bufferLength)
 
-  // whatever is left, it's not enough to fill up a block.
+  // whatever is left, it's not enough to fill up a block, or we're paused
   this._buffer = this._buffer.slice(bufferIndex)
+  if (this._paused) {
+    // debug("  BS paused, leaving", this._bufferLength)
+    this._needDrain = true
+    this._emitting = false
+    return
+  }
 
   // if flushing, and not using null-padding, then need to emit the last
   // chunk(s) sitting in the queue. We know that it's not enough to
@@ -175,20 +189,21 @@ BlockStream.prototype._emitChunk = function (flush) {
   }
 
   // now either drained or ended
-  debug("either draining, or ended")
-  debug(this._bufferLength, this._buffer.length, this._ended)
+  // debug("either draining, or ended", this._bufferLength, this._ended)
 
   // means that we've flushed out all that we can so far.
   if (this._needDrain) {
-    debug("emitting drain")
+    // debug("emitting drain", this._bufferLength)
     this._needDrain = false
     this.emit("drain")
   }
 
   if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
-    console.error("emitting end")
+    // debug("emitting end", this._bufferLength)
     this._endEmitted = true
     this.emit("end")
   }
 
   this._emitting = false
+
+  // debug("  BS no longer emitting", flush, this._paused, this._emitting, this._bufferLength, this._chunkSize)
 }
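The zero-padding branch at the top of _emitChunk(true) is untouched here apart from its debug lines, and it is why the loop conditions read (flush || !this._paused): an end-time flush must round the final partial block up to a whole chunk and emit everything, paused or not. The padBytes arithmetic, worked through as a standalone sketch with an illustrative 512-byte chunk size (not library code):

    var chunkSize = 512                         // illustrative block size
    var bufferLength = 700                      // bytes still buffered at flush time

    var padBytes = bufferLength % chunkSize     // 188 bytes past the last boundary
    if (padBytes !== 0) padBytes = chunkSize - padBytes

    console.log(padBytes)                       // 324, since 700 + 324 === 2 * 512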
test/pause-resume.js (+6 −2)
@@ -16,9 +16,11 @@ writeSizes.forEach(function (writeSize) {
     var actualChunks = 0
     var actualBytes = 0
     var timeouts = 0
+    var paused = false
 
     f.on("data", function (c) {
       timeouts ++
+      t.notOk(paused, "should not be paused when emitting data")
       actualChunks ++
       actualBytes += c.length
@@ -26,8 +28,9 @@ writeSizes.forEach(function (writeSize) {
       // make sure that no data gets corrupted, and basic sanity
       var before = c.toString()
       // simulate a slow write operation
+      paused = true
       f.pause()
 
-      setTimeout(function () {
+      process.nextTick(function () {
         var after = c.toString()
         t.equal(after, before, "should not change data")
@@ -35,9 +38,10 @@ writeSizes.forEach(function (writeSize) {
         for (var i = 0; i < c.length; i ++) {
           c[i] = "x".charCodeAt(0)
         }
+        paused = false
        f.resume()
         timeouts --
-      }, 100)
+      })
     })
 
     f.on("end", function () {