Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

stream: Don't require read(0) to emit 'readable' event

When a readable listener is added, call read(0) so that data will flow in, up to
the high water mark.

Otherwise, it's somewhat confusing that you have to listen for readable,
and ALSO call read() (when it will certainly return null) just to get some
data out of the stream.

See: #4720
  • Loading branch information...
commit 2eb8b6dfbec32d44c7a9c4f5fea7ae616569e0fc 1 parent f5d8496
Isaac Z. Schlueter authored March 02, 2013
10  lib/_stream_readable.js
@@ -613,17 +613,17 @@ Readable.prototype.unpipe = function(dest) {
613 613
   return this;
614 614
 };
615 615
 
616  
-// kludge for on('data', fn) consumers.  Sad.
617  
-// This is *not* part of the new readable stream interface.
618  
-// It is an ugly unfortunate mess of history.
  616
+// set up data events if they are asked for
  617
+// Ensure readable listeners eventually get something
619 618
 Readable.prototype.on = function(ev, fn) {
620 619
   var res = Stream.prototype.on.call(this, ev, fn);
621 620
 
622  
-  // https://github.com/isaacs/readable-stream/issues/16
623  
-  // if we're already flowing, then no need to set up data events.
624 621
   if (ev === 'data' && !this._readableState.flowing)
625 622
     emitDataEvents(this);
626 623
 
  624
+  if (ev === 'readable')
  625
+    this.read(0);
  626
+
627 627
   return res;
628 628
 };
629 629
 Readable.prototype.addListener = Readable.prototype.on;
33  test/simple/test-stream2-basic.js
@@ -38,6 +38,7 @@ function TestReader(n) {
38 38
 util.inherits(TestReader, R);
39 39
 
40 40
 TestReader.prototype.read = function(n) {
  41
+  if (n === 0) return null;
41 42
   var max = this._buffer.length - this._pos;
42 43
   n = n || max;
43 44
   n = Math.max(n, 0);
@@ -80,11 +81,6 @@ TestWriter.prototype.write = function(c) {
80 81
   this.received.push(c.toString());
81 82
   this.emit('write', c);
82 83
   return true;
83  
-
84  
-  // flip back and forth between immediate acceptance and not.
85  
-  this.flush = !this.flush;
86  
-  if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10);
87  
-  return this.flush;
88 84
 };
89 85
 
90 86
 TestWriter.prototype.end = function(c) {
@@ -113,6 +109,7 @@ function run() {
113 109
   console.log('# %s', name);
114 110
   fn({
115 111
     same: assert.deepEqual,
  112
+    ok: assert,
116 113
     equal: assert.equal,
117 114
     end: function () {
118 115
       count--;
@@ -187,6 +184,7 @@ test('pipe', function(t) {
187 184
 
188 185
   var w = new TestWriter;
189 186
   var flush = true;
  187
+
190 188
   w.on('end', function(received) {
191 189
     t.same(received, expect);
192 190
     t.end();
@@ -450,3 +448,28 @@ test('sync _read ending', function (t) {
450 448
     t.end();
451 449
   })
452 450
 });
  451
+
  452
+test('adding readable triggers data flow', function(t) {
  453
+  var r = new R({ highWaterMark: 5 });
  454
+  var onReadable = false;
  455
+  var readCalled = 0;
  456
+
  457
+  r._read = function(n) {
  458
+    if (readCalled++ === 2)
  459
+      r.push(null);
  460
+    else
  461
+      r.push(new Buffer('asdf'));
  462
+  };
  463
+
  464
+  var called = false;
  465
+  r.on('readable', function() {
  466
+    onReadable = true;
  467
+    r.read();
  468
+  });
  469
+
  470
+  r.on('end', function() {
  471
+    t.equal(readCalled, 3);
  472
+    t.ok(onReadable);
  473
+    t.end();
  474
+  });
  475
+});
2  test/simple/test-stream2-unpipe-leak.js
@@ -43,7 +43,7 @@ function TestReader() {
43 43
 util.inherits(TestReader, stream.Readable);
44 44
 
45 45
 TestReader.prototype._read = function(size) {
46  
-  stream.push(new Buffer('hallo'));
  46
+  this.push(new Buffer('hallo'));
47 47
 };
48 48
 
49 49
 var src = new TestReader();

0 notes on commit 2eb8b6d

Please sign in to comment.
Something went wrong with that request. Please try again.