Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

child_process: do not keep list of sent sockets

Keeping list of all sockets that were sent to child process causes memory
leak and thus unacceptable (see #4587). However `server.close()` should
still work properly.

This commit introduces two options:

* child.send(socket, { track: true }) - will send socket and track its status.
  You should use it when you want `server.connections` to be a reliable
  number, and receive `close` event on sent sockets.
* child.send(socket) - will send socket without tracking it status. This
  performs much better, because of smaller number of RTT between master and
  child.

With both of these options `server.close()` will wait for all sent
sockets to get closed.
  • Loading branch information...
commit db5ee0b3decace9b5d80ee535ce53183aff02909 1 parent b7d76a1
@indutny indutny authored isaacs committed
View
8 doc/api/child_process.markdown
@@ -124,10 +124,11 @@ process may not actually kill it. `kill` really just sends a signal to a proces
See `kill(2)`
-### child.send(message, [sendHandle])
+### child.send(message, [sendHandle], [options])
* `message` {Object}
* `sendHandle` {Handle object}
+* `options` {Object}
When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
@@ -166,6 +167,11 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.
+The `options` object may have the following properties:
+
+ * `track` - Notify master process when `sendHandle` will be closed in child
+ process. (`false` by default)
+
**send server object**
Here is an example of sending a server:
View
11 doc/api/net.markdown
@@ -231,10 +231,19 @@ with `child_process.fork()`.
The number of concurrent connections on the server.
-This becomes `null` when sending a socket to a child with `child_process.fork()`.
+This becomes `null` when sending a socket to a child with
+`child_process.fork()`. To poll forks and get current number of active
+connections use asynchronous `server.getConnections` instead.
`net.Server` is an [EventEmitter][] with the following events:
+### server.getConnections(callback)
+
+Asynchronously get the number of concurrent connections on the server. Works
+when sockets were sent to forks.
+
+Callback should take two arguments `err` and `count`.
+
### Event: 'listening'
Emitted when the server has been bound after calling `server.listen`.
View
191 lib/child_process.js
@@ -107,29 +107,31 @@ var handleConversion = {
},
'net.Socket': {
- send: function(message, socket) {
- // pause socket so no data is lost, will be resumed later
-
- // if the socket wsa created by net.Server
+ send: function(message, socket, options) {
+ // if the socket was created by net.Server
if (socket.server) {
// the slave should keep track of the socket
message.key = socket.server._connectionKey;
var firstTime = !this._channel.sockets.send[message.key];
-
- // add socket to connections list
var socketList = getSocketList('send', this, message.key);
- socketList.add(socket);
- // the server should no longer expose a .connection property
- // and when asked to close it should query the socket status from slaves
- if (firstTime) {
- socket.server._setupSlave(socketList);
+ if (options && options.track) {
+ // Keep track of socket's status
+ message.id = socketList.add(socket);
+ } else {
+ // the server should no longer expose a .connection property
+ // and when asked to close it should query the socket status from
+ // the slaves
+ if (firstTime) socket.server._setupSlave(socketList);
+
+ // Act like socket is detached
+ socket.server._connections--;
}
}
// remove handle from socket object, it will be closed when the socket
- // has been send
+ // will be sent
var handle = socket._handle;
handle.onread = function() {};
socket._handle = null;
@@ -137,6 +139,11 @@ var handleConversion = {
return handle;
},
+ postSend: function(handle) {
+ // Close the Socket handle after sending it
+ handle.close();
+ },
+
got: function(message, handle, emit) {
var socket = new net.Socket({handle: handle});
socket.readable = socket.writable = true;
@@ -146,7 +153,10 @@ var handleConversion = {
// add socket to connections list
var socketList = getSocketList('got', this, message.key);
- socketList.add(socket);
+ socketList.add({
+ id: message.id,
+ socket: socket
+ });
}
emit(socket);
@@ -161,39 +171,98 @@ function SocketListSend(slave, key) {
var self = this;
this.key = key;
- this.list = [];
this.slave = slave;
+ // These two arrays are used to store the list of sockets and the freelist of
+ // indexes in this list. After insertion, item will have persistent index
+ // until it'll be removed. This way we can use this index as an identifier for
+ // sockets.
+ this.list = [];
+ this.freelist = [];
+
slave.once('disconnect', function() {
self.flush();
});
this.slave.on('internalMessage', function(msg) {
if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return;
- self.flush();
+ self.remove(msg.id);
});
}
util.inherits(SocketListSend, EventEmitter);
SocketListSend.prototype.add = function(socket) {
- this.list.push(socket);
+ var index;
+
+ // Pick one of free indexes, or insert in the end of the list
+ if (this.freelist.length > 0) {
+ index = this.freelist.pop();
+ this.list[index] = socket;
+ } else {
+ index = this.list.push(socket) - 1;
+ }
+
+ return index;
+};
+
+SocketListSend.prototype.remove = function(index) {
+ var socket = this.list[index];
+ if (!socket) return;
+
+ // Create a hole in the list and move index to the freelist
+ this.list[index] = null;
+ this.freelist.push(index);
+
+ socket.destroy();
};
SocketListSend.prototype.flush = function() {
var list = this.list;
this.list = [];
+ this.freelist = [];
list.forEach(function(socket) {
- socket.destroy();
+ if (socket) socket.destroy();
});
};
-SocketListSend.prototype.update = function() {
- if (this.slave.connected === false) return;
+SocketListSend.prototype._request = function request(msg, cmd, callback) {
+ var self = this;
+
+ if (!this.slave.connected) return onslaveclose();
+ this.slave.send(msg);
+
+ function onclose() {
+ self.slave.removeListener('internalMessage', onreply);
+ callback(new Error('Slave closed before reply'));
+ };
+
+ function onreply(msg) {
+ if (msg.cmd !== cmd || msg.key !== self.key) return;
+ self.slave.removeListener('disconnect', onclose);
+ self.slave.removeListener('internalMessage', onreply);
+
+ callback(null, msg);
+ };
+
+ this.slave.once('disconnect', onclose);
+ this.slave.on('internalMessage', onreply);
+};
+
+SocketListSend.prototype.close = function close(callback) {
+ this._request({
+ cmd: 'NODE_SOCKET_NOTIFY_CLOSE',
+ key: this.key
+ }, 'NODE_SOCKET_ALL_CLOSED', callback);
+};
- this.slave.send({
- cmd: 'NODE_SOCKET_FETCH',
+SocketListSend.prototype.getConnections = function getConnections(callback) {
+ this._request({
+ cmd: 'NODE_SOCKET_GET_COUNT',
key: this.key
+ }, 'NODE_SOCKET_COUNT', function(err, msg) {
+ if (err) return callback(err);
+ callback(null, msg.count);
});
};
@@ -203,45 +272,59 @@ function SocketListReceive(slave, key) {
var self = this;
+ this.connections = 0;
this.key = key;
- this.list = [];
this.slave = slave;
- slave.on('internalMessage', function(msg) {
- if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return;
-
- if (self.list.length === 0) {
- self.flush();
- return;
- }
+ function onempty() {
+ if (!self.slave.connected) return;
- self.on('itemRemoved', function removeMe() {
- if (self.list.length !== 0) return;
- self.removeListener('itemRemoved', removeMe);
- self.flush();
+ self.slave.send({
+ cmd: 'NODE_SOCKET_ALL_CLOSED',
+ key: self.key
});
+ }
+
+ this.slave.on('internalMessage', function(msg) {
+ if (msg.key !== self.key) return;
+
+ if (msg.cmd === 'NODE_SOCKET_NOTIFY_CLOSE') {
+ // Already empty
+ if (self.connections === 0) return onempty();
+
+ // Wait for sockets to get closed
+ self.once('empty', onempty);
+ } else if (msg.cmd === 'NODE_SOCKET_GET_COUNT') {
+ if (!self.slave.connected) return;
+ self.slave.send({
+ cmd: 'NODE_SOCKET_COUNT',
+ key: self.key,
+ count: self.connections
+ });
+ }
});
}
util.inherits(SocketListReceive, EventEmitter);
-SocketListReceive.prototype.flush = function() {
- this.list = [];
+SocketListReceive.prototype.add = function(obj) {
+ var self = this;
- if (this.slave.connected) {
- this.slave.send({
- cmd: 'NODE_SOCKET_CLOSED',
- key: this.key
- });
- }
-};
+ this.connections++;
-SocketListReceive.prototype.add = function(socket) {
- var self = this;
- this.list.push(socket);
+ // Notify previous owner of socket about its state change
+ obj.socket.once('close', function() {
+ self.connections--;
- socket.on('close', function() {
- self.list.splice(self.list.indexOf(socket), 1);
- self.emit('itemRemoved');
+ if (obj.id !== undefined && self.slave.connected) {
+ // Master wants to keep eye on socket status
+ self.slave.send({
+ cmd: 'NODE_SOCKET_CLOSED',
+ key: self.key,
+ id: obj.id
+ });
+ }
+
+ if (self.connections === 0) self.emit('empty');
});
};
@@ -366,17 +449,16 @@ function setupChannel(target, channel) {
var string = JSON.stringify(message) + '\n';
var writeReq = channel.writeUtf8String(string, handle);
- // Close the Socket handle after sending it
- if (message && message.type === 'net.Socket') {
- handle.close();
- }
-
if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
this.emit('error', er);
}
- writeReq.oncomplete = nop;
+ if (obj && obj.postSend) {
+ writeReq.oncomplete = obj.postSend.bind(null, handle);
+ } else {
+ writeReq.oncomplete = nop;
+ }
/* If the master is > 2 read() calls behind, please stop sending. */
return channel.writeQueueSize < (65536 * 2);
@@ -656,6 +738,7 @@ function ChildProcess() {
this._closesNeeded = 1;
this._closesGot = 0;
+ this.connected = false;
this.signalCode = null;
this.exitCode = null;
View
57 lib/net.js
@@ -874,8 +874,6 @@ function Server(/* [ options, ] listener */) {
this._connections = 0;
- // when server is using slaves .connections is not reliable
- // so null will be return if thats the case
Object.defineProperty(this, 'connections', {
get: function() {
if (self._usingSlaves) {
@@ -890,6 +888,8 @@ function Server(/* [ options, ] listener */) {
});
this._handle = null;
+ this._usingSlaves = false;
+ this._slaves = [];
this.allowHalfOpen = options.allowHalfOpen || false;
}
@@ -1122,7 +1122,37 @@ function onconnection(clientHandle) {
}
+Server.prototype.getConnections = function(cb) {
+ if (!this._usingSlaves) return cb(null, this.connections);
+
+ // Poll slaves
+ var left = this._slaves.length,
+ total = this._connections;
+
+ function oncount(err, count) {
+ if (err) {
+ left = -1;
+ return cb(err);
+ }
+
+ total += count;
+ if (--left === 0) return cb(null, total);
+ }
+
+ this._slaves.forEach(function(slave) {
+ slave.getConnections(oncount);
+ });
+};
+
+
Server.prototype.close = function(cb) {
+ function onSlaveClose() {
+ if (--left !== 0) return;
+
+ self._connections = 0;
+ self._emitCloseIfDrained();
+ }
+
if (!this._handle) {
// Throw error. Follows net_legacy behaviour.
throw new Error('Not running');
@@ -1133,14 +1163,21 @@ Server.prototype.close = function(cb) {
}
this._handle.close();
this._handle = null;
- this._emitCloseIfDrained();
- // fetch new socket lists
if (this._usingSlaves) {
- this._slaves.forEach(function(socketList) {
- if (socketList.list.length === 0) return;
- socketList.update();
+ var self = this,
+ left = this._slaves.length;
+
+ // Increment connections to be sure that, even if all sockets will be closed
+ // during polling of slaves, `close` event will be emitted only once.
+ this._connections++;
+
+ // Poll slaves
+ this._slaves.forEach(function(slave) {
+ slave.close(onSlaveClose);
});
+ } else {
+ this._emitCloseIfDrained();
}
return this;
@@ -1167,12 +1204,8 @@ Server.prototype.listenFD = util.deprecate(function(fd, type) {
return this.listen({ fd: fd });
}, 'listenFD is deprecated. Use listen({fd: <number>}).');
-// when sending a socket using fork IPC this function is executed
Server.prototype._setupSlave = function(socketList) {
- if (!this._usingSlaves) {
- this._usingSlaves = true;
- this._slaves = [];
- }
+ this._usingSlaves = true;
this._slaves.push(socketList);
};
View
107 test/simple/test-child-process-fork-getconnections.js
@@ -0,0 +1,107 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+var count = 12;
+
+if (process.argv[2] === 'child') {
+ var sockets = [];
+ var id = process.argv[3];
+
+ process.on('message', function(m, socket) {
+ if (socket) {
+ sockets.push(socket);
+ }
+
+ if (m.cmd === 'close') {
+ sockets[m.id].once('close', function() {
+ process.send({ id: m.id, status: 'closed' });
+ });
+ sockets[m.id].destroy();
+ }
+ });
+} else {
+ var child = fork(process.argv[1], ['child']);
+
+ var server = net.createServer();
+ var sockets = [];
+ var sent = 0;
+
+ server.on('connection', function(socket) {
+ child.send({ cmd: 'new' }, socket, { track: false });
+ sockets.push(socket);
+
+ if (sockets.length === count) {
+ closeSockets();
+ server.close();
+ }
+ });
+
+ var disconnected = 0;
+ server.on('listening', function() {
+
+ var j = count, client;
+ while (j--) {
+ client = net.connect(common.PORT, '127.0.0.1');
+ client.on('close', function() {
+ console.error('[m] CLIENT: close event');
+ disconnected += 1;
+ });
+ // XXX This resume() should be unnecessary.
+ // a stream high water mark should be enough to keep
+ // consuming the input.
+ client.resume();
+ }
+ });
+
+ function closeSockets(i) {
+ if (!i) i = 0;
+ if (i === count) return;
+
+ sent++;
+ child.send({ id: i, cmd: 'close' });
+ child.once('message', function(m) {
+ assert(m.status === 'closed');
+ server.getConnections(function(err, num) {
+ closeSockets(i + 1);
+ });
+ });
+ };
+
+ var closeEmitted = false;
+ server.on('close', function() {
+ console.error('[m] server close');
+ closeEmitted = true;
+
+ child.kill();
+ });
+
+ server.listen(common.PORT, '127.0.0.1');
+
+ process.on('exit', function() {
+ assert.equal(sent, count);
+ assert.equal(disconnected, count);
+ assert.ok(closeEmitted);
+ });
+}
View
59 test/simple/test-child-process-fork-net2.js
@@ -26,13 +26,13 @@ var net = require('net');
var count = 12;
if (process.argv[2] === 'child') {
-
var needEnd = [];
+ var id = process.argv[3];
process.on('message', function(m, socket) {
if (!socket) return;
- console.error('got socket', m);
+ console.error('[%d] got socket', id, m);
// will call .end('end') or .write('write');
socket[m](m);
@@ -40,11 +40,11 @@ if (process.argv[2] === 'child') {
socket.resume();
socket.on('data', function() {
- console.error('%d socket.data', process.pid, m);
+ console.error('[%d] socket.data', id, m);
});
socket.on('end', function() {
- console.error('%d socket.end', process.pid, m);
+ console.error('[%d] socket.end', id, m);
});
// store the unfinished socket
@@ -53,58 +53,62 @@ if (process.argv[2] === 'child') {
}
socket.on('close', function() {
- console.error('%d socket.close', process.pid, m);
+ console.error('[%d] socket.close', id, m);
});
socket.on('finish', function() {
- console.error('%d socket finished', process.pid, m);
+ console.error('[%d] socket finished', id, m);
});
});
process.on('message', function(m) {
if (m !== 'close') return;
- console.error('got close message');
+ console.error('[%d] got close message', id);
needEnd.forEach(function(endMe, i) {
- console.error('%d ending %d', process.pid, i);
+ console.error('[%d] ending %d', id, i);
endMe.end('end');
});
});
process.on('disconnect', function() {
- console.error('%d process disconnect, ending', process.pid);
+ console.error('[%d] process disconnect, ending', id);
needEnd.forEach(function(endMe, i) {
- console.error('%d ending %d', process.pid, i);
+ console.error('[%d] ending %d', id, i);
endMe.end('end');
});
- endMe = null;
});
} else {
- var child1 = fork(process.argv[1], ['child']);
- var child2 = fork(process.argv[1], ['child']);
- var child3 = fork(process.argv[1], ['child']);
+ var child1 = fork(process.argv[1], ['child', '1']);
+ var child2 = fork(process.argv[1], ['child', '2']);
+ var child3 = fork(process.argv[1], ['child', '3']);
var server = net.createServer();
- var connected = 0;
+ var connected = 0,
+ closed = 0;
server.on('connection', function(socket) {
switch (connected % 6) {
case 0:
- child1.send('end', socket); break;
+ child1.send('end', socket, { track: false }); break;
case 1:
- child1.send('write', socket); break;
+ child1.send('write', socket, { track: true }); break;
case 2:
- child2.send('end', socket); break;
+ child2.send('end', socket, { track: true }); break;
case 3:
- child2.send('write', socket); break;
+ child2.send('write', socket, { track: false }); break;
case 4:
- child3.send('end', socket); break;
+ child3.send('end', socket, { track: false }); break;
case 5:
- child3.send('write', socket); break;
+ child3.send('write', socket, { track: false }); break;
}
connected += 1;
+ socket.once('close', function() {
+ console.log('[m] socket closed, total %d', ++closed);
+ });
+
if (connected === count) {
closeServer();
}
@@ -117,7 +121,7 @@ if (process.argv[2] === 'child') {
while (j--) {
client = net.connect(common.PORT, '127.0.0.1');
client.on('close', function() {
- console.error('CLIENT: close event in master');
+ console.error('[m] CLIENT: close event');
disconnected += 1;
});
// XXX This resume() should be unnecessary.
@@ -129,7 +133,7 @@ if (process.argv[2] === 'child') {
var closeEmitted = false;
server.on('close', function() {
- console.error('server close');
+ console.error('[m] server close');
closeEmitted = true;
child1.kill();
@@ -141,18 +145,19 @@ if (process.argv[2] === 'child') {
var timeElasped = 0;
var closeServer = function() {
- console.error('closeServer');
+ console.error('[m] closeServer');
var startTime = Date.now();
server.on('close', function() {
- console.error('emit(close)');
+ console.error('[m] emit(close)');
timeElasped = Date.now() - startTime;
});
- console.error('calling server.close');
+ console.error('[m] calling server.close');
server.close();
setTimeout(function() {
- console.error('sending close to children');
+ assert(!closeEmitted);
+ console.error('[m] sending close to children');
child1.send('close');
child2.send('close');
child3.disconnect();
View
110 test/simple/test-child-process-fork-track.js
@@ -0,0 +1,110 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var assert = require('assert');
+var common = require('../common');
+var fork = require('child_process').fork;
+var net = require('net');
+var count = 12;
+
+if (process.argv[2] === 'child') {
+ var sockets = [];
+ var id = process.argv[3];
+
+ process.on('message', function(m, socket) {
+ if (socket) {
+ sockets.push(socket);
+ }
+
+ if (m.cmd === 'close') {
+ sockets[m.id].once('close', function() {
+ process.send({ id: m.id, status: 'closed' });
+ });
+ sockets[m.id].destroy();
+ }
+ });
+} else {
+ var child = fork(process.argv[1], ['child']);
+
+ var server = net.createServer();
+ var sockets = [];
+ var closed = 0;
+
+ server.on('connection', function(socket) {
+ child.send({ cmd: 'new' }, socket, { track: true });
+ sockets.push(socket);
+
+ socket.once('close', function() {
+ console.error('[m] socket closed');
+ closed++;
+ assert.equal(closed + server.connections, count);
+ if (server.connections === 0) server.close();
+ });
+
+ if (sockets.length === count) {
+ closeSockets();
+ }
+ });
+
+ var disconnected = 0;
+ server.on('listening', function() {
+
+ var j = count, client;
+ while (j--) {
+ client = net.connect(common.PORT, '127.0.0.1');
+ client.on('close', function() {
+ console.error('[m] CLIENT: close event');
+ disconnected += 1;
+ });
+ // XXX This resume() should be unnecessary.
+ // a stream high water mark should be enough to keep
+ // consuming the input.
+ client.resume();
+ }
+ });
+
+ function closeSockets(i) {
+ if (!i) i = 0;
+ if (i === count) return;
+
+ child.send({ id: i, cmd: 'close' });
+ child.once('message', function(m) {
+ assert(m.status === 'closed');
+ closeSockets(i + 1);
+ });
+ };
+
+ var closeEmitted = false;
+ server.on('close', function() {
+ console.error('[m] server close');
+ closeEmitted = true;
+
+ child.kill();
+ });
+
+ server.listen(common.PORT, '127.0.0.1');
+
+ process.on('exit', function() {
+ assert.equal(disconnected, count);
+ assert.equal(closed, count);
+ assert.ok(closeEmitted);
+ });
+}
Please sign in to comment.
Something went wrong with that request. Please try again.