Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

fix eventemitter madness, be careful around pausing here too #206

Closed
wants to merge 2 commits into from

3 participants

Jeremie Miller Charlie Robbins Kristján Pétursson
Jeremie Miller

Much happier :)

Kristján Pétursson

Seems legit. This does what exactly; pushes the network queue over to the remote host so we don't blow out our RAM?

without this it oversubscribes the drain function, I don't know for sure if there's any functional difference in the block and tackling, but it definitely is friendlier to event emitter when there's lots of data writes happening :)

Charlie Robbins
Owner

@quartzjer This looks good. So good that I would ask that you update other places you see flushed = *.write(... with this pattern.

Jeremie Miller

Ok, I honestly tried to fix this elsewhere, but I got in the weeds quickly on knowing what kind of larger impact it would have, I don't know the internals of http-proxy well enough to have any confidence in twiddling these bits. It seems that for all the instances of tracking flushed, paused, and attaching drain listeners, that there needs to be some way to check if the drain listener is attached for-the-specific-paired-socket, and using a paused variable could actually result in some edge cases of a full standstill.

I'm not sure how to do this properly in all the other places with my relatively shallow understanding of http-proxy, and may not have the time to do a deep dive and learn it all any time soon :(

cc @temas

Jeremie Miller

I decided to go the safest route possible, and just have a variable per instance. I tested this everywhere we were seeing the event emitter leak on the locker project and it runs clean now. Should be good to go I think?

Jeremie Miller

Does the current patch look good?

Charlie Robbins
Owner

Things have diverged so much from this that I can't merge it in. If you encounter this in v0.9.0 when it is released let us know. Sorry.

Charlie Robbins indexzero closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 24 additions and 9 deletions.
  1. +24 −9 lib/node-http-proxy/http-proxy.js
33 lib/node-http-proxy/http-proxy.js
View
@@ -259,7 +259,7 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
// If the res socket has been killed already, then write()
// will throw. Nevertheless, try our best to end it nicely.
//
- var paused = false;
+ var paused_resp = false;
response.on('data', function (chunk) {
if (req.method !== 'HEAD' && res.writable) {
try {
@@ -274,11 +274,11 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
return;
}
- if (!flushed && !paused) {
- paused = true;
+ if (!flushed && !paused_resp) {
+ paused_resp = true;
response.pause();
res.once('drain', function () {
- paused = false;
+ paused_resp = false;
try { response.resume() }
catch (er) { console.error("response.resume error: %s", er.message) }
});
@@ -335,13 +335,16 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
// For each data `chunk` received from the incoming
// `req` write it to the `reverseProxy` request.
//
+ var paused_rr = false;
req.on('data', function (chunk) {
if (!errState) {
var flushed = reverseProxy.write(chunk);
- if (!flushed) {
+ if (!flushed && !paused_rr) {
+ paused_rr = true;
req.pause();
reverseProxy.once('drain', function () {
+ paused_rr = false;
try { req.resume() }
catch (er) { console.error("req.resume error: %s", er.message) }
});
@@ -489,14 +492,17 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
// Any incoming data on this WebSocket to the proxy target
// will be written to the `reverseProxy` socket.
//
+ var paused_ps = false;
proxySocket.on('data', listeners.onIncoming = function (data) {
if (reverseProxy.incoming.socket.writable) {
try {
self.emit('websocket:outgoing', req, socket, head, data);
var flushed = reverseProxy.incoming.socket.write(data);
- if (!flushed) {
+ if (!flushed && !paused_ps) {
+ paused_ps = true;
proxySocket.pause();
reverseProxy.incoming.socket.once('drain', function () {
+ paused_ps = false;
try { proxySocket.resume() }
catch (er) { console.error("proxySocket.resume error: %s", er.message) }
});
@@ -522,13 +528,16 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
// Any outgoing data on this Websocket from the proxy target
// will be written to the `proxySocket` socket.
//
+ var paused_rpis = false;
reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function (data) {
try {
self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data);
var flushed = proxySocket.write(data);
- if (!flushed) {
+ if (!flushed && !paused_rpis) {
+ paused_rpis = true;
reverseProxy.incoming.socket.pause();
proxySocket.once('drain', function () {
+ paused_rpis = false;
try { reverseProxy.incoming.socket.resume() }
catch (er) { console.error("reverseProxy.incoming.socket.resume error: %s", er.message) }
});
@@ -666,6 +675,7 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
// then execute the WebSocket handshake.
//
reverseProxy.once('socket', function (revSocket) {
+ var paused_rs = false;
revSocket.on('data', function handshake (data) {
//
// Ok, kind of harmfull part of code. Socket.IO sends a hash
@@ -698,9 +708,11 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
self.emit('websocket:handshake', req, socket, head, sdata, data);
socket.write(sdata);
var flushed = socket.write(data);
- if (!flushed) {
+ if (!flushed && !paused_rs) {
+ paused_rs = true;
revSocket.pause();
socket.once('drain', function () {
+ paused_rs = false;
try { revSocket.resume() }
catch (er) { console.error("reverseProxy.socket.resume error: %s", er.message) }
});
@@ -823,11 +835,14 @@ HttpProxy.prototype._forwardRequest = function (req) {
// Chunk the client request body as chunks from
// the proxied request come in
//
+ var paused_req = false;
req.on('data', function (chunk) {
var flushed = forwardProxy.write(chunk);
- if (!flushed) {
+ if (!flushed && !paused_req) {
+ paused_req = true;
req.pause();
forwardProxy.once('drain', function () {
+ paused_req = false;
try { req.resume() }
catch (er) { console.error("req.resume error: %s", er.message) }
});
Something went wrong with that request. Please try again.