streams: swallows consumed data when destination stream ends #7151

Open
bnoordhuis opened this Issue Feb 19, 2014 · 7 comments

@bnoordhuis
Member

The test cases are contrived for brevity but reflect a real-world (and IMO reasonable) use case.

var assert = require('assert');
var http = require('http');
var stream = require('stream');

http.createServer(function(req, res) {
  var s = new stream.Readable;
  s.queue = ['pong'];
  s._read = function() {
    while (this.queue.length > 0) this.push(this.queue.shift());
  };
  assert.equal(s.queue.length, 1);  // Pending data.
  s.pipe(res);
  assert.equal(s.queue.length, 0);  // Consumed data.
  res.end();
  this.close();
}).listen(function() {
  var req = http.request({
    host: this.address().address,
    port: this.address().port,
    method: 'POST',
  });
  req.once('response', function(res) {
    res.on('data', function(buf) { recv += buf });
  });
  req.write('ping');
});

var recv = '';
process.on('exit', function() {
  assert.equal(recv, 'pong');
});

Second test case:

var assert = require('assert');
var net = require('net');
var stream = require('stream');

net.createServer(function(conn) {
  var s = new stream.Readable;
  s.queue = ['pong'];
  s._read = function() {
    while (this.queue.length > 0) this.push(this.queue.shift());
  };
  assert.equal(s.queue.length, 1);  // Pending data.
  s.pipe(conn);
  assert.equal(s.queue.length, 0);  // Consumed data.
  conn.destroySoon();
  this.close();
}).listen(function() {
  var conn = net.connect({
    host: this.address().address,
    port: this.address().port,
  });
  conn.on('data', function(buf) { recv += buf });
  conn.write('ping');
});

var recv = '';
process.on('exit', function() {
  assert.equal(recv, 'pong');
});

Piping into http.OutgoingMessage consumes the data from the Readable, but no data is sent when http.OutgoingMessage#end() is called. No error is generated either; the data just silently disappears.

The story is pretty much the same for net.Socket. Seems like a fairly grave bug to me. Both master and v0.10 are affected.

@obastemur

IMO they are more than reasonable. I saw a similar problem when I was replacing the response.end implementation. It seems a similar problem is happening here: the socket is getting closed before the message is sent. Great catch.

@tjfontaine

I am looking into this, and I agree that on the http server side we should be sanely notifying of error states for socket-level errors (see #7065), but that's not entirely what's happening in your test case.

What you have is data that is ready to be consumed (the initial read(0) gets data), with the streams machinery reading up to the high-water mark and preparing to write it out once the destination is writable.

But before that happens, you are ending the sink. If you listen for sink.on('unpipe') before you .end() (arguably a separate bug here, in that 'unpipe' is emitted synchronously), you can see that s._readableState.buffer indeed still holds your "consumed" data. If you were to subsequently pipe that readable elsewhere, the data would be written out after all.
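
A minimal sketch of that inspection against the first test case; on v0.10, _readableState.buffer is a plain array of buffered chunks, and somewhereElse below stands in for any other writable stream:

s.pipe(res);
res.on('unpipe', function(src) {
  // pipe() has already pulled 'pong' off the source; it is still sitting
  // in the readable's internal buffer rather than being gone for good.
  console.log(src._readableState.buffer);  // e.g. [<Buffer 70 6f 6e 67>]
  // src.pipe(somewhereElse);  // re-piping would write the data out again
});
res.end();  // emits 'unpipe' (synchronously, as noted above)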

Can you be more explicit about the behavior you're expecting?

@obastemur

After inspecting this in detail, I realized that the test cases assume the stream is synchronous, but it needs to be asynchronous here to work properly.

If you set s._readableState.sync = false; inside s._read, it should work.

(Note: this is definitely not a cool hack, though.)
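
For clarity, this is where the flag would go in the first test case (a sketch of the suggestion above, not a recommended fix):

s._read = function() {
  // Pretend the read is asynchronous so the piped data gets flushed before
  // the destination finishes, instead of being swallowed with it.
  this._readableState.sync = false;
  while (this.queue.length > 0) this.push(this.queue.shift());
};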

@bnoordhuis
Member

> It seems a similar problem is happening here: the socket is getting closed before the message is sent.

That sometimes happens too but I haven't been able to capture that reliably in a test case yet.

> If you listen for sink.on('unpipe') before you .end()

Yes, I figured that out. :-) But having to take care of that manually makes it difficult to string random libraries together, and it's almost impossible to debug when it goes wrong.

To illustrate, I found out about this issue because of a web service proxy where the final chunk went missing once every few thousand requests. You can imagine what tracking down a bug like that is like.
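
For the record, one way to take care of it manually in the first test case is sketched below: end the source and let pipe() call res.end() for you once everything has been flushed, instead of ending the destination yourself. That sidesteps the problem rather than fixing it, and it assumes you control the source.

s._read = function() {
  while (this.queue.length > 0) this.push(this.queue.shift());
  this.push(null);  // signal EOF; pipe() ends the destination after the flush
};
s.pipe(res);  // note: no explicit res.end() call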

> Can you be more explicit about the behavior you're expecting?

I'm not sure. I think I would like res.end() to check for "primed" data and include it in the final chunk, but I don't know whether that's a reasonable expectation. I suppose it could get complicated if you have chains or trees of streams.

On the other hand, it seems reasonable (to me) insofar as http.OutgoingMessage#end() and net.Socket#destroySoon() are understood to be "finish at your leisure" signals.
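
In pseudocode, the behavior I have in mind would look something like the sketch below. Every internal name in it is hypothetical; nothing like _pipeSource exists today:

// Hypothetical only: end() folds data a pipe has already pulled off its
// source into the final chunk instead of dropping it.
OutgoingMessage.prototype.end = function(data, encoding) {
  var src = this._pipeSource;  // hypothetical back-reference to the pipe source
  if (src && src._readableState.length > 0) {
    this.write(src.read());  // flush the "primed" data first
  }
  // ... existing end() logic follows ...
};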

> If you set s._readableState.sync = false; inside s._read, it should work.

Thanks for the suggestion. I've tried that and some other things, but I can't make it work; the data is always lost.

@bnoordhuis
Member

> It seems a similar problem is happening here: the socket is getting closed before the message is sent.

Okay, I have something of a test case for that. It's somewhat related to this issue so I'm posting it here.

var http = require('http');

http.createServer(function(req, res) {
  res.end('pong');
  this.close();
}).listen(function() {
  var options = {
    agent: false,
    host: this.address().address,
    port: this.address().port,
    method: 'POST',
  };

  var req = http.request(options);

  req.on('response', function(res) {
    res.on('data', ondata);
    res.on('end', function() {
      console.log('end');
      res.removeListener('data', ondata);
    });
    function ondata() {
      console.log('data');
      req.write('ping');
    }
  });

  req.write('ping');
});

Prints:

$ v0.10/out/Release/node t.js
data
end

And when you strace it:

connect(11, {sa_family=AF_INET, sin_port=htons(55043), sin_addr=inet_addr("0.0.0.0")}, 16) = -1 EINPROGRESS (Operation now in progress)
accept4(10, 0, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK) = 12
accept4(10, 0, NULL, SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
getsockopt(11, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
write(11, "POST / HTTP/1.1\r\nHost: 0.0.0.0:55043\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nping\r\n", 96) = 96
read(12, "POST / HTTP/1.1\r\nHost: 0.0.0.0:55043\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nping\r\n", 65536) = 96
write(12, "HTTP/1.1 200 OK\r\nDate: Thu, 20 Feb 2014 03:50:27 GMT\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\npong\r\n0\r\n\r\n", 117) = 117
close(12)                               = 0
read(11, "HTTP/1.1 200 OK\r\nDate: Thu, 20 Feb 2014 03:50:27 GMT\r\nConnection: close\r\nTransfer-Encoding: chunked\r\n\r\n4\r\npong\r\n0\r\n\r\n", 65536) = 117
write(11, "4\r\nping\r\n", 9)           = 9
close(11)                               = 0
+++ exited with 0 +++

(I'm somewhat disappointed that Linux doesn't return EPIPE for the last write; it knows that the peer is already gone.)

The second ping is sent at a time when node knows that the socket has been closed by the remote side: Connection: close is set and the final chunk has been seen.

I suppose it's more complicated when Transfer-Encoding: chunked is set, because then you don't know whether the connection has hit EOF until you try to read again. Maybe that's something libuv should do after a partial read. It sucks performance-wise, though.

/cc @indutny - you can probably make this work on Linux by adding EPOLLRDHUP to the event mask.

Behavior with master is identical, by the way.

@jasnell
Member
jasnell commented Jun 3, 2015

@bnoordhuis ... I haven't run the test cases myself yet, but I assume this is still an issue?

@bnoordhuis
Member

Yes, all three still fail with v0.10 and io.js v2.x. I didn't test v0.12, but it's probably safe to assume it's not fixed there either.
