Permalink
Browse files

zlib: streams2

  • Loading branch information...
1 parent cb5b01f commit 0a2f448994c72980384fe4d3a12603ea432793ae @isaacs isaacs committed Oct 2, 2012
Showing with 104 additions and 167 deletions.
  1. +91 −124 lib/zlib.js
  2. +13 −0 src/node_zlib.cc
  3. +0 −36 test/simple/test-zlib-destroy.js
  4. +0 −7 test/simple/test-zlib-invalid-input.js
View
@@ -19,9 +19,10 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
+var Transform = require('_stream_transform');
+
var binding = process.binding('zlib');
var util = require('util');
-var Stream = require('stream');
var assert = require('assert').ok;
// zlib doesn't provide these, so kludge them in following the same
@@ -138,33 +139,35 @@ function zlibBuffer(engine, buffer, callback) {
var buffers = [];
var nread = 0;
+ engine.on('error', onError);
+ engine.on('end', onEnd);
+
+ engine.end(buffer);
+ flow();
+
+ function flow() {
+ var chunk;
+ while (null !== (chunk = engine.read())) {
+ buffers.push(chunk);
+ nread += chunk.length;
+ }
+ engine.once('readable', flow);
+ }
+
function onError(err) {
engine.removeListener('end', onEnd);
- engine.removeListener('error', onError);
+ engine.removeListener('readable', flow);
callback(err);
}
- function onData(chunk) {
- buffers.push(chunk);
- nread += chunk.length;
- }
-
function onEnd() {
var buf = Buffer.concat(buffers, nread);
buffers = [];
callback(null, buf);
}
-
- engine.on('error', onError);
- engine.on('data', onData);
- engine.on('end', onEnd);
-
- engine.write(buffer);
- engine.end();
}
-
// generic zlib
// minimal 2-byte header
function Deflate(opts) {
@@ -217,15 +220,13 @@ function Unzip(opts) {
// you call the .write() method.
function Zlib(opts, mode) {
- Stream.call(this);
-
this._opts = opts = opts || {};
- this._queue = [];
- this._processing = false;
- this._ended = false;
- this.readable = true;
- this.writable = true;
- this._flush = binding.Z_NO_FLUSH;
+ this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
+
+ Transform.call(this, opts);
+
+ // means a different thing there.
+ this._readableState.chunkSize = null;
if (opts.chunkSize) {
if (opts.chunkSize < exports.Z_MIN_CHUNK ||
@@ -274,13 +275,12 @@ function Zlib(opts, mode) {
this._binding = new binding.Zlib(mode);
var self = this;
+ this._hadError = false;
this._binding.onerror = function(message, errno) {
// there is no way to cleanly recover.
// continuing only obscures problems.
self._binding = null;
self._hadError = true;
- self._queue.length = 0;
- self._processing = false;
var error = new Error(message);
error.errno = errno;
@@ -294,67 +294,54 @@ function Zlib(opts, mode) {
opts.strategy || exports.Z_DEFAULT_STRATEGY,
opts.dictionary);
- this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
this._buffer = new Buffer(this._chunkSize);
this._offset = 0;
this._closed = false;
this.once('end', this.close);
}
-util.inherits(Zlib, Stream);
-
-Zlib.prototype.write = function write(chunk, cb) {
- if (this._hadError) return true;
-
- if (this._ended) {
- return this.emit('error', new Error('Cannot write after end'));
- }
-
- if (arguments.length === 1 && typeof chunk === 'function') {
- cb = chunk;
- chunk = null;
- }
-
- if (!chunk) {
- chunk = null;
- } else if (typeof chunk === 'string') {
- chunk = new Buffer(chunk);
- } else if (!Buffer.isBuffer(chunk)) {
- return this.emit('error', new Error('Invalid argument'));
- }
-
-
- var empty = this._queue.length === 0;
-
- this._queue.push([chunk, cb]);
- this._process();
- if (!empty) {
- this._needDrain = true;
- }
- return empty;
-};
+util.inherits(Zlib, Transform);
Zlib.prototype.reset = function reset() {
return this._binding.reset();
};
-Zlib.prototype.flush = function flush(cb) {
- this._flush = binding.Z_SYNC_FLUSH;
- return this.write(cb);
+Zlib.prototype._flush = function(output, callback) {
+ var rs = this._readableState;
+ var self = this;
+ this._transform(null, output, function(er) {
+ if (er)
+ return callback(er);
+
+ // now a weird thing happens... it could be that you called flush
+ // but everything had already actually been consumed, but it wasn't
+ // enough to get over the Readable class's lowWaterMark.
+ // In that case, we emit 'readable' now to make sure it's consumed.
+ if (rs.length &&
+ rs.length < rs.lowWaterMark &&
+ !rs.ended &&
+ rs.needReadable)
+ self.emit('readable');
+
+ callback();
+ });
};
-Zlib.prototype.end = function end(chunk, cb) {
- if (this._hadError) return true;
+Zlib.prototype.flush = function(callback) {
+ var ws = this._writableState;
+ var ts = this._transformState;
- var self = this;
- this._ending = true;
- var ret = this.write(chunk, function() {
- self.emit('end');
- if (cb) cb();
- });
- this._ended = true;
- return ret;
+ if (ws.writing) {
+ ws.needDrain = true;
+ var self = this;
+ this.once('drain', function() {
+ self._flush(ts.output, callback);
+ });
+ return;
+ }
+
+ this._flush(ts.output, callback || function() {});
};
Zlib.prototype.close = function(callback) {
@@ -368,37 +355,37 @@ Zlib.prototype.close = function(callback) {
this._binding.close();
- process.nextTick(this.emit.bind(this, 'close'));
+ var self = this;
+ process.nextTick(function() {
+ self.emit('close');
+ });
};
-Zlib.prototype._process = function() {
- if (this._hadError) return;
-
- if (this._processing || this._paused) return;
-
- if (this._queue.length === 0) {
- if (this._needDrain) {
- this._needDrain = false;
- this.emit('drain');
- }
- // nothing to do, waiting for more data at this point.
- return;
- }
-
- var req = this._queue.shift();
- var cb = req.pop();
- var chunk = req.pop();
-
- if (this._ending && this._queue.length === 0) {
- this._flush = binding.Z_FINISH;
- }
+Zlib.prototype._transform = function(chunk, output, cb) {
+ var flushFlag;
+ var ws = this._writableState;
+ var ending = ws.ending || ws.ended;
+ var last = ending && (!chunk || ws.length === chunk.length);
+
+ if (chunk !== null && !Buffer.isBuffer(chunk))
+ return cb(new Error('invalid input'));
+
+ // If it's the last chunk, or a final flush, we use the Z_FINISH flush flag.
+ // If it's explicitly flushing at some other time, then we use
+ // Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression
+ // goodness.
+ if (last)
+ flushFlag = binding.Z_FINISH;
+ else if (chunk === null)
+ flushFlag = binding.Z_FULL_FLUSH;
+ else
+ flushFlag = binding.Z_NO_FLUSH;
- var self = this;
var availInBefore = chunk && chunk.length;
var availOutBefore = this._chunkSize - this._offset;
-
var inOff = 0;
- var req = this._binding.write(this._flush,
+
+ var req = this._binding.write(flushFlag,
chunk, // in
inOff, // in_off
availInBefore, // in_len
@@ -408,23 +395,23 @@ Zlib.prototype._process = function() {
req.buffer = chunk;
req.callback = callback;
- this._processing = req;
+ var self = this;
function callback(availInAfter, availOutAfter, buffer) {
- if (self._hadError) return;
+ if (self._hadError)
+ return;
var have = availOutBefore - availOutAfter;
-
assert(have >= 0, 'have should not go down');
if (have > 0) {
var out = self._buffer.slice(self._offset, self._offset + have);
self._offset += have;
- self.emit('data', out);
+ // serve some output to the consumer.
+ output(out);
}
- // XXX Maybe have a 'min buffer' size so we don't dip into the
- // thread pool with only 1 byte available or something?
+ // exhausted the output buffer, or used all the input create a new one.
if (availOutAfter === 0 || self._offset >= self._chunkSize) {
availOutBefore = self._chunkSize;
self._offset = 0;
@@ -439,7 +426,7 @@ Zlib.prototype._process = function() {
inOff += (availInBefore - availInAfter);
availInBefore = availInAfter;
- var newReq = self._binding.write(self._flush,
+ var newReq = self._binding.write(flushFlag,
chunk,
inOff,
availInBefore,
@@ -448,34 +435,14 @@ Zlib.prototype._process = function() {
self._chunkSize);
newReq.callback = callback; // this same function
newReq.buffer = chunk;
- self._processing = newReq;
return;
}
// finished with the chunk.
- self._processing = false;
- if (cb) cb();
- self._process();
+ cb();
}
};
-Zlib.prototype.pause = function() {
- this._paused = true;
- this.emit('pause');
-};
-
-Zlib.prototype.resume = function() {
- this._paused = false;
- this._process();
-};
-
-Zlib.prototype.destroy = function() {
- this.readable = false;
- this.writable = false;
- this._ended = true;
- this.emit('close');
-};
-
util.inherits(Deflate, Zlib);
util.inherits(Inflate, Zlib);
util.inherits(Gzip, Zlib);
View
@@ -109,7 +109,19 @@ class ZCtx : public ObjectWrap {
assert(!ctx->write_in_progress_ && "write already in progress");
ctx->write_in_progress_ = true;
+ assert(!args[0]->IsUndefined() && "must provide flush value");
+
unsigned int flush = args[0]->Uint32Value();
+
+ if (flush != Z_NO_FLUSH &&
+ flush != Z_PARTIAL_FLUSH &&
+ flush != Z_SYNC_FLUSH &&
+ flush != Z_FULL_FLUSH &&
+ flush != Z_FINISH &&
+ flush != Z_BLOCK) {
+ assert(0 && "Invalid flush value");
+ }
+
Bytef *in;
Bytef *out;
size_t in_off, in_len, out_off, out_len;
@@ -481,6 +493,7 @@ void InitZlib(Handle<Object> target) {
callback_sym = NODE_PSYMBOL("callback");
onerror_sym = NODE_PSYMBOL("onerror");
+ // valid flush values.
NODE_DEFINE_CONSTANT(target, Z_NO_FLUSH);
NODE_DEFINE_CONSTANT(target, Z_PARTIAL_FLUSH);
NODE_DEFINE_CONSTANT(target, Z_SYNC_FLUSH);
Oops, something went wrong.

0 comments on commit 0a2f448

Please sign in to comment.