diff --git a/lib/zlib.js b/lib/zlib.js index b7d3900a573..0b64dc2ed63 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -254,6 +254,7 @@ Zlib.prototype._process = function() { this._needDrain = false; this.emit('drain'); } + // nothing to do, waiting for more data at this point. return; } @@ -312,7 +313,7 @@ Zlib.prototype._process = function() { var newReq = self._binding.write(self._flush, chunk, inOff, - availInAfter, + availInBefore, self._buffer, self._offset, self._chunkSize); @@ -336,7 +337,7 @@ Zlib.prototype.pause = function() { Zlib.prototype.resume = function() { this._paused = false; - this.emit('resume'); + this._process(); }; util.inherits(Deflate, Zlib); diff --git a/test/simple/test-zlib-random-byte-pipes.js b/test/simple/test-zlib-random-byte-pipes.js new file mode 100644 index 00000000000..f0e2e18fe1b --- /dev/null +++ b/test/simple/test-zlib-random-byte-pipes.js @@ -0,0 +1,143 @@ + +var crypto = require('crypto'); +var stream = require('stream'); +var Stream = stream.Stream; +var util = require('util'); +var assert = require('assert'); +var zlib = require('zlib'); + + + +// emit random bytes, and keep a shasum +function RandomReadStream (opt) { + Stream.call(this); + + this.readable = true; + this._paused = false; + this._processing = false; + + this._hasher = crypto.createHash('sha1'); + opt = opt || {}; + + // base block size. + opt.block = opt.block || 256 * 1024; + + // total number of bytes to emit + opt.total = opt.total || 256 * 1024 * 1024; + this._remaining = opt.total; + + // how variable to make the block sizes + opt.jitter = opt.jitter || 1024; + + this._opt = opt; + + this._process = this._process.bind(this); + + process.nextTick(this._process); +} + +util.inherits(RandomReadStream, Stream); + +RandomReadStream.prototype.pause = function() { + this._paused = true; + this.emit('pause'); +}; + +RandomReadStream.prototype.resume = function() { + // console.error("rrs resume"); + this._paused = false; + this.emit('resume'); + this._process() +}; + +RandomReadStream.prototype._process = function() { + if (this._processing) return; + if (this._paused) return; + + this._processing = true; + + if (!this._remaining) { + this._hash = this._hasher.digest('hex').toLowerCase().trim(); + this._processing = false; + + this.emit('end'); + return; + } + + // figure out how many bytes to output + // if finished, then just emit end. + var block = this._opt.block; + var jitter = this._opt.jitter; + if (jitter) { + block += Math.ceil(Math.random() * jitter - (jitter / 2)); + } + block = Math.min(block, this._remaining) + var buf = new Buffer(block); + for (var i = 0; i < block; i ++) { + buf[i] = Math.random() * 256; + } + + this._hasher.update(buf); + + this._remaining -= block; + + console.error('block=%d\nremain=%d\n', block, this._remaining); + this._processing = false; + + this.emit('data', buf); + process.nextTick(this._process); +}; + + +// a filter that just verifies a shasum +function HashStream () { + Stream.call(this); + + this.readable = this.writable = true; + this._hasher = crypto.createHash('sha1'); +} + +util.inherits(HashStream, Stream); + +HashStream.prototype.write = function(c) { + // Simulate the way that an fs.ReadStream returns false + // on *every* write like a jerk, only to resume a + // moment later. + this._hasher.update(c); + process.nextTick(this.resume.bind(this)); + return false; +}; + +HashStream.prototype.resume = function() { + this.emit('resume'); + process.nextTick(this.emit.bind(this, 'drain')); +}; + +HashStream.prototype.end = function(c) { + if (c) { + this.write(c); + } + this._hash = this._hasher.digest('hex').toLowerCase().trim(); + this.emit('data', this._hash); + this.emit('end'); +}; + + + + +var inp = new RandomReadStream({ total: 1024, block: 256, jitter: 16 }); +var out = new HashStream(); +var gzip = zlib.createGzip(); +var gunz = zlib.createGunzip(); +inp.pipe(gzip).pipe(gunz).pipe(out); + +var didSomething = false; +out.on('data', function (c) { + didSomething = true; + console.error('hash=%s', c); + assert.equal(c, inp._hash, 'hashes should match'); +}); + +process.on('exit', function() { + assert(didSomething, 'should have done something'); +})