diff --git a/src/js/net.js b/src/js/net.js index f1f755c9e0..e6850d4639 100644 --- a/src/js/net.js +++ b/src/js/net.js @@ -117,7 +117,7 @@ Socket.prototype.end = function(data, callback) { var state = self._socketState; // end of writable stream. - stream.WritableStream.prototype.end.call(self, data, callback); + stream.Writable.prototype.end.call(self, data, callback); // this socket is no longer writable. state.writable = false; @@ -148,15 +148,15 @@ Socket.prototype._onread = function(nread, isEOF, buffer) { if (isEOF) { // this socket is no longer readable. - stream.ReadableStream.prototype.finishRead.call(self); + stream.Readable.prototype.finishRead.call(self); state.readable = false; // destory if this socket is not writable. maybeDestroy(self); } else if (nread < 0) { var err = new Error('read error: ' + nread); - stream.ReadableStream.prototype.error.call(this, err); + stream.Readable.prototype.error.call(this, err); } else { - stream.ReadableStream.prototype.push.call(this, buffer); + stream.Readable.prototype.push.call(this, buffer); } }; diff --git a/src/js/stream.js b/src/js/stream.js index ae4ff03f0f..6954ebdcc3 100644 --- a/src/js/stream.js +++ b/src/js/stream.js @@ -28,6 +28,6 @@ util.inherits(Stream, EE); exports.Stream = Stream; -exports.ReadableStream = require('stream_readable'); -exports.WritableStream = require('stream_writable'); +exports.Readable = require('stream_readable'); +exports.Writable = require('stream_writable'); exports.Duplex = require('stream_duplex'); diff --git a/src/js/stream_readable.js b/src/js/stream_readable.js index 4f007cef82..dae6c6e85b 100644 --- a/src/js/stream_readable.js +++ b/src/js/stream_readable.js @@ -16,6 +16,7 @@ var Stream = require('stream').Stream; var util = require('util'); +var assert = require('assert'); function ReadableState(options) { @@ -89,15 +90,30 @@ Readable.prototype.on = function(ev, cb) { }; +Readable.prototype.isPaused = function() { + return !this._readableState.flowing; +}; + + +Readable.prototype.pause = function() { + var state = this._readableState; + if (state.flowing) { + state.flowing = false; + this.emit('pause'); + } + return this; +}; + + Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { state.flowing = true; - var self = this; - process.nextTick(function() { - self.read(0); - }); + if (state.length > 0) { + emitData(this, readBuffer(this)); + } } + return this; }; @@ -155,6 +171,10 @@ function readBuffer(stream, n) { var state = stream._readableState; var res; + if (n == 0 || util.isNullOrUndefined(n)) { + n = state.length; + } + if (state.buffer.length === 0 || state.length === 0) { res = null; } else if (n >= state.length) { @@ -190,6 +210,7 @@ function emitReadable(stream) { function emitData(stream, data) { + assert.equal(readBuffer(stream), null); stream.emit('data', data); }; diff --git a/test/run_pass/test_stream.js b/test/run_pass/test_stream.js index 4903ba503b..e59bd1a1c7 100644 --- a/test/run_pass/test_stream.js +++ b/test/run_pass/test_stream.js @@ -14,31 +14,60 @@ */ -var ReadableStream = require('stream').ReadableStream; +var Readable = require('stream').Readable; var assert = require('assert'); -var readable = new ReadableStream(); -var data = ""; -var err_msg; +var readable = new Readable(); +var d = ""; +var e = ""; -readable.on('readable', function() { - data += readable.read().toString(); -}); readable.on('error', function(err) { - err_msg = err.message; + e += "."; +}); + +readable.on('data', function(data) { + d += data.toString(); }); + +readable.pause(); readable.push('abcde'); readable.push('12345'); -readable.push(null); -readable.push('shouldnotapper'); +assert.equal(d, ''); +assert.equal(e, ''); +readable.resume(); +assert.equal(d, 'abcde12345'); +assert.equal(e, ''); +readable.push('a'); +readable.push('1'); +readable.push('b'); +readable.push('2'); +assert.equal(d, 'abcde12345a1b2'); +assert.equal(e, ''); -process.on('exit', function(code) { - assert.equal(code, 0); - assert.equal(data, "abcde12345"); - assert.equal(err_msg, 'stream.push() after EOF'); -}); +readable.pause(); +assert.equal(d, 'abcde12345a1b2'); +assert.equal(e, ''); + +readable.push('c'); +readable.push('3'); +readable.push('d'); +readable.push('4'); +assert.equal(d, 'abcde12345a1b2'); +assert.equal(e, ''); + +readable.resume(); +assert.equal(d, 'abcde12345a1b2c3d4'); +assert.equal(e, ''); + +readable.push(null); +assert.equal(d, 'abcde12345a1b2c3d4'); +assert.equal(e, ''); + +readable.push('push after eof'); +assert.equal(d, 'abcde12345a1b2c3d4'); +assert.equal(e, '.');