Skip to content

Commit

Permalink
Revised readable stream
Browse files Browse the repository at this point in the history
* Rename Readable & Writable
* Add pause() method for Readable stream
* Emitting `data` event to feed buffered data when resume() is called.
* Revise test_stream.js

IoT.js-DCO-1.0-Signed-off-by: Ilyong Cho ily.cho@samsung.com
  • Loading branch information
ILyoan committed Jun 24, 2015
1 parent bf6a4b4 commit daf7b02
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 25 deletions.
8 changes: 4 additions & 4 deletions src/js/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Socket.prototype.end = function(data, callback) {
var state = self._socketState;

// end of writable stream.
stream.WritableStream.prototype.end.call(self, data, callback);
stream.Writable.prototype.end.call(self, data, callback);

// this socket is no longer writable.
state.writable = false;
Expand Down Expand Up @@ -148,15 +148,15 @@ Socket.prototype._onread = function(nread, isEOF, buffer) {

if (isEOF) {
// this socket is no longer readable.
stream.ReadableStream.prototype.finishRead.call(self);
stream.Readable.prototype.finishRead.call(self);
state.readable = false;
// destory if this socket is not writable.
maybeDestroy(self);
} else if (nread < 0) {
var err = new Error('read error: ' + nread);
stream.ReadableStream.prototype.error.call(this, err);
stream.Readable.prototype.error.call(this, err);
} else {
stream.ReadableStream.prototype.push.call(this, buffer);
stream.Readable.prototype.push.call(this, buffer);
}
};

Expand Down
4 changes: 2 additions & 2 deletions src/js/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ util.inherits(Stream, EE);

exports.Stream = Stream;

exports.ReadableStream = require('stream_readable');
exports.WritableStream = require('stream_writable');
exports.Readable = require('stream_readable');
exports.Writable = require('stream_writable');
exports.Duplex = require('stream_duplex');
29 changes: 25 additions & 4 deletions src/js/stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

var Stream = require('stream').Stream;
var util = require('util');
var assert = require('assert');


function ReadableState(options) {
Expand Down Expand Up @@ -89,15 +90,30 @@ Readable.prototype.on = function(ev, cb) {
};


Readable.prototype.isPaused = function() {
return !this._readableState.flowing;
};


Readable.prototype.pause = function() {
var state = this._readableState;
if (state.flowing) {
state.flowing = false;
this.emit('pause');
}
return this;
};


Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
state.flowing = true;
var self = this;
process.nextTick(function() {
self.read(0);
});
if (state.length > 0) {
emitData(this, readBuffer(this));
}
}
return this;
};


Expand Down Expand Up @@ -155,6 +171,10 @@ function readBuffer(stream, n) {
var state = stream._readableState;
var res;

if (n == 0 || util.isNullOrUndefined(n)) {
n = state.length;
}

if (state.buffer.length === 0 || state.length === 0) {
res = null;
} else if (n >= state.length) {
Expand Down Expand Up @@ -190,6 +210,7 @@ function emitReadable(stream) {


function emitData(stream, data) {
assert.equal(readBuffer(stream), null);
stream.emit('data', data);
};

Expand Down
59 changes: 44 additions & 15 deletions test/run_pass/test_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,60 @@
*/


var ReadableStream = require('stream').ReadableStream;
var Readable = require('stream').Readable;
var assert = require('assert');


var readable = new ReadableStream();
var data = "";
var err_msg;
var readable = new Readable();
var d = "";
var e = "";

readable.on('readable', function() {
data += readable.read().toString();
});

readable.on('error', function(err) {
err_msg = err.message;
e += ".";
});

readable.on('data', function(data) {
d += data.toString();
});


readable.pause();
readable.push('abcde');
readable.push('12345');
readable.push(null);
readable.push('shouldnotapper');
assert.equal(d, '');
assert.equal(e, '');

readable.resume();
assert.equal(d, 'abcde12345');
assert.equal(e, '');

readable.push('a');
readable.push('1');
readable.push('b');
readable.push('2');
assert.equal(d, 'abcde12345a1b2');
assert.equal(e, '');

process.on('exit', function(code) {
assert.equal(code, 0);
assert.equal(data, "abcde12345");
assert.equal(err_msg, 'stream.push() after EOF');
});
readable.pause();
assert.equal(d, 'abcde12345a1b2');
assert.equal(e, '');

readable.push('c');
readable.push('3');
readable.push('d');
readable.push('4');
assert.equal(d, 'abcde12345a1b2');
assert.equal(e, '');

readable.resume();
assert.equal(d, 'abcde12345a1b2c3d4');
assert.equal(e, '');

readable.push(null);
assert.equal(d, 'abcde12345a1b2c3d4');
assert.equal(e, '');

readable.push('push after eof');
assert.equal(d, 'abcde12345a1b2c3d4');
assert.equal(e, '.');

0 comments on commit daf7b02

Please sign in to comment.